PHP+Redis实现轻量级消息队列

Wednesday, March 7, 2018

概述

业务实现过程中,即便没有高并发与大流量,业务的解耦与异步化也是需要考虑实现的,此时MQ就显得很重要,中小型业务开发中,RabbitMQ就显得过重,这种业务下需要的就是一个轻量级的MQ,此时用Redis就刚刚好.

流程

基于Redis的轻量级的MQ用到了Redis的三个特性:

  • List存储类型
  • BRLPOP或者BLPOP操作List
  • 发布/订阅模式

大体流程是:主业务完成操作后写入消息到MQ,此时并发布消息到指定Channel,而之前已经订阅了该Channel的Worker收到消息后就去MQ获取并消费消息,如下图:

程序实现

主业务

主业务主要功能是接收两个参数:姓名和手机号,将这两个参数作为用户信息写入用户表,写入成功后插入一条消息到消息队列register_users,并且发送消息OK到频道register_success,主要代码如下:

<?php
require './lib.php';
$name = $argv[1];
$mobile = $argv[2];
if(empty($name) || empty($mobile)){
    die("参数错误");
}
$connection = getDBConnection('127.0.0.1', 'homestead', 'secret', 'mq');
// 开启事务
mysqli_begin_transaction($connection);
$sql = "insert into mq_user(name, mobile) values ('$name', '$mobile')";
if(!mysqli_query($connection, $sql)){
    die("写入用户信息失败,原因:".$connection->error);
}
$redis = getRedis();
// 添加消息
$result = $redis->lpush('register_users', json_encode(array('name'=>$name, 'mobile'=>$mobile), JSON_UNESCAPED_UNICODE));
if($result === false){
    mysqli_rollback($connection);
    die("添加消息队列失败");
}
// 发布消息
$redis->publish('register_success', 'ok');
// 所有操作完成后提交事务
mysqli_commit($connection);
$connection->close();
$redis->close();

Worker

Worker的主要功能是订阅频道register_success,如果收到消息且消息内容是OK就从队列register_users获取并消费消息,主要代码如下:

<?php
require './lib.php';
$redis = getRedis();
$redis->subscribe(['register_success'], function ($instance, $channelName, $message){
    if($channelName == "register_success" && $message = "ok"){
        $redis = getRedis();
        $arr = $redis->brPop(['register_users'], 20);
        if(count($arr)){
            $userInfo = json_decode($arr[1], true);
            stdout("新注册用户信息:");
            stdout("姓名:".$userInfo['name']);
            stdout("手机号:".$userInfo['mobile']);
            stdout();
        }
    }
});

完整代码访问php-redis-mq

运行结果

MessageQueue MessageQueue PHP Redis

消息队列Redis事务