cp-kafkaでruby-kafkaを試してみる

前回、cp-kafkaを使ってKafkaの検証環境を用意したので、今度はRubyで簡単なproducerとconsumerを書いてみる。

前回用意したdocker-composeからkafka-topicコマンドでgreetingsトピックを作っておく。

% docker-compose exec kafka kafka-topic \
  --create \
  --zookeeper zookeeper:32181 \
  --partitions 1 \
  --replication-factor 1 \
  --topic greetings
  • --partitions 1でこのトピックのパーティション数が1つだけ。
  • --replication-factor 1でパーティションのレプリカの数を表している。この値はパーティション数に対する倍数なので、1の場合はレプリカなしになる。

producer

ruby-kafkaを使って簡単なproducerを作る。

# producer.rb
require "bundler/inline"

gemfile do
  source "https://rubygems.org"
  gem "ruby-kafka"
end

kafka = Kafka.new(["localhost:9092"], client_id: "hello-kafka")
kafka.deliver_message("Hello, World!", key: "hello", topic: "greetings")
  • Kafka.newの第1引数はseed brokerのホスト名のリスト。
  • Kafka.newの第2引数はclient idで、任意だけどクライアントを識別するために使うので指定するのが推奨。
% ruby producer.rb

kafkacatでconsumerを起動してproducerが送った値を受け取る。

% kafka -b localhost:9092 -t greetings
% Auto-selecting consumer mode (use -P or -C to override)
% Reached end of topic greetings [0] at offset 0
Hello, World!
  • -bはbrokerのホストを指す。
  • -tはトピックを指す。

Rubyで書いたproducerから送ったメッセージをconsumerから確認できた。

consumer

同様にruby-kafkaで簡単なconsumerを作る。

# consumer.rb
require "bundler/inline"

gemfile do
  source "https://rubygems.org"
  gem "ruby-kafka"
end

kafka = Kafka.new(["localhost:9092"])
kafka.each_message(topic: "greetings") do |message|
  puts "offset:#{message.offset}\tkey:#{message.key}\tvalue:#{message.value}"
end

kafkacatでproducerを起動してメッセージを送る。

% echo "hello:Hello, World" | kafkacat -b localhost:9092 -t greetings -K :
  • -Kでメッセージとキーを分割するデリミタを指定できる。
% ruby consumer.rb
offset:0    key:hello   value:Hello, World

producerから送ったメッセージをRubyで書いたconsumerで取得することができた。