Streamと仲良くなる

今までなんとなくでStreamを触ってきたけど、そろそろちゃんと理解しておきたいので記事に残すことにした。基本的なところからエラーハンドリングまで調べたり手を動かして検証した。

Streamの生成

async*による生成

Stream<int> countStream(int max) async* {
  for (var i = 0; i <= max; i++) {
    yield i;
  }
}

async*がついた関数を呼ぶとStreamが生成され、関数が終了するとそのStreamは終了する。その間、yieldyield*によってStreamに値を送ることができる。

yieldは値をそのままStreamに送るのに対して、yield*は別のStreamを受け取り、そのStreamから受け取った値をyieldする。

別のStreamから生成

final stream = countStream(10).map((n) => n * 2);

Streamクラスには新しいStreamを生成するための便利なメソッドが用意されている。例えば、以下のようなメソッドが定義されている。

  • map: 引数の関数で変換した値を送る新たなStreamを生成する。
  • where: 引数の条件を満たす値を送る新たなStreamを生成する。
  • take: 最初のN件の値だけを返す新たなStreamを生成する。
  • skip: 最初のN件をスキップした残りの値を返す新たなStreamを生成する。
  • cast: 実行時に型キャストした値を返す新たなStreamを生成する。

StreamControllerによる生成

これだけで1本記事が書けそうなので今回は割愛する。

エラーハンドリング

Stream<int> countStream(int max) async* {
  for (var i = 0; i <= max; i++) {
    if (i == 5) {
      // throw Exception('error for $i') はNG
      yield* Stream.error(Exception('error for $i'));
    } else {
      yield i;
    }
  }
}

Streamの生成時にエラーが発生した場合、エラーをStreamに送る必要がある。throwしてしまうと、Streamを生成する関数自体が例外を投げてしまう。エラーをStreamに送ると、後述するようにhandleError等によって利用側がエラーハンドリングできるようになる。

async*を使って生成する場合、上のようにStream.errorで単一のエラーを送るStreamを作りyield*に渡すことでエラーをStreamに送ることができる(もっと簡単にできる方法があれば教えてください)。map等で別のStreamから生成する場合も同様にできるはず。

Streamの利用

await forによる利用

void main() async {
  var total = 0;
  await for (final n in countStream(10)) {
    total += n;
  }
  print(total);
}

await for文を使うと、Streamから値を受け取るまで待機し、値を受け取ったら処理できる。

Streamクラスのメソッドによる利用

void main() async {
  final total = await countStream(10).reduce((a, b) => a + b);
  print(total);
}

StreamクラスにはStreamの値を処理して結果をFutureとして出力するメソッドが用意されている。例えば、以下のようなメソッドが定義されている。

  • any: Streamの値のいずれかが条件を満たすかを返す。
  • contains: Streamに引数の値が含まれるかを返す。
  • reduce: Streamの値を集約して一つの値を返す。
  • drain: Streamが完了したら引数の値を返し、エラーが発生したらエラーを返す。
  • toList: Streamの値をListにして返す。

listenによる利用

void main() {
  var total = 0;

  countStream(10).listen(
    (n) => total += n,
    onDone: () => print(total),
  );
}

listenメソッドを使うと、Streamの値を受け取ったときの処理、Streamが完了したときの処理などをより汎用的に定義できる。

エラーハンドリング

Streamの生成側のエラーハンドリングで使ったコードを利用する場合を考える。

void main() async {
  var total = 0;

  try {
    await for (final n in countStream(10)) {
      total += n;
    }
  } catch (error) {
    print(error); //=> Exception: error for 5
  }

  print(total); //=> 10
}

await forで値を受け取る場合、途中でエラーを受け取るとawait forはStreamの処理を中止しそのエラーをthrowする。なので、エラーハンドリングするにはawait fortry ... catchで囲む必要がある。そして、エラーを受け取るまでは処理されるが、catchした時点ではループを抜けているため、その後の処理を継続させることができない。

void main() {
  var total = 0;

  countStream(10).listen(
    (n) => total += n,
    onDone: () => print(total), //=> 50
    onError: (error) => print(error), //=> Exception: error for 5
    cancelOnError: false,
  );
}

listenで値を受け取る場合はより細かくエラーを受け取ったときの処理を定義できる。onErrorでエラーを受け取ったときの処理を定義でき、cancelOnErrorでエラーを受け取ったときにStreamをcancelするかどうかを設定できる。デフォルトではこれがfalseなので、エラーを受け取っても処理を継続できる。