发布网友 发布时间:2024-10-01 20:32
共1个回答
热心网友 时间:2024-10-17 18:30
一、简介Point-to-Point,点对点通信模型。PTP是基于队列(Queue)的,一个队列可以有多个生产者,和多个消费者。消息服务器按照收到消息的先后顺序,将消息放到队列中。队列中的每一条消息,只能由一个消费者进行消费,消费之后就会从队列中移除。
特点:
每个消息只用一个消费者;
发送者和接受者没有时间依赖;
接受者确认消息接受和处理成功。
P 表示为生产者 、C 表示为消费者,红色表示队列。
在RabbitMQ中有生产者,消费者的概念,本篇主要是消息如何生产以及消费者这部分的实现。使用的laravel框架,php-amqplib拓展。
二、Laravel中添加依赖在项目根目录下执行一下命令
composer require php-amqplib/php-amqplib
lishuo@李硕的MacBook?Pro:~/Code/php/www.zfw.com?(branch:?master!)$?composer?require?php-amqplib/php-amqplibUsing?version?^3.1?for?php-amqplib/php-amqplib./composer.json?has?been?updatedRunning?composer?update?php-amqplib/php-amqplibLoading?composer?repositories?with?package?informationUpdating?dependenciesNothing?to?modify?in?lock?fileInstalling?dependencies?from?lock?file?(including?require-dev)Nothing?to?install,?update?or?removePackage?caouecs/laravel-lang?is?abandoned,?you?should?avoid?using?it.?Use?https://github.com/Laravel-Lang/lang?instead.Package?swiftmailer/swiftmailer?is?abandoned,?you?should?avoid?using?it.?Use?symfony/mailer?instead.Generating?optimized?autoload?files>?Illuminate\Foundation\ComposerScripts::postAutoloadDump>?@php?artisan?package:discover?--ansiDiscovered?Package:?barryvdh/laravel-ide-helperDiscovered?Package:?facade/ignitionDiscovered?Package:?fruitcake/laravel-corsDiscovered?Package:?jenssegers/mongodbDiscovered?Package:?laravel/passportDiscovered?Package:?laravel/sailDiscovered?Package:?laravel/sanctumDiscovered?Package:?laravel/tinkerDiscovered?Package:?maatwebsite/excelDiscovered?Package:?nesbot/carbonDiscovered?Package:?nunomaduro/collisionPackage?manifest?generated?successfully.100?packages?you?are?using?are?looking?for?funding.Use?the?`composer?fund`?command?to?find?out?more!>?@php?artisan?vendor:publish?--tag=laravel-assets?--ansiNo?publishable?resources?for?tag?[laravel-assets].Publishing?complete.三、使用Laravel的command来实现消息的生产和消费1.创建生产者执行以下命令快速创建生产者
php artisan make:command RabbitmqProducerCommand
lishuo@李硕的MacBook?Pro:~/Code/php/www.zfw.com?(branch:?master!)$?php?artisan?make:command?RabbitmqProducerCommandConsole?command?created?successfully.基本代码(接下来就在command里面写生产消息的逻辑)<?phpnamespace?App\Console\Commands;use?Illuminate\Console\Command;//引入amqp扩展use?PhpAmqpLib\Connection\AMQPStreamConnection;use?PhpAmqpLib\Message\AMQPMessage;class?RabbitmqProducerCommand?extends?Command{????/**?????*?The?name?and?signature?of?the?console?command.?????*?????*?@var?string?????*/????protected?$signature?=?'rabbitmq_producer';//给生产者起个command名称????/**?????*?The?console?command?description.?????*?????*?@var?string?????*/????protected?$description?=?'Command?description';????/**?????*?Create?a?new?command?instance.?????*?????*?@return?void?????*/????public?function?__construct()????{????????parent::__construct();????}????/**?????*?Execute?the?console?command.?????*??生产者消息代码?????*?@return?int?????*/????public?function?handle()????{????????//创建服务器连接????????$connection?=?new?AMQPStreamConnection('localhost',?5672,?'guest',?'guest');????????//连接信道????????//信道是生产消费者与rabbit通信的渠道,生产者publish或者消费者消费一个队列都是需要通过信道来通信的????????//信道是建立在TCP上面的虚拟链接,也就是rabbitMQ在一个TCP上面建立成百上千的信道来达到多个线程处理。????????//注意是一个TCP?被多个线程共享,每个线程对应一个信道,信道在rabbit都有唯一的ID,保证了信道的私有性,对应上唯一的线程使用。????????$channel?=?$connection->channel();????????//channel->queue_declare通过信道创建一个是否是持久化的消息队列????????//queue第一个参数代表消息队列名称????????$channel->queue_declare('test',?false,?false,?false,?false);????????//往队列里要发送内容,待发送的内容????????$msg?=?new?AMQPMessage('我是一个生产者消息');????????//通过信道来进行发送消息????????//而exchange是怎么知道消息应该推到哪个queue呢,这就要通过绑定queue与exchange时的routingkey了,通过代码进行绑定并且指定routingkey,下面有一张关系图,p(发布者)?—>?x(exchange)?bindding(绑定关系也就是我们的routingkey)?红色代表着queue????????$channel->basic_publish($msg,?'',?'test');????????echo?"?[x]?Sent?'我是一个生产者消息!'\n";????????//关闭信道????????$channel->close();????????//关闭连接????????$connection->close();????}}2.创建消费者因为消费者是需要常驻内存的,所以需要在cli下运行,我们可以通过以下操作创建一个任务。
?php?artisan?make:command?RabbitmqConsumerCommand基本代码(接下来就在command里面写消费消息的逻辑)<?phpnamespace?App\Console\Commands;use?Illuminate\Console\Command;use?PhpAmqpLib\Connection\AMQPStreamConnection;class?RabbitmqConsumerCommand?extends?Command{????/**?????*?The?name?and?signature?of?the?console?command.?????*?????*?@var?string?????*/????protected?$signature?=?'rabbitmq_consumer';//给消费者起个command名称????/**?????*?The?console?command?description.?????*?????*?@var?string?????*/????protected?$description?=?'Command?description';????/**?????*?Create?a?new?command?instance.?????*?????*?@return?void?????*/????public?function?__construct()????{????????parent::__construct();????}????/**?????*?Execute?the?console?command.?????*?????*?@return?int?????*/????public?function?handle()????{????????//创建服务器连接????????$connection?=?new?AMQPStreamConnection('localhost',?5672,?'guest',?'guest');????????//连接信道????????//信道是生产消费者与rabbit通信的渠道,生产者publish或者消费者消费一个队列都是需要通过信道来通信的????????//信道是建立在TCP上面的虚拟链接,也就是rabbitMQ在一个TCP上面建立成百上千的信道来达到多个线程处理。????????//注意是一个TCP?被多个线程共享,每个线程对应一个信道,信道在rabbit都有唯一的ID,保证了信道的私有性,对应上唯一的线程使用。????????$channel?=?$connection->channel();????????//channel->queue_declare通过信道创建一个是否是持久化的消息队列????????//queue第一个参数代表消息队列名称????????$channel->queue_declare('test',?false,?false,?false,?false);????????echo?"?[*]?Waiting?for?messages.?To?exit?press?CTRL+C\n";????????//进行监听消费者是否有消息,如果有进行输出消息内容????????$callback?=?function?($msg)?{????????????echo?'?[x]?Received?',?$msg->body,?"\n";????????};????????//通过信道进行消费消息????????$channel->basic_consume('test',?'',?false,?true,?false,?false,?$callback);????????//如果信道是打开状态????????while?($channel->is_open())?{????????????//然后让信道一直处于监听等待状态????????????$channel->wait();????????}????????//关闭信道????????$channel->close();????????//关闭连接????????$connection->close();????}}三、使用command进行测试生产消息和消费消息是否成功执行生产消息 php artisan rabbitmq_producer\ 执行消费消息 hp artisan rabbitmq_consumer