QPID
Releasing Simrpc
Submitted by mmorsi on Fri, 2009-10-16 23:09Simrpc is a simple rpc implementation written in Ruby, relying on Apache QPID for the transport mechanism. I modeled it after Apache QMF, though nowhere near as complex (simrpc is procedural based for example, with data structures only encapsulating data, and not methods), intended to be a placeholder until QMF becomes a bit more stable.
I was able to whip it up pretty quick, it only took me a week to write end-to-end, and thus I'm releasing it under the MIT License instead of my usual favorite, the AGPL. Eventually its a possibility that I might rewrite it in C++ as there isn't anything ruby-specific in it and I could easily then after write a Ruby wrapper (or any other).
Have at it!
Getting Started With Apache Qpid
Submitted by mmorsi on Sat, 2009-08-22 23:13Apache Qpid is an implementation of AMQP - Advanced Message Queuing Protocol
AMQP is a cross platform / cross language messaging protocol
A central broker is established which manages exchanges and queues.
Note while exchanges are currently a big part of AMQP, they are going away w/ the upcoming AMQP/1.0 spec. Regardless as they are currently integral to using Qpid, I'm going to discuss them here.
Exchanges are the named target entities which messages are sent, they get mapped to queues through bindings. Queues are also named and they queue up messages so that clients can subscribe and/or pop messages off. Both Exchanges and Queues have optional properties which if set alter the interal operations of the entity.
Messages which get passed to exchanges and accumulate in queues consist of a header, containing a number of optional properties, and a body containing the actual data.
A host connecting to the broker first establishes the underlying tcp connection on top of which 'channels' or logical connections to the broker are established. A channel should be established for every thread / concurrent entity a client uses to communicate with the broker.
Qpid is The Apache Foundation's implementation of AMQP. It's aiming to be 100% AMQP standard complaint. It is licensed under the Apache License, eg it is open source, but not copyleft, meaning you can interface and use it via any software of any license of your chooising.
qmf is a remoting framework built ontop of qpid that allows you to call methods on remote objects. I'm not going to go into qmf in this article.
To setup what you need to use Qpid on Fedora, follow these steps:
1. Install the broker: yum install qpidd
2. sudo service qpidd start # there seems to be several ways to configure qpidd See this, be warned though that some means will vary w/ the qpidd that is packaged w/ Fedora ('man qpidd' and see the init script and sysconf for more information)
3. To start qpidd manually, first stop the service via 'service qpidd stop' and then run (as root) 'sudo -u qpidd /usr/sbin/qpidd --pid-dir /var/run/qpidd --data-dir /var/lib/qpidd -t'. The "-t" results in _alot_ of output being written to stdout.
4. Install library bindings for whatever language you want to use: yum install qpidc-devel python-qpid ruby-qpid
5. At this point your ready to write your software to interface w/ qpid, obviously how you do this depends on the api for particular language. I'm going to implement an example in ruby but the terminology should be the same for the most part regardless of your language of choice
In this ruby example, we will setup a traditional request-response model, eg with a server listening for requests and returning responses. Note this is only one of many exchange types which to follow once you start playing around with the api. (much of this example comes from a ruby and a C++ example)
All you need to do to run this is download the server.rb and client.rb files below (also attached to this article). Run the server first and then the client and view the STDOUT output for each.
server.rb
# setup a 'server' qpid endpoint require "qpid" require "socket" ################################## establish the connection to the broker broker = "localhost" port = 5672 conn = Qpid::Connection.new(TCPSocket.new(broker, port)) conn.start(10) ################################## setup a session / queue / exchange ssn = conn.session("server") # create the server request exchange ssn.exchange_declare("request-exchange", :type => "direct") # create the server request queue ssn.queue_declare("request-queue") # create the server request exchange ssn.exchange_bind(:exchange => "request-exchange", :queue => "request-queue") # subscribe to messages coming in on the queue. # this tells the session to place any messages in request-queue # into a local buffer named 'messages' which we can later retrieve. ssn.message_subscribe(:destination => "messages", :queue => "request-queue", :accept_mode => ssn.message_accept_mode.none) # we grab a handle to the 'messages' buffer here incoming = ssn.incoming("messages") ################################## receive messages # start incoming message flow incoming.start() puts "receiving messages" # probably should have some exit condition while true # grab a message from the queue request = incoming.get(10) # grab request body / extract what we need s = request.body puts "Message received " + s s.slice!(0..3) # slice the 'syn ' out (eg handle the message however you want to) ################################## send the response message # get the reply_to field the message specifies # note often you won't need / care about the reply_to field and it won't be set if request.get(:message_properties).reply_to.nil? raise RuntimeError("received message doesn't specify reply field") end # need this to send the response routing_key = request.get(:message_properties).reply_to.routing_key puts "!!!" + routing_key.to_s # construct response message and send it response_body= 'ack ' + s dp = ssn.delivery_properties(:routing_key => routing_key) mp = ssn.message_properties(:content_type => "text/plain") response = Qpid::Message.new(dp, mp, response_body) puts "sending response " + response_body ssn.message_transfer(:message => response, :destination => "request-exchange") end ################################## terminate operations # cancel the subscription and close the session and connection ssn.message_cancel(:destination => "messages") ssn.close() conn.close() puts "server finished"
client.rb
# setup a 'client' qpid endpoint require "qpid" require "socket" # generate a random id for the client client_id = rand(100).to_s #exchange = "client" + client_id + "-exchange" exchange="request-exchange" queue = "client" + client_id + "-queue" puts "client " + client_id + " started" ################################## establish the connection to the broker broker = "localhost" port = 5672 conn = Qpid::Connection.new(TCPSocket.new(broker, port)) conn.start(10) ################################## setup a session / queue / exchange ssn = conn.session("client") # create an queue / exchange which to receive replies #ssn.exchange_declare(exchange, :type => "direct") ssn.queue_declare(queue) ssn.exchange_bind(:exchange => exchange, :queue => queue, :binding_key => queue) # subscribe to messages coming in the response queue ssn.message_subscribe(:destination => "client-messages", :queue => queue, :accept_mode => ssn.message_accept_mode.none) # handle message received event asynchronously incoming = ssn.incoming("client-messages") incoming.start incoming.listen { |msg| puts "Response received " + msg.body } ################################## send request message # create request message dp = ssn.delivery_properties(:routing_key => "request-queue") mp = ssn.message_properties( :content_type => "text/plain") rp = ssn.message_properties( :reply_to => ssn.reply_to(exchange, queue)) msg = Qpid::Message.new(dp, mp, rp, "syn " + client_id) # send it ssn.message_transfer(:message => msg) ################################## terminate operations # wait a little time for a response # FIXME do this via a lock sleep(5) # cancel the subscription and close the session and connection ssn.message_cancel(:destination => "messages") ssn.close() conn.close() puts "client finished"
Disclaimer, I'm pretty new to all this myself so there might be better / more efficient ways of doing things. But these examples should work and demonstrate the basic functionality of qpid. From here, all you need is to deteremine the model which you want to communicate between endpoints and leverage the qpid API to do that (obviously one of the most powerful features of AMQP / QPID is its language-agnosticity, different endpoints can be written in different languages, even a javascript client was recently written).
Hope this helps anyone getting started with AMQP / Qpid like I am. Look for more posts on the subject as I start learning the intrices of the API.
Useful links:
AMQP Terminology (a MUST read if you intend to use Qpid)
Official Qpid Documentation
Good ruby/qpid example
Good informantion
C++ QPID API / and examples
Python QPID API / and examples
Ruby QPID API which I'm hosting locally since no official version is on the Apache Qpid site (abliet it's a bit lacking in comments) / and examples





