Erlang MapReduce Queries, MultiFetch and Network Latency with Riak

I know, you're looking at your calendar, let me be the first to assure you it's not 2011. I recently had the need to write some Erlang MapReduce queries for Riak and it was a bit of an adventure. The Riak MapReduce documentation is good but generally focused on JavaScript. If you're using Riak it's quite possible you've never had the need to use its' MapReduce capabilities. We haven't really used it at Boundary before I dug into some performance problems and it's probably not regarded as one of Riak's strengths. With that said though it's a nice feature and was worth some investigation.

Slow code

To provide a bit of context let me first describe the performance problem I was investigating. Boundary customers were experiencing poor response time from a service that is responsible for managing metadata for Boundary meters. The service is called Metermgr and it's a webmachine/OTP application that relies on Riak for persistence and exposes meter metadata with a REST interface.

I noticed that as the set of meters for an organization grew there appeared to be a simple regression in a certain queries response time. For queries with as little as 200 keys response time was between 2 - 4 seconds. After taking a look at the code I was able to pinpoint the cause of the slowdown to a function called multiget_meters. Unfortunately this function didn't multiget anything rather it iteratively fetched them one by one, oof.

Anyway, my initial thought was, "I'll just use MultiFetch."

Does Riak support MultiFetch/MultiGet?

If you're familiar with the more popular Riak clients or search around the internet for "riak multiget" you might get the impression that Riak supports retrieving multiple values in a single HTTP or Protocol Buffers request, sometimes referred to as "multiget or multifetch".

Unfortunately that's not the case, take a look at the source you'll see that Riak itself doesn't support these capabilities. Rather some Riak clients provide this functionality by parallelizing a set of requests and coalescing the results. The riak-java-client is one such example.

Having had experience with the Java client I incorrectly assumed that the official Erlang client had a similar implementation but if you check out the source you'll notice it doesn't support MultiFetch. I did a bit of archeology and found there are a lot of posts with questions and requests around implementing multifetch in the Riak Erlang client. Most of these posts point the user towards using MapReduce. The most useful thread I could find on the subject can be found here, not surprisingly it is entitled multi-get-yet-again!

MapReduce in Riak

Implementing MultiFetch in Erlang wouldn't be too difficult but several users reported very good performance using the MapReduce approach with the only caveat being:

  1. I heard MapReduce in Riak is slow (hearsay etc...).
  2. MapReduce queries in Riak clusters are run with a R=1.
Unfortunately the latter is a serious problem and I would like to see it addressed but for now let's disregard this as it's outside the scope of the discussion. It's fine, take him outside and show him the pool, get him a cookie, he'll be fiiiiiiine, etc....

The MapReduce docs on Basho's website are pretty good but there's a lot of data to sift through in order to find the most relevant pieces of information to get started quickly. After doing so though I'm pleased to say using Erlang MapReduce queries with Riak is quite easy and there's really only 2 important pieces of information you need to know to get started.

  1. Riak has built-in Erlang MapReduce functions and you can use these to address many common use cases. You should learn how to use these first.
  2. You can write custom Erlang MapReduce functions but you need to compile and distribute the object code to all riak nodes.

As noted in the docs the basic MapReduce function riakcpbsocket:mapred/3 takes a client, a list of {Bucket, Key} tuples as input and a list of Erlang Queries. Let's dig into the Query a bit more, it looks like the following

{Type, FunTerm, Arg, Keep}

Type - is an atom and is either map or reduce
FunTerm - a tuple 
  for built-in functions use : {modfun, Module, Function}
  for custom functions use : {qfun, Fun}
Arg - Static argument (any Erlang term) to pass to each execution of the phase
Keep - True/False - Include results in the final value of the query

The examples in the documentation focus heavily on writing your own qfun queries, though as I mentioned you can't just use qfun without some upfront work, the documentation notes.

Screen Shot 2014-06-25 at 6.44.01 PM

In addition, there is another paragraph that in the section called "A MapReduce Challenge" that states.

Screen Shot 2014-06-25 at 6.46.47 PM

In summary, if you want to write custom MapReduce queries in Erlang you need to compile and distribute your code to Riak nodes. I've gotten so comfortable using erl as a REPL that I glossed over this and assumed I could simply pass functions references and they'd be evaluated. If you don't take the time to read and fully understand the documentation you might skim past those qfun requirements and just start writing your own custom queries like me and this guy. Combine that with the fact that qfun MapReduce error messages are generally quite opaque and that can lead to a bit of frustration when getting started.

I'd prefer the documentation break out the difference between built-in and qfun queries more clearly and focus on modfun examples initially with a separate qfun section, preferably with a big red callout yelling "Hey Dummy, don't try this yet". The JavaScript MapReduce API doesn't suffer from this limitation of course because it's JavaScript and is interpreted via the Spidermonkey JS engine that ships with Riak. Perhaps that and the recent popularity of JavaScript is why it is given much more attention in the docs.

Simulating MultiFetch with Built-In MapReduce Queries

So back to the point it's best we understand the built-in queries before we go any further. Here's a quick walk through of the default map functions that are provided.

map_identity - Return a list of riak_object for each bucket/key
map_object_value - Returns a list of values stored in each key (calls riak_object:get_value(RiakObject)) 
map_object_value_list - calls riak_object:get_value(RiakObject) assumes get_value returns a list, returns a merged list

There are reduce phases as well, but to achieve multifetch like capabilities we only need to concern ourselves with the mapobjectvalue map function. We can achieve our original multifetch use case by substituting.


As expected a quick set of tests against the production cluster and we've reduced the query from 2 - 4 seconds down to an acceptable (albeit not blazingly fast) average of approximately ~115 milliseconds.

Comparing to MultiFetch in Java

These results of course got me thinking about how Erlang mapred would perform compared to MultiFetch in Java on the JVM and as such I decided it was worth gathering some data. I constructed a test for 20, 200, and 2000 keys (this is not a benchmark) and ran each of the 3 tests 100 times, gathered samples and calculated the average and variance. I ran the tests on a server in the same data center and on the same broadcast domain as the Riak cluster. As to be expected MultiFetch outperformed mapred and the latency of MultiFetch (as noted by Sean Cribbs and the Riak documentation) was more predictable.

Response time in ms where network latency ranges between 0.1 - 0.4ms

As the number of keys increased by orders of magnitude query response time becomes less predictable with both approaches though MapReduce's variance is greater. Many raw samples with MapReduce fell within ~600ms but there also several samples between ~900ms and ~1400ms.

When might MapReduce be faster?

This had me wondering if there are any situations where MapReduce might be preferable to MultiFetch or should I always just use MultiFetch? It seems to be the prevailing sentiment, most in use by clients and even Basho sometimes seems reticent about suggesting the use of MapReduce. I decided to run the the same set of tests but this time I ran them from Metermgr running locally on my laptop connecting to the production Riak cluster over the VPN.

Response time in ms where network latency ranges between 100 - 300ms

While the results are somewhat expected they are interesting nonetheless. Initially with a key set of 20 MultiFetch overcomes the added network latency and outperforms MapReduce but as the key set grows by an order of magnitude the average MapReduce query time outperforms MultiFetch by a factor of 2. Average variance remains less predictable in MapReduce because adding network latency doesn't affect the variance we experienced at sub-millisecond latency.

We all know situating your application servers near your database is important for performance, but in an age of "hosted this and that", "PaaS" and "DBaaS" as a developer you may end up using a database or service where network latency becomes a factor. In the above example using a MultiFetch approach network latency is compounded as the input set grows, whereas MapReduce takes that hit only once, hence the improved average response time.

I would of course be remiss if I didn't mention that Boundary is an exemplary tool to monitor performance of such different techniques and can provide 1 second resolution of average response time for Riak Protocol Buffer queries whether they are within the same data center or across the internet.

Where to go from here?

Well, I've got a solution for my performance problem that meets my near term needs. I'm interested into digging into alternative clients and seeing if a MultiFetch implementation for Riak exists in Erlang, if I don't find one I like I will write my own. I also believe it's incorrect to say "MapReduce in Riak is slow", in fact under certain input constraints and configurations it is not only acceptable it is preferable to the MultiFetch approach, if latency predictability is not too much of a factor. The problem is more nuanced than "should I use MapReduce" and it's more abstract than MapReduce and Riak. It is about read techniques and their performance within the constraints of a distributed system. There are problems and there are tools, we need to use the right tools to solve a problem in certain situations.

I'm looking forward to digging into more custom Erlang queries and can already envision situations where Riak MapReduce might be favorable. Finally if you're using Riak but haven't dug into this custom MapReduce queries because you're not comfortable with Erlang then it's about time you learn you some.

Special thanks to @pkwarren for peer review, without his grammatical support this post would be unreadable

comments powered by Disqus