
Akka Cluster and Remote Lookup.

In my adventures learning more about Akka and building a cluster, I wanted an architecture with a Master actor that receives work and passes it on to a set of Worker nodes. I also wanted to be able to send the Master messages and query information about it remotely. In this case, the Play Framework queries the Master node and renders the result as a web page.

Akka Cluster Setup

There are quite a few good tutorials on setting up an Akka cluster, including the one in Typesafe's Activator. I won't go into too much detail about that, but I wanted to describe in my own words how it works, since I found it a bit confusing at first.

Seed Nodes

The cluster seed nodes are the starting points for the cluster. Every node, including the "Master" we will add shortly, tries to connect to one or more seed nodes listed in its configuration file. Akka does plenty of the plumbing underneath: getting the nodes to talk to each other, detecting when nodes become unreachable, and a plethora of other things. The difference between our "Master" node and the "Workers" is that the Master has a different role as an Actor and keeps a reference to every worker. The seed nodes themselves are also workers; two of the workers act as seed nodes simply because we run them on the specific ports listed in the config files (2551 and 2552).

To be clear, all of these actors can run on remote machines or in separate JVMs. In this post I'll start them all on the same machine, each in its own JVM. I plan to run a bunch of workers in an AWS EC2 Auto Scaling Group in the future, but that will be another story.

Code Organization

This is how my code is organized in the Play Framework project, as imported into IntelliJ:
[Screenshot: IntelliJ Code Organization]

  • application.conf is the application's overall configuration put there by Play
  • master.conf is the Master node's configuration
  • worker.conf is the Worker node's configuration

Akka Configuration

The last few lines of application.conf:

akka {  
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }
}

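This enables remoting in the Play application's own actor system. I first tried without it, because having a cluster up seemed to enable remoting as well. That is true for the cluster nodes (their ClusterActorRefProvider builds on remoting), but the Play application is not a cluster member, so the lookup from Play's controllers didn't work until its actor system had a remote provider of its own. If someone knows another way of doing it, please do let me know.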

The configuration files for the Master and Worker are very similar and could share a common configuration file, but I've kept them separate for clarity. Keeping them separate also lets us tune each one for its role later on.

master.conf:

akka {  
  log-dead-letters = off
  jvm-exit-on-fatal-error = on
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 3000
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]

    auto-down-unreachable-after = 10s
  }
}

Take note that the Master runs specifically on port 3000; we will need this later when we look it up. When the Master starts up, it tries to connect to the seed nodes on ports 2551 and 2552.
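Since the Play lookup depends on getting this address exactly right, it may help to see how the pieces of a remote actor path fit together. The helper below is hypothetical (Akka models the same thing with its own `Address` and `RootActorPath` types); it simply assembles the string from the system name passed to `ActorSystem("ClusterSystem", config)`, the host and port from the config file, and the actor name passed to `actorOf`.

```scala
object ActorPaths {
  // Address of an actor system reachable over classic Akka remoting (netty.tcp):
  //   akka.tcp://<system name>@<host>:<port>
  // This is the same shape as the entries in the seed-nodes list.
  def systemAddress(system: String, host: String, port: Int): String =
    s"akka.tcp://$system@$host:$port"

  // Path of a top-level actor created with system.actorOf(..., name = actorName);
  // such actors live under the /user guardian.
  def remotePath(system: String, host: String, port: Int, actorName: String): String =
    s"${systemAddress(system, host, port)}/user/$actorName"

  def main(args: Array[String]): Unit = {
    // First seed node from master.conf / worker.conf
    println(systemAddress("ClusterSystem", "127.0.0.1", 2551))
    // The Master actor we will look up from Play
    println(remotePath("ClusterSystem", "127.0.0.1", 3000, "master"))
  }
}
```

Changing any one piece (a different system name, host, port, or actor name) yields a path that resolves to nothing, which is why the port 3000 detail matters.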

worker.conf:

akka {  
  log-dead-letters = off
  jvm-exit-on-fatal-error = on
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]

    auto-down-unreachable-after = 10s
  }
}

Cluster Messages

package cluster

// Master node messages
case object ListWorkers

// Worker messages
case object RegisterWorker

There are only a couple of custom message types here. Akka itself has built-in messages and states for managing the nodes of a cluster, so we'll keep ours to a minimum.
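Because neither message carries any data, a common Scala idiom is to declare them as case objects under a sealed trait: each is a single serializable instance (which matters because these messages cross JVM boundaries), and `case ListWorkers =>` matches it directly. The sketch below is illustrative only; the `ClusterMessage` trait and the `describe` function are hypothetical names, with `describe` standing in for an actor's receive block.

```scala
object Protocol {
  // A sealed trait lets the compiler warn about unhandled messages in a match
  sealed trait ClusterMessage

  // No fields, so one shared instance per message is enough
  case object ListWorkers extends ClusterMessage
  case object RegisterWorker extends ClusterMessage

  // Plain-function stand-in for an actor's receive logic
  def describe(msg: ClusterMessage): String = msg match {
    case ListWorkers    => "reply with the current worker list"
    case RegisterWorker => "add the sender to the worker list"
  }
}
```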

Master Actor and Startup

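package cluster

import akka.actor.{Actor, ActorRef, ActorSystem, Props, Terminated}
import com.typesafe.config.ConfigFactory

object Master {
  def main(args: Array[String]): Unit = {
    // Give this node the "Master" cluster role, then fall back to master.conf (port 3000)
    val config = ConfigFactory.parseString("akka.cluster.roles=[Master]")
      .withFallback(ConfigFactory.load("master"))

    val system = ActorSystem("ClusterSystem", config)
    system.actorOf(Props[Master], name = "master")
  }
}

class Master extends Actor {
  // Workers that have registered with this Master
  var workers = Vector.empty[ActorRef]

  def receive = {
    // Register each worker once and watch it so we receive Terminated when it goes away
    case RegisterWorker if !workers.contains(sender()) =>
      context watch sender()
      workers = workers :+ sender()

    // Reply with the current list of workers (a Vector[ActorRef])
    case ListWorkers =>
      sender() ! workers

    // A watched worker has stopped or its node has been removed from the cluster
    case Terminated(worker) =>
      workers = workers.filterNot(_ == worker)
  }
}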

This is pretty standard cluster code; the addition is the ListWorkers message we saw earlier. The Master keeps track of all registered Workers, watching each one so it can drop it when it terminates, and that list is what we're interested in for this demo. An external lookup will ask the Master for its list of Workers.
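The Master's state changes boil down to two operations: add a worker unless it is already known, and drop it on Terminated. As a sketch of how that bookkeeping could be pulled out of the actor and unit-tested without a running cluster, here is a hypothetical `WorkerRegistry` (not part of the original project), where the type parameter `A` stands in for `ActorRef`.

```scala
object WorkerRegistry {
  // Add a worker, ignoring duplicate registrations (same guard as the Master's
  // `if !workers.contains(sender())`)
  def register[A](workers: Vector[A], worker: A): Vector[A] =
    if (workers.contains(worker)) workers
    else workers :+ worker

  // Remove a worker, as the Master does when it receives Terminated(worker)
  def remove[A](workers: Vector[A], worker: A): Vector[A] =
    workers.filterNot(_ == worker)
}
```

Inside the actor this would read, for example, `workers = WorkerRegistry.register(workers, sender())`; the `context watch` call stays in the actor because it is a side effect.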

Worker Actor and Startup

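package cluster

import akka.actor.{Actor, ActorSystem, Props, RootActorPath}
import akka.cluster.{Cluster, Member, MemberStatus}
import akka.cluster.ClusterEvent.{CurrentClusterState, MemberUp}
import com.typesafe.config.ConfigFactory

object Worker {
  def main(args: Array[String]): Unit = {
    // Optional port argument: 2551/2552 make this worker a seed node, 0 picks a free port
    val port = if (args.isEmpty) "0" else args(0)
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port")
      .withFallback(ConfigFactory.parseString("akka.cluster.roles=[Worker]"))
      .withFallback(ConfigFactory.load("worker"))

    val system = ActorSystem("ClusterSystem", config)
    system.actorOf(Props[Worker], name = "worker")
  }
}

class Worker extends Actor {
  val cluster = Cluster(context.system)
  var master: Option[Member] = None

  // Receive the current cluster state first, then a MemberUp event for each node that comes up
  override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case state: CurrentClusterState =>
      state.members.filter(_.status == MemberStatus.Up) foreach register

    case MemberUp(node) => register(node)
  }

  // Role names are case-sensitive and must match akka.cluster.roles=[Master]
  def register(member: Member): Unit = {
    if (member.hasRole("Master")) {
      master = Some(member)
      context.actorSelection(RootActorPath(member.address) / "user" / "master") ! RegisterWorker
    }
  }
}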

The worker has a little more going on. The startup object lets us specify a port: from the configuration earlier, a worker started on port 2551 or 2552 acts as one of the seed nodes. Each node joins the cluster through the seed nodes. Every Worker subscribes to cluster membership events, so it receives the current cluster state when it starts and a MemberUp event whenever another node comes up. For each member that is Up, it runs the register function, which checks whether that member has the Master role. If it finds the Master node, it sends the RegisterWorker message and the Master adds that worker to its list of workers.
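The role check is a plain string comparison, so the name must match what the Master put in `akka.cluster.roles` exactly. Below is a simplified model of the lookup; `MemberInfo` and `findMaster` are hypothetical names, and in Akka the same information comes from `akka.cluster.Member` (its status, roles and address).

```scala
object MasterDiscovery {
  // Hypothetical, simplified view of a cluster member
  final case class MemberInfo(address: String, status: String, roles: Set[String])

  // Address of the first member that is Up and carries the given role.
  // Roles are compared as exact strings, so "master" does not match "Master".
  def findMaster(members: Seq[MemberInfo], role: String): Option[String] =
    members.find(m => m.status == "Up" && m.roles.contains(role)).map(_.address)
}
```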

After the initial seed nodes are created, we can spin up as many workers as we want. This time we don't specify the port and let the system pick a random free one.
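Worker.main builds its configuration by stacking three layers with withFallback: the port override, then the role, then worker.conf, with earlier layers taking precedence. The sketch below is a deliberately simplified, flat-key model of that precedence rule (real Typesafe Config also merges nested objects); `ConfigLayering`, `layer` and `workerPort` are hypothetical names.

```scala
object ConfigLayering {
  // Earlier layers win; later layers only supply keys that are still missing,
  // like a.withFallback(b).withFallback(c) for flat keys
  def layer(layers: Map[String, String]*): Map[String, String] =
    layers.foldRight(Map.empty[String, String])((higher, acc) => acc ++ higher)

  // Same rule as Worker.main: no argument means port 0 (let the OS pick a free port)
  def workerPort(args: Array[String]): String =
    if (args.isEmpty) "0" else args(0)
}
```

With `runMain cluster.Worker 2551` the port layer overrides the `port = 0` from worker.conf, while keys such as the hostname still come from the file.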

Running The Cluster

Let's open up 5 different consoles and start up the cluster:
Console 1 - Startup Seed/Worker 1:

cd [ProjectDirectory]  
sbt  
runMain cluster.Worker 2551  

Don't worry if you see some errors about being unable to reach the node on port 2552. They will go away once we start up the next seed:

Console 2 - Startup Seed/Worker 2:

cd [ProjectDirectory]  
sbt  
runMain cluster.Worker 2552  

Console 3 - Startup Another Worker:

cd [ProjectDirectory]  
sbt  
runMain cluster.Worker  

Console 4 - Start Master:

cd [ProjectDirectory]  
sbt  
runMain cluster.Master  

Remember, the Master runs on port 3000.

In the consoles you will notice the nodes welcoming each other as they join the cluster.

Master Node Lookup in the Play Application

Let's make the controller that will do the lookup:

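package controllers

import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import cluster.ListWorkers
import play.api.Play.current
import play.api.libs.concurrent.Akka
import play.api.mvc._
import scala.concurrent.Await
import scala.concurrent.duration._

object Master extends Controller {
  def listWorkers = Action {
    // Select the remote "master" actor by its full path: system name, host, port, actor name
    val actor = Akka.system.actorSelection("akka.tcp://ClusterSystem@127.0.0.1:3000/user/master")
    implicit val timeout = Timeout(10.seconds)

    // Ask the Master for its workers and block until the reply arrives (or the timeout expires)
    val future = (actor ? ListWorkers).mapTo[Vector[ActorRef]]
    val workers = Await.result(future, timeout.duration)

    Ok(views.html.masterWorkers(workers))
  }
}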

We know that the Master node is on 127.0.0.1:3000, so that's where we look. Akka messaging is asynchronous: a plain tell (!) returns immediately and gives us no reply. The ask pattern (akka.pattern.ask, the ? operator) instead returns a Future that completes with the Master's reply, and Await.result blocks until that reply arrives or the 10-second timeout expires, so the page is only rendered once we have the list of workers.
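Await.result ties up a request thread for up to ten seconds. Play also offers Action.async, which takes a Future[Result] instead of a Result, so a controller can transform the ask's Future rather than block on it. The sketch below shows only that transformation, using standard-library Futures; `NonBlockingLookup` and `FetchWorkers` are hypothetical names. In the real controller the fetch function would be the ask call and the rendering step would produce `Ok(views.html.masterWorkers(workers))`.

```scala
import scala.concurrent.{ExecutionContext, Future}

object NonBlockingLookup {
  // Hypothetical stand-in for `(actor ? ListWorkers).mapTo[Vector[ActorRef]]`:
  // anything that eventually produces the worker list
  type FetchWorkers = () => Future[Vector[String]]

  // Transform the Future instead of awaiting it: the result completes with the
  // rendered list once the reply arrives, without a thread waiting in between
  def renderPage(fetch: FetchWorkers)(implicit ec: ExecutionContext): Future[String] =
    fetch().map(workers => workers.map(w => s"<li>$w</li>").mkString("<ul>", "", "</ul>"))
}
```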

To specify the routes to hit this controller method we'll need to modify conf/routes:

# Routes
# This file defines all application routes (Higher priority routes first)
# ~~~~

# Home page
GET     /                           controllers.Application.index  
GET     /master/listWorkers         controllers.Master.listWorkers

# Map static resources from the /public folder to the /assets URL path
GET     /assets/*file               controllers.Assets.at(path="/public", file)

This is the standard Play routes file. The only line added was GET /master/listWorkers controllers.Master.listWorkers, which maps http://127.0.0.1:9000/master/listWorkers in the browser to the listWorkers method of the Master controller object (http://127.0.0.1:9000 is where our Play application runs).

Finally, in our View file masterWorkers.scala.html:

@(workers: Vector[akka.actor.ActorRef])

@main("Current Workers in Master") {
    <ul>
        @for(w <- workers) {
        <li>@w.path</li>
        }
    </ul>
}

We just pass it a Vector of ActorRef and loop through it to display each worker's actor path, which includes the address of the node it runs on.

Console 5 - Start the Play Application

cd [ProjectDirectory]  
sbt  
run  

Navigating to http://127.0.0.1:9000/master/listWorkers should yield the following result:

[Screenshot: Play output listing the worker nodes]

Here we see both seed worker nodes and a third worker running on the randomly assigned port 9297.

Well, that was a lot of consoles, but it was a lot of fun!

Happy coding.
