The Greater Helsinki Area F# User Group - F# & Azure

Actor-model and Agent-model

In the actor model, the term "actor" comes from the verb "to act" in the sense of "someone who does something", not "someone who pretends to do something". Likewise, in the agent model, "agent" means "someone who does something" or "someone who represents" (as in an agency), not a "secret agent". The actor model is not an alternative to a domain model: it is not an abstract data model at all, but rather a concrete way to implement program state.

If you want to measure execution times in F# Interactive, just type:

#time "on";;

Parallelism

There are three ways to approach parallelism:

  • Parallel execution
    • Run the same code path simultaneously on multiple threads.
    • Well suited to set-style operations over a collection of independent items.
    • Well suited to CPU-bound work when plenty of CPU cycles are available.
    • .NET: Task Parallel Library (TPL), PLINQ's .AsParallel()
  • Concurrency
    • Run separate code paths simultaneously.
    • Well suited to work that is not CPU-bound, e.g. I/O or just waiting for synchronization.
    • Based on giving the thread away when you are not using it. Everyone else has to do the same!
    • .NET: async-await
    • Used in message-based ("message passing") architectures.
  • Locking the threads and the state
    • No parallelism
    • No synchronization issues
    • .NET: lock
    • Scalability and performance problems
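
The third option in the list above has no example elsewhere on this page, so here is a minimal sketch of it. The helper name countWithLock and the counter are hypothetical and only serve as illustration; the sketch uses the F# lock function so that only one thread at a time updates the shared state:

```fsharp
// Hypothetical helper: increments a shared counter from many threads,
// taking a lock around every single update.
let countWithLock (iterations: int) =
    let gate = obj ()        // the object used as the lock
    let counter = ref 0      // shared mutable state
    [| 1 .. iterations |]
    |> Array.Parallel.iter (fun _ ->
        // Only one thread at a time may run the function passed to 'lock'.
        lock gate (fun () -> counter.Value <- counter.Value + 1))
    counter.Value

let total = countWithLock 100000
printfn "Counted: %d" total
```

The result is correct, but the threads spend their time queuing for the lock, which is the scalability problem mentioned in the list.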

As a more concrete example, here are three small programs that each iterate over a collection of items and sleep for a while on every item:

#if INTERACTIVE
#time "on"
#endif

open System
open System.Threading
open System.Threading.Tasks

let Execution() = 
    let iteri() = 
        [|1..1000|] |> Array.map(
            fun i -> 
                Thread.Sleep 200 // blocks the calling thread for 200 ms
                match i % 250 with
                | 0 -> Console.WriteLine("Item: " + i.ToString())
                | _ -> ()
        )
    Console.WriteLine("Starting...")
    let complete = iteri()
    Console.WriteLine("Done")

let ParallelExecution() = //TPL
    let iteri() = 
        [|1..1000|] |> Array.Parallel.map(
            fun i -> 
                Thread.Sleep 200
                match i % 250 with
                | 0 -> Console.WriteLine("Item: " + i.ToString())
                | _ -> ()
        )
    Console.WriteLine("Starting...")
    let complete = iteri()
    Console.WriteLine("Done")

let AsyncParallelExecution() = //Async-await
    let iteri() = 
        [|1..1000|] |> Array.map(
            fun i -> 
                async {
                    do! Async.Sleep 200
                    return 
                        match i % 250 with
                        | 0 -> Console.WriteLine("Item: " + i.ToString())
                        | _ -> ()
                }
        )
    Console.WriteLine("Starting...")
    let complete = iteri() |> Async.Parallel |> Async.RunSynchronously
    Console.WriteLine("Done")

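The first one is slow, because it sleeps on a single thread for every item in turn. The second one is clearly faster, but it is limited by the number of available threads, because each sleeping item still blocks a thread. The third one is clearly the fastest, because it does not block any threads while the (non-CPU-bound) asynchronous operation is waiting.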

Here are the corresponding execution times (in F# Interactive, on an i7 laptop):

> Execution();;
Starting...
Item: 250
Item: 500
Item: 750
Item: 1000
Done
Real: 00:03:20.582, CPU: 00:00:00.000, GC gen0: 0, gen1: 0, gen2: 0
val it : unit = ()
> 

> ParallelExecution();;
Starting...
Item: 250
Item: 500
Item: 1000
Item: 750
Done
Real: 00:00:22.266, CPU: 00:00:00.000, GC gen0: 0, gen1: 0, gen2: 0
val it : unit = ()

> AsyncParallelExecution();;
Starting...
Item: 1000
Item: 750
Item: 500
Item: 250
Done
Real: 00:00:00.204, CPU: 00:00:00.000, GC gen0: 0, gen1: 0, gen2: 0
val it : unit = ()

Message Queues

There are many different message queue (MQ) products. What if simple message queue functionality were integrated directly into the programming language?

Try this source code in F# Interactive:

let myQueue = 
    new MailboxProcessor<string>(fun myHandler -> 
        let rec readNextMessage() =
            async {
                let! result = myHandler.Receive()
                System.Console.WriteLine("Handled: " + result)
                return! readNextMessage() // return! keeps the recursive loop a tail call
            }
        readNextMessage())

myQueue.Post("Hello 1 !")
myQueue.Post("Hello 2 !")

myQueue.CurrentQueueLength // 2

myQueue.Start()

myQueue.CurrentQueueLength // 0, once both messages have been handled

myQueue.Post("Hello 3 !")

Actor-model

What if "objects", as the things that capture program state, were replaced with a message-based solution?

The actor model is a theory that uses "actors" as the units of parallel computation. An actor may process information, store information, or communicate with other actors. An actor may even receive a message that gives it entirely new functionality.
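
The last point can be sketched with the MailboxProcessor class that is used elsewhere on this page. This is only an illustration under assumptions: the message type ActorMessage and its cases Process and Become are hypothetical names, not part of any library. The actor keeps its current behaviour as the loop state and replaces it when it receives a Become message:

```fsharp
// Hypothetical message type: either text to process, or a new behaviour.
type ActorMessage =
    | Process of string * AsyncReplyChannel<string>
    | Become of (string -> string)

let actor =
    MailboxProcessor<ActorMessage>.Start(fun inbox ->
        // The current behaviour is carried as the state of the loop.
        let rec loop (behaviour: string -> string) =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Process(text, reply) ->
                    reply.Reply(behaviour text)
                    return! loop behaviour
                | Become newBehaviour ->
                    // From now on, messages are handled with the new function.
                    return! loop newBehaviour
            }
        loop (fun (s: string) -> s.ToUpper()))

let before = actor.PostAndReply(fun ch -> Process("hello", ch))
actor.Post(Become(fun (s: string) -> s + "!"))
let after = actor.PostAndReply(fun ch -> Process("hello", ch))
printfn "%s / %s" before after
```

Because the mailbox handles messages in the order they were posted, the second Process message is already handled with the new behaviour.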

The actor model is built into the Erlang language, and lately actor libraries have appeared for many other programming languages.

The basic idea is comparable to a distributed version control system: you cannot really "stop the program and see what state it is in", because the state is constantly evolving and may look different from different points of view.

Agent-model

The agent model is basically the same thing as the actor model, with small differences. An agent accepts requests, so its interface is more clearly defined. The F# language has built-in support for the agent model, encapsulated in a class called MailboxProcessor. In practice it usually works like this: the master just asks for things, and a background thread does the work of processing the event history with set-style operations.

The previous example sent strings to the message channel. It is usually better practice to send commands defined as discriminated union cases, together with their parameters.

The static MailboxProcessor.Start is equivalent to creating a new MailboxProcessor() and then calling its .Start(). It is easy to add time-outs, exception handling and cancellation to agents.
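
As a rough sketch of those three features, the following agent uses TryReceive with a time-out, reports failures through the Error event and is created with a cancellation token. The Command type, the body function and the time-out values are assumptions made up for this illustration:

```fsharp
open System.Threading

// Hypothetical commands for this sketch.
type Command =
    | Echo of string * AsyncReplyChannel<string>
    | Crash

let cts = new CancellationTokenSource()

let body (inbox: MailboxProcessor<Command>) =
    let rec loop () =
        async {
            // Time-out: TryReceive gives None if nothing arrives within 1000 ms.
            let! msg = inbox.TryReceive(1000)
            match msg with
            | Some(Echo(text, reply)) -> reply.Reply(text)
            | Some Crash -> failwith "Simulated failure" // ends the agent
            | None -> ()                                 // nothing to do, keep waiting
            return! loop ()
        }
    loop ()

// Cancellation: the token is given to the agent when it is created.
let agent = new MailboxProcessor<Command>(body, cancellationToken = cts.Token)

// Exception handling: unhandled exceptions in the body are raised on the Error event.
let lastError = ref ""
agent.Error.Add(fun ex -> lastError.Value <- ex.Message)
agent.Start()

let echoed = agent.PostAndReply((fun ch -> Echo("ping", ch)), timeout = 2000)
agent.Post Crash
// The agent has stopped, so this request times out and gives None.
let afterCrash = agent.TryPostAndReply((fun ch -> Echo("ping", ch)), timeout = 500)
printfn "%s, %A, %s" echoed afterCrash lastError.Value

// Cancelling the token would stop an agent that is still running.
cts.Cancel()
```

Note that an unhandled exception ends the agent's message loop, so a long-lived agent would normally catch exceptions inside the loop instead.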

Test the following source code:

type ``Person agent methods`` =
| AddPerson of string * int //Name*Age
| GetPeople of AsyncReplyChannel<(string*int) list>

let myQueue2 = 
    MailboxProcessor<``Person agent methods``>.Start(fun myHandler -> 
        let rec readNextMessage people =
            async {
                let! result = myHandler.Receive()
                match result with
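                | AddPerson(name, age) ->
                    // Prepend the new person and continue with the updated state
                    return! readNextMessage ((name, age)::people)
                | GetPeople(response) ->
                    // Reply with the current state, which stays unchanged
                    response.Reply(people)
                    return! readNextMessage people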
            }
        readNextMessage [])

let addPerson =  AddPerson >> myQueue2.Post 
("Matti", 30) |> addPerson
("Maija", 7) |> addPerson 

let res = myQueue2.PostAndReply(GetPeople)

In addition to PostAndReply there is also PostAndAsyncReply, which returns the reply as an asynchronous workflow instead of blocking the calling thread.
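
A minimal sketch of the asynchronous variant, using a small counter agent instead of the person agent. The CounterMessage type and the names counter and workflow are hypothetical and only used for this illustration:

```fsharp
// Hypothetical messages for a small counter agent.
type CounterMessage =
    | Increment
    | GetCount of AsyncReplyChannel<int>

let counter =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        let rec loop count =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Increment -> return! loop (count + 1)
                | GetCount reply ->
                    reply.Reply(count)
                    return! loop count
            }
        loop 0)

counter.Post Increment
counter.Post Increment

// Inside an async workflow the reply is awaited with let!,
// so no thread is blocked while waiting for the agent.
let workflow =
    async {
        let! count = counter.PostAndAsyncReply(GetCount)
        return sprintf "Count is %d" count
    }

let message = workflow |> Async.RunSynchronously
printfn "%s" message
```

Async.RunSynchronously is used here only to get the result out at the top level of the script; inside a larger workflow you would keep composing with let! instead.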

We want more...

This was just a single agent, which by itself does not differ much from a .NET ConcurrentDictionary, except that its programming interface consists of business-logic methods rather than CRUD operations. The real benefits appear when there are many actors:

Instead of object inheritance (or enum-type properties), you use agents that communicate with each other.

(image source: Writing Concurrent Applications Using F# Agents)
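
As a small, hedged illustration of agents talking to each other, here is a sketch with two agents: a splitter that breaks lines into words and a totals agent that sums up the word lengths. All names (TotalsMessage, SplitterMessage, totals, splitter) are made up for this example, and the Flush message is just one possible way to wait until the splitter has handled its earlier messages:

```fsharp
// Hypothetical messages for the two agents.
type TotalsMessage =
    | AddLength of int
    | GetTotal of AsyncReplyChannel<int>

type SplitterMessage =
    | Line of string
    | Flush of AsyncReplyChannel<unit>

// Keeps a running total of the word lengths it is told about.
let totals =
    MailboxProcessor<TotalsMessage>.Start(fun inbox ->
        let rec loop total =
            async {
                let! msg = inbox.Receive()
                match msg with
                | AddLength n -> return! loop (total + n)
                | GetTotal reply ->
                    reply.Reply(total)
                    return! loop total
            }
        loop 0)

// Splits each line into words and posts their lengths to the totals agent.
let splitter =
    MailboxProcessor<SplitterMessage>.Start(fun inbox ->
        let rec loop () =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Line text ->
                    for word in text.Split(' ') do
                        totals.Post(AddLength(word.Length))
                | Flush reply -> reply.Reply(())
                return! loop ()
            }
        loop ())

splitter.Post(Line "hello agent world")
splitter.PostAndReply(Flush)              // returns after the line above was handled
let total = totals.PostAndReply(GetTotal)
printfn "Total length: %d" total
```

Neither agent exposes its state; the only coupling between them is the message types.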

Exercises

Exercise 1

Add a new method to "Person agent methods": one that returns only the people who are over 18 years old.

Exercise 2

Modify the "Person agent methods" agent to represent "unprocessed people", and create another agent for "processed people":

  • Create some small business logic and communication for how unprocessed people move to processed people.
  • Change the interface to be asynchronous (PostAndAsyncReply).
  • You can also encapsulate the agent inside a type.


Creative Commons -copyright Tuomas Hietanen, 2014, thorium(at)iki.fi, Creative Commons