think-queue消息队列

 admin   2021-02-09 21:53   181 人阅读  0 条评论

关于消息队列应用场景自行百度吧。另外,ThinkPHP > 5.1版本的可以直接使用think-queue2.0版本,具体教程参考:https://gitee.com/wslmf/notes/tree/master/thinkphp-queue

准备

    我们使用ThinkPHP5.0框架作为基础框架,因此安装think-queue版本就需要指定到1.6版本了

composer require topthink/think-queue 1.1.6

   然后需要在服务器安装redis和php的redis扩展

配置

    上诉think-queue安装后会在生成一个extra的目录,这里面包含queue.php也就是消息队列的配置,文件路径: application/extra/queue.php

return [
//    'connector' => 'Sync'
    'connector'  => 'Redis',        // Redis 驱动
    'expire'     => 60,        // 任务的过期时间,默认为60秒; 若要禁用,则设置为 null
    'default'    => 'default',        // 默认的队列名称
    'host'       => '127.0.0.1',    // redis 主机ip
    'port'       => 6379,        // redis 端口
    'password'   => '',        // redis 密码
    'select'     => 1,        // 使用哪一个 db,默认为 db0
    'timeout'    => 0,        // redis连接的超时时间
    'persistent' => false,        // 是否是长连接
];

代码

创建一个控制器,执行队列里的任务,例如创建一个app\message\controller\DoJob.php。

namespace app\message\controller;
use think\Db;
use think\Queue\Job;

class DoJob{


    /**
     * fire方法是消息队列默认调用的方法
     * @param Job $job 当前的任务对象
     * @param $data
     * @return int
     */
    public function fire(Job $job,$data){
        //这里$data定义格式为(也就是push队列的时候传递过来的参数):$data = [ 'type'=>1, 'data_id' => 123,'ts' => time()]
        if(empty($data)){
            $job->delete();
            return 0;
        }
        // 有些消息在到达消费者时,可能已经不再需要执行了
        $isJobStillNeedToBeDone = $this->checkDatabaseToSeeIfJobNeedToBeDone($data);
        if(!$isJobStillNeedToBeDone){
            $job->delete();
            return 0;
        }
        
        if(is_array($data) && isset($data['type'])){
            $type = $data['type'];
            if($type == 1){
                //执行发送邮件业务
                $isJobDone = $this->sendEmail($data['data_id']);
            }else if($type == 2){
                //执行APP推送消息业务
                $isJobDone = $this->sendAppMessage($data['data_id']);
            }else if($type == 3){
                //执行订单业务
                $isJobDone = $this->orderService($data['data_id']);
            }else{
                return 0;
            }
        }else{
            return 0;
        }
        
        if ($isJobDone) {
            // 如果任务执行成功,删除任务
            $job->delete();
        }else{
            if ($job->attempts() > 3) {
                //通过这个方法可以检查这个任务已经重试了几次了
                $job->delete();
                // 也可以重新发布这个任务
                //$job->release(2); //$delay为延迟时间,表示该任务延迟2秒后再执行
            }
        }
    }
    
    // 检查是否需要继续执行任务
    private function checkDatabaseToSeeIfJobNeedToBeDone($data){
        return true;
    }
    


    //发App消息业务
    private function sendAppMessage($id){

    }

    //处理订单业务
    private function orderService($id){

    }
    
}

发布任务到队列中,创建一个测试代码,例如创建一个app\message\controller\Addjob.php:

namespace app\message\controller;

use think\Controller;
use think\Queue;

class Addjob extends Controller{

  public function index()
  {
       // 定义要传递给队列的数据(类型是数组)
        $jobData = [ 'type'=>1, 'data_id' => 12,'ts' => time()] ;
        // 将该任务推送到消息队列,等待对应的消费者去执行
        $isPushed = Queue::push('app\message\controller\DoJob' , $jobData , 'JobQueue');
        if( $isPushed !== false ){
            return '消息已发出';
        }else{
            return '消息发送出错';
        }
    }
}

我们直接访问上方的测试代码控制器,http://you domain/index.php?s=/message/addjob/index ,会出现消息已发送提示,这时候表示队列已经添加ok,我们使用命令监听下

cd指定到ThinkPHP框架根目录下,使用think进行监听执行

# 测试监听
php queue:listen --queue JobQueue &

# 实际生产中使用下方命令进行后台监听
nohup php think queue:listen --queue JobQueue 2>&1 &

保活

如果经一段时间发现,无论怎么样退出客户端后linux还是无法继续监听执行queue的线程,那么我们可以使用定时任务进行保活监控

我们保活的话要使用queue:work 而不是使用listen,因为work是单线程的listen是多线程,如果服务器内容太小分分钟“爆炸”。

cd /www/wwwroot/baidu.com && nohup php think queue:work --queue JobQueue&

上面就是我们需要定时监控的脚本指令了,也可以使用自己的宝塔定时任务监控,监控频率1分钟!

调试日志

在DoJob文件中使用Log::write('这是调试日志','info'),此方式进行日志打印会发现很容易分辨出来,在runtime/log 目录下会有一个 日期_cli.log的文件,比如:09_cli.log 这里面就全部是消息队列打出来的日志信息了,与其他日志区分了文件还是比较方便看的。

其他

在消息队列的逻辑处理中,一旦处理完毕需要记得自己删除任务,否则可能会一直执行下去来消耗内存。关于listen和work的区别看下:work和listen模式的区别,进行查看。

参考

packagist官网:https://packagist.org/packages/topthink/think-queue#v1.1.6

github:https://github.com/top-think/think-queue


发表评论:


表情

还没有留言,还不快点抢沙发?