
ActiveMQ, RabbitMQ, and ZeroMQ: A Comparison of Features and Performance

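  • Feature comparison of ActiveMQ, RabbitMQ, and ZeroMQ. (I originally wrote it in English so I could share it with others, and put it on Google Docs. Now I can no longer open it myself, sadly.)

     Feature comparison of ActiveMQ, RabbitMQ, and ZeroMQ [Google Doc]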

             

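             When choosing an MQ, the features that matter most are probably these:

                        Communication patterns (do they fit the business scenario?):

                                   ActiveMQ: queue (producer/consumer), topic (publisher/subscriber)

                                   RabbitMQ: the AMQP protocol supports routing and broadcast

                                   ZeroMQ: supports patterns such as REQ, PUSH, and PULL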
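To make the queue/topic distinction concrete, here is a minimal in-memory sketch in Python. It is a toy model, not broker code: the class names `ToyQueue` and `ToyTopic` are hypothetical, and the round-robin dispatch between competing consumers is an assumption chosen for illustration.

```python
from itertools import cycle


class ToyQueue:
    """Point-to-point: each message is delivered to exactly one consumer."""

    def __init__(self, consumers):
        # Assumption for illustration: competing consumers are served round-robin.
        self._next_consumer = cycle(consumers)

    def send(self, message):
        next(self._next_consumer).append(message)


class ToyTopic:
    """Publish/subscribe: each message is delivered to every subscriber."""

    def __init__(self, subscribers):
        self._subscribers = list(subscribers)

    def publish(self, message):
        for inbox in self._subscribers:
            inbox.append(message)


def demo():
    """Send four messages through a queue and a topic with two receivers each."""
    queue_a, queue_b = [], []
    topic_a, topic_b = [], []
    queue = ToyQueue([queue_a, queue_b])
    topic = ToyTopic([topic_a, topic_b])
    for number in range(4):
        queue.send(number)
        topic.publish(number)
    return queue_a, queue_b, topic_a, topic_b
```

With two receivers, the queue splits the four messages between them, while the topic hands all four to both.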

 

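                        Supported client languages:

                                     All three are popular, so common languages such as Python and PHP have bindings. This means the client and the server can be written in different languages.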


                        Message persistence:

                                       Supported by both ActiveMQ and RabbitMQ


                         Master/standby server setup and automatic client failover:

                                      ActiveMQ (very easy) > RabbitMQ > ZeroMQ
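As a rough illustration of what client-side failover means, the sketch below tries a list of broker endpoints in order and keeps the first one that accepts a connection. It uses only the Python standard library; the helper name `connect_first_available` and the host names in the usage note are hypothetical, and real client libraries add reconnect and retry logic that is left out here.

```python
import socket


def connect_first_available(endpoints, connect=socket.create_connection, timeout=1.0):
    """Try each (host, port) in order; return (endpoint, connection) for the first success.

    `connect` is injectable so the selection logic can be exercised without a network.
    """
    errors = []
    for endpoint in endpoints:
        try:
            return endpoint, connect(endpoint, timeout)
        except OSError as exc:
            # This broker is unreachable: remember why and move on to the next one.
            errors.append((endpoint, exc))
    raise ConnectionError("no broker reachable: %r" % (errors,))
```

For example, `connect_first_available([("mq-primary", 61613), ("mq-standby", 61613)])` would fall back to the standby when the primary refuses the connection.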

         

                           If performance matters, then once the points above are settled it is worth running a benchmark, so that you know what to expect.

  • Performance test results for ActiveMQ, RabbitMQ, and ZeroMQ

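                  One consumer and one producer; message size about 120 bytes; 1,000,000 messages in total.

                   Because the goal at the time was only a qualitative feel for performance, the choice of language and the send/receive pattern were not kept strictly consistent across the three.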
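The "about 120 bytes" figure can be checked against the test programs listed further down. The short Python sketch below counts the payload the way those programs build it: the Python producers append an ISO timestamp to `'hello world' * 10`, and the C++ client copies a 128-character string plus its terminating NUL. Protocol headers and framing are not counted, so the bytes on the wire will be somewhat higher.

```python
BODY = "hello world" * 10                    # message body used by the Python producers
SAMPLE_STAMP = "2012-08-02T13:44:31.348417"  # sample timestamp used in the scripts


def payload_sizes():
    """Return (body, timestamp, python_payload, cpp_payload) sizes in bytes."""
    python_payload = len(BODY) + len(SAMPLE_STAMP)
    cpp_payload = 128 + 1  # 128 'a' characters plus the trailing NUL copied by memcpy
    return len(BODY), len(SAMPLE_STAMP), python_payload, cpp_payload
```

So the Python tests send roughly 136 bytes per message and the ZeroMQ test 129, both in the same ballpark as the stated size.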

        

                   ActiveMQ: 10,526 messages/s     version 5.6.0

                   RabbitMQ: 4,926 messages/s      version 2.8.6

                   ZeroMQ:   15,625 messages/s     version 2.2.0
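The rates above are simply the message count divided by the elapsed time. The elapsed times are not given here; the sketch below works backwards from the published rates, and the resulting whole-second values (95, 203, and 64 seconds) are an inference, not measurements quoted from the test.

```python
TOTAL_MESSAGES = 1000000


def rate(elapsed_seconds, total=TOTAL_MESSAGES):
    """Messages per second, truncated to a whole number."""
    return total // elapsed_seconds


def elapsed_for(rate_per_second, total=TOTAL_MESSAGES):
    """Elapsed time in seconds (rounded) implied by a published rate."""
    return round(total / rate_per_second)
```

Timing with whole-second resolution means each figure carries an uncertainty of about one part in the elapsed seconds, which is small here but worth keeping in mind for shorter runs.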

 

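                   ActiveMQ performed better than I expected. It is more than enough for typical applications.

                   RabbitMQ's result is far from the official figures. One possible cause is server-side build and configuration tuning; another is the client language: I used Python, while the official test used Java.

                   ZeroMQ's performance is not as outstanding as expected, probably because the test used the ZMQ_REQ pattern, which adds one extra message exchange between client and server for every message sent.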
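The cost of the request/reply pattern can be sketched without ZeroMQ at all. The Python example below uses a plain `socket.socketpair()` as a stand-in for the REQ/REP connection (an assumption made so the sketch needs only the standard library) and counts how many payloads cross the socket in lockstep mode versus one-way mode.

```python
import socket
import threading

PAYLOAD = b"a" * 128  # same size as the string built in the C++ client


def recv_exact(sock, size):
    """Read exactly `size` bytes from a stream socket."""
    data = bytearray()
    while len(data) < size:
        chunk = sock.recv(size - len(data))
        if not chunk:
            raise ConnectionError("peer closed the connection")
        data.extend(chunk)
    return bytes(data)


def run(count, lockstep):
    """Send `count` payloads; return how many payloads crossed the socket in total."""
    client, server = socket.socketpair()
    transfers = []

    def serve():
        for _ in range(count):
            message = recv_exact(server, len(PAYLOAD))
            transfers.append("request")
            if lockstep:
                server.sendall(message)  # echo the request back, like the REP server

    worker = threading.Thread(target=serve)
    worker.start()
    for _ in range(count):
        client.sendall(PAYLOAD)
        if lockstep:
            recv_exact(client, len(PAYLOAD))  # block until the echo arrives
            transfers.append("reply")
    worker.join()
    client.close()
    server.close()
    return len(transfers)
```

In lockstep mode every message costs a full round trip before the next one can be sent, so 15,625 messages/s corresponds to about 64 microseconds per round trip. A one-way pattern such as PUSH/PULL would not wait for the reply.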


    • ActiveMQ test procedure [interface: Python, protocol: STOMP]

      ./active_consumer.py consumer_1 1000000 /queue/test 10.21.0.32
      ./active_producer.py producer_1 1000000 /queue/test 10.21.0.32

    • RabbitMQ test procedure [interface: Python, protocol: AMQP]

      ./rabbit_consumer.py consumer_1 1000000 10.21.0.33
      ./rabbit_producer.py producer_1 1000000 10.21.0.33

    • ZeroMQ test procedure [interface: C++, pattern: REQ]

      ./server
      date;./client;date
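Wrapping the client in `date` calls gives the elapsed time only to the nearest second. As an alternative, a small Python wrapper can time the same command with sub-second resolution. This is a sketch using only the standard library; the helper names `timed_run` and `messages_per_second` are hypothetical.

```python
import subprocess
import sys
import time


def timed_run(argv):
    """Run a command to completion; return (exit_code, elapsed_seconds)."""
    start = time.perf_counter()
    completed = subprocess.run(argv)
    return completed.returncode, time.perf_counter() - start


def messages_per_second(total_messages, elapsed_seconds):
    """Throughput for a run that moved `total_messages` in `elapsed_seconds`."""
    return total_messages / elapsed_seconds
```

Usage would look like `code, elapsed = timed_run(["./client"])` followed by `messages_per_second(1000000, elapsed)`.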

  • ActiveMQ test code
    • Server configuration
      •  ActiveMQ server: add <transportConnector name="stomp" uri="stomp://0.0.0.0:61613"/> to conf/activemq.xml
    • Python interface:
      • http://code.google.com/p/stomppy/   install: sudo python setup.py install --no-compile
    • active_producer.py

#!/usr/bin/env python
import stomp
import logging
import sys
import random
from datetime import *;
import time

logging.basicConfig()

# id, msg_num and destination are required; mq_ip and mq_port are optional
if len(sys.argv) < 4:
	print "Usage: %s id msg_num destination [mq_ip] [mq_port]" %(sys.argv[0])
	sys.exit(1)

id = sys.argv[1]

msg_total = int(sys.argv[2])

dest=sys.argv[3];

mq_ip = '10.125.42.57'
if len(sys.argv) >= 5:
	mq_ip = sys.argv[4]

mq_port = 61613
if len(sys.argv) >= 6:
	mq_port = int(sys.argv[5])

datetime_len = len('2012-08-02T13:44:31.348417');
#print "datetime_len = ", datetime_len

message='hello world' * 10;

conn=stomp.Connection([(mq_ip,mq_port)])
#print('set up Connection')

conn.start()
#print('started connection')

conn.connect(wait=True)
#print('connected')

print "%s id = %s starts at %s" %(sys.argv[0],id, datetime.now().isoformat())

msg_sent = 0
while msg_sent <  msg_total:
	send_time = datetime.now().isoformat()

	send_msg = message + send_time

	conn.send(message=send_msg, destination=dest,headers={'seltype':'mandi-age-to-man','type':'textMessage'},ack='auto')

	msg_sent += 1;

conn.disconnect()
print "%s id = %s ends at %s and send %d messages" %(sys.argv[0], id, datetime.now().isoformat(), msg_sent)

    • active_consumer.py

#!/usr/bin/env python
import stomp
import logging
import sys
import random
from datetime import *;
import time


logging.basicConfig()




# id, msg_num and destination are required; mq_ip and mq_port are optional
if len(sys.argv) < 4:
	print "Usage: %s id msg_num destination [mq_ip] [mq_port]" %(sys.argv[0])
	sys.exit(1)


id = sys.argv[1]


msg_total = int(sys.argv[2])


dest=sys.argv[3];


mq_ip = '10.125.42.57'
if len(sys.argv) >= 5:
	mq_ip = sys.argv[4]


mq_port = 61613
if len(sys.argv) >= 6:
	mq_port = int(sys.argv[5])


msg_received = 0;
msg_total_delay = 0


datetime_len = len('2012-08-02T13:44:31.348417');
#print "datetime_len = ", datetime_len




class MyListener(stomp.ConnectionListener):
    def on_error(self, headers, message):
        print('received an error %s' % message)


    def on_message(self, headers, message):
	recv_time = datetime.now()


        #for k,v in headers.iteritems():
        #    print('header: key %s , value %s' %(k,v))


        #print('received message\n %s'% message)
	try:
		(dt, micro_sec) = message[-datetime_len:].split(".");


		d = time.strptime(dt, "%Y-%m-%dT%H:%M:%S")
		
		send_time = datetime(int(d[0]), int(d[1]), int(d[2]), int(d[3]), int(d[4]), int(d[5]), int(micro_sec));


		#print recv_time.isoformat()


		delta = recv_time - send_time;
			
		delta_micro =  delta.microseconds + delta.seconds * 1000000 + delta.days * (1000000 * 86400)
		#print delta_micro
		global msg_total_delay
		msg_total_delay += delta_micro
	except:
		print "exception msg ", message


	global msg_received
	msg_received += 1;
				


conn=stomp.Connection([(mq_ip, mq_port)])
#print('set up Connection')
conn.set_listener('somename',MyListener())
#print('Set up listener')


conn.start()
#print('started connection')


conn.connect(wait=True)
#print('connected')
conn.subscribe(destination=dest, ack='auto')
#print('subscribed')


print "receiver id: %s starts" %(id)


while 1:
	#print msg_received
	#print msg_total


	if msg_received >= msg_total:
		conn.disconnect()
		#print('disconnected')
		print "average delay: receiver id: %s, %d microseconds, %s ends at %s and received %d messages" %(id, msg_total_delay / msg_received, sys.argv[0], datetime.now().isoformat(),   msg_received)
		# all messages received: exit with a success status
		sys.exit(0)


	time.sleep(1)	
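One edge case in the delay measurement is worth spelling out. `datetime.now().isoformat()` omits the fractional part when the microsecond field is exactly 0, so the timestamp shrinks from 26 to 19 characters and the consumer's fixed-length slice no longer lines up; the message then falls into the `except` branch. If microseconds are uniformly distributed, that is expected about once per million messages. The sketch below (Python 3, with hypothetical helper names) avoids the problem by formatting and parsing with an explicit `%f` field.

```python
from datetime import datetime, timedelta

STAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"          # always six fractional digits
STAMP_LEN = len("2012-08-02T13:44:31.348417")  # 26 characters


def stamp(body, now):
    """Append a fixed-width send timestamp to the message body."""
    return body + now.strftime(STAMP_FORMAT)


def delay_microseconds(message, received_at):
    """Parse the trailing timestamp and return the delivery delay in microseconds."""
    sent_at = datetime.strptime(message[-STAMP_LEN:], STAMP_FORMAT)
    return (received_at - sent_at) // timedelta(microseconds=1)
```

Note that the delay is only meaningful when the producer and the consumer share a clock, or when their clocks are closely synchronized; otherwise the clock offset is added to every sample.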

  • RabbitMQ test code

    • Python interface:

                             sudo yum install python-pip git-core; sudo pip-python install pika==0.9.5

    • rabbit_producer.py

#!/usr/bin/env python
import pika
import sys
from datetime import *;
import time


if len(sys.argv) < 3:
	print "Usage: %s id msg_num [mq_ip]" %(sys.argv[0])
	sys.exit(1);


id = sys.argv[1]


msg_total = int(sys.argv[2])


mq_ip = '10.125.42.57'
if len(sys.argv) >= 4:
	mq_ip = sys.argv[3]


connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=mq_ip))
channel = connection.channel()


channel.exchange_declare(exchange='direct_logs',
                         type='direct')


datetime_len = len('2012-08-02T13:44:31.348417');
#print "datetime_len = ", datetime_len




message='hello world' * 10;


print "%s id = %s starts at %s" %(sys.argv[0],id, datetime.now().isoformat())


msg_sent = 0
while msg_sent <  msg_total:
	send_time = datetime.now().isoformat()


	send_msg = message + send_time


	channel.basic_publish(exchange='direct_logs',
                      routing_key='broadcast',
                      body=send_msg)


	msg_sent += 1;


connection.close()


print "%s id = %s ends at %s and send %d messages" %(sys.argv[0], id, datetime.now().isoformat(), msg_sent)


    • rabbit_consumer.py

#!/usr/bin/env python
import pika
import sys
from datetime import *;
import time
import signal

if len(sys.argv) < 3:
	print "Usage: %s id msg_num [mq_ip]" %(sys.argv[0])
	sys.exit(1);

id = sys.argv[1]

msg_total = int(sys.argv[2])

mq_ip = '10.125.42.57'
if len(sys.argv) >= 4:
	mq_ip = sys.argv[3]

msg_received = 0;
msg_total_delay = 0

def signal_handler(signal, frame):
	global msg_received
        print "You pressed Ctrl+C!, message received %d" %(msg_received)
        sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)

datetime_len = len('2012-08-02T13:44:31.348417');
#print "datetime_len = ", datetime_len

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host=mq_ip))
channel = connection.channel()

channel.exchange_declare(exchange='direct_logs',
                         type='direct')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue


print "receiver id: %s starts" %(id)

def callback(ch, method, properties, message):
	recv_time = datetime.now()

        #for k,v in headers.iteritems():
        #    print('header: key %s , value %s' %(k,v))

        #print('received message\n %s'% message)
	try:
		(dt, micro_sec) = message[-datetime_len:].split(".");

		d = time.strptime(dt, "%Y-%m-%dT%H:%M:%S")
		
		send_time = datetime(int(d[0]), int(d[1]), int(d[2]), int(d[3]), int(d[4]), int(d[5]), int(micro_sec));

		#print recv_time.isoformat()

		delta = recv_time - send_time;
			
		delta_micro =  delta.microseconds + delta.seconds * 1000000 + delta.days * (1000000 * 86400)
		#print delta_micro
		global msg_total_delay
		msg_total_delay += delta_micro
	except:
		print "exception msg ", message

	global msg_received
	msg_received += 1;

	if msg_received >= msg_total:
		print "average delay: receiver id: %s, %d microseconds, %s ends at %s and received %d messages" %(id, msg_total_delay / msg_received, sys.argv[0], datetime.now().isoformat(),   msg_received)
		# all messages received: exit with a success status
		sys.exit(0)

channel.queue_bind(exchange='direct_logs',
                       queue=queue_name,
                       routing_key="broadcast")

channel.basic_consume(callback,
                      queue=queue_name,
                      no_ack=True)

channel.start_consuming()
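In the two scripts above, the consumer declares an exclusive, server-named queue and binds it to the `direct_logs` exchange under the routing key `broadcast`, while the producer only publishes to the exchange. In AMQP, a message published to an exchange with no matching binding is discarded unless the publisher asks for it to be returned, which is presumably why the consumer is started before the producer in the test procedure. The toy model below (Python 3, not pika code; the class name `ToyDirectExchange` is hypothetical) shows that behavior.

```python
from collections import defaultdict


class ToyDirectExchange:
    """Toy model of an AMQP direct exchange: exact match on the routing key."""

    def __init__(self):
        self._bindings = defaultdict(list)  # routing key -> list of bound queues
        self.dropped = 0

    def bind(self, queue, routing_key):
        self._bindings[routing_key].append(queue)

    def publish(self, routing_key, body):
        queues = self._bindings.get(routing_key, [])
        if not queues:
            self.dropped += 1  # unroutable: nothing is bound under this key
        for queue in queues:
            queue.append(body)


def demo():
    """Publish once before any queue is bound and once after two are bound."""
    exchange = ToyDirectExchange()
    exchange.publish("broadcast", "too early")
    first, second = [], []
    exchange.bind(first, "broadcast")
    exchange.bind(second, "broadcast")
    exchange.publish("broadcast", "on time")
    return exchange.dropped, first, second
```

Binding several queues under the same key is also how a direct exchange ends up broadcasting: every bound queue receives its own copy.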


  • ZeroMQ test code

    • client.cpp

//
// Hello World client in C++
// Connects REQ socket to tcp://localhost:5555
// Sends a 128-character payload to the server and expects it echoed back
//
#include <zmq.hpp>
#include <cstring>   // memcpy
#include <string>
#include <iostream>
using namespace std;


int main ()
{
    // Prepare our context and socket
    zmq::context_t context (1);
    zmq::socket_t socket (context, ZMQ_REQ);


    socket.connect ("tcp://localhost:5555");


        string s(128, 'a');
        //cout << s << endl;


    // Do 1,000,000 requests, waiting each time for a response
    for (int request_nbr = 0; request_nbr != 1000000; request_nbr++) {


        zmq::message_t request (s.size() + 1);
        memcpy (request.data (), s.c_str(), s.size() + 1);
                #ifdef DEBUG
                        cout << request_nbr << " sent data " << (char *)request.data() << endl;
                #endif
        socket.send (request);


        // Get the reply.
        zmq::message_t reply;
        socket.recv (&reply);
                #ifdef DEBUG
                        cout << request_nbr << " received data " << (char *)reply.data() << endl;
                #endif
    }
    return 0;
}

    • server.cpp

//
// Hello World server in C++
// Binds REP socket to tcp://*:5555
//
#include <zmq.hpp>
#include <string>
#include <iostream>
#include <unistd.h>
using namespace std;

int main () {
    // Prepare our context and socket
    zmq::context_t context (1);
    zmq::socket_t socket (context, ZMQ_REP);
    socket.bind ("tcp://*:5555");

    while (true) {
        zmq::message_t request;

        // Wait for next request from client
        socket.recv (&request);
		//cout << (char*) request.data() << endl;
		/*
        std::cout << "Received Hello" << std::endl;

        // Do some 'work'
        sleep (1);

        // Send reply back to client
		string s('a', 128);
        zmq::message_t reply ();
        memcpy ((void *) reply.data (), "World", 5);
		*/
        socket.send (request);
    }
    return 0;
}


    • makefile

# Libraries are listed after the object files so that linkers which resolve
# symbols left to right (for example with --as-needed) still find libzmq.
all release:
	g++ -g -Wall -fPIC -I/admin//admin/include -I/admin//admin/utils -shared -c client.cpp -o client.o
	cc -L/admin//admin/lib -g -o client client.o -lzmq -lstdc++

	g++ -g -Wall -fPIC -I/admin//admin/include -I/admin//admin/utils -shared -c server.cpp -o server.o
	cc -L/admin//admin/lib -g -o server server.o -lzmq -lstdc++

debug:
	g++ -g -Wall -DDEBUG -fPIC -I/admin//admin/include -I/admin//admin/utils -shared -c client.cpp -o client.o
	cc -L/admin//admin/lib -g -o client client.o -lzmq -lstdc++
	
	g++ -g -Wall -DDEBUG -fPIC -I/admin//admin/include -I/admin//admin/utils -shared -c server.cpp -o server.o
	cc -L/admin//admin/lib -g -o server server.o -lzmq -lstdc++


clean:
	rm -f *.o ctest client server


    • README

release version:
	make all , make release or make

debug version: client outputs sent/received data 
	make debug

clean:
	make clean

LD_LIBRARY_PATH:
	export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/lib/

usage:	
	./server
	date;./client;date


