thinkphp-queuer消息队列与superviso进程守护

本文简单介绍消息队列的使用,以及在使用过程中的注意事项。
项目环境:thinkphp6.0.3 + think-queue3.0

一、think-queue 安装与基本使用

1、使用 composer 安装 thinkphp6 与 think-queue

# 安装 thinkphp6
composer create-project topthink/think 6.0.*
# 安装 think-queue
composer require topthink/think-queue

2、queue 的配置

think-queue 内置了 Redis,Database,Sync等驱动,
这里介绍使用Redis做消息队列存储,
结合supervisor进程管理使队列常驻进程。
配置文件位于 config/queue.php

return [
// 驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,redis:Redis驱动//或其他自定义的完整的类名
'default' => 'redis',// Redis驱动
'connections' => [
...
'redis' => [
'type' => 'redis',// Redis 驱动
'queue' => 'default',// 默认的队列名称
'host' => '127.0.0.1',// redis 主机ip
'port' => 6379,// redis 端口
'password' => '123456',// redis 密码
'select' => 0,// 使用哪一个 db,默认为 db0
'timeout' => 0,// redis连接的超时时间
'persistent' => false,// 是否是长连接
],
...
]
];

3、创建任务类

单模块项目推荐使用 app\job 作为任务类的命名空间 多模块项目可用使用 app\module\job 作为任务类的命名空间 也可以放在任意可以自动加载到的地方

任务类不需继承任何类,如果这个类只有一个任务,那么就只需要提供一个fire方法就可以了,如果有多个小任务,就写多个方法,下面发布任务的时候会有区别
每个方法会传入两个参数 think\queue\Job $job(当前的任务对象) 和 $data(发布任务时自定义的数据)

namespace app\job;
use think\queue\Job;

class MyJob
{
public function fire(Job $job,$data){
print("开始执行队列"." \n");
//....这里执行具体的任务
if ($job->attempts() > 3) {
//通过这个方法可以检查这个任务已经重试了几次了
}

//如果任务执行成功后 记得删除任务,不然这个任务会重复执行,直到达到最大重试次数后失败后,执行failed方法
$job->delete();
// 也可以重新发布这个任务
$job->release($delay); //$delay为延迟时间
}

public function failed($data){
// ...任务达到最大重试次数后,失败了
print("队列执行失败"." \n");
}
}

4、发布任务(生产者)

think\facade\Queue::push($job,$data = '',$queue = null)
think\facade\Queue::later($delay,$job,$queue = null)
# 两个方法,前者是立即执行,后者是在$delay秒后执行
  1. $job 是任务名
    单模块的,且命名空间是app\job的,比如上面的例子一,写Job1类名即可
    多模块的,且命名空间是app\module\job的,写model/Job1
    其他的需要些完整的类名,比如上面的例子二,需要写完整的类名app\lib\job\Job2
    如果一个任务类里有多个小任务的话,如上面的例子二,需要用@+方法名app\lib\job\Job2@task1app\lib\job\Job2@task2
  2. $data 是你要传到任务里的参数
  3. $queue 队列名,指定这个任务是在哪个队列上执行,同下面监控队列的时候指定的队列名,可不填
  4. namespace app\controller;
    use think\facade\Queue;

    class MyQueue extends Base
    {
    public function index($job_name = '',$job_class = '',$job_data = [],$type = 'later',$time = 5) //jio_data 需要发送的数据
    {
    // 当前任务将由哪个类来负责处理。
    $job_class = "app\job\MyJob@fire"; // 任务类 - 执行时调用该类的deal方法
    $job_queue_name = 'Check'; // 队列名称
    empty($job_data) && $job_data = [ 'ts' => time(),'bizId' => uniqid(),'data' => $_GET ];

    switch ($type) {
    case 'later':
    $is_push = Queue::later($time,$job_class,$job_data,$job_queue_name); // 延迟发送任务 5秒
    break;
    case 'push':
    $is_push = Queue::push($job_class,$job_queue_name); // 立即发送任务
    break;
    }
    // database 驱动时,返回值为 1|false ; redis 驱动时,返回值为 随机字符串|false
    if ($is_push !== false) {
    echo date('Y-m-d H:i:s') . " 新增任务" . "
    ";
    } else {
    echo '新增任务错误';
    }
    }
    }

    打开浏览器访问 http://域名/MyQueue/index,就可以看到任务新增成功

  5. 5、处理任务(消费者)

    打开终端切换到当前项目根目录下,执行下面的命令:

  6. 命令 描述
    php think queue:work 监听队列
    php think queue:listen 监听队列
    php think queue:restart 重启队列
    php think queue:subscribe 暂无,可能是保留的 官方有什么其他想法但是还没实现
    work和listen的区别详细点击(think-queue 解析上)查看,
    两种,具体的可选参数可以输入命令加 --help 查看。
    (注意:使用不同版本的thinkphp时,可选用的参数不同)
    (如:thinkphp6中就取消掉了 --daemon 参数,队列默认为循环模式)
  7. php think queue:work --queue Check

  8. 二、supervisor的安装和配置

    supervisor是用Python开发的一个client/server服务,是Linux/Unix系统下的一个进程管理工具。
    可以很方便的监听、启动、停止、重启一个或多个进程。

    1、安装

    1.配置好yum源后,可以直接安装

    yum install supervisor

    2.Debian/Ubuntu可通过apt安装

  9. apt-get install supervisor

    3.Python使用pip安装

  10. pip install supervisor

    2、supervisor的相关配置

    本项目演示使用yum安装:
    打开 supervisor 的主配置文件:/etc/supervisord.conf ,找到最后一行

  11. [include]
    ;files = supervisord.d/*.ini
    files = /etc/supervisord.d/*.conf

    最后一行,个人习惯命名conf。
    进入子进程配置文件路径:/etc/supervisord.d/,新建 Check.conf,相关配置说明:

  12. # 项目名
    [program:Check]
    # 脚本目录
    directory=/www/.../Check
    # 脚本执行命令
    command=php think queue:work --queue Check

    # supervisor启动的时候是否随着同时启动,默认True
    autostart=true
    # 当程序exit的时候,这个program不会自动重启,默认unexpected,设置子进程挂掉后自动重启的情况,有三个选项,false,unexpected和true。如果为false的时候,无论什么情况下,都不会被重新启动,如果为unexpected,只有当进程的退出码不在下面的exitcodes里面定义的
    autorestart=false
    # 这个选项是子进程启动多少秒之后,此时状态如果是running,则我们认为启动成功了。默认值为1
    startsecs=1

    # 脚本运行的用户身份
    user=www

    # 日志输出
    stderr_logfile=/tmp/Check_stderr.log
    stdout_logfile=/tmp/Check_stdout.log
    # 把stderr重定向到stdout,默认 false
    redirect_stderr = true
    # stdout日志文件大小,默认 50MB
    stdout_logfile_maxbytes = 20M
    # stdout日志文件备份数
    stdout_logfile_backups = 20
    #说明同上
    [program:Check]
    directory=/www/.../Check
    command=php think queue:work --queue Check --tries 10
    autostart=true
    autorestart=false
    stderr_logfile=/tmp/Check_stderr.log
    stdout_logfile=/tmp/Check_stdout.log
    #user = www

    对于单模块而言,不同的业务逻辑为了区分可能会存在多个队列名,这种情况将多个队列名用逗号拼接起来,内容如下:

  13. [program:queue]
    user=www
    command=php /www/wwwroot/tpqueue/think queue:work --queue Check,Check1,Check2

    3、supervisor相关命令

    启动 supervisor

  14. supervisorctl -c /etc/supervisord.conf

    上面这个命令会进入 supervisorctl 的 shell 界面,然后可以执行不同的命令了:

  15. supervisorctl restart    // 重启指定应用
    supervisorctl stop // 停止指定应用
    supervisorctl start // 启动指定应用
    supervisorctl restart all // 重启所有应用
    supervisorctl stop all // 停止所有应用
    supervisorctl start all // 启动所有应用
    supervisorctl status //查看所有进程的状态
    supervisorctl update //配置文件修改后使用该命令加载新的配置
    supervisorctl reload //重新启动配置中的所有程序

    若是centos7,加入开机启动:

  16. systemctl start supervisord //启动supervisor并加载默认配置文件
    systemctl enable supervisord //将supervisor加入开机启动项
  17. 执行命令来验证是否为开机启动

  18. systemctl is-enabled supervisord

    注意事项:

    1、在消息对列表执行过程中,当消费者执行队列出现问题时(达到失败后最大尝试次数),将会自动化清除当前队列。

  19. 原文链接:https://fengkui.net/article/137


本文简单介绍消息队列的使用,以及在使用过程中的注意事项。
项目环境:thinkphp6.0.3 + think-queue3.0

一、think-queue 安装与基本使用

1、使用 composer 安装 thinkphp6 与 think-queue

# 安装 thinkphp6
composer create-project topthink/think 6.0.*
# 安装 think-queue
composer require topthink/think-queue

2、queue 的配置

think-queue 内置了 Redis,Database,Sync等驱动,
这里介绍使用Redis做消息队列存储,
结合supervisor进程管理使队列常驻进程。
配置文件位于 config/queue.php

return [
// 驱动类型,可选择 sync(默认):同步执行,database:数据库驱动,Check2

3、supervisor相关命令

启动 supervisor

  • supervisorctl -c /etc/supervisord.conf

    上面这个命令会进入 supervisorctl 的 shell 界面,然后可以执行不同的命令了:

  • supervisorctl restart    // 重启指定应用
    supervisorctl stop // 停止指定应用
    supervisorctl start // 启动指定应用
    supervisorctl restart all // 重启所有应用
    supervisorctl stop all // 停止所有应用
    supervisorctl start all // 启动所有应用
    supervisorctl status //查看所有进程的状态
    supervisorctl update //配置文件修改后使用该命令加载新的配置
    supervisorctl reload //重新启动配置中的所有程序

    若是centos7,加入开机启动:

  • systemctl start supervisord //启动supervisor并加载默认配置文件
    systemctl enable supervisord //将supervisor加入开机启动项
  • 执行命令来验证是否为开机启动

  • systemctl is-enabled supervisord

    注意事项:

    1、在消息对列表执行过程中,当消费者执行队列出现问题时(达到失败后最大尝试次数),将会自动化清除当前队列。

  • 原文链接:https://fengkui.net/article/137


  • 原文地址:https://fengkui.net/article/137

    版权声明:本文内容由互联网用户自发贡献,该文观点与技术仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 dio@foxmail.com 举报,一经查实,本站将立刻删除。

    相关推荐


    (1)创建数据表: CREATE TABLE IF NOT EXISTS `think_form` (   `id` smallint(4) unsigned NOT NULL AUTO_INCREMENT,
    组合查询的主体还是采用数组方式查询,只是加入了一些特殊的查询支持,包括字符串模式查询(_string)、复合查询(_complex)、请求字符串查询(_query),混合查询中的特殊查询每次查询只能定义一个,由于采用数组的
    (1)创建模版:/App/Home/View/Form/edit.html   <FORM method=\"post\" action=\"__URL__/update\">
    自定义配置文件user.php: <?php return array(    \'sex\'=>\'man\', ); config.php: <?php return array(
    在一些成熟的CMS系统中,后台一般都包含一个配置中心(如织梦后台中系统设置),以方便站长在后台修改配置文件;那么这个功能是如果实现的呢?在ThinkPHP中有没有捷径可走呢?答案肯定是有的。下面大概说一下这个功能
    废话不多说先上图预览下,即本博客的分页; 这个分页类是在thinkphp框架内置的分页类的基础上修改而来,原分页类的一些设计,在实际运用中感觉不是很方便;
    在php中截取字符串的函数有很多,而在thinkphp中也可以直接使用php的函数,本文给大家简单的介绍thinkPHP模板中截取字符串的具体用法,希望能对各位有所帮助。
    thinkphp开发图片上传,图片异步上传是目前比较方便的功能,这里我就不写css文件了,将代码写出来。
    配置数据库:/app/Common/Conf/config.php 方法一: // 添加数据库配置信息 \'DB_TYPE\'   => \'mysql\',// 数据库类型
    /app/Home/Controller/IndexController.class.php
    (1)创建数据表: CREATE TABLE IF NOT EXISTS `think_data` (   `id` int(8) unsigned NOT NULL AUTO_INCREMENT,
    (1)控制器设置:/app/Home/Controller/IndexController.class.php <?php namespace HomeController; use ThinkController;
    (1)普通模式 http://localhost/index.php?m=module&a=action&var=value m参数表示模块,a操作表示操作(模块和操作的URL参数名称是可以配置的),后面的表示其他GET参数。
    入库的时候用htmlspecialchars()处理含有html代码的内容 输出的时候用htmlspecialchars_decode()处理含有html代码的内容
    <?php define(\'APP_NAME\',\'app\'); define(\'APP_PATH\',\'./app/\'); define(\'APP_DEBUG\',TRUE); // 开启调试模式
    (1)创建控制器中定义read方法:/App/Home/Controller/FormController.class.php public function read($id=0){
    一、实现不同字段相同的查询条件 $User = M(\"User\"); // 实例化User对象 $map[\'name|title\'] = \'thinkphp\';
    如果你的数据完全是内部操作写入而不是通过表单的话(也就是说可以充分信任数据的安全),那么可以直接使用add方法,如:
    查询表达式的使用格式: $map[\'字段名\'] = array(\'表达式\',\'查询条件\'); 表达式不分大小写,支持的查询表达式有下面几种,分别表示的含义是:
    一、使用字符串作为查询条件 $User = M(\"User\"); // 实例化User对象 $User->where(\'type=1 AND status=1\')->select();