golang 实现延迟消息原理与方法

实现延迟消息具体思路我是看的下面这篇文章

https://mp.weixin.qq.com/s/eDMV25YqCPYjxQG-dvqSqQ

实现延迟消息最主要的两个结构:

环形队列:通过golang中的数组实现,分成3600个slot。

任务集合:通过map[key]*Task,每个slot一个map,map的值就是我们要执行的任务。

原理图如下:

实现代码如下:

package main;

import (
	"time"
	"errors"
	"fmt"
)

//延迟消息
type DelayMessage struct {
	//当前下标
	curIndex int;
	//环形槽
	slots [3600]map[string]*Task;
	//关闭
	closed chan bool;
	//任务关闭
	taskClose chan bool;
	//时间关闭
	timeClose chan bool;
	//启动时间
	startTime time.Time;
}

//执行的任务函数
type TaskFunc func(args ...interface{});

//任务
type Task struct {
	//循环次数
	cycleNum int;
	//执行的函数
	exec   TaskFunc;
	params []interface{};
}

//创建一个延迟消息
func NewDelayMessage() *DelayMessage {
	dm := &DelayMessage{
		curIndex:  0,closed:    make(chan bool),taskClose: make(chan bool),timeClose: make(chan bool),startTime: time.Now(),};
	for i := 0; i < 3600; i++ {
		dm.slots[i] = make(map[string]*Task);
	}
	return dm;
}

//启动延迟消息
func (dm *DelayMessage) Start() {
	go dm.taskLoop();
	go dm.timeLoop();
	select {
	case <-dm.closed:
		{
			dm.taskClose <- true;
			dm.timeClose <- true;
			break;
		}
	};
}

//关闭延迟消息
func (dm *DelayMessage) Close() {
	dm.closed <- true;
}

//处理每1秒的任务
func (dm *DelayMessage) taskLoop() {
	defer func() {
		fmt.Println("taskLoop exit");
	}();
	for {
		select {
		case <-dm.taskClose:
			{
				return;
			}
		default:
			{
				//取出当前的槽的任务
				tasks := dm.slots[dm.curIndex];
				if len(tasks) > 0 {
					//遍历任务,判断任务循环次数等于0,则运行任务
					//否则任务循环次数减1
					for k,v := range tasks {
						if v.cycleNum == 0 {
							go v.exec(v.params...);
							//删除运行过的任务
							delete(tasks,k);
						} else {
							v.cycleNum--;
						}
					}
				}
			}
		}
	}
}

//处理每1秒移动下标
func (dm *DelayMessage) timeLoop() {
	defer func() {
		fmt.Println("timeLoop exit");
	}();
	tick := time.NewTicker(time.Second);
	for {
		select {
		case <-dm.timeClose:
			{
				return;
			}
		case <-tick.C:
			{
				fmt.Println(time.Now().Format("2006-01-02 15:04:05"));
				//判断当前下标,如果等于3599则重置为0,否则加1
				if dm.curIndex == 3599 {
					dm.curIndex = 0;
				} else {
					dm.curIndex++;
				}
			}
		}
	}
}

//添加任务
func (dm *DelayMessage) AddTask(t time.Time,key string,exec TaskFunc,params []interface{}) error {
	if dm.startTime.After(t) {
		return errors.New("时间错误");
	}
	//当前时间与指定时间相差秒数
	subSecond := t.Unix() - dm.startTime.Unix();
	//计算循环次数
	cycleNum := int(subSecond / 3600);
	//计算任务所在的slots的下标
	ix := subSecond % 3600;
	//把任务加入tasks中
	tasks := dm.slots[ix];
	if _,ok := tasks[key]; ok {
		return errors.New("该slots中已存在key为" + key + "的任务");
	}
	tasks[key] = &Task{
		cycleNum: cycleNum,exec:     exec,params:   params,};
	return nil;
}

func main() {
	//创建延迟消息
	dm := NewDelayMessage();
	//添加任务
	dm.AddTask(time.Now().Add(time.Second*10),"test1",func(args ...interface{}) {
		fmt.Println(args...);
	},[]interface{}{1,2,3});
	dm.AddTask(time.Now().Add(time.Second*10),"test2",[]interface{}{4,5,6});
	dm.AddTask(time.Now().Add(time.Second*20),"test3",[]interface{}{"hello","world","test"});
	dm.AddTask(time.Now().Add(time.Second*30),"test4",func(args ...interface{}) {
		sum := 0;
		for arg := range args {
			sum += arg;
		}
		fmt.Println("sum : ",sum);
	},3});

	//40秒后关闭
	time.AfterFunc(time.Second*40,func() {
		dm.Close();
	});
	dm.Start();
}

测试结果如下:

版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

相关推荐


Golang的文档和社区资源:为什么它可以帮助开发人员快速上手?
Golang:AI 开发者的实用工具
Golang的标准库:为什么它可以大幅度提高开发效率?
Golang的部署和运维:如何将应用程序部署到生产环境中?
高性能AI开发:Golang的优势所在
本篇文章和大家了解一下go语言开发优雅得关闭协程的方法。有一定的参考价值,有需要的朋友可以参考一下,希望对大家有所帮助。1.简介本文将介绍首先为什么需要主...
这篇文章主要介绍了Go关闭goroutine协程的方法,具有一定借鉴价值,需要的朋友可以参考下。下面就和我一起来看看吧。1.简介本文将介绍首先为什么需要主动关闭gor...
本篇文章和大家了解一下go关闭GracefulShutdown服务的几种方法。有一定的参考价值,有需要的朋友可以参考一下,希望对大家有所帮助。目录Shutdown方法Regi...
这篇文章主要介绍了Go语言如何实现LRU算法的核心思想和实现过程,具有一定借鉴价值,需要的朋友可以参考下。下面就和我一起来看看吧。GO实现Redis的LRU例子常
今天小编给大家分享的是Go简单实现多租户数据库隔离的方法,相信很多人都不太了解,为了让大家更加了解,所以给大家总结了以下内容,一起往下看吧。一定会...
这篇“Linux系统中怎么安装NSQ的Go语言客户端”文章的知识点大部分人都不太理解,所以小编给大家总结了以下内容,内容详细,步骤清晰,具有一定的借鉴价值,希
本文小编为大家详细介绍“怎么在Go语言中实现锁机制”,内容详细,步骤清晰,细节处理妥当,希望这篇“怎么在Go语言中实现锁机制”文章能帮助大家解决疑惑,下面...
今天小编给大家分享一下Go语言中interface类型怎么使用的相关知识点,内容详细,逻辑清晰,相信大部分人都还太了解这方面的知识,所以分享这篇文章给大家参考
这篇文章主要介绍“怎么以正确的方式替换Go语言程序自身”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希
本文小编为大家详细介绍“Go语言中除法运算的效率怎么提高”,内容详细,步骤清晰,细节处理妥当,希望这篇“Go语言中除法运算的效率怎么提高”文章能帮助大家解...
本文小编为大家详细介绍“Go语言中的next()方法怎么使用”,内容详细,步骤清晰,细节处理妥当,希望这篇“Go语言中的next()方法怎么使用”文章能帮助大家解决疑...
这篇文章主要介绍了Go语言中slice的反转方法怎么使用的相关知识,内容详细易懂,操作简单快捷,具有一定借鉴价值,相信大家阅读完这篇Go语言中slice的反转方法怎...
这篇文章主要介绍“怎么使用Go语言实现数据转发功能”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“怎么使用Go语
这篇文章主要讲解了“Go语言中怎么实现代码跳转”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究
这篇文章主要讲解了“Go语言如何多开协程”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Go语言如何多开协...