作者: Unmesh Joshi
译者: java达人
由一台服务器来协调一组服务器的复制。
问题
为了在管理数据的系统中实现容错,需要将数据复制到多台服务器上。为客户保持一致性也很重要。在多台服务器上更新数据时,需要决定何时使更新对客户端可见。写入和读取Quorum 机制是不够的,因为某些故障情况可能导致客户端查看不一致的数据。每个单一的服务器都不了解quorum中其他服务器上的数据状态,仅当从多个服务器读取数据时,才能解决不一致问题。在某些情况下,这还不够。需要对发送到客户端的数据有更强的一致性保证。
解决方案
在集群中选择一台服务器作为领导者。领导者负责代表整个集群做出决策,并将决策传播到所有其他服务器。
每台服务器在启动时都会寻找一个现有的领导者。如果未找到领导者,则会触发一次领导者选举。仅在成功选择领导者后,服务器才会接受请求。只有领导者处理客户的请求。如果将请求发送到追随者服务器,则追随者可以将其转发到领导者服务器。
领导者选举
Figure 1: Election
Figure 2: Votes
Figure 3: Leader Heartbeats
对于三到五个节点的较小集群,例如在实现共识的系统中,可以在数据集群本身内实现领导者选举,而无需依赖任何外部系统。领导者选举在服务器启动时发生。每个服务器在启动时都会开始领导者选举,并尝试选举一个领导者。除非选举出领导者,否则系统不接受任何客户请求。如“Generation Clock”模式中所述,每次领导人选举也都需要更新generation 编号。服务器可以始终处于领导者,追随者或候选者三种状态之一
public enum ServerRole { LOOKING_FOR_LEADER, FOLLOWING, LEADING;}
HeartBeat机制用于检测现有的领导者是否失败,以便可以开始新的领导者选举。通过向每个成员服务器发送一条请求投票的消息来开始新的领导者选举。
class ReplicationModule… private void startLeaderElection() { replicationState.setGeneration(replicationState.getGeneration() + 1); registerSelfVote(); requestVoteFrom(followers); }选举算法
选举领导者时要考虑两个因素。
因为这些系统主要用于数据复制,它对选举成功作了额外的限制。只有最近更新的服务器才能成为合法的领导者。例如,在典型的基于共识的系统中,“最近更新”是由两者定义的:
?最新的Generation Clock?Write-Ahead Log中的最近日志索引
如果所有服务器都处于最新状态,则根据以下条件选择领导者:
?一些特定实现的标准,例如哪个服务器排名更高或具有更大的ID。(例如Zab)?确保同时只有一台服务器要求投票,哪个先于其他服务器开始选举。(例如Raft)
在给定的Generation Clock为服务器投票后,将始终为该generation返回相同的投票。这样可以确保在已经成功进行选举的情况下,不会选举其他请求同generation的投票的服务器。投票请求处理如下:
class ReplicationModule… VoteResponse handleVoteRequest(VoteRequest voteRequest) { VoteTracker voteTracker = replicationState.getVoteTracker(); Long requestGeneration = voteRequest.getGeneration(); if (replicationState.getGeneration() > requestGeneration) { return rejectVote();
} else if (replicationState.getGeneration() < requestGeneration) { becomeFollower(requestGeneration); voteTracker.registerVote(voteRequest.getServerId()); return grantVote(); }
return handleVoteRequestForSameGeneration(voteRequest); }
private VoteResponse handleVoteRequestForSameGeneration(VoteRequest voteRequest) { Long requestGeneration = voteRequest.getGeneration(); VoteTracker voteTracker = replicationState.getVoteTracker();
if (voteTracker.alreadyVoted()) { return voteTracker.grantedVoteForSameServer(voteRequest.getServerId()) ? grantVote():rejectVote();
}
if (voteRequest.getLogIndex() >= (Long) wal.getLastLogEntryId()) { becomeFollower(requestGeneration); voteTracker.registerVote(voteRequest.getServerId()); return grantVote(); }
return rejectVote(); }
private void becomeFollower(Long generation) { replicationState.setGeneration(generation); transitionTo(ServerRole.FOLLOWING); }
private VoteResponse grantVote() { return VoteResponse.granted(serverId(), replicationState.getGeneration(), wal.getLastLogEntryId()); }
private VoteResponse rejectVote() { return VoteResponse.rejected(serverId(), replicationState.getGeneration(), wal.getLastLogEntryId()); }
接收大多数服务器投票的服务器转换为领导者状态。如Quorum中所述确定多数派。一旦当选,领导者不断地发送心跳给所有的追随者。如果跟随者在指定的时间间隔内未获得心跳,则会触发新的领导者选举。
使用外部[Linearizable]存储进行领导者选举
在较小的集群中,在数据集群中进行领导者选举非常有效。对于可能多达数千个节点的大型数据集群,使用诸如Zookeeper或etcd之类的外部存储更加容易。(它内部使用共识并提供线性一致性保证)。这些大型集群通常具有标记为主节点或控制器节点的服务器,该服务器代表整个集群做出所有决策。实现领导者选举需要三个功能:
?compareAndSwap指令,用于自动设置key。?心跳key过期的实现,如果从当选的领导者没有收到心跳,让新的选举可以被触发。?一种通知机制,用于在key过期时通知所有订阅的服务器。
为了选举领导者,每台服务器都使用compareAndSwap指令尝试在外部存储中创建key,并且首先成功的服务器将被选为领导者。根据使用的外部存储,创建key的时间很短。当选领导者多次在这个时间前更新值以使其有效。每个服务器都可以对此key进行观察,并且如果key过期没有在存活时间范围被领导者更新,则服务器会收到通知。例如 etcd仅在先前不存在key的情况下才允许设置key操作,从而允许compareAndSwap操作。在Zookeeper中,不支持任何显式的compareAndSwap操作,但是可以通过尝试创建一个节点并在该节点已经存在的情况下抛出异常来实现该操作。也没有明确的生存时间,但是Zookeeper有一个临时节点的概念。服务器与Zookeeper有活跃会话,该节点将一直存在,否则该节点将被删除,并且将通知所有正在监视该节点的人。例如,可以使用Zookeeper来选举领导者,如下所示:
class ServerImpl… public void startup() { zookeeperClient.subscribeLeaderChangeListener(this); elect(); }
public void elect() { var leaderId = serverId; try { zookeeperClient.tryCreatingLeaderPath(leaderId); this.currentLeader = serverId; onBecomingLeader(); } catch (ZkNodeExistsException e) { //back off this.currentLeader = zookeeperClient.getLeaderId(); } }
所有其他服务器都在监视现有领导者是否存活。当检测到现有领导者掉线时,将触发新的领导者选举。使用与领导者选举相同的外部linearizable存储来进行故障检测。该外部存储还具有实现组成员身份和故障检测机制的功能。例如,通过扩展上述基于Zookeeper的实现,可以使用Zookeeper配置变更监听器,当现有领导者节点发生变更时触发该变更监听器。
class ZookeeperClient… public void subscribeLeaderChangeListener(IZkDataListener listener) { zkClient.subscribeDataChanges(LeaderPath, listener); }
集群中的每个服务器都订阅了这个更改,每当触发回调时,就会按照上面所示的相同方式再次触发新的选举。
class ServerImpl… @Override public void handleDataDeleted(String dataPath) throws Exception { elect(); }
诸如etcd或Consul之类的系统可以以相同的方式用于实现领导者选举。
并发性、锁和状态更新 状态更新可以通过使用Singular Update Queue来完成,而无需操作同步和锁
ZAB和RAFT 有两种主流实现,它们的领导者选举算法几乎没有细微的差别。Zab,作为Zookeeper选举算法的实现;Raft 在generation增加的时间点、服务器启动的默认状态以及如何确保没有投票分裂等方面存在细微的差异。只有选举产生的领导者才能增加generation数,选举要避免投票分裂,当多个服务器都处于最新状态时,通过确保每个服务器运行相同的逻辑来选择一个领导者。raft下,服务器默认以追随者状态启动,期望从现有的领导者那里获得心跳。如果没有接收到心跳信号,则通过增加generation来启动选举。在开始选举前使用随机超时来避免投票分裂。
为什么Quorum读/写不足以提供强大的一致性保证
看起来像由Cassandra这样的Dynamo式数据库提供的Quorum读/写足以在服务器发生故障时获得强大的一致性。但事实并非如此。考虑以下示例。假设我们有一个包含三台服务器的集群。变量x存储在所有三个服务器上。(其复制因子为3)。启动时x = 1。
?假设writer1写入x = 2,复制因子为3。写入请求将发送到所有三个服务器。在server1上写入成功,但在server2和server3上写入失败。(在向服务器1发送写请求后,网络故障或writer1只是进入了长时间的垃圾回收暂停状态。)?客户端c1从server1和server2读取x的值。因为server1具有最新值,所以它获得x = 2的最新值。?客户端c2触发对x的读取。但是Server1暂时关闭。因此,c1从server2,server3读取它,它们具有x的旧值,x = 1。因此,即使在c1读取最新值之后读取它,c2也会获得旧值。
这样,连续两次读取, 显示最新值消失了。一旦server1恢复,后续读取将提供最新值。并且假设读取修复或 Anti-Entropy机制正在运行,其余服务器也将“最终”获得最新值。但是,存储集群无法保证特定值对任何客户端可见后,即使服务器发生故障,所有后续读取也将继续获得该值。
例子
?对于实现共识的系统,重要的是只能有一台服务器协调复制过程的活动。正如Paxos Made Simple论文所述,这对于系统的活性很重要。?在Raft和Zab共识算法中,领导者选举是一个明确的阶段,发生在启动或领导者失败时?Viewstamp复制算法具有Primary的概念,与其他算法的领导者相似?Kafka有一个Controller,负责代表集群的其余部分做出所有决策。它对Zookeeper的事件做出反应,并且对于kafka中的每个分区,都有指定的领导者broker和追随者broker。领导者和跟随者的选择是由Controller broker完成的。