RabbitMQ Transport in WSO2 ESB


INTRODUCTION

What is RabbitMQ?
RabbitMQ is an open source message broker which uses the AMQP (Advanced Message Queueing Protocol) protocol which is written in Erlang.

What is AMQP?
AMQP is a standard wire-level protocol and semantic framework for high performance enterprise messaging. AMQP is an Open Standard for Messaging Middleware.

By complying to the AMQP standard, middleware products written for different platforms and in different languages can send messages to one another. AMQP addresses the problem of transporting value-bearing messages across and between organisations in a timely manner.
AMQP enables complete interoperability for messaging middleware; both the networking protocol and the semantics of broker services are defined in AMQP [1].

JMS vs RabbitMQ
Java Message Service is an API that is part of Java EE for sending messages between two or more clients.  There are many JMS providers such as ActiveMQ and OpenMQ [2]. RabbitMQ is an open source message broker software which implements the AMQP wire-level protocol.

The salient features of AMQP are listed below [3].

  • Security
  • Reliability
  • Interoperability
  • Standard
  • Open




SETTING THE THINGS UP

Now let me show you how these capabilities can be achieved by using the RabbitMQ transport implementation in WSO2 ESB. First you need to install the RabbitMQ broker in your machine [5]. Once it is installed successfully you can start the RabbitMQ broker in Ubuntu by using the following command.

sudo invoke-rc.d rabbitmq-server start

Then install the management plugin for the RabbitMQ [6], so that you can access the broker using web based management console. After installing the management plugin, you may access the management web console of the broker using the url; http://localhost:15672/

Then download the latest WSO2 ESB 4.9.0 pack from [4]. Move on to the \$ESB_HOME/repository/conf/axis2 directory and open axis2.xml file. There under the transport receivers section find the rabbitmq transport receiver and replace it with your configuration. A sample configuration is given here to assist you.



Now WSO2 ESB can act as a RabbitMQ consumer. After that in the same file (i.e. axis2.xml), locate the transport sender element for the rabbitmq and uncomment it. A sample configuration for rabbitmq sender is given below.

After applying this, WSO2 ESB can act as  RabbitMQ producer where it can produce messages to a given RabbitMQ queue or topic.

As I mentioned above the WSO2 ESB can act as either a RabbitMQ Producer or a Consumer. Now let us elaborate these concepts further.




USE CASE 1 : WSO2 ESB as RabbitMQ Producer
In this scenario WSO2 ESB will produce a getQuote message to a queue resides in RabbitMQ broker. We can use the client application [7], to consume that message and verify the behaviour.

Copy the following synapse configuration to the source view of your ESB. In this configuration the Proxy service will publish a message to a queue named ‘queue1’ which resides inside the RabbitMQ broker.


ESB as RabbitMQ Producer-1.png

Figure 1: WSO2 ESB as RabbitMQ Producer scenario

The payload we are going to use is given below. Copy it to a file named ‘getQuote.xml’ and save to your local file system.





Then from the directory where the ‘getQuote.xml’ resides, execute the following curl command.


curl -v -d @getQuote.xml -H "Content-Type: text/xml; charset=utf-8" -H "SOAPAction:urn:getQuote"  http://localhost:8280/services/RabbitMQProducerProxy


This will merely send the getQuote payload to the Proxy service, which eventually places this message into a queue named ‘queue1’ which resides inside the RabbitMQ broker. You may use the RabbitMQ administration web console to check this message. For that click on the Quques section and select the queue to which the message was produced, ‘quque1’ in this case. Then scroll down and click on Get Message(s), which will show you the message we produced a little while ago.

Alternatively you may use the Java client application [7] to retrieve the message we produced above. For that checkout the Git project [7] and import it into your Eclipse IDE. Open the BasicRabbitMQQueueConsumer class in your editor. Change the two variables as given below.


Then just run that class, and it will fetch the above message from the destination queue and print it to the console. This way we can verify that the above message was produced to the RabbitMQ queue successfully.




USE CASE 2 : WSO2 ESB as RabbitMQ Consumer

In this scenario WSO2 ESB will consume a getQuote message from a queue resides in RabbitMQ broker. We can use the client application [7], to produce the message and verify the behaviour.

Copy the following synapse configuration to the source view of your ESB. In this configuration the Proxy service will consume a message from a queue named ‘queue2’ which resides inside the RabbitMQ broker and then it will send this message to the StockQuote endpoint using send mediator.

Now before moving forward we need to install SimpleStockQuoteService to the Axis2Server and start it. For that move on to the \$ESB_HOME/samples/axis2Server/src/SimpleStockQuoteService and execute ant command, which will install the StockQuoteService to the Axis2Server. Then move on to the \$ESB_HOME/samples/axis2Server/ directory and execute the following command to start the axis2 Server.

./axis2Server



ESB as RabbitMQ Consumer-3.png

Figure 2: ESB as a RabbitMQ Consumer scenario






Now we need to produce the message to a queue named ‘queue2’ which resides inside the RabbitMQ broker. For that open the BasicRabbitMQQueueProducer class in your Eclipse editor and set the following parameters to the given values.


Let’s set message count to 1, since we don’t want to send multiple messages for the moment. After changing the above parameters in BasicRabbitMQQueueProducer class, merely run it from your Eclipse IDE. It will publish a placeOrder message/payload to the queue2 which resides inside the RabbitMQ broker. Then the ESB Proxy will consume it and dispatch to the StockQuote endpoint. You may see the ‘Accepted order’ notification message at the endpoint.




CONCLUSION
In this workshop I have walked you through the RabbitMQ AMQP transport in WSO2 ESB. I have started with an introduction to AMQP, RabbitMQ. Then I have explained two use cases of RabbitMQ transport where WSO2 ESB acts as RabbitMQ Producer and Consumer. In my next article I will explain you how to enable SSL support in WSO2 ESB RabbitMQ transport which becomes handy in a production setup. Till then Bye !




References

Comments

  1. I follow the sample https://docs.wso2.com/display/ESB490/ESB+as+a+RabbitMQ+Message+Producer. When ESB tries to publish a message to rabbitmq, it always throws exception.

    I run jdk1.7 on windows7, and esb4.9 and rabbitmq. RabbitMQ and ESB are running on same machine.


    2016-04-20 10:36:03,665 [-] [PassThroughMessageProcessor-275] ERROR RabbitMQMessageSender Error while creating connection pool
    java.io.IOException
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
    at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
    at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:376)
    at com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory.newConnection(RecoveryAwareAMQConnectionFactory.java:36)
    at com.rabbitmq.client.impl.recovery.AutorecoveringConnection.init(AutorecoveringConnection.java:83)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:609)
    at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:651)
    at org.apache.axis2.transport.rabbitmq.RabbitMQConnectionFactory.getConnectionPool(RabbitMQConnectionFactory.java:146)
    at org.apache.axis2.transport.rabbitmq.RabbitMQMessageSender.(RabbitMQMessageSender.java:64)
    at org.apache.axis2.transport.rabbitmq.RabbitMQSender.sendMessage(RabbitMQSender.java:87)
    at org.apache.axis2.transport.base.AbstractTransportSender.invoke(AbstractTransportSender.java:112)
    at org.apache.axis2.engine.AxisEngine.send(AxisEngine.java:442)
    at org.apache.axis2.description.OutOnlyAxisOperationClient.executeImpl(OutOnlyAxisOperation.java:297)
    at org.apache.axis2.client.OperationClient.execute(OperationClient.java:149)
    at org.apache.synapse.core.axis2.Axis2FlexibleMEPClient.send(Axis2FlexibleMEPClient.java:542)
    at org.apache.synapse.core.axis2.Axis2Sender.sendOn(Axis2Sender.java:79)
    at org.apache.synapse.core.axis2.Axis2SynapseEnvironment.send(Axis2SynapseEnvironment.java:461)
    at org.apache.synapse.endpoints.AbstractEndpoint.send(AbstractEndpoint.java:372)
    at org.apache.synapse.endpoints.AddressEndpoint.send(AddressEndpoint.java:65)
    at org.apache.synapse.mediators.builtin.SendMediator.mediate(SendMediator.java:105)
    at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:81)
    at org.apache.synapse.mediators.AbstractListMediator.mediate(AbstractListMediator.java:48)
    at org.apache.synapse.mediators.base.SequenceMediator.mediate(SequenceMediator.java:149)
    at org.apache.synapse.core.axis2.ProxyServiceMessageReceiver.receive(ProxyServiceMessageReceiver.java:185)
    at org.apache.axis2.engine.AxisEngine.receive(AxisEngine.java:180)
    at org.apache.synapse.transport.passthru.ServerWorker.processEntityEnclosingRequest(ServerWorker.java:395)
    at org.apache.synapse.transport.passthru.ServerWorker.run(ServerWorker.java:142)
    at org.apache.axis2.transport.base.threads.NativeWorkerPool$1.run(NativeWorkerPool.java:172)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)
    Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
    at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:348)
    at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:221)
    at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
    ... 29 more

    ReplyDelete

Post a Comment

Popular posts from this blog

Introducing Java Reactive Extentions in to a SpringBoot Micro Service

Optimal binary search trees

Edit distance