Trouble with Event Processing? - Try the Method that Stripe has Mastered with MongoDB!

(Original post with audio and slides is here)

Greg Brockman:  So, I’m Greg Brockman. I work at Stripe. Stripe is an online payment processor that makes it really easy to add payments to your website. So, if you haven’t heard of us before, you should definitely go check us out and use us for your payment processing needs.

So, the thing I’m going to be talking about today is Monster, which is our event processing system that we’ve been using for a little over a year now. So, I wrote the first generation of Monster, and over time, a lot of different people at Stripe have contributed to it. And the reason that I’m here is that we use MongoDB pretty heavily in order to make Monster work. There are a lot of reasons that we chose MongoDB and stuck with MongoDB over the past year, and I’ll get into that.

And so, basically, the problem that Monster is trying to solve is traditionally two problems, which you’d have two different systems for. So, one is structured logging. You have a bunch of events going on in your system, whether it’s that someone has just logged in, or someone wants a password reset, or someone has just tried hacking into your system, or someone has tried to process a payment. All of these are interesting events, and you want to keep hold of the data, the fact that this thing has happened, for basically an indefinite period of time.

So, you have some structured log that you shove all this data into. And then you also have #[01:43] events. But those aren’t very useful for doing computational work later, so you definitely want to make sure you keep some structure.

And then the other problem is just doing asynchronous processing. So, I have some email that I want to send, but I don’t want to send it in a web request, or some other work that just needs to be done. It doesn’t really matter if it’s done synchronously or not; it just needs to be taken care of somewhere. And usually you have two different systems for this. You have something like Resque, or if you want to be fancier, something like Storm, for doing all of your real-time processing. But the thing is that as soon as the event has been processed in that system, it’s gone. What do you do with it? How do you know that you did that?

And so, the system that’s most similar to Monster that I know of is Kafka, which is LinkedIn’s event queue system, which basically persists all of your logs forever. And the one problem with it, which they’re actually fixing #[02:40] upon release, is that it’s missing replication. So, if you’re talking to some broker and that broker goes down, it’s possible that you’re going to lose events. And depending on your use case, that might be okay. But for something like Stripe, we care a lot about durability, a lot about the fact that this event happened and that we know about it forever, because fundamentally what we’re moving around is payments. And people care a lot about whether or not their payments are still there. And so that’s actually the main reason that we chose to use Mongo in Monster: the great property of Mongo is that it comes with good replication that lets you configure durability. So, you can take any particular write, set an option, and say I want this thing to replicate to a certain number of secondaries before it returns success. And by building off of that as a primitive, you’re able to get some pretty good properties out of your system.
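The durability primitive described here can be sketched in a few lines. This is a toy in-memory model of the semantics only, not MongoDB’s implementation or driver API: a write is reported as successful only once at least `w` replicas have acknowledged it.

```python
# Toy model of replica-set write concern: a write is acknowledged only once
# at least `w` replicas have stored it. Purely illustrative.
class ToyReplicaSet:
    def __init__(self, nodes):
        # Each node is a list of stored events; None models a downed node.
        self.nodes = nodes

    def write(self, event, w):
        """Replicate to every live node; fail unless >= w replicas acked."""
        acks = 0
        for node in self.nodes:
            if node is not None:
                node.append(event)
                acks += 1
        if acks < w:
            raise RuntimeError(
                "write reached only %d of %d required replicas" % (acks, w))
        return acks

rs = ToyReplicaSet([[], [], None])          # three-node set with one node down
rs.write({"type": "payment.created"}, w=2)  # succeeds: two live replicas acked
```

With `w=2`, losing a single node still leaves every acknowledged event on at least one surviving replica, which is the property the talk is relying on.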

So, first of all, I’m going to go and do a live demo to show you exactly how Monster works and what the interface is. Okay, this is going to be a little bit difficult to maneuver. Okay, so, let’s say that we’re writing our web application, which is a blog post generator. And so, we #[04:03] the code for this and it’s great. You run it, and it generates this lovely blog post. You run it again, and you generate a new blog post. And all the code here is doing is just generating random words and randomly deciding when it wants to put a period. You’ll see some of these sentences are short, and some of these sentences are long. But you don’t like long blog posts. That’s actually bogging down your system. And so, you want to start recording whenever that happens, and then do something with that, probably send an email to alert yourself. So, this is a pretty good use case for Monster.

So, in order to do that, the first thing you want to do is create a model associated with this event. And so the event that we’re going to create is going to be called sentence. We’re just going to give it one property, which is an array of the words in the sentence. And so now, whenever we generate a blog post using this code, we just put in a line saying that we want to log that event. Okay. So, let’s see what happens. So, we run generate-blog-posts. It runs exactly as before, and it doesn’t look like anything in particular has changed. But the other thing that we might want to do is start doing something with the fact that there’s a sentence that’s long. And so, I will put in a sentence consumer. This thing will descend from our consumer class.

So, here you define two methods. First of all, the register method, which just says exactly what you want it to process. And so, here we put in: I want to consume sentence events. And then the second one, which says, okay, well, I have this event, what do I do with it? And here, let’s say that if the sentence is longer than, say, 10 words, then print out some angry message. Okay. That’s easy enough.

And so, let’s run Monster over here. So, you notice it went over some backlog, and now whenever we generate blog posts, it’ll go and take all the sentences, generate those as events, and put them into the queue. And then Monster will go and print out #[07:02] all the long sentences that it finds. And so the power here is that actually all those events are persisted. So, if I want to add a new consumer in the future and go over this backlog, all I have to do is just write a consumer exactly as I did there and put it into production. And it will happily go and churn through all these past events, even though they have been previously consumed by other consumers. So, this is just a brief recap of the #[07:35].
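The demo boils down to a small pattern: an append-only event log, plus consumers that each track their own position, so a consumer added later still churns through the whole backlog. Here is a minimal in-memory sketch of that pattern; the names are illustrative, and the real Monster persists events in MongoDB rather than a Python list.

```python
# Minimal sketch of the Monster pattern: durable event log + independent
# per-consumer offsets, so late-registered consumers replay the backlog.
class EventLog:
    def __init__(self):
        self.events = []   # persisted forever (here, just in memory)
        self.offsets = {}  # consumer name -> index of next unseen event

    def emit(self, type_, **props):
        """Log an event; producers never care who will consume it."""
        self.events.append({"type": type_, **props})

    def consume(self, name, wanted, handler):
        """Run `handler` over every event of type `wanted` this consumer
        hasn't seen yet, then advance this consumer's own offset."""
        start = self.offsets.get(name, 0)
        for event in self.events[start:]:
            if event["type"] == wanted:
                handler(event)
        self.offsets[name] = len(self.events)

log = EventLog()
log.emit("sentence", words=["word"] * 12)     # a long sentence
log.emit("sentence", words=["short", "one"])  # a short one

long_ones = []
log.consume("angry_consumer", "sentence",
            lambda e: long_ones.append(e) if len(e["words"]) > 10 else None)

# A consumer registered after the fact still sees the full backlog:
seen = []
log.consume("late_consumer", "sentence", lambda e: seen.append(e))
```

Because each consumer keeps its own offset, one consumer having processed an event never hides it from the others.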

And so, it appears to be missing images here, but that’s okay. So, the primitive that Monster really uses out of Mongo, the primary thing that makes me really excited to be using Mongo and to be here, is that Mongo has replica sets. So, if you’re not familiar with how replica sets work, the abstraction that they give is basically: here is a pool of servers. You don’t have to care what they are doing internally. As far as you’re concerned, these are indistinguishable nodes, and if one of them blows up, then that’s okay.

The replica set will continue to work. You can just talk to this thing as if it were a single database, which is basically the abstraction you want for your database. And in practice what happens is that there is an election, and at any given point, only one of these nodes is able to accept writes. And if that node blows up, then a new election occurs and failover happens. Failover typically takes on the order of 10 seconds, and if 10 seconds isn’t a problem for your web requests, then you can actually just handle a primary going down with no downtime.

So, we’re hosted on AWS, and so we end up with our primaries going down all the time, and we’re resilient to that kind of failure. And I guess I should also say that for something like Mongo or something like Monster, availability is just really important. It’s not the kind of thing where, when Monster goes down, people will notice immediately. But a lot of our internal monitoring and alerting is reported through Monster, and so, if Monster’s down, then that means that we’ve lost one of our channels there. But we do have redundant monitoring outside of that in a couple of different areas in order to cope with that. So this is basically a bootstrapping problem, where the monitoring there just ensures that Monster’s working, and then pretty much everything else goes through Monster.

Another important thing there is that we use Monster for things like webhooks. So, if Monster’s down, then that means webhooks aren’t working either. And so, in terms of actually implementing Monster, one thing that has been really useful and is used pretty heavily is findAndModify. So, findAndModify is a Mongo operation which basically does exactly what it sounds like: it’s a combined atomic query and update. You do this against your server, so your server will be able to do things like take out locks on jobs without you having to implement your own locking around that. So, that’s useful, and it is something that we use pretty heavily in the internals, in terms of saying, when you have multiple consumers on your queue, figuring out which one gets the #[10:21] job.
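The job-locking idea can be sketched as a single atomic "find an unclaimed job and mark it claimed" step. This in-memory model uses a Python lock to stand in for the atomicity Mongo provides server-side with findAndModify; the field and worker names are illustrative.

```python
# Sketch of findAndModify-style job claiming: the query ("unclaimed") and the
# update ("claimed_by = worker") happen as one indivisible step, so two
# workers can never grab the same job.
import threading

_claim_lock = threading.Lock()  # stands in for server-side atomicity

def claim_job(jobs, worker):
    """Atomically find the first unclaimed job and assign it to `worker`."""
    with _claim_lock:
        for job in jobs:
            if job.get("claimed_by") is None:
                job["claimed_by"] = worker  # query + update in one step
                return job
    return None  # nothing left to claim

jobs = [{"id": 1, "claimed_by": None}, {"id": 2, "claimed_by": None}]
a = claim_job(jobs, "worker-a")
b = claim_job(jobs, "worker-b")
```

Each worker walks away with a distinct job, which is exactly the property that lets multiple consumers share one queue without duplicated work.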

Another pretty useful thing for us is the fact that Mongo makes it easy for you to just go and get started. So, usually people think, oh, I have this production system, it’s not really that important what the overhead is for going and starting something new. For something like Monster, it’s really important that anyone is able to just go and create a new event and start logging it right away, without having to coordinate with the ops team or think about, okay, now that I’ve written this thing, how is it actually going to work reliably.

The best abstraction that I can give someone is something they can just start using, without having to think about how this thing is actually going to work, or how do I go and run an operation in order to get it started. And so, the fact that Mongo has automatic collection creation is something that we take full advantage of, and it’s been really useful to us. We’ve been actually using this in production.

Another related property is the fact that it’s a document store. And so, what this means is that maybe you have some semi-structured data that you just want to shove into an event and think about later. So, probably the most common event that we have in our code is this thing just called general event, which is basically what it sounds like. It’s just some hash where you shove a bunch of data in and you don’t really think about it.

And there’s also a value on there you can set saying I want this event to #[11:44], and so there’s basically zero overhead to having this sort of thing. So, what this means is we end up being alerted for lots of things that we otherwise would never have thought to log or do anything with, simply because it’s so low overhead to do.

And so, often what you’ll do is you’ll go and add a general event for some particular thing that you find interesting, but aren’t really sure is ever going to be triggered. Then, that #[12:08] gets triggered, and at that point you go and maybe do something more structured, more #[12:13]. And it’s really useful to be able to just add that and iterate on it, and not really think about, okay, now I have to go and define my schema or think about what the possible types of this data are.

So, we actually have a second similar event, which is probably our second most common one, which is assertion failure. So, there you can just put an assert statement in the code, and maybe put in some extra structured parameters that need to be logged. Then, if some Boolean evaluates to false, that means the assertion’s failed, and #[12:50] logs enough properties that you can reconstruct the environment and debug at some appropriate later point in time.
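A helper along these lines might look like the following. This is a hypothetical sketch, not Stripe’s code: when the condition is false it records a structured event (with whatever context you pass in) instead of raising, so you can debug from the logged properties later. The event shape and the `acct_123` value are made up for illustration.

```python
# Hypothetical "soft assert": log a structured assertion_failed event rather
# than raising, capturing enough context to reconstruct the environment later.
def soft_assert(condition, message, events, **context):
    """Append an assertion_failed event to `events` when `condition` is false."""
    if not condition:
        events.append({"type": "assertion_failed",
                       "message": message,
                       "context": context})
    return condition

events = []
balance = -5
soft_assert(balance >= 0, "balance went negative", events,
            balance=balance, account="acct_123")   # fires: logs one event
soft_assert(True, "never fires", events)           # passes: logs nothing
```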

Also related to being able to easily roll out and just easily get started is background index creation. And so, with many databases, if you want to build a new index, that means that you’re going to be grinding your entire database, or at least that collection, to a halt while that’s going on.

With Mongo, you can simply specify that you want your index build to go in the background. So, this is just a single flag to be set saying build-in-the-background is true. So, for us, our ODM takes care of that: you specify, here is an index, go and create that. And the first time this code rolled out, it would go and automatically create that.

So, we’ve actually gone through a couple of variations of how this interaction works. And the #[13:45] that we’re on now is that it doesn’t get created on the first load. And the kind of problem that we ran into was, initially, this is what would happen: you would just write this code and roll it out, and you’d know that there’s going to be this index built. Then, as we started to add more developers to the system, we started having problems where there were just multiple versions of the code running in different places, and so inevitably there was some index that we had gotten rid of.

We’d go ahead and drop that index and think that it would never come back, #[14:15] say you would run some old version of the code. And the next time that was run, it would say, there’s no index here, but it’s specified in the code; I guess I should go and build that. And so we started to run into problems with just having these mysterious indexes coming back, and then we realized that it’s just really hard to scale something like that. So, this is one case where we decided to pick a slightly less developer-friendly interface, simply for being able to scale to being an abstraction used by lots of people at the company.

So, one question that always comes up with Mongo is just how you deal with the lack of transactionality. And so, for Monster there are actually very few cases where we need transactionality. But there is one case where we do, which is that there’s one abstraction, which is a stateful consumer. Basically, the way that this works is that rather than just being given an event that you consume, you’re also given some state that you can update. And that state at the end is just magically persisted, and you don’t have to think about how to persist it or how to deal with failures. That’s a case where you really want transactionality between the consumption of the event and the updated state sticking.

Fortunately, this is something you can write at the application level, and the thing that we found is that for most of the cases that we care about, I think basically all of them, either, A, traditional database transactions would not actually help, because you can’t really do transactions for things that have side effects. If you’re sending an email, that’s an outside #[15:51] property. You can’t build that into your database system.

And then, the other case is that usually you can come up with some sort of scheme that works #[16:01] this case. So, the #[16:03] mass of code there is basically the amount of code that we had to write in order to get transactionality for #[16:08] stateful consumers. So, it’s really not that bad in the scheme of the other value that you get out of Mongo, such as the clustering, which would take a lot of underlying code to get right.
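One application-level scheme along these lines is to persist the consumer’s state together with its position in the event stream in a single write, so the two can never disagree. The following is an illustrative sketch of that idea, not Monster’s actual code; it assumes single-document atomicity of the kind Mongo provides, with a plain dict standing in for the saved document.

```python
# Sketch of application-level transactionality for a stateful consumer:
# state and stream offset are saved together, one write per event, so a
# crash either sees both updated or neither, and a rerun resumes cleanly.
def run_stateful(events, saved, update):
    """Fold `update` over unseen events, persisting state + offset together."""
    state, offset = saved["state"], saved["offset"]
    for event in events[offset:]:
        state = update(state, event)
        offset += 1
        # One atomic replacement per event: state and offset always agree.
        saved.update({"state": state, "offset": offset})
    return saved

saved = {"state": 0, "offset": 0}        # the persisted consumer document
events = [{"amount": 3}, {"amount": 4}]
run_stateful(events, saved, lambda s, e: s + e["amount"])
```

Rerunning the consumer against the same events is a no-op, because the saved offset already points past them; that idempotency is what replaces the missing transaction.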

So, these days, we basically use Monster pretty heavily internally. We’re probably at an order of 100 to 200 different events that we’re logging, and probably about the same number of consumers. Really, this is used for everything in Stripe that’s not done synchronously; we use it sometimes as a message queue. Sometimes for simple things like I want to reset my password, which results in a password reset event being logged, which is then consumed and generates an email event, which is then consumed and actually sends an email.

And we use this for things like updating totals in people’s web interface. We use this a lot in fraud systems. It’s pretty heavily used, and that’s actually allowed us to move a lot faster and do a lot of interesting things that would be hard to do if you didn’t have a unified way of doing them.

So, in order to actually make this work, we’ve had to do a lot of scaling over the past year. We’ve probably gone through about three orders of magnitude of growth in just Monster volume. So, we ended up learning a lot of different things, making a lot of trade-offs, and doing a lot of different things in order to actually keep up with volume.

So, the kinds of things that we’ve been changing are, one, that at some point you just can’t keep all the events anymore, and so you need to do something to get rid of them. So, that’s actually something we built on top of the existing infrastructure for Monster, where events can specify an expiry. And after that period of time, another consumer just goes and consumes the event and deletes it from Mongo. This works pretty well; you can use it to keep the size of your data under control. You need to be a little bit careful about fragmentation, which is something you can correct by doing a periodic compaction of the database, or, as we usually do, just rebuilding the entire cluster every so often in order to keep the data compact.
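The expiry scheme described here can be sketched as a cleanup pass that deletes any event whose time-to-live has elapsed, while events without one are kept forever. The field names (`ttl`, `created`) are illustrative, and this runs over an in-memory list rather than Mongo.

```python
# Sketch of an expiry consumer: events may carry an optional ttl (seconds);
# anything past its ttl is deleted, anything without one is kept forever.
import time

def expire_events(events, now=None):
    """Delete in place every event whose optional ttl has elapsed."""
    now = time.time() if now is None else now
    events[:] = [e for e in events
                 if "ttl" not in e or e["created"] + e["ttl"] > now]

events = [
    {"type": "charge.created", "created": 0, "ttl": 90},  # high volume: expires
    {"type": "login", "created": 0},                      # no ttl: kept forever
]
expire_events(events, now=1000)  # 1000s later, the 90s-ttl event is gone
```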

The other kind of thing that we’ve had to deal with is that some of our initial implementation of the queuing logic wasn’t actually horizontally scalable, which is a problem that you really don’t want, basically because at that point it’s going to keep [0:18:53.7] serving and scaling issues, and you just don’t really have a good avenue to go down. So, the kind of problem that we had is where you’d have multiple consumers in parallel optimistically doing some work, and if they later found that, oh, someone else had done this work, then they’d just give up and move on to something else. So, that would mean that as you threw more and more load at the system, you would end up having more and more collisions, where consumers are just doing the same work and not actually producing anything useful as a result. And so, whenever you’re building a system like this, it’s always pretty important to think about how that’s going to work.

But on the whole, it’s actually scaled remarkably well. I actually gave basically this talk at MongoSV about a year ago, and it’s been interesting to me to see just how little it’s actually changed. The fundamental implementation and the way in which we use Mongo are basically the same as they were.

So, that’s everything that I have. Does anyone have any questions?

Male Speaker 1:  Are there any bottlenecks or synchronous processes you can’t avoid with this framework, this system?  Or do you do everything asynchronous?

Greg Brockman:  So, the question was if we can do everything asynchronously here.  I mean, do you mean within sort of the larger system of—

Male Speaker 1:  Yeah.  With your events and some of the other things you’re tasked to solve.  Can you basically model everything you’re doing in terms of asynchronous processing or?

Greg Brockman:  Right. Okay. So, it’s definitely not the case that we can put all of our #[20:20] here. Certainly, if a user has some page that they want to see, you need to render that page in line with the request, and it doesn’t really matter if you shell out to some processing system; you need to get back that response before they go away. So, there’s definitely a lot of work done in line with the request, but we do as little as we can. Basically, any additional work that you put into your request is work that, A, if that code path breaks, you’re going to end up having a user-facing exception, and, B, slows down your request, and just generally makes your system less flexible and less error tolerant.

Male Speaker 2:  You mentioned the scaling #[20:59] with more consumers getting the same jobs essentially.

Greg Brockman:  Yeah.

Male Speaker 2:  How do you #[21:04] to solve that?

Greg Brockman:  Yeah. So, basically, right now we’re [0:21:11.4] another round of scaling. We’re going to be moving to Storm as the scheduling layer, with Mongo continuing to be the durable store. And there, we’ll be able to take advantage of a lot of work that’s already been done in a system like that. So, the way that we’ve approached this is moving away from doing optimistic #[21:34], basically using more and more findAndModify, which is scaling pretty well for us. Basically, the kind of thing that we used to do is: you go and find some job that looks like it’s available for processing, and then you go and try to do a findAndModify to grab that job. If you instead just model this as one #[21:58] operation, you get that job and reserve it, and then you don’t have to worry about doing duplicated work.

Male Speaker 3:  What volume are you actually doing in storing #[22:11] and how do you [0:22:12.5]?

Greg Brockman:  Yeah, storing. So, we’re doing on the order of 5 to 10 million events a day. And these days that’s probably taking up about a terabyte, somewhere in there.

Male Speaker 3:  And what’s the retention period?

Greg Brockman:  It depends on the event. So, some events are kept on the order of 7 days, some of them 90 days, and some of them are kept indefinitely. So, basically, for things that are low volume, we just keep them forever, because we can. For things that are higher volume, like every time you create a charge, we try to get rid of those faster. So, we actually do hold onto all of the events; we just don’t keep them online. It’s basically our preference #[22:51] to another store.

All right.  Thank you guys very much.
