13 January 2013

Why is Map Reduce faster? -- A practical example of data joins

Joining data with Map Reduce is the fastest, most scalable way to generate reports.

Generating reports is a vital part of any product. Reports show how you're meeting the needs of your users. Reports tell you how much money you owe or are earning. Even PageRank is just a report of the best content. But generating reports at scale, on large amounts of data, is difficult. It's even harder if you need to combine different types of data together.

To show what I mean, let's go through an example: Generating a report about your users. I'll explain this in App Engine Python jargon, but it translates easily.


Starting slow

The common first step for such a report is iterating over your user table. You would do a bunch of queries for each user, calculate some things (spam score, lifetime value, etc), and then save the result back to the datastore (one result per user). If you're trying to rank influencers in a social network, your cronjob may look like this:

def calculate_influence(user_id):
  # Two datastore queries per user: this is where all the latency goes.
  message_count = Message.query().filter(Message.sender_id == user_id).count()
  follower_count = Follower.query().filter(Follower.target_id == user_id).count()
  user_stats = UserStats(
      id=user_id,
      message_count=message_count,
      follower_count=follower_count,
      # Guard against users with no followers to avoid a ZeroDivisionError.
      influence=message_count / follower_count if follower_count else 0)
  user_stats.put()

for user in User.query():
  calculate_influence(user.key.id())

The problem is that your throughput is awful because of latency. Even if you do the datastore queries for each user in parallel, processing one user still takes as long as a single lookup (50ms-200ms) plus your logic (say, 50ms). That means a single thread can only process 4 user records per second. If you had 16 threads going at this rate you could only process 64 user records per second. If you have 10 million users, you're looking at almost two days for this job to complete.


Going fast (in theory)

Imagine that something fetched the per-user data ahead of time and passed it to your analysis function. Now your function does nothing but your calculations for 50ms, no datastore lookups. One thread could process 20 users per second, and 16 threads could do 320 per second. That's a factor of 5 speedup for the same number of threads. It would reduce the processing time of our example from 2 days to 9 hours.
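
Here's a quick back-of-the-envelope check of those numbers, using the worst-case figures assumed above (200ms per lookup, 50ms of logic, 16 threads, 10 million users):

USERS = 10 * 1000 * 1000  # total user records
THREADS = 16              # concurrent worker threads
LOOKUP_MS = 200           # worst-case datastore latency per user
LOGIC_MS = 50             # time spent in the analysis logic itself

def hours_to_finish(ms_per_user):
  records_per_second = THREADS * (1000.0 / ms_per_user)
  return USERS / records_per_second / 3600

print(hours_to_finish(LOOKUP_MS + LOGIC_MS))  # naive cronjob: ~43 hours, almost 2 days
print(hours_to_finish(LOGIC_MS))              # data prefetched: ~9 hours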

How do you accomplish this? The answer is data joins with map, shuffle, and reduce. Let's go through how this works.


Straight-lining: Map

First, run a mapper over each table your analysis function reads. For the example above, that means one mapper for the Message table and one mapper for the Follower table. Each map function consumes a record (independently, with no shared state) and outputs a corresponding key/value pair:

def map_message(message):
  # Emit one marker per message this user has sent.
  return (message.sender_id, 'm')

def map_follower(follower):
  # Emit one marker per follower this user has.
  return (follower.target_id, 'f')

The output of the mappers is a sequence of key/value pairs. For our example, keys would be user_ids and values would be from the functions above:

[ ('67de5732', 'm'), ('41bd4388', 'm'), ('018dc053', 'm'), ('77e1f707', 'm'),
  ('06a06e66', 'm'), ('f66df598', 'm'), ('442bb7be', 'm'), ('1c67b5f6', 'm'),
  ('9b77f86d', 'm'), ('7f1240eb', 'm'), ... ]

You don't actually get back a Python list like this, but you can think of it that way. In practice it would be a set of record files, each with 100,000+ records and 1GB+ of data.

Map is fast because it processes each record as quickly as your system can get it off disk. The natural orderings of your Message and Follower tables don't matter. There is no performance difference between a date-based primary key and a randomly assigned UUID. Indexes have no effect. Map will run sequentially through each table at full speed.
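
If it helps to see the map phase in miniature, here's a local, in-memory sketch. The MessageRecord and FollowerRecord tuples are hypothetical stand-ins for the datastore models, carrying only the fields the mappers read; the real runner streams records off disk and writes the pairs to record files instead of a Python list:

import collections

# Hypothetical stand-ins for the Message and Follower models, with only
# the fields the mappers read.
MessageRecord = collections.namedtuple('MessageRecord', ['sender_id'])
FollowerRecord = collections.namedtuple('FollowerRecord', ['target_id'])

def run_map_phase(messages, followers):
  # The real runner streams each table straight off disk and writes the
  # pairs out to record files; here we just collect them in a list.
  pairs = []
  for message in messages:
    pairs.append(map_message(message))
  for follower in followers:
    pairs.append(map_follower(follower))
  return pairs

pairs = run_map_phase(
    [MessageRecord('67de5732'), MessageRecord('41bd4388')],
    [FollowerRecord('67de5732')])
# pairs == [('67de5732', 'm'), ('41bd4388', 'm'), ('67de5732', 'f')]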


The tricky part: Shuffle

Second, you run shuffle on the mapper output. Shuffle is an optimized collator. It takes a bunch of mapper outputs and regroups them by key. It would transform the sequence of key/value pairs from above into this:

[ ('0cc562bb', ['m', 'f', 'f', 'm', 'm', ...]),
  ('1c8a6783', ['m', 'f', 'f', 'm', 'm', ...]),
  ('24364a28', ['m', 'f', 'f', 'm', 'm', ...]),
  ('93a281c5', ['m', 'f', 'f', 'm', 'm', ...]),
  ('93bc00b2', ['m', 'f', 'f', 'm', 'm', ...]),
  ('a028f042', ['m', 'f', 'f', 'm', 'm', ...]),
  ('a7553463', ['m', 'f', 'f', 'm', 'm', ...]),
  ('ca3a1966', ['m', 'f', 'f', 'm', 'm', ...]),
  ('e3b7ceae', ['m', 'f', 'f', 'm', 'm', ...]),
  ('ea40670a', ['m', 'f', 'f', 'm', 'm', ...]),
  ... ]

Shuffle is the most important part of Map Reduce and the function that makes this all work. It's sad that it doesn't get more widespread credit (like Jefferson getting stuck on the $2 bill). Shuffle is a distributed merge sort algorithm. It takes individual, unordered key/value pairs as input. It outputs a sequence of key/many-values groups, with one group for each unique key.
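
Logically, all shuffle does is group values by key. Here's a minimal in-memory sketch of that behavior; it's only honest for data that fits in one process, which is exactly the assumption a real, distributed shuffler cannot make:

import collections

def shuffle(pairs):
  # Group every value under its key. The real shuffler gets the same result
  # with a distributed merge sort over files too large for any one machine.
  grouped = collections.defaultdict(list)
  for key, value in pairs:
    grouped[key].append(value)
  return sorted(grouped.items())

shuffle([('67de5732', 'm'), ('41bd4388', 'm'), ('67de5732', 'f')])
# [('41bd4388', ['m']), ('67de5732', ['m', 'f'])]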


Parallel joins: Reduce

Third, you run reduce on the shuffle output. Reduce takes the place of the analysis function. It assumes all per-user data was already fetched ahead of time, thanks to the magic of shuffle. It would transform the shuffled data like this:

def reduce_influence(user_id, values_list):
  # Tally the markers the mappers emitted: 'm' per message, 'f' per follower.
  message_count = 0
  follower_count = 0
  for value in values_list:
    if value == 'm':
      message_count += 1
    elif value == 'f':
      follower_count += 1

  return UserStats(
      id=user_id,
      message_count=message_count,
      follower_count=follower_count,
      # Same guard as before: users with no followers get an influence of 0.
      influence=message_count / follower_count if follower_count else 0)

Reduce will process the shuffled data sequentially, as fast as your network can go. Each run of reduce is independent of all others. The relative order of reduce runs doesn't matter. This means reduce can be massively parallel. Reduce's output can be saved in batches (asynchronously) so writes don't hurt throughput.
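
To tie the sketches together, here's how the shuffled groups would be fed through reduce_influence. This is a local sketch that reuses shuffle and pairs from above and assumes the ndb UserStats model from earlier; the real framework fans the loop out across many workers and batches the writes for you:

from google.appengine.ext import ndb

def run_reduce_phase(shuffled):
  # Every (key, values) group is independent, so the order doesn't matter
  # and the real framework runs many of these calls in parallel.
  stats = [reduce_influence(user_id, values) for user_id, values in shuffled]
  # Batch the writes so datastore latency doesn't throttle the reducers.
  ndb.put_multi(stats)

run_reduce_phase(shuffle(pairs))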


What doesn't work

With the cronjob example above, the only way to go faster is optimizing the datastore access. Making your analysis function instant would only improve performance by 25% (it's 50ms of the 250ms per record). Caching won't help at all because the cronjob touches each record only once. Making your machine faster won't help because I/O is the bottleneck. Improving I/O latency will max out at the access time of your disk (~10ms). You're stuck.

Using a system with native support for joins (like MySQL) doesn't help once you have enough data. Under the covers, an SQL engine is a more generic version of the process I've described here, but on a single machine. Instead of shuffle, it presorts indexes. Instead of reduce, it aggregates in memory. The problem is that once disk I/O is maxed out, or your indexes no longer fit in memory, SQL is hosed.

(Update: Yes, there are parallel, sharded, and non-standard SQL engines out there. There is also SQL-like infrastructure for warehousing and high availability. I'm asserting that the fundamental nature of the way Map Reduce processes information (sequentially) is more efficient than these for report generation, where you want to process all of the data in a single pass.)


Conclusion

What's surprising about Map Reduce is that it decouples the speed of your analysis function from the speed of your I/O. If you need more throughput, you can throw more computers at the problem, more disks, more network. You can scale out (more) instead of up (bigger).

For data joins, you can shuffle as many mapper outputs together as you need. In this example we have just two (Messages, Followers), but we could easily add fifty without any changes to the shuffle algorithm. So: Shuffle is awesome! When people talk about TeraSort performance, what they're seeing is the speed of the shuffler.
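
For instance, folding a hypothetical Like table into the same report would only take one more mapper and one more branch in the reducer; the shuffler itself is untouched:

def map_like(like):
  # Hypothetical third input: one like received by this user.
  return (like.target_id, 'l')

reduce_influence would then count 'l' markers alongside 'm' and 'f', and shuffle would group all three markers under each user_id exactly as before.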


If you're interested in doing any of this yourself, be sure to check out the App Engine Map Reduce project, especially the Shuffler Service, which makes the library scale to large datasets.