If I remember correctly, MongoDB gained parallel oplog replay around the time it added support for the WiredTiger storage engine.
I had previously written a real-time sync tool based on concurrent oplog replay, working mostly from experience, and had long wanted to see how the native implementation works. The analysis below is based on the v3.0.12 source code.
The core function is SyncTail::oplogApplication(), which does two things: it reads the oplog in batches, and it replays those batches concurrently.
/* tail an oplog. ok to return, will be re-called. */
void SyncTail::oplogApplication() {
    ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();

    while (!inShutdown()) {
        OpQueue ops;
        OperationContextImpl txn;

        Timer batchTimer;
        int lastTimeChecked = 0;

        do {
            int now = batchTimer.seconds();

            // apply replication batch limits
            if (!ops.empty()) {
                if (now > replBatchLimitSeconds)
                    break;
                if (ops.getDeque().size() > replBatchLimitOperations)
                    break;
            }

            // occasionally check some things
            // (always checked in the first iteration of this do-while loop, because
            // ops is empty)
            if (ops.empty() || now > lastTimeChecked) {
                BackgroundSync* bgsync = BackgroundSync::get();
                if (bgsync->getInitialSyncRequestedFlag()) {
                    // got a resync command
                    return;
                }
                lastTimeChecked = now;
                // can we become secondary?
                // we have to check this before calling mgr, as we must be a secondary to
                // become primary
                tryToGoLiveAsASecondary(&txn, replCoord);
            }

            const int slaveDelaySecs = replCoord->getSlaveDelaySecs().total_seconds();
            if (!ops.empty() && slaveDelaySecs > 0) {
                const BSONObj& lastOp = ops.getDeque().back();
                const unsigned int opTimestampSecs = lastOp["ts"]._opTime().getSecs();

                // Stop the batch as the lastOp is too new to be applied. If we continue
                // on, we can get ops that are way ahead of the delay and this will
                // make this thread sleep longer when handleSlaveDelay is called
                // and apply ops much sooner than we like.
                if (opTimestampSecs > static_cast<unsigned int>(time(0) - slaveDelaySecs)) {
                    break;
                }
            }
            // keep fetching more ops as long as we haven't filled up a full batch yet
        } while (!tryPopAndWaitForMore(&txn, &ops, replCoord) &&  // tryPopAndWaitForMore returns
                                                                  // true when we need to end a
                                                                  // batch early
                 (ops.getSize() < replBatchLimitBytes) &&
                 !inShutdown());

        // For pausing replication in tests
        while (MONGO_FAIL_POINT(rsSyncApplyStop)) {
            sleepmillis(0);
        }

        if (ops.empty()) {
            continue;
        }

        const BSONObj& lastOp = ops.getDeque().back();
        handleSlaveDelay(lastOp);

        // Set minValid to the last op to be applied in this next batch.
        // This will cause this node to go into RECOVERING state
        // if we should crash and restart before updating the oplog
        OpTime minValid = lastOp["ts"]._opTime();
        setMinValid(&txn, minValid);

        multiApply(&txn, ops.getDeque());
    }
}
Batch reading of the oplog
A do…while loop is used here. The do body enforces hard limits that cut a batch short, e.g. when batching has taken more than 1 second or more than 5000 oplog entries have been collected; the while condition carries the core logic: besides a hard limit on the batch size in bytes (50MB on 32-bit systems, 100MB on 64-bit systems), it distinguishes document operations from command/index operations.
The core function is SyncTail::tryPopAndWaitForMore(), which peeks at the next oplog entry in the buffer queue and checks its type. If it is a document operation, the function returns false and fetching continues with the next entry; if it is a command or an index build, it returns true and the do…while loop ends. In other words, document operations can be replayed in batches, while commands and index builds must be replayed one at a time, blocking, because such operations take coarse-grained locks (database-level rather than document-level).
// check for commands
if ((op["op"].valuestrsafe()[0] == 'c') ||
    // Index builds are acheived through the use of an insert op, not a command op.
    // The following line is the same as what the insert code uses to detect an index build.
    (*ns != '\0' && nsToCollectionSubstring(ns) == "system.indexes")) {
    if (ops->empty()) {
        // apply commands one-at-a-time
        ops->push_back(op);
        _networkQueue->consume();
    }
    // otherwise, apply what we have so far and come back for the command
    return true;
}
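The batching policy described above can be summarized in a standalone form. The following is only a minimal sketch, not MongoDB code: the Op struct, nextBatch() and the in-memory std::deque are stand-ins for the real oplog entries and buffer queue; only the limits themselves (1 second, 5000 entries, a byte cap) and the one-command-per-batch rule are taken from the source.

#include <chrono>
#include <cstdio>
#include <deque>
#include <string>
#include <vector>

// Hypothetical stand-in for a raw oplog entry (illustration only).
struct Op {
    std::string opType;  // "i", "u", "d", "c", "n"
    std::string ns;      // e.g. "test.user" or "test.system.indexes"
    size_t bytes;        // size of the raw oplog document
};

// Batch limits mirroring the constants referenced in oplogApplication().
constexpr int kBatchLimitSeconds = 1;
constexpr size_t kBatchLimitOperations = 5000;
constexpr size_t kBatchLimitBytes = 100 * 1024 * 1024;

// True for commands and for index builds (inserts into <db>.system.indexes).
bool endsBatch(const Op& op) {
    if (op.opType == "c")
        return true;
    const size_t dot = op.ns.find('.');
    return dot != std::string::npos && op.ns.substr(dot + 1) == "system.indexes";
}

// Pops the next batch off an in-memory queue. Document operations are grouped;
// a command/index build is returned in a batch of its own, or left on the
// queue if the current batch already contains something.
std::vector<Op> nextBatch(std::deque<Op>& queue) {
    std::vector<Op> batch;
    size_t batchBytes = 0;
    const auto start = std::chrono::steady_clock::now();

    while (!queue.empty()) {
        const Op& op = queue.front();  // peek, like tryPopAndWaitForMore()

        if (endsBatch(op)) {
            if (batch.empty()) {
                batch.push_back(op);  // apply the command one-at-a-time
                queue.pop_front();
            }
            break;  // otherwise apply what we have and come back for it
        }

        batch.push_back(op);
        batchBytes += op.bytes;
        queue.pop_front();

        if (std::chrono::steady_clock::now() - start >
                std::chrono::seconds(kBatchLimitSeconds) ||
            batch.size() >= kBatchLimitOperations ||
            batchBytes >= kBatchLimitBytes) {
            break;
        }
    }
    return batch;
}

int main() {
    // The command in the middle forces the first batch to end early.
    std::deque<Op> queue = {
        {"i", "test.user", 120},
        {"u", "test.user", 80},
        {"c", "test.$cmd", 60},
        {"i", "test.user", 100},
    };
    while (!queue.empty()) {
        std::vector<Op> batch = nextBatch(queue);
        std::printf("batch of %zu op(s)\n", batch.size());
    }
    return 0;
}

Running this prints a batch of 2 document ops, then the command alone, then the remaining insert, which is exactly the "apply what we have so far and come back for the command" behavior of the real code.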
Parallel replay
SyncTail::multiApply() implements the parallel replay of document operations.
The core function is SyncTail::fillWriterVectors(), which partitions a batch of oplog entries: it first hashes each entry's namespace, and, if the storage engine supports document-level locking and the entry is an i/u/d operation, it additionally hashes the document's _id (ObjectID) into the bucket choice. This guarantees that all operations on the same document land in the same worker thread and are replayed there in order.
SyncTail::applyOps() then hands each group to a worker thread; the main thread waits for the whole batch to finish replaying before fetching the next one (a simplified sketch of this fan-out/join step follows the fillWriterVectors listing below).
ps: the hash used is MurmurHash3, which is known for producing a good, random-looking distribution even for highly regular keys.
void SyncTail::fillWriterVectors(OperationContext* txn,
                                 const std::deque<BSONObj>& ops,
                                 std::vector<std::vector<BSONObj>>* writerVectors) {
    const bool supportsDocLocking =
        getGlobalEnvironment()->getGlobalStorageEngine()->supportsDocLocking();
    Lock::GlobalRead globalReadLock(txn->lockState());

    CachingCappedChecker isCapped;

    for (std::deque<BSONObj>::const_iterator it = ops.begin(); it != ops.end(); ++it) {
        const BSONElement e = it->getField("ns");
        verify(e.type() == String);
        const char* ns = e.valuestr();
        int len = e.valuestrsize();
        uint32_t hash = 0;
        MurmurHash3_x86_32(ns, len, 0, &hash);

        const char* opType = it->getField("op").valuestrsafe();

        // For doc locking engines, include the _id of the document in the hash so we get
        // parallelism even if all writes are to a single collection. We can't do this for capped
        // collections because the order of inserts is a guaranteed property, unlike for normal
        // collections.
        if (supportsDocLocking && isCrudOpType(opType) && !isCapped(txn, ns)) {
            BSONElement id;
            switch (opType[0]) {
                case 'u':
                    id = it->getField("o2").Obj()["_id"];
                    break;
                case 'd':
                case 'i':
                    id = it->getField("o").Obj()["_id"];
                    break;
            }

            const size_t idHash = BSONElement::Hasher()(id);
            MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash);
        }

        (*writerVectors)[hash % writerVectors->size()].push_back(*it);
    }
}
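Finally, the fan-out/join step performed by multiApply()/applyOps() can be illustrated with a simplified sketch. This is not the actual MongoDB code: the Op struct, applyGroup() and the per-batch std::thread usage are assumptions made purely for illustration; the real implementation submits the groups to a fixed-size internal writer pool (sized by the replWriterThreadCount parameter). What the sketch keeps is the synchronization pattern: each worker replays its own group sequentially, and the caller blocks until every group is done before the next batch of oplog entries is fetched.

#include <cstdio>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical stand-in for a parsed oplog entry.
struct Op {
    int dummy;
};

// Hypothetical worker body: replays one group sequentially, in oplog order.
void applyGroup(const std::vector<Op>& group) {
    for (const Op& op : group) {
        (void)op;  // a real worker would apply the operation here
    }
}

// Fan-out/join: one worker per writer vector, then wait for all of them.
// The real applyOps() hands the groups to a fixed-size internal thread pool
// rather than spawning threads per batch, but the pattern is the same.
void applyBatch(const std::vector<std::vector<Op>>& writerVectors) {
    std::vector<std::thread> workers;
    workers.reserve(writerVectors.size());

    for (const std::vector<Op>& group : writerVectors) {
        if (group.empty())
            continue;
        workers.emplace_back(applyGroup, std::cref(group));
    }

    // The batch only counts as applied once every group has been replayed,
    // so the caller blocks here before fetching the next batch of oplog entries.
    for (std::thread& t : workers) {
        t.join();
    }
}

int main() {
    // Tiny usage example: three writer vectors, replayed concurrently.
    std::vector<std::vector<Op>> writerVectors(3);
    writerVectors[0] = {Op{1}, Op{2}};
    writerVectors[2] = {Op{3}};
    applyBatch(writerVectors);
    std::printf("batch applied\n");
    return 0;
}

Because every group is joined before the next batch is read, replay is parallel within a batch but batches themselves remain strictly ordered, which is what keeps the secondary consistent with the primary's oplog order.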