搭建zookeeper的服务就不说,上网一大堆,直接上代码
common.go
package main import ( "fmt" "time" "github.com/go-zookeeper/zk" ) func GetConnect() (conn *zk.Conn, err error) { hosts := []string{"localhost:8090"} conn, _, err = zk.Connect(hosts, 5*time.Second) if err != nil { fmt.Println(err) } // _, err = conn.Create("/config", nil, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) // fmt.Println("err:", err) // stat, err := conn.Set("/config", []byte("hello world"), -1) // fmt.Println("stat:", stat) // fmt.Println("err:", err) // buf, stat, err := conn.Get("/config") // fmt.Println("buf:", string(buf)) // fmt.Println("stat:", stat) // fmt.Println("err:", err) return } func RegistServer(conn *zk.Conn, host string) (err error) { _, err = conn.Create("/go_servers/"+host, nil, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) return } func GetServerList(conn *zk.Conn) (list []string, err error) { list, _, err = conn.Children("/go_servers") fmt.Println("list:", list) return } //watch机制,服务器有断开或者重连,收到消息 func watchServerList(conn *zk.Conn, path string) (chan []string, chan error) { snapshots := make(chan []string) errors := make(chan error) go func() { for { snapshot, _, events, err := conn.ChildrenW(path) if err != nil { errors <- err return } snapshots <- snapshot evt := <-events if evt.Err != nil { errors <- evt.Err return } } }() return snapshots, errors } //watch机制,监听配置文件变化的过程 func watchGetDat(conn *zk.Conn, path string) (chan []byte, chan error) { snapshots := make(chan []byte) errors := make(chan error) go func() { for { dataBuf, _, events, err := conn.GetW(path) if err != nil { errors <- err return } snapshots <- dataBuf evt := <-events if evt.Err != nil { errors <- evt.Err return } } }() return snapshots, errors } func checkError(err error) { if err != nil { fmt.Println("err:", err) //panic(err) } }
client.go
package main /** 客户端doc地址:github.com/samuel/go-zookeeper/zk **/ import ( "errors" "fmt" "math/rand" "net" "time" ) var serverList []string func main() { conn, err := GetConnect() if err != nil { fmt.Printf(" connect zk error: %s \n ", err) return } defer conn.Close() serverList, err = GetServerList(conn) if err != nil { fmt.Printf(" get server list error: %s \n", err) return } count := len(serverList) if count == 0 { err = errors.New("server list is empty \n") return } //用来实时监听服务的上线与下线功能,serverList时刻保持最新的在线服务 snapshots, errors := watchServerList(conn, "/go_servers") go func() { for { select { case serverList = <-snapshots: fmt.Printf("1111:%+v\n", serverList) case err := <-errors: fmt.Printf("2222:%+v\n", err) } } }() configs, errors := watchGetDat(conn, "/config") go func() { for { select { case configData := <-configs: fmt.Printf("333:%+v\n", string(configData)) case err := <-errors: fmt.Printf("4444:%+v\n", err) } } }() for { time.Sleep(1 * time.Second) } for i := 0; i < 100; i++ { fmt.Println("start Client :", i) startClient() time.Sleep(1 * time.Second) } } func startClient() { defer func() { if err := recover(); err != nil { fmt.Println("err:", err) } }() // service := "127.0.0.1:8899" //获取地址 serverHost, err := getServerHost() if err != nil { fmt.Printf("get server host fail: %s \n", err) return } //serverHost := "127.0.0.1:8899" fmt.Println("connect host: " + serverHost) //tcpAddr, err := net.ResolveTCPAddr("tcp4", serverHost) //checkError(err) conn, err := net.Dial("tcp", serverHost) checkError(err) defer conn.Close() fmt.Println("connect ok") _, err = conn.Write([]byte("timestamp")) checkError(err) fmt.Println("write ok") // result, err := ioutil.ReadAll(conn) // checkError(err) // fmt.Println("recv:", string(result)) return } func getServerHost() (host string, err error) { //随机选中一个返回 r := rand.New(rand.NewSource(time.Now().UnixNano())) host = serverList[r.Intn(3)] return }
server.go
package main /** 客户端doc地址:github.com/samuel/go-zookeeper/zk **/ import ( "fmt" "net" "os" ) func main() { go starServer("127.0.0.1:9897") go starServer("127.0.0.1:9898") go starServer("127.0.0.1:9899") a := make(chan bool, 1) <-a } func starServer(port string) { tcpAddr, err := net.ResolveTCPAddr("tcp4", port) fmt.Println(tcpAddr) checkError(err) listener, err := net.ListenTCP("tcp", tcpAddr) checkError(err) //注册zk节点q conn, err := GetConnect() if err != nil { fmt.Printf(" connect zk error: %s ", err) } defer conn.Close() err = RegistServer(conn, port) if err != nil { fmt.Printf(" regist node error: %s ", err) } for { conn, err := listener.Accept() if err != nil { fmt.Fprintf(os.Stderr, "Error: %s", err) continue } go handleCient(conn, port) } fmt.Println("aaaaaa") } func handleCient(conn net.Conn, port string) { fmt.Println("new client:", conn.RemoteAddr()) for { buf := make([]byte, 1024) length, err := conn.Read(buf) if err != nil { fmt.Println("Error reading:", err.Error()) return } fmt.Println("Receive data from client:", string(buf[:length])) // conn.Write([]byte("hello world")) } }