6.824 课程的 Lab2 是实现 Raft,分为 2A、2B、2C 三部分。 2A 实现 leader 选举与心跳;2B 实现日志复制;2C 实现持久化。
2A
2A 部分比较简单,当时的实现相当粗暴:
采用互斥锁的并发范式,Node 持有Arc<Mutex<Raft>>
。
Raft 线程执行一个无尽的循环,每次循环都睡 200ms。
如果该节点为 leader,则给其他节点发送心跳;
否则检查是否选举超时,若已超时,则从 follower 成为 candidate 并发起选举。
/// Create a new raft service.
pub fn new(raft: Raft) -> Node {
let raft = Arc::new(Mutex::new(raft));
let rf = raft.clone();
std::thread::spawn(move || loop {
std::thread::sleep(Duration::from_millis(200));
if rf.lock().unwrap().state.is_leader() {
let raft = rf.lock().unwrap();
Self::send_heartbeat(raft);
} else {
let millis = rand::thread_rng().gen_range(1000, 4000);
let election_timeout = Duration::from_millis(millis);
if rf.lock().unwrap().last_heartbeat.elapsed() > election_timeout {
Self::start_election(rf.clone());
}
}
});
Node { raft }
}
现在看来,这个实现有 2 个明显的问题:
- Raft 线程长期出于休眠状态,能做的事有限,不方便后续增加新的逻辑
- 发起选举的时机不对:比如,选举超时是 410ms,该 Node 并不会在 410ms 后发起选举,而是在 600ms 时发起。 因为 400ms 时检查尚未超时,下次检查则是 200ms 后。这就大大增加了多个节点同时发起选举的概率。
针对第一个问题,改进方案是用 futures_timer
中的 Interval
代替 thread::sleep()
,
Interval
实现了 Stream
,后续通过 select(event_receiver)
(mailbox),采用 Actor 模型,可以方便地增加其他逻辑。
第一个问题解决后,第二个问题则迎刃而解:随机出一个选举超时时间,通过 Delay
准时将发起选举的事件发到 Raft 线程的 mailbox
。
初步改进:
std::thread::spawn(move || {
Interval::new(Duration::from_millis(200))
.for_each(|_| {
if rf.lock().unwrap().state.is_leader() {
let raft = rf.lock().unwrap();
Self::send_heartbeat(raft);
} else {
let millis = rand::thread_rng().gen_range(1000, 4000);
let election_timeout = Duration::from_millis(millis);
if rf.lock().unwrap().last_heartbeat.elapsed() > election_timeout {
Self::start_election(rf.clone());
}
}
Ok(())
})
.wait()
.unwrap();
});
2B
2B 部分为日志复制,实现的过程中有部分精力用于重构 2A。
2B 本身不复杂,但需要注意一些细节:term
、prev_log_term
、match_index
、next_index
。
例如,由于粗心,AppendEntries
参数中把 term
做为 prev_log_term
传递;
以及 next_index
维护逻辑、日志 up-to-date
的判断条件等。
总之,精力浪费在了寻找各种小细节导致的 bug 和重构,不过过程中思路愈加清晰。
2B 部分完成后的 Raft 线程除了发送心跳外,可以接收其他事件并进行相应的逻辑,大致如下:
fn start_raft_thread(raft: Arc<Mutex<Raft>>) -> UnboundedSender<Event> {
let (event_tx, event_rx) = futures::sync::mpsc::unbounded();
raft.lock().unwrap().event_ch = Some(event_tx.clone());
std::thread::spawn(move || {
let event_rx = event_rx.map_err(|_| ()).map(Some);
Interval::new(INTERVAL_PERIOD)
.map(|_| None)
.map_err(|_| ())
.select(event_rx)
.for_each(|event| {
if let Some(event) = event {
raft.lock().unwrap().handle_event(event);
} else {
raft.lock().unwrap().step();
}
Ok(())
})
.wait()
.unwrap();
});
event_tx
}
Node
结构体则持有 Sender
, 由于个别场景直接用锁执行 Raft
方法非常方便,
没有完全去除 Arc<Mutex<Raft>>
:
#[derive(Clone)]
pub struct Node {
sender: UnboundedSender<Event>,
raft: Arc<Mutex<Raft>>,
}
目前来看,Node
可以通过 sender
给 Raft
发送事件,但很多时候需要拿到事件的处理结果,
于是在 Event
上做文章:
enum Event {
RequestVote(RequestVoteArgs, oneshot::Sender<Reply>),
AppendEntries(AppendEntriesArgs, oneshot::Sender<Reply>),
VoteResult(u64, usize),
// ... 省略其他
}
#[derive(Clone)]
enum Reply {
RequestVote(RequestVoteReply),
AppendEntries(AppendEntriesReply),
}
Event
可以同时携带参数和用于发送处理结果的 Sender
。
如果需要某事件的处理结果,则创建新的 channel
,将参数和 sender
一并发给 Raft
,
而 Node
则将 receiver
作为一个 RpcFuture
返回给调用者,Raft
处理完后,用 send
发送处理结果。
例如:
let (tx, rx) = oneshot::channel();
let result = self
.sender
.unbounded_send(Event::AppendEntries(args, tx))
.map_err(|e| RpcError::Other(e.to_string()));
if let Err(e) = result {
return Box::new(future::err(e));
}
Box::new(rx.then(|reply| match reply {
Ok(Reply::AppendEntries(reply)) => Ok(reply),
_ => Err(RpcError::Timeout),
}))
2C
接下来实现持久化部分。本以为,2C 最简单,
只要在论文中所说的 persistent state
发生变化后及时调用 persist
即可。
然而完成后发现多个用例不通过。
第一个原因是,目前的实现存在一个 bug:没有判断事件的时效性。
用例模拟了网络不稳定,RPC 调用有快有慢、以及特定概率地超时等各种场景。
节点收到的事件可能不是本个 term
的,或者节点角色已发生变化,此时应丢弃改事件。
第二个原因是没有实现 kill
方法。该方法用于模拟节点崩溃的场景,
测试用例使某节点崩溃方式是调用其 kill
方法。此方法没有实现,
会导致本该崩溃的节点仍在运行,干扰正常流程。
第一个问题的解决比较简单,增加判断即可,第二个问题需要通知 Raft 线程退出。
在循环中跳出比较简单,break
即可,而 Raft 线程的逻辑在 Interval
中,
似乎是无穷的。经查阅文档发现 take_while
,完美解决:
Interval::new(INTERVAL_PERIOD)
.map(|_| None)
.map_err(|_| ())
.select(event_rx)
.take_while(|event| {
let has_next = if let Some(Event::Shutdown) = event {
info!("Peer {} shutdown", raft.lock().unwrap().me);
false
} else {
true
};
future::ok(has_next)
})
.for_each(|event| {
if let Some(event) = event {
raft.lock().unwrap().handle_event(event);
} else {
raft.lock().unwrap().step();
}
Ok(())
})
.wait()
.unwrap();
到这里,事情并未结束。仍有个别用例失败。经过观察发现,
unreliable
的情况下,RPC 是非常的 unreliable, 超时概率非常高,
这就导致 leader 选举频繁失败,通常数次才能成功选举,
而刚选举成功的 leader 极有可能连续多次往某个节点发送心跳失败,该节点重新发起选举。
目前的解决方案是增加心跳的频率,比如原来的 200ms 一次,到现在的 50ms 一次, 心跳成功率不够,用次数弥补。 加大选举超时时长,允许一定次数的心跳丢失。不过这种方式会使 RPC 次数增多,影响效率。