Most usable template in WSO2 for RabbitMQ

One of the very handy features of WSO2 Enterprise Integrator, when working with RabbitMQ, is the sequence template. It is not as popular as proxy services, but it can save a lot of time. I have often had to integrate with RabbitMQ, so I wanted to simplify my integrations. For this I created a useful sequence template for sending messages to different exchanges with various routing keys in RabbitMQ.

Let’s Start

I assume that you have already configured WSO2 Enterprise Integrator for sending messages to RabbitMQ. If not, you should configure it according to this documentation. It is good practice to configure it as described there. The template can also be used in newer WSO2 products such as Micro Integrator (MI); the RabbitMQ configuration for that product can be found in this documentation.

Problem

The main problem was that I was struggling with a very nasty-looking address for the RabbitMQ sender, like this:

<address uri="rabbitmq:/AMQPProducerSample?rabbitmq.server.host.name=localhost&amp;rabbitmq.server.port=5672&amp;rabbitmq.queue.name=queue&amp;rabbitmq.queue.route.key=route&amp;rabbitmq.exchange.name=exchange"/>

It is very hard to read what is going on: which sender parameters, such as the exchange or routing key, matter for sending to RabbitMQ, and where the message is being sent. The second thing is that you always need to set the OUT_ONLY and FORCE_SC_ACCEPTED properties when sending. Without them, you will receive warnings in your mediation. Below I show a very basic construction and usage of this template, but you can adapt it to your own needs.

Solution

So I use a template named send.rabbitmq, like the one below:

<template name="send.rabbitmq" xmlns="http://ws.apache.org/ns/synapse">
    <parameter name="exchangeName"/>
    <parameter name="routingKey"/>
    <sequence>
        <property name="REST_URL_POSTFIX" action="remove" scope="axis2"/>
        <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
        <property name="FORCE_SC_ACCEPTED" scope="axis2" type="STRING" value="true"/>
        <!-- Build the To header; line breaks sit between concat arguments, not inside a string literal -->
        <header name="To" scope="default"
            expression="concat('rabbitmq:/?rabbitmq.connection.factory=CachedRabbitMQConnectionFactory',
                '&amp;rabbitmq.exchange.name=', $func:exchangeName,
                '&amp;rabbitmq.queue.routing.key=', $func:routingKey)"/>
        <log/>
        <send/>
    </sequence>
</template>

At first glance, the simplification is not so obvious. We use the XPath concat function to build the To header, which is used by the <send/> mediator. In the template I also remove the URL postfix (REST_URL_POSTFIX), so that my REST API path information is not carried over to the address. I also log the message to get some extra information. The important part is the use of <parameter name="exchangeName"/> and <parameter name="routingKey"/>, which correspond to $func:exchangeName and $func:routingKey in the concatenation. In the same way, you can add other parameters. The benefit is in the usage, which is as simple as you see below.

<call-template target="send.rabbitmq">
    <with-param name="exchangeName" value="testExch"/>
    <with-param name="routingKey" value="testRoutingKey"/>
</call-template>

You only need to call this template with two parameters: exchangeName and routingKey. It is very handy, and it also looks nice in the graphical view.

WSO2 Enterprise Integrator carbon view

As you can see, there is also an option to use expressions for those parameters, for example a property from the message context, $ctx:sampleProperty, or any other XPath expression. This template can be used in proxy services, APIs, and sequences.
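As a minimal sketch of the expression form: in a call-template mediator, a with-param value wrapped in curly braces is evaluated as an XPath expression at call time. The property names targetExchange and targetRoutingKey are hypothetical and only serve the illustration; the fragment is meant to sit inside a sequence.

```xml
<!-- Hypothetical properties, set earlier in the mediation flow -->
<property name="targetExchange" value="testExch" scope="default" type="STRING"/>
<property name="targetRoutingKey" value="testRoutingKey" scope="default" type="STRING"/>

<!-- Values in {} are evaluated as XPath instead of being passed as literals -->
<call-template target="send.rabbitmq">
    <with-param name="exchangeName" value="{$ctx:targetExchange}"/>
    <with-param name="routingKey" value="{$ctx:targetRoutingKey}"/>
</call-template>
```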

Summary

To sum up, the solution can be expanded with many more parameters. I used only these parameters because they were all I needed. You can add other RabbitMQ sender parameters to the concatenation. You can even set a different host address to send to other RabbitMQ instances. You can also use it in WSO2 API Manager as a RabbitMQ message producer. I hope this helps you!
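As a sketch of such an extension, the variant below adds a third template parameter for the RabbitMQ delivery mode (rabbitmq.queue.delivery.mode, where 2 marks a message as persistent). The template name send.rabbitmq.persistent and the parameter name deliveryMode are hypothetical choices for this example, not part of the original template.

```xml
<template name="send.rabbitmq.persistent" xmlns="http://ws.apache.org/ns/synapse">
    <parameter name="exchangeName"/>
    <parameter name="routingKey"/>
    <!-- Hypothetical extra parameter: 2 = persistent message -->
    <parameter name="deliveryMode"/>
    <sequence>
        <property name="REST_URL_POSTFIX" action="remove" scope="axis2"/>
        <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
        <property name="FORCE_SC_ACCEPTED" scope="axis2" type="STRING" value="true"/>
        <!-- Each extra sender parameter becomes one more pair of concat arguments -->
        <header name="To" scope="default"
            expression="concat('rabbitmq:/?rabbitmq.connection.factory=CachedRabbitMQConnectionFactory',
                '&amp;rabbitmq.queue.delivery.mode=', $func:deliveryMode,
                '&amp;rabbitmq.exchange.name=', $func:exchangeName,
                '&amp;rabbitmq.queue.routing.key=', $func:routingKey)"/>
        <send/>
    </sequence>
</template>
```

A caller would then pass one more with-param, for example name="deliveryMode" with value="2".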

Reading, modifying and writing data in WSO2 Enterprise Integrator registry on an example.

Introduction

Many times we need to store data for a short period of time, such as an access token or a timestamp. Sometimes we need to share data between messages processed in sequences. Or maybe we need to store some configuration information for an API. This sort of problem can be solved by using the WSO2 Enterprise Integrator registry. In this post I will show you how to work with the registry in two ways.

In this post I won't describe all the ways to add new content to the registry. The old-fashioned way is through the WSO2 Carbon management console, described here. Another way is to use Integration Studio and deploy the content in a Carbon Application, which is described here.

Problem

The problem is how to read, modify and store data kept in the registry as an XML file. First I create a simple XML configuration file with an access token and the timestamp of its last refresh. At the end, I point out a problem with the registry approach.

Solution

I assume that we have already stored our configuration as an XML file in the registry, and that it looks like the one below:

Sample registry configuration
<SomeConfiguration>      
    <token>urn:uuid:00000000-0000-0000-0000-000000000000</token>
    <time>1600000000000</time>
</SomeConfiguration>

Our solution is straightforward. First we store the current time, so that we can calculate how much time has passed since the last token refresh. Next we read our configuration from the registry into a property named sample, using the property mediator as below:

<property name="sample" scope="default"
expression="get-property('registry', 'conf:/Testing/Sample.xml')" type="OM"/>

For that, we use get-property('registry', 'conf:/Testing/Sample.xml') with type="OM", and name the property sample. We calculate the timestamp difference and check in a filter mediator whether the token needs to be refreshed. If so, we get a new token. In my example I just use get-property('MessageID') as the token, for maximal simplicity. In a realistic environment, you will probably call some API endpoint to receive a new token. Next we want to modify our content. In my opinion, for an XML property the easiest way is to use the enrich mediator, so we use it to set the new token and timestamp. After that, we store the modified property using the property mediator, as below.

<property name="conf:/Testing/Sample.xml" scope="registry"
expression="$ctx:sample" type="OM"/>

This is the key part. There is one downside to doing it this way, which is covered later. The most important thing is to set the proper property name, pointing to the registry configuration file: conf:/Testing/Sample.xml. The other thing is to set the scope to registry and use the type OM. After that we can go to the registry and check what our Sample.xml configuration looks like. For me it looks like below.

Sample registry file after modification

In the same vein, you can manipulate the governance registry. As you can see, we use "conf" for the configuration registry. Similarly, to modify the governance registry just use "gov". These steps are quite easy, and are also described in the documentation.
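As a short sketch, the same read and write against the governance registry only changes the prefix. The path gov:/Testing/Sample.xml is an assumption for illustration; it presumes a resource with the same content has been stored there.

```xml
<!-- Read the resource from the governance registry into the "sample" property -->
<property name="sample" scope="default"
    expression="get-property('registry', 'gov:/Testing/Sample.xml')" type="OM"/>

<!-- Write the (modified) property back to the same governance registry path -->
<property name="gov:/Testing/Sample.xml" scope="registry"
    expression="$ctx:sample" type="OM"/>
```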

Known downside

This solution has one drawback, which I must tell you about. When you read this property from the registry, the entry is cached for 15 seconds. So every time you update this entry, the old value stays in the cache, and the property mediator will read the old, cached value rather than the new one. However, I found out how to "fix" this, and the solution is a script mediator, listed below:


<script language="js"><![CDATA[
var regUrl = mc.getProperty('regUrl');
mc.getConfiguration()
   .getRegistry()
   .updateResource(regUrl ,mc.getProperty('sample'));
var regEntry = mc.getConfiguration()
    .getRegistry().getRegistryEntry(regUrl);
regEntry.setCachableDuration(0);
mc.getConfiguration()
   .getRegistry()
   .updateRegistryEntry(regEntry);
]]></script>

In conclusion, we need to manually change the cache duration for our registry entry.

Summary

To sum up, my example sequence, with a token validity of ten minutes, looks like below.

<sequence name="sample" xmlns="http://ws.apache.org/ns/synapse">
    <property name="regUrl" scope="default" type="STRING" value="conf:/Testing/Sample.xml"/>    
    <property expression="get-property('SYSTEM_TIME')" name="actualTime"
        scope="default" type="STRING" xmlns:ns="http://org.apache.synapse/xsd"/>
    <property
        expression="get-property('registry', $ctx:regUrl)"
        name="sample" scope="default" type="OM" xmlns:ns="http://org.apache.synapse/xsd"/>
    <property expression="get-property('SYSTEM_TIME')-$ctx:sample//time"
        name="timeDiff" scope="default" type="STRING" xmlns:ns="http://org.apache.synapse/xsd"/>
    <filter xmlns:ns="http://org.apache.synapse/xsd" xpath="$ctx:timeDiff >= 600000.0 or $ctx:timeDiff='NaN'">
        <then>
            <property expression="get-property('MessageID')"
                name="someNewToken" scope="default" type="STRING"/>
            <log level="custom">
                <property expression="$ctx:someNewToken" name="NewToken"/>
            </log>
            <enrich>
                <source clone="false" type="custom" xpath="$ctx:someNewToken"/>
                <target action="replace" type="custom" xpath="$ctx:sample//token"/>
            </enrich>
            <enrich>
                <source clone="false" type="custom" xpath="$ctx:actualTime"/>
                <target action="replace" type="custom" xpath="$ctx:sample//time"/>
            </enrich>
            <script language="js"><![CDATA[var regUrl = mc.getProperty('regUrl');
mc.getConfiguration().getRegistry().updateResource(regUrl ,mc.getProperty('sample'));
var regEntry = mc.getConfiguration().getRegistry().getRegistryEntry(regUrl);
regEntry.setCachableDuration(0);
mc.getConfiguration().getRegistry().updateRegistryEntry(regEntry);]]></script>
        </then>
        <else>
            <log level="custom">
                <property expression="$ctx:sample//token" name="Token"/>
                <property
                    expression="600000.0-number($ctx:timeDiff)" name="Valid for"/>
            </log>
        </else>
    </filter>
    <enrich>
        <source clone="true" property="sample" type="property"/>
        <target action="replace" type="body"/>
    </enrich>
    <respond/>
</sequence>

In conclusion, storing a file in the registry from a sequence is quite easy. For read-only access, we can use just the property mediator, but for frequent changes it is better to use the script mediator and set the cache duration manually.

How to check message context all properties in WSO2 Enterprise Integrator

Introduction

The inspiration for this post was a problem I found on Stack Overflow. It is about getting access to the properties set in the message context in WSO2 Enterprise Integrator / WSO2 ESB. Of course I gave the appropriate answer. At first I thought that we could only do that with a custom class mediator, as someone suggested. But there is a much simpler way to do it.

Solution

Some time ago I discovered that the Script mediator in WSO2 EI can run not only JavaScript code but also code written in Java, with some limitations of course. It is a much easier and faster way than implementing a custom mediator only for that purpose. Using the script mediator we can get access to different types of message context, depending on which language/engine we choose. For JavaScript it is CommonScriptMessageContext, and from WSO2 EI 6.2.0 you can additionally use Nashorn, in which case it will be NashornJavaScriptMessageContext. In the older WSO2 ESB 4.9.0 you can use only the JavaScript engine, but I tested the code there and it works in the same way.

<script language="js"><![CDATA[var keySet = mc.getPropertyKeySet();
   var i = 1;
   var it = keySet.iterator();
   var propertySet = "\n\r";       
   while(it.hasNext()){
     var prop = it.next();
     propertySet += (i + ": " + prop + " = " + mc.getProperty(prop)+java.lang.System.lineSeparator());
     i++;
}
mc.setProperty('propertySet',propertySet.toString());]]>
</script>
<log level="custom">
    <property expression="$ctx:propertySet" name="propertySet" xmlns:ns="http://org.apache.synapse/xsd"/>
</log>

Yes, that is all you need to do in your mediation path. The log will contain all the properties stored in the message context. I created a simple REST API for tests:

<api xmlns="http://ws.apache.org/ns/synapse" name="ApiForTests" context="/testing/scriptmediator">
   <resource methods="POST GET">
      <inSequence>
         <property name="TestStringProperty" value="Some text in property" scope="default" type="STRING"/>
         <property name="TestOMProperty" scope="default">
            <testOm xmlns="">SomeValueInOmProperty</testOm>
         </property>
         <property name="TestFloatProperty" value="3.1415" scope="default" type="FLOAT"/>
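         <script language="js"><![CDATA[
            var keySet = mc.getPropertyKeySet();
            var it = keySet.iterator();
            var propertySet = "\n\r";
            var i = 1;
            while (it.hasNext()) {
               var prop = it.next();
               propertySet += (i + ": " + prop + " = " + mc.getProperty(prop) + java.lang.System.lineSeparator());
               i++;
            }
            // Store the result so the log mediator below can read $ctx:propertySet
            mc.setProperty('propertySet', propertySet.toString());
         ]]></script>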
         <log level="custom">
             <property expression="$ctx:propertySet" name="propertySet" xmlns:ns="http://org.apache.synapse/xsd"/>
         </log>
         <respond/>
      </inSequence>
   </resource>
</api>

With three custom properties in the mediation (TestStringProperty, TestOMProperty and TestFloatProperty), after an API call I got the following in the log:

Output log after using the Script mediator with the described code.

Extra hint

Because it is quite a useful trick, I suggest storing this code in a separate file, referenced via a local or remote registry entry. After that you can easily use a key to refer to the script saved in the registry and reuse it, as the documentation describes here.
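A minimal sketch of that reuse, under stated assumptions: the local entry key printPropertiesScript, the file location and the function name printProperties are all hypothetical. The referenced file is assumed to wrap the loop shown earlier in a JavaScript function printProperties(mc) that ends by calling mc.setProperty('propertySet', ...).

```xml
<!-- Hypothetical local entry pointing at the script file -->
<localEntry key="printPropertiesScript"
    src="file:repository/resources/scripts/printProperties.js"
    xmlns="http://ws.apache.org/ns/synapse"/>

<!-- In any sequence: run the stored script by key, then log its result -->
<script language="js" key="printPropertiesScript" function="printProperties"/>
<log level="custom">
    <property expression="$ctx:propertySet" name="propertySet"/>
</log>
```

If the function attribute is omitted, the Script mediator looks for a function named mediate in the referenced script.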

Summary

The Script mediator is very handy, and gives much more than the documentation describes. If you know how to write code in Java, it becomes quite a powerful tool. Another plus is that you can apply all your ideas "on the fly", without restarting WSO2 to reload libraries as you would with a custom class mediator.

WSO2 API Manager with message routing to different endpoints types

Introduction

The scenario depicted here is simple routing of data to an endpoint, based on the HTTP method and API URL. I show two approaches: a simple one using only the HTTP method (PUT), and an advanced one with URL-specific routing. Data will be routed to a different endpoint. As the example endpoint we use a RabbitMQ message queue. For how to configure the endpoint for RabbitMQ, I refer to this post[1]. For more complex message routing scenarios, I recommend using WSO2 Enterprise Integrator instead of WSO2 API Manager.

Steps to start

To create simple message routing in WSO2 API Manager, we modify the Message Mediation component in Runtime Configurations.

As shown in the image, we go to APIs -> Runtime Configurations -> Message Mediation. Before we can upload our routing mediation, we must create the artifact as an XML file like the one below. We call it rabbitmq.router.xml.

Simple solution example

<?xml version="1.0" encoding="UTF-8"?><sequence xmlns="http://ws.apache.org/ns/synapse" name="rabbitmq.router" onError="fault">
  <filter source="$axis2:HTTP_METHOD" regex="PUT">
    <then>
      <property name="exchangeName" value="wso2.api" scope="default" type="STRING"/>
      <property name="routingKey" value="data.put" scope="default" type="STRING"/>
      <property name="OUT_ONLY" value="true" scope="default" type="STRING"/>
      <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2" type="STRING"/>
      <header xmlns:ns="http://org.apache.synapse/xsd" name="To" scope="default" expression="fn:concat('rabbitmq:/?rabbitmq.connection.factory=CachedRabbitMQConnectionFactory&amp;rabbitmq.queue.delivery.mode=2&amp;rabbitmq.exchange.name=',$ctx:exchangeName,'&amp;rabbitmq.queue.routing.key=',$ctx:routingKey)"/>
      <send/>  
    </then>
    <else/>
  </filter>
</sequence>

In the simple approach, we use a filter mediator based on $axis2:HTTP_METHOD. For the PUT method we set exchangeName and routingKey. Based on these values we build the To header for sending to RabbitMQ. All other HTTP methods are passed to the default endpoint; only PUT is redirected to RabbitMQ.

Advanced solution example

In the more advanced solution, we can additionally route messages using a pattern in our URI. For this example we want to route to RabbitMQ the resources defined as:
/data/sample/{sampleId}/id/{id}/status
/data/complex/{complexId}/guid/{guid}

Using appropriate regexes in a switch mediator, we match our URI and set different exchangeName and routingKey properties. Moreover, we can use the dynamic URI elements when building the exchange name and routing key (here they go into the routing key), for more flexible use cases such as filtering topics in RabbitMQ. Everything else is the same as in the simple example. This approach is listed as XML below.

<?xml version="1.0" encoding="UTF-8"?><sequence xmlns="http://ws.apache.org/ns/synapse" name="rabbitmq.router" onError="fault">
  <filter source="$axis2:HTTP_METHOD" regex="PUT">
    <then>
      <property name="exchangeName" value="null" scope="default" type="STRING"/>
      <property name="routingKey" value="null" scope="default" type="STRING"/>
      <switch source="get-property('To')">
        <case regex=".*\/data\/sample\/([^\/]*)\/id\/([^\/]*)\/status">
          <property name="routingKey" expression="concat('sample.',get-property('uri.var.sampleId'),'.id.',get-property('uri.var.id'))" scope="default" type="STRING"/>
          <property name="exchangeName" value="sample.api.data" scope="default" type="STRING"/>
        </case>
        <case regex=".*\/data\/complex\/([^\/]*)\/guid\/([^\/]*)">
          <property name="routingKey" expression="concat('complex.',get-property('uri.var.complexId'),'.id.',get-property('uri.var.guid'))" scope="default" type="STRING"/>
          <property name="exchangeName" value="complex.api.data" scope="default" type="STRING"/>
        </case>
      </switch>
      <filter xpath="$ctx:exchangeName != 'null' and $ctx:routingKey != 'null'">
        <then>
          <property name="OUT_ONLY" value="true" scope="default" type="STRING"/>
          <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2" type="STRING"/>
          <header xmlns:ns="http://org.apache.synapse/xsd" name="To" scope="default" expression="fn:concat('rabbitmq:/?rabbitmq.connection.factory=CachedRabbitMQConnectionFactory&amp;rabbitmq.queue.delivery.mode=2&amp;rabbitmq.exchange.name=',$ctx:exchangeName,'&amp;rabbitmq.queue.routing.key=',$ctx:routingKey)"/>
          <send/>
        </then>
        <else/>
     </filter>
    </then>
    <else/>
  </filter>
</sequence>

Summary

In this article, I have described a fairly simple way to split API traffic between different endpoints. It can be used to keep one API at the front, with all the benefits of WSO2 API Manager, and have separate backend endpoints, for example to handle bigger traffic. It can also be used CQRS-style, sending queries to the backend and commands to a queue system like RabbitMQ.

How to permanently change or add configuration in WSO2 API Manager 3.1.0

What is the problem?

In my previous post, I showed how to modify the configuration in axis2.xml so that API Manager works with RabbitMQ as an endpoint. Sadly, it doesn't work on WSO2 API Manager version 3.1.0. From this version on, the deployment changed. The standard configuration is overwritten every time WSO2 API Manager is started. That is because the configuration files are automatically generated from templates before WSO2 API Manager starts.

What we can do?

After a small investigation, I found the solution. Because the configuration files are generated from Jinja2 templates (files with the .j2 extension), we need to edit those. The simplest way is to find them, add or change what we need, and save them. In WSO2 API Manager 3.1.0 the files can be found in <API-M_HOME>\repository\resources\conf\templates\. You will see that the directory structure under this path is familiar. You can find the axis2 template here:
<API-M_HOME>\repository\resources\conf\templates\repository\conf\axis2\axis2.xml.j2
Now you can open this file in your text editor and permanently change what you want. Be careful, because it is not XML but a j2 template.

Can we do it in better way?

Yes, we can. Taking the RabbitMQ configuration from my previous post, I introduced the changes in accordance with the Jinja2 template documentation. My changes looked like the following:

{% if transport.rabbitmq.connection is defined %}
	<transportSender name="rabbitmq" class="org.apache.axis2.transport.rabbitmq.RabbitMQSender">
		<parameter name="CachedRabbitMQConnectionFactory" locked="false">
			<parameter name="rabbitmq.server.host.name" locked="false">{{transport.rabbitmq.connection.parameters.host.name}}</parameter>
			<parameter name="rabbitmq.server.port" locked="false">{{transport.rabbitmq.connection.parameters.port}}</parameter>
			<parameter name="rabbitmq.server.user.name" locked="false">{{transport.rabbitmq.connection.parameters.user.name}}</parameter>
			<parameter name="rabbitmq.server.password" locked="false">{{transport.rabbitmq.connection.parameters.password}}</parameter>
		</parameter>
	</transportSender>
{% endif %}

This simply adds the transportSender configuration node for RabbitMQ, but adds it dynamically. You can see the notation with double curly braces, which is part of the Jinja2 template syntax. But where do these dynamic variables come from? All of the template variables are defined in <API-M_HOME>\repository\resources\conf\default.json. For my configuration, I needed to add rows like these to the JSON configuration:

"transport.rabbitmq.connection.parameters.host.name": "localhost",
"transport.rabbitmq.connection.parameters.port": "5672",
"transport.rabbitmq.connection.parameters.user.name": "guest",
"transport.rabbitmq.connection.parameters.password": "guest",

After that is done, when I start my WSO2 API Manager 3.1.0, a proper axis2.xml is generated including my RabbitMQ config, and it works exactly as I expected.
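For orientation, here is a sketch of what the generated fragment in axis2.xml should look like, assuming the template variables resolve to the values from the default.json rows above (localhost, 5672, guest, guest). This is an expected rendering written by hand, not a copy of real generated output.

```xml
<transportSender name="rabbitmq" class="org.apache.axis2.transport.rabbitmq.RabbitMQSender">
    <parameter name="CachedRabbitMQConnectionFactory" locked="false">
        <!-- Each {{...}} placeholder is replaced by its default.json value -->
        <parameter name="rabbitmq.server.host.name" locked="false">localhost</parameter>
        <parameter name="rabbitmq.server.port" locked="false">5672</parameter>
        <parameter name="rabbitmq.server.user.name" locked="false">guest</parameter>
        <parameter name="rabbitmq.server.password" locked="false">guest</parameter>
    </parameter>
</transportSender>
```

If this block is missing after startup, a likely cause is that the condition in the {% if %} line was not satisfied.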

Additional information

In addition to this product, I see that other newly released WSO2 products, including WSO2 Micro Integrator, are also configured through j2 templates.

How to use WSO2 API Manager as RabbitMQ message producer

Introduction

In the integration world, you often get unusual scenarios. On the one hand, you would like the benefits which WSO2 API Manager offers. On the other hand, you are looking for the benefits of asynchronous communication. In this scenario I show how to configure WSO2 API Manager to expose a REST API that acts as a data feeder to RabbitMQ.

Solution

1. First, we need to enable the RabbitMQ sender in the API Manager configuration file:
<API-M_HOME>\repository\conf\axis2\axis2.xml
We do this by adding the following configuration. You must set your own RabbitMQ IP address, user and password.
<transportSender name="rabbitmq" class="org.apache.axis2.transport.rabbitmq.RabbitMQSender">
    <parameter name="CachedRabbitMQConnectionFactory" locked="false">
        <parameter name="rabbitmq.server.host.name" locked="false">RabbitMQIp</parameter>
        <parameter name="rabbitmq.server.port" locked="false">5672</parameter>
        <parameter name="rabbitmq.server.user.name" locked="false">guest</parameter>
        <parameter name="rabbitmq.server.password" locked="false">guest</parameter>
    </parameter>
</transportSender>

2. Copy the RabbitMQ client library files to:
<API-M_HOME>\repository\components\lib
I used amqp-client-5.8.0.jar.

3. After the above steps are done, start WSO2 API Manager and RabbitMQ.

4. To pass messages to RabbitMQ, we need to create a mediation extension. In this extension we set the asynchronous behavior: API Manager responds to the client without waiting for a response from the backend, and pushes the message to a RabbitMQ exchange. For this we need to set two properties and use send with a RabbitMQ endpoint. In the URI we set the connection string parameters. We can also add rabbitmq.queue.delivery.mode=2 if we want to mark the message as persistent in RabbitMQ. The sequence should then look like the following.

<sequence xmlns="http://ws.apache.org/ns/synapse" name="pass.to.rabbitmq">  
   <property name="OUT_ONLY" value="true"/>
   <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
   <send>
     <endpoint>
       <address uri="rabbitmq:/?rabbitmq.connection.factory=CachedRabbitMQConnectionFactory&amp;rabbitmq.queue.route.key=route&amp;rabbitmq.exchange.name=exchange"/>
     </endpoint>
   </send>
</sequence>

5. Now we need to create an API in API Manager. I suggest using the PUT or POST HTTP verb when defining resources.

6. Next, in API Manager Publisher, under Runtime Configuration, we upload the sequence created in step 4 as a custom Message Mediation policy, and select our pass.to.rabbitmq sequence.

Once this is done, the next steps are the usual ones for API Manager APIs. Publish the API to the API Store. You can then subscribe to it from the API Store and use it as a regular one.
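As a sketch of the persistent-message variant mentioned in step 4, the sequence below adds rabbitmq.queue.delivery.mode=2 to the endpoint URI. The sequence name pass.to.rabbitmq.persistent is a hypothetical choice; the other parameters are the same placeholder values as in step 4.

```xml
<sequence xmlns="http://ws.apache.org/ns/synapse" name="pass.to.rabbitmq.persistent">
   <!-- Fire-and-forget: answer the client with 202 Accepted without waiting for a backend response -->
   <property name="OUT_ONLY" value="true"/>
   <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2"/>
   <send>
     <endpoint>
       <!-- Ampersands must be escaped as &amp; inside the XML attribute -->
       <address uri="rabbitmq:/?rabbitmq.connection.factory=CachedRabbitMQConnectionFactory&amp;rabbitmq.queue.delivery.mode=2&amp;rabbitmq.queue.route.key=route&amp;rabbitmq.exchange.name=exchange"/>
     </endpoint>
   </send>
</sequence>
```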

Summary

The inspiration for this post was this article, which describes passing data to JMS from WSO2 API Manager.