I've been working on integrating zeromq with libevent in order to implement a kind of high-performance proxy server. The server is built as an m×n
system, where we maintain m
streamers that all respond to requests on the same port (in this example, port 12000) and then forward requests to n
downloaders
that do the real content download. The downloader
then responds back to the streamer that the download has ended and that the content is ready.
The main architecture can be seen in the following diagram:
The core features of this example are:
- First, all the streamers and downloaders are different processes. This provides some interesting advantages, like being able to respawn processes when they die (even after a hard crash), as well as improved response times thanks to all streamers accepting connections on the same port. All the processes are started from a
fork()
of the main program, and then we start an independent libevent dispatcher in each one. We could also use regular threads, but then we would lose this crash resilience… - Second, the message queue for download requests is a zeromq
queue
device (see this). This device provides a fair-queueing mechanism on the frontend, and passes the requests to the backend in a load-balancing mode that distributes them among the downloaders. The downloaders then copy the identity parts of the original request into the response in order to build the return path. This tells the queue device how to route the response back to the streamer that started the request.
The following code implements this model:
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/time.h>
#include <event.h>
#include <errno.h>
#include <err.h>
#include <signal.h>
#include <map>
#include <list>
#include <vector>
#include <string>
#include <zmq.hpp>
////////////////////////////////////////////////////////////////
// zeromq endpoints shared by the queue device and the workers.
// The TCP variants are kept for reference; the IPC ones below are active.
//#define STREAMERS_EP "tcp://127.0.0.1:54325"
//#define DOWNLOADERS_EP "tcp://127.0.0.1:54326"
// Frontend endpoint: bound by the forwarder, connected to by every streamer.
#define STREAMERS_EP "ipc://streamers"
// Backend endpoint: bound by the forwarder, connected to by every downloader.
#define DOWNLOADERS_EP "ipc://downloaders"
// TCP port the master binds; the listening socket is shared by all streamers.
static const unsigned int kStreamerPort = 12000;
// Backlog value handed to listen() on the shared socket.
static const unsigned int kMaxStreamerConns = 1024;
// Number of streamer processes the master keeps alive.
static const unsigned int kMaxStreamers = 2;
// Number of downloader processes the master keeps alive.
static const unsigned int kMaxDownloaders = 2;
// Seconds the master sleeps between two passes of its supervision loop.
static const unsigned int kTimeout = 1;
// Process bodies, run in the forked children. Both receive the listening socket.
void create_streamer (const int lsd);
void create_downloader (const int lsd);
// libevent callbacks (fd, triggered events, user pointer).
// Streamer side: new TCP connection, client socket activity, zeromq replies.
void accept_client (const int lsd, short event, void *ev);
void handle_client (const int lsd, short event, void *ev);
void handle_responses (const int lsd, short event, void *ev);
// Downloader side: activity on the zeromq socket.
void accept_job (const int lsd, short event, void *ev_v);
/**
* A client of the service: one accepted TCP connection owned by a streamer.
* Instances are allocated in accept_client() and indexed by fd in
* streamer_t::clients.
*/
typedef struct
{
// descriptor returned by accept(); also the key in the streamer's client map
unsigned int fd;
// libevent read event registered on fd (callback: handle_client)
struct event ev;
} client_t;
/**
* A streamer: a process that accepts client connections on the shared
* listening socket and forwards their requests through zeromq.
*
* The same struct is used in two roles: inside the streamer process every
* field is filled in by create_streamer(); in the master only pid and
* listen_fd are set, as a bookkeeping record for respawning.
*/
typedef struct streamer
{
// process id of the streamer
pid_t pid;
// listening TCP socket shared with the other streamers
int listen_fd;
// libevent read event on listen_fd (callback: accept_client)
struct event listen_ev;
// zeromq
// context and socket are heap-allocated in create_streamer()
zmq::context_t * context;
zmq::socket_t * sock;
// libevent read event on the zeromq socket's fd (callback: handle_responses)
struct event sock_ev;
// counter incremented by handle_client() for each forwarded request
unsigned int msg_count;
// the clients
// connected clients, keyed by their socket descriptor
std::map< unsigned int, client_t * > clients;
}
streamer_t;
typedef std::list< streamer_t * > streamers_lst_t;
// Master-side list of live streamers; modified by main() and child_exit().
static streamers_lst_t streamers;
/**
* A downloader: a process that receives requests from the queue device and
* answers them.
*
* Inside the downloader process every field is filled in by
* create_downloader(); in the master only pid is set, as a bookkeeping
* record for respawning.
*/
typedef struct downloader
{
// process id of the downloader
pid_t pid;
// zeromq
// context and socket are heap-allocated in create_downloader()
zmq::context_t * context;
zmq::socket_t * sock;
// libevent read event on the zeromq socket's fd (callback: accept_job)
struct event sock_ev;
}
downloader_t;
typedef std::list< downloader_t * > downloaders_lst_t;
// Master-side list of live downloaders; modified by main() and child_exit().
static downloaders_lst_t downloaders;
/**
 * SIGCHLD handler of the master process.
 *
 * Logs how the child terminated and removes its bookkeeping record from the
 * global `streamers` or `downloaders` list, so that the supervision loop in
 * main() sees the shortfall and forks a replacement. A pid that is in neither
 * list (e.g. the forwarder) is only logged.
 *
 * @param sig      signal number (unused: the handler is installed for SIGCHLD only)
 * @param info     si_pid identifies the child, si_code tells how it ended and
 *                 si_status carries the exit status or the terminating signal
 * @param context  interrupted execution context (unused)
 *
 * NOTE(review): fprintf(), delete and std::list::erase() are not
 * async-signal-safe, and main() mutates the same lists outside the handler.
 * A robust version should only record the pid here (or use a self-pipe) and
 * do the list maintenance from the main loop.
 */
static void child_exit(int sig, siginfo_t * info, void * context)
{
    (void) sig;
    (void) context;

    const pid_t dead_pid = info->si_pid;

    // si_signo is always SIGCHLD in this handler; the interesting value is
    // si_status, whose meaning depends on si_code.
    if (info->si_code == CLD_EXITED)
        (void) fprintf(stderr, "Child %d exited with status %d\n", dead_pid, info->si_status);
    else
        (void) fprintf(stderr, "Child %d exited due to signal %d\n", dead_pid, info->si_status);

    for (streamers_lst_t::iterator it = streamers.begin(); it != streamers.end(); ++it)
    {
        if ((*it)->pid != dead_pid)
            continue;

        fprintf(stderr, "... it was a streamer\n");
        // The master's records never had their events event_set()/event_add()ed
        // (that only happens inside the child), so there is nothing to
        // event_del() here; doing so would read uninitialised memory.
        delete *it;
        streamers.erase(it);
        return;
    }

    for (downloaders_lst_t::iterator it = downloaders.begin(); it != downloaders.end(); ++it)
    {
        if ((*it)->pid != dead_pid)
            continue;

        fprintf(stderr, "... it was a downloader\n");
        // Same as above: the master-side record owns no registered event.
        delete *it;
        downloaders.erase(it);
        return;
    }
}
int main(void) {
int rc;
int lsd, reuse = 1;
struct sockaddr_in sa;
pid_t pid;
bzero(&sa, sizeof(sa));
if ((lsd = socket(AF_INET, SOCK_STREAM, 0)) < 0)
errx(1, "socket: %s (%d)\n", strerror(errno), errno);
if (setsockopt(lsd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse)) < 0)
errx(2, "setsockopt: %s (%d)\n", strerror(errno), errno);
sa.sin_family = AF_INET;
sa.sin_port = htons(kStreamerPort);
sa.sin_addr.s_addr = htonl(INADDR_ANY);
// create the signal handlers, for children deads...
struct sigaction saction;
memset(&saction, 0, sizeof(struct sigaction));
saction.sa_sigaction = child_exit;
saction.sa_flags = SA_SIGINFO | SA_NOCLDSTOP | SA_NOCLDWAIT;
sigaction(SIGCHLD, &saction, NULL);
if (bind(lsd, (struct sockaddr *) &sa, sizeof(sa)) < 0)
errx(3, "bind: %s (%d)\n", strerror(errno), errno);
if (listen(lsd, kMaxStreamerConns) < 0)
errx(4, "listen: %s (%d)\n", strerror(errno), errno);
fprintf(stderr, "[0] Master: listening on port %d\n",
(int) kStreamerPort);
fprintf(stderr, "[0] Master: creating zeromq environment\n");
if ((pid = fork()) == 0)
{
uint64_t s;
size_t slen = sizeof(s);
fprintf(stderr, "[%d] Forwarder created\n", getpid());
zmq::context_t context(3);
zmq::socket_t frontend(context, ZMQ_XREP);
zmq::socket_t backend(context, ZMQ_XREQ);
s = 10000;
frontend.setsockopt(ZMQ_HWM, &s, slen);
backend.setsockopt(ZMQ_HWM, &s, slen);
frontend.bind(STREAMERS_EP);
backend.bind(DOWNLOADERS_EP);
zmq::device (ZMQ_QUEUE, frontend, backend);
// We never get here...
return 0;
}
else
{
sleep(1);
}
fprintf(stderr, "[0] Master: launching processes...\n");
for (;;) {
if (streamers.size() < kMaxStreamers) {
if ((pid = fork()) == 0) {
fprintf(stderr, "[%d] Streamer created\n", getpid());
create_streamer(lsd);
} else if (pid > 0) {
struct streamer *str = new struct streamer;
str->pid = pid;
str->listen_fd = lsd;
streamers.push_back(str);
fprintf(stderr, "[0] Master: %d streamers\n",
(int) streamers.size());
} else
errx(6, "fork: %s(%d)\n", strerror(errno), errno);
}
if (downloaders.size() < kMaxDownloaders) {
if ((pid = fork()) == 0) {
fprintf(stderr, "[%d] Downloader created\n", getpid());
create_downloader(lsd);
} else if (pid > 0) {
struct downloader *down = new struct downloader;
down->pid = pid;
downloaders.push_back(down);
fprintf(stderr, "[0] Master: %d downloaders\n",
(int) downloaders.size());
} else
errx(6, "fork: %s(%d)\n", strerror(errno), errno);
}
sleep(kTimeout);
}
return 0;
}
/********************************************************
* Streamers
*******************************************************/
void create_streamer (const int lsd)
{
streamer_t streamer;
int rc;
streamer.listen_fd = lsd;
streamer.pid = getpid();
streamer.msg_count = 0;
assert(streamer.listen_fd > 0);
event_init();
// create the listener for listening for client's connections
bzero(&streamer.listen_ev, sizeof(streamer.listen_ev));
event_set(&streamer.listen_ev, streamer.listen_fd, EV_READ | EV_PERSIST, accept_client, &streamer);
if (event_add(&streamer.listen_ev, NULL) < 0)
errx(5, "event_add: %s (%d)\n", strerror(errno), errno);
// create the zeromq stuff
streamer.context = new zmq::context_t(1);
streamer.sock = new zmq::socket_t(*streamer.context, ZMQ_XREQ);
assert(streamer.sock != NULL);
// rc = zmq_setsockopt (streamer.sock, ZMQ_IDENTITY, "STREAMER", sizeof("STREAMER") - 1);
// assert(rc == 0);
// set some socket parameters
uint64_t s;
size_t slen = sizeof(s);
s = 10000;
streamer.sock->setsockopt(ZMQ_HWM, &s, slen);
fprintf(stderr, "[%d] Streamer: connecting to %s\n", streamer.pid, STREAMERS_EP);
streamer.sock->connect(STREAMERS_EP);
// get the fd used by the zsocket
int fd = (-1);
size_t fd_size = sizeof(fd);
streamer.sock->getsockopt(ZMQ_FD, &fd, &fd_size);
assert(fd > 0);
// ... and register the fd with libevent
bzero(&streamer.sock_ev, sizeof(streamer.sock_ev));
event_set(&streamer.sock_ev, fd, EV_READ | EV_PERSIST, handle_responses, &streamer);
if (event_add(&streamer.sock_ev, NULL) < 0)
errx(5, "event_add: %s (%d)\n", strerror(errno), errno);
fprintf(stderr, "[%d] Streamer: registering zsocket 0x%p (fd:%d) on context 0x%p\n",
streamer.pid, streamer.sock, fd, streamer.context);
event_dispatch();
(void) fprintf(stderr, "aborted\n");
}
/**
 * libevent callback: the shared listening socket is readable.
 *
 * Accepts one connection, wraps it in a heap-allocated client_t, registers a
 * persistent read event for it (handle_client) and stores it in the
 * streamer's client map under its descriptor.
 *
 * @param lsd    the listening socket (must equal streamer->listen_fd)
 * @param event  libevent event mask (unused)
 * @param data   the owning streamer_t
 *
 * NOTE(review): the listening socket is created in blocking mode and is
 * shared by all streamers, so a streamer woken for a connection that a
 * sibling already accepted will block in accept() until the next one arrives.
 * Making the socket non-blocking would avoid that; the EAGAIN handling below
 * is already prepared for it.
 */
void accept_client (const int lsd, short event, void *data)
{
    (void) event;

    streamer_t * streamer = (streamer_t*) data;
    const pid_t pid = getpid();
    assert(streamer->listen_fd == lsd);

    struct sockaddr_in remote;
    socklen_t socklen = sizeof(remote);   // accept() expects a socklen_t, not an int
    bzero(&remote, sizeof(remote));

    const int asd = accept(streamer->listen_fd, (struct sockaddr *) &remote, &socklen);
    if (asd < 0)
    {
        // Conditions that only mean "no connection for us right now" must not
        // take the whole streamer down.
        if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK || errno == ECONNABORTED)
            return;
        errx(6, "accept: %s (%d)\n", strerror(errno), errno);
    }

    fprintf(stderr, "[%d] Streamer: received connection from: %s:%d, socket: %d\n",
            pid, inet_ntoa(remote.sin_addr), ntohs(remote.sin_port), asd);

    // we have a new client: create a client struct and register the fd handlers
    fprintf(stderr, "[%d] Streamer: ... creating new client.\n", pid);
    client_t * client = new client_t;
    client->fd = asd;
    bzero(&client->ev, sizeof(client->ev));
    event_set(&client->ev, client->fd, EV_READ | EV_PERSIST, handle_client, streamer);
    if (event_add(&client->ev, NULL) < 0)
        errx(5, "event_add: %s (%d)\n", strerror(errno), errno);

    fprintf(stderr, "[%d] Streamer: ... adding client to the list.\n", pid);
    streamer->clients[asd] = client;
}
void handle_client (const int fd, short event, void *data)
{
streamer_t * streamer = (streamer_t*) data;
if (streamer->clients.find(fd) != streamer->clients.end())
{
client_t * client = streamer->clients[fd];
assert((int) client->fd == fd);
fprintf(stderr, "[%d] Streamer: activity on regular socket fd:%d, events:%d\n", streamer->pid, fd, event);
if (event & EV_READ)
{
const unsigned int buf_size = 1024;
unsigned char buf[buf_size];
int read_len;
read_len = recv(fd, buf, buf_size, 0);
if (read_len == 0)
{
fprintf(stderr, "[%d] Streamer: ... the client has disconnected\n", streamer->pid);
// the client has disconnected
streamer->clients.erase((unsigned int)fd);
close(fd);
}
else
{
fprintf(stderr, "[%d] Streamer: ... sending zeromq message to %p\n", streamer->pid, streamer->sock);
{
// the client is asking for something: send a zeromq message
char msg_str[1024];
snprintf(msg_str, 1024, "%d %d", streamer->pid, fd);
zmq::message_t reply(strlen(msg_str));
memcpy ((void *) reply.data(), msg_str, reply.size());
bool send_ok = streamer->sock->send(reply, ZMQ_NOBLOCK);
if (!send_ok)
{
fprintf(stderr, "[%d] Streamer: ERROR: when sending message.\n", streamer->pid);
}
}
streamer->msg_count++;
fprintf(stderr, "[%d] Streamer: ...... sent!\n", streamer->pid);
}
}
}
else
{
fprintf(stderr, "[%d] Streamer: WARNING: fd:%d not found on clients list (size:%d)\n",
streamer->pid, fd, (int)streamer->clients.size());
}
}
/**
 * libevent callback: the streamer's zeromq descriptor signalled activity.
 *
 * Drains every multipart message that is currently readable. For each one,
 * all frames are collected and the first frame is parsed as
 * "<pid> <fd>" and compared against this streamer's pid, to verify that the
 * reply belongs to a request this process sent.
 *
 * The descriptor obtained through ZMQ_FD only signals state changes, so the
 * real state is read from ZMQ_EVENTS and the loop keeps going until
 * ZMQ_POLLIN is clear; reading a single message per wake-up can leave
 * messages waiting with no further notification.
 *
 * @param fd     descriptor of the zeromq socket (only logged)
 * @param event  libevent event mask (only logged)
 * @param data   the owning streamer_t
 */
void handle_responses (const int fd, short event, void *data)
{
    streamer_t * streamer = (streamer_t*) data;

    fprintf(stderr, "[%d] Streamer: zsocket activity on fd:%d, events:%d\n", streamer->pid, fd, event);

    for (;;)
    {
        // detect the pending events on the zeromq world
        unsigned int zmq_events = 0;
        size_t zmq_events_size = sizeof(zmq_events);
        streamer->sock->getsockopt(ZMQ_EVENTS, &zmq_events, &zmq_events_size);
        if (!(zmq_events & ZMQ_POLLIN))
            break;

        fprintf(stderr, "[%d] Streamer: ... uhu! we were waiting for a message and there seems we have some mail!\n", streamer->pid);

        std::vector< std::string > responses;
        bool complete = true;
        int64_t more = 0;
        do
        {
            zmq::message_t reply;
            if (!streamer->sock->recv(&reply, ZMQ_NOBLOCK))
            {
                // EAGAIN: nothing could be read despite POLLIN.
                complete = false;
                break;
            }
            more = 0;
            size_t more_size = sizeof (more);
            streamer->sock->getsockopt (ZMQ_RCVMORE, &more, &more_size);

            // Frame data is not NUL-terminated and may be of any length, so
            // copy it with its explicit size instead of treating it as a C
            // string or squeezing it into a fixed buffer.
            const std::string part((const char *) reply.data(), reply.size());
            fprintf(stderr, "[%d] Streamer: ... received message: '%s' (%d bytes)\n",
                    streamer->pid, part.c_str(), (int) part.size());
            responses.push_back(part);
        }
        while (more);

        if (!complete || responses.empty())
            break;

        // verify that we are receiving the response for the message that we sent
        int resp_pid = 0;
        int resp_fd = 0;
        if (sscanf(responses[0].c_str(), "%d %d", &resp_pid, &resp_fd) != 2)
        {
            fprintf(stderr, "[%d] Streamer: ...... WARNING: malformed response '%s'\n",
                    streamer->pid, responses[0].c_str());
            continue;
        }

        if (resp_pid != (int)streamer->pid)
        {
            fprintf(stderr, "[%d] Streamer: ...... WARNING: resp_pid (%d) != streamer->pid (%d)\n", streamer->pid, resp_pid, streamer->pid);
        }
        else
        {
            fprintf(stderr, "[%d] Streamer: ...... response looks GOOD!\n", streamer->pid);
        }
    }
}
/************************************************
 * Downloaders
 ***********************************************/

/**
 * Body of a downloader process.
 *
 * Creates a zeromq XREP socket connected to DOWNLOADERS_EP, registers the
 * socket's descriptor with libevent (accept_job) and dispatches. Returns only
 * if event_dispatch() returns.
 *
 * @param lsd  listening TCP socket inherited from the master (unused here)
 */
void create_downloader (const int lsd)
{
    (void) lsd;

    downloader_t downloader;
    downloader.pid = getpid();

    event_init();

    // create the zeromq stuff
    downloader.context = new zmq::context_t (1);
    downloader.sock = new zmq::socket_t (*downloader.context, ZMQ_XREP);
    assert(downloader.sock != NULL);

    // set some socket parameters
    // Done before connect(), as create_streamer() does. NOTE(review): libzmq
    // is understood to apply such options only to connections made afterwards,
    // so setting it after connect() would leave this connection on the default.
    uint64_t hwm = 10000;
    downloader.sock->setsockopt(ZMQ_HWM, &hwm, sizeof(hwm));

    fprintf(stderr, "[%d] Downloader: connecting to %s\n", downloader.pid, DOWNLOADERS_EP);
    downloader.sock->connect(DOWNLOADERS_EP);

    // get the fd used by the zsocket
    int fd = (-1);
    size_t fd_size = sizeof(fd);
    downloader.sock->getsockopt(ZMQ_FD, &fd, &fd_size);

    // register the zeromq socket
    bzero(&downloader.sock_ev, sizeof(downloader.sock_ev));
    event_set(&downloader.sock_ev, fd, EV_READ | EV_PERSIST,
              accept_job, &downloader);
    if(event_add(&downloader.sock_ev, NULL) < 0)
        errx(5, "event_add: %s (%d)\n", strerror(errno), errno);

    fprintf(stderr, "[%d] Downloader: registering zsocket 0x%p (fd:%d) on context 0x%p\n",
            downloader.pid, downloader.sock, fd, downloader.context);

    event_dispatch();

    (void) fprintf(stderr, "aborted\n");
}
void accept_job (const int fd, short event, void *data)
{
downloader_t * downloader = (downloader_t*) data;
// detect the pending events on the zeromq world
unsigned int zmq_events;
size_t zmq_events_size = sizeof(zmq_events);
fprintf(stderr, "[%d] Downloader: zsocket activity on fd:%d, events:%d\n", downloader->pid, fd, event);
assert(downloader->sock != NULL);
downloader->sock->getsockopt(ZMQ_EVENTS, &zmq_events, &zmq_events_size);
if (zmq_events & ZMQ_POLLIN)
{
fprintf(stderr, "[%d] Downloader: ... uhu! we were waiting for a message and there seems we have some mail!\n", downloader->pid);
while(1)
{
int64_t more;
size_t more_size = sizeof (more);
{
zmq::message_t request;
downloader->sock->recv (&request, ZMQ_NOBLOCK);
downloader->sock->getsockopt (ZMQ_RCVMORE, &more, &more_size);
unsigned char * msg_data = (unsigned char *) request.data();
unsigned int msg_len = (unsigned int) request.size();
char msg_printf[1024];
snprintf (msg_printf, msg_len + 1, "%s", msg_data);
fprintf(stderr, "[%d] Downloader: ... received message: '%s' (%d bytes)\n", downloader->pid,
msg_printf, msg_len);
{
// we send back this message part
zmq::message_t reply;
reply.copy(&request);
fprintf(stderr, "[%d] Downloader: ...... sending back response (%d bytes)\n", downloader->pid, (int) reply.size());
bool send_ok = downloader->sock->send(reply,
ZMQ_SNDMORE | ZMQ_NOBLOCK);
if (!send_ok)
{
fprintf(stderr, "[%d] Downloader: ERROR: when sending message.\n",
downloader->pid);
}
}
}
if (!more)
break; // Last message part
}
//sleep(10); // we have been doing some hard work!
{
char reply_str[1024];
// send the reponse!
snprintf(reply_str, 1024, "%d %d", downloader->pid, fd);
zmq::message_t reply(strlen(reply_str));
memcpy ((void *) reply.data(), reply_str, reply.size());
bool send_ok = downloader->sock->send(reply, ZMQ_NOBLOCK);
if (!send_ok)
{
fprintf(stderr, "[%d] Downloader: ERROR: when sending message.\n", downloader->pid);
}
}
fprintf(stderr, "[%d] Downloader: ...... response sent!\n",
downloader->pid);
}
fprintf(stderr, "[%d] Downloader: ... done!\n",
downloader->pid);
}
Compile this code with
g++ workers.cpp -o workers -levent -lzmq -luuid
There are some things we must take into account in this example code:
- First, the main point of this test was to see how it scales and whether messages are routed back and forth correctly, so the downloaders do not really download anything. I have skipped this part of the process and our downloaders just return a proper response to the streamers.
- Second, the zeromq
queue
device is not integrated in the libevent dispatch loop, so I had to do an initial fork for launching the queue and let it run its own dispatcher. A production version of this code should build a queue device from scratch and integrate it in the main loop.
- 本文固定链接: http://www.wy182000.com/2013/06/04/zeromq-with-libevent/
- 转载请注明: wy182000 于 Studio 发表