
The indomitable spirit of a laggard engineer / unlearning / go beyond

Building a log infrastructure on AWS with fluentd

Study notes from when I had to build a log infrastructure on AWS and found my knowledge of the surrounding pieces badly lacking, so I dug in.

  • Lambda function
    • Processing flow
    • client (td-agent) → Kinesis Firehose → Lambda → S3
      f:id:oguguman:20181103205729p:plain
#
# lambda
#

import boto3
import json
import base64

import time
import sys 
import pprint
from datetime import datetime

def lambda_handler(event, context):
    firehose = boto3.client('firehose')
    s3 = boto3.resource('s3')
    output = []
    
#
# firehose
#

    # Firehose data-transformation handler: iterate over the incoming
    # records and return each one unchanged with result 'Ok'
    for num in range(len(event['records'])):
        pprint.pprint('firehose------')
        pprint.pprint(firehose)

        # example of building a base64-encoded payload (not used below)
        strjson = json.dumps('ccc')
        encodedata = base64.b64encode(strjson.encode())

        output_json = {
            'recordId': event['records'][num]['recordId'],
            'result': 'Ok',
            'data': event['records'][num]['data'],
        }
        output.append(output_json)

        pprint.pprint(event)
        pprint.pprint(event['records'])
        pprint.pprint(event['records'][num]['recordId'])

#
# s3
#

    # write the collected records to a timestamped object in S3
    bucket = 'testfirehosetokyo'
    key = 'test/test_' + datetime.now().strftime('%Y-%m-%d-%H-%M-%S') + '.txt'
    file_contents = str(output)

    # rough timing of the S3 put
    time_info = []
    stime = time.time()
    s3_obj = s3.Object(bucket, key)
    result = s3_obj.put(Body=file_contents)
    etime = time.time()
    time_info.append(etime - stime)

    return {
        'records': output
    }      
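To smoke-test the handler locally, an event like the one below works. This is a minimal sketch: the event shape follows the Firehose data-transformation event format, the recordId value is made up, and the handler is assumed to be saved as lambda_function.py. The S3 put inside the handler really runs, so valid AWS credentials and the testfirehosetokyo bucket are needed.

import base64
import json

from lambda_function import lambda_handler  # assumed module name

# one hand-built record in the Firehose transformation event shape
sample_event = {
    'records': [
        {
            'recordId': 'dummy-record-id-1',  # made-up value
            'data': base64.b64encode(
                json.dumps({'log_line': 'hello'}).encode()
            ).decode(),
        }
    ]
}

result = lambda_handler(sample_event, None)
print(result['records'][0]['result'])  # expected: 'Ok'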
  • ETL processing with Glue
    • Processing flow
    • S3 → add a column to the existing fields with Glue → S3 (output to a specified path)
      f:id:oguguman:20181104132207p:plain
#
# Glue
#

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import *
from pyspark.sql.functions import lit

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# build the S3 output path
basepath = "s3://testfirehosetokyo/sample_glue_for_result"
s3path = basepath + "/test"

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "sample_glue_db", table_name = "10", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "sample_glue_db", table_name = "10", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("time", "string", "time", "string"), ("message", "string", "message", "string"), ("worker", "int", "worker", "int")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("time", "string", "time", "string"), ("message", "string", "message", "string"), ("worker", "int", "worker", "int"),("time", "string", "testcol", "string")], transformation_ctx = "applymapping1")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://testfirehosetokyo/sample_glue_for_result"}, format = "json", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
#datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://testfirehosetokyo/sample_glue_for_result"}, format = "json", transformation_ctx = "datasink2")

df = applymapping1.toDF()
# add a constant column
df = df.withColumn('pint2', lit(123))
dyf = DynamicFrame.fromDF(df, glueContext, 'dyf')

# write the DynamicFrame to S3 as JSON
glueContext.write_dynamic_frame.from_options( 
    frame=dyf, 
    connection_type='s3', 
    connection_options={'path':s3path}, 
    format='json')
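To see the column-add step in isolation, here is a minimal local sketch using plain pyspark rather than a Glue job (the sample row is made up):

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.master('local[1]').appName('add-column-demo').getOrCreate()

# one sample row with the same schema as the mapped frame above
df = spark.createDataFrame(
    [('2018-11-04 13:00:00', 'hello', 1)],
    ['time', 'message', 'worker'],
)

# same operation as in the Glue script: add a constant column
df.withColumn('pint2', lit(123)).show()

spark.stop()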
  • Log-sending client (td-agent) configuration
    • Processing flow
    • client (td-agent) → Kinesis Firehose
      f:id:oguguman:20181104132054p:plain
#
# td-agent
#

yum install redhat-lsb-core
yum install aws-kinesis-agent
td-agent-gem install fluent-plugin-kinesis

# for the 1.x series (pin a specific version)
td-agent-gem install fluent-plugin-kinesis --no-document -V -v 1.3.0


[root@vm**** td-agent]# cat teststream.conf
<source>
  type tail
  path /tmp/testlog.log
  pos_file /tmp/testlog.pos
  format none
  message_key log_line
  tag teststream.test.test
</source>

<match teststream.**>
  type kinesis_firehose

  region ap-northeast-1
  delivery_stream_name teststream
  flush_interval 1s
  include_time_key true
  include_tag_key true
  buffer_chunk_limit 1m
  try_flush_interval 0.1
  queued_chunk_flush_interval 0.01
  num_threads 1
</match>
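
Before involving td-agent, the delivery stream itself can be checked with a direct put from boto3 (a sketch; the stream name and region come from the config above):

import boto3

firehose = boto3.client('firehose', region_name='ap-northeast-1')

# send a single record straight to the delivery stream
response = firehose.put_record(
    DeliveryStreamName='teststream',
    Record={'Data': b'{"log_line": "direct put test"}\n'},
)
print(response['RecordId'])

If that record reaches S3, append a line to /tmp/testlog.log and check that the td-agent → Firehose path delivers it the same way.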

Having tried things out, I could really feel how much related knowledge I'm still missing...
One step at a time, I'll keep getting hands-on and putting the results out there.