2020-03-24 19:11:00
config.locks
集合中,结合
config.lockpings
中的信息来完成相关逻辑实现。
config.lockpings
和 config.locks
集合中存储的内容config.lockpings
集合跟踪记录分片集群中所有活跃的组件。
如果一个 mongos 运行在 example.com:30000 ,那么 config.lockpings
关于这个mongos的记录是这个样子的
{ "_id" : "example.com:30000:1350047994:16807", // 记录组件的标识, mongos 和 shard 节点均需要定期和 config 节点保持心跳 "ping" : ISODate("2020-07-12T18:32:54.892Z") // 组件定期与 `lockpings` 集合保持心跳,即更新 `ping` 字段 }
_id
字段内部称为processID
。processID
对于 config 节点固定为"ConfigServer";对于 mongos / shard 节点,则是以":"分隔的四段信息分别为:hostname / port / timestamp / 随机int64值。该4项信息在进程启动时即已决定,进程存活期间不会被修改。
config.locks
集合存储了分布式锁信息。
{ "_id" : "test.myShardedCollection", // 锁的名称,下文简称 lockName。对 database 或者 namespace 的部分场景操作需要获取分布式锁,所以一般 database 或者 namespace "state" : 2, // 锁的状态。0 表示 UNLOCKED,2 表示 LOCKED,1 表示 LOCK_PREP(仅对老版本3个config节点,目前代码中已无相关逻辑) "process" : "ConfigServer", // 即 processID。与上文讲到的 config.lockpings 集合中 _id 字段的取值是一样的 "ts" : ObjectId("5be0b9ede46e4f441a60d891"), // 锁ID,下文简称 lockID。每次尝试获取分布式锁时的 锁ID 都是独有的。 "when" : ISODate("2020-07-12T21:52:00.846Z"), // 获取锁的时间 "who" : "ConfigServer:Balancer", // 获取锁的角色。 以":"分隔,第一段与 process字段 相同, 第二段 进程获取锁的线程名称。 ConfigServer:Balancer 表示 config进程的Balancer线程 "why" : "Migrating chunk(s) in collection test.myShardedCollection" // 获取锁的原因 }
_id
下文简称lockName
,process
下文简称processID
,ts
下文简称lockID
config.lockpings
和 config.locks
的基本交互与上述2个集合的基础交互(
DistLockCatalog
提供了与上述2个集合的基础交互动作,而DistLockCatalogImpl
则是接口的具体实现)
与 config.lockpings
的交互有 3 种场景:
replSetDistLockPinger
线程每隔 30s 通过一个 upsert: true
的 findAndModify
请求更新ping字段update: {}
的 findAndModify
请求清理掉对应的document与 config.locks
交互便是我们关心的分布式锁交互过程:
grabLock
: 没有期望的 lockName
记录或者有期望的 lockName
记录且 state = 0(UNLOCKED)
,通过更新 state = 2(LOCKED)
, 同时更新processID
/lockID
/who
/when
/why
字段的方式获取锁overtakeLock
: 对于**期望 lockName
记录,其 state = 0(UNLOCKED)
或者 lockID = oldTS
**,通过更新 state = 2(LOCKED)
、更新 lockID = newTS
、更新processID
/who
/when
/why
字段的方式抢占锁主要是通过
ReplSetDistLockManager::lockWithSessionID()
来完成的
UNLOCKED
。所以首先通过 grabLock
的方式来获取锁grabLock
获取失败,则说明可能存在锁竞争的情况。则通过锁的名称来获取目前 config.locks
集合中记录的对应锁的信息overtakeLock
的方式抢占锁
ReplSetDistLockManager
内部维护一个 _pingHistory
用于协助判断 config.locks
中记录的锁是否超时。_pingHistory
中记录了 processID
/pingValue
/config节点的serverTime
/lockID
/config节点的electionId
。
config.lockpings
集合中找到对应组件的心跳记录,则认为锁未超时_pingHistory
中不存在对应 lockName
的记录,则认为锁未超时,并将相关结果记录到 _pingHistor
中_pingHistory
中记录相比,锁的持有者心跳正常(pingValue
字段持续更新) 或者 锁的lockID
发生变化 或者 config发生主节点变更(electionId
发生变化) 则认为锁未超时,同时将相关结果记录到 _pingHistory
便于下次对比_pingHistory
中记录的 config节点的serverTime
时间与当前时间超过 15min,则认为锁已经超时ts
为当前请求的ts
waitFor
)则返回LockBusy
借助简要代码看下:
入参中:
name
为锁的名称(即_id
字段),lockSessionID
为获取锁唯一的锁ID(即ts
字段),waitFor
为预期等待锁的时间。
StatusWith<DistLockHandle> ReplSetDistLockManager::lockWithSessionID(OperationContext* opCtx, StringData name, StringData whyMessage, const OID& lockSessionID, Milliseconds waitFor) { ... // Distributed lock acquisition works by tring to update the state of the lock to 'taken'. If // the lock is currently taken, we will back off and try the acquisition again, repeating this // until the lockTryInterval has been reached. If a network error occurs at each lock // acquisition attempt, the lock acquisition will be retried immediately. while (waitFor <= Milliseconds::zero() || Milliseconds(timer.millis()) < waitFor) { ... auto lockResult = _catalog->grabLock( opCtx, name, lockSessionID, who, _processID, Date_t::now(), whyMessage.toString()); auto status = lockResult.getStatus(); if (status.isOK()) { ... return lockSessionID; } // Get info from current lock and check if we can overtake it. auto getLockStatusResult = _catalog->getLockByName(opCtx, name); const auto& getLockStatus = getLockStatusResult.getStatus(); ... // Note: Only attempt to overtake locks that actually exists. If lock was not // found, use the normal grab lock path to acquire it. if (getLockStatusResult.isOK()) { auto currentLock = getLockStatusResult.getValue(); auto isLockExpiredResult = isLockExpired(opCtx, currentLock, lockExpiration); if (isLockExpiredResult.getValue() || (lockSessionID == currentLock.getLockID())) { auto overtakeResult = _catalog->overtakeLock(xxx); ... } } ... if (waitFor == Milliseconds::zero()) { break; } const Milliseconds timeRemaining = std::max(Milliseconds::zero(), waitFor - Milliseconds(timer.millis())); sleepFor(std::min(kLockRetryInterval, timeRemaining)); } return {ErrorCodes::LockBusy, str::stream() << "timed out waiting for " << name}; }
collectin操作:createCollection
/ dropCollection
/ shardCollection
会同时对 collection 的 namespace 和 database 加锁
database操作:movePrimary
/ enableSharding
/ createDatabase
/ dropDatabase
时会对 database 加锁,dropDatabase
还会依次对 db 下所有的 collection 加锁(dropCollection
)
chunk操作:Migrating chunk(s) in collection / merging chunks / splitting chunk
map-reduce操作:mr-post-process
DistLockManager::ScopedDistLock
的对象,并在锁使用完成后触发该对象的析构函数,释放锁(修改state=UNLOCKED)。grabLock
时,如果获取失败返回异常是由于 config节点 状态异常导致,那么也会进行 unlock
方便下次可以直接 grabLock
完成加锁。_unlockList
队列,在 replSetDistLockPinger
定期执行时也会重新进行 unlock 操作DistLockCatalogImpl : DistLockCatalog
: 对分布式锁的一些具体操作/** * Interface for the distributed lock operations. */ class DistLockCatalog {}
其中grabLock
和 overtakeLock
是两个核心的获取锁的方法:
grabLock
: 将lockID
的锁更新为指定的lockSessionID
overtakeLock
: 强制将锁的持有者从currentHolderTS
更改为lockSessionID
ReplSetDistLockManager : DistLockManager
: 分布式锁的一些接口,主要封装DistLockCatalogImpl而实现/** * Interface for handling distributed locks. * * Usage: * * auto scopedDistLock = mgr->lock(...); * * if (!scopedDistLock.isOK()) { * // Did not get lock. scopedLockStatus destructor will not call unlock. * } * * // To check if lock is still owned: * auto status = scopedDistLock.getValue().checkStatus(); * * if (!status.isOK()) { * // Someone took over the lock! Unlock will still be called at destructor, but will * // practically be a no-op since it doesn't own the lock anymore. * } */ class DistLockManager {}
replSetDistLockPinger
,用户定时与config.lockpings心跳,并对需要unlock的锁进行unlockmongos初始化时会生成一个与host、port、时间戳、随机值有关的一个distLockProcessId
作为ReplSetDistLockManager
的唯一标识,并在makeCatalogClient中完成对DistLockCatalogImpl、ReplSetDistLockManager、ShardingCatalogClientImpl的初始化
DistLockCatalog
的具体实现。默认初始化方法存储了config.locks
,config.lockpings
表名DistLockManager
的具体实现。初始化方法存储了上面提到的distLockProcessId
, DistLockCatalog
并完成了pingInterval
, lockExpiration
的初始化,其中pingInterval
默认为30s,lockExpiration
默认为15minShardingCatalogClient
的具体实现。初始化方法存储了上面提到的DistLockManager
。(该类只提供了一个获取DistLockManager的方式及start、shutdown的方法,与DistLockManager无其他关系)然后将ShardingCatalogClientImpl作为一个数据成员存储在全局的Grid中
grid初始化完成后,紧接着会调用grid->catalogClient()->startup();
,该语句实际上最终调用到ReplSetDistLockManager::startUp()
,启动一个replSetDistLockPinger线程,线程的具体执行在ReplSetDistLockManager::doTask()
中
_unlockList
,对需要unlock的锁调用DistLockCatalog::unlock()。如果返回失败则打印warning日志并重新加入_unlockList中whyMessage | _id | function | file |
---|---|---|---|
“createCollection” | database | lock | src/mongo/db/s/config/configsvr_create_collection_command.cpp |
“createCollection” | namespace | lock | src/mongo/db/s/config/configsvr_create_collection_command.cpp |
“dropCollection” | database | lock | src/mongo/db/s/config/configsvr_drop_collection_command.cpp |
“dropCollection” | namespace | lock | src/mongo/db/s/config/configsvr_drop_collection_command.cpp |
“shardCollection” | database | lock | src/mongo/db/s/config/configsvr_shard_collection_command.cpp |
“shardCollection” | namespace | lock | src/mongo/db/s/config/configsvr_shard_collection_command.cpp |
whyMessage | _id | function | file |
---|---|---|---|
“movePrimary” | database | lock | src/mongo/db/s/config/configsvr_move_primary_command.cpp |
“enableSharding” | database | lock | src/mongo/db/s/config/configsvr_enable_sharding_command.cpp |
“createDatabase” | database | lock | src/mongo/db/s/config/configsvr_create_database_command.cpp |
“dropDatabase” | database | lock | src/mongo/db/s/config/configsvr_drop_database_command.cpp |
“dropCollection” | namespace | lock | src/mongo/db/s/config/configsvr_drop_database_command.cpp |
whyMessage | _id | function | file |
---|---|---|---|
"splitting chunk " << chunkRange.toString() << " in " << nss.toString() | namespace | lock | src/mongo/db/s/split_chunk.cpp |
"merging chunks in " << nss.ns() << " from " << minKey << " to " << maxKey | namespace | lock | src/mongo/db/s/merge_chunks_command.cpp |
"Migrating chunk(s) in collection " << migrateType.getNss().ns()) | namespace | tryLockWithLocalWriteConcern | src/mongo/db/s/balancer/migration_manager.cpp |
"Migrating chunk(s) in collection " << nss.ns() | namespace | lockWithSessionID | src/mongo/db/s/balancer/migration_manager.cpp |
whyMessage | _id | function | file |
---|---|---|---|
“mr-post-process” | namespace | lock | src/mongo/s/commands/cluster_map_reduce_cmd.cpp |