ZMQ 的常用模型

ZMQ 的常用模式包括

  • REQ/RES
  • PUSH/PULL
  • PUB/SUB

三种模式, 如下为简单实用说明

REQ/RES 模式

请求、反馈模式。为单对单模式。

如下, 请求、反馈分为两步

  1. REQ —> RES // REQ 请求方发送请求命令到反馈方
  2. RES —> REQ // RES 反馈方处理后返回给请求方

需要注意,REQ/RES 模式为 单对单 模式。 当先启动 REQ 再启动 RES 后,如果 RES 死亡后重新启动,无法 正常重新建立链接。但是如果 REQ 死亡后重新启动,可正常恢复链接。当启动多个 RES 后,只有第一个 RES 可正常运行。

如上:

  • 一方死亡后,比如重启 REQ 才可恢复链接
  • 一对一
  • 有反馈
  • 阻塞、等待(只有 REQ 存在,才请求,等待反馈)

代码示例:

REQ.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import zmq

context = zmq.Context()
socket = context.socket(zmq.REQ)

port = 5556
socket.connect('ipc://%s.ipc' % port)

i = 0

while True:
message = str(i)
i += 1
socket.send(message)
print(socket.recv())

RES.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import zmq
import time

port = 5556

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.bind('ipc://%s.ipc' % port)

while True:
msg = socket.recv()
print(msg)
time.sleep(1)
socket.send(msg)

PUB/SUB 模式

发布订阅模式。为单对多模式。

1
2
3
4
5
6
7
8
9
10
11
12
        PUB
+
|
|
v
socket
+
+---------------+
| | |
| D1 | D2 | D3
v v v
SUB1 SUB2 SUB3

PUB 发送到 socketsocket 被不同的 SUB 进行了监听, 每个 SUB 订阅自己了自己需要了解的频段。当 PUB 发送到了该 SUB 需要的频段后。才进行处理。

需要注意。SUB 死亡后,SUB 再次启动,监听的频段信息会 一股脑 发到 SUB。当有多个 SUB 存在的情况下,只有一个 SUB 可以工作,当当前可工作的 SUB 死亡后,其他的 SUB 也会自动接管工作,依然死亡,除非有新的 SUB 启动。

SUBPUB 任意一方死亡后,重启启动即可。

如上:

  • 任意一方死亡都不影响对方
  • 一对多
  • 无反馈
  • SUB 死亡后重启会出现大量数据
  • 不阻塞,不等待(无论是否有 SUB, PUB 都正常 PUBLISH)

代码示例

PUB.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import zmq
import random

context = zmq.Context()
socket = context.socket(zmq.PUB)

port = 5556
socket.connect('ipc://%s.ipc' % port)

while True:
topic = random.randrange(10001, 99999)
messagedata = random.randrange(1, 215) - 80
print('%d %d' % (topic, messagedata))
socket.send('%d %d' % (topic, messagedata))

SUB.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import zmq
import sys

port = sys.argv[1] if len(sys.argv) > 1 else 5556

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.bind('ipc://%s.ipc' % port)


for topicfilter in xrange(10001, 10005):
socket.setsockopt(zmq.SUBSCRIBE, str(topicfilter))

while True:
string = socket.recv()
topic, messagedata = string.split()
print(topic, messagedata)

PUSH/PULL

推送抓取模式。为单对多模式。

1
2
3
4
5
6
7
8
9
10
11
12
        PUSH
+
|
|
v
socket
+
+---------------+
| | |
|Data |Data |Data
v v v
PULL1 PULL2 PULL3

结构和 PUB/SUB 类似,不过 PULL 不订阅,有数据即获取。当没有 PULL 后,PUSH 无法工作。而且 PULL 如果有多个后,会轮训推送。
例如,PUSH 每秒推送一个数据。PULL1 先获取到数据,下一次数据 PULL2 获取到,下下一次数据 PULL3 获取到。

如上:

  • 阻塞、等待(当 PULL 不存在,PUSH 不工作)
  • 一对多
  • 轮训获取数据。

代码示例

PUSH.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import zmq
import time

context = zmq.Context()

socket = context.socket(zmq.PUSH)

socket.bind('ipc://5566.ipc')

i = 0
while True:
i += 1
print(i)
socket.send(str(i))
time.sleep(1)

PULL.py

1
2
3
4
5
6
7
8
9
10
11
12
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import zmq

context = zmq.Context()
socket = context.socket(zmq.PULL)

socket.connect('ipc://5566.ipc')

while True:
print(socket.recv())

应用场景

多个 worker

为更好发挥服务器性能,可以考虑开启多个 worker,使用 PUSH/PULL 模式进行设计。

杂数据过滤

针对服务器一些列数据,可使用 PUB/SUB 发送,由 SUB 方进行数据订阅后获取。

请求、反馈

使用 REQ/RES 模式,发送后处理等待反馈结果。

tips:

  • 通常可配合 msgpack 等一些数据序列化工具使用。