1. 操作ETCD

    这里使用官方的包来连接etcd并进行相关操作。

    1.1.2. put和get操作

    put命令用来设置键值对数据,get命令用来根据key获取值。

    1. package main
    2. import (
    3. "context"
    4. "fmt"
    5. "time"
    6. "go.etcd.io/etcd/clientv3"
    7. )
    8. // etcd client put/get demo
    9. // use etcd/clientv3
    10. func main() {
    11. cli, err := clientv3.New(clientv3.Config{
    12. Endpoints: []string{"127.0.0.1:2379"},
    13. DialTimeout: 5 * time.Second,
    14. })
    15. if err != nil {
    16. // handle error!
    17. fmt.Printf("connect to etcd failed, err:%v\n", err)
    18. return
    19. }
    20. fmt.Println("connect to etcd success")
    21. defer cli.Close()
    22. // put
    23. ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    24. _, err = cli.Put(ctx, "lmh", "lmh")
    25. cancel()
    26. if err != nil {
    27. fmt.Printf("put to etcd failed, err:%v\n", err)
    28. return
    29. }
    30. // get
    31. ctx, cancel = context.WithTimeout(context.Background(), time.Second)
    32. resp, err := cli.Get(ctx, "lmh")
    33. cancel()
    34. if err != nil {
    35. fmt.Printf("get from etcd failed, err:%v\n", err)
    36. return
    37. }
    38. for _, ev := range resp.Kvs {
    39. fmt.Printf("%s:%s\n", ev.Key, ev.Value)
    40. }
    41. }

    watch用来获取未来更改的通知。

    1. package main
    2. import (
    3. "context"
    4. "fmt"
    5. "go.etcd.io/etcd/clientv3"
    6. )
    7. // watch demo
    8. func main() {
    9. cli, err := clientv3.New(clientv3.Config{
    10. Endpoints: []string{"127.0.0.1:2379"},
    11. DialTimeout: 5 * time.Second,
    12. })
    13. if err != nil {
    14. fmt.Printf("connect to etcd failed, err:%v\n", err)
    15. return
    16. }
    17. fmt.Println("connect to etcd success")
    18. defer cli.Close()
    19. // watch key:lmh change
    20. rch := cli.Watch(context.Background(), "lmh") // <-chan WatchResponse
    21. for wresp := range rch {
    22. for _, ev := range wresp.Events {
    23. fmt.Printf("Type: %s Key:%s Value:%s\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
    24. }
    25. }
    26. }

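    例如:我们打开终端执行以下命令修改、删除、设置lmh这个key。

    1. etcdctl --endpoints=http://127.0.0.1:2379 put lmh "lmh1"
    2. etcdctl --endpoints=http://127.0.0.1:2379 del lmh
    3. etcdctl --endpoints=http://127.0.0.1:2379 put lmh "lmh2"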

    上面的程序都能收到如下通知。

    1. watch>watch.exe
    2. connect to etcd success
    3. Type: PUT Key:lmh Value:lmh1
    4. Type: DELETE Key:lmh Value:
    5. Type: PUT Key:lmh Value:lmh2

    1.1.4. lease租约

    1. package main
    2. import (
    3. "fmt"
    4. "time"
    5. )
    6. // etcd lease
    7. import (
    8. "context"
    9. "log"
    10. "go.etcd.io/etcd/clientv3"
    11. )
    12. func main() {
    13. cli, err := clientv3.New(clientv3.Config{
    14. Endpoints: []string{"127.0.0.1:2379"},
    15. DialTimeout: time.Second * 5,
    16. })
    17. if err != nil {
    18. log.Fatal(err)
    19. }
    20. fmt.Println("connect to etcd success.")
    21. defer cli.Close()
    22. // 创建一个5秒的租约
    23. if err != nil {
    24. log.Fatal(err)
    25. }
    26. // 5秒钟之后, /lmh/ 这个key就会被移除
    27. _, err = cli.Put(context.TODO(), "/lmh/", "lmh", clientv3.WithLease(resp.ID))
    28. if err != nil {
    29. log.Fatal(err)
    30. }
    31. }

    1.1.6. 基于etcd实现分布式锁

    go.etcd.io/etcd/clientv3/concurrency在etcd之上实现并发操作,如分布式锁、屏障和选举。

    1. import "go.etcd.io/etcd/clientv3/concurrency"

    基于etcd实现的分布式锁示例:

    1. cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
    2. if err != nil {
    3. log.Fatal(err)
    4. }
    5. defer cli.Close()
    6. // 创建两个单独的会话用来演示锁竞争
    7. s1, err := concurrency.NewSession(cli)
    8. if err != nil {
    9. log.Fatal(err)
    10. }
    11. defer s1.Close()
    12. m1 := concurrency.NewMutex(s1, "/my-lock/")
    13. s2, err := concurrency.NewSession(cli)
    14. if err != nil {
    15. log.Fatal(err)
    16. }
    17. defer s2.Close()
    18. m2 := concurrency.NewMutex(s2, "/my-lock/")
    19. // 会话s1获取锁
    20. if err := m1.Lock(context.TODO()); err != nil {
    21. log.Fatal(err)
    22. }
    23. fmt.Println("acquired lock for s1")
    24. m2Locked := make(chan struct{})
    25. go func() {
    26. defer close(m2Locked)
    27. // 等待直到会话s1释放了/my-lock/的锁
    28. if err := m2.Lock(context.TODO()); err != nil {
    29. log.Fatal(err)
    30. }
    31. }()
    32. if err := m1.Unlock(context.TODO()); err != nil {
    33. log.Fatal(err)
    34. }
    35. fmt.Println("released lock for s1")
    36. <-m2Locked
    37. fmt.Println("acquired lock for s2")

    输出: