Most usable template in WSO2 for RabbitMQ

One of the very handful features in WSO2 Enterprise Integrator for RabbitMQ, is sequence template. That functionality is not so popular as proxy services, but can save a lot of time. I often have had to do integrations with RabbitMQ, so i wanted to simplify my integrations. For this I created that useful template sequence for send messages to different exchanges with various routing key in RabbitMQ.

Let’s Start

I assume, that you have configured WSO2 Enterprise Integrator product, for sending messages to RabbitMQ. If not, you should configure accordingly to this documentation. It is good practice, to have configured like it has been described. This can be also used in newer products of WSO2 like Micro Integrator (MI). And for this product the configuration of RabbitMQ you can find in this documentation.

Problem

The main problem was that, I was struggling with very nasty looking address for RabbitMQ sender, like this:

<address uri="rabbitmq:/AMQPProducerSample?rabbitmq.server.host.name=localhost&amp;rabbitmq.server.port=5672&amp;rabbitmq.queue.name=queue&amp;rabbitmq.queue.route.key=route&amp;rabbitmq.exchange.name=exchange"/>

It’s very hard to read what is going on. What are the sender parameters like exchange or routing key which are important in sending in RabbitMQ. Where the message is sending. Second thing, is that, you always need use OUT_ONLY and FORCE_SC_ACCEPTED property for sending. Without that, you will recived warnings in your mediation. I will show below the very basic construction and usage of this template, but you can adapt to your own needs.

Solution

So I use template, named send.rabbitmq, like below:

<template name="send.rabbitmq" xmlns="http://ws.apache.org/ns/synapse">
    <parameter name="exchangeName"/>
    <parameter name="routingKey"/>
    <sequence>
        <property name="REST_URL_POSTFIX" action="remove" scope="axis2"/>
        <property name="OUT_ONLY" scope="default" type="STRING" value="true"/>
        <property name="FORCE_SC_ACCEPTED" scope="axis2" type="STRING" value="true"/>
        <header name="To" scope="default"
            expression="concat('rabbitmq:/?rabbitmq.connection.factory=CachedRabbitMQConnectionFactory
&amp;rabbitmq.exchange.name=',$func:exchangeName,
'&amp;rabbitmq.queue.routing.key=',$func:routingKey)"
             xmlns:ns="http://org.apache.synapse/xsd"/>
        <log/>
        <send/>
    </sequence>
</template>

In first look, the simplify is not so obvious. We use XPATH concat function, to create To: header, which is used in <send/> mediator. In template I also use removing url postfix, for removing my REST API information. And also use logging to get some extra information. The important part is usage of <parameter name="exchangeName"/> and <parameter name="routingKey"/> which corresponds to the $func:exchangeName and $func:routingKey in concatenation. In the same way, you can add some other parameters.The benefits are in usage, which is simple as you see below.

<call-template target="send.rabbitmq">
    <with-param name="exchangeName" value="testExch"/>
    <with-param name="routingKey" value="testRoutingKey"/>
</call-template>

You need only call this mediator with two parameters: exchangeName and routingKey. It’s very handful, and also look nice in graphical.

WSO2 Enterprise Integrator carbon view

As you see, there is also option to use expressions for that parameters. For example from context message: $ctx:sampleProperty or other XPATH expression. This template can be used in proxyServices, APIs, and sequences mediations.

Summary

To sum up, the solution can be expanded, with many more parameters. I used only that parameters because I needed only them. You can add in concatenation other RabbitMQ sender parameters. Even you can set other hostname address, to send to other RabbitMQ instances. You can used it also in WSO2 API Manager as RabbitMQ message producer. I hope that would help You!

WSO2 API Manager with message routing to different endpoints types

Introduction

The scenario depicted here is for a simple route data to endpoint, based on HTTP method and API URL. I show two aproaches. One simple using only HTTP method – PUT. And second advanced with specify URL routing. Data will be routed to different endpoint. As an example, of endpoint we use RabbitMQ message queue. How to configure endpoint for RabbitMQ I refer to this post[1]. And for more complex scenarios of routing messages I recommended to use WSO2 Enterprise Integrator instead of WSO2 API Manager.

Steps to start

To create simple routing messages in WSO2 API manager we modify in Runtime Configuratons our Message Mediation component.

As schown on image, we go to APIs -> Runtime Configurations -> Message Mediation. Before we can upload our routing mediation, we must create artifact as XML file like below. We called it rabbitmq.router.xml

Simple solution example

<?xml version="1.0" encoding="UTF-8"?><sequence xmlns="http://ws.apache.org/ns/synapse" name="rabbitmq.router" onError="fault">
  <filter source="$axis2:HTTP_METHOD" regex="PUT">
    <then>
      <property name="exchangeName" value="wso2.api" scope="default" type="STRING"/>
      <property name="routingKey" value="data.put" scope="default" type="STRING"/>
      <property name="OUT_ONLY" value="true" scope="default" type="STRING"/>
      <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2" type="STRING"/>
      <header xmlns:ns="http://org.apache.synapse/xsd" name="To" scope="default" expression="fn:concat('rabbitmq:/?rabbitmq.connection.factory=CachedRabbitMQConnectionFactory&rabbitmq.queue.delivery.mode=2&rabbitmq.exchange.name=',$ctx:exchangeName,'&rabbitmq.queue.routing.key=',$ctx:routingKey)"/>
      <send/>  
    </then>
    <else/>
  </filter>
</sequence>

In simple aproach, we use filter mediator based on $axis2:HTTP_METHOD. For PUT method we set exchangeName and routingKey. Based on this values we build header for sending to Rabbit MQ. All other http method will be passed to default endpoint, only PUT will be redirect to Rabbit MQ.

Advence solution example

In more advenced solution, we can additionaly route messages using pattern in our URI. For this example we have want to route to Rabbit MQ endpoints defined as:
/data/sample/{sampleId}/id/{id}/status
/data/complex/{complexId}/guid/{guid}

Using appropriate regex in switch mediator we filter our URI and set diffrent exchangeName and routingKey properties. Moreover we use dynamic URI elements to concatenate exchange name and routing key for more flexible usecases – filtering topics in Rabbit MQ. All other approach is the same as in simple example. This approach as XML is listed below.

<?xml version="1.0" encoding="UTF-8"?><sequence xmlns="http://ws.apache.org/ns/synapse" name="rabbitmq.router" onError="fault">
  <filter source="$axis2:HTTP_METHOD" regex="PUT">
    <then>
      <property name="exchangeName" value="null" scope="default" type="STRING"/>
      <property name="routingKey" value="null" scope="default" type="STRING"/>
      <switch source="get-property('To')">
        <case regex=".*\/data\/sample\/([^\/]*)\/id\/([^\/]*)\/status">
          <property name="routingKey" expression="concat('sample.',get-property('uri.var.sampleId'),'.id.',get-property('uri.var.id'))" scope="default" type="STRING"/>
          <property name="exchangeName" value="sample.api.data" scope="default" type="STRING"/>
        </case>
        <case regex=".*\/data\/complex\/([^\/]*)\/guid\/([^\/]*)">
          <property name="routingKey" expression="concat('complex.',get-property('uri.var.complexId'),'.id.',get-property('uri.var.guid'))" scope="default" type="STRING"/>
          <property name="exchangeName" value="complex.api.data" scope="default" type="STRING"/>
        </case>
      </switch>
      <filter xpath="$ctx:exchangeName != 'null' and $ctx:routingKey != 'null'">
        <then>
          <property name="OUT_ONLY" value="true" scope="default" type="STRING"/>
          <property name="FORCE_SC_ACCEPTED" value="true" scope="axis2" type="STRING"/>
          <header xmlns:ns="http://org.apache.synapse/xsd" name="To" scope="default" expression="fn:concat('rabbitmq:/?rabbitmq.connection.factory=CachedRabbitMQConnectionFactory&rabbitmq.queue.delivery.mode=2&rabbitmq.exchange.name=',$ctx:exchangeName,'&rabbitmq.queue.routing.key=',$ctx:routingKey)"/>
          <send/>
        </then>
        <else/>
     </filter>
    </then>
    <else/>
  </filter>
</sequence>

Summary

In this article, I have described a fairly simple way to separate API traffic to different endpoints. It can be use for one API on front, with all benefits of WSO2 API Manager, and have sepparate backend endpoints. For example for bigger traffic. Or can be use in CQRS-style separating queres to backend and commands to queue system like Rabbit MQ.