首页 > Computer > ZeroMQ指南:第2章:中级材料
2013
06-09

ZeroMQ指南:第2章:中级材料

1 零之禅

名字中Zero的含义:

  • 无需代理(zero broker)
  • 零延迟(zero latency as possible)

Zero也表示设计目标:

  • 零管理
  • 零开销
  • 零浪费

概括地说,Zero表示最简化的设计理念:通过移除复杂性来增强能力,而不是通过增加新功能。

2 套接字API

ZeroMQ中的套接字管理分成四个部分:

  • 创建和销毁套接字:zmq_socket和zmq_close
  • 设置和获取套接字选项:zmq_setsockopt和zmq_getsockopt
  • 通过创建连接来将套接字插入到网络拓扑结构中:zmq_bind和zmq_connect
  • 使用套接字收发消息:zmq_recv和zmq_send

一般来说,ZeroMQ应用中,套接字属于ZeroMQ,消息属于你的代码。

3 将套接字插入到拓扑结构中

要在两个节点间创建一个连接,则需要对一个节点调用zmq_bind,对另一个调用zmq_connect。一般来说,调用zmq_bind的节点是服务器,使用众所周知的网络地址;而调用zmq_connect的节点是客户端,使用未知或者任意的网络地址。

ZeroMQ连接与TCP连接的主要不同之处:

  • ZeroMQ可使用多种传输端点(inproc、ipc、tcp、pgm或者epgm)
  • 客户端调用zmq_connect的时候连接就存在了,不论此时服务器是否已经调用zmq_bind
  • ZeroMQ是异步的,在必要的时候会有队列
  • 根据两端使用的套接字的类型,连接可以表达某种“消息模式”
  • 一个套接字可以有多个输出和输入连接
  • 没有zmq_accept函数。绑定到端点的时候套接字会自动开始接受连接。
  • ZeroMQ应用不能直接使用相关的底层连接(如TCP连接),它们被封装在ZeroMQ套接字之中了。

如果客户端先启动,开始发送消息,而此时服务器还没有启动,则消息会被存放到消息队列中(当前前提是队列还没有满),等服务器启动后才开始投递。

服务器节点可以绑定到多个端点;当然,客户端也可以连接到多个端点。客户端每次调用zmq_connect都会使得服务器端的套接字获得一个新的连接。

套接字有不同的类型。套接字类型定义了套接字的语义,以及路由消息和消息排队的策略等等。套接字类型与“消息模式”相关。

以不同方式连接套接字的能力给了ZeroMQ作为一个消息队列系统所需要的基本能力。使用ZeroMQ定义网络体系的时候,只需要把各个部分插接到一起,就像玩积木一样。

4 使用套接字传输数据

使用zmq_send和zmq_recv来发送和接收消息。在传输数据方面,ZeroMQ套接字与TCP套接字的主要不同是:

  • ZeroMQ套接字传输消息,而不是字节(如TCP)或者数据包(如UDP)。消息是带有长度指定的二进制数据块。
  • ZeroMQ套接字在后台线程中进行IO。这意味着,在应用进行其他事情的同时,消息可以到达本地输入队列,也可以发送本地输出队列中的消息。当然,队列是可以配置的。
  • 根据类型的不同,套接字可以连接到多个其他套接字,或者被其他多个套接字连接。TCP就像一对一的电话呼叫,而ZeroMQ实现了一对多(像无线电广播)、多对多(像邮局)、多对一(像邮箱),以及一对一模式。
  • ZeroMQ可以将消息发送到很多端点(扇出模型),或者从多个端点接收消息(扇入模型)。

 

zmq_recv可以从所有连接上收集消息。它使用公平排队算法,让每个发送者都有机会把数据发送给接收者。

zmq_send不会真正把消息发送到套接字连接中。它只是对消息进行排队,以便IO线程可以异步地发送消息。除少数特殊情况外zmq_send不会阻塞。

5 单播传输端点

  • 单播传输端点包括inproc、ipc和tcp。
  • 大多数情况下请使用tcp。tcp是可断开的(disconnected)传输端点。“可断开”的意思是:执行连接的时候,不要求对端已经存在。客户端和服务器可以在任何时刻进行绑定和连接,可以下线和上线,而这对应用是透明的。
  • ipc是进程间传输端点,与tcp类似,只是只能用在局域网中,不需要指定IP地址或者域名。ipc传输端点也是“可断开的”。当前ipc还不能在Windows中工作,以后的版本中也许可以。在Unix系统中工作的时候需要合适的权限,否则不同用户的进程可能没法共享传输端点。
  • inproc是线程间(进程内?)传输端点。inproc是一种连接的信号传输端点(connected signaling transport)。inproc比tcp和ipc要快很多。与tcp和ipc的最大不同是:必须在connect之前调用bind。未来版本的ZeroMQ可能不会要求必须这样,但是当前是这样的。

6 ZeroMQ不是中性载体

ZeroMQ不是中性载体,它在传输层上增加了自己的帧结构。这种结构不能与现有的协议(如HTTP)兼容。也就是说,ZeroMQ不是像boost.asio那样直接使用传输层传输客户的数据,而是额外增加了少量管理信息,这就导致了不兼容。比如说,如果写一个ZeroMQ应用,给Web服务器发送HTTP请求,则Web服务器没法理解请求,因为ZeroMQ在HTTP请求上增加了管理信息,发出的数据已经不是正确的HTTP请求了。

7 核心消息模式

ZeroMQ迅速高效地将数据块(消息)传递到节点中。节点可以映射到线程、进程,或者机器。应用程序只需要使用单个简单的套接字API就可以了,而不用管实际的传输端点是什么(进程内、进程间、TCP,或者多播等)。ZeroMQ会自动处理重连。必要的时候ZeroMQ会在发送端和接收端进行消息排队。ZeroMQ仔细地管理消息队列以确保进程不会耗尽内存,必要的时候会把消息存放到磁盘中。ZeroMQ处理了套接字错误。ZeroMQ在后台进行所有IO。ZeroMQ使用无锁(lock-free)技术进行节点间的通信,所以不需要信号量、锁和等待,也不会死锁。

ZeroMQ根据消息模式进行消息的排队和路由。正是消息模式给了ZeroMQ智能。消息模式封装了难以学到的、用最好的方式分布数据和任务的经验。ZeroMQ的消息模式是硬编码到库中的,未来的版本也许会允许用户自定义模式。内建的消息模式有:

  • 请求-应答模式:连接一系列客户端和一系列服务。这是一种远程过程调用和任务分布模式。
  • 发布-订阅模式:连接一系列发布者和一系列订阅者。这是一种数据分布模式。
  • 管线模式:以扇出/扇入模式连接节点,可以有多步和循环。这是一种并行任务分布和收集模式。
  • 专用套接字对模式:与TCP套接字类似,连接两个套接字,用于特定的高级情况。

上述四种核心消息模式是内建在ZeroMQ中的,是API的一部分,由C++库实现。在这四种模式的基础上还有高层消息模式。高层模式不是库的一部分,不包含在ZeroMQ包中,而是ZeroMQ社区的组成部分。

消息模式由匹配的套接字对实现。要理解消息模式必须理解套接字类型以及它们如何协同工作。有效的套接字组合有:

 

其他类型的套接字组合是非法的,将会导致不确定的结果,未来版本的ZeroMQ可能会在使用非法套接字对的时候返回错误。当然,可以用代码将各种类型的套接字桥接起来。

8 使用消息

线路中的ZeroMQ消息是适合于内存的零字节或者多字节数据。你需要使用Google Protocol Buffers、XDR、JSON或者其他方式来进行序列化。

内存中的ZeroMQ消息是zmq_msg_t结构体(或者类,取决于使用的编程语言)。在C中使用ZeroMQ消息的基本规则是:

  • 创建和传递zmq_msg_t对象,而不是数据块。
  • 读取消息:调用zmq_msg_init创建空的消息,然后传递给zmq_recv。
  • 写入消息:使用zmq_msg_init_size创建指定大小的消息,使用memcpy填入数据,然后传递给zmq_send。
  • 释放(不是销毁)消息:调用zmq_msg_close。这会减小引用计数,最终ZeroMQ会销毁消息。
  • 使用zmq_msg_data获取消息内容指针;使用zmq_msg_size得知消息内容大小。
  • 除非精确地知道自己在做什么,不要使用zmq_msg_move、zmq_msg_copy或者zmq_msg_init_data。

一旦把消息传递给zmq_send,ZeroMQ会清除(clear)消息,也就是设置其大小为零。发送之后就不能访问消息了,不能两次发送同一个消息。如果确实要两次发送相同的消息,正确的做法是:使用zmq_msg_init创建一个新的消息对象,然后调用zmq_msg_copy创建第一个消息的复本。zmq_msg_copy不会复制消息内容,而只是增加引用计数。这样就可以两次(或者多次,如果你创建多个复本)发送相同的消息了。ZeroMQ会在最后一个复本发送完成或者关闭后销毁消息。

关于消息,还应该知道:

  • ZeroMQ原子地发送和接收消息,要么得到整个消息,要么得不到消息。
  • ZeroMQ不会立即发送消息,而是有一些不确定的延迟。
  • 可以发送零字节的消息,这通常用于从一个线程向另一个线程发送信号。
  • 消息必须适合于内存。如果要发送任意大小的文件,应该把文件切分成适合于内存的片段,将每个片段作为单独的消息发送。
  • 使用没有自动垃圾回收功能的语言编程时,使用完消息之后必须调用zmq_msg_close。

再次强调,暂时不要使用zmq_msg_init_data。这个零拷贝方法现在只会给你带来麻烦。

9 处理多个套接字

低效的方式

 

ZeroMQ推荐的方式

 

更好的方法是将zmq_poll封装到一个框架中,成为一个优良的事件驱动的反应器。

10 处理错误和ETERM

我们认为,进程应该对内部错误是脆弱的,但是对外部攻击和错误则尽可能地健壮。不清楚某个错误是内部的还是外部的,这是一个设计上的错误,应该被修正。

真正使用的代码应该对每个ZeroMQ调用做错误处理。其他语言绑定可能会帮你处理错误,但是使用C则要求你自己处理错误。关于错误处理,一般有下述原则:

  • 如果失败,创建对象的方法会返回NULL
  • 错误码由errno或者zmq_errno()提供
  • zmq_strerror可以提供用于日志的描述性错误文本
  • 如果线程带NOBLOCK选项调用zmq_recv,并且没有等待收取的数据,则ZeroMQ会返回-1并且设置errno为EAGAIN
  • 如果线程调用zmq_ctx_destroy,同时还有其他线程在进行阻塞的工作。则zmq_ctx_destroy调用会关闭上下文,而所有的阻塞调用会返回-1,并且errno被设置为ETERM

我们来看看在前面的并行管线示例中如何正确地结束进程。设想我们在后台启动了很多工作者进程,现在批处理完成了,需要杀死这些工作者进程。我们通过发送kill消息来杀死工作者进程。最好是在汇聚点做这个工作,因为只有汇聚点真正知道什么时候批处理完成了。

如何将汇聚点连接到工作者?PUSH/PULL套接字仅仅是单向的。ZeroMQ的标准答案是:为需要解决的每种问题创建一个新的套接字流程。我们使用发布-订阅模型来向工作者发送kill消息:

  • 汇聚点在一个新的端点创建一个PUB套接字
  • 工作者绑定其输入套接字到这个端点
  • 汇聚点检测到批处理完成时,通过PUB套接字发送一个kill消息
  • 工作者检测到kill消息时退出

 

汇聚点只需要增加少量代码:

 

工作者代码

#include <zmq.h>
#include <zmq_helpers.h>
#include <stdio.h>
#pragma comment(lib,"libzmq-v100-mt.lib")
int main(void){
void* ctx = zmq_ctx_new();
void* receiver = zmq_socket(ctx,ZMQ_PULL);
zmq_connect(receiver,"tcp://localhost:5557");
void* sender = zmq_socket(ctx,ZMQ_PUSH);
zmq_connect(sender,"tcp://localhost:5558");
void* notify_end = zmq_socket(ctx,ZMQ_SUB);
zmq_connect(notify_end,"tcp://localhost:5559");
zmq_setsockopt(notify_end,ZMQ_SUBSCRIBE,"",0);
zmq_pollitem_t poll_item[] = {
{receiver,0,ZMQ_POLLIN,0},
{notify_end,0,ZMQ_POLLIN,0},
};
while(zmq_poll(poll_item,2,-1) >= 0){
if (poll_item[0].revents & ZMQ_POLLIN){
// 接收任务
char* p_work = s_recv(receiver);
fflush(stdout);
printf("%s\n",p_work);
// 执行任务
s_sleep(atoi(p_work));
free(p_work);
// 通知汇聚点已经完成一个任务
s_send(sender,"");
}
if (poll_item[1].revents & ZMQ_POLLIN){
break;
}
}
zmq_close(receiver);
zmq_close(sender);
zmq_close(notify_end);
zmq_ctx_destroy(ctx);
return 0;
}

汇聚点代码

#include <zmq.h>
#include <zmq_helpers.h>
#include <stdio.h>
#pragma comment(lib,"libzmq-v100-mt.lib")
int main(void){
void* ctx = zmq_ctx_new();
void* sink = zmq_socket(ctx,ZMQ_PULL);
zmq_bind(sink,"tcp://*:5558");
void* notify_end = zmq_socket(ctx,ZMQ_PUB);
zmq_bind(notify_end,"tcp://*:5559");
// 等待任务发生器通知开始
char* p_start = s_recv(sink);
free(p_start);
int64_t start_time = s_clock();
printf("开始收集任务执行结果\n");
// 收集个任务执行结果
int task_cnt;
for(task_cnt = 0; task_cnt < 100; ++task_cnt){
char* p_done = s_recv(sink);
free(p_done);
printf(".");
if ((task_cnt + 1) % 10 == 0){
printf("\n");
}
fflush(stdout);
}
printf("100个任务实际耗时: %d 毫秒\n",int(s_clock() - start_time));
// 通知工作者批处理操作完成
s_send(notify_end,"DONE");
s_sleep(1000);
zmq_close(sink);
zmq_close(notify_end);
zmq_ctx_destroy(ctx);
return 0;
}

11 处理中断信号

真正实用的应用程序需要在被Ctrl-C或者SIGTERM等信号中断的时候正确地退出。默认情况下Ctrl-C和SIGTERM会杀死进程,那么消息不会被清空(flushed)、文件不会被正确地关闭等等。

处理Ctrl-C和SIGTERM的代码

 

使用方法

  • 在main函数开始处调用s_catch_signals(),这会建立信号处理。
  • 如果代码阻塞在zmq_recv、zmq_poll或者zmq_send上,则信号到达时调用会返回EINTR。
  • s_recv()之类的封装函数会在被中断的时候返回NULL。
  • 所以应用程序应该检查EINTR、NULL返回值,或者s_interrupted。

典型代码片段

 

12 检测内存泄露

长期运行的应用需要正确地管理内存,否则可能会耗尽可用内存并且崩溃。如果使用自动管理内存的语言,那么不用关心这个问题。但是如果使用C/C++,则你就需要负责内存管理了。以下是使用valgrind检测内存泄露的简要步骤:

  • 安装valgrind。在Ubuntu或者Debina中执行:sudo apt-get install valgrind。
  • 默认情况下ZeroMQ会让valgrind产生很多警告。要移除这些警告,请创建包含以下内容的valgrind.supp文件:
  • 修改应用代码,使之在Ctrl-C之后正确地执行清理动作。对于自己退出的应用程序,这是不需要的,但是对于长期运行的应用,这是必须的,否则valgrind会对所有当前分配的内存发出警告。
  • 带-DDEBUG标志创建应用。这让valgrind可以准确地告诉你内存泄露的位置。
  • 最后,运行valgrind:

    修复valgrind报告的错误之后可以得到以下输出信息:

13 多段消息

发送多段消息

 

接收多段消息

 

注解

  • 发送多段消息的时候,只有在给出最后一段之后,所有的分段才开始发送
  • 如果使用zmq_poll,则收到第一段的时候,其他段就也已经到达了
  • 要么收到消息的所有分段,要么一个也收不到
  • 消息的每一段都是一个单独的zmq_msg_t对象
  • 不论是否检查RCVMORE选项,都会收到消息的所有分段
  • 除非关闭套接字,否则没有方法取消部分发送了的消息

14 中间层和设备

系统成员增加时,让各个成员相互了解的开销会迅速增加,为解决这个问题,可以将系统划分成较小的部分,然后用中间层连接各个部分。中间层(intermediaries)通常称作批发商(wholesalers)、分配器(distributors)、管理者(managers)等。

增长到一定尺度时ZeroMQ网络也需要中间件。ZeroMQ中的中间件称作“设备(device)”。

开始设计的时候,通常将应用程序建造为网络中一系列相互交流的节点,没有中间件。

 

以后将应用扩展到更大的网络中时,在特定的位置放置设备,增加节点数。

 

虽然没有严格的设计规则,但ZeroMQ设备通常将一系列“前端(frontend)”套接字连接到一系列“后端(backend)”套接字。设备通常是无状态的,这样才能用尽可能多的中间件来扩展应用。可以在进程中的线程里运行设备,或者作为单独的进程来运行。

设备可以作为寻址、服务、队列或者其他你可以在消息和套接字层之上定义的抽象的中间件。不同的消息模式有不同的复杂度问题,需要不同类型的中间件。比如说,请求-应答模式可以很好地与队列和服务抽象配合工作;而发布-订阅模式可以很好地与流(streams)和主题(topics)配合工作。

ZeroMQ与传统的中心化的代理的不同在于,你可以精确地将设备放置到你需要的地方,而设备能够优化地进行中介(intermediation)工作。

14.1 一个发布-订阅代理

要求将发布-订阅体系扩展到多个网段或者传输端点是很常见的一种需求。可能有一组订阅者位于远程位置。可能我们想用多播向本地订阅者发布消息,而用TCP向远程订阅者发布消息。

我们来写一个简单的代理服务器,它位于一系列发布者和一系列订阅者之间,桥接两个网络。这个可能是最简单的设备了。设备有两个套接字:面向内部网络(也就是天气服务器所在网络)的前端;面向外部网络中订阅者的后端。设备从前端套接字订阅天气信息,重新在后端套接字上发布。

#include <zmq.h>
#include <zmq_helpers.h>
#include <stdio.h>
#pragma comment(lib,"libzmq-v100-mt.lib")
int main(void){
void* ctx = zmq_ctx_new();
void* frontend = zmq_socket(ctx,ZMQ_SUB);
zmq_setsockopt(frontend,ZMQ_SUBSCRIBE,"",0);
zmq_connect(frontend,"tcp://localhost:5556");
void* backend = zmq_socket(ctx,ZMQ_PUB);
zmq_bind(backend,"tcp://*:8100");
while(true){
zmq_msg_t report;
zmq_msg_init(&report);
zmq_recvmsg(frontend,&report,0);
int hasmore;
size_t optlen = sizeof(hasmore);
zmq_getsockopt(frontend,ZMQ_RCVMORE,&hasmore,&optlen);
zmq_sendmsg(backend,&report,((hasmore == 0) ? 0 : ZMQ_SNDMORE));
zmq_msg_close(&report);
}
zmq_close(frontend);
zmq_close(backend);
zmq_ctx_destroy(ctx);
return 0;
}

我们称这个设备为“代理服务器(proxy)”,因为对于发布者而言,它是订阅者;对于订阅者而言,它是发布者。这样你可以把它插入到现有的网络中而不会影响它(当然新的订阅者需要知道与代理服务器交流)。

 

这个应用可以正确地处理多段消息。设计设备的时候,总是应该让它可以正确地处理多段消息。

14.2 一个请求-应答代理

前面的Hello World应用中,一个客户端与一个服务交流。但是真实的世界中我们通常有多个服务和多个客户端。我们来试着扩展服务的能力。唯一的限制是服务必须是无状态的,所有的状态信息都在请求中或者在某种共享存储器中,如数据库。

通常的方法

有两种方法将多个客户端连接到多个服务器。通常的、暴力的方法是,让每个客户端套接字连接到多个服务端点。应用需要自己处理负载均衡。

 

如上图所示,应用自己处理负载均衡,将请求R1和R4发送给服务A,将请求R2发送给服务B,将请求R3发送给服务C。

这个设计中可以很容易地增加更多客户端,也可以增加更多服务。每个客户端会将自己的请求均衡到多个服务中。但是客户端需要了解服务的拓扑结构。如果有100个客户端,然后要增加3个服务,则需要重新配置和启动这100个客户端,以便让客户端了解新的3个服务的信息。而这显然不是在我们的超级计算集群耗尽资源,我们需要增加数百个服务节点时想做的事情。太多固定的部分就像混凝土一样:拓扑信息是分布的,固定的部分越多,改变拓扑结构越麻烦。我们需要一种位于客户端和服务之间的东西,让它中心化拓扑信息。最好是让我们能够在任何时候添加和移除服务,而不用关心拓扑结构的其他部分。

消息队列代理

我们来写一个小的消息队列代理以提供这种灵活性。这个代理绑定到两个端点:对客户端的前端,以及对服务的后端。代理使用zmq_poll监测两个套接字的活动,在两个套接字直接转发信息。代理实际上不会管理任何队列,ZeroMQ会自动地为每个套接字处理消息队列。

 

使用REQ与REP交流的时候得到的是严格同步的请求-应答会话:客户端发送请求,服务读取请求,发送回应,然后客户端读取回应。如果客户端或者服务企图做其他事情(比如说,连续发送两个请求而不等待回应),则会发生错误。

但是我们的代理必须是非阻塞的。显然必须使用zmq_poll来等待套接字上的活动,但除此之外,我们还不能使用REP和REQ。

DEALER和ROUTER类型的套接字让我们可以进行非阻塞的请求-回应。它们以前的名字是XREQ和XREP,在旧代码中还可以看到这两个名字。第三章会介绍如何使用DEALER和ROUTER套接字创建各种类型的请求-应答流程。 

代码

使用请求-应答代理让客户端-服务器体系结构能够较容易地扩展,因为客户端看不到服务,服务也看不到客户端。唯一保持不变的节点是中间的设备。

 

客户端

#include <zmq.h>
#include <zmq_helpers.h>
#include <stdio.h>
#pragma comment(lib,"libzmq-v100-mt.lib")
int main(void){
void* ctx = zmq_ctx_new();
void* client = zmq_socket(ctx,ZMQ_REQ);
zmq_connect(client,"tcp://localhost:5559");
s_sleep(500);
for(int idx = 0; idx < 10; ++idx){
zmq_msg_t request;
zmq_msg_init_size(&request,6);
memcpy(zmq_msg_data(&request),"Hello",6);
printf("发送Hello\n");
zmq_sendmsg(client,&request,0);
zmq_msg_close(&request);
zmq_msg_t reply;
zmq_msg_init(&reply);
zmq_recvmsg(client,&reply,0);
printf("收到%s\n",(char*)zmq_msg_data(&reply));
zmq_msg_close(&reply);
}
zmq_close(client);
zmq_ctx_destroy(ctx);
return 0;
}

服务端

#include <zmq.h>
#include <zmq_helpers.h>
#include <stdio.h>
#pragma comment(lib,"libzmq-v100-mt.lib")
int main(void){
void* ctx = zmq_ctx_new();
void* responder = zmq_socket(ctx,ZMQ_REP);
zmq_connect(responder,"tcp://localhost:5560");
s_sleep(1000);
while(1){
zmq_msg_t request;
zmq_msg_init(&request);
zmq_recvmsg(responder,&request,0);
printf("收到%s\n",(char*)zmq_msg_data(&request));
zmq_msg_close(&request);
s_sleep(1000);
zmq_msg_t reply;
zmq_msg_init_size(&reply,6);
memcpy(zmq_msg_data(&reply),"World",6);
printf("发送World\n");
zmq_sendmsg(responder,&reply,0);
zmq_msg_close(&reply);
}
zmq_close(responder);
zmq_ctx_destroy(ctx);
return 0;
}

代理

#include <zmq.h>
#include <zmq_helpers.h>
#include <stdio.h>
#pragma comment(lib,"libzmq-v100-mt.lib")
int main(void){
void* ctx = zmq_ctx_new();
void* frontend = zmq_socket(ctx,ZMQ_ROUTER);
zmq_bind(frontend,"tcp://*:5559");
void* backend = zmq_socket(ctx,ZMQ_DEALER);
zmq_bind(backend,"tcp://*:5560");
zmq_pollitem_t items[]={
{frontend,0,ZMQ_POLLIN,0},
{backend,0,ZMQ_POLLIN,0},
};
while(zmq_poll(items,2,-1) >= 0){
// router --> dealer
if (items[0].revents & ZMQ_POLLIN){
while(true){
zmq_msg_t a_msg;
zmq_msg_init(&a_msg);
zmq_recvmsg(frontend,&a_msg,0);
int hasmore;
size_t optlen = sizeof(hasmore);
zmq_getsockopt(frontend,ZMQ_RCVMORE,&hasmore,&optlen);
zmq_sendmsg(backend,&a_msg,((hasmore == 0) ? 0 : ZMQ_SNDMORE));
zmq_msg_close(&a_msg);
if (!hasmore){
break;
}
}
}
// dealer --> router
if (items[1].revents & ZMQ_POLLIN){
while(true){
zmq_msg_t a_msg;
zmq_msg_init(&a_msg);
zmq_recvmsg(backend,&a_msg,0);
int hasmore;
size_t optlen = sizeof(hasmore);
zmq_getsockopt(backend,ZMQ_RCVMORE,&hasmore,&optlen);
zmq_sendmsg(frontend,&a_msg,((hasmore == 0) ? 0 : ZMQ_SNDMORE));
zmq_msg_close(&a_msg);
if (!hasmore){
break;
}
}
}
}
zmq_close(frontend);
zmq_close(backend);
zmq_ctx_destroy(ctx);
return 0;
}

14.3 内建的设备

ZeroMQ提供了三种内建的设备:

  • QUEUE 用作请求-应答代理,要求ROUTER/DEALER套接字对
  • FORWARDER 用作发布-订阅代理,要求PUB/SUB套接字对
  • STREAMER 与FORWARDER相似,只是用于管线流程,要求PULL/PUSH套接字对

启动设备的代码如下:

 

完整的示例代码如下: 

#include <zmq.h>
#include <zmq_helpers.h>
#include <stdio.h>
#pragma comment(lib,"libzmq-v100-mt.lib")
int main(void){
void* ctx = zmq_ctx_new();
void* frontend = zmq_socket(ctx,ZMQ_ROUTER);
zmq_bind(frontend,"tcp://*:5559");
void* backend = zmq_socket(ctx,ZMQ_DEALER);
zmq_bind(backend,"tcp://*:5560");
zmq_device(ZMQ_QUEUE,frontend,backend);
zmq_close(frontend);
zmq_close(backend);
zmq_ctx_destroy(ctx);
return 0;
}

不要邪恶地想:如果把任意类型的套接字对传入到zmq_device函数中会发生什么?不要那么做,那会导致不确定的行为。如果真的需要那么做,你需要编写自己的设备。

15 多线程

使用ZeroMQ编写多线程程序不需要互斥量和锁,除了在套接字之间传递的消息之外,也不需要任何其他形式的线程间通信。需要遵循的规则如下:

  • 不得从多个线程访问相同的数据。在ZeroMQ应用中使用互斥量等典型的多线程编程技术是违背模式的(anti-pattern)。唯一的例外是ZeroMQ上下文对象,它是线程安全的。
  • 必须为进程创建ZeroMQ上下文,然后传递给想要通过inproc套接字互联的所有线程。
  • 可以将线程看做单独的任务,有自己的上下文,但是这些线程将不能使用inproc套接字相互通信。只是这样会让线程能够较容易地切分成单独的进程。
  • 不得在线程间共享ZeroMQ套接字,因为它不是线程安全的。虽然在技术上可以做到共享ZeroMQ套接字,但是这要求信号量、锁,或者互斥量。这些会让你的应用变慢和变得脆弱。唯一需要在线程间共享套接字的场合是需要对套接字进行特殊操作的语言绑定,比如说垃圾收集。

比如说,如果要启动多个设备,则每个设备都需要在自己的线程中运行。常见的错误是在一个线程中创建设备套接字,然后将其传递给另一个线程中的设备。这好像能够工作,但是会随机出现故障。记住:不要在创建套接字的线程之外使用或者关闭套接字。

如果遵循上述原则,则在必要的时候可以很容易地将线程切分到单独的进程中。

我们来扩展前面的Hello World应用,使用多个工作线程来处理请求。当然可以用队列设备和外部工作者进程来实现,但是用一个进程耗尽16个CPU核心的处理能力通常比用16进程,每个耗尽1个核心的处理能力要容易。而且,将工作者作为线程运行也会减少网络跳数,延迟,以及网络流量。

#include <zmq.h>
#include <zmq_helpers.h>
#include <boost/thread.hpp>
#include <stdio.h>
#pragma comment(lib,"libzmq-v100-mt.lib")
static void worker_proc(void* ctx){
void* responder = zmq_socket(ctx,ZMQ_REP);
zmq_connect(responder,"inproc://workers");
while(1){
zmq_msg_t request;
zmq_msg_init(&request);
zmq_recvmsg(responder,&request,0);
printf("收到%s\n",(char*)zmq_msg_data(&request));
zmq_msg_close(&request);
s_sleep(1000);
zmq_msg_t reply;
zmq_msg_init_size(&reply,6);
memcpy(zmq_msg_data(&reply),"World",6);
printf("发送World\n");
zmq_sendmsg(responder,&reply,0);
zmq_msg_close(&reply);
}
}
int main(void){
void* ctx = zmq_ctx_new();
void* frontend = zmq_socket(ctx,ZMQ_ROUTER);
zmq_bind(frontend,"tcp://*:5559");
void* backend = zmq_socket(ctx,ZMQ_DEALER);
zmq_bind(backend,"inproc://workers");
for(int idx = 0; idx < 5; ++idx){
boost::thread worker(worker_proc,ctx);
}
zmq_device(ZMQ_QUEUE,frontend,backend);
zmq_close(frontend);
zmq_close(backend);
zmq_ctx_destroy(ctx);
return 0;
}

上述多线程版本的Hello World服务把队列设备和工作者合并到单个进程中了。

 

16 线程间的信号

使用ZeroMQ编写多线程应用时会遇到协调线程的问题。虽然可以使用sleep或者其他多线程技术,如信号量或者互斥量,但是你应该使用的唯一机制是ZeroMQ消息。

假设三个线程需要依次宣告自己准备好了。这个例子中我们在inproc传输端点上使用PAIR套接字:

#include <zmq.h>
#include <zmq_helpers.h>
#include <boost/thread.hpp>
#include <stdio.h>
#pragma comment(lib,"libzmq-v100-mt.lib")
static void step1(void* ctx){
void* this_step = zmq_socket(ctx,ZMQ_PAIR);
zmq_connect(this_step,"inproc://step1");
s_send(this_step,"step1");
zmq_close(this_step);
}
static void step2(void* ctx){
void* prev_step = zmq_socket(ctx,ZMQ_PAIR);
zmq_bind(prev_step,"inproc://step1");
boost::thread step1_(step1,ctx);
char* p = s_recv(prev_step);
free(p);
zmq_close(prev_step);
//---------------------------------
void* this_step = zmq_socket(ctx,ZMQ_PAIR);
zmq_connect(this_step,"inproc://step2");
s_send(this_step,"step2");
zmq_close(this_step);
}
static void step3(void* ctx){
void* prev_step = zmq_socket(ctx,ZMQ_PAIR);
zmq_bind(prev_step,"inproc://step2");
boost::thread step2_(step2,ctx);
char* p = s_recv(prev_step);
free(p);
zmq_close(prev_step);
//---------------------------------
void* this_step = zmq_socket(ctx,ZMQ_PAIR);
zmq_connect(this_step,"inproc://step3");
s_send(this_step,"step3");
zmq_close(this_step);
}
int main(void){
void* ctx = zmq_ctx_new();
void* last_step = zmq_socket(ctx,ZMQ_PAIR);
zmq_bind(last_step,"inproc://step3");
boost::thread(step3,ctx);
char* p = s_recv(last_step);
free(p);
zmq_close(last_step);
zmq_ctx_destroy(ctx);
return 0;
}

 

这是ZeroMQ用于多线程时的一种典型模式:

  • 两个线程使用共享的上下文中的inproc传输端点进行通信
  • 父线程创建一个套接字,绑定到inproc://端点,然后启动子线程,传入上下文
  • 子线程创建第二个套接字,连接到inproc://端点,然后通知父线程自己已经准备好了

注意:使用这种模式的多线程代码不能扩展到多个进程。使用inproc传输端点和套接字对会创建紧密耦合的应用。请在低延迟确实非常重要的时候才这么做。对于通常的应用,请对每个线程使用一个上下文,使用ipc或者tcp传输端点。这样在需要的时候就可以很容易地将线程划分到单独的进程中。

为什么使用PAIR套接字?其他套接字组合可能能够工作,但是都有可能会影响信号的副作用:

  • 可以使用PUSH作为发送方,使用PULL作为接收方。但是PUSH会将消息负载均衡给所有可用的接收者。如果你意外地启动了两个接收者,则会丢失一半的信号。PAIR的优点在于拒绝多个连接,它是独占的。
  • 可以使用DEALER作为发送方,使用ROUTER作为接收方。但是ROUTER会使用“信封”封装消息,使得零字节的信号称为多段消息。如果你不关心数据,把任何数据当做是有效的信号,并且只在套接字上执行一次读取操作,那么没有问题。但是如果你决定发送实际的数据,则会发现ROUTER会给你“错误的”消息。DEALER也是负载平衡的,和PUSH有相同的风险。
  • 可以使用PUB作为发送方,使用SUB作为接收方。这样可以正确地投递消息,而且PUB不会像PUSH和DEALER那样进行负载平衡。但是你需要配置订阅者执行一个空的订阅,这是比较麻烦的。更糟的是,PUB-SUB链路的可靠性依赖于时间,如果PUB套接字发送消息的时候SUB套接字正在进行连接,则消息会丢失。

因为上述原因,PAIR是用于协调两个线程的最佳选择。

17 节点协调

要协调节点的时候,PAIR套接字就不管用了。这是线程和节点的策略不同的少数情况之一。理论上说节点可以上线和下线,而线程是不受影响的。但是远端节点离开然后重新上线时,PAIR套接字不会自动进行重新连接。

线程和节点的第二点不同在于,通常线程的数量是固定的,但是节点数是可变的。我们来在先前的场景(天气服务器和客户端)中使用节点协调,以保证订阅者在启动过程中不会错过(天气)数据。

应用程序的工作过程:

  • 发布者预先知道期望有多少个订阅者。这个数目是从别处得到的。
  • 启动发布者,等待所有订阅者连接。这部分工作就是节点协调。每个订阅者进行订阅操作,并且通过另一个套接字通知发布者,自己已经准备好了。
  • 发布者等到所有订阅者连接后,开始发布数据。

这个例子中我们使用REQ-REP套接字来同步发布者和订阅者。

 

发布者

 

订阅者

 

不能认为REQ/REP会话完成的时候SUB已经完成了了连接。除非使用inproc传输端点,否则没有办法确定外出的连接已经完成。所以示例代码只是简单地在发起订阅和进行REQ/REP同步之间调用sleep。

更健壮的做法可以是:

  • 发布者打开PUB套接字,发送”Hello”消息(不是实际的数据)
  • 订阅者打开SUB套接字,进行连接,并且在接收到Hello消息后通过一个REQ/REP套接字对通知发布者
  • 发布者收到所有必要的确认后,开始发送真实的数据

18 零拷贝

第一章中我们提醒过你,作为新手,最好不要使用零拷贝。如果你看到了这里,那么可能你已经准备好使用零拷贝了。然而,要记住的是,很多路是通向地狱的,过早的优化并不好。在应用的体系并不完美的时候就试图进行零拷贝只是浪费时间,只会让事情更糟糕,而不是更好。

要进行零拷贝,请使用zmq_msg_init_data创建一个消息,代表一个用malloc在堆上分配的数据块,然后传递给zmq_send。在创建消息的时候还需要传入一个函数,ZeroMQ将在发送完消息后调用这个函数来释放数据块。下面是一个简单的示例:

 

不能在接收的时候进行零拷贝:ZeroMQ会给你一个缓冲区,你想用多久就用多久,但是不能直接把收到的数据写入到应用程序的缓冲区中。

零拷贝可以用于多段消息。传统的消息系统中,发送多段消息之前需要把多个不同的缓冲区列集(marshal)到一个缓冲区中。这就需要复制数据。ZeroMQ让你可以将多个不同来源的缓冲区作为单个消息帧来发送。ZeroMQ会将每个缓冲区作为一个限定长度的帧发送。从应用程序来看,这需要一系列的send和recv调用。但是在ZeroMQ内部,所有数据通过单个系统调用写入到网络中,通过单个系统调用读回数据,所以效率很高。

19 发布-订阅模式中的消息信封

前面简单介绍过多段消息,现在来看看它的主要用途:消息信封。

发布-订阅基于前缀来匹配消息。把关键字放入到单独的帧中让匹配非常直接。

 

带信封的发布者

 

处理信封的订阅者

 

注意:订阅过滤器拒绝或者接受整个多段消息,不能只接受多段消息的一部分。

 

当然,也可以使用多个信封。

 

20 高水位标记

如果进程A发送消息给进程B,而B突然变得很忙(垃圾收集、CPU过载等等),那么进程A想发送的消息会怎样?有些消息已经在B的网络缓冲区中;有些还在以太网线路中;有些在A的网络缓冲区中;而其他的则累积在A的内存中。如果不采取某些措施,A很容易耗尽内存然后崩溃。这是消息代理中常见的典型问题。

如何解决这个问题?方法之一是向上传递问题。A从其他什么地方取得消息,所以告诉那个地方暂停就可以了。这就是“流控制”。有些情况下可以使用流控制,但是在更多情况下,传输层不能告诉应用层“暂停”发送消息。

正确的答案是对缓冲区大小设置一个限制,达到这个限制的时候采取一些措施。大多数情况下,采取的措施是丢弃消息;少数其他情况下则是等待。

  • ZeroMQ使用概念“高水位标记(high water mark)”或者说HWM来定义其内部管道的容量。套接字上的每个外出连接或者进入连接都有自己的管道以及HWM容量。
  • ZeroMQ 2.x版本中默认的HWM值为无限大;ZeroMQ 3.x版本中默认的HWM值为1000。
  • 高水位标记同时影响套接字的发送和接收缓冲区。有些套接字(PUB、PUSH)只有发送缓冲区;有些(SUB、PULL、REQ?、REP)则只有接收缓冲区;有些(DEALER、ROUTER、PAIR)则同时有发送和接收缓冲区。
  • 根据套接字类型的不同,达到高水位标记的时候,套接字要么阻塞,要么丢弃数据:PUB套接字会在达到高水位标记的时候丢弃数据;而其他类型的套接字则会阻塞。
  • inproc传输端点上的发送方和接收方共享同一个缓冲区,所以实际的HWM是两端设置的HWM的和。如果一方没有设置HWM,则缓冲区大小没有限制(仅对于ZeroMQ 2.x版本?)。

ZeroMQ应用就像插接到一起的一些盒子,唯一的限制只是你的想象。

可伸缩的体系结构并不是新的概念,基于流的程序设计(flow-based programming)和Erlang等语言就是这样工作的,但是ZeroMQ让它前所未有地容易使用。

最后编辑:
作者:wy182000
这个作者貌似有点懒,什么都没有留下。

留下一个回复