2009-05-23

On the road to Camel 2.0 - Concurrency with the async DSL

It's time to continue down the road to Camel 2.0, and I thought I should go into a bit more detail on the async DSL we have in Camel 2.0. It replaces the old thread DSL from Camel 1.x.

The async DSL is, like the thread DSL, used for concurrency. Under the hood the async DSL uses the excellent Java Concurrency API, where we leverage an ExecutorService for submitting concurrent tasks. By default a thread pool with 5 threads is created, but you can pass in the core pool size if you like. For instance: from(x).async(20).to(y)
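Camel's own pool creation code isn't shown in this post, so the following is only a plain Java sketch of the idea, assuming a fixed-size pool. The createPool helper is hypothetical: it falls back to 5 threads when no core pool size is given (like async()) and otherwise uses the size passed in (like async(20)).

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class AsyncPoolSketch {

    // Hypothetical default, matching the 5 threads mentioned above
    static final int DEFAULT_POOL_SIZE = 5;

    // Hypothetical helper: null means "no size given", like async() without an argument
    static ThreadPoolExecutor createPool(Integer corePoolSize) {
        int size = corePoolSize != null ? corePoolSize : DEFAULT_POOL_SIZE;
        // Core and max size are equal, so the pool never grows beyond 'size' threads;
        // extra tasks wait in the unbounded queue until a thread is free
        return new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor defaultPool = createPool(null); // like async()
        ThreadPoolExecutor biggerPool = createPool(20);    // like async(20)
        System.out.println(defaultPool.getCorePoolSize() + " " + biggerPool.getCorePoolSize());
        defaultPool.shutdown();
        biggerPool.shutdown();
    }
}
```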

Suppose we have this simple route where we poll a folder for new files, process the files and afterwards move the files to a backup folder when complete.

from("file://inbox?move=../backup-${date:now:yyyyMMdd}")
    .to("bean:calculateBean");

The route is synchronous and there is only a single consumer running at any given time. This scenario is well known and it doesn't affect thread safety as we only have one active thread involved at any given time.

Now imagine that the inbox folder is filled with files faster than we can process them. So we want to speed up this process. How can we do this?

Well, we could try adding a 2nd route with the same route path. That doesn't work so well, as we then have competing consumers for the same files. It requires that we use file locking so two consumers won't compete for the same file. By default Camel supports this with the file locking option on the file component. But what if the component doesn't support this, or it's not possible to add a 2nd consumer for the same endpoint? And yes, it's a bit of a hack, and the route logic is duplicated. And what if we need more? Then we need to add a 3rd, a 4th and so on.

What if the processing of the file itself is the bottleneck? That is, what if the calculateBean is slow? How can we process messages with this bean concurrently?

Yes, we can use the async DSL. If we insert it in the route we get:

from("file://inbox?move=../backup-${date:now:yyyyMMdd}")
    .async(10)
    .to("bean:calculateBean");


By inserting async(10) we have instructed Camel that from this point forward in the route it should use a thread pool with up to 10 concurrent threads. When the file consumer delivers a message to the async, the async takes it from there, and the file consumer can return and continue to poll the next file. This way we can still use a single file consumer to poll new files, and polling a directory just to grab the file handle is very fast. We won't have problems with file locking, sorting, filtering and whatnot. At the same time the calculate bean can process the file messages concurrently.
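The hand-off can be sketched in plain Java. This is not Camel's implementation: a single loop plays the file consumer and submits each file to a fixed pool of 10 threads (like async(10)), then moves straight on to the next file. The calculate method and the file names are hypothetical stand-ins for calculateBean and the inbox contents.

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class HandOffSketch {

    // Hypothetical stand-in for the slow calculateBean
    static String calculate(String file) throws InterruptedException {
        Thread.sleep(100); // simulate slow processing
        return "processed " + file;
    }

    static ConcurrentLinkedQueue<String> run(List<String> files) throws InterruptedException {
        ConcurrentLinkedQueue<String> results = new ConcurrentLinkedQueue<>();
        // Up to 10 concurrent worker threads, like async(10)
        ExecutorService workers = Executors.newFixedThreadPool(10);

        // The single "consumer" only hands each file over and immediately
        // continues with the next one; it never waits for calculate() to finish
        for (String file : files) {
            workers.submit(() -> {
                results.add(calculate(file));
                return null;
            });
        }

        workers.shutdown();
        workers.awaitTermination(10, TimeUnit.SECONDS);
        return results;
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        ConcurrentLinkedQueue<String> results =
                run(List.of("a.txt", "b.txt", "c.txt", "d.txt", "e.txt"));
        long elapsed = System.currentTimeMillis() - start;
        // 5 files at ~100 ms each, but processed in parallel by the pool
        System.out.println(results.size() + " files in about " + elapsed + " ms");
    }
}
```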

Finally, let's take a closer look at what happens with the synchronous thread and the asynchronous thread. The synchronous thread hands the exchange over to the new async thread, and with that the synchronous thread completes. The asynchronous thread then routes and processes the message. When this thread finishes, it takes care of the file completion strategy and moves the file into the backup folder. This is an important point: the on completion is done by the async thread. This ensures the file is not moved before it has been processed successfully. Suppose the calculate bean could not process one of the files. If the sync thread did the on completion, the file would have been moved into the backup folder too early. By handing this over to the async thread, we do it after the message has been processed completely.
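To make the ordering concrete, here is a plain Java sketch using java.nio.file rather than Camel's file component; all names are hypothetical. The worker thread processes a file and only then moves it to a backup folder, so a file that fails processing stays where it was.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CompletionSketch {

    // Hypothetical processing step; fails for files whose name contains "bad"
    static void calculate(Path file) {
        if (file.getFileName().toString().contains("bad")) {
            throw new IllegalStateException("Cannot process " + file);
        }
    }

    // Runs on the worker thread: process first, then apply the completion
    // strategy (move to backup) only if processing succeeded
    static Path processAndComplete(Path file, Path backupDir) throws Exception {
        calculate(file);
        Files.createDirectories(backupDir);
        return Files.move(file, backupDir.resolve(file.getFileName()));
    }

    public static void main(String[] args) throws Exception {
        Path inbox = Files.createTempDirectory("inbox");
        Path backup = inbox.resolve("backup");
        Path good = Files.createFile(inbox.resolve("good.txt"));
        Path bad = Files.createFile(inbox.resolve("bad.txt"));

        ExecutorService workers = Executors.newFixedThreadPool(2);
        // The polling thread only submits; it never moves files itself
        Future<Path> ok = workers.submit(() -> processAndComplete(good, backup));
        Future<Path> failed = workers.submit(() -> processAndComplete(bad, backup));

        System.out.println("moved: " + ok.get());
        try {
            failed.get();
        } catch (ExecutionException e) {
            // The failed file was not moved too early
            System.out.println("still in inbox: " + Files.exists(bad));
        }
        workers.shutdown();
    }
}
```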

For more information about the new Async API in Camel check out my previous blog entry and the Camel documentation.

Update: async was renamed to threads in Camel 2.0 final.

Those were the days when the internet LED did not go green

Monday night a storm hit southern Sweden and, as the blog title indicates, the storm brought other issues than just rain and thunder. Next morning the internet connection LED did not go green, and there is not much you can do about that. The first cup of coffee isn't quite the same when you can't take it easy, read the news on the net and check your mailbox to see how many millions I have won in all the lotteries, or what the spam of the day is.
 
As I get up before most other people, I had to wait until 8 am before my ISP opened its support center. And even then you still get a voice machine.

In the old days you used to navigate these call centers by pressing the numbers on the phone. But not today: now they use voice recognition. So as a Danish citizen living and working in Sweden, it's a challenge to speak Swedish well enough for a voice machine to understand that I just want a status on when the internet will be back. After several tries and a lot of "I don't understand you" from the voice lady, I won: it gave up and presented me with the good old-fashioned button menu. With this I could press 4 and then 3 and get the status. And guess what, the next time I called in it would have been much faster with the button navigation, as I could just press 4 and 3 again. But no, the damn phone company wants to pester me with that voice lady again.

As the phone company is the largest in Scandinavia, I guess fixing the internet for a local town in southern Sweden isn't at the top of their list. So today the internet is still not green, and the latest estimate is 3 pm this afternoon. Well, guess what: they said that yesterday as well. They just add 6 hours to the clock each time.

One service they offered that I thought was cool was that you could type in your mobile phone number and receive a text message when the internet is green again. But yet again they would not accept my mobile number, as it was from another vendor. Ain't it sweet when companies just grow too big and don't bother with the little people.

And on top of that I missed the latest episode of Heroes last night, and I couldn't watch digital TV either, as it also requires the internet LED to be green.

So appreciate it when your internet LED is green. You never know when a storm is coming, or when you will have an ISP that is slower than a turtle to repair infrastructure that people have started to take for granted.

Maybe wireless broadband is the future; I haven't heard of any major phone company having 2 days of downtime on the mobile phone network.

I just heard that in Denmark the record for the busiest hour on the mobile network was broken yesterday. More than 2,600,000 minutes were registered on the mobile network during a single hour. That is like 50% of the population talking for 1 minute in that hour!

Yeah, since you can read this blog entry, I guess the internet LED is green again.
The downtime was 3 days; on the 4th day the internet LED went green again :)

2009-05-22

Apache Camel 1.6.1 Released

We, the Camel riders, are happy to announce that Apache Camel 1.6.1 has been released.

This is a maintenance release with approximately 93 issues resolved. As we want the 1.6.x series to be stable for production, the next 1.6.2 release will only contain important bug fixes. It's important for the Camel community that users of 1.6.x in production can rely on us not to break or otherwise compromise the release.

All future development and features go into the 2.0 release.

2009-05-16

Apache Camel online training

I'm putting my good employee hat on and posting about this new offering we have.

On-line training for Apache ServiceMix with Camel
Interactive classes without having to travel

Not getting the most out of your open source integration projects? Register today for one of our travel-free Apache ServiceMix with Camel classes. This is an intensive two-day, instructor-led, live training class that is delivered in a virtual classroom via the Web.
The course includes lectures and hands-on, code-level exercises targeted for developers and architects. Throughout the course the instructor, a teaching assistant and a member of the Progress FUSE Technical Services team are available to answer questions and assist with any technical issues. Click here to view the full agenda.

Real life story - Camel helps track vehicles in Australia

I think it is time for a break in my blog series on the progress of Apache Camel 2.0, with a little intermission: a real life story.

A long time Camel user and contributor, Christopher Hunt (thanks Christopher for contributing to the Camel community with your great blogs, patches and involvement in the forums), posted a great blog entry on his latest project: Titan Class goes into production. He uses a variety of OSS projects to build an application that is capable of tracking vehicles and more.

I encourage you to take a minute to read his blog entry and enjoy the great screenshot of the application in action. I know it's not often we can present nice and sexy screenshots with Camel.

Christopher has listed the open source projects used, and it's nice to see how diverse the list is and that it's possible to build such applications with them. Imagine what it would have taken to do that 5-10 years ago in a single R&D department of a large corporation! Thanks to the innovation and spirit in the OSS space it's now possible. And yes, we've got to thank Google for doing Google Earth as well :)

2009-05-11

On the road to Camel 2.0 - Asynchronous routing

We have rewritten the asynchronous routing in Camel 2.0.

The new Async API in Camel 2.0 makes much greater use of the Java Concurrency API and its support for executing tasks asynchronously. Therefore the Camel Async API should be familiar to users who know the Java Concurrency API.

Before we dig into this matter, there are a few concepts we need to discuss as background material before using the Async API. The concepts are general to asynchronous messaging and apply to other integration frameworks as well.

As a client we can submit a message as either:
- fire and forget (a one-way message)
- request reply

The former is like the postal service, where we write a letter and put it in a letterbox. We do not expect a reply in this conversation.

The latter is like browsing the web, where we use a web browser to request a web page and the remote web server replies with the web content.

The invocation and processing of both can happen either (from the client's point of view, that is):
- synchronously
- asynchronously

With synchronous, the client waits until the conversation is over before continuing. With asynchronous, control is returned to the client immediately after the message has been submitted. The client can in the meantime do other work while the message is being routed and processed. At any time in the future the client can check whether the task is finished or not. In fact, the client can wait until the task is complete and its result is returned.
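These client-side ideas map directly onto the JDK's Future API. Here is a minimal sketch in plain java.util.concurrent, with a hypothetical slowService method standing in for a routed exchange, contrasting a blocking call with submit(), isDone() and a bounded get():

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ClientViewSketch {

    // Hypothetical slow service standing in for any routed exchange
    static String slowService(String request) throws InterruptedException {
        Thread.sleep(500);
        return "Reply to " + request;
    }

    public static void main(String[] args) throws Exception {
        // Synchronous: the client blocks until the reply is ready
        String syncReply = slowService("sync request");
        System.out.println(syncReply);

        // Asynchronous: submit returns a Future handle immediately
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Future<String> future = executor.submit(() -> slowService("async request"));

        // The client is free to do other work and can check progress at any time
        System.out.println("Done yet? " + future.isDone());

        // When it needs the result, it waits (here at most 5 seconds)
        String asyncReply = future.get(5, TimeUnit.SECONDS);
        System.out.println(asyncReply);
        executor.shutdown();
    }
}
```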

Altogether we have 2 x 2 combinations to understand. In this blog entry I will only show one of them. The Camel documentation covers all 4 of them in more detail.

Asynchronous Request Reply
The asynchronous request reply is where the client sends a message to an Endpoint and control is returned immediately to the client. The message, however, is processed in another thread, the asynchronous thread. The client can continue doing other work while the asynchronous thread processes the message. This is illustrated in the diagram below:

[Diagram: Asynchronous Request Reply]

  1. The client sends an Async Request Reply message over HTTP to Camel. Control is immediately returned to the client application, which can continue and do other work while Camel routes the message.
  2. Camel invokes an external TCP service using synchronous Request Reply. The client application can do other work simultaneously.
  3. The client wants to get the reply, so it uses the Future handle it got as the response in step 1. With this handle it retrieves the reply, waiting if necessary until the reply is ready.

Asynchronous Request Reply Example
Suppose we want to call an HTTP service, but it is usually slow and we do not want to block and wait for the response, as we have other important computation to do. So we can initiate an async exchange to the HTTP endpoint and then do other stuff while the slow HTTP service is processing our request. A bit later we can use the Future handle to get the response from the HTTP service.

First we define some routes in Camel: one for the HTTP service, where we simulate a slow server that takes at least 1 second to reply, and another route that we want to invoke while the HTTP service is processing our request. This lets us process the two routes simultaneously:

// The mocks are here for unit test

// Some other service to return a name
// this is invoked synchronously
from("direct:name")
    .transform(constant("Claus")).to("mock:result");

// Simulate a slow http service (delaying 1 sec)
// we want to invoke async
from("jetty:http://0.0.0.0:9080/myservice")
    .delay(1000)
    .transform(constant("Bye World"))
    .to("mock:result");

And then we have the client API where we call the two routes and we can get the responses from both of them. As the code is based on unit test there is a bit of mock in there as well:

MockEndpoint mock = getMockEndpoint("mock:result");
// We expect the name job to be faster than the async
// job even though the async job was started first
mock.expectedBodiesReceived("Claus", "Bye World");

// Send an async request/reply message to the http endpoint
Future future = template.asyncRequestBody("http://0.0.0.0:9080/myservice",
                          "Hello World");

// We got the future, so in the meantime we can do other stuff;
// as this is Camel, let's invoke another request/reply
// route, but this time it is synchronous
String name = template.requestBody("direct:name",
                  "Give me a name", String.class);
assertEquals("Claus", name);

// Okay, we got a name and we have done some other
// work while the async route is running,
// but now it's about time to wait and get the response
// from the async task

// We use extractFutureBody to get the response
// from the future (waiting if needed) and then return
// a string body response. This allows us to do this in a
// single code line instead of using the JDK Future API
// to get hold of it, but you can also use that if you want
String response = template.extractFutureBody(future, String.class);
assertEquals("Bye World", response);

assertMockEndpointsSatisfied();

Summary
It's now much easier to do asynchronous messaging with Apache Camel. The Client API has been rewritten so it resembles and uses the Java Concurrency API. The Camel documentation contains much more information, including details on the new async DSL that can be used within the route itself to turn a route asynchronous.

2009-05-05

On the road to Camel 2.0 - Interceptors round two

A week has passed since the last blog post on interceptors in Apache Camel.

So this is round two, where I present the latest updates and the new bells and whistles added recently.

With this overhaul of the interceptors in Camel 2.0, we feel that they are first class citizens in the Camel family and that they will provide value in the integration space.

To recap, Camel supports 3 kinds of interceptors:
  1. intercept
  2. interceptFrom
  3. interceptSendToEndpoint

Ad 1)
Intercept is the regular interceptor, similar to an AOP before advice. It's triggered before a message is forwarded to the next step in the route path. Camel also leverages this internally to apply needed processing of messages during routing, such as JMX instrumentation, Tracing, Error Handling and more.
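The "before" semantics can be sketched in plain Java. This is not how Camel implements intercept() internally; it is a toy route (a hypothetical list of string processors) where an interceptor runs before each step, which is the behaviour intercept().to("log:hello") gives you:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

public class InterceptSketch {

    // Hypothetical route: an ordered list of processing steps
    private final List<UnaryOperator<String>> steps = new ArrayList<>();
    // What the "before" interceptor saw, standing in for log:hello
    final List<String> interceptLog = new ArrayList<>();

    InterceptSketch step(UnaryOperator<String> processor) {
        steps.add(processor);
        return this;
    }

    String route(String body) {
        for (UnaryOperator<String> processor : steps) {
            // Like intercept(): invoked before the message goes to each next step
            interceptLog.add("before step: " + body);
            body = processor.apply(body);
        }
        return body;
    }

    public static void main(String[] args) {
        InterceptSketch route = new InterceptSketch()
                .step(String::toUpperCase)
                .step(s -> s + "!");
        System.out.println(route.route("hello")); // prints HELLO!
        route.interceptLog.forEach(System.out::println);
    }
}
```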

What we have done lately is to improve intercept with syntactic sugar, so you can define intercepts as the regular routing rules you are already familiar with.

So for instance to do the Hello World logging AOP we just do:
intercept().to("log:hello");

And Camel will thus log the message at each processing step while being routed.

As intercept() is also a regular route you can do routing as well, so you can do a little detour if you like:
intercept().to("log:hello")
    .to("bean:sanityCheck").delay(100);

Camel has built-in support for predicates, so we have added this to intercept as well. This lets you attach a predicate to the intercept so it only triggers under certain conditions. For instance, to only trigger if the message is a test message, we can add the when predicate as:
intercept().when(header("test").isEqualTo("true"))
    .to("log:hello");

So what is next? Well, what if you want to stop routing altogether? Let's say the test message should not be routed normally, but intercepted and logged. We have added support for stop() as a general purpose way to stop routing. So we just append stop() to our intercept route:
intercept().when(header("test").isEqualTo("true"))
    .to("log:hello").stop();
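A rough plain Java analogue of when() combined with stop(), simplified to a single check at the start of the route and using hypothetical names: messages whose test header is "true" are logged and go no further, while everything else is routed normally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class InterceptWhenStopSketch {

    final List<String> logged = new ArrayList<>(); // stands in for log:hello
    final List<String> routed = new ArrayList<>(); // stands in for the normal route

    // Hypothetical predicate, similar in spirit to header("test").isEqualTo("true")
    final Predicate<Map<String, String>> when =
            headers -> "true".equals(headers.get("test"));

    void process(Map<String, String> headers, String body) {
        if (when.test(headers)) {
            logged.add(body); // the intercept route logs the message...
            return;           // ...and, like stop(), ends routing here
        }
        routed.add(body);     // predicate did not match: normal routing continues
    }

    public static void main(String[] args) {
        InterceptWhenStopSketch route = new InterceptWhenStopSketch();
        route.process(Map.of("test", "true"), "test message");
        route.process(Map.of(), "real order");
        System.out.println("logged=" + route.logged + " routed=" + route.routed);
    }
}
```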


Ad 2)
InterceptFrom, as opposed to intercept, is only triggered once: when a new message arrives at our route. This allows you to intercept any incoming message and, for example, log it:
interceptFrom().to("log:incoming");

Recently we added a new bells-and-whistles feature to this interceptor: you can now filter by endpoint URI. So if you only want to intercept certain endpoints, you can filter using:
  • exact match
  • match by wildcard
  • match by regular expression

Exact match matches a single endpoint, and you just provide the endpoint URI:
interceptFrom("activemq:queue:order").to("log:neworder");

Wildcards are supported by using a * at the end of the URI. So to match all ActiveMQ queues you can do:
interceptFrom("activemq:queue:*").to("log:neworder");

And you can use a regular expression for fine grained matching, for instance to match the gold and silver queues:
interceptFrom("activemq:queue:(gold|silver)")
    .to("log:neworder");

Camel provides the real intercepted endpoint URI as a message header with the key: Exchange.INTERCEPTED_ENDPOINT.

So in the sample above it will contain either "activemq:queue:gold" or "activemq:queue:silver". This allows you to know which endpoint was intercepted.
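The three matching rules can be sketched as a small hypothetical helper. The order (exact, then trailing wildcard, then regular expression) follows the list above, but Camel's own matching code may differ in detail:

```java
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

public class EndpointMatchSketch {

    // Hypothetical matcher applying the three rules in order:
    // exact match, trailing * wildcard, then regular expression
    static boolean matches(String pattern, String uri) {
        if (uri.equals(pattern)) {
            return true; // exact match
        }
        if (pattern.endsWith("*")
                && uri.startsWith(pattern.substring(0, pattern.length() - 1))) {
            return true; // wildcard: everything before the * must be a prefix
        }
        try {
            return Pattern.matches(pattern, uri); // regex must match the whole URI
        } catch (PatternSyntaxException e) {
            return false; // not a valid regex, so no match
        }
    }

    public static void main(String[] args) {
        System.out.println(matches("activemq:queue:order", "activemq:queue:order"));          // true
        System.out.println(matches("activemq:queue:*", "activemq:queue:anything"));           // true
        System.out.println(matches("activemq:queue:(gold|silver)", "activemq:queue:gold"));   // true
        System.out.println(matches("activemq:queue:(gold|silver)", "activemq:queue:bronze")); // false
    }
}
```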


Ad 3)
InterceptSendToEndpoint is triggered when you are about to send a message to the given endpoint. It basically intercepts the Producer. So this allows you for instance to log all outgoing messages as:
interceptSendToEndpoint().to("log:outgoing");

And just like interceptFrom, it has the same URI matching, so you can, for instance, intercept all CXF based endpoints:
interceptSendToEndpoint("cxf:*").to("log:callingwebservice");

Oh, and just like intercept, they all support the when predicate, so you can attach a predicate to only trigger under certain conditions. This can, for instance, be used to patch a message before sending it:
interceptSendToEndpoint("cxf:*")
    .when().xpath("//order/@id = '0'")
    .to("bean:fixMissingOrderId");

The interceptSendToEndpoint brings in a new interesting feature, as you can use it to mock endpoints in unit tests. Let's say you write a unit test that invokes an HTTP service to retrieve some data. With interceptSendToEndpoint() we can intercept this and construct our own canned response. But how do you avoid invoking the real HTTP endpoint? We added the option skipSendToOriginalEndpoint() to force Camel not to send it to the real intended endpoint. That leaves us with:
interceptSendToEndpoint("http:*")
    .skipSendToOriginalEndpoint().to("bean:simulateResponse");
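Here is a plain Java sketch of what skipSendToOriginalEndpoint() buys you in a test. The send method, the call counter and the canned response are all hypothetical; the point is that with skipping enabled the interceptor's result becomes the reply and the real HTTP call never happens:

```java
import java.util.function.UnaryOperator;

public class SkipOriginalSketch {

    // Counts how often the "real" HTTP endpoint was called
    static int realCalls = 0;

    // Hypothetical stand-in for sending to the real HTTP endpoint
    static String sendToRealEndpoint(String uri, String body) {
        realCalls++;
        return "real response from " + uri;
    }

    // Hypothetical send: messages to http: URIs pass through the interceptor
    // first, like interceptSendToEndpoint("http:*")
    static String send(String uri, String body, boolean skipOriginal,
                       UnaryOperator<String> interceptor) {
        if (uri.startsWith("http:")) {
            body = interceptor.apply(body);
            if (skipOriginal) {
                // Like skipSendToOriginalEndpoint(): the interceptor's result is
                // the reply and the real endpoint is never invoked
                return body;
            }
        }
        return sendToRealEndpoint(uri, body);
    }

    public static void main(String[] args) {
        // Hypothetical canned response, standing in for bean:simulateResponse
        UnaryOperator<String> simulateResponse = request -> "<data>canned</data>";
        String reply = send("http://localhost:8080/data", "get data", true, simulateResponse);
        System.out.println(reply + " (real calls: " + realCalls + ")");
    }
}
```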

Well, I guess the 3 minutes are up and it's time to end round two. The next blog entry will be on another subject, as I've had my fill of interceptors for now :)

The Camel wiki page for intercept has been updated with the latest details.