On the road to Camel 2.0 - Concurrency with the async DSL

It's time to continue on the road to Camel 2.0, and I thought I should go into a bit more detail on the async DSL we have in Camel 2.0. It replaces the old thread DSL from Camel 1.x.

Like the thread DSL, the async DSL is used for concurrency. Under the async DSL lies the excellent Java Concurrency API, where we leverage the ExecutorService for submitting concurrent tasks. By default a thread pool with 5 threads is created, but you can pass in the core pool size if you like. For instance: from(x).async(20).to(y)
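To give a feel for what sits underneath, here is a minimal sketch in plain java.util.concurrent. It is not Camel code and not how Camel implements the DSL. The class name PoolSketch and the method processAll are made up for this illustration, and a fixed-size pool is only an approximation of the pool Camel creates. The sizes 5 and 20 are the ones mentioned above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration only: plain JDK code, not the Camel implementation.
final class PoolSketch {

    // Mirrors the default mentioned in the post: 5 threads unless you ask for more.
    static final int DEFAULT_POOL_SIZE = 5;

    // Submits one task per message to the pool and collects the results in order.
    static List<String> processAll(List<String> messages, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String message : messages) {
                Callable<String> task = () -> message.toUpperCase(); // stand-in for real work
                futures.add(pool.submit(task));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                results.add(future.get()); // blocks until that particular task is done
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Default-sized pool, then a pool of 20 as in async(20).
        System.out.println(processAll(List.of("a", "b", "c"), DEFAULT_POOL_SIZE));
        System.out.println(processAll(List.of("a", "b", "c"), 20));
    }
}
```

The Future returned by submit is the handle that lets the caller get hold of the result later on.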

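Suppose we have this simple route where we poll the inbox folder for new files, process each file with the calculateBean, and afterwards move the file to a backup folder when complete. In the Java DSL it is something like: from("file://inbox?move=../backup-${date:now:yyyyMMdd}").to("bean:calculateBean")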


The route is synchronous and there is only a single consumer running at any given time. This scenario is well known, and it raises no thread safety concerns as only one thread is active at any given time.

Now imagine that the inbox folder is filled with files faster than we can process them. So we want to speed up this process. How can we do this?

Well, we could try adding a 2nd route with the same route path. That doesn't work so well, as we then have competing consumers for the same files. It requires file locking so that two consumers don't compete for the same file. By default Camel supports this with the file locking option on the file component. But what if the component doesn't support this, or it's not possible to add a 2nd consumer for the same endpoint? It is also a bit of a hack, and the route logic is duplicated. And if we need more, we have to add a 3rd, a 4th and so on.

What if the processing of the file itself is the bottleneck? That is, the calculateBean is slow. So how can we process messages with this bean concurrently?

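Yes, we can use the async DSL. If we insert it in the route we get something like: from("file://inbox?move=../backup-${date:now:yyyyMMdd}").async(10).to("bean:calculateBean")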


So by inserting async(10) we have instructed Camel that from this point forward in the route it should use a thread pool with up to 10 concurrent threads. When the file consumer delivers a message to the async, the async takes it from there, and the file consumer can return and continue to poll the next file. This means we can still use a single file consumer to poll new files, and polling a directory just to grab the file handle is very fast. We won't have problems with file locking, sorting, filtering and whatnot. At the same time, the file messages are processed concurrently by the calculate bean.
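The hand-off can be sketched with plain JDK classes. This is not Camel code. The names HandoffSketch, pollAndProcess and Result are made up for this illustration, and a latch stands in for a slow calculateBean so that the outcome does not depend on timing. The point it shows is that the single polling thread gets through all the files without waiting for any of them to be processed, and that the work is done on the pool threads.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the route, written with plain JDK classes (not Camel code).
final class HandoffSketch {

    // What the polling thread observed, plus the final tally.
    static final class Result {
        final int doneWhenPollingFinished; // files fully processed when the poll loop ended
        final Set<String> processed;       // files processed once the pool drained
        final Set<String> workerThreads;   // names of the threads that did the processing

        Result(int doneWhenPollingFinished, Set<String> processed, Set<String> workerThreads) {
            this.doneWhenPollingFinished = doneWhenPollingFinished;
            this.processed = processed;
            this.workerThreads = workerThreads;
        }
    }

    static Result pollAndProcess(List<String> files, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch slowBean = new CountDownLatch(1); // keeps every task busy until released
        Set<String> processed = ConcurrentHashMap.newKeySet();
        Set<String> workerThreads = ConcurrentHashMap.newKeySet();

        // The single consumer: hand each file to the pool and move straight on to the next.
        for (String file : files) {
            pool.execute(() -> {
                try {
                    slowBean.await(); // simulated slow calculateBean
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                workerThreads.add(Thread.currentThread().getName());
                processed.add(file);
            });
        }
        // Still 0 here: the poll loop never waited on the processing.
        int doneWhenPollingFinished = processed.size();

        slowBean.countDown(); // let the simulated bean finish
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not drain in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new Result(doneWhenPollingFinished, processed, workerThreads);
    }

    public static void main(String[] args) {
        Result r = pollAndProcess(List.of("a.txt", "b.txt", "c.txt"), 10);
        System.out.println(r.doneWhenPollingFinished + " done while polling, "
                + r.processed.size() + " done in total on " + r.workerThreads);
    }
}
```

With more files than threads, the extra files simply wait in the pool's queue until a thread is free, so at most 10 are processed at the same time.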

Here at the end, let's take a closer look at what happens with the synchronous thread and the asynchronous thread. The synchronous thread hands over the exchange to the new async thread, and with that the synchronous thread is done. The asynchronous thread then routes and processes the message. When this thread finishes, it takes care of the file completion strategy and moves the file into the backup folder. This is important to note: the on completion is done by the async thread, which ensures the file is not moved before it has been processed successfully. Suppose the calculate bean could not process one of the files. If the sync thread were responsible for the on completion strategy, the file would have been moved into the backup folder too early. By handing this over to the async thread, we do it only after the message has been processed completely.
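The same idea as a plain JDK sketch, again not Camel code. CompletionSketch, calculate and route are made-up names, the backup folder is simulated with a set, and the rule that files starting with "bad" fail is invented for the example. What happens to a failed file in Camel depends on your error handling; here it is simply not moved.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: completion work is done by the worker, after processing.
final class CompletionSketch {

    // Stand-in for calculateBean; rejects files whose name starts with "bad" (made-up rule).
    static void calculate(String file) {
        if (file.startsWith("bad")) {
            throw new IllegalArgumentException("cannot process " + file);
        }
    }

    // Returns the names that ended up in the simulated backup folder.
    static Set<String> route(List<String> inbox, int poolSize) {
        Set<String> backup = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        for (String file : inbox) {
            // The polling thread only hands the file over; it does not move it.
            pool.execute(() -> {
                try {
                    calculate(file);
                    backup.add(file); // "move to backup" runs on the worker, after success
                } catch (RuntimeException e) {
                    // Processing failed, so the file is not moved to backup.
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not drain in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new TreeSet<>(backup); // sorted copy for stable printing
    }

    public static void main(String[] args) {
        System.out.println(route(List.of("a.txt", "bad.txt", "c.txt"), 10));
    }
}
```

Had the polling thread added the file to backup right after the hand-off, bad.txt would have landed there too, which is exactly the too-early move described above.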

For more information about the new Async API in Camel check out my previous blog entry and the Camel documentation.

Update: async was renamed to threads in Camel 2.0 final.


Christopher said...

Hi Claus,

With your example, why would async be better than using a seda endpoint?

Also, does .async deprecate .thread?

Kind regards,

Claus Ibsen said...

seda is more similar to an internal queue, kind of like a very lightweight JMS queue, as a figure of speech.

So when you send a message to a seda queue, it's like sending to a JMS queue. Your route ends here. And unlike a JMS queue, seda does not support request/reply.

So all together that means that when using seda the original route ends. A new route is routing from the seda queue (if there is a consumer), but the two routes are independent.

The async however lets the first route be linked with the 2nd route as it holds the Future handle so we can get hold of the result from the 2nd route. And it also hands over the on completion work so the 2nd route can do the file on completion.

Yes async replaces the thread from Camel 1.x. The old thread did not have the same powers as the new async does.

Christopher said...

Aha - now I see that SEDA is not going to support in-out messages given your comment on JIRA. Prior to your decision SEDA looked as though it would support InOut hence my question really.

I still think that having queue semantics on an async situation is very useful; indeed that is entirely what you get with JMS. It is a pity that SEDA won't get this functionality now.

On async vs. thread I would have preferred sticking with the thread name given that it amounts to the same thing. Oh well. :-)

Claus Ibsen said...

Hi Christopher,

Thanks for the input. I would like us to continue this discussion on the Camel dev mailing list.

Naming is hard. I selected the async DSL over thread as I wanted it to stand out that it turns the route asynchronous. The thread name did not really indicate this, and end users were a bit confused about what it does. They focused on the thread pool aspect and did not always realize the implications of the synchronous vs. asynchronous aspect it causes.

But let's discuss it on the dev mailing list.


Anonymous said...

Hi Claus,

How can we achieve the same using spring instead of Java DSL?

Claus Ibsen said...

Just use the XML DSL. There is an XSD schema, so your editor should be able to assist you. The Fuse IDE editor is pre-set up for Camel and can help you. Just press ctrl + space and you get a list of EIPs in the XML editor.

Anyway, the XML route would be something like this. Hoping the XML tags come out alright in a comment

<route>
  <from uri="file://inbox?move=../backup-${date:now:yyyyMMdd}"/>
  <threads poolSize="10">
    <to uri="bean:calculateBean"/>
  </threads>
</route>

rathna said...

Hi Claus,
I have a scenario in which I want to send only 100 messages to a multicast endpoint (say A to C and B), with the condition that no endpoint (C or B) should exceed 100. Also, A should check both C and B every time; if any data is consumed from C or B, then A has to poll data for that specific endpoint. For example, if C has 98 data (2 data consumed) and B has 100 data (0 data consumed), then A has to poll only 2 data for C and should not poll any data for B. Is there a way to do this?

rathna said...

I tried with one endpoint using RoutePolicy and a throttling route policy. It worked, but that policy is not working for a multicast endpoint... :(