Knative EventingがGCP PubSubからメッセージを受け取る様子を観察する

Knative EventingはGCP PubSubをバスとして使用することができる。他にも、Kafkaを使ったりもできるようだ。Knativeのオートスケール機能をminikubeで試したメモと同様動く様子を観察してみて、Serverless的な機能だということを確認できた。なお、今回はGKEを使います。

まずはGKE上にクラスタを構築し、IstioとKnative Servingをインストールする。ドキュメントに従って進めればOKなので省略します。

Kevent Eventingのインストール

これも基本的にはドキュメントに従って進めればいい。コマンド一発でできる。

kubectl apply -f https://storage.googleapis.com/knative-releases/eventing/latest/release.yaml

GCP PubSub

ここからが本題。GCP PubSubと連携するための準備を行う。ドキュメントはの通りに進めるとハマった所があったので手順を記録しておく。

以下のコマンドでGCP PubSubのイベントソースをインストールする。

ko apply -f pkg/sources/gcppubsub/

しかし、以下のようなエラーが出る場合がある。

{"errors":[{"code":"UNAUTHORIZED","message":"You don't have the needed permissions to perform this operation, and you may have invalid credentials. To aut
henticate your request, follow the steps in: https://cloud.google.com/container-registry/docs/advanced-authentication"}]}

インストール時にGCRにイメージをプッシュしようとしていて、権限がなくてエラーとなっているようであった。高度な認証方式を見つつ、以下のコマンドでクレデンシャルを設定し、再度 ko applyすればよい。

gcloud components install docker-credential-gcr
docker-credential-gcr configure-docker

次に、GCP PubSubのトピックを作成。

gcloud pubsub topics create knative-demo

最後にクラスターバスを作成する。stub-bus.yaml内のkindをClusterBusに変更する必要があります。最初読み飛ばしていていた…

diff --git a/config/buses/stub/stub-bus.yaml b/config/buses/stub/stub-bus.yaml
index d39f4c3..e3e0f66 100644
--- a/config/buses/stub/stub-bus.yaml
+++ b/config/buses/stub/stub-bus.yaml
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 apiVersion: channels.knative.dev/v1alpha1
-kind: Bus
+kind: ClusterBus
 metadata:
   name: stub
 spec:

ko applyします。

ko apply -f config/buses/stub/stub-bus.yaml

サービスアカウントの作成

これはドキュメントの通りコマンド実行すればよかった。

ko apply -f sample/gcp_pubsub_function/serviceaccount.yaml
ko apply -f sample/gcp_pubsub_function/serviceaccountbinding.yaml

Route、Configurationの設定

これもドキュメント通りでOK。RouteはどのConfigurationにトラフィックを流すかを決定して、Configurationは実行するFunction(サンプルではGithub上のコードをapply時にビルド、レジストリに保存しているのでコンテナイメージを意識しなくてもよさそう)を決定しているようだ。

ko apply -f sample/gcp_pubsub_function/route.yaml
ko apply -f sample/gcp_pubsub_function/configuration.yaml

Flowの作成

Flowオブジェクトを作ることで、GCP PubSubに送ったメッセージを受け取れるようになる。TrigerでGCP PubSubのトピックを指定して、Actionでメッセージを流すRouteを指定している。なお、flow.yaml内のresourceを、自分のプロジェクトIDやトピック名に書き換える必要がある。

perl -pi -e "s@quantum-reducer-434@XXXXXXXXXX@g" sample/gcp_pubsub_function/flow.yaml

diffの様子。

diff --git a/sample/gcp_pubsub_function/flow.yaml b/sample/gcp_pubsub_function/flow.yaml
index b581856..0567cf6 100644
--- a/sample/gcp_pubsub_function/flow.yaml
+++ b/sample/gcp_pubsub_function/flow.yaml
@@ -21,7 +21,7 @@ spec:
   serviceAccountName: feed-sa
   trigger:
     eventType: google.pubsub.topic.publish
-    resource: projects/quantum-reducer-434/topics/knative-demo
+    resource: projects/XXXXXXXXXX/topics/knative-demo
     service: pubsub.googleapis.com
   action:
     target:

applyだ!!!

ko apply -f sample/gcp_pubsub_function/flow.yaml

うまくいくと、GCP PubSubのサブスクリプションが作られる。

⟩ gcloud pubsub subscriptions list
---
ackDeadlineSeconds: 10
messageRetentionDuration: 604800s
name: projects/XXXXXXXXXX/subscriptions/sub-f5fe9dff-52ed-4e72-9d0d-a7ea609af303
pushConfig: {}
topic: projects/XXXXXXXXXX/topics/knative-demo

次はgcloudコマンドを使ってトピックへメッセージを送る。

⟩ gcloud pubsub topics publish knative-demo --message 'test message'
messageIds:
- '151700697138415'

Podのログを見てみると、データを受信していることが確認できる。ドキュメントだと送ったメッセージが表示されているけど、なんだかちゃんと表示されていないね。

⟩ kubectl logs -f pod/gcp-pubsub-function-00001-deployment-59f4d67d7-rjkpf -c user-container
2018/07/25 13:11:05 Received data: "\xb5\xeb-"
2018/07/25 13:11:24 Received data: ""

動作しているコードはfunction.go。メッセージを受け取ってあれこれできそうで、FaaSらしさがでてきたな。なお、gcp-pubsub-functionのPodはしばらくメッセージを受け取らないと終了しているようで、これもFaaSらしいね。(条件は確認しておきたい)。終了した場合でも、メッセージを送るとPodがちゃんと再度作成される。

感想

準備がいろいろ必要だったけど、ある程度形にしてしまえばfunctionとマニフェスト書いて適用、でPubSubとやりとりできるようになるのかな。Knativeのオートスケール機能をminikubeで試したメモを試した時はPaaS的な機能が主だったけど、今回はFaaS的な動きを見ることができておもしろかった。

Leave a Reply

Your email address will not be published. Required fields are marked *