I originally wanted to do a straight translation, but my English isn't up to it, so instead I'll walk through the various ZeroMQ usage patterns one by one. The code in this series mostly comes from the examples shipped with pyzmq; see the original guide for the full text. I don't know whether anyone else has done something similar; this is mainly a study memo for myself, so if you spot mistakes, please point them out~
Introduction:
ØMQ (ZeroMQ, 0MQ, zmq) — pick whichever spelling you like; they all refer to the same thing we're talking about here.
It exists for one purpose: to use machines more efficiently. Well, that's my personal take; the official line is to let any code, anywhere, connect to any other.
That should be clear enough. If you insist on an analogy, think of the classic client/server (C/S) model: this thing wraps up all the low-level plumbing so developers only have to care about application logic. (It maps onto C/S, but it is far more than that — read on.)
ZeroMQ is often mentioned alongside AMQP, but it is not an AMQP implementation — it speaks its own lightweight wire protocol (Google the details). In the open marketplace it has an archrival, RabbitMQ (an AMQP broker); that "rabbit" is a story for another day.
The client/server (REQ/REP) pattern:
Server:
import zmq

c = zmq.Context()
s = c.socket(zmq.REP)
#s.bind('tcp://127.0.0.1:10001')
s.bind('ipc:///tmp/zmq')

while True:
    msg = s.recv_pyobj()
    s.send_pyobj(msg)
s.close()
Client:
import zmq

c = zmq.Context()
s = c.socket(zmq.REQ)
#s.connect('tcp://127.0.0.1:10001')
s.connect('ipc:///tmp/zmq')
s.send_pyobj('hello')
msg = s.recv_pyobj()
print msg
Note:
In ZeroMQ this classic pattern is strictly lock-step request/reply: you cannot send several messages in a row, only the alternating a-b-a-b-a-b rhythm. Also, send_pyobj here is specific to pyzmq (it pickles Python objects for you); the generic call is send, just like a plain socket~
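As a quick illustration of that last point, here is a minimal client-side sketch using the generic send/recv calls on plain strings instead of the pyzmq-only send_pyobj/recv_pyobj; it reuses the same ipc endpoint as above and assumes the echo server above is changed to call recv()/send():

import zmq

context = zmq.Context()
req = context.socket(zmq.REQ)
req.connect('ipc:///tmp/zmq')

# strict a-b-a-b rhythm: every send must be followed by a recv
for text in ['hello', 'world']:
    req.send(text)      # plain send: bytes/str, no pickling
    print req.recv()    # reply from the echo server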
zeroMQ First Impressions - 2. Publish/Subscribe (pub/sub)
Publisher (pub):
import itertools
import sys
import time

import zmq

def main():
    if len(sys.argv) != 2:
        print 'usage: publisher <bind-to>'
        sys.exit(1)

    bind_to = sys.argv[1]

    all_topics = ['sports.general', 'sports.football', 'sports.basketball',
                  'stocks.general', 'stocks.GOOG', 'stocks.AAPL',
                  'weather']

    ctx = zmq.Context()
    s = ctx.socket(zmq.PUB)
    s.bind(bind_to)

    print "Starting broadcast on topics:"
    print "   %s" % all_topics
    print "Hit Ctrl-C to stop broadcasting."
    print "Waiting so subscriber sockets can connect..."
    print
    time.sleep(1.0)

    msg_counter = itertools.count()
    try:
        for topic in itertools.cycle(all_topics):
            msg_body = str(msg_counter.next())
            print '   Topic: %s, msg:%s' % (topic, msg_body)
            #s.send_multipart([topic, msg_body])
            s.send_pyobj([topic, msg_body])
            # short wait so we don't hog the cpu
            time.sleep(0.1)
    except KeyboardInterrupt:
        pass

    print "Waiting for message queues to flush..."
    time.sleep(0.5)
    s.close()
    print "Done."

if __name__ == "__main__":
    main()
Subscriber (sub):
import sys
import time
import zmq

def main():
    if len(sys.argv) < 2:
        print 'usage: subscriber <connect_to> [topic topic ...]'
        sys.exit(1)

    connect_to = sys.argv[1]
    topics = sys.argv[2:]

    ctx = zmq.Context()
    s = ctx.socket(zmq.SUB)
    s.connect(connect_to)

    # manage subscriptions
    if not topics:
        print "Receiving messages on ALL topics..."
        s.setsockopt(zmq.SUBSCRIBE, '')
    else:
        print "Receiving messages on topics: %s ..." % topics
        for t in topics:
            s.setsockopt(zmq.SUBSCRIBE, t)
    print
    try:
        while True:
            #topic, msg = s.recv_multipart()
            topic, msg = s.recv_pyobj()
            print '   Topic: %s, msg:%s' % (topic, msg)
    except KeyboardInterrupt:
        pass
    print "Done."

if __name__ == "__main__":
    main()
Notes:
The publisher and subscriber roles here are absolute: the publisher cannot recv and the subscriber cannot send, and the subscriber must set its subscription filter via setsockopt.
According to the official guide, in this pattern the data published right after the publisher starts up is very likely to be lost: zmq starts pushing data out so fast that the subscribers haven't finished connecting yet (on an internal LAN it's rarely that dramatic). The guide offers two remedies: 1) have the publisher sleep a moment before sending (this one is labelled the "dumb" fix); 2) a proper one I haven't reached yet — I'll update this when it shows up later in the series (it turns out to be the node-coordination approach in part 11).
The guide also points out another potential problem: when subscribers consume more slowly than the publisher publishes, messages pile up — and they pile up on the publisher side (a reader pointed out that they actually queue on the consumer side; perhaps that's an improvement in newer versions — experiments and feedback welcome, thx!). Clearly that's not acceptable. As for the fix, perhaps the "divide and conquer" pattern coming up next is it.
zeroMQ First Impressions - 3. Divide and Conquer (push/pull)
The push/pull pattern:
Model description:
1. Upstream (the ventilator, which hands out tasks)
2. Workers (in the middle, doing the actual work)
3. Downstream (the sink, which collects signals or results)
Upstream (ventilator) code:
import zmq
import random
import time

context = zmq.Context()

# Socket to send messages on
sender = context.socket(zmq.PUSH)
sender.bind("tcp://*:5557")

print "Press Enter when the workers are ready: "
_ = raw_input()
print "Sending tasks to workers..."

# The first message is "0" and signals start of batch
sender.send('0')

# Initialize random number generator
random.seed()

# Send 100 tasks
total_msec = 0
for task_nbr in range(100):
    # Random workload from 1 to 100 msecs
    workload = random.randint(1, 100)
    total_msec += workload
    sender.send(str(workload))
print "Total expected cost: %s msec" % total_msec
Worker code:
import sys
import time
import zmq

context = zmq.Context()

# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

# Socket to send messages to
sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")

# Process tasks forever
while True:
    s = receiver.recv()

    # Simple progress indicator for the viewer
    sys.stdout.write('.')
    sys.stdout.flush()

    # Do the work
    time.sleep(int(s) * 0.001)

    # Send results to sink
    sender.send('')
Downstream (sink) code:
import sys
import time
import zmq

context = zmq.Context()

# Socket to receive messages on
receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")

# Wait for start of batch
s = receiver.recv()

# Start our clock now
tstart = time.time()

# Process 100 confirmations
total_msec = 0
for task_nbr in range(100):
    s = receiver.recv()
    if task_nbr % 10 == 0:
        sys.stdout.write(':')
    else:
        sys.stdout.write('.')

# Calculate and report duration of batch
tend = time.time()
print "Total elapsed time: %d msec" % ((tend - tstart) * 1000)
Points to note:
Like pub/sub, this pattern is one-way. There are two differences:
1. If there are no consumers, the producer's messages are not consumed (they are held by the producer process).
2. Multiple consumers drain the same queue of messages: if A gets a message, B will not get it.
This pattern mainly targets the case where one consumer can't keep up, giving you parallel consumption across several consumers (you can also see it as one answer to the pile-up problem of the pub/sub pattern above).
As the model diagram above shows, this is an N:N pattern. In the 1:N case the consumers do not necessarily end up consuming equal shares, while in the N:1 case things work differently again, as the next diagram shows:
The main point of this pattern is that you can scale out the workers in the middle to achieve parallelism.
zeroMQ First Impressions - 4. Evangelism (Why use ZeroMQ?)
Since these are reading notes, following the guide's order: at this point the original author "chats" at length about the Why, and it's genuinely stirring stuff. Fortunately I'm also a fan of its rival, the "rabbit", so I can try to stay somewhat objective:
At some level zeroMQ can't even be called software (perhaps not even a tool). It's just a set of components that wrap up the messy business of connecting across networks, between processes and so on, and expose an API on top; each language writes its own binding, and in real deployments you use it by calling that binding.
From the angle of freedom, it is decentralized and extremely flexible; from the angle of safety and robustness, it has unavoidable shortcomings. (Nothing is perfect anyway.)
Er, I'm drifting off topic.
The 0 in 0mq started as "zero latency" and grew into zero cost, zero waste, zero administration — a philosophy of minimalism.
Are you still wrestling with fiddly socket strategies in your network code, or even giving up on (or dodging) networked designs because of them? Now ZeroMQ, the gospel of new-century network programming, has arrived: stop fussing over low-level connection strategy, hand it all to zeroMQ, and focus on what you actually care about. Network programming really can be that simple~
Friendly reminder:
Don't forget your coding standards.
zeroMQ First Impressions - 5. A First Look at the Advanced Tutorial
In the examples so far you'll have noticed that every relationship comes as a matched pair of socket types.
The patterns used so far:
req/rep (request/reply): mainly for remote calls, task assignment, and the like.
pub/sub (publish/subscribe): mainly for data distribution.
push/pull (pipeline): mainly for parallelizing work across many tasks.
Besides these there is one more pattern — the exclusive pair (1-to-1) that people habitually reach for because they haven't yet shifted their thinking away from the traditional "TCP" mindset. It will be introduced later.
The socket pairings ZeroMQ considers valid:
· PUB and SUB
· REQ and REP
· REQ and XREP
· XREQ and REP
· XREQ and XREP
· XREQ and XREQ
· XREP and XREP
· PUSH and PULL
· PAIR and PAIR
Pairing sockets outside this list leads to unexpected behaviour (not necessarily an error — data may simply never flow; officially, a unified error message may be added in the future). Higher-level patterns will also appear in the future (and of course you can build your own).
Because of how zeroMQ sends, a message goes out in one of two states (copied or not). In the no-copy case, once the send succeeds the sender can no longer access that data; with a copy it still can (mainly useful for re-sending). Also, everything being sent is held in memory, so you can't casually send huge blobs (to avoid blowing up memory); the recommended approach is to split them and send the pieces one by one. (In Python a single message is limited to 4 MB.)
Addendum:
Such multi-part sends need the extra ZMQ_SNDMORE flag, and the receiver can check ZMQ_RCVMORE to tell whether more parts are on the way.
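A minimal pyzmq sketch of that flag dance (the inproc endpoint and the payload are made up for illustration): the sender marks every part except the last with zmq.SNDMORE, and the receiver keeps reading while the RCVMORE socket option says another part is pending.

import zmq

ctx = zmq.Context()

# sender side (a PUSH socket, bound first so the inproc endpoint exists)
push = ctx.socket(zmq.PUSH)
push.bind("inproc://multipart-demo")

# receiver side
pull = ctx.socket(zmq.PULL)
pull.connect("inproc://multipart-demo")

chunks = ["part-1", "part-2", "part-3"]   # pretend this is one big payload, split up
for chunk in chunks[:-1]:
    push.send(chunk, zmq.SNDMORE)         # "more parts follow"
push.send(chunks[-1])                     # last part, no flag

parts = [pull.recv()]
while pull.getsockopt(zmq.RCVMORE):       # keep reading while parts remain
    parts.append(pull.recv())
print parts                               # ['part-1', 'part-2', 'part-3']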
Extra, extra!
The project seems pretty ambitious — they'd like to get zeroMQ into the Linux kernel. If that ever happens it'll be quite something.
zeroMQ First Impressions - 6. Handling Inputs of Mixed Socket Types (multi sockets)
As mentioned before, zeroMQ supports many-to-many, but only between matched socket types — i.e. all the senders use the same pattern. Here we deal with the case where the incoming sources use different patterns.
The guide first shows a rather "dirty" way to handle it:
import zmq
import time

context = zmq.Context()

receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt(zmq.SUBSCRIBE, "10001")

while True:

    while True:
        try:
            rc = receiver.recv(zmq.NOBLOCK)  # non-blocking receive
        except zmq.ZMQError:
            break

    while True:
        try:
            rc = subscriber.recv(zmq.NOBLOCK)
        except zmq.ZMQError:
            break
Obviously this is inelegant, and you can also end up spinning forever on one source while the other never gets serviced.
Naturally the library wraps this up properly and offers a comparatively elegant implementation:
import zmq

context = zmq.Context()

receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

subscriber = context.socket(zmq.SUB)
subscriber.connect("tcp://localhost:5556")
subscriber.setsockopt(zmq.SUBSCRIBE, "10001")

poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(subscriber, zmq.POLLIN)

while True:
    socks = dict(poller.poll())

    if receiver in socks and socks[receiver] == zmq.POLLIN:
        message = receiver.recv()

    if subscriber in socks and socks[subscriber] == zmq.POLLIN:
        message = subscriber.recv()
This approach treats both sources fairly, giving you something like the "fair queuing" you get when several senders of the same pattern push into one socket.
zeroMQ First Impressions - 7. Shutting Down Worker Processes Gracefully
There are many ways to kill a process; the ZeroMQ way is to use a signal message, so that unloading and shutting down stay under your control. Here we build on the earlier "divide and conquer" example (see part 3 for the details).
Diagram:
Clearly the signal should come from the "sink" (downstream), which knows the overall progress; only small changes to the original code are needed.
Worker (data processing):
import sys
import time
import zmq

context = zmq.Context()

receiver = context.socket(zmq.PULL)
receiver.connect("tcp://localhost:5557")

sender = context.socket(zmq.PUSH)
sender.connect("tcp://localhost:5558")

controller = context.socket(zmq.SUB)
controller.connect("tcp://localhost:5559")
controller.setsockopt(zmq.SUBSCRIBE, "")

poller = zmq.Poller()
poller.register(receiver, zmq.POLLIN)
poller.register(controller, zmq.POLLIN)
while True:
    socks = dict(poller.poll())

    if socks.get(receiver) == zmq.POLLIN:
        message = receiver.recv()

        workload = int(message)  # Workload in msecs
        time.sleep(workload / 1000.0)
        sender.send(message)

        sys.stdout.write(".")
        sys.stdout.flush()

    if socks.get(controller) == zmq.POLLIN:
        break
Sink (downstream):
import sys
import time
import zmq

context = zmq.Context()

receiver = context.socket(zmq.PULL)
receiver.bind("tcp://*:5558")

controller = context.socket(zmq.PUB)
controller.bind("tcp://*:5559")

receiver.recv()

tstart = time.time()

for task_nbr in xrange(100):
    receiver.recv()
    if task_nbr % 10 == 0:
        sys.stdout.write(":")
    else:
        sys.stdout.write(".")
    sys.stdout.flush()

tend = time.time()
tdiff = tend - tstart
total_msec = tdiff * 1000
print "Total elapsed time: %d msec" % total_msec

controller.send("KILL")
time.sleep(1)
Note:
Normally, even after the process is shut down, the underlying sockets may not have been cleaned up (ZeroMQ maintains them). The original text calls these two functions:
zmq_close (server)
zmq_term (context)
In pyzmq the equivalents are the socket's close() and the context's term(); that said, Python's garbage collection will usually take care of this for us~
zeroMQ First Impressions - 8. A Memory Leak?
Anyone who has written "never stops running" code has run into, or at least worried about, memory leaks and the like. So how do you handle that in a ZeroMQ application?
The guide gives the approach for C-like languages where you manage memory yourself (Python's GC is powerful, but it doesn't hurt to pay attention):
The tool used here is valgrind.
To avoid noise from some warnings inside zeromq itself, first rebuild zeromq:
· $ cd zeromq
· $ export CPPFLAGS=-DZMQ_MAKE_VALGRIND_HAPPY
· $ ./configure
· $ make clean; make
· $ sudo make install
Then run:
valgrind --tool=memcheck --leak-check=full someprog
With its help, after fixing your code you should end up with this pleasant message:
==30536== ERROR SUMMARY: 0 errors from 0 contexts...
This feels more like a general tips chapter and isn't really ZeroMQ-specific, but these are reading notes: the guide covers it, so I'm noting it down and learning from it.
zeroMQ First Impressions - 9. Scaling Gracefully (the proxy pattern)
The network topologies discussed so far all look like this:
In real applications, though, you almost always end up needing a structure like this:
zeroMQ naturally provides examples for this kind of requirement:
1. Publish/subscribe proxy:
import zmq

context = zmq.Context()

frontend = context.socket(zmq.SUB)
frontend.connect("tcp://192.168.55.210:5556")

backend = context.socket(zmq.PUB)
backend.bind("tcp://10.1.1.0:8100")

frontend.setsockopt(zmq.SUBSCRIBE, '')

while True:
    while True:
        message = frontend.recv()
        more = frontend.getsockopt(zmq.RCVMORE)
        if more:
            backend.send(message, zmq.SNDMORE)
        else:
            backend.send(message)
            break  # Last message part
Note that this proxy handles large payloads split into multiple message parts. It implements the following diagram:
2. Request/reply proxy:
Because zeroMQ natively supports many-to-many, it might look as though no proxy is needed, as in this diagram:
But there's a problem with that: every client has to know every server address, and whenever a server address changes all the clients must be told — the cost of migration and scaling becomes impossible to estimate. Hence we want to implement this instead:
Client:
import zmq

context = zmq.Context()
socket = context.socket(zmq.REQ)
socket.connect("tcp://localhost:5559")

for request in range(1, 10):
    socket.send("Hello")
    message = socket.recv()
    print "Received reply ", request, "[", message, "]"
Server:
import zmq

context = zmq.Context()
socket = context.socket(zmq.REP)
socket.connect("tcp://localhost:5560")

while True:
    message = socket.recv()
    print "Received request: ", message
    socket.send("World")
Proxy:
import zmq

context = zmq.Context()
frontend = context.socket(zmq.XREP)
backend = context.socket(zmq.XREQ)
frontend.bind("tcp://*:5559")
backend.bind("tcp://*:5560")

poller = zmq.Poller()
poller.register(frontend, zmq.POLLIN)
poller.register(backend, zmq.POLLIN)

while True:
    socks = dict(poller.poll())

    if socks.get(frontend) == zmq.POLLIN:
        message = frontend.recv()
        more = frontend.getsockopt(zmq.RCVMORE)
        if more:
            backend.send(message, zmq.SNDMORE)
        else:
            backend.send(message)

    if socks.get(backend) == zmq.POLLIN:
        message = backend.recv()
        more = backend.getsockopt(zmq.RCVMORE)
        if more:
            frontend.send(message, zmq.SNDMORE)
        else:
            frontend.send(message)
Together the code above builds the following network structure:
Clients and servers are now completely transparent to each other — suddenly the world is a quieter place...
This part piles on diagrams and code, all of it solid material. But since 0mq has already thought of this, why on earth should we write the proxy ourselves? So, solid as the above may be, you can probably forget it all right away. Here is 0mq's built-in proxy solution:
import zmq

def main():
    context = zmq.Context(1)

    frontend = context.socket(zmq.XREP)
    frontend.bind("tcp://*:5559")

    backend = context.socket(zmq.XREQ)
    backend.bind("tcp://*:5560")

    zmq.device(zmq.QUEUE, frontend, backend)

    frontend.close()
    backend.close()
    context.term()

if __name__ == "__main__":
    main()
That is the request/reply proxy as a device. The library ships three standard devices (a sketch of the second one follows this list):
Request/reply: queue (XREP/XREQ)
Publish/subscribe: forwarder (SUB/PUB)
Pipeline: streamer (PULL/PUSH)
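For illustration, here is a minimal sketch of a forwarder device in pyzmq (the two tcp ports are made up for the example): upstream publishers connect and send into the frontend, and the device relays everything to subscribers connected to the backend.

import zmq

def main():
    context = zmq.Context(1)

    frontend = context.socket(zmq.SUB)
    frontend.bind("tcp://*:5561")            # publishers connect and send here
    frontend.setsockopt(zmq.SUBSCRIBE, "")   # forward everything, no filtering

    backend = context.socket(zmq.PUB)
    backend.bind("tcp://*:5562")             # subscribers connect here

    zmq.device(zmq.FORWARDER, frontend, backend)

    frontend.close()
    backend.close()
    context.term()

if __name__ == "__main__":
    main()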
A special reminder:
Officially, mixing and matching device types is not recommended — do so at your own risk. Per the official advice, if you really need a mix, you're better off writing your own proxy~
zeroMQ First Impressions - 10. Using Multithreading Gracefully
"ZeroMQ is perhaps the best environment for multithreaded programs!" — so says the official guide.
What it really wants to support is something like Erlang-style message passing. Traditional multithreading, with its assortment of locks, breeds all sorts of weird problems. zeroMQ's multithreading is dedicated to "de-locking": simply put, at any moment a piece of data may be held by only one thread (whereas the traditional rule is merely that only one thread may operate on it at a time). Locks are a by-product of the possibility that several threads touch the same data at once. From this, zeromq's angle of attack is clear: move data between threads as messages, so that at any moment any piece of data is held by exactly one thread.
Here is the classic request/reply example done with threads:
import time
import threading
import zmq

def worker_routine(worker_url, context):

    socket = context.socket(zmq.REP)
    socket.connect(worker_url)

    while True:
        string = socket.recv()
        print("Received request: [%s]\n" % (string))
        time.sleep(1)

        socket.send("World")

def main():
    url_worker = "inproc://workers"
    url_client = "tcp://*:5555"

    context = zmq.Context(1)

    clients = context.socket(zmq.XREP)
    clients.bind(url_client)

    workers = context.socket(zmq.XREQ)
    workers.bind(url_worker)

    for i in range(5):
        thread = threading.Thread(target=worker_routine, args=(url_worker, context, ))
        thread.start()

    zmq.device(zmq.QUEUE, clients, workers)

    clients.close()
    workers.close()
    context.term()

if __name__ == "__main__":
    main()
Splitting things up this way has a hidden bonus: if you ever need to move from multiple threads to multiple processes, the code carves up and reuses very easily.
The guide also gives a reason for using threads rather than processes here:
Processes are too expensive (though, to be fair, Python tends to encourage multiprocessing over threads).
The example above doesn't seem to include any communication between the worker threads themselves, does it? Since multithreading is supported, of course that hasn't been forgotten:
import threading
import zmq

def step1(context):
    sender = context.socket(zmq.PAIR)
    sender.connect("inproc://step2")
    sender.send("")

def step2(context):
    receiver = context.socket(zmq.PAIR)
    receiver.bind("inproc://step2")

    thread = threading.Thread(target=step1, args=(context, ))
    thread.start()

    string = receiver.recv()

    sender = context.socket(zmq.PAIR)
    sender.connect("inproc://step3")
    sender.send("")

    return

def main():
    context = zmq.Context(1)

    receiver = context.socket(zmq.PAIR)
    receiver.bind("inproc://step3")

    thread = threading.Thread(target=step2, args=(context, ))
    thread.start()

    string = receiver.recv()

    print("Test successful!\n")

    receiver.close()
    context.term()

    return

if __name__ == "__main__":
    main()
Note:
A new socket type appears here: PAIR, which is intended specifically for communication between threads (the guide also explains why the types we've seen before, such as request/reply, aren't used for this). It is immediate, reliable and safe. (It can in fact be used between processes too, in a way similar to request/reply.)
zeroMQ First Impressions - 11. Coordination Between Nodes
The previous part covered coordination between threads, which zeroMQ's PAIR sockets handle very elegantly. Between nodes (at process level), though, PAIR is a poor fit (even if it can technically be used). Two reasons are given:
1. Nodes come and go and can be reconfigured, whereas threads are stable; and PAIR sockets do not reconnect automatically.
2. The number of threads is fixed and predictable, while the number of nodes fluctuates and cannot be predicted.
Conclusion: PAIR suits stable, controlled environments.
Hence this chapter. You may remember that the publish/subscribe part called that pattern somewhat unreliable (mainly during startup): data published before the connections are established simply gets thrown away. Here we solve that problem through coordination between the nodes.
Model diagram:
Publisher:
import zmq

SUBSCRIBERS_EXPECTED = 2

def main():
    context = zmq.Context()

    publisher = context.socket(zmq.PUB)
    publisher.bind('tcp://*:5561')

    syncservice = context.socket(zmq.REP)
    syncservice.bind('tcp://*:5562')

    subscribers = 0
    while subscribers < SUBSCRIBERS_EXPECTED:
        msg = syncservice.recv()
        syncservice.send('')
        subscribers += 1
        print "+1 subscriber"

    for i in range(1000000):
        publisher.send('Rhubarb')

    publisher.send('END')

if __name__ == '__main__':
    main()
Subscriber:
import zmq

def main():
    context = zmq.Context()

    subscriber = context.socket(zmq.SUB)
    subscriber.connect('tcp://localhost:5561')
    subscriber.setsockopt(zmq.SUBSCRIBE, '')

    syncclient = context.socket(zmq.REQ)
    syncclient.connect('tcp://localhost:5562')

    syncclient.send('')

    syncclient.recv()

    nbr = 0
    while True:
        msg = subscriber.recv()
        if msg == 'END':
            break
        nbr += 1

    print 'Received %d updates' % nbr

if __name__ == '__main__':
    main()
As the example shows, a request/reply handshake solves the earlier headache. If you're still nervous, the publisher can also broadcast a special marker and have subscribers reply only once they've received it, raising the safety margin another notch. The big precondition, though, is that you need some way to learn or estimate the expected number of subscribers for this to be usable.
zeroMQ First Impressions - 12. Safety and Stability
Most people who try zeromq are delighted by its decentralized freedom, yet at the same time doubt — or even despair of — the reliability of its data transport (especially if you happen to know the "rabbit" as well).
This part tries to make up for that a bit and boost your confidence in using it.
zeromq's unrivalled transfer speed owes a lot to "zero copy": it avoids re-buffering and shuffling data around, and cuts down on acknowledgement-style round trips. The speed comes at some cost to delivery reliability; turning the copy mechanism on trades a little speed for more stability.
Besides zero-copy, zeromq also offers a naming (identity) mechanism used to create so-called "durable sockets". As earlier chapters showed, the transport layer is already zeromq's job; with durable sockets, even if your program crashes, or a node drops off for some other reason (died?), zeromq will hold data for that node as appropriate, so that when it reconnects it can pick up what it missed.
Without the identity mechanism:
With it enabled:
The relevant setting:
zmq_setsockopt (socket, ZMQ_IDENTITY, "Lucy", 4);
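In pyzmq the same thing looks roughly like this (a sketch; the endpoint is made up), the key point being that — per the notes below — the identity has to be set before connecting:

import zmq

context = zmq.Context()
subscriber = context.socket(zmq.SUB)

# identity must be set BEFORE connect for the durable-socket behaviour
subscriber.setsockopt(zmq.IDENTITY, "Lucy")
subscriber.setsockopt(zmq.SUBSCRIBE, "")
subscriber.connect("tcp://localhost:5565")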
Notes:
1. To enable the identity mechanism, the name must be set before connecting.
2. Don't use duplicate names!
3. Don't change the name after the connection has been established.
4. Preferably don't generate names randomly.
5. If you need to know the name of a message's sender, it has to be attached when the message is sent (XREP picks it up automatically).
zeroMQ First Impressions - 13. Publish/Subscribe, Advanced
Earlier chapters recommended splitting large payloads into several small messages sent one by one, to avoid memory blow-ups from a single oversized message. The same applies to publish/subscribe, and a new term shows up here: the envelope.
The enveloped data structure looks like this:
Thanks to the key, you needn't worry about the awkward situation where data split into several parts ends up only partially delivered to the matching subscriber.
Publisher:
import time
import zmq

def main():

    context = zmq.Context(1)
    publisher = context.socket(zmq.PUB)
    publisher.bind("tcp://*:5563")

    while True:
        publisher.send_multipart(["A", "We don't want to see this"])
        publisher.send_multipart(["B", "We would like to see this"])
        time.sleep(1)

    publisher.close()
    context.term()

if __name__ == "__main__":
    main()
Subscriber:
import zmq

def main():

    context = zmq.Context(1)
    subscriber = context.socket(zmq.SUB)
    subscriber.connect("tcp://localhost:5563")
    subscriber.setsockopt(zmq.SUBSCRIBE, "B")

    while True:
        [address, contents] = subscriber.recv_multipart()
        print("[%s] %s\n" % (address, contents))

    subscriber.close()
    context.term()

if __name__ == "__main__":
    main()
zeroMQ First Impressions - 14. The Identity Mechanism, Advanced
The identity mechanism mentioned earlier is really a double-edged sword: the ability to hold data while waiting for a reconnect also burdens (endangers) the side holding the data — especially under publish/subscribe, where one slow link can drag the whole system with it.
Here is a pair of examples; while the code runs, restart the consumer and watch the state of the publisher process.
Publisher:
import zmq
import time

context = zmq.Context()

sync = context.socket(zmq.PULL)
sync.bind("tcp://*:5564")

publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5565")

sync_request = sync.recv()

for n in xrange(10):
    msg = "Update %d" % n
    publisher.send(msg)
    time.sleep(1)

publisher.send("END")
time.sleep(1)  # Give 0MQ/2.0.x time to flush output
Subscriber:
import zmq
import time

context = zmq.Context()

subscriber = context.socket(zmq.SUB)
subscriber.setsockopt(zmq.IDENTITY, "Hello")
subscriber.setsockopt(zmq.SUBSCRIBE, "")
subscriber.connect("tcp://localhost:5565")

sync = context.socket(zmq.PUSH)
sync.connect("tcp://localhost:5564")
sync.send("")

while True:
    data = subscriber.recv()
    print data
    if data == "END":
        break
What the subscriber sees:
$ durasub
Update 0
Update 1
Update 2
^C
$ durasub
Update 3
Update 4
Update 5
Update 6
Update 7
^C
$ durasub
Update 8
Update 9
END
The data is being stored by the publisher, and the publisher's memory use climbs steadily (dangerous!). So whether to use the identity strategy is a choice to make with care. As a safety net, zeromq also provides a "high-water mark" mechanism: once the sender is holding a certain number of messages it stops storing further ones, which keeps the risk nicely bounded. It also goes some way towards solving the slow-consumer problem seen here.
Test output after applying the high-water mark:
$ durasub
Update 0
Update 1
^C
$ durasub
Update 2
Update 3
Update 7
Update 8
Update 9
END
The high-water mark blocks off the possibility of a memory blow-up, but at the price of losing data. To pair with it, zeromq offers a "swap" facility that spills in-memory data to disk, achieving "no memory exhaustion and no data loss".
The code:
import zmq
import time

context = zmq.Context()

# Subscriber tells us when it's ready here
sync = context.socket(zmq.PULL)
sync.bind("tcp://*:5564")

# We send updates via this socket
publisher = context.socket(zmq.PUB)
publisher.bind("tcp://*:5565")

# Prevent publisher overflow from slow subscribers
publisher.setsockopt(zmq.HWM, 1)

# Specify the swap space in bytes, this covers all subscribers
publisher.setsockopt(zmq.SWAP, 25000000)

# Wait for synchronization request
sync_request = sync.recv()

# Now broadcast exactly 10 updates with pause
for n in xrange(10):
    msg = "Update %d" % n
    publisher.send(msg)
    time.sleep(1)

publisher.send("END")
time.sleep(1)  # Give 0MQ/2.0.x time to flush output
Points to note:
The high-water mark and the swap size must be tuned to the actual workload; set the high-water mark too low and throughput suffers.
If the side that is storing the data crashes, all of that data is gone for good.
Special notes on the high-water mark:
Only PUB sockets drop subsequent data once the high-water mark is reached; all other types respond to further data by blocking.
For communication between threads, the effective high-water mark is the sum of what both sides set; if either side leaves it unset, the high-water-mark rule does not take effect.
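A small sketch of that last point, assuming the 0MQ 2.x-era zmq.HWM option used elsewhere in this series (the inproc endpoint and the numbers are made up): each end of the inproc pipe sets its own HWM, and per the note above the effective limit is the two added together.

import zmq

context = zmq.Context()

# receiving thread's socket: allow at most 2 queued messages on its side
pull = context.socket(zmq.PULL)
pull.setsockopt(zmq.HWM, 2)
pull.bind("inproc://hwm-demo")

# sending thread's socket: allow at most 2 queued messages on its side
push = context.socket(zmq.PUSH)
push.setsockopt(zmq.HWM, 2)
push.connect("inproc://hwm-demo")

# roughly 2 + 2 messages can sit in flight before send() starts blocking;
# leave either side unset and the limit effectively disappears
for i in range(3):
    push.send("msg %d" % i)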
zeroMQ First Impressions - 15. Request/Reply Advanced (1): Message Envelopes
A whole big chapter of the guide is devoted to advanced request/reply, so it must be important (practically guaranteed).
The previous part covered envelopes for publish/subscribe; request/reply has them too, except there the wrapping is taken over by the lower layer (zeromq) and is transparent to the application. The plain and X variants behave differently; for example, a REQ connected to an XREP:
Explanation:
The third frame is the data actually sent.
The second frame is added by the lower layer when REQ sends the request towards XREP.
The first frame is the address frame added on the XREP side (identifying the peer to reply to).
Note:
As mentioned before, XREP is the piece that spreads and routes traffic, so it is the one wrapping the request data; if the message passes through several XREP hops, the structure grows into this:
Meanwhile, if no identity has been set, XREP assigns a temporary generated name automatically:
Otherwise it looks like this:
Here is a bit of code to verify all that:
import zmq
import zhelpers

context = zmq.Context()

sink = context.socket(zmq.XREP)
sink.bind("inproc://example")

# First allow 0MQ to set the identity
anonymous = context.socket(zmq.XREQ)
anonymous.connect("inproc://example")
anonymous.send("XREP uses a generated UUID")
zhelpers.dump(sink)

# Then set the identity ourself
identified = context.socket(zmq.XREQ)
identified.setsockopt(zmq.IDENTITY, "Hello")
identified.connect("inproc://example")
identified.send("XREP socket uses REQ's socket identity")
zhelpers.dump(sink)
The zhelpers module used above:
from random import randint

import zmq


# Receives all message parts from socket, prints neatly
def dump(zsocket):
    print "----------------------------------------"
    for part in zsocket.recv_multipart():
        print "[%03d]" % len(part),
        if all(31 < ord(c) < 128 for c in part):
            print part
        else:
            print "".join("%x" % ord(c) for c in part)


# Set simple random printable identity on socket
def set_id(zsocket):
    identity = "%04x-%04x" % (randint(0, 0x10000), randint(0, 0x10000))
    zsocket.setsockopt(zmq.IDENTITY, identity)
zeroMQ First Impressions - 16. Request/Reply Advanced (2): Custom Routing, Part 1
The previous part showed that XREP's main job is to wrap data and stamp it so it can be handed along conveniently. Looked at from another angle — that's routing! It actually already showed up in the "scaling gracefully" part; here we explore the XREP side in depth.
First, let's straighten out the differences between the socket types (the similar names really are a trap):
REQ — the guide calls it the "mama" socket: it actively makes requests and insists on getting an answer (strictly synchronous).
REP — the "papa" socket: it answers requests (never takes the initiative, also strictly synchronous).
XREQ — the "dealer": it queues data in and out and distributes it evenly among the connected REP or XREP sockets.
XREP — the "router": it forwards messages to anything it is connected to, and can pair with any type, though it seems naturally closest to mama (REQ).
The conventional view is that request/reply must of course be synchronous. Here, clearly, it can be made asynchronous — as long as neither "papa" nor "mama" sits in the middle of the chain.
Custom routing usually uses one of the following four kinds of connection:
XREP-to-XREQ.
XREP-to-REQ.
XREP-to-REP.
XREP-to-XREP.
On top of these basic connections, custom routing is limited only by your imagination. Still, before the detailed walk-through of each, one disclaimer:
Custom routing carries risk — use with care!
First up is the XREP-to-XREQ pattern:
This is one of the simpler ones. XREQ shows up in three scenarios: 1) fan-in/aggregation, 2) proxy distribution, 3) answering replies.
Note that if XREQ is used for answering replies, it's best to have only one XREP connected to it, because XREQ does not address a specific destination — it spreads data evenly across every XREP it is connected to.
Here is a fan-in style example:
import time
import random
from threading import Thread
import zmq

def worker_a(context):
    worker = context.socket(zmq.XREQ)
    worker.setsockopt(zmq.IDENTITY, 'A')
    worker.connect("ipc://routing.ipc")

    total = 0
    while True:
        request = worker.recv()
        finished = request == "END"
        if finished:
            print "A received:", total
            break
        total += 1

def worker_b(context):
    worker = context.socket(zmq.XREQ)
    worker.setsockopt(zmq.IDENTITY, 'B')
    worker.connect("ipc://routing.ipc")

    total = 0
    while True:
        request = worker.recv()
        finished = request == "END"
        if finished:
            print "B received:", total
            break
        total += 1

context = zmq.Context()
client = context.socket(zmq.XREP)
client.bind("ipc://routing.ipc")

Thread(target=worker_a, args=(context,)).start()
Thread(target=worker_b, args=(context,)).start()

time.sleep(1)

for _ in xrange(10):
    if random.randint(0, 2) > 0:
        client.send("A", zmq.SNDMORE)
    else:
        client.send("B", zmq.SNDMORE)

    client.send("This is the workload")

client.send("A", zmq.SNDMORE)
client.send("END")

client.send("B", zmq.SNDMORE)
client.send("END")

time.sleep(1)  # Give 0MQ/2.0.x time to flush output
The message structure being passed:
Since there are no replies, this one is fairly simple; with replies it gets a bit messier and you'd need the Poller introduced earlier to schedule things. The sleep in the code is mainly there to wait for the receivers to be ready — otherwise data can get lost, just as in publish/subscribe. Apart from XREP and PUB, the other socket types don't have this problem (they block and wait).
Note:
Routing is never inherently safe; if you want a guarantee, reply to the router (send back an acknowledgement) whenever you receive a routed message.
zeroMQ First Impressions - 17. Request/Reply Advanced (3): Custom Routing, Part 2
The XREP-to-REQ pattern:
A classic "mama" situation: she only hears you when she's actually ready to listen. So first REQ has to tell you "I'm ready, you may speak", and only then can you pour your heart out...
Generally, as with XREQ, one REQ should connect to only one XREP (unless you're doing fault tolerance, which isn't recommended here).
Example model:
1. import time
2. import random
3. from threading import Thread
4.
5. import zmq
6.
7. import zhelpers
8.
9. NBR_WORKERS = 10
10.
11. def worker_thread(context):
12. worker = context.socket(zmq.REQ)
13.
14. # We use a string identity for ease here
15. zhelpers.set_id(worker)
16. worker.connect(“ipc://routing.ipc”)
17.
18. total = 0
19. while True:
20. # Tell the router we’re ready for work
21. worker.send(“ready”)
22.
23. # Get workload from router, until finished
24. workload = worker.recv()
25. finished = workload == ”END”
26. if finished:
27. print “Processed: %d tasks” % total
28. break
29. total += 1
30.
31. # Do some random work
32. time.sleep(random.random() / 10 + 10 ** -9)
33.
34. context = zmq.Context()
35. client = context.socket(zmq.XREP)
36. client.bind(“ipc://routing.ipc”)
37.
38. for _ in xrange(NBR_WORKERS):
39. Thread(target=worker_thread, args=(context,)).start()
40.
41. for _ in xrange(NBR_WORKERS * 10):
42. # LRU worker is next waiting in the queue
43. address = client.recv()
44. empty = client.recv()
45. ready = client.recv()
46.
47. client.send(address, zmq.SNDMORE)
48. client.send(“”, zmq.SNDMORE)
49. client.send(“This is the workload”)
50.
51. # Now ask mama to shut down and report their results
52. for _ in xrange(NBR_WORKERS):
53. address = client.recv()
54. empty = client.recv()
55. ready = client.recv()
56.
57. client.send(address, zmq.SNDMORE)
58. client.send(“”, zmq.SNDMORE)
59. client.send(“END”)
60.
61. time.sleep(1) # Give 0MQ/2.0.x time to flush output
The message structure being passed:
Point to note:
If "mama" hasn't contacted you first, don't send her a single word!
The XREP-to-REP pattern:
This pairing isn't a classic use case; the usual arrangement is XREP-XREQ-REP, with the "dealer" handling the hand-off. But since both types exist, we may as well try wiring them together~
Example model:
import time

import zmq

import zhelpers

context = zmq.Context()
client = context.socket(zmq.XREP)
client.bind("ipc://routing.ipc")

worker = context.socket(zmq.REP)
worker.setsockopt(zmq.IDENTITY, "A")
worker.connect("ipc://routing.ipc")

# Wait for sockets to stabilize
time.sleep(1)

client.send("A", zmq.SNDMORE)
client.send("address 3", zmq.SNDMORE)
client.send("address 2", zmq.SNDMORE)
client.send("address 1", zmq.SNDMORE)
client.send("", zmq.SNDMORE)
client.send("This is the workload")

# Worker should get just the workload
zhelpers.dump(worker)

# We don't play with envelopes in the worker
worker.send("This is the reply")

# Now dump what we got off the XREP socket...
zhelpers.dump(client)
Message structure:
Note:
Unlike REQ, REP is passive, so before pushing data at a REP you must make sure it already exists — otherwise the data is simply lost.
zeroMQ First Impressions - 18. Request/Reply Advanced (4): Custom Routing, Part 3
From the classic setup to beyond it.
First, a recap of the classic version:
Then, the scaled-out version:
And then, the mutated version:
1. import threading
2. import time
3. import zmq
4.
5. NBR_CLIENTS = 10
6. NBR_WORKERS = 3
7.
8. def worker_thread(worker_url, context, i):
9. “”” Worker using REQ socket to do LRU routing ”””
10.
11. socket = context.socket(zmq.REQ)
12.
13. identity = ”Worker-%d” % (i)
14.
15. socket.setsockopt(zmq.IDENTITY, identity) #set worker identity
16.
17. socket.connect(worker_url)
18.
19. # Tell the borker we are ready for work
20. socket.send(“READY”)
21.
22. try:
23. while True:
24.
25. # python binding seems to eat empty frames
26. address = socket.recv()
27. request = socket.recv()
28.
29. print(“%s: %s\n” %(identity, request))
30.
31. socket.send(address, zmq.SNDMORE)
32. socket.send(“”, zmq.SNDMORE)
33. socket.send(“OK”)
34.
35. except zmq.ZMQError, zerr:
36. # context terminated so quit silently
37. if zerr.strerror == ’Context was terminated’:
38. return
39. else:
40. raise zerr
41.
42.
43. def client_thread(client_url, context, i):
44. “”” Basic request-reply client using REQ socket ”””
45.
46. socket = context.socket(zmq.REQ)
47.
48. identity = ”Client-%d” % (i)
49.
50. socket.setsockopt(zmq.IDENTITY, identity) #Set client identity. Makes tracing easier
51.
52. socket.connect(client_url)
53. # Send request, get reply
54. socket.send(“HELLO”)
55. reply = socket.recv()
56. print(“%s: %s\n” % (identity, reply))
57. return
58.
59. def main():
60. “”” main method ”””
61.
62. url_worker = ”inproc://workers”
63. url_client = ”inproc://clients”
64. client_nbr = NBR_CLIENTS
65.
66. # Prepare our context and sockets
67. context = zmq.Context(1)
68. frontend = context.socket(zmq.XREP)
69. frontend.bind(url_client)
70. backend = context.socket(zmq.XREP)
71. backend.bind(url_worker)
72.
73.
74.
75. # create workers and clients threads
76. for i in range(NBR_WORKERS):
77. thread = threading.Thread(target=worker_thread, args=(url_worker, context, i, ))
78. thread.start()
79.
80. for i in range(NBR_CLIENTS):
81. thread_c = threading.Thread(target=client_thread, args=(url_client, context, i, ))
82. thread_c.start()
83.
84. # Logic of LRU loop
85. # - Poll backend always, frontend only if 1+ worker ready
86. # - If worker replies, queue worker as ready and forward reply
87. # to client if necessary
88. # - If client requests, pop next worker and send request to it
89.
90. # Queue of available workers
91. available_workers = 0
92. workers_list = []
93.
94. # init poller
95. poller = zmq.Poller()
96.
97. # Always poll for worker activity on backend
98. poller.register(backend, zmq.POLLIN)
99.
100. # Poll front-end only if we have available workers
101. poller.register(frontend, zmq.POLLIN)
102.
103. while True:
104.
105. socks = dict(poller.poll())
106. # Handle worker activity on backend
107. if (backend in socks and socks[backend] == zmq.POLLIN):
108.
109. # Queue worker address for LRU routing
110. worker_addr = backend.recv()
111.
112. assert available_workers < NBR_WORKERS
113.
114. # add worker back to the list of workers
115. available_workers += 1
116. workers_list.append(worker_addr)
117.
118. # Second frame is empty
119. empty = backend.recv()
120. assert empty == ””
121.
122. # Third frame is READY or else a client reply address
123. client_addr = backend.recv()
124.
125. # If client reply, send rest back to frontend
126. if client_addr != ”READY”:
127.
128. # Following frame is empty
129. empty = backend.recv()
130. assert empty == ””
131.
132. reply = backend.recv()
133.
134. frontend.send(client_addr, zmq.SNDMORE)
135. frontend.send(“”, zmq.SNDMORE)
136. frontend.send(reply)
137.
138. client_nbr -= 1
139.
140. if client_nbr == 0:
141. break # Exit after N messages
142.
143. # poll on frontend only if workers are available
144. if available_workers > 0:
145.
146. if (frontend in socks and socks[frontend] == zmq.POLLIN):
147. # Now get next client request, route to LRU worker
148. # Client request is [address][empty][request]
149. client_addr = frontend.recv()
150.
151. empty = frontend.recv()
152. assert empty == ””
153.
154. request = frontend.recv()
155.
156. # Dequeue and drop the next worker address
157. available_workers -= 1
158. worker_id = workers_list.pop()
159.
160. backend.send(worker_id, zmq.SNDMORE)
161. backend.send(“”, zmq.SNDMORE)
162. backend.send(client_addr, zmq.SNDMORE)
163. backend.send(request)
164.
165. #out of infinite loop: do some housekeeping
166. time.sleep (1)
167.
168. frontend.close()
169. backend.close()
170. context.term()
171.
172.
173. if name == ”main”:
174. main()
The message structure the client sends:
What the router turns it into:
What then gets forwarded to the worker:
What the worker actually handles:
Going from worker back to client is the reverse process, but since both ends are REQ sockets it is effectively symmetric.
[Addendum]:
Usually the higher-level API does some of this for us and spares us wrapping the frames step by step; in Python, for example, the final code ends up looking like this:
1. import threading
2. import time
3. import zmq
4.
5. NBR_CLIENTS = 10
6. NBR_WORKERS = 3
7.
8. def worker_thread(worker_url, context, i):
9. “”” Worker using REQ socket to do LRU routing ”””
10.
11. socket = context.socket(zmq.REQ)
12.
13. identity = ”Worker-%d” % (i)
14.
15. socket.setsockopt(zmq.IDENTITY, identity) #set worker identity
16.
17. socket.connect(worker_url)
18.
19. # Tell the borker we are ready for work
20. socket.send(“READY”)
21.
22. try:
23. while True:
24.
25. [address, request] = socket.recv_multipart()
26.
27. print(“%s: %s\n” %(identity, request))
28.
29. socket.send_multipart([address, ””, ”OK”])
30.
31. except zmq.ZMQError, zerr:
32. # context terminated so quit silently
33. if zerr.strerror == ’Context was terminated’:
34. return
35. else:
36. raise zerr
37.
38.
39. def client_thread(client_url, context, i):
40. “”” Basic request-reply client using REQ socket ”””
41.
42. socket = context.socket(zmq.REQ)
43.
44. identity = ”Client-%d” % (i)
45.
46. socket.setsockopt(zmq.IDENTITY, identity) #Set client identity. Makes tracing easier
47.
48. socket.connect(client_url)
49.
50. # Send request, get reply
51. socket.send(“HELLO”)
52.
53. reply = socket.recv()
54.
55. print(“%s: %s\n” % (identity, reply))
56.
57. return
58.
59.
60. def main():
61. “”” main method ”””
62.
63. url_worker = ”inproc://workers”
64. url_client = ”inproc://clients”
65. client_nbr = NBR_CLIENTS
66.
67. # Prepare our context and sockets
68. context = zmq.Context(1)
69. frontend = context.socket(zmq.XREP)
70. frontend.bind(url_client)
71. backend = context.socket(zmq.XREP)
72. backend.bind(url_worker)
73.
74.
75.
76. # create workers and clients threads
77. for i in range(NBR_WORKERS):
78. thread = threading.Thread(target=worker_thread, args=(url_worker, context, i, ))
79. thread.start()
80.
81. for i in range(NBR_CLIENTS):
82. thread_c = threading.Thread(target=client_thread, args=(url_client, context, i, ))
83. thread_c.start()
84.
85. # Logic of LRU loop
86. # - Poll backend always, frontend only if 1+ worker ready
87. # - If worker replies, queue worker as ready and forward reply
88. # to client if necessary
89. # - If client requests, pop next worker and send request to it
90.
91. # Queue of available workers
92. available_workers = 0
93. workers_list = []
94.
95. # init poller
96. poller = zmq.Poller()
97.
98. # Always poll for worker activity on backend
99. poller.register(backend, zmq.POLLIN)
100.
101. # Poll front-end only if we have available workers
102. poller.register(frontend, zmq.POLLIN)
103.
104. while True:
105.
106. socks = dict(poller.poll())
107.
108. # Handle worker activity on backend
109. if (backend in socks and socks[backend] == zmq.POLLIN):
110.
111. # Queue worker address for LRU routing
112. message = backend.recv_multipart()
113.
114. assert available_workers < NBR_WORKERS
115.
116. worker_addr = message[0]
117.
118. # add worker back to the list of workers
119. available_workers += 1
120. workers_list.append(worker_addr)
121.
122. # Second frame is empty
123. empty = message[1]
124. assert empty == ””
125.
126. # Third frame is READY or else a client reply address
127. client_addr = message[2]
128.
129. # If client reply, send rest back to frontend
130. if client_addr != ”READY”:
131.
132. # Following frame is empty
133. empty = message[3]
134. assert empty == ””
135.
136. reply = message[4]
137.
138. frontend.send_multipart([client_addr, ””, reply])
139.
140. client_nbr -= 1
141.
142. if client_nbr == 0:
143. break # Exit after N messages
144.
145. # poll on frontend only if workers are available
146. if available_workers > 0:
147.
148. if (frontend in socks and socks[frontend] == zmq.POLLIN):
149. # Now get next client request, route to LRU worker
150. # Client request is [address][empty][request]
151.
152. [client_addr, empty, request ] = frontend.recv_multipart()
153.
154. assert empty == ””
155.
156. # Dequeue and drop the next worker address
157. available_workers -= 1
158. worker_id = workers_list.pop()
159.
160. backend.send_multipart([worker_id, ””, client_addr, request])
161.
162.
163. #out of infinite loop: do some housekeeping
164. time.sleep (1)
165.
166. frontend.close()
167. backend.close()
168. context.term()
169.
170.
171. if name == ”main”:
172. main()
zeroMQ First Impressions - 19. Request/Reply Advanced (5): Asynchronous Request/Reply
1. import zmq
2. import threading
3. import time
4. from random import choice
5.
6. class ClientTask(threading.Thread):
7. “””ClientTask”””
8. def init(self):
9. threading.Thread.init (self)
10.
11. def run(self):
12. context = zmq.Context()
13. socket = context.socket(zmq.XREQ)
14. identity = ’worker-%d’ % (choice([0,1,2,3,4,5,6,7,8,9]))
15. socket.setsockopt(zmq.IDENTITY, identity )
16. socket.connect(‘tcp://localhost:5570’)
17. print ‘Client %s started’ % (identity)
18. poll = zmq.Poller()
19. poll.register(socket, zmq.POLLIN)
20. reqs = 0
21. while True:
22. for i in xrange(5):
23. sockets = dict(poll.poll(1000))
24. if socket in sockets:
25. if sockets[socket] == zmq.POLLIN:
26. msg = socket.recv()
27. print ‘%s: %s\n’ % (identity, msg)
28. del msg
29. reqs = reqs + 1
30. print ‘Req #%d sent..’ % (reqs)
31. socket.send(‘request #%d’ % (reqs))
32.
33. socket.close()
34. context.term()
35.
36. class ServerTask(threading.Thread):
37. “””ServerTask”””
38. def init(self):
39. threading.Thread.init (self)
40.
41. def run(self):
42. context = zmq.Context()
43. frontend = context.socket(zmq.XREP)
44. frontend.bind(‘tcp://*:5570’)
45.
46. backend = context.socket(zmq.XREQ)
47. backend.bind(‘inproc://backend’)
48.
49. workers = []
50. for i in xrange(5):
51. worker = ServerWorker(context)
52. worker.start()
53. workers.append(worker)
54.
55. poll = zmq.Poller()
56. poll.register(frontend, zmq.POLLIN)
57. poll.register(backend, zmq.POLLIN)
58.
59. while True:
60. sockets = dict(poll.poll())
61. if frontend in sockets:
62. if sockets[frontend] == zmq.POLLIN:
63. msg = frontend.recv()
64. print ‘Server received %s’ % (msg)
65. backend.send(msg)
66. if backend in sockets:
67. if sockets[backend] == zmq.POLLIN:
68. msg = backend.recv()
69. frontend.send(msg)
70.
71. frontend.close()
72. backend.close()
73. context.term()
74.
75. class ServerWorker(threading.Thread):
76. “””ServerWorker”””
77. def init(self, context):
78. threading.Thread.init (self)
79. self.context = context
80.
81. def run(self):
82. worker = self.context.socket(zmq.XREQ)
83. worker.connect(‘inproc://backend’)
84. print ‘Worker started’
85. while True:
86. msg = worker.recv()
87. print ‘Worker received %s’ % (msg)
88. replies = choice(xrange(5))
89. for i in xrange(replies):
90. time.sleep(1/choice(range(1,10)))
91. worker.send(msg)
92. del msg
93.
94. worker.close()
95.
96. def main():
97. “””main function”””
98. server = ServerTask()
99. server.start()
100. for i in xrange(3):
101. client = ClientTask()
102. client.start()
103.
104. server.join()
105.
106.
107. if name == ”main”:
108. main()
For an asynchronous server, the detailed diagram looks like this:
The data flows through in this order:
client           server frontend              worker
[ XREQ ] <----> [ XREP <----> XREQ <----> XREQ ]
  1 part          2 parts        2 parts
Here you may run into a fairly classic client/server problem:
what if there are so many clients that they exhaust the server's resources?
That calls for a sturdier mechanism — for example a "heartbeat" to decide whether a given client's resources should be released. But that's a topic for another day.
zeroMQ First Impressions - 20. Request/Reply Advanced (6): Many-to-Many Routing
import zmq
import time
import zhelpers

context = zmq.Context()

worker = context.socket(zmq.XREP)
worker.setsockopt(zmq.IDENTITY, "WORKER")
worker.bind("ipc://rtrouter.ipc")

server = context.socket(zmq.XREP)
server.setsockopt(zmq.IDENTITY, "SERVER")
server.connect("ipc://rtrouter.ipc")

time.sleep(1)

server.send_multipart(["WORKER", "", "send to worker"])
zhelpers.dump(worker)

worker.send_multipart(["SERVER", "", "send to server"])
zhelpers.dump(server)
Note:
This looks lovely, but it hides a huge risk: chaos. Routers at the same level must be given unique identities to keep the chances of confusion down.
zeroMQ First Impressions - 21. Request/Reply Advanced (7): Cloud Computing
Here the guide gives an example of the currently very fashionable "cloud computing".
Definition:
Hordes of workers run on all kinds of hardware, and any one worker can solve a problem on its own. Each cluster has hundreds or thousands of such machines, and a dozen such clusters are interconnected; the whole collection is called a "cloud", and the service it provides "cloud computing".
If any machine or cluster in the "cloud" can come and go freely, and any crashed worker can be detected and restarted, then you can basically call it dependable cloud computing.
First, a single standalone cluster:
Look familiar? It has in fact been covered already.
Then, scale out to multiple clusters:
This picture has an obvious problem: how do the clients and workers of two different clusters reach one another?
There are two candidate solutions:
1.
This one looks decent, but it suffers from "the busy stay busy": one worker says "I'm free", both routers hear it, and both assign it a task at the same moment. Not what we want.
2.
This one looks cleaner: only the brokers exchange resources with each other to get the job done. It is effectively a slightly more elaborate version of the "broker" scheme, in which each broker also acts as a subcontractor to the others.
We'll go with the second option. That still leaves several ways to interconnect the two brokers; the simplest (and the one we've been using all along) is the request/reply approach, composing the brokers into a client/server-like pair:
But that seems to create a new problem (being too simple is itself a problem): a traditional client/server request/reply exchange can only answer one request at a time, and then... that's it. So a more dependable way to link the brokers is an asynchronous connection.
Beyond that, the guide also describes a DNS-like arrangement: the brokers exchange their resource status via publish/subscribe and exchange the actual tasks via asynchronous request/reply.
Before the upcoming example, a good naming convention is essential.
Three sets of "sockets" appear here:
1. Intra-cluster req/rep: localfe, localbe
2. Inter-cluster req/rep: cloudfe, cloudbe
3. Inter-cluster resource state: statefe, statebe
In the end the broker looks like this:
Below, we'll build up the broker's sockets piece by piece.
1. Resource state:
1. import zmq
2. import time
3. import random
4.
5. def main(args):
6.
7. myself = args[1]
8. print “Hello, I am”, myself
9.
10. context = zmq.Context()
11.
12. # State Back-End
13. statebe = context.socket(zmq.PUB)
14.
15. # State Front-End
16. statefe = context.socket(zmq.SUB)
17. statefe.setsockopt(zmq.SUBSCRIBE, ”)
18.
19. bind_address = ”ipc://” + myself + ”-state.ipc”
20. statebe.bind(bind_address)
21.
22. for i in range(len(args) - 2):
23. endpoint = ”ipc://” + args[i + 2] + ”-state.ipc”
24. statefe.connect(endpoint)
25. time.sleep(1.0)
26.
27. poller = zmq.Poller()
28. poller.register(statefe, zmq.POLLIN)
29.
30. while True:
31.
32. ########## Solution with poll() ##########
33. socks = dict(poller.poll(1000))
34.
35. try:
36. # Handle incoming status message
37. if socks[statefe] == zmq.POLLIN:
38. msg = statefe.recv_multipart()
39. print ‘Received:’, msg
40.
41. except KeyError:
42. # Send our address and a random value
43. # for worker availability
44. msg = []
45. msg.append(bind_address)
46. msg.append(str(random.randrange(1, 10)))
47. statebe.send_multipart(msg)
48. ##################################
49.
50. ######### Solution with select() #########
51. # (pollin, pollout, pollerr) = zmq.select([statefe], [], [], 1)
52. #
53. # if len(pollin) > 0 and pollin[0] == statefe:
54. # # Handle incoming status message
55. # msg = statefe.recv_multipart()
56. # print ’Received:’, msg
57. #
58. # else:
59. # # Send our address and a random value
60. # # for worker availability
61. # msg = []
62. # msg.append(bind_address)
63. # msg.append(str(random.randrange(1, 10)))
64. # statebe.send_multipart(msg)
65. ##################################
66.
67. poller.unregister(statefe)
68. time.sleep(1.0)
69.
70. if name == ’main’:
71. import sys
72.
73. if len(sys.argv) < 2:
74. print “Usage: peering.py <myself> <peer_1> … <peer_N>”
75. raise SystemExit
76.
77. main(sys.argv)
1. require”zmq”
2. require”zmq.poller”
3. require”zmq.threads”
4. require”zmsg”
5.
6. local tremove = table.remove
7.
8. local NBR_CLIENTS = 10
9. local NBR_WORKERS = 3
10.
11. local pre_code = [[
12. local self, seed = …
13. local zmq = require”zmq”
14. local zmsg = require”zmsg”
15. require”zhelpers”
16. math.randomseed(seed)
17. local context = zmq.init(1)
18.
19. ]]
20.
21. – Request-reply client using REQ socket
22. —
23. local client_task = pre_code .. [[
24. local client = context:socket(zmq.REQ)
25. local endpoint = string.format(“ipc://%s-localfe.ipc”, self)
26. assert(client:connect(endpoint))
27.
28. while true do
29. – Send request, get reply
30. local msg = zmsg.new (“HELLO”)
31. msg:send(client)
32. msg = zmsg.recv (client)
33. printf (“I: client status: %s\n”, msg:body())
34. end
35. – We never get here but if we did, this is how we’d exit cleanly
36. client:close()
37. context:term()
38. ]]
39.
40. – Worker using REQ socket to do LRU routing
41. —
42. local worker_task = pre_code .. [[
43. local worker = context:socket(zmq.REQ)
44. local endpoint = string.format(“ipc://%s-localbe.ipc”, self)
45. assert(worker:connect(endpoint))
46.
47. – Tell broker we’re ready for work
48. local msg = zmsg.new (“READY”)
49. msg:send(worker)
50.
51. while true do
52. msg = zmsg.recv (worker)
53. – Do some ’work’
54. s_sleep (1000)
55. msg:body_fmt(“OK - %04x”, randof (0x10000))
56. msg:send(worker)
57. end
58. – We never get here but if we did, this is how we’d exit cleanly
59. worker:close()
60. context:term()
61. ]]
62.
63. – First argument is this broker’s name
64. – Other arguments are our peers’ names
65. —
66. s_version_assert (2, 1)
67. if (#arg < 1) then
68. printf (“syntax: peering2 me doyouend…\n”)
69. os.exit(-1)
70. end
71. – Our own name; in practice this‘d be configured per node
72. local self = arg[1]
73. printf (“I: preparing broker at %s…\n”, self)
74. math.randomseed(os.time())
75.
76. – Prepare our context and sockets
77. local context = zmq.init(1)
78.
79. – Bind cloud frontend to endpoint
80. local cloudfe = context:socket(zmq.XREP)
81. local endpoint = string.format(“ipc://%s-cloud.ipc”, self)
82. cloudfe:setopt(zmq.IDENTITY, self)
83. assert(cloudfe:bind(endpoint))
84.
85. – Connect cloud backend to all peers
86. local cloudbe = context:socket(zmq.XREP)
87. cloudbe:setopt(zmq.IDENTITY, self)
88.
89. local peers = {}
90. for n=2,#arg do
91. local peer = arg[n]
92. – add peer name to peers list.
93. peers[#peers + 1] = peer
94. peers[peer] = true – map peer’s name to ’true‘ for fast lookup
95. printf (“I: connecting to cloud frontend at ’%s’\n”, peer)
96. local endpoint = string.format(“ipc://%s-cloud.ipc”, peer)
97. assert(cloudbe:connect(endpoint))
98. end
99. – Prepare local frontend and backend
100. local localfe = context:socket(zmq.XREP)
101. local endpoint = string.format(“ipc://%s-localfe.ipc”, self)
102. assert(localfe:bind(endpoint))
103.
104. local localbe = context:socket(zmq.XREP)
105. local endpoint = string.format(“ipc://%s-localbe.ipc”, self)
106. assert(localbe:bind(endpoint))
107.
108. – Get user to tell us when we can start…
109. printf (“Press Enter when all brokers are started: ”)
110. io.read(‘*l’)
111.
112. – Start local workers
113. local workers = {}
114. for n=1,NBR_WORKERS do
115. local seed = os.time() + math.random()
116. workers[n] = zmq.threads.runstring(nil, worker_task, self, seed)
117. workers[n]:start(true)
118. end
119. – Start local clients
120. local clients = {}
121. for n=1,NBR_CLIENTS do
122. local seed = os.time() + math.random()
123. clients[n] = zmq.threads.runstring(nil, client_task, self, seed)
124. clients[n]:start(true)
125. end
126.
127. – Interesting part
128. – ————————————————————-
129. – Request-reply flow
130. – - Poll backends and process local/cloud replies
131. – - While worker available, route localfe to local or cloud
132.
133. – Queue of available workers
134. local worker_queue = {}
135. local backends = zmq.poller(2)
136.
137. local function send_reply(msg)
138. local address = msg:address()
139. – Route reply to cloud if it’s addressed to a broker
140. if peers[address] then
141. msg:send(cloudfe) – reply is for a peer.
142. else
143. msg:send(localfe) – reply is for a local client.
144. end
145. end
146.
147. backends:add(localbe, zmq.POLLIN, function()
148. local msg = zmsg.recv(localbe)
149.
150. – Use worker address for LRU routing
151. worker_queue[#worker_queue + 1] = msg:unwrap()
152. – if reply is not ”READY” then route reply back to client.
153. if (msg:address() ~= ”READY”) then
154. send_reply(msg)
155. end
156. end)
157.
158. backends:add(cloudbe, zmq.POLLIN, function()
159. local msg = zmsg.recv(cloudbe)
160. – We don’t use peer broker address for anything
161. msg:unwrap()
162. – send reply back to client.
163. send_reply(msg)
164. end)
165.
166. local frontends = zmq.poller(2)
167. local localfe_ready = false
168. local cloudfe_ready = false
169.
170. frontends:add(localfe, zmq.POLLIN, function() localfe_ready = true end)
171. frontends:add(cloudfe, zmq.POLLIN, function() cloudfe_ready = true end)
172.
173. while true do
174. local timeout = (#worker_queue > 0) and 1000000 or -1
175. – If we have no workers anyhow, wait indefinitely
176. rc = backends:poll(timeout)
177. assert (rc >= 0)
178.
179. – Now route as many clients requests as we can handle
180. –
181. while (#worker_queue > 0) do
182. rc = frontends:poll(0)
183. assert (rc >= 0)
184. local reroutable = false
185. local msg
186. – We’ll do peer brokers first, to prevent starvation
187. if (cloudfe_ready) then
188. cloudfe_ready = false – reset flag
189. msg = zmsg.recv (cloudfe)
190. reroutable = false
191. elseif (localfe_ready) then
192. localfe_ready = false – reset flag
193. msg = zmsg.recv (localfe)
194. reroutable = true
195. else
196. break; – No work, go back to backends
197. end
198.
199. – If reroutable, send to cloud 20% of the time
200. – Here we’d normally use cloud status information
201. –
202. local percent = randof (5)
203. if (reroutable and #peers > 0 and percent == 0) then
204. – Route to random broker peer
205. local random_peer = randof (#peers) + 1
206. msg:wrap(peers[random_peer], nil)
207. msg:send(cloudbe)
208. else
209. – Dequeue and drop the next worker address
210. local worker = tremove(worker_queue, 1)
211. msg:wrap(worker, ””)
212. msg:send(localbe)
213. end
214. end
215. end
216. – We never get here but clean up anyhow
217. localbe:close()
218. cloudbe:close()
219. localfe:close()
220. cloudfe:close()
221. context:term()
Note:
This one is Lua code — the official guide doesn't provide a Python version here; I'll fill it in another day~
3. Putting it all together:
1. require”zmq”
2. require”zmq.poller”
3. require”zmq.threads”
4. require”zmsg”
5.
6. local tremove = table.remove
7.
8. local NBR_CLIENTS = 10
9. local NBR_WORKERS = 5
10.
11. local pre_code = [[
12. local self, seed = …
13. local zmq = require”zmq”
14. local zmsg = require”zmsg”
15. require”zhelpers”
16. math.randomseed(seed)
17. local context = zmq.init(1)
18.
19. ]]
20.
21. – Request-reply client using REQ socket
22. – To simulate load, clients issue a burst of requests and then
23. – sleep for a random period.
24. —
25. local client_task = pre_code .. [[
26. require”zmq.poller”
27.
28. local client = context:socket(zmq.REQ)
29. local endpoint = string.format(“ipc://%s-localfe.ipc”, self)
30. assert(client:connect(endpoint))
31.
32. local monitor = context:socket(zmq.PUSH)
33. local endpoint = string.format(“ipc://%s-monitor.ipc”, self)
34. assert(monitor:connect(endpoint))
35.
36. local poller = zmq.poller(1)
37. local task_id = nil
38.
39. poller:add(client, zmq.POLLIN, function()
40. local msg = zmsg.recv (client)
41. – Worker is supposed to answer us with our task id
42. assert (msg:body() == task_id)
43. – mark task as processed.
44. task_id = nil
45. end)
46. local is_running = true
47. while is_running do
48. s_sleep (randof (5) * 1000)
49.
50. local burst = randof (15)
51. while (burst > 0) do
52. burst = burst - 1
53. – Send request with random hex ID
54. task_id = string.format(“%04X”, randof (0x10000))
55. local msg = zmsg.new(task_id)
56. msg:send(client)
57.
58. – Wait max ten seconds for a reply, then complain
59. rc = poller:poll(10 * 1000000)
60. assert (rc >= 0)
61.
62. if task_id then
63. local msg = zmsg.new()
64. msg:body_fmt(
65. “E: CLIENT EXIT - lost task %s”, task_id)
66. msg:send(monitor)
67. – exit event loop
68. is_running = false
69. break
70. end
71. end
72. end
73. – We never get here but if we did, this is how we’d exit cleanly
74. client:close()
75. monitor:close()
76. context:term()
77. ]]
78.
79. – Worker using REQ socket to do LRU routing
80. —
81. local worker_task = pre_code .. [[
82. local worker = context:socket(zmq.REQ)
83. local endpoint = string.format(“ipc://%s-localbe.ipc”, self)
84. assert(worker:connect(endpoint))
85.
86. – Tell broker we’re ready for work
87. local msg = zmsg.new (“READY”)
88. msg:send(worker)
89.
90. while true do
91. – Workers are busy for 0/1/2 seconds
92. msg = zmsg.recv (worker)
93. s_sleep (randof (2) * 1000)
94. msg:send(worker)
95. end
96. – We never get here but if we did, this is how we’d exit cleanly
97. worker:close()
98. context:term()
99. ]]
100.
101. – First argument is this broker’s name
102. – Other arguments are our peers’ names
103. —
104. s_version_assert (2, 1)
105. if (#arg < 1) then
106. printf (“syntax: peering3 me doyouend…\n”)
107. os.exit(-1)
108. end
109. – Our own name; in practice this‘d be configured per node
110. local self = arg[1]
111. printf (“I: preparing broker at %s…\n”, self)
112. math.randomseed(os.time())
113.
114. – Prepare our context and sockets
115. local context = zmq.init(1)
116.
117. – Bind cloud frontend to endpoint
118. local cloudfe = context:socket(zmq.XREP)
119. local endpoint = string.format(“ipc://%s-cloud.ipc”, self)
120. cloudfe:setopt(zmq.IDENTITY, self)
121. assert(cloudfe:bind(endpoint))
122.
123. – Bind state backend / publisher to endpoint
124. local statebe = context:socket(zmq.PUB)
125. local endpoint = string.format(“ipc://%s-state.ipc”, self)
126. assert(statebe:bind(endpoint))
127.
128. – Connect cloud backend to all peers
129. local cloudbe = context:socket(zmq.XREP)
130. cloudbe:setopt(zmq.IDENTITY, self)
131.
132. for n=2,#arg do
133. local peer = arg[n]
134. printf (“I: connecting to cloud frontend at ’%s’\n”, peer)
135. local endpoint = string.format(“ipc://%s-cloud.ipc”, peer)
136. assert(cloudbe:connect(endpoint))
137. end
138. – Connect statefe to all peers
139. local statefe = context:socket(zmq.SUB)
140. statefe:setopt(zmq.SUBSCRIBE, ””, 0)
141.
142. local peers = {}
143. for n=2,#arg do
144. local peer = arg[n]
145. – add peer name to peers list.
146. peers[#peers + 1] = peer
147. peers[peer] = 0 – set peer’s initial capacity to zero.
148. printf (“I: connecting to state backend at ’%s’\n”, peer)
149. local endpoint = string.format(“ipc://%s-state.ipc”, peer)
150. assert(statefe:connect(endpoint))
151. end
152. – Prepare local frontend and backend
153. local localfe = context:socket(zmq.XREP)
154. local endpoint = string.format(“ipc://%s-localfe.ipc”, self)
155. assert(localfe:bind(endpoint))
156.
157. local localbe = context:socket(zmq.XREP)
158. local endpoint = string.format(“ipc://%s-localbe.ipc”, self)
159. assert(localbe:bind(endpoint))
160.
161. – Prepare monitor socket
162. local monitor = context:socket(zmq.PULL)
163. local endpoint = string.format(“ipc://%s-monitor.ipc”, self)
164. assert(monitor:bind(endpoint))
165.
166. – Start local workers
167. local workers = {}
168. for n=1,NBR_WORKERS do
169. local seed = os.time() + math.random()
170. workers[n] = zmq.threads.runstring(nil, worker_task, self, seed)
171. workers[n]:start(true)
172. end
173. – Start local clients
174. local clients = {}
175. for n=1,NBR_CLIENTS do
176. local seed = os.time() + math.random()
177. clients[n] = zmq.threads.runstring(nil, client_task, self, seed)
178. clients[n]:start(true)
179. end
180.
181. – Interesting part
182. – ————————————————————-
183. – Publish-subscribe flow
184. – - Poll statefe and process capacity updates
185. – - Each time capacity changes, broadcast new value
186. – Request-reply flow
187. – - Poll primary and process local/cloud replies
188. – - While worker available, route localfe to local or cloud
189.
190. – Queue of available workers
191. local local_capacity = 0
192. local cloud_capacity = 0
193. local worker_queue = {}
194. local backends = zmq.poller(2)
195.
196. local function send_reply(msg)
197. local address = msg:address()
198. – Route reply to cloud if it’s addressed to a broker
199. if peers[address] then
200. msg:send(cloudfe) – reply is for a peer.
201. else
202. msg:send(localfe) – reply is for a local client.
203. end
204. end
205.
206. backends:add(localbe, zmq.POLLIN, function()
207. local msg = zmsg.recv(localbe)
208.
209. – Use worker address for LRU routing
210. local_capacity = local_capacity + 1
211. worker_queue[local_capacity] = msg:unwrap()
212. – if reply is not ”READY” then route reply back to client.
213. if (msg:address() ~= ”READY”) then
214. send_reply(msg)
215. end
216. end)
217.
218. backends:add(cloudbe, zmq.POLLIN, function()
219. local msg = zmsg.recv(cloudbe)
220.
221. – We don’t use peer broker address for anything
222. msg:unwrap()
223. – send reply back to client.
224. send_reply(msg)
225. end)
226.
227. backends:add(statefe, zmq.POLLIN, function()
228. local msg = zmsg.recv (statefe)
229. – TODO: track capacity for each peer
230. cloud_capacity = tonumber(msg:body())
231. end)
232.
233. backends:add(monitor, zmq.POLLIN, function()
234. local msg = zmsg.recv (monitor)
235. printf(“%s\n”, msg:body())
236. end)
237.
238. local frontends = zmq.poller(2)
239. local localfe_ready = false
240. local cloudfe_ready = false
241.
242. frontends:add(localfe, zmq.POLLIN, function() localfe_ready = true end)
243. frontends:add(cloudfe, zmq.POLLIN, function() cloudfe_ready = true end)
244.
245. local MAX_BACKEND_REPLIES = 20
246.
247. while true do
248. – If we have no workers anyhow, wait indefinitely
249. local timeout = (local_capacity > 0) and 1000000 or -1
250. local rc, err = backends:poll(timeout)
251. assert (rc >= 0, err)
252.
253. – Track if capacity changes during this iteration
254. local previous = local_capacity
255.
256. – Now route as many clients requests as we can handle
257. – - If we have local capacity we poll both localfe and cloudfe
258. – - If we have cloud capacity only, we poll just localfe
259. – - Route any request locally if we can, else to cloud
260. –
261. while ((local_capacity + cloud_capacity) > 0) do
262. local rc, err = frontends:poll(0)
263. assert (rc >= 0, err)
264.
265. if (localfe_ready) then
266. localfe_ready = false
267. msg = zmsg.recv (localfe)
268. elseif (cloudfe_ready and local_capacity > 0) then
269. cloudfe_ready = false
270. – we have local capacity poll cloud frontend for work.
271. msg = zmsg.recv (cloudfe)
272. else
273. break; – No work, go back to primary
274. end
275.
276. if (local_capacity > 0) then
277. – Dequeue and drop the next worker address
278. local worker = tremove(worker_queue, 1)
279. local_capacity = local_capacity - 1
280. msg:wrap(worker, ””)
281. msg:send(localbe)
282. else
283. – Route to random broker peer
284. printf (“I: route request %s to cloud…\n”,
285. msg:body())
286. local random_peer = randof (#peers) + 1
287. msg:wrap(peers[random_peer], nil)
288. msg:send(cloudbe)
289. end
290. end
291. if (local_capacity ~= previous) then
292. – Broadcast new capacity
293. local msg = zmsg.new()
294. – TODO: send our name with capacity.
295. msg:body_fmt(“%d”, local_capacity)
296. – We stick our own address onto the envelope
297. msg:wrap(self, nil)
298. msg:send(statebe)
299. end
300. end
301. – We never get here but clean up anyhow
302. localbe:close()
303. cloudbe:close()
304. localfe:close()
305. cloudfe:close()
306. statefe:close()
307. monitor:close()
308. context:term()
ok,终于,一个完整的“云端”呈现了出来(虽然只用了一个进程)。而且从代码中,可以很清晰地划分出各个模块。
不过,这里还是不可避免的涉及到了数据的安全性:如果其他的集群down了怎么办?通过更短时间的状态更新?似乎并不治本。或许一个回复链路可以解决。好吧,那是之后要解决的问题了。
zeroMQ初体验-22.可靠性-总览
在开篇时就曾对zeromq的可靠性做过质疑,不过,作为一个雄心勃勃的项目,这自然不能成为它的软肋,于是乎,就有了这样一个完整的章节来介绍和提供“提升可靠性”的解决方案。
这一章节总共会介绍如下一些具备通用性的模式:
· 懒惰的海盗模式:由客户端来保证请求、链路的可靠性。
· 简单的海盗模式:通过LRU队列来保证请求/应答的可靠性。
· 偏执的海盗模式:通过心跳机制来确保链路的通畅。
· 管家模式:由服务器端来保证请求、链路的可靠性。
· 硬盘模式:通过磁盘的同步来确保在不稳定的链路下提供稳定的数据服务。
· 主从模式:通过主从备份来保证服务的可靠性。
· 自由模式:通过冗余的中间件来实现服务的可靠性。
(这边由于没有弄明白原作者的一些比喻,所以,就当是索引吧,主要可从解释中窥得一斑)
关于可靠性:
为了搞清楚什么是可靠性,不妨从它的反面“故障”来看看。如果可以处理某种可被预测的“故障”,那么就可以认为,系统对于这种“故障”是具有可靠性的。当然,也仅仅是针对这一种故障。那么,只要能预测足够多的故障,并对它们逐一做好相应处理,整个系统的可靠性自然就大大增强了。下面列出一些常见的故障(按出现频率从高到低排列):
1,应用程序:无论如何,这似乎都无法避免,架在zeromq之上的应用程序总是会出现“接口撞车”、“消费能力不足导致内存溢出”、“停止响应请求”等问题。
2,当然,作为整个框架的基础,zeromq本身的代码也会出现“死亡”、“内存溢出”等麻烦。不过,相对而言,我们应该相信它是可靠的(不然用它做什么)。
3,队列溢出:这个溢出可以是可预料的(因为消费能力不足而丢弃一些数据)。
4,物理层面上的链路问题:这是一个隐藏的故障,通常来说,zeromq会进行连接重试,不过,不可避免的会出现间歇性的丢包问题。
5,硬件问题:这个没有办法,硬件一坏,跑在它上面的软件也就只能一并算作“故障”了。
6,链路上的丢包:数据死在路上了…
7,所有的数据中心/云端 地震、火山爆发、杯具了(e..好吧)
想要完整的解决方案来覆盖上面的一系列问题,对于本教程而言着实有些苛求了。所以,之后的方案相对会简单不少。
关于可靠性的设计:
可靠性工程可能是个杯具性的活计,不过简单来看(何必那么复杂),只要能尽量缩短服务崩溃后的不可用时间,让服务看起来一直在运作,似乎也就马马虎虎了;当然,纠错还是必不可少的!
例如:
应答模式:如果客户觉得慢,完全可以重试(或许他就能连上不那么忙的服务器了呢)。
发布/订阅模式:如果订阅者错过了一些东西,或许额外加一组应答来请求会比较靠谱。
管道模式:如果服务器后的某个数据加工出了问题,或许可以在统计结果的地方给出提示。
传统的应答模式(基于tcp/ip)在遇到链路断裂或服务器停摆时,很有可能客户端还傻傻地挂着(直到永远)。就这一点而言,zeromq要友好得多,至少,它会帮你重新连接试试~
好吧,这还远远不够,不过,如果再配合着一些额外的操作,其实是可以得到一个可靠的、分布式的应答网络!
简单来说,就有以下三种解决方案:
1.所有客户端连接单一、可确定的服务器。那么一旦哪里崩溃或链路断裂,简单的自检机制加上zeromq的自动重连就可以了。
2.所有客户端连接单一、可确定的队列服务器。数据都放在了队列里,应用崩溃的话,重试应该很简单。
3.多对多的连接。算是1的加强版吧,这台连不上,就换另一台试试。
额。。。其实我也有点晕了,不过,在后续章节中会逐一详解~
zeroMQ初体验-23.可靠性-懒惰的海盗模式
相较于通常的阻塞模式,这里只是做了一点简单的动作来加强系统的可靠性(不再是通常性质的阻塞了)。
这种模式增加/变更了以下几点:
· 轮询已经发出的请求直到得到响应
· 在一定时间内未得到响应则重新发出请求
· 在一定次数的重试不成功之后,停止请求
相较于传统的阻塞模式,好处显而易见:在迟迟得不到答复时,可以重发请求或者干脆放弃,而不是傻等,或者碰到预料之外的报错。(zeromq还默默地为你做了这么件事:当链路出现问题,它会悄悄地重新建立一个~)
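偷懒起见,先用pyzmq把这个客户端的骨架勾勒一下,大概是下面这个样子(地址、超时、重试次数都是随手假设的值,仅作示意,并不是教程里的标准实现):

import zmq

SERVER = "tcp://localhost:5555"   # 假设的服务端地址
TIMEOUT = 2500                    # 毫秒,假设值
RETRIES = 3                       # 假设值

ctx = zmq.Context()

def make_socket():
    # 每次超时都重建REQ套接字,绕开REQ“必须一发一收”的状态机限制
    s = ctx.socket(zmq.REQ)
    s.setsockopt(zmq.LINGER, 0)
    s.connect(SERVER)
    return s

client = make_socket()
poller = zmq.Poller()
poller.register(client, zmq.POLLIN)

request = b"hello"
reply = None
for attempt in range(RETRIES):
    client.send(request)
    # 轮询已经发出的请求,直到得到响应或超时
    if poller.poll(TIMEOUT):
        reply = client.recv()
        break
    # 超时:丢掉旧套接字,重建之后重发同一个请求
    poller.unregister(client)
    client.close()
    client = make_socket()
    poller.register(client, zmq.POLLIN)

if reply is None:
    print("E: server seems to be offline, abandoning")
else:
    print("I: server replied:", reply)

可以看到,核心就是上面说的三点:轮询、超时重发、次数用完就放弃。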
client:
1. require ’rubygems’
2. require ’zmq’
3.
4. class LPClient
5. def initialize(connect, retries = nil, timeout = nil)
6. @connect = connect
7. @retries = (retries || 3).to_i
8. @timeout = (timeout || 3).to_i
9. @ctx = ZMQ::Context.new(1)
10. client_sock
11. at_exit do
12. @socket.close
13. end
14. end
15.
16. def client_sock
17. @socket = @ctx.socket(ZMQ::REQ)
18. @socket.setsockopt(ZMQ::LINGER, 0)
19. @socket.connect(@connect)
20. end
21.
22. def send(message)
23. @retries.times do |tries|
24. raise(“Send: #{message} failed”) unless @socket.send(message)
25. if ZMQ.select( [@socket], nil, nil, @timeout)
26. yield @socket.recv
27. return
28. else
29. @socket.close
30. client_sock
31. end
32. end
33. raise ‘Server down’
34. end
35.
36. end
37.
38. if $0 == __FILE__
39. server = LPClient.new(ARGV[0] || ”tcp://localhost:5555″, ARGV[1], ARGV[2])
40. count = 0
41. loop do
42. request = ”#{count}”
43. count += 1
44. server.send(request) do |reply|
45. if reply == request
46. puts(“I: server replied OK (#{reply})”)
47. else
48. puts(“E: malformed reply from server: #{reply}”)
49. end
50. end
51. end
52. puts ’success’
53. end
server:
1. require ’rubygems’
2. require ’zmq’
3.
4. class LPServer
5. def initialize(connect)
6. @ctx = ZMQ::Context.new(1)
7. @socket = @ctx.socket(ZMQ::REP)
8. @socket.bind(connect)
9. end
10.
11. def run
12. begin
13. loop do
14. rsl = yield @socket.recv
15. @socket.send rsl
16. end
17. ensure
18. @socket.close
19. @ctx.close
20. end
21. end
22.
23. end
24.
25. if $0 == __FILE__
26. cycles = 0
27. srand
28. LPServer.new(ARGV[0] || ”tcp://*:5555″).run do |request|
29. cycles += 1
30. if cycles > 3
31. if rand(3) == 0
32. puts ”I: simulating a crash”
33. break
34. elsif rand(3) == 0
35. puts ”I: simulating CPU overload”
36. sleep(3)
37. end
38. end
39. puts ”I: normal request (#{request})”
40. sleep(1)
41. request
42. end
43.
44. end
优点:
1.容易理解和实施
2.容易并入到现有的工程项目中
3.zeromq帮助解决了一部分比较麻烦的底层实现
缺点:
没有故障转移(如果server出现了问题,只有死等了~)
zeroMQ初体验-24.可靠性-简单的海盗模式
相较于“懒惰的”做了些许扩展,适当的挽救那个“死等”的败笔~
恰如图所示,在消费者与服务者之间加了一个中间件,负责请求的分发工作(尽管不是那么智能),避免了每次都傻等某个不靠谱的worker。
中间的均衡队列:
1. require”zmq”
2. require”zmq.poller”
3. require”zhelpers”
4. require”zmsg”
5.
6. local tremove = table.remove
7.
8. local MAX_WORKERS = 100
9.
10. s_version_assert (2, 1)
11.
12. – Prepare our context and sockets
13. local context = zmq.init(1)
14. local frontend = context:socket(zmq.XREP)
15. local backend = context:socket(zmq.XREP)
16. frontend:bind(“tcp://*:5555”); – For clients
17. backend:bind(“tcp://*:5556”); – For workers
18.
19. – Queue of available workers
20. local worker_queue = {}
21. local is_accepting = false
22.
23. local poller = zmq.poller(2)
24.
25. local function frontend_cb()
26. – Now get next client request, route to next worker
27. local msg = zmsg.recv (frontend)
28.
29. – Dequeue a worker from the queue.
30. local worker = tremove(worker_queue, 1)
31.
32. msg:wrap(worker, ””)
33. msg:send(backend)
34.
35. if (#worker_queue == 0) then
36. – stop accepting work from clients, when no workers are available.
37. poller:remove(frontend)
38. is_accepting = false
39. end
40. end
41.
42. – Handle worker activity on backend
43. poller:add(backend, zmq.POLLIN, function()
44. local msg = zmsg.recv(backend)
45. – Use worker address for LRU routing
46. worker_queue[#worker_queue + 1] = msg:unwrap()
47.
48. – start accepting client requests, if we are not already doing so.
49. if not is_accepting then
50. is_accepting = true
51. poller:add(frontend, zmq.POLLIN, frontend_cb)
52. end
53.
54. – Forward message to client if it’s not a READY
55. if (msg:address() ~= ”READY”) then
56. msg:send(frontend)
57. end
58. end)
59.
60. – start poller’s event loop
61. poller:start()
62.
63. – We never exit the main loop
workers:
1. require”zmq”
2. require”zmsg”
3.
4. math.randomseed(os.time())
5.
6. local context = zmq.init(1)
7. local worker = context:socket(zmq.REQ)
8.
9. – Set random identity to make tracing easier
10. local identity = string.format(“%04X-%04X”, randof (0x10000), randof (0x10000))
11. worker:setopt(zmq.IDENTITY, identity)
12. worker:connect(“tcp://localhost:5556”)
13.
14. – Tell queue we’re ready for work
15. printf (“I: (%s) worker ready\n”, identity)
16. worker:send(“READY”)
17.
18. local cycles = 0
19. while true do
20. local msg = zmsg.recv (worker)
21.
22. – Simulate various problems, after a few cycles
23. cycles = cycles + 1
24. if (cycles > 3 and randof (5) == 0) then
25. printf (“I: (%s) simulating a crash\n”, identity)
26. break
27. elseif (cycles > 3 and randof (5) == 0) then
28. printf (“I: (%s) simulating CPU overload\n”, identity)
29. s_sleep (5000)
30. end
31. printf (“I: (%s) normal reply - %s\n”,
32. identity, msg:body())
33. s_sleep (1000) – Do some heavy work
34. msg:send(worker)
35. end
36. worker:close()
37. context:term()
注意:这里语言是lua。
这种模型理论上来说,已经相当靠谱了,好吧,路由还不够智能~
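为了方便对照,把上面那段lua队列的核心逻辑用pyzmq压缩一下,大概等价于下面这样(端口与“READY”约定沿用lua版,其余均为示意):

import zmq

ctx = zmq.Context()
frontend = ctx.socket(zmq.ROUTER)   # 面向客户端
backend = ctx.socket(zmq.ROUTER)    # 面向worker
frontend.bind("tcp://*:5555")
backend.bind("tcp://*:5556")

poll_workers = zmq.Poller()
poll_workers.register(backend, zmq.POLLIN)
poll_both = zmq.Poller()
poll_both.register(frontend, zmq.POLLIN)
poll_both.register(backend, zmq.POLLIN)

workers = []    # 空闲worker地址组成的LRU队列

while True:
    # 手头没有空闲worker时,只监听backend,不收新的客户端请求
    socks = dict(poll_both.poll() if workers else poll_workers.poll())

    if backend in socks:
        # worker发来消息:第一帧是它的地址,放回队尾
        frames = backend.recv_multipart()
        workers.append(frames[0])
        # 不是READY信号的话,剩下的帧就是要转回客户端的应答
        if frames[2] != b"READY":
            frontend.send_multipart(frames[2:])

    if frontend in socks and workers:
        # 把客户端请求整体转给队首的空闲worker
        frames = frontend.recv_multipart()
        backend.send_multipart([workers.pop(0), b""] + frames)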
zeroMQ初体验-25.可靠性-偏执的海盗模式
虽然说“简单的海盗模式”已经非常靠谱了,不过瑕疵还是有不少的。比如说,中间件队列并不监控后端worker的死活,至少要搭上一次丢包才能确定某个worker已经不在了(虽然问题不大,但终究不爽)。而“偏执的”模式又对“简单”模式做了一些扩展:
Queue:
1. require”zmq”
2. require”zmq.poller”
3. require”zmsg”
4.
5. local MAX_WORKERS = 100
6. local HEARTBEAT_LIVENESS = 3 – 3-5 is reasonable
7. local HEARTBEAT_INTERVAL = 1000 – msecs
8.
9. local tremove = table.remove
10.
11. – Insert worker at end of queue, reset expiry
12. – Worker must not already be in queue
13. local function s_worker_append(queue, identity)
14. if queue[identity] then
15. printf (“E: duplicate worker identity %s”, identity)
16. else
17. assert (#queue < MAX_WORKERS)
18. queue[identity] = s_clock() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS
19. queue[#queue + 1] = identity
20. end
21. end
22. – Remove worker from queue, if present
23. local function s_worker_delete(queue, identity)
24. for i=1,#queue do
25. if queue[i] == identity then
26. tremove(queue, i)
27. break
28. end
29. end
30. queue[identity] = nil
31. end
32. – Reset worker expiry, worker must be present
33. local function s_worker_refresh(queue, identity)
34. if queue[identity] then
35. queue[identity] = s_clock() + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS
36. else
37. printf(“E: worker %s not ready\n”, identity)
38. end
39. end
40. – Pop next available worker off queue, return identity
41. local function s_worker_dequeue(queue)
42. assert (#queue > 0)
43. local identity = tremove(queue, 1)
44. queue[identity] = nil
45. return identity
46. end
47. – Look for & kill expired workers
48. local function s_queue_purge(queue)
49. local curr_clock = s_clock()
50. – Work backwards from end to simplify removal
51. for i=#queue,1,-1 do
52. local id = queue[i]
53. if (curr_clock > queue[id]) then
54. tremove(queue, i)
55. queue[id] = nil
56. end
57. end
58. end
59. s_version_assert (2, 1)
60.
61. – Prepare our context and sockets
62. local context = zmq.init(1)
63. local frontend = context:socket(zmq.XREP)
64. local backend = context:socket(zmq.XREP)
65. frontend:bind(“tcp://*:5555”); – For clients
66. backend:bind(“tcp://*:5556”); – For workers
67.
68. – Queue of available workers
69. local queue = {}
70. local is_accepting = false
71.
72. – Send out heartbeats at regular intervals
73. local heartbeat_at = s_clock() + HEARTBEAT_INTERVAL
74.
75. local poller = zmq.poller(2)
76.
77. local function frontend_cb()
78. – Now get next client request, route to next worker
79. local msg = zmsg.recv(frontend)
80. local identity = s_worker_dequeue (queue)
81. msg:push(identity)
82. msg:send(backend)
83.
84. if (#queue == 0) then
85. – stop accepting work from clients, when no workers are available.
86. poller:remove(frontend)
87. is_accepting = false
88. end
89. end
90.
91. – Handle worker activity on backend
92. poller:add(backend, zmq.POLLIN, function()
93. local msg = zmsg.recv(backend)
94. local identity = msg:unwrap()
95.
96. – Return reply to client if it’s not a control message
97. if (msg:parts() == 1) then
98. if (msg:address() == ”READY”) then
99. s_worker_delete(queue, identity)
100. s_worker_append(queue, identity)
101. elseif (msg:address() == ”HEARTBEAT”) then
102. s_worker_refresh(queue, identity)
103. else
104. printf(“E: invalid message from %s\n”, identity)
105. msg:dump()
106. end
107. else
108. – reply for client.
109. msg:send(frontend)
110. s_worker_append(queue, identity)
111. end
112.
113. – start accepting client requests, if we are not already doing so.
114. if not is_accepting and #queue > 0 then
115. is_accepting = true
116. poller:add(frontend, zmq.POLLIN, frontend_cb)
117. end
118. end)
119.
120. – start poller’s event loop
121. while true do
122. local cnt = assert(poller:poll(HEARTBEAT_INTERVAL * 1000))
123. – Send heartbeats to idle workers if it’s time
124. if (s_clock() > heartbeat_at) then
125. for i=1,#queue do
126. local msg = zmsg.new(“HEARTBEAT”)
127. msg:wrap(queue[i], nil)
128. msg:send(backend)
129. end
130. heartbeat_at = s_clock() + HEARTBEAT_INTERVAL
131. end
132. s_queue_purge(queue)
133. end
134.
135. – We never exit the main loop
136. – But pretend to do the right shutdown anyhow
137. while (#queue > 0) do
138. s_worker_dequeue(queue)
139. end
140.
141. frontend:close()
142. backend:close()
worker:
1. require”zmq”
2. require”zmq.poller”
3. require”zmsg”
4.
5. local HEARTBEAT_LIVENESS = 3 – 3-5 is reasonable
6. local HEARTBEAT_INTERVAL = 1000 – msecs
7. local INTERVAL_INIT = 1000 – Initial reconnect
8. local INTERVAL_MAX = 32000 – After exponential backoff
9.
10. – Helper function that returns a new configured socket
11. – connected to the Hello World server
12. —
13. local identity
14.
15. local function s_worker_socket (context)
16. local worker = context:socket(zmq.XREQ)
17.
18. – Set random identity to make tracing easier
19. identity = string.format(“%04X-%04X”, randof (0x10000), randof (0x10000))
20. worker:setopt(zmq.IDENTITY, identity)
21. worker:connect(“tcp://localhost:5556”)
22.
23. – Configure socket to not wait at close time
24. worker:setopt(zmq.LINGER, 0)
25.
26. – Tell queue we’re ready for work
27. printf(“I: (%s) worker ready\n”, identity)
28. worker:send(“READY”)
29.
30. return worker
31. end
32.
33. s_version_assert (2, 1)
34. math.randomseed(os.time())
35.
36. local context = zmq.init(1)
37. local worker = s_worker_socket (context)
38.
39. – If liveness hits zero, queue is considered disconnected
40. local liveness = HEARTBEAT_LIVENESS
41. local interval = INTERVAL_INIT
42.
43. – Send out heartbeats at regular intervals
44. local heartbeat_at = s_clock () + HEARTBEAT_INTERVAL
45.
46. local poller = zmq.poller(1)
47.
48. local is_running = true
49.
50. local cycles = 0
51. local function worker_cb()
52. – Get message
53. – - 3-part envelope + content -> request
54. – - 1-part ”HEARTBEAT” -> heartbeat
55. local msg = zmsg.recv (worker)
56.
57. if (msg:parts() == 3) then
58. – Simulate various problems, after a few cycles
59. cycles = cycles + 1
60. if (cycles > 3 and randof (5) == 0) then
61. printf (“I: (%s) simulating a crash\n”, identity)
62. is_running = false
63. return
64. elseif (cycles > 3 and randof (5) == 0) then
65. printf (“I: (%s) simulating CPU overload\n”,
66. identity)
67. s_sleep (5000)
68. end
69. printf (“I: (%s) normal reply - %s\n”,
70. identity, msg:body())
71. msg:send(worker)
72. liveness = HEARTBEAT_LIVENESS
73. s_sleep(1000); – Do some heavy work
74. elseif (msg:parts() == 1 and msg:body() == ”HEARTBEAT”) then
75. liveness = HEARTBEAT_LIVENESS
76. else
77. printf (“E: (%s) invalid message\n”, identity)
78. msg:dump()
79. end
80. interval = INTERVAL_INIT
81. end
82. poller:add(worker, zmq.POLLIN, worker_cb)
83.
84. while is_running do
85. local cnt = assert(poller:poll(HEARTBEAT_INTERVAL * 1000))
86.
87. if (cnt == 0) then
88. liveness = liveness - 1
89. if (liveness == 0) then
90. printf (“W: (%s) heartbeat failure, can’t reach queue\n”,
91. identity)
92. printf (“W: (%s) reconnecting in %d msec…\n”,
93. identity, interval)
94. s_sleep (interval)
95.
96. if (interval < INTERVAL_MAX) then
97. interval = interval * 2
98. end
99. poller:remove(worker)
100. worker:close()
101. worker = s_worker_socket (context)
102. poller:add(worker, zmq.POLLIN, worker_cb)
103. liveness = HEARTBEAT_LIVENESS
104. end
105. end
106. – Send heartbeat to queue if it’s time
107. if (s_clock () > heartbeat_at) then
108. heartbeat_at = s_clock () + HEARTBEAT_INTERVAL
109. printf(“I: (%s) worker heartbeat\n”, identity)
110. worker:send(“HEARTBEAT”)
111. end
112. end
113. worker:close()
114. context:term()
注意:这里是lua代码。
其实从模式图中已经可以看出,系统中多了“心跳”环节,来确认链路的可用性。
关于心跳模块,着实比较棘手,也算是代码中的重头了。关于做“心跳”的策略,关键是要把握好时间间隔,以避免过载或者失效。通常的,也不建议在持久化的连接上加入心跳机制。
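如果把worker里跟心跳相关的骨架单独抽出来,用pyzmq大致是下面这个样子(间隔、地址都是示意值,只为看清“计数扣减+指数退避重连”这套思路,并不是可以直接上线的实现):

import time
import zmq

HEARTBEAT_LIVENESS = 3      # 连续丢3次心跳,就认为队列失联
HEARTBEAT_INTERVAL = 1.0    # 秒
INTERVAL_INIT = 1.0         # 重连前的初始等待
INTERVAL_MAX = 32.0         # 指数退避的上限

def connect(ctx):
    # 假设队列后端地址是tcp://localhost:5556,worker用DEALER(XREQ)
    s = ctx.socket(zmq.DEALER)
    s.setsockopt(zmq.LINGER, 0)
    s.connect("tcp://localhost:5556")
    s.send(b"READY")
    return s

ctx = zmq.Context()
worker = connect(ctx)
poller = zmq.Poller()
poller.register(worker, zmq.POLLIN)

liveness = HEARTBEAT_LIVENESS
interval = INTERVAL_INIT
heartbeat_at = time.time() + HEARTBEAT_INTERVAL

while True:
    if poller.poll(HEARTBEAT_INTERVAL * 1000):
        frames = worker.recv_multipart()
        if frames == [b"HEARTBEAT"]:
            pass                              # 队列还活着
        else:
            worker.send_multipart(frames)     # 正常请求,原样应答
        liveness = HEARTBEAT_LIVENESS
        interval = INTERVAL_INIT
    else:
        # 一个心跳周期里什么都没收到,扣掉一条“命”
        liveness -= 1
        if liveness == 0:
            time.sleep(interval)              # 退避之后重连
            interval = min(interval * 2, INTERVAL_MAX)
            poller.unregister(worker)
            worker.close()
            worker = connect(ctx)
            poller.register(worker, zmq.POLLIN)
            liveness = HEARTBEAT_LIVENESS
    # 到点就主动给队列发一个心跳
    if time.time() > heartbeat_at:
        worker.send(b"HEARTBEAT")
        heartbeat_at = time.time() + HEARTBEAT_INTERVAL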
这里应当注意到,“偏执”模式与“简单”模式并不兼容–因为心跳机制。
为了避免混乱,rfc.zeromq.org 这里有一些协议的声明,至少让你不必去翻现有的代码,就能确定新东西是否兼容~
zeroMQ初体验-26.可靠性-管家模式
上一节末尾有说到协议,zeromq自然做了充分的封装,“管家模式”便由此而来。
是不是有点像简化版的”偏执模式”?这里的“broker”需要做到”承上启下”。因为这是”协议”的具体实现,自然,这里以api形式给出各个角色的相应实现。
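先看“协议”本身:MDP其实只是约定了几帧固定的信封。用pyzmq把客户端一侧的请求帧直接摆出来就很直观了(协议头“MDPC01”取自MDP/0.1规范,若你手头mdp模块的定义不同,请以它为准;地址、服务名均为假设):

import zmq

MDPC_CLIENT = b"MDPC01"   # MDP/Client协议头,对应lua代码里的mdp.MDPC_CLIENT

ctx = zmq.Context()
client = ctx.socket(zmq.REQ)
client.connect("tcp://localhost:5555")   # 假设的broker地址

# Frame 1: 协议头; Frame 2: 服务名; Frame 3起: 请求体
# (REQ套接字会自动补上最前面的空帧)
client.send_multipart([MDPC_CLIENT, b"echo", b"Hello world"])

reply = client.recv_multipart()
# 应答的前两帧同样是协议头与服务名,之后才是应答体
assert reply[0] == MDPC_CLIENT and reply[1] == b"echo"
print("reply body:", reply[2:])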
为客户端提供的api:
1. local setmetatable = setmetatable
2.
3. local mdp = require”mdp”
4.
5. local zmq = require”zmq”
6. local zpoller = require”zmq.poller”
7. local zmsg = require”zmsg”
8. require”zhelpers”
9.
10. local s_version_assert = s_version_assert
11.
12. local obj_mt = {}
13. obj_mt.__index = obj_mt
14.
15. function obj_mt:set_timeout(timeout)
16. self.timeout = timeout
17. end
18.
19. function obj_mt:set_retries(retries)
20. self.retries = retries
21. end
22.
23. function obj_mt:destroy()
24. if self.client then self.client:close() end
25. self.context:term()
26. end
27.
28. local function s_mdcli_connect_to_broker(self)
29. – close old socket.
30. if self.client then
31. self.poller:remove(self.client)
32. self.client:close()
33. end
34. self.client = assert(self.context:socket(zmq.REQ))
35. assert(self.client:setopt(zmq.LINGER, 0))
36. assert(self.client:connect(self.broker))
37. if self.verbose then
38. s_console(“I: connecting to broker at %s…”, self.broker)
39. end
40. – add socket to poller
41. self.poller:add(self.client, zmq.POLLIN, function()
42. self.got_reply = true
43. end)
44. end
45.
46. —
47. – Send request to broker and get reply by hook or crook
48. – Returns the reply message or nil if there was no reply.
49. —
50. function obj_mt:send(service, request)
51. – Prefix request with protocol frames
52. – Frame 1: ”MDPCxy” (six bytes, MDP/Client x.y)
53. – Frame 2: Service name (printable string)
54. request:push(service)
55. request:push(mdp.MDPC_CLIENT)
56. if self.verbose then
57. s_console(“I: send request to ’%s’ service:”, service)
58. request:dump()
59. end
60.
61. local retries = self.retries
62. while (retries > 0) do
63. local msg = request:dup()
64. msg:send(self.client)
65. self.got_reply = false
66.
67. while true do
68. local cnt = assert(self.poller:poll(self.timeout * 1000))
69. if cnt ~= 0 and self.got_reply then
70. local msg = zmsg.recv(self.client)
71. if self.verbose then
72. s_console(“I: received reply:”)
73. msg:dump()
74. end
75. assert(msg:parts() >= 3)
76.
77. local header = msg:pop()
78. assert(header == mdp.MDPC_CLIENT)
79. local reply_service = msg:pop()
80. assert(reply_service == service)
81. return msg
82. else
83. retries = retries - 1
84. if (retries > 0) then
85. if self.verbose then
86. s_console(“W: no reply, reconnecting…”)
87. end
88. – Reconnect
89. s_mdcli_connect_to_broker(self)
90. break – outer loop will resend request.
91. else
92. if self.verbose then
93. s_console(“W: permanent error, abandoning request”)
94. end
95. return nil – Giving up
96. end
97. end
98. end
99. end
100. end
101.
102. module(…)
103.
104. function new(broker, verbose)
105. s_version_assert (2, 1);
106. local self = setmetatable({
107. context = zmq.init(1),
108. poller = zpoller.new(1),
109. broker = broker,
110. verbose = verbose,
111. timeout = 2500, – msecs
112. retries = 3, – before we abandon
113. }, obj_mt)
114.
115. s_mdcli_connect_to_broker(self)
116. return self
117. end
118.
119. setmetatable(_M, { __call = function(self, …) return new(…) end })
客户端调用:
1. require”mdcliapi”
2. require”zmsg”
3. require”zhelpers”
4.
5. local verbose = (arg[1] == ”-v”)
6. local session = mdcliapi.new(“tcp://localhost:5555”, verbose)
7.
8. local count=1
9. repeat
10. local request = zmsg.new(“Hello world”)
11. local reply = session:send(“echo”, request)
12. if not reply then
13. break – Interrupt or failure
14. end
15. count = count + 1
16. until (count == 100000)
17. printf(“%d requests/replies processed\n”, count)
18. session:destroy()
服务端api:
1. local HEARTBEAT_LIVENESS = 3 – 3-5 is reasonable
2.
3. local setmetatable = setmetatable
4.
5. local mdp = require”mdp”
6.
7. local zmq = require”zmq”
8. local zpoller = require”zmq.poller”
9. local zmsg = require”zmsg”
10. require”zhelpers”
11.
12. local s_version_assert = s_version_assert
13.
14. local obj_mt = {}
15. obj_mt.__index = obj_mt
16.
17. function obj_mt:set_heartbeat(heartbeat)
18. self.heartbeat = heartbeat
19. end
20.
21. function obj_mt:set_reconnect(reconnect)
22. self.reconnect = reconnect
23. end
24.
25. function obj_mt:destroy()
26. if self.worker then self.worker:close() end
27. self.context:term()
28. end
29.
30. – Send message to broker
31. – If no msg is provided, create one internally
32. local function s_mdwrk_send_to_broker(self, command, option, msg)
33. msg = msg or zmsg.new()
34.
35. – Stack protocol envelope to start of message
36. if option then
37. msg:push(option)
38. end
39. msg:push(command)
40. msg:push(mdp.MDPW_WORKER)
41. msg:push(“”)
42.
43. if self.verbose then
44. s_console(“I: sending %s to broker”, mdp.mdps_commands[command])
45. msg:dump()
46. end
47. msg:send(self.worker)
48. end
49.
50. local function s_mdwrk_connect_to_broker(self)
51. – close old socket.
52. if self.worker then
53. self.poller:remove(self.worker)
54. self.worker:close()
55. end
56. self.worker = assert(self.context:socket(zmq.XREQ))
57. assert(self.worker:setopt(zmq.LINGER, 0))
58. assert(self.worker:connect(self.broker))
59. if self.verbose then
60. s_console(“I: connecting to broker at %s…”, self.broker)
61. end
62. – Register service with broker
63. s_mdwrk_send_to_broker(self, mdp.MDPW_READY, self.service)
64. – If liveness hits zero, queue is considered disconnected
65. self.liveness = HEARTBEAT_LIVENESS
66. self.heartbeat_at = s_clock() + self.heartbeat
67. – add socket to poller
68. self.poller:add(self.worker, zmq.POLLIN, function()
69. self.got_msg = true
70. end)
71. end
72.
73. —
74. – Send reply, if any, to broker and wait for next request.
75. —
76. function obj_mt:recv(reply)
77. – Format and send the reply if we are provided one
78. if reply then
79. assert(self.reply_to)
80. reply:wrap(self.reply_to, ””)
81. self.reply_to = nil
82. s_mdwrk_send_to_broker(self, mdp.MDPW_REPLY, nil, reply)
83. end
84. self.expect_reply = true
85.
86. self.got_msg = false
87. while true do
88. local cnt = assert(self.poller:poll(self.heartbeat * 1000))
89. if cnt ~= 0 and self.got_msg then
90. self.got_msg = false
91. local msg = zmsg.recv(self.worker)
92. if self.verbose then
93. s_console(“I: received message from broker:”)
94. msg:dump()
95. end
96. self.liveness = HEARTBEAT_LIVENESS
97. – Don’t try to handle errors, just assert noisily
98. assert(msg:parts() >= 3)
99.
100. local empty = msg:pop()
101. assert(empty == ””)
102.
103. local header = msg:pop()
104. assert(header == mdp.MDPW_WORKER)
105.
106. local command = msg:pop()
107. if command == mdp.MDPW_REQUEST then
108. – We should pop and save as many addresses as there are
109. – up to a null part, but for now, just save one…
110. self.reply_to = msg:unwrap()
111. return msg – We have a request to process
112. elseif command == mdp.MDPW_HEARTBEAT then
113. – Do nothing for heartbeats
114. elseif command == mdp.MDPW_DISCONNECT then
115. – dis-connect and re-connect to broker.
116. s_mdwrk_connect_to_broker(self)
117. else
118. s_console(“E: invalid input message (%d)”, command:byte(1,1))
119. msg:dump()
120. end
121. else
122. self.liveness = self.liveness - 1
123. if (self.liveness == 0) then
124. if self.verbose then
125. s_console(“W: disconnected from broker - retrying…”)
126. end
127. – sleep then Reconnect
128. s_sleep(self.reconnect)
129. s_mdwrk_connect_to_broker(self)
130. end
131.
132. – Send HEARTBEAT if it’s time
133. if (s_clock() > self.heartbeat_at) then
134. s_mdwrk_send_to_broker(self, mdp.MDPW_HEARTBEAT)
135. self.heartbeat_at = s_clock() + self.heartbeat
136. end
137. end
138. end
139. end
140.
141. module(…)
142.
143. function new(broker, service, verbose)
144. s_version_assert(2, 1);
145. local self = setmetatable({
146. context = zmq.init(1),
147. poller = zpoller.new(1),
148. broker = broker,
149. service = service,
150. verbose = verbose,
151. heartbeat = 2500, – msecs
152. reconnect = 2500, – msecs
153. }, obj_mt)
154.
155. s_mdwrk_connect_to_broker(self)
156. return self
157. end
158.
159. setmetatable(_M, { __call = function(self, …) return new(…) end })
服务端调用:
1. require”mdwrkapi”
2. require”zmsg”
3.
4. local verbose = (arg[1] == ”-v”)
5. local session = mdwrkapi.new(“tcp://localhost:5555″, ”echo”, verbose)
6.
7. local reply
8. while true do
9. local request = session:recv(reply)
10. if not request then
11. break – Worker was interrupted
12. end
13. reply = request – Echo is complex… :-)
14. end
15. session:destroy()
注意:
这里的api全部都是单线程的,不会做心跳,也不会做错误报告(这些都可以根据具体需要自行补上)。真正确定连接通路、负责任务分配的是“管家”:
1. require”zmq”
2. require”zmq.poller”
3. require”zmsg”
4. require”zhelpers”
5. require”mdp”
6.
7. local tremove = table.remove
8.
9. – We’d normally pull these from config data
10.
11. local HEARTBEAT_LIVENESS = 3 – 3-5 is reasonable
12. local HEARTBEAT_INTERVAL = 2500 – msecs
13. local HEARTBEAT_EXPIRY = HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS
14.
15. – ———————————————————————
16. – Constructor for broker object
17.
18. – ———————————————————————
19. – Broker object’s metatable.
20. local broker_mt = {}
21. broker_mt.__index = broker_mt
22.
23. function broker_new(verbose)
24. local context = zmq.init(1)
25. – Initialize broker state
26. return setmetatable({
27. context = context,
28. socket = context:socket(zmq.XREP),
29. verbose = verbose,
30. services = {},
31. workers = {},
32. waiting = {},
33. heartbeat_at = s_clock() + HEARTBEAT_INTERVAL,
34. }, broker_mt)
35. end
36.
37. – ———————————————————————
38. – Service object
39. local service_mt = {}
40. service_mt.__index = service_mt
41.
42. – Worker object
43. local worker_mt = {}
44. worker_mt.__index = worker_mt
45.
46. – helper list remove function
47. local function zlist_remove(list, item)
48. for n=#list,1,-1 do
49. if list[n] == item then
50. tremove(list, n)
51. end
52. end
53. end
54.
55. – ———————————————————————
56. – Destructor for broker object
57.
58. function broker_mt:destroy()
59. self.socket:close()
60. self.context:term()
61. for name, service in pairs(self.services) do
62. service:destroy()
63. end
64. for id, worker in pairs(self.workers) do
65. worker:destroy()
66. end
67. end
68.
69. – ———————————————————————
70. – Bind broker to endpoint, can call this multiple times
71. – We use a single socket for both clients and workers.
72.
73. function broker_mt:bind(endpoint)
74. self.socket:bind(endpoint)
75. s_console(“I: MDP broker/0.1.1 is active at %s”, endpoint)
76. end
77.
78. – ———————————————————————
79. – Delete any idle workers that haven’t pinged us in a while. Workers
80. – are oldest to most recent, so we stop at the first alive worker.
81.
82. function broker_mt:purge_workers()
83. local waiting = self.waiting
84. for n=1,#waiting do
85. local worker = waiting[n]
86. if (not worker:expired()) then
87. return – Worker is alive, we’re done here
88. end
89. if (self.verbose) then
90. s_console(“I: deleting expired worker: %s”, worker.identity)
91. end
92.
93. self:worker_delete(worker, false)
94. end
95. end
96.
97. – ———————————————————————
98. – Locate or create new service entry
99.
100. function broker_mt:service_require(name)
101. assert (name)
102. local service = self.services[name]
103. if not service then
104. service = setmetatable({
105. name = name,
106. requests = {},
107. waiting = {},
108. workers = 0,
109. }, service_mt)
110. self.services[name] = service
111. if (self.verbose) then
112. s_console(“I: received message:”)
113. end
114. end
115. return service
116. end
117.
118. – ———————————————————————
119. – Destroy service object, called when service is removed from
120. – broker.services.
121.
122. function service_mt:destroy()
123. end
124.
125. – ———————————————————————
126. – Dispatch requests to waiting workers as possible
127.
128. function broker_mt:service_dispatch(service, msg)
129. assert (service)
130. local requests = service.requests
131. if (msg) then – Queue message if any
132. requests[#requests + 1] = msg
133. end
134.
135. self:purge_workers()
136. local waiting = service.waiting
137. while (#waiting > 0 and #requests > 0) do
138. local worker = tremove(waiting, 1) – pop worker from service’s waiting queue.
139. zlist_remove(self.waiting, worker) – also remove worker from broker’s waiting queue.
140. local msg = tremove(requests, 1) – pop request from service’s request queue.
141. self:worker_send(worker, mdp.MDPW_REQUEST, nil, msg)
142. end
143. end
144.
145. – ———————————————————————
146. – Handle internal service according to 8/MMI specification
147.
148. function broker_mt:service_internal(service_name, msg)
149. if (service_name == ”mmi.service”) then
150. local name = msg:body()
151. local service = self.services[name]
152. if (service and service.workers) then
153. msg:body_set(“200”)
154. else
155. msg:body_set(“404”)
156. end
157. else
158. msg:body_set(“501”)
159. end
160.
161. – Remove & save client return envelope and insert the
162. – protocol header and service name, then rewrap envelope.
163. local client = msg:unwrap()
164. msg:wrap(mdp.MDPC_CLIENT, service_name)
165. msg:wrap(client, ””)
166.
167. msg:send(self.socket)
168. end
169.
170. – ———————————————————————
171. – Creates worker if necessary
172.
173. function broker_mt:worker_require(identity)
174. assert (identity)
175.
176. – self.workers is keyed off worker identity
177. local worker = self.workers[identity]
178. if (not worker) then
179. worker = setmetatable({
180. identity = identity,
181. expiry = 0,
182. }, worker_mt)
183. self.workers[identity] = worker
184. if (self.verbose) then
185. s_console(“I: registering new worker: %s”, identity)
186. end
187. end
188. return worker
189. end
190.
191. – ———————————————————————
192. – Deletes worker from all data structures, and destroys worker
193.
194. function broker_mt:worker_delete(worker, disconnect)
195. assert (worker)
196. if (disconnect) then
197. self:worker_send(worker, mdp.MDPW_DISCONNECT)
198. end
199. local service = worker.service
200. if (service) then
201. zlist_remove (service.waiting, worker)
202. service.workers = service.workers - 1
203. end
204. zlist_remove (self.waiting, worker)
205. self.workers[worker.identity] = nil
206. worker:destroy()
207. end
208.
209. – ———————————————————————
210. – Destroy worker object, called when worker is removed from
211. – broker.workers.
212.
213. function worker_mt:destroy(argument)
214. end
215.
216. – ———————————————————————
217. – Process message sent to us by a worker
218.
219. function broker_mt:worker_process(sender, msg)
220. assert (msg:parts() >= 1) – At least, command
221.
222. local command = msg:pop()
223. local worker_ready = (self.workers[sender] ~= nil)
224. local worker = self:worker_require(sender)
225.
226. if (command == mdp.MDPW_READY) then
227. if (worker_ready) then – Not first command in session then
228. self:worker_delete(worker, true)
229. elseif (sender:sub(1,4) == ”mmi.”) then – Reserved service name
230. self:worker_delete(worker, true)
231. else
232. – Attach worker to service and mark as idle
233. local service_name = msg:pop()
234. local service = self:service_require(service_name)
235. worker.service = service
236. service.workers = service.workers + 1
237. self:worker_waiting(worker)
238. end
239. elseif (command == mdp.MDPW_REPLY) then
240. if (worker_ready) then
241. – Remove & save client return envelope and insert the
242. – protocol header and service name, then rewrap envelope.
243. local client = msg:unwrap()
244. msg:wrap(mdp.MDPC_CLIENT, worker.service.name)
245. msg:wrap(client, ””)
246.
247. msg:send(self.socket)
248. self:worker_waiting(worker)
249. else
250. self:worker_delete(worker, true)
251. end
252. elseif (command == mdp.MDPW_HEARTBEAT) then
253. if (worker_ready) then
254. worker.expiry = s_clock() + HEARTBEAT_EXPIRY
255. else
256. self:worker_delete(worker, true)
257. end
258. elseif (command == mdp.MDPW_DISCONNECT) then
259. self:worker_delete(worker, false)
260. else
261. s_console(“E: invalid input message (%d)”, command:byte(1,1))
262. msg:dump()
263. end
264. end
265.
266. – ———————————————————————
267. – Send message to worker
268. – If pointer to message is provided, sends & destroys that message
269.
270. function broker_mt:worker_send(worker, command, option, msg)
271. msg = msg and msg:dup() or zmsg.new()
272.
273. – Stack protocol envelope to start of message
274. if (option) then – Optional frame after command
275. msg:push(option)
276. end
277. msg:push(command)
278. msg:push(mdp.MDPW_WORKER)
279. – Stack routing envelope to start of message
280. msg:wrap(worker.identity, ””)
281.
282. if (self.verbose) then
283. s_console(“I: sending %s to worker”, mdp.mdps_commands[command])
284. msg:dump()
285. end
286. msg:send(self.socket)
287. end
288.
289. – ———————————————————————
290. – This worker is now waiting for work
291.
292. function broker_mt:worker_waiting(worker)
293. – Queue to broker and service waiting lists
294. self.waiting[#self.waiting + 1] = worker
295. worker.service.waiting[#worker.service.waiting + 1] = worker
296. worker.expiry = s_clock() + HEARTBEAT_EXPIRY
297. self:service_dispatch(worker.service, nil)
298. end
299.
300. – ———————————————————————
301. – Return 1 if worker has expired and must be deleted
302.
303. function worker_mt:expired()
304. return (self.expiry < s_clock())
305. end
306. – ———————————————————————
307. – Process a request coming from a client
308.
309. function broker_mt:client_process(sender, msg)
310. assert (msg:parts() >= 2) – Service name + body
311.
312. local service_name = msg:pop()
313. local service = self:service_require(service_name)
314. – Set reply return address to client sender
315. msg:wrap(sender, ””)
316. if (service_name:sub(1,4) == ”mmi.”) then
317. self:service_internal(service_name, msg)
318. else
319. self:service_dispatch(service, msg)
320. end
321. end
322.
323. – ———————————————————————
324. – Main broker work happens here
325.
326. local verbose = (arg[1] == ”-v”)
327.
328. s_version_assert (2, 1)
329. s_catch_signals ()
330. local self = broker_new(verbose)
331. self:bind(“tcp://*:5555”)
332.
333. local poller = zmq.poller.new(1)
334.
335. – Process next input message, if any
336. poller:add(self.socket, zmq.POLLIN, function()
337. local msg = zmsg.recv(self.socket)
338. if (self.verbose) then
339. s_console(“I: received message:”)
340. msg:dump()
341. end
342. local sender = msg:pop()
343. local empty = msg:pop()
344. local header = msg:pop()
345.
346. if (header == mdp.MDPC_CLIENT) then
347. self:client_process(sender, msg)
348. elseif (header == mdp.MDPW_WORKER) then
349. self:worker_process(sender, msg)
350. else
351. s_console(“E: invalid message:”)
352. msg:dump()
353. end
354. end)
355.
356. – Get and process messages forever or until interrupted
357. while (not s_interrupted) do
358. local cnt = assert(poller:poll(HEARTBEAT_INTERVAL * 1000))
359. – Disconnect and delete any expired workers
360. – Send heartbeats to idle workers if needed
361. if (s_clock() > self.heartbeat_at) then
362. self:purge_workers()
363. local waiting = self.waiting
364. for n=1,#waiting do
365. local worker = waiting[n]
366. self:worker_send(worker, mdp.MDPW_HEARTBEAT)
367. end
368. self.heartbeat_at = s_clock() + HEARTBEAT_INTERVAL
369. end
370. end
371. if (s_interrupted) then
372. printf(“W: interrupt received, shutting down…\n”)
373. end
374. self:destroy()
这里的“管家”基本上做了所有他能做的事:心跳,代理发送信息,合理利用多服务资源。
或许,效能上还有些问题,那么试试”异步”?
1. require”zmq”
2. require”zmq.threads”
3. require”zmsg”
4.
5. local common_code = [[
6. require”zmq”
7. require”zmsg”
8. require”zhelpers”
9. ]]
10.
11. local client_task = common_code .. [[
12. local context = zmq.init(1)
13. local client = context:socket(zmq.XREQ)
14. client:setopt(zmq.IDENTITY, ”C”, 1)
15. client:connect(“tcp://localhost:5555”)
16.
17. printf(“Setting up test…\n”)
18. s_sleep(100)
19.
20. local requests
21. local start
22.
23. printf(“Synchronous round-trip test…\n”)
24. requests = 10000
25. start = s_clock()
26. for n=1,requests do
27. local msg = zmsg.new(“HELLO”)
28. msg:send(client)
29. msg = zmsg.recv(client)
30. end
31. printf(“ %d calls/second\n”,
32. (1000 * requests) / (s_clock() - start))
33.
34. printf(“Asynchronous round-trip test…\n”)
35. requests = 100000
36. start = s_clock()
37. for n=1,requests do
38. local msg = zmsg.new(“HELLO”)
39. msg:send(client)
40. end
41. for n=1,requests do
42. local msg = zmsg.recv(client)
43. end
44. printf(“ %d calls/second\n”,
45. (1000 * requests) / (s_clock() - start))
46.
47. client:close()
48. context:term()
49. ]]
50.
51. local worker_task = common_code .. [[
52. local context = zmq.init(1)
53. local worker = context:socket(zmq.XREQ)
54. worker:setopt(zmq.IDENTITY, ”W”, 1)
55. worker:connect(“tcp://localhost:5556”)
56.
57. while true do
58. local msg = zmsg.recv(worker)
59. msg:send(worker)
60. end
61. worker:close()
62. context:term()
63. ]]
64.
65. local broker_task = common_code .. [[
66. – Prepare our context and sockets
67. local context = zmq.init(1)
68. local frontend = context:socket(zmq.XREP)
69. local backend = context:socket(zmq.XREP)
70. frontend:bind(“tcp://*:5555”)
71. backend:bind(“tcp://*:5556”)
72.
73. require”zmq.poller”
74. local poller = zmq.poller(2)
75. poller:add(frontend, zmq.POLLIN, function()
76. local msg = zmsg.recv(frontend)
77. –msg[1] = ”W”
78. msg:pop()
79. msg:push(“W”)
80. msg:send(backend)
81. end)
82. poller:add(backend, zmq.POLLIN, function()
83. local msg = zmsg.recv(backend)
84. –msg[1] = ”C”
85. msg:pop()
86. msg:push(“C”)
87. msg:send(frontend)
88. end)
89. poller:start()
90. frontend:close()
91. backend:close()
92. context:term()
93. ]]
94.
95. s_version_assert(2, 1)
96.
97. local client = zmq.threads.runstring(nil, client_task)
98. assert(client:start())
99. local worker = zmq.threads.runstring(nil, worker_task)
100. assert(worker:start(true))
101. local broker = zmq.threads.runstring(nil, broker_task)
102. assert(broker:start(true))
103.
104. assert(client:join())
如此这般,效能倒是大大降低了(官网的说法是降了近20倍)。分析一下原因:中间要管理各条任务、不断地轮询,这些额外开销反倒拖低了性能。那么,把I/O改成异步的呢?
异步的客户端api:
1. local setmetatable = setmetatable
2.
3. local mdp = require”mdp”
4.
5. local zmq = require”zmq”
6. local zpoller = require”zmq.poller”
7. local zmsg = require”zmsg”
8. require”zhelpers”
9.
10. local s_version_assert = s_version_assert
11.
12. local obj_mt = {}
13. obj_mt.__index = obj_mt
14.
15. function obj_mt:set_timeout(timeout)
16. self.timeout = timeout
17. end
18.
19. function obj_mt:destroy()
20. if self.client then self.client:close() end
21. self.context:term()
22. end
23.
24. local function s_mdcli_connect_to_broker(self)
25. – close old socket.
26. if self.client then
27. self.poller:remove(self.client)
28. self.client:close()
29. end
30. self.client = assert(self.context:socket(zmq.XREQ))
31. assert(self.client:setopt(zmq.LINGER, 0))
32. assert(self.client:connect(self.broker))
33. if self.verbose then
34. s_console(“I: connecting to broker at %s…”, self.broker)
35. end
36. – add socket to poller
37. self.poller:add(self.client, zmq.POLLIN, function()
38. self.got_reply = true
39. end)
40. end
41.
42. —
43. – Send request to broker and get reply by hook or crook
44. —
45. function obj_mt:send(service, request)
46. – Prefix request with protocol frames
47. – Frame 0: empty (REQ emulation)
48. – Frame 1: ”MDPCxy” (six bytes, MDP/Client x.y)
49. – Frame 2: Service name (printable string)
50. request:push(service)
51. request:push(mdp.MDPC_CLIENT)
52. request:push(“”)
53. if self.verbose then
54. s_console(“I: send request to ’%s’ service:”, service)
55. request:dump()
56. end
57. request:send(self.client)
58. return 0
59. end
60.
61. – Returns the reply message or NULL if there was no reply. Does not
62. – attempt to recover from a broker failure, this is not possible
63. – without storing all unanswered requests and resending them all…
64. function obj_mt:recv()
65. self.got_reply = false
66.
67. local cnt = assert(self.poller:poll(self.timeout * 1000))
68. if cnt ~= 0 and self.got_reply then
69. local msg = zmsg.recv(self.client)
70. if self.verbose then
71. s_console(“I: received reply:”)
72. msg:dump()
73. end
74. assert(msg:parts() >= 3)
75.
76. local empty = msg:pop()
77. assert(empty == ””)
78.
79. local header = msg:pop()
80. assert(header == mdp.MDPC_CLIENT)
81.
82. return msg
83. end
84. if self.verbose then
85. s_console(“W: permanent error, abandoning request”)
86. end
87. return nil – Giving up
88. end
89.
90. module(…)
91.
92. function new(broker, verbose)
93. s_version_assert (2, 1);
94. local self = setmetatable({
95. context = zmq.init(1),
96. poller = zpoller.new(1),
97. broker = broker,
98. verbose = verbose,
99. timeout = 2500, – msecs
100. }, obj_mt)
101.
102. s_mdcli_connect_to_broker(self)
103. return self
104. end
105.
106. setmetatable(_M, { __call = function(self, …) return new(…) end })
异步的客户端:
1. require”mdcliapi2″
2. require”zmsg”
3. require”zhelpers”
4.
5. local verbose = (arg[1] == ”-v”)
6. local session = mdcliapi2.new(“tcp://localhost:5555”, verbose)
7.
8. local count=100000
9. for n=1,count do
10. local request = zmsg.new(“Hello world”)
11. session:send(“echo”, request)
12. end
13. for n=1,count do
14. local reply = session:recv()
15. if not reply then
16. break – Interrupted by Ctrl-C
17. end
18. end
19. printf(“%d replies received\n”, count)
20. session:destroy()
当当当当:
$ time mdclient
同步的:
real 0m14.088s
user 0m1.310s
sys 0m2.670s
异步的:
real 0m8.730s
user 0m0.920s
sys 0m1.550s
10个服务端的异步:
real 0m3.863s
user 0m0.730s
sys 0m0.470s
经过测试,4核的话起8个服务端就算饱和了。不过,就效率而言,应该是足够了。
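同步与异步差距的来源很直观:同步时每个请求都得等上一个应答回来才能发下一个,异步则是先把一批请求全部压出去、再统一收应答。用pyzmq的DEALER套接字做个最小的对比示意(假设对端是一个会原样回显的服务,地址、次数都是随手写的):

import time
import zmq

ctx = zmq.Context()
sock = ctx.socket(zmq.DEALER)
sock.connect("tcp://localhost:5555")   # 假设的回显服务地址

N = 10000

# 同步:发一条、等一条
start = time.time()
for _ in range(N):
    sock.send_multipart([b"", b"HELLO"])
    sock.recv_multipart()
print("sync : %d calls/sec" % (N / (time.time() - start)))

# 异步:先全部发出,再统一接收
start = time.time()
for _ in range(N):
    sock.send_multipart([b"", b"HELLO"])
for _ in range(N):
    sock.recv_multipart()
print("async: %d calls/sec" % (N / (time.time() - start)))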
值得注意的是,“异步管家模式”并非全能。由于它没有做broker的连接重试,所以一旦“管家”崩溃了,那自然一切都say goodbye了。
为了服务更靠谱,或许还需要一个叫做”发现服务”的系统,来确认到底有哪些服务可用。
1. require”mdcliapi”
2. require”zmsg”
3. require”zhelpers”
4.
5. local verbose = (arg[1] == ”-v”)
6. local session = mdcliapi.new(“tcp://localhost:5555”, verbose)
7.
8. – This is the service we want to look up
9. local request = zmsg.new(“echo”)
10.
11. – This is the service we send our request to
12. local reply = session:send(“mmi.service”, request)
13.
14. if (reply) then
15. printf (“Lookup echo service: %s\n”, reply:body())
16. else
17. printf (“E: no response from broker, make sure it’s running\n”)
18. end
19.
20. session:destroy()
注:以上皆为lua代码
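顺带把这次“服务发现”在线路上的样子用pyzmq直接摆出来(不借助mdcliapi;协议头沿用前面假设的“MDPC01”,broker地址也是假设):

import zmq

MDPC_CLIENT = b"MDPC01"   # 假设的MDP/Client协议头,与前文一致

ctx = zmq.Context()
client = ctx.socket(zmq.REQ)
client.connect("tcp://localhost:5555")   # 假设的broker地址

# 向内置的mmi.service服务查询“echo”这个服务是否有人提供
client.send_multipart([MDPC_CLIENT, b"mmi.service", b"echo"])

frames = client.recv_multipart()
# 应答的最后一帧是状态码:200表示服务可用,404表示没有注册
print("Lookup echo service:", frames[-1].decode())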
zeroMQ初体验-27.可靠性-硬盘模式
在之前的种种模式中,虽然各擅胜场,可是有一个根本性的问题:一旦数据在某个环节丢失了,那么它就真的丢失了(虽然会有种种重试机制)。而”硬盘模式”即是为了应对这种情况而出现的。
结构图:
是不是非常眼熟?只是对“管家模式”做了些许扩展,就又使得可靠性上升了一个台阶。
总的来说,本模式主要做了三件事:
1.获取请求,给出唯一标识
2.获取答复,按照唯一标识回复
3.回复成功,关闭数据
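站在客户端的角度,这三步就是三次普通的“管家”调用:先把请求交给titanic.request换回一个UUID,再拿着UUID轮询titanic.reply,拿到结果后调用titanic.close。用pyzmq写个流程示意(省略了重试与错误处理,协议头、地址仍沿用前面的假设):

import time
import zmq

MDPC_CLIENT = b"MDPC01"   # 假设的MDP/Client协议头

ctx = zmq.Context()

def call(service, *body):
    # 每次调用都新开一个REQ套接字走一遍“管家”请求(纯示意,没有做重试)
    s = ctx.socket(zmq.REQ)
    s.setsockopt(zmq.LINGER, 0)
    s.connect("tcp://localhost:5555")    # 假设的broker地址
    s.send_multipart([MDPC_CLIENT, service] + list(body))
    reply = s.recv_multipart()
    s.close()
    return reply[2:]                     # 去掉协议头与服务名,剩下[状态码, ...]

# 1. 提交请求,换回一个UUID
status, uuid = call(b"titanic.request", b"echo", b"Hello world")
assert status == b"200"

# 2. 拿着UUID轮询答复,300表示还在排队(Pending)
while True:
    frames = call(b"titanic.reply", uuid)
    if frames[0] == b"200":
        print("Reply:", frames[1:])
        break
    time.sleep(1)                        # 还没处理完,稍后再试

# 3. 确认收到之后关闭这条请求,让服务端可以清掉落盘的数据
call(b"titanic.close", uuid)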
客户端:
1. //
2. // Titanic client example
3. // Implements client side of http://rfc.zeromq.org/spec:9
4.
5. // Lets us build this source without creating a library
6. #include ”mdcliapi.c”
7.
8. // Calls a TSP service
9. // Returns reponse if successful (status code 200 OK), else NULL
10. //
11. static zmsg_t *
12. s_service_call (mdcli_t *session, char *service, zmsg_t **request_p)
13. {
14. zmsg_t *reply = mdcli_send (session, service, request_p);
15. if (reply) {
16. zframe_t *status = zmsg_pop (reply);
17. if (zframe_streq (status, ”200″)) {
18. zframe_destroy (&status);
19. return reply;
20. }
21. else
22. if (zframe_streq (status, ”400″)) {
23. printf (“E: client fatal error, aborting\n”);
24. exit (EXIT_FAILURE);
25. }
26. else
27. if (zframe_streq (status, ”500″)) {
28. printf (“E: server fatal error, aborting\n”);
29. exit (EXIT_FAILURE);
30. }
31. }
32. else
33. exit (EXIT_SUCCESS); // Interrupted or failed
34.
35. zmsg_destroy (&reply);
36. return NULL; // Didn’t succeed, don’t care why not
37. }
38.
39. int main (int argc, char *argv [])
40. {
41. int verbose = (argc > 1 && streq (argv [1], ”-v”));
42. mdcli_t *session = mdcli_new (“tcp://localhost:5555”, verbose);
43.
44. // 1. Send ’echo’ request to Titanic
45. zmsg_t *request = zmsg_new ();
46. zmsg_addstr (request, ”echo”);
47. zmsg_addstr (request, ”Hello world”);
48. zmsg_t *reply = s_service_call (
49. session, ”titanic.request”, &request);
50.
51. zframe_t *uuid = NULL;
52. if (reply) {
53. uuid = zmsg_pop (reply);
54. zmsg_destroy (&reply);
55. zframe_print (uuid, ”I: request UUID ”);
56. }
57.
58. // 2. Wait until we get a reply
59. while (!zctx_interrupted) {
60. zclock_sleep (100);
61. request = zmsg_new ();
62. zmsg_add (request, zframe_dup (uuid));
63. zmsg_t *reply = s_service_call (
64. session, ”titanic.reply”, &request);
65.
66. if (reply) {
67. char *reply_string = zframe_strdup (zmsg_last (reply));
68. printf (“Reply: %s\n”, reply_string);
69. free (reply_string);
70. zmsg_destroy (&reply);
71.
72. // 3. Close request
73. request = zmsg_new ();
74. zmsg_add (request, zframe_dup (uuid));
75. reply = s_service_call (session, ”titanic.close”, &request);
76. zmsg_destroy (&reply);
77. break;
78. }
79. else {
80. printf (“I: no reply yet, trying again…\n”);
81. zclock_sleep (5000); // Try again in 5 seconds
82. }
83. }
84. zframe_destroy (&uuid);
85. mdcli_destroy (&session);
86. return 0;
87. }
中间件(主体):
1. //
2. // Titanic service
3. //
4. // Implements server side of http://rfc.zeromq.org/spec:9
5.
6. // Lets us build this source without creating a library
7. #include ”mdwrkapi.c”
8. #include ”mdcliapi.c”
9.
10. #include ”zfile.h”
11. #include <uuid/uuid.h>
12.
13. // Return a new UUID as a printable character string
14. // Caller must free returned string when finished with it
15.
16. static char *
17. s_generate_uuid (void)
18. {
19. char hex_char [] = ”0123456789ABCDEF”;
20. char *uuidstr = zmalloc (sizeof (uuid_t) * 2 + 1);
21. uuid_t uuid;
22. uuid_generate (uuid);
23. int byte_nbr;
24. for (byte_nbr = 0; byte_nbr < sizeof (uuid_t); byte_nbr++) {
25. uuidstr [byte_nbr * 2 + 0] = hex_char [uuid [byte_nbr] >> 4];
26. uuidstr [byte_nbr * 2 + 1] = hex_char [uuid [byte_nbr] & 15];
27. }
28. return uuidstr;
29. }
30.
31. // Returns freshly allocated request filename for given UUID
32.
33. #define TITANIC_DIR ”.titanic”
34.
35. static char *
36. s_request_filename (char *uuid) {
37. char *filename = malloc (256);
38. snprintf (filename, 256, TITANIC_DIR ”/%s.req”, uuid);
39. return filename;
40. }
41.
42. // Returns freshly allocated reply filename for given UUID
43.
44. static char *
45. s_reply_filename (char *uuid) {
46. char *filename = malloc (256);
47. snprintf (filename, 256, TITANIC_DIR ”/%s.rep”, uuid);
48. return filename;
49. }
50.
51. // ———————————————————————
52. // Titanic request service
53.
54. static void
55. titanic_request (void *args, zctx_t *ctx, void *pipe)
56. {
57. mdwrk_t *worker = mdwrk_new (
58. “tcp://localhost:5555″, ”titanic.request”, 0);
59. zmsg_t *reply = NULL;
60.
61. while (TRUE) {
62. // Send reply if it’s not null
63. // And then get next request from broker
64. zmsg_t *request = mdwrk_recv (worker, &reply);
65. if (!request)
66. break; // Interrupted, exit
67.
68. // Ensure message directory exists
69. file_mkdir (TITANIC_DIR);
70.
71. // Generate UUID and save message to disk
72. char *uuid = s_generate_uuid ();
73. char *filename = s_request_filename (uuid);
74. FILE *file = fopen (filename, ”w”);
75. assert (file);
76. zmsg_save (request, file);
77. fclose (file);
78. free (filename);
79. zmsg_destroy (&request);
80.
81. // Send UUID through to message queue
82. reply = zmsg_new ();
83. zmsg_addstr (reply, uuid);
84. zmsg_send (&reply, pipe);
85.
86. // Now send UUID back to client
87. // Done by the mdwrk_recv() at the top of the loop
88. reply = zmsg_new ();
89. zmsg_addstr (reply, ”200″);
90. zmsg_addstr (reply, uuid);
91. free (uuid);
92. }
93. mdwrk_destroy (&worker);
94. }
95.
96. // ———————————————————————
97. // Titanic reply service
98.
99. static void *
100. titanic_reply (void *context)
101. {
102. mdwrk_t *worker = mdwrk_new (
103. “tcp://localhost:5555″, ”titanic.reply”, 0);
104. zmsg_t *reply = NULL;
105.
106. while (TRUE) {
107. zmsg_t *request = mdwrk_recv (worker, &reply);
108. if (!request)
109. break; // Interrupted, exit
110.
111. char *uuid = zmsg_popstr (request);
112. char *req_filename = s_request_filename (uuid);
113. char *rep_filename = s_reply_filename (uuid);
114. if (file_exists (rep_filename)) {
115. FILE *file = fopen (rep_filename, ”r”);
116. assert (file);
117. reply = zmsg_load (file);
118. zmsg_pushstr (reply, ”200″);
119. fclose (file);
120. }
121. else {
122. reply = zmsg_new ();
123. if (file_exists (req_filename))
124. zmsg_pushstr (reply, ”300″); //Pending
125. else
126. zmsg_pushstr (reply, ”400″); //Unknown
127. }
128. zmsg_destroy (&request);
129. free (uuid);
130. free (req_filename);
131. free (rep_filename);
132. }
133. mdwrk_destroy (&worker);
134. return 0;
135. }
136.
137. // ———————————————————————
138. // Titanic close service
139.
140. static void *
141. titanic_close (void *context)
142. {
143. mdwrk_t *worker = mdwrk_new (
144. “tcp://localhost:5555″, ”titanic.close”, 0);
145. zmsg_t *reply = NULL;
146.
147. while (TRUE) {
148. zmsg_t *request = mdwrk_recv (worker, &reply);
149. if (!request)
150. break; // Interrupted, exit
151.
152. char *uuid = zmsg_popstr (request);
153. char *req_filename = s_request_filename (uuid);
154. char *rep_filename = s_reply_filename (uuid);
155. file_delete (req_filename);
156. file_delete (rep_filename);
157. free (uuid);
158. free (req_filename);
159. free (rep_filename);
160.
161. zmsg_destroy (&request);
162. reply = zmsg_new ();
163. zmsg_addstr (reply, ”200″);
164. }
165. mdwrk_destroy (&worker);
166. return 0;
167. }
168.
169. // Attempt to process a single request, return 1 if successful
170.
171. static int
172. s_service_success (mdcli_t *client, char *uuid)
173. {
174. // Load request message, service will be first frame
175. char *filename = s_request_filename (uuid);
176. FILE *file = fopen (filename, ”r”);
177. free (filename);
178.
179. // If the client already closed request, treat as successful
180. if (!file)
181. return 1;
182.
183. zmsg_t *request = zmsg_load (file);
184. fclose (file);
185. zframe_t *service = zmsg_pop (request);
186. char *service_name = zframe_strdup (service);
187.
188. // Use MMI protocol to check if service is available
189. zmsg_t *mmi_request = zmsg_new ();
190. zmsg_add (mmi_request, service);
191. zmsg_t *mmi_reply = mdcli_send (client, ”mmi.service”, &mmi_request);
192. int service_ok = (mmi_reply
193. && zframe_streq (zmsg_first (mmi_reply), ”200″));
194. zmsg_destroy (&mmi_reply);
195.
196. if (service_ok) {
197. zmsg_t *reply = mdcli_send (client, service_name, &request);
198. if (reply) {
199. filename = s_reply_filename (uuid);
200. FILE *file = fopen (filename, ”w”);
201. assert (file);
202. zmsg_save (reply, file);
203. fclose (file);
204. free (filename);
205. return 1;
206. }
207. zmsg_destroy (&reply);
208. }
209. else
210. zmsg_destroy (&request);
211.
212. free (service_name);
213. return 0;
214. }
215.
216. int main (int argc, char *argv [])
217. {
218. int verbose = (argc > 1 && streq (argv [1], ”-v”));
219. zctx_t *ctx = zctx_new ();
220.
221. // Create MDP client session with short timeout
222. mdcli_t *client = mdcli_new (“tcp://localhost:5555”, verbose);
223. mdcli_set_timeout (client, 1000); // 1 sec
224. mdcli_set_retries (client, 1); // only 1 retry
225.
226. void *request_pipe = zthread_fork (ctx, titanic_request, NULL);
227. zthread_new (ctx, titanic_reply, NULL);
228. zthread_new (ctx, titanic_close, NULL);
229.
230. // Main dispatcher loop
231. while (TRUE) {
232. // We’ll dispatch once per second, if there’s no activity
233. zmq_pollitem_t items [] = { { request_pipe, 0, ZMQ_POLLIN, 0 } };
234. int rc = zmq_poll (items, 1, 1000 * ZMQ_POLL_MSEC);
235. if (rc == -1)
236. break; // Interrupted
237. if (items [0].revents & ZMQ_POLLIN) {
238. // Ensure message directory exists
239. file_mkdir (TITANIC_DIR);
240.
241. // Append UUID to queue, prefixed with ’-‘ for pending
242. zmsg_t *msg = zmsg_recv (request_pipe);
243. if (!msg)
244. break; // Interrupted
245. FILE *file = fopen (TITANIC_DIR ”/queue”, ”a”);
246. char *uuid = zmsg_popstr (msg);
247. fprintf (file, ”-%s\n”, uuid);
248. fclose (file);
249. free (uuid);
250. zmsg_destroy (&msg);
251. }
252. // Brute-force dispatcher
253. //
254. char entry [] = "?.......:.......:.......:.......:";
255. FILE *file = fopen (TITANIC_DIR ”/queue”, ”r+”);
256. while (file && fread (entry, 33, 1, file) == 1) {
257. // UUID is prefixed with ’-‘ if still waiting
258. if (entry [0] == ’-‘) {
259. if (verbose)
260. printf (“I: processing request %s\n”, entry + 1);
261. if (s_service_success (client, entry + 1)) {
262. // Mark queue entry as processed
263. fseek (file, -33, SEEK_CUR);
264. fwrite (“+”, 1, 1, file);
265. fseek (file, 32, SEEK_CUR);
266. }
267. }
268. // Skip end of line, LF or CRLF
269. if (fgetc (file) == ’\r’)
270. fgetc (file);
271. if (zctx_interrupted)
272. break;
273. }
274. if (file)
275. fclose (file);
276. }
277. mdcli_destroy (&client);
278. return 0;
279. }
注意:官网似乎并不推荐换用额外的存储(比如数据库、key-value之类的,原因与效率有关),你可以自己试试~
zeroMQ初体验-28.可靠性-主从模式
虽然“硬盘模式”看起来已经非常靠谱了,不过,还记得前段时间“亚马逊云宕机”么,异地灾备似乎才真正当得上“高可用”。暂且抛开物理、成本上的问题,zeromq也为此提供了靠谱的支持~
官方声明:
1.这是一个直接的高可用的方案
2.足够简单,让人易于理解和使用
3.在需要时可以提供可靠的转移
比较经典也比较靠谱的模式简图:
迁移:
作为一个高可用的框架,至少得做到如下几点:
· 为灾难性事故做准备,例如主机房失火了、地震了等。
· 切换应该在尽可能短的时间内完成(60秒内,最好是10s内)。
· 由主切换到从可以是自动的,不过恢复时最好手动完成。
· 客户端应该明了这种切换机制,最好在api中给出自动方案。
· 有明确的标识来明确主从。
· 主从间不能有启动顺序的依赖。
· 服务端的切换不会导致客户端崩溃(或许会重新连接)。
· 主从状态必须可监控。
· 主从间的连接必须靠谱,高速。
基于如此结构的一些假设:
· 一主一从结构已经足够保险,不再需要多层次的备份。
· 主和从都能各自独力承载全部负载,而不是两者分摊。
· 要有足够的预算供起这么一个日常空转的从。
这些并没有被涉及到:
· 从并非用来负载均衡。
· 信息不持久化。
· 主从间不会自动探测对方。
· 主从间不会同步服务器的状态、信息。
如果要配置一对主从,需要在各自的配置中设定好对方及同步时间。
相应地,作为客户端需要做的:
1.知道主从的地址
2.先连主,失败则试试从
3.检测失效的连接(心跳)
4.重连时遵守2
5.重新在请求的服务器上创建状态
6.当主从切换时,可以重新传递请求
当然,这些都会由客户端的api完成。这里注意:作者一再强调,主从同时只能有一个在提供服务!
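照着上面这份清单,客户端这边用pyzmq勾一个最小骨架大概如下(端口对应后面C示例里的5001/5002,超时与等待时间都是示意值):

import itertools
import time
import zmq

# 假设的主、从地址
SERVERS = ["tcp://localhost:5001", "tcp://localhost:5002"]
REQUEST_TIMEOUT = 1000   # 毫秒
SETTLE_DELAY = 2.0       # 切换前稍等片刻,秒

ctx = zmq.Context()

def connect(endpoint):
    s = ctx.socket(zmq.REQ)
    s.setsockopt(zmq.LINGER, 0)
    s.connect(endpoint)
    return s

server_nbr = 0
client = connect(SERVERS[server_nbr])
poller = zmq.Poller()
poller.register(client, zmq.POLLIN)

for seq in itertools.count():
    client.send_string(str(seq))
    while True:
        if poller.poll(REQUEST_TIMEOUT):
            print("I: server replied OK (%s)" % client.recv_string())
            break
        # 超时:认为当前服务端失联,切到另一台并重发同一个请求
        print("W: no response from %s, failing over" % SERVERS[server_nbr])
        poller.unregister(client)
        client.close()
        server_nbr = (server_nbr + 1) % 2
        time.sleep(SETTLE_DELAY)
        client = connect(SERVERS[server_nbr])
        poller.register(client, zmq.POLLIN)
        client.send_string(str(seq))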
服务器端:
1. //
2. // Binary Star server
3. //
4. #include ”czmq.h”
5.
6. // We send state information every this often
7. // If peer doesn’t respond in two heartbeats, it is ’dead’
8. #define HEARTBEAT 1000 // In msecs
9.
10. // States we can be in at any point in time
11. typedef enum {
12. STATE_PRIMARY = 1, // Primary, waiting for peer to connect
13. STATE_BACKUP = 2, // Backup, waiting for peer to connect
14. STATE_ACTIVE = 3, // Active - accepting connections
15. STATE_PASSIVE = 4 // Passive - not accepting connections
16. } state_t;
17.
18. // Events, which start with the states our peer can be in
19. typedef enum {
20. PEER_PRIMARY = 1, // HA peer is pending primary
21. PEER_BACKUP = 2, // HA peer is pending backup
22. PEER_ACTIVE = 3, // HA peer is active
23. PEER_PASSIVE = 4, // HA peer is passive
24. CLIENT_REQUEST = 5 // Client makes request
25. } event_t;
26.
27. // Our finite state machine
28. typedef struct {
29. state_t state; // Current state
30. event_t event; // Current event
31. int64_t peer_expiry; // When peer is considered ’dead’
32. } bstar_t;
33.
34. // Execute finite state machine (apply event to state)
35. // Returns TRUE if there was an exception
36.
37. static Bool
38. s_state_machine (bstar_t *fsm)
39. {
40. Bool exception = FALSE;
41. // Primary server is waiting for peer to connect
42. // Accepts CLIENT_REQUEST events in this state
43. if (fsm->state == STATE_PRIMARY) {
44. if (fsm->event == PEER_BACKUP) {
45. printf (“I: connected to backup (slave), ready as master\n”);
46. fsm->state = STATE_ACTIVE;
47. }
48. else
49. if (fsm->event == PEER_ACTIVE) {
50. printf (“I: connected to backup (master), ready as slave\n”);
51. fsm->state = STATE_PASSIVE;
52. }
53. }
54. else
55. // Backup server is waiting for peer to connect
56. // Rejects CLIENT_REQUEST events in this state
57. if (fsm->state == STATE_BACKUP) {
58. if (fsm->event == PEER_ACTIVE) {
59. printf (“I: connected to primary (master), ready as slave\n”);
60. fsm->state = STATE_PASSIVE;
61. }
62. else
63. if (fsm->event == CLIENT_REQUEST)
64. exception = TRUE;
65. }
66. else
67. // Server is active
68. // Accepts CLIENT_REQUEST events in this state
69. if (fsm->state == STATE_ACTIVE) {
70. if (fsm->event == PEER_ACTIVE) {
71. // Two masters would mean split-brain
72. printf (“E: fatal error - dual masters, aborting\n”);
73. exception = TRUE;
74. }
75. }
76. else
77. // Server is passive
78. // CLIENT_REQUEST events can trigger failover if peer looks dead
79. if (fsm->state == STATE_PASSIVE) {
80. if (fsm->event == PEER_PRIMARY) {
81. // Peer is restarting - become active, peer will go passive
82. printf (“I: primary (slave) is restarting, ready as master\n”);
83. fsm->state = STATE_ACTIVE;
84. }
85. else
86. if (fsm->event == PEER_BACKUP) {
87. // Peer is restarting - become active, peer will go passive
88. printf (“I: backup (slave) is restarting, ready as master\n”);
89. fsm->state = STATE_ACTIVE;
90. }
91. else
92. if (fsm->event == PEER_PASSIVE) {
93. // Two passives would mean cluster would be non-responsive
94. printf (“E: fatal error - dual slaves, aborting\n”);
95. exception = TRUE;
96. }
97. else
98. if (fsm->event == CLIENT_REQUEST) {
99. // Peer becomes master if timeout has passed
100. // It’s the client request that triggers the failover
101. assert (fsm->peer_expiry > 0);
102. if (zclock_time () >= fsm->peer_expiry) {
103. // If peer is dead, switch to the active state
104. printf (“I: failover successful, ready as master\n”);
105. fsm->state = STATE_ACTIVE;
106. }
107. else
108. // If peer is alive, reject connections
109. exception = TRUE;
110. }
111. }
112. return exception;
113. }
114.
115. int main (int argc, char *argv [])
116. {
117. // Arguments can be either of:
118. // -p primary server, at tcp://localhost:5001
119. // -b backup server, at tcp://localhost:5002
120. zctx_t *ctx = zctx_new ();
121. void *statepub = zsocket_new (ctx, ZMQ_PUB);
122. void *statesub = zsocket_new (ctx, ZMQ_SUB);
123. void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
124. bstar_t fsm = { 0 };
125.
126. if (argc == 2 && streq (argv [1], ”-p”)) {
127. printf (“I: Primary master, waiting for backup (slave)\n”);
128. zsocket_bind (frontend, ”tcp://*:5001″);
129. zsocket_bind (statepub, ”tcp://*:5003″);
130. zsocket_connect (statesub, ”tcp://localhost:5004″);
131. fsm.state = STATE_PRIMARY;
132. }
133. else
134. if (argc == 2 && streq (argv [1], ”-b”)) {
135. printf (“I: Backup slave, waiting for primary (master)\n”);
136. zsocket_bind (frontend, ”tcp://*:5002″);
137. zsocket_bind (statepub, ”tcp://*:5004″);
138. zsocket_connect (statesub, ”tcp://localhost:5003″);
139. fsm.state = STATE_BACKUP;
140. }
141. else {
142. printf (“Usage: bstarsrv { -p | -b }\n”);
143. zctx_destroy (&ctx);
144. exit (0);
145. }
146. // Set timer for next outgoing state message
147. int64_t send_state_at = zclock_time () + HEARTBEAT;
148.
149. while (!zctx_interrupted) {
150. zmq_pollitem_t items [] = {
151. { frontend, 0, ZMQ_POLLIN, 0 },
152. { statesub, 0, ZMQ_POLLIN, 0 }
153. };
154. int time_left = (int) ((send_state_at - zclock_time ()));
155. if (time_left < 0)
156. time_left = 0;
157. int rc = zmq_poll (items, 2, time_left * ZMQ_POLL_MSEC);
158. if (rc == -1)
159. break; // Context has been shut down
160.
161. if (items [0].revents & ZMQ_POLLIN) {
162. // Have a client request
163. zmsg_t *msg = zmsg_recv (frontend);
164. fsm.event = CLIENT_REQUEST;
165. if (s_state_machine (&fsm) == FALSE)
166. // Answer client by echoing request back
167. zmsg_send (&msg, frontend);
168. else
169. zmsg_destroy (&msg);
170. }
171. if (items [1].revents & ZMQ_POLLIN) {
172. // Have state from our peer, execute as event
173. char *message = zstr_recv (statesub);
174. fsm.event = atoi (message);
175. free (message);
176. if (s_state_machine (&fsm))
177. break; // Error, so exit
178. fsm.peer_expiry = zclock_time () + 2 * HEARTBEAT;
179. }
180. // If we timed-out, send state to peer
181. if (zclock_time () >= send_state_at) {
182. char message [2];
183. sprintf (message, ”%d”, fsm.state);
184. zstr_send (statepub, message);
185. send_state_at = zclock_time () + HEARTBEAT;
186. }
187. }
188. if (zctx_interrupted)
189. printf (“W: interrupted\n”);
190.
191. // Shutdown sockets and context
192. zctx_destroy (&ctx);
193. return 0;
194. }
Client:
1. //
2. // Binary Star client
3. //
4. #include ”czmq.h”
5.
6. #define REQUEST_TIMEOUT 1000 // msecs
7. #define SETTLE_DELAY 2000 // Before failing over
8.
9. int main (void)
10. {
11. zctx_t *ctx = zctx_new ();
12.
13. char *server [] = { ”tcp://localhost:5001″, ”tcp://localhost:5002″ };
14. uint server_nbr = 0;
15.
16. printf (“I: connecting to server at %s…\n”, server [server_nbr]);
17. void *client = zsocket_new (ctx, ZMQ_REQ);
18. zsocket_connect (client, server [server_nbr]);
19.
20. int sequence = 0;
21. while (!zctx_interrupted) {
22. // We send a request, then we work to get a reply
23. char request [10];
24. sprintf (request, ”%d”, ++sequence);
25. zstr_send (client, request);
26.
27. int expect_reply = 1;
28. while (expect_reply) {
29. // Poll socket for a reply, with timeout
30. zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
31. int rc = zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
32. if (rc == -1)
33. break; // Interrupted
34.
35. // If we got a reply, process it
36. if (items [0].revents & ZMQ_POLLIN) {
37. // We got a reply from the server, must match sequence
38. char *reply = zstr_recv (client);
39. if (atoi (reply) == sequence) {
40. printf (“I: server replied OK (%s)\n”, reply);
41. expect_reply = 0;
42. sleep (1); // One request per second
43. }
44. else {
45. printf (“E: malformed reply from server: %s\n”,
46. reply);
47. }
48. free (reply);
49. }
50. else {
51. printf (“W: no response from server, failing over\n”);
52. // Old socket is confused; close it and open a new one
53. zsocket_destroy (ctx, client);
54. server_nbr = (server_nbr + 1) % 2;
55. zclock_sleep (SETTLE_DELAY);
56. printf (“I: connecting to server at %s…\n”,
57. server [server_nbr]);
58. client = zsocket_new (ctx, ZMQ_REQ);
59. zsocket_connect (client, server [server_nbr]);
60.
61. // Send request again, on new socket
62. zstr_send (client, request);
63. }
64. }
65. }
66. zctx_destroy (&ctx);
67. return 0;
68. }
zeroMQ first experience - 29. Reliability - the Freelance pattern
Well, I had expected this to be an even more dependable pattern; as it turns out, that was wishful thinking on my part.
The so-called Freelance pattern is, naturally, more of a free-for-all... er.
Still, the guide, dependable as ever, does give a few proven variants of the "Freelance" pattern. Let's take them apart one by one.
Simple retry and failover
This should look familiar: it reuses the simple retry from the earlier "lazy" pattern, and the server is rewritten so that several of them can run side by side. Unlike the earlier master/slave setup, though, the client connects to multiple servers at once, fires off n copies of the same request in one shot, and keeps whichever reply comes back first. (Simple indeed; simplistic, even?)
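As an aside, the whole "blast it to everyone, keep the first good answer" idea fits in a few lines of pyzmq; the endpoints below are assumptions (the real client takes them from the command line), and the framing (empty delimiter + sequence + body) copies the C client further down:
1. import time
2. import zmq
3.
4. # Hypothetical endpoints for two freelance servers
5. ENDPOINTS = ["tcp://localhost:5555", "tcp://localhost:5556"]
6. GLOBAL_TIMEOUT = 2.5  # seconds, same figure as the C client
7.
8. ctx = zmq.Context()
9. socket = ctx.socket(zmq.DEALER)
10. for endpoint in ENDPOINTS:
11.     socket.connect(endpoint)
12.
13. sequence = 1
14. request = [b"", str(sequence).encode(), b"random name"]  # empty delimiter + sequence + body
15.
16. # Blast the same request at every connected server (DEALER round-robins one copy to each)
17. for _ in ENDPOINTS:
18.     socket.send_multipart(request)
19.
20. # Keep the first reply whose sequence matches; later duplicates are simply ignored
21. reply = None
22. deadline = time.time() + GLOBAL_TIMEOUT
23. while time.time() < deadline:
24.     remaining_ms = max(0, int((deadline - time.time()) * 1000))
25.     if socket.poll(remaining_ms) & zmq.POLLIN:
26.         frames = socket.recv_multipart()  # [empty, sequence, body]
27.         if frames[1] == str(sequence).encode():
28.             reply = frames[2]
29.             break
30. print("reply: %r" % reply)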
Server:
1. //
2. // Freelance server - Model 2
3. // Does some work, replies OK, with message sequencing
4. //
5. #include ”czmq.h”
6.
7. int main (int argc, char *argv [])
8. {
9. if (argc < 2) {
10. printf (“I: syntax: %s <endpoint>\n”, argv [0]);
11. exit (EXIT_SUCCESS);
12. }
13. zctx_t *ctx = zctx_new ();
14. void *server = zsocket_new (ctx, ZMQ_REP);
15. zsocket_bind (server, argv [1]);
16.
17. printf (“I: service is ready at %s\n”, argv [1]);
18. while (TRUE) {
19. zmsg_t *request = zmsg_recv (server);
20. if (!request)
21. break; // Interrupted
22. // Fail nastily if run against wrong client
23. assert (zmsg_size (request) == 2);
24.
25. zframe_t *address = zmsg_pop (request);
26. zmsg_destroy (&request);
27.
28. zmsg_t *reply = zmsg_new ();
29. zmsg_add (reply, address);
30. zmsg_addstr (reply, ”OK”);
31. zmsg_send (&reply, server);
32. }
33. if (zctx_interrupted)
34. printf (“W: interrupted\n”);
35.
36. zctx_destroy (&ctx);
37. return 0;
38. }
Client:
1. //
2. // Freelance client - Model 2
3. // Uses DEALER socket to blast one or more services
4. //
5. #include ”czmq.h”
6.
7. // If not a single service replies within this time, give up
8. #define GLOBAL_TIMEOUT 2500
9.
10. // We design our client API as a class
11.
12. #ifdef __cplusplus
13. extern “C” {
14. #endif
15.
16. // Opaque class structure
17. typedef struct _flclient_t flclient_t;
18.
19. flclient_t *
20. flclient_new (void);
21. void
22. flclient_destroy (flclient_t **self_p);
23. void
24. flclient_connect (flclient_t *self, char *endpoint);
25. zmsg_t *
26. flclient_request (flclient_t *self, zmsg_t **request_p);
27.
28. #ifdef __cplusplus
29. }
30. #endif
31.
32. int main (int argc, char *argv [])
33. {
34. if (argc == 1) {
35. printf (“I: syntax: %s <endpoint> …\n”, argv [0]);
36. exit (EXIT_SUCCESS);
37. }
38. // Create new freelance client object
39. flclient_t *client = flclient_new ();
40.
41. // Connect to each endpoint
42. int argn;
43. for (argn = 1; argn < argc; argn++)
44. flclient_connect (client, argv [argn]);
45.
46. // Send a bunch of name resolution ’requests’, measure time
47. int requests = 10000;
48. uint64_t start = zclock_time ();
49. while (requests--) {
50. zmsg_t *request = zmsg_new ();
51. zmsg_addstr (request, ”random name”);
52. zmsg_t *reply = flclient_request (client, &request);
53. if (!reply) {
54. printf (“E: name service not available, aborting\n”);
55. break;
56. }
57. zmsg_destroy (&reply);
58. }
59. printf (“Average round trip cost: %d usec\n”,
60. (int) (zclock_time () - start) / 10);
61.
62. flclient_destroy (&client);
63. return 0;
64. }
65.
66. // ——————————————————————–
67. // Structure of our class
68.
69. struct _flclient_t {
70. zctx_t *ctx; // Our context wrapper
71. void *socket; // DEALER socket talking to servers
72. size_t servers; // How many servers we have connected to
73. uint sequence; // Number of requests ever sent
74. };
75.
76. // ——————————————————————–
77. // Constructor
78.
79. flclient_t *
80. flclient_new (void)
81. {
82. flclient_t
83. *self;
84.
85. self = (flclient_t *) zmalloc (sizeof (flclient_t));
86. self->ctx = zctx_new ();
87. self->socket = zsocket_new (self->ctx, ZMQ_DEALER);
88. return self;
89. }
90.
91. // ——————————————————————–
92. // Destructor
93.
94. void
95. flclient_destroy (flclient_t **self_p)
96. {
97. assert (self_p);
98. if (*self_p) {
99. flclient_t *self = *self_p;
100. zctx_destroy (&self->ctx);
101. free (self);
102. *self_p = NULL;
103. }
104. }
105.
106. // ——————————————————————–
107. // Connect to new server endpoint
108.
109. void
110. flclient_connect (flclient_t *self, char *endpoint)
111. {
112. assert (self);
113. zsocket_connect (self->socket, endpoint);
114. self->servers++;
115. }
116.
117. // ——————————————————————–
118. // Send request, get reply
119. // Destroys request after sending
120.
121. zmsg_t *
122. flclient_request (flclient_t *self, zmsg_t **request_p)
123. {
124. assert (self);
125. assert (*request_p);
126. zmsg_t *request = *request_p;
127.
128. // Prefix request with sequence number and empty envelope
129. char sequence_text [10];
130. sprintf (sequence_text, ”%u”, ++self->sequence);
131. zmsg_pushstr (request, sequence_text);
132. zmsg_pushstr (request, ””);
133.
134. // Blast the request to all connected servers
135. int server;
136. for (server = 0; server < self->servers; server++) {
137. zmsg_t *msg = zmsg_dup (request);
138. zmsg_send (&msg, self->socket);
139. }
140. // Wait for a matching reply to arrive from anywhere
141. // Since we can poll several times, calculate each one
142. zmsg_t *reply = NULL;
143. uint64_t endtime = zclock_time () + GLOBAL_TIMEOUT;
144. while (zclock_time () < endtime) {
145. zmq_pollitem_t items [] = { { self->socket, 0, ZMQ_POLLIN, 0 } };
146. zmq_poll (items, 1, (endtime - zclock_time ()) * ZMQ_POLL_MSEC);
147. if (items [0].revents & ZMQ_POLLIN) {
148. // Reply is [empty][sequence][OK]
149. reply = zmsg_recv (self->socket);
150. assert (zmsg_size (reply) == 3);
151. free (zmsg_popstr (reply));
152. char *sequence = zmsg_popstr (reply);
153. int sequence_nbr = atoi (sequence);
154. free (sequence);
155. if (sequence_nbr == self->sequence)
156. break;
157. }
158. }
159. zmsg_destroy (request_p);
160. return reply;
161. }
The client code is wrapped up a little, using the API-as-a-class structure that the author favours.
Strengths of this model:
1. Simple! Easy to understand and use.
2. Decent fault tolerance (unless every one of your servers goes down).
Weaknesses:
1. A lot of redundant traffic.
2. No way to prioritize among servers.
3. At any given moment all servers can only be doing one thing, and it is the same thing.
Complex and nasty
One reading of the word is simply "nasty". In practice it covers everything the model above does not. Real-world environments are always messy, so the model grows correspondingly unpleasant, yet there is no way around it. (Nasty indeed.)
Having every client know about a pile of servers is simple enough for us, but not necessarily for whoever runs the system; what if the client only needs to know a single routable address? The operational cost then shifts onto the technology (rather painfully), and every extra layer of middleware adds another layer of complex logic.
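The one trick that makes this model work is that the server's ROUTER socket sets its identity to its own connect endpoint, so a client ROUTER can address it by that string without any prior handshake. In pyzmq that boils down to a few lines (port number assumed, as in the C code below):
1. import zmq
2.
3. ctx = zmq.Context()
4. server = ctx.socket(zmq.ROUTER)
5. # Use the connect endpoint as the socket identity, so peers can route to us by name
6. server.setsockopt(zmq.IDENTITY, b"tcp://localhost:5555")
7. server.bind("tcp://*:5555")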
Server:
1. //
2. // Freelance server - Model 3
3. // Uses an ROUTER/ROUTER socket but just one thread
4. //
5. #include ”czmq.h”
6.
7. int main (int argc, char *argv [])
8. {
9. int verbose = (argc > 1 && streq (argv [1], ”-v”));
10.
11. zctx_t *ctx = zctx_new ();
12.
13. // Prepare server socket with predictable identity
14. char *bind_endpoint = ”tcp://*:5555″;
15. char *connect_endpoint = ”tcp://localhost:5555″;
16. void *server = zsocket_new (ctx, ZMQ_ROUTER);
17. zmq_setsockopt (server,
18. ZMQ_IDENTITY, connect_endpoint, strlen (connect_endpoint));
19. zsocket_bind (server, bind_endpoint);
20. printf (“I: service is ready at %s\n”, bind_endpoint);
21.
22. while (!zctx_interrupted) {
23. zmsg_t *request = zmsg_recv (server);
24. if (verbose && request)
25. zmsg_dump (request);
26. if (!request)
27. break; // Interrupted
28.
29. // Frame 0: identity of client
30. // Frame 1: PING, or client control frame
31. // Frame 2: request body
32. zframe_t *address = zmsg_pop (request);
33. zframe_t *control = zmsg_pop (request);
34. zmsg_t *reply = zmsg_new ();
35. if (zframe_streq (control, ”PONG”))
36. zmsg_addstr (reply, ”PONG”);
37. else {
38. zmsg_add (reply, control);
39. zmsg_addstr (reply, ”OK”);
40. }
41. zmsg_destroy (&request);
42. zmsg_push (reply, address);
43. if (verbose && reply)
44. zmsg_dump (reply);
45. zmsg_send (&reply, server);
46. }
47. if (zctx_interrupted)
48. printf (“W: interrupted\n”);
49.
50. zctx_destroy (&ctx);
51. return 0;
52. }
Client:
1. //
2. // Freelance client - Model 3
3. // Uses flcliapi class to encapsulate Freelance pattern
4. //
5. // Lets us build this source without creating a library
6. #include ”flcliapi.c”
7.
8. int main (void)
9. {
10. // Create new freelance client object
11. flcliapi_t *client = flcliapi_new ();
12.
13. // Connect to several endpoints
14. flcliapi_connect (client, ”tcp://localhost:5555″);
15. flcliapi_connect (client, ”tcp://localhost:5556″);
16. flcliapi_connect (client, ”tcp://localhost:5557″);
17.
18. // Send a bunch of name resolution ’requests’, measure time
19. int requests = 1000;
20. uint64_t start = zclock_time ();
21. while (requests--) {
22. zmsg_t *request = zmsg_new ();
23. zmsg_addstr (request, ”random name”);
24. zmsg_t *reply = flcliapi_request (client, &request);
25. if (!reply) {
26. printf (“E: name service not available, aborting\n”);
27. break;
28. }
29. zmsg_destroy (&reply);
30. }
31. printf (“Average round trip cost: %d usec\n”,
32. (int) (zclock_time () - start) / 10);
33.
34. puts (“flclient 1”);
35. flcliapi_destroy (&client);
36. puts (“flclient 2”);
37. return 0;
38. }
Client API:
1. /* =====================================================================
2. flcliapi - Freelance Pattern agent class
3. Model 3: uses ROUTER socket to address specific services
4.
5. ———————————————————————
6. Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
7. Copyright other contributors as noted in the AUTHORS file.
8.
9. This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
10.
11. This is free software; you can redistribute it and/or modify it under
12. the terms of the GNU Lesser General Public License as published by
13. the Free Software Foundation; either version 3 of the License, or (at
14. your option) any later version.
15.
16. This software is distributed in the hope that it will be useful, but
17. WITHOUT ANY WARRANTY; without even the implied warranty of
18. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
19. Lesser General Public License for more details.
20.
21. You should have received a copy of the GNU Lesser General Public
22. License along with this program. If not, see
23. <http://www.gnu.org/licenses/>.
24. =====================================================================
25. */
26.
27. #include ”flcliapi.h”
28.
29. // If no server replies within this time, abandon request
30. #define GLOBAL_TIMEOUT 3000 // msecs
31. // PING interval for servers we think are alive
32. #define PING_INTERVAL 2000 // msecs
33. // Server considered dead if silent for this long
34. #define SERVER_TTL 6000 // msecs
35.
36. // =====================================================================
37. // Synchronous part, works in our application thread
38.
39. // ———————————————————————
40. // Structure of our class
41.
42. struct _flcliapi_t {
43. zctx_t *ctx; // Our context wrapper
44. void *pipe; // Pipe through to flcliapi agent
45. };
46.
47. // This is the thread that handles our real flcliapi class
48. static void flcliapi_agent (void *args, zctx_t *ctx, void *pipe);
49.
50. // ———————————————————————
51. // Constructor
52.
53. flcliapi_t *
54. flcliapi_new (void)
55. {
56. flcliapi_t
57. *self;
58.
59. self = (flcliapi_t *) zmalloc (sizeof (flcliapi_t));
60. self->ctx = zctx_new ();
61. self->pipe = zthread_fork (self->ctx, flcliapi_agent, NULL);
62. return self;
63. }
64.
65. // ———————————————————————
66. // Destructor
67.
68. void
69. flcliapi_destroy (flcliapi_t **self_p)
70. {
71. assert (self_p);
72. if (*self_p) {
73. flcliapi_t *self = *self_p;
74. zctx_destroy (&self->ctx);
75. free (self);
76. *self_p = NULL;
77. }
78. }
79.
80. // ———————————————————————
81. // Connect to new server endpoint
82. // Sends [CONNECT][endpoint] to the agent
83.
84. void
85. flcliapi_connect (flcliapi_t *self, char *endpoint)
86. {
87. assert (self);
88. assert (endpoint);
89. zmsg_t *msg = zmsg_new ();
90. zmsg_addstr (msg, ”CONNECT”);
91. zmsg_addstr (msg, endpoint);
92. zmsg_send (&msg, self->pipe);
93. zclock_sleep (100); // Allow connection to come up
94. }
95.
96. // ———————————————————————
97. // Send & destroy request, get reply
98.
99. zmsg_t *
100. flcliapi_request (flcliapi_t *self, zmsg_t **request_p)
101. {
102. assert (self);
103. assert (*request_p);
104.
105. zmsg_pushstr (*request_p, ”REQUEST”);
106. zmsg_send (request_p, self->pipe);
107. zmsg_t *reply = zmsg_recv (self->pipe);
108. if (reply) {
109. char *status = zmsg_popstr (reply);
110. if (streq (status, ”FAILED”))
111. zmsg_destroy (&reply);
112. free (status);
113. }
114. return reply;
115. }
116.
117. // =====================================================================
118. // Asynchronous part, works in the background
119.
120. // ———————————————————————
121. // Simple class for one server we talk to
122.
123. typedef struct {
124. char *endpoint; // Server identity/endpoint
125. uint alive; // 1 if known to be alive
126. int64_t ping_at; // Next ping at this time
127. int64_t expires; // Expires at this time
128. } server_t;
129.
130. server_t *
131. server_new (char *endpoint)
132. {
133. server_t *self = (server_t *) zmalloc (sizeof (server_t));
134. self->endpoint = strdup (endpoint);
135. self->alive = 0;
136. self->ping_at = zclock_time () + PING_INTERVAL;
137. self->expires = zclock_time () + SERVER_TTL;
138. return self;
139. }
140.
141. void
142. server_destroy (server_t **self_p)
143. {
144. assert (self_p);
145. if (*self_p) {
146. server_t *self = *self_p;
147. free (self->endpoint);
148. free (self);
149. *self_p = NULL;
150. }
151. }
152.
153. int
154. server_ping (char *key, void *server, void *socket)
155. {
156. server_t *self = (server_t *) server;
157. if (zclock_time () >= self->ping_at) {
158. zmsg_t *ping = zmsg_new ();
159. zmsg_addstr (ping, self->endpoint);
160. zmsg_addstr (ping, ”PING”);
161. zmsg_send (&ping, socket);
162. self->ping_at = zclock_time () + PING_INTERVAL;
163. }
164. return 0;
165. }
166.
167. int
168. server_tickless (char *key, void *server, void *arg)
169. {
170. server_t *self = (server_t *) server;
171. uint64_t *tickless = (uint64_t *) arg;
172. if (*tickless > self->ping_at)
173. *tickless = self->ping_at;
174. return 0;
175. }
176.
177. // ———————————————————————
178. // Simple class for one background agent
179.
180. typedef struct {
181. zctx_t *ctx; // Own context
182. void *pipe; // Socket to talk back to application
183. void *router; // Socket to talk to servers
184. zhash_t *servers; // Servers we’ve connected to
185. zlist_t *actives; // Servers we know are alive
186. uint sequence; // Number of requests ever sent
187. zmsg_t *request; // Current request if any
188. zmsg_t *reply; // Current reply if any
189. int64_t expires; // Timeout for request/reply
190. } agent_t;
191.
192. agent_t *
193. agent_new (zctx_t *ctx, void *pipe)
194. {
195. agent_t *self = (agent_t *) zmalloc (sizeof (agent_t));
196. self->ctx = ctx;
197. self->pipe = pipe;
198. self->router = zsocket_new (self->ctx, ZMQ_ROUTER);
199. self->servers = zhash_new ();
200. self->actives = zlist_new ();
201. return self;
202. }
203.
204. void
205. agent_destroy (agent_t **self_p)
206. {
207. assert (self_p);
208. if (*self_p) {
209. agent_t *self = *self_p;
210. zhash_destroy (&self->servers);
211. zlist_destroy (&self->actives);
212. zmsg_destroy (&self->request);
213. zmsg_destroy (&self->reply);
214. free (self);
215. *self_p = NULL;
216. }
217. }
218.
219. // Callback when we remove server from agent ’servers’ hash table
220.
221. static void
222. s_server_free (void *argument)
223. {
224. server_t *server = (server_t *) argument;
225. server_destroy (&server);
226. }
227.
228. void
229. agent_control_message (agent_t *self)
230. {
231. zmsg_t *msg = zmsg_recv (self->pipe);
232. char *command = zmsg_popstr (msg);
233.
234. if (streq (command, ”CONNECT”)) {
235. char *endpoint = zmsg_popstr (msg);
236. printf (“I: connecting to %s…\n”, endpoint);
237. int rc = zmq_connect (self->router, endpoint);
238. assert (rc == 0);
239. server_t *server = server_new (endpoint);
240. zhash_insert (self->servers, endpoint, server);
241. zhash_freefn (self->servers, endpoint, s_server_free);
242. zlist_append (self->actives, server);
243. server->ping_at = zclock_time () + PING_INTERVAL;
244. server->expires = zclock_time () + SERVER_TTL;
245. free (endpoint);
246. }
247. else
248. if (streq (command, ”REQUEST”)) {
249. assert (!self->request); // Strict request-reply cycle
250. // Prefix request with sequence number and empty envelope
251. char sequence_text [10];
252. sprintf (sequence_text, ”%u”, ++self->sequence);
253. zmsg_pushstr (msg, sequence_text);
254. // Take ownership of request message
255. self->request = msg;
256. msg = NULL;
257. // Request expires after global timeout
258. self->expires = zclock_time () + GLOBAL_TIMEOUT;
259. }
260. free (command);
261. zmsg_destroy (&msg);
262. }
263.
264. void
265. agent_router_message (agent_t *self)
266. {
267. zmsg_t *reply = zmsg_recv (self->router);
268.
269. // Frame 0 is server that replied
270. char *endpoint = zmsg_popstr (reply);
271. server_t *server =
272. (server_t *) zhash_lookup (self->servers, endpoint);
273. assert (server);
274. free (endpoint);
275. if (!server->alive) {
276. zlist_append (self->actives, server);
277. server->alive = 1;
278. }
279. server->ping_at = zclock_time () + PING_INTERVAL;
280. server->expires = zclock_time () + SERVER_TTL;
281.
282. // Frame 1 may be sequence number for reply
283. char *sequence = zmsg_popstr (reply);
284. if (atoi (sequence) == self->sequence) {
285. zmsg_pushstr (reply, ”OK”);
286. zmsg_send (&reply, self->pipe);
287. zmsg_destroy (&self->request);
288. }
289. else
290. zmsg_destroy (&reply);
291. }
292.
293. // ———————————————————————
294. // Asynchronous agent manages server pool and handles request/reply
295. // dialog when the application asks for it.
296.
297. static void
298. flcliapi_agent (void *args, zctx_t *ctx, void *pipe)
299. {
300. agent_t *self = agent_new (ctx, pipe);
301.
302. zmq_pollitem_t items [] = {
303. { self->pipe, 0, ZMQ_POLLIN, 0 },
304. { self->router, 0, ZMQ_POLLIN, 0 }
305. };
306. while (!zctx_interrupted) {
307. // Calculate tickless timer, up to 1 hour
308. uint64_t tickless = zclock_time () + 1000 * 3600;
309. if (self->request
310. && tickless > self->expires)
311. tickless = self->expires;
312. zhash_foreach (self->servers, server_tickless, &tickless);
313.
314. int rc = zmq_poll (items, 2,
315. (tickless - zclock_time ()) * ZMQ_POLL_MSEC);
316. if (rc == -1)
317. break; // Context has been shut down
318.
319. if (items [0].revents & ZMQ_POLLIN)
320. agent_control_message (self);
321.
322. if (items [1].revents & ZMQ_POLLIN)
323. agent_router_message (self);
324.
325. // If we’re processing a request, dispatch to next server
326. if (self->request) {
327. if (zclock_time () >= self->expires) {
328. // Request expired, kill it
329. zstr_send (self->pipe, ”FAILED”);
330. zmsg_destroy (&self->request);
331. }
332. else {
333. // Find server to talk to, remove any expired ones
334. while (zlist_size (self->actives)) {
335. server_t *server =
336. (server_t *) zlist_first (self->actives);
337. if (zclock_time () >= server->expires) {
338. zlist_pop (self->actives);
339. server->alive = 0;
340. }
341. else {
342. zmsg_t *request = zmsg_dup (self->request);
343. zmsg_pushstr (request, server->endpoint);
344. zmsg_send (&request, self->router);
345. break;
346. }
347. }
348. }
349. }
350. // Disconnect and delete any expired servers
351. // Send heartbeats to idle servers if needed
352. zhash_foreach (self->servers, server_ping, self->router);
353. }
354. agent_destroy (&self);
355. }
Phew. Complex enough for you? This API packs in:
· an asynchronous agent
· connection management to keep the links solid
· a ping/heartbeat mechanism
· timers
And with that, the reliability chapter is finally over. According to the guide, the reasonably dependable, usable-in-practice option is that nasty model above, though the "Majordomo" pattern is also worth a try.
zeroMQ first experience - 30. Pub/sub, advanced - the suicidal snail subscriber
Back when pub/sub was first introduced, this question was already left hanging: if a subscriber consumes too slowly, messages pile up in a queue on the publisher side; what then? This post is about exactly those snail-paced subscribers.
The usual approaches:
Queue messages that cannot be consumed in time on the publisher, which increases the load on the publisher.
Queue them on the subscriber instead, pushing the load onto each subscriber.
As before, put a threshold (high-water mark) on the queue and drop whatever overflows.
Punish the subscriber: when the publisher notices it is too slow, disconnect it.
All four are classic answers, and all four have drawbacks. The best approach is for the subscriber itself to realize it cannot keep up and take the initiative, for example by disconnecting temporarily, getting itself fixed, and then coming back.
As for how the subscriber detects that it is falling behind: keep a bounded queue on the subscriber and have the publisher number every message. If the subscriber sees a gap in the numbering, it knows messages beyond the threshold were dropped, and, well, it is time to take itself offline and shape up.
Dropping the subscription is hardly ideal, but it is far more dependable than blindly shovelling data with no visibility and no control; at least everything stays in your own hands.
1. //
2. // Suicidal Snail
3. //
4. #include ”czmq.h”
5.
6. // ———————————————————————
7. // This is our subscriber
8. // It connects to the publisher and subscribes to everything. It
9. // sleeps for a short time between messages to simulate doing too
10. // much work. If a message is more than 1 second late, it croaks.
11.
12. #define MAX_ALLOWED_DELAY 1000 // msecs
13.
14. static void
15. subscriber (void *args, zctx_t *ctx, void *pipe)
16. {
17. // Subscribe to everything
18. void *subscriber = zsocket_new (ctx, ZMQ_SUB);
19. zsocket_connect (subscriber, ”tcp://localhost:5556″);
20.
21. // Get and process messages
22. while (1) {
23. char *string = zstr_recv (subscriber);
24. int64_t clock;
25. int terms = sscanf (string, ”%” PRId64, &clock);
26. assert (terms == 1);
27. free (string);
28.
29. // Suicide snail logic
30. if (zclock_time () - clock > MAX_ALLOWED_DELAY) {
31. fprintf (stderr, ”E: subscriber cannot keep up, aborting\n”);
32. break;
33. }
34. // Work for 1 msec plus some random additional time
35. zclock_sleep (1 + randof (2));
36. }
37. zstr_send (pipe, ”gone and died”);
38. }
39.
40. // ———————————————————————
41. // This is our server task
42. // It publishes a time-stamped message to its pub socket every 1ms.
43.
44. static void
45. publisher (void *args, zctx_t *ctx, void *pipe)
46. {
47. // Prepare publisher
48. void *publisher = zsocket_new (ctx, ZMQ_PUB);
49. zsocket_bind (publisher, ”tcp://*:5556″);
50.
51. while (1) {
52. // Send current clock (msecs) to subscribers
53. char string [20];
54. sprintf (string, ”%” PRId64, zclock_time ());
55. zstr_send (publisher, string);
56. char *signal = zstr_recv_nowait (pipe);
57. if (signal) {
58. free (signal);
59. break;
60. }
61. zclock_sleep (1); // 1msec wait
62. }
63. }
64.
65. // This main thread simply starts a client, and a server, and then
66. // waits for the client to signal it’s died.
67. //
68. int main (void)
69. {
70. zctx_t *ctx = zctx_new ();
71. void *pubpipe = zthread_fork (ctx, publisher, NULL);
72. void *subpipe = zthread_fork (ctx, subscriber, NULL);
73. free (zstr_recv (subpipe));
74. zstr_send (pubpipe, ”break”);
75. zclock_sleep (100);
76. zctx_destroy (&ctx);
77. return 0;
78. }
Note:
This example does not number the messages; it timestamps them instead and decides whether the delay has become unacceptable. There are countless ways for a subscriber to judge itself; pick whichever suits you (the sketch below shows the sequence-numbered variant described earlier).
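For completeness, a sequence-numbered variant of the same self-check might look like this in pyzmq; the endpoint is assumed, and the publisher is assumed to send a bare, monotonically increasing number per message:
1. import zmq
2.
3. ctx = zmq.Context()
4. sub = ctx.socket(zmq.SUB)
5. sub.setsockopt(zmq.RCVHWM, 100)  # bounded local queue (libzmq 3.x option) so overload shows up quickly
6. sub.setsockopt(zmq.SUBSCRIBE, b"")
7. sub.connect("tcp://localhost:5556")
8.
9. expected = None
10. while True:
11.     sequence = int(sub.recv_string())
12.     if expected is not None and sequence != expected:
13.         # A hole in the numbering means messages were dropped: we are too slow, so give up
14.         print("E: missed %d messages, aborting" % (sequence - expected))
15.         break
16.     expected = sequence + 1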
zeroMQ first experience - 31. Pub/sub, advanced - the black-box high-speed subscriber
A common pub/sub scenario is multicasting large volumes of data. Granted, 100k messages per second is nothing special for zeromq, but who would object to going faster?
Model diagram:
The key to more throughput is making full use of the hardware, multi-core CPUs in particular, which is exactly what multithreading is for (a pity Python cannot really exploit it).
The thread-level model from the figure above then turns into this:
Points to note:
· Two I/O threads are needed here.
· Two network cards, one per I/O thread.
· Each of those two threads gets its own CPU core; the remaining (worker) threads are sized to the remaining cores, and every worker connects to both I/O threads.
· The thread count is best kept no larger than the number of CPU cores, otherwise the effect backfires.
Er, there is no code for this one in the guide, which feels a little thin.
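To make it slightly less thin, here is a rough pyzmq sketch of the wiring only. Keep in mind that, as noted above, CPython threads share one GIL, so this merely illustrates the structure; the endpoint, worker count and io_threads value are all assumptions:
1. import threading
2. import zmq
3.
4. ctx = zmq.Context(io_threads=2)  # one I/O thread per network interface, as suggested above
5.
6. # The SUB side only shovels bytes; workers do the real processing
7. fanout = ctx.socket(zmq.PUSH)
8. fanout.bind("inproc://fanout")
9.
10. def worker(n):
11.     pull = ctx.socket(zmq.PULL)
12.     pull.connect("inproc://fanout")
13.     while True:
14.         msg = pull.recv()
15.         # ... real work on msg goes here ...
16.
17. for n in range(4):  # roughly one worker per spare core
18.     t = threading.Thread(target=worker, args=(n,))
19.     t.daemon = True
20.     t.start()
21.
22. sub = ctx.socket(zmq.SUB)
23. sub.setsockopt(zmq.SUBSCRIBE, b"")
24. sub.connect("tcp://localhost:5556")
25.
26. while True:
27.     fanout.send(sub.recv())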
zeroMQ first experience - 32. Pub/sub, advanced - the Clone pattern (part 1)
In pub/sub, and especially in real deployments, subscribers will sooner or later lose data they needed, for one reason or another, and will want to get it back. Normally that recovery would be left to each subscriber, but "a thousand Hamlets" each rolling their own is painful to look at from an engineering standpoint and runs completely against the idea of reuse. Hence the "Clone" pattern: keep the messages on the publisher side, and, to avoid the tragedy of ever-growing queues (and to give subscribers a nicer experience), key-value storage looks like the right choice.
Note: key-value here does not mean the currently fashionable NoSQL stores (although it looks similar); think of it as the publisher's own data warehouse (that seems a fair way to put it).
To keep things clear, the whole mechanism will be broken down into pieces.
Storing the updates
Model diagram:
Server:
1. //
2. // Clone server Model One
3. //
4.
5. // Lets us build this source without creating a library
6. #include ”kvsimple.c”
7.
8. int main (void)
9. {
10. // Prepare our context and publisher socket
11. zctx_t *ctx = zctx_new ();
12. void *publisher = zsocket_new (ctx, ZMQ_PUB);
13. zsocket_bind (publisher, ”tcp://*:5556″);
14. zclock_sleep (200);
15.
16. zhash_t *kvmap = zhash_new ();
17. int64_t sequence = 0;
18. srandom ((unsigned) time (NULL));
19.
20. while (!zctx_interrupted) {
21. // Distribute as key-value message
22. kvmsg_t *kvmsg = kvmsg_new (++sequence);
23. kvmsg_fmt_key (kvmsg, ”%d”, randof (10000));
24. kvmsg_fmt_body (kvmsg, ”%d”, randof (1000000));
25. kvmsg_send (kvmsg, publisher);
26. kvmsg_store (&kvmsg, kvmap);
27. }
28. printf (“ Interrupted\n%d messages out\n”, (int) sequence);
29. zhash_destroy (&kvmap);
30. zctx_destroy (&ctx);
31. return 0;
32. }
Client:
1. //
2. // Clone client Model One
3. //
4.
5. // Lets us build this source without creating a library
6. #include ”kvsimple.c”
7.
8. int main (void)
9. {
10. // Prepare our context and updates socket
11. zctx_t *ctx = zctx_new ();
12. void *updates = zsocket_new (ctx, ZMQ_SUB);
13. zsocket_connect (updates, ”tcp://localhost:5556″);
14.
15. zhash_t *kvmap = zhash_new ();
16. int64_t sequence = 0;
17.
18. while (TRUE) {
19. kvmsg_t *kvmsg = kvmsg_recv (updates);
20. if (!kvmsg)
21. break; // Interrupted
22. kvmsg_store (&kvmsg, kvmap);
23. sequence++;
24. }
25. printf (“ Interrupted\n%d messages in\n”, (int) sequence);
26. zhash_destroy (&kvmap);
27. zctx_destroy (&ctx);
28. return 0;
29. }
Key-value library:
1. /* =====================================================================
2. kvsimple - simple key-value message class for example applications
3.
4. ———————————————————————
5. Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
6. Copyright other contributors as noted in the AUTHORS file.
7.
8. This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
9.
10. This is free software; you can redistribute it and/or modify it under
11. the terms of the GNU Lesser General Public License as published by
12. the Free Software Foundation; either version 3 of the License, or (at
13. your option) any later version.
14.
15. This software is distributed in the hope that it will be useful, but
16. WITHOUT ANY WARRANTY; without even the implied warranty of
17. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18. Lesser General Public License for more details.
19.
20. You should have received a copy of the GNU Lesser General Public
21. License along with this program. If not, see
22. <http://www.gnu.org/licenses/>.
23. =====================================================================
24. */
25.
26. #include ”kvsimple.h”
27. #include ”zlist.h”
28.
29. // Keys are short strings
30. #define KVMSG_KEY_MAX 255
31.
32. // Message is formatted on wire as 4 frames:
33. // frame 0: key (0MQ string)
34. // frame 1: sequence (8 bytes, network order)
35. // frame 2: body (blob)
36. #define FRAME_KEY 0
37. #define FRAME_SEQ 1
38. #define FRAME_BODY 2
39. #define KVMSG_FRAMES 3
40.
41. // Structure of our class
42. struct _kvmsg {
43. // Presence indicators for each frame
44. int present [KVMSG_FRAMES];
45. // Corresponding 0MQ message frames, if any
46. zmq_msg_t frame [KVMSG_FRAMES];
47. // Key, copied into safe C string
48. char key [KVMSG_KEY_MAX + 1];
49. };
50.
51. // ———————————————————————
52. // Constructor, sets sequence as provided
53.
54. kvmsg_t *
55. kvmsg_new (int64_t sequence)
56. {
57. kvmsg_t
58. *self;
59.
60. self = (kvmsg_t *) zmalloc (sizeof (kvmsg_t));
61. kvmsg_set_sequence (self, sequence);
62. return self;
63. }
64.
65. // ———————————————————————
66. // Destructor
67.
68. // Free shim, compatible with zhash_free_fn
69. void
70. kvmsg_free (void *ptr)
71. {
72. if (ptr) {
73. kvmsg_t *self = (kvmsg_t *) ptr;
74. // Destroy message frames if any
75. int frame_nbr;
76. for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++)
77. if (self->present [frame_nbr])
78. zmq_msg_close (&self->frame [frame_nbr]);
79.
80. // Free object itself
81. free (self);
82. }
83. }
84.
85. void
86. kvmsg_destroy (kvmsg_t **self_p)
87. {
88. assert (self_p);
89. if (*self_p) {
90. kvmsg_free (*self_p);
91. *self_p = NULL;
92. }
93. }
94.
95. // ———————————————————————
96. // Reads key-value message from socket, returns new kvmsg instance.
97.
98. kvmsg_t *
99. kvmsg_recv (void *socket)
100. {
101. assert (socket);
102. kvmsg_t *self = kvmsg_new (0);
103.
104. // Read all frames off the wire, reject if bogus
105. int frame_nbr;
106. for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
107. if (self->present [frame_nbr])
108. zmq_msg_close (&self->frame [frame_nbr]);
109. zmq_msg_init (&self->frame [frame_nbr]);
110. self->present [frame_nbr] = 1;
111. if (zmq_recvmsg (socket, &self->frame [frame_nbr], 0) == -1) {
112. kvmsg_destroy (&self);
113. break;
114. }
115. // Verify multipart framing
116. int rcvmore = (frame_nbr < KVMSG_FRAMES - 1)? 1: 0;
117. if (zsockopt_rcvmore (socket) != rcvmore) {
118. kvmsg_destroy (&self);
119. break;
120. }
121. }
122. return self;
123. }
124.
125. // ———————————————————————
126. // Send key-value message to socket; any empty frames are sent as such.
127.
128. void
129. kvmsg_send (kvmsg_t *self, void *socket)
130. {
131. assert (self);
132. assert (socket);
133.
134. int frame_nbr;
135. for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
136. zmq_msg_t copy;
137. zmq_msg_init (&copy);
138. if (self->present [frame_nbr])
139. zmq_msg_copy (&copy, &self->frame [frame_nbr]);
140. zmq_sendmsg (socket, &copy,
141. (frame_nbr < KVMSG_FRAMES - 1)? ZMQ_SNDMORE: 0);
142. zmq_msg_close (&copy);
143. }
144. }
145.
146. // ———————————————————————
147. // Return key from last read message, if any, else NULL
148.
149. char *
150. kvmsg_key (kvmsg_t *self)
151. {
152. assert (self);
153. if (self->present [FRAME_KEY]) {
154. if (!*self->key) {
155. size_t size = zmq_msg_size (&self->frame [FRAME_KEY]);
156. if (size > KVMSG_KEY_MAX)
157. size = KVMSG_KEY_MAX;
158. memcpy (self->key,
159. zmq_msg_data (&self->frame [FRAME_KEY]), size);
160. self->key [size] = 0;
161. }
162. return self->key;
163. }
164. else
165. return NULL;
166. }
167.
168. // ———————————————————————
169. // Return sequence nbr from last read message, if any
170.
171. int64_t
172. kvmsg_sequence (kvmsg_t *self)
173. {
174. assert (self);
175. if (self->present [FRAME_SEQ]) {
176. assert (zmq_msg_size (&self->frame [FRAME_SEQ]) == 8);
177. byte *source = zmq_msg_data (&self->frame [FRAME_SEQ]);
178. int64_t sequence = ((int64_t) (source [0]) << 56)
179. + ((int64_t) (source [1]) << 48)
180. + ((int64_t) (source [2]) << 40)
181. + ((int64_t) (source [3]) << 32)
182. + ((int64_t) (source [4]) << 24)
183. + ((int64_t) (source [5]) << 16)
184. + ((int64_t) (source [6]) << 8)
185. + (int64_t) (source [7]);
186. return sequence;
187. }
188. else
189. return 0;
190. }
191.
192. // ———————————————————————
193. // Return body from last read message, if any, else NULL
194.
195. byte *
196. kvmsg_body (kvmsg_t *self)
197. {
198. assert (self);
199. if (self->present [FRAME_BODY])
200. return (byte *) zmq_msg_data (&self->frame [FRAME_BODY]);
201. else
202. return NULL;
203. }
204.
205. // ———————————————————————
206. // Return body size from last read message, if any, else zero
207.
208. size_t
209. kvmsg_size (kvmsg_t *self)
210. {
211. assert (self);
212. if (self->present [FRAME_BODY])
213. return zmq_msg_size (&self->frame [FRAME_BODY]);
214. else
215. return 0;
216. }
217.
218. // ———————————————————————
219. // Set message key as provided
220.
221. void
222. kvmsg_set_key (kvmsg_t *self, char *key)
223. {
224. assert (self);
225. zmq_msg_t *msg = &self->frame [FRAME_KEY];
226. if (self->present [FRAME_KEY])
227. zmq_msg_close (msg);
228. zmq_msg_init_size (msg, strlen (key));
229. memcpy (zmq_msg_data (msg), key, strlen (key));
230. self->present [FRAME_KEY] = 1;
231. }
232.
233. // ———————————————————————
234. // Set message sequence number
235.
236. void
237. kvmsg_set_sequence (kvmsg_t *self, int64_t sequence)
238. {
239. assert (self);
240. zmq_msg_t *msg = &self->frame [FRAME_SEQ];
241. if (self->present [FRAME_SEQ])
242. zmq_msg_close (msg);
243. zmq_msg_init_size (msg, 8);
244.
245. byte *source = zmq_msg_data (msg);
246. source [0] = (byte) ((sequence >> 56) & 255);
247. source [1] = (byte) ((sequence >> 48) & 255);
248. source [2] = (byte) ((sequence >> 40) & 255);
249. source [3] = (byte) ((sequence >> 32) & 255);
250. source [4] = (byte) ((sequence >> 24) & 255);
251. source [5] = (byte) ((sequence >> 16) & 255);
252. source [6] = (byte) ((sequence >> 8) & 255);
253. source [7] = (byte) ((sequence) & 255);
254.
255. self->present [FRAME_SEQ] = 1;
256. }
257.
258. // ———————————————————————
259. // Set message body
260.
261. void
262. kvmsg_set_body (kvmsg_t *self, byte *body, size_t size)
263. {
264. assert (self);
265. zmq_msg_t *msg = &self->frame [FRAME_BODY];
266. if (self->present [FRAME_BODY])
267. zmq_msg_close (msg);
268. self->present [FRAME_BODY] = 1;
269. zmq_msg_init_size (msg, size);
270. memcpy (zmq_msg_data (msg), body, size);
271. }
272.
273. // ———————————————————————
274. // Set message key using printf format
275.
276. void
277. kvmsg_fmt_key (kvmsg_t *self, char *format, …)
278. {
279. char value [KVMSG_KEY_MAX + 1];
280. va_list args;
281.
282. assert (self);
283. va_start (args, format);
284. vsnprintf (value, KVMSG_KEY_MAX, format, args);
285. va_end (args);
286. kvmsg_set_key (self, value);
287. }
288.
289. // ———————————————————————
290. // Set message body using printf format
291.
292. void
293. kvmsg_fmt_body (kvmsg_t *self, char *format, …)
294. {
295. char value [255 + 1];
296. va_list args;
297.
298. assert (self);
299. va_start (args, format);
300. vsnprintf (value, 255, format, args);
301. va_end (args);
302. kvmsg_set_body (self, (byte *) value, strlen (value));
303. }
304.
305. // ———————————————————————
306. // Store entire kvmsg into hash map, if key/value are set
307. // Nullifies kvmsg reference, and destroys automatically when no longer
308. // needed.
309.
310. void
311. kvmsg_store (kvmsg_t **self_p, zhash_t *hash)
312. {
313. assert (self_p);
314. if (*self_p) {
315. kvmsg_t *self = *self_p;
316. assert (self);
317. if (self->present [FRAME_KEY]
318. && self->present [FRAME_BODY]) {
319. zhash_update (hash, kvmsg_key (self), self);
320. zhash_freefn (hash, kvmsg_key (self), kvmsg_free);
321. }
322. *self_p = NULL;
323. }
324. }
325.
326. // ———————————————————————
327. // Dump message to stderr, for debugging and tracing
328.
329. void
330. kvmsg_dump (kvmsg_t *self)
331. {
332. if (self) {
333. if (!self) {
334. fprintf (stderr, ”NULL”);
335. return;
336. }
337. size_t size = kvmsg_size (self);
338. byte *body = kvmsg_body (self);
339. fprintf (stderr, ”[seq:%” PRId64 ”]”, kvmsg_sequence (self));
340. fprintf (stderr, ”[key:%s]”, kvmsg_key (self));
341. fprintf (stderr, ”[size:%zd] ”, size);
342. int char_nbr;
343. for (char_nbr = 0; char_nbr < size; char_nbr++)
344. fprintf (stderr, ”%02X”, body [char_nbr]);
345. fprintf (stderr, ”\n”);
346. }
347. else
348. fprintf (stderr, ”NULL message\n”);
349. }
350.
351. // ———————————————————————
352. // Runs self test of class
353.
354. int
355. kvmsg_test (int verbose)
356. {
357. kvmsg_t
358. *kvmsg;
359.
360. printf (“ * kvmsg: ”);
361.
362. // Prepare our context and sockets
363. zctx_t *ctx = zctx_new ();
364. void *output = zsocket_new (ctx, ZMQ_DEALER);
365. int rc = zmq_bind (output, ”ipc://kvmsg_selftest.ipc”);
366. assert (rc == 0);
367. void *input = zsocket_new (ctx, ZMQ_DEALER);
368. rc = zmq_connect (input, ”ipc://kvmsg_selftest.ipc”);
369. assert (rc == 0);
370.
371. zhash_t *kvmap = zhash_new ();
372.
373. // Test send and receive of simple message
374. kvmsg = kvmsg_new (1);
375. kvmsg_set_key (kvmsg, ”key”);
376. kvmsg_set_body (kvmsg, (byte *) ”body”, 4);
377. if (verbose)
378. kvmsg_dump (kvmsg);
379. kvmsg_send (kvmsg, output);
380. kvmsg_store (&kvmsg, kvmap);
381.
382. kvmsg = kvmsg_recv (input);
383. if (verbose)
384. kvmsg_dump (kvmsg);
385. assert (streq (kvmsg_key (kvmsg), ”key”));
386. kvmsg_store (&kvmsg, kvmap);
387.
388. // Shutdown and destroy all objects
389. zhash_destroy (&kvmap);
390. zctx_destroy (&ctx);
391.
392. printf (“OK\n”);
393. return 0;
394. }
Getting data by key
In fact, once a subscriber can send a key and ask for data, it is no longer a pure subscriber; "client" is probably the better word for it.
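The handshake itself is tiny. A hypothetical pyzmq client would do something like the following (port assumed; the frame layout is the key/sequence/body triple from kvsimple) before switching over to its SUB socket:
1. import struct
2. import zmq
3.
4. ctx = zmq.Context()
5. snapshot = ctx.socket(zmq.DEALER)
6. snapshot.connect("tcp://localhost:5556")
7.
8. kvmap = {}
9. snapshot.send_string("ICANHAZ?")  # ask the server for the full state
10. while True:
11.     key, seq, body = snapshot.recv_multipart()  # same three frames as kvsimple: key, sequence, body
12.     if key == b"KTHXBAI":
13.         sequence = struct.unpack(">q", seq)[0]  # last applied update; resume the SUB stream from here
14.         print("Received snapshot=%d" % sequence)
15.         break
16.     kvmap[key] = body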
Model diagram:
Server:
1. //
2. // Clone server Model Two
3. //
4.
5. // Lets us build this source without creating a library
6. #include ”kvsimple.c”
7.
8. static int s_send_single (char *key, void *data, void *args);
9. static void state_manager (void *args, zctx_t *ctx, void *pipe);
10.
11. int main (void)
12. {
13. // Prepare our context and sockets
14. zctx_t *ctx = zctx_new ();
15. void *publisher = zsocket_new (ctx, ZMQ_PUB);
16. zsocket_bind (publisher, ”tcp://*:5557″);
17.
18. int64_t sequence = 0;
19. srandom ((unsigned) time (NULL));
20.
21. // Start state manager and wait for synchronization signal
22. void *updates = zthread_fork (ctx, state_manager, NULL);
23. free (zstr_recv (updates));
24.
25. while (!zctx_interrupted) {
26. // Distribute as key-value message
27. kvmsg_t *kvmsg = kvmsg_new (++sequence);
28. kvmsg_fmt_key (kvmsg, ”%d”, randof (10000));
29. kvmsg_fmt_body (kvmsg, ”%d”, randof (1000000));
30. kvmsg_send (kvmsg, publisher);
31. kvmsg_send (kvmsg, updates);
32. kvmsg_destroy (&kvmsg);
33. }
34. printf (“ Interrupted\n%d messages out\n”, (int) sequence);
35. zctx_destroy (&ctx);
36. return 0;
37. }
38.
39. // Routing information for a key-value snapshot
40. typedef struct {
41. void *socket; // ROUTER socket to send to
42. zframe_t *identity; // Identity of peer who requested state
43. } kvroute_t;
44.
45. // Send one state snapshot key-value pair to a socket
46. // Hash item data is our kvmsg object, ready to send
47. static int
48. s_send_single (char *key, void *data, void *args)
49. {
50. kvroute_t *kvroute = (kvroute_t *) args;
51. // Send identity of recipient first
52. zframe_send (&kvroute->identity,
53. kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
54. kvmsg_t *kvmsg = (kvmsg_t *) data;
55. kvmsg_send (kvmsg, kvroute->socket);
56. return 0;
57. }
58.
59. // This thread maintains the state and handles requests from
60. // clients for snapshots.
61. //
62. static void
63. state_manager (void *args, zctx_t *ctx, void *pipe)
64. {
65. zhash_t *kvmap = zhash_new ();
66.
67. zstr_send (pipe, ”READY”);
68. void *snapshot = zsocket_new (ctx, ZMQ_ROUTER);
69. zsocket_bind (snapshot, ”tcp://*:5556″);
70.
71. zmq_pollitem_t items [] = {
72. { pipe, 0, ZMQ_POLLIN, 0 },
73. { snapshot, 0, ZMQ_POLLIN, 0 }
74. };
75. int64_t sequence = 0; // Current snapshot version number
76. while (!zctx_interrupted) {
77. int rc = zmq_poll (items, 2, -1);
78. if (rc == -1 && errno == ETERM)
79. break; // Context has been shut down
80.
81. // Apply state update from main thread
82. if (items [0].revents & ZMQ_POLLIN) {
83. kvmsg_t *kvmsg = kvmsg_recv (pipe);
84. if (!kvmsg)
85. break; // Interrupted
86. sequence = kvmsg_sequence (kvmsg);
87. kvmsg_store (&kvmsg, kvmap);
88. }
89. // Execute state snapshot request
90. if (items [1].revents & ZMQ_POLLIN) {
91. zframe_t *identity = zframe_recv (snapshot);
92. if (!identity)
93. break; // Interrupted
94.
95. // Request is in second frame of message
96. char *request = zstr_recv (snapshot);
97. if (streq (request, ”ICANHAZ?”))
98. free (request);
99. else {
100. printf (“E: bad request, aborting\n”);
101. break;
102. }
103. // Send state snapshot to client
104. kvroute_t routing = { snapshot, identity };
105.
106. // For each entry in kvmap, send kvmsg to client
107. zhash_foreach (kvmap, s_send_single, &routing);
108.
109. // Now send END message with sequence number
110. printf (“Sending state shapshot=%d\n”, (int) sequence);
111. zframe_send (&identity, snapshot, ZFRAME_MORE);
112. kvmsg_t *kvmsg = kvmsg_new (sequence);
113. kvmsg_set_key (kvmsg, ”KTHXBAI”);
114. kvmsg_set_body (kvmsg, (byte *) ””, 0);
115. kvmsg_send (kvmsg, snapshot);
116. kvmsg_destroy (&kvmsg);
117. }
118. }
119. zhash_destroy (&kvmap);
120. }
Client:
1. //
2. // Clone client Model Two
3. //
4.
5. // Lets us build this source without creating a library
6. #include ”kvsimple.c”
7.
8. int main (void)
9. {
10. // Prepare our context and subscriber
11. zctx_t *ctx = zctx_new ();
12. void *snapshot = zsocket_new (ctx, ZMQ_DEALER);
13. zsocket_connect (snapshot, ”tcp://localhost:5556″);
14. void *subscriber = zsocket_new (ctx, ZMQ_SUB);
15. zsocket_connect (subscriber, ”tcp://localhost:5557″);
16.
17. zhash_t *kvmap = zhash_new ();
18.
19. // Get state snapshot
20. int64_t sequence = 0;
21. zstr_send (snapshot, ”ICANHAZ?”);
22. while (TRUE) {
23. kvmsg_t *kvmsg = kvmsg_recv (snapshot);
24. if (!kvmsg)
25. break; // Interrupted
26. if (streq (kvmsg_key (kvmsg), ”KTHXBAI”)) {
27. sequence = kvmsg_sequence (kvmsg);
28. printf (“Received snapshot=%d\n”, (int) sequence);
29. kvmsg_destroy (&kvmsg);
30. break; // Done
31. }
32. kvmsg_store (&kvmsg, kvmap);
33. }
34. // Now apply pending updates, discard out-of-sequence messages
35. while (!zctx_interrupted) {
36. kvmsg_t *kvmsg = kvmsg_recv (subscriber);
37. if (!kvmsg)
38. break; // Interrupted
39. if (kvmsg_sequence (kvmsg) > sequence) {
40. sequence = kvmsg_sequence (kvmsg);
41. kvmsg_store (&kvmsg, kvmap);
42. }
43. else
44. kvmsg_destroy (&kvmsg);
45. }
46. zhash_destroy (&kvmap);
47. zctx_destroy (&ctx);
48. return 0;
49. }
Republishing updates
In the model above, all the data sits in one place, so a server crash could well mean losing it. What if the data is put on the clients instead?
Model diagram:
Server:
1. //
2. // Clone server Model Three
3. //
4.
5. // Lets us build this source without creating a library
6. #include ”kvsimple.c”
7.
8. static int s_send_single (char *key, void *data, void *args);
9.
10. // Routing information for a key-value snapshot
11. typedef struct {
12. void *socket; // ROUTER socket to send to
13. zframe_t *identity; // Identity of peer who requested state
14. } kvroute_t;
15.
16. int main (void)
17. {
18. // Prepare our context and sockets
19. zctx_t *ctx = zctx_new ();
20. void *snapshot = zsocket_new (ctx, ZMQ_ROUTER);
21. zsocket_bind (snapshot, ”tcp://*:5556″);
22. void *publisher = zsocket_new (ctx, ZMQ_PUB);
23. zsocket_bind (publisher, ”tcp://*:5557″);
24. void *collector = zsocket_new (ctx, ZMQ_PULL);
25. zsocket_bind (collector, ”tcp://*:5558″);
26.
27. int64_t sequence = 0;
28. zhash_t *kvmap = zhash_new ();
29.
30. zmq_pollitem_t items [] = {
31. { collector, 0, ZMQ_POLLIN, 0 },
32. { snapshot, 0, ZMQ_POLLIN, 0 }
33. };
34. while (!zctx_interrupted) {
35. int rc = zmq_poll (items, 2, 1000 * ZMQ_POLL_MSEC);
36.
37. // Apply state update sent from client
38. if (items [0].revents & ZMQ_POLLIN) {
39. kvmsg_t *kvmsg = kvmsg_recv (collector);
40. if (!kvmsg)
41. break; // Interrupted
42. kvmsg_set_sequence (kvmsg, ++sequence);
43. kvmsg_send (kvmsg, publisher);
44. kvmsg_store (&kvmsg, kvmap);
45. printf (“I: publishing update %5d\n”, (int) sequence);
46. }
47. // Execute state snapshot request
48. if (items [1].revents & ZMQ_POLLIN) {
49. zframe_t *identity = zframe_recv (snapshot);
50. if (!identity)
51. break; // Interrupted
52.
53. // Request is in second frame of message
54. char *request = zstr_recv (snapshot);
55. if (streq (request, ”ICANHAZ?”))
56. free (request);
57. else {
58. printf (“E: bad request, aborting\n”);
59. break;
60. }
61. // Send state snapshot to client
62. kvroute_t routing = { snapshot, identity };
63.
64. // For each entry in kvmap, send kvmsg to client
65. zhash_foreach (kvmap, s_send_single, &routing);
66.
67. // Now send END message with sequence number
68. printf (“I: sending shapshot=%d\n”, (int) sequence);
69. zframe_send (&identity, snapshot, ZFRAME_MORE);
70. kvmsg_t *kvmsg = kvmsg_new (sequence);
71. kvmsg_set_key (kvmsg, ”KTHXBAI”);
72. kvmsg_set_body (kvmsg, (byte *) ””, 0);
73. kvmsg_send (kvmsg, snapshot);
74. kvmsg_destroy (&kvmsg);
75. }
76. }
77. printf (“ Interrupted\n%d messages handled\n”, (int) sequence);
78. zhash_destroy (&kvmap);
79. zctx_destroy (&ctx);
80.
81. return 0;
82. }
83.
84. // Send one state snapshot key-value pair to a socket
85. // Hash item data is our kvmsg object, ready to send
86. static int
87. s_send_single (char *key, void *data, void *args)
88. {
89. kvroute_t *kvroute = (kvroute_t *) args;
90. // Send identity of recipient first
91. zframe_send (&kvroute->identity,
92. kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
93. kvmsg_t *kvmsg = (kvmsg_t *) data;
94. kvmsg_send (kvmsg, kvroute->socket);
95. return 0;
96. }
Client:
1. //
2. // Clone client Model Three
3. //
4.
5. // Lets us build this source without creating a library
6. #include ”kvsimple.c”
7.
8. int main (void)
9. {
10. // Prepare our context and subscriber
11. zctx_t *ctx = zctx_new ();
12. void *snapshot = zsocket_new (ctx, ZMQ_DEALER);
13. zsocket_connect (snapshot, ”tcp://localhost:5556″);
14. void *subscriber = zsocket_new (ctx, ZMQ_SUB);
15. zsocket_connect (subscriber, ”tcp://localhost:5557″);
16. void *publisher = zsocket_new (ctx, ZMQ_PUSH);
17. zsocket_connect (publisher, ”tcp://localhost:5558″);
18.
19. zhash_t *kvmap = zhash_new ();
20. srandom ((unsigned) time (NULL));
21.
22. // Get state snapshot
23. int64_t sequence = 0;
24. zstr_send (snapshot, ”ICANHAZ?”);
25. while (TRUE) {
26. kvmsg_t *kvmsg = kvmsg_recv (snapshot);
27. if (!kvmsg)
28. break; // Interrupted
29. if (streq (kvmsg_key (kvmsg), ”KTHXBAI”)) {
30. sequence = kvmsg_sequence (kvmsg);
31. printf (“I: received snapshot=%d\n”, (int) sequence);
32. kvmsg_destroy (&kvmsg);
33. break; // Done
34. }
35. kvmsg_store (&kvmsg, kvmap);
36. }
37. int64_t alarm = zclock_time () + 1000;
38. while (!zctx_interrupted) {
39. zmq_pollitem_t items [] = { { subscriber, 0, ZMQ_POLLIN, 0 } };
40. int tickless = (int) ((alarm - zclock_time ()));
41. if (tickless < 0)
42. tickless = 0;
43. int rc = zmq_poll (items, 1, tickless * ZMQ_POLL_MSEC);
44. if (rc == -1)
45. break; // Context has been shut down
46.
47. if (items [0].revents & ZMQ_POLLIN) {
48. kvmsg_t *kvmsg = kvmsg_recv (subscriber);
49. if (!kvmsg)
50. break; // Interrupted
51.
52. // Discard out-of-sequence kvmsgs, incl. heartbeats
53. if (kvmsg_sequence (kvmsg) > sequence) {
54. sequence = kvmsg_sequence (kvmsg);
55. kvmsg_store (&kvmsg, kvmap);
56. printf (“I: received update=%d\n”, (int) sequence);
57. }
58. else
59. kvmsg_destroy (&kvmsg);
60. }
61. // If we timed-out, generate a random kvmsg
62. if (zclock_time () >= alarm) {
63. kvmsg_t *kvmsg = kvmsg_new (0);
64. kvmsg_fmt_key (kvmsg, ”%d”, randof (10000));
65. kvmsg_fmt_body (kvmsg, ”%d”, randof (1000000));
66. kvmsg_send (kvmsg, publisher);
67. kvmsg_destroy (&kvmsg);
68. alarm = zclock_time () + 1000;
69. }
70. }
71. printf (“ Interrupted\n%d messages in\n”, (int) sequence);
72. zhash_destroy (&kvmap);
73. zctx_destroy (&ctx);
74. return 0;
75. }
Cloning subtrees
In reality, not every consumer wants everything the publisher has to offer; for a particular group of clients it is enough to serve just a subset of the tree.
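On the client side the change is essentially two lines: subscribe to the key prefix, and send the same prefix along with the snapshot request. A hypothetical pyzmq fragment, with the ports and prefix taken from the C example below:
1. import zmq
2.
3. SUBTREE = "/client/"
4.
5. ctx = zmq.Context()
6. subscriber = ctx.socket(zmq.SUB)
7. subscriber.connect("tcp://localhost:5557")
8. # Prefix matching on the key frame gives us subtree filtering for free
9. subscriber.setsockopt(zmq.SUBSCRIBE, SUBTREE.encode())
10.
11. snapshot = ctx.socket(zmq.DEALER)
12. snapshot.connect("tcp://localhost:5556")
13. # The snapshot request now carries the wanted subtree as a second frame
14. snapshot.send_multipart([b"ICANHAZ?", SUBTREE.encode()])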
Server:
1. //
2. // Clone server Model Four
3. //
4.
5. // Lets us build this source without creating a library
6. #include ”kvsimple.c”
7.
8. static int s_send_single (char *key, void *data, void *args);
9.
10. // Routing information for a key-value snapshot
11. typedef struct {
12. void *socket; // ROUTER socket to send to
13. zframe_t *identity; // Identity of peer who requested state
14. char *subtree; // Client subtree specification
15. } kvroute_t;
16.
17. int main (void)
18. {
19. // Prepare our context and sockets
20. zctx_t *ctx = zctx_new ();
21. void *snapshot = zsocket_new (ctx, ZMQ_ROUTER);
22. zsocket_bind (snapshot, ”tcp://*:5556″);
23. void *publisher = zsocket_new (ctx, ZMQ_PUB);
24. zsocket_bind (publisher, ”tcp://*:5557″);
25. void *collector = zsocket_new (ctx, ZMQ_PULL);
26. zsocket_bind (collector, ”tcp://*:5558″);
27.
28. int64_t sequence = 0;
29. zhash_t *kvmap = zhash_new ();
30.
31. zmq_pollitem_t items [] = {
32. { collector, 0, ZMQ_POLLIN, 0 },
33. { snapshot, 0, ZMQ_POLLIN, 0 }
34. };
35. while (!zctx_interrupted) {
36. int rc = zmq_poll (items, 2, 1000 * ZMQ_POLL_MSEC);
37.
38. // Apply state update sent from client
39. if (items [0].revents & ZMQ_POLLIN) {
40. kvmsg_t *kvmsg = kvmsg_recv (collector);
41. if (!kvmsg)
42. break; // Interrupted
43. kvmsg_set_sequence (kvmsg, ++sequence);
44. kvmsg_send (kvmsg, publisher);
45. kvmsg_store (&kvmsg, kvmap);
46. printf (“I: publishing update %5d\n”, (int) sequence);
47. }
48. // Execute state snapshot request
49. if (items [1].revents & ZMQ_POLLIN) {
50. zframe_t *identity = zframe_recv (snapshot);
51. if (!identity)
52. break; // Interrupted
53.
54. // Request is in second frame of message
55. char *request = zstr_recv (snapshot);
56. char *subtree = NULL;
57. if (streq (request, ”ICANHAZ?”)) {
58. free (request);
59. subtree = zstr_recv (snapshot);
60. }
61. else {
62. printf (“E: bad request, aborting\n”);
63. break;
64. }
65. // Send state snapshot to client
66. kvroute_t routing = { snapshot, identity, subtree };
67.
68. // For each entry in kvmap, send kvmsg to client
69. zhash_foreach (kvmap, s_send_single, &routing);
70.
71. // Now send END message with sequence number
72. printf (“I: sending shapshot=%d\n”, (int) sequence);
73. zframe_send (&identity, snapshot, ZFRAME_MORE);
74. kvmsg_t *kvmsg = kvmsg_new (sequence);
75. kvmsg_set_key (kvmsg, ”KTHXBAI”);
76. kvmsg_set_body (kvmsg, (byte *) subtree, 0);
77. kvmsg_send (kvmsg, snapshot);
78. kvmsg_destroy (&kvmsg);
79. free (subtree);
80. }
81. }
82. printf (“ Interrupted\n%d messages handled\n”, (int) sequence);
83. zhash_destroy (&kvmap);
84. zctx_destroy (&ctx);
85.
86. return 0;
87. }
88.
89. // Send one state snapshot key-value pair to a socket
90. // Hash item data is our kvmsg object, ready to send
91. static int
92. s_send_single (char *key, void *data, void *args)
93. {
94. kvroute_t *kvroute = (kvroute_t *) args;
95. kvmsg_t *kvmsg = (kvmsg_t *) data;
96. if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))
97. && memcmp (kvroute->subtree,
98. kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) {
99. // Send identity of recipient first
100. zframe_send (&kvroute->identity,
101. kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
102. kvmsg_send (kvmsg, kvroute->socket);
103. }
104. return 0;
105. }
Client:
1. //
2. // Clone client Model Four
3. //
4.
5. // Lets us build this source without creating a library
6. #include ”kvsimple.c”
7.
8. #define SUBTREE ”/client/”
9.
10. int main (void)
11. {
12. // Prepare our context and subscriber
13. zctx_t *ctx = zctx_new ();
14. void *snapshot = zsocket_new (ctx, ZMQ_DEALER);
15. zsocket_connect (snapshot, ”tcp://localhost:5556″);
16. void *subscriber = zsocket_new (ctx, ZMQ_SUB);
17. zsocket_connect (subscriber, ”tcp://localhost:5557″);
18. zsockopt_set_subscribe (subscriber, SUBTREE);
19. void *publisher = zsocket_new (ctx, ZMQ_PUSH);
20. zsocket_connect (publisher, ”tcp://localhost:5558″);
21.
22. zhash_t *kvmap = zhash_new ();
23. srandom ((unsigned) time (NULL));
24.
25. // Get state snapshot
26. int64_t sequence = 0;
27. zstr_sendm (snapshot, ”ICANHAZ?”);
28. zstr_send (snapshot, SUBTREE);
29. while (TRUE) {
30. kvmsg_t *kvmsg = kvmsg_recv (snapshot);
31. if (!kvmsg)
32. break; // Interrupted
33. if (streq (kvmsg_key (kvmsg), ”KTHXBAI”)) {
34. sequence = kvmsg_sequence (kvmsg);
35. printf (“I: received snapshot=%d\n”, (int) sequence);
36. kvmsg_destroy (&kvmsg);
37. break; // Done
38. }
39. kvmsg_store (&kvmsg, kvmap);
40. }
41.
42. int64_t alarm = zclock_time () + 1000;
43. while (!zctx_interrupted) {
44. zmq_pollitem_t items [] = { { subscriber, 0, ZMQ_POLLIN, 0 } };
45. int tickless = (int) ((alarm - zclock_time ()));
46. if (tickless < 0)
47. tickless = 0;
48. int rc = zmq_poll (items, 1, tickless * ZMQ_POLL_MSEC);
49. if (rc == -1)
50. break; // Context has been shut down
51.
52. if (items [0].revents & ZMQ_POLLIN) {
53. kvmsg_t *kvmsg = kvmsg_recv (subscriber);
54. if (!kvmsg)
55. break; // Interrupted
56.
57. // Discard out-of-sequence kvmsgs, incl. heartbeats
58. if (kvmsg_sequence (kvmsg) > sequence) {
59. sequence = kvmsg_sequence (kvmsg);
60. kvmsg_store (&kvmsg, kvmap);
61. printf (“I: received update=%d\n”, (int) sequence);
62. }
63. else
64. kvmsg_destroy (&kvmsg);
65. }
66. // If we timed-out, generate a random kvmsg
67. if (zclock_time () >= alarm) {
68. kvmsg_t *kvmsg = kvmsg_new (0);
69. kvmsg_fmt_key (kvmsg, ”%s%d”, SUBTREE, randof (10000));
70. kvmsg_fmt_body (kvmsg, ”%d”, randof (1000000));
71. kvmsg_send (kvmsg, publisher);
72. kvmsg_destroy (&kvmsg);
73. alarm = zclock_time () + 1000;
74. }
75. }
76. printf (“ Interrupted\n%d messages in\n”, (int) sequence);
77. zhash_destroy (&kvmap);
78. zctx_destroy (&ctx);
79. return 0;
80. }
zeroMQ first experience - 33. Pub/sub, advanced - the Clone pattern (part 2)
Ephemeral values
A DNS server is the textbook real-world example: an entry is stored temporarily, and when the node behind it disappears, the entry should vanish with it. So "expiry" is a perfectly reasonable requirement.
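Stripped of all the ZeroMQ plumbing, the "expiry" idea is nothing more than a deadline stored next to each value; a toy Python illustration (the names here are made up for this post, not part of the library below):
1. import time
2.
3. store = {}
4.
5. def put(key, body, ttl=0):
6.     # ttl is in seconds; 0 means the value never expires
7.     deadline = time.time() + ttl if ttl else None
8.     store[key] = (body, deadline)
9.
10. def get(key):
11.     body, deadline = store.get(key, (None, None))
12.     if deadline is not None and time.time() >= deadline:
13.         del store[key]  # expired: behave as if the key was never written
14.         return None
15.     return body
16.
17. put("/client/session", b"abc", ttl=5)
18. print(get("/client/session"))  # the stored value within 5 seconds, None afterwards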
With this new requirement on the table, our key-value library needs some matching changes:
1. /* =====================================================================
2. kvmsg - key-value message class for example applications
3.
4. ———————————————————————
5. Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
6. Copyright other contributors as noted in the AUTHORS file.
7.
8. This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
9.
10. This is free software; you can redistribute it and/or modify it under
11. the terms of the GNU Lesser General Public License as published by
12. the Free Software Foundation; either version 3 of the License, or (at
13. your option) any later version.
14.
15. This software is distributed in the hope that it will be useful, but
16. WITHOUT ANY WARRANTY; without even the implied warranty of
17. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18. Lesser General Public License for more details.
19.
20. You should have received a copy of the GNU Lesser General Public
21. License along with this program. If not, see
22. <http://www.gnu.org/licenses/>.
23. =====================================================================
24. */
25.
26. #include ”kvmsg.h”
27. #include <uuid/uuid.h>
28. #include ”zlist.h”
29.
30. // Keys are short strings
31. #define KVMSG_KEY_MAX 255
32.
33. // Message is formatted on wire as 4 frames:
34. // frame 0: key (0MQ string)
35. // frame 1: sequence (8 bytes, network order)
36. // frame 2: uuid (blob, 16 bytes)
37. // frame 3: properties (0MQ string)
38. // frame 4: body (blob)
39. #define FRAME_KEY 0
40. #define FRAME_SEQ 1
41. #define FRAME_UUID 2
42. #define FRAME_PROPS 3
43. #define FRAME_BODY 4
44. #define KVMSG_FRAMES 5
45.
46. // Structure of our class
47. struct _kvmsg {
48. // Presence indicators for each frame
49. int present [KVMSG_FRAMES];
50. // Corresponding 0MQ message frames, if any
51. zmq_msg_t frame [KVMSG_FRAMES];
52. // Key, copied into safe C string
53. char key [KVMSG_KEY_MAX + 1];
54. // List of properties, as name=value strings
55. zlist_t *props;
56. size_t props_size;
57. };
58.
59. // Serialize list of properties to a message frame
60. static void
61. s_encode_props (kvmsg_t *self)
62. {
63. zmq_msg_t *msg = &self->frame [FRAME_PROPS];
64. if (self->present [FRAME_PROPS])
65. zmq_msg_close (msg);
66.
67. zmq_msg_init_size (msg, self->props_size);
68. char *prop = zlist_first (self->props);
69. char *dest = (char *) zmq_msg_data (msg);
70. while (prop) {
71. strcpy (dest, prop);
72. dest += strlen (prop);
73. *dest++ = ’\n’;
74. prop = zlist_next (self->props);
75. }
76. self->present [FRAME_PROPS] = 1;
77. }
78.
79. // Rebuild properties list from message frame
80. static void
81. s_decode_props (kvmsg_t *self)
82. {
83. zmq_msg_t *msg = &self->frame [FRAME_PROPS];
84. self->props_size = 0;
85. while (zlist_size (self->props))
86. free (zlist_pop (self->props));
87.
88. size_t remainder = zmq_msg_size (msg);
89. char *prop = (char *) zmq_msg_data (msg);
90. char *eoln = memchr (prop, ’\n’, remainder);
91. while (eoln) {
92. *eoln = 0;
93. zlist_append (self->props, strdup (prop));
94. self->props_size += strlen (prop) + 1;
95. remainder -= strlen (prop) + 1;
96. prop = eoln + 1;
97. eoln = memchr (prop, ’\n’, remainder);
98. }
99. }
100.
101. // ———————————————————————
102. // Constructor, sets sequence as provided
103.
104. kvmsg_t *
105. kvmsg_new (int64_t sequence)
106. {
107. kvmsg_t
108. *self;
109.
110. self = (kvmsg_t *) zmalloc (sizeof (kvmsg_t));
111. self->props = zlist_new ();
112. kvmsg_set_sequence (self, sequence);
113. return self;
114. }
115.
116. // ———————————————————————
117. // Destructor
118.
119. // Free shim, compatible with zhash_free_fn
120. void
121. kvmsg_free (void *ptr)
122. {
123. if (ptr) {
124. kvmsg_t *self = (kvmsg_t *) ptr;
125. // Destroy message frames if any
126. int frame_nbr;
127. for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++)
128. if (self->present [frame_nbr])
129. zmq_msg_close (&self->frame [frame_nbr]);
130.
131. // Destroy property list
132. while (zlist_size (self->props))
133. free (zlist_pop (self->props));
134. zlist_destroy (&self->props);
135.
136. // Free object itself
137. free (self);
138. }
139. }
140.
141. void
142. kvmsg_destroy (kvmsg_t **self_p)
143. {
144. assert (self_p);
145. if (*self_p) {
146. kvmsg_free (*self_p);
147. *self_p = NULL;
148. }
149. }
150.
151. // ———————————————————————
152. // Create duplicate of kvmsg
153.
154. kvmsg_t *
155. kvmsg_dup (kvmsg_t *self)
156. {
157. kvmsg_t *kvmsg = kvmsg_new (0);
158. int frame_nbr;
159. for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
160. if (self->present [frame_nbr]) {
161. zmq_msg_t *src = &self->frame [frame_nbr];
162. zmq_msg_t *dst = &kvmsg->frame [frame_nbr];
163. zmq_msg_init_size (dst, zmq_msg_size (src));
164. memcpy (zmq_msg_data (dst),
165. zmq_msg_data (src), zmq_msg_size (src));
166. kvmsg->present [frame_nbr] = 1;
167. }
168. }
169. kvmsg->props = zlist_copy (self->props);
170. return kvmsg;
171. }
172.
173. // ———————————————————————
174. // Reads key-value message from socket, returns new kvmsg instance.
175.
176. kvmsg_t *
177. kvmsg_recv (void *socket)
178. {
179. assert (socket);
180. kvmsg_t *self = kvmsg_new (0);
181.
182. // Read all frames off the wire, reject if bogus
183. int frame_nbr;
184. for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
185. if (self->present [frame_nbr])
186. zmq_msg_close (&self->frame [frame_nbr]);
187. zmq_msg_init (&self->frame [frame_nbr]);
188. self->present [frame_nbr] = 1;
189. if (zmq_recvmsg (socket, &self->frame [frame_nbr], 0) == -1) {
190. kvmsg_destroy (&self);
191. break;
192. }
193. // Verify multipart framing
194. int rcvmore = (frame_nbr < KVMSG_FRAMES - 1)? 1: 0;
195. if (zsockopt_rcvmore (socket) != rcvmore) {
196. kvmsg_destroy (&self);
197. break;
198. }
199. }
200. if (self)
201. s_decode_props (self);
202. return self;
203. }
204.
205. // ———————————————————————
206. // Send key-value message to socket; any empty frames are sent as such.
207.
208. void
209. kvmsg_send (kvmsg_t *self, void *socket)
210. {
211. assert (self);
212. assert (socket);
213.
214. s_encode_props (self);
215. int frame_nbr;
216. for (frame_nbr = 0; frame_nbr < KVMSG_FRAMES; frame_nbr++) {
217. zmq_msg_t copy;
218. zmq_msg_init (&copy);
219. if (self->present [frame_nbr])
220. zmq_msg_copy (&copy, &self->frame [frame_nbr]);
221. zmq_sendmsg (socket, &copy,
222. (frame_nbr < KVMSG_FRAMES - 1)? ZMQ_SNDMORE: 0);
223. zmq_msg_close (&copy);
224. }
225. }
226.
227. // ———————————————————————
228. // Return key from last read message, if any, else NULL
229.
230. char *
231. kvmsg_key (kvmsg_t *self)
232. {
233. assert (self);
234. if (self->present [FRAME_KEY]) {
235. if (!*self->key) {
236. size_t size = zmq_msg_size (&self->frame [FRAME_KEY]);
237. if (size > KVMSG_KEY_MAX)
238. size = KVMSG_KEY_MAX;
239. memcpy (self->key,
240. zmq_msg_data (&self->frame [FRAME_KEY]), size);
241. self->key [size] = 0;
242. }
243. return self->key;
244. }
245. else
246. return NULL;
247. }
248.
249. // ———————————————————————
250. // Return sequence nbr from last read message, if any
251.
252. int64_t
253. kvmsg_sequence (kvmsg_t *self)
254. {
255. assert (self);
256. if (self->present [FRAME_SEQ]) {
257. assert (zmq_msg_size (&self->frame [FRAME_SEQ]) == 8);
258. byte *source = zmq_msg_data (&self->frame [FRAME_SEQ]);
259. int64_t sequence = ((int64_t) (source [0]) << 56)
260. + ((int64_t) (source [1]) << 48)
261. + ((int64_t) (source [2]) << 40)
262. + ((int64_t) (source [3]) << 32)
263. + ((int64_t) (source [4]) << 24)
264. + ((int64_t) (source [5]) << 16)
265. + ((int64_t) (source [6]) << 8)
266. + (int64_t) (source [7]);
267. return sequence;
268. }
269. else
270. return 0;
271. }
272.
273. // ———————————————————————
274. // Return UUID from last read message, if any, else NULL
275.
276. byte *
277. kvmsg_uuid (kvmsg_t *self)
278. {
279. assert (self);
280. if (self->present [FRAME_UUID]
281. && zmq_msg_size (&self->frame [FRAME_UUID]) == sizeof (uuid_t))
282. return (byte *) zmq_msg_data (&self->frame [FRAME_UUID]);
283. else
284. return NULL;
285. }
286.
287. // ———————————————————————
288. // Return body from last read message, if any, else NULL
289.
290. byte *
291. kvmsg_body (kvmsg_t *self)
292. {
293. assert (self);
294. if (self->present [FRAME_BODY])
295. return (byte *) zmq_msg_data (&self->frame [FRAME_BODY]);
296. else
297. return NULL;
298. }
299.
300. // ———————————————————————
301. // Return body size from last read message, if any, else zero
302.
303. size_t
304. kvmsg_size (kvmsg_t *self)
305. {
306. assert (self);
307. if (self->present [FRAME_BODY])
308. return zmq_msg_size (&self->frame [FRAME_BODY]);
309. else
310. return 0;
311. }
312.
313. // ———————————————————————
314. // Set message key as provided
315.
316. void
317. kvmsg_set_key (kvmsg_t *self, char *key)
318. {
319. assert (self);
320. zmq_msg_t *msg = &self->frame [FRAME_KEY];
321. if (self->present [FRAME_KEY])
322. zmq_msg_close (msg);
323. zmq_msg_init_size (msg, strlen (key));
324. memcpy (zmq_msg_data (msg), key, strlen (key));
325. self->present [FRAME_KEY] = 1;
326. }
327.
328. // ———————————————————————
329. // Set message sequence number
330.
331. void
332. kvmsg_set_sequence (kvmsg_t *self, int64_t sequence)
333. {
334. assert (self);
335. zmq_msg_t *msg = &self->frame [FRAME_SEQ];
336. if (self->present [FRAME_SEQ])
337. zmq_msg_close (msg);
338. zmq_msg_init_size (msg, 8);
339.
340. byte *source = zmq_msg_data (msg);
341. source [0] = (byte) ((sequence >> 56) & 255);
342. source [1] = (byte) ((sequence >> 48) & 255);
343. source [2] = (byte) ((sequence >> 40) & 255);
344. source [3] = (byte) ((sequence >> 32) & 255);
345. source [4] = (byte) ((sequence >> 24) & 255);
346. source [5] = (byte) ((sequence >> 16) & 255);
347. source [6] = (byte) ((sequence >> 8) & 255);
348. source [7] = (byte) ((sequence) & 255);
349.
350. self->present [FRAME_SEQ] = 1;
351. }
352.
353. // ———————————————————————
354. // Set message UUID to generated value
355.
356. void
357. kvmsg_set_uuid (kvmsg_t *self)
358. {
359. assert (self);
360. zmq_msg_t *msg = &self->frame [FRAME_UUID];
361. uuid_t uuid;
362. uuid_generate (uuid);
363. if (self->present [FRAME_UUID])
364. zmq_msg_close (msg);
365. zmq_msg_init_size (msg, sizeof (uuid));
366. memcpy (zmq_msg_data (msg), uuid, sizeof (uuid));
367. self->present [FRAME_UUID] = 1;
368. }
369.
370. // ———————————————————————
371. // Set message body
372.
373. void
374. kvmsg_set_body (kvmsg_t *self, byte *body, size_t size)
375. {
376. assert (self);
377. zmq_msg_t *msg = &self->frame [FRAME_BODY];
378. if (self->present [FRAME_BODY])
379. zmq_msg_close (msg);
380. self->present [FRAME_BODY] = 1;
381. zmq_msg_init_size (msg, size);
382. memcpy (zmq_msg_data (msg), body, size);
383. }
384.
385. // ———————————————————————
386. // Set message key using printf format
387.
388. void
389. kvmsg_fmt_key (kvmsg_t *self, char *format, …)
390. {
391. char value [KVMSG_KEY_MAX + 1];
392. va_list args;
393.
394. assert (self);
395. va_start (args, format);
396. vsnprintf (value, KVMSG_KEY_MAX, format, args);
397. va_end (args);
398. kvmsg_set_key (self, value);
399. }
400.
401. // ———————————————————————
402. // Set message body using printf format
403.
404. void
405. kvmsg_fmt_body (kvmsg_t *self, char *format, …)
406. {
407. char value [255 + 1];
408. va_list args;
409.
410. assert (self);
411. va_start (args, format);
412. vsnprintf (value, 255, format, args);
413. va_end (args);
414. kvmsg_set_body (self, (byte *) value, strlen (value));
415. }
416.
417. // ———————————————————————
418. // Get message property, if set, else ””
419.
420. char *
421. kvmsg_get_prop (kvmsg_t *self, char *name)
422. {
423. assert (strchr (name, ’=’) == NULL);
424. char *prop = zlist_first (self->props);
425. size_t namelen = strlen (name);
426. while (prop) {
427. if (strlen (prop) > namelen
428. && memcmp (prop, name, namelen) == 0
429. && prop [namelen] == ’=’)
430. return prop + namelen + 1;
431. prop = zlist_next (self->props);
432. }
433. return “”;
434. }
435.
436. // ———————————————————————
437. // Set message property
438. // Names cannot contain ’=’. Max length of value is 255 chars.
439.
440. void
441. kvmsg_set_prop (kvmsg_t *self, char *name, char *format, …)
442. {
443. assert (strchr (name, ’=’) == NULL);
444.
445. char value [255 + 1];
446. va_list args;
447. assert (self);
448. va_start (args, format);
449. vsnprintf (value, 255, format, args);
450. va_end (args);
451.
452. // Allocate name=value string
453. char *prop = malloc (strlen (name) + strlen (value) + 2);
454.
455. // Remove existing property if any
456. sprintf (prop, ”%s=”, name);
457. char *existing = zlist_first (self->props);
458. while (existing) {
459. if (memcmp (prop, existing, strlen (prop)) == 0) {
460. self->props_size -= strlen (existing) + 1;
461. zlist_remove (self->props, existing);
462. free (existing);
463. break;
464. }
465. existing = zlist_next (self->props);
466. }
467. // Add new name=value property string
468. strcat (prop, value);
469. zlist_append (self->props, prop);
470. self->props_size += strlen (prop) + 1;
471. }
472.
473. // ———————————————————————
474. // Store entire kvmsg into hash map, if key/value are set.
475. // Nullifies kvmsg reference, and destroys automatically when no longer
476. // needed. If value is empty, deletes any previous value from store.
477.
478. void
479. kvmsg_store (kvmsg_t **self_p, zhash_t *hash)
480. {
481. assert (self_p);
482. if (*self_p) {
483. kvmsg_t *self = *self_p;
484. assert (self);
485. if (kvmsg_size (self)) {
486. if (self->present [FRAME_KEY]
487. && self->present [FRAME_BODY]) {
488. zhash_update (hash, kvmsg_key (self), self);
489. zhash_freefn (hash, kvmsg_key (self), kvmsg_free);
490. }
491. }
492. else
493. zhash_delete (hash, kvmsg_key (self));
494.
495. *self_p = NULL;
496. }
497. }
498.
499. // ———————————————————————
500. // Dump message to stderr, for debugging and tracing
501.
502. void
503. kvmsg_dump (kvmsg_t *self)
504. {
505. if (self) {
506. if (!self) {
507. fprintf (stderr, ”NULL”);
508. return;
509. }
510. size_t size = kvmsg_size (self);
511. byte *body = kvmsg_body (self);
512. fprintf (stderr, ”[seq:%” PRId64 ”]”, kvmsg_sequence (self));
513. fprintf (stderr, ”[key:%s]”, kvmsg_key (self));
514. fprintf (stderr, ”[size:%zd] ”, size);
515. if (zlist_size (self->props)) {
516. fprintf (stderr, ”[“);
517. char *prop = zlist_first (self->props);
518. while (prop) {
519. fprintf (stderr, ”%s;”, prop);
520. prop = zlist_next (self->props);
521. }
522. fprintf (stderr, ”]”);
523. }
524. int char_nbr;
525. for (char_nbr = 0; char_nbr < size; char_nbr++)
526. fprintf (stderr, ”%02X”, body [char_nbr]);
527. fprintf (stderr, ”\n”);
528. }
529. else
530. fprintf (stderr, ”NULL message\n”);
531. }
532.
533. // ———————————————————————
534. // Runs self test of class
535.
536. int
537. kvmsg_test (int verbose)
538. {
539. kvmsg_t
540. *kvmsg;
541.
542. printf (“ * kvmsg: ”);
543.
544. // Prepare our context and sockets
545. zctx_t *ctx = zctx_new ();
546. void *output = zsocket_new (ctx, ZMQ_DEALER);
547. int rc = zmq_bind (output, ”ipc://kvmsg_selftest.ipc”);
548. assert (rc == 0);
549. void *input = zsocket_new (ctx, ZMQ_DEALER);
550. rc = zmq_connect (input, ”ipc://kvmsg_selftest.ipc”);
551. assert (rc == 0);
552.
553. zhash_t *kvmap = zhash_new ();
554.
555. // Test send and receive of simple message
556. kvmsg = kvmsg_new (1);
557. kvmsg_set_key (kvmsg, ”key”);
558. kvmsg_set_uuid (kvmsg);
559. kvmsg_set_body (kvmsg, (byte *) ”body”, 4);
560. if (verbose)
561. kvmsg_dump (kvmsg);
562. kvmsg_send (kvmsg, output);
563. kvmsg_store (&kvmsg, kvmap);
564.
565. kvmsg = kvmsg_recv (input);
566. if (verbose)
567. kvmsg_dump (kvmsg);
568. assert (streq (kvmsg_key (kvmsg), ”key”));
569. kvmsg_store (&kvmsg, kvmap);
570.
571. // Test send and receive of message with properties
572. kvmsg = kvmsg_new (2);
573. kvmsg_set_prop (kvmsg, ”prop1″, ”value1″);
574. kvmsg_set_prop (kvmsg, ”prop2″, ”value1″);
575. kvmsg_set_prop (kvmsg, ”prop2″, ”value2″);
576. kvmsg_set_key (kvmsg, ”key”);
577. kvmsg_set_uuid (kvmsg);
578. kvmsg_set_body (kvmsg, (byte *) ”body”, 4);
579. assert (streq (kvmsg_get_prop (kvmsg, ”prop2″), ”value2″));
580. if (verbose)
581. kvmsg_dump (kvmsg);
582. kvmsg_send (kvmsg, output);
583. kvmsg_destroy (&kvmsg);
584.
585. kvmsg = kvmsg_recv (input);
586. if (verbose)
587. kvmsg_dump (kvmsg);
588. assert (streq (kvmsg_key (kvmsg), ”key”));
589. assert (streq (kvmsg_get_prop (kvmsg, ”prop2″), ”value2″));
590. kvmsg_destroy (&kvmsg);
591.
592. // Shutdown and destroy all objects
593. zhash_destroy (&kvmap);
594. zctx_destroy (&ctx);
595.
596. printf (“OK\n”);
597. return 0;
598. }
Server:
1. //
2. // Clone server Model Five
3. //
4.
5. // Lets us build this source without creating a library
6. #include ”kvmsg.c”
7.
8. // zloop reactor handlers
9. static int s_snapshots (zloop_t *loop, void *socket, void *args);
10. static int s_collector (zloop_t *loop, void *socket, void *args);
11. static int s_flush_ttl (zloop_t *loop, void *socket, void *args);
12.
13. // Our server is defined by these properties
14. typedef struct {
15. zctx_t *ctx; // Context wrapper
16. zhash_t *kvmap; // Key-value store
17. zloop_t *loop; // zloop reactor
18. int port; // Main port we’re working on
19. int64_t sequence; // How many updates we’re at
20. void *snapshot; // Handle snapshot requests
21. void *publisher; // Publish updates to clients
22. void *collector; // Collect updates from clients
23. } clonesrv_t;
24.
25. int main (void)
26. {
27. clonesrv_t *self = (clonesrv_t *) zmalloc (sizeof (clonesrv_t));
28.
29. self->port = 5556;
30. self->ctx = zctx_new ();
31. self->kvmap = zhash_new ();
32. self->loop = zloop_new ();
33. zloop_set_verbose (self->loop, FALSE);
34.
35. // Set up our clone server sockets
36. self->snapshot = zsocket_new (self->ctx, ZMQ_ROUTER);
37. self->publisher = zsocket_new (self->ctx, ZMQ_PUB);
38. self->collector = zsocket_new (self->ctx, ZMQ_PULL);
39. zsocket_bind (self->snapshot, ”tcp://*:%d”, self->port);
40. zsocket_bind (self->publisher, ”tcp://*:%d”, self->port + 1);
41. zsocket_bind (self->collector, ”tcp://*:%d”, self->port + 2);
42.
43. // Register our handlers with reactor
44. zloop_reader (self->loop, self->snapshot, s_snapshots, self);
45. zloop_reader (self->loop, self->collector, s_collector, self);
46. zloop_timer (self->loop, 1000, 0, s_flush_ttl, self);
47.
48. // Run reactor until process interrupted
49. zloop_start (self->loop);
50.
51. zloop_destroy (&self->loop);
52. zhash_destroy (&self->kvmap);
53. zctx_destroy (&self->ctx);
54. free (self);
55. return 0;
56. }
57.
58. // ———————————————————————
59. // Send snapshots to clients who ask for them
60.
61. static int s_send_single (char *key, void *data, void *args);
62.
63. // Routing information for a key-value snapshot
64. typedef struct {
65. void *socket; // ROUTER socket to send to
66. zframe_t *identity; // Identity of peer who requested state
67. char *subtree; // Client subtree specification
68. } kvroute_t;
69.
70. static int
71. s_snapshots (zloop_t *loop, void *snapshot, void *args)
72. {
73. clonesrv_t *self = (clonesrv_t *) args;
74.
75. zframe_t *identity = zframe_recv (snapshot);
76. if (identity) {
77. // Request is in second frame of message
78. char *request = zstr_recv (snapshot);
79. char *subtree = NULL;
80. if (streq (request, ”ICANHAZ?”)) {
81. free (request);
82. subtree = zstr_recv (snapshot);
83. }
84. else
85. printf (“E: bad request, aborting\n”);
86.
87. if (subtree) {
88. // Send state socket to client
89. kvroute_t routing = { snapshot, identity, subtree };
90. zhash_foreach (self->kvmap, s_send_single, &routing);
91.
92. // Now send END message with sequence number
93. zclock_log ("I: sending snapshot=%d", (int) self->sequence);
94. zframe_send (&identity, snapshot, ZFRAME_MORE);
95. kvmsg_t *kvmsg = kvmsg_new (self->sequence);
96. kvmsg_set_key (kvmsg, ”KTHXBAI”);
97. kvmsg_set_body (kvmsg, (byte *) subtree, 0);
98. kvmsg_send (kvmsg, snapshot);
99. kvmsg_destroy (&kvmsg);
100. free (subtree);
101. }
102. }
103. return 0;
104. }
105.
106. // Send one state snapshot key-value pair to a socket
107. // Hash item data is our kvmsg object, ready to send
108. static int
109. s_send_single (char *key, void *data, void *args)
110. {
111. kvroute_t *kvroute = (kvroute_t *) args;
112. kvmsg_t *kvmsg = (kvmsg_t *) data;
113. if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))
114. && memcmp (kvroute->subtree,
115. kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) {
116. // Send identity of recipient first
117. zframe_send (&kvroute->identity,
118. kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
119. kvmsg_send (kvmsg, kvroute->socket);
120. }
121. return 0;
122. }
123.
124. // ———————————————————————
125. // Collect updates from clients
126.
127. static int
128. s_collector (zloop_t *loop, void *collector, void *args)
129. {
130. clonesrv_t *self = (clonesrv_t *) args;
131.
132. kvmsg_t *kvmsg = kvmsg_recv (collector);
133. if (kvmsg) {
134. kvmsg_set_sequence (kvmsg, ++self->sequence);
135. kvmsg_send (kvmsg, self->publisher);
136. int ttl = atoi (kvmsg_get_prop (kvmsg, ”ttl”));
137. if (ttl)
138. kvmsg_set_prop (kvmsg, ”ttl”,
139. “%” PRId64, zclock_time () + ttl * 1000);
140. kvmsg_store (&kvmsg, self->kvmap);
141. zclock_log (“I: publishing update=%d”, (int) self->sequence);
142. }
143. return 0;
144. }
145.
146. // ———————————————————————
147. // Purge ephemeral values that have expired
148.
149. static int s_flush_single (char *key, void *data, void *args);
150.
151. static int
152. s_flush_ttl (zloop_t *loop, void *unused, void *args)
153. {
154. clonesrv_t *self = (clonesrv_t *) args;
155. zhash_foreach (self->kvmap, s_flush_single, args);
156. return 0;
157. }
158.
159. // If key-value pair has expired, delete it and publish the
160. // fact to listening clients.
161. static int
162. s_flush_single (char *key, void *data, void *args)
163. {
164. clonesrv_t *self = (clonesrv_t *) args;
165.
166. kvmsg_t *kvmsg = (kvmsg_t *) data;
167. int64_t ttl;
168. sscanf (kvmsg_get_prop (kvmsg, ”ttl”), ”%” PRId64, &ttl);
169. if (ttl && zclock_time () >= ttl) {
170. kvmsg_set_sequence (kvmsg, ++self->sequence);
171. kvmsg_set_body (kvmsg, (byte *) ””, 0);
172. kvmsg_send (kvmsg, self->publisher);
173. kvmsg_store (&kvmsg, self->kvmap);
174. zclock_log (“I: publishing delete=%d”, (int) self->sequence);
175. }
176. return 0;
177. }
Reliability
Anything divorced from reliability ends up being little more than a toy, even though reliability itself brings no end of trouble.
The earlier "primary/backup" arrangement looked promising. Tweak it slightly, so that the backup also acts as just another client, receiving and storing every update, and it starts to feel genuinely dependable.
Primary/backup state machine diagram:
The mechanism: when a client notices the primary server has stopped answering, it turns to the backup; once the backup confirms the primary really is gone, it steps up and serves in its place. When the primary comes back, the backup drops back to being a client again.
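Reduced to its essence, the failover rule the client agent further below implements looks roughly like this (a standalone sketch under made-up names, not the guide's code): every known server carries an expiry deadline that is pushed forward whenever it sends anything, heartbeats included; if that deadline passes in silence, the agent rotates to the next server and starts again from the snapshot request.

#include <stdio.h>
#include <stdint.h>

#define SERVER_MAX 2
#define SERVER_TTL 5000   // ms of silence before a server is considered dead

typedef enum { STATE_INITIAL, STATE_SYNCING, STATE_ACTIVE } state_t;

typedef struct {
    state_t state;               // where we are in the snapshot/update cycle
    int cur_server;              // which of the SERVER_MAX servers we follow
    int64_t expiry [SERVER_MAX]; // per-server deadline, refreshed on any traffic
} agent_t;

// Called when the poll loop times out with no traffic from the current server.
static void failover (agent_t *self, int64_t now_ms)
{
    if (now_ms >= self->expiry [self->cur_server]) {
        self->cur_server = (self->cur_server + 1) % SERVER_MAX;
        self->state = STATE_INITIAL;    // re-request a snapshot from the new server
        self->expiry [self->cur_server] = now_ms + SERVER_TTL;
        printf ("no hugz, failing over to server %d\n", self->cur_server);
    }
}

int main (void)
{
    agent_t agent = { STATE_ACTIVE, 0, { 1000, 0 } };
    failover (&agent, 2000);   // server 0 has been silent past its deadline
    return 0;
}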
Client:
1. //
2. // Clone client Model Six
3. //
4.
5. // Lets us build this source without creating a library
6. #include ”clone.c”
7.
8. #define SUBTREE ”/client/”
9.
10. int main (void)
11. {
12. // Create distributed hash instance
13. clone_t *clone = clone_new ();
14.
15. // Specify configuration
16. clone_subtree (clone, SUBTREE);
17. clone_connect (clone, ”tcp://localhost”, ”5556″);
18. clone_connect (clone, ”tcp://localhost”, ”5566″);
19.
20. // Set random tuples into the distributed hash
21. while (!zctx_interrupted) {
22. // Set random value, check it was stored
23. char key [255];
24. char value [10];
25. sprintf (key, ”%s%d”, SUBTREE, randof (10000));
26. sprintf (value, ”%d”, randof (1000000));
27. clone_set (clone, key, value, randof (30));
28. sleep (1);
29. }
30. clone_destroy (&clone);
31. return 0;
32. }
Client API:
1. /* =====================================================================
2. clone - client-side Clone Pattern class
3.
4. ———————————————————————
5. Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
6. Copyright other contributors as noted in the AUTHORS file.
7.
8. This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
9.
10. This is free software; you can redistribute it and/or modify it under
11. the terms of the GNU Lesser General Public License as published by
12. the Free Software Foundation; either version 3 of the License, or (at
13. your option) any later version.
14.
15. This software is distributed in the hope that it will be useful, but
16. WITHOUT ANY WARRANTY; without even the implied warranty of
17. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
18. Lesser General Public License for more details.
19.
20. You should have received a copy of the GNU Lesser General Public
21. License along with this program. If not, see
22. <http://www.gnu.org/licenses/>.
23. =====================================================================
24. */
25.
26. #include ”clone.h”
27.
28. // If no server replies within this time, abandon request
29. #define GLOBAL_TIMEOUT 4000 // msecs
30. // Server considered dead if silent for this long
31. #define SERVER_TTL 5000 // msecs
32. // Number of servers we will talk to
33. #define SERVER_MAX 2
34.
35. // =====================================================================
36. // Synchronous part, works in our application thread
37.
38. // ———————————————————————
39. // Structure of our class
40.
41. struct _clone_t {
42. zctx_t *ctx; // Our context wrapper
43. void *pipe; // Pipe through to clone agent
44. };
45.
46. // This is the thread that handles our real clone class
47. static void clone_agent (void *args, zctx_t *ctx, void *pipe);
48.
49. // ———————————————————————
50. // Constructor
51.
52. clone_t *
53. clone_new (void)
54. {
55. clone_t
56. *self;
57.
58. self = (clone_t *) zmalloc (sizeof (clone_t));
59. self->ctx = zctx_new ();
60. self->pipe = zthread_fork (self->ctx, clone_agent, NULL);
61. return self;
62. }
63.
64. // ———————————————————————
65. // Destructor
66.
67. void
68. clone_destroy (clone_t **self_p)
69. {
70. assert (self_p);
71. if (*self_p) {
72. clone_t *self = *self_p;
73. zctx_destroy (&self->ctx);
74. free (self);
75. *self_p = NULL;
76. }
77. }
78.
79. // ———————————————————————
80. // Specify subtree for snapshot and updates, do before connect
81. // Sends [SUBTREE][subtree] to the agent
82.
83. void clone_subtree (clone_t *self, char *subtree)
84. {
85. assert (self);
86. zmsg_t *msg = zmsg_new ();
87. zmsg_addstr (msg, ”SUBTREE”);
88. zmsg_addstr (msg, subtree);
89. zmsg_send (&msg, self->pipe);
90. }
91.
92. // ———————————————————————
93. // Connect to new server endpoint
94. // Sends [CONNECT][endpoint][service] to the agent
95.
96. void
97. clone_connect (clone_t *self, char *address, char *service)
98. {
99. assert (self);
100. zmsg_t *msg = zmsg_new ();
101. zmsg_addstr (msg, ”CONNECT”);
102. zmsg_addstr (msg, address);
103. zmsg_addstr (msg, service);
104. zmsg_send (&msg, self->pipe);
105. }
106.
107. // ———————————————————————
108. // Set new value in distributed hash table
109. // Sends [SET][key][value][ttl] to the agent
110.
111. void
112. clone_set (clone_t *self, char *key, char *value, int ttl)
113. {
114. char ttlstr [10];
115. sprintf (ttlstr, ”%d”, ttl);
116.
117. assert (self);
118. zmsg_t *msg = zmsg_new ();
119. zmsg_addstr (msg, ”SET”);
120. zmsg_addstr (msg, key);
121. zmsg_addstr (msg, value);
122. zmsg_addstr (msg, ttlstr);
123. zmsg_send (&msg, self->pipe);
124. }
125.
126. // ———————————————————————
127. // Lookup value in distributed hash table
128. // Sends [GET][key] to the agent and waits for a value response
129. // If there is no clone available, will eventually return NULL.
130.
131. char *
132. clone_get (clone_t *self, char *key)
133. {
134. assert (self);
135. assert (key);
136. zmsg_t *msg = zmsg_new ();
137. zmsg_addstr (msg, ”GET”);
138. zmsg_addstr (msg, key);
139. zmsg_send (&msg, self->pipe);
140.
141. zmsg_t *reply = zmsg_recv (self->pipe);
142. if (reply) {
143. char *value = zmsg_popstr (reply);
144. zmsg_destroy (&reply);
145. return value;
146. }
147. return NULL;
148. }
149.
150. // =====================================================================
151. // Asynchronous part, works in the background
152.
153. // ———————————————————————
154. // Simple class for one server we talk to
155.
156. typedef struct {
157. char *address; // Server address
158. int port; // Server port
159. void *snapshot; // Snapshot socket
160. void *subscriber; // Incoming updates
161. uint64_t expiry; // When server expires
162. uint requests; // How many snapshot requests made?
163. } server_t;
164.
165. static server_t *
166. server_new (zctx_t *ctx, char *address, int port, char *subtree)
167. {
168. server_t *self = (server_t *) zmalloc (sizeof (server_t));
169.
170. zclock_log (“I: adding server %s:%d…”, address, port);
171. self->address = strdup (address);
172. self->port = port;
173.
174. self->snapshot = zsocket_new (ctx, ZMQ_DEALER);
175. zsocket_connect (self->snapshot, ”%s:%d”, address, port);
176. self->subscriber = zsocket_new (ctx, ZMQ_SUB);
177. zsocket_connect (self->subscriber, ”%s:%d”, address, port + 1);
178. zsockopt_set_subscribe (self->subscriber, subtree);
179. return self;
180. }
181.
182. static void
183. server_destroy (server_t **self_p)
184. {
185. assert (self_p);
186. if (*self_p) {
187. server_t *self = *self_p;
188. free (self->address);
189. free (self);
190. *self_p = NULL;
191. }
192. }
193.
194. // ———————————————————————
195. // Our agent class
196.
197. // States we can be in
198. #define STATE_INITIAL 0 // Before asking server for state
199. #define STATE_SYNCING 1 // Getting state from server
200. #define STATE_ACTIVE 2 // Getting new updates from server
201.
202. typedef struct {
203. zctx_t *ctx; // Context wrapper
204. void *pipe; // Pipe back to application
205. zhash_t *kvmap; // Actual key/value table
206. char *subtree; // Subtree specification, if any
207. server_t *server [SERVER_MAX];
208. uint nbr_servers; // 0 to SERVER_MAX
209. uint state; // Current state
210. uint cur_server; // If active, server 0 or 1
211. int64_t sequence; // Last kvmsg processed
212. void *publisher; // Outgoing updates
213. } agent_t;
214.
215. static agent_t *
216. agent_new (zctx_t *ctx, void *pipe)
217. {
218. agent_t *self = (agent_t *) zmalloc (sizeof (agent_t));
219. self->ctx = ctx;
220. self->pipe = pipe;
221. self->kvmap = zhash_new ();
222. self->subtree = strdup (“”);
223. self->state = STATE_INITIAL;
224. self->publisher = zsocket_new (self->ctx, ZMQ_PUB);
225. return self;
226. }
227.
228. static void
229. agent_destroy (agent_t **self_p)
230. {
231. assert (self_p);
232. if (*self_p) {
233. agent_t *self = *self_p;
234. int server_nbr;
235. for (server_nbr = 0; server_nbr < self->nbr_servers; server_nbr++)
236. server_destroy (&self->server [server_nbr]);
237. zhash_destroy (&self->kvmap);
238. free (self->subtree);
239. free (self);
240. *self_p = NULL;
241. }
242. }
243.
244. // Returns -1 if thread was interrupted
245. static int
246. agent_control_message (agent_t *self)
247. {
248. zmsg_t *msg = zmsg_recv (self->pipe);
249. char *command = zmsg_popstr (msg);
250. if (command == NULL)
251. return -1;
252.
253. if (streq (command, ”SUBTREE”)) {
254. free (self->subtree);
255. self->subtree = zmsg_popstr (msg);
256. }
257. else
258. if (streq (command, ”CONNECT”)) {
259. char *address = zmsg_popstr (msg);
260. char *service = zmsg_popstr (msg);
261. if (self->nbr_servers < SERVER_MAX) {
262. self->server [self->nbr_servers++] = server_new (
263. self->ctx, address, atoi (service), self->subtree);
264. // We broadcast updates to all known servers
265. zsocket_connect (self->publisher, ”%s:%d”,
266. address, atoi (service) + 2);
267. }
268. else
269. zclock_log (“E: too many servers (max. %d)”, SERVER_MAX);
270. free (address);
271. free (service);
272. }
273. else
274. if (streq (command, ”SET”)) {
275. char *key = zmsg_popstr (msg);
276. char *value = zmsg_popstr (msg);
277. char *ttl = zmsg_popstr (msg);
278. zhash_update (self->kvmap, key, (byte *) value);
279. zhash_freefn (self->kvmap, key, free);
280.
281. // Send key-value pair on to server
282. kvmsg_t *kvmsg = kvmsg_new (0);
283. kvmsg_set_key (kvmsg, key);
284. kvmsg_set_uuid (kvmsg);
285. kvmsg_fmt_body (kvmsg, ”%s”, value);
286. kvmsg_set_prop (kvmsg, ”ttl”, ttl);
287. kvmsg_send (kvmsg, self->publisher);
288. kvmsg_destroy (&kvmsg);
289. puts (key);
290. free (ttl);
291. free (key); // Value is owned by hash table
292. }
293. else
294. if (streq (command, ”GET”)) {
295. char *key = zmsg_popstr (msg);
296. char *value = zhash_lookup (self->kvmap, key);
297. if (value)
298. zstr_send (self->pipe, value);
299. else
300. zstr_send (self->pipe, ””);
301. free (key);
302. free (value);
303. }
304. free (command);
305. zmsg_destroy (&msg);
306. return 0;
307. }
308.
309. // ———————————————————————
310. // Asynchronous agent manages server pool and handles request/reply
311. // dialog when the application asks for it.
312.
313. static void
314. clone_agent (void *args, zctx_t *ctx, void *pipe)
315. {
316. agent_t *self = agent_new (ctx, pipe);
317.
318. while (TRUE) {
319. zmq_pollitem_t poll_set [] = {
320. { pipe, 0, ZMQ_POLLIN, 0 },
321. { 0, 0, ZMQ_POLLIN, 0 }
322. };
323. int poll_timer = -1;
324. int poll_size = 2;
325. server_t *server = self->server [self->cur_server];
326. switch (self->state) {
327. case STATE_INITIAL:
328. // In this state we ask the server for a snapshot,
329. // if we have a server to talk to…
330. if (self->nbr_servers > 0) {
331. zclock_log (“I: waiting for server at %s:%d…”,
332. server->address, server->port);
333. if (server->requests < 2) {
334. zstr_sendm (server->snapshot, ”ICANHAZ?”);
335. zstr_send (server->snapshot, self->subtree);
336. server->requests++;
337. }
338. server->expiry = zclock_time () + SERVER_TTL;
339. self->state = STATE_SYNCING;
340. poll_set [1].socket = server->snapshot;
341. }
342. else
343. poll_size = 1;
344. break;
345. case STATE_SYNCING:
346. // In this state we read from snapshot and we expect
347. // the server to respond, else we fail over.
348. poll_set [1].socket = server->snapshot;
349. break;
350. case STATE_ACTIVE:
351. // In this state we read from subscriber and we expect
352. // the server to give hugz, else we fail over.
353. poll_set [1].socket = server->subscriber;
354. break;
355. }
356. if (server) {
357. poll_timer = (server->expiry - zclock_time ())
358. * ZMQ_POLL_MSEC;
359. if (poll_timer < 0)
360. poll_timer = 0;
361. }
362. // ————————————————————
363. // Poll loop
364. int rc = zmq_poll (poll_set, poll_size, poll_timer);
365. if (rc == -1)
366. break; // Context has been shut down
367.
368. if (poll_set [0].revents & ZMQ_POLLIN) {
369. if (agent_control_message (self))
370. break; // Interrupted
371. }
372. else
373. if (poll_set [1].revents & ZMQ_POLLIN) {
374. kvmsg_t *kvmsg = kvmsg_recv (poll_set [1].socket);
375. if (!kvmsg)
376. break; // Interrupted
377.
378. // Anything from server resets its expiry time
379. server->expiry = zclock_time () + SERVER_TTL;
380. if (self->state == STATE_SYNCING) {
381. // Store in snapshot until we’re finished
382. server->requests = 0;
383. if (streq (kvmsg_key (kvmsg), ”KTHXBAI”)) {
384. self->sequence = kvmsg_sequence (kvmsg);
385. self->state = STATE_ACTIVE;
386. zclock_log (“I: received from %s:%d snapshot=%d”,
387. server->address, server->port,
388. (int) self->sequence);
389. kvmsg_destroy (&kvmsg);
390. }
391. else
392. kvmsg_store (&kvmsg, self->kvmap);
393. }
394. else
395. if (self->state == STATE_ACTIVE) {
396. // Discard out-of-sequence updates, incl. hugz
397. if (kvmsg_sequence (kvmsg) > self->sequence) {
398. self->sequence = kvmsg_sequence (kvmsg);
399. kvmsg_store (&kvmsg, self->kvmap);
400. zclock_log (“I: received from %s:%d update=%d”,
401. server->address, server->port,
402. (int) self->sequence);
403. }
404. else
405. kvmsg_destroy (&kvmsg);
406. }
407. }
408. else {
409. // Server has died, failover to next
410. zclock_log (“I: server at %s:%d didn’t give hugz”,
411. server->address, server->port);
412. self->cur_server = (self->cur_server + 1) % self->nbr_servers;
413. self->state = STATE_INITIAL;
414. }
415. }
416. agent_destroy (&self);
417. }
zeroMQ first experience - 34. Advanced pub/sub: the clone pattern (part 3), closing words
Blog category:
· MQ
Server:
1. //
2. // Clone server Model Six
3. //
4.
5. // Lets us build this source without creating a library
6. #include ”bstar.c”
7. #include ”kvmsg.c”
8.
9. // Bstar reactor handlers
10. static int s_snapshots (zloop_t *loop, void *socket, void *args);
11. static int s_collector (zloop_t *loop, void *socket, void *args);
12. static int s_flush_ttl (zloop_t *loop, void *socket, void *args);
13. static int s_send_hugz (zloop_t *loop, void *socket, void *args);
14. static int s_new_master (zloop_t *loop, void *unused, void *args);
15. static int s_new_slave (zloop_t *loop, void *unused, void *args);
16. static int s_subscriber (zloop_t *loop, void *socket, void *args);
17.
18. // Our server is defined by these properties
19. typedef struct {
20. zctx_t *ctx; // Context wrapper
21. zhash_t *kvmap; // Key-value store
22. bstar_t *bstar; // Bstar reactor core
23. int64_t sequence; // How many updates we’re at
24. int port; // Main port we’re working on
25. int peer; // Main port of our peer
26. void *publisher; // Publish updates and hugz
27. void *collector; // Collect updates from clients
28. void *subscriber; // Get updates from peer
29. zlist_t *pending; // Pending updates from clients
30. Bool primary; // TRUE if we’re primary
31. Bool master; // TRUE if we’re master
32. Bool slave; // TRUE if we’re slave
33. } clonesrv_t;
34.
35. int main (int argc, char *argv [])
36. {
37. clonesrv_t *self = (clonesrv_t *) zmalloc (sizeof (clonesrv_t));
38. if (argc == 2 && streq (argv [1], ”-p”)) {
39. zclock_log (“I: primary master, waiting for backup (slave)”);
40. self->bstar = bstar_new (BSTAR_PRIMARY, ”tcp://*:5003″,
41. “tcp://localhost:5004”);
42. bstar_voter (self->bstar, ”tcp://*:5556″, ZMQ_ROUTER,
43. s_snapshots, self);
44. self->port = 5556;
45. self->peer = 5566;
46. self->primary = TRUE;
47. }
48. else
49. if (argc == 2 && streq (argv [1], ”-b”)) {
50. zclock_log (“I: backup slave, waiting for primary (master)”);
51. self->bstar = bstar_new (BSTAR_BACKUP, ”tcp://*:5004″,
52. “tcp://localhost:5003”);
53. bstar_voter (self->bstar, ”tcp://*:5566″, ZMQ_ROUTER,
54. s_snapshots, self);
55. self->port = 5566;
56. self->peer = 5556;
57. self->primary = FALSE;
58. }
59. else {
60. printf ("Usage: clonesrv6 { -p | -b }\n");
61. free (self);
62. exit (0);
63. }
64. // Primary server will become first master
65. if (self->primary)
66. self->kvmap = zhash_new ();
67.
68. self->ctx = zctx_new ();
69. self->pending = zlist_new ();
70. bstar_set_verbose (self->bstar, TRUE);
71.
72. // Set up our clone server sockets
73. self->publisher = zsocket_new (self->ctx, ZMQ_PUB);
74. self->collector = zsocket_new (self->ctx, ZMQ_SUB);
75. zsocket_bind (self->publisher, ”tcp://*:%d”, self->port + 1);
76. zsocket_bind (self->collector, ”tcp://*:%d”, self->port + 2);
77.
78. // Set up our own clone client interface to peer
79. self->subscriber = zsocket_new (self->ctx, ZMQ_SUB);
80. zsocket_connect (self->subscriber, ”tcp://localhost:%d”, self->peer + 1);
81.
82. // Register state change handlers
83. bstar_new_master (self->bstar, s_new_master, self);
84. bstar_new_slave (self->bstar, s_new_slave, self);
85.
86. // Register our other handlers with the bstar reactor
87. zloop_reader (bstar_zloop (self->bstar), self->collector, s_collector, self);
88. zloop_timer (bstar_zloop (self->bstar), 1000, 0, s_flush_ttl, self);
89. zloop_timer (bstar_zloop (self->bstar), 1000, 0, s_send_hugz, self);
90.
91. // Start the Bstar reactor
92. bstar_start (self->bstar);
93.
94. // Interrupted, so shut down
95. while (zlist_size (self->pending)) {
96. kvmsg_t *kvmsg = (kvmsg_t *) zlist_pop (self->pending);
97. kvmsg_destroy (&kvmsg);
98. }
99. zlist_destroy (&self->pending);
100. bstar_destroy (&self->bstar);
101. zhash_destroy (&self->kvmap);
102. zctx_destroy (&self->ctx);
103. free (self);
104.
105. return 0;
106. }
107.
108. // ———————————————————————
109. // Send snapshots to clients who ask for them
110.
111. static int s_send_single (char *key, void *data, void *args);
112.
113. // Routing information for a key-value snapshot
114. typedef struct {
115. void *socket; // ROUTER socket to send to
116. zframe_t *identity; // Identity of peer who requested state
117. char *subtree; // Client subtree specification
118. } kvroute_t;
119.
120. static int
121. s_snapshots (zloop_t *loop, void *snapshot, void *args)
122. {
123. clonesrv_t *self = (clonesrv_t *) args;
124.
125. zframe_t *identity = zframe_recv (snapshot);
126. if (identity) {
127. // Request is in second frame of message
128. char *request = zstr_recv (snapshot);
129. char *subtree = NULL;
130. if (streq (request, ”ICANHAZ?”)) {
131. free (request);
132. subtree = zstr_recv (snapshot);
133. }
134. else
135. printf (“E: bad request, aborting\n”);
136.
137. if (subtree) {
138. // Send state socket to client
139. kvroute_t routing = { snapshot, identity, subtree };
140. zhash_foreach (self->kvmap, s_send_single, &routing);
141.
142. // Now send END message with sequence number
143. zclock_log ("I: sending snapshot=%d", (int) self->sequence);
144. zframe_send (&identity, snapshot, ZFRAME_MORE);
145. kvmsg_t *kvmsg = kvmsg_new (self->sequence);
146. kvmsg_set_key (kvmsg, ”KTHXBAI”);
147. kvmsg_set_body (kvmsg, (byte *) subtree, 0);
148. kvmsg_send (kvmsg, snapshot);
149. kvmsg_destroy (&kvmsg);
150. free (subtree);
151. }
152. }
153. return 0;
154. }
155.
156. // Send one state snapshot key-value pair to a socket
157. // Hash item data is our kvmsg object, ready to send
158. static int
159. s_send_single (char *key, void *data, void *args)
160. {
161. kvroute_t *kvroute = (kvroute_t *) args;
162. kvmsg_t *kvmsg = (kvmsg_t *) data;
163. if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))
164. && memcmp (kvroute->subtree,
165. kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) {
166. // Send identity of recipient first
167. zframe_send (&kvroute->identity,
168. kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
169. kvmsg_send (kvmsg, kvroute->socket);
170. }
171. return 0;
172. }
173.
174. // ———————————————————————
175. // Collect updates from clients
176. // If we’re master, we apply these to the kvmap
177. // If we’re slave, or unsure, we queue them on our pending list
178.
179. static int s_was_pending (clonesrv_t *self, kvmsg_t *kvmsg);
180.
181. static int
182. s_collector (zloop_t *loop, void *collector, void *args)
183. {
184. clonesrv_t *self = (clonesrv_t *) args;
185.
186. kvmsg_t *kvmsg = kvmsg_recv (collector);
187. kvmsg_dump (kvmsg);
188. if (kvmsg) {
189. if (self->master) {
190. kvmsg_set_sequence (kvmsg, ++self->sequence);
191. kvmsg_send (kvmsg, self->publisher);
192. int ttl = atoi (kvmsg_get_prop (kvmsg, ”ttl”));
193. if (ttl)
194. kvmsg_set_prop (kvmsg, ”ttl”,
195. “%” PRId64, zclock_time () + ttl * 1000);
196. kvmsg_store (&kvmsg, self->kvmap);
197. zclock_log (“I: publishing update=%d”, (int) self->sequence);
198. }
199. else {
200. // If we already got message from master, drop it, else
201. // hold on pending list
202. if (s_was_pending (self, kvmsg))
203. kvmsg_destroy (&kvmsg);
204. else
205. zlist_append (self->pending, kvmsg);
206. }
207. }
208. return 0;
209. }
210.
211. // If message was already on pending list, remove it and
212. // return TRUE, else return FALSE.
213.
214. static int
215. s_was_pending (clonesrv_t *self, kvmsg_t *kvmsg)
216. {
217. kvmsg_t *held = (kvmsg_t *) zlist_first (self->pending);
218. while (held) {
219. if (memcmp (kvmsg_uuid (kvmsg),
220. kvmsg_uuid (held), sizeof (uuid_t)) == 0) {
221. zlist_remove (self->pending, held);
222. return TRUE;
223. }
224. held = (kvmsg_t *) zlist_next (self->pending);
225. }
226. return FALSE;
227. }
228.
229. // ———————————————————————
230. // Purge ephemeral values that have expired
231.
232. static int s_flush_single (char *key, void *data, void *args);
233.
234. static int
235. s_flush_ttl (zloop_t *loop, void *unused, void *args)
236. {
237. clonesrv_t *self = (clonesrv_t *) args;
238. zhash_foreach (self->kvmap, s_flush_single, args);
239. return 0;
240. }
241.
242. // If key-value pair has expired, delete it and publish the
243. // fact to listening clients.
244. static int
245. s_flush_single (char *key, void *data, void *args)
246. {
247. clonesrv_t *self = (clonesrv_t *) args;
248.
249. kvmsg_t *kvmsg = (kvmsg_t *) data;
250. int64_t ttl;
251. sscanf (kvmsg_get_prop (kvmsg, ”ttl”), ”%” PRId64, &ttl);
252. if (ttl && zclock_time () >= ttl) {
253. kvmsg_set_sequence (kvmsg, ++self->sequence);
254. kvmsg_set_body (kvmsg, (byte *) ””, 0);
255. kvmsg_send (kvmsg, self->publisher);
256. kvmsg_store (&kvmsg, self->kvmap);
257. zclock_log (“I: publishing delete=%d”, (int) self->sequence);
258. }
259. return 0;
260. }
261.
262. // ———————————————————————
263. // Send hugz to anyone listening on the publisher socket
264.
265. static int
266. s_send_hugz (zloop_t *loop, void *unused, void *args)
267. {
268. clonesrv_t *self = (clonesrv_t *) args;
269.
270. kvmsg_t *kvmsg = kvmsg_new (self->sequence);
271. kvmsg_set_key (kvmsg, ”HUGZ”);
272. kvmsg_set_body (kvmsg, (byte *) ””, 0);
273. kvmsg_send (kvmsg, self->publisher);
274. kvmsg_destroy (&kvmsg);
275.
276. return 0;
277. }
278.
279. // ———————————————————————
280. // State change handlers
281. // We’re becoming master
282. //
283. // The backup server applies its pending list to its own hash table,
284. // and then starts to process state snapshot requests.
285.
286. static int
287. s_new_master (zloop_t *loop, void *unused, void *args)
288. {
289. clonesrv_t *self = (clonesrv_t *) args;
290.
291. self->master = TRUE;
292. self->slave = FALSE;
293. zloop_cancel (bstar_zloop (self->bstar), self->subscriber);
294.
295. // Apply pending list to own hash table
296. while (zlist_size (self->pending)) {
297. kvmsg_t *kvmsg = (kvmsg_t *) zlist_pop (self->pending);
298. kvmsg_set_sequence (kvmsg, ++self->sequence);
299. kvmsg_send (kvmsg, self->publisher);
300. kvmsg_store (&kvmsg, self->kvmap);
301. zclock_log (“I: publishing pending=%d”, (int) self->sequence);
302. }
303. return 0;
304. }
305.
306. // ———————————————————————
307. // We’re becoming slave
308.
309. static int
310. s_new_slave (zloop_t *loop, void *unused, void *args)
311. {
312. clonesrv_t *self = (clonesrv_t *) args;
313.
314. zhash_destroy (&self->kvmap);
315. self->master = FALSE;
316. self->slave = TRUE;
317. zloop_reader (bstar_zloop (self->bstar), self->subscriber,
318. s_subscriber, self);
319.
320. return 0;
321. }
322.
323. // ———————————————————————
324. // Collect updates from peer (master)
325. // We’re always slave when we get these updates
326.
327. static int
328. s_subscriber (zloop_t *loop, void *subscriber, void *args)
329. {
330. clonesrv_t *self = (clonesrv_t *) args;
331. // Get state snapshot if necessary
332. if (self->kvmap == NULL) {
333. self->kvmap = zhash_new ();
334. void *snapshot = zsocket_new (self->ctx, ZMQ_DEALER);
335. zsocket_connect (snapshot, ”tcp://localhost:%d”, self->peer);
336. zclock_log (“I: asking for snapshot from: tcp://localhost:%d”,
337. self->peer);
338. zstr_send (snapshot, ”ICANHAZ?”);
339. while (TRUE) {
340. kvmsg_t *kvmsg = kvmsg_recv (snapshot);
341. if (!kvmsg)
342. break; // Interrupted
343. if (streq (kvmsg_key (kvmsg), ”KTHXBAI”)) {
344. self->sequence = kvmsg_sequence (kvmsg);
345. kvmsg_destroy (&kvmsg);
346. break; // Done
347. }
348. kvmsg_store (&kvmsg, self->kvmap);
349. }
350. zclock_log (“I: received snapshot=%d”, (int) self->sequence);
351. zsocket_destroy (self->ctx, snapshot);
352. }
353. // Find and remove update off pending list
354. kvmsg_t *kvmsg = kvmsg_recv (subscriber);
355. if (!kvmsg)
356. return 0;
357.
358. if (strneq (kvmsg_key (kvmsg), ”HUGZ”)) {
359. if (!s_was_pending (self, kvmsg)) {
360. // If master update came before client update, flip it
361. // around, store master update (with sequence) on pending
362. // list and use to clear client update when it comes later
363. zlist_append (self->pending, kvmsg_dup (kvmsg));
364. }
365. // If update is more recent than our kvmap, apply it
366. if (kvmsg_sequence (kvmsg) > self->sequence) {
367. self->sequence = kvmsg_sequence (kvmsg);
368. kvmsg_store (&kvmsg, self->kvmap);
369. zclock_log (“I: received update=%d”, (int) self->sequence);
370. }
371. else
372. kvmsg_destroy (&kvmsg);
373. }
374. else
375. kvmsg_destroy (&kvmsg);
376.
377. return 0;
378. }
The code is not short, but the author's grumbling is even longer (apparently this part took him about a week).
Of course, any model that wants to be taken seriously ends up writing a spec for the people who are somewhat less serious: http://rfc.zeromq.org/spec:12
With that, the whole tutorial more or less comes to a close. (The only reason this last pattern was spread over three posts is that there is simply a lot of code.)
The tutorial is over; the learning has only just begun. As for whether there will be more, it is just as the guide itself ends:
More coming soon…
Closing words:
I knew translating technical articles would be hard, but it still gave me a fright, and I nearly gave up while writing the very first chapter. In the end, with some self-consolation and self-encouragement (and by settling for "study notes" instead), I finished the series. Fine, I admit the code and diagrams take up most of the space, but at least it was carried through from start to finish.
It was originally planned to take a week; for all sorts of reasons (dawdling, low spirits, cold feet) it ended up stretching across more than two months, but I gritted my teeth and stuck with it. I would not claim to have learned a huge amount: once the Python code stopped appearing in the middle sections, I mostly stopped verifying the code's behaviour and logic myself. Writing this series was more about refusing to let myself off the hook (I simply would not believe I couldn't finish it!), and after all that fussing there was still something to show for it in the end (you only know if you have been through it yourself~).
Looking back, that is about all there is to it, with a faint sense of "ah, what a fine, cool autumn".
Well then, that's a wrap~
- Permalink: http://www.wy182000.com/2013/04/23/zeromq/
- When reposting, please credit: wy182000, published on Studio