Gossip
1. Introduction to the Gossip Protocol
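Gossip, also known as the epidemic algorithm (Epidemic Algorithm), lets the data on the nodes of a cluster reach eventual consistency. Every data item in a Gossip message carries a version number. By checking the version numbers in a received message, a node can tell which items are newer and update its local copy, while it also sends its own newer data to other nodes. This is much like gossip in real life: one person tells ten, ten tell a hundred, and in the end everyone knows.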
References:
A. Demers, D. Greene, C. Hauser, W. Irish, J. Larson, S. Shenker, H. Sturgis, D. Swinehart, and D. Terry. “Epidemic Algorithms for Replicated Database Maintenance.” In Proc. Sixth Symp. on Principles of Distributed Computing, pp. 1–12, Aug. 1987.
Gossip protocols for large-scale distributed systems
https://managementfromscratch.wordpress.com/2016/04/01/introduction-to-gossip/
Distributed Systems: Concepts and Design, 5th Edition, Section 18.4, Case studies of highly available services: The gossip architecture, Bayou and Coda
1.1. Node states (Infective/Susceptible/Removed)
Each node can be in one of the following states:
Infective – node that holds an update it is willing to share
Susceptible – node that has not yet received an update
Removed – node that has received an update but is no longer willing to share the update
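Note 1: these concepts come from the mathematical modelling of infectious disease, https://en.wikipedia.org/wiki/Mathematical_modelling_of_infectious_disease
Note 2: the Simple Epidemic algorithm assumes that nodes never enter the Removed state, whereas in a Complex Epidemic a node may be in the Removed state.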
2. Anti-entropy (Simple Epidemic)
Anti-entropy is a “Simple Epidemic”: each node has only two states, Infective and Susceptible, which is why it is also called the SI Model.
2.1. How Anti-entropy works
The Anti-entropy algorithm is shown in Figure 1 (taken from Gossip protocols for large-scale distributed systems).
Figure 1: A generic gossip protocol
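As Figure 1 shows, each node process runs two threads, an active one and a passive one. getPeer() randomly selects another peer; prepareMsg() packages the data the node currently holds, together with its version information; and update() can be implemented in the three ways below. Here p is the node that initiates the exchange, r is the peer it contacted, and time stands for the version information: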
1) Push strategy:
if p.value.time > r.value.time then r.value ← p.value
2) Pull strategy:
if p.value.time < r.value.time then p.value ← r.value
3) Push-pull strategy (with this strategy, nodes reach consistency faster):
if p.value.time > r.value.time then r.value ← p.value else p.value ← r.value
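The three update() rules translate almost line for line into code. Below is a minimal Python sketch, assuming each node holds a single value object with a time (version) field, exactly as in the pseudocode. Value, Node and rounds_to_converge() are hypothetical names, and the cycle-based simulation (every node initiates one exchange per cycle, node 0 starts as the only holder of the newest value) is an assumption added so the strategies can be compared; it is not part of Figure 1.

```python
import random
from dataclasses import dataclass


@dataclass(frozen=True)
class Value:
    data: str
    time: int  # version information: a larger time means a newer value


class Node:
    def __init__(self, value):
        self.value = value


# p is the node that initiates the exchange, r is the peer returned by getPeer().
def push(p, r):
    if p.value.time > r.value.time:
        r.value = p.value


def pull(p, r):
    if p.value.time < r.value.time:
        p.value = r.value


def push_pull(p, r):
    if p.value.time > r.value.time:
        r.value = p.value
    else:
        p.value = r.value


def rounds_to_converge(update, n=1000, seed=0):
    "Cycles until all n nodes hold the newest value, which only node 0 has at first"
    rng = random.Random(seed)
    nodes = [Node(Value("old", 1)) for _ in range(n)]
    nodes[0].value = Value("new", 2)
    cycles = 0
    while any(node.value.time < 2 for node in nodes):
        cycles += 1
        for i, p in enumerate(nodes):  # each node's active thread fires once per cycle
            j = rng.randrange(n - 1)   # getPeer(): any node except p itself
            if j >= i:
                j += 1
            update(p, nodes[j])
    return cycles


for strategy in (push, pull, push_pull):
    print(strategy.__name__, rounds_to_converge(strategy))
```

In this simplified simulation the exchanges within one cycle are applied one after another, so a node that learns the value early in a cycle may pass it on within the same cycle. Real nodes run asynchronously, so treat the printed cycle counts only as a rough comparison between the strategies.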
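2.2. A simple Anti-entropy implementation
Below is a simple implementation of Anti-entropy. Each node stores its data in a file as key;value;version lines. The active thread sends the node's data to 3 random peers every GOSSIP_INTERVAL seconds. The passive thread listens on the node's UDP port, merges incoming data (the larger version wins), and answers each request with the node's own data.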
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from socketserver import BaseRequestHandler, UDPServer
from socket import socket, timeout as SocketTimeout, AF_INET, SOCK_DGRAM
import time
import threading
import random
import sys

if len(sys.argv) != 2 or sys.argv[1] not in [
        '10001', '10002', '10003', '10004', '10005'
]:
    print('usage: ' + sys.argv[0] + ' 10001|10002|10003|10004|10005')
    sys.exit(1)

CURRENT_SERV_HOST = 'localhost'
CURRENT_SERV_PORT = int(sys.argv[1])
CURRENT_DATA_FILE = 'data_localhost_' + sys.argv[1]
### Example content of data_localhost_10001:
### #key;value;version(or timestamp)
### a;100;3
### b;200;16
### c;300;4

PEERS = [
    "localhost:10001", "localhost:10002", "localhost:10003",
    "localhost:10004", "localhost:10005"
]
GOSSIP_INTERVAL = 3
MSG_TYPE_REQ = 'request'
MSG_TYPE_REP = 'reply'

# The active and passive threads both read and rewrite the data file.
DATA_LOCK = threading.Lock()


def debug(s):
    print("[debug] " + s)


def get_peers(num):
    "Get num random peers (excluding itself) from PEERS"
    current_peer = CURRENT_SERV_HOST + ':' + str(CURRENT_SERV_PORT)
    return random.sample([peer for peer in PEERS if peer != current_peer], num)


def prepare_msg(msg_type):
    "First line is the message type; each following line is a key;value;version triple"
    msg = "msg_type=" + msg_type + '\n'
    with DATA_LOCK:
        try:
            with open(CURRENT_DATA_FILE, 'rt') as f:
                for line in f:
                    if not line.startswith('#'):  # skip comments
                        msg += line
        except FileNotFoundError:
            pass
    return msg.encode('utf-8')


def split_msg(msg):
    "Return (msg_type, body), or None if msg has no msg_type header"
    first_line, _, body = msg.partition('\n')
    if not first_line.startswith('msg_type='):
        return None
    return first_line[len('msg_type='):], body


def parse_line(line):
    "Parse 'key;value;version' into a tuple; None for empty, comment or bad lines"
    line = line.strip()
    if not line or line.startswith('#'):
        return None
    token = line.split(';', 2)
    if len(token) != 3 or not token[2].isdigit():
        return None
    return token[0], token[1], token[2]


def send_msg(peers, msg):
    "Send msg to peers, then merge whatever replies come back (push-pull)"
    with socket(AF_INET, SOCK_DGRAM) as s:
        s.settimeout(1.0)
        for peer in peers:
            debug('send msg to ' + peer)
            debug('msg is:[' + msg.decode("utf-8") + ']')
            hostaddr, port = peer.split(':')
            s.sendto(msg, (hostaddr, int(port)))
        # Replies arrive at this ephemeral socket, not at our UDPServer,
        # so they have to be read here or they are lost.
        for _ in peers:
            try:
                buf, _addr = s.recvfrom(65535)
            except SocketTimeout:  # remaining peers are slow or down
                break
            except OSError:  # e.g. "port unreachable" reported on some platforms
                continue
            parsed = split_msg(buf.decode("utf-8"))
            if parsed is not None and parsed[0] == MSG_TYPE_REP:
                update_data(parsed[1])


def active_thread_impl(gossip_interval):
    while True:
        time.sleep(gossip_interval)
        peers = get_peers(3)
        msg = prepare_msg(MSG_TYPE_REQ)
        send_msg(peers, msg)


def update_data(received_msg):
    "Merge received lines into the data file; for each key the larger version wins"
    with DATA_LOCK:
        data = {}
        # read data from file
        try:
            with open(CURRENT_DATA_FILE, 'rt') as f:
                for line in f:
                    item = parse_line(line)
                    if item is not None:
                        key, value, ver = item
                        data[key] = (value, ver)
        except FileNotFoundError:
            pass

        need_update = False
        for line in received_msg.split('\n'):
            item = parse_line(line)
            if item is None:  # empty or malformed line
                continue
            key, value, ver = item
            # take the received item if we lack the key or it is newer
            if key not in data or int(ver) > int(data[key][1]):
                need_update = True
                data[key] = (value, ver)

        # write data back to file
        if need_update:
            with open(CURRENT_DATA_FILE, 'wt') as f:
                for key, (value, ver) in data.items():
                    f.write(key + ';' + value + ';' + ver + '\n')


class MyRequestHandler(BaseRequestHandler):
    def handle(self):
        # For a UDPServer, self.request is a (data, socket) pair
        buf, sock = self.request
        msg = buf.decode("utf-8")
        debug('Got message:[' + msg + ']')
        parsed = split_msg(msg)
        if parsed is None:
            debug('Got a bad message, just ignore it.')
            return
        msg_type, body = parsed
        # Send back a reply msg if we received a request msg
        if msg_type == MSG_TYPE_REQ:
            debug('Got a request msg, will send a reply msg.')
            sock.sendto(prepare_msg(MSG_TYPE_REP), self.client_address)
        # update current peer's data
        update_data(body)


def passive_thread_impl():
    serv = UDPServer((CURRENT_SERV_HOST, CURRENT_SERV_PORT), MyRequestHandler)
    serv.serve_forever()


# UDP is commonly used for Gossip data exchange
if __name__ == '__main__':
    t1 = threading.Thread(target=passive_thread_impl)
    t1.start()
    t2 = threading.Thread(target=active_thread_impl, args=(GOSSIP_INTERVAL, ))
    t2.start()
```
To test it, first prepare a data file data_localhost_10001 (the data of the first node) with the following content:
```
#key;value;version(or timestamp)
a;100;3
b;200;10
c;300;4
```
Then open 5 terminals and start one node in each with the commands below (assuming the script above is saved as test.py):
```
$ python3 test.py 10001   # run in terminal 1
$ python3 test.py 10002   # run in terminal 2
$ python3 test.py 10003   # run in terminal 3
$ python3 test.py 10004   # run in terminal 4
$ python3 test.py 10005   # run in terminal 5
```
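You will see that the files data_localhost_10002, data_localhost_10003, data_localhost_10004 and data_localhost_10005 are created with the same content as data_localhost_10001, so the data on all 5 nodes becomes consistent. Moreover, if you change the value of some key in any one of the files (remember to increase its version as well), the data for that key in all the other files is updated too.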
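To check this convergence without opening five terminals, the per-key merge rule that update_data() applies can be exercised in memory. This is a minimal sketch, assuming each node's data file is replaced by a dict mapping key to (value, version), and that every exchange is a request plus a reply, as in the script. merge(), gossip_cycle() and converged() are hypothetical helpers; there is no UDP, no packet loss and no timing here.

```python
import random


def merge(local, received):
    "Per-key merge as in update_data(): keep the entry with the larger version"
    changed = False
    for key, (value, ver) in received.items():
        if key not in local or ver > local[key][1]:
            local[key] = (value, ver)
            changed = True
    return changed  # update_data() uses the same flag to decide whether to rewrite the file


def gossip_cycle(nodes, fanout, rng):
    "Each node does a push-pull exchange with `fanout` random peers"
    names = list(nodes)
    for name in names:
        for peer in rng.sample([other for other in names if other != name], fanout):
            reply = dict(nodes[peer])        # the peer replies with its pre-merge data
            merge(nodes[peer], nodes[name])  # push: the request carries our data
            merge(nodes[name], reply)        # pull: merge the peer's reply


def converged(nodes):
    states = list(nodes.values())
    return all(state == states[0] for state in states)


rng = random.Random(42)
nodes = {port: {} for port in ["10001", "10002", "10003", "10004", "10005"]}
nodes["10001"] = {"a": ("100", 3), "b": ("200", 10), "c": ("300", 4)}

cycles = 0
while not converged(nodes):
    gossip_cycle(nodes, fanout=3, rng=rng)
    cycles += 1
print("converged after", cycles, "cycle(s):", nodes["10003"])

# Change b on one node, increasing its version, and keep gossiping
nodes["10004"]["b"] = ("250", 11)
while not converged(nodes):
    gossip_cycle(nodes, fanout=3, rng=rng)
print(nodes["10002"]["b"])  # ('250', 11)
```

Because a merge never lowers a version, once all nodes agree they must all hold the highest version of every key, which is why the last line is guaranteed to print the updated entry.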
3. Rumor mongering (Complex Epidemic)
Rumor mongering is a “Complex Epidemic”: each node has an additional Removed state, i.e. three states in total (Infective, Susceptible and Removed), which is why it is also called the SIR Model.
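3.1. Removal strategies
Compared with Anti-entropy, a node in Rumor mongering has an extra Removed state. A node in the Removed state can be thought of as having recovered from the “disease”: it no longer infects others.
3.1.1. Which nodes should be removed? (Counter vs. Coin)
Which nodes should be removed? There are two strategies: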
Counter: Removed after k unnecessary contacts (contacts with nodes that already had the update).
Coin (random): Removed with probability 1/k after each unsuccessful attempt.
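A short calculation shows how the two strategies relate. Under Coin, let X be the number of unsuccessful attempts a node makes up to and including the one after which it is removed. Each attempt removes the node independently with probability 1/k, so X is geometrically distributed and on average equals the fixed threshold k used by Counter; Counter always stops after exactly k, while Coin only does so in expectation:

$$
P(X = j) = \left(1 - \frac{1}{k}\right)^{j-1} \frac{1}{k}, \quad j = 1, 2, \dots
\qquad
E[X] = k,
\qquad
\operatorname{Var}[X] = k(k-1)
$$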
3.1.2. When is the removal step executed? (Blind vs. Feedback)
When is the removal step executed? There are two strategies:
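Blind: the removal algorithm (coin/counter) is executed in every cycle, regardless of the contacted node's state.
Feedback: the removal algorithm (coin/counter) is executed only if the contacted node already had the update (i.e. it was no longer Susceptible).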
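The two choices combine into four variants (blind or feedback, counter or coin). A small push-style simulation makes their effect visible. This is a minimal sketch, assuming synchronous cycles in which every Infective node contacts one uniformly random other node, with node 0 as the only initial Infective node; rumor_mongering() and its parameters are hypothetical names. The residue it returns is the fraction of nodes still Susceptible once no Infective node remains, i.e. nodes that never received the update.

```python
import random

SUSCEPTIBLE, INFECTIVE, REMOVED = "S", "I", "R"


def rumor_mongering(n, k, removal="counter", mode="feedback", seed=0):
    """Push rumor mongering on n nodes; node 0 starts Infective.
    Returns (residue, messages): the fraction of nodes that never got the
    update, and the total number of contacts made."""
    rng = random.Random(seed)
    state = [SUSCEPTIBLE] * n
    state[0] = INFECTIVE
    counter = [0] * n  # per-node count for the Counter strategy
    messages = 0
    while INFECTIVE in state:
        # nodes infected during this cycle start spreading in the next one
        infective = [i for i in range(n) if state[i] == INFECTIVE]
        for p in infective:
            r = rng.randrange(n - 1)  # a random node other than p
            if r >= p:
                r += 1
            messages += 1
            already_knew = state[r] != SUSCEPTIBLE
            if not already_knew:
                state[r] = INFECTIVE
            # Blind: run the removal step every time; Feedback: only after an unnecessary contact
            if mode == "blind" or already_knew:
                if removal == "counter":
                    counter[p] += 1
                    lose_interest = counter[p] >= k
                else:  # coin
                    lose_interest = rng.random() < 1 / k
                if lose_interest:
                    state[p] = REMOVED
    return state.count(SUSCEPTIBLE) / n, messages


for mode in ("blind", "feedback"):
    for removal in ("counter", "coin"):
        residue, messages = rumor_mongering(1000, k=2, removal=removal, mode=mode)
        print(f"{mode:8} {removal:7} residue={residue:.3f} messages={messages}")
```

Raising k lets infective nodes keep spreading longer, which tends to lower the residue at the cost of more messages; the exact numbers depend on the seed and on the simplified synchronous model.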
3.2. Anti-entropy vs. Rumor mongering
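The drawback of Anti-entropy is its relatively high traffic, since nodes keep exchanging their data every cycle even when nothing has changed; its advantage is that every node is eventually guaranteed to receive the update.
Rumor mongering uses less traffic, but once spreading stops (no Infective node is left), some nodes may never have received the latest data.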