Adding Content in Real-time
iHeartRadio ingests hundreds of thousands of products each month. Historically, as a new product delivery was received, a user would manually initiate the ingestion process by entering its file path into a form on a web page, triggering the ingestion application to parse the delivery and update the database. Downstream systems would constantly poll the database, run at regularly scheduled intervals, or be triggered manually. This process, roughly visualized below, was reasonable, with new content arriving in the catalog within a few days of its receipt.
Linear ingestion flow
As new content providers were added, new distribution formats needed to be accommodated. More and more code was added to the application. Eventually we developed a new version of the application, this time introducing an XSLT stylesheet for each provider. These stylesheets transformed the providers’ formats into a single, canonical format. This simplified the ingestion application, which then only needed to know how to parse one XML format.
Over time, though, provider-specific logic found its way into the application to handle cases that couldn’t be handled by XSLT. Also, one provider delivered their content in a format that couldn’t easily be handled by XSLT at all. This meant that both versions of the application were used to ingest new content. Changes targeted at all providers needed to be made to two different applications. This also meant two applications needed to be tested against changes.
We made another pass at creating a provider-agnostic version of the ingestion application. This time, however, the goal was to include other types of content. The first two iterations focused solely on music. Data models and their business logic were pushed out of the application and into configuration files. This would allow for new instances of the application to be spun up with configuration files that described different content types.
To help with the additional workload, the application was designed to distribute its work. As this version was written in Python (the previous two were both written in Java), Celery, backed by RabbitMQ, was used to distribute tasks across multiple workers.
Unfortunately the simplicity of the application came at the cost of code that was very difficult to debug. It was also difficult to efficiently debug a data model when the model was defined in configuration rather than in code. Problems were hard to diagnose, and it quickly became clear that adding additional content types would only make this worse.
In addition to debugging problems, there was one issue this version failed to address, something that had been plaguing us from the first version on: when a delivery failed, it needed to be put through the entire process again.
We set out to build the fourth version of the ingestion application. This time, however, we decided to split it up into smaller, single-purpose applications. Each application would be connected to the next through a message queue.
With our new approach, applications can be run in succession or in parallel. Applications can be added or removed at any time without affecting the entire system. The flow of products through the ingestion system takes on a very different shape.
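As a rough illustration of this shape, the sketch below chains two single-purpose stages through in-process queues. It uses asyncio.Queue as a stand-in for a real message broker, and the stage names (parse, enrich) and message fields are hypothetical, not taken from our system.

```python
import asyncio

STOP = object()  # sentinel marking the end of the stream


async def stage(handler, inbox, outbox):
    """Consume messages from inbox, apply handler, publish to outbox."""
    while True:
        message = await inbox.get()
        if message is STOP:
            # Pass the sentinel along so the next stage shuts down too.
            await outbox.put(STOP)
            return
        await outbox.put(handler(message))


def parse(message):
    # Hypothetical parsing step: normalize the title.
    return {**message, "title": message["title"].strip()}


def enrich(message):
    # Hypothetical enrichment step: flag the product as enriched.
    return {**message, "enriched": True}


async def run_pipeline(deliveries):
    """Push deliveries through parse -> enrich and collect the output."""
    received, parsed, done = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    stages = [
        asyncio.create_task(stage(parse, received, parsed)),
        asyncio.create_task(stage(enrich, parsed, done)),
    ]
    for delivery in deliveries:
        await received.put(delivery)
    await received.put(STOP)
    await asyncio.gather(*stages)

    results = []
    while True:
        message = await done.get()
        if message is STOP:
            return results
        results.append(message)
```

Because each stage only knows about its own inbox and outbox, a stage can be inserted or removed by rewiring queues rather than changing the other stages.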
By logging each outgoing message we gain visibility into the state of the system and can better monitor its health and performance. We send all of our logs, both the messages between systems and all application-level logs, to Logstash. This enables us to easily get messages into Elasticsearch, either in their original format or with keys remapped. This, coupled with Kibana for dashboards, gives us insight into how the system is performing and gives stakeholders insight into the products being ingested.
We can also recover from errors much more easily. For errors from which we know we can recover, an application can place its incoming message back into the incoming queue so that it can be processed again later. The same effect could be achieved by not acknowledging the message and allowing it to return to the queue, but we want to know about the error and be able to fail the message once it has been retried too many times.
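One way to picture the logging of outgoing messages is the sketch below, which writes one JSON document per sent message. The logger name, the event field, and the producer argument are assumptions made for illustration; how the log lines actually reach Logstash (file, TCP, or another input) is not covered here.

```python
import io
import json
import logging


def build_logger(stream):
    """Return a logger that writes bare JSON lines to the given stream."""
    logger = logging.getLogger("ingestion.outgoing")  # hypothetical name
    logger.setLevel(logging.INFO)
    logger.handlers.clear()
    logger.propagate = False
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(message)s"))
    logger.addHandler(handler)
    return logger


def send_and_log(logger, producer, message):
    """Hand the message to the producer, then log what was sent."""
    producer(message)
    logger.info(
        json.dumps({"event": "message_sent", "message": message}, sort_keys=True)
    )
```

Emitting structured JSON rather than free text keeps the keys available for remapping further down the logging pipeline.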
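The requeue-with-a-limit idea can be sketched as follows. This is a simplified, in-memory illustration: a deque stands in for the incoming queue, and RecoverableError, the retries key, and the limit of three are all hypothetical choices rather than details of our applications.

```python
import logging
from collections import deque

MAX_RETRIES = 3  # hypothetical limit


class RecoverableError(Exception):
    """Raised by a handler when trying again later might succeed."""


def process_queue(queue, handler, max_retries=MAX_RETRIES):
    """Drain the queue, requeueing recoverable failures up to max_retries."""
    succeeded, failed = [], []
    while queue:
        message = queue.popleft()
        try:
            succeeded.append(handler(message["body"]))
        except RecoverableError as error:
            retries = message.get("retries", 0)
            # Record every failure, which silent redelivery would hide.
            logging.warning("retry %d for %r: %s", retries, message["body"], error)
            if retries >= max_retries:
                failed.append(message["body"])
            else:
                # Put the message at the back of the queue with its count.
                queue.append({**message, "retries": retries + 1})
    return succeeded, failed
```

Carrying the retry count on the message itself is what lets the application decide when to stop retrying and fail outright.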
Brought to you by the letter H
To accomplish this, we built a framework known as Henson. Henson allows us to hook up a consumer (usually an instance of our Henson-AMQP plugin) to a callback function. Any message received from the consumer will be passed to the callback function. To help simplify the callback function associated with each application, Henson also supports processing the message before giving it to the callback through a series of callbacks (e.g., message schema validation, setting timestamps) and processing the results received from the callback (e.g., more timestamps, sending the message through a producer).
Henson allows us to reduce each application to just the amount of code required to implement its core functionality. The rest can be handled through code contained in shared libraries, registered as the appropriate processor.
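To make the flow concrete, here is a minimal sketch of the idea in plain asyncio. It is not Henson's implementation or API: the handle function, the dict standing in for the application, and the example processors are all hypothetical.

```python
import asyncio
import time


async def require_id(app, message):
    # Hypothetical preprocessor: a stand-in for schema validation.
    if "id" not in message:
        raise ValueError("message is missing 'id'")
    return message


async def stamp_received(app, message):
    # Hypothetical preprocessor: set a timestamp on arrival.
    return {**message, "received_at": time.time()}


async def core_callback(app, message):
    # The service's own logic; returns a list of results.
    return [{**message, "processed": True}]


async def collect(app, result):
    # Hypothetical postprocessor: a stand-in for sending via a producer.
    app["sent"].append(result)
    return result


async def handle(app, message, preprocessors, callback, postprocessors):
    """Run one message through preprocessors, callback, and postprocessors."""
    for preprocess in preprocessors:
        message = await preprocess(app, message)
    results = await callback(app, message)
    for result in results:
        for postprocess in postprocessors:
            result = await postprocess(app, result)
    return results
```

With this split, only core_callback differs from service to service; the processors around it can live in a shared library.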
The boilerplate for one of our services can be as simple as:
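from henson import Application
from henson_amqp import AMQP
from ingestion import send_message, validate_schema
from .callback import callback

app = Application('iheartradio', callback=callback)
# The AMQP plugin needs the application, so attach the consumer afterwards.
app.consumer = AMQP(app).consumer()
app.message_preprocessor(validate_schema)
app.result_postprocessor(send_message)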
All we need to do when creating a new service is to implement callback.
async def callback(application, message):
    """Return a list of results after printing the message.

    Args:
        application (henson.base.Application): The application
            instance that received the message.
        message (dict): The incoming message.

    Returns:
        List[dict]: The results of processing the message.
    """
    print('Message received:', message)
    return [message]
Once this is done, we can then run the application with:
$ henson run service_name
We decided to use asyncio’s coroutines and event loop to allow multiple messages to be processed while waiting for traditionally blocking actions to complete. This is especially important in our content enrichment jobs, many of which poll APIs from third parties.
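The benefit can be seen in a small, self-contained sketch. Here asyncio.sleep stands in for a slow third-party API call, and the enrich function and message shape are hypothetical; the point is only that the waits overlap instead of adding up.

```python
import asyncio
import time


async def enrich(message, delay=0.1):
    # asyncio.sleep stands in for a slow third-party API call.
    await asyncio.sleep(delay)
    return {**message, "enriched": True}


async def process_all(messages):
    # Every coroutine waits concurrently on the same event loop.
    return await asyncio.gather(*(enrich(m) for m in messages))


def timed_run(messages):
    """Return the results along with the elapsed wall-clock time."""
    start = time.perf_counter()
    results = asyncio.run(process_all(messages))
    return results, time.perf_counter() - start
```

Processing five messages this way should take roughly as long as the slowest single call, rather than the sum of all five, since the event loop switches to another message whenever one is waiting.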