The most useful template in WSO2 for RabbitMQ

One of the very handy features in WSO2 Enterprise Integrator for working with RabbitMQ is the sequence template. This functionality is not as popular as proxy services, but it can save a lot of time. I have often had to integrate with RabbitMQ, so I wanted to simplify my integrations. For this I created a template sequence that sends messages to different exchanges with various routing keys in RabbitMQ.

Let’s Start

I assume that you have already configured WSO2 Enterprise Integrator to send messages to RabbitMQ. If not, you should configure it according to this documentation. It is good practice to set it up as described there. The template can also be used in newer WSO2 products such as Micro Integrator (MI); the RabbitMQ configuration for that product is described in this documentation.

Problem

The main problem was that I was struggling with a very nasty-looking address for the RabbitMQ sender, like this:

<address uri="rabbitmq:/AMQPProducerSample?rabbitmq.server.host.name=localhost&amp;rabbitmq.server.port=5672&amp;rabbitmq.queue.name=queue&amp;rabbitmq.queue.route.key=route&amp;rabbitmq.exchange.name=exchange"/>

It’s very hard to read what is going on: what the sender parameters are, such as the exchange or routing key that matter when sending to RabbitMQ, and where the message is being sent. The second thing is that you always need to set the OUT_ONLY and FORCE_SC_ACCEPTED properties when sending. Without them, you will receive warnings in your mediation. I will show below the very basic construction and usage of this template, but you can adapt it to your own needs.
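
For comparison, here is a rough sketch of what a single send looks like without a template, reusing the sample address above. The sequence name send.without.template is only an illustration, and the host, queue, exchange and routing key are the placeholder values from that address.

<sequence name="send.without.template" xmlns="http://ws.apache.org/ns/synapse">
    <!-- One-way send: do not wait for a response from RabbitMQ -->
    <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
    <!-- Answer the caller with HTTP 202 Accepted right away -->
    <property name="FORCE_SC_ACCEPTED" scope="axis2" type="STRING" value="true"/>
    <send>
        <endpoint>
            <address uri="rabbitmq:/AMQPProducerSample?rabbitmq.server.host.name=localhost&amp;rabbitmq.server.port=5672&amp;rabbitmq.queue.name=queue&amp;rabbitmq.queue.route.key=route&amp;rabbitmq.exchange.name=exchange"/>
        </endpoint>
    </send>
</sequence>

Every place that sends to RabbitMQ would have to repeat all of this, which is the duplication the template is meant to remove.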

Solution

So I use a template named send.rabbitmq, like the one below:

<template name="send.rabbitmq" xmlns="http://ws.apache.org/ns/synapse">
    <parameter name="exchangeName"/>
    <parameter name="routingKey"/>
    <sequence>
        <property name="REST_URL_POSTFIX" action="remove" scope="axis2"/>
        <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
        <property name="FORCE_SC_ACCEPTED" scope="axis2" type="STRING" value="true"/>
        <!-- Build the RabbitMQ address; each string literal stays on one line so no stray whitespace ends up in the URI -->
        <header name="To" scope="default"
            expression="concat('rabbitmq:/?rabbitmq.connection.factory=CachedRabbitMQConnectionFactory',
                '&amp;rabbitmq.exchange.name=', $func:exchangeName,
                '&amp;rabbitmq.queue.routing.key=', $func:routingKey)"
            xmlns:ns="http://org.apache.synapse/xsd"/>
        <log/>
        <send/>
    </sequence>
</template>

At first glance, the simplification is not so obvious. We use the XPath concat function to build the To header, which is used by the <send/> mediator. In the template I also remove REST_URL_POSTFIX, so that the REST API resource path is not appended to the address, and I log the message to get some extra information. The important part is the use of <parameter name="exchangeName"/> and <parameter name="routingKey"/>, which correspond to $func:exchangeName and $func:routingKey in the concatenation. In the same way, you can add other parameters. The benefit is in the usage, which is as simple as you see below.

<call-template target="send.rabbitmq">
    <with-param name="exchangeName" value="testExch"/>
    <with-param name="routingKey" value="testRoutingKey"/>
</call-template>

You only need to call this mediator with two parameters: exchangeName and routingKey. It’s very handy, and it also looks nice in the graphical view.

WSO2 Enterprise Integrator carbon view

As you see, there is also an option to use expressions for these parameters, for example a property from the message context, $ctx:sampleProperty, or another XPath expression. This template can be used in proxy services, APIs, and sequences.
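
A small sketch of the expression form, assuming the routing key is first stored in a property named sampleProperty (the value orders.created is made up for this example). The curly braces tell the call-template mediator to evaluate the value as an expression instead of passing it literally:

<property name="sampleProperty" scope="default" type="STRING" value="orders.created"/>
<call-template target="send.rabbitmq">
    <with-param name="exchangeName" value="testExch"/>
    <!-- {...} marks the value as an XPath expression evaluated at runtime -->
    <with-param name="routingKey" value="{$ctx:sampleProperty}"/>
</call-template>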

Summary

To sum up, the solution can be expanded with many more parameters. I used only these two because they were all I needed. You can add other RabbitMQ sender parameters to the concatenation. You can even set a different host name, to send to other RabbitMQ instances. You can also use it in WSO2 API Manager as a RabbitMQ message producer. I hope this helps you!
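
As one possible extension, here is a sketch of the same template with a third parameter. The template name send.rabbitmq.queue and the parameter name queueName are my own choices for illustration; rabbitmq.queue.name is the sender parameter from the long address shown at the beginning. Treat it as a starting point rather than a tested configuration.

<template name="send.rabbitmq.queue" xmlns="http://ws.apache.org/ns/synapse">
    <parameter name="exchangeName"/>
    <parameter name="routingKey"/>
    <!-- Hypothetical extra parameter, appended to the address below -->
    <parameter name="queueName"/>
    <sequence>
        <property name="REST_URL_POSTFIX" action="remove" scope="axis2"/>
        <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
        <property name="FORCE_SC_ACCEPTED" scope="axis2" type="STRING" value="true"/>
        <header name="To" scope="default"
            expression="concat('rabbitmq:/?rabbitmq.connection.factory=CachedRabbitMQConnectionFactory',
                '&amp;rabbitmq.exchange.name=', $func:exchangeName,
                '&amp;rabbitmq.queue.routing.key=', $func:routingKey,
                '&amp;rabbitmq.queue.name=', $func:queueName)"/>
        <send/>
    </sequence>
</template>

The call then takes one more with-param (testQueue is a placeholder value):

<call-template target="send.rabbitmq.queue">
    <with-param name="exchangeName" value="testExch"/>
    <with-param name="routingKey" value="testRoutingKey"/>
    <with-param name="queueName" value="testQueue"/>
</call-template>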

How to use WSO2 API Manager as RabbitMQ message producer

Introduction

In the integration world, you often run into unusual scenarios. On the one hand, you would like to have the benefits which WSO2 API Manager offers. On the other hand, you are looking for the benefits of asynchronous communication. In this scenario I show how to configure WSO2 API Manager to expose a REST API that acts as a data feeder to RabbitMQ.

Solution

1. First, we need to enable the RabbitMQ sender in the API Manager configuration file:
<API-M_HOME>\repository\conf\axis2\axis2.xml
We do this by adding the following configuration, in which you must set your RabbitMQ IP address, user name and password.
<transportSender name="rabbitmq" class="org.apache.axis2.transport.rabbitmq.RabbitMQSender">
    <parameter name="CachedRabbitMQConnectionFactory" locked="false">
        <parameter name="rabbitmq.server.host.name" locked="false">RabbitMQIp</parameter>
        <parameter name="rabbitmq.server.port" locked="false">5672</parameter>
        <parameter name="rabbitmq.server.user.name" locked="false">guest</parameter>
        <parameter name="rabbitmq.server.password" locked="false">guest</parameter>
    </parameter>
</transportSender>

2. Copy the RabbitMQ client library file to the following location:
<API-M_HOME>\repository\components\lib
I was using amqp-client-5.8.0.jar.

3. After the above steps are done, start WSO2 API Manager and RabbitMQ.

4. To pass messages to RabbitMQ, we need to create a mediation extension. In this extension we set the asynchronous behavior: API Manager will respond to the client without waiting for a response from the backend, and push the message to the RabbitMQ exchange. For this we need to set two properties and send to a RabbitMQ endpoint. In the URI we set the connection parameters. We can also add rabbitmq.queue.delivery.mode=2 if we want to mark the message as persistent in RabbitMQ. The sequence should then look like the following.

<sequence xmlns="http://ws.apache.org/ns/synapse" name="pass.to.rabbitmq">  
   <property name="OUT_ONLY" value="true"/>
   <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
   <send>
     <endpoint>
       <address uri="rabbitmq:/?rabbitmq.connection.factory=CachedRabbitMQConnectionFactory&amp;rabbitmq.queue.route.key=route&amp;rabbitmq.exchange.name=exchange"/>
     </endpoint>
   </send>
</sequence>

5. Now we need to create an API in API Manager. I suggest using the PUT or POST HTTP verb when defining resources.

6. Next, in the API Manager Publisher, under Runtime Configuration, we upload the sequence created in step 4 as a custom message mediation policy and select our pass.to.rabbitmq sequence.

After this is done, the next steps are the usual ones for API Manager APIs. Publish the API to the API Store. Now you can subscribe to it from the API Store and use it like any other API.
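
Step 4 mentions rabbitmq.queue.delivery.mode=2 for persistent messages. Below is a sketch of the sequence with that parameter added, under the hypothetical name pass.to.rabbitmq.persistent. The exchange and routing key are still the placeholder values used above.

<sequence xmlns="http://ws.apache.org/ns/synapse" name="pass.to.rabbitmq.persistent">
   <property name="OUT_ONLY" value="true"/>
   <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
   <send>
     <endpoint>
       <!-- Same address as before, with the delivery mode appended; note the escaped ampersands -->
       <address uri="rabbitmq:/?rabbitmq.connection.factory=CachedRabbitMQConnectionFactory&amp;rabbitmq.queue.route.key=route&amp;rabbitmq.exchange.name=exchange&amp;rabbitmq.queue.delivery.mode=2"/>
     </endpoint>
   </send>
</sequence>

Keep in mind that a persistent message only survives a broker restart if the queue it lands in is durable as well.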

Summary

The inspiration for this post was this article, which describes passing data to JMS from WSO2 API Manager.