Scala-powered streaming migrations in MongoDB on millions of records

Posted by Konrad 'ktoso' Malawski on 01/11/2012 – 22:23;

In my project at work we have some production systems using MongoDB as their primary database. Quite a number of the collections we have there could be called “huge”, and we sometimes have to migrate data from one schema (let’s call it a “schema” for simplicity – ok?) to another, or compute some statistics over an entire collection.

So the first thing you’d try is a typical foreach on a Rogue query, like this:
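The original snippet didn’t survive here; a minimal sketch of the naive approach, assuming a Rogue-mapped record (Person, its fields, and the query are illustrative stand-ins for the real model):

```scala
// Naive approach (don't do this on a huge collection): run a Rogue
// query, fetch the results, and iterate element by element.
// Person and its fields are illustrative, not from the real model.
Person.where(_.active eqs true).fetch() foreach { person =>
  // compute whatever stats we need for this person…
}
```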

… and do whatever stats etc. you need for each of the elements. It turns out there are quite a few reasons this sucks (big time), and it wouldn’t even get close to processing the entire thing. Some of the problems:

  • the cursor WILL time out if used like this
  • we’re mapping everything to Person objects, which takes time and memory

(PS: we could do worse than the example above – don’t try to load a huge collection into memory… ;-)). Moving on to the solution, there are a few things we have to do here. First, stop the cursor from timing out, which can be achieved by setting the appropriate option on the cursor:
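The snippet for this step is missing from the text; on the MongoDB Java driver the relevant option is `Bytes.QUERYOPTION_NOTIMEOUT`, set on the `DBCursor`, roughly like so (`collection` and `query` stand for your own handles):

```scala
import com.mongodb.Bytes

// collection: com.mongodb.DBCollection, query: com.mongodb.DBObject
val cursor = collection.find(query)
// the server-side cursor will no longer be killed after 10 minutes of inactivity
cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT)
```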

As you’ve already noticed… we’re down to the low-level API here… For now let’s say I’m OK with that (we’ll bring Rogue back into play a bit later). So, putting this in context, you’d have to write:
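The in-context snippet is also missing; it would have looked roughly like this (the collection name is illustrative, and `db` stands for your `com.mongodb.DB` handle):

```scala
import com.mongodb.{Bytes, DBObject}

val persons = db.getCollection("persons") // illustrative collection name
val cursor = persons.find()
cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT)

try {
  while (cursor.hasNext) {
    val obj: DBObject = cursor.next()
    // do your stats / migration work on the raw DBObject here…
  }
} finally {
  cursor.close() // always release the server-side cursor
}
```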

OK, here you can use the cursor and iterate over the whole collection. That’s quite a bit of code, but we’ve got to the point where the cursor is usable and won’t time out… The next step is to package this up in a nice function and allow passing in a Rogue query object. I’ll paste the end result of those steps so you can analyse it yourself:
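The end result would have been along these lines (a sketch; the real helper’s name and exact signature may have differed, and Scalaz’s `.some` supplies the `Option`):

```scala
import com.mongodb.{Bytes, DBCollection, DBObject}
import scalaz.Scalaz._ // brings in .some

/** Iterate over an entire collection without the cursor timing out. */
def stream(collection: DBCollection, query: Option[DBObject] = None)
          (callback: DBObject => Unit) {
  val cursor = query map { collection.find(_) } getOrElse collection.find()
  cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT)
  try {
    while (cursor.hasNext) callback(cursor.next())
  } finally {
    cursor.close()
  }
}

// usage with a Rogue query (Person is illustrative):
stream(persons, (Person where (_.active eqs true)).asDBObject.some) { obj =>
  // process each raw DBObject…
}
```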

There is a minor annoyance in writing query.asDBObject.some (which is Scalaz’s equivalent of Some(query.asDBObject))… Let’s fix this with a simple delegating function:
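A sketch of such a delegating overload (the Rogue query type is simplified here, since its real name and type parameters depend on the Rogue version):

```scala
// Delegating overload: accepts the Rogue query directly and does the
// .asDBObject.some dance itself. `PersonQuery` is a stand-in for the
// actual Rogue query type.
def stream(collection: DBCollection, query: PersonQuery)
          (callback: DBObject => Unit): Unit =
  stream(collection, query.asDBObject.some)(callback)
```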

But I’m still not happy with this. As already mentioned, we’re dealing with huge collections, so the processing may well take up to a day. Let’s see what we can cut here… Oh yes: let’s not select fields we don’t use. Let’s revise our code into a streamSelect version of the above, which will only select the fields we’re interested in from Mongo:
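A sketch of streamSelect under the same assumptions; the callback is modelled as a PartialFunction (which is what allows multiple case statements), and `fields` lists the keys to project:

```scala
import com.mongodb.{BasicDBObject, Bytes, DBCollection, DBObject}

/** Like stream, but only fetches the given fields from Mongo. */
def streamSelect[T](collection: DBCollection,
                    query: Option[DBObject],
                    fields: String*)
                   (callback: PartialFunction[DBObject, T]) {
  val keys = new BasicDBObject
  fields foreach { keys.put(_, 1) } // projection: fetch only these fields
  val cursor = collection.find(query getOrElse new BasicDBObject, keys)
  cursor.addOption(Bytes.QUERYOPTION_NOTIMEOUT)
  try {
    while (cursor.hasNext) {
      val obj = cursor.next()
      if (callback isDefinedAt obj) callback(obj)
    }
  } finally {
    cursor.close()
  }
}

// only _id and email travel over the wire:
streamSelect[Unit](persons, None, "_id", "email") {
  case obj => // obj now contains only the selected fields…
}
```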

Here we only select the fields we really need, which has proven to be a big performance boost. It’s quite readable, though I wasn’t able to get rid of the explicit type parameter in the streamSelect helper. We’re using such streams wherever we know there’s “a lot of stuff to process”, or in so-called “preloads”, where we compute a set of values from an entire collection for later re-use.

You may have noticed that none of this has been very type-safe (the callback isn’t). So… you might ask, did we implement a type-safe version of our streams? In fact we did, though it’s tuple-based, so we had to implement the same thing multiple times, for the different tuple arities. I’ll paste just the usage of the type-safe version here (and if you’re interested I can do a follow-up blog post about it):
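The original usage snippet is gone; a hedged illustration of what such a tuple-based call might look like (streamSelect2 and the field-selector syntax are hypothetical):

```scala
// Hypothetical usage of the 2-field, type-safe variant: the selected
// fields determine the tuple the callback receives.
streamSelect2(Person)(_.name, _.age) { case (name, age) =>
  // name is a String, age an Int: already extracted and typed
}
```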

What we’ve gained with these streams is the ability to easily write all the migrations we need, and we’re still flexible enough to, for example, remove fields from a collection (with some processing, so a plain Mongo query wouldn’t be enough). In streamSelect we’re free to have multiple case statements, so even if the collection isn’t homogeneous, we can match null on some of the fields and still proceed over the entire collection if needed.

Anyway, I hope you found this bunch of code snippets interesting or useful – we certainly do in our day-to-day coding :-)


This post is filed under “coding, guide, scala”.
