Kyle's Notebook

分布式系统模式:Gossip 传播(译)

Word count: 3.5kReading time: 14 min
2021/07/01

原文链接:Gossip Dissemination

分布式系统模式:Gossip 传播

通过随机选择的节点来传递信息,以确保消息到达集群中的所有节点,而且不会在网络中泛滥。

在大型集群中,每个节点都需要将其元数据传递其他所有节点(不依赖于共享存储)。

如果每个节点都与其他节点发生通信,则会消耗大量网络带宽;而且当某些网络链接出现问题,信息也应该到达所有节点。

解决方案

流行病、谣言和计算机通信

流行病学研究流行病或谣言如何在社会中传播,Gossip 就是传播基于流行病学的数学模型。其主要特征是即使每个人只随机接触少数几个人,它们也会传播得非常快。且即使只有很少的互动,整个人群也可能被感染。更具体地说,如果 n 是给定人口中的总人数,则交互作用与 log(n) 成正比。正如 Indranil Gupta 教授在他的 Gossip 分析中所讨论的,log(n) 几乎可以被视为一个常数。

这种特性对于在一组传播信息过程中非常有用。即使一个给定的进程只随机与几个进程通信,在很少的轮次中,集群中的所有节点都会有相同的信息。Hashicorp 有一个非常好的收敛模拟器,可用来演示即使在发生网络丢失和节点故障的情况下,信息传播到整个集群的速度有多快。

集群节点可使用 Gossip 风格的通信来传播状态更新:每个节点每次定期(比如 1s)随机选择一个其他节点来传递它所拥有的信息。

在大型集群中,需要考虑以下事项:

  • 要为每个节点生成的消息数量设置固定限额。

  • 消息不应消耗大量网络带宽。如设置数百 Kbs 的上限,以确保应用程序数据传输不会因集群消息太多而受到影响。

  • 元数据传播需要容忍一些网络和服务器故障,即使出现部分故障,它也应该到达所有集群节点。

每个集群节点将元数据存储为与节点关联的键值对列表,如下所示:

1
2
3
4
class Gossip {
// 节点 id -> 节点状态信息
Map<NodeId, NodeState> clusterMetadata = new HashMap<>();
}
1
2
3
4
class NodeState {
// 状态键值 -> 版本化值
Map<String, VersionedValue> values = new HashMap<>();
}

在集群启动时,每个集群节点都会添加关于自己的元数据(比如节点监听的 IP 地址和端口、负责的分区等),这些元数据需要传播到其他节点。

Gossip 实例需要知道至少一个其他节点才能启动通信,被其他节点知悉的集群节点被称为 种子节点介绍人(任何节点都可以充当),用于初始化 Gossip 实例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Gossip {

// 初始化 Gossip 实例
public Gossip(InetAddressAndPort listenAddress, List<InetAddressAndPort> seedNodes, String nodeId) throws IOException {
this.listenAddress = listenAddress;
// 过滤本节点以避免其称为种子节点。
this.seedNodes = removeSelfAddress(seedNodes);
this.nodeId = new NodeId(nodeId);
addLocalState(GossipKeys.ADDRESS, listenAddress.toString());
this.socketServer = new NIOSocketListener(newGossipRequestConsumer(), listenAddress);
}

// 添加本地节点状态。
private void addLocalState(String key, String value) {
NodeState nodeState = clusterMetadata.get(listenAddress);
if (nodeState == null) {
nodeState = new NodeState();
clusterMetadata.put(nodeId, nodeState);
}
nodeState.add(key, new VersionedValue(value, incremenetVersion()));
}
}

每个集群节点都会调度一个定时作业,用于将其拥有的元数据传输到其他节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class Gossip {

// 定时调度线程池。
private ScheduledThreadPoolExecutor gossipExecutor = new ScheduledThreadPoolExecutor(1);

// 调度周期
private long gossipIntervalMs = 1000;

// 异步返回值。
private ScheduledFuture<?> taskFuture;

public void start() {
socketServer.start();
taskFuture = gossipExecutor.scheduleAtFixedRate(
()-> doGossip(), gossipIntervalMs, gossipIntervalMs, TimeUnit.MILLISECONDS
);
}
}

当定时任务被调用时,从元数据映射的节点列表中随机选取一组节点。其中使用常数 Gossip fanout 定义可以作为 Gossip 目标的节点(活跃节点)个数。

如果没有任何已知的活跃节点,它会随机选择一个种子节点,并将它持有的元数据映射发送到该节点。

使用 UDP 还是 TCP?

Gossip 通信假设网络不可靠,因此可以使用 UDP 作为传输机制。

但是集群节点一般需要保证状态快速收敛,因此使用基于 TCP 的传输来交换 Gossip 状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class Gossip {

// 发送 Gossip 消息。
public void doGossip() {
// 查询活跃节点,如果查询结果为空,则发送消息到种子节点,否则发送到活跃节点。
List<InetAddressAndPort> knownClusterNodes = liveNodes();
if (knownClusterNodes.isEmpty()) {
sendGossip(seedNodes, gossipFanout);
} else {
sendGossip(knownClusterNodes, gossipFanout);
}
}

// 查询活跃节点。
private List<InetAddressAndPort> liveNodes() {
Set<InetAddressAndPort> nodes = clusterMetadata.values()
.stream()
.map(n -> InetAddressAndPort.parse(n.get(GossipKeys.ADDRESS).getValue()))
.collect(Collectors.toSet());
return removeSelfAddress(nodes);
}

// 发送指定个数的 Gossip 消息。
private void sendGossip(List<InetAddressAndPort> knownClusterNodes, int gossipFanout) {
if (knownClusterNodes.isEmpty()) {
return;
}
for (int i = 0; i < gossipFanout; i++) {
InetAddressAndPort nodeAddress = pickRandomNode(knownClusterNodes);
sendGossipTo(nodeAddress);
}
}

// 发送 Gossip 消息到指定 ip。
private void sendGossipTo(InetAddressAndPort nodeAddress) {
try {
getLogger().info("Sending gossip state to " + nodeAddress);
SocketClient<RequestOrResponse> socketClient = new SocketClient(nodeAddress);
GossipStateMessage gossipStateMessage = new GossipStateMessage(this.clusterMetadata);
RequestOrResponse request = createGossipStateRequest(gossipStateMessage);
byte[] responseBytes = socketClient.blockingSend(request);
GossipStateMessage responseState = deserialize(responseBytes);
merge(responseState.getNodeStates());

} catch (IOException e) {
getLogger().error("IO error while sending gossip state to " + nodeAddress, e);
}
}

private RequestOrResponse createGossipStateRequest(GossipStateMessage gossipStateMessage) {
return new RequestOrResponse(RequestId.PushPullGossipState.getId(), JsonSerDes.serialize(gossipStateMessage), correlationId++);
}

}

收到 Gossip 消息的节点检查它拥有的元数据,并找到:

  • 传入消息中持有,但在此节点的 state map 中不可用的值。

  • 此节点持有,但传入的 Gossip 消息没有的值。

  • 此节点已持有传入消息中的值时,将选择较高的版本值。

然后该节点将缺失值添加到自己的 state map 中。无论传入消息中缺少什么值,都将作为响应返回。

发送 Gossip 消息的节点,会将它从 Gossip 响应中获得的值添加到自己的状态中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class Gossip {

private void handleGossipRequest(org.distrib.patterns.common.Message<RequestOrResponse> request) {
GossipStateMessage gossipStateMessage = deserialize(request.getRequest());
Map<NodeId, NodeState> gossipedState = gossipStateMessage.getNodeStates();
getLogger().info("Merging state from " + request.getClientSocket());

// 合并接收的消息和已持有的消息。
merge(gossipedState);

// 筛选缺失的差值,作为响应返回。
Map<NodeId, NodeState> diff = delta(this.clusterMetadata, gossipedState);
GossipStateMessage diffResponse = new GossipStateMessage(diff);
getLogger().info("Sending diff response " + diff);
request.getClientSocket().write(new RequestOrResponse(RequestId.PushPullGossipState.getId(), JsonSerDes.serialize(diffResponse), request.getRequest().getCorrelationId()));
}

// 筛选差值。
public Map<NodeId, NodeState> delta(Map<NodeId, NodeState> fromMap, Map<NodeId, NodeState> toMap) {
Map<NodeId, NodeState> delta = new HashMap<>();
for (NodeId key : fromMap.keySet()) {

// 缺失的值放入响应集中。
if (!toMap.containsKey(key)) {
delta.put(key, fromMap.get(key));
continue;
}
NodeState fromStates = fromMap.get(key);
NodeState toStates = toMap.get(key);
NodeState diffStates = fromStates.diff(toStates);
if (!diffStates.isEmpty()) {
delta.put(key, diffStates);
}
}
return delta;
}

// 合并状态。
public void merge(Map<NodeId, NodeState> otherState) {
Map<NodeId, NodeState> diff = delta(otherState, this.clusterMetadata);
for (NodeId diffKey : diff.keySet()) {
// 传入消息中持有,但在此节点的 state map 中不可用的值。
if(!this.clusterMetadata.containsKey(diffKey)) {
this.clusterMetadata.put(diffKey, diff.get(diffKey));
}
// 节点已持有传入消息中的值时,将选择较高的版本值。
else {
NodeState stateMap = this.clusterMetadata.get(diffKey);
stateMap.putAll(diff.get(diffKey));
}
}
}
}

这个过程在每个集群节点每隔固定的时间发生一次,每次选择不同的节点来交换状态。

避免不必要的状态交换

上面的代码表明,在 Gossip 消息中发送了节点的完整状态。在节点刚加入时很好,但当状态都是最新的,就没有发送完整状态的必要了。集群节点只需要发送自上次 Gossip 传递以来的状态变化。

为了实现这一点,每个节点维护一个版本号,每次在本地添加新的元数据条目时该版本号都会递增。

1
2
3
4
5
6
7
8
9
class Gossip {

// 状态版本值,初始化为 1,每次添加新条目时递增。
private int gossipStateVersion = 1;

private int incremenetVersion() {
return gossipStateVersion++;
}
}

集群元数据中的每个值都使用一个版本号来维护。版本化值 的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class VersionedValue {

// 版本号。
int version;

// 值。
String value;

public VersionedValue(String value, int version) {
this.version = version;
this.value = value;
}

public int getVersion() {
return version;
}

public String getValue() {
return value;
}
}

每个 Gossip 循环都可以在特定版本中交换状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Gossip {

private void sendKnownVersions(InetAddressAndPort gossipTo) throws IOException {
// 获取每个节点的最大版本号。
Map<NodeId, Integer> maxKnownNodeVersions = getMaxKnownNodeVersions();
// 交换状态时关心最大版本号。
RequestOrResponse knownVersionRequest = new RequestOrResponse(RequestId.GossipVersions.getId(), JsonSerDes.serialize(new GossipStateVersions(maxKnownNodeVersions)), 0);
SocketClient<RequestOrResponse> socketClient = new SocketClient(gossipTo);
byte[] knownVersionResponseBytes = socketClient.blockingSend(knownVersionRequest);
}

// 获取节点的最大版本号。
private Map<NodeId, Integer> getMaxKnownNodeVersions() {
return clusterMetadata.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().maxVersion()));
}

}
1
2
3
4
5
class NodeState {
public int maxVersion() {
return values.values().stream().map(v -> v.getVersion()).max(Comparator.naturalOrder()).orElse(0);
}
}

只有当版本大于请求中的版本时,接收节点才能发送值(表示接收节点的状态比发送节点的状态新)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class Gossip {

Map<NodeId, NodeState> getMissingAndNodeStatesHigherThan(Map<NodeId, Integer> nodeMaxVersions) {
Map<NodeId, NodeState> delta = new HashMap<>();

// 选取当前版本大于请求中的版本的节点及其状态。
delta.putAll(higherVersionedNodeStates(nodeMaxVersions));

// 选取当前节点持有、接收消息中不持有的节点及其状态。
delta.putAll(missingNodeStates(nodeMaxVersions));
return delta;
}

private Map<NodeId, NodeState> missingNodeStates(Map<NodeId, Integer> nodeMaxVersions) {
Map<NodeId, NodeState> delta = new HashMap<>();
List<NodeId> missingKeys = clusterMetadata.keySet().stream().filter(key -> !nodeMaxVersions.containsKey(key)).collect(Collectors.toList());
for (NodeId missingKey : missingKeys) {
delta.put(missingKey, clusterMetadata.get(missingKey));
}
return delta;
}

private Map<NodeId, NodeState> higherVersionedNodeStates(Map<NodeId, Integer> nodeMaxVersions) {
Map<NodeId, NodeState> delta = new HashMap<>();
Set<NodeId> keySet = nodeMaxVersions.keySet();
for (NodeId node : keySet) {
Integer maxVersion = nodeMaxVersions.get(node);
NodeState nodeState = clusterMetadata.get(node);
if (nodeState == null) {
continue;
}

// 筛选版本大于请求中的版本时,接收节点才能发送值。
NodeState deltaState = nodeState.statesGreaterThan(maxVersion);
if (!deltaState.isEmpty()) {
delta.put(node, deltaState);
}
}
return delta;
}
}

Cassandra 中实现了 Gossip 传播,并通过三向握手来优化状态交换,其中接收 Gossip 消息的节点除了自身元数据以外,还向发送方返回它需要的版本原属一句。然后发送方可以立即使用它所请求的元数据进行响应,这能避免原本的额外消息传递。

CockroachDB 使用 Gossip 协议维护每个连接节点的状态。对于每个连接,它维护发送到该节点的最后一个版本,以及从该节点接收的版本。使之可以发送 自上次发送版本以来的状态 并请求 来自上次接收版本的状态

另外也可以使用其他有效的替代方法,比如发送 state map 的 hash 值,如果 hash 值相同就什么都不做。

选择 Gossip 节点

集群随机选择发送 Gossip 消息的节点。在 Java 中可以使用 java.util.Random 实现随机:

1
2
3
4
5
6
7
8
9
10
class Gossip {

private Random random = new Random();

private InetAddressAndPort pickRandomNode(List<InetAddressAndPort> knownClusterNodes) {
int randomNodeIndex = random.nextInt(knownClusterNodes.size());
InetAddressAndPort gossipTo = knownClusterNodes.get(randomNodeIndex);
return gossipTo;
}
}

节点选择还有其他考虑因素,例如最少接触的节点(CockroachDB)、网络拓扑感知的 Gossip 目标选择,都可以在 pickRandomNode() 方法中模块化实现。

组成员与故障检测

Gossip 协议其中一个最常见的用法是维护集群中可用节点的列表:

  • Swim-Gossip:使用一个独立的探测组件,不断探测集群中的节点是否可用。检测结果会通过 Gossip 通信传播到整个集群。探测器随机选择一个节点来发送 Gossip 消息。如果接收节点检测到这是新信息,它会立即将消息发送到随机选择的节点。这样集群中节点故障或新节点加入很快就会被整个集群知悉。

  • 集群节点可以通过心跳反映自己定期更新的状态,通过交换的 Gossip 消息将此状态传播到整个集群。然后每个节点可以检查它是否在固定时间内收到了特定集群节点的更新,或者将该节点标记为关闭。在这种情况下,每个集群节点独立地确定其他节点是否存活。

最终一致性

Gossip 协议实现的信息交换本质上最终是一致的。即使它的状态收敛非常快,在集群识别新节点或检测到节点故障之前也会有一些延迟。对于需要强一致性的操作,需要使用 Consistent Core

在同一个集群中使用两种协议是很常见的做法,例如 Consul 利用 Gossip 协议实现组成员管理和故障检测,但使用基于 Raft 的 Consistent Core 来存储强一致性服务目录。

处理节点重启

如果节点崩溃或重新启动,由于内存中所有的状态都将丢失,版本化值将无法正常工作。更重要的是,节点对于同一个键可以有不同的值,例如集群节点可以从不同的 IP 地址和端口开始,也可以从不同的配置开始。

分代时钟可用每个值来标记代数,当元数据状态发送到随机节点时,接收节点可以通过版本号和代数来检测变化。

需要注意的是,这种机制对于核心 Gossip 协议的工作而言不是必需的,但它在实践中可以确保跟踪状态更改。

应用实例

  • Cassandra 利用 Gossip 协议实现集群的组成员管理和故障检测。每个集群节点的元数据(例如分配给每个节点的令牌)也使用 Gossip 协议传输。

  • Consul 利用 Swim-Gossip 协议实现组成员管理和 Consul 代理故障检测。

  • CockroachDB 利用 Gossip 协议来传播节点元数据。

  • Hyperledger Fabric 等区块链实现利用 Gossip 协议进行组成员管理和总账元数据的发送。

CATALOG
  1. 1. 分布式系统模式:Gossip 传播
    1. 1.1. 解决方案
      1. 1.1.1. 流行病、谣言和计算机通信
      2. 1.1.2. 使用 UDP 还是 TCP?
    2. 1.2. 避免不必要的状态交换
    3. 1.3. 选择 Gossip 节点
    4. 1.4. 组成员与故障检测
      1. 1.4.1. 最终一致性
    5. 1.5. 处理节点重启
    6. 1.6. 应用实例