Gossip
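1. Introduction to the Gossip Protocol
Gossip, also known as an epidemic algorithm, lets the data held by the nodes of a cluster become eventually consistent. Every data item in a gossip message carries a version number. By checking the version numbers in a received message, a node can tell which items are newer than its own, update its local data, and send its own newer items back to the other side. This resembles gossip in real life: one person tells ten, ten tell a hundred, and soon everyone knows.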
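The version check described above can be sketched as a small merge function. This is only an illustration: the State type, the merge() name, and the sample values are assumptions made for this example and do not come from any particular Gossip implementation.

```python
from typing import Dict, Tuple

# Hypothetical in-memory representation: key -> (value, version).
State = Dict[str, Tuple[str, int]]


def merge(local: State, remote: State) -> State:
    """Return a copy of `local` that also holds every strictly newer entry of `remote`."""
    merged = dict(local)
    for key, (value, version) in remote.items():
        # Accept the remote entry if the key is unknown or its version is higher.
        if key not in merged or version > merged[key][1]:
            merged[key] = (value, version)
    return merged


node_a = {"a": ("100", 3), "b": ("200", 16)}
node_b = {"b": ("250", 17), "c": ("300", 4)}
print(merge(node_a, node_b))
```

Merging in either direction yields the same result, which is why two nodes that exchange their data end up with identical contents.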
References:
A. Demers, D. Greene, C. Hauser, W. Irish, J. Larson, S. Shenker, H. Sturgis, D. Swinehart, and D. Terry. “Epidemic Algorithms for Replicated Database Maintenance.” In Proc. Sixth Symp. on Principles of Distributed Computing, pp. 1–12, Aug. 1987.
Gossip protocols for large-scale distributed systems
https://managementfromscratch.wordpress.com/2016/04/01/introduction-to-gossip/
Distributed Systems - Concepts and Design, 5th Edition, 18.4 Case studies of highly available services: The gossip architecture, Bayou and Coda
1.1. Node States (Infective/Susceptible/Removed)
Each node is in one of the following states:
Infective – node that holds an update it is willing to share
Susceptible – node that has not yet received an update
Removed – node that has received an update but is no longer willing to share the update
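Note 1: These concepts come from the mathematical modelling of infectious disease, https://en.wikipedia.org/wiki/Mathematical_modelling_of_infectious_disease
Note 2: The Simple Epidemic algorithm assumes that a node never enters the Removed state, whereas in a Complex Epidemic a node may enter the Removed state.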
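The three states and the difference between the two models can be written down as a tiny state machine. This is a sketch for a single update; the names State, SI_TRANSITIONS, SIR_TRANSITIONS, and can_transition() are made up for this example.

```python
from enum import Enum


class State(Enum):
    SUSCEPTIBLE = "susceptible"  # has not yet received the update
    INFECTIVE = "infective"      # holds the update and is willing to share it
    REMOVED = "removed"          # holds the update but no longer shares it


# Simple Epidemic (SI): an infective node stays infective.
SI_TRANSITIONS = {
    State.SUSCEPTIBLE: {State.INFECTIVE},
    State.INFECTIVE: set(),
}

# Complex Epidemic (SIR): an infective node may additionally become removed.
SIR_TRANSITIONS = {
    State.SUSCEPTIBLE: {State.INFECTIVE},
    State.INFECTIVE: {State.REMOVED},
    State.REMOVED: set(),
}


def can_transition(model, src, dst):
    """Return True if `model` allows a node to move from `src` to `dst`."""
    return dst in model.get(src, set())


print(can_transition(SI_TRANSITIONS, State.INFECTIVE, State.REMOVED))
print(can_transition(SIR_TRANSITIONS, State.INFECTIVE, State.REMOVED))
```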
2. Anti-entropy (Simple Epidemic)
Anti-entropy is a "Simple Epidemic": each node has only two states, Infective and Susceptible, so it is also called the SI model.
2.1. Description of Anti-entropy
The anti-entropy algorithm is shown in Figure 1 (taken from Gossip protocols for large-scale distributed systems).

Figure 1: A generic gossip protocol
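As Figure 1 shows, each node process runs two threads. getPeer() selects another peer at random; prepareMsg() prepares the data held by the current node (with version information); update() has the three implementations below (time stands for the version information):
1. Push strategy:
if p.value.time > r.value.time then
    r.value ← p.value
2. Pull strategy:
if p.value.time < r.value.time then
    p.value ← r.value
3. Push-pull strategy (with this strategy the nodes reach consistency faster):
if p.value.time > r.value.time then
    r.value ← p.value
else
    p.value ← r.value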
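The three update() variants translate almost line for line into Python. The sketch below keeps the names p, r, value, and time from the pseudocode; the Value and Node classes and the function names are assumptions made for this example.

```python
from dataclasses import dataclass


@dataclass
class Value:
    data: str
    time: int  # version information


@dataclass
class Node:
    value: Value


def push(p: Node, r: Node) -> None:
    # p overwrites r only when p holds the newer value.
    if p.value.time > r.value.time:
        r.value = p.value


def pull(p: Node, r: Node) -> None:
    # p fetches from r only when r holds the newer value.
    if p.value.time < r.value.time:
        p.value = r.value


def push_pull(p: Node, r: Node) -> None:
    # Whichever side is newer wins, so one exchange synchronizes both nodes.
    if p.value.time > r.value.time:
        r.value = p.value
    else:
        p.value = r.value


old, new = Node(Value("x", 1)), Node(Value("y", 2))
push_pull(old, new)
print(old.value, new.value)
```

With push alone, an exchange started by the node holding the older value changes nothing; push-pull makes every exchange useful, which matches the claim that it converges faster.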
2.2. A Simple Anti-entropy Implementation
Below is a simple implementation of anti-entropy:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from socketserver import BaseRequestHandler, UDPServer
from socket import socket, AF_INET, SOCK_DGRAM
import time
import threading
import random
import sys

if len(sys.argv) != 2 or sys.argv[1] not in [
        '10001', '10002', '10003', '10004', '10005'
]:
    print('usage: ' + sys.argv[0] + ' 10001|10002|10003|10004|10005')
    sys.exit(1)

CURRENT_SERV_HOST = 'localhost'
CURRENT_SERV_PORT = int(sys.argv[1])
CURRENT_DATA_FILE = 'data_localhost_' + sys.argv[1]
### Example contents of data_localhost_10001:
### #key;value;version(or timestamp)
### a;100;3
### b;200;16
### c;300;4

PEERS = [
    "localhost:10001", "localhost:10002", "localhost:10003", "localhost:10004",
    "localhost:10005"
]
GOSSIP_INTERVAL = 3
MSG_TYPE_REQ = 'request'
MSG_TYPE_REP = 'reply'


def debug(s):
    print("[debug] " + s)


def get_peers(num):
    "Get random num peers (exclude itself) from PEERS"
    current_peer = CURRENT_SERV_HOST + ':' + str(CURRENT_SERV_PORT)
    others = [peer for peer in PEERS if peer != current_peer]
    # random.sample raises ValueError if num exceeds the population size
    return random.sample(others, min(num, len(others)))


def prepare_msg(msg_type):
    "The first line is the message type; each following line is a key;value;version triple"
    msg = "msg_type=" + msg_type + '\n'
    try:
        with open(CURRENT_DATA_FILE, 'rt') as f:
            for line in f:
                # skip comment lines
                if not line.startswith('#'):
                    msg += line
    except FileNotFoundError:
        # no local data yet: send only the header line
        pass
    return msg.encode('utf-8')


def send_msg(peers, msg):
    # one UDP socket is used for every peer contacted in this round
    with socket(AF_INET, SOCK_DGRAM) as s:
        s.settimeout(1.0)
        for peer in peers:
            debug('send msg to ' + peer)
            debug('msg is:[' + msg.decode("utf-8") + ']')
            hostaddr, port = peer.split(':')
            s.sendto(msg, (hostaddr, int(port)))
            # the peer replies to this socket's address, so the reply must be
            # read here; otherwise the pull half of the exchange is lost
            try:
                buf, _ = s.recvfrom(65535)
            except OSError:
                # timed out, or the peer is not running
                continue
            _, _, body = buf.decode("utf-8").partition('\n')
            update_data(body)


def active_thread_impl(gossip_interval):
    while True:
        time.sleep(gossip_interval)
        peers = get_peers(3)
        msg = prepare_msg(MSG_TYPE_REQ)
        send_msg(peers, msg)


# update_data is called from both the active and the passive thread
DATA_LOCK = threading.Lock()


def parse_line(line):
    "Parse 'key;value;version' into (key, value, version); return None if not a data line"
    line = line.strip()
    if not line or line.startswith('#'):
        # empty or comment
        return None
    token = line.split(';')
    if len(token) != 3:
        return None
    try:
        return token[0], token[1], int(token[2])
    except ValueError:
        return None


def update_data(received_msg):
    with DATA_LOCK:
        data = {}
        # read current data from file
        try:
            with open(CURRENT_DATA_FILE, 'rt') as f:
                for line in f:
                    entry = parse_line(line)
                    if entry:
                        data[entry[0]] = (entry[1], entry[2])
        except FileNotFoundError:
            pass

        need_update = False
        for line in received_msg.split('\n'):
            entry = parse_line(line)
            if not entry:
                continue
            key, value, ver = entry
            # take the entry if the key is new or the received version is newer
            if key not in data or ver > data[key][1]:
                need_update = True
                data[key] = (value, ver)

        # write merged data back to file
        if need_update:
            with open(CURRENT_DATA_FILE, 'wt') as f:
                for key, (value, ver) in data.items():
                    f.write(key + ';' + value + ';' + str(ver) + '\n')


class MyRequestHandler(BaseRequestHandler):
    def handle(self):
        # for a UDP server, self.request is a (data, socket) pair
        buf, sock = self.request
        msg = buf.decode("utf-8")
        debug('Got message:[' + msg + ']')
        first_line, _, body = msg.partition('\n')
        if not first_line.startswith('msg_type='):
            debug('Got a bad message, just ignore it.')
            return
        msg_type = first_line[len('msg_type='):]
        # send back a reply msg if a request msg was received
        if msg_type == MSG_TYPE_REQ:
            debug('Got a request msg, will send a reply msg.')
            my_msg = prepare_msg(MSG_TYPE_REP)
            sock.sendto(my_msg, self.client_address)
        # update current peer's data
        update_data(body)


def passive_thread_impl():
    # Gossip data exchange is usually implemented over UDP
    serv = UDPServer((CURRENT_SERV_HOST, CURRENT_SERV_PORT), MyRequestHandler)
    serv.serve_forever()


if __name__ == '__main__':
    t1 = threading.Thread(target=passive_thread_impl)
    t1.start()
    t2 = threading.Thread(target=active_thread_impl, args=(GOSSIP_INTERVAL, ))
    t2.start()
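To test it, first prepare a data file data_localhost_10001 (the data of the first node) with the following contents:
#key;value;version(or timestamp)
a;100;3
b;200;10
c;300;4
Then open 5 terminals and start one node in each with the commands below.
$ python test.py 10001 # run in terminal 1
$ python test.py 10002 # run in terminal 2
$ python test.py 10003 # run in terminal 3
$ python test.py 10004 # run in terminal 4
$ python test.py 10005 # run in terminal 5
You will see that the files data_localhost_10002, data_localhost_10003, data_localhost_10004, and data_localhost_10005 are created with the same contents as data_localhost_10001, so the data of the 5 nodes has become consistent. Furthermore, if you change the value of some key in any one of the files (and increase its version at the same time), the data for that key is updated in the other files as well.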
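The convergence seen in this test can also be explored without sockets. The sketch below is a hypothetical round-based simulation of push-only anti-entropy for a single update; simulate_push() and its parameters are invented for this example, and it assumes synchronous rounds in which every infective node contacts exactly one random peer.

```python
import random


def simulate_push(n, seed=0):
    """Return the number of rounds until all n nodes hold the update (SI model, push only)."""
    rng = random.Random(seed)
    infected = {0}  # node 0 starts with the update
    rounds = 0
    while len(infected) < n:
        rounds += 1
        newly_infected = set()
        for node in infected:
            # pick a uniformly random peer other than the node itself
            peer = rng.randrange(n - 1)
            if peer >= node:
                peer += 1
            newly_infected.add(peer)
        infected |= newly_infected
    return rounds


print(simulate_push(64))
```

Because the infected set can at most double in each round, 64 nodes need at least 6 rounds; the run above shows how close a random schedule gets to that bound.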
3. Rumor mongering (Complex Epidemic)
Rumor mongering is a "Complex Epidemic": each node has an additional Removed state, giving three states in total (Infective, Susceptible, and Removed), so it is also called the SIR model.
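3.1. Removal Strategies
Compared with anti-entropy, rumor mongering adds a Removed state. A node in the Removed state can be thought of as having recovered from the "disease": it no longer infects others.
3.1.1. Which Nodes Should Be Removed? (Counter vs. Coin)
There are two strategies for deciding which nodes should be removed: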
Counter: Removed after k unnecessary contacts.
Coin(random): Removed with probability 1/k after each unsuccessful attempt.
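The two strategies can be sketched as small policy objects that are consulted after each unnecessary contact. The class names, the should_remove() method, and the injectable rng parameter are assumptions made for this example.

```python
import random


class CounterRemoval:
    """Counter: the node is removed after k unnecessary contacts."""

    def __init__(self, k):
        self.k = k
        self.unnecessary = 0

    def should_remove(self):
        # Call once per unnecessary contact.
        self.unnecessary += 1
        return self.unnecessary >= self.k


class CoinRemoval:
    """Coin: the node is removed with probability 1/k after each unnecessary contact."""

    def __init__(self, k, rng=None):
        self.k = k
        self.rng = rng or random.Random()

    def should_remove(self):
        # random() is uniform on [0, 1), so this is true with probability 1/k.
        return self.rng.random() < 1.0 / self.k


counter = CounterRemoval(k=3)
print([counter.should_remove() for _ in range(3)])
```

The counter variant needs per-update state on each node, while the coin variant is stateless; both stop spreading after about k unnecessary contacts on average.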
3.1.2. When Is the Removal Executed? (Blind vs. Feedback)
There are two strategies for deciding when the removal is executed:
Blind: Removal algorithm is executed in each cycle.
Feedback: Removal algorithm (coin/counter) is executed if the contacted node was in infective state.
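The difference between the two strategies is only the condition under which the coin or counter step runs. A minimal sketch, in which removal_step_due() and the policy strings are hypothetical names chosen for this example:

```python
def removal_step_due(policy, peer_already_had_update):
    """Decide whether the coin/counter removal step should run after a contact."""
    if policy == "blind":
        # Blind: run the removal algorithm in every cycle, whatever the peer's state.
        return True
    if policy == "feedback":
        # Feedback: run it only when the contacted peer already had the update.
        return peer_already_had_update
    raise ValueError("unknown policy: " + policy)


print(removal_step_due("blind", False), removal_step_due("feedback", False))
```

Feedback requires the contacted peer to report whether the update was new to it, whereas blind needs no response at all.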
3.2. Anti-entropy vs. Rumor mongering
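The drawback of anti-entropy is that it uses more traffic; its advantage is that every node is eventually updated.
The advantage of rumor mongering is that it uses less traffic, but once propagation stops, some nodes may not have received the latest data.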
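The trade-off can be made concrete with a small simulation of push-based rumor mongering using the feedback and counter strategies. This is a sketch under simplifying assumptions (a single update, synchronous rounds, at least two nodes, uniformly random peers); rumor_mongering() and its return values are invented for this example.

```python
import random

SUSCEPTIBLE, INFECTIVE, REMOVED = 0, 1, 2


def rumor_mongering(n, k, seed=0):
    """Simulate one rumor among n >= 2 nodes; return (residue, messages).

    residue is the number of nodes that never received the update.
    """
    rng = random.Random(seed)
    state = [SUSCEPTIBLE] * n
    misses = [0] * n  # unnecessary contacts per node (counter strategy)
    state[0] = INFECTIVE
    messages = 0
    while INFECTIVE in state:
        # Nodes infected during this round start spreading in the next round.
        for node in [i for i in range(n) if state[i] == INFECTIVE]:
            peer = rng.randrange(n - 1)
            if peer >= node:
                peer += 1
            messages += 1
            if state[peer] == SUSCEPTIBLE:
                state[peer] = INFECTIVE
            else:
                # Feedback: the peer already had the update, so count a miss.
                misses[node] += 1
                if misses[node] >= k:
                    state[node] = REMOVED
    return state.count(SUSCEPTIBLE), messages


residue, messages = rumor_mongering(n=100, k=2)
print("residue:", residue, "messages:", messages)
```

The total number of messages is bounded by (n - 1) + n * k, because every contact either infects a new node or counts as a miss, whereas anti-entropy keeps sending one message per node in every cycle. The price is that the residue may be greater than zero.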