Testing Leader Election (in Raft) using Akka, without Thread.sleep() ;-)

Posted by Konrad 'ktoso' Malawski under coding, english, fun, scala (No Respond)

I’m currently living in London and miss the Kraków SCKRK Reading Club meetings a lot (they’re basically the best thing since sliced bread!). Even though I attend them remotely nowadays, I still miss having proper discussions on scientific papers. The same feeling struck some friends (Andrzej from GeeCON, to name one), so we decided to start a similar initiative in London. It’s called Paper Cup, and it is a “reading club”, which means that we select Computer Science whitepapers, read them at home, and then discuss them fiercely during a meeting :-)

For this week’s meeting we decided to read Raft (Raft – In Search of an Understandable Consensus Algorithm), which is “very trendy” right now (and relatively fresh; the paper is from Oct 2013!). To sum it up quickly – it’s an algorithm trying to solve the same problem as Paxos (achieving consensus in distributed systems), yet being understandable at the same time. We did read Paxos more than a few times (Lamport’s “Paxos Made Simple” paper for example), but it was always a challenge to get everything right with it. Raft’s authors on the other hand really wanted the algorithm to be understandable, while still being “good enough” performance-wise.

As the algorithm is really nice, and described in terms of a simple State Machine, I figured I’d implement it using Akka (and FSM) for the next meeting, while another friend is implementing it in Clojure. To be fair, it seems I’m not the first person to have figured out that Akka’s FSM and Actors in general make this super fun to implement – there are already some repos doing exactly that (well, some of them being just an “initial commit” ;-)). In this post I won’t be focusing on the algorithm itself. Instead I’d like to share a technique that is useful when you implement a thing like this and want to write a “big test” that asserts, from a high-level point of view, that the system performs as you’d expect.

The test case we’ll talk about is very simple:

  1. Raft should elect a leader right away after the nodes start
  2. Raft should re-elect a leader if we kill the current one

As I already said, we won’t be diving into the algorithm here – if you’re curious you can just read the whitepaper (and maybe even drop by for PaperCup? :-)).
Let’s jump right into the tests:

Assume we’ve already spun up a few actors performing the Raft algorithm (talking with each other in order to find a leader) before executing these tests. The setup itself is not so interesting, and you can check the full test “base” file on GitHub if you’re interested. Now the real question, as with all async algorithms, is “how long do I wait for completion?“. The answer in our case is “Until someone becomes Leader.“ So we want to fish for such a transition, or for a message announcing this change in the Raft cluster.

Each member is running an instance of Raft, and one of them (by election) will after some time become the Leader. One way to test this would be to plant a TestProbe among the “members to notify”, or use the probe as an intermediary between members (A -> Probe -> B). But that’s a lot of work, to be honest. The solution I’m proposing here is also used by akka-persistence (well, and by my akka-persistence-hbase plugin for it too) – let’s call it “Event Stream Subscription Awaiting“. Now that we have a fancy name for it, let’s continue with implementing it.

Let’s start from the end of the story. I want to block until some event occurs in the system (the awaitElectedLeader() method does this). Where can I fish for those events? Turns out Akka has a built-in eventBus ready to publish messages into (and it’s available for every ActorSystem, without any additional setup). Let’s first implement our helper methods, for awaiting on an ElectedAsLeader message:
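A minimal sketch of such helpers, assuming Akka classic actors with akka-testkit on the classpath. The ElectedAsLeader case class and the EventStreamAwaiting trait name are stand-ins made up for this illustration, not the actual code of my Raft implementation:

```scala
import akka.actor.ActorRef
import akka.testkit.{TestKit, TestProbe}
import scala.concurrent.duration._

// Hypothetical message, standing in for whatever a member emits once elected.
final case class ElectedAsLeader(leader: ActorRef)

// Meant to be mixed into a test class that extends TestKit.
trait EventStreamAwaiting { this: TestKit =>

  // Created eagerly, so the subscription exists before any election can finish.
  val leaderProbe: TestProbe = TestProbe()(system)
  system.eventStream.subscribe(leaderProbe.ref, classOf[ElectedAsLeader])

  // Blocks until some member announces leadership, or fails after `max`.
  def awaitElectedLeader(max: FiniteDuration = 3.seconds): ElectedAsLeader =
    leaderProbe.expectMsgClass(max, classOf[ElectedAsLeader])
}
```

The probe only sees what somebody publishes on the event stream, so on its own this is just half of the story.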

Here (in a test class extending TestKit) we can get access to the EventStream, and subscribe for specific types of messages on it. We’ll be using a TestProbe() to receive these events, because it allows us to expectMsg*, which is exactly what we need. So in awaitElectedLeader() we just await using the probe, until the message of “some Leader has appeared in the cluster” comes in. So far this is not so different from what I described before and then called “a lot of work”. Now we’ll get to the actual trick in this method though. The eventStream you see here is defined on the ActorSystem, and as we know, that is also accessible from within an Actor itself.

In akka-persistence, for example, Actors publish events about the progress of the replay / confirmations etc. But that is only for testing, so you have to enable a flag (persistence.publish-confirmations for example) to make it publish events to the eventStream. That’s a great idea and makes testing very simple, and it would certainly be possible to implement in my Raft implementation (and it will probably end up like this anyway). Let’s now however think about how we could extend the receive of an Actor, to automatically send incoming messages to the EventStream as well.

It’s very simple actually, if you know about Actor’s aroundReceive method. Just like its name implies (sounds very much like AOP by the way, doesn’t it? ;-)), it allows you to wrap the receive call of an actor with your own code. Our implementation will simply send all messages to the eventStream:
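A sketch of what such a trait could look like, assuming Akka classic actors (2.3 or later, where aroundReceive is available). The trait name EventStreamAllMessages is my own pick for this illustration:

```scala
import akka.actor.Actor

// Test-only helper: mirrors every incoming message onto the ActorSystem's event stream.
trait EventStreamAllMessages extends Actor {

  override def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {
    // publish first, so subscribers see the message even if handling it throws
    context.system.eventStream.publish(msg.asInstanceOf[AnyRef])
    // then let the actor handle the message exactly as it would have anyway
    super.aroundReceive(receive, msg)
  }
}
```

Delegating to super.aroundReceive keeps the default behaviour (including the handling of unhandled messages) intact, so the wrapped actor should not notice the difference.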

Easy! Now we don’t want to change our production code (or maybe it’s not our code, etc.), so during Actor creation we can use a type refinement to mix this trait into our RaftActor (during the setup that I briefly mentioned, but didn’t show explicitly):
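Roughly like this, as a sketch: the RaftActor below is an empty stand-in for the real production actor, RaftTestSetup and createTestMember are hypothetical names, and the trait is repeated only so that the snippet compiles on its own:

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}

// The test-only trait from the text, repeated to keep this snippet self-contained.
trait EventStreamAllMessages extends Actor {
  override def aroundReceive(receive: Actor.Receive, msg: Any): Unit = {
    context.system.eventStream.publish(msg.asInstanceOf[AnyRef])
    super.aroundReceive(receive, msg)
  }
}

// Hypothetical stand-in for the production actor; it knows nothing about the trait.
class RaftActor extends Actor {
  def receive: Receive = { case _ => () }
}

object RaftTestSetup {
  // Production code would use Props(new RaftActor); the test mixes the trait in on creation.
  def createTestMember(system: ActorSystem): ActorRef =
    system.actorOf(Props(new RaftActor with EventStreamAllMessages))
}
```

The production class stays untouched; only the instances created by the test setup get the publishing behaviour.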

And this Actor will now automatically send all messages to the EventStream. On the event stream we subscribe for LeaderElected events using our probe, and the expectMsg part we already talked about. So… that’s it! A very simple method to test actors “from the outside”. Now you can look back at the first code-snippet in this post, to see how this all fits together.

All in all, if you’re building anything using Actors, or maybe even a library someone might want to test, it’s a great idea to provide a setting that makes your Actors send events to the EventStream, because then even from the outside it’s easy to track what’s happening deep down in the system. I’m pretty sure I will include such “enableable” events in the libraries I’m currently building – in order to be nice to the libs’ users. ;-)


NoJarTool – Run Scalding jobs on remote cluster from SBT

Posted by Konrad 'ktoso' Malawski under big data, coding, english, scala, twitter (No Respond)

I’ve just released a small but very helpful class which enables you to run Scalding (Hadoop) Jobs on a remote cluster, without the need of creating a so-called fat-jar you’d pass in to hadoop jar [jar]. This saves me on average 10 minutes of dev time before I can start a job on my cluster, and is extremely useful during development – when I run against small datasets.

Using the Tool is rather simple, as it only wraps around an existing Tool, in this case the scalding.Tool:

This runner will include all files located in the target/scala-2.10/classes directory, as well as the all-deps.jar which should contain all your dependencies (such as Scalding, Cascading, and all other 3rd party libraries), in the “distributed cache” (by using the -libjars option, which in source is actually called “tmpjars” but that’s another story… ;-)), which is sent over to all nodes executing the job. The huge gain here is that the “dependency-jar” only needs to be rebuilt when your dependencies change, so you don’t need to run assembly each time you want to run a job on the cluster (which can take quite some time…).
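To make the “tmpjars” remark a bit more concrete, here is a tiny sketch of what such a property value looks like. This is not NoJarTool’s actual source; LibJars and tmpJarsValue are names made up for this example, and how the classes directory itself gets shipped is not shown:

```scala
import java.io.File

object LibJars {
  // Hadoop picks up the jars to ship via the distributed cache from the "tmpjars"
  // property: a comma-separated list of URIs (this is what -libjars fills in).
  def tmpJarsValue(jars: Seq[File]): String =
    jars.map(_.getAbsoluteFile.toURI.toString).mkString(",")
}

// Assumed usage, with `conf` being an org.apache.hadoop.conf.Configuration:
//   conf.set("tmpjars", LibJars.tmpJarsValue(Seq(new File("target/scala-2.10/all-deps.jar"))))
```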

As for creating the “dependencies jar”, I’d recommend the sbt assembly plugin and its great assemblyPackageDependency command. It might also be a good idea to automate the “build new jar when dependencies have changed” step – for that you can use the appendContentHash feature which my pal from SoftwareMill, Krzysztof Ciesielski, developed => sbt assembly content hashing.

The idea came from a discussion on cascading-user, where devs (including me) were a bit unsure why Cascading sometimes decided to use LocalJobRunner instead of the real one.

For more info, and sources of the Tool, check ktoso/hadoop-scalding-nojartool.

PS: Oh man, how come I didn’t blog anything else Scalding related before? Been playing around with it for over a year now… Oh well :-)


Disrupt 2 Grow — “Life is Study!”

Posted by Konrad 'ktoso' Malawski under freedom (1 Respond)

Life is Study! - Kintaro Oe (Golden Boy)

Since the videos from this year’s Devoxx are online already, I figured I’d post my quickie session that was titled “Disrupt 2 Grow”.
The talk was very unusual for me to do, as I’d usually only talk about “the code“, but once I started preparing this talk, which is a bit “soft and fluffy”, I discovered there’s quite a bit to tell about my last few years. I think this talk, though only the tip of the iceberg, is quite crucial to how I’ve been living my life recently, so if you’d like to get into that topic, and perhaps discuss it, why not start with this video. I already got some fantastic discussion emails about some of the topics, to which I can’t wait to properly reply – finally a real educated discussion via email, whoa! Anyhow, the video is only 15 minutes long, so you might give it a try. Here it is:

Following that presentation Tori Weldt (also known as “@java” on Twitter) did a short interview with me about what I’m up to, why I’ve been advocating Scala recently, and how I feel about the communities in Kraków and in Poland in general (they’re the best) – such as the JUGs (Polish (Kraków), Warsaw, Poznań, Lublin), the Kraków Lambda Lounge, the GDGs (GDG Kraków, Warsaw, Poznań and more…) and of course the Kraków Scala User Group – all groups I either was very active with or even helped found. And then of course there’s Geecon, which we’re incredibly proud of (hint: call for papers is open). It was a good year when you think about it this way :-)

Still Devoxx-wise, I also helped facilitate the BOF about the “Devoxx for Kids” initiative, where I talked about how we did our Kraków edition of that idea – Geecon 4 kids. All in all, I think it was good to come back to Devoxx this year – it’s always good to meet old friends, and that’s something Devoxx is good at, because of the sheer number of developers attending.

PS: Merry x-mas!


Scala reflection basics – accessing an object’s private field explained

Posted by Konrad 'ktoso' Malawski under english, scala (No Respond)

Today’s blog post will be on the slightly weird side, because as Scala developers we don’t usually need to reach into reflection to solve problems at hand. It’s easy to fight most problems with the “there’s a monad for that!” tactic, or simply “adding yet another layer solves any software development problem!”.

Sadly today’s task is not easily solvable by just that. The problem we’re going to tackle using Scala’s reflection involves a scala object and a private field we need to access.

Problem statement: We need to support parallel integration tests (sbt running tests in parallel), using Mongo (or Fongo). The MongoDB object, though, contains globally shared mutable state in the form of the dbs Map, which is used to determine which MongoAddress to hit for which entity.

We are able to set the Mongo identifiers up properly for the threads executing the tests, so they don’t interfere with each other, and we won’t focus on the setup part today. Instead let’s focus on what happens during teardown. A naive implementation is a simple (ScalaTest) afterAll, like this:

So sadly the naive implementation using MongoDB.close() is not enough for us. Why? Imagine two threads, running two MongoFlatSpecs, and assume each even has its own in-memory Fongo instance. There still is that shared MongoDB.dbs field deep in lift-mongodb’s internals. If we used MongoDB.close() we’d clean all MongoIdentifiers, so we might break tests which are still in flight… We could try something along the lines of “wait for the right moment to clean up”, but … why wait!? There’s a ConcurrentHashMap in there, and we just want to remove “the one MongoIdentifier I have been using for this MongoFlatSpec”.

*Reflection to the rescue!* So we’ll have to use reflection in order to execute a statement like this: MongoDB.dbs.remove(myMongoIdentifier). The first problem is that we’re dealing with a Scala object, also known as a “Module”. Why is it also known as a *Module*? Let’s take a look into the bytecode!
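If you don’t have javap at hand, the same thing can be observed from code. A small sketch, assuming Scala 2 and top-level definitions; Registry is a made-up stand-in for an object with private shared state, not lift-mongodb’s MongoDB:

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for an object holding private, globally shared state.
object Registry {
  private val dbs = new ConcurrentHashMap[String, String]()
  def size: Int = dbs.size
}

object ModuleFieldDemo {
  // The compiler emits a class whose name ends with `$` for every object...
  def moduleClassName: String = Registry.getClass.getName

  // ...and that class carries a static MODULE$ field holding the singleton instance,
  // which plain Java reflection can read.
  def singletonViaJavaReflection(): AnyRef =
    Registry.getClass.getField("MODULE$").get(null)
}
```

Reading MODULE$ hands back the very same instance you get by simply writing Registry in Scala code.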

Since we want the dbs field, we will have to go through the static final MongoDB$ MODULE$ field of the MongoDB$ class. While it’s certainly doable using plain Java reflection, let’s see how Scala’s (still experimental) reflection can make this a bit nicer:
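A self-contained sketch of the approach, assuming Scala 2 with scala-reflect on the classpath. SharedRegistry is a made-up stand-in for lift-mongodb’s MongoDB object (with String keys instead of MongoIdentifiers), and RegistryCleanup.removeOnly is a hypothetical helper name:

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.reflect.runtime.{universe => ru}

// Hypothetical stand-in for the MongoDB object: private, globally shared state.
object SharedRegistry {
  private val dbs = new ConcurrentHashMap[String, String]()
  def register(id: String, address: String): Unit = dbs.put(id, address)
  def isRegistered(id: String): Boolean = dbs.containsKey(id)
}

object RegistryCleanup {
  // Removes a single identifier from the private map, leaving all others alone.
  def removeOnly(id: String): Unit = {
    val mirror = ru.runtimeMirror(getClass.getClassLoader)

    // asModule is the "safe cast": it throws if the symbol is not an object
    val moduleSymbol = ru.typeOf[SharedRegistry.type].termSymbol.asModule
    val instance     = mirror.reflectModule(moduleSymbol).instance // no MODULE$ in sight

    // Scala 2.11+ name; on 2.10 this was `newTermName`
    val dbsSymbol = moduleSymbol.typeSignature.member(ru.TermName("dbs")).asTerm
    val dbs = mirror.reflect(instance).reflectField(dbsSymbol).get
      .asInstanceOf[ConcurrentHashMap[String, String]]

    dbs.remove(id)
  }
}
```

Calling RegistryCleanup.removeOnly("spec-a") drops just that one entry, which is what a teardown running in parallel with other specs needs.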

The first thing to notice here is the use of Mirrors instead of explicitly calling getDeclaredField and its friends on Class<?> objects like you would with plain Java APIs. Why the hassle? The grand idea behind Mirrors (other than “it sounds so cool – reflection => mirrors, yay!“) is that you can obtain either a runtime or a macros universe. This is discussed in detail in the environment-universes-mirrors section of the Scala docs if you want to dig deeper. The quick overview though is this: when you think about it, AST and reflection stuff are pretty similar. I mean, we traverse the same things, only at a different time (compile-time vs. runtime), so some things may not be available at runtime, but why change the entire API if we can find some common ground – that is, Mirrors. Sure, they will be a bit different when used from the macros universe than from the runtime universe, but as we also write compile-time macros the benefit is huge: there is only one API with slight flavoring to learn, instead of two separate ones depending on the stage at which we apply them.

Coming back to our code though: as you can see, we were able to avoid going through the magic MODULE$ constant explicitly; it was abstracted away thanks to the types of mirrors we’ve been using. It’s also worth noting that reflectModule(ModuleSymbol) takes a ModuleSymbol, which is obtained in the line above via the asModule method, a “safe cast” as one might think of it. The nice thing about asModule is that it will fail if the term you’re calling it on is not a module. So the API is designed both to be as typesafe as possible, even in reflection-land, and to “fail fast” if it detects you’re doing something wrong.

For more docs or further reference check out the docs about scala-reflection or ScalaDoc about scala.reflect.api.Mirrors.


Java8 The Good Parts @ JavaOne 2013

Posted by Konrad 'ktoso' Malawski under coding (2 Responds)

Just yesterday I gave a presentation together with Andrzej Grzesik, with whom I (and more friends!) organize GeeCON and the PolishJUG among other things.

This is our first time at JavaOne, and right away we got a full room of developers, eager to learn about the new stuff in JDK8. I have to admit, having been a Scala guy for quite a few years, presenting about Java 8 felt a bit weird, but I’m still very happy about it – especially the JVM changes which allowed Java 8 and its Lambdas to happen. All these changes will not only make Java developers’ lives easier (less painful), but also allow our Scala code to become loads faster! :-)

The slides from the talk are online here:

Oh, and it seems that if you link to http://oracle.com/technetwork/java/, which is the OTN website, you get a free T-shirt, yay!
