On Eloquent Conversations - Part II

Raju Gandhi
  • August 2011
  • Java
  • Web Services

In the first installment of this series, we discussed the need for integration, and some of the potential pitfalls, especially when attempting to roll your own integration system. We then proceeded to discuss some of the patterns in Gregor Hohpe's and Bobby Woolf's aptly named Enterprise Integration Patterns and their corresponding implementations in Spring Integration. We discussed the core patterns that make up the building blocks of Spring Integration - "Message Channel", "Message" and "Message Endpoint". In this article we will explore a few more patterns that will allow you to route, filter and manipulate messages as well as talk to external systems. We will learn how to do this while leveraging Spring's declarative model that lets you focus on your domain, and lets Spring Integration handle the specifics of messaging.

Last month we discussed three core patterns. The first is the "Message Channel", which serves as the "pipes" of the "pipes-and-filters" analogy - the primary conduit for the messages themselves. The second is the "Message", a simple wrapper around a set of headers and a body. Finally, there is the "Message Endpoint", which is simply a component that _does_ something with a message - consume it, re-route it or perhaps transform it.

So there is our 2-minute recap of the first installment. In this installment we will focus on the different kinds of endpoints that are available to you. Ready? Here we go ...

Playing the Field

We understand the core constructs that make up Spring Integration (referred to as SI from here on). Now let's talk about a specific type of endpoint - the router. Invariably, as messages are sent down the integration infrastructure, you may need to route them to one or more destinations (or even other channels) based on certain criteria. For example, a pizza-ordering customer may choose to pay by credit card, cash or, in the age of digital currency, Bitcoin. A message published with the payment details will then need to be appropriately routed either to notify a third-party payment system, or to the final receipt payment system to advise the delivery personnel to collect cash in person.

EIP defines a very generic pattern, the "Message Router", as well as a specialized pattern, called the "Content-Based Router". My initial reaction to reading about routing within EIP was - why do I need it at all? After all, you could always have a separate channel for each kind of message and leave the responsibility of deciding which channel to publish on to the application itself! Unfortunately, this was short-sighted. The number of channels is now directly proportional to the number of kinds of messages that flow through the system. Furthermore, if a consumer can process more than one kind of message, then it now needs to listen to more than one channel! Think about what would need to change if you were to introduce a new variant of a message.

It dawned on me that the principle underlying these patterns is simple - to decouple the application that publishes the message from the logic that dictates its destination. Now, an application can simply publish a message to one channel, and leave the job of figuring out where the message is headed to the router. EIP defines a Message Router as a component that consumes a message off a channel, and republishes it to another channel based on a certain set of criteria. These criteria could be the contents of the message itself, or an external factor such as re-routing messages based on a failover or load-balancing strategy. The Content-Based Router, on the other hand, accomplishes exactly the same thing, except that by definition, it is allowed to inspect only the contents of the message itself, be that the headers or the body (vs. interrogating, say, the context to determine the destination channel). By definition, a router has one incoming channel, and one or more outgoing channels.

Within SI, the grand-daddy of all routers is the `AbstractMessageRouter`. There are a couple of important methods defined in this contract - one is `java.util.List<java.lang.Object> getChannelIdentifiers(Message<?> message)` which attempts to resolve which channel the router should publish to based on the message that it was supplied. Note that a router can identify a channel using its unique name as supplied in the application context, or actually return a `MessageChannel`, or a combination of both (in case you need to route the message to multiple output channels). The second method to note is `void setDefaultOutputChannel(MessageChannel defaultOutputChannel)` which, as the name suggests, defines the output if an appropriate channel cannot be resolved.
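To make the resolve-or-fall-back idea concrete, here is a minimal plain-Java sketch. It deliberately does not use any SI classes: `HeaderRouterSketch`, its `map` and `route` methods, and the `unroutablePayments` channel name are all hypothetical names invented for this illustration, and headers are modelled as a simple `Map`.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration only; none of these types come from Spring Integration.
class HeaderRouterSketch {
  private final String headerName;      // header key the router inspects
  private final String defaultChannel;  // used when no mapping matches
  private final Map<String, String> mappings = new HashMap<String, String>();

  HeaderRouterSketch(String headerName, String defaultChannel) {
    this.headerName = headerName;
    this.defaultChannel = defaultChannel;
  }

  // Registers "header value -> channel name" and returns this for chaining.
  HeaderRouterSketch map(String headerValue, String channelName) {
    mappings.put(headerValue, channelName);
    return this;
  }

  // Resolves the destination from the headers alone, falling back to the default.
  String route(Map<String, Object> headers) {
    Object value = headers.get(headerName);
    String channel = (value == null) ? null : mappings.get(value.toString());
    return (channel != null) ? channel : defaultChannel;
  }

  public static void main(String[] args) {
    HeaderRouterSketch router =
        new HeaderRouterSketch("PAYMENT_TYPE", "unroutablePayments")
            .map("credit", "creditSystemPayments")
            .map("cash", "notifyPrintReceipts");

    Map<String, Object> headers = new HashMap<String, Object>();
    headers.put("PAYMENT_TYPE", "cash");
    System.out.println(router.route(headers)); // prints "notifyPrintReceipts"

    headers.put("PAYMENT_TYPE", "bitcoin");
    System.out.println(router.route(headers)); // prints "unroutablePayments"
  }
}
```

The publisher only ever knows about one channel; adding a new payment type means adding a mapping to the router, not touching the publishing application.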

SI ships with several implementations that you can use out of the box. The `HeaderValueRouter`, true to its name, attempts to map the output channel based on a certain value in the header of the message. You need to supply the key that the router is to inspect at construction time, and the router will resolve the channel name based on that key. Another useful router is the `PayloadTypeRouter`, which, as you might have already guessed, looks at the type (that is, the Java type) of the message payload to decide the output channel. Of course, you can always define your own router to map messages to output channels (see Listing GAN-1).

Let's see how we can use routers within our application. In Listing GAN-1 we define two routers - a `HeaderValueRouter` that inspects the header of the message and decides which channel to forward the message to based on the value of the 'PAYMENT_TYPE' key. We then define a router implementation of our own that invokes a method on the message itself, and uses that to decide which channel it goes to next.

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:beans="http://www.springframework.org/schema/beans"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd">
          
  <channel id="payments" />
  <channel id="creditSystemPayments" />
  <channel id="notifyPrintReceipts" />
          
  <header-value-router input-channel="payments" 
    header-name="PAYMENT_TYPE">
    <mapping value="credit" channel="creditSystemPayments" />
    <mapping value="cash" channel="notifyPrintReceipts" />
  </header-value-router>
          
  <channel id="visa" />
  <channel id="masterCard" />
  <channel id="amex" />
          
  <router input-channel="creditSystemPayments" ref="cardSystemRouter" 
    method="resolveCreditCardChannel" />
          
  <beans:bean id="cardSystemRouter" 
    class="com.looselytyped.CreditCardSytemRouter" />
</beans:beans>

Here is the implementation of the router itself.

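public class CreditCardSytemRouter {
  public String resolveCreditCardChannel(OrderItem orderItem) {
    // ask the payload to determine where it ought to be sent
    String cardType = orderItem.getCardType();
    if ("VISA".equals(cardType)) return "visa";
    if ("MASTER_CARD".equals(cardType)) return "masterCard";
    if ("AMEX".equals(cardType)) return "amex";
    // every code path must return or throw for the method to compile;
    // rejecting unknown card types is one option
    throw new IllegalArgumentException("Unsupported card type: " + cardType);
  }
}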

As you can see from Listing GAN-1, the `CreditCardSytemRouter` receives an `OrderItem` and returns a `String` name that matches the name of a channel - that will be the channel the message will be forwarded to.

Zoning Out

There are times when, rather than forwarding messages to a channel, you would rather drop them from the system altogether based on a certain check. EIP defines a pattern for this - the "Message Filter". When you think about it, the Message filter is merely a specialization of the message router pattern - except in this case, you have only one output channel to which select messages get published, and discarded messages are sent down a sink.

SI offers an implementation in the form of a `MessageFilter`. This class does not piggy-back off of the `AbstractMessageRouter` but rather stands on its own. The interesting aspect of this implementation is not the `MessageFilter` class itself, but rather the `MessageSelector` contract, one whose implementation you have to provide when constructing the `MessageFilter`. The `MessageSelector` is a simple interface, with one method `boolean accept(Message<?> message)`. Perhaps you can see why SI might have implemented this pattern on its own - it's very clear as to what this does. The `accept` method is very explicit - based on whether it returns a true or a false, the filter will either forward the message, or discard it. Let's see how we can go about declaring a message filter (Listing GAN-2).

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:beans="http://www.springframework.org/schema/beans"
  xmlns:stream="http://www.springframework.org/schema/integration/stream"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/stream
    http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">
          
  <channel id="cardNumbers" />
  <channel id="validCreditCards" />
          
  <filter input-channel="cardNumbers" 
    output-channel="validCreditCards" ref="selector" />
          
  <beans:bean id="selector" 
    class="com.looselytyped.ValidCreditCardSelector" />
</beans:beans>

Here is the message selector itself.

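// Spring Integration 2.0 package names
import org.springframework.integration.Message;
import org.springframework.integration.core.MessageSelector;

public class ValidCreditCardSelector implements MessageSelector {
  // returning false tells the filter to reject the message
  public boolean accept(Message<?> message) {
    Order order = (Order) message.getPayload();
    return order.getCreditCardNumber().startsWith("1234");
  }
}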

In Listing GAN-2 we define an implementation of the `MessageSelector` that checks to see if the card number in the `Order` starts with a specific string; if it does not, the filter discards the message. We then wire this selector into the filter through its `ref` attribute in the application context.

Now, perhaps dropping messages with bad credit card numbers is not what you want (perhaps you would rather notify the customer that they might have fat-fingered their credit card number) or, in some cases, you want to throw an exception if you get a bad message. No worries - the `filter` declaration in the configuration file can accept two other optional attributes - `throw-exception-on-rejection` and `discard-channel`. The `discard-channel` is by default set to `nullChannel`, but you can point it at a channel that feeds a notification system of sorts.
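The accept, discard and throw-on-rejection behaviour can be sketched in plain Java as follows. This is an illustration of the pattern only, not SI's `MessageFilter`: `FilterSketch` is a hypothetical class, two lists stand in for the output and discard channels, and the choice to record the discarded message before throwing is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java illustration only; the lists stand in for channels.
class FilterSketch {
  private final Predicate<String> selector;   // plays the role of the selector
  private final boolean throwOnRejection;
  final List<String> output = new ArrayList<String>();   // "output channel"
  final List<String> discard = new ArrayList<String>();  // "discard channel"

  FilterSketch(Predicate<String> selector, boolean throwOnRejection) {
    this.selector = selector;
    this.throwOnRejection = throwOnRejection;
  }

  // Forwards accepted payloads; rejected ones go to the discard list and,
  // if configured, also raise an exception.
  void handle(String cardNumber) {
    if (selector.test(cardNumber)) {
      output.add(cardNumber);
      return;
    }
    discard.add(cardNumber);
    if (throwOnRejection) {
      throw new IllegalStateException("Rejected: " + cardNumber);
    }
  }

  public static void main(String[] args) {
    FilterSketch filter = new FilterSketch(n -> n.startsWith("1234"), false);
    filter.handle("1234-5678");
    filter.handle("9999-0000");
    System.out.println(filter.output);  // prints [1234-5678]
    System.out.println(filter.discard); // prints [9999-0000]
  }
}
```

A consumer listening on the discard side could, for instance, send the "please re-enter your card number" notification mentioned above.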

Finding Meaning

Invariably, systems may work with data formats that need to be translated, or transformed into a canonical format, or export data in a format that another application expects. Naturally, putting the conversion logic in the application itself implies a certain amount of coupling with other applications, and can also be rather invasive to the core logic of the application. In this regard, EIP defines a pattern - the "Message Translator". The Message Translator, like the routers, is just another filter (from the "pipes-and-filters" analogy) that consumes a message, and outputs a message in a different format altogether. Translation (or transformation) is a very useful construct. You may use transformation so that two distinct applications can talk to one another or perhaps even leverage different "transports" - perhaps you wish to send a message over HTTP without losing fidelity. Regardless, the premise of transformation is simple - given an input channel with incoming messages, apply a set of translation rules, and send out an equivalent, yet differently formatted message.

SI defines a high-level contract, the `Transformer` interface. Not surprisingly, this is a simple interface with one method - `Message<?> transform(Message<?> message)` - given a message of a particular type, return a message of a different type. SI comes with batteries included and a full range of implementations for you to pick from. You have an `ObjectToStringTransformer` which merely invokes the `toString` method on an incoming payload, and the `PayloadSerializingTransformer` (and its cousin the `PayloadDeserializingTransformer`) that remain true to their names. Need JSON? Simply use the `ObjectToJsonTransformer`. SI can also deep-serialize an object to a `java.util.Map` using the `ObjectToMapTransformer`, and reverse the process with the `MapToObjectTransformer`.

Of course you can always make your own transformer. All you need to declare is the bean class and method that will perform the translation and you are done! Let's say you had an application, or a script that published a phone number in the format 614-555-1234 and you wanted to transform that into the domain object, `PhoneNumber`. The application context will look like the code in Listing GAN-3.

<beans:beans xmlns="http://www.springframework.org/schema/integration"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:beans="http://www.springframework.org/schema/beans"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd">
          
  <channel id="inputChannel" />
  <channel id="phoneChannel" />
          
  <transformer input-channel="inputChannel" ref="transformer" 
    method="transform" output-channel="phoneChannel" />
          
  <beans:bean id="transformer" 
    class="com.looselytyped.TextToPhoneTransformer" />
</beans:beans>

Here is the code for the transformer.

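import java.util.StringTokenizer;

public class TextToPhoneTransformer {

  // SI unwraps the String payload and wraps the returned PhoneNumber
  public PhoneNumber transform(String textPhone) {
    // split "614-555-1234" into its three dash-separated parts
    StringTokenizer st = new StringTokenizer(textPhone, "-");
    final String extCode = st.nextToken();
    final String num1 = st.nextToken();
    final String num2 = st.nextToken();
    return new PhoneNumber(extCode, num1, num2);
  }
}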

Pretty simple, huh?

Transformation is a rather broad, and extremely useful topic. EIP endorses several other patterns for transformation, including `Content Enricher` and `Claim Check`. Although I am not going to demonstrate the implementation of these within SI, I would like to point out a subtle, yet important difference between the `Message Translator` pattern and the `Content Enricher`. EIP defines a `Content Enricher` as a component that allows two systems to communicate although the data generated by the sending application may be insufficient or incomplete as far as the consumer is concerned. For example, a Web application may generate a message to update the address information for a customer with `customer_id` 1. But the system that updates the customer information needs a full-blown `Customer` domain object! Note that this is no longer a simple transformation (although off-the-cuff it does seem to be). In this case, we need an entity that can use the `customer_id`, reach into a data-store, and map the information to a domain object. This is the role of the `Content Enricher`.

Summarizing, a `Message Translator` needs no external help to transform a message. It uses only the data available in the message, but the output format would be different. On the other hand, a `Content Enricher` uses the data contained in the message and some external mechanism - a calculation or a data-store - to create a new message that is apt for consumption by another system. It may even happen that none of the data supplied to a `Content Enricher` makes it into the final message - consider the example of applying an algorithm or a calculation to the contents of the message. SI does ship with a few `Content Enrichers` such as the `HeaderEnricher`, but transformation (be that simple transformation or enrichment) still remains primarily the purview of the transformer component.
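The distinction is easier to see side by side. The following plain-Java sketch is purely illustrative and uses no SI classes: `EnricherSketch`, its nested `Customer` type, the `namesById` map (standing in for a data-store) and the sample values are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java illustration only; a Map stands in for the external data-store.
class EnricherSketch {

  // Hypothetical domain object that the downstream system expects.
  static final class Customer {
    final int id;
    final String name;
    final String address;

    Customer(int id, String name, String address) {
      this.id = id;
      this.name = name;
      this.address = address;
    }
  }

  // Translator: the output is derived purely from the incoming data.
  static String translate(String textPhone) {
    return textPhone.replace("-", "");
  }

  // Enricher: combines data from the message (id, new address) with data
  // fetched from an external source (the customer's name).
  static Customer enrich(int customerId, String newAddress,
                         Map<Integer, String> namesById) {
    String name = namesById.get(customerId);
    if (name == null) {
      throw new IllegalArgumentException("Unknown customer_id: " + customerId);
    }
    return new Customer(customerId, name, newAddress);
  }

  public static void main(String[] args) {
    System.out.println(translate("614-555-1234")); // prints 6145551234

    Map<Integer, String> namesById = new HashMap<Integer, String>();
    namesById.put(1, "Jane Doe");
    Customer customer = enrich(1, "1 Main St", namesById);
    System.out.println(customer.name + ", " + customer.address); // prints Jane Doe, 1 Main St
  }
}
```

Note that `translate` has no dependencies at all, whereas `enrich` cannot do its job without the store - which is exactly the coupling the `Content Enricher` pattern isolates from the sending application.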

Reaching Out

So far, we have discussed quite a few patterns as they are described in EIP, and their corresponding implementations within SI. The common thread running through all of these is that they are useful for inter-app communication. But integration often involves applications reaching out to external systems, such as files on disk, HTTP (or HTTPS), FTP, email servers, what-have-you. As we discussed in the first installment of this series, these, along with the other components that we just discussed are all "endpoints" - that is essentially a component that _does_ something with a message. In this section, let us look and see how we can leverage some of the external adapters that SI ships with. These adapters let you reach and access external systems using a variety of different protocols.

Let us look at HTTP integration. Suppose your application needs to communicate with another application using HTTP, be that `GET`ing some data or `POST`ing to a URL. SI provides a rich set of adapters that use HTTP as their transport and offer both inbound and outbound support for your applications.

In Listing GAN-4 we will discuss HTTP integration with a RESTful service (Yahoo's Weather API), but note that SI comes with adapters to integrate with SOAP and POX (Plain-Old-XML) endpoints. Here we are sending a Java `String` payload (that holds the WOEID of the region we are interested in) down the `inputChannel`. The HTTP `outbound-gateway` is configured with a URL and a placeholder (cleverly named `woeid`). We are also using SpEL (Spring Expression Language) to set the `woeid` URI variable to the value of the payload, which in our example is the WOEID itself. Finally, we have a `service-activator` that gets the output of the HTTP `GET` and simply prints it to the console.
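SI performs the placeholder substitution for you, but the idea behind it can be sketched in a few lines of plain Java. The `UriTemplateSketch` class and its `expand` method are hypothetical names for this illustration and are not part of SI; the sketch simply replaces each `{name}` token with the URL-encoded value of the matching variable.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.Collections;
import java.util.Map;

// Plain-Java illustration only; not how SI is implemented internally.
class UriTemplateSketch {

  // Replaces every {name} placeholder with the URL-encoded variable value.
  static String expand(String template, Map<String, String> variables) {
    String result = template;
    for (Map.Entry<String, String> entry : variables.entrySet()) {
      result = result.replace("{" + entry.getKey() + "}", encode(entry.getValue()));
    }
    return result;
  }

  private static String encode(String value) {
    try {
      return URLEncoder.encode(value, "UTF-8");
    } catch (UnsupportedEncodingException e) {
      // UTF-8 support is mandated by the Java platform
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    // The payload ("12797282") plays the role of the woeid variable.
    String url = expand("http://www.google.com/ig/api?weather={woeid}",
        Collections.singletonMap("woeid", "12797282"));
    System.out.println(url); // prints http://www.google.com/ig/api?weather=12797282
  }
}
```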

A gateway within SI is a component that allows for bi-directional communication with an external system. The role of the gateway is to take a message that is supplied to its input channel, wrap it or format it in a structure that the external system requires and send it. A gateway can then take the response and construct an SI message for processing and publish it on its outbound channel (footnote: SI also comes with "channel adapters" - these, much like gateways, are a means for your application to talk to external systems, except they are unidirectional).

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-http="http://www.springframework.org/schema/integration/http"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.0.xsd
    http://www.springframework.org/schema/integration/http
    http://www.springframework.org/schema/integration/http/spring-integration-http-2.0.xsd">
          
  <int:channel id="inputChannel" />
          
  <int-http:outbound-gateway request-channel="inputChannel"
    url="http://www.google.com/ig/api?weather={woeid}"
    http-method="GET"
    expected-response-type="java.lang.String" 
    reply-channel="weatherChannel">
    <int-http:uri-variable name="woeid" expression="payload" />
  </int-http:outbound-gateway>
          
  <int:service-activator input-channel="weatherChannel" ref="handler" 
    method="handleMessage" />
          
  <bean id="handler" class="com.looselytyped.DefaultMessageHandler"/>
</beans>

This is the class that kicks things off -

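// Spring Integration 2.0 package names
import org.springframework.context.support.AbstractApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.support.MessageBuilder;

public class Demo {

  public static void main(String[] args) {
    AbstractApplicationContext context =
      new ClassPathXmlApplicationContext(
        "/META-INF/spring/integration/demo.xml", Demo.class);

    MessageChannel inputChannel =
      context.getBean("inputChannel", MessageChannel.class);

    MessagingTemplate template = new MessagingTemplate();
    // Use the Message Builder factory to wrap a simple String
    // as our message payload
    // Use the WOEID of Palo Alto
    Message<String> message = MessageBuilder.withPayload("12797282").build();
    template.send(inputChannel, message);
  }
}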

And something to log the output:

public class DefaultMessageHandler {
  private Log log = LogFactory.getLog(getClass());
          
  public void handleMessage(Message message) {
    System.out.println(message);
  }
}

You should see an XML payload come through at the console when you run the main method in Listing GAN-4.

The gateway takes the payload, appropriately constructs an HTTP request, and then receives the response, wraps it in an SI message and sends it down its reply channel. All of this is completely transparent to your application! This is true for all endpoints within SI that talk to external systems - channel adapters and message gateways alike.

Allow me to point out a few things here. First, notice the `int-http` namespace declaration in the application context file, as well as the additional XML schema hints that we provide (namely `http://www.springframework.org/schema/integration/http` and `http://www.springframework.org/schema/integration/http/spring-integration-http-2.0.xsd`).

Another thing of note regarding your classpath - for all the examples that I demonstrated so far, you simply need the SI core JAR file and you would be up and running. In order for you to run the code in Listing GAN-4 you will need the `spring-integration-http` artifact, which you can declare in your POM file as in Listing GAN-5.

<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-http</artifactId>
  <version>2.0.4.RELEASE</version>
</dependency>

Furthermore, this will pull in `spring-integration-core` as a transitive dependency, so you don't need to declare both in your POM file.

Bidding Farewell

Phew! That was quite a whirlwind tour of EIP and Spring Integration. EIP provides a rich and comprehensive vocabulary for integration. Much like the nomenclature that the GoF book injected into our daily communication, EIP can provide a consistent and clear means of expressing our intents and ideas about integration. SI embraces Spring's non-invasive POJO-oriented mantra, allowing us to adopt it incrementally, and providing additional benefits such as easy testability.

SI is a well thought-out, extensive implementation of the EIP patterns. It attempts, for the most part, to adhere to the philosophy and spirit of EIP. Where it does veer off, it does so for good reason. We have only scratched the surface of SI in this series, but I certainly hope I have convinced you to look further into SI, if not adopt it for your integration needs.

Your education on the fine art of conversing only begins here. I wish you many a pleasant dialogue in the future :)
