Gossip


1. Introduction to the Gossip Protocol

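Gossip protocols, also known as epidemic algorithms, bring the data held by the nodes of a cluster to eventual consistency. Every data item in a gossip message carries a version number. By checking the version numbers in a received message, a node can tell which items are newer than its own and update its local data accordingly; it also sends the items for which it holds the newer version to its peers. This resembles gossip in real life: one person tells ten, ten tell a hundred, and eventually everyone knows.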
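The version check described above can be sketched in a few lines of Python. This is a minimal illustration only: the function name merge and the in-memory layout {key: (value, version)} are assumptions made for this example and do not come from any particular gossip library.

```python
def merge(local, incoming):
    """Merge incoming {key: (value, version)} entries into local.

    An entry is taken only if the key is unknown locally or the incoming
    version is strictly greater. Returns the list of keys that changed.
    """
    updated = []
    for key, (value, version) in incoming.items():
        if key not in local or version > local[key][1]:
            local[key] = (value, version)
            updated.append(key)
    return updated


# Two nodes with partially overlapping, differently versioned data
node_a = {"a": (100, 3), "b": (200, 16)}
node_b = {"a": (101, 5), "c": (300, 4)}

merge(node_a, node_b)   # node_a learns the newer "a" and the unknown "c"
merge(node_b, node_a)   # node_b learns the unknown "b"
print(node_a == node_b)  # True
```

After one exchange in each direction both nodes hold the newest version of every key, which is the pairwise step that repeated gossip rounds spread through the whole cluster.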

References:
A. Demers, D. Greene, C. Hauser, W. Irish, J. Larson, S. Shenker, H. Sturgis, D. Swinehart, and D. Terry. “Epidemic Algorithms for Replicated Database Maintenance.” In Proc. Sixth Symp. on Principles of Distributed Computing, pp. 1–12, Aug. 1987.
Gossip protocols for large-scale distributed systems
https://managementfromscratch.wordpress.com/2016/04/01/introduction-to-gossip/
Distributed Systems - Concepts and Design, 5th Edition, 18.4 Case studies of highly available services: The gossip architecture, Bayou and Coda

1.1. Node States (Infective/Susceptible/Removed)

Each node can be in one of the following states:
Infective – node that holds an update it is willing to share
Susceptible – node that has not yet received an update
Removed – node that has received an update but is no longer willing to share the update

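Note 1: These concepts come from the mathematical modelling of infectious disease, https://en.wikipedia.org/wiki/Mathematical_modelling_of_infectious_disease
Note 2: A Simple Epidemic algorithm assumes that nodes never enter the Removed state, whereas in a Complex Epidemic nodes may enter the Removed state.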

2. Anti-entropy (Simple Epidemic)

Anti-entropy is a "Simple Epidemic": each node has only two states, Infective and Susceptible, so it is also called the SI model.

2.1. Description of Anti-entropy

The Anti-entropy algorithm is shown in Figure 1 (taken from Gossip protocols for large-scale distributed systems).

gossip_anti_entropy.png

Figure 1: A generic gossip protocol

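As Figure 1 shows, each node process runs two threads. getPeer() selects another peer at random; prepareMsg() packages the data currently held by the node (including version information); update() has the three implementations below (time stands for the version information):
1. Push strategy: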

if p.value.time > r.value.time then
    r.value ← p.value

2. Pull strategy:

if p.value.time < r.value.time then
    p.value ← r.value

3. Push-pull strategy (with this strategy the nodes reach consistency faster):

if p.value.time > r.value.time then
    r.value ← p.value
else
    p.value ← r.value
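The three update() variants above can be written as runnable Python. This is a sketch under stated assumptions: the Replica class and its flat fields value and time are invented here to stand in for p.value and p.value.time in the pseudocode, and push_pull copies nothing when both versions are equal (the pseudocode's else branch would copy an identical value in that case).

```python
from dataclasses import dataclass


@dataclass
class Replica:
    value: str
    time: int  # version (or timestamp) of value


def push(p, r):
    """p pushes its value to r if p's copy is newer."""
    if p.time > r.time:
        r.value, r.time = p.value, p.time


def pull(p, r):
    """p pulls r's value if r's copy is newer."""
    if p.time < r.time:
        p.value, p.time = r.value, r.time


def push_pull(p, r):
    """Whichever side holds the older copy adopts the newer one."""
    push(p, r)
    pull(p, r)


p = Replica("old", 3)
r = Replica("new", 7)
push(p, r)   # no effect: p's copy is older than r's
pull(p, r)   # p adopts r's newer value
print(p, r)
```

With push alone, an update only travels when the node that holds it initiates the contact; with pull alone, only when an outdated node does. Push-pull uses every contact in both directions, which is why it converges faster.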

2.2. A Simple Implementation of Anti-entropy

Below is a simple implementation of Anti-entropy:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

from socketserver import BaseRequestHandler, UDPServer
from socket import socket, AF_INET, SOCK_DGRAM

import time
import threading
import random

import sys

if len(sys.argv) != 2 or sys.argv[1] not in [
        '10001', '10002', '10003', '10004', '10005'
]:
    print('usage: ' + sys.argv[0] + ' 10001|10002|10003|10004|10005')
    sys.exit(1)

CURRENT_SERV_HOST = 'localhost'
CURRENT_SERV_PORT = int(sys.argv[1])
CURRENT_DATA_FILE = 'data_localhost_' + sys.argv[1]
### Example contents of data_localhost_10001:
### #key;value;version(or timestamp)
### a;100;3
### b;200;16
### c;300;4

PEERS = [
    "localhost:10001", "localhost:10002", "localhost:10003", "localhost:10004",
    "localhost:10005"
]

GOSSIP_INTERVAL = 3

MSG_TYPE_REQ = 'request'
MSG_TYPE_REP = 'reply'


def debug(s):
    print("[debug] " + s)


def get_peers(num):
    "Get random num peers (exclude itself) from PEERS"
    current_peer = CURRENT_SERV_HOST + ':' + str(CURRENT_SERV_PORT)
    return random.sample([peer for peer in PEERS if peer != current_peer], num)


def prepare_msg(msg_type):
    "The first line is the message type; each following line is a key;value;version triple"
    msg = "msg_type=" + msg_type + '\n'
    try:
        with open(CURRENT_DATA_FILE, 'rt') as f:
            for line in f:
                if line.startswith('#'):
                    pass
                else:
                    msg += line
    except FileNotFoundError:
        pass
    return msg.encode('utf-8')


def send_msg(peers, msg):
    s = socket(AF_INET, SOCK_DGRAM)
    for peer in peers:
        debug('send msg to ' + peer)
        debug('msg is:[' + msg.decode("utf-8") + ']')
        hostaddr = peer.split(':')[0]
        port = peer.split(':')[1]
        s.sendto(msg, (hostaddr, int(port)))


def active_thread_impl(gossip_interval):
    while True:
        time.sleep(gossip_interval)
        peers = get_peers(3)
        msg = prepare_msg(MSG_TYPE_REQ)
        send_msg(peers, msg)


def update_data(received_msg):
    data = {}
    # read data from file
    try:
        with open(CURRENT_DATA_FILE, 'rt') as f:
            for line in f:
                if not line.strip() or line.startswith('#'):
                    # blank line or comment
                    pass
                else:
                    # format is: key;value;version(timestamp)
                    token = line.split(';', 3)
                    data[token[0]] = (token[1], token[2])
    except FileNotFoundError:
        pass

    need_update = False
    for line in received_msg.split('\n'):
        if not line:  # empty line
            continue
        # format is: key;value;version(timestamp)
        # debug('received line=[' + line + ']')
        token = line.split(';', 3)
        key = token[0]
        # value = token[1]
        ver = token[2]
        # debug('rec version=[' + version + ']')
        if key in data:
            # update if received data is newer than current data
            ver_cur_data = data[key][1]
            if int(ver) > int(ver_cur_data):
                need_update = True
                data[key] = (token[1], token[2])
        else:
            need_update = True
            data[key] = (token[1], token[2])

    # write data back to file
    if need_update:
        merged_data = ''
        for key, value_and_ver in data.items():
            merged_data += key + ";" + value_and_ver[0].rstrip(
            ) + ";" + value_and_ver[1].rstrip() + '\n'
            # debug('merged_data=[' + merged_data + ']')
        with open(CURRENT_DATA_FILE, 'wt') as f:
            f.write(merged_data)


class MyRequestHandler(BaseRequestHandler):
    def handle(self):
        # print('Got connection from', self.client_address)
        # Get message and client socket
        buf, sock = self.request
        msg = buf.decode("utf-8")
        debug('Got message:[' + msg + ']')
        first_line = msg.split('\n')[0]
        if not first_line.startswith('msg_type='):
            debug('Got a bad message, just ignore it.')
            return
        msg_type = first_line[len('msg_type='):]
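        # Send back a reply msg if we received a request msg
        if msg_type == MSG_TYPE_REQ:
            debug('Got a request msg, will send a reply msg.')
            # The answer is typed as a reply so that it is never answered in turn.
            # Note: send_msg() never reads from its socket, so this reply is only
            # received if the sender is changed to listen on that socket.
            my_msg = prepare_msg(MSG_TYPE_REP)
            sock.sendto(my_msg, self.client_address)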
        # update current peer's data
        update_data(msg[len(first_line + '\n'):])


def passive_thread_impl():
    serv = UDPServer((CURRENT_SERV_HOST, CURRENT_SERV_PORT), MyRequestHandler)
    serv.serve_forever()     # Gossip data exchange is usually implemented over UDP


if __name__ == '__main__':
    t1 = threading.Thread(target=passive_thread_impl)
    t1.start()

    t2 = threading.Thread(target=active_thread_impl, args=(GOSSIP_INTERVAL, ))
    t2.start()

To test it, first prepare a data file data_localhost_10001 (the data of the first node) with the following contents:

#key;value;version(or timestamp)
a;100;3
b;200;10
c;300;4

Then open 5 terminals and run one of the following commands in each to start a node.

$ python test.py 10001   # run in terminal 1
$ python test.py 10002   # run in terminal 2
$ python test.py 10003   # run in terminal 3
$ python test.py 10004   # run in terminal 4
$ python test.py 10005   # run in terminal 5

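You will find that the files data_localhost_10002, data_localhost_10003, data_localhost_10004 and data_localhost_10005 are created with the same entries as data_localhost_10001, so the data on the 5 nodes has become consistent. Furthermore, if you change the value of a key in any one of the files (remember to increase its version as well), the data for that key in the other files is updated too.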
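The convergence seen in this test can also be reproduced without sockets or files. The following is a rough in-memory sketch, assuming a push-pull exchange and a single version number per node; the function name simulate_anti_entropy and its parameters are made up for this illustration.

```python
import random


def simulate_anti_entropy(n, seed=0, max_rounds=1000):
    """Spread one update from node 0 to n nodes (n >= 2) by push-pull gossip.

    Returns (rounds, versions), where versions[i] is the version held by node i.
    """
    rng = random.Random(seed)
    versions = [0] * n
    versions[0] = 1  # node 0 holds the new update
    rounds = 0
    while min(versions) < 1 and rounds < max_rounds:
        rounds += 1
        for p in range(n):
            # every node contacts one random peer other than itself
            r = rng.choice([i for i in range(n) if i != p])
            # push-pull: both sides end up with the newer version
            newest = max(versions[p], versions[r])
            versions[p] = versions[r] = newest
    return rounds, versions


rounds, versions = simulate_anti_entropy(50, seed=1)
print("rounds needed:", rounds)
print("all nodes updated:", all(v == 1 for v in versions))
```

Because nodes never stop gossiping in the SI model, the loop only ends once every node holds the update; the max_rounds guard is there purely as a safety net for the sketch.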

3. Rumor mongering (Complex Epidemic)

Rumor mongering is a "Complex Epidemic": each node has the additional Removed state, giving three states in total (Infective, Susceptible, and Removed), so it is also called the SIR model.

3.1. Removal Strategies

Compared with Anti-entropy, nodes in Rumor mongering have one more state, Removed. A node in the Removed state can be thought of as having "recovered": it no longer infects others.

3.1.1. Which Nodes Should Be Removed? (Counter vs. Coin)

Which nodes should be removed? There are two strategies:
Counter: Removed after k unnecessary contacts.
Coin(random): Removed with probability 1/k after each unsuccessful attempt.
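The two removal rules can be expressed as small predicates. This is only a sketch: the function names and the rng parameter are assumptions made for this example, and each function is meant to be called after a contact that turned out to be unnecessary.

```python
import random


def counter_should_remove(unnecessary_contacts, k):
    """Counter: remove the node once it has made k unnecessary contacts."""
    return unnecessary_contacts >= k


def coin_should_remove(k, rng=random):
    """Coin: after an unnecessary contact, remove the node with probability 1/k."""
    return rng.random() < 1.0 / k


print(counter_should_remove(2, k=3))  # False: only 2 of 3 unnecessary contacts so far
print(counter_should_remove(3, k=3))  # True

# Over many trials the coin rule removes the node in roughly 1/k of the cases
rng = random.Random(42)
hits = sum(coin_should_remove(4, rng) for _ in range(10000))
print(hits / 10000)
```

Counter needs a little state per node (the number of unnecessary contacts so far), while Coin is stateless; with Coin a node gossips for k more unnecessary contacts on average, but may stop much earlier or later.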

3.1.2. When Is Removal Performed? (Blind vs. Feedback)

When is removal performed? There are two strategies:
Blind: Removal algorithm is executed in each cycle.
Feedback: Removal algorithm (coin/counter) is executed if the contacted node was in infective state.
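To see how Blind and Feedback differ in practice, here is a rough in-memory simulation of rumor mongering. It is a sketch under several assumptions: it uses push only, applies the Counter rule with threshold k, and treats a contact as unnecessary when the contacted node already knew the rumor. The name simulate_rumor and its parameters are invented for this illustration.

```python
import random

SUSCEPTIBLE, INFECTIVE, REMOVED = "S", "I", "R"


def simulate_rumor(n, k, feedback=True, seed=0, max_rounds=10_000):
    """Spread one rumor from node 0 among n nodes (n >= 2).

    Returns (residue, rounds), where residue is the number of nodes that
    are still Susceptible when no Infective node is left.
    """
    rng = random.Random(seed)
    state = [SUSCEPTIBLE] * n
    state[0] = INFECTIVE
    count = [0] * n  # per-node counter used by the Counter rule
    rounds = 0
    while INFECTIVE in state and rounds < max_rounds:
        rounds += 1
        # snapshot: nodes infected during this round start gossiping next round
        for p in [i for i in range(n) if state[i] == INFECTIVE]:
            r = rng.choice([i for i in range(n) if i != p])
            already_knew = state[r] != SUSCEPTIBLE
            if not already_knew:
                state[r] = INFECTIVE
            # Feedback: count only contacts with nodes that already knew.
            # Blind: count every cycle, whatever the outcome of the contact.
            if already_knew or not feedback:
                count[p] += 1
                if count[p] >= k:
                    state[p] = REMOVED
    return state.count(SUSCEPTIBLE), rounds


for mode in (True, False):
    residue, rounds = simulate_rumor(1000, k=2, feedback=mode, seed=1)
    print("feedback" if mode else "blind", "residue:", residue, "rounds:", rounds)
```

The residue is the number of nodes that never heard the rumor. Unlike Anti-entropy, the spread stops by itself once every Infective node has been removed, so a nonzero residue is possible.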

3.2. Anti-entropy vs. Rumor mongering

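The drawback of Anti-entropy is that it consumes more traffic; its advantage is that every node eventually receives the update.
The advantage of Rumor mongering is that it consumes less traffic, but once propagation stops, some nodes may not have received the latest data.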

Author: cig01

Created: <2017-09-16 Sat>

Last updated: <2020-04-11 Sat>
