zookeepr操作

使用人比较多的zookeeper go client有

操作zookeeper #

连接 #

package main

import (
	"fmt"
	"github.com/samuel/go-zookeeper/zk"
	"time"
)

func main() {
	// 创建zk连接地址
	hosts := []string{"127.0.0.1:2181"}
	// 连接zk
	conn, _, err := zk.Connect(hosts, time.Second*5)
	defer conn.Close()
	if err != nil {
		fmt.Println(err)
		return
	}
	println(conn.Server())
}

增删改查 #

package main

import (
	"fmt"
	"github.com/samuel/go-zookeeper/zk"
	"time"
)

var (
	path = "/test"
)

// 增
func add(conn *zk.Conn) {
	var data = []byte("test value")
	// flags有4种取值:
	// 0:永久,除非手动删除
	// zk.FlagEphemeral = 1:短暂,session断开则该节点也被删除
	// zk.FlagSequence  = 2:会自动在节点后面添加序号
	// 3:Ephemeral和Sequence,即,短暂且自动添加序号
	var flags int32 = 0
	// 获取访问控制权限
	acls := zk.WorldACL(zk.PermAll)
	s, err := conn.Create(path, data, flags, acls)
	if err != nil {
		fmt.Printf("创建失败: %v\n", err)
		return
	}
	fmt.Printf("创建: %s 成功", s)
}

// 查
func get(conn *zk.Conn) {
	data, _, err := conn.Get(path)
	if err != nil {
		fmt.Printf("查询%s失败, err: %v\n", path, err)
		return
	}
	fmt.Printf("%s 的值为 %s\n", path, string(data))
}

// 删改与增不同在于其函数中的version参数,其中version是用于 CAS支持
// 可以通过此种方式保证原子性
// 改
func modify(conn *zk.Conn) {
	new_data := []byte("hello zookeeper")
	_, sate, _ := conn.Get(path)
	_, err := conn.Set(path, new_data, sate.Version)
	if err != nil {
		fmt.Printf("数据修改失败: %v\n", err)
		return
	}
	fmt.Println("数据修改成功")
}

// 删
func del(conn *zk.Conn) {
	_, sate, _ := conn.Get(path)
	err := conn.Delete(path, sate.Version)
	if err != nil {
		fmt.Printf("数据删除失败: %v\n", err)
		return
	}
	fmt.Println("数据删除成功")
}

func main() {
	// 创建zk连接地址
	hosts := []string{"127.0.0.1:2181"}
	// 连接zk
	conn, _, err := zk.Connect(hosts, time.Second*5)
	defer conn.Close()
	if err != nil {
		fmt.Println(err)
		return
	}

	/* 增删改查 */
	//add(conn)
	//get(conn)
	//modify(conn)
	del(conn)
	get(conn)
}

WatchEvent #

全局监控 #

package main

import (
	"fmt"
	"github.com/samuel/go-zookeeper/zk"
	"time"
)

var (
	hosts       = []string{"127.0.0.1:2181"}
	path        = "/wtzk"
	flags int32 = zk.FlagEphemeral
	data        = []byte("zk data 001")
	acls        = zk.WorldACL(zk.PermAll)
)

func main() {
	// 创建监听的option,用于初始化zk
	eventCallbackOption := zk.WithEventCallback(callback)
	// 连接zk
	conn, _, err := zk.Connect(hosts, time.Second*5, eventCallbackOption)
	defer conn.Close()
	if err != nil {
		fmt.Println(err)
		return
	}

	// 开始监听path
	_, _, _, err = conn.ExistsW(path)
	if err != nil {
		fmt.Println(err)
		return
	}

	// 触发创建数据操作
	create(conn, path, data)

	//再次监听path
	_, _, _, err = conn.ExistsW(path)
	if err != nil {
		fmt.Println(err)
		return
	}
	// 触发删除数据操作
	del(conn, path)

}

// zk watch 回调函数
func callback(event zk.Event) {
	// zk.EventNodeCreated
	// zk.EventNodeDeleted
	fmt.Println("###########################")
	fmt.Println("path: ", event.Path)
	fmt.Println("type: ", event.Type.String())
	fmt.Println("state: ", event.State.String())
	fmt.Println("---------------------------")
}

// 创建数据
func create(conn *zk.Conn, path string, data []byte) {
	_, err := conn.Create(path, data, flags, acls)
	if err != nil {
		fmt.Printf("创建数据失败: %v\n", err)
		return
	}
	fmt.Println("创建数据成功")
}

// 删除数据
func del(conn *zk.Conn, path string) {
	_, stat, _ := conn.Get(path)
	err := conn.Delete(path, stat.Version)
	if err != nil {
		fmt.Printf("删除数据失败: %v\n", err)
		return
	}
	fmt.Println("删除数据成功")
}

局部监控 #

1.调用conn.ExistsW(path) 或GetW(path)为对应节点设置监听,该监听只生效一次 2.开启一个协程处理chanel中传来的event事件 (注意:watchCreataNode一定要放在一个协程中,不能直接在main中调用,不然会阻塞main)

package main

import (
	"fmt"
	"github.com/samuel/go-zookeeper/zk"
	"time"
)

var (
	hosts       = []string{"127.0.0.1:2181"}
	path        = "/wtzk"
	flags int32 = zk.FlagEphemeral
	data        = []byte("zk data 001")
	acls        = zk.WorldACL(zk.PermAll)
)

func main() {
	// 连接zk
	conn, _, err := zk.Connect(hosts, time.Second*5)
	defer conn.Close()
	if err != nil {
		fmt.Println(err)
		return
	}

	// 开始监听path
	_, _, event, err := conn.ExistsW(path)
	if err != nil {
		fmt.Println(err)
		return
	}

	// 协程调用监听事件
	go watchZkEvent(event)

	// 触发创建数据操作
	create(conn, path, data)

}

// zk 回调函数
func watchZkEvent(e <-chan zk.Event) {
	event := <-e
	fmt.Println("###########################")
	fmt.Println("path: ", event.Path)
	fmt.Println("type: ", event.Type.String())
	fmt.Println("state: ", event.State.String())
	fmt.Println("---------------------------")
}

// 创建数据
func create(conn *zk.Conn, path string, data []byte) {
	_, err := conn.Create(path, data, flags, acls)
	if err != nil {
		fmt.Printf("创建数据失败: %v\n", err)
		return
	}
	fmt.Println("创建数据成功")
}

1.如果即设置了全局监听又设置了部分监听,那么最终是都会触发的,并且全局监听在先执行 2.如果设置了监听子节点,那么事件的触发是先子节点后父节点

客户端随机hostname支持 #

最终就是Round Robin负载均衡算法

var hosts = []string{"host01:2181", "host02:2181", "host03:2181",}
hostPro := new(zk.DNSHostProvider)
//先初始化
err := hostPro.Init(hosts)

if err != nil {
	fmt.Println(err)
	return
}
//获得host
server, retryStart := hostPro.Next()
fmt.Println(server, retryStart)
//连接成功后会调用
hostPro.Connected()

探针程序示例 #

说明:客户端启动程序,注册节点到zookeeper,关闭探针,注册的节点删除,服务端去取节点查看探针是否启动

package main

import (
	"context"
	"fmt"
	"github.com/samuel/go-zookeeper/zk"
	"os"
	"os/signal"
	"syscall"
	"time"
)

var (
	hosts       = []string{"127.0.0.1:2181"}
	path        = "/wtzk"
	/*
	   flags有4种取值:
	   0:永久,除非手动删除
	   1:短暂,session断开则改节点也被删除
	   2:会自动在节点后面添加序号
	   3:即,短暂且自动添加序号
	*/
	flags int32 = 0
	data        = []byte("zk data 001")
	acls        = zk.WorldACL(zk.PermAll)
)

func main()  {
	ctx,cancel := context.WithCancel(context.Background())
	conn, _, err := zk.Connect(hosts, time.Second*5)
	if err != nil {
		fmt.Println(err)
		return
	}

	// 开始监听path
	ok, _,err := conn.Exists(path)
	if err != nil {
		fmt.Println(err)
		return
	}

	if !ok {
		create(conn, path, data)
	}

	go func(ctx context.Context) {
		t := time.NewTicker(time.Second * 5)
		for _ = range t.C {
			select {
			case <-ctx.Done():
				fmt.Println("关闭子协程")
				t.Stop()
				break
			default:
			}
			get(conn,path)
		}
	}(ctx)

	sigs := make(chan os.Signal,1)
	signal.Notify(sigs,syscall.SIGINT,syscall.SIGTERM)
	select {
	case <-sigs:
		del(conn,path)
		cancel()
		conn.Close()

	}
}


// zk 回调函数
func watchZkEvent(e <-chan zk.Event) {
	event := <-e
	fmt.Println("watchzkEvent func")
	fmt.Println("path: ", event.Path)
	fmt.Println("type: ", event.Type.String())
	fmt.Println("state: ", event.State.String())
	fmt.Println("---------------------------")
}

func callback(event zk.Event) {
	fmt.Println("callback func")
	fmt.Println("path: ", event.Path)
	fmt.Println("type: ", event.Type.String())
	fmt.Println("state: ", event.State.String())
	fmt.Println("---------------------------")
}

// 创建数据
func create(conn *zk.Conn, path string, data []byte) {
	_, err := conn.Create(path, data, flags, acls)
	if err != nil {
		fmt.Printf("创建数据失败: %v\n", err)
		return
	}
	fmt.Println("创建数据成功")
}

func del(conn *zk.Conn, path string) {
	_, stat, _ := conn.Get(path)
	err := conn.Delete(path, stat.Version)
	if err != nil {
		fmt.Printf("删除数据失败: %v\n", err)
		return
	}
	fmt.Println("删除数据成功")
}

func get(conn *zk.Conn,path string) {
	b, stat,err := conn.Get(path)
	if err != nil {
		fmt.Println(err)
	}
	fmt.Println("节点数据:", string(b))
	fmt.Println("节点信息", stat.Czxid, stat.Version)

}