AWS Lambda Go 1.x、Kinesis、CloudSearch

前の記事で、Golangで単純なラムダを作成する方法を説明しました。これは、2つのフィールドの単純なオブジェクトを入力として受け取り、同じ単純なオブジェクトを出力として提供します。 ここで、Kinesisをデータソースとしてラムダに接続してタスクを少し複雑にし、Kinesisレコードの処理結果をCloudSearchに転送します。 ラムダには単純化する特別なロジックはありません。Kinesisからのリクエストを受け入れ、CloudWatchに誓約し、変換してCloudSearchに送信するだけです。



画像





関数で受け取ることが予想されるKinesisイベントは次のとおりです。



{ "Records": [ { "awsRegion": "us-east-1", "eventID": "<event-id>", "eventName": "aws:kinesis:record", "eventSource": "aws:kinesis", "eventSourceARN": "arn:aws:kinesis:us-east-1:<xxxxxxxx>:stream/<stream>", "eventVersion": "1.0", "invokeIdentityArn": "arn:aws:iam::<xxxxxx>:role/<role>", "kinesis": { "approximateArrivalTimestamp": <timestamp>, "data": <data>, "partitionKey": "<partionkey>", "sequenceNumber": "<sequenceNumber>", "kinesisSchemaVersion": "1.0" } } ] }
      
      







ここでは、データフィールドに興味があります。 Kinesisからイベントを受信し、データフィールドからデータをログに記録するLambda関数のコードを以下に説明します(コードはここにあります ):



 package main import ( "context" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error { for _, record := range kinesisEvent.Records { kinesisRecord := record.Kinesis dataBytes := kinesisRecord.Data dataText := string(dataBytes) fmt.Printf("%s Data = %s \n", record.EventName, dataText) } return nil } func main() { lambda.Start(handler) }
      
      







変更したデータをCloudSearchに記録するには、コードを完成させる必要があります

ここでは、Kinesisから受信したデータを検索ドメイン(CloudSearch)の提出物にコンパイルします。



Kinesisからのデータは、 データフィールドにbase64エンコード形式で送信されます。 デコード後、データには次のフィールドが含まれます。



 type KinesisEventData struct { FilePath string `json:"filePath"` Id int `json:"id"` }
      
      







CloudSearchでは、次のデータを送信します。



 type CloudSearchDocument struct { Directory string `json:"dir"` FileName string `json:"name"` FileExtension string `json:"ext"` }
      
      







この場合、ドキュメントIDにidフィールドを保存します。 CloudSearchにアップロードするデータの準備について詳しくは、 こちらをご覧ください 。 要するに、CloudSearchでは、次の形式のjsonを送信します。



 [ {"type": "add", "id": "12345", "fields": { "dir": ":", "name": "file.txt", "ext": "txt" } } ]
      
      





ここで、type-追加または削除の2つの値を取る要求のタイプ。 id-ドキュメントの識別子。この場合、 Idフィールドのkinesisのオブジェクトに保存されている値。 fields-検索ドメインに保存する名前と値のペア、この場合はCloudSearchDocumentタイプのオブジェクト。



以下のコードは、Kinesisから来たオブジェクトのRecordsコレクションのデータを、CloudSearchにアップロードする準備ができたデータのコレクションに変換します。



 var amasonDocumentsBatch []AmazonDocumentUploadRequest //Preparing data for _, record := range kinesisEvent.Records { kinesisRecord := record.Kinesis dataBytes := kinesisRecord.Data fmt.Printf("%s Data = %s \n", record.EventName, string(dataBytes)) //Deserialize data from kinesis to KinesisEventData var eventData KinesisEventData err := json.Unmarshal(dataBytes, &eventData) if err != nil { return failed(), err } //Convert data to CloudSearch format document := ConvertToCloudSearchDocument(eventData) request := CreateAmazonDocumentUploadRequest(eventData.Id, document) amasonDocumentsBatch = append(amasonDocumentsBatch, request) }
      
      







次のコードは検索ドメインに接続して、以前に準備したデータを検索ドメインにロードします。



 if len(amasonDocumentsBatch) > 0 { fmt.Print("Connecting to cloudsearch...\n") svc := cloudsearchdomain.New(session.New(&aws.Config{ Region: aws.String(os.Getenv("SearchRegion")), Endpoint: aws.String(os.Getenv("SearchEndpoint")), MaxRetries: aws.Int(6), })) fmt.Print("Creating request...\n") batch, err := json.Marshal(amasonDocumentsBatch) if err != nil { return failed(), err } fmt.Printf("Search document = %s \n", batch) params := &cloudsearchdomain.UploadDocumentsInput{ ContentType: aws.String("application/json"), Documents: strings.NewReader(string(batch)), } fmt.Print("Starting to upload...\n") req, resp := svc.UploadDocumentsRequest(params) fmt.Print("Send request...\n") err = req.Send() if err != nil { return failed(), err } fmt.Println(resp) }
      
      







コードをコンパイルするには、aws -sdk-goおよびaws-lambda-goライブラリをロードする必要があります



 go get -u github.com/aws/aws-lambda-go/cmd/build-lambda-zip go get -d github.com/aws/aws-sdk-go/
      
      





ラムダをアセンブルおよびデプロイする方法は前の記事で説明されてますが、ここでは、Lambdaコンソールから環境変数を追加し、新しいテストデータを準備するだけです。



 os.Getenv("SearchRegion") os.Getenv("SearchEndpoint")
      
      







完全なコードはこちらから入手できます



ただし、最初にCloudSearchコンソールを開き、検索ドメインを作成します。 ドメインの場合、最も最小のインスタンスとレプリケーション数= 1を選択します。次に、フィールドdirnameextを作成する必要があります。 これらのフィールドでは、タイプ文字列を選択しますが、リテラルフィールドなど、一部のタイプが異なる場合があります。 しかし、これらはすべて、これらのフィールドの操作方法に依存します。 詳細については、Amazonのドキュメントで詳細を確認してください。



検索ドメインを作成し ([ 新しいドメインを作成 ]ボタン)、名前を入力し、インスタンスのタイプを選択します。



画像



フィールドを作成します。



画像



ドメインは約10分間作成され、アクティブになった後、Lambdaコンソールの環境変数に入力する必要がある検索ドメインのURLがあります。下の画像のようにUrlの前にプロトコルを指定することを忘れないでください。また、検索ドメイン:



画像



IAMコンソールを介してラムダに権利を付与して、Kinesis、CloudWatch、CloudSearchと連携することを忘れないでください。 KinesisはLambdaコンソール経由で接続できます:これを行うには、 トリガー追加ブロックでそれを選択し、フィールドに入力して、リージョンに存在するストリーム、バッチ内のレコード数、読み取りが開始されるストリーム内の位置を示します。 キネシスに接続せずにラムダの動作をテストできます。そのためには、テストを作成し、次の形式のjsonを追加する必要があります。



 { "Records": [ { "awsRegion": "us-east-1", "eventID": "shardId-000000000001:1", "eventName": "aws:kinesis:record", "eventSource": "aws:kinesis", "eventSourceARN": "arn:aws:kinesis:us-east-1:xxx", "eventVersion": "1.0", "invokeIdentityArn": "arn:aws:iam::xxx", "kinesis": { "approximateArrivalTimestamp": 1522222222.06, "data": "eyJpZCI6IDEyMzQ1LCJmaWxlUGF0aCI6ICJDOlxcZmlsZS50eHQifQ==", "partitionKey": "key", "sequenceNumber": "1", "kinesisSchemaVersion": "1.0" } }, { "awsRegion": "us-east-1", "eventID": "shardId-000000000001:1", "eventName": "aws:kinesis:record", "eventSource": "aws:kinesis", "eventSourceARN": "arn:aws:kinesis:us-east-1:xxx", "eventVersion": "1.0", "invokeIdentityArn": "arn:aws:iam::xxx", "kinesis": { "approximateArrivalTimestamp": 1522222222.06, "data": "eyJpZCI6IDEyMzQ2LCJmaWxlUGF0aCI6ICJDOlxcZm9sZGVyXFxmaWxlLnR4dCJ9", "partitionKey": "key", "sequenceNumber": "2", "kinesisSchemaVersion": "1.0" } } ] }
      
      







リンクを使用して、 データフィールドの他の値を生成できます。



画像



作業の結果は、検索ドメインでも表示できます。



画像



追加資料:



Goの検索ドメインでドキュメントを検索するためのコード



次の記事では、Lambda、Kinesis、CloudSearchを自動的に作成して接続するためのCloudFormationスクリプトを検討します。



All Articles