Actor-model and Agent-model
In Actor-model, the term "actor" comes from the verb "to act" in the sense of "someone who does something", not "someone who pretends to do something". In Agent-model, the term "agent" likewise means "someone who does something" or "someone who represents" (as in an agency), not "secret agent". Actor-model is not an alternative to a domain model: it is not really an abstract data model, but rather a concrete way to implement the program state.
To measure execution time in F# Interactive, just type:

    #time "on";;

Parallelism
There are three ways to handle parallelism:
- Parallel execution
  - Run the same code path simultaneously in multiple threads.
  - Suits parallel set-theory operations on a collection of independent items.
  - Suits CPU-bound work when plenty of CPU cycles are available.
  - .NET environment: Task Parallel Library (TPL), .AsParallel()
- Concurrency
  - Run separate code paths simultaneously.
  - Suits work that is not CPU-bound: e.g. I/O, or just "waiting for synchronization".
  - Based on giving away the thread when not using it: everyone else should do that too!
  - .NET environment: async-await
  - Used in message-based ("message passing") architectures
- Lock the threads and the state
  - No parallelism
  - No synchronization issues
  - .NET: lock
  - Scalability and performance problems
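The locking approach in the last bullet can be sketched as follows. This is a minimal illustration rather than a recommended design: the names gate and counter are made up for the example, and Array.Parallel.iter is used only to get several threads competing for the same state.

```fsharp
// Hypothetical shared state for this sketch: a counter guarded by a lock object.
let gate = obj ()
let counter = ref 0

// Array.Parallel.iter runs the function on several threads, but every
// iteration must take the lock before touching the counter, so the updates
// happen one at a time: no synchronization issues, but no real parallelism.
[| 1 .. 1000 |]
|> Array.Parallel.iter (fun _ ->
    lock gate (fun () -> counter.Value <- counter.Value + 1))

printfn "Counter: %d" counter.Value
```

Without the lock some increments could be lost. With it, every thread queues behind the others, which is where the scalability and performance problems come from.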
As a more concrete example, here are three small programs that each enumerate a collection of items and sleep for a while on every item:

    #if INTERACTIVE
    #time "on"
    #endif

    open System
    open System.Threading
    open System.Threading.Tasks

    let Execution() =
        let iteri() =
            [|1..1000|] |> Array.map(
                fun i ->
                    Thread.Sleep 200 //Lock the thread
                    match i % 250 with
                    | 0 -> Console.WriteLine("Item: " + i.ToString())
                    | _ -> ()
                )
        Console.WriteLine("Starting...")
        let complete = iteri()
        Console.WriteLine("Done")

    let ParallelExecution() = //TPL
        let iteri() =
            [|1..1000|] |> Array.Parallel.map(
                fun i ->
                    Thread.Sleep 200
                    match i % 250 with
                    | 0 -> Console.WriteLine("Item: " + i.ToString())
                    | _ -> ()
                )
        Console.WriteLine("Starting...")
        let complete = iteri()
        Console.WriteLine("Done")

    let AsyncParallelExecution() = //Async-await
        let iteri() =
            [|1..1000|] |> Array.map(
                fun i ->
                    async {
                        do! Async.Sleep 200
                        return
                            match i % 250 with
                            | 0 -> Console.WriteLine("Item: " + i.ToString())
                            | _ -> ()
                    }
                )
        Console.WriteLine("Starting...")
        let complete = iteri() |> Async.Parallel |> Async.RunSynchronously
        Console.WriteLine("Done")

The first one is slow. The second one is clearly faster, but it is limited by the number of available threads, because every sleeping item still blocks a thread. The third one is clearly the fastest, because it doesn't block threads while executing the (non-CPU-bound) async operation.
Here are the corresponding execution times (in F# Interactive, on an i7 laptop):

    > Execution();;
    Starting...
    Item: 250
    Item: 500
    Item: 750
    Item: 1000
    Done
    Real: 00:03:20.582, CPU: 00:00:00.000, GC gen0: 0, gen1: 0, gen2: 0
    val it : unit = ()
    >
    > ParallelExecution();;
    Starting...
    Item: 250
    Item: 500
    Item: 1000
    Item: 750
    Done
    Real: 00:00:22.266, CPU: 00:00:00.000, GC gen0: 0, gen1: 0, gen2: 0
    val it : unit = ()
    > AsyncParallelExecution();;
    Starting...
    Item: 1000
    Item: 750
    Item: 500
    Item: 250
    Done
    Real: 00:00:00.204, CPU: 00:00:00.000, GC gen0: 0, gen1: 0, gen2: 0
    val it : unit = ()

Message Queues
There are many different message queue (MQ) products. What if some simple message-queue functionality were integrated directly into the programming language?
Try this source code in F# Interactive:

    let myQueue =
        new MailboxProcessor<string>(fun myHandler ->
            let rec readNextMessage() =
                async {
                    let! result = myHandler.Receive()
                    System.Console.WriteLine("Handled: " + result)
                    // return! keeps the loop tail-recursive
                    return! readNextMessage()
                }
            readNextMessage())

    myQueue.Post("Hello 1 !")
    myQueue.Post("Hello 2 !")

    myQueue.CurrentQueueLength // 2

    myQueue.Start()

    myQueue.CurrentQueueLength // 0

    myQueue.Post("Hello 3 !")

Actor-model
What if "objects", as the means of capturing the program state, were replaced with a message-based solution?
Actor-model is a theory that uses "actors" as the units of parallel computation. An actor may process information, store information, or communicate with other actors. An actor may even receive a message that gives it totally new functionality.
Actor-model is built into the Erlang language, and more recently actor libraries have appeared for many other programming languages.
The basic idea is comparable to a distributed version control system: you can't actually "stop the program and see what state it is in", because the state is constantly evolving and may look different from different points of view.
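The idea that an actor can receive a message that changes what it does can be sketched with F#'s MailboxProcessor. This is only an illustration under assumed names: the ActorMessage type and its Process and Become cases are hypothetical, not part of any library.

```fsharp
// Hypothetical message type: either a value to process, or a new behaviour.
type ActorMessage =
    | Process of int * AsyncReplyChannel<int>
    | Become of (int -> int)

let actor =
    MailboxProcessor<ActorMessage>.Start(fun inbox ->
        // The current behaviour is carried along as the loop argument,
        // so no mutable state is needed.
        let rec loop (behaviour: int -> int) =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Process (value, reply) ->
                    reply.Reply(behaviour value)
                    return! loop behaviour
                | Become newBehaviour ->
                    // From now on the actor answers with the new function.
                    return! loop newBehaviour
            }
        loop (fun x -> x + 1))

let before = actor.PostAndReply(fun ch -> Process(10, ch)) // 11
actor.Post(Become (fun x -> x * 2))
let after = actor.PostAndReply(fun ch -> Process(10, ch))  // 20
```

Because the mailbox handles messages in the order they were posted, the second Process message is answered with the new behaviour.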
Agent-model
Agent-model is basically the same thing as Actor-model, with small differences. An agent accepts requests, so its interface is more solid. The F# language has built-in support for Agent-model, encapsulated in a class called MailboxProcessor. In practice it usually looks like this: the master just asks for things, and the background thread does the work of parsing the event history with set-theory operations.
The previous example sent strings to the message channel. It is usually better practice to send discriminated union commands together with their parameters.
The static MailboxProcessor.Start is equivalent to creating a new MailboxProcessor() and then calling its .Start(). It is easy to add time-outs, exception handling and cancellation to agents.
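As a rough sketch of how time-outs, exception handling and cancellation might fit into one agent, consider the following. The WorkerMessage type and the 500 ms time-out are assumptions made for the example. TryReceive and the optional cancellation token of MailboxProcessor.Start are standard members of the class.

```fsharp
open System
open System.Threading

// Hypothetical message type: divide a by b and reply with the result.
type WorkerMessage =
    | Divide of int * int * AsyncReplyChannel<int option>

let cts = new CancellationTokenSource()

let worker =
    MailboxProcessor<WorkerMessage>.Start((fun inbox ->
        let rec loop () =
            async {
                // Time-out: TryReceive gives None if nothing arrives in 500 ms.
                let! msg = inbox.TryReceive(500)
                match msg with
                | Some (Divide (a, b, reply)) ->
                    // Exception handling: a failure becomes a None reply
                    // instead of terminating the agent.
                    let result =
                        try Some (a / b)
                        with :? DivideByZeroException -> None
                    reply.Reply(result)
                | None ->
                    () // nothing arrived in time; keep waiting
                return! loop ()
            }
        loop ()), cts.Token)

let ok = worker.PostAndReply(fun ch -> Divide(10, 2, ch))    // Some 5
let failed = worker.PostAndReply(fun ch -> Divide(1, 0, ch)) // None

// Cancellation: signalling the token asks the agent's loop to stop.
cts.Cancel()
```

Replying with an option keeps the caller from waiting forever on a failed request. The caller can also protect itself with the optional timeout parameter of PostAndReply or with TryPostAndReply.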
Test the following source code:

    type ``Person agent methods`` =
    | AddPerson of string * int //Name*Age
    | GetPeople of AsyncReplyChannel<(string*int) list>

    let myQueue2 = MailboxProcessor<``Person agent methods``>.Start(fun myHandler ->
        let rec readNextMessage people =
            async {
                let! result = myHandler.Receive()
                match result with
                | AddPerson(n,s) ->
                    return! readNextMessage ((n,s)::people)
                | GetPeople(response) ->
                    let p = people
                    response.Reply(p)
                    return! readNextMessage people
            }
        readNextMessage [])

    let addPerson = AddPerson >> myQueue2.Post
    ("Matti", 30) |> addPerson
    ("Maija", 7) |> addPerson

    let res = myQueue2.PostAndReply(GetPeople)

In addition to PostAndReply there is also PostAndAsyncReply, which returns the reply as an async workflow instead of blocking the caller.
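A minimal sketch of the asynchronous variant, which FSharp.Core names PostAndAsyncReply, could look like this. The counter agent and its CounterMessage type are hypothetical and exist only to keep the example self-contained.

```fsharp
// Hypothetical agent for this sketch: keeps a running total.
type CounterMessage =
    | Add of int
    | GetTotal of AsyncReplyChannel<int>

let counterAgent =
    MailboxProcessor<CounterMessage>.Start(fun inbox ->
        let rec loop total =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Add n ->
                    return! loop (total + n)
                | GetTotal reply ->
                    reply.Reply(total)
                    return! loop total
            }
        loop 0)

counterAgent.Post(Add 5)
counterAgent.Post(Add 7)

// PostAndAsyncReply returns an Async<int>, so the caller can wait for the
// reply inside its own async workflow without blocking a thread.
let totalWorkflow =
    async {
        let! total = counterAgent.PostAndAsyncReply(GetTotal)
        return total * 2
    }

let doubled = totalWorkflow |> Async.RunSynchronously // 24
```

Async.RunSynchronously is used here only to get a value out at the top level. Inside a larger async workflow the reply would simply be bound with let!.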
We want more...
This was just a single agent, which by itself doesn't differ very much from a .NET ConcurrentDictionary.
The real benefit comes from having agents communicate with each other, instead of relying on object inheritance (or enum-type properties).
(image source: Writing Concurrent Applications Using F# Agents)
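Agents talking to each other might be sketched like this: one agent checks incoming readings and posts to a second agent that only stores log lines. All names here (LogMessage, SensorMessage, the threshold of 30) are invented for the illustration.

```fsharp
// Hypothetical messages for a two-agent pipeline.
type LogMessage =
    | Write of string
    | ReadAll of AsyncReplyChannel<string list>

type SensorMessage =
    | Reading of int
    | Flush of AsyncReplyChannel<unit>

// The first agent only stores log lines (newest first internally).
let logAgent =
    MailboxProcessor<LogMessage>.Start(fun inbox ->
        let rec loop lines =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Write line ->
                    return! loop (line :: lines)
                | ReadAll reply ->
                    reply.Reply(List.rev lines)
                    return! loop lines
            }
        loop [])

// The second agent checks readings and tells the first one about high values.
let sensorAgent =
    MailboxProcessor<SensorMessage>.Start(fun inbox ->
        let rec loop () =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Reading t when t > 30 ->
                    logAgent.Post(Write (sprintf "High temperature: %d" t))
                | Reading _ -> ()
                | Flush reply -> reply.Reply(())
                return! loop ()
            }
        loop ())

sensorAgent.Post(Reading 21)
sensorAgent.Post(Reading 35)
sensorAgent.Post(Reading 31)
// Flush is answered only after the earlier readings have been handled.
sensorAgent.PostAndReply(Flush)
let logged = logAgent.PostAndReply(ReadAll)
// logged = ["High temperature: 35"; "High temperature: 31"]
```

Neither agent knows anything about the other's internal state. The only coupling is the message type, which is what replaces inheritance in this style.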
Exercises
Exercise 1
Add a new method to "Person agent methods": a method that will return only people over the age of 18 years.
Exercise 2
Modify the "Person agent methods" agent to be "unprocessed people" and also create another agent, "processed people":
- Create some small business logic and communication for how unprocessed people move to processed people.
- Change the interface to asynchronous (PostAndAsyncReply).
- You can also encapsulate the agent inside a type.