eBay NYC: How To Use Scala on Hadoop by Adam Ilardi - Transcript
(Original post with video of talk here)
Adam Ilardi: Hi, I’m Adam Ilardi. I work here at eBay. I’m an applied researcher. Why did I choose eBay? It’s a pretty cool company. They sell the craziest stuff you’ll ever believe. There’s denim jean jackets with Nick Cage on the back, and this kind of stuff is all over the place. So it’s definitely cool.
The New York office is brand new. It’s less than a year. What does the New York office do? Well, we own the homepage of eBay, so the brand-new feed is developed right over there. You might even see one of the guys. He’s hiding. Okay. And also, all the merchandising for eBay is going to be run out of the New York office. So that’s billions of dollars worth of eBay business run right out of here. It’s a major investment eBay has made in New York, which is really cool.
So why you’re here is to find out how we use Scala and Hadoop, and given all the data we have, the two pair very nicely together, as you will see. All right, so let’s get started. Okay, these are some things we’ll cover—polymorphic function values, higher-kinded types, the Cokleisli star operator, some use of macros.
Okay, I’m kidding. I have no clue what those things are. So that was a test, all right? We’re not going to be talking about that. We’re going to be talking about practical uses of Scala. I don’t know if any of those could increase revenue on eBay, but, if they could, let me know after the talk. I’ll be happy to hear about it.
Okay, so what we’re actually going to talk about—why Scala, why Hadoop, why we chose Scala and Hadoop, and I’m going to go through a lot of code. So bear with me, but I assume everyone is familiar with Scala, so it shouldn’t be too much of a shock. All right, cool, okay.
So why Scala? So these are typical reasons why you’d use it. But the JVM’s really important, because in a big company like eBay, they’ve made huge investments in JVM technology. So they know how to scale it, they know how to monitor it, they have all these systems up and running that do this for you for free. So that’s kind of a huge selling point, for sure.
Functional—so Scala’s really cool. When I first saw Options and Eithers, I was blown away. You just can’t get that in Java, and Scala gives you that kind of conciseness. It also is extremely expressive. So you say what you want to do, and maybe it doesn’t make a lot of sense to non-Scala users. When I first saw Scala, I thought it was a total mess, and I thought, as I wrote things more in a Scala way, it became less readable, but I realized it was more expressive over time. So I think that’s where you end up getting to use Scala, which is really cool.
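To give a flavor of that Option/Either conciseness, here’s a small hedged sketch (the listing, names, and prices are all made up for illustration): a lookup that would be null checks and exceptions in Java becomes explicit types you pattern-match on.

```scala
// A hypothetical catalog lookup: Option makes "maybe missing" explicit,
// and Either carries a failure message back instead of throwing.
val prices = Map("nic-cage-jacket" -> 99.99)

def price(item: String): Option[Double] = prices.get(item)

def checkout(item: String): Either[String, Double] =
  price(item).toRight(s"no listing for $item")

// Pattern-match instead of null checks:
val message = checkout("nic-cage-jacket") match {
  case Right(p)  => f"total: $$$p%.2f"
  case Left(err) => err
}
// message == "total: $99.99"
```

The point isn’t the three-line function; it’s that the types tell the next reader exactly which failure modes exist.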
How to convince your boss? You should probably just work somewhere where your boss already agrees with you. So that’s my advice there. Yeah.
So another thing that’s good about Scala—there are tons of Haskell tutorials on the web. I feel like Haskell is maybe not going to break into the enterprise, but Scala has a chance. So, by blending the functional style into object-oriented programming, Scala might bring people closer to that pure functional programming that Haskell offers without beating you over the head with it, so that’s definitely cool. And all my life, I’ve always wanted a typesafe build system, and Scala finally delivered it.
This happens a lot, okay. But I found it—so this is a weird—I found it easier to write the bullet points on this slide than the last slide, which is odd, but there are certainly some complaints. The compile times can be very long if you have a large application. Ask anyone who works at Foursquare and they will tell you all about it.
Great backwards compatibility—so every time they release a new version, they’ll break something. They’re known for doing this. java.util.List is the same as it was in the ’90s. That’s not the case with Scala.
Complicated—again, I think people just get used to the functional style. I think, over time, everyone’s going to get to understand it, and they’ll realize what they’re seeing is not really complicated. They just didn’t quite know how to wrap their heads around it at first.
This is one point that I’ve found personally, that it leads to madness, and I think Scala kind of goads you in a certain way. Let me try to explain that with some code. You’re looking at a trait, which is a type class that tells you you can create an object with certain parameters, and it will only be created once, and if you ever try to change the parameters, it just won’t work. You can see here, this is some crazy Scala stuff—I needed to make this covariant to get the implicits to work. I had to make P a tuple to get arbitrary parameters into this code. I used lazy vals here to capture the fact that the thing is created once, and then here’s a cool use of Either. So, if it blows up, you get a Throwable back, but it won’t keep trying. The main use of this was I/O on Hadoop. So if someone accidentally tried to do I/O in the context of a mapper, this would prevent you from crashing the NameNode. So this is very useful if you’ve ever crashed a NameNode. And then, down here, there’s some code for how you actually get it—it sets these parameters and then returns the lazy thing. Does that make sense to everybody?
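Since the slide itself isn’t reproduced here, this is a rough plain-Scala reconstruction of the idea being described—create once, remember failure, never retry. All names are mine, not the actual eBay code; the real version involved covariance and implicits, which are left out.

```scala
// Hypothetical sketch of the "create once, never retry" pattern:
// the lazy val body runs at most once, the first time it's forced,
// and the Either remembers a failure instead of re-attempting the I/O.
class Once[P, A](params: P)(create: P => A) {
  lazy val instance: Either[Throwable, A] =
    try Right(create(params))
    catch { case t: Throwable => Left(t) }
}

var calls = 0
val conn = new Once(("host", 9000))(tp => {
  calls += 1 // count how many times the expensive "I/O" actually runs
  s"connected to ${tp._1}:${tp._2}"
})

conn.instance // first access runs create
conn.instance // cached: create does NOT run a second time
```

In a mapper that processes millions of records, that "at most once" guarantee is what keeps you from hammering the NameNode on every record.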
So, then, here’s how you use it. This is how you call it. You call it with that, and then type inference figures out what you were asking for based on the parameters you passed in. I didn’t show the actual implementation of the implicit here, but it’s around. So here’s my point, though: this is used by one client class in my entire application. So show some self-restraint. Scala goads you to make these really cool frameworks. It’s super expressive, you read all these blog posts about advanced features, and you just want to use them, but you’ve got to watch out. So this is one of the things that’s really practical in Scala: you want to just get the job done. You don’t need to do it the coolest possible way you can, and I learned that firsthand.
So that sums up why we chose Scala. And it also blends into making it easier to use MapReduce, since MapReduce is in Java, as you all know. So, for those of you who don’t know what Hadoop is—this is a high-level view. So these are files. These are just files. They can be in various formats. This is an abstract class, and then they get sucked into mappers. You can think of a mapper as a map function in Scala. You just map over a list of things, and then it spits out data here, it gets sorted, and then it gets merged, so then it gets fetched by these reducers and it’s merged and sorted again. A reducer is like an aggregator, so it’s not exactly analogous to the reduce function, but it’s close enough. So you get keys, and you get an iterator over the values for each key. And my next slide will make that a little more clear.
So this is a full MapReduce task. You could do no reduce if you want. You could do no map if you want. But this is generally what a full unit of MapReduce work looks like. So it’s pretty tricky. And to actually implement it, you need to implement these interfaces in Java. So, like I said before, here’s the map interface. You see a key, and you see a value, and then you output another key and a different value. So that’s basically what a mapper does. And then the reducer, you notice Key 2 from here came in as Key 2 down here, and then you get an iterator of all the values of this that you outputted from before. So this is pretty simple. It is very low-level, though, and it’s nice to put up abstractions on top of this to actually get any real work done. Imagine trying to do a join in this. It’s probably difficult for people who know how to do it to do it in raw MapReduce, because it’s just not natural. So that’s the problem there.
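The map/shuffle/reduce contract just described can be modeled in a few lines of plain Scala—this is not Hadoop’s API, just a toy in-memory simulation to make the key/value flow concrete (the word-count input is invented):

```scala
// A toy model of the MapReduce contract: the mapper emits (key, value)
// pairs, the "shuffle" is a groupBy on the key, and the reducer sees
// each key together with an iterator over that key's values.
def mapReduce[K1, V1, K2, V2, V3](
    input: List[(K1, V1)],
    mapper: (K1, V1) => List[(K2, V2)],
    reducer: (K2, Iterator[V2]) => V3): Map[K2, V3] =
  input
    .flatMap { case (k, v) => mapper(k, v) }   // map phase
    .groupBy(_._1)                             // shuffle: group by key
    .map { case (k, kvs) => k -> reducer(k, kvs.map(_._2).iterator) }

// Word count expressed against that contract:
val lines = List((1, "scala on hadoop"), (2, "scala rocks"))
val counts = mapReduce[Int, String, String, Int, Int](
  lines,
  (_, line) => line.split(" ").toList.map(w => (w, 1)), // emit (word, 1)
  (_, ones) => ones.sum)                                // sum per word
// counts("scala") == 2
```

A real join is painful in this model for exactly the reason mentioned: you have to encode it yourself as tagged keys and a reducer that merges both sides.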
As far as you guys choosing to use MapReduce or not, I think—this is my prediction—a lot of these algorithms that are coming out are tuned for MapReduce, and I think you’re going to see more and more of that. So while you could probably do things on a single machine at a smaller company, you’re probably going to see a lot of mindshare around MapReduce. So even when you have these medium-sized data sets, it still might be easier to use it, just because of all the code out there that’s available for you.
How about eBay and MapReduce and Hadoop? So we have petabytes of data, we have thousand-node Hadoop cluster, which is pretty big, and I mentioned before the multibillion dollar merchandising business that this Hadoop cluster supports. It also supports search, which is used quite heavily at eBay, and there’s lots of users and items. I can’t exactly say how many.
So how do I use MapReduce? This is a good question. If you just Googled MapReduce and framework, you’d get back a ton of stuff, and you’d probably be lost. I remember the first time I saw the Hadoop web site, it had Pig, Hive, HDFS, core MapReduce. It was just overwhelming. So what do you do? The worst thing to do is write raw MapReduce. You’ll spend a lot of your time in boilerplate, and it just won’t be fun. You’re not going to enjoy yourself. So what did people early on say? We’ll just make an abstraction over MapReduce. And they came up with Pig, which is a scripting language, but it lets you integrate Java code and other code with it. Then Facebook came up with Hive, and that was basically SQL queries. So if you just need to know the average of a column, use Hive. Don’t go crazy. But there is a lot of setup to get Hive going also.
So this is where the picture starts to get a little bit better. We land on Cascading. Cascading is a Java framework. It’s built over MapReduce. It abstracts the whole mapping, sorting, reducing bit, and it lets you write natural code. And you’re going to see some examples of this. This is also maybe not the least amount of code you’ll ever have to write in your life, but it’s very maintainable, and I’ll talk more about that, as well.
Some other options may be Scoobi. You guys are Scala people, you’ve probably heard about this. It’s basically a MapReduce framework written in Scala. Remember when I talked about madness before? I think a lot of things in there were to scratch some itches. You have to compile it with the -Ydependent-method-types flag. You know you’re in different territory. So those guys, I think they’re doing great work, but they’re definitely still working on it.
And here we come to the punchline here. Scalding is the way we’ve evolved here. So, to be fair to history, we have a lot of Pig code, we have a lot of Cascading code, and now we’re creating a lot of Scalding code.
So how do you make a decision? Here’s a kind of long quote. I’ll give you a moment to digest it. So the basic idea is, some guy built his house on sand, and then a flood came—floods come a lot when you’re in software engineering—and it got washed away. The guy who built his house on stone, though, is not going to get washed away.
So I believe that Scalding combines the best of Pig and Cascading. And you’re going to see this in a moment with some examples. And you might ask, why didn’t we go with Scoobi, since it’s raw Scala? And part of the reason is Scalding is based off Cascading, which is an extremely strong base. The abstractions they built there have been tuned over years, and there’s a company backing it, so everything about it is really solid. I think I ran into one major bug, and it was fixed in the next version. So I trust in the Cascading people. Pig people, too—I do trust them, as well. Some of the Scoobi stuff was a little immature. It is getting a lot better, but it’s not perfect as of yet. Maybe one day it will be better. And I’ll bring up some examples to contrast it later on.
Here’s some good Pig. Remember that MapReduce API I showed you before? You don’t have to write any code that does that. This is some good Pig code. You say, hey, load me this file, give me some field names, do some filters. This sounds a lot like Scala right here. And then you can output it, and then over here you can do some other stuff that gets some other fields, and then store it somewhere else. Another thing Pig is doing is joins and group-bys. Joining in that other API is clearly unintuitive, and you have to do some major hackery to get it to work. In Pig, you could just say, join these two variables by some value, and it just magically works. So I understand why everyone flocked to this early on.
Here’s some bad Pig. This is the downside. Let’s see here. What’s someone doing here? It looks like they’re taking a Perl script and sending it to every node on the cluster. So this may not be a good idea. Do you even know where that Perl script is stored? It could be in someone’s home directory. It’s definitely in this scripts variable, but I don’t know. It looks a little shady to me. I don’t actually know what this code’s doing, because it just takes in terms, streams some data through it, and I get it back out. So without a comment here, I really have no clue what this Perl script actually does. So this is not so good. Another thing is, you can add Java code to it. So, right here, you’ll notice Java code being called. Again, I really don’t know what this Java code does. I’d have to actually look at the source, I’d have to find the jar, and that jar is probably imported via some line like this. And then it’s exported to everything on the cluster.
So another problem with this—so you don’t know where your source is, and you wrote a bunch of scripts that look like this, and now you decide Script A now depends on Script B, and after that’s all done, add Script C to run. So how are you going to string these things together? Well, that’s kind of a problem, because if you don’t always want to run A, B, and C in that order—maybe A is cached and gets updated every so often—you’re going to need to actually string them together using bash scripts or something like that. You’ll be like, there’s no way people do that. That is true. There are extra frameworks built, like LinkedIn has Azkaban, which basically allows you to construct a DAG of all your dependencies and lets you kick off that job. So this is just another layer of complexity you need to handle if you’re going to write Pig code. And I just mentioned the scheduling, DAG creation. You need to use some external framework or roll your own. And it might be tempting to just roll your own, because it’s quicker and easier, but you end up, then, with all these dependency problems, and nobody knows what’s going on. I can say this from some experience.
So Cascading rocks! I’ll remind you again what it is. It’s Java framework. It treats MapReduce as a stream of tuples. So you have tuples in Scala. Imagine that a stream of them is just flowing through various pipelines, and that’s what Cascading is at a very high level. Look—it addresses two of the big problems I had with Pig also. It supports really large workloads, so I can use all the abstractions available to me in Java, which are plentiful, and I could write code with all of those. Some really cool things are the automatic DAG generation. So based on how you structure code in the Cascading framework, it will compute all the dependencies for you, and it will actually output a graph of everything that is there, which is super cool. If you just show somebody that, they’ll be sold. Also, parallel executions—since they computed the graph of everything it needs to do, then it can choose to do everything in parallel. And Pig can’t exactly do that out of the box. You’d need Azkaban to do that, or some additional framework.
Also, one other thing is, because of DAG generation, you can cache files. So, if there’s a really heavyweight job that needs to run, you can cache it on the disk, and then you don’t need to run it every time you kick off dependent jobs. I’ll show you an example of that later on also.
Okay, so here’s some code. Basically, here, I’m saying, hey, create a new masterPipe. I give it a nice name, and then I’m going to say, filter some URL-encoded strings out of this, and here’s the field name I'm passing to it. Now, what is actually in this pipe are actually a bunch of tuples, and they’re all named. Here’s the name of one of them. There’s a lot of good things about this. This is pretty clear. It’s Java code. If I hold down control and click in Eclipse, that will take me to the source, and that’s a beautiful thing, especially when you’re coming from Pig, where I couldn’t exactly do that.
And then, down here, it’s doing another thing. I’m saying, filter more inappropriate queries, and you’ll notice it’s building a pipeline. So this variable is passed into this thing, and then it’s reassigned. I just realized it couldn’t be reassigned because I made that a val. Anyhow, then you’ll see down here that it’s taken again, and then you’re going to GroupBy. So you could say, hey, I know I have these fields. GroupBy them and then give me that iterator. It’s the same iterator that you saw back in the raw MapReduce API. It can also do things like secondary sort. That’s what’s going on here. Does all this make sense? Cool.
All right. Oh, man, what did someone do here? Okay, so this is the output of one of those DAG calculations. Now, you may argue that, possibly, this wasn’t the best use of MapReduce, but the fact is, it was very easy to do in Cascading. And because of all the abstractions it gave me, it was very easy to do a loop and cause all these things to happen. And then they get floated there and there and there, and it’s finally outputted. This is really cool. And it’d be very difficult to manually put this into Azkaban, I could imagine. Again, you’d probably argue I shouldn’t do that, but this is definitely cool.
Cascading issues—I can’t really think of too many. This is an old IBM joke here. Anyways, the point is, the only thing I can really think of that’s not good is you have to still write a lot of code because it’s in Java. People kind of complain about the verbosity of Java. And also, you have a lot of pipelines. It’s not always super clear. So I guess you can level that complaint against it. But, again, it’s definitely a good workhorse. If you were to reach into the bag of Hadoop tools and pull out Cascading, I don’t think anyone would fault you.
So now it’s time for Scalding. Welcome back to the world of Scala. This is where Scalding really makes things cool. Scalding is a domain-specific language written on top of Cascading. It’s done by Twitter. They’ve done excellent work here. So let’s look at this code for a little bit here. So, take some arguments, it extends a Job—that’s in Scalding—and then, here, you’re saying, okay, get me a text input, and that’s the file name in the args. And then, now, you’re doing something that’s pretty Scala-like here. Before I go into that, this is counting words. You may have seen this example countless times—like Hadoop is the best framework for counting words ever built. And you’ll see it implemented over and over and over. It’s a very good example, and it really illustrates the power of each abstraction that every given framework adds.
So what do you got here? You’ve got some flatMaps going on. It takes a string. So assume this input was line number and then some text. And then I use this tokenize function to spit out an array of strings, and what flatMap is going to do here, it’s going to create more and more tuples that come out of this. So then you’re like, okay, I have all these words now, and I just want to get their size, so I GroupBy them and do a size, and then I write it out.
I think what’s important to note here is that you’re writing code that’s kind of close to what you would actually do if you were doing it in memory. So that’s really what the goal is here. They don’t want you writing streams and pipes and all these kinds of things. It’s not really a natural Scala construct. They want you to be reminded what it is to write Scala code, and then you can be more productive. So you start seeing things like flatMap, you know what to do, as opposed to some construct named Every—you’d be a little more confused. So, to be fair to Scoobi, it has a very similar interface, but, again, since Scalding is built on top of Cascading, you’re basically just proxying down to Cascading functions, which are really well designed and work excellently.
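That "close to what you would do in memory" claim is easy to check: the word-count pipeline has the same shape when run on a plain Scala List. This is not the Scalding job itself (which needs a cluster and the Scalding library), just the in-memory analogue; the tokenizer and input lines are illustrative.

```scala
// The same shape as the Scalding word-count job, run on an in-memory
// List instead of a pipe of tuples.
def tokenize(text: String): Array[String] =
  text.toLowerCase.split("\\s+").filter(_.nonEmpty)

val lines = List("Hadoop counts words", "Scala counts words too")

val wordCounts: Map[String, Int] =
  lines
    .flatMap(l => tokenize(l).toList)      // line -> many word tuples
    .groupBy(identity)                     // like groupBy('word)
    .map { case (w, ws) => w -> ws.size }  // like { _.size }
```

Swap `List` for a Scalding pipe and `Map` for a sink, and the transformation steps read nearly identically—which is exactly the productivity argument.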
We chose to use Scalding, but there are still some problems with how we have to use it at eBay. We have these gigantic tables on HDFS that have a hundred or more fields in them, and if I want just one of them, it’s often extremely confusing to figure out how to get it. If you just have to know its index, and someone changed it, all your code would be kind of screwed, so that wouldn’t be good. So what do you use Scalding for? You could definitely reduce the boilerplate of actually finding fields in HDFS tables, if you want to call them tables. It’s super extensible, so you’ll see an example of how we extended Scalding to do a lot of natural things that make our code a lot more readable and definitely more extensible. So this is a good point.
New hires—so there was a big argument about whether or not people who get hired into doing this kind of work should start off directly with one of these frameworks that’s super high-level to get them productive right away. So that’s why we’re pushing toward Scoobi and Scalding. It’s a lot less intimidating than going right to raw MapReduce or going right to Cascading. The problem is, when you actually get to do something complicated, you run back right up against the wall of, I need to know how MapReduce works. So I think it’s good that these things are coming out, but the thing it abstracts is still very important to know, and I’m not sure we could get away from that now. Maybe in time.
So practical Scalding usage—they use the Pimp My Library pattern, and we just took that one step further and did it again. Code-generated boilerplate—we actually just wrote a Scala program that parses over XML definitions of tables and spits out a very clean interface for accessing those tables. I’ll show you an example of that in a bit. So this is super important. If . . . did this, everyone was just clamoring for it, because everyone had this same problem of, I have to look at this table, and I don’t know what fields I need. And now we give them all consistent names based off this XML definition, and everyone’s super happy.
Cascades—cascades are a Cascading feature where one Cascading flow depends on another flow, and you can do special things. So you could say, if Flow A was stale, then regenerate its data, and then give it to Flow B. I’ll give you an example of that a little bit later, but that’s not in Scalding. That’s just in Cascading. But I submitted a pull request, and they accepted it, so it is now going to be in the next version, which is very cool.
Traps—so often there’s a lot of data in HDFS that’s just garbage, and the data warehouse team has to put it there. This is like clickstream data—people put things in crazy fields they’re not supposed to, and it all ends up in your data warehouse. So what a trap does is say, hey, I was expecting this to be an integer, and it blew up. Just throw it in some separate area, and I can go back and look at it later. So these are critical for making these things work and for debugging, because debugging a MapReduce job is very difficult, as you will see. I’ll get into it a little bit later also.
And then another important thing is testing. Scalding actually has an in-memory test framework, so you could say, give me these inputs. You actually define the tuples that are your input, and then you define the tuples that are your output. They will run through your entire job and just validate if it worked. So this is extremely important, because, once it’s on the cluster, anything could happen, and it’s very difficult to tell if you had a rounding error when you’re adding millions of numbers together. So it’s super important to do things on a known data set beforehand to make sure you’re getting the actual results you want.
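The same discipline works even without Scalding’s JobTest harness: factor the job’s logic into a pure function, feed it hand-written tuples, and assert on the exact output before anything touches the cluster. A minimal sketch (the per-key averaging job and its fixture are invented):

```scala
// The "known input tuples -> expected output tuples" idea in miniature.
def averageByKey(rows: List[(String, Double)]): Map[String, Double] =
  rows.groupBy(_._1).map { case (k, vs) =>
    k -> vs.map(_._2).sum / vs.size
  }

// Hand-written fixture: small enough to compute the answer by hand.
val fixture  = List(("shoes", 10.0), ("shoes", 20.0), ("hats", 5.0))
val expected = Map("shoes" -> 15.0, "hats" -> 5.0)

assert(averageByKey(fixture) == expected) // fails fast, before the cluster run
```

Catching a bad aggregation on three tuples is a lot cheaper than diagnosing it across millions of rows on a thousand nodes.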
All right. So I’ll walk you through some of the code we wrote over here. I had a large hand in it. You saw that extends-Job bit at the beginning, so now we have an eBay job, which I broke naming convention for, for branding purposes. And then that extends that Job, and then I mixed in this pipe boilerplate. So you’ll see an example of pipe boilerplate later, but that’s basically that code generation to get rid of having to know where every field is on HDFS.
And then, over here, is where we extend the RichPipe. So the way Scalding’s implemented is it has this thing called RichPipe, so, every time it sees a pipe, it turns it into a RichPipe. That gives you all these cool functions, like flatMap, Map, whatever else. So what we did is say, hey, I’ll just take that a step further. There’s a lot of common things people at eBay do over and over again that are often annoying. Unfortunately, some of the more annoying things are obscure, so not everybody knows them, and then you have to go digging for it.
So what you do is basically you define a trait that imports the various DSL classes, and then you just define a pipe, and, due to the magic of mix-ins, it ignores this when it mixes it in, which is really cool. And then, you could define all the really complex functions you want. So whatever mind-blowing code you have that you think people just can’t handle, you just wrap it in this abstraction, and then you can call a really complex function exactly like we called flatMap. And this is extremely useful. One example was we wanted to compute quantiles over a given category and the items that sold in it for what price. So we just extended the RichPipe and included a function like compute quantiles in category. So, if anyone wants it now, they can just use it automatically, which is really cool.
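The enrichment pattern being described—bolting domain methods onto an existing type so callers use them like built-ins—can be sketched in plain Scala with an implicit class. This is not the eBay RichPipe extension (which wraps Scalding pipes); the Sale type, the method name, and the median logic are all invented to show the shape:

```scala
// "Pimp My Library" in miniature: an implicit class adds a domain
// method to an existing type, the way the eBay DSL adds helpers to
// Scalding's RichPipe.
case class Sale(category: String, price: Double)

object PipeOps {
  implicit class RichSales(val sales: List[Sale]) {
    // Wrap the complex logic once; callers invoke it like map/flatMap.
    def medianPriceByCategory: Map[String, Double] =
      sales.groupBy(_.category).map { case (cat, ss) =>
        val sorted = ss.map(_.price).sorted
        cat -> sorted(sorted.size / 2)
      }
  }
}

import PipeOps._

val med = List(Sale("bags", 10.0), Sale("bags", 30.0), Sale("bags", 20.0))
  .medianPriceByCategory
// med("bags") == 20.0
```

Once the trait with the implicit is mixed in, "compute quantiles in category" reads exactly like a stock collection method, which is the whole point.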
So it ends up writing code like this, which some might argue is too readable for Scala. Martin Odersky tells you to use single-letter variable names and things like that. So you end up with code that basically says, hey, I want to get the transactions pipe, and give me some default path logic, and we’ve defined all that for you, too. And then this is where the magic happens. Every field name is in this pipe at this point, and then I say, hey, just give me the fields I need, and you just toss in the name. And to find the name, you could just look through the code, in this actual concrete class that was also code-generated and has the name Checkout Transactions. So now I have exactly the fields I need. And then I use this in a single job. This is not shared code, but I did it to make the high-level flow very readable. So, to give you some more context, this is actually a user preferences job, so it says, does this user actually like brand Coach? And then give them an affinity towards that piece of structured data.
So, in order to encapsulate that entire process, it does a few things. It counts how many times a user interacted with that piece of structured data, and then it does a score calculation, and then it does a confidence calculation. So if anyone came up to this code, they could easily click into each bit, and then find the actual calculation that I did. To be fair, there’s a couple of lines that I left out. But it’s generally very simple, and your code could end up looking like this. So remember back when you saw that Pig code where it was just—this is why I say it combines the best of Pig and Cascading, because the Pig code did look somewhat similar in terms of the use of filters and Maps and GroupBys. But it’s not in Scala. So inside all these functions, whenever I bring a piece of data into memory, I could use all the abstractions Scala has to offer, and those are even more plentiful than Java, which brings me back to that madness point, so don’t go too crazy.
Here’s some other interesting things we’ve done. Collaborative filtering is typically very difficult to run on a large data set. So there are libraries like Mahout, but they have an extremely large learning curve. The examples will run out of the box, but then as soon as you’re like, okay, I’m going to plug my data into it, you start getting exceptions, and you have to go to the user group, figure out what’s wrong. I hope this is going to get better in the future, but it’s not as user-friendly. I’ve noticed a lot of cool things happening in Scalding around this space. One thing is Edwin Chen, one of the guys who worked at Twitter, just put out a blog that used a very simple item-to-item similarity algorithm, and it’s extremely extensible. You basically just override the input pipe. So you say, give me this input, and then you could do some output munging, too. It just works.
So, the barrier to entry to running collaborative filtering algorithms, which are typically difficult, has just dropped down to almost nothing if you have a Hadoop cluster. So this is extremely powerful, and a lot of the abstractions that the Scalding guys are building in go with this same principle. There’s another library called Algebird, and it has all sorts of cool things. One is a monoid that just gets average standard deviation and count, and it does this cross MapReduce, but it doesn’t need to bring all the data into memory at once. So they’re implementing a lot of algorithms like that, that are smart enough to work on streams of data instead of viewing the entire data set in memory at once. And that’s what using Scalding is really good for. You’re leveraging those abstractions to do more and more complicated things, but you’re doing it in a simple way.
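A rough sketch of what that Algebird-style idea looks like—this is not Algebird’s actual Moments type, just a hand-rolled plain-Scala analogue: a small value that absorbs one element at a time and can be merged associatively, so count, mean, and variance come out of a fold over a stream (or out of combining per-mapper partial results) without holding the data in memory.

```scala
// A combinable running-moments value: + folds in one element,
// merge combines two partial results (the monoid-style operation
// a reducer would use), and nothing retains the raw data.
case class Moments(count: Long, mean: Double, m2: Double) {
  def +(x: Double): Moments = {
    val n = count + 1
    val d = x - mean
    val m = mean + d / n
    Moments(n, m, m2 + d * (x - m))
  }
  def merge(o: Moments): Moments = {
    val n = count + o.count
    if (n == 0) this
    else {
      val d = o.mean - mean
      Moments(n,
        mean + d * o.count / n,
        m2 + o.m2 + d * d * count * o.count / n)
    }
  }
  def variance: Double = if (count < 2) 0.0 else m2 / count
}

val zero  = Moments(0, 0.0, 0.0)
val left  = List(1.0, 2.0).foldLeft(zero)(_ + _) // one "mapper's" partial
val right = List(3.0, 4.0).foldLeft(zero)(_ + _) // another partial
val all   = left.merge(right)                    // same as folding all four
```

Because merge is associative, the framework is free to combine partials in any order and in parallel—exactly the property that makes these abstractions safe on MapReduce.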
To be fair, these methods of collaborative filtering are a little less precise than some of the other state-of-the-art methods, but I think the point is that if you can get that up and running, you can make something better for your customer tomorrow, rather than six months from now, which is super cool. And if you’re super interested in machine learning, one of my colleagues, Chris Severs, is giving a talk at a Scala conference about this. I’ll post a comment on the board. Definitely check it out.
Here’s another cool piece of tech we built using Scala and Cascading. This is structured data importance, so it’s like the user has an affinity to a certain brand, but in aggregate, around what aspects do people decide to shop? So, if you go to the store, do you first look for the brand, or do you look for the color of the thing you want, or do you look for the size? So all these are various aspects, and this represents the supply we have on the site. This job is important for two reasons. If bag depth was really important but had no supply, maybe we need to tell people to start listing it more. And also, if you notice the eBay search experience, you’ll see a left navigation, and there are certain aspects you could actually click on, and then, when you click on them, it filters the results. So, above the fold, you only have the opportunity to show X number of things, so you have to figure out what they are.
So remember that crazy DAG I showed you from before? That’s this job. And the reason the DAG is so crazy is that it’s a linear model that’s computed completely on Hadoop. The coefficients on this model are not learned on Hadoop, but they very well could be. And then you just plug it in and it’ll compute your model for you. So you could do things like that pretty simply without resorting to frameworks or anything like that, which I think is pretty good.
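The "plug the coefficients in" step is just a dot product per record once the weights are learned offline—here is a hedged plain-Scala sketch of that scoring step (the feature names, weights, and bias convention are all invented; the real job computes this across the cluster):

```scala
// Offline-learned linear model: each Hadoop task just applies the
// fixed weights to every record's features. Missing features count
// as 0.0; "bias" is an always-on feature with value 1.0.
val weights = Map("brandClicks" -> 0.8, "colorClicks" -> 0.3, "bias" -> -0.5)

def score(features: Map[String, Double]): Double =
  weights.map { case (name, w) =>
    w * features.getOrElse(name, if (name == "bias") 1.0 else 0.0)
  }.sum

val s = score(Map("brandClicks" -> 2.0))
// 0.8 * 2.0 + 0.3 * 0.0 + (-0.5) * 1.0, roughly 1.1
```

Because scoring is embarrassingly parallel, the only hard part is distributing the weights—which is why it maps onto Hadoop so cleanly.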
One other interesting thing that this job used is structured data extraction. We have these other machine-learned models that are used within a Hadoop context. So, say you’re streaming over a bunch of search queries and you want to know, did that person mention a brand or not? So you could build a completely offline model that is partitioned in some way, and then you use Hadoop to stream over this massive data set. So you load that machine-learned model into memory, and then you just stream all your data through it and output more and more tuples. When you have 1,500 machines doing this for you, it gets done a lot quicker than you could ever imagine doing it on your own. So you could scale simple models across huge amounts of data by using Hadoop MapReduce, which brings me back to the earliest point of that trait you saw, to do I/O once. That was because of this. I had seen I/O being done in the context of a mapper, and I realized it definitely could cause problems, so you only want to do that I/O once. So that’s why that earlier trait was created as well.
Some other cool things you should do, practical concerns, especially with this job—this graph was built out of a SQL-like database. Often your MapReduce jobs will turn over immense big data, and, at the end of it, you end up with something that’s very small, and you’re like, I want to ask some more questions about what I outputted. So you just chuck it in a SQL-like database, and you can figure everything out from there.
Here’s a particularly cool piece of Scala code that I recently wrote. We did some Markov chains to investigate the buying patterns, and it’s about fifty total lines of Scalding code, but here’s the meat of it. You can’t do these kinds of things in Pig. So this is all in the context of a reducer, but I’ve used all the cool stuff Scala has to offer to compute this in a few lines of code. And these lines of code here are almost identical to what I wrote to test it in a single-threaded context. Then I just copied and pasted it into a reducer, ran it on the cluster, and on the first shot it came out with the data I wanted.
A cool thing here is how it works. A Markov chain is: given your current state, what’s the probability you will advance to the next state? So you could imagine this state as you bought something, and you’re going to advance to the next state of buying something else. So what you’re looking at here is the purchases that somebody made, and then we’re figuring out the probability that they’re going to move on to the next state, which is buying the next product. So this is pretty useful. And then I just use this cool trick to get the first-time probability here. So you’re not looking at raw MapReduce code. This isn’t Scalding code, it’s just Scala that’s being used. Does that make sense to everybody? Any questions?
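Since the slides aren't in the transcript, here is a single-threaded sketch in the same spirit: from each user's ordered purchase history, count transitions between consecutive products and normalize to get transition probabilities. The product names are made up; the real job did this per-user inside a reducer.

```scala
// Sketch: estimating Markov transition probabilities from purchase sequences.
object MarkovSketch {
  def transitionProbs(histories: Seq[Seq[String]]): Map[(String, String), Double] = {
    // Pair each purchase with the next one: the (state, nextState) transitions.
    val transitions = histories.flatMap(h => h.zip(h.tail))
    // How many times we left each state, for normalization.
    val fromTotals = transitions.groupBy(_._1).map { case (s, ts) => s -> ts.size }
    // Count each distinct transition and divide by its source-state total.
    transitions.groupBy(identity).map { case ((from, to), occs) =>
      (from, to) -> occs.size.toDouble / fromTotals(from)
    }
  }
}
```

This is the kind of collections-library pipeline you can test locally and then paste into a reducer, which is the point the talk is making.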
Here’s a very interesting job. Again, we had to mine search queries. So, there are billions of search queries associated with a given user on eBay. A lot of the queries they make are done before they’re logged in, so they’re not fully associated with the user. But if you want to know what the top queries are for a user, various use cases could be the saved-search kind of thing, like here you search for these things a lot, maybe you want to follow it, something along those lines.
So this job here really took all the lessons we learned about Hadoop and implemented them. So these twenty billion user queries are not in a single place. They’re mixed in with clickstream data. So, to actually get to them, you have to parse over tons and tons of clickstream data. So it doesn’t make sense to, every time you want to do that, look through the fact that you clicked on an item, clicked the back button, did something else, it’s just useless. And there’s petabytes of it. So what you do is you go over it once, dump it to a known location for a given day, and what Cascading allows you to do is say, hey, if this day’s computed already, go on with your life. Don’t recompute it. So that’s the perfect example of a great abstraction Cascading gives you.
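The checkpoint idea above can be sketched in a few lines of plain Scala: if the day's output already exists on disk, skip the expensive pass over the clickstream and just read it back. The paths and the extraction step are hypothetical stand-ins for the real HDFS locations and Cascading flow.

```scala
import java.nio.file.{Files, Path}

// Sketch of the Cascading-style "don't recompute a day that's already done"
// abstraction, using the local filesystem instead of HDFS.
object DailyCheckpoint {
  def extractIfMissing(day: String, baseDir: Path)(extract: => Seq[String]): Seq[String] = {
    val out = baseDir.resolve(s"queries-$day.tsv")
    if (Files.exists(out)) {
      // Already computed: read the checkpoint instead of re-running the pass.
      scala.io.Source.fromFile(out.toFile).getLines().toList
    } else {
      // First time for this day: run the expensive extraction, then persist.
      val queries = extract
      Files.write(out, queries.mkString("\n").getBytes("UTF-8"))
      queries
    }
  }
}
```

The by-name `extract` parameter is what makes the skip real: the expensive body only runs when the checkpoint is missing.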
So now we have all these queries that are on the disk, and they’re isolated. What do we have to do? One typical thing is deduping them. So you could run some tokenization on them. This is actually what we did—ran some tokenization, and then we used this unique function in Cascading, which basically sends every query to a bunch of mappers, and it keeps a few in memory and says, if I saw it already, get rid of it. And then it pushes everything to a reducer, which has the effect of deduping naturally also. So that is pretty cool.
So now you’ve deduped a lot of the useless queries, but you’ve retained the ones that are the newest and the ones that are the oldest out of all of those. So now you can rank. You could say, if someone has searched for something a year ago, and then they did it six months ago, and then they’re still doing it today, that’s probably pretty important to them. So you could write a ranking function around that. It’s very easy to get mins and maxes of things in a MapReduce kind of context. So that worked out really well.
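The dedupe-then-rank steps could be sketched like this in single-threaded Scala: normalize the query text, group (which dedupes), keep each query's oldest and newest timestamps, and rank by the span between them. The normalization and the span-based score are invented for illustration, not eBay's actual ranker.

```scala
// Sketch: dedupe queries and rank them by how long a user has kept making them.
object QueryRank {
  def rank(events: Seq[(String, Long)]): Seq[(String, Long)] = {
    events
      .map { case (q, ts) => (q.trim.toLowerCase, ts) } // cheap tokenization
      .groupBy(_._1)                                    // dedupe by query text
      .map { case (q, hits) =>
        val times = hits.map(_._2)
        q -> (times.max - times.min)                    // span = newest - oldest
      }
      .toSeq
      .sortBy { case (_, span) => -span }               // widest span first
  }
}
```

On the cluster the groupBy becomes a shuffle and the min/max becomes a trivial aggregation, which is why this kind of ranking is so cheap in MapReduce.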
Another thing is some kind of fuzzy matching. Given a user did X number of things, you rank the top bunch of them and maybe you say, well, these two things are really the same, but since you’ve reduced your data set down so much, you can now actually run a more comprehensive algorithm to say these two pieces of text are exactly the same. So you’ll see this a lot in MapReduce—you start out with a lot of data, you whittle it down, you whittle it down, you do the easy things to get what you want, and then you can finally get it into a compact enough representation where you can do something really cool to it, which is pretty typical.
So this brings me back to a point. I’m going over twenty billion user queries. If some of them are buggy, how the heck am I going to be able to tell? I can’t just attach a debugger to every JVM that’s running in our Hadoop cluster. That wouldn’t work. So what you can do is count things. Hadoop gives you this abstraction called counters, and, as you see something, you increment. So what we did was we put counters in for bad situations. Any time you saw a bad situation, you got a count. And then when the job ended, Cascading gives you a list of all the counters that ran and all the flows, and then you can run validators over it. So this is a good use for Scalaz validation, which is pretty cool. You could just run validators over all the flows that were ever done.
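A minimal sketch of the counter-plus-validation idea, with two stand-ins: a mutable map in place of Hadoop's Counter API, and `Either` in place of Scalaz Validation. The counter names and limits are hypothetical.

```scala
// Sketch: increment a named counter per bad record, validate totals afterward.
object CounterValidation {
  import scala.collection.mutable
  val counters = mutable.Map.empty[String, Long].withDefaultValue(0L)

  // In a real job this would be ctx.getCounter(...).increment(1).
  def increment(name: String): Unit = counters(name) += 1

  // After the flow completes, check each counter against a tolerated maximum.
  // Scalaz's Validation would accumulate failures; Either stands in here.
  def validate(limits: Map[String, Long]): Either[List[String], Unit] = {
    val failures = limits.collect {
      case (name, max) if counters(name) > max =>
        s"counter '$name' = ${counters(name)}, limit $max"
    }.toList
    if (failures.isEmpty) Right(()) else Left(failures)
  }
}
```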
So you have a bunch of jobs. You have a bunch of Scala code running these jobs, and now someone asks me, I want this updated daily, so what do I do here? So this is pretty simple. This is how we do it. Because everything is Scala and Cascading, we basically use Jenkins to compile it, and then we say, hey, Jenkins, go and send that compiled code over to the Hadoop proxy. And because, like I mentioned, it handles all this dependency of all these various flows and jobs within this Hadoop context, it’s easier just to package up a single jar and send it over. And you don’t need a complex scheduling framework like Azkaban, necessarily. Depends how crazy you go.
You can just make a simple Jenkins job that says, hey, trigger this bash script that runs this job, and then you wait on the status, and then when the status is complete, you just say, hey, Jenkins, okay, go back there, tell it to download the file. So Jenkins compiled it, sent it over. It then said execute it, and then it said, oh, all right, it’s done. Okay, download the file and then sync it over to these batch database load machines—so these are just VMs that we have—and once they’re there, then Jenkins also kicks off another process that says, okay, get this data into the database. And that’s where our data goes into Cassandra, and so those user preferences I talked about before, like the “likes Brand X” stuff, that goes in Cassandra, and various other stuff, like that aspect recording stuff, goes to MySQL. Same thing with those user queries, and some legacy stuff goes into Mongo, as well.
I think that about wraps it up. Does anyone have any questions? You, sir.
Oh, okay. The question was about HCatalog, and had we looked at it. So to be honest, that kind of decision is out of my hands. The Hadoop group is a bunch of people, and they have things that support Teradata and other kind of big data warehouse stores, so they have their own system, and I don’t exactly know what it is, but it publishes similar metadata, which we used for code generation. So I could imagine you could do the same thing with another suite of software.
Ah, yes, you’re very observant, sir. So the Scalding API we’re using currently is not typesafe, so the only type safety you get is once you’re actually executing code. Back here, now I get type safety, but that X-variable where I’m getting stuff out of, there’s no type safety on that. There is a typesafe API that’s in development, and it lets you pass classes around. So it will always validate that, okay, you gave this segment of code an Adam class, and you’re getting back a Foo class. And then it wires type safety throughout the whole process. I don’t have an example, but you can imagine there being types on every one of these things, like count user interaction returns some type. And that’s what they’re doing with the typed API. You, sir.
We use all sorts of stuff. There’s sequence files, and there’s TSVs. A lot of this data is created by this data warehouse team, and we’re kind of stuck with what they give us, but when we sample it, we’ve done a combination, like occasionally we use sequence files and a serialization format. But, often, we’ll just use TSV, because I could go to it and look at it, which is actually quite valuable. We tend to gravitate toward readable text.
Multicore? We haven’t done any kind of multicore processing within the MapReduce flows themselves, but you certainly can. All these frameworks let you actually get pretty close to the middle of MapReduce, and there are methods in there where you could override, and it would let you do multicore processing. One example is if you just aggregated a bunch of stuff, and you know it fits in memory, maybe it’s faster to do multicore processing on it. We haven’t run into a situation where we necessarily needed to do that. One of the problems with doing that is you kind of starve everyone out. So, if I go nuts on my one reducer, and I’m sharing that node with a bunch of other people, which is very common at eBay—we have many clients of the Hadoop cluster—you basically consume all the resources of that reducer. Hadoop doesn’t have very good boundaries between different tenants. So if one tenant takes out the system, it kills it for everyone else. That’s why we personally haven’t done that. But you could do it.
Queuing system? Yeah, there is a queuing system that you can give people certain priorities over other people, but you don’t have any control of where the task ultimately gets run. So the task is going to be scheduled out by the MapReduce framework. It’s going to land on some worker, and if you starve out that worker and start failing everyone else’s jobs, there’s nothing they can do about it. You.
That’s a good question. No, we’re using Python, because we like Python, as well, and it was very easy to do. We were familiar with it. But you certainly could use Scalding. I think this general framework supports anything. I do have one job that creates bloom filters that uses Java. I certainly could’ve done that in Scala. I just didn’t. Yes?
The question was, how does it play with different versions of Hadoop? I’ve run Scalding code on 0.22, if I recall. We’re on a newer version of that. It does work—I was able to get it to work on an older version of MapReduce. I know there are two specific MapReduce APIs, and it does support both of them. There’s the old style—if you actually look through the MapReduce code, you’ll see an if statement, like use the new API or use the old one. You can get it to run with the old API code. I’m not sure exactly what the problem that you ran into was. It’s possible they didn’t cover all the cases, but generally it’s kind of friendly, I’ve found, with the stuff we do here.
Yeah, it’s definitely interesting. Cascading is built against specific versions of Hadoop also. But I think in general they try to be friendly to the older versions, because a place like eBay gets stuck on an old version, and you actually need to take the cluster down to upgrade it, so it becomes very difficult to upgrade seamlessly. You have to actually take the thing down. Any other questions?
Yeah, Hadoop supports gzipping, I believe by default, and it also has a splittable compressed format. The name is escaping me. But definitely the data we use is zipped up. But the thing is, when you choose to use compression on Hadoop, you have to keep in mind splittability. So when you’re trying to farm tasks out to mappers, if you can’t split at arbitrary points of a file, then you can’t actually get it to a mapper without truncating something useful. So yeah, I believe we don’t use the splittable one, but I’d have to double-check. Yes?
Yeah, I learned Scala in conjunction with Cascading, to be fair. We originally just had Scala calling Cascading directly, so we didn’t write anything in Java, and that’s when I started picking up Scala. I’d say, I think my early Scala, to be honest, looked a lot like Java. So I look back at things I did before, and I see myself accumulating data into mutable lists, and I’m like, what the heck was I thinking? That’s the evolution I took over it. And then, when I started getting better and better at Scala, I found Cascading and Scalding more and more appealing, because it melded with the functional constructs that I was learning. Yeah?
The guy I mentioned who’s giving the talk at Scala Days, he’s a big proponent of it. So you could imagine all of our data warehouse tables are stored in this type of serialization, and that would make things a lot easier in certain cases, so people do use it, and they like it. I think we just haven’t jumped into it because we haven’t had a real strong, pressing need to do it, but it’s definitely something to consider. If you’re starting fresh and writing tables into Hadoop that many people are going to be using, Avro’s a good choice.
All right, anyone else? No? All right, cool. Well, thank you all for coming. I hope this was informative. If you want to follow up on anything new about eBay, go to our website. We are hiring, of course, and we love Scala here. We are also trying to get Scala into more visible places in eBay. For instance, all the recommendation services currently run on Java, and we’re experimenting—right now, people are pushing code that is going to get Scala serving recommendations. So we’re experimenting with it in more places than just Hadoop, which is really cool.
Thank you, and good night.