The Greater Helsinki Area F# User Group - F# & Azure - FI EN

Agent-malli ja Actor-malli

Puhuttaessa Actor-mallista termi "Actor" viittaa "toimijaan" tai "tekijään", ei "näyttelijään". Puhuttaessa Agent-mallista termi "Agent" viittaa myös "toimijaan" tai "edustajaan", mutta ei "salaiseen agenttiin". Actor-malli ei ole vaihtoehto domain-mallille: Actor-malli ei ole varsinainen tietomalli, vaan lähinnä ohjelman tilan toteutustekniikka.

Jos haluat testailla nopeuksia, saat päälle ajanoton F#-interactiveen kirjoittamalla:

1: 
#time "on";;

Rinnakkaissuorituksesta

Rinnakkaista suorittamista on kolmea tyyppiä:

  • Parallel Execution
    • Ajetaan samaa koodipolkua yhtaikaa monessa säikeessä.
    • Sopii rinnakkaisiin joukko-operaatioihin itsenäisille alkioille
    • Sopii tarvittaessa CPU-resursseja paljon ja kun niitä on myös käytettävissä.
    • .NET-ympäristössä: Task Parallel Library (TPL), .AsParallel()
  • Concurrency.
    • Ajetaan eri koodipolkuja yhtaikaa.
    • Sopii silloin kun odotettavat resurssit ovat jotain muuta kuin CPU: Esim I/O tai se, että "odotetaan muita, odotetaan synkronointia".
    • Perustuu siihen, että luovutetaan oma säie pois, kun sitä ei tarvita: Kaikkien muidenkin on tehtävä niin!
    • .NET-ympäristössä: Async-await
    • Hyödynnetään yleisesti viestipohjaisissa ("Message Passing") arkkitehtuureissa
  • Lukitaan säikeet ja tila
    • Ei ajeta rinnakkain
    • Ei synkronointikysymyksiä
    • .NET: lock
    • Skaalautuvuus- ja suorituskykyongelmat

Asiaa havainnollistamaan, tässä kolme pientä ohjelmaa, jotka lähinnä kelaavat listan läpi ja nukkuvat joka alkiolla hetken:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
31: 
32: 
33: 
34: 
35: 
36: 
37: 
38: 
39: 
40: 
41: 
42: 
43: 
44: 
45: 
46: 
47: 
48: 
49: 
#if INTERACTIVE
#time "on"
#endif

open System
open System.Threading
open System.Threading.Tasks

let Execution() = 
    let iteri() = 
        [|1..1000|] |> Array.map(
            fun i -> 
                Thread.Sleep 200 //Lock the thread
                match i % 250 with
                | 0 -> Console.WriteLine("Item: " + i.ToString())
                | _ -> ()
        )
    Console.WriteLine("Starting...")
    let complete = iteri()
    Console.WriteLine("Done")

let ParallelExecution() = //TPL
    let iteri() = 
        [|1..1000|] |> Array.Parallel.map(
            fun i -> 
                Thread.Sleep 200
                match i % 250 with
                | 0 -> Console.WriteLine("Item: " + i.ToString())
                | _ -> ()
        )
    Console.WriteLine("Starting...")
    let complete = iteri()
    Console.WriteLine("Done")

let AsyncParallelExecution() = //Async-await
    let iteri() = 
        [|1..1000|] |> Array.map(
            fun i -> 
                async {
                    do! Async.Sleep 200
                    return 
                        match i % 250 with
                        | 0 -> Console.WriteLine("Item: " + i.ToString())
                        | _ -> ()
                }
        )
    Console.WriteLine("Starting...")
    let complete = iteri() |> Async.Parallel |> Async.RunSynchronously
    Console.WriteLine("Done")

Ensimmäinen on hidas. Seuraava on huomattavasti nopeampi. Mutta sille käy niin, että koneesta loppuvat threadit kesken. Kolmas on selvästi nopein, koska se vapauttaa säikeet suorittaessaan (ei-cpu-riippuvaista) async-operaatiota.

Tässä vastaavat suoritusajat (Interactivessa, i7-kannettavalla):

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
25: 
26: 
27: 
28: 
29: 
30: 
> Execution();;
Starting...
Item: 250
Item: 500
Item: 750
Item: 1000
Done
Real: 00:03:20.582, CPU: 00:00:00.000, GC gen0: 0, gen1: 0, gen2: 0
val it : unit = ()
> 

> ParallelExecution();;
Starting...
Item: 250
Item: 500
Item: 1000
Item: 750
Done
Real: 00:00:22.266, CPU: 00:00:00.000, GC gen0: 0, gen1: 0, gen2: 0
val it : unit = ()

> AsyncParallelExecution();;
Starting...
Item: 1000
Item: 750
Item: 500
Item: 250
Done
Real: 00:00:00.204, CPU: 00:00:00.000, GC gen0: 0, gen1: 0, gen2: 0
val it : unit = ()

Viestijonot

Erilaisia MQ-jono-tuotteita on useita. Mitäpä jos yksinkertainen viestijonotoiminnallisuus olisi integroitu suoraa ohjelmointikieleen?

Testaa oheinen koodi interactiven avulla:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
let myQueue = 
    new MailboxProcessor<string>(fun myHandler -> 
        let rec readNextMessage() =
            async {
                let! result = myHandler.Receive()
                System.Console.WriteLine("Handled: " + result)
                do! readNextMessage()
            }
        readNextMessage())

myQueue.Post("Hello 1 !")
myQueue.Post("Hello 2 !")

myQueue.CurrentQueueLength // 2

myQueue.Start()

myQueue.CurrentQueueLength // 0

myQueue.Post("Hello 3 !")

Actor-malli

Mitä jos "oliot" tilan kapseloijina korvattaisiin viestipohjaisella ratkaisulla?

Actor-malli on teoria, joka käyttää “Actor”eita rinnakkaislaskennan alkioina. Actor voi prosessoida tietoa, varastoida tietoa tai kommunikoida muiden actoreiden kanssa. Actor voi jopa ottaa vastaan viestissä itsellensä uuden toiminnallisuuden.

Actor-malli on lähtöisin Erlang-kielestä, mutta nykyään vaihtoehtoja alkaa löytyä jo monille eri kielille.

Tausta-ajatusta voisi verrata hajautettuun versiohallintajärjestelmään: ohjelmaa ei ikinä voi ”pysäyttää ja katsoa missä tilassa se on”, vaan tila on kokoajan muuttuva käsite ja eri katsojalle näyttää erilaiselta.

Agent-malli

Agent-malli on käytännössä sama asia kuin Actor-malli, ehkä sillä pienellä erolla, että agent ottaa vastaan pyyntöjä, joten rajapinta on kiinteämpi. F#:ssa Agent-malli on kielessä suoraa tuettuna ja kapseloituna luokkaan nimeltä MailboxProcessor. Usein käytännössä tämä konkretisoituu siten, että isäntä vaan kyselee asioita, ja taustasäie sykkii parsien tapahtumahistoriaa joukko-operaatioilla.

Edellinen esimerkki lähetti viestikanavana merkkijonoja. Kätevämpää on kuitenkin lähettää discriminated union komentoja ja niiden parametreja.

Staattinen MailboxProcessor.Start on sama kuin kutsuisi ensin new MailboxProcessor() ja sitten sille .Start(). Agentille on helppo rakentaa myös timeoutit, virheenkäsittely ja peruutus-operaatiot.

Testaa oheista koodia:

 1: 
 2: 
 3: 
 4: 
 5: 
 6: 
 7: 
 8: 
 9: 
10: 
11: 
12: 
13: 
14: 
15: 
16: 
17: 
18: 
19: 
20: 
21: 
22: 
23: 
24: 
type ``Person agent methods`` =
| AddPerson of string * int //Name*Age
| GetPeople of AsyncReplyChannel<(string*int) list>

let myQueue2 = 
    MailboxProcessor<``Person agent methods``>.Start(fun myHandler -> 
        let rec readNextMessage people =
            async {
                let! result = myHandler.Receive()
                match result with
                | AddPerson(n,s) ->
                    return! readNextMessage ((n,s)::people)
                | GetPeople(response) ->
                    let p = people
                    response.Reply(p)
                    return! readNextMessage people
            }
        readNextMessage [])

let addPerson =  AddPerson >> myQueue2.Post 
("Matti", 30) |> addPerson
("Maija", 7) |> addPerson 

let res = myQueue2.PostAndReply(GetPeople)

PostAndReply ohella on myös AsyncPostAndReply

Tarvitaan lisää...

Tämä oli vasta yksi agent, joka ei itsessään vielä oikein eroa .NET ConcurrentDictionary:stä muuta kuin siten, että ohjelmointirajapinta on business-loogisia metodeita, eikä CRUD-operaatioita. Mutta varsinaiset hyödyt aletaan saada vasta kun agentteja on useita:

Olion periyttämisen (tai enum-tyyppiproperyjen) sijaan käytetäänkin sitä, että agentit keskustelevat keskenään.

(kuvan lähde: Writing Concurrent Applications Using F# Agents)

Harjoitustehtävä

Harjoitus 1

Lisää "Person agent methods"-metodiin uusi metodi, joka palauttaa vain yli 18 vuotiaat ihmiset.

Harjoitus 2

Muuta "Person agent methods" agentti olemaan "prosessoimattomat ihmiset" ja tee rinnalle toinen agentti, "prosessoidut ihmiset":

  • Tee pieni business-logiikka ja kommunikaatio, miten agentti prosessoimaton ihminen siirtyy prosessoiduksi.
  • Vaihda rajapinta asynkroniseksi (AsyncPostAndReply)
  • Voit myös kapseloida agentin tyypin sisälle.

Takaisin valikkoon

namespace System
namespace System.Threading
namespace System.Threading.Tasks
val Execution : unit -> unit

Full name: AgentModelFin.Execution
val iteri : (unit -> unit [])
type Array =
  member Clone : unit -> obj
  member CopyTo : array:Array * index:int -> unit + 1 overload
  member GetEnumerator : unit -> IEnumerator
  member GetLength : dimension:int -> int
  member GetLongLength : dimension:int -> int64
  member GetLowerBound : dimension:int -> int
  member GetUpperBound : dimension:int -> int
  member GetValue : params indices:int[] -> obj + 7 overloads
  member Initialize : unit -> unit
  member IsFixedSize : bool
  ...

Full name: System.Array
val map : mapping:('T -> 'U) -> array:'T [] -> 'U []

Full name: Microsoft.FSharp.Collections.Array.map
val i : int
Multiple items
type Thread =
  inherit CriticalFinalizerObject
  new : start:ThreadStart -> Thread + 3 overloads
  member Abort : unit -> unit + 1 overload
  member ApartmentState : ApartmentState with get, set
  member CurrentCulture : CultureInfo with get, set
  member CurrentUICulture : CultureInfo with get, set
  member DisableComObjectEagerCleanup : unit -> unit
  member ExecutionContext : ExecutionContext
  member GetApartmentState : unit -> ApartmentState
  member GetCompressedStack : unit -> CompressedStack
  member GetHashCode : unit -> int
  ...

Full name: System.Threading.Thread

--------------------
Thread(start: ThreadStart) : unit
Thread(start: ParameterizedThreadStart) : unit
Thread(start: ThreadStart, maxStackSize: int) : unit
Thread(start: ParameterizedThreadStart, maxStackSize: int) : unit
Thread.Sleep(timeout: TimeSpan) : unit
Thread.Sleep(millisecondsTimeout: int) : unit
type Console =
  static member BackgroundColor : ConsoleColor with get, set
  static member Beep : unit -> unit + 1 overload
  static member BufferHeight : int with get, set
  static member BufferWidth : int with get, set
  static member CapsLock : bool
  static member Clear : unit -> unit
  static member CursorLeft : int with get, set
  static member CursorSize : int with get, set
  static member CursorTop : int with get, set
  static member CursorVisible : bool with get, set
  ...

Full name: System.Console
Console.WriteLine() : unit
   (+0 other overloads)
Console.WriteLine(value: string) : unit
   (+0 other overloads)
Console.WriteLine(value: obj) : unit
   (+0 other overloads)
Console.WriteLine(value: uint64) : unit
   (+0 other overloads)
Console.WriteLine(value: int64) : unit
   (+0 other overloads)
Console.WriteLine(value: uint32) : unit
   (+0 other overloads)
Console.WriteLine(value: int) : unit
   (+0 other overloads)
Console.WriteLine(value: float32) : unit
   (+0 other overloads)
Console.WriteLine(value: float) : unit
   (+0 other overloads)
Console.WriteLine(value: decimal) : unit
   (+0 other overloads)
Int32.ToString() : string
Int32.ToString(provider: IFormatProvider) : string
Int32.ToString(format: string) : string
Int32.ToString(format: string, provider: IFormatProvider) : string
val complete : unit []
val ParallelExecution : unit -> unit

Full name: AgentModelFin.ParallelExecution
module Parallel

from Microsoft.FSharp.Collections.ArrayModule
val map : mapping:('T -> 'U) -> array:'T [] -> 'U []

Full name: Microsoft.FSharp.Collections.ArrayModule.Parallel.map
val AsyncParallelExecution : unit -> unit

Full name: AgentModelFin.AsyncParallelExecution
val iteri : (unit -> Async<unit> [])
val async : AsyncBuilder

Full name: Microsoft.FSharp.Core.ExtraTopLevelOperators.async
Multiple items
type Async
static member AsBeginEnd : computation:('Arg -> Async<'T>) -> ('Arg * AsyncCallback * obj -> IAsyncResult) * (IAsyncResult -> 'T) * (IAsyncResult -> unit)
static member AwaitEvent : event:IEvent<'Del,'T> * ?cancelAction:(unit -> unit) -> Async<'T> (requires delegate and 'Del :> Delegate)
static member AwaitIAsyncResult : iar:IAsyncResult * ?millisecondsTimeout:int -> Async<bool>
static member AwaitTask : task:Task<'T> -> Async<'T>
static member AwaitWaitHandle : waitHandle:WaitHandle * ?millisecondsTimeout:int -> Async<bool>
static member CancelDefaultToken : unit -> unit
static member Catch : computation:Async<'T> -> Async<Choice<'T,exn>>
static member FromBeginEnd : beginAction:(AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg:'Arg1 * beginAction:('Arg1 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * beginAction:('Arg1 * 'Arg2 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromBeginEnd : arg1:'Arg1 * arg2:'Arg2 * arg3:'Arg3 * beginAction:('Arg1 * 'Arg2 * 'Arg3 * AsyncCallback * obj -> IAsyncResult) * endAction:(IAsyncResult -> 'T) * ?cancelAction:(unit -> unit) -> Async<'T>
static member FromContinuations : callback:(('T -> unit) * (exn -> unit) * (OperationCanceledException -> unit) -> unit) -> Async<'T>
static member Ignore : computation:Async<'T> -> Async<unit>
static member OnCancel : interruption:(unit -> unit) -> Async<IDisposable>
static member Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
static member Sleep : millisecondsDueTime:int -> Async<unit>
static member Start : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions * ?cancellationToken:CancellationToken -> Task<'T>
static member StartChild : computation:Async<'T> * ?millisecondsTimeout:int -> Async<Async<'T>>
static member StartChildAsTask : computation:Async<'T> * ?taskCreationOptions:TaskCreationOptions -> Async<Task<'T>>
static member StartImmediate : computation:Async<unit> * ?cancellationToken:CancellationToken -> unit
static member StartWithContinuations : computation:Async<'T> * continuation:('T -> unit) * exceptionContinuation:(exn -> unit) * cancellationContinuation:(OperationCanceledException -> unit) * ?cancellationToken:CancellationToken -> unit
static member SwitchToContext : syncContext:SynchronizationContext -> Async<unit>
static member SwitchToNewThread : unit -> Async<unit>
static member SwitchToThreadPool : unit -> Async<unit>
static member TryCancelled : computation:Async<'T> * compensation:(OperationCanceledException -> unit) -> Async<'T>
static member CancellationToken : Async<CancellationToken>
static member DefaultCancellationToken : CancellationToken

Full name: Microsoft.FSharp.Control.Async

--------------------
type Async<'T>

Full name: Microsoft.FSharp.Control.Async<_>
static member Async.Sleep : millisecondsDueTime:int -> Async<unit>
static member Async.Parallel : computations:seq<Async<'T>> -> Async<'T []>
static member Async.RunSynchronously : computation:Async<'T> * ?timeout:int * ?cancellationToken:CancellationToken -> 'T
type GC =
  static member AddMemoryPressure : bytesAllocated:int64 -> unit
  static member CancelFullGCNotification : unit -> unit
  static member Collect : unit -> unit + 2 overloads
  static member CollectionCount : generation:int -> int
  static member GetGeneration : obj:obj -> int + 1 overload
  static member GetTotalMemory : forceFullCollection:bool -> int64
  static member KeepAlive : obj:obj -> unit
  static member MaxGeneration : int
  static member ReRegisterForFinalize : obj:obj -> unit
  static member RegisterForFullGCNotification : maxGenerationThreshold:int * largeObjectHeapThreshold:int -> unit
  ...

Full name: System.GC
type unit = Unit

Full name: Microsoft.FSharp.Core.unit
Multiple items
type MailboxProcessor<'Msg> =
  interface IDisposable
  new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
  member Post : message:'Msg -> unit
  member PostAndAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply>
  member PostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply
  member PostAndTryAsyncReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> Async<'Reply option>
  member Receive : ?timeout:int -> Async<'Msg>
  member Scan : scanner:('Msg -> Async<'T> option) * ?timeout:int -> Async<'T>
  member Start : unit -> unit
  member TryPostAndReply : buildMessage:(AsyncReplyChannel<'Reply> -> 'Msg) * ?timeout:int -> 'Reply option
  ...

Full name: Microsoft.FSharp.Control.MailboxProcessor<_>

--------------------
new : body:(MailboxProcessor<'Msg> -> Async<unit>) * ?cancellationToken:CancellationToken -> MailboxProcessor<'Msg>
Multiple items
val string : value:'T -> string

Full name: Microsoft.FSharp.Core.Operators.string

--------------------
type string = String

Full name: Microsoft.FSharp.Core.string
Multiple items
val int : value:'T -> int (requires member op_Explicit)

Full name: Microsoft.FSharp.Core.Operators.int

--------------------
type int = int32

Full name: Microsoft.FSharp.Core.int

--------------------
type int<'Measure> = int

Full name: Microsoft.FSharp.Core.int<_>
type AsyncReplyChannel<'Reply>
member Reply : value:'Reply -> unit

Full name: Microsoft.FSharp.Control.AsyncReplyChannel<_>
type 'T list = List<'T>

Full name: Microsoft.FSharp.Collections.list<_>

Creative Commons -copyright Tuomas Hietanen, 2014, thorium(at)iki.fi, Creative Commons