For the last couple of weeks I have had time to work on rewriting the Aggregator EIP in Apache Camel.
Over time we have gathered a list of issues with the old code, which you can find here.
The work is now complete, and from Apache Camel 2.3 onwards we have a much improved Aggregator.
The new and improved aggregator
First of all, it allows you to use multiple, independent ways of triggering completion. For example, you can say that completion should happen when 5 messages have been aggregated. The aggregator then waits until that condition occurs, whether that takes 4 seconds or 17 hours. You can also add an additional completion trigger, such as an inactivity timeout. So if you think waiting 17 hours is too long, you can set the timeout to, e.g., 1 hour. The aggregated message also contains a property that tells you which of the completion conditions triggered, so you can see whether it was the timeout, the size, etc.
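To make the "first trigger wins" idea concrete, here is a toy sketch in plain Java. It is not Camel's API: the classes `Group` and `CompletionCheck` and the returned strings "size" and "timeout" are hypothetical. It simply shows a group that completes on size or on inactivity, whichever happens first, and reports which one fired.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model, not Camel's API: one correlation group that completes either
// when it reaches a size or after an inactivity timeout, whichever comes first.
class Group {
    final List<String> messages = new ArrayList<>();
    long lastArrivalMs;
}

class CompletionCheck {
    private final int completionSize;       // e.g. 5 messages
    private final long inactivityTimeoutMs; // e.g. 1 hour

    CompletionCheck(int completionSize, long inactivityTimeoutMs) {
        this.completionSize = completionSize;
        this.inactivityTimeoutMs = inactivityTimeoutMs;
    }

    // Adds a message; returns "size" if that completes the group, else null.
    String onMessage(Group group, String body, long nowMs) {
        group.messages.add(body);
        group.lastArrivalMs = nowMs;
        return group.messages.size() >= completionSize ? "size" : null;
    }

    // Polled periodically; returns "timeout" if the group has been idle too long, else null.
    String onTick(Group group, long nowMs) {
        boolean idle = !group.messages.isEmpty()
                && nowMs - group.lastArrivalMs >= inactivityTimeoutMs;
        return idle ? "timeout" : null;
    }
}
```

Time is passed in explicitly as `nowMs` so the logic is deterministic. The returned reason plays the role of the property that tells you which condition completed the aggregate.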
On top of that, the inactivity timeout is per correlation key. So if you aggregate with multiple keys, their timeouts are tracked separately. In the old logic the timeout was not based on inactivity; it was really just a timer for a background thread to wake up and start aggregating the messages that had arrived since the last run.
This is a great improvement, as it works much more the way end users expect: the timeout is based on inactivity and is per correlation key. For example, the timeout for one correlation key is not reset just because a new message arrived on another correlation key.
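A minimal sketch of per-key inactivity tracking, assuming one "last activity" timestamp per correlation key. The class `PerKeyTimeouts` and its methods are hypothetical and are not how Camel implements it internally; the point is only that touching key B never moves key A's clock.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model: one inactivity clock per correlation key.
class PerKeyTimeouts {
    private final long timeoutMs;
    private final Map<String, Long> lastActivity = new HashMap<>();

    PerKeyTimeouts(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // A message for `key` resets only that key's clock.
    void touch(String key, long nowMs) {
        lastActivity.put(key, nowMs);
    }

    // Returns the keys that have been inactive for at least timeoutMs
    // and stops tracking them (their groups would now be completed).
    List<String> expire(long nowMs) {
        List<String> expired = new ArrayList<>();
        lastActivity.entrySet().removeIf(e -> {
            boolean idle = nowMs - e.getValue() >= timeoutMs;
            if (idle) {
                expired.add(e.getKey());
            }
            return idle;
        });
        return expired;
    }
}
```

Contrast this with a single global timer: there, one busy key would either keep every group waiting or flush groups that had just received a message.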
You can also add a third completion trigger, such as a predicate. You can use it to check whether a special message arrived, or whether the aggregated message contains some special data that should trigger completion.
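Here is a small sketch of predicate-based completion, using `java.util.function.Predicate` over the messages aggregated so far. The class `PredicateCompletion` is hypothetical; in Camel the predicate would be an expression evaluated against the aggregated exchange rather than a Java lambda over a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Toy model: the completion predicate is evaluated after every arrival.
class PredicateCompletion {
    private final Predicate<List<String>> completionPredicate;
    private final List<String> aggregated = new ArrayList<>();

    PredicateCompletion(Predicate<List<String>> completionPredicate) {
        this.completionPredicate = completionPredicate;
    }

    // Returns the completed aggregate when the predicate matches, otherwise null.
    List<String> add(String body) {
        aggregated.add(body);
        if (completionPredicate.test(aggregated)) {
            List<String> done = new ArrayList<>(aggregated);
            aggregated.clear(); // start a fresh group
            return done;
        }
        return null;
    }
}
```

For example, `new PredicateCompletion(msgs -> msgs.get(msgs.size() - 1).equals("END"))` completes as soon as a special "END" message arrives.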
On top of that, we have added support for letting the aggregator use parallel processing. That means several aggregated messages can be sent out of the aggregator concurrently.
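The essence of parallel processing can be sketched with a plain `ExecutorService`: completed aggregates are submitted to a thread pool instead of being sent one at a time on the caller's thread. `ParallelSender` is a hypothetical name, and this is not Camel's implementation. Note that the downstream consumer must be thread-safe.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Toy model: completed aggregates are handed to a thread pool, so several
// can be sent downstream at the same time instead of one by one.
class ParallelSender implements AutoCloseable {
    private final ExecutorService pool;
    private final Consumer<List<String>> downstream;

    ParallelSender(int threads, Consumer<List<String>> downstream) {
        this.pool = Executors.newFixedThreadPool(threads);
        this.downstream = downstream;
    }

    // Returns immediately; the aggregate is sent on a pool thread.
    Future<?> send(List<String> aggregate) {
        return pool.submit(() -> downstream.accept(aggregate));
    }

    // Stops accepting new work and waits for in-flight sends to finish.
    @Override
    public void close() {
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The trade-off is ordering: with more than one thread, aggregates may leave in a different order than they completed.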
Another ease-of-use change is that the AggregationStrategy is now mandatory, which means that UseLatestAggregationStrategy is no longer the default. That should clear up some confusion when using the aggregator for the first time, e.g. "why does the aggregator just spit out the last message?"
Other minor improvements are that you can choose what should happen if the correlation key is invalid, and you can now "close" a correlation key to deny late messages.
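The sketch below mirrors the shape of an aggregation strategy (old aggregate and new message in, combined aggregate out, with the old one being null for the first message) but uses String bodies instead of Camel exchanges. `StringAggregationStrategy`, `UseLatest`, `Concatenate` and `Aggregation` are hypothetical names. The constructor check shows what "mandatory" means in practice: no strategy, no aggregator.

```java
import java.util.Objects;

// Hypothetical stand-in for an aggregation strategy, using String bodies.
interface StringAggregationStrategy {
    // oldBody is null for the first message of a correlation group.
    String aggregate(String oldBody, String newBody);
}

// Behaves like the old default: only the latest message survives.
class UseLatest implements StringAggregationStrategy {
    public String aggregate(String oldBody, String newBody) {
        return newBody;
    }
}

// Joins all bodies with a "+" separator.
class Concatenate implements StringAggregationStrategy {
    public String aggregate(String oldBody, String newBody) {
        return oldBody == null ? newBody : oldBody + "+" + newBody;
    }
}

class Aggregation {
    private final StringAggregationStrategy strategy;
    private String current;

    Aggregation(StringAggregationStrategy strategy) {
        // Mandatory: fail fast instead of silently defaulting to "use latest".
        this.strategy = Objects.requireNonNull(strategy, "AggregationStrategy is mandatory");
    }

    void add(String body) {
        current = strategy.aggregate(current, body);
    }

    String result() {
        return current;
    }
}
```

With `UseLatest`, adding "x" then "y" yields just "y", which is exactly the behaviour that surprised first-time users when it was the silent default.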
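A rough sketch of both features, assuming a simple policy for invalid keys (ignore or fail) and a set of closed keys that rejects late arrivals. `KeyGuard` and `InvalidKeyPolicy` are hypothetical and only illustrate the behaviour, not Camel's options. A real implementation would likely bound the set of closed keys so it cannot grow forever.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model: guards the aggregator against bad and late correlation keys.
class KeyGuard {
    enum InvalidKeyPolicy { IGNORE, FAIL }

    private final InvalidKeyPolicy policy;
    private final Set<String> closedKeys = new HashSet<>();

    KeyGuard(InvalidKeyPolicy policy) {
        this.policy = policy;
    }

    // Returns true if the message may be aggregated, false if it is silently dropped.
    boolean accept(String key) {
        if (key == null || key.isEmpty()) {
            if (policy == InvalidKeyPolicy.FAIL) {
                throw new IllegalArgumentException("Invalid correlation key");
            }
            return false; // IGNORE: drop the message
        }
        if (closedKeys.contains(key)) {
            // Late message for a group that has already completed.
            throw new IllegalStateException("Correlation key already closed: " + key);
        }
        return true;
    }

    // Called when a group completes, so late messages for that key are denied.
    void close(String key) {
        closedKeys.add(key);
    }
}
```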
Well, that is not all. This blog entry is also about a new feature that I am a bit excited about.
With help from the very talented people from Apache ActiveMQ, such as Hiram Chirino, we have a new Camel component called camel-hawtdb.
HawtDB is a lightweight and very fast key/value database. It is based on the solid knowledge gained from ActiveMQ and its KahaDB persistent store.
So what we have done now is integrate HawtDB with the Camel aggregator, which means we now support persistence out of the box. The aggregated messages are persisted to a file store, so you avoid losing messages if your server crashes or is shut down.
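To illustrate why a persistent repository helps, here is a toy sketch in which a `java.util.Properties` file stands in for the key/value store. This is NOT HawtDB or camel-hawtdb; `FileAggregationRepository` and its methods are hypothetical. Every update is written through to disk, and a new instance reloads the file, which simulates recovering in-flight aggregates after a restart.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Toy stand-in for a persistent aggregation repository (not HawtDB).
class FileAggregationRepository {
    private final Path file;
    private final Properties data = new Properties();

    FileAggregationRepository(Path file) {
        this.file = file;
        if (Files.exists(file)) {
            // Recover in-flight aggregates written before a crash or shutdown.
            try (Reader in = Files.newBufferedReader(file)) {
                data.load(in);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }

    String get(String correlationKey) {
        return data.getProperty(correlationKey);
    }

    // Write-through: the change is on disk before the call returns.
    void put(String correlationKey, String aggregated) {
        data.setProperty(correlationKey, aggregated);
        flush();
    }

    // Called once an aggregate has completed and been sent on.
    void remove(String correlationKey) {
        data.remove(correlationKey);
        flush();
    }

    private void flush() {
        try (Writer out = Files.newBufferedWriter(file)) {
            data.store(out, null);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Rewriting the whole file on each update keeps the sketch short, but it is neither fast nor crash-atomic. Those are exactly the problems a dedicated store is there to solve.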
This is just a start. In the future we will add support for using HawtDB in other areas of Camel, such as the Resequencer EIP, SEDA queues, the DeadLetterChannel, stream caching of big streams, the idempotent repository and elsewhere. If you have good ideas for use cases, let us know.
I have put together a small example that demonstrates the new aggregator and the persistence support. If you don't want to run the example, the link shows the console output, which is kinda the sexiest thing we can show with integration software.