On Eloquent Conversations - Part I

Raju Gandhi
  • July 2011
  • Java
  • Web Services

It goes without saying that an enterprise consists of many moving parts, with multiple applications that serve to support different business processes. These applications rarely live in a silo, and consequently need to be integrated to allow for reliable and in some cases, secure data transfer. In this two-part article series, we will discuss some of the hurdles to integration and some possible approaches.

We will then turn our attention mainly to messaging. We will also take a look at Spring Integration, a library from SpringSource that lets us integrate our applications in an unobtrusive and declarative manner.

The enterprise today is a complex beast. An enterprise can consist of tens, if not hundreds, of applications, ranging from custom scripts to internal systems to full-blown web applications that may be customer facing. Often, these applications need to work in concert, sharing data and notifying each other of events that have occurred. Keeping them in sync, and ensuring that they are all on the same page, is important because invariably the customer sees these disparate applications working together as one single transaction.

Consider the example of a pizza shop. A customer goes online to place an order, opts for delivery or take-out, and then sits back hoping to bite into their favorite selection of toppings within a certain time-frame. Let's look at the sequence of events - a payment processing application receives the user's credit card number and attempts to charge them for their order. Another application receives the order and forwards it to a franchise located near the customer's supplied address. Once the order is ready to go, it either needs to go on a "wait" pile if the customer opted for pick-up, or be handed off to a delivery person with the right address. Perhaps the customer can track the progress of their order on an online tracker. Naturally, this application needs to know the state of the pie on an ongoing and timely basis.

Regardless of how many applications are involved in the scenario described above, as far as your paying customer is concerned, they all represent one transaction! If any one or more applications fail to play their role, and fail to notify subsequent (or concurrently running) applications, then you are going to have one really aggravated customer, or a really satisfied one (assuming you fail to process the credit card transaction but deliver the pizza regardless). Either way, this pizza shop is facing an early and untimely demise.

Introduction

Enterprise integration is a hard problem to solve. You may have to deal with different technology stacks, including custom applications written in-house, legacy systems of a by-gone era, as well as off-the-shelf applications where the Not Invented Here (NIH) syndrome was (in)appropriately averted. Some of these applications may not lend themselves to intrusive code enhancements. Even if you were to implement an integration solution, you have to ensure that changing one application does not introduce a ripple effect on all other applications that are integrated with it. The integration solution that you implement also needs to scale, as more and more applications come "online".

As developers, we may have implemented one-off integration solutions without necessarily thinking of them as integration strategies. One common approach is file-sharing, where one or more applications write to (or read from) a common file or files. This approach can be a little brittle, and brings with it the overhead of having to decide on a file format that is agreeable to all parties involved, permission issues regarding the files themselves, and so on.

Another popular approach is database sharing. Rather than coupling the applications in code, they are coupled on the back-end. This approach too is fraught with pitfalls - the schema needs to be one that everyone agrees on, and any change in the schema, especially a destructive change (such as dropping a column or table) needs to be communicated, and may affect one or all participating applications.

In order to address some of the issues that enterprise integration brings with it, and to discuss some possible approaches, Gregor Hohpe and Bobby Woolf wrote Enterprise Integration Patterns (hereafter referred to as "EIP"). EIP is an excellent book, and if you are interested in reading more about integration, I highly recommend it (footnote: I am in no way associated with the book's authors or publishers).

EIP focuses primarily on messaging as an integration and architectural strategy, and discusses various patterns that can be used to implement a scalable integration solution. The authors discuss over 60 patterns for the messages themselves, messaging channels, filtering, routing and other aspects involved in messaging solutions.

Following the lead of EIP, SpringSource (the creators of the popular Spring Framework) introduced Spring Integration (hereafter referred to as SI). SI is a library that builds on top of the Spring programming model and implements many of the patterns featured in EIP. It adheres to the core Spring philosophy of working with POJOs and being very test-friendly. SI is an extension to the Spring Framework, and thus leverages the application context and the Inversion of Control (IoC) principle. This allows you to declaratively configure your components to be SI aware, while encapsulating the internal implementation of the integration infrastructure. Lastly, it builds on the Spring Framework's support for JMS, scheduling, AOP and transactions.

Before proceeding, I should point out that SI is not the only contender in this arena. Mule is a full-blown open source Java-based enterprise service bus (ESB), and like SI offers developers a way to link applications together. MuleForge is Mule's home for extensions and connectors, and offers an integration solution for almost every problem. Mule is definitely worth a close look if you are investigating integration solutions. Another popular option is Apache ServiceMix. ServiceMix leverages several popular technologies from Apache under the covers, such as Apache ActiveMQ and, interestingly, Apache Camel, which much like SI is yet another implementation of the EIP patterns. ServiceMix is an implementation of the Java Business Integration specification, and fully supports OSGi.

In this article we will discuss some of the key patterns from EIP, their implementation in SI, and some code to demonstrate the configuration and usage of these components.

Sound simple? Let's get started then.

Establishing The Lines Of Communication

Right off the bat, we realize that we need a means to actually send messages back and forth between parts of an application, and between applications. When thinking of messaging, one way to picture it is to use the pipes-and-filters analogy (footnote: Due credit to the SI manual for providing this little nugget of wisdom). Messages flow along the pipes destined for certain filters - although here, you should look at the filters as objects that do _something_ with the message - be that consuming it, filtering it, re-routing it, re-packaging it, or what-have-you.

EIP introduces a pattern named "Message Channel" to represent the "pipes" of the messaging infrastructure, with a few added constraints. Rather than have one channel to transmit all possible kinds of messages, each application should only deliver a certain type of message to (and expect a certain type of message from) a _particular_ message channel. Note that the channel is merely an abstraction - a construct that serves as a logical address for applications to connect to. It provides a standard interface for different applications to communicate with each other without being highly coupled to the physical communication infrastructure, or a specific transport-type. Under the covers, the implementation might pool all messages while ensuring that the messages reach their destination. It might use in-memory queues, or the file-system, even HTTP or UDP as its transport. All of the above should be transparent to the application.

Naturally this brings up the issue of discovery. How else is an application to send and receive on a particular channel? To address this issue, EIP recommends the use of an alphanumeric name to identify a message channel. Furthermore, EIP identifies two separate semantics for message channels - these are proposed as two separate patterns (which build on the "Message Channel") known as "Point-to-Point Channel" and "Publish-Subscribe Channel".

The Point-to-Point channel pattern ensures that when a message is sent by a producer, it will be consumed by one, and only one consumer. This pattern (in conjunction with another pattern called "Competing Consumers") does not prevent multiple consumers from listening on that channel for messages, but mandates that only one of them is allowed to process it. This is a useful pattern when you are looking for a load-balancing strategy, or when designing for redundancy.

On the other hand, the Publish-Subscribe channel pattern is akin to an IP broadcast - all consumers listening on a particular channel are allowed to consume a message, and only after every listener is given a chance to consume a message is the message considered to be _spent_ and dropped from the system. This pattern buys you a lot of flexibility when thinking of your work flow - you can have a plug-and-play strategy, swapping components in and out as and when you need to without affecting the producers. This is also a particularly useful pattern for debugging, logging and auditing concerns. Since these operations are usually orthogonal to the actual business concern, you wouldn't want to change the work flow to make these happen - rather you want to _listen_ in on messages as they flow through the system while other value adding components remain agnostic to your nosiness :)
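To make the difference between the two delivery semantics concrete, here is a minimal sketch in plain Java. It does not use SI at all: `PointToPointChannel`, `PubSubChannel` and the consumer names are hypothetical stand-ins, and the round-robin rotation is just one simple assumption for how competing consumers could share the work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, simplified channels for illustration; these are NOT Spring Integration classes. */
class ChannelSemanticsSketch {

    /** Point-to-Point: each message is handed to exactly one of the competing consumers. */
    static class PointToPointChannel<T> {
        private final List<Consumer<T>> consumers = new ArrayList<>();
        private int next = 0; // index of the consumer that receives the next message

        void subscribe(Consumer<T> consumer) { consumers.add(consumer); }

        void send(T message) {
            if (consumers.isEmpty()) {
                throw new IllegalStateException("no consumers registered");
            }
            consumers.get(next).accept(message);
            next = (next + 1) % consumers.size(); // rotate as a naive load-balancing strategy
        }
    }

    /** Publish-Subscribe: every subscriber gets a chance to see every message. */
    static class PubSubChannel<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        void subscribe(Consumer<T> subscriber) { subscribers.add(subscriber); }

        void send(T message) {
            for (Consumer<T> subscriber : subscribers) {
                subscriber.accept(message);
            }
        }
    }

    /** Sends a few orders through each channel and records which consumer saw what. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();

        PointToPointChannel<String> ovens = new PointToPointChannel<>();
        ovens.subscribe(m -> log.add("oven-1:" + m));
        ovens.subscribe(m -> log.add("oven-2:" + m));
        ovens.send("order-1");
        ovens.send("order-2");
        ovens.send("order-3");

        PubSubChannel<String> events = new PubSubChannel<>();
        events.subscribe(m -> log.add("kitchen:" + m));
        events.subscribe(m -> log.add("audit:" + m)); // the "nosy" listener
        events.send("order-4");
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Notice that adding the `audit` listener required no change to the producer or to the `kitchen` consumer, which is exactly the plug-and-play property described above.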

There is another vector to consider when we talk about consuming messages - should the consumer be synchronous or asynchronous? That is, when a producer publishes a message, should the channel block till the consumer responds, or adopt a fire-and-forget strategy? Fret not, SI has you covered.

SI's approach to implementing the Message Channel pattern involves making it a first-class citizen of the framework. This gives you, the user of the library, full control over the configuration of the channel, including dictating whether the channel has synchronous or asynchronous semantics, whether it is in-memory or backed by a persistent store, and other parameters.

SI's base contract for all Message Channel implementations is the `MessageChannel` interface. This defines the basic contract for a channel to send messages. SI then defines two sub-interfaces - `SubscribableChannel` and `PollableChannel`. The `SubscribableChannel` is meant for implementing event-driven semantics - that is push or synchronous messaging, and the `PollableChannel` is for on-demand semantics - that is pull or asynchronous messaging.

Keeping in tune with the one-to-one (Point-to-Point) and one-to-many (Publish-Subscribe) semantics that EIP defines, SI provides various implementations such as the `DirectChannel` and `QueueChannel` for one-to-one publications, and the `PublishSubscribeChannel` to support the one-to-many paradigm. Each of these message channel implementations varies in regard to how it handles messages.

For example, the `DirectChannel` (one-to-one) will dispatch its message to its subscriber (or consumer) and block until the consumer returns. As we discussed earlier, a Point-to-Point channel does not limit the number of consumers to one, but rather guarantees that only one consumer processes each message. The `DirectChannel` leverages the strategy pattern to dictate how it load-balances its subscribers (by default it uses SI's `RoundRobinLoadBalancingStrategy`). If you were to look at `DirectChannel` you would notice that it implements the `SubscribableChannel` interface. On the other hand, if you would rather not block on the _send_ but rather let messages buffer while consumers poll, then you can leverage something like SI's `QueueChannel` implementation.
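The push-versus-pull distinction can be sketched without SI as well. The plain-Java classes below (`PushChannel`, `PullChannel`) are hypothetical and deliberately simplified: the push channel runs the handler on the sender's thread, while the pull channel only buffers into a bounded queue. Rejecting a message when the buffer is full is an assumption made to keep the sketch short, not a statement about how SI's `QueueChannel` behaves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;

/** Hypothetical channels contrasting push (event-driven) and pull (polling) delivery. */
class PushVersusPullSketch {

    /** Push: send() invokes the handler on the caller's thread and returns only when it is done. */
    static class PushChannel<T> {
        private Consumer<T> handler;

        void subscribe(Consumer<T> handler) { this.handler = handler; }

        void send(T message) {
            if (handler == null) {
                throw new IllegalStateException("no subscriber registered");
            }
            handler.accept(message);
        }
    }

    /** Pull: send() only buffers the message; a consumer collects it later by polling. */
    static class PullChannel<T> {
        private final BlockingQueue<T> buffer;

        PullChannel(int capacity) { this.buffer = new ArrayBlockingQueue<>(capacity); }

        /** Returns false (instead of waiting) when the buffer is full; a simplifying assumption. */
        boolean send(T message) { return buffer.offer(message); }

        /** Returns the oldest buffered message, or null when nothing is waiting. */
        T receive() { return buffer.poll(); }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();

        PushChannel<String> web = new PushChannel<>();
        web.subscribe(m -> log.add("handled " + m));
        web.send("web-order");          // the handler runs before send() returns
        log.add("send returned");

        PullChannel<String> fax = new PullChannel<>(2);
        log.add("accepted=" + fax.send("fax-1"));
        log.add("accepted=" + fax.send("fax-2"));
        log.add("accepted=" + fax.send("fax-3")); // the buffer of 2 is already full
        log.add("polled " + fax.receive());
        log.add("polled " + fax.receive());
        log.add("polled " + fax.receive());       // nothing left to poll
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The trade-off is visible in the log: with the push channel the producer waits for the consumer, while with the pull channel the producer moves on immediately and the consumer decides when to pick messages up.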

Of course, if you see the potential for, or need to have, multiple consumers on a particular channel, then you should use `PublishSubscribeChannel`. This class, like the `DirectChannel`, implements the `SubscribableChannel` interface, but rather than sending a message to only one consumer, guarantees that all registered consumers will be given a chance to process the message.

Listing GAN-1 shows how you can go about configuring channels using SI's XML DSL.

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:beans="http://www.springframework.org/schema/beans"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
    http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration.xsd">
          
  <channel id="webOrdersIn"/>
  <channel id="phoneOrdersIn"/>
  <channel id="faxOrdersIn">
    <queue capacity="10"/>
  </channel>
</beans:beans>

For those familiar with Spring's application-context files, this should be an easy read. Here, we are declaring 3 message channels. The `DirectChannel` is SI's default channel implementation - since neither `webOrdersIn` nor `phoneOrdersIn` declares any additional configuration, both will be instances of a `DirectChannel`. On the other hand, `faxOrdersIn` configures within itself a queue capacity. In this case, this will be an instance of a `QueueChannel`. A `QueueChannel` is a message channel backed by an in-memory Java queue, in this case with a capacity of 10. A `QueueChannel` also implements the `PollableChannel` interface, allowing consumers to poll the channel for new messages.

A naive, but rather informative usage of the channels is shown in Listing GAN-2.

public static void main(String[] args) {
  AbstractApplicationContext context = 
    new ClassPathXmlApplicationContext(
      "/META-INF/spring/integration/onEloquentConversations.xml", 
      Demo.class);
  MessageChannel inputChannel = context.getBean("webOrdersIn", 
                                                MessageChannel.class);
          
  // create or get an Order from a list of toppings
  List toppings = new ArrayList();
  Order order = new Order(CrustType.THIN, toppings);
          
  inputChannel.send(new GenericMessage<Order>(order));
}

We load a `ClassPathXmlApplicationContext` by pointing it to the context definition file, then get a handle to the `webOrdersIn` channel, and send a message to it. The message wraps an `Order` object as its body (we will discuss the "Message" pattern in the next section).

As you might have concluded, SI offers several alternatives for the Message Channel pattern - whatever your requirements, SI probably has an implementation you can use. Or, you could always provide your own.

Once again, message channels within SI play a crucial role. They are not abstracted away in the framework, rather they play an active role when you go about designing your overall integration strategy. Your selection of the same will have a huge impact on the runtime characteristics of your application, or applications, and the overall throughput.

Having Something To Say

Now that we have opened up the lines of communication, we should have something interesting to say. EIP defines a pattern, appropriately called the "Message", to describe this very construct. A message is a very simple wrapper around two parts - a header and a body. The header consists of simple key-value pairs that contain meta-data pertinent to the payload (a.k.a. the body) of the message, or other information such as time stamps, the destination address of the message, and so on. The body, on the other hand, can be any piece of data or even an object that the consumer(s) might be interested in. Note that the message, and its contents, are a function of the interaction between the producers and the consumers, and have nothing to do with the message channel the message is traveling on. Much like the job of the postal service is to guarantee delivery of your mail without worrying (for the most part anyways ;) ) about the contents, the message channel is to remain completely agnostic of the contents of the messages themselves. This separation of concerns allows for great freedom as applications (and consequently their communication needs) evolve over time.

SI provides a simple interface, called `Message`, as the base contract for any message that can be sent along a `MessageChannel`. This interface provides the methods to read the headers (which happen to be a simple wrapper around a `Map`) and the body (which can be any `Object`). There are some basic implementations of this interface, such as the `GenericMessage`, and for the most part, this simple implementation suffices.

One thing to note about SI's implementation of the `Message` interface is that it's a read-only (or immutable) object post construction. If you ever need to derive a message from an existing one, or pair a new payload with an existing set of headers, you will need to create a new message. To facilitate this, SI provides a message factory called `MessageBuilder` that gives you convenient fluent methods to create new messages, copy payloads from existing messages into a new message, even create new headers or copy existing headers.

Creating messages is pretty trivial. We already saw an example of it in Listing GAN-2. Listing GAN-3 shows how you can use the `MessageBuilder` to create and copy messages.

// Once again, create or get an Order from a list of toppings
List toppings = new ArrayList();
Order order = new Order(CrustType.THIN, toppings);
          
// Option 1 - Instantiate a GenericMessage directly
Message<Order> genericMessage = new GenericMessage<Order>(order);
          
// Option 2 - Use the MessageBuilder
Message<Order> orderMessage = MessageBuilder
                                .withPayload(order)
                                .setHeader("ts", System.currentTimeMillis())
                                .build();
          
// Copy an existing message
Message<Order> orderMessageCopy = MessageBuilder.fromMessage(orderMessage)
                                    .build();
          
// Create a message with a new payload, but reuse the headers
// of "orderMessage"
Order anotherOrder = new Order(CrustType.THICK, toppings);
Message<Order> combinationOrder = MessageBuilder
                                    .withPayload(anotherOrder)
                                    .copyHeaders(orderMessage.getHeaders())
                                    .build();

`genericMessage` is similar to the snippet we saw in Listing GAN-2. `orderMessage`, on the other hand, uses the `MessageBuilder` to build a message, and sets a header using the `setHeader` method. Remember that the headers can be any key-value pair - in this case we are setting the time stamp of the order. `orderMessageCopy` is merely a copy of `orderMessage`, in case you need to tweak anything as the message moves from application to application. Finally, `combinationOrder` uses `anotherOrder` as its payload, but the headers of `orderMessage`. Note that this too will be a new, immutable instance - it just happens to share the same key-value pairs as `orderMessage`.
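If you are curious how an immutable message and its builder can fit together, the following plain-Java sketch shows one way to do it. `SimpleMessage` and `SimpleMessageBuilder` are hypothetical names invented for this illustration; the sketch mimics the shape of the calls discussed above but is not SI's implementation.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical immutable message plus builder; an illustration, NOT Spring Integration code. */
class ImmutableMessageSketch {

    /** Read-only once constructed: the headers are copied and wrapped, the payload reference is final. */
    static final class SimpleMessage<T> {
        private final Map<String, Object> headers;
        private final T payload;

        private SimpleMessage(Map<String, Object> headers, T payload) {
            // Defensive copy first, so later changes to the builder cannot leak in
            this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
            this.payload = payload;
        }

        Map<String, Object> getHeaders() { return headers; }
        T getPayload() { return payload; }
    }

    /** Collects headers and a payload, then produces a brand new message on build(). */
    static final class SimpleMessageBuilder<T> {
        private final Map<String, Object> headers = new HashMap<>();
        private final T payload;

        private SimpleMessageBuilder(T payload) { this.payload = payload; }

        static <T> SimpleMessageBuilder<T> withPayload(T payload) {
            return new SimpleMessageBuilder<>(payload);
        }

        SimpleMessageBuilder<T> setHeader(String name, Object value) {
            headers.put(name, value);
            return this;
        }

        SimpleMessageBuilder<T> copyHeaders(Map<String, Object> other) {
            headers.putAll(other);
            return this;
        }

        SimpleMessage<T> build() { return new SimpleMessage<>(headers, payload); }
    }

    /** Returns true if the message refuses attempts to modify its headers. */
    static boolean headersAreReadOnly(SimpleMessage<?> message) {
        try {
            message.getHeaders().put("tampered", true);
            return false;
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        SimpleMessage<String> original = SimpleMessageBuilder
                .withPayload("thin crust")
                .setHeader("ts", System.currentTimeMillis())
                .build();
        System.out.println("read-only headers: " + headersAreReadOnly(original));
    }
}
```

Because every "change" produces a new instance, a message can be handed to several consumers without any of them being able to alter what the others see.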

Making A Connection

We have talked about pipes (a.k.a message channels), and what they transport, that is the messages themselves. Now it's time for us to turn to another and equally important tenet of integration - how does an application actually connect to the messaging channel to send a message?

EIP addresses this by introducing yet another pattern - the "Message Endpoint". The message endpoint is to messaging what a database driver is to CRUD applications - it provides a consistent API for applications to talk to for sending messages. The main intent behind this pattern is to insulate your application from the specifics of your messaging infrastructure. It is the message endpoint's job to receive some data, know how to convert it into a message and deliver it to a specific message channel. On the receiving end, it will be another endpoint that receives the message, unpacks it and hands it to an application in a format that the application expects. Need to swap out the messaging system from one to another? No problem - only the endpoints have to change. Your domain specific code, that is your application, can continue to chug away like nothing's changed.

EIP specifies some additional constraints on message endpoints. First, they should be designed to either send or receive, but not both. Second, there should be a one-to-one relationship between an endpoint and a channel. So if an application needs to send messages down more than one message channel, it will need as many endpoints as channels. However, an application may have more than one endpoint for a particular channel to allow for higher throughput via multiple threads.

Much like most of the patterns we have discussed, this abstraction dovetails well with the central philosophies of SI (and the Spring Framework), which is to enable loose-coupling via the IoC principle. Furthermore, this pattern ensures that your application logic is completely unaware of messages, the messaging channels, or the messaging infrastructure itself. All it knows is that it needs to send a piece of data, and does not play any role outside of handing it off to the endpoint. This allows you, the application developer, to keep your business logic separate from your messaging logic.
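Here is a rough sketch of that separation in plain Java. Everything in it is hypothetical (`Envelope`, `Channel`, `OrderSender`, `OrderReceiver`, `Kitchen`, and the `source` header) and none of it is SI's API. The point is only that the application class never touches a message or a channel, and that each endpoint is bound to one channel and either sends or receives.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Map;
import java.util.Queue;

/** Hypothetical endpoints that keep application code unaware of messaging; NOT Spring Integration code. */
class EndpointSketch {

    /** Stand-in for a message: read-only headers plus a payload. */
    static final class Envelope {
        final Map<String, Object> headers;
        final Object payload;

        Envelope(Map<String, Object> headers, Object payload) {
            this.headers = Collections.unmodifiableMap(headers);
            this.payload = payload;
        }
    }

    /** Stand-in for a channel: a simple in-memory buffer. */
    static final class Channel {
        private final Queue<Envelope> buffer = new ArrayDeque<>();

        void send(Envelope envelope) { buffer.add(envelope); }
        Envelope receive() { return buffer.poll(); } // null when empty
    }

    /** Application code: knows about orders, nothing about envelopes or channels. */
    static final class Kitchen {
        String lastOrder;

        void prepare(String order) { lastOrder = order; }
    }

    /** Sending endpoint: wraps application data in a message for exactly one channel. */
    static final class OrderSender {
        private final Channel channel;

        OrderSender(Channel channel) { this.channel = channel; }

        void placeOrder(String order) {
            Map<String, Object> headers = Collections.<String, Object>singletonMap("source", "web");
            channel.send(new Envelope(headers, order));
        }
    }

    /** Receiving endpoint: unpacks the message and hands plain data to the application. */
    static final class OrderReceiver {
        private final Channel channel;
        private final Kitchen kitchen;

        OrderReceiver(Channel channel, Kitchen kitchen) {
            this.channel = channel;
            this.kitchen = kitchen;
        }

        /** Delivers one pending order; returns false when the channel is empty. */
        boolean deliverNext() {
            Envelope envelope = channel.receive();
            if (envelope == null) {
                return false;
            }
            kitchen.prepare((String) envelope.payload);
            return true;
        }
    }

    public static void main(String[] args) {
        Channel orders = new Channel();
        Kitchen kitchen = new Kitchen();
        new OrderSender(orders).placeOrder("thin crust, mushrooms");
        new OrderReceiver(orders, kitchen).deliverNext();
        System.out.println("kitchen is preparing: " + kitchen.lastOrder);
    }
}
```

Swapping the in-memory `Channel` for some other transport would mean touching only the two endpoint classes; `Kitchen` would not change.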

Within SI, any component that is capable of doing something with a message off of a message channel is an endpoint. This could be filtering messages based on their contents, re-routing them to a different destination than they were originally intended for, or splitting the message into one or more smaller messages. In this regard, SI veers off a little from the description offered by EIP. An endpoint can also be a component that lets an application talk to the _outside_ world - the filesystem, JMS, HTTP, email and other systems.

SI offers various implementations of endpoints for filtering, routing, aggregating and splitting messages. It also gives you support for reading from and writing to filesystems, JMS, and even Twitter!
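To give a feel for what filtering, splitting and routing mean before we look at SI's own components, here is a toy pipeline in plain Java. The class, the rules and the channel names (`ovenOrders`, `counterOrders`) are all made up for this illustration and do not correspond to anything in SI.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical filter, splitter and router steps; an illustration, NOT Spring Integration code. */
class MessageFlowSketch {

    /** Filter: decide whether a message may continue (here, blank orders are dropped). */
    static boolean accept(String order) {
        return !order.trim().isEmpty();
    }

    /** Splitter: break one composite message into several smaller ones. */
    static List<String> split(String order) {
        List<String> items = new ArrayList<>();
        for (String item : order.split(",")) {
            String trimmed = item.trim();
            if (!trimmed.isEmpty()) {
                items.add(trimmed);
            }
        }
        return items;
    }

    /** Router: choose a destination channel name based on the message content. */
    static String route(String item) {
        return item.endsWith("pizza") ? "ovenOrders" : "counterOrders";
    }

    /** Runs every order through filter, splitter and router; groups items by channel name. */
    static Map<String, List<String>> process(List<String> orders) {
        Map<String, List<String>> byChannel = new LinkedHashMap<>();
        for (String order : orders) {
            if (!accept(order)) {
                continue; // filtered out
            }
            for (String item : split(order)) {
                byChannel.computeIfAbsent(route(item), name -> new ArrayList<>()).add(item);
            }
        }
        return byChannel;
    }

    public static void main(String[] args) {
        List<String> orders = Arrays.asList("veggie pizza, soda", "  ", "cheese pizza");
        System.out.println(process(orders));
    }
}
```

Each step only looks at the message in front of it, which is what lets such components be chained, reordered or replaced independently.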

Endpoints are a discussion in themselves, one that we will have to continue next time, along with some code samples to see how all these pieces fit together. But I assure you of one thing - SI will have you dreaming up ways to use it within (intra) and between (inter) your applications.

Conclusion

Enterprise Integration is a tough nut to crack. We often end up _working_ up a solution, using shared databases or one-off scripts. But invariably, these solutions prove to be brittle. Change is inevitable in the enterprise, and often these solutions create more headaches than they solve. EIP has provided us with a comprehensive resource to better understand, and apply, some nuggets of wisdom, especially in the realm of messaging as an integration style. SI, for the most part, remains true to the spirit of EIP, and gives us a way to approach a solution incrementally, while keeping our domain intact, and allowing our applications to be loosely-coupled - both to each other, and to the messaging infrastructure.

SI has a lot more to offer, as we have seen. We will finish our education in the art of conversing next month. Till then ... keep polling my friends :)
