Queue

    Day-to-day development involves many asynchronous, batch, scheduled, and delayed tasks. go-zero recommends handling them with go-queue, which is itself built on go-zero. go-queue has two modes:

    • dq: Depends on beanstalkd. It is distributed and persistent, supports delayed and scheduled jobs, and re-executes pending jobs after a shutdown and restart, so messages are not lost. It is very simple to use. go-queue uses redis setnx to ensure that each message is consumed only once. It is mainly used for day-to-day tasks.
    • kq: Depends on Kafka, which is well known and needs no further introduction here. It is mainly used as a message queue.
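
The "consumed only once" guarantee mentioned for dq can be pictured with a small, self-contained sketch. It assumes that the same message may be delivered to a consumer more than once (for example, if it was published to more than one beanstalkd node) and that a set-if-absent operation decides which delivery gets handled. The `onceStore` type and `consumeOnce` function are hypothetical names used only for illustration; `onceStore` is an in-memory stand-in for Redis, not go-queue's actual implementation.

```go
package main

import (
	"fmt"
	"sync"
)

// onceStore is a hypothetical in-memory stand-in for Redis. SetNX mimics the
// semantics of the Redis SETNX command: it stores the key only if it is absent
// and reports whether this call was the one that stored it.
type onceStore struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func newOnceStore() *onceStore {
	return &onceStore{seen: make(map[string]struct{})}
}

func (s *onceStore) SetNX(key string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.seen[key]; ok {
		return false // another delivery already claimed this key
	}
	s.seen[key] = struct{}{}
	return true
}

// consumeOnce runs handle only for the first delivery of each message ID and
// returns how many deliveries were actually handled.
func consumeOnce(store *onceStore, ids []string, handle func(id string)) int {
	handled := 0
	for _, id := range ids {
		if store.SetNX(id) {
			handle(id)
			handled++
		}
	}
	return handled
}

func main() {
	store := newOnceStore()
	// "job-1" and "job-2" are delivered twice; the duplicates are skipped.
	deliveries := []string{"job-1", "job-2", "job-1", "job-2", "job-3"}
	n := consumeOnce(store, deliveries, func(id string) {
		fmt.Println("handling", id)
	})
	fmt.Println("handled:", n)
}
```

With a real Redis instance, the key would normally also carry an expiry so that the set of seen IDs does not grow without bound; that detail is left out of the sketch.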

    etc/job.yaml: configuration file

    internal/config/config.go: parses the dq configuration

    /**
    * @Description Configuration file
    * @Author Mikael
    * @Email 13247629622@163.com
    * @Date 2021/1/18 12:05
    * @Version 1.0
    **/
    package config

    import (
        "github.com/tal-tech/go-queue/dq"
        "github.com/tal-tech/go-zero/core/service"
    )

    // Config embeds the common service settings and adds the dq settings.
    type Config struct {
        service.ServiceConf
        DqConf dq.DqConf
    }

    ProducerLogic: one piece of the job's business logic

    /**
    * @Description Producer task
    * @Author Mikael
    * @Email 13247629622@163.com
    * @Date 2021/1/18 12:05
    * @Version 1.0
    **/
    package logic

    import (
        "context"
        "strconv"
        "time"

        "job/internal/svc" // path assumed from the "job/internal/config" import used elsewhere

        "github.com/tal-tech/go-queue/dq"
        "github.com/tal-tech/go-zero/core/logx"
        "github.com/tal-tech/go-zero/core/threading"
    )

    type Producer struct {
        ctx    context.Context
        svcCtx *svc.ServiceContext
        logx.Logger
    }

    func NewProducerLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Producer {
        return &Producer{
            ctx:    ctx,
            svcCtx: svcCtx,
            Logger: logx.WithContext(ctx),
        }
    }

    // Start publishes delayed messages from a goroutine guarded by GoSafe.
    func (l *Producer) Start() {
        logx.Info("start Producer")
        threading.GoSafe(func() {
            producer := dq.NewProducer([]dq.Beanstalk{
                {
                    Endpoint: "localhost:7771",
                    Tube:     "tube1",
                },
                {
                    Endpoint: "localhost:7772",
                    Tube:     "tube2",
                },
            })
            // The loop header is missing from the source listing; these bounds are illustrative.
            for i := 0; i < 5; i++ {
                // Each message body is the loop counter, delivered after a one-second delay.
                _, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second*1)
                if err != nil {
                    logx.Error(err)
                }
            }
        })
    }

    func (l *Producer) Stop() {
        logx.Info("stop Producer")
    }

    Another piece of the job: the service context, which holds the parsed configuration and the dq consumer

    /**
    * @Description Configuration
    * @Author Mikael
    * @Email 13247629622@163.com
    * @Date 2021/1/18 12:05
    * @Version 1.0
    **/
    package svc

    import (
        "job/internal/config"

        "github.com/tal-tech/go-queue/dq"
    )

    // ServiceContext carries the dependencies shared by the job's logic.
    type ServiceContext struct {
        Config   config.Config
        Consumer dq.Consumer
    }

    // NewServiceContext builds the dq consumer from the parsed configuration.
    func NewServiceContext(c config.Config) *ServiceContext {
        return &ServiceContext{
            Config:   c,
            Consumer: dq.NewConsumer(c.DqConf),
        }
    }

    main.go startup file