Most usable template in WSO2 for RabbitMQ

One of the very handful features in WSO2 Enterprise Integrator for RabbitMQ, is sequence template. That functionality is not so popular as proxy services, but can save a lot of time. I often have had to do integrations with RabbitMQ, so i wanted to simplify my integrations. For this I created that useful template sequence for send messages to different exchanges with various routing key in RabbitMQ.

Let’s Start

I assume, that you have configured WSO2 Enterprise Integrator product, for sending messages to RabbitMQ. If not, you should configure accordingly to this documentation. It is good practice, to have configured like it has been described. This can be also used in newer products of WSO2 like Micro Integrator (MI). And for this product the configuration of RabbitMQ you can find in this documentation.

Problem

The main problem was that, I was struggling with very nasty looking address for RabbitMQ sender, like this:

<address uri="rabbitmq:/AMQPProducerSample?rabbitmq.server.host.name=localhost&amp;rabbitmq.server.port=5672&amp;rabbitmq.queue.name=queue&amp;rabbitmq.queue.route.key=route&amp;rabbitmq.exchange.name=exchange"/>

It’s very hard to read what is going on. What are the sender parameters like exchange or routing key which are important in sending in RabbitMQ. Where the message is sending. Second thing, is that, you always need use OUT_ONLY and FORCE_SC_ACCEPTED property for sending. Without that, you will recived warnings in your mediation. I will show below the very basic construction and usage of this template, but you can adapt to your own needs.

Solution

So I use template, named send.rabbitmq, like below:

<template name="send.rabbitmq" xmlns="http://ws.apache.org/ns/synapse">
    <parameter name="exchangeName"/>
    <parameter name="routingKey"/>
    <sequence>
        <property name="REST_URL_POSTFIX" action="remove" scope="axis2"/>
        <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
        <property name="FORCE_SC_ACCEPTED" scope="axis2" type="STRING" value="true"/>
        <header name="To" scope="default"
            expression="concat('rabbitmq:/?rabbitmq.connection.factory=CachedRabbitMQConnectionFactory
&amp;rabbitmq.exchange.name=',$func:exchangeName,
'&amp;rabbitmq.queue.routing.key=',$func:routingKey)"
             xmlns:ns="http://org.apache.synapse/xsd"/>
        <log/>
        <send/>
    </sequence>
</template>

In first look, the simplify is not so obvious. We use XPATH concat function, to create To: header, which is used in <send/> mediator. In template I also use removing url postfix, for removing my REST API information. And also use logging to get some extra information. The important part is usage of <parameter name="exchangeName"/> and <parameter name="routingKey"/> which corresponds to the $func:exchangeName and $func:routingKey in concatenation. In the same way, you can add some other parameters.The benefits are in usage, which is simple as you see below.

<call-template target="send.rabbitmq">
    <with-param name="exchangeName" value="testExch"/>
    <with-param name="routingKey" value="testRoutingKey"/>
</call-template>

You need only call this mediator with two parameters: exchangeName and routingKey. It’s very handful, and also look nice in graphical.

WSO2 Enterprise Integrator carbon view

As you see, there is also option to use expressions for that parameters. For example from context message: $ctx:sampleProperty or other XPATH expression. This template can be used in proxyServices, APIs, and sequences mediations.

Summary

To sum up, the solution can be expanded, with many more parameters. I used only that parameters because I needed only them. You can add in concatenation other RabbitMQ sender parameters. Even you can set other hostname address, to send to other RabbitMQ instances. You can used it also in WSO2 API Manager as RabbitMQ message producer. I hope that would help You!

Reading, modifying and writing data in WSO2 Enterprise Integrator registry on an example.

Introduction

Many times we need to store data for short period of time, like access token or some time stamp. Some times we need to share data between processing messages in sequences. Or maybe we need to store some configuration information for API. That sort of problem we can resolve by use WSO2 Enterprise Integrator registry. In this post I will show you how to play with registry on two type manner.

In this post I won’t describe all ways, how to add new content / elements to registry. Adding in the old fashioned way could be done by WSO2 carbon panel, described here . Other way is by using Integration Studio and deploy in Carbon Application, which is described here.

Problem

Is how to read, modify and store some data content stored as XML file and more. First I create simple XML file configuration, with access token and its refresh time stamp. In conclusion, I point some problem while using registry approach.

Solution

I assume that we have already stored our configuration as XML file in registry. And it should looks like below:

Sample registry configuration
Sample registry configuration
<SomeConfiguration>      
    <token>urn:uuid:00000000-0000-0000-0000-000000000000</token>
    <time>1600000000000</time>
</SomeConfiguration>

Our solution is straight forward. First we store time to calculate difference between token refresh. Next we read our configuration from registry, using property mediator as sample like below:

<property name="sample" scope="default"
expression="get-property('registry', 'conf:/Testing/Sample.xml')" type="OM"/>

For that, we use: get-property(‘registry’, ‘conf:/Testing/Sample.xml’) with type=”OM”, and naming it sample. We calculate the time stamp difference, and check in filter mediator, is there need to refresh the token. If yes we get new token. In my example i just use get-property(‘Message ID’) as token for maximal simplicity. In realistic environment, you probably will call some API endpoint to receive new token. Next we want to modify our content. In my opinion, for property like XML, the easiest way is to use enrich mediator. Therefore we use it for setting new token and time stamp. After that, we store modified property, using property mediator like belowe.

<property name="conf:/Testing/Sample.xml" scope="registry"
expression="$ctx:sample" type="OM"/>

Above all, this is the secret thing. But there is one downside to doing in that way, which is covered later. Certainly most important is set proper name of property, pointing to registry configuration file: conf:/Testing/Sample.xml. Another thing is set scope to registry, and use type OM. After that we can go to registry and checkout how our Sample.xml configuration looks like. For me it looks like below.

Sample registry file after modify
Sample registry file after modify

In the same vein, you can manipulate the governance registry. As you see, we use “conf” for configuration. Similarly, to modify governance registry just use “gov“. This steps are quite easy, and also described in documentation.

Known downside

This solution has a one drawback, which i must told you. That is to say, when you will read this property from registry, the entry will be cached for 15 seconds. In short, every time you will update this entry, the old value stays in cache. Therefore, you read will read old, not new value by property mediator- you will get cached value. However i find out how to “fix” this, and got solution for you., in the form of a script mediator, listed below:

Preview in new tab

<script language="js"><![CDATA[
var regUrl = mc.getProperty('regUrl');
mc.getConfiguration()
   .getRegistry()
   .updateResource(regUrl ,mc.getProperty('sample'));
var regEntry = mc.getConfiguration()
    .getRegistry().getRegistryEntry(regUrl);
regEntry.setCachableDuration(0);
mc.getConfiguration()
   .getRegistry()
   .updateRegistryEntry(regEntry);
]]></script>

In conclusion, we need change manually cache duration for our registry entry.

Summary

To sum up, my example sequence solution, with ten minutes validity token time, looks like below.

<sequence name="sample" xmlns="http://ws.apache.org/ns/synapse">
    <property name="regUrl" scope="default" type="STRING" value="conf:/Testing/Sample.xml"/>    
    <property expression="get-property('SYSTEM_TIME')" name="actualTime"
        scope="default" type="STRING" xmlns:ns="http://org.apache.synapse/xsd"/>
    <property
        expression="get-property('registry', $ctx:regUrl)"
        name="sample" scope="default" type="OM" xmlns:ns="http://org.apache.synapse/xsd"/>
    <property expression="get-property('SYSTEM_TIME')-$ctx:sample//time"
        name="timeDiff" scope="default" type="STRING" xmlns:ns="http://org.apache.synapse/xsd"/>
    <filter xmlns:ns="http://org.apache.synapse/xsd" xpath="$ctx:timeDiff >= 600000.0 or $ctx:timeDiff='NaN'">
        <then>
            <property expression="get-property('MessageID')"
                name="someNewToken" scope="default" type="STRING"/>
            <log level="custom">
                <property expression="$ctx:someNewToken" name="NewToken"/>
            </log>
            <enrich>
                <source clone="false" type="custom" xpath="$ctx:someNewToken"/>
                <target action="replace" type="custom" xpath="$ctx:sample//token"/>
            </enrich>
            <enrich>
                <source clone="false" type="custom" xpath="$ctx:actualTime"/>
                <target action="replace" type="custom" xpath="$ctx:sample//time"/>
            </enrich>
            <script language="js"><![CDATA[var regUrl = mc.getProperty('regUrl');
mc.getConfiguration().getRegistry().updateResource(regUrl ,mc.getProperty('sample'));
var regEntry = mc.getConfiguration().getRegistry().getRegistryEntry(regUrl);
regEntry.setCachableDuration(0);
mc.getConfiguration().getRegistry().updateRegistryEntry(regEntry);]]></script>
        </then>
        <else>
            <log level="custom">
                <property expression="$ctx:sample//token" name="Token"/>
                <property
                    expression="600000.0-number($ctx:timeDiff)" name="Valid for"/>
            </log>
        </else>
    </filter>
    <enrich>
        <source clone="true" property="sample" type="property"/>
        <target action="replace" type="body"/>
    </enrich>
    <respond/>
</sequence>

In conclusion, storing the file in registry from sequence is quite easy. For read only, we can use just property mediator, but for often changes better way is to use script mediator with manually set cache duration.