think-queue消息队列配合Supervisor

流氓凡 技术分享 2021-04-18 2.66 K 0

关于消息队列应用场景自行百度吧。另外,ThinkPHP > 5.1版本的可以直接使用think-queue2.0版本,具体教程参考:https://gitee.com/wslmf/notes/tree/master/thinkphp-queue

准备

    我们使用ThinkPHP5.0框架作为基础框架,因此安装think-queue版本就需要指定到1.6版本了

composer require topthink/think-queue 1.1.6

   然后需要在服务器安装redis和php的redis扩展

配置

    上诉think-queue安装后会在生成一个extra的目录,这里面包含queue.php也就是消息队列的配置,文件路径: application/extra/queue.php

return [
//    'connector' => 'Sync'
    'connector'  => 'Redis',        // Redis 驱动
    'expire'     => 60,        // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
    'default'    => 'default',        // 默认的队列名称
    'host'       => '127.0.0.1',    // redis 主机ip
    'port'       => 6379,        // redis 端口
    'password'   => '',        // redis 密码
    'select'     => 1,        // 使用哪一个 db,默认为 db0
    'timeout'    => 0,        // redis连接的超时时间
    'persistent' => false,        // 是否是长连接
];

代码

创建一个控制器,执行队列里的任务,例如创建一个app\message\controller\DoJob.php。

namespace app\message\controller;
use think\Db;
use think\Queue\Job;

class DoJob{


    /**
     * fire方法是消息队列默认调用的方法
     * @param Job $job 当前的任务对象
     * @param $data
     * @return int
     */
    public function fire(Job $job,$data){
        //这里$data定义格式为(也就是push队列的时候传递过来的参数):$data = [ 'type'=>1, 'data_id' => 123,'ts' => time()]
        if(empty($data)){
            $job->delete();
            return 0;
        }
        // 有些消息在到达消费者时,可能已经不再需要执行了
        $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
        if(!$isJobStillNeedToBeDone){
            $job->delete();
            return 0;
        }
        
        if(is_array($data) && isset($data['type'])){
            $type = $data['type'];
            if($type == 1){
                //执行发送邮件业务
                $isJobDone = $this->sendEmail($data['data_id']);
            }else if($type == 2){
                //执行APP推送消息业务
                $isJobDone = $this->sendAppMessage($data['data_id']);
            }else if($type == 3){
                //执行订单业务
                $isJobDone = $this->orderService($data['data_id']);
            }else{
                return 0;
            }
        }else{
            return 0;
        }
        
        if ($isJobDone) {
            // 如果任务执行成功,删除任务
            $job->delete();
        }else{
            if ($job->attempts() > 3) {
                //通过这个方法可以检查这个任务已经重试了几次了
                $job->delete();
                // 也可以重新发布这个任务
                //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行
            }
        }
    }
    
    // 检查是否需要继续执行任务
    private function checkDatabaseToSeeIfJobNeedToBeDone($data){
        return true;
    }
    


    //发App消息业务
    private function sendAppMessage($id){

    }

    //处理订单业务
    private function orderService($id){

    }
    
}

发布任务到队列中,创建一个测试代码,例如创建一个app\message\controller\Addjob.php:

namespace app\message\controller;

use think\Controller;
use think\Queue;

class Addjob extends Controller{

  public function index()
  {
       // 定义要传递给队列的数据(类型是数组)
        $jobData = [ 'type'=>1, 'data_id' => 12,'ts' => time()] ;
        // 将该任务推送到消息队列,等待对应的消费者去执行
        $isPushed = Queue::push('app\message\controller\DoJob' , $jobData , 'JobQueue');
        if( $isPushed !== false ){
            return '消息已发出';
        }else{
            return '消息发送出错';
        }
    }
}

我们直接访问上方的测试代码控制器,http://you domain/index.php?s=/message/addjob/index ,会出现消息已发送提示,这时候表示队列已经添加ok,我们使用命令监听下

cd指定到ThinkPHP框架根目录下,使用think进行监听执行

# 测试监听
php queue:listen --queue JobQueue &

# 实际生产中使用下方命令进行后台监听
nohup php think queue:listen --queue JobQueue 2>&1 &

保活(2021年04月18日10:29:25更新)

我们可以选择使用Supervisor或者宝塔面板自带的堡塔应用管理器,下面将分别说明使用方法,在这前提我们要确定保活的指令command

选择采用work加循环的参数模式(单进程多线程,需确保自己队列业务处理逻辑正确,同时设置了队列单任务超时时间)

如果是宝塔用户推荐使用堡塔应用管理器进行保活监控(宝塔版本6.x及以上)

使用堡塔应用管理器

image.png

使用Supervisor进行保活

首先是安装它

yum install epel-release
yum install -y supervisor
//设置开机自动启动
systemctl enable supervisord

找到它的配置文件并替换成下面的示例:

vi /etc/supervisor.conf
; Sample supervisor config file.
;
; For more information on the config file, please see:
; http://supervisord.org/configuration.html
;
; Notes:
;  - Shell expansion ("~" or "$HOME") is not supported.  Environment
;    variables can be expanded using this syntax: "%(ENV_HOME)s".
;  - Quotes around values are not supported, except in the case of
;    the environment= options as shown below.
;  - Comments must have a leading space: "a=b ;comment" not "a=b;comment".
;  - Command will be truncated if it looks like a config file comment, e.g.
;    "command=bash -c 'foo ; bar'" will truncate to "command=bash -c 'foo ".
;
; Warning:
;  Paths throughout this example file use /tmp because it is available on most
;  systems.  You will likely need to change these to locations more appropriate
;  for your system.  Some systems periodically delete older files in /tmp.
;  Notably, if the socket file defined in the [unix_http_server] section below
;  is deleted, supervisorctl will be unable to connect to supervisord.
 
[unix_http_server]
file=/tmp/supervisor.sock   ; the path to the socket file
;chmod=0700                 ; socket file mode (default 0700)
;chown=nobody:nogroup       ; socket file uid:gid owner
;username=user              ; default is no username (open server)
;password=123               ; default is no password (open server)
 
; Security Warning:
;  The inet HTTP server is not enabled by default.  The inet HTTP server is
;  enabled by uncommenting the [inet_http_server] section below.  The inet
;  HTTP server is intended for use within a trusted environment only.  It
;  should only be bound to localhost or only accessible from within an
;  isolated, trusted network.  The inet HTTP server does not support any
;  form of encryption.  The inet HTTP server does not use authentication
;  by default (see the username= and password= options to add authentication).
;  Never expose the inet HTTP server to the public internet.
 
;[inet_http_server]         ; inet (TCP) server disabled by default
;port=127.0.0.1:9001        ; ip_address:port specifier, *:port for all iface
;username=user              ; default is no username (open server)
;password=123               ; default is no password (open server)
 
[supervisord]
logfile=/tmp/supervisord.log ; main log file; default $CWD/supervisord.log
logfile_maxbytes=50MB        ; max main logfile bytes b4 rotation; default 50MB
logfile_backups=10           ; # of main logfile backups; 0 means none, default 10
loglevel=info                ; log level; default info; others: debug,warn,trace
pidfile=/tmp/supervisord.pid ; supervisord pidfile; default supervisord.pid
nodaemon=false               ; start in foreground if true; default false
silent=false                 ; no logs to stdout if true; default false
minfds=1024                  ; min. avail startup file descriptors; default 1024
minprocs=200                 ; min. avail process descriptors;default 200
;umask=022                   ; process file creation umask; default 022
;user=supervisord            ; setuid to this UNIX account at startup; recommended if root
;identifier=supervisor       ; supervisord identifier, default is 'supervisor'
;directory=/tmp              ; default is not to cd during start
;nocleanup=true              ; don't clean up tempfiles at start; default false
;childlogdir=/tmp            ; 'AUTO' child log dir, default $TEMP
;environment=KEY="value"     ; key value pairs to add to environment
;strip_ansi=false            ; strip ansi escape codes in logs; def. false
 
; The rpcinterface:supervisor section must remain in the config file for
; RPC (supervisorctl/web interface) to work.  Additional interfaces may be
; added by defining them in separate [rpcinterface:x] sections.
 
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
 
; The supervisorctl section configures how supervisorctl will connect to
; supervisord.  configure it match the settings in either the unix_http_server
; or inet_http_server section.
 
[supervisorctl]
serverurl=unix:///tmp/supervisor.sock ; use a unix:// URL  for a unix socket
;serverurl=http://127.0.0.1:9001 ; use an http:// url to specify an inet socket
;username=chris              ; should be same as in [*_http_server] if set
;password=123                ; should be same as in [*_http_server] if set
;prompt=mysupervisor         ; cmd line prompt (default "supervisor")
;history_file=~/.sc_history  ; use readline history if available
 
; The sample program section below shows all possible program subsection values.
; Create one or more 'real' program: sections to be able to control them under
; supervisor.
 
;[program:theprogramname]
;command=/bin/cat              ; the program (relative uses PATH, can take args)
;process_name=%(program_name)s ; process_name expr (default %(program_name)s)
;numprocs=1                    ; number of processes copies to start (def 1)
;directory=/tmp                ; directory to cwd to before exec (def no cwd)
;umask=022                     ; umask for process (default None)
;priority=999                  ; the relative start priority (default 999)
;autostart=true                ; start at supervisord start (default: true)
;startsecs=1                   ; # of secs prog must stay up to be running (def. 1)
;startretries=3                ; max # of serial start failures when starting (default 3)
;autorestart=unexpected        ; when to restart if exited after running (def: unexpected)
;exitcodes=0                   ; 'expected' exit codes used with autorestart (default 0)
;stopsignal=QUIT               ; signal used to kill process (default TERM)
;stopwaitsecs=10               ; max num secs to wait b4 SIGKILL (default 10)
;stopasgroup=false             ; send stop signal to the UNIX process group (default false)
;killasgroup=false             ; SIGKILL the UNIX process group (def false)
;user=chrism                   ; setuid to this UNIX account to run the program
;redirect_stderr=true          ; redirect proc stderr to stdout (default false)
;stdout_logfile=/a/path        ; stdout log path, NONE for none; default AUTO
;stdout_logfile_maxbytes=1MB   ; max # logfile bytes b4 rotation (default 50MB)
;stdout_logfile_backups=10     ; # of stdout logfile backups (0 means none, default 10)
;stdout_capture_maxbytes=1MB   ; number of bytes in 'capturemode' (default 0)
;stdout_events_enabled=false   ; emit events on stdout writes (default false)
;stdout_syslog=false           ; send stdout to syslog with process name (default false)
;stderr_logfile=/a/path        ; stderr log path, NONE for none; default AUTO
;stderr_logfile_maxbytes=1MB   ; max # logfile bytes b4 rotation (default 50MB)
;stderr_logfile_backups=10     ; # of stderr logfile backups (0 means none, default 10)
;stderr_capture_maxbytes=1MB   ; number of bytes in 'capturemode' (default 0)
;stderr_events_enabled=false   ; emit events on stderr writes (default false)
;stderr_syslog=false           ; send stderr to syslog with process name (default false)
;environment=A="1",B="2"       ; process environment additions (def no adds)
;serverurl=AUTO                ; override serverurl computation (childutils)
 
; The sample eventlistener section below shows all possible eventlistener
; subsection values.  Create one or more 'real' eventlistener: sections to be
; able to handle event notifications sent by supervisord.
 
;[eventlistener:theeventlistenername]
;command=/bin/eventlistener    ; the program (relative uses PATH, can take args)
;process_name=%(program_name)s ; process_name expr (default %(program_name)s)
;numprocs=1                    ; number of processes copies to start (def 1)
;events=EVENT                  ; event notif. types to subscribe to (req'd)
;buffer_size=10                ; event buffer queue size (default 10)
;directory=/tmp                ; directory to cwd to before exec (def no cwd)
;umask=022                     ; umask for process (default None)
;priority=-1                   ; the relative start priority (default -1)
;autostart=true                ; start at supervisord start (default: true)
;startsecs=1                   ; # of secs prog must stay up to be running (def. 1)
;startretries=3                ; max # of serial start failures when starting (default 3)
;autorestart=unexpected        ; autorestart if exited after running (def: unexpected)
;exitcodes=0                   ; 'expected' exit codes used with autorestart (default 0)
;stopsignal=QUIT               ; signal used to kill process (default TERM)
;stopwaitsecs=10               ; max num secs to wait b4 SIGKILL (default 10)
;stopasgroup=false             ; send stop signal to the UNIX process group (default false)
;killasgroup=false             ; SIGKILL the UNIX process group (def false)
;user=chrism                   ; setuid to this UNIX account to run the program
;redirect_stderr=false         ; redirect_stderr=true is not allowed for eventlisteners
;stdout_logfile=/a/path        ; stdout log path, NONE for none; default AUTO
;stdout_logfile_maxbytes=1MB   ; max # logfile bytes b4 rotation (default 50MB)
;stdout_logfile_backups=10     ; # of stdout logfile backups (0 means none, default 10)
;stdout_events_enabled=false   ; emit events on stdout writes (default false)
;stdout_syslog=false           ; send stdout to syslog with process name (default false)
;stderr_logfile=/a/path        ; stderr log path, NONE for none; default AUTO
;stderr_logfile_maxbytes=1MB   ; max # logfile bytes b4 rotation (default 50MB)
;stderr_logfile_backups=10     ; # of stderr logfile backups (0 means none, default 10)
;stderr_events_enabled=false   ; emit events on stderr writes (default false)
;stderr_syslog=false           ; send stderr to syslog with process name (default false)
;environment=A="1",B="2"       ; process environment additions
;serverurl=AUTO                ; override serverurl computation (childutils)
 
; The sample group section below shows all possible group values.  Create one
; or more 'real' group: sections to create "heterogeneous" process groups.
 
;[group:thegroupname]
;programs=progname1,progname2  ; each refers to 'x' in [program:x] definitions
;priority=999                  ; the relative start priority (default 999)
 
; The [include] section can just contain the "files" setting.  This
; setting can list multiple files (separated by whitespace or
; newlines).  It can also contain wildcards.  The filenames are
; interpreted as relative to this file.  Included files *cannot*
; include files themselves.
 
[include]
files = /etc/supervisord.d/*.ini

在/etc/supervisord.d目录下创建一个queue.ini文件,文件内容如下

[program:queue] ; 程序名称,在 supervisorctl 中通过这个值来对程序进行一系列的操作
autorestart=True      ; 程序异常退出后自动重启
autostart=True        ; 在 supervisord 启动的时候也自动启动
redirect_stderr=True  ; 把 stderr 重定向到 stdout,默认 false
user=root           ; 用哪个用户启动
command=php /opt/lampp/htdocs/tp6/think queue:work --queue JobQueue --daemon ; 保活监控的命令
stdout_logfile_maxbytes = 2MB  ; stdout 日志文件大小,默认 50MB
stdout_logfile_backups = 2     ; stdout 日志文件备份数
; stdout 日志文件,需要注意当指定目录不存在时无法正常启动,所以需要手动创建目录(supervisord 会自动创建日志文件)
stdout_logfile = /run/log/queue.log

全部配置好之后我们启动它

supervisord -c /etc/supervisord.conf

如果启动报错如下:

$ supervisord -c supervisord.conf
Error: Another program is already listening on a port that one of our HTTP servers is configured to use.  Shut this program down first before starting supervisord.
For help, use /usr/bin/supervisord -h

我们需要执行下方命令,然后再次重新启动即可

unlink /tmp/supervisor.sock

当增加新的配置或者修改配置时,可以使用下面命令更新

supervisorctl update

最后查看下 状态吧

supervisorctl status
//返回信息如下,可以看到设置的queue进程已经启动
queue                            RUNNING   pid 96906, uptime 0:06:19
自写保活脚本

此脚本每间隔10秒检查一次,自行添加开启自启

#!/bin/bash

work_path=/www/wwwroot/domain.com

while true
do
	count=`ps -ef|grep JobQueue|grep -v grep |  wc -l`
	if [ 6 -gt $count ]; then
		echo "start JobQueue ..."
		cd $work_path/addons/burst_shop/source && nohup php think queue:listen --queue JobQueue 2>&1 &
		cd $work_path/addons/burst_shop/source && nohup php think queue:listen --queue JobQueue 2>&1 &
		cd $work_path/addons/burst_shop/source && nohup php think queue:listen --queue JobQueue 2>&1 &
		cd $work_path/addons/burst_shop/source && nohup php think queue:listen --queue JobQueue 2>&1 &
		cd $work_path/addons/burst_shop/source && nohup php think queue:listen --queue JobQueue 2>&1 &
		cd $work_path/addons/burst_shop/source && nohup php think queue:listen --queue JobQueue 2>&1 &
		cd $work_path/addons/burst_shop/source && nohup php think queue:listen --queue JobQueue 2>&1 &
		sleep 1
	else
		sleep 10
	fi

done

调试日志

在DoJob文件中使用Log::write('这是调试日志','info'),此方式进行日志打印会发现很容易分辨出来,在runtime/log 目录下会有一个 日期_cli.log的文件,比如:09_cli.log 这里面就全部是消息队列打出来的日志信息了,与其他日志区分了文件还是比较方便看的。

其他

在消息队列的逻辑处理中,一旦处理完毕需要记得自己删除任务,否则可能会一直执行下去来消耗内存。关于listen和work的区别看下:work和listen模式的区别,进行查看。

参考

packagist官网:https://packagist.org/packages/topthink/think-queue#v1.1.6

github:https://github.com/top-think/think-queue


评论