Stream Processing Using Esper

Here at SumAll, we provide analytics from over 40 different sources that range from social media data to health and fitness tracking. We collect and connect metrics from a wide range of 3rd party API data sources for our clients to help them make better business decisions. One of engineering's biggest challenge has been on building and maintaining a data pipeline that is both performant and reliable.

We currently have a few different implementations that are running in production. One set uses Esper at its core while another set uses an in-house aggregation library written in Java. The use of Esper makes the aggregation piece of the processing asynchronous unlike the pipeline that uses the in-house aggregation library. This has been one of the debate points in discussions amongst our engineering team over which technology to ultimately use for our aggregator services. As the number of additional platforms we integrate with continues to increase, we have found that having different approaches to solving the same problem fragments the engineering team and leads to code overhead.

I have been a strong advocate of the use of Esper here at SumAll for our current stream processing architecture for a number of reasons:


  • Feature rich and mature: There’s no point in reinventing the wheel when there is a mature, battle-tested, off-the-shelf solution with the features we need. We don’t want to get into the business of building and maintaining low-level aggregation code unless there is a compelling reason. Esper is open-source, has a community, and is well documented.

  • Isolation of service: Consolidation of common processing allows us to manage the component better and make optimizations easier.
    Declarative SQL-like scripts: Scripts that hold Esper statements are loaded into the Esper engine. This provides flexibility in what aggregations we wish to do. In addition, we can focus on doing calculations instead of thinking about low-level details of how to execute the actual calculations, which boosts our productivity.

  • Performant: A data pipeline’s throughput is only as good as its slowest component and Esper has proven to have good performance through extensive testing. (Our slowest component is the database, which is typical in architectures that have a traditional database.)


How We Implemented Esper
We built a dedicated Play service to be responsible for doing all of the aggregations using Esper. At bootstrap, the Esper engine loads in all its dependent scripts that contains all the statements to evaluate on the streams of inputted data. There is one Esper engine per domain (i.e., one for social data, one for order data, one for web traffic data, etc.). Attached to the Esper engine is a dedicated thread to input data and a thread-pool to handle data that is outputted from Esper. The dedicated thread to input data has the responsibility to periodically grab aggregation requests from a work queue for a given time period and push them into the proper Esper engine. Esper will evaluate the statements on the inputted stream of data and output the result onto the threadpool.The threadpool that handles the streaming outputs from Esper has the responsibility to take the results and merge them with the existing documents in our MongoDB.

Handling these outputs quickly is very important in order to keep the production pipeline healthy and flowing at a high throughput. Otherwise, the outputs will begin to pile up and will have a cascading negative effect to upstream components like the work queues causing the whole system to get backed up. The pipeline is only as fast as the slowest part which in our case is the output handler, which does database reads and writes. To handle this, the outputs are quickly transformed into a modeled form and pushed into a queue to be persisted by another dedicated thread. If for some reason the queue to persist the modeled data becomes backed up (maybe the database connection dropped), we have implemented a simple admission control mechanism that turns off or lowers the consumption of new aggregation requests by the input thread. This is to prevent the service from potentially piling up with in-memory objects and causing the service to fall over. It gives the service an opportunity to recover.

Details on Esper Usage
With Esper, we are able to create scripts that are loaded into the appropriate Esper engine. The scripts are written in a SQL-like language that gives us the ability to focus just on the computation and less on how the computation is done. The following is a snippet of our production script that shows what we do with the modeled data that is pushed into the engine. The script here uses our FacebookPost model:

CREATE SCHEMA FacebookPost AS models.data.facebook.FacebookPost;

INSERT INTO PostsStream
SELECT

createdOn AS _createdTime,
createdOn.roundFloor('day') AS _periodDay,
createdOn.roundFloor('month') AS _periodMonth,
DateTimeUtils.truncateToISOWeek(createdOn) AS _periodISOWeek,

*
FROM FacebookPost fp;

In the snippet above, we declare the FacebookPost model and do some basic date manipulations where the date transformation functions are all Java functions that are executed by Esper. The projections that are generated from the stream of FacebookPost get inserted into the PostsStream that is used to feed into the following aggregation query as well as its sibling counterparts—there’s one for weeklies and one for monthlies. In this same query, we use other features of Esper which include context partitioning and event stream windowing.
CREATE CONTEXT ByStream PARTITION BY streamId from PostsStream;

CONTEXT ByStream
INSERT INTO PostsByDayStream
SELECT

streamId,
tzOffset,
_periodDay AS period,
sum(likes) AS numLikes,
sum(comments) AS numComments,
sum(shares) AS numShares,
sum(count) AS numCounts,
count(*) AS numProcessed

FROM PostsStream.win:time_length_batch(30 sec, 4000)
GROUP BY streamId, tzOffset, _periodDay
HAVING count(*) > 0;

Context partitioning allows us to logically partition a stream by a collection of the model’s attributes which gives us nice isolation of the streaming data and reduces query complexity. In addition, the benefit of using context partitions is the high degree of concurrency in processing the stream “with a maximum (theoretical) degree of parallelism at 2^31-1 (2,147,483,647) parallel threads” (Esper documentation). Aggregation queries that are written to use contexts are guaranteed to work on the isolated partition of data only.

Windowing is provided by Esper to give us the option to have a moving window so that we perform the queries over a subset of objects over time or count. Queries can be done over the lifetime of the stream, but this is not ideal for our scenario because a given stream is aggregated only a certain number of times in a day so it results in a misuse of resources.

The combination of context partitioning and windowing allows us to effectively perform the necessary queries for a given stream with enough time to account for all the stream’s modeled input data elements.

The final thing to mention about our use of Esper is that we subscriber objects over the more flexible data frame and event listener for handling the outputs from Esper scripts. The registered subscriber object is tightly coupled to the associated output query in the Esper script via the signature of the update method in the subscriber object. The downside of using subscribers is the loss of a bit of flexibility in changing the scripts and a higher susceptibility to a runtime failure. However, if a runtime failure does occur, it will be at the point of subscriber registration, which is much better than if the failure were to happen while data is in flight. The upside of using subscribers is better throughput because there is less translation and fewer generic objects to deal with.

Conclusion
This particular pipeline has been running for about half a year now and has been performant. The aggregation service has been measured to be processing at a rate of 10K events/second per node where the host resources were not being heavily utilized. Esper is a good off-the-shelf piece of technology that has become essential for us to do stream processing. It has effectively taken care of how we do the computations so we can now focus on what computations to do.