repl.info

Knative EventingがGCP PubSubからメッセージを受け取る様子を観察する

Knative EventingはGCP PubSubをバスとして使用することができる。他にも、Kafkaを使ったりもできるようだ。Knativeのオートスケール機能をminikubeで試したメモと同様動く様子を観察してみて、Serverless的な機能だということを確認できた。なお、今回はGKEを使います。

まずはGKE上にクラスタを構築し、IstioとKnative Servingをインストールする。ドキュメントに従って進めればOKなので省略します。

Kevent Eventingのインストール

これも基本的にはドキュメントに従って進めればいい。コマンド一発でできる。

kubectl apply -f https://storage.googleapis.com/knative-releases/eventing/latest/release.yaml

GCP PubSub

ここからが本題。GCP PubSubと連携するための準備を行う。ドキュメントはの通りに進めるとハマった所があったので手順を記録しておく。

以下のコマンドでGCP PubSubのイベントソースをインストールする。

ko apply -f pkg/sources/gcppubsub/

しかし、以下のようなエラーが出る場合がある。

{"errors":[{"code":"UNAUTHORIZED","message":"You don't have the needed permissions to perform this operation, and you may have invalid credentials. To aut

henticate your request, follow the steps in: https://cloud.google.com/container-registry/docs/advanced-authentication"}]}

インストール時にGCRにイメージをプッシュしようとしていて、権限がなくてエラーとなっているようであった。高度な認証方式を見つつ、以下のコマンドでクレデンシャルを設定し、再度 ko applyすればよい。

gcloud components install docker-credential-gcr

docker-credential-gcr configure-docker

次に、GCP PubSubのトピックを作成。

gcloud pubsub topics create knative-demo

最後にクラスターバスを作成する。stub-bus.yaml内のkindをClusterBusに変更する必要があります。最初読み飛ばしていていた…

diff --git a/config/buses/stub/stub-bus.yaml b/config/buses/stub/stub-bus.yaml

index d39f4c3..e3e0f66 100644

--- a/config/buses/stub/stub-bus.yaml

+++ b/config/buses/stub/stub-bus.yaml

@@ -12,7 +12,7 @@

 # See the License for the specific language governing permissions and

 # limitations under the License.

 apiVersion: channels.knative.dev/v1alpha1

-kind: Bus

+kind: ClusterBus

 metadata:

   name: stub

 spec:

ko applyします。

ko apply -f config/buses/stub/stub-bus.yaml

サービスアカウントの作成

これはドキュメントの通りコマンド実行すればよかった。

ko apply -f sample/gcp_pubsub_function/serviceaccount.yaml

ko apply -f sample/gcp_pubsub_function/serviceaccountbinding.yaml

Route、Configurationの設定

これもドキュメント通りでOK。RouteはどのConfigurationにトラフィックを流すかを決定して、Configurationは実行するFunction(サンプルではGithub上のコードをapply時にビルド、レジストリに保存しているのでコンテナイメージを意識しなくてもよさそう)を決定しているようだ。

ko apply -f sample/gcp_pubsub_function/route.yaml

ko apply -f sample/gcp_pubsub_function/configuration.yaml

Flowの作成

Flowオブジェクトを作ることで、GCP PubSubに送ったメッセージを受け取れるようになる。TrigerでGCP PubSubのトピックを指定して、Actionでメッセージを流すRouteを指定している。なお、flow.yaml内のresourceを、自分のプロジェクトIDやトピック名に書き換える必要がある。

perl -pi -e "s@quantum-reducer-434@XXXXXXXXXX@g" sample/gcp_pubsub_function/flow.yaml

diffの様子。

diff --git a/sample/gcp_pubsub_function/flow.yaml b/sample/gcp_pubsub_function/flow.yaml

index b581856..0567cf6 100644

--- a/sample/gcp_pubsub_function/flow.yaml

+++ b/sample/gcp_pubsub_function/flow.yaml

@@ -21,7 +21,7 @@ spec:

   serviceAccountName: feed-sa

   trigger:

     eventType: google.pubsub.topic.publish

- resource: projects/quantum-reducer-434/topics/knative-demo

+ resource: projects/XXXXXXXXXX/topics/knative-demo

     service: pubsub.googleapis.com

   action:

     target:

applyだ!!!

ko apply -f sample/gcp_pubsub_function/flow.yaml

うまくいくと、GCP PubSubのサブスクリプションが作られる。

⟩ gcloud pubsub subscriptions list

---

ackDeadlineSeconds: 10

messageRetentionDuration: 604800s

name: projects/XXXXXXXXXX/subscriptions/sub-f5fe9dff-52ed-4e72-9d0d-a7ea609af303

pushConfig: {}

topic: projects/XXXXXXXXXX/topics/knative-demo

次はgcloudコマンドを使ってトピックへメッセージを送る。

⟩ gcloud pubsub topics publish knative-demo --message 'test message'

messageIds:

- '151700697138415'

Podのログを見てみると、データを受信していることが確認できる。ドキュメントだと送ったメッセージが表示されているけど、なんだかちゃんと表示されていないね。

⟩ kubectl logs -f pod/gcp-pubsub-function-00001-deployment-59f4d67d7-rjkpf -c user-container

2018/07/25 13:11:05 Received data: "\xb5\xeb-"

2018/07/25 13:11:24 Received data: ""

動作しているコードはfunction.go。メッセージを受け取ってあれこれできそうで、FaaSらしさがでてきたな。なお、gcp-pubsub-functionのPodはしばらくメッセージを受け取らないと終了しているようで、これもFaaSらしいね。(条件は確認しておきたい)。終了した場合でも、メッセージを送るとPodがちゃんと再度作成される。

感想

準備がいろいろ必要だったけど、ある程度形にしてしまえばfunctionとマニフェスト書いて適用、でPubSubとやりとりできるようになるのかな。Knativeのオートスケール機能をminikubeで試したメモを試した時はPaaS的な機能が主だったけど、今回はFaaS的な動きを見ることができておもしろかった。