原创的amqp原生封装rabbitmq框架代码示例,一对多发送任务消息,开箱即用、也可以在php框架里面使用
可以兼容thinkphp、laravel、ci等任何框架
代码使用与学习跳转github地址、打开链接有使用说明!!
https://github.com/linsonggao/php-rabbitmq-bus
这里写目录标题
- 1.核心代码、监听消息、发送消息
- 2.交换机绑定关系相关代码
- 3.rabbit交换机规则class
- 4.交换机队列枚举类
- 5.监听代码
- 6.测试发送消息demo代码
1.核心代码、监听消息、发送消息
<?php
namespace PhpRabbitMq\Lib\MessageBus\base;
use PhpRabbitMq\Lib\Instance;
use PhpRabbitMq\Lib\MessageBus\BusPassenger;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Connection\AMQPStreamConnection;
class AmqpBus extends AbstractBus implements BusInterface
{
use Instance;
/**
* 启动
* @return \think\Response
*/
public function start($bus_name)
{
//dump("ensureBus");
$param = config('rabbitmq.AMQP');
//$amqpDetail = config('rabbitmq.direct_queue');
$connection = new AMQPStreamConnection(
$param['host'],
$param['port'],
$param['login'],
$param['password'],
$param['vhost'],
);
//dump("ensureBus");
/*
* 创建通道
*/
$channel = $connection->channel();
/*
* 设置消费者(Consumer)客户端同时只处理一条队列
* 这样是告诉RabbitMQ,再同一时刻,不要发送超过1条消息给一个消费者(Consumer),
* 直到它已经处理了上一条消息并且作出了响应。这样,RabbitMQ就会把消息分发给下一个空闲的消费者(Consumer)。
*/
//dump("ensureBus");
$channel->basic_qos(0, 1, false);
/*
* 同样是创建路由和队列,以及绑定路由队列,注意要跟publisher的一致
* 这里其实可以不用,但是为了防止队列没有被创建所以做的容错处理
*/
var_dump("ensureBus");
$this->ensureBus($channel, $bus_name);
//$channel->queue_bind($amqpDetail['queue_name'], $amqpDetail['exchange_name'],$amqpDetail['route_key']);
/*
queue: 从哪里获取消息的队列
consumer_tag: 消费者标识符,用于区分多个客户端
no_local: 不接收此使用者发布的消息
no_ack: 设置为true,则使用者将使用自动确认模式。详情请参见.
自动ACK:消息一旦被接收,消费者自动发送ACK
手动ACK:消息接收后,不会发送ACK,需要手动调用
exclusive:是否排他,即这个队列只能由一个消费者消费。适用于任务不允许进行并发处理的情况下
nowait: 不返回执行结果,但是如果排他开启的话,则必须需要等待结果的,如果两个一起开就会报错
callback: :回调逻辑处理函数,PHP回调 array($this, 'process_message') 调用本对象的process_message方法
*/
$noAck = $options['no_ack'] ?? true;
//消费队列~
$busInfo = BusPassenger::instance()->getBusInfo($bus_name);
//var_dump($busInfo);
foreach ($busInfo['passengers'] as $queue_name) {
//$this->queue_name = $queue_name;
//var_dump($queue_name);
$channel->basic_consume($queue_name, '', false, $noAck, false, false, function (AMQPMessage $msg) use ($queue_name) {
$messageData = json_decode($msg->body, true);
//执行对应的class即可
//todo.
//var_dump($queue_name);
$handler = BusPassenger::Instance()->getWork()[$queue_name]['class'];
try {
/**@var AbstractWork $handler**/
$handler::Instance($handler)->handler($messageData);
} catch (\Throwable $e) {
var_dump("消息消费错误信息:".$e->getMessage());
};
});
}
register_shutdown_function(array($this, 'shutdown'), $channel, $connection);
// AMQP 队列的轮询时长
$waitSeconds = $options['wait_seconds'] ?? 15;
try {
$channel->consume($waitSeconds);
} catch (AMQPConnectionClosedException $e