專 欄
❈
ZZR,Python中文社区专栏作者,OpenStack工程师,曾经的NLP研究者。主要兴趣方向:OpenStack、Python爬虫、Python数据分析。
Blog:http://skydream.me/
CSDN:http://blog.csdn.net/titan0427/article/details/50365480
❈
—
—
在Openstack中,各个组件内部使用消息队列进行通信,其中,RabbitMQ是常用的一种开源消息代理软件。这里作一个简要介绍。
RabbitMQ介绍
RabbitMQ实现了高级消息队列协议(AMQP)。
AMQP
AMQP是一个定义了应用之间消息传送协议的开放标准. AMQP旨在解决在两个应用之间传送消息存在的以下问题:
- 网络是不可靠的 -> 消息需要保存后再转发并有出错处理机制
- 与本地调用相比,网络速度慢 -> 得支持异步调用
- 应用之间是不同的(比如实现语言不同, 操作系统不同),且应用会经常变化 -> 得与应用无关
AMQP 使用异步的、应用对应用的、二进制数据通信来解决这些问题。
基本组件
RabbitMQ 是 AMQP 的一种实现, 其基本组件包括:
- Producer:Message的生产者, 负责产生消息并把消息发到Exchange。
- Message:RabbitMQ 转发的二进制对象,包括Headers、Properties和 Data。其中Data不是必要的。
- Exchange:负责接收Producer的Message, 并把它转发到合适的Queue.
- Binding:标识Queue和Exchange之间的关系。Exchange根据Message的Properties和Binding的Properties来确定将消息转发到哪些Queue。一个重要的Properties是binding_key。
- Queue:缓存Exchange发来的消息,并将消息主动发给Consumer或者由Consumer主动来获取消息。
- Consumer:使用Queue从Exchange中获取Message。
Message和Exchange
Message
消息结构包括:Headers、Properties和data。
其中,Properties包括几个重要的属性:
- routing_key:Direct和Topic类型的exchange会根据本属性来转发消息。
- delivery_mode:将其值设置为2将使用消息持久化。持久化的消息会被保存到磁盘。
- reply_to:客户端回调队列的名字。
- correlation_id
- content_type
Exchange
Exchange有几个重要的属性:
- name:exchange名字。空字符串名字的exchange为默认的exchange。
- type:决定了exchange的消息转发方式, 包括direct、fanout、topic和headers。
- durable:值为True的exchange会在rabbitmq重启会自动创建。Openstack使用的exchange该值都为False。
- auto_delete:值为True的exchange当消费者的连接都关闭后会被自动删除。 Openstack使用的exchange该值都为False。
- exclusive:设置为True的话,该exchange只允许被创建的connection使用,且在该connection关闭后它会自动删除。
RabbitMQ消息路由机制
决定Exchange消息路由的属性有:
- Exchange的type
- Message的routing_key
- Binding的binding_key
具体规则如下:
RabbitMQ有多种版本的客户端,本文使用Pika,安装如下。
1. `$ pip install pika`
RabbitMQ扩展插件
Management Plugin
提供GUI来管理RabbitMQ。官方地址: https://www.rabbitmq.com/management.html
RabbitMQ用户密码可以在
/etc/rabbitmq/abbitmq.config
查看:
打开图形界面:
1. `# rabbitmq-plugins enable rabbitmq\_management`
然后通过端口15672就可以访问web管理界面。
Hello World!
首先,我们尝试从Publisher发送一条消息“Hello World”到Consumer。
Publisher
发送消息 主要包括以下几个操作:
- 与RabbitMQ建立连接。
- 声明要使用的queue。
- RabbitMQ中,消息不会直接发到queue,而是发到exchange,由exchange转发到相应的queue。下面的例子中使用了默认的exchange,它会进行定向转发,也就是将message发到routing_key所指定的queue中。
- 最后,为了保证网络缓存flushed(也就是消息被发出去了),手动关闭连接。
1. `# filename: send.py`
2. `#!/usr/bin/env python`
3. `import pika`
4.
5.
6. `connection = pika.BlockingConnection(pika.ConnectionParameters(`
7. `host='localhost'))`
8. `channel = connection.channel()`
9. `channel.queue_declare(queue='hello')`
10. `channel.basic_publish(exchange='',`
11. `routing_key='hello',`
12. `body='Hello World!')`
13. `print(" [x] Sent 'Hello World!'")`
14. `connection.close()`
执行完以上指令,通过命令行你可以看到queue已经被建立且包含了我们发出的信息:
Consumer
接收消息 主要包括以下几个操作:
- 与RabbitMQ建立连接。
- 声明监听的queue。
- 建立consumer。comsumer需要一个回调函数来负责处理接收到的消息。
- start_consuming(),其本质是一个while循环,不断取出队列中的消息。
1. `# filename: receive.py`
2. `#!/usr/bin/env python`
3. `import pika`
4.
5.
6. `connection = pika.BlockingConnection(pika.ConnectionParameters(`
7. `host='localhost'))`
8. `channel = connection.channel()`
9. `channel.queue_declare(queue='hello')`
10.
11.
12. `def callback(ch, method, properties, body):`
13. `print(" [x] Received %r" % body)`
14. `channel.basic_consume(callback,`
15. `queue='hello',`
16. `no_ack=True)`
17. `print(' [*] Waiting for messages. To exit press CTRL+C')`
18. `channel.start_consuming()`
Work Queues
在上一个例子中,consumer只是简单地打印信息,在这个例子中,我们将consumer改为一个worker,它将根据消息完成一些任务。其本质和print("hello world")并没有什么区别,但是为了保证任务能正确完成,需要一些额外的操作,使workder更健壮。
consumer挂了怎么办
改写以上代码。
publisher发送的消息可以从命令参数中读取。参数包括若干个点,点的数量决定了consumer需要花多少秒来完成任务:
1. `# send.py`
2. `message = ' '.join(sys.argv[1:]) or "Hello World!"`
3. `channel.basic_publish(exchange='',`
4. `routing_key='hello',`
5. `body=message)`
6. `print(" [x] Sent %r" % message)`
consumer处理消息的回调函数,将根据message进行sleep():
1. `# receive.py`
2. `def callback(ch, method, properties, body):`
3. `print(" [x] Received %r" % body)`
4. `time.sleep(body.count(b'.'))`
5. `print(" [x] Done")`
6. `ch.basic_ack(delivery_tag=method.delivery_tag)`
7. `channel.basic_consume(callback,`
8. `queue='hello')`
当consumer处理完任务,会回复ack。如果没有ack,这个消息将在queue中处于unacknowledged状态。如果这个consumer处理过程中挂了,这个message将被分发给其它consumer。这个机制保证了所有的消息都可以被处理 。(一种很坏的情况是,consumer处理了message但没有返回ack,但这个consumer又一直不挂,那么这些被它处理的message就会一直以unack的状态保存在queue中。)
rabbitmq挂了怎么办
为了保证rabbitmq挂了都不会使message消失,我们必须保证:
- queue持久化
- message持久化
由于rabbitmq不允许两个队列重名,下面的代码改用task_queue作为队列名。修改代码如下:
1. `# filename: send.py`
2. `channel.queue_declare(queue='task_queue', durable=True)`
3. `channel.basic_publish(exchange='',`
4. `routing_key="task_queue",`
5. `body=message,`
6. `properties=pika.BasicProperties(`
7. `delivery_mode=2, )`
8. `)`
1. `# filename: receive.py`
2. `channel.queue_declare(queue='task_queue', durable=True)`
任务平均分配
目前的情况是,任务将被平均分配给每一个consumer。比如,如果有两个consumer,那么任务将你一个我一个来分配,而不会根据任务的复杂度来分配。一种极端情况是,奇数任务复杂度很高,偶数任务复杂度很低,那么就会导致一个consumer一直很忙,而另一个一直很闲。为此,进一步修改代码:
1. `# filename: receive.py`
2. `channel.basic_qos(prefetch_count=1)`
这个参数限制了consumer手上的message数量。如果consumer手上已经有一个unack的message,那么后续的message就不会发给它了。
完整代码
1. `# filename: send.py`
2. `# !/usr/bin/env python`
3. `import pika`
4. `import sys`
5.
6.
7. `connection = pika.BlockingConnection(pika.ConnectionParameters(`
8. `host='localhost'))`
9. `channel = connection.channel()`
10. `channel.queue_declare(queue='task_queue', durable=True)`
11. `message = ' '.join(sys.argv[1:]) or "Hello World!"`
12. `channel.basic_publish(exchange='',`
13. `routing_key="task_queue",`
14. `body=message,`
15. `properties=pika.BasicProperties(`
16. `delivery_mode=2, )`
17. `)`
18. `print(" [x] Sent %r" % message)`
19. `connection.close()`
1. `# filename: receive.py`
2. `#!/usr/bin/env python`
3. `import pika`
4. `import time`
5.
6.
7. `connection = pika.BlockingConnection(pika.ConnectionParameters(`
8. `host='localhost'))`
9. `channel = connection.channel()`
10. `channel.queue_declare(queue='task_queue', durable=True)`
11. `channel.basic_qos(prefetch_count=1)`
12.
13.
14. `def callback(ch, method, properties, body):`
15. `print(" [x] Received %r" % body)`
16. `time.sleep(body.count(b'.'))`
17. `print(" [x] Done")`
18. `ch.basic_ack(delivery_tag=method.delivery_tag)`
19. `channel.basic_consume(callback,`
20. `queue='task_queue')`
21. `print(' [*] Waiting for messages. To exit press CTRL+C')`
22. `channel.start_consuming()`
再回顾一下上面的代码。首先明确,这种情况使用的是默认exchange。对于producer,它将消息交给exchange,然后exchange通过routing key来判断要将消息交到哪个queue。实际上相当于将消息直接发送到queue中。而consumer直接指定queue的名字,也就是它直接绑定到这个queue。这个过程中exchange其实没什么存在感。
ARTICLES
近期热门文章
⊙ 生成器 :
关于生成器的那些事儿
⊙ 爬虫代理 :
如何构建爬虫代理服务
⊙ 地理编码 :
怎样用Python实现地理编码
⊙ nginx日志 :
使用Python分析nginx日志
⊙ 淘宝女郎 :
一个批量抓取淘女郎写真图片的爬虫
⊙ IP代理池 :
突破反爬虫的利器——开源IP代理池
⊙ 布隆去重 :
基于Redis的Bloomfilter去重(附代码)
⊙ 内建函数 :
Python中内建函数的用法
⊙ QQ空间爬虫 :
QQ空间爬虫最新分享,一天 400 万条数据
⊙ 对象 :
Python教你找到最心仪对象
⊙ 线性回归 :
Python机器学习算法入门之梯度下降法实现线性回归
⊙ 匿名代理池 :
进击的爬虫:用Python搭建匿名代理池
⊙ 发射导弹 :
Python发射导弹的正确姿势
在公众号底部回复上述关键词可直接打开相应文章
Python 中 文 开 发 者 的 精 神 家 园
§
§
Python中文社区
致力于成为
国内最好的Python社区
QQ群:152745094
专栏作者申请邮箱
— Life is short,we use Python —
