








开源库地址:https://github.com/hashicorp/memberlist
demo地址:https://github.com/asim/memberlist
我们熟知的Alertmanager和Consul组件,都是使用memberlist这个开源库来实现gossip业务逻辑的。
问题:异步如何保证数据不会丢失?
broadcast消息无法完全保证不丢失。UDP本身就有一定的丢包率,但是只要有一个节点成功收到,数据就会继续同步到其他节点;只有在100%丢失的情况下才会同步失败。如果需要更强的保证,可以调用其他保证可靠性的接口。broadcast一般用作增量数据的同步;memberlist还提供了同步全量数据的能力,通过pushPullMsg的TCP连接与对端节点同步来保证可靠性,以达到最终一致性的目的。
pushPullMsg:
用作全量节点状态同步,由对端的pushPullTrigger定时器(timer)定期触发。
收到对端的pushPullMsg之后,首先会将本地的nodes状态也应答给对方,这样就完成了一次完整的全量数据交换。此过程中可以携带用户数据,一般用作调用方全量数据同步。
package main
import (
"encoding/json"
"flag"
"fmt"
"net/http"
"os"
"strings"
"sync"
"github.com/hashicorp/memberlist"
"github.com/pborman/uuid"
)
// Package-level state shared by the HTTP handlers and the memberlist delegate.
var (
// mtx guards every read and write of items.
mtx sync.RWMutex
// members is the -members flag: a comma-separated list of existing nodes
// that start() passes to Join.
members = flag.String("members", "", "comma seperated list of members")
// port is the -port flag: the TCP port the HTTP API listens on.
port = flag.Int("port", 4001, "http port")
// items is the replicated key/value store.
items = map[string]string{}
// broadcasts queues outgoing update messages; it is assigned in start().
broadcasts *memberlist.TransmitLimitedQueue
)
// broadcast is a single message handed to broadcasts.QueueBroadcast by the
// add/del HTTP handlers.
type broadcast struct {
// msg is the raw payload returned by Message.
msg []byte
// notify, when non-nil, is closed by Finished.
notify chan<- struct{}
}
// delegate is the stateless value installed as the memberlist config's
// Delegate in start(); its methods read and write the package-level items map.
type delegate struct{}
// update is one change to the key/value store; a JSON array of updates forms
// the body of a broadcast message.
type update struct {
Action string // "add" stores each Data pair, "del" removes each Data key
// Data holds the affected keys; values are only used by "add".
Data map[string]string
}
// init parses the command-line flags during package initialisation, so
// *members and *port are populated before main runs.
func init() {
flag.Parse()
}
// Invalidates always reports false, regardless of other.
// NOTE(review): presumably this tells the memberlist queue that a newer
// broadcast never supersedes an older one — confirm against the
// memberlist.Broadcast contract.
func (b *broadcast) Invalidates(other memberlist.Broadcast) bool {
return false
}
// Message returns the stored payload as-is (no copy is made).
func (b *broadcast) Message() []byte {
return b.msg
}
// Finished closes the notify channel when one was supplied; a broadcast
// created without a notify channel makes this a no-op.
func (b *broadcast) Finished() {
	if b.notify == nil {
		return
	}
	close(b.notify)
}
// NodeMeta reports no per-node metadata: it ignores limit and always returns
// an empty, non-nil byte slice.
func (d *delegate) NodeMeta(limit int) []byte {
	meta := make([]byte, 0)
	return meta
}
// NotifyMsg applies a user message received from another node to the local
// store.
//
// The first byte selects the message type; only 'd' (data) is understood, and
// the remainder of such a message is a JSON array of updates. Empty messages,
// unknown types and undecodable payloads are silently dropped.
func (d *delegate) NotifyMsg(b []byte) {
	if len(b) == 0 || b[0] != 'd' {
		return
	}

	var updates []*update
	if err := json.Unmarshal(b[1:], &updates); err != nil {
		return
	}

	mtx.Lock()
	defer mtx.Unlock()
	for _, u := range updates {
		// A JSON null element decodes to a nil pointer; the payload comes
		// from the network, so skip it instead of dereferencing it.
		if u == nil {
			continue
		}
		switch u.Action {
		case "add":
			for k, v := range u.Data {
				items[k] = v
			}
		case "del":
			for k := range u.Data {
				delete(items, k)
			}
		}
	}
}
// GetBroadcasts forwards the request to the package-level broadcasts queue and
// returns whatever messages it hands back for the given overhead and limit.
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
	pending := broadcasts.GetBroadcasts(overhead, limit)
	return pending
}
// LocalState returns the whole key/value store encoded as a JSON object. The
// join flag is ignored: the same snapshot is produced either way.
//
// It returns nil if encoding fails.
func (d *delegate) LocalState(join bool) []byte {
	// Assigning the map to a local copies only the reference, so the encoder
	// must run while the read lock is held; otherwise it iterates the live
	// map concurrently with writers.
	mtx.RLock()
	defer mtx.RUnlock()

	b, err := json.Marshal(items)
	if err != nil {
		return nil
	}
	return b
}
// MergeRemoteState folds a peer's JSON snapshot into the local store, with
// remote values overwriting local ones on key collisions.
//
// The snapshot is only applied when join is true; an empty buffer, a non-join
// exchange, or an undecodable payload leaves the store untouched.
func (d *delegate) MergeRemoteState(buf []byte, join bool) {
	if !join || len(buf) == 0 {
		return
	}

	var remote map[string]string
	if json.Unmarshal(buf, &remote) != nil {
		return
	}

	mtx.Lock()
	defer mtx.Unlock()
	for key, value := range remote {
		items[key] = value
	}
}
// eventDelegate is the stateless value installed as the memberlist config's
// Events in start(); its methods print membership changes to standard output.
type eventDelegate struct{}
// NotifyJoin prints a line to standard output naming the node that joined.
func (ed *eventDelegate) NotifyJoin(node *memberlist.Node) {
	line := "A node has joined: " + node.String()
	fmt.Println(line)
}
// NotifyLeave prints a line to standard output naming the node that left.
func (ed *eventDelegate) NotifyLeave(node *memberlist.Node) {
	line := "A node has left: " + node.String()
	fmt.Println(line)
}
// NotifyUpdate prints a line to standard output naming the node that was
// updated.
func (ed *eventDelegate) NotifyUpdate(node *memberlist.Node) {
	line := "A node was updated: " + node.String()
	fmt.Println(line)
}
func addHandler(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
key := r.Form.Get("key")
val := r.Form.Get("val")
mtx.Lock()
items[key] = val
mtx.Unlock()
b, err := json.Marshal([]*update{
{
Action: "add",
Data: map[string]string{
key: val,
},
},
})
if err != nil {
http.Error(w, err.Error(), 500)
return
}
broadcasts.QueueBroadcast(&broadcast{
msg: append([]byte("d"), b...),
notify: nil,
})
}
func delHandler(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
key := r.Form.Get("key")
mtx.Lock()
delete(items, key)
mtx.Unlock()
b, err := json.Marshal([]*update{{
Action: "del",
Data: map[string]string{
key: "",
},
}})
if err != nil {
http.Error(w, err.Error(), 500)
return
}
broadcasts.QueueBroadcast(&broadcast{
msg: append([]byte("d"), b...),
notify: nil,
})
}
// getHandler serves /get. It writes the value stored under the "key" form
// value as the response body; an unknown key yields an empty body.
//
// It replies 400 when the request form cannot be parsed.
func getHandler(w http.ResponseWriter, r *http.Request) {
	if err := r.ParseForm(); err != nil {
		http.Error(w, err.Error(), http.StatusBadRequest)
		return
	}
	key := r.Form.Get("key")

	mtx.RLock()
	val := items[key]
	mtx.RUnlock()

	// A write error means the client went away; there is nothing to recover.
	_, _ = w.Write([]byte(val))
}
// start creates the local memberlist node, joins the nodes named by the
// -members flag (if any), and prints the local gossip address.
//
// It returns the error from memberlist.Create or Join unchanged.
func start() error {
	// A failed lookup only leaves the prefix empty; the UUID suffix still
	// makes the node name unique.
	hostname, _ := os.Hostname()

	c := memberlist.DefaultLocalConfig()
	c.Events = &eventDelegate{}
	c.Delegate = &delegate{}
	// NOTE(review): 0 presumably requests an OS-assigned port, matching the
	// varying ports shown in the usage notes — confirm.
	c.BindPort = 0
	c.Name = hostname + "-" + uuid.NewUUID().String()

	// The queue must exist before Create: once the node is running (and
	// especially once it has joined peers) memberlist may call the delegate's
	// GetBroadcasts at any time, which dereferences broadcasts.
	// NumNodes reads m lazily; nothing is queued until the HTTP handlers run,
	// which is after start has returned and m is set.
	var m *memberlist.Memberlist
	broadcasts = &memberlist.TransmitLimitedQueue{
		NumNodes: func() int {
			return m.NumMembers()
		},
		RetransmitMult: 3,
	}

	var err error
	if m, err = memberlist.Create(c); err != nil {
		return err
	}

	if len(*members) > 0 {
		parts := strings.Split(*members, ",")
		if _, err := m.Join(parts); err != nil {
			return err
		}
	}

	node := m.LocalNode()
	fmt.Printf("Local member %s:%d\n", node.Addr, node.Port)
	return nil
}
// main starts the memberlist node and then serves the HTTP API (/add, /del,
// /get) on the port given by the -port flag.
//
// It exits with status 1 if the node cannot be started or the HTTP server
// stops with an error.
func main() {
	// Serving without a started node would leave broadcasts nil and make
	// /add and /del panic, so fail fast instead.
	if err := start(); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}

	http.HandleFunc("/add", addHandler)
	http.HandleFunc("/del", delHandler)
	http.HandleFunc("/get", getHandler)

	fmt.Printf("Listening on :%d\n", *port)
	if err := http.ListenAndServe(fmt.Sprintf(":%d", *port), nil); err != nil {
		fmt.Fprintln(os.Stderr, err)
		os.Exit(1)
	}
}
go get github.com/asim/memberlist
memberlist
-members="": comma seperated list of members
-port=4001: http port
Start first node
memberlist
Make a note of the local member address
Local member 192.168.1.64:60496
Listening on :4001
Start second node with first node as part of the member list
memberlist --members=192.168.1.64:60496 --port=4002
You should see the output
2015/10/17 22:13:49 [DEBUG] memberlist: Initiating push/pull sync with: 192.168.1.64:60496
Local member 192.168.1.64:60499
Listening on :4002
First node output will log the new connection
2015/10/17 22:13:49 [DEBUG] memberlist: TCP connection from: 192.168.1.64:60500
2015/10/17 22:13:52 [DEBUG] memberlist: Initiating push/pull sync with: 192.168.1.64:60499
HTTP API
Query params expected are key and val
# add
curl "http://localhost:4001/add?key=foo&val=bar"
# get
curl "http://localhost:4001/get?key=foo"
# delete
curl "http://localhost:4001/del?key=foo"