The challenge

Writing concurrent programs is hard. Having to deal with threads, locks, race conditions, and so on is highly error-prone and can lead to code that is difficult to read, test, and maintain.

Many therefore prefer to avoid multithreading altogether. Instead, they employ single-threaded processes exclusively, relying on external services (such as databases, queues, etc.) to handle any needed concurrent or asynchronous operations. While this approach is in some cases a legitimate alternative, there are many scenarios in which it is simply not a viable option. Many real-time systems – such as trading or banking applications, or real-time games – don’t have the luxury of waiting for a single-threaded process to complete (they need the answer now!). Other systems are so compute- or resource-intensive that they would take an inordinate amount of time (hours or even days in some cases) to run without introducing parallelization into their code.

One fairly common single-threaded approach (widely used in the Node.js world, for example) is to use an event-based, non-blocking paradigm. While this does help performance by avoiding context switches, locks, and blocking, it still does not address the issues of using multiple processors concurrently (doing so would require launching, and coordinating between, multiple independent processes).

So does this mean you have no choice but to journey deep into the bowels of threads, locks, and race conditions in order to build a concurrent application?

Thanks to frameworks like Akka, the answer is no. This post introduces Akka and explores the ways in which it facilitates and simplifies the implementation of concurrent, distributed applications.

What is Akka?

Akka is a toolkit and runtime for building highly concurrent, distributed, and fault tolerant applications on the JVM. Akka is written in Scala, with language bindings provided for both Scala and Java.

Akka’s approach to handling concurrency is based on the Actor Model. In an actor-based system, everything is an actor, in much the same way that everything is an object in object-oriented design. A key difference, though – particularly relevant to our discussion – is that the Actor Model was specifically designed and architected to serve as a concurrent model whereas the object-oriented model is not. More specifically, in an actor system, actors interact and share information, without any presupposition of sequentiality. The mechanism by which actors share information with one another, and task one another, is message passing.

Akka creates a layer between the actors and the underlying system such that actors simply need to process messages. All the complexity of creating and scheduling threads, receiving and dispatching messages, and handling race conditions and synchronization, is relegated to the framework to handle transparently.

Akka adheres to the principles of The Reactive Manifesto. Reactive applications aim to replace traditional multithreaded applications with an architecture that satisfies one or more of the following requirements:

  • Event-driven. Using Actors, one can write code that handles requests asynchronously and employs non-blocking operations exclusively.
  • Scalable. In Akka, adding nodes without having to modify the code is possible, thanks both to message passing and location transparency.
  • Resilient. Any application will encounter errors and fail at some point in time. Akka provides “supervision” (fault tolerance) strategies to facilitate a self-healing system.
  • Responsive. Many of today’s high performance and rapid response applications need to give quick feedback to the user and therefore need to react to events in an extremely timely manner. Akka’s non-blocking, message-based strategy helps achieve this.

What is an Actor?

An actor is essentially nothing more than an object that receives messages and takes actions to handle them. It is decoupled from the source of the message and its only responsibility is to properly recognize the type of message it has received and take action accordingly.

Upon receiving a message, an actor may take one or more of the following actions:

  • Execute some operations itself (such as performing calculations, persisting data, calling an external web service, and so on)
  • Forward the message, or a derived message, to another actor
  • Instantiate a new actor and forward the message to it

Alternatively, the actor may choose to ignore the message entirely (i.e., it may choose inaction) if it deems it appropriate to do so.

To implement an actor, it is necessary to extend the akka.actor.Actor trait and implement the receive method. An actor’s receive method is invoked (by Akka) when a message is sent to that actor. Its typical implementation consists of pattern matching, as shown in the following example, to identify the message type and react accordingly:

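import akka.actor.Actor
import akka.event.Logging

class MyActor extends Actor {
  private val log = Logging(context.system, this)

  def receive = {
    case value: String => doSomething(value)
    case _             => log.info("received unknown message")
  }

  // Placeholder for whatever work the actor performs on a String message
  private def doSomething(value: String): Unit =
    log.info("received string: {}", value)
}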

Pattern matching is a relatively elegant technique for handling messages, which tends to produce “cleaner” and easier-to-navigate code than a comparable implementation based on callbacks. Consider, for example, a simplistic HTTP request/response implementation.

First, let’s implement this using a callback-based paradigm in JavaScript:

route(url, function(request){
  var query = buildQuery(request);
  dbCall(query, function(dbResponse){
    var wsRequest = buildWebServiceRequest(dbResponse);
    wsCall(wsRequest, function(wsResponse) {
      sendReply(wsResponse);
    });
  });
});

Now let’s compare this to a pattern-matching-based implementation:

msg match {
  case HttpRequest(request) =>
    val query = buildQuery(request)
    dbCall(query)
  case DbResponse(dbResponse) =>
    val wsRequest = buildWebServiceRequest(dbResponse)
    wsCall(wsRequest)
  case WsResponse(wsResponse) => sendReply(wsResponse)
}

While the callback-based JavaScript code is admittedly compact, it is certainly harder to read and navigate. In comparison, the pattern-matching-based code makes it more immediately apparent what cases are being considered and how each is being handled.

The Actor System

Taking a complex problem and recursively splitting it into smaller sub-problems is a sound problem solving technique in general. This approach can be particularly beneficial in computer science (consistent with the Single Responsibility Principle), as it tends to yield clean, modularized code, with little or no redundancy, that is relatively easy to maintain.

In an actor-based design, use of this technique facilitates the logical organization of actors into a hierarchical structure known as an Actor System. The actor system provides the infrastructure through which actors interact with one another.

In Akka, the only way to communicate with an actor is through an ActorRef. An ActorRef represents a reference to an actor that precludes other objects from directly accessing or manipulating that actor’s internals and state. Messages may be sent to an actor via an ActorRef using one of the following operators:

  • ! (“tell”) – sends the message and returns immediately
  • ? (“ask”) – sends the message and returns a Future representing a possible reply

Each actor has a mailbox to which its incoming messages are delivered. There are multiple mailbox implementations from which to choose, with the default implementation being FIFO.

An actor can hold instance variables to maintain state across the messages it processes. Akka does not give each actor a thread of its own; instead, a dispatcher schedules actors onto a shared pool of threads, and Akka guarantees that a given actor processes only one message at a time. In this way, each actor’s state can be reliably maintained without the developer needing to explicitly worry about synchronization or race conditions.
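
To make the one-message-at-a-time guarantee concrete, here is a minimal sketch, assuming the classic Akka actor API (Akka 2.4 or later) is on the classpath. CounterActor, Increment, GetCount, and CounterDemo are hypothetical names introduced only for illustration. The actor mutates a plain var without any locking, and the caller uses both ! and ?:

```scala
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

case object Increment // hypothetical message: add one to the counter
case object GetCount  // hypothetical message: reply with the current count

class CounterActor extends Actor {
  // Only this actor touches count, and it handles one message at a time
  private var count = 0

  def receive = {
    case Increment => count += 1
    case GetCount  => sender() ! count
  }
}

object CounterDemo {
  def run(): Int = {
    val system = ActorSystem("CounterDemo")
    try {
      val counter = system.actorOf(Props[CounterActor](), "counter")
      (1 to 1000).foreach(_ => counter ! Increment) // tell: fire and forget
      implicit val timeout: Timeout = Timeout(5.seconds)
      // ask: returns a Future; blocking on it is for demonstration only
      Await.result((counter ? GetCount).mapTo[Int], 5.seconds)
    } finally system.terminate()
  }
}
```

Because every message here is sent from a single thread to a local actor, the messages are enqueued in order, so GetCount is handled only after all of the Increment messages.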

Each actor is provided with the following useful information for performing its tasks via the Akka Actor API:

  • sender: an ActorRef to the sender of the message currently being processed
  • context: information and methods relating to the context within which the actor is running (includes, for example, an actorOf method for instantiating a new actor)
  • supervisorStrategy: defines the strategy to be used for recovering from errors
  • self: the ActorRef for the actor itself

To help tie these concepts together, let’s consider a simple example of counting the number of words in a text file.

For purposes of our example, we’ll decompose the problem into two subtasks; namely, (1) a “child” task of counting the number of words on a single line and (2) a “parent” task of summing those per-line word counts to get the total number of words in the file.

The parent actor will load each line from the file and then delegate to a child actor the task of counting the words in that line. When the child is done, it will send a message back to the parent with the result. The parent will receive the messages with the word counts (for each line) and keep a counter for the total number of words in the entire file, which it will then return to its invoker upon completion.

(Note that the code samples provided below are intended to be didactic only and therefore do not necessarily concern themselves with all edge conditions, performance optimizations, and so on. Also, a complete compilable version of the code samples shown below is available in this gist.)

Let’s first look at a sample implementation of the child StringCounterActor class:

import akka.actor.Actor

case class ProcessStringMsg(string: String)
case class StringProcessedMsg(words: Int)

class StringCounterActor extends Actor {
  def receive = {
    case ProcessStringMsg(string) =>
      // Simplistic count: splits on single spaces only
      val wordsInLine = string.split(" ").length
      sender() ! StringProcessedMsg(wordsInLine)
    case _ => println("Error: message not recognized")
  }
}

This actor has a very simple task: consume ProcessStringMsg messages (containing a line of text), count the number of words on the specified line, and return the result to the sender via a StringProcessedMsg message. Note that we have implemented our class to use the ! (“tell”) method to send the StringProcessedMsg message (i.e., to send the message and return immediately).
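
The split(" ") call in this actor is deliberately simple, and it miscounts some inputs. The sketch below, which uses only the Scala standard library, shows two such cases along with one possible stricter helper. WordSplitDemo and countWords are hypothetical names and are not part of the original example:

```scala
object WordSplitDemo {
  // Behaviour of the simple approach used in StringCounterActor
  val emptyLine: Int   = "".split(" ").length     // 1: an empty line still yields one empty token
  val doubleSpace: Int = "a  b".split(" ").length // 3: two adjacent spaces produce an empty token

  // One possible alternative: split on runs of whitespace and ignore empty tokens
  def countWords(line: String): Int =
    line.split("\\s+").count(_.nonEmpty)
}
```

Whether the stricter version is worth the extra noise depends on the input; for a didactic example the simple split is arguably enough.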

OK, now let’s turn our attention to the parent WordCounterActor class:

1.  case class StartProcessFileMsg()
2.
3.  class WordCounterActor(filename: String) extends Actor {
4.  
5.    private var running = false
6.    private var totalLines = 0
7.    private var linesProcessed = 0
8.    private var result = 0
9.    private var fileSender: Option[ActorRef] = None
10.  
11.   def receive = {
12.     case StartProcessFileMsg() => {
13.       if (running) {
14.         // println just used for example purposes;
15.         // Akka logger should be used instead
16.         println("Warning: duplicate start message received")
17.       } else {
18.         running = true
19.         fileSender = Some(sender) // save reference to process invoker
20.         import scala.io.Source._
21.         fromFile(filename).getLines.foreach { line =>
22.           context.actorOf(Props[StringCounterActor]) ! ProcessStringMsg(line)
23.           totalLines += 1
24.         }
25.       }
26.     }
27.     case StringProcessedMsg(words) => {
28.       result += words
29.       linesProcessed += 1
30.       if (linesProcessed == totalLines) {
31.         fileSender.foreach(_ ! result)  // provide result to process invoker
32.       }
33.     }
34.     case _ => println("message not recognized!")
35.   }
36. }

Many things are going on here, so let’s examine each of them in more detail (note that the line numbers referenced in the discussion that follows are based on the above code sample).

First, notice that the name of the file to process is passed to the WordCounterActor constructor (line 3). This indicates that the actor is only to be used to process a single file. This also simplifies the coding job for the developer, by avoiding the need to reset state variables (running, totalLines, linesProcessed, and result) when the job is done, since the instance is only used once (i.e., to process a single file) and then discarded.

Next, observe that the WordCounterActor handles two types of messages:

  • StartProcessFileMsg (line 12)
    • Received from the external actor that starts the WordCounterActor.
    • When received, the WordCounterActor first checks that it is not receiving a redundant request.
    • If the request is redundant, WordCounterActor generates a warning and nothing more is done (line 16).
    • If the request is not redundant:
      • WordCounterActor stores a reference to the sender in the fileSender instance variable (note that this is an Option[ActorRef] rather than an Option[Actor] - see line 9). This ActorRef is needed in order to later access and respond to it when processing the final StringProcessedMsg (which is received from a StringCounterActor child, as described below).
      • WordCounterActor then reads the file and, as each line in the file is loaded, a StringCounterActor child is created and a message containing the line to be processed is passed to it (lines 21-24).
  • StringProcessedMsg (line 27)
    • Received from a child StringCounterActor when it completes processing the line assigned to it.
    • When received, the WordCounterActor adds the line’s word count to the running total, increments the count of processed lines and, if all lines in the file have been processed (i.e., when totalLines and linesProcessed are equal), sends the final result to the original fileSender (lines 28-31).

Once again, notice that the sole mechanism for inter-actor communication is message passing. Messages are the only thing that actors share and, since actors can potentially access the same messages concurrently, it is important for them to be immutable, in order to avoid race conditions and unexpected behaviors.

Case classes in Scala are regular classes that provide a recursive decomposition mechanism via pattern matching.

It is therefore common to pass messages in the form of case classes since they are immutable by default and because of how seamlessly they integrate with pattern matching.
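
As a small illustration of both properties, here is a sketch using only the Scala standard library. LineCounted and CaseClassDemo are hypothetical names used purely for this example:

```scala
// Hypothetical message type, for illustration only
final case class LineCounted(lineNumber: Int, words: Int)

object CaseClassDemo {
  val original: LineCounted = LineCounted(lineNumber = 1, words = 7)

  // original.words = 8  // would not compile: case class parameters are vals
  // copy builds a new instance and leaves the original untouched
  val corrected: LineCounted = original.copy(words = 8)

  // Pattern matching takes the message apart by its constructor parameters
  def describe(msg: Any): String = msg match {
    case LineCounted(n, w) => s"line $n has $w words"
    case other             => s"unrecognized: $other"
  }
}
```

Note that this immutability is shallow: a case class whose field holds a mutable collection can still be changed through that field, so message fields should themselves be immutable values.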

Let’s conclude the example with the code to run the whole app.

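import akka.actor.{ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object Sample {

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("System")
    val actor = system.actorOf(Props(new WordCounterActor(args(0))))
    implicit val timeout: Timeout = Timeout(25.seconds)
    val future = actor ? StartProcessFileMsg()
    future.onComplete { outcome =>
      outcome match {
        case Success(result) => println("Total number of words " + result)
        case Failure(error)  => println("Word count failed: " + error)
      }
      // terminate() is the Akka 2.4+ name; older releases use shutdown()
      system.terminate()
    }
  }
}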

In concurrent programming, a "future" is essentially a placeholder object for a result that is not yet known.

Notice how this time the ? method is used to send a message. In this way, the caller can use the returned Future to print the final result when this is available and to exit the program by shutting down the ActorSystem.
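
Futures are not specific to Akka. As a minimal sketch using only the Scala standard library, the following shows how placeholders for pending results can be created and combined without blocking. FutureDemo, countWordsAsync, and totalWords are hypothetical names chosen for this illustration:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureDemo {
  // Starts the count on another thread and returns a placeholder immediately
  def countWordsAsync(line: String): Future[Int] =
    Future(line.split(" ").length)

  // Combines several pending results without blocking on any of them
  def totalWords(lines: Seq[String]): Future[Int] =
    Future.sequence(lines.map(countWordsAsync)).map(_.sum)

  def main(args: Array[String]): Unit = {
    val total = totalWords(Seq("to be or not to be", "that is the question"))
    // Blocking here is for demonstration only; prefer map or onComplete
    println(Await.result(total, 5.seconds))
  }
}
```

The Future returned by ? behaves the same way: it can be transformed with map, or observed with onComplete, once the reply arrives or the timeout expires.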

Fault tolerance and supervisor strategies

In an actor system, each actor is the supervisor of its children. If an actor fails to handle a message, it suspends itself and all of its children and sends a message, usually in the form of an exception, to its supervisor.

In Akka, the way in which a supervisor reacts to and handles exceptions that percolate up to it from its children is referred to as a supervisor strategy. Supervisor strategies are the primary and straightforward mechanism by which you define the fault tolerant behavior of your system.

When a message signifying a failure reaches a supervisor, it can take one of the following actions:

  • Resume the child (and its children), keeping its internal state. This strategy can be applied when the child state was not corrupted by the error and it can continue functioning correctly.
  • Restart the child (and its children), clearing its internal state. This strategy can be used in the opposite scenario of the one just described. If the child state has been corrupted by the error, it is necessary to reset its state before it can be used in the future.
  • Stop the child (and its children) permanently. This strategy can be employed in cases where the error condition is not believed to be rectifiable, but does not jeopardize the rest of the operation being performed, which can be completed in the absence of the failed child.
  • Stop itself and escalate the error. Employed when the supervisor does not know how to handle the failure and so it escalates it to its own supervisor.

Moreover, a supervisor can decide to apply the action to the failed child only or to that child’s siblings as well. There are two pre-defined strategies for this:

  • OneForOneStrategy: Applies the specified action to the failed child only
  • AllForOneStrategy: Applies the specified action to all of its children

Here’s a simple example, using the OneForOneStrategy:

import akka.actor.OneForOneStrategy
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._
 
override val supervisorStrategy =
 OneForOneStrategy() {
   case _: ArithmeticException      => Resume
   case _: NullPointerException     => Restart
   case _: IllegalArgumentException => Stop
   case _: Exception                => Escalate
 }
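
To show where such a strategy lives, here is a minimal sketch of a parent actor that supervises one child, assuming the classic Akka actor API. Worker and Supervisor are hypothetical actors written for this illustration, and the retry limits passed to OneForOneStrategy are arbitrary example values:

```scala
import akka.actor.{Actor, ActorRef, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.{Escalate, Restart, Resume, Stop}
import scala.concurrent.duration._

// Hypothetical child: accumulates 100 / n and fails on n == 0
class Worker extends Actor {
  private var total = 0

  def receive = {
    case n: Int  => total += 100 / n // throws ArithmeticException when n is 0
    case "total" => sender() ! total
  }
}

// Hypothetical parent: declares how failures of its children are handled
class Supervisor extends Actor {
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1.minute) {
      case _: ArithmeticException      => Resume   // keep the child's state
      case _: NullPointerException     => Restart  // discard the child's state
      case _: IllegalArgumentException => Stop
      case _: Exception                => Escalate
    }

  private val worker: ActorRef = context.actorOf(Props[Worker](), "worker")

  def receive = {
    case msg => worker.forward(msg) // forward keeps the original sender
  }
}
```

With this setup, sending 5, 0, and 4 to the supervisor leaves the worker with a total of 45: the message that caused the exception is dropped, and Resume preserves the state accumulated so far.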

If no strategy is specified, the following default strategy is employed:

  • If there was an error while initializing the actor or if the actor was killed, the actor is stopped.
  • If there was any other kind of exception, the actor is simply restarted.

The Akka-supplied implementation of this default strategy is as follows:

final val defaultStrategy: SupervisorStrategy = {
  def defaultDecider: Decider = {
    case _: ActorInitializationException ⇒ Stop
    case _: ActorKilledException         ⇒ Stop
    case _: Exception                    ⇒ Restart
  }
  OneForOneStrategy()(defaultDecider)
}

Akka allows for the implementation of custom supervisor strategies, but as the Akka documentation warns, do so with caution as incorrect implementations may lead to problems such as blocked actor systems (i.e. permanently suspended actors).

Location transparency

The Akka architecture supports location transparency, enabling actors to be entirely agnostic to where the messages that they receive originated. The sender of the message may reside in the same JVM as the actor or in a separate JVM (either running on the same node or a different node). Akka enables each of these cases to be handled in a manner that is completely transparent to the actor (and therefore the developer). The only caveat is that messages sent across multiple nodes must be serializable.

Actor systems are designed to run in a distributed environment with little or no specialized code. Remoting is enabled through a configuration file (application.conf). The simple example below selects the remote actor provider and sets the host and port on which the actor system listens (the exact keys vary between Akka versions):

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    transport = "akka.remote.netty.NettyRemoteTransport"
    netty {
      hostname = "127.0.0.1"
      port = 2552
    }
  }
}

A few parting tips…

We have seen how Akka helps achieve concurrency and high performance. However, there are a few points to keep in mind when designing and implementing your system in order to exploit the power of Akka to its fullest:

  • To the greatest extent possible, each actor should be assigned the smallest task possible (as previously discussed, following the Single Responsibility Principle)
  • Actors should handle events (i.e., process messages) asynchronously and should not block; a blocked actor ties up one of the dispatcher’s threads, which can starve other actors and adversely affect performance. Specifically, it is best to perform blocking operations (IO, etc.) in a Future so as not to block the actor; i.e.:

      case evt => blockingCall() // BAD
      case evt => Future {
        blockingCall()           // GOOD
      }
    
  • Be sure your messages are all immutable, since the actors that pass them to one another may be running concurrently on different threads. Mutable messages are likely to result in unexpected behavior.
  • Since messages sent between nodes must be serializable, it is important to bear in mind that the larger the messages are, the longer it will take to serialize, send, and deserialize them, which can negatively impact performance.
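
As a fuller sketch of the non-blocking tip above, the following assumes the classic Akka actor API. LookupActor, Lookup, LookupResult, and slowLookup are hypothetical names, and the dispatcher name mentioned in the comment is an assumed configuration entry rather than something Akka defines by default. The idea is to run the blocking call on a separate execution context and pipe the result back as a message:

```scala
import akka.actor.Actor
import akka.pattern.pipe
import scala.concurrent.{ExecutionContext, Future}

final case class Lookup(key: String)         // hypothetical request message
final case class LookupResult(value: String) // hypothetical reply message

// blockingEc should be a pool reserved for blocking work, for example
// system.dispatchers.lookup("blocking-io-dispatcher"), where that name is an
// assumed entry in application.conf.
class LookupActor(blockingEc: ExecutionContext) extends Actor {
  private implicit val ec: ExecutionContext = blockingEc

  def receive = {
    case Lookup(key) =>
      // Capture the sender now; it must not be read from inside the Future
      val replyTo = sender()
      Future(LookupResult(slowLookup(key))).pipeTo(replyTo)
  }

  // Stand-in for a blocking call such as a JDBC query or file IO
  private def slowLookup(key: String): String = {
    Thread.sleep(100)
    key.toUpperCase
  }
}
```

Running the Future on the actor’s own dispatcher would merely move the blocking to another thread from the same pool, which is why a dedicated execution context is passed in here.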

Conclusion

Akka simplifies and facilitates the development of highly concurrent, distributed, and fault tolerant applications, hiding much of the complexity from the developer. Doing Akka full justice would require much more than this single blog post, but hopefully this introduction was sufficiently captivating to get you to want to read more.

Amazon, VMWare, and CSC are but a few examples of leading companies who are actively using Akka. Visit the official Akka website to learn more and to explore whether Akka could be the right answer for your project as well.

Comments

Arseniy Zhizhelev
Nice introduction! I would add that while Scala gives the developer the toolkit for dealing with immutable data (vals, functional approach, case classes, immutable collections, etc.), Akka gives an ability to seamlessly handle mutable state encapsulating it in actors. Scala + Akka together are indispensable for concurrent and distributed programming.
Dylan Arnold
Great post. Cleared a lot up for me. This is exactly the sort of Akka introduction I needed when I first used it. :)
Amol Khatri
Nice introduction. Helpful for beginners.
AndriyDrozdyuk
1. This line does not work, as there is no "ProcessFileMsg": val future = actor ? ProcessFileMsg("file.txt") 2. Also, the WordCounterActor has a constructor that takes a string argument. Whereas you just instantiated it from "Props": val actor = system.actorOf(Props[WordCounterActor]) 3. This line does not compile: implicit val timeout = Timeout(25 seconds) 4. Could you put the import statements into source? For example, the import of "akka.pattern.ask".
Diego Castorina
Hi Andriy, Thanks a lot for your comments. I have fixed the errors you pointed out and linked a gist with the complete example (which includes the import statements). Diego
Yegor Andreenko
I'm confused with WordCounterActor with regard to "share nothing" principle. Would you have problems with line 30? // if (linesProcessed == totalLines) There is my concern: you have 2 counters(line 6, 7). And two flows of control StringProcessedMsg & StartProcessFileMsg. It's wrong to save state in actor between messages this way. It's actually lead to race condition. Modification at line 23 and 29 and their comparison(line 30) the cause of it. How far do you agree with my opinion?
Matthew
Perhaps I'm missing something but I agree with these concerns; surely as you are still in the process of dispatching lines to be processed by child actors, you may concurrently receive responses for previous lines that could satisfy the linesProcessed == totalLines condition and cause the result to be returned prematurely?
Matthew
I think there is another bug in this code involving the usage of 'sender'. This is a function and hence will return the newest sending ActorRef if we have one or more file processing requests. Considering the entire example, I can see that multiple requests are not possible but I think it's still misleading (if I'm right!).
t11ger
It's clearly mentioned in the positing that "instance is only used once" and hence you will never face the scenarios that you are describing.
ches
It may add a little too much cognitive overhead for an introductory article as another concept to explain, but using become to switch WordCounterActor's behavior could arguably clean up the logic of the running var.
Saeed Zarinfam
Very good article to start using akka. Thanks.
Gabriel Volpe
It's a good approach! Now I'm learning Akka with Scala, and I started from this example. Once I've understand then I replaced the use of the 'running' variable using context.become. This is the code: https://github.com/gvolpe/simple-file-reader-akka-actors