Transgate という Node.js 製エージェントベースのタスクフローフレームワーク
Transgate というNode.js製のエージェントベースのタスクフローフレームワークを作った。 どうして作ったのか? 自宅の家電を操作するためのプログラムを書いていたら、色々なフローがごちゃごちゃになったから。Dysonのファンから定期的に温度湿度を取得してデータベースに保存したり、Google Home/Assistant + IFTTT から来るメッセージを処理してIRKit を操作する。そのうち温度に従って自動的に IRKit 経由でエアコンを操作したくもなった、さてどう書こうかと? どんなもの? 突然だけど空港などの荷物の仕分けをイメージしてください。エージェントは、ゲートから出てくるアイテムを受け取り、処理して別のゲートに送る。ゲートの向こう側がどうなっているかは、エージェントは何も知らない。エージェントは空のアイテムを来たら作業を終える。アーキテクチャのイメージはこんな感じです。 エージェントはゲートからアイテムを受け取ることと、別のゲートに新たにアイテムを送ることができる。アイテムはシンプルなオブジェクトだ。エージェントは自身のタスクに集中できる。だから前工程や次工程が増えても減ってもアイテムの構成が変わらなければ問題なく動く。そして入出力がシンプルなためユニットテストも簡単にかける。エージェントはゲートの実体を知らないので、入力元ゲートをスタブに、出力先ゲートをモックに、簡単に置き換えられる。 フレームワークに出てくる概念のまとめ ゲート(Gate)はファイルストレージやデータベース、キュー、APIサービスといった入出力のエンドポイントです。 エージェント(Agent)はゲート間でアイテムを処理するタスクワーカーです。ゲートが何に通じているかは関知しません。 アイテム(Item)はゲート間を流れるシンプルなオブジェクトでエージェントが処理する対象です。 使用例 今回のフレームワークを作るきっかけになったホームコントロールプログラムを通じて説明してみます。 ちなみにこのプログラムは靴箱の中の Raspberry PI 上でデーモンとして動いています。 構成図 メインプログラム (main.js) 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 const { Agent, HttpClientGate, HttpServerGate, IntervalGate, JointGate, StdoutGate, duplicator, mixer, } = require('transgate'); const pino = require('pino')(); const config = require('konfig-yaml')(); const MongoGate = require('./lib/mongo_gate'); const IRKitGate = require('./lib/irkit_gate'); // Agent const AnalysisCommander = require('./lib/analysis_commander'); const DysonCoolLinkRecorder = require('./lib/dyson/cool_link_recorder'); const EnvironmentalAnalyzer = require('./lib/environmental_analyzer'); // Gate const slackGate = new HttpClientGate({ endpoint: config.slack.webhook_url }); const iftttGate = new HttpServerGate({ port: config.port }); const irkitGate = new IRKitGate(config.irkit.endpoint); const intervalGate = new IntervalGate(60); const mongoGate = new MongoGate(config.mongodb.endpoint, config.mongodb.collection); const drToEaGate = new JointGate(); (async () => { try { await Agent.all( new AnalysisCommander(iftttGate, { irkitGate, slackGate }), new DysonCoolLinkRecorder(intervalGate, duplicator(mongoGate, drToEaGate)), new EnvironmentalAnalyzer(drToEaGate, { irkitGate, slackGate }), ); } catch(err) { pino.error(err); await iftttGate.close(); await mongoGate.close(); } intervalGate.clear(); })() .catch(err => { pino.error(err); }); 7つのゲート slackGate は slack にテキストメッセージをポストします。HttpClientGate のインスタンスで、アイテムとなるJSON は { "text": "<text message>" } です。 iftttGate は IFTTT の webhook から受け取った JSON をアイテムとして利用します。アイテムとなるJSON は { "target": "TV", "text": "<speaking words>" } です。 irkitGate はHTTPインターフェイスを備える赤外線送信器に命令します。アイテムとなるJSON は { "command": "celling_light_off" } です。 intervalGate は一定の間隔でアイテムを生成します。アイテムは { "time": <Date instance> } です。この場合は 1 分おきにエージェントの処理を走らせます。 mongoGate は MongoDB の指定のコレクションに送信されたアイテムを登録します。 drToEaGate は後述の DysonCoolLinkRecorder から EnvironmentalAnalyzer にアイテムの流すジョイントです。 3つのエージェント AnalysisCommander は IFTTT の webhook から来た JSON をアイテムとして受け取り、操作対象とテキストから IRKit に対して送信すべき赤外線信号を指定します。slack には文言が解釈できなかったときにポストします。 DysonCoolLinkRecorder は Dyson PureCoolLink ファンから1分おきに温度と湿度を取得して、duplicator という複製機を挟んで MongoDB への書き込みとジョイントとなるゲートに送ります。 EnvironmentalAnalyzer はそのジョイントを通じて来た温度から閾値を超えていたらエアコンの操作を IRKit に要求します。自動的に操作をしたときは slack に記録します。 エージェントの実装 Agentのサブクラスを作ります。main メソッドで受け取ったアイテムを処理して指定先のゲートに新たなアイテムを送る処理を書きます。before/after のフックメソッドを使って、初期化処理や別に利用するプロセス(例えば headless chrome) をここで制御(起動・停止)します。 ...