Streamと仲良くなる
今までなんとなくでStreamを触ってきたけど、そろそろちゃんと理解しておきたいので記事に残すことにした。基本的なところからエラーハンドリングまで調べたり手を動かして検証した。
Streamの生成
async*による生成
Stream<int> countStream(int max) async* {
for (var i = 0; i <= max; i++) {
yield i;
}
}
async*がついた関数を呼ぶとStreamが生成され、関数が終了するとそのStreamは終了する。その間、yieldやyield*によってStreamに値を送ることができる。
yieldは値をそのままStreamに送るのに対して、yield*は別のStreamを受け取り、そのStreamから受け取った値をyieldする。
別のStreamから生成
final stream = countStream(10).map((n) => n * 2);
Streamクラスには新しいStreamを生成するための便利なメソッドが用意されている。例えば、以下のようなメソッドが定義されている。
-
map: 引数の関数で変換した値を送る新たなStreamを生成する。 -
where: 引数の条件を満たす値を送る新たなStreamを生成する。 -
take: 最初のN件の値だけを返す新たなStreamを生成する。 -
skip: 最初のN件をスキップした残りの値を返す新たなStreamを生成する。 -
cast: 実行時に型キャストした値を返す新たなStreamを生成する。
StreamControllerによる生成
これだけで1本記事が書けそうなので今回は割愛する。
エラーハンドリング
Stream<int> countStream(int max) async* {
for (var i = 0; i <= max; i++) {
if (i == 5) {
// throw Exception('error for $i') はNG
yield* Stream.error(Exception('error for $i'));
} else {
yield i;
}
}
}
Streamの生成時にエラーが発生した場合、エラーをStreamに送る必要がある。throwしてしまうと、Streamを生成する関数自体が例外を投げてしまう。エラーをStreamに送ると、後述するようにhandleError等によって利用側がエラーハンドリングできるようになる。
async*を使って生成する場合、上のようにStream.errorで単一のエラーを送るStreamを作りyield*に渡すことでエラーをStreamに送ることができる(もっと簡単にできる方法があれば教えてください)。map等で別のStreamから生成する場合も同様にできるはず。
Streamの利用
await forによる利用
void main() async {
var total = 0;
await for (final n in countStream(10)) {
total += n;
}
print(total);
}
await for文を使うと、Streamから値を受け取るまで待機し、値を受け取ったら処理できる。
Streamクラスのメソッドによる利用
void main() async {
final total = await countStream(10).reduce((a, b) => a + b);
print(total);
}
StreamクラスにはStreamの値を処理して結果をFutureとして出力するメソッドが用意されている。例えば、以下のようなメソッドが定義されている。
-
any: Streamの値のいずれかが条件を満たすかを返す。 -
contains: Streamに引数の値が含まれるかを返す。 -
reduce: Streamの値を集約して一つの値を返す。 -
drain: Streamが完了したら引数の値を返し、エラーが発生したらエラーを返す。 -
toList: Streamの値をListにして返す。
listenによる利用
void main() {
var total = 0;
countStream(10).listen(
(n) => total += n,
onDone: () => print(total),
);
}
listenメソッドを使うと、Streamの値を受け取ったときの処理、Streamが完了したときの処理などをより汎用的に定義できる。
エラーハンドリング
Streamの生成側のエラーハンドリングで使ったコードを利用する場合を考える。
void main() async {
var total = 0;
try {
await for (final n in countStream(10)) {
total += n;
}
} catch (error) {
print(error); //=> Exception: error for 5
}
print(total); //=> 10
}
await forで値を受け取る場合、途中でエラーを受け取るとawait forはStreamの処理を中止しそのエラーをthrowする。なので、エラーハンドリングするにはawait forをtry ... catchで囲む必要がある。そして、エラーを受け取るまでは処理されるが、catchした時点ではループを抜けているため、その後の処理を継続させることができない。
void main() {
var total = 0;
countStream(10).listen(
(n) => total += n,
onDone: () => print(total), //=> 50
onError: (error) => print(error), //=> Exception: error for 5
cancelOnError: false,
);
}
listenで値を受け取る場合はより細かくエラーを受け取ったときの処理を定義できる。onErrorでエラーを受け取ったときの処理を定義でき、cancelOnErrorでエラーを受け取ったときにStreamをcancelするかどうかを設定できる。デフォルトではこれがfalseなので、エラーを受け取っても処理を継続できる。