Search on the blog

2017年6月27日火曜日

Survey: Building a Real-Time Streaming ETL Pipeline in 20 Minutes

Confluentのブログを読んだ。
最近KafkaまわりのOSSにプルリク送ってみたりして、Confluent熱が個人的に高まっている。

Building a Real-Time Streaming ETL Pipeline in 20 Minutes

以下、感想・考えてみたことなど。

  • Kafkaってなんなのか一言で説明するのは難しいが、このブログにあるように「分散ストリーミング基盤」というのが良さそう。
  • Streaming ETLという概念がやばい。Batch ETLでやっているような集計処理をバックグラウンドでリアルタイムに実行できるやつ。
    • ブログの下の方で引用されているが、簡単なサンプルアプリがあるので見てみるとイメージがわく
  • 「複数のアプリが同じデータソースを参照していると密結合になる」みたいな話を聞いたことがあったが、その感覚が分かった。例えば、アプリAがデータ生成するテーブルAがあったとして、これをみたい他のアプリB, C, D, ..があったとする。アプリB, C, D, ..からテーブルAを直接参照すると以下のような問題が起こる。
    • 参照者が増えてくると、テーブルAのアクセス負荷が増える(Producer:Consumer = 1:N)
    • アプリAが自由にテーブルAのスキーマを変更できない(テーブルAの変更は参照側のアプリに影響を与える)
    • 参照で集計処理をしたい場合は、日次バッチになってしまう(負荷が高いのでリアルタイムで集計できない)

2017年6月24日土曜日

VagrantのゲストOS間でホスト名参照

 Vagrantfileでホスト名を指定してるのに何故か参照できなかった。

# -*- mode: ruby -*-
# vi: set ft=ruby :

Vagrant.configure("2") do |config|
  config.vm.box = "ubuntu15.04"
  config.ssh.insert_key = false
  config.vm.define "kafka-base" do |server|
    server.vm.network "private_network", ip: "192.168.33.11"
    server.vm.hostname = "kafka-base"
  end
  config.vm.define "kafka-connector" do |server|
    server.vm.network "private_network", ip: "192.168.33.12"
    server.vm.hostname = "kafka-connector"
  end
end

vagrant@kafka-base:~$ hostname
kafka-base
vagrant@kafka-base:~$ ping kafka-connector
ping: unknown host kafka-connector
vagrant@kafka-base:~$ cat /etc/hosts
127.0.0.1 kafka-base  kafka-base
127.0.0.1 localhost
127.0.1.1 vagrant-ubuntu-trusty.vagrantup.com vagrant-ubuntu-trusty
::1     localhost ip6-localhost ip6-loopback
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters

vagrant-hostsというプラグインを入れてみると解決した。
$ vagrant plugin install vagrant-hosts

以下のようにserver.vm.provisionの行を追加すると、ホスト名が設定され、さらにゲストOSの/etc/hostsに仮想マシンのホスト名情報が追記される。
# -*- mode: ruby -*-
# vi: set ft=ruby :

Vagrant.configure("2") do |config|
  config.vm.box = "ubuntu15.04"
  config.ssh.insert_key = false

  config.vm.define "kafka-base" do |server|
    server.vm.network "private_network", ip: "192.168.33.11"
    server.vm.provision :hosts, :sync_hosts => true
  end
  config.vm.define "kafka-connector" do |server|
    server.vm.network "private_network", ip: "192.168.33.12"
    server.vm.provision :hosts, :sync_hosts => true
  end
end

以下のように動作確認してみると、ホスト 名でゲストOS間の通信ができることが分かる。
vagrant@kafka-base:~$ hostname
kafka-base
vagrant@kafka-base:~$ ping kafka-connector
PING kafka-connector (192.168.33.12) 56(84) bytes of data.
64 bytes from kafka-connector (192.168.33.12): icmp_seq=1 ttl=64 time=0.470 ms
64 bytes from kafka-connector (192.168.33.12): icmp_seq=2 ttl=64 time=0.348 ms
^C
--- kafka-connector ping statistics ---
2 packets transmitted, 2 received, 0% packet loss, time 1002ms
rtt min/avg/max/mdev = 0.348/0.409/0.470/0.061 ms
vagrant@kafka-base:~$ cat /etc/hosts
127.0.0.1 localhost
127.0.1.1 kafka-base
192.168.33.11 kafka-base
192.168.33.12 kafka-connector

Vagrantで共通のssh private keyを使う

 Vagrantで複数の仮想マシンを立ち上げて、仮想マシン間でsshの秘密鍵を共有したい場合がある。Vagrant 1.8でのデフォルトの設定ではマシンごとに異なる秘密鍵が生成される。

# -*- mode: ruby -*-
# vi: set ft=ruby :

Vagrant.configure("2") do |config|
  config.vm.box = "ubuntu15.04"
  config.vm.define "kafka-base" do |server|
    server.vm.network "private_network", ip: "192.168.33.11"
  end
  config.vm.define "kafka-connector" do |server|
    server.vm.network "private_network", ip: "192.168.33.12"
  end
end

秘密鍵情報を確認。仮想マシンごとに別の秘密鍵が生成されていることが分かる。
$ vagrant ssh-config
Host kafka-base
  HostName 127.0.0.1
  User vagrant
  Port 2200
  UserKnownHostsFile /dev/null
  StrictHostKeyChecking no
  PasswordAuthentication no
  IdentityFile /Users/kenjih/work/vagrant_ansible_kafka/vagrant/.vagrant/machines/kafka-base/virtualbox/private_key
  IdentitiesOnly yes
  LogLevel FATAL

Host kafka-connector
  HostName 127.0.0.1
  User vagrant
  Port 2201
  UserKnownHostsFile /dev/null
  StrictHostKeyChecking no
  PasswordAuthentication no
  IdentityFile /Users/kenjih/work/vagrant_ansible_kafka/vagrant/.vagrant/machines/kafka-connector/virtualbox/private_key
  IdentitiesOnly yes
  LogLevel FATAL

 しかし、デプロイ自動化を行う場合など、共通の秘密鍵を使えると便利なことが多い。そのような場合は以下のようにconfig.ssh.insert_key = falseという設定を追加すればよい。

# -*- mode: ruby -*-
# vi: set ft=ruby :

Vagrant.configure("2") do |config|
  config.vm.box = "ubuntu15.04"
  config.ssh.insert_key = false
  config.vm.define "kafka-base" do |server|
    server.vm.network "private_network", ip: "192.168.33.11"
  end
  config.vm.define "kafka-connector" do |server|
    server.vm.network "private_network", ip: "192.168.33.12"
  end
end

秘密鍵情報を確認してみると、共通の秘密鍵を利用できることが分かる。
kenjih$ vagrant ssh-config
Host kafka-base
  HostName 127.0.0.1
  User vagrant
  Port 2200
  UserKnownHostsFile /dev/null
  StrictHostKeyChecking no
  PasswordAuthentication no
  IdentityFile /Users/kenjih/.vagrant.d/insecure_private_key
  IdentitiesOnly yes
  LogLevel FATAL

Host kafka-connector
  HostName 127.0.0.1
  User vagrant
  Port 2201
  UserKnownHostsFile /dev/null
  StrictHostKeyChecking no
  PasswordAuthentication no
  IdentityFile /Users/kenjih/.vagrant.d/insecure_private_key
  IdentitiesOnly yes
  LogLevel FATAL

2017年6月11日日曜日

VagrantとAnsibleでKafka環境をつくる(8)

 Vagrant、Ansibleで作ったKafka環境で動作確認をしてみる。
以下の操作はすべてプロジェクトのrootディレクトリから実行するものとします。

まずvagrantで仮想マシンを起動する。
$ cd vagrant
$ vagrant up

続いてansibleでplaybookを適用する。
$ cd ansible
$ ansible-playbook site.yml 

まずはrest proxyと疎通確認を行う。
$ curl 192.168.33.11:8082/topics
["_schemas"]

rest proxy経由でKafkaにデータを送ってみる。
$ curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
 --data '{"records":[{"value":{"name": "testUser"}}]}' \
      "192.168.33.11:8082/topics/jsontest"
{"offsets":[{"partition":0,"offset":0,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}

consumerを作成し、先ほど作成したjsontestトピックをサブスクライブする。
$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" -H "Accept: application/vnd.kafka.v2+json" \
--data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}' \
http://192.168.33.11:8082/consumers/my_json_consumer
{"instance_id":"my_consumer_instance","base_uri":"http://192.168.33.11:8082/consumers/my_json_consumer/instances/my_consumer_instance"}

$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["jsontest"]}' \
http://192.168.33.11:8082/consumers/my_json_consumer/instances/my_consumer_instance/subscription

もう1つメッセージを送ってみる。
$ curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
 --data '{"records":[{"value":{"name": "testUserXXX"}}]}' \
      "192.168.33.11:8082/topics/jsontest"
{"offsets":[{"partition":0,"offset":1,"error_code":null,"error":null}],"key_schema_id":null,"value_schema_id":null}

メッセージをコンシュームする。
$ curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \       
http://192.168.33.11:8082/consumers/my_json_consumer/instances/my_consumer_instance/records                               
[{"key":null,"value":{"name":"testUser"},"partition":0,"offset":0,"topic":"jsontest"},{"key":null,"value":{"name":"testUserXXX"},"partition":0,"offset":1,"topic":"jsontest"}]

データの送受信を確認できたので、テスト用のコンシューマーを削除する。
$ curl -X DELETE -H "Accept: application/vnd.kafka.v2+json" \
      http://192.168.33.11:8082/consumers/my_json_consumer/instances/my_consumer_instance

VagrantとAnsibleでKafka環境をつくる(7)

 前回マニュアルで試したsystemdからconfluentコンポーネントを起動するための設定をansible化した(成果物)。利用するコンポーネントは以下の4つなので、それぞれのコンポーネント別にroleを作った。

 serviceの設定スクリプトは、Jinja2を使ってtemplatesディレクトリ配下に置いている。taskではtemplatesに置かれたテンプレートを/etc/systemd/system/配下に格納する。設定が変更されると、handlerで指定したようにserviceが起動される。

 以下にzookeeper roleの場合のサンプルを載せておく。

ansible/roles/zookeeper/templates/zookeeper.service.j2
[Unit]
Description=confluent platform zookeeper
After=network.target

[Service]
ExecStart=/usr/bin/zookeeper-server-start /etc/kafka/zookeeper.properties
ExecStop=/usr/bin/zookeeper-server-stop

[Install]
WantedBy=multi-user.target

ansible/roles/zookeeper/tasks/main.yml
- name: zookeeper systemd script
  template:
    src: zookeeper.service.j2
    dest: /etc/systemd/system/zookeeper.service
    owner: root
    group: root
    mode: 644
  notify: start zookeeper

ansible/roles/zookeeper/handlers/main.yml
- name: start zookeeper
  service: name=zookeeper state=started enabled=yes

2017年6月5日月曜日

VagrantとAnsibleでKafka環境をつくる(6)

 zookeeperをサービス登録して、systemdから起動させるようにしてみた。
Ansible化はまだ出来ていないが、とりあえず手動で設定&起動できた。

以下のようなファイルを作っておく。
vagrant@vagrant-ubuntu-trusty:~$ cat /etc/systemd/system/zookeeper.service 
[Unit]
Description=confluent platform zookeeper
After=network.target

[Service]
ExecStart=/usr/bin/zookeeper-server-start /etc/kafka/zookeeper.properties
ExecStop=/usr/bin/zookeeper-server-stop

[Install]
WantedBy=multi-user.target

After=network.targetとすることで、ネットワーク起動後にサービスを開始させるという意味になる。
WantedByのところには、ランレベルを設定する。
multi-user.targetとするとマルチユーザモードで使用されるサービスとなる。

ちなみに、targetの一覧は以下のようにして参照できる。
vagrant@vagrant-ubuntu-trusty:~$ systemctl list-units --type target
UNIT                  LOAD   ACTIVE SUB    DESCRIPTION
basic.target          loaded active active Basic System
cryptsetup.target     loaded active active Encrypted Volumes
getty.target          loaded active active Login Prompts
graphical.target      loaded active active Graphical Interface
local-fs-pre.target   loaded active active Local File Systems (Pre)
local-fs.target       loaded active active Local File Systems
multi-user.target     loaded active active Multi-User System
network-online.target loaded active active Network is Online
network.target        loaded active active Network
nfs-client.target     loaded active active NFS client services
paths.target          loaded active active Paths
remote-fs-pre.target  loaded active active Remote File Systems (Pre)
remote-fs.target      loaded active active Remote File Systems
rpcbind.target        loaded active active RPC Port Mapper
slices.target         loaded active active Slices
sockets.target        loaded active active Sockets
swap.target           loaded active active Swap
sysinit.target        loaded active active System Initialization
time-sync.target      loaded active active System Time Synchronized
timers.target         loaded active active Timers

サービスとして認識しているか確認。
vagrant@vagrant-ubuntu-trusty:~$ sudo systemctl list-unit-files --type=service | grep zookeeper
zookeeper.service                      disabled

サービス有効化。
vagrant@vagrant-ubuntu-trusty:~$  sudo systemctl enable zookeeper
Created symlink from /etc/systemd/system/multi-user.target.wants/zookeeper.service to /etc/systemd/system/zookeeper.service.

サービス起動&確認。
vagrant@vagrant-ubuntu-trusty:~$ sudo systemctl start zookeeper

vagrant@vagrant-ubuntu-trusty:~$ sudo systemctl status zookeeper
● zookeeper.service - confluent platform zookeeper
   Loaded: loaded (/etc/systemd/system/zookeeper.service; enabled; vendor preset: enabled)
   Active: active (running) since Mon 2017-06-05 14:22:49 GMT; 16s ago

デフォルトポートでlistenしているか念のため確認。
vagrant@vagrant-ubuntu-trusty:~$ pgrep -f zookeeper
4542
vagrant@vagrant-ubuntu-trusty:~$ sudo lsof -a -i -p 4542
COMMAND  PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
java    4542 root   98u  IPv6  25718      0t0  TCP *:56431 (LISTEN)
java    4542 root  109u  IPv6  25729      0t0  TCP *:2181 (LISTEN)

VagrantとAnsibleでKafka環境をつくる(5)

 前回Javaが入ったので、Kafka環境を入れてみた(成果物)。
KafkaはConfluent Platformを使う。手動でインストールすると以下のようになる。

# install confluent public key
$ wget -qO - http://packages.confluent.io/deb/3.2/archive.key | sudo apt-key add -

# add confluent repository
$ sudo add-apt-repository "deb [arch=amd64] http://packages.confluent.io/deb/3.2 stable main"

# update apt
$ sudo apt-get update 

# install confluent platform
$ sudo apt-get install confluent-platform-2.11

上のインストール作業を行うplaybookは以下のようになる。

[ansible/roles/common/tasks/install_confluent.yml]
- name: install confluent public key
  apt_key: url="http://packages.confluent.io/deb/{{ confluent_repo_version }}/archive.key" state="present"

- name: add confluent repository
  apt_repository: repo="deb [arch=amd64] http://packages.confluent.io/deb/{{ confluent_repo_version }} stable main"

- name: update apt
  apt: update_cache=true

- name: install confluent platform
  apt: "name=confluent-platform-2.11={{ confluent_package_version }} state=present"

{{ XXX }}のところは変数になっている。変数の値は以下のようにvarsディレクトリ内のファイルで定義できる。

[ansible/roles/common/vars/main.yml]
confluent_repo_version: 3.2
confluent_package_version: 3.2.1-1