Table of Contents
構成図
Kafkaを使ったメッセージングを行うシステムの構成図です。
Producer(送信者)がBrokerを経由してConsumer(受信者)にメッセージを送ります。構成だけ見るとMQTTに似ていますね。
Kafkaの特徴の一つに「Brokerがスケーラブルである」という点があります。Brokerがスケールアウトして複数台構成となっても、メッセージの送受信を分散処理することでパフォーマンスを維持します。
Broker間の連携を担うのがZppKeeperの役目です。
ちなみに、数日前に出たこちらの記事によると、KafkaをZookeeperに依存しない形にする計画も始まっているようです。 https://www.confluent.io/blog/removing-zookeeper-dependency-in-kafka/
Brokerの起動
今回はlocal環境でdockerコンテナを起動することで、Brokerを立ち上げることにしました。
Kafkaのdocker imageは残念ながら公式のものがなく、スター数が多いwurstmeister/kafka-docker を利用させていただきました。
cloneしてIPアドレスとsockの保存先を修正します。
$ git clone git@github.com:wurstmeister/kafka-docker.git
$ cd kafka-docker
$ vi docker-compose-single-broker.yml
diff --git a/docker-compose-single-broker.yml b/docker-compose-single-broker.yml
index 4d8e9f5..0d00807 100644
--- a/docker-compose-single-broker.yml
+++ b/docker-compose-single-broker.yml
@@ -9,8 +9,8 @@ services:
ports:
- "9092:9092"
environment:
- KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
+ KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_CREATE_TOPICS: "test:1:1"
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- - /var/run/docker.sock:/var/run/docker.sock
+ - ./docker.sock:/var/run/docker.sock
準備ができたらdocker-composeで、Broker(とZooKeeper)を起動します。
$ docker-compose -f docker-compose-single-broker.yml up
Topicを作成
Kafka CLIを使えるようにするためbrewでインストールします。
$ brew install kafka
以下コマンドを実行すると「test_topic」というTopicが作成されます。
$ kafka-topics --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test_topic
$ kafka-topics --list --zookeeper 127.0.0.1:2181
test_topic
ProducerからConsumerへデータ送信してみる
先にConsumerを起動しておきます。
$ kafka-console-consumer --bootstrap-server=127.0.0.1:9092 --topic=test_topic
次に別のTerminalでProducerを起動します。
$ kafka-console-producer --broker-list=127.0.0.1:9092 --topic=test_topic
>this is a test message.
>hello world!
起動後、任意の文字を入力してEnterすると、その文字がConsumerのTerminalに出力されます。
$ kafka-console-consumer --bootstrap-server=127.0.0.1:9092 --topic=test_topic
this is a test message.
hello world!
Rubyで書いたプログラムでデータ送信してみる
RubyでKafkaを扱う際のフレームワークとして、karafka/karafkaを利用してみましょう。
karafka/example-appというリポジトリが公開されていたので、これをもう少し単純化したものを準備しました。
$ git clone git@github.com:yuuu/example-app.git
$ cd example-app
$ git checkout simple
以下コマンドを入力すると、ConsumerのWorkerが起動します。
$ bundle exec karafka s
以下コマンドを入力すると、Producerのrakeタスクが test message
をいうメッセージを3回送信します。
$ bundle exec rake waterdrop:send
I, [2020-05-19T12:01:16.560455 #42930] INFO -- : Initializing Karafka server 42930
I, [2020-05-19T12:01:16.572583 #42930] INFO -- : New topics added to target list: test_topic
I, [2020-05-19T12:01:16.572646 #42930] INFO -- : Fetching cluster metadata from kafka://127.0.0.1:9092
I, [2020-05-19T12:01:16.578173 #42930] INFO -- : Discovered cluster metadata; nodes: 127.0.0.1:9092 (node_id=1009)
I, [2020-05-19T12:01:16.578533 #42930] INFO -- : Sending 1 messages to 127.0.0.1:9092 (node_id=1009)
I, [2020-05-19T12:01:16.589281 #42930] INFO -- : Sending 1 messages to 127.0.0.1:9092 (node_id=1009)
I, [2020-05-19T12:01:16.599346 #42930] INFO -- : Sending 1 messages to 127.0.0.1:9092 (node_id=1009)
I, [2020-05-19T12:01:16.605608 #42930] INFO -- : Disconnecting broker 1009
ConsumerのWorkerが受信したメッセージを出力します。
I, [2020-05-19T12:01:16.153869 #42861] INFO -- : [[example_app_async_pong] {ping: 0; pong: 0}:] There are no partitions to fetch from, sleeping for 1s
I, [2020-05-19T12:01:17.157552 #42861] INFO -- : [[example_app_async_pong] {ping: 0; pong: 0}:] There are no partitions to fetch from, sleeping for 1s
I, [2020-05-19T12:01:17.236999 #42861] INFO -- : #<Karafka::Params::ParamsBatch:0x00007f8b7bd38430 @params_array=[{"create_time"=>2020-05-19 12:01:16.572 +0900, "headers"=>{}, "is_control_record"=>false, "key"=>nil, "offset"=>25, "deserializer"=>#<Karafka::Serialization::Json::Deserializer:0x00007f8b7d0a4050>, "partition"=>0, "receive_time"=>2020-05-19 12:01:17.236981 +0900, "topic"=>"test_topic", "payload"=>"test message"}]>
I, [2020-05-19T12:01:17.237137 #42861] INFO -- : Inline processing of topic test_topic with 1 messages took 0 ms
I, [2020-05-19T12:01:17.237174 #42861] INFO -- : 1 messages on test_topic topic delegated to PrintConsumer
I, [2020-05-19T12:01:17.248551 #42861] INFO -- : #<Karafka::Params::ParamsBatch:0x00007f8b7cc5ce98 @params_array=[{"create_time"=>2020-05-19 12:01:16.588 +0900, "headers"=>{}, "is_control_record"=>false, "key"=>nil, "offset"=>26, "deserializer"=>#<Karafka::Serialization::Json::Deserializer:0x00007f8b7d0a4050>, "partition"=>0, "receive_time"=>2020-05-19 12:01:17.248536 +0900, "topic"=>"test_topic", "payload"=>"test message"}]>
I, [2020-05-19T12:01:17.248654 #42861] INFO -- : Inline processing of topic test_topic with 1 messages took 0 ms
I, [2020-05-19T12:01:17.248709 #42861] INFO -- : 1 messages on test_topic topic delegated to PrintConsumer
I, [2020-05-19T12:01:17.258411 #42861] INFO -- : #<Karafka::Params::ParamsBatch:0x00007f8b7bcba300 @params_array=[{"create_time"=>2020-05-19 12:01:16.599 +0900, "headers"=>{}, "is_control_record"=>false, "key"=>nil, "offset"=>27, "deserializer"=>#<Karafka::Serialization::Json::Deserializer:0x00007f8b7d0a4050>, "partition"=>0, "receive_time"=>2020-05-19 12:01:17.258398 +0900, "topic"=>"test_topic", "payload"=>"test message"}]>
I, [2020-05-19T12:01:17.258506 #42861] INFO -- : Inline processing of topic test_topic with 1 messages took 0 ms
I, [2020-05-19T12:01:17.258531 #42861] INFO -- : 1 messages on test_topic topic delegated to PrintConsumer
I, [2020-05-19T12:01:18.161370 #42861] INFO -- : [[example_app_async_pong] {ping: 0; pong: 0}:] There are no partitions to fetch from, sleeping for 1s
I, [2020-05-19T12:01:19.166455 #42861] INFO -- : [[example_app_async_pong] {ping: 0; pong: 0}:] There are no partitions to fetch from, sleeping for 1s
KarafkaについてはRubyKaigi 2018での発表資料が公開されているのでこちらを読むと理解が深まります。
まとめ
以上、Kafkaを使ったメッセージングをCLIとRubyを使って試してみました。
Karafkaを使うとRailsアプリにも簡単にWorkerを組み込めそうなので、案件でKafkaを使う自信がつきました。
みなさんもぜひお試しください。