Parallelizing algorithms with the Actor Model
The idea of this post is to illustrate how to apply the Actor Model, by taking advantage of its concurrence model to solve parallel problems. The kind of parallel problems that we can solve with the Actor Model are the same kind of problems that we can solve while using MPI systems.
In synthesis, the actor model is a concurrent programming model that treats each actor as the universal primitive of a concurrent computation.
When an Actor receives a message it can:
-Make local decisions.
-Create another actors.
-Send messages.
-Determinate how to reply to the next message.
The test
The test will consist of comparison of the difference between finding n times, the index of a Fibonacci sequence in both implementations. The algorithm is the following:
let fib (n) =
let rec loop a b = function
| n when n = 0I->a
| n->loop b (a + b) (n - 1I)
loop0 I 1 I n
Notice that it uses TailRecursion to avoid stack overflow
The variation of each test will be the number of index that we are going to use, so starting with n=7:
let numbers = [1..n] |> List.map(fun n -> bigint(10000))
This is the serial version:
let computeSerial (numbers:List<bigint>,f)=
let start = DateTime.Now
let justEv n = f n |> ignore
numbers |> List .iter justEv
It just iterates over the list of numbers and executes the Fib function for each one.
The parallel version uses two actors:
type FibActor() =
inherit Actor()
override x.OnReceive message =
match message with
| :? bigint as f ->
fib f |> ignore
x.Sender <! CollectorMessage.MessageProcessed
| _ -> failwith "unknown message”
The FibActor receives a message of the type bigint and then it passes it as an argument to the Fibonacci function. When it finishes it sends a message back to the parent actor.
There is another Actor (FibCollector) which is responsable for creating and passing messages to the FibActor.
type FibCollector()=
inherit Actor()
letmutable size = numbers.Length
[<DefaultValue>] val mutable start : DateTime
let sendNumberToActor (f, ctx:IUntypedActorContext)=
let child = ctx.ActorOf(Props(typedefof<FibActor>, Array.empty))
child <! f
override x.OnReceivemessage =
let ctx = Actor.Context
match message with
| :? CollectorMessage as msg->
match msg with
| CollectorMessage.Start ->
x.start <- DateTime.Now
numbers |> List.iter (fun f ->sendNumberToActor(f, ctx))
| CollectorMessage.MessageProcessed->
size <- (size - 1)
ifsize = 0thenx.Self <! CollectorMessage.End
| CollectorMessage.End->
Console.WriteLine("Parallel ends. It tooks {0} to finish", (DateTime.Now-x.start).TotalMilliseconds)
system.Terminate().RunSynchronously()
| _ -> failwith "unknown message"
We also defined a helper Discriminated Union, which is the type of messages that our collector can handle.
typeCollectorMessage = | Start | MessageProcessed | End
So when the collector receives the Start message it initialises the Time and it also creates an Actor for each number that we want to process.
When the actor receives a MessageProcessed signal, it decreases the count and when it reaches zero it just sends an End message to itself.
When it receives the End message it prints the amount of time that it requires to process all the numbers. It also tells the system to terminate.
Results
As you can see in the table below, in a machine with a reduced number of cores (i7 4-core with HyperThreading) the actor version is about ~170% faster. We can asume that increasing the number of core (or machines) will also increase the improvement, not lineally though.
Conclusions:
As we saw the actor model is an viable option for parallelising message based algorithm. The most interesting thing from a code perspective is that it reduces the state into atomic units, so there is no shared state, which makes our code easier to understand. This also implies that we won’t have thread locks (at least at the application level).
Another benefit is that the Actor only uses threads on demand. This means that the actor will only use it when it has work to do. So, this will reduce the amount of memory consumed.