- ActiveMQ, RabbitMQ和ZeroMQ的特性比较(当初为了共享给别人,写的是英文,而且放到了Google Doc上,现在我自己也看不了了)。
选择MQ时,主要关注的特性,可能就以下几个:
通信模式(是否满足业务场景):
ActiveMQ: queue(producer/consumer), topic(publisher/subscriber)
RabbitMQ: AMQP协议支持路由选择和广播
ZeroMQ: 支持REQ,PUSH, PULL等模式
支持的接口语言:
这三个因为比较流行,所以常见的语言比如python,php等都有相应的封装。这样,客户端和服务端就可以用不同的语言来写了。
message持久化:
ActiveMQ 和 RabbitMQ都支持
服务器主备搭建,客户端自动failover支持:
ActiveMQ(非常容易) > RabbitMQ > ZeroMQ
如果对性能敏感,可能需要在上面几个都选定了的情况下,测试一下性能,做到心里有数。
- ActiveMQ, RabbitMQ和ZeroMQ性能测试结果
1个consumer vs 1个producer,message大小约120Bytes,message总数100万
因为当初测试的目的只是为了对性能做定性了解,所以包括语言的选择,消息接收发送模式没有严格一致。
ActiveMQ: 10526条/秒 版本 5.6.0
RabbitMQ:4926条/秒 版本2.8.6
ZeroMQ: 15625条/秒 版本2.2.0
ActiveMQ 性能比自己想象中的要好。一般应用足够了。
RabbitMQ 测试结果和官方结果的相差很大。一方面可能是服务端编译,配置优化,另一方面可能是客户端语言的选择,我用的python,官方的是Java。
ZeroMQ性能没这么突出,应该和测试模式用的是ZMQ_REQ有关:client和server之间多一次message交互。
- ActiveMQ测试过程[接口: python,协议stomp]
./active_consumer.py consumer_1 1000000 /queue/test 10.21.0.32
./active_producer.py producer_1 1000000 /queue/test 10.21.0.32
- RabbitMQ测试过程[接口: python,协议AMQP]
./rabbit_consumer.py consumer_1 1000000 10.21.0.33
./rabbit_producer.py producer_1 1000000 10.21.0.33
- ZeroMQ测试过程[接口: C++,模式REQ]
./server
date; ./client; date
- ActiveMQ 测试代码
-
- Server配置
- ActiveMQ server: conf/activemq.xml 添加<transportConnector name="stomp" uri="stomp://0.0.0.0:61613"/>
- Python接口:
- http://code.google.com/p/stomppy/ install: sudo python setup.py install --no-compile
- active_producer.py
- Server配置
#!/usr/bin/env python
"""STOMP benchmark producer for ActiveMQ.

Sends ``msg_num`` messages to ``destination``.  Each message body is a fixed
payload ('hello world' repeated 10 times) followed by the local send time in
a fixed-width ``YYYY-MM-DDTHH:MM:SS.ffffff`` format, so a consumer can slice
the timestamp off the end of the body and compute the delivery delay.

Usage: active_producer.py id msg_num destination [mq_ip] [mq_port]
"""
import logging
import sys
from datetime import datetime

import stomp

DEFAULT_MQ_IP = '10.125.42.57'
DEFAULT_MQ_PORT = 61613
PAYLOAD = 'hello world' * 10
# strftime with %f always emits six microsecond digits; datetime.isoformat()
# drops the fraction entirely when microsecond == 0, which would break the
# fixed-width suffix the consumer relies on.
TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'


def main(argv):
    """Run the producer; return the process exit status."""
    # id, msg_num and destination are required; mq_ip and mq_port are optional.
    if len(argv) < 4:
        print("Usage: %s id msg_num destination [mq_ip] [mq_port]" % (argv[0]))
        return 1

    producer_id = argv[1]
    msg_total = int(argv[2])
    dest = argv[3]
    mq_ip = argv[4] if len(argv) >= 5 else DEFAULT_MQ_IP
    mq_port = int(argv[5]) if len(argv) >= 6 else DEFAULT_MQ_PORT

    logging.basicConfig()

    conn = stomp.Connection([(mq_ip, mq_port)])
    conn.start()
    conn.connect(wait=True)

    print("%s id = %s starts at %s"
          % (argv[0], producer_id, datetime.now().isoformat()))

    msg_sent = 0
    try:
        while msg_sent < msg_total:
            send_msg = PAYLOAD + datetime.now().strftime(TIMESTAMP_FORMAT)
            conn.send(message=send_msg, destination=dest,
                      headers={'seltype': 'mandi-age-to-man',
                               'type': 'textMessage'},
                      ack='auto')
            msg_sent += 1
    finally:
        # Always release the broker connection, even if a send fails.
        conn.disconnect()

    print("%s id = %s ends at %s and send %d messages"
          % (argv[0], producer_id, datetime.now().isoformat(), msg_sent))
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv))
- active_consumer.py
#!/usr/bin/env python
"""STOMP benchmark consumer for ActiveMQ.

Subscribes to ``destination`` and counts messages until ``msg_num`` have
arrived.  Each message body is expected to end with the sender's local
timestamp (``YYYY-MM-DDTHH:MM:SS.ffffff``); the difference between that
timestamp and the local receive time is accumulated and reported as an
average delay in microseconds.

Usage: active_consumer.py id msg_num destination [mq_ip] [mq_port]
"""
import logging
import sys
import time
from datetime import datetime

import stomp

DEFAULT_MQ_IP = '10.125.42.57'
DEFAULT_MQ_PORT = 61613
TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'
TIMESTAMP_LEN = len('2012-08-02T13:44:31.348417')


def delay_in_microseconds(message, recv_time):
    """Return recv_time minus the timestamp at the end of message, in microseconds.

    Raises ValueError when the trailing TIMESTAMP_LEN characters are not a
    timestamp in TIMESTAMP_FORMAT, and TypeError when message is not sliceable.
    """
    send_time = datetime.strptime(message[-TIMESTAMP_LEN:], TIMESTAMP_FORMAT)
    delta = recv_time - send_time
    return (delta.microseconds
            + delta.seconds * 1000000
            + delta.days * (1000000 * 86400))


class MyListener(stomp.ConnectionListener):
    """Counts received messages and sums their delivery delay."""

    def __init__(self):
        self.received = 0
        self.total_delay = 0

    def on_error(self, headers, message):
        print('received an error %s' % message)

    def on_message(self, headers, message):
        recv_time = datetime.now()
        try:
            self.total_delay += delay_in_microseconds(message, recv_time)
        except (TypeError, ValueError):
            # A message without a parsable timestamp is reported but still
            # counted, so the run terminates after msg_num messages.
            print("exception msg  %s" % message)
        self.received += 1


def main(argv):
    """Run the consumer; return the process exit status."""
    # id, msg_num and destination are required; mq_ip and mq_port are optional.
    if len(argv) < 4:
        print("Usage: %s id msg_num destination [mq_ip] [mq_port]" % (argv[0]))
        return 1

    receiver_id = argv[1]
    msg_total = int(argv[2])
    dest = argv[3]
    mq_ip = argv[4] if len(argv) >= 5 else DEFAULT_MQ_IP
    mq_port = int(argv[5]) if len(argv) >= 6 else DEFAULT_MQ_PORT

    logging.basicConfig()

    listener = MyListener()
    conn = stomp.Connection([(mq_ip, mq_port)])
    conn.set_listener('somename', listener)
    conn.start()
    conn.connect(wait=True)
    conn.subscribe(destination=dest, ack='auto')

    print("receiver id: %s starts" % (receiver_id))

    # Messages arrive through the listener callbacks; poll once per second
    # until the expected number has been counted.
    while listener.received < msg_total:
        time.sleep(1)
    conn.disconnect()

    # Guard against msg_num == 0, where nothing has been received.
    received = listener.received
    average = listener.total_delay // received if received else 0
    print("average delay: receiver id: %s, %d microseconds, %s ends at %s and revecived %d messages"
          % (receiver_id, average, argv[0], datetime.now().isoformat(), received))
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv))
- RabbitMQ 测试代码
- Python接口:
sudo yum install python-pip git-core; sudo pip-python install pika==0.9.5
- rabbit_producer.py
#!/usr/bin/env python
"""AMQP benchmark producer for RabbitMQ (pika).

Publishes ``msg_num`` messages to the 'direct_logs' direct exchange with
routing key 'broadcast'.  Each message body is a fixed payload ('hello world'
repeated 10 times) followed by the local send time in a fixed-width
``YYYY-MM-DDTHH:MM:SS.ffffff`` format, so a consumer can slice the timestamp
off the end of the body and compute the delivery delay.

Usage: rabbit_producer.py id msg_num [mq_ip]
"""
import sys
from datetime import datetime

import pika

DEFAULT_MQ_IP = '10.125.42.57'
EXCHANGE = 'direct_logs'
ROUTING_KEY = 'broadcast'
PAYLOAD = 'hello world' * 10
# strftime with %f always emits six microsecond digits; datetime.isoformat()
# drops the fraction entirely when microsecond == 0, which would break the
# fixed-width suffix the consumer relies on.
TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'


def main(argv):
    """Run the producer; return the process exit status."""
    if len(argv) < 3:
        print("Usage: %s id msg_num [mq_ip]" % (argv[0]))
        return 1

    producer_id = argv[1]
    msg_total = int(argv[2])
    mq_ip = argv[3] if len(argv) >= 4 else DEFAULT_MQ_IP

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=mq_ip))
    msg_sent = 0
    try:
        channel = connection.channel()
        channel.exchange_declare(exchange=EXCHANGE, type='direct')

        print("%s id = %s starts at %s"
              % (argv[0], producer_id, datetime.now().isoformat()))

        while msg_sent < msg_total:
            send_msg = PAYLOAD + datetime.now().strftime(TIMESTAMP_FORMAT)
            channel.basic_publish(exchange=EXCHANGE,
                                  routing_key=ROUTING_KEY,
                                  body=send_msg)
            msg_sent += 1
    finally:
        # Always release the broker connection, even if a publish fails.
        connection.close()

    print("%s id = %s ends at %s and send %d messages"
          % (argv[0], producer_id, datetime.now().isoformat(), msg_sent))
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv))
- rabbit_consumer.py
#!/usr/bin/env python
"""AMQP benchmark consumer for RabbitMQ (pika).

Binds an exclusive, server-named queue to the 'direct_logs' direct exchange
with routing key 'broadcast' and counts messages until ``msg_num`` have
arrived.  Each message body is expected to end with the sender's local
timestamp (``YYYY-MM-DDTHH:MM:SS.ffffff``); the difference between that
timestamp and the local receive time is accumulated and reported as an
average delay in microseconds.

Usage: rabbit_consumer.py id msg_num [mq_ip]
"""
import signal
import sys
from datetime import datetime

import pika

DEFAULT_MQ_IP = '10.125.42.57'
EXCHANGE = 'direct_logs'
ROUTING_KEY = 'broadcast'
TIMESTAMP_FORMAT = '%Y-%m-%dT%H:%M:%S.%f'
TIMESTAMP_LEN = len('2012-08-02T13:44:31.348417')


def delay_in_microseconds(message, recv_time):
    """Return recv_time minus the timestamp at the end of message, in microseconds.

    Raises ValueError when the trailing TIMESTAMP_LEN characters are not a
    timestamp in TIMESTAMP_FORMAT, and TypeError when message is not sliceable.
    """
    send_time = datetime.strptime(message[-TIMESTAMP_LEN:], TIMESTAMP_FORMAT)
    delta = recv_time - send_time
    return (delta.microseconds
            + delta.seconds * 1000000
            + delta.days * (1000000 * 86400))


class DelayStats(object):
    """Running message count and summed delivery delay (microseconds)."""

    def __init__(self):
        self.received = 0
        self.total_delay = 0


def main(argv):
    """Run the consumer; exits the process from the message callback."""
    if len(argv) < 3:
        print("Usage: %s id msg_num [mq_ip]" % (argv[0]))
        return 1

    receiver_id = argv[1]
    msg_total = int(argv[2])
    mq_ip = argv[3] if len(argv) >= 4 else DEFAULT_MQ_IP

    stats = DelayStats()

    def on_sigint(signum, frame):
        # Report progress when the run is interrupted with Ctrl+C.
        print("You pressed Ctrl+C!, message received %d" % (stats.received))
        sys.exit(0)

    signal.signal(signal.SIGINT, on_sigint)

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=mq_ip))
    channel = connection.channel()
    channel.exchange_declare(exchange=EXCHANGE, type='direct')
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue

    print("receiver id: %s starts" % (receiver_id))

    def callback(ch, method, properties, message):
        recv_time = datetime.now()
        try:
            stats.total_delay += delay_in_microseconds(message, recv_time)
        except (TypeError, ValueError):
            # A message without a parsable timestamp is reported but still
            # counted, so the run terminates after msg_num messages.
            print("exception msg  %s" % message)
        stats.received += 1

        if stats.received >= msg_total:
            print("average delay: receiver id: %s, %d microseconds, %s ends at %s and revecived %d messages"
                  % (receiver_id, stats.total_delay // stats.received,
                     argv[0], datetime.now().isoformat(), stats.received))
            # Normal completion: report success to the calling shell.
            sys.exit(0)

    channel.queue_bind(exchange=EXCHANGE, queue=queue_name,
                       routing_key=ROUTING_KEY)
    channel.basic_consume(callback, queue=queue_name, no_ack=True)
    channel.start_consuming()
    return 0


if __name__ == '__main__':
    sys.exit(main(sys.argv))
- ZeroMQ 测试代码
- c++接口 http://www.zeromq.org/bindings:cpp
- client.cpp
// // Hello World client in C++ // Connects REQ socket to tcp://localhost:5555 // Sends "Hello" to server, expects "World" back // #include <zmq.hpp> #include <string> #include <iostream> using namespace std; int main () { // Prepare our context and socket zmq::context_t context (1); zmq::socket_t socket (context, ZMQ_REQ); socket.connect ("tcp://localhost:5555"); string s(128, 'a'); //cout << s << endl; // Do 10 requests, waiting each time for a response for (int request_nbr = 0; request_nbr != 1000000; request_nbr++) { zmq::message_t request (s.size() + 1); memcpy (request.data (), s.c_str(), s.size() + 1); #ifdef DEBUG cout << request_nbr << " sent data " << (char *)request.data() << endl; #endif socket.send (request); // Get the reply. zmq::message_t reply; socket.recv (&reply); #ifdef DEBUG cout << request_nbr << " received data " << (char *)reply.data() << endl; #endif } return 0; }
- server.cpp
// // Hello World server in C++ // Binds REP socket to tcp://*:5555 // #include <zmq.hpp> #include <string> #include <iostream> #include <unistd.h> using namespace std; int main () { // Prepare our context and socket zmq::context_t context (1); zmq::socket_t socket (context, ZMQ_REP); socket.bind ("tcp://*:5555"); while (true) { zmq::message_t request; // Wait for next request from client socket.recv (&request); //cout << (char*) request.data() << endl; /* std::cout << "Received Hello" << std::endl; // Do some 'work' sleep (1); // Send reply back to client string s('a', 128); zmq::message_t reply (); memcpy ((void *) reply.data (), "World", 5); */ socket.send (request); } return 0; }
- makefile
# Builds the ZeroMQ benchmark client and server.
#
#   make / make all / make release   optimised-for-nothing build with -g
#   make debug                       same build with -DDEBUG (client prints
#                                    every message it sends and receives)
#   make clean                       remove objects and binaries
#
# Every build target recompiles unconditionally, so switching between the
# release and debug flavours never reuses stale objects.

CXX      = g++
CXXFLAGS = -g -Wall -fPIC -I/admin//admin/include -I/admin//admin/utils
LDFLAGS  = -L/admin//admin/lib -g
# Libraries go AFTER the object files on the link line: linkers that resolve
# symbols left to right (or default to --as-needed) drop libraries that are
# named before anything references them.
LDLIBS   = -lzmq

.PHONY: all release debug clean

all release:
	$(CXX) $(CXXFLAGS) -c client.cpp -o client.o
	$(CXX) $(LDFLAGS) -o client client.o $(LDLIBS)
	$(CXX) $(CXXFLAGS) -c server.cpp -o server.o
	$(CXX) $(LDFLAGS) -o server server.o $(LDLIBS)

# Target-specific flag; GNU make passes it down to the 'all' prerequisite.
debug: CXXFLAGS += -DDEBUG
debug: all

clean:
	rm -f *.o ctest client server
- README
release version:
    make all, make release, or make
debug version (client outputs sent/received data):
    make debug
clean:
    make clean
LD_LIBRARY_PATH:
    export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib/
usage:
    ./server
    date; ./client; date