PHP-Resque实现轻量级消息队列

Wednesday, March 7, 2018

概述

Resque是一个基于Redis的处理后台作业的库,通过将多个作业放在一个或多个队列上进行后台处理.本身是由Ruby编写的,并且被用各种语言实现,原项目访问resque/resque ,php版本访问chrisboulton/php-resque .

角色划分

php-resque包含如下3种角色:

  • Job: 需要被执行的任务,例如发送邮件\销库存\配货发货等
  • Queue: 消息队列,基于Redis实现
  • Worker: 负责消费消息队列中的任务,需要以守护进程运行在后台

一个Worker可以处理一个或多个队列,并且可以通过增加Worker的进程/线程来加快处理速度.

注意事项

  • Job运行时,Worker会将参数组装以数组的方式设置到对象的属性,可用$this->args访问
  • 任何异常都会导致任务失败,所以需要特别注意下异常处理
  • Job除了perform方法外还拥有两个方法:setUptearDown,分别在perform前后执行.

具体实现

安装扩展

由于涉及到进程的开辟与管理,php-resque使用了php的PCNTL函数,所以只能在Linux下运行,并且需要php编译PCNTL函数,具体的运行环境为:

  • PHP 5.3+
  • Redis 2.2+
  • 推荐使用 Composer
vagrant@homestead:~/code/php-resque-mq$ composer require chrisboulton/php-resque -vvv

创建Job

将需要处理的任务编写为Job,即一个独立的Class,并且这个Class必须实现一个perform方法,Worker执行的时候会自动运行这个方法.

<?php
require './lib.php';

class SmsJob{

    private $name;

    private $mobile;

    /**
     * 预处理
     */
    public function setUp(){
        $this->name = $this->args['name'];
        $this->mobile = $this->args['mobile'];
        stdout('新注册用户信息:');
        stdout('姓名:'.$this->name);
        stdout('手机号:'.$this->mobile);
    }

    /**
     * 具体任务
     */
    public function perform(){
        for($i=0;$i<10;$i++){
            if($i === 0){
                stdout('开始执行具体任务');
            }
            stdout('发送短信中--第'.($i+1).'秒');
            if($i === 10){
                stdout('任务执行完毕');
            }
            sleep(1);
        }
    }

    /**
     * 后续处理
     */
    public function tearDown(){
        stdout('任务执行完毕');
        stdout();
    }
}

插入队列

  • 任务入队

使用Resque::enqueue函数将Job加入Queue,包含四个参数:

  • queue: 队列名称
  • jobClass: 任务Class
  • jobArgs: 任务参数
  • trackStatus: 为true时返回任务token,用于后续跟踪任务状态
<?php
require './lib.php';
require './SmsJob.php';
require './vendor/autoload.php';

$name = $argv[1];
$mobile = $argv[2];
if(empty($name) || empty($mobile)){
    die("参数错误");
}
$connection = getDBConnection('127.0.0.1', 'homestead', 'secret', 'mq');
$sql = "insert into mq_user(name, mobile) values ('$name', '$mobile')";
if(!mysqli_query($connection, $sql)){
    die("写入用户信息失败,原因:".$connection->error);
}
$connection->close();
$token = Resque::enqueue('sms', SmsJob::class, array('name'=>$name, 'mobile'=>$mobile),true);
echo 'Job Token: '.$token.chr(10);

查询任务状态

根据Job Token来获取作业状态,状态有四个:

  • Resque_Job_Status::STATUS_WAITING: 等待执行
  • Resque_Job_Status::STATUS_RUNNING: 正在运行
  • Resque_Job_Status::STATUS_FAILED: 执行失败
  • Resque_Job_Status::STATUS_COMPLETE: 执行完毕
<?php
require_once './lib.php';
require_once './vendor/autoload.php';

$token = $argv[1];
if(empty($token)){
    die("参数错误");
}
$obj = new Resque_Job_Status($token);
if(!$obj->isTracking()) {
    die("该任务没有设定状态跟踪");
}
while(true) {
    $status = $obj->get();
    $statusLabels = ['', '等待执行', '正在运行', '执行失败', '执行完毕'];
    stdout('任务['.$token.']的当前状态:'.$statusLabels[$status]);
    if($status == 4){
        break;
    }else{
        sleep(1);
    }
}

启动Worker

从Queue中取出任务并处理,当队列中有Job时,实例化Job对应的Class对象并执行perform中方法.

  • QUEUE: 要处理的队列,可以设定為 QUEUE=* 表示执行所有任务
  • LOGLEVEL: 日志等级 0:不记录 1:记录简单日志 2:记录详细日志
  • COUNT: 启动Worker的数量,默认是1
  • INTERVAL: Worker检查Queue的间隔,默认是5秒
  • PIDFILE: 如果是单个Woker,可以指定Pid文件
  • PREFIX: 前缀
  • REDIS_BACKEND: Redis的IP和端口,默认是127.0.0.1:6379

完整代码访问php-resque-mq

测试流程

  • 执行主业务并将Job插入队列
php register.php 小毛 18702124521
  • 查看任务状态
php status 26bf81f1943d1a940ffd68a287a7d16c
  • 启动Worker
QUEUE=sms LOGLEVEL=2 php worker.php
  • 测试结果

Resque Web

resque-web是用Ruby开发的Web管理界面,安装步骤:

  • 安装Ruby及Ruby Dev
vagrant@homestead:~/code$ sudo apt install ruby ruby-dev -y
  • 安装resque-web
vagrant@homestead:~/code$ sudo gem install resque-web -v 0.0.8
  • 启动resque-web
vagrant@homestead:~/code$ sudo resque-web -p 666
[2018-09-21 10:24:31 +0000] Starting 'resque-web'...
[2018-09-21 10:24:31 +0000] trying port 666...
  • 截图示例

MessageQueue MessageQueue PHP Redis Resque

Yii2-Queue实现轻量级消息队列消息队列