Queue
In the development of daily tasks, we will have many asynchronous, batch, timing, and delayed tasks to be processed. There is go-queue in go-zero. It is recommended to use go-queue for processing. Go-queue itself is also developed based on go-zero. There are two modes
- dq : Depends on beanstalkd, distributed, can be stored, delayed, timing settings, shutdown and restart can be re-executed, messages will not be lost, very simple to use, redis setnx is used in go-queue to ensure that each message is only consumed once, usage scenarios Mainly used for daily tasks.
kq: Depends on Kafka, so I won’t introduce more about it, the famous Kafka, the usage scenario is mainly to do message queue
etc/job.yaml : Configuration file
Internal/config/config.go: Parse dq corresponding configuration
/**
* @Description Configuration file
* @Author Mikael
* @Email 13247629622@163.com
* @Date 2021/1/18 12:05
* @Version 1.0
**/
package config
import (
"github.com/tal-tech/go-queue/dq"
"github.com/tal-tech/go-zero/core/service"
)
type Config struct {
service.ServiceConf
DqConf dq.DqConf
}
ProducerLogic: One of the job business logic
/**
* @Description Producer task
* @Author Mikael
* @Email 13247629622@163.com
* @Date 2021/1/18 12:05
* @Version 1.0
**/
package logic
import (
"context"
"github.com/tal-tech/go-queue/dq"
"github.com/tal-tech/go-zero/core/logx"
"strconv"
"time"
)
type Producer struct {
ctx context.Context
svcCtx *svc.ServiceContext
logx.Logger
}
func NewProducerLogic(ctx context.Context, svcCtx *svc.ServiceContext) *Producer {
return &Producer{
ctx: ctx,
svcCtx: svcCtx,
Logger: logx.WithContext(ctx),
}
}
func (l *Producer)Start() {
logx.Infof("start Producer \n")
threading.GoSafe(func() {
producer := dq.NewProducer([]dq.Beanstalk{
{
Endpoint: "localhost:7771",
Tube: "tube1",
},
{
Endpoint: "localhost:7772",
Tube: "tube2",
},
})
_, err := producer.Delay([]byte(strconv.Itoa(i)), time.Second * 1)
if err != nil {
logx.Error(err)
}
}
})
}
func (l *Producer)Stop() {
logx.Infof("stop Producer \n")
}
Another job business logic
/**
* @Description Configuration
* @Author Mikael
* @Email 13247629622@163.com
* @Date 2021/1/18 12:05
* @Version 1.0
**/
package svc
import (
"job/internal/config"
"github.com/tal-tech/go-queue/dq"
)
type ServiceContext struct {
Config config.Config
Consumer dq.Consumer
}
func NewServiceContext(c config.Config) *ServiceContext {
return &ServiceContext{
Config: c,
Consumer: dq.NewConsumer(c.DqConf),
main.go startup file