# Set up a 'client' Qpid endpoint: declare a private reply queue, send a single
# "syn <id>" request routed with the key "request-queue", print any responses
# that arrive on the reply queue, then tear everything down.

require "qpid"
require "socket"

# Generate a random id for the client. Ids are drawn from 0..99, so two
# concurrently running clients can end up with the same id (and queue name).
client_id = rand(100).to_s

#exchange = "client" + client_id + "-exchange"
exchange = "request-exchange"
queue = "client#{client_id}-queue"

# Name of the subscription. It is shared by message_subscribe, incoming and
# message_cancel so that the cancel targets the subscription that was created.
destination = "client-messages"

puts "client #{client_id} started"

################################## establish the connection to the broker

broker = "localhost"
port = 5672

conn = Qpid::Connection.new(TCPSocket.new(broker, port))
conn.start(10)

ssn = nil
begin
  ################################## setup a session / queue / exchange

  ssn = conn.session("client")

  # Create a queue on which to receive replies and bind it to the exchange,
  # using the queue name as the binding key.
  #ssn.exchange_declare(exchange, :type => "direct")
  ssn.queue_declare(queue)
  ssn.exchange_bind(:exchange => exchange,
                    :queue => queue,
                    :binding_key => queue)

  # Subscribe to messages coming in on the response queue.
  ssn.message_subscribe(:destination => destination,
                        :queue => queue,
                        :accept_mode => ssn.message_accept_mode.none)

  # Handle the message-received event asynchronously.
  incoming = ssn.incoming(destination)
  incoming.start
  incoming.listen do |msg|
    # Interpolation (rather than String#+) so a nil body does not raise.
    puts "Response received #{msg.body}"
  end

  ################################## send request message

  # Create the request message. content_type and reply_to go into a single
  # message-properties struct: the original passed two such structs to
  # Message.new, and a message header is expected to carry at most one struct
  # of each kind, so one of the two properties could be dropped.
  dp = ssn.delivery_properties(:routing_key => "request-queue")
  mp = ssn.message_properties(:content_type => "text/plain",
                              :reply_to => ssn.reply_to(exchange, queue))
  msg = Qpid::Message.new(dp, mp, "syn #{client_id}")

  # Send it.
  ssn.message_transfer(:message => msg)

  ################################## terminate operations

  # Wait a little time for a response.
  # FIXME: do this via a lock signalled from the listener instead of sleeping.
  sleep(5)

  # Cancel the subscription created above (same destination name).
  ssn.message_cancel(:destination => destination)
ensure
  # Always release the session and the connection, even if a step above raised.
  ssn.close if ssn
  conn.close
end

puts "client finished"