Go使用asynq

发布时间 2023-12-02 15:06:39作者: 朝阳1

asynq是基于Redis的任务队列,支持即时、延时、超时、重试等多种投递形式

消费者

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/hibiken/asynq"
)

// HandleMsg is the handler for "msg" tasks. It logs the task's type and its
// raw payload, and always reports success.
func HandleMsg(ctx context.Context, t *asynq.Task) error {
	taskType := t.Type()
	body := string(t.Payload())

	log.Printf("type: %v, payload: %s", taskType, body)
	return nil
}

// main starts an asynq worker server that consumes "msg" tasks from Redis,
// then blocks until a termination signal arrives and shuts the server down.
func main() {
	// Worker server: Redis DB 0, at most 20 tasks processed concurrently.
	srv := asynq.NewServer(
		asynq.RedisClientOpt{
			Addr: "192.168.252.128:6379",
			DB:   0,
		},
		asynq.Config{Concurrency: 20},
	)

	mux := asynq.NewServeMux()

	// Logging middleware: print one timestamped line per task, then pass the
	// task on to the next handler in the chain.
	mux.Use(func(next asynq.Handler) asynq.Handler {
		return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
			// Printf alone; wrapping it in Println would also print Printf's
			// return values (byte count and error) after every log line.
			fmt.Printf("[%s] log - %v\n", time.Now().Format("2006-01-02 15:04:05"), string(t.Payload()))

			return next.ProcessTask(ctx, t)
		})
	})

	// Route tasks of type "msg" to HandleMsg.
	mux.HandleFunc("msg", HandleMsg)

	// Start the server; execution continues below to wait for a signal.
	if err := srv.Start(mux); err != nil {
		log.Fatalf("could not start server: %v", err)
	}

	// Wait for a termination signal. Only the signals registered here are
	// delivered on the channel, so a single receive is sufficient.
	c := make(chan os.Signal, 1)
	signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

	s := <-c
	fmt.Println("Program Exit...", s)

	// Stop pulling new tasks first, then shut down. Calling Stop after
	// Shutdown has no effect because the server is already closed.
	srv.Stop()
	srv.Shutdown()
}

测试生产者

package main

import (
	"encoding/json"
	"fmt"
	"os"
	"testing"
	"time"

	"github.com/hibiken/asynq"
)

// c is the asynq client shared by the tests in this file; TestMain creates it
// before the tests run and closes it afterwards.
var c *asynq.Client

// TestMain creates the shared asynq client, runs the tests, closes the client
// and exits with the status code returned by the test run.
func TestMain(m *testing.M) {
	r := asynq.RedisClientOpt{
		Addr: "192.168.252.128:6379",
		DB:   0,
	}
	c = asynq.NewClient(r)
	ret := m.Run()

	// os.Exit does not run deferred calls, so close explicitly here. A close
	// failure is reported but does not change the test exit code.
	if err := c.Close(); err != nil {
		fmt.Fprintf(os.Stderr, "could not close asynq client: %v\n", err)
	}
	os.Exit(ret)
}

// Test_Enqueue enqueues a "msg" task with no options, i.e. for immediate
// processing, and prints the enqueue result.
func Test_Enqueue(t *testing.T) {
	payload := map[string]interface{}{"user_id": 1, "message": "i'm immediately message"}
	data, err := json.Marshal(payload)
	if err != nil {
		// Fail the test; returning here would let it pass silently.
		t.Fatalf("转换失败: %v", err)
	}

	task := asynq.NewTask("msg", data)
	res, err := c.Enqueue(task)
	if err != nil {
		t.Fatalf("could not enqueue task: %v", err)
	}
	fmt.Printf("Enqueued Result: %+v\n", res)
}

// Test_EnqueueDelay enqueues a "msg" task with ProcessIn(5s), i.e. delayed
// processing, and prints the enqueue result.
func Test_EnqueueDelay(t *testing.T) {
	payload := map[string]interface{}{"user_id": 1, "message": "i'm delay 5 seconds message"}
	data, err := json.Marshal(payload)
	if err != nil {
		// Fail the test; returning here would let it pass silently.
		t.Fatalf("转换失败: %v", err)
	}

	task := asynq.NewTask("msg", data)
	// Alternative with an absolute time:
	// res, err := c.Enqueue(task, asynq.ProcessAt(time.Now().Add(5*time.Second)))
	res, err := c.Enqueue(task, asynq.ProcessIn(5*time.Second))
	if err != nil {
		t.Fatalf("could not enqueue task: %v", err)
	}
	fmt.Printf("Enqueued Result: %+v\n", res)
}

// Test_EnqueueOther enqueues a "msg" task with timeout, retry and deadline
// options, and prints the enqueue result.
func Test_EnqueueOther(t *testing.T) {
	payload := map[string]interface{}{"user_id": 1, "message": "i'm delay,try,timeout 5 seconds message"}
	data, err := json.Marshal(payload)
	if err != nil {
		// Fail the test; returning here would let it pass silently.
		t.Fatalf("转换失败: %v", err)
	}

	task := asynq.NewTask("msg", data)
	// 10-second timeout, at most 3 retries, deadline 20 seconds from now.
	res, err := c.Enqueue(
		task,
		asynq.MaxRetry(3),
		asynq.Timeout(10*time.Second),
		asynq.Deadline(time.Now().Add(20*time.Second)),
	)
	if err != nil {
		t.Fatalf("could not enqueue task: %v", err)
	}
	fmt.Printf("Enqueued Result: %+v\n", res)
}