Fusic Tech Blog

Fusion of Society, IT and Culture

今更ながらKafkaを触ってみた
2020/06/24

今更ながらKafkaを触ってみた

岡嵜です。

先日、LINE ENGINEERINGの以下記事を拝見しました。

これまで、Kafka=メッセージシステムというレベルの認識しか持っていなかったのですが、IoT界隈でもKafkaは採用されうる技術要素です。

いざ使うとなったときに困らないよう、今更ながらKafkaを触ってみました。

構成図

Kafkaを使ったメッセージングを行うシステムの構成図です。

Kafka構成図

Producer(送信者)がBrokerを経由してConsumer(受信者)にメッセージを送ります。構成だけ見るとMQTTに似ていますね。

Kafkaの特徴の一つに「Brokerがスケーラブルである」という点があります。Brokerがスケールアウトして複数台構成となっても、メッセージの送受信を分散処理することでパフォーマンスを維持します。

Broker間の連携を担うのがZppKeeperの役目です。

ちなみに、数日前に出たこちらの記事によると、KafkaをZookeeperに依存しない形にする計画も始まっているようです。 https://www.confluent.io/blog/removing-zookeeper-dependency-in-kafka/

Brokerの起動

今回はlocal環境でdockerコンテナを起動することで、Brokerを立ち上げることにしました。

Kafkaのdocker imageは残念ながら公式のものがなく、スター数が多いwurstmeister/kafka-docker を利用させていただきました。

cloneしてIPアドレスとsockの保存先を修正します。

$ git clone git@github.com:wurstmeister/kafka-docker.git
$ cd kafka-docker
$ vi docker-compose-single-broker.yml
diff --git a/docker-compose-single-broker.yml b/docker-compose-single-broker.yml
index 4d8e9f5..0d00807 100644
--- a/docker-compose-single-broker.yml
+++ b/docker-compose-single-broker.yml
@@ -9,8 +9,8 @@ services:
     ports:
       - "9092:9092"
     environment:
-      KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
+      KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
       KAFKA_CREATE_TOPICS: "test:1:1"
       KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
     volumes:
-      - /var/run/docker.sock:/var/run/docker.sock
+      - ./docker.sock:/var/run/docker.sock

準備ができたらdocker-composeで、Broker(とZooKeeper)を起動します。

$ docker-compose -f docker-compose-single-broker.yml up

Topicを作成

Kafka CLIを使えるようにするためbrewでインストールします。

$ brew install kafka

以下コマンドを実行すると「test_topic」というTopicが作成されます。

$ kafka-topics --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test_topic

$ kafka-topics --list --zookeeper 127.0.0.1:2181
test_topic

ProducerからConsumerへデータ送信してみる

先にConsumerを起動しておきます。

$ kafka-console-consumer --bootstrap-server=127.0.0.1:9092 --topic=test_topic

次に別のTerminalでProducerを起動します。

$ kafka-console-producer --broker-list=127.0.0.1:9092 --topic=test_topic

>this is a test message.
>hello world!

起動後、任意の文字を入力してEnterすると、その文字がConsumerのTerminalに出力されます。

$ kafka-console-consumer --bootstrap-server=127.0.0.1:9092 --topic=test_topic
this is a test message.
hello world!

Rubyで書いたプログラムでデータ送信してみる

RubyでKafkaを扱う際のフレームワークとして、karafka/karafkaを利用してみましょう。

karafka/example-appというリポジトリが公開されていたので、これをもう少し単純化したものを準備しました。

https://github.com/yuuu/example-app/tree/simple

$ git clone git@github.com:yuuu/example-app.git
$ cd example-app
$ git checkout simple

以下コマンドを入力すると、ConsumerのWorkerが起動します。

$ bundle exec karafka s

以下コマンドを入力すると、Producerのrakeタスクが test message をいうメッセージを3回送信します。

$ bundle exec rake waterdrop:send
I, [2020-05-19T12:01:16.560455 #42930]  INFO -- : Initializing Karafka server 42930
I, [2020-05-19T12:01:16.572583 #42930]  INFO -- : New topics added to target list: test_topic
I, [2020-05-19T12:01:16.572646 #42930]  INFO -- : Fetching cluster metadata from kafka://127.0.0.1:9092
I, [2020-05-19T12:01:16.578173 #42930]  INFO -- : Discovered cluster metadata; nodes: 127.0.0.1:9092 (node_id=1009)
I, [2020-05-19T12:01:16.578533 #42930]  INFO -- : Sending 1 messages to 127.0.0.1:9092 (node_id=1009)
I, [2020-05-19T12:01:16.589281 #42930]  INFO -- : Sending 1 messages to 127.0.0.1:9092 (node_id=1009)
I, [2020-05-19T12:01:16.599346 #42930]  INFO -- : Sending 1 messages to 127.0.0.1:9092 (node_id=1009)
I, [2020-05-19T12:01:16.605608 #42930]  INFO -- : Disconnecting broker 1009

ConsumerのWorkerが受信したメッセージを出力します。

I, [2020-05-19T12:01:16.153869 #42861]  INFO -- : [[example_app_async_pong] {ping: 0; pong: 0}:] There are no partitions to fetch from, sleeping for 1s
I, [2020-05-19T12:01:17.157552 #42861]  INFO -- : [[example_app_async_pong] {ping: 0; pong: 0}:] There are no partitions to fetch from, sleeping for 1s
I, [2020-05-19T12:01:17.236999 #42861]  INFO -- : #<Karafka::Params::ParamsBatch:0x00007f8b7bd38430 @params_array=[{"create_time"=>2020-05-19 12:01:16.572 +0900, "headers"=>{}, "is_control_record"=>false, "key"=>nil, "offset"=>25, "deserializer"=>#<Karafka::Serialization::Json::Deserializer:0x00007f8b7d0a4050>, "partition"=>0, "receive_time"=>2020-05-19 12:01:17.236981 +0900, "topic"=>"test_topic", "payload"=>"test message"}]>
I, [2020-05-19T12:01:17.237137 #42861]  INFO -- : Inline processing of topic test_topic with 1 messages took 0 ms
I, [2020-05-19T12:01:17.237174 #42861]  INFO -- : 1 messages on test_topic topic delegated to PrintConsumer
I, [2020-05-19T12:01:17.248551 #42861]  INFO -- : #<Karafka::Params::ParamsBatch:0x00007f8b7cc5ce98 @params_array=[{"create_time"=>2020-05-19 12:01:16.588 +0900, "headers"=>{}, "is_control_record"=>false, "key"=>nil, "offset"=>26, "deserializer"=>#<Karafka::Serialization::Json::Deserializer:0x00007f8b7d0a4050>, "partition"=>0, "receive_time"=>2020-05-19 12:01:17.248536 +0900, "topic"=>"test_topic", "payload"=>"test message"}]>
I, [2020-05-19T12:01:17.248654 #42861]  INFO -- : Inline processing of topic test_topic with 1 messages took 0 ms
I, [2020-05-19T12:01:17.248709 #42861]  INFO -- : 1 messages on test_topic topic delegated to PrintConsumer
I, [2020-05-19T12:01:17.258411 #42861]  INFO -- : #<Karafka::Params::ParamsBatch:0x00007f8b7bcba300 @params_array=[{"create_time"=>2020-05-19 12:01:16.599 +0900, "headers"=>{}, "is_control_record"=>false, "key"=>nil, "offset"=>27, "deserializer"=>#<Karafka::Serialization::Json::Deserializer:0x00007f8b7d0a4050>, "partition"=>0, "receive_time"=>2020-05-19 12:01:17.258398 +0900, "topic"=>"test_topic", "payload"=>"test message"}]>
I, [2020-05-19T12:01:17.258506 #42861]  INFO -- : Inline processing of topic test_topic with 1 messages took 0 ms
I, [2020-05-19T12:01:17.258531 #42861]  INFO -- : 1 messages on test_topic topic delegated to PrintConsumer
I, [2020-05-19T12:01:18.161370 #42861]  INFO -- : [[example_app_async_pong] {ping: 0; pong: 0}:] There are no partitions to fetch from, sleeping for 1s
I, [2020-05-19T12:01:19.166455 #42861]  INFO -- : [[example_app_async_pong] {ping: 0; pong: 0}:] There are no partitions to fetch from, sleeping for 1s

KarafkaについてはRubyKaigi 2018での発表資料が公開されているのでこちらを読むと理解が深まります。

まとめ

以上、Kafkaを使ったメッセージングをCLIとRubyを使って試してみました。

Karafkaを使うとRailsアプリにも簡単にWorkerを組み込めそうなので、案件でKafkaを使う自信がつきました。

みなさんもぜひお試しください。

yuuu

yuuu

2018年の年明けに組込み畑からやってきた、2児の父 兼 Webエンジニアです。 mockmockの開発・運用を担当しており、組込みエンジニア時代の経験を活かしてデバイスをプログラミングしたり、簡易的なIoTシステムを作ったりしています。主な開発言語はRuby、時々Go。