think-queue消息队列配合Supervisor
关于消息队列应用场景自行百度吧。另外,ThinkPHP > 5.1版本的可以直接使用think-queue2.0版本,具体教程参考:https://gitee.com/wslmf/notes/tree/master/thinkphp-queue
准备
我们使用ThinkPHP5.0框架作为基础框架,因此安装think-queue版本就需要指定到1.6版本了
composer require topthink/think-queue 1.1.6
然后需要在服务器安装redis和php的redis扩展
配置
上诉think-queue安装后会在生成一个extra的目录,这里面包含queue.php也就是消息队列的配置,文件路径: application/extra/queue.php
return [ // 'connector' => 'Sync' 'connector' => 'Redis', // Redis 驱动 'expire' => 60, // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null 'default' => 'default', // 默认的队列名称 'host' => '127.0.0.1', // redis 主机ip 'port' => 6379, // redis 端口 'password' => '', // redis 密码 'select' => 1, // 使用哪一个 db,默认为 db0 'timeout' => 0, // redis连接的超时时间 'persistent' => false, // 是否是长连接 ];
代码
创建一个控制器,执行队列里的任务,例如创建一个app\message\controller\DoJob.php。
namespace app\message\controller; use think\Db; use think\Queue\Job; class DoJob{ /** * fire方法是消息队列默认调用的方法 * @param Job $job 当前的任务对象 * @param $data * @return int */ public function fire(Job $job,$data){ //这里$data定义格式为(也就是push队列的时候传递过来的参数):$data = [ 'type'=>1, 'data_id' => 123,'ts' => time()] if(empty($data)){ $job->delete(); return 0; } // 有些消息在到达消费者时,可能已经不再需要执行了 $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data); if(!$isJobStillNeedToBeDone){ $job->delete(); return 0; } if(is_array($data) && isset($data['type'])){ $type = $data['type']; if($type == 1){ //执行发送邮件业务 $isJobDone = $this->sendEmail($data['data_id']); }else if($type == 2){ //执行APP推送消息业务 $isJobDone = $this->sendAppMessage($data['data_id']); }else if($type == 3){ //执行订单业务 $isJobDone = $this->orderService($data['data_id']); }else{ return 0; } }else{ return 0; } if ($isJobDone) { // 如果任务执行成功,删除任务 $job->delete(); }else{ if ($job->attempts() > 3) { //通过这个方法可以检查这个任务已经重试了几次了 $job->delete(); // 也可以重新发布这个任务 //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行 } } } // 检查是否需要继续执行任务 private function checkDatabaseToSeeIfJobNeedToBeDone($data){ return true; } //发App消息业务 private function sendAppMessage($id){ } //处理订单业务 private function orderService($id){ } }
发布任务到队列中,创建一个测试代码,例如创建一个app\message\controller\Addjob.php:
namespace app\message\controller; use think\Controller; use think\Queue; class Addjob extends Controller{ public function index() { // 定义要传递给队列的数据(类型是数组) $jobData = [ 'type'=>1, 'data_id' => 12,'ts' => time()] ; // 将该任务推送到消息队列,等待对应的消费者去执行 $isPushed = Queue::push('app\message\controller\DoJob' , $jobData , 'JobQueue'); if( $isPushed !== false ){ return '消息已发出'; }else{ return '消息发送出错'; } } }
我们直接访问上方的测试代码控制器,http://you domain/index.php?s=/message/addjob/index ,会出现消息已发送提示,这时候表示队列已经添加ok,我们使用命令监听下
cd指定到ThinkPHP框架根目录下,使用think进行监听执行
# 测试监听(work模式) php queue:work --queue JobQueue 或者 (listen模式,如果不想输出日志可加 -q参数) think queue:listen --queue JobQueue # 每次修改队列代码需重启队列 php think queue:restart
简述下work和listen在thinkphp框架中明显区别
1、work模式只能跑一条队列数据,如果想持续运行必须加 --daemon 参数;
2、work模式速度和性能更快,每次执行无需再加载框架所有文件,因此如果想修改队列代码必须重启队列,listen模式每次执行都会重新加载thinkphp文件,相当于用户访问一个接口一样;
3、work模式因为无需重复加载框架文件,因此也会带来MySQL链接超时的烦恼,目前thinkphp5.x全系版本无法解决此问题,所以如果队列中有操作数据库的不建议使用此模式;
保活(2021年04月18日10:29:25更新)
我们可以选择使用Supervisor或者宝塔面板自带的堡塔应用管理器,下面将分别说明使用方法,在这前提我们要确定保活的指令command
选择采用work加循环的参数模式(单进程多线程,需确保自己队列业务处理逻辑正确,同时设置了队列单任务超时时间)
如果是宝塔用户推荐使用堡塔应用管理器进行保活监控(宝塔版本6.x及以上)
使用堡塔应用管理器
使用Supervisor进行保活
【推荐】使用Supervisor管理器(宝塔插件)进行保活
自己安装Supervisor进行保活:
首先是安装它
yum install epel-release yum install -y supervisor //设置开机自动启动 systemctl enable supervisord
找到它的配置文件并替换成下面的示例:
vi /etc/supervisor.conf
; Sample supervisor config file. ; ; For more information on the config file, please see: ; http://supervisord.org/configuration.html ; ; Notes: ; - Shell expansion ("~" or "$HOME") is not supported. Environment ; variables can be expanded using this syntax: "%(ENV_HOME)s". ; - Quotes around values are not supported, except in the case of ; the environment= options as shown below. ; - Comments must have a leading space: "a=b ;comment" not "a=b;comment". ; - Command will be truncated if it looks like a config file comment, e.g. ; "command=bash -c 'foo ; bar'" will truncate to "command=bash -c 'foo ". ; ; Warning: ; Paths throughout this example file use /tmp because it is available on most ; systems. You will likely need to change these to locations more appropriate ; for your system. Some systems periodically delete older files in /tmp. ; Notably, if the socket file defined in the [unix_http_server] section below ; is deleted, supervisorctl will be unable to connect to supervisord. [unix_http_server] file=/tmp/supervisor.sock ; the path to the socket file ;chmod=0700 ; socket file mode (default 0700) ;chown=nobody:nogroup ; socket file uid:gid owner ;username=user ; default is no username (open server) ;password=123 ; default is no password (open server) ; Security Warning: ; The inet HTTP server is not enabled by default. The inet HTTP server is ; enabled by uncommenting the [inet_http_server] section below. The inet ; HTTP server is intended for use within a trusted environment only. It ; should only be bound to localhost or only accessible from within an ; isolated, trusted network. The inet HTTP server does not support any ; form of encryption. The inet HTTP server does not use authentication ; by default (see the username= and password= options to add authentication). ; Never expose the inet HTTP server to the public internet. ;[inet_http_server] ; inet (TCP) server disabled by default ;port=127.0.0.1:9001 ; ip_address:port specifier, *:port for all iface ;username=user ; default is no username (open server) ;password=123 ; default is no password (open server) [supervisord] logfile=/tmp/supervisord.log ; main log file; default $CWD/supervisord.log logfile_maxbytes=50MB ; max main logfile bytes b4 rotation; default 50MB logfile_backups=10 ; # of main logfile backups; 0 means none, default 10 loglevel=info ; log level; default info; others: debug,warn,trace pidfile=/tmp/supervisord.pid ; supervisord pidfile; default supervisord.pid nodaemon=false ; start in foreground if true; default false silent=false ; no logs to stdout if true; default false minfds=1024 ; min. avail startup file descriptors; default 1024 minprocs=200 ; min. avail process descriptors;default 200 ;umask=022 ; process file creation umask; default 022 ;user=supervisord ; setuid to this UNIX account at startup; recommended if root ;identifier=supervisor ; supervisord identifier, default is 'supervisor' ;directory=/tmp ; default is not to cd during start ;nocleanup=true ; don't clean up tempfiles at start; default false ;childlogdir=/tmp ; 'AUTO' child log dir, default $TEMP ;environment=KEY="value" ; key value pairs to add to environment ;strip_ansi=false ; strip ansi escape codes in logs; def. false ; The rpcinterface:supervisor section must remain in the config file for ; RPC (supervisorctl/web interface) to work. Additional interfaces may be ; added by defining them in separate [rpcinterface:x] sections. [rpcinterface:supervisor] supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface ; The supervisorctl section configures how supervisorctl will connect to ; supervisord. configure it match the settings in either the unix_http_server ; or inet_http_server section. [supervisorctl] serverurl=unix:///tmp/supervisor.sock ; use a unix:// URL for a unix socket ;serverurl=http://127.0.0.1:9001 ; use an http:// url to specify an inet socket ;username=chris ; should be same as in [*_http_server] if set ;password=123 ; should be same as in [*_http_server] if set ;prompt=mysupervisor ; cmd line prompt (default "supervisor") ;history_file=~/.sc_history ; use readline history if available ; The sample program section below shows all possible program subsection values. ; Create one or more 'real' program: sections to be able to control them under ; supervisor. ;[program:theprogramname] ;command=/bin/cat ; the program (relative uses PATH, can take args) ;process_name=%(program_name)s ; process_name expr (default %(program_name)s) ;numprocs=1 ; number of processes copies to start (def 1) ;directory=/tmp ; directory to cwd to before exec (def no cwd) ;umask=022 ; umask for process (default None) ;priority=999 ; the relative start priority (default 999) ;autostart=true ; start at supervisord start (default: true) ;startsecs=1 ; # of secs prog must stay up to be running (def. 1) ;startretries=3 ; max # of serial start failures when starting (default 3) ;autorestart=unexpected ; when to restart if exited after running (def: unexpected) ;exitcodes=0 ; 'expected' exit codes used with autorestart (default 0) ;stopsignal=QUIT ; signal used to kill process (default TERM) ;stopwaitsecs=10 ; max num secs to wait b4 SIGKILL (default 10) ;stopasgroup=false ; send stop signal to the UNIX process group (default false) ;killasgroup=false ; SIGKILL the UNIX process group (def false) ;user=chrism ; setuid to this UNIX account to run the program ;redirect_stderr=true ; redirect proc stderr to stdout (default false) ;stdout_logfile=/a/path ; stdout log path, NONE for none; default AUTO ;stdout_logfile_maxbytes=1MB ; max # logfile bytes b4 rotation (default 50MB) ;stdout_logfile_backups=10 ; # of stdout logfile backups (0 means none, default 10) ;stdout_capture_maxbytes=1MB ; number of bytes in 'capturemode' (default 0) ;stdout_events_enabled=false ; emit events on stdout writes (default false) ;stdout_syslog=false ; send stdout to syslog with process name (default false) ;stderr_logfile=/a/path ; stderr log path, NONE for none; default AUTO ;stderr_logfile_maxbytes=1MB ; max # logfile bytes b4 rotation (default 50MB) ;stderr_logfile_backups=10 ; # of stderr logfile backups (0 means none, default 10) ;stderr_capture_maxbytes=1MB ; number of bytes in 'capturemode' (default 0) ;stderr_events_enabled=false ; emit events on stderr writes (default false) ;stderr_syslog=false ; send stderr to syslog with process name (default false) ;environment=A="1",B="2" ; process environment additions (def no adds) ;serverurl=AUTO ; override serverurl computation (childutils) ; The sample eventlistener section below shows all possible eventlistener ; subsection values. Create one or more 'real' eventlistener: sections to be ; able to handle event notifications sent by supervisord. ;[eventlistener:theeventlistenername] ;command=/bin/eventlistener ; the program (relative uses PATH, can take args) ;process_name=%(program_name)s ; process_name expr (default %(program_name)s) ;numprocs=1 ; number of processes copies to start (def 1) ;events=EVENT ; event notif. types to subscribe to (req'd) ;buffer_size=10 ; event buffer queue size (default 10) ;directory=/tmp ; directory to cwd to before exec (def no cwd) ;umask=022 ; umask for process (default None) ;priority=-1 ; the relative start priority (default -1) ;autostart=true ; start at supervisord start (default: true) ;startsecs=1 ; # of secs prog must stay up to be running (def. 1) ;startretries=3 ; max # of serial start failures when starting (default 3) ;autorestart=unexpected ; autorestart if exited after running (def: unexpected) ;exitcodes=0 ; 'expected' exit codes used with autorestart (default 0) ;stopsignal=QUIT ; signal used to kill process (default TERM) ;stopwaitsecs=10 ; max num secs to wait b4 SIGKILL (default 10) ;stopasgroup=false ; send stop signal to the UNIX process group (default false) ;killasgroup=false ; SIGKILL the UNIX process group (def false) ;user=chrism ; setuid to this UNIX account to run the program ;redirect_stderr=false ; redirect_stderr=true is not allowed for eventlisteners ;stdout_logfile=/a/path ; stdout log path, NONE for none; default AUTO ;stdout_logfile_maxbytes=1MB ; max # logfile bytes b4 rotation (default 50MB) ;stdout_logfile_backups=10 ; # of stdout logfile backups (0 means none, default 10) ;stdout_events_enabled=false ; emit events on stdout writes (default false) ;stdout_syslog=false ; send stdout to syslog with process name (default false) ;stderr_logfile=/a/path ; stderr log path, NONE for none; default AUTO ;stderr_logfile_maxbytes=1MB ; max # logfile bytes b4 rotation (default 50MB) ;stderr_logfile_backups=10 ; # of stderr logfile backups (0 means none, default 10) ;stderr_events_enabled=false ; emit events on stderr writes (default false) ;stderr_syslog=false ; send stderr to syslog with process name (default false) ;environment=A="1",B="2" ; process environment additions ;serverurl=AUTO ; override serverurl computation (childutils) ; The sample group section below shows all possible group values. Create one ; or more 'real' group: sections to create "heterogeneous" process groups. ;[group:thegroupname] ;programs=progname1,progname2 ; each refers to 'x' in [program:x] definitions ;priority=999 ; the relative start priority (default 999) ; The [include] section can just contain the "files" setting. This ; setting can list multiple files (separated by whitespace or ; newlines). It can also contain wildcards. The filenames are ; interpreted as relative to this file. Included files *cannot* ; include files themselves. [include] files = /etc/supervisord.d/*.ini
在/etc/supervisord.d目录下创建一个queue.ini文件,文件内容如下
[program:queue] ; 程序名称,在 supervisorctl 中通过这个值来对程序进行一系列的操作 autorestart=True ; 程序异常退出后自动重启 autostart=True ; 在 supervisord 启动的时候也自动启动 redirect_stderr=True ; 把 stderr 重定向到 stdout,默认 false user=root ; 用哪个用户启动 command=php /opt/lampp/htdocs/tp6/think queue:work --queue JobQueue --daemon ; 保活监控的命令 stdout_logfile_maxbytes = 2MB ; stdout 日志文件大小,默认 50MB stdout_logfile_backups = 2 ; stdout 日志文件备份数 ; stdout 日志文件,需要注意当指定目录不存在时无法正常启动,所以需要手动创建目录(supervisord 会自动创建日志文件) stdout_logfile = /run/log/queue.log
全部配置好之后我们启动它
supervisord -c /etc/supervisord.conf
如果启动报错如下:
$ supervisord -c supervisord.conf Error: Another program is already listening on a port that one of our HTTP servers is configured to use. Shut this program down first before starting supervisord. For help, use /usr/bin/supervisord -h
我们需要执行下方命令,然后再次重新启动即可
unlink /tmp/supervisor.sock
当增加新的配置或者修改配置时,可以使用下面命令更新
supervisorctl update
最后查看下 状态吧
supervisorctl status //返回信息如下,可以看到设置的queue进程已经启动 queue RUNNING pid 96906, uptime 0:06:19
自写保活脚本
此脚本每间隔10秒检查一次,自行添加开启自启
#!/bin/bash work_path=/www/wwwroot/domain.com while true do count=`ps -ef|grep JobQueue|grep -v grep | wc -l` if [ 6 -gt $count ]; then echo "start JobQueue ..." cd $work_path/addons/burst_shop/source && nohup php think queue:listen --queue JobQueue 2>&1 & cd $work_path/addons/burst_shop/source && nohup php think queue:listen --queue JobQueue 2>&1 & cd $work_path/addons/burst_shop/source && nohup php think queue:listen --queue JobQueue 2>&1 & cd $work_path/addons/burst_shop/source && nohup php think queue:listen --queue JobQueue 2>&1 & cd $work_path/addons/burst_shop/source && nohup php think queue:listen --queue JobQueue 2>&1 & cd $work_path/addons/burst_shop/source && nohup php think queue:listen --queue JobQueue 2>&1 & cd $work_path/addons/burst_shop/source && nohup php think queue:listen --queue JobQueue 2>&1 & sleep 1 else sleep 10 fi done
调试日志
在DoJob文件中使用Log::write('这是调试日志','info'),此方式进行日志打印会发现很容易分辨出来,在runtime/log 目录下会有一个 日期_cli.log的文件,比如:09_cli.log 这里面就全部是消息队列打出来的日志信息了,与其他日志区分了文件还是比较方便看的。
其他
在消息队列的逻辑处理中,一旦处理完毕需要记得自己删除任务,否则可能会一直执行下去来消耗内存。关于listen和work的区别看下:work和listen模式的区别,进行查看。
参考
packagist官网:https://packagist.org/packages/topthink/think-queue#v1.1.6
github:https://github.com/top-think/think-queue
评论