
MongoDB Concurrent Replication

If I remember correctly, MongoDB gained concurrent oplog replay when it added support for the WiredTiger storage engine. I once wrote a real-time sync tool built around concurrent oplog replay, working mostly from experience, and had long wanted to see how the native implementation does it. The analysis below follows the v3.0.12 source.

The core function is SyncTail::oplogApplication(), which does two things:

  • Fetch a batch of oplog entries
  • Replay them concurrently
/* tail an oplog.  ok to return, will be re-called. */
void SyncTail::oplogApplication() {
    ReplicationCoordinator* replCoord = getGlobalReplicationCoordinator();

    while (!inShutdown()) {
        OpQueue ops;
        OperationContextImpl txn;

        Timer batchTimer;
        int lastTimeChecked = 0;

        do {
            int now = batchTimer.seconds();

            // apply replication batch limits
            if (!ops.empty()) {
                if (now > replBatchLimitSeconds)
                    break;
                if (ops.getDeque().size() > replBatchLimitOperations)
                    break;
            }   
            // occasionally check some things
            // (always checked in the first iteration of this do-while loop, because
            // ops is empty)
            if (ops.empty() || now > lastTimeChecked) {
                BackgroundSync* bgsync = BackgroundSync::get();
                if (bgsync->getInitialSyncRequestedFlag()) {
                    // got a resync command
                    return;
                }   
                lastTimeChecked = now;
                // can we become secondary?
                // we have to check this before calling mgr, as we must be a secondary to
                // become primary
                tryToGoLiveAsASecondary(&txn, replCoord);
            }   

            const int slaveDelaySecs = replCoord->getSlaveDelaySecs().total_seconds();
            if (!ops.empty() && slaveDelaySecs > 0) {
                const BSONObj& lastOp = ops.getDeque().back();
                const unsigned int opTimestampSecs = lastOp["ts"]._opTime().getSecs();

                // Stop the batch as the lastOp is too new to be applied. If we continue
                // on, we can get ops that are way ahead of the delay and this will
                // make this thread sleep longer when handleSlaveDelay is called
                // and apply ops much sooner than we like.
                if (opTimestampSecs > static_cast<unsigned int>(time(0) - slaveDelaySecs)) {
                    break;
                }
            }
            // keep fetching more ops as long as we haven't filled up a full batch yet
        } while (!tryPopAndWaitForMore(&txn, &ops, replCoord) &&  // tryPopAndWaitForMore returns
                                                                  // true when we need to end a
                                                                  // batch early
                 (ops.getSize() < replBatchLimitBytes) &&
                 !inShutdown());

        // For pausing replication in tests
        while (MONGO_FAIL_POINT(rsSyncApplyStop)) {
            sleepmillis(0);
        }

        if (ops.empty()) {
            continue;
        }

        const BSONObj& lastOp = ops.getDeque().back();
        handleSlaveDelay(lastOp);

        // Set minValid to the last op to be applied in this next batch.
        // This will cause this node to go into RECOVERING state
        // if we should crash and restart before updating the oplog
        OpTime minValid = lastOp["ts"]._opTime();
        setMinValid(&txn, minValid);
        multiApply(&txn, ops.getDeque());
    }
}

Batch-reading the oplog

This is a do…while loop. The do body applies some hard limits that cut a batch off, e.g. spending more than 1 second filling the batch, or the batch holding more than 5000 oplog entries. The while condition carries the core logic: besides a hard cap on the batch's size in bytes (50MB on 32-bit systems, 100MB on 64-bit), it distinguishes plain document operations from commands and index builds.
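
The limits referenced here (replBatchLimitSeconds, replBatchLimitOperations, replBatchLimitBytes) appear in the source above. Below is a minimal standalone sketch of the same cut-a-batch-here checks; the names and structure are illustrative, not MongoDB's actual declarations:

#include <chrono>
#include <cstddef>
#include <deque>
#include <string>

// Illustrative stand-ins for SyncTail's batch limits in v3.0
// (1 second, 5000 entries, 50MB on 32-bit / 100MB on 64-bit).
const int kBatchLimitSeconds = 1;
const std::size_t kBatchLimitOperations = 5000;
const std::size_t kBatchLimitBytes =
    sizeof(void*) == 4 ? 50u * 1024 * 1024 : 100u * 1024 * 1024;

struct Op {
    std::string raw;  // stand-in for a BSON oplog entry
};

// Returns true when the current batch should be closed and handed to the
// appliers, mirroring the checks in the do...while loop above.
bool batchIsFull(const std::deque<Op>& ops,
                 std::size_t batchBytes,
                 std::chrono::steady_clock::time_point start) {
    if (ops.empty())
        return false;  // never hand off an empty batch
    if (std::chrono::steady_clock::now() - start >
        std::chrono::seconds(kBatchLimitSeconds))
        return true;  // spent too long filling this batch
    if (ops.size() > kBatchLimitOperations)
        return true;  // too many entries
    return batchBytes >= kBatchLimitBytes;  // batch too large in bytes
}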

The core function is SyncTail::tryPopAndWaitForMore(), which peeks at the next entry in the oplog buffer queue and inspects its type. For a document operation it returns false, and the loop goes on to fetch the next entry; for a command or an index build it returns true, ending the do…while loop. In other words, document operations can be replayed in batches, while commands and index builds must be replayed one at a time, blocking, because such operations take coarse-grained locks (database-level, not document-level).

    // check for commands
    if ((op["op"].valuestrsafe()[0] == 'c') ||
        // Index builds are achieved through the use of an insert op, not a command op.
        // The following line is the same as what the insert code uses to detect an index build.
        (*ns != '\0' && nsToCollectionSubstring(ns) == "system.indexes")) {
        if (ops->empty()) {
            // apply commands one-at-a-time
            ops->push_back(op);
            _networkQueue->consume();
        }                     
        
        // otherwise, apply what we have so far and come back for the command
        return true;
    }
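
Condensed into a standalone predicate, the rule this snippet encodes looks roughly like the sketch below; nsToCollectionSubstring is replaced with a plain string split, and the names are mine, not MongoDB's:

#include <string>

// True for oplog entries that must end the current batch and be applied
// alone: commands ("op": "c") and index builds, which in this era were
// inserts into <db>.system.indexes rather than command ops.
bool mustApplyAlone(char opType, const std::string& ns) {
    if (opType == 'c')
        return true;
    // Stand-in for nsToCollectionSubstring(ns) == "system.indexes":
    // the collection name is everything after the first '.'.
    std::string::size_type dot = ns.find('.');
    return dot != std::string::npos && ns.substr(dot + 1) == "system.indexes";
}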

Concurrent replay

SyncTail::multiApply() implements concurrent replay of document operations.

The core function is SyncTail::fillWriterVectors(), which partitions the batch. It first hashes each entry's namespace; if the storage engine supports document-level locking and the entry is an i/u/d operation, it further mixes the document's _id into the hash (capped collections are excluded, since their insert order is a guaranteed property). This ensures that operations on the same document are replayed in order within a single worker thread.

SyncTail::applyOps() hands each group to a worker thread; the main thread waits for the whole batch to be replayed before fetching the next one. A minimal sketch of the grouping-and-dispatch pattern follows the fillWriterVectors() source below.

ps: the hash algorithm used is MurmurHash3, whose notable property is that even highly regular keys end up with a good, random-looking distribution.

void SyncTail::fillWriterVectors(OperationContext* txn,
                                 const std::deque<BSONObj>& ops,
                                 std::vector<std::vector<BSONObj>>* writerVectors) {
    const bool supportsDocLocking =
        getGlobalEnvironment()->getGlobalStorageEngine()->supportsDocLocking();

    Lock::GlobalRead globalReadLock(txn->lockState());
    CachingCappedChecker isCapped;

    for (std::deque<BSONObj>::const_iterator it = ops.begin(); it != ops.end(); ++it) {
        const BSONElement e = it->getField("ns");
        verify(e.type() == String);
        const char* ns = e.valuestr();
        int len = e.valuestrsize();
        uint32_t hash = 0;
        MurmurHash3_x86_32(ns, len, 0, &hash);

        const char* opType = it->getField("op").valuestrsafe();

        // For doc locking engines, include the _id of the document in the hash so we get
        // parallelism even if all writes are to a single collection. We can't do this for capped
        // collections because the order of inserts is a guaranteed property, unlike for normal
        // collections.
        if (supportsDocLocking && isCrudOpType(opType) && !isCapped(txn, ns)) {
            BSONElement id;
            switch (opType[0]) {
                case 'u':
                    id = it->getField("o2").Obj()["_id"];
                    break;
                case 'd':
                case 'i':
                    id = it->getField("o").Obj()["_id"];
                    break;
            }

            const size_t idHash = BSONElement::Hasher()(id);
            MurmurHash3_x86_32(&idHash, sizeof(idHash), hash, &hash);
        }

        (*writerVectors)[hash % writerVectors->size()].push_back(*it);
    }
}
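
multiApply() and applyOps() themselves are not quoted above, so here is a minimal standalone sketch of the whole pattern under stated assumptions: std::hash stands in for MurmurHash3, std::thread for MongoDB's worker pool, and all names (Op, fillBuckets, applyBatch) are mine for illustration:

#include <cstddef>
#include <functional>
#include <string>
#include <thread>
#include <vector>

struct Op {
    std::string ns;  // namespace, e.g. "test.coll"
    std::string id;  // stringified _id; empty for non-CRUD ops
};

// Group ops so that all operations on one document land in one bucket,
// mirroring fillWriterVectors(): hash the namespace, and mix in the _id
// when the entry is a CRUD op (doc-locking engines only).
std::vector<std::vector<Op>> fillBuckets(const std::vector<Op>& ops,
                                         std::size_t nWorkers) {
    std::vector<std::vector<Op>> buckets(nWorkers);
    for (const Op& op : ops) {
        std::size_t h = std::hash<std::string>()(op.ns);
        if (!op.id.empty())
            h ^= std::hash<std::string>()(op.id) + 0x9e3779b9 + (h << 6) + (h >> 2);
        buckets[h % nWorkers].push_back(op);
    }
    return buckets;
}

void applyOp(const Op& op) { /* replay one oplog entry */ }

// Dispatch each bucket to its own worker, then join them all: the batch
// acts as a barrier, so the next batch is not fetched until every op in
// this one has been replayed.
void applyBatch(const std::vector<Op>& ops, std::size_t nWorkers) {
    std::vector<std::thread> workers;
    for (auto& bucket : fillBuckets(ops, nWorkers)) {
        workers.emplace_back([bucket] {
            for (const Op& op : bucket)  // same-document ops replay in order
                applyOp(op);
        });
    }
    for (auto& t : workers)
        t.join();
}

int main() {
    std::vector<Op> batch = {{"test.coll", "a"}, {"test.coll", "b"}, {"test.coll", "a"}};
    applyBatch(batch, 4);  // both ops on _id "a" run, in order, on one worker
    return 0;
}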