Transgate というNode.js製のエージェントベースのタスクフローフレームワークを作った。

どうして作ったのか?

自宅の家電を操作するためのプログラムを書いていたら、色々なフローがごちゃごちゃになったから。Dysonのファンから定期的に温度湿度を取得してデータベースに保存したり、Google Home/Assistant + IFTTT から来るメッセージを処理してIRKit を操作する。そのうち温度に従って自動的に IRKit 経由でエアコンを操作したくもなった、さてどう書こうかと?

どんなもの?

突然だけど空港などの荷物の仕分けをイメージしてください。エージェントは、ゲートから出てくるアイテムを受け取り、処理して別のゲートに送る。ゲートの向こう側がどうなっているかは、エージェントは何も知らない。エージェントは空のアイテムを来たら作業を終える。アーキテクチャのイメージはこんな感じです。

エージェントはゲートからアイテムを受け取ることと、別のゲートに新たにアイテムを送ることができる。アイテムはシンプルなオブジェクトだ。エージェントは自身のタスクに集中できる。だから前工程や次工程が増えても減ってもアイテムの構成が変わらなければ問題なく動く。そして入出力がシンプルなためユニットテストも簡単にかける。エージェントはゲートの実体を知らないので、入力元ゲートをスタブに、出力先ゲートをモックに、簡単に置き換えられる。

フレームワークに出てくる概念のまとめ

使用例

今回のフレームワークを作るきっかけになったホームコントロールプログラムを通じて説明してみます。 ちなみにこのプログラムは靴箱の中の Raspberry PI 上でデーモンとして動いています。

構成図

f:id:tilfin:20171123144501p:plain

メインプログラム (main.js)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
const {
Agent,
HttpClientGate,
HttpServerGate,
IntervalGate,
JointGate,
StdoutGate,
duplicator,
mixer,
} = require('transgate');

const pino = require('pino')();
const config = require('konfig-yaml')();

const MongoGate = require('./lib/mongo_gate');
const IRKitGate = require('./lib/irkit_gate');

// Agent
const AnalysisCommander = require('./lib/analysis_commander');
const DysonCoolLinkRecorder = require('./lib/dyson/cool_link_recorder');
const EnvironmentalAnalyzer = require('./lib/environmental_analyzer');

// Gate
const slackGate = new HttpClientGate({ endpoint: config.slack.webhook_url });
const iftttGate = new HttpServerGate({ port: config.port });
const irkitGate = new IRKitGate(config.irkit.endpoint);
const intervalGate = new IntervalGate(60);
const mongoGate = new MongoGate(config.mongodb.endpoint, config.mongodb.collection);
const drToEaGate = new JointGate();

(async () => {
try {
await Agent.all(
new AnalysisCommander(iftttGate, { irkitGate, slackGate }),
new DysonCoolLinkRecorder(intervalGate, duplicator(mongoGate, drToEaGate)),
new EnvironmentalAnalyzer(drToEaGate, { irkitGate, slackGate }),
);
} catch(err) {
pino.error(err);
await iftttGate.close();
await mongoGate.close();
}

intervalGate.clear();
})()
.catch(err => {
pino.error(err);
});

7つのゲート

3つのエージェント

エージェントの実装

Agentのサブクラスを作ります。main メソッドで受け取ったアイテムを処理して指定先のゲートに新たなアイテムを送る処理を書きます。before/after のフックメソッドを使って、初期化処理や別に利用するプロセス(例えば headless chrome) をここで制御(起動・停止)します。

下記は EnvironmentalAnalyzer の実装例でです。室温が摂氏17度以下になったらエアコンをオンにします。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
const { Agent } = require('transgate');

module.exports =
class EnvironmentalAnalyzer extends Agent {
async before() {
this._preTemp = null;
this._airconAlive = false;
}

async main(item, { irkitGate, slackGate }) {
const curTemp = item.temp;

if (this._preTemp && this._preTemp > 17 && curTemp <= 17) {
if (!this._airconAlive) {
await irkitGate.sendAll({ command: 'aircon_on' });
this._airconAlive = true;
await slackGate.send({ text: `Turn on aircon because temp is down to ${curTemp}` });
}
}

this._preTemp = curTemp;
}
}

コンストラクタとアイテムの入力元ゲートが隠蔽されているのは、 null を受け取ると次のゲートに送り、自身は終了するという仕様の実装を意識させないためです。

特徴のまとめ

予想される疑問と答え

参照先サービスは全部ゲートになるのか?

Noです。ゲート間は一方通行に限定されます。エージェントはその先を知らない。つまりリクエストを投げて、それに対するレスポンスを得ることはできません。往復ではなく、ループにすることは可能ですが、ステートレスなのでどの送り出したアイテム(リクエスト)に対してのレスポンスかはわからないのです。ゲートは、エージェントにとってトリガーとなるものを出す部分と成果を送る部分になります。

一連のフローが終わったら時にキッカーに完了を通知するには?

キューシステムはタスクが完了したら完了通知を送る必要が往往にあります。こういった場合は、アイテムにそのコンテキストを持たせてフローに流して、最後のゲートが完了通知を送る役割を担うようにします。

ロガーはゲートにすべきか?

ログがアウトプットそのものになるならゲートにすべきです。そうすれば後からゲートをさらに Agent にジョイントするものに置き換えて、そこからログ解析サービスに投げるといった修正も容易にできます。

ゲートにどこまでロジックを含めていいのか?

ゲートはできる限りシンプルな方が良いです。エージェントはテストしやすいように設計しますが、ゲートそのものにロジックを入れてしまうと入出力先を付け替えてテストできなくなります。ただプロジェクト共通のロジックでそれがフォーマット程度であれば、ゲートに実装してもいいでしょう。複雑ならばそれ用のエージェントを作ってゲートの前に置き、ジョイントで繋げるだけです。

Transgate に興味を持っていただけたら幸いです。

English version Transgate is Agent-based taskflow framework for Node.js