自律的に動作する簡単な信号機

色(Green, Yellow, Red)が一定時間ごとに変化する信号機をモデリングします。

各色の状態が、次の時間だけ持続するものとします。

時間
Green 50
Yellow 10
Red 60

このモデルは、入力を取らず、出力として、色が変化するときに、どの色に変化するかを出力します。

ライブラリのロード

PopDEVS のアセンブリと、グラフ表示に使用する XPlot を読み込みます。

In [1]:
#r "../PopDEVS/bin/Debug/netstandard2.0/PopDEVS.dll"
open PopDEVS

#load "XPlot.Plotly.Paket.fsx"
#load "XPlot.Plotly.fsx"
open XPlot.Plotly

状態と出力を表す型

モデルの状態と出力は同じく色なので、次の TrafficLightColor 型を、状態と出力として使用します。

In [2]:
type TrafficLightColor =
    | Green
    | Yellow
    | Red

信号機を表す Atomic モデル

P-DEVS において、 Atomic モデルは、次の 4 つの要素で構成されます。

  • 状態遷移関数 状態 * 経過時間 * 入力 -> 状態
  • 時間経過関数 状態 -> 進める時間
  • 出力関数 状態 -> 出力
  • 初期状態

(状態遷移関数は、 internal, external, confluent の 3 種類がありますが、それぞれは汎用的な状態遷移関数の特殊な場合として考えます。)

状態遷移関数

状態遷移関数は、現在の状態を引数に取り、次の状態を返します。入力がある場合は、入力を処理します。簡単な信号機のモデルでは、入力はないので、単に色を変化させます。

信号機は、次のように色が変化します: Green → Yellow → Red → Green → ……

これを関数として書き表すと、次のようになります。

In [3]:
let transition (state, _elapsed, _inputBag) =
    match state with
    | Green -> Yellow
    | Yellow -> Red
    | Red -> Green

時間経過関数

時間経過関数は、現在の状態を引数に取り、この状態がどのくらいの時間持続するかを返します。各色がどのくらい持続するかは、最初に示したので、その通りに実装します。

In [4]:
let timeAdvance state =
    match state with
    | Green -> 50.0
    | Yellow -> 10.0
    | Red -> 60.0

出力関数

出力関数は、現在の状態を引数に取り、時間経過関数の時間だけ経過したときの出力を返します。もし、時間が経過する前に入力があった場合は、出力されません。

このモデルにおいて、出力は、色が変化するときに、どの色に変化するかを出力します。したがって、ある状態のときの出力は、その次の色の出力になります。つまり、状態が Green のとき、出力は Yellow、 Yellow のとき Red、 Red のとき Green となります。

これを関数として書き表します。戻り値は 0 個以上の出力、つまり出力のシーケンスなことに注意し、このモデルでは、必ず 1 個のイベントを出力します。

In [5]:
let output state =
    let outputValue =
        match state with
        | Green -> TrafficLightColor.Yellow
        | Yellow -> TrafficLightColor.Red
        | Red -> TrafficLightColor.Green
    Seq.ofArray [| outputValue |]

初期状態

このモデルにおいては、初期状態はいずれかの色となります。ここでは、 Green を初期状態とすることにしましょう。

これで Atomic モデルを構成するすべての要素を用意することができました。 AtomicModel.create を用いて、 PopDEVS の AtomicModel インスタンスを作成します。

In [6]:
let initialState = TrafficLightColor.Green

let trafficLightModel : AtomicModel<VoidEvent, TrafficLightColor> =
    AtomicModel.create
        (transition, timeAdvance, output)
        initialState
    |> AtomicModel.withName "TrafficLight"

出力を観測する

作成したモデルをシミュレーションする前に、シミュレーション結果を観測する方法を用意します。 EventObserver モジュールに、モデルの出力に対してアクションを起こすための手段が用意してあります。

ここでは、信号機モデルの出力をコンソールに表示させてみましょう。 T 型のイベントを出力するモデルを観測するアクションを、 EventObserver.ObservedEvent<T> -> unit の関数として用意します。ここでは TTrafficLightColor です。

In [7]:
let observe (ev: EventObserver.ObservedEvent<TrafficLightColor>) =
    printfn "%O (Time = %f)" ev.Event ev.Time

観測を行うオブジェクトも、ひとつの Atomic モデルとして扱います。したがって、観測を行うには、複数のモデルをまとめてシミュレーションする必要があります。複数のモデルの管理には Coupled モデルを使用します。 Coupled モデルは CoupledModelBuilder を通じて作成します。

まず、新たな Coupled モデルを作成し、信号機モデルを追加します。

In [8]:
// 外部からの入出力のない Coupled モデル
let systemBuilder = CoupledModelBuilder<VoidEvent, VoidEvent>()

let modelRef = systemBuilder.AddComponent(trafficLightModel)

この Coupled モデル内の信号機モデルを modelRef で表します。 modelRef の出力を、先ほど宣言した observe 関数で観測します。 EventObserver.observe 関数を呼び出すと、観測するための Atomic モデルを作成し、 Coupled モデルに追加します。

In [9]:
let observerModel, observerRef =
    EventObserver.observe observe systemBuilder modelRef
// observerRef  modelRef と同じように Coupled モデル内におけるこの観測用のモデルを表します

これで、必要なモデルを Coupled モデルに追加できたので、最後に CoupledModelBuilder.Build() を呼び出して、イミュータブルな CoupledModel に変換します。

In [10]:
let system = systemBuilder.Build()

シミュレーションを実行する

モデルをシーケンシャル(並列化しない)に実行する SequentialRunner を用いて、シミュレーションを行います。信号機モデルは、永遠に色が変化し続けるモデルなので、終了条件をしていしなければシミュレーションは終了しません。ここでは、時刻 400 までシミュレーションを行います。もし、必ず停止するモデルを、停止するまで実行したい場合は、 400.0 の代わりに infinity を指定します。

In [11]:
let runner = SequentialRunner.Create(system)
runner.RunUntil(400.0)
Yellow (Time = 50.000000)
Red (Time = 60.000000)
Green (Time = 120.000000)
Yellow (Time = 170.000000)
Red (Time = 180.000000)
Green (Time = 240.000000)
Yellow (Time = 290.000000)
Red (Time = 300.000000)
Green (Time = 360.000000)

タイミングチャートにプロットする

シミュレーションが動きました! パチパチパチ

ただ、文字が表示されるだけでは味気ないので、結果をタイミングチャートに表示してみましょう。観測関数で出力をリストに記録し、シミュレーションが完了したら、タイミングチャートを表示します。

観測関数を次のように書き換え、 Coupled モデルを作成し直します。

In [12]:
let outputList = System.Collections.Generic.List()
outputList.Add((0.0, initialState)) // 初期状態

let observe (ev: EventObserver.ObservedEvent<TrafficLightColor>) =
    outputList.Add((ev.Time, ev.Event))

let system =
    let systemBuilder = CoupledModelBuilder<VoidEvent, VoidEvent>()
    let modelRef = systemBuilder.AddComponent(trafficLightModel)
    EventObserver.observe observe systemBuilder modelRef |> ignore
    systemBuilder.Build()

これで、シミュレーションを実行すれば、 outputList に結果が挿入されているはずです。

In [13]:
let runner = SequentialRunner.Create(system)
runner.RunUntil(400.0)

outputList
Out[13]:
seq [(0.0, Green); (50.0, Yellow); (60.0, Red); (120.0, Green); ...]

結果がリストに挿入されていることが確認できたので、 XPlot でプロットしてみます。

In [14]:
let colorToNum = function
    | Green -> 0
    | Yellow -> 1
    | Red -> 2

let scatter =
    Scatter(
        x = (outputList |> Seq.map (fun (time, _) -> time)),
        y = (outputList |> Seq.map (fun (_, color) -> colorToNum color)),
        mode = "lines+markers",
        line = Line(shape = "hv"))
        
let layout =
    Layout(
        yaxis = Yaxis(
            tickmode = "array",
            tickvals = [| 0; 1; 2 |],
            ticktext = [| "Green"; "Yellow"; "Red" |]))

Chart.Plot scatter
|> Chart.WithLayout layout
|> Chart.WithHeight 300
Out[14]:
In [ ]: