# Set up a 'server' qpid endpoint: subscribe to "request-queue", and for each
# incoming message send an 'ack' response using the routing key given in the
# request's reply_to field.
#
# The receive loop runs until the process is interrupted (Ctrl-C), after which
# the subscription is cancelled and the session and connection are closed.

require "qpid"
require "socket"

################################## establish the connection to the broker

broker = "localhost"
port = 5672

conn = Qpid::Connection.new(TCPSocket.new(broker, port))
conn.start(10) # NOTE(review): 10 is presumably a timeout in seconds -- confirm against the qpid client

################################## setup a session / queue / exchange

ssn = conn.session("server")

# create the server request exchange
ssn.exchange_declare("request-exchange", :type => "direct")

# create the server request queue
ssn.queue_declare("request-queue")

# bind the request queue to the request exchange
ssn.exchange_bind(:exchange => "request-exchange", :queue => "request-queue")

# subscribe to messages coming in on the queue.
# this tells the session to place any messages in request-queue
# into a local buffer named 'messages' which we can later retrieve.
ssn.message_subscribe(:destination => "messages",
                      :queue => "request-queue",
                      :accept_mode => ssn.message_accept_mode.none)

# we grab a handle to the 'messages' buffer here
incoming = ssn.incoming("messages")

################################## receive messages

# start incoming message flow
incoming.start

puts "receiving messages"

begin
  loop do
    # grab a message from the queue
    # NOTE(review): the meaning of the argument 10 (timeout vs. blocking flag)
    # depends on the qpid client version -- confirm
    request = incoming.get(10)

    # grab request body / extract what we need
    s = request.body
    puts "Message received " + s
    s.slice!(0..3) # slice the 'syn ' out (eg handle the message however you want to)

    ################################## send the response message

    # get the reply_to field the message specifies
    # note often you won't need / care about the reply_to field and it won't be set
    message_props = request.get(:message_properties)
    if message_props.reply_to.nil?
      # `raise RuntimeError("...")` would try to call a *method* named
      # RuntimeError and fail with NoMethodError; pass class and message instead.
      raise RuntimeError, "received message doesn't specify reply field"
    end

    # need this to send the response
    routing_key = message_props.reply_to.routing_key
    puts "!!!" + routing_key.to_s

    # construct response message and send it
    response_body = 'ack ' + s
    dp = ssn.delivery_properties(:routing_key => routing_key)
    mp = ssn.message_properties(:content_type => "text/plain")
    response = Qpid::Message.new(dp, mp, response_body)
    puts "sending response " + response_body
    # NOTE(review): the response is published to "request-exchange" with the
    # reply_to routing key; confirm this is the exchange the client listens on.
    ssn.message_transfer(:message => response, :destination => "request-exchange")
  end
rescue Interrupt
  # Ctrl-C is the exit condition: leave the receive loop and fall through to
  # the shutdown code below, which was otherwise unreachable.
end

################################## terminate operations

# cancel the subscription and close the session and connection
ssn.message_cancel(:destination => "messages")
ssn.close
conn.close

puts "server finished"