Building a log pipeline on AWS with fluentd
I needed to build a log pipeline in an AWS environment and realized my knowledge of the surrounding services was badly lacking, so these are my study notes from digging into them.
- Lambda function
    - Processing flow
        - Client (td-agent) → Kinesis Firehose → Lambda → S3
```python
#
# lambda
#
import time
from datetime import datetime

import boto3


def lambda_handler(event, context):
    s3 = boto3.resource('s3')
    output = []

    #
    # firehose: acknowledge each incoming record unchanged
    #
    for record in event['records']:
        output.append({
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': record['data'],
        })

    #
    # s3: also dump the batch to a timestamped object for inspection
    #
    bucket = 'testfirehosetokyo'
    key = 'test/test_' + datetime.now().strftime('%Y-%m-%d-%H-%M-%S') + '.txt'
    file_contents = str(output)

    time_info = []
    stime = time.time()
    s3_obj = s3.Object(bucket, key)
    s3_obj.put(Body=file_contents)
    etime = time.time()
    time_info.append(etime - stime)  # elapsed seconds for the S3 put

    return {'records': output}
```
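The handler above just acknowledges each record and dumps the batch to S3. In a real Firehose transformation the record payload arrives base64-encoded, so the Lambda has to decode it, modify it, and re-encode it before returning. Below is a minimal sketch of that pattern, assuming the payload is JSON (with `format none` on the fluentd side it would be a raw line instead); the `transform_record` helper and the `processed_at` field are my own illustrative names, not part of the original setup.

```python
import base64
import json
from datetime import datetime, timezone


def transform_record(record):
    # Firehose delivers each record's payload base64-encoded
    payload = json.loads(base64.b64decode(record['data']))

    # Illustrative change: stamp each record with a processing time
    # (hypothetical field, not from the original setup)
    payload['processed_at'] = datetime.now(timezone.utc).isoformat()

    return {
        'recordId': record['recordId'],
        'result': 'Ok',
        # Firehose expects the transformed payload base64-encoded again
        'data': base64.b64encode(json.dumps(payload).encode()).decode(),
    }


def lambda_handler(event, context):
    return {'records': [transform_record(r) for r in event['records']]}
```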
- ETL processing with Glue
    - Processing flow
        - S3 → Glue adds a column to the existing fields → S3 (output to a specified path)
```python
#
# Glue
#
import sys

from awsglue.transforms import ApplyMapping
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import lit

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# Build the output path from a base path
basepath = "s3://testfirehosetokyo/sample_glue_for_result"
s3path = basepath + "/test"

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read the source table from the Glue Data Catalog
datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database="sample_glue_db",
    table_name="10",
    transformation_ctx="datasource0")

# Map the existing fields, duplicating "time" into a new "testcol" field
applymapping1 = ApplyMapping.apply(
    frame=datasource0,
    mappings=[
        ("time", "string", "time", "string"),
        ("message", "string", "message", "string"),
        ("worker", "int", "worker", "int"),
        ("time", "string", "testcol", "string")],
    transformation_ctx="applymapping1")

# Add a constant column via a DataFrame round-trip
df = applymapping1.toDF()
df = df.withColumn('pint2', lit(123))
dyf = DynamicFrame.fromDF(df, glueContext, 'dyf')

# Write the DynamicFrame to S3 as JSON
glueContext.write_dynamic_frame.from_options(
    frame=dyf,
    connection_type='s3',
    connection_options={'path': s3path},
    format='json')

# Commit so job bookmarks advance
job.commit()
```
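Incidentally, the DataFrame round-trip is not the only way to add a column: Glue's own `Map` transform applies a Python function to every record of a DynamicFrame. Here is a sketch of the same constant-column addition, meant to slot into the job above in place of the `toDF()`/`fromDF()` block; the `add_cols` helper name is mine, and I have not run this variant.

```python
from awsglue.transforms import Map


def add_cols(rec):
    # Same constant column as withColumn('pint2', lit(123)) above
    rec['pint2'] = 123
    return rec


# Apply the function to every record of the mapped DynamicFrame
dyf = Map.apply(frame=applymapping1, f=add_cols, transformation_ctx='mapped')
```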
- Log-sender client configuration
    - Processing flow
        - Client (td-agent) → Kinesis Firehose
```bash
#
# td-agent
#
yum install redhat-lsb-core
yum install aws-kinesis-agent
td-agent-gem install fluent-plugin-kinesis

# For the 1.x series, pin the plugin version explicitly
td-agent-gem install fluent-plugin-kinesis --no-document -V -v 1.3.0
```

```
[root@vm**** td-agent]# cat teststream.conf
<source>
  type tail
  path /tmp/testlog.log
  pos_file /tmp/testlog.pos
  format none
  message_key log_line
  tag teststream.test.test
</source>

<match teststream.**>
  type kinesis_firehose
  region ap-northeast-1
  delivery_stream_name teststream
  flush_interval 1s
  include_time_key true
  include_tag_key true
  buffer_chunk_limit 1m
  try_flush_interval 0.1
  queued_chunk_flush_interval 0.01
  num_threads 1
</match>
```
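Once td-agent is shipping lines, the quickest sanity check is to append to /tmp/testlog.log and confirm that objects show up in the bucket. A small boto3 sketch for that check (the bucket and prefix are assumed to match the Lambda section above; the script itself is my addition):

```python
import boto3

# Assumed to match the Firehose/Lambda setup above
BUCKET = 'testfirehosetokyo'
PREFIX = 'test/'

s3 = boto3.client('s3')
resp = s3.list_objects_v2(Bucket=BUCKET, Prefix=PREFIX)

# Show the five most recently delivered objects
for obj in sorted(resp.get('Contents', []), key=lambda o: o['LastModified'])[-5:]:
    print(obj['LastModified'], obj['Key'], obj['Size'])
```

Appending a line with `echo test >> /tmp/testlog.log` and re-running the script after a minute or so (Firehose buffers deliveries) should show a new object.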
Just getting hands-on made it clear how much related knowledge I'm still missing.
I'll keep experimenting and writing up what I learn, one step at a time.