当前位置: 首页 >资讯 >

python操作rabbitmq

来源:博客园 2023-06-25 13:28:41

rabbitmq安装部署

RabbitMq生产者消费者模型

生产者(producter)队列消息的产生者,复制生产消息,并将消息传入队列生产者代码:


【资料图】

import pikaimport jsoncredentials = pika.PlainCredentials("admin","admin")#mq用户名和密码,用于认证#虚拟队列需要指定参数virtual_host,如果是默认的可以不填connection = pika.BlockingConnection(pika.ConnectionParameters(host="10.0.0.24",port=5672,virtual_host="/",credentials=credentials))channel = connection.channel()# 创建一个AMQP信道#声明队列,并设置durable为True,为了避免rabbitMq-server挂掉数据丢失,将durable设为Truechannel.queue_declare(queue="1",durable=True)for i in range(10):   # 创建10个q    message = json.dumps({"OrderId":"1000%s"%i})    # exchange表示交换器,可以精确的指定消息应该发到哪个队列中,route_key设置队列的名称,body表示发送的内容    channel.basic_publish(exchange="",routing_key="1",body=message)    print(message)connection.close()
操作前

通过pika生命一个认证用的凭证,然后用pika创建rabbitmq的块连接,再用上面的连接创建一个AMQP信道 。创建消息队列的连接时,需要指定ip,断开,虚拟主机,凭证。

然后根据上面的信道,声明一个队列,

我们可以看到,下面信道点队列声明里的queue参数值就队列的名字。这里是遍历0到9,然后打印了下消息,这里的生成的消息,是json序列化后的数据。然后将数据作为i,信道点基础发布的body参数的值。上面信道点队列声明是创建一个队列,队列名字是’1‘,下面我们用信道点基本发布,是将我们创建的消息体发送到队列中,路由_key就是指定队列名称,指定发布消息到哪个队列,消息是作为body的参数,

最后,需要将这个消息队列的连接关闭。

我们通过页面可以看到,已经创建好了这个队列,队列名字为1,并且已经通过遍历生成的10个消息,调用十次信道点基础发布方法,将这十个产生的消息发布到消息队列中

我们可以再看下,可以看到我们创建的消息的具体内容。

消费者(consumer):队列消息的接收者,扶着接收并处理消息队列中的消息

import pikacredentials = pika.PlainCredentials("admin","admin")connection = pika.BlockingConnection(pika.ConnectionParameters(    host="10.0.0.24",    port=5672,    virtual_host="/",    credentials=credentials))channel = connection.channel()#声明消息队列,消息在这个队列中传递,如果不存在,则创建队列channel.queue_declare(queue="1",durable=True)# 定义一个回调函数来处理消息队列中消息,这里是打印出来def callback(ch,method,properties,body):    ch.basic_ack(delivery_tag=method.delivery_tag)    print(body.decode())#告诉rabbitmq,用callback来接收消息channel.basic_consume("1",callback)#开始接收信息,并进入阻塞状态,队列里有信息才会调用callback进行处理channel.start_consuming()

获取消息,创建凭证,连接,信道,然后什么一下队列。指定我们要获取哪个队列中的消息,如果没有这个队列,就会创建这个队列,存在,那么后面使用这个信道,就会从这个队列中获取数据。信道是通过rabbitmq的连接对象来生成的,连接对象中放了连接用的凭证。所以,信道点基础消费方法,指定是哪个消息队列,那么就会从这个队列中获取消息。然后传参回调函数。而回调函数中,

我们可以看到,基础消费方法里面有消息回调,就是上面我们自定义的回调函数

这个方法定义了回调函数的写法。第一个参数是信道

第二个参数是方法,第三个参数是属性,第四个是body,这些不用管,只需要按如下格式,就可以从body,做个解码,就将信道点基础消费中指定的队列中的消息,取出来了,我们是用回调函数来接收消息,当需要获取消息的时候,就需要执行信道点开始消费的方法。这里好像是遍历队列一个一个的将消息获取出来。那么怎样实现,实时监听消息,实时消费呢

RabbitMq持久化

RabbitMq持久化MQ默认建立的临时的queue和exchange,如果不声明持久化,一旦rabbitmq挂掉,queue,exchange将会全部丢失,所以我们一般在创建queue或者exchange的时候会声明持久化

1.queue声明持久化

# 声明消息队列,消息将在这个队列传递,如不存在,则创建。durable = True 代表消息队列持久化存储,False 非持久化存储result = channel.queue_declare(queue = "python-test",durable = True)

使用True

重启消息队列服务

消息队列还在,但是消息被清空了

当我改为false的时候,因为队列1已经存在,并且是Tue声明的,所以这里就报错了

我们设置为false,然后声明一个不存在的队列2

创建好了队列,并且10个消息

重启一下消息队列服务

刚刚上面创建的队列2已经不存在,这已经不是消息被清空了,而是队列直接被清除了

也就是这个Ture,是保留队列用的,持久化队列的。

channel.queue_declare(queue="2",durable=True)

2、exchange声明持久化

# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建.durable = True 代表exchange持久化存储,False 非持久化存储channel.exchange_declare(exchange = "python-test", durable = True)

注意:如果已存在一个非持久化的queue或exchange,执行上述代码会报错,因为当前状态不能更该queue 或 exchange存储属性,需要删除重建,如果queue和exchange中一个声明了持久化,另一个没有声明持久化,则不允许绑定

我们在1处改了,但是在2处没有修改。结果有问题。

队列2不存在,所以没有将消息放进去

而exchange这里,没有写将消息推送到声明的python-test里面,所以里面也没有消息

这次是声明的exchange,并且将消息推送到python-test里面

还是没有看到有东西呀

我们这里发布个消息,可以看到,是需要路由的

加上路由,再次执行程序

由于队列2 不存在,好像还是不行

我在这里给它bind一个路由

感觉还是没有弄明白,先放弃了

原来是如下方式呀。

首先,在python-test2里面,

给exchange绑定队列1和2

1和2目前的消息数量

我往路由1里面push一个消息

push成功

然后再看队列1里面,可以看到多了一条刚刚push的消息

接下来用程序实现,声明exchange,然后发布方法不变,发布到exchage中,因为已经绑定了两个路由了,这里指定路由key,根据路由key,可以将消息push到对应的队列中去

我们可以看到,之前是页面点击push了一条,上面程序push了十条到exchange,现在这个队列就有11条数据。可是这个exchange和队列的绑定,是我自己在页面上绑定的,这个应该不合理。以后有时间看下,怎么用程序绑定。

我们可以看到,应该是程序中缺少使用这个绑定方法吧

3、消息持久化

虽然exchange和queue都声明了持久化,但如果消息只存在内存里,rabbitmq重启后,内存里的东西还是会丢失,所以必须声明消息也是持久化,从内存转存到到硬盘

# 向队列插入数值 routing_key是队列名。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化channel.basic_publish(exchange = "",routing_key = "python-test",body = message, properties=pika.BasicProperties(delivery_mode = 2))

我们这里先重启一下rabbitmq,把之前的写入队列的消息清空

不过我们看到,这里已经有持久化存储的消息了,之前好像是页面点击推送的消息

总共一条,持久化1条。持久化的,即使重启服务,消息也不会丢失

我们再去推送一条

可以看到刚刚推送的这条也是持久化存储的

我们在发布的方法里面,添加属性发布的模式是2,

刚才是2条持久化的,现在新增10条数据,且是持久化的消息

如果改成1

可以看到,刚刚新增了10条消息,但是这10条消息没有持久化。

4、acknowledgement消息不丢失

消费者(consume)调用callback函数时,会存在处理消息失败的风险,如果处理失败,则消息会丢失,但是也可以选择消费者处理失败时,将消息回退给rabbitmq,重新再被消费者消费,这个时候需要设置确认标识。

channel.basic_consume(callback,queue = "python-test",# no_ack 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉             no_ack = False)

目前队列2中有10条没有持久化的,有12条持久化的消息

执行消费程序

再看队列2中,可以看到之前12条持久化和10条没有持久化的消息数据都已经被消费了。我们可以看到消费者这里,多了一个消费者。消费者有个tag,还有ack的确认。在详情那里,也可以看到 消费者数量是1

我们push了一条消息,但是没有发现推送到队列中,难道是因为队列绑定exchange的原因?

push的时候,有个持久化的选择,发现还是没有push进去

在exchange这里push了,

发现队列1有数据,2没有消息

往路由key这里发送多次消息

还是没有,难道上面都是失败的发送嘛

我们再看消费者程序,我们看到运行程序之后,这个程序一直没有退出,处于监听状态,正如我们在队列中看到的那样,有个消费者是up状态,也就是这个消费者一直在监听我们上面的那个队列,程序并没有退出。因此,我们上面在页面push的sss之类的消息,都被这个消费者消费掉了,因此没有看到新增的消息。

我们将上面的消费者程序停掉之后,就可以看到队列下面已经显示没有消费者了,然后再推送消息的时候,页面选择持久化,

我们可以看到,推送的消息,是持久化的。由上面的学习,了解到,消息是否持久化,好像是取决于生产者的设置,而不是说消息没有持久化,我给它用命令持久化一下,至于是否可以用命令持久化一下,本来不需要持久化的消息,暂且不考虑。

RabbitMq发布与订阅

在上一章中,我们创建了一个工作队列,工作队列模式的设想是每一条消息只会被转发给一个消费者。本章将会讲解完全不一样的场景: 我们会把一个消息转发给多个消费者,这种模式称之为发布-订阅模式。RabbitMq消息模式的核心思想是:一个生产者并不会直接往一个队列中发送消息,事实上,生产者根本不知道它发送的消息将被转发到哪些队列。实际上,生产者只能把消息发送给一个exchange,exchange只做一件简单的事情:一方面它们接收从生产者发送过来的消息,另一方面,它们把接收到的消息推送给队列。一个exchage必须清楚地知道如何处理一条消息.  rabbitmq的发布与订阅要借助交换机(Exchange)的原理实现:

Exchange 一共有三种工作模式:fanout, direct, topicd

模式一:fanout

这种模式下,传递到exchange的消息将会==转发到所有于其绑定的queue上

不需要指定routing_key,即使指定了也是无效的。需要提前将exchange和queue绑定,一个exchange可以绑定多个queue,一个queue可以绑定多个exchange。需要先启动订阅者,此模式下的队列是consume随机生成的,发布者仅仅发布消息到exchange,由exchange转消息至queue。exchange交换器首先我们创建一个fanout类型的交换器,我们称之为:python-test:

channel.exchange_declare(exchange = "python-test",durable = True, exchange_type="fanout")

广播模式交换器很简单,从字面意思也能理解,它其实就是把接收到的消息推送给所有它知道的队列。  想查看当前系统中有多少个exchange,可以从控制台查看

可以看到有很多以amq.*开头的交换器,以及(AMQP default)默认交换器,这些是默认创建的交换器。  在前面,我们并不知道交换器的存在,但是依然可以将消息发送到队列中,那其实并不是因为我们可以不使用交换器,实际上是我们使用了默认的交换器(我们通过指定交换器为字字符串:""),回顾一下我们之前是如何发送消息的:

channel.basic_publish(exchange="",routing_key="1",body=message)

第一个参数是交换器的名字,空字符串表示它是一个默认或无命名的交换器,消息将会由指定的路由键(第二个参数,routingKey,后面会讲)转发到队列。  你可能会有疑问:既然exchange可以指定为空字符串(""),那么可否指定为null?    答案是:不能!

通过跟踪发布消息的代码,在AMQImpl类中的Publish()方面中,可以看到,不光是exchange不能为null,同时routingKey路由键也不能为null,否则会抛出异常:

临时队列

在前面的例子中,我们使用的队列都是有具体的队列名,创建命名队列是很必要的,因为我们需要将消费者指向同一名字的队列。因此,要想在生产者和消费者中间共享队列就必须要使用命名队列。

发布者:

import pikaimport jsoncredentials = pika.PlainCredentials("admin", "admin")  # mq用户名和密码# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。connection = pika.BlockingConnection(pika.ConnectionParameters(host = "10.0.0.24",port = 5672,virtual_host = "/",credentials = credentials))channel=connection.channel()# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储channel.exchange_declare(exchange = "python-test",durable = True, exchange_type="fanout")for i in range(10):    message=json.dumps({"OrderId":"1000%s"%i})# 向队列插入数值 routing_key是队列名。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化。routing_key 不需要配置    channel.basic_publish(exchange = "python-test",routing_key = "",body = message,                          properties=pika.BasicProperties(delivery_mode = 2))    print(message)connection.close()

订阅者1:

import pikacredentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters(host = "10.0.0.24",port = 5672,virtual_host = "/",credentials = credentials))channel = connection.channel()# 创建临时队列,队列名传空字符,consumer关闭后,队列自动删除result = channel.queue_declare("4")# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储channel.exchange_declare(exchange = "python-test",durable = True, exchange_type="fanout")# 绑定exchange和队列  exchange 使我们能够确切地指定消息应该到哪个队列去channel.queue_bind(exchange = "python-test",queue = "4")# 定义一个回调函数来处理消息队列中的消息,这里是打印出来def callback(ch, method, properties, body):    ch.basic_ack(delivery_tag = method.delivery_tag)    print(body.decode())channel.basic_consume(result.method.queue,callback,# 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉                      auto_ack = False)channel.start_consuming()

订阅者2

import pikacredentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters(host = "10.0.0.24",port = 5672,virtual_host = "/",credentials = credentials))channel = connection.channel()# 创建临时队列,队列名传空字符,consumer关闭后,队列自动删除result = channel.queue_declare("2",durable=True)# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储channel.exchange_declare(exchange = "python-test",durable = True, exchange_type="fanout")# 绑定exchange和队列  exchange 使我们能够确切地指定消息应该到哪个队列去channel.queue_bind(exchange = "python-test",queue = "2")# 定义一个回调函数来处理消息队列中的消息,这里是打印出来def callback(ch, method, properties, body):    ch.basic_ack(delivery_tag = method.delivery_tag)    print(body.decode())channel.basic_consume(result.method.queue,callback,# 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉                      auto_ack = False)channel.start_consuming()

当前的队列如下

发布消息,exchange类型不对

下面这就是直连类型

进去之后把找个已经存在的exchange删除了,这个暂时没用

发布,这里也没有指的路由key

可以看到新建的exchange类型是fanout

因为没有绑定队列,所以程序推送的消息,好像是丢失了

开启订阅者1,声明队列4并绑定到前面创建的python-test这个exchange。

查看,队列4已经创建

有个消费者正连接着4

并且订阅者1声明的队列,也跟指定的exchange已经绑定了,路由key,默认就是用的队列名称

pika.exceptions.ChannelClosedByBroker: (406, "PRECONDITION_FAILED - inequivalent arg "durable" for queue "2" in vhost "/": received "false" but current is "true"")

开启订阅者2,但是报错了,因为队列2已经存在了,并且是Ture,是持久化的,而这里信道点队列声明2,是没有指定那个参数,那就是默认是Flase,非持久化的队列,重启下服务这个队列就不存在了。因此保持了。我们先将这个已经存在的队列删除,然后重新声明一下吧,或者是直接给它加个持久化的参数也行

加上之后,就能正常开启这个订阅者2了

我们创建的4,是非持久化的队列,这里这个d的标记,可能就是durable参数,是否持久化队列的意思吧

我们重新执行一次发布者程序,发布者并没有指定路由key,只是指定了exchange,而订阅者1和2程序里面,都是有绑定这个exchange的

我们可以看到,订阅者1获取到了发布到这个exchage的消息

订阅者2也获取到了发布到这个exchage的消息

再来看下这个exchange的情况

它对应的两个队列

队列2有个消费者

队列4也有个消费者,这两个消费者各自对应一个队列,每个消费者请求过来是的端口不同,消费者tag不同。两个队列中的消息,都被订阅者程序获取并打印在pycharm上进行消费了,因此,队列中也就没有数据了。

难道,一个队列,就是一个订阅者吗?当发布者发布消息的时候,难道是基础发布方法里面,指定exchange,不指定路由key,这样就会将生产者生产的消息,发送给所有绑定这个exchange的队列吗,而订阅者和队列一一对应,然后每个订阅者就从自己对应的队列中将这个消息消费掉吗?

把两个订阅者,都停止掉,查看目前这两个队列,都是没有消息的。

我执行发布者程序,发布消息,指定exchange,不指定路由key。

我们可以看到,这种情况下,的确是将消息发布给所有绑定这个exchange的队列了,如下,2和4队列都绑定了,所以都接收到了十条消息。

我们发布消息的参数,指定消息是持久化的,因为队列2是个持久化的队列,因此,进入队列2的消息也是持久化的

由于声明队列4,不是持久化的队列,因此,即使发布消息时,指定消息是持久化的,但是实际上这个消息也是没有在这个非持久化的队列中进行持久化,也只是临时的罢了。

我开启订阅者1

订阅者1对应着队列4,队列4的消息已经被消费了,已经在上图中打印出来了。

开启订阅者2

订阅者2对应的队列是2,也将消息消费掉了,并在订阅者2程序中打印了出来

如果,队列或者消息是临时的,消费者还没消费的消息,因为重启服务,那么就会丢失消息,消费者应该就消费不到那个丢失的消息了。

模式二:direct

这种工作模式的原理是消息发送至exchange,exchange根据**路由键(routing_key)**转发到相对应的queue上。

  • 可以使用默认exchange=’ ",也可以自定义exchange
  • 这种模式下不需要将exchange和任何进行绑定,当然绑定也是可以的,可以将exchange和queue,routing_key和queue进行绑定
  • 传递或接收消息时,需要指定routing_key
  • 需要先启动订阅者,此模式下队列是consumer随机生成的,发布者仅仅发布消息到exchange,由exchange转发消息至queue。

发布者:

import pikaimport jsoncredentials = pika.PlainCredentials("admin", "admin")  # mq用户名和密码# 虚拟队列需要指定参数 virtual_host,如果是默认的可以不填。connection = pika.BlockingConnection(pika.ConnectionParameters(host = "10.0.0.24",port = 5672,virtual_host = "/",credentials = credentials))channel=connection.channel()# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储channel.exchange_declare(exchange = "python-test",durable = True, exchange_type="direct")for i in range(10):    message=json.dumps({"OrderId":"1000%s"%i})# 指定 routing_key。delivery_mode = 2 声明消息在队列中持久化,delivery_mod = 1 消息非持久化    channel.basic_publish(exchange = "python-test",routing_key = "OrderId",body = message,                          properties=pika.BasicProperties(delivery_mode = 2))    print(message)connection.close()

订阅者:

import pikacredentials = pika.PlainCredentials("admin", "admin")connection = pika.BlockingConnection(pika.ConnectionParameters(host = "10.0.0.24",port = 5672,virtual_host = "/",credentials = credentials))channel = connection.channel()# 创建临时队列,队列名传空字符,consumer关闭后,队列自动删除result = channel.queue_declare("",exclusive=True)# 声明exchange,由exchange指定消息在哪个队列传递,如不存在,则创建。durable = True 代表exchange持久化存储,False 非持久化存储channel.exchange_declare(exchange = "python-test",durable = True, exchange_type="direct")# 绑定exchange和队列  exchange 使我们能够确切地指定消息应该到哪个队列去channel.queue_bind(exchange = "python-test",queue = result.method.queue,routing_key="OrderId")# 定义一个回调函数来处理消息队列中的消息,这里是打印出来def callback(ch, method, properties, body):    ch.basic_ack(delivery_tag = method.delivery_tag)    print(body.decode())#channel.basic_qos(prefetch_count=1)# 告诉rabbitmq,用callback来接受消息channel.basic_consume(result.method.queue,callback,# 设置成 False,在调用callback函数时,未收到确认标识,消息会重回队列。True,无论调用callback成功与否,消息都被消费掉                      auto_ack = False)channel.start_consuming()

将之前测试用的exchanges删除,队列也删除

使用direct类型的exchange,发布消息

没有队列生成

开启消费者程序,exchange声明的类型是direct,队列绑定exchange,指定路由key,这个路由key,并没有这个名字的队列

开启上面的消费者程序之后,就生成了一个队列。这个生成的队列,进入可以看到是有消费者在监听这个队列的。这个队列,以上面命名的路由key,来绑定了前面定义的exchange。

我们进入这个exchange查看下,路由key,定向到某个队列

我们看下发布消息的程序,就是exchange声明里面,定义了direct方式,而基础发布方法里面,就指定发布到上面定义的exchange,然后指定路由key为之前执行消费者程序时,随机生成名字的队列,绑定exchange时使用的路由key。这样,我们发布消息的时候,发布给exchange,就会根据路由key,然后找到对应的队列,将消息推送到这个队列中。

由于我们的订阅者,一直在监听,当上面发布消息到队列中后,订阅者就从exchange下根据路由key,找到对应的队列,然后将队列中的消息消费,打印到pycharm上,

模式三:topicd

这种模式和第二种差不多,exchange也是通过路由键routing_key来转发消息到指定的queue。不同之处在于:**routing_key使用正则表达式支持模糊匹配,**但匹配规则又与常规正则表达式不同,比如"#"是匹配全部,“*”是匹配一个词。举例:routing_key =“#orderid#”,意思是将消息转发至所有 routing_key 包含 “orderid” 字符的队列中。代码和模式二 类似,

我们用上面的代码改 一下,再复制处两个订阅者,只需要修改下路由key为带2的 带3的数字就可以

我们再改一下

我们看页面,可以看到又多了两个队列了

可以看到这个exchange对应三个队列,路由key都是带有OrderId,

我们将路由key,改为匹配的方式,然后发布消息

演示失败

参考链接:https://blog.csdn.net/weixin_45144837/article/details/104335115

相关阅读

关键词:

大家还在看
热词