Kafka Streams入門3(Window)

前回では簡単なstatefulな操作を扱ったので、今回はwindowを伴う操作を動かしてみた。

Hopping time windows

Hopping time windowはお互いに重なり合う一定期間のtime windowのことで、期間のことをwindow size、time windowが生成される感覚をhopと呼ぶ。

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("streams-plaintext-input");
source.groupByKey()
        .windowedBy(
          TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1))
                     .advanceBy(Duration.ofSeconds(30))
        )
        .count()
        .toStream()
        .print(Printed.toSysOut());
[KTABLE-TOSTREAM-0000000002]: [a@1645928070000/1645928130000], 1
[KTABLE-TOSTREAM-0000000002]: [a@1645928100000/1645928160000], 1
[KTABLE-TOSTREAM-0000000002]: [a@1645928070000/1645928130000], 2
  • 1645928070000のような数値はミリ秒単位のunixtimeで、[a@1645928070000/1645928130000]はキーaの1分間の(window size = 1分)time windowを表している。
  • 確かに30秒ずつずれた(hop = 30秒)time windowごとに集計されていることがわかる。

Tumbling time windows

Tumbling time windowsはお互いに重なり合わない一定期間のtime windowのことで、window sizeとhopが同じ値のhopping time windowsとも言える。

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("streams-plaintext-input");
source.groupByKey()
        .windowedBy(
          TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1))
        )
        .count()
        .toStream()
        .print(Printed.toSysOut());
[KTABLE-TOSTREAM-0000000002]: [a@1645967820000/1645967880000], 2
[KTABLE-TOSTREAM-0000000002]: [a@1645967880000/1645967940000], 1
  • 1分間のtime windowで、重複がないことがわかる。

Sliding time windows

joinで使われるようなので、joinのときにあらためて理解したい。

Session windows

Session windowsは一連のレコードをセッション化するためのwindowで、あるレコードから一定期間(inactivity gap)内に同じキーをもつレコードを受け取ると同じwindowに含める。ユーザー行動分析などの用途で便利。

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("streams-plaintext-input");
source.groupByKey()
        .windowedBy(
          SessionWindows.ofInactivityGapWithNoGrace(Duration.ofSeconds(30))
        )
        .count(Materialized.as("wordcounts"))
        .toStream()
        .print(Printed.toSysOut());
[KTABLE-TOSTREAM-0000000002]: [a@1646006322519/1646006327264], 3
[KTABLE-TOSTREAM-0000000002]: [a@1646006357869/1646006357869], 1
  • 最後のレコードを受け取ってから30秒後に下流にレコードが送られているようだ。