传播流言蜚语

使用随机选择节点来传递信息,以确保信息到达集群中的所有节点,而不会淹没网络

2021年6月17日

问题

在节点集群中,每个节点都需要将其拥有的元数据信息传递给集群中的所有其他节点,而不依赖于共享存储。在大型集群中,如果所有服务器都与其他所有服务器通信,那么会消耗大量的网络带宽。即使某些网络连接出现问题,信息也应该到达所有节点。

解决方案

集群节点使用八卦样式通信来传播状态更新。每个节点选择一个随机节点来传递它所拥有的信息。这是一个有规律的间隔,比如每1秒。每一次,都选择一个随机节点来传递信息。

在大型集群中,需要考虑以下事项:

  • 对每个服务器生成的消息数量设置一个固定的限制
  • 这些消息不应该消耗大量的网络带宽。应该有一个上限,比如几百Kbs,以确保应用程序的数据传输不会受到跨集群的太多消息的影响。
  • 元数据传播应该能够容忍网络和少量服务器故障。它应该能够到达所有的集群节点,即使一些网络连接断开,或者一些服务器出现故障。

正如侧边栏中所讨论的,八卦风格的交流满足了所有这些需求。

每个集群节点将元数据存储为与每个节点关联的键值对列表,如下所示:

类八卦……

Map clusterMetadata = new HashMap<>();

类NodeState……

Map values = new HashMap<>();

在启动时,每个集群节点添加关于自身的元数据,需要将其传播到其他节点。元数据的一个例子可以是节点监听的IP地址和端口、它负责的分区等等。八卦实例需要知道至少一个其他节点才能启动八卦通信。用于初始化Gossip实例的众所周知的集群节点称为种子节点或引入节点。任何节点都可以作为引入器。

类八卦……

public Gossip(InetAddressAndPort listenAddress, List seedNodes, String nodeId) throws IOException {this。listenAddress = listenAddress;//过滤这个节点本身,以防它是种子节点的一部分。seedNodes = removeSelfAddress (seedNodes);这一点。nodeId = new nodeId (nodeId);addLocalState (GossipKeys。地址,listenAddress.toString ());这一点。socketServer = new NIOSocketListener(newgossip prequestconsumer (), listenAddress);} private void addLocalState(String key, String value) {NodeState NodeState = clusterMetadata.get(listenAddress);if (nodeState == null) {nodeState = new nodeState ();clusterMetadata。把(nodeId nodeState);} nodeState。add(key, new VersionedValue(value, incremenetVersion()));}

每个集群节点调度一个作业,以将其拥有的元数据定期传输到其他节点。

类八卦……

private ScheduledThreadPoolExecutor八卦executor = new ScheduledThreadPoolExecutor(1);private long gossipIntervalMs = 1000;私人ScheduledFuture < ?> taskFuture;public void start() {socketServer.start();taskFuture =八卦执行器。scheduleatfixedrate (()-> doGossip(),八卦间隔,八卦间隔,时间单位。毫秒);}

当调度任务被调用时,它会从元数据映射的服务器列表中选取一小组随机节点。一个小的常量定义为八卦扇出,它决定了有多少个节点可以作为八卦目标。如果还不知道什么,它就选择一个随机的种子节点,并将它拥有的元数据映射发送到该节点。

类八卦……

public void doGossip() {List knownClusterNodes = liveNodes();if (knownClusterNodes.isEmpty()) {sendGossip(seedNodes, gossipFanout);} else {sendGossip(knownClusterNodes, gossipFanout);}} private List liveNodes() {Set nodes = clusterMetadata.values() .stream() .map(n -> InetAddressAndPort.parse(n.get(xuepkeys.address).getValue())) .collect(collecters . toset ());返回removeSelfAddress(节点);}
private void sendGossip(List knownClusterNodes, int gossipFanout) {if (knownClusterNodes. isempty ()) {return;} for (int I = 0;我< gossipFanout;我+ +){InetAddressAndPort nodeAddress = pickRandomNode(knownClusterNodes);sendGossipTo (nodeAddress);}} private void sendGossipTo(InetAddressAndPort nodeAddress) {try {getLogger().info(“发送八卦状态”+ nodeAddress);SocketClient SocketClient = new SocketClient(nodeAddress);GossipStateMessage GossipStateMessage = new GossipStateMessage(this.clusterMetadata);RequestOrResponse请求= createGossipStateRequest(gossipStateMessage);byte[] responseBytes = socketClient.blockingSend(请求);闲话statemessage responseState =反序列化(responseBytes);合并(responseState.getNodeStates ());} catch (IOException e) {getLogger()。error("IO error while sending gossip state to " + nodeAddress, e); } } private RequestOrResponse createGossipStateRequest(GossipStateMessage gossipStateMessage) { return new RequestOrResponse(RequestId.PushPullGossipState.getId(), JsonSerDes.serialize(gossipStateMessage), correlationId++); }

接收流言消息的集群节点检查其拥有的元数据并发现三件事。

  • 传入消息中但在该节点的状态映射中不可用的值
  • 它所具有的价值,而传入的八卦信息却不具备
  • 当节点的值出现在传入消息中时,将选择较高的版本值

然后将缺失的值添加到自己的状态映射中。传入消息中缺少的任何值都将作为响应返回。

发送Gossip消息的集群节点将从Gossip响应获得的值添加到其自己的状态。

类八卦……

private void handlegossip (org. distribution .patterns.common. message  request) {GossipStateMessage GossipStateMessage =反序列化(request. getrequest ());Map闲聊状态=闲聊状态。getnodestates ();getLogger().info(“合并状态从”+ request.getClientSocket());合并(gossipedState);Map diff = delta(this。clusterMetadata gossipedState);GossipStateMessage diffResponse = new GossipStateMessage(diff);getLogger().info(“发送diff响应”+ diff);request.getClientSocket()。write(new RequestOrResponse(requesttid . pushpullgossip state . getid (), jsonserdesserialize (diffResponse), request.getRequest().getCorrelationId()));}
public Map delta(Map fromMap, Map toMap) {Map delta = new HashMap<>();for (NodeId key: fromap . keyset ()) {if (!把(关键fromMap.get(键));继续;} NodeState fromStates = fromMap.get(key);NodeState toStates = toMap.get(key);NodeState diffStates = fromStates.diff(toStates);if (!diffStates.isEmpty()) {把(关键,diffStates);}}返回delta;}
public void merge(Map otherState) {Map diff = delta(otherState, this.clusterMetadata);for (NodeId diffKey: diffKey . keyset ()) {if(!this.clusterMetadata. containskey (diffKey)) {把(diffKey diff.get (diffKey));} else {NodeState stateMap = this.clusterMetadata.get(diffKey);stateMap.putAll (diff.get (diffKey));}}}

这个过程每一秒钟在每个集群节点上发生一次,每次选择一个不同的节点来交换状态。

避免不必要的状态交换

上面的代码示例显示了在Gossip消息中发送节点的完整状态。这对于新加入的节点来说没有问题,但是一旦状态是最新的,就没有必要发送完整的状态。集群节点只需要发送自上次闲谈以来的状态更改。为了实现这一点,每个节点维护一个版本号,该版本号在每次本地添加新的元数据项时递增。

类八卦……

private int八卦stateversion = 1;private int incremenetVersion(){返回gossipStateVersion++;}

集群元数据中的每个值都使用一个版本号来维护。这是一个模式的例子版本化的价值

类VersionedValue……

int版本;字符串值;public VersionedValue(String value, int version) {this。版本=;这一点。值=价值;} public int getVersion(){返回版本;} public String getValue(){返回值;}

然后,每个八卦周期可以交换一个特定版本的状态。

类八卦……

private void sendKnownVersions(InetAddressAndPort gossipTo) throws IOException {Map maxKnownNodeVersions = getMaxKnownNodeVersions();RequestOrResponse knownVersionRequest = new RequestOrResponse(requesttid . gossip versions . getid (), JsonSerDes。序列化(新GossipStateVersions (maxKnownNodeVersions)), 0);SocketClient SocketClient = new SocketClient(gossipTo);字节[]knownVersionResponseBytes = socketClient.blockingSend(knownVersionRequest);} private Map getMaxKnownNodeVersions() {return clusterMetadata.entrySet() .stream() .collect(Collectors. getMaxKnownNodeVersions();toMap (e - > e.getKey (), e - > e.getValue () .maxVersion ()));}

类NodeState……

public int maxVersion() {return values.values().stream()。地图(v - > v.getVersion ()) .max (Comparator.naturalOrder ()) .orElse (0);}

只有当版本大于请求中的版本时,接收节点才能发送值。

类八卦……

Map getMissingAndNodeStatesHigherThan(Map nodeMaxVersions) {Map delta = new HashMap<>();delta.putAll (higherVersionedNodeStates (nodeMaxVersions));delta.putAll (missingNodeStates (nodeMaxVersions));返回三角洲;} private Map missingNodeStates(Map nodeMaxVersions) {Map delta = new HashMap<>();List missingKeys = clusterMetadata.keySet().stream()。过滤器(键- > ! nodeMaxVersions.containsKey(关键)).collect (Collectors.toList ());for (NodeId missingKey: missingKeys) {把(missingKey clusterMetadata.get (missingKey));}返回三角洲; } private Map higherVersionedNodeStates(Map nodeMaxVersions) { Map delta = new HashMap<>(); Set keySet = nodeMaxVersions.keySet(); for (NodeId node : keySet) { Integer maxVersion = nodeMaxVersions.get(node); NodeState nodeState = clusterMetadata.get(node); if (nodeState == null) { continue; } NodeState deltaState = nodeState.statesGreaterThan(maxVersion); if (!deltaState.isEmpty()) { delta.put(node, deltaState); } } return delta; }

八卦实现(cassandra)通过三次握手优化状态交换,其中接收流言消息的节点还从发送方发送它需要的版本,以及它返回的元数据。发送方可以立即响应请求的元数据。这避免了在其他情况下需要的额外消息。

八卦协议用于(cockroachdb)维护每个连接节点的状态。对于每个连接,它维护发送到该节点的最后版本,以及从该节点接收到的版本。这样它就可以发送'state since the last sent version',并请求'state from the last received version'。

还可以使用其他一些有效的替代方案,如发送整个Map的散列,如果散列相同,则不执行任何操作。

选择八卦节点的标准

集群节点随机选择发送八卦消息的节点。Java中的示例实现可以使用Java .util. random,如下所示:

类八卦……

private随机随机= new随机();private InetAddressAndPort pickRandomNode(List knownClusterNodes) {int randomNodeIndex = random.nextInt(knownClusterNodes.size());InetAddressAndPort八卦to = knownClusterNodes.get(randomNodeIndex);返回gossipTo;}

还可以考虑其他因素,比如最少接触的节点。例如,八卦协议Cockroachdb以这种方式选择节点。

network-topology-aware八卦对象选择的方式也存在。

这些都可以在pickRandomNode()方法中模块化地实现。

组成员和故障检测

维护集群中可用节点的列表是Gossip协议最常见的用法之一。目前有两种方法。

  • (swim-gossip)使用单独的探测组件连续探测集群中的不同节点,以检测它们是否可用。如果它检测到节点是活的或死的,该结果将通过Gossip通信传播到整个集群。探测器随机选择一个节点发送八卦消息。如果接收节点检测到这是新的信息,它立即将消息发送到随机选择的节点。通过这种方式,整个集群很快就会知道集群中某个节点或新加入的节点的故障。
  • 集群节点可以定期更新自己的状态,以反映其心跳。然后通过交换的八卦消息将此状态传播到整个集群。然后,每个集群节点可以检查是否在固定的时间内收到了特定集群节点的任何更新,或者将该节点标记为停机。在这种情况下,每个集群节点独立地决定一个节点是启动还是关闭。

处理节点重启

如果节点崩溃或重启,版本化的值就不能很好地工作,因为所有的内存状态都丢失了。更重要的是,对于相同的键,节点可以有不同的值。例如,集群节点可以使用不同的IP地址和端口启动,也可以使用不同的配置启动。一代时钟可以用每个值来标记生成,这样当元数据状态被发送到随机集群节点时,接收节点不仅可以通过版本号来检测变化,还可以通过生成来检测变化。

值得注意的是,核心Gossip协议并不是必须使用这种机制。但在实践中,它的实现是为了确保正确跟踪状态更改。

例子

(cassandra)使用Gossip协议对集群节点进行组成员和故障检测。每个集群节点的元数据(例如分配给每个集群节点的令牌)也使用Gossip协议进行传输。

(高)使用(swim-gossip)领事代理的组成员和故障检测协议。

(cockroachdb)使用八卦协议传播节点元数据。

区块链的实现Hyperledger织物使用八卦协议组成员和发送分类账元数据。

重大修改