关于TiKV主要的研究点在KV的存储,TiDB将SQL转成了KV数据对,TiKV就是将KV数据进行存储并提供查询,对于多节点中每节点的数据一致性和Percolator事务prewrite/commit等就是主要要解决的事情,从而实现稳定的分布式存储。
peerfsm处理日志消息的raft状态变更,applyfsm对日志进行落盘,这两者都是围绕raftstore与日志展开,这里涉及到的Engine是RaftEngine,关于实际数据写入RocksDB数据库的情况之后记录。
从batch.rs中看状态机的调度方法:所有Fsm之间的关联使用的是状态机的驱动,Poll在等待handle_raft_ready处理完之后会通过一个状态机的驱动结构reschedule_fsms(fsm_cnt,ReschedulePolicy)(Reschedule来记录一个状态及接下来的调度策略是移除、释放还是重新调度,重新调度会传入当前fsm在数组中的下标,重新将该fsm加入到调度中)。
用栈的弹入弹出操作来模拟一个驱动流程:
1.每次循环进入时会调用fetch_fsm方法,尝试去从fsm_receiver中获取当前传入的fsm,加入到batch中
2.使用该fsm的handle_normal/handle_control函数对该fsm进行处理
3.处理完后弹出上一个batch中的normal驱动机,出栈再入栈或者直接入栈加入新的下一个驱动机进行ReschedulePolicy::Schedule
4.Schedule所做的就是调用batch.reschedule(&self.router,r)对新加入的fsm进行schedule->schedule中使用send方法发送至下一个fsm的路径位置。
store.rs和apply.rs中实现了batch.rs中定义的trait方法,fsm加入后就可以调度store.rs和apply.rs中的具现化方法来执行每个fsm对应的操作。
从store.rs中的BatchSystem入手,其中有一个HandlerBuilder,里面含有一个Poller,使用PollHandler实现,PollHandler由Apply.rs和Peer.rs一人提供一个,可以通过Batch中的Poller获取ApplyFsm和PeerFsm的信息从而poll相应的Fsm进行下一步操作。
BatchSystem(HandlerBuilder)->spawn->start_poller->poller.poll()->(PollHandler).handle_control/handle_normal
HandlerBuilder结构体中的handler为PollHandler,HandlerBuilder的build()就是为了得到其中的PollHandler,根据PollHandler又可以构建出Poller结构体,从而可以调用Poller中的Poll方法,PollHandler中存在apply.rs(ApplyPoller)和store.rs(RaftPoller)中,提供了Handle_Control/Handle_Normal主要函数,会在下面分析。在Poll中可以不断的去调用handle函数处理状态机中的各种事件。
PollHandler中提供了handle_normal和handle_control,方便BatchSystem调用对应的FSM。
接下来分别从整体框架、peer的handle_normal和apply的handle_normal开始分析:
主要考虑如下交互,交互形式以channel的tx/rx作为消息通道,消息中使用cb进行消息传递,也可以使用rx.recv/tx.send进行。
cb对结果执行下一步操作与tx.send发送结果回peer需要区分开,cb完全可以执行tx.send发送结果回peer的操作,还可以包括失败数据的gc,还可以处理需要多步操作才能完成的请求等等。在apply代码中会先执行cb再执行tx.send向peer发送响应,因此某些情况可以自己定制cb用于向peer返回特定的结果,不仅限于Snapshot的结果(读请求)或Ok的结果(写请求)。
消息中会带上callback,用于记录一条消息完成后需要自己定义的一些操作:
peerfsm发送给applyfsm的ApplyTask::Apply中会有一个proposal,
proposal中记录了callback,apply接收后proposal存在pending_cmds中,proposal中的callback以后暂时存在applied_batch中
//ApplyDelegate主要记录的是对apply处理的各种方法,ApplyContext主要记录的是apply中会用到的各种数据 //apply中的pending_cmds(在结构体ApplyDelegate中)记录proposal /// Handles proposals, and appends the commands to the apply delegate. fn append_proposal(&mut self, props_drainer: Drain<Proposal<EK::Snapshot>>) { let (region_id, peer_id) = (self.delegate.region_id(), self.delegate.id()); ... for p in props_drainer { let cmd = PendingCmd::new(p.index, p.term, p.cb); if p.is_conf_change { if let Some(cmd) = self.delegate.pending_cmds.take_conf_change() { // if it loses leadership before conf change is replicated, there may be // a stale pending conf change before next conf change is applied. If it // becomes leader again with the stale pending conf change, will enter // this block, so we notify leadership may have been changed. notify_stale_command(region_id, peer_id, self.delegate.term, cmd); } self.delegate.pending_cmds.set_conf_change(cmd); } else { self.delegate.pending_cmds.append_normal(cmd); } } } //apply中applied_batch记录(在ApplyContext结构体中) fn handle_raft_entry_normal<W: WriteBatch<EK>>( &mut self, apply_ctx: &mut ApplyContext<EK, W>, entry: &Entry, ) -> ApplyResult<EK::Snapshot> { ... while let Some(mut cmd) = self.pending_cmds.pop_normal(std::u64::MAX, term - 1) { if let Some(cb) = cmd.cb.take() { apply_ctx .applied_batch .push_cb(cb, cmd_resp::err_resp(Error::StaleCommand, term)); } } }
写入数据库以后实际调用proposal中的回调函数,一般可能是记录了这个结果对应的下一步操作的,对于某个需要两步请求发送才能完成的操作,在第一个实现后会紧接着执行第二步;当然也可以为tx.send(Snapshot)/tx.send(Ok{})向peer发送响应结果;还可以对发送失败的消息做一些gc处理操作。在回调函数完成后会有finish_for收尾将实际的结果发送至peer中(为tx.send(Snapshot)/tx.send(Ok{}))。因此cb所做的事可以包含tx.send。
/// Writes all the changes into RocksDB. /// If it returns true, all pending writes are persisted in engines. pub fn write_to_db(&mut self) -> bool { let need_sync = self.sync_log_hint; //写RocksDB ... // Take the applied commands and their callback let ApplyCallbackBatch { cmd_batch, batch_max_level, mut cb_batch, } = mem::replace(&mut self.applied_batch, ApplyCallbackBatch::new()); // Call it before invoking callback for preventing Commit is executed before Prewrite is observed. self.host .on_flush_applied_cmd_batch(batch_max_level, cmd_batch, &self.engine); // Invoke callbacks let now = Instant::now(); for (cb, resp) in cb_batch.drain(..) { if let Some(times) = cb.get_request_times() { for t in times { self.apply_time .observe(duration_to_sec(now.saturating_duration_since(*t))); } } cb.invoke_with_response(resp); } self.apply_time.flush(); self.apply_wait.flush(); need_sync }
在peer中会有apply_router.schedule_task,该函数定义在apply中,用于对apply发送来自peer的消息,未来会在apply中进行日志等数据的落盘。
//peer中 //activate函数中: ctx.apply_router .schedule_task(self.region_id, ApplyTask::register(self)); //handle_raft_ready_append中 //在ready前尽量从applyfsm中获取较新的Shapshot,在发送ApplyTask::apply(apply)消息后也需要获取一次Snapshot,避免保留旧快照导致应用日志等数据时存在问题。 ctx.apply_router .schedule_task(self.region_id, ApplyTask::Snapshot(gen_task)); //handle_raft_committed_entries中 //发送apply是里面是记录了vec<proposal<S>的,propose里可以定义callback, //在handle_raft_committed_entries中当在raftstore中得到ready以后,才会从ready中取出committed_entries,去到applyfsm中将日志进行落盘操作。 ctx.apply_router .schedule_task(self.region_id, ApplyTask::apply(apply)); //在fsm/peer中还存在等从apply中获取一些帮助peer进行状态转换的消息如下: //on_merge_result self.ctx .apply_router .schedule_task(job.region_id, ApplyTask::destroy(job.region_id, false)); //on_catch_up_logs_for_merge self.ctx.apply_router.schedule_task(self.fsm.region_id(), ApplyTask::LogsUpToDate(self.fsm.peer.catch_up_logs.take().unwrap()), ); //on_capture_change //注意这里发送一个propose方法,cb是会在propose_raft_command-> self.fsm.peer.propose(self.ctx, cb, msg, resp, diskfullopt)->self.raft_group.propose(ctx.to_vec(), data)中延迟调用,可以看到这里在调用cb的节点时还会发送ApplyTask::Change的请求。 self.propose_raft_command( msg, Callback::Read(Box::new(move |resp| { // Return the error if resp.response.get_header().has_error() { cb.invoke_read(resp); return; } apply_router.schedule_task( region_id, ApplyTask::Change { cmd, region_epoch, cb, }, ) })),
在handle_normal中,会先接收发往apply的消息,之后根据得到的情况将msg_buf传给normal.handle_tasks,normal是applyfsm的一个实现,applyfsm本身自带一个Receiver:
//apply中的接收 while self.msg_buf.len() < self.messages_per_tick { match normal.receiver.try_recv() { Ok(msg) => self.msg_buf.push(msg), Err(TryRecvError::Empty) => { expected_msg_count = Some(0); break; } Err(TryRecvError::Disconnected) => { normal.delegate.stopped = true; expected_msg_count = Some(0); break; } } } normal.handle_tasks(&mut self.apply_ctx, &mut self.msg_buf);
在handle_tasks中第一步会接收到Msg::Registration(reg)消息,收到消息后会设置好此次注册传输消息的tx和rx。
调用handle_normal->handle_tasks->handle_registration(reg)->from_registration,此函数中会设置fsmdelegate的tx和rx,之后的消息传递就从这里的tx通过schedule_task传至apply,得到了一个delegate之后,我们就可以调用delegate相关的方法对peer传来的命令进行处理。
//apply中的注册 impl<EK> ApplyFsm<EK> where EK: KvEngine, { /// Handles peer registration. When a peer is created, it will register an apply delegate. fn handle_registration(&mut self, reg: Registration) { ... self.delegate.term = reg.term; self.delegate.clear_all_commands_as_stale(); self.delegate = ApplyDelegate::from_registration(reg); } fn from_registration(reg: Registration) -> (LooseBoundedSender<Msg<EK>>, Box<ApplyFsm<EK>>) { let (tx, rx) = loose_bounded(usize::MAX); let delegate = ApplyDelegate::from_registration(reg); ( tx, Box::new(ApplyFsm { delegate, receiver: rx, mailbox: None, }), ) }
apply处理完后会得到一个apply_res数组(记录在ApplyContext),该数组不为空时就会通过flush操作将未处理的写操作数据落盘将返回结果发送PeerMsg给到peerfsm,这个感觉是作为callback的一个加强操作,如果callback没有定义好,则默认一定会在某些需要发送结果的操作中必须发送返回结果。
//apply中发送回复消息至peer,一般在handle_snapshot/end等操作时调用flush pub fn flush(&mut self) -> bool { ... // Write to engine // raftstore.sync-log = true means we need prevent data loss when power failure. // take raft log gc for example, we write kv WAL first, then write raft WAL, // if power failure happen, raft WAL may synced to disk, but kv WAL may not. let is_synced = self.write_to_db(); if !self.apply_res.is_empty() { let apply_res = mem::take(&mut self.apply_res); self.notifier.notify(apply_res); } } //关于notify的可能的实现: pub struct TestNotifier<EK: KvEngine> { tx: Sender<PeerMsg<EK>>, } impl<EK: KvEngine> Notifier<EK> for TestNotifier<EK> { fn notify(&self, apply_res: Vec<ApplyRes<EK::Snapshot>>) { for r in apply_res { let res = TaskRes::Apply(r); let _ = self.tx.send(PeerMsg::ApplyRes { res }); } } }
当peerfsm收到来自apply的applyres之后会进行一些收尾工作,对于上面提到的ApplyRes:Apply,会用post_apply后续更新 apply_state/applied_index_term/metrics
//peer中 fn on_apply_res(&mut self, res: ApplyTaskRes<EK::Snapshot>) { fail_point!("on_apply_res", |_| {}); match res { ApplyTaskRes::Apply(mut res) => { debug!( "async apply finish"; "region_id" => self.region_id(), "peer_id" => self.fsm.peer_id(), "res" => ?res, ); self.on_ready_result(&mut res.exec_res, &res.metrics); if self.fsm.stopped { return; } self.fsm.has_ready |= self.fsm.peer.post_apply( self.ctx, res.apply_state, res.applied_index_term, &res.metrics, ); // After applying, several metrics are updated, report it to pd to // get fair schedule. if self.fsm.peer.is_leader() { self.register_pd_heartbeat_tick(); self.register_split_region_check_tick(); } } ApplyTaskRes::Destroy { region_id, peer_id, merge_from_snapshot, } => { assert_eq!(peer_id, self.fsm.peer.peer_id()); if !merge_from_snapshot { self.destroy_peer(false); } else { // Wait for its target peer to apply snapshot and then send `MergeResult` back // to destroy itself let mut meta = self.ctx.store_meta.lock().unwrap(); // The `need_atomic` flag must be true assert!(*meta.destroyed_region_for_snap.get(®ion_id).unwrap()); let target_region_id = *meta.targets_map.get(®ion_id).unwrap(); let is_ready = meta .atomic_snap_regions .get_mut(&target_region_id) .unwrap() .get_mut(®ion_id) .unwrap(); *is_ready = true; } } } }
handle_normal(store.rs中)=>(PeerFsmDelegate)handle_msgs->处理各种PeerMsg,对于RaftCommand构建self.fsm.batch_req_builder后(add,build)调用propose_raft_command->self.fsm.peer.propose(raft_group RawNode<PeerStorage<EK,ER>>)->propose_normal->raft_group propose
propose是从fsm.batch_req_builder.build(&mut self.ctx_raft_metrics)中取得cmd,再通过self.propose_raft_command(cmd.request,cmd.callback,DiskFullOpt::NotAllowedOnFull)会设置好proposal封装进apply中。在peer中的发送proposal至apply进行落盘主要是peer调用apply_router.schedule_task(region_id,msg)函数,
let mut apply = Apply::new( self.peer_id(), self.region_id, self.term(), committed_entries, cbs,//Vec<Proposal<S>>, ); apply.on_schedule(&ctx.raft_metrics); ... ctx.apply_router .schedule_task(self.region_id, ApplyTask::apply(apply));
propose之后在peer中会有raftstore状态转换得到ready(大多节点都认可了该消息,已得到保障可以进行落盘),ready在collect_ready中进行处理。
ready结构是一个self.raft_group.ready(),在ready中是会存在参数peer_id的(还有snapshot/hs/entries/msgs等) peer
会记录在ctx.store_meta.readers中,在handle_raft_ready_append中产生了一个ready并加入ready_res数组,之后调用post_raft_ready_append->send->send_raft_message,将msgs传递到对应的apply。
handle_normal(store.rs中)=>collect_ready->
1.对于raft消息是通过on_raft_message得到的,peer在收到raft消息之后会调用Raft::step,最终成功后走committed_entries去到applyfsm。ready是handle_raft_ready_append中raft_group ready得来的,会反应在Committed_entries中->handle_raft_committed_entries->ctx.apply_router.schedule_task(self.region_id,ApplyTask::apply(apply))想applyfsm发送apply请求(apply中记录了committed_entries)。
所以说raft与apply之间的关联就在于ready与committed_entries。
2.self.fsm.peer.handle_raft_ready_append(self.ctx)得到ready后self.ctx.ready_res.push将ready结果push进去
3.调用store里的handle_raft_ready对ready_res中的结果进行写入,kv的写入kvEngine,raft的写入raftEngine调用post_raft_ready_append,做一些对ready的善后操作(在handle_raft_ready_append中会处理PollContext会调用handle_raft_ready,用于更新各种数据至kv/raft中后将InvokeContext结果返回给handle_raft_ready_append),然后调用当前fsm handler中的end()方法对当前fsm进行收尾。
对raft数据做完ready处理后就以cpmmitted_entries的形式进入到applyfsm部分进行apply操作。
5.在PollContext中有router(RaftRouter)/apply_router(ApplyRouter),对于apply_router主要调用的是schedule_task方法。
ctx.apply_router .schedule_task(self.region_id, ApplyTask::Snapshot(gen_task)); //将消息发送给ApplyFsm self.send(ctx, msgs); //将数据发送给其他Store
Apply的Builder中有RaftPollerBuilder和ApplyRouter,RaftPollerBuilder中有engines和store。
Apply的HandlerBuilder就是基于Builder,里面封装了ApplyPoller->ApplyContext->ApplyDelegate->WriteBatch的关于写数据库put/delete方法以及处理一些命令的代码。
handle_normal->handle_tasks->handle_apply/resume_pending->
1.append_proposal->append_normal(将命令push back至pending_cmds)
2.handle_raft_committed_entries->
2.1handle_raft_entry_normal->commit->commit_opt->write_to_db->write_opt并通过callback的invoke_with_response(resp)处理返回ReadResponse或WriteResponse的结果。
2.2handle_raft_entry_normal->process_raft_cmd->
2.2.1apply_raft_cmd->exec_raft_cmd->exec_write_cmd/exec_admin_cmd->WriteBatch的handle_put/handle_delete->wb.put()
2.2.2find_pending(将pending_cmds里的命令取出)得到命令对应的callback
2.2.3将找出的callback push至apply_ctx.applied_batch,等到apply_raft_cmd中返回的执行结果
3.handle_raft_committed_entries最终得到的结果就是raft的运行结果,使用这个结果对apply_ctx做一些finish_for收尾操作,将得到的results放入apply_ctx的apply_res数组。返回到最开始的handle_normal也就结束了。
此时在flush函数中有如下操作,会调用notifier.notify
if !self.apply_res.is_empty() { let apply_res = mem::take(&mut self.apply_res); self.notifier.notify(apply_res); } impl<EK: KvEngine> Notifier<EK> for TestNotifier<EK> { fn notify(&self, apply_res: Vec<ApplyRes<EK::Snapshot>>) { for r in apply_res { let res = TaskRes::Apply(r); let _ = self.tx.send(PeerMsg::ApplyRes { res }); } } fn notify_one(&self, _: u64, msg: PeerMsg<EK>) { let _ = self.tx.send(msg); } fn clone_box(&self) -> Box<dyn Notifier<EK>> { Box::new(self.clone()) } }
当schedule实际完成处理以后,需要返回结果的就返回一个peermsg,不需要的就将不返回,所以apply中的handle_normal主要是对apply_ctx进行处理,当最终在exec_raft_cmd中对命令实际完成后就一路向上返回最终的处理结果。
fn batch_messages<E>(router: &ApplyRouter<E>, region_id: u64, msgs: Vec<Msg<E>>) where E: KvEngine, { let (notify1, wait1) = mpsc::channel(); let (notify2, wait2) = mpsc::channel(); router.schedule_task( region_id, Msg::Validate( region_id, Box::new(move |_| { notify1.send(()).unwrap(); wait2.recv().unwrap(); }), ), ); wait1.recv().unwrap(); for msg in msgs { router.schedule_task(region_id, msg); } notify2.send(()).unwrap(); } ... //此处可以看到proposal中带上了callback,对于得到返回结果会有怎样的处理,在这里当得到write结果时,需要想resp_tx发送结果 let (resp_tx, resp_rx) = mpsc::channel(); let p = proposal( false, 1, 0, Callback::write(Box::new(move |resp: WriteResponse| { resp_tx.send(resp.response).unwrap(); })), ); router.schedule_task( 1, Msg::apply(apply(1, 1, 0, vec![new_entry(0, 1, true)], vec![p])),//此处写resp_rx ); // unregistered region should be ignored and notify failed. let resp = resp_rx.recv_timeout(Duration::from_secs(3)).unwrap(); ... //此处可以看到一个主要的操作是将peer需要的信息通过schedule_task下发给到applyfsm,之后peer可以从rx中获取返回applyres(snapshot)即可。 let (snap_tx, _) = mpsc::sync_channel(0); batch_messages( &router, 2, vec![ Msg::apply(apply(1, 2, 11, vec![new_entry(5, 5, false)], vec![])), Msg::Snapshot(GenSnapTask::new_for_test(2, snap_tx)), ], ); let apply_res = match rx.recv_timeout(Duration::from_secs(3)) { Ok(PeerMsg::ApplyRes { res: TaskRes::Apply(res), .. }) => res, e => panic!("unexpected apply result: {:?}", e), };
关于RaftRouter和Transport的关系:Transport是发给Store的,RaftRouter是发给Region的,参数为Router.force_send(region_id,msg),
Peer中用到了router中的地方:send_raft_commad/handle_raft_commited_entries/handle_raft_ready_append/activate
一个Region可以通过范围拆分分布在多个Store上,也就是所谓的MultiRaft。
使用Transport时可以直接使用Transport内RaftClient中的send方法
ctx.trans.send(SendMsg)/
RaftMessage::default/
handle_raft_ready_append: eraftpb::Message/
post_raft_ready_append: ready.take_messages() (raft_group.ready() raft_group是RawNode::New)
handle_raft_ready_advance: light_rd.take_messages()(raft_group.advance_append(ready))。
send_extra_message/send_raft_message/perpare_raft_message() (Raft_Message::Default)
一些消息参数:
PeerMsg::RaftCommand/
PeerMsg::RaftMessage/
PeerMsg::ApplyRes/
PeerMsg::Start/
PeerMsg::CasualMsg/
PeerMsg::replicate/
ControlMsg::LatencyInspect/
StoreMsg::RaftMessage/
StoreMsg::Tick/
StoreMsg::Start/