:>/dev/null

ラガードエンジニアの不撓不屈の精神/unlearning/go beyond

aws環境でfluentd経由を用いたログ基盤構築

aws環境でログ基盤を構築する必要があり、周辺関連の知識がたりなさすぎたので調査した時の勉強メモ。

  • lamda関数
    • 処理フロー
    • クラアント(td-agent)→Kinesis firehose→lamdba→s3 f:id:oguguman:20181103205729p:plain
#
# lamdba
#

import boto3
import json
import base64

import time
import sys 
import pprint
from datetime import datetime

def lambda_handler(event, context):
    firehose = boto3.client('firehose')
    s3 = boto3.resource('s3')
    output = []
    
#
# firehose
# 

        import pprint; pprint.pprint('firehose------')
        import pprint; pprint.pprint(firehose)
        
        strjson = json.dumps('ccc')
        encodedata = base64.b64encode(strjson.encode())
        output_json = {
            'recordId': event['records'][num]['recordId'],
            'result': 'Ok',
            'data': event['records'][num]['data'],
        }
        output.append(output_json)
        
        import pprint; pprint.pprint(event)
        import pprint; pprint.pprint(event['records'])
        import pprint; pprint.pprint(event['records'][num]['recordId'])

#
# s3
#

        bucket = 'testfirehosetokyo'
        key = 'test/test_' + datetime.now().strftime('%Y-%m-%d-%H-%M-%S') + '.txt'
        file_contents = str(output)
    
        time_info = []
        stime = time.time()
        s3_obj = s3.Object(bucket, key)
        result = s3_obj.put( Body=file_contents )
        etime = time.time()
        time_info.append(etime - stime)


    return {
        'records': output
    }      
  • GlueでETL処理
    • 処理フロー
    • s3→Glueで既存項目にColumn追加→s3(指定パスへ出力)
      f:id:oguguman:20181104132207p:plain
#
# Glue
#

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import *
from pyspark.sql.functions import lit

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# 引数からパスを生成
basepath = "s3://testfirehosetokyo/sample_glue_for_result"
s3path = basepath + "/test"

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "sample_glue_db", table_name = "10", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "sample_glue_db", table_name = "10", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("time", "string", "time", "string"), ("message", "string", "message", "string"), ("worker", "int", "worker", "int")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("time", "string", "time", "string"), ("message", "string", "message", "string"), ("worker", "int", "worker", "int"),("time", "string", "testcol", "string")], transformation_ctx = "applymapping1")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://testfirehosetokyo/sample_glue_for_result"}, format = "json", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
#datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://testfirehosetokyo/sample_glue_for_result"}, format = "json", transformation_ctx = "datasink2")

df = applymapping1.toDF()
#Column追加
df = df.withColumn('pint2', lit(123))
dyf = DynamicFrame.fromDF(df, glueContext, 'dyf')

# DynamicFrameをJSON形式でS3に書き出す 
glueContext.write_dynamic_frame.from_options( 
    frame=dyf, 
    connection_type='s3', 
    connection_options={'path':s3path}, 
    format='json')
  • ログ送信側クラアント設定
    • 処理フロー
    • クラアント(td-agent)→Kinesis firehose
      f:id:oguguman:20181104132054p:plain
#
# td-agent
#

yum install redhat-lsb-core
yum install aws-kinesis-agent
td-agent-gem install fluent-plugin-kinesis

※1x系(version指定)
td-agent-gem install fluent-plugin-kinesis --no-document -V -v 1.3.0


[root@vm**** td-agent]# cat teststream.conf
<source>
  type tail
  path /tmp/testlog.log
  pos_file /tmp/testlog.pos
  format none
  message_key log_line
  tag teststream.test.test
</source>

<match teststream.**>
  type kinesis_firehose

  region ap-northeast-1
  delivery_stream_name teststream
  flush_interval 1s
  include_time_key true
  include_tag_key true
  buffer_chunk_limit 1m
  try_flush_interval 0.1
  queued_chunk_flush_interval 0.01
  num_threads 1
</match>

まずは触ってみたが関連知識が不足しているのが実感できた。。
1歩ずつでも触ってアウトプットして行こう。

td-agentでログ欠損が発生した

fluent(td-agent)で、Buffer溢れにによるログ欠損が発生したのでその時の調査・対応を下記へ記載しておく。

・対応フロー

  • コンソールからtd-agentのログ状況確認
  • Bufferの蓄積状況確認
  • td-agent.conf確認
  • td-agent.conf設定変更
  • td-agnetサービス再起動

以下、設定環境:td-agent version0.12

□ Kibana上でログ欠損が発生

・コンソールからtd-agentのログ状況確認

# less /var/log/td-agent/td-agent.log
2018-09-11 03:28:43 +0900 [warn]: emit transaction failed: error_class=Fluent::BufferQueueLimitError error="queue size exceeds limit" tag="www.nginx.access-json"
  2018-09-11 03:28:43 +0900 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.40/lib/fluent/buffer.rb:204:in `block in emit'

queue size exceeds limitが発生してる。

・Bufferの蓄積状況確認

# cd {buffer_dir}
# ll -h /data/buffer/secure-forward.* |wc -l
2054

・td-agent.conf設定確認 (一部抜粋)

# less td-agent.conf
        buffer_chunk_limit  2m
        buffer_queue_limit  2048

・td-agent.conf設定変更 (一部抜粋)

# less td-agent.conf
        buffer_chunk_limit  2m
        buffer_queue_limit  4096

↑の設定変更によりログが流れた。
※今回、master -> standby構成を組んだがmaster側でBuffer溢れが発生している状況においても、standby側に流れない状況

また、今回設定した送信部分のconf内容を抜粋して記載。

  <match **>
    @id testid.match
    @type forest
    subtype copy
    <template>
      <store>
        @id secure_forward
        @type secure_forward
        self_hostname test123.co.jp
        shared_key testkey
        secure yes
        ca_cert_path /etc/td-agent/ca/ca-cert.pem
        keep_alive 300
        num_threads 1
        <server>
          host testbudder.co.jp
          port 24284
          username test
          password ******
        </server>
        flush_interval 0s
        try_flush_interval  0.1
        queued_chunk_flush_interval 0.01
        buffer_type file
        buffer_path /data/buffer/secure-forward_${tag}.*
        buffer_chunk_limit  2m
        buffer_queue_limit  6000000 # 2 * 6000000 = 120T
        retry_wait 5s # default: 1.0
#        retry_limit 10
        reload_connections true
        reload_on_failure true
        heartbeat_type tcp
        heartbeat_interval 10s

        phi_threshold 100 # default: 16
        hard_timeout 200s # default: equals to send_timeout
        require_ack_response true
        send_timeout 60s # default 60s
        ack_response_timeout 61s # default 190s

        max_retry_wait 1h # default: inifinity
        disable_retry_limit true # DO NOT discard buffer anyway
        require_ack_response true # Qos 1 (At least once)
        flush_at_shutdown true
      </store>
    </template>
  </match>

上記設定でログ欠損発生せず稼働している。

ref

fluentd 1.0 でログの欠損を防ぐ · the world as code

ansible playbook performance tuning

ansibleでplaybookを書いていたが、大規模化するにつれ実行時間を要するようになり調査・対応を下記へ記載しておく。

以下、設定環境:ansible version2.1

・ansible.confパラメータチューニング

# cat .ansible.cfg
[defaults]
transport=ssh
pipelining=True
forks=100

fact_caching = jsonfile
fact_caching_connection = /tmp/cache
fact_caching_timeout = 86400 # seconds

[ssh_connection]
ssh_args=-o ControlMaster=auto -o ControlPersist=30m
scp_if_ssh=True
control_path=%(directory)s/%%h-%%r

・strategy設定(linear, free)

  • linear
    • 今まで(〜1.9)と同じ動作
    • 全てのホストでそれぞれのタスクを次タスクに行く前に終わらせる
    • 並列処理で各ホストのタスクを実施する
    • デフォルトは5並列で並列数の設定が可能
    • 1.9以前と同様、serialパラメータとの併用で並列数が設定出来る
  • free
    • ホスト間においてタイミングの調整など取らずとにかく早く終わるように実行する

playbookにstrategyを記載

# cat test.yml

---
- hosts: testhosts
  become: yes
  strategy: free
  become_method: su
  become_user: root
  roles:
     - td-agent

規模が大きくなる場合、playbook実行に時間を要するようになる。
上記で設定でパフォーマンス改善が見込めそう。cacheをMWにする事によりより汎用性が高まりそうではある。

mongodbでのslowlog設定

Mongodb に関してslowlog出力設定での調査・対応を下記へ記載しておく。
mongosに設定追加は「profile currently not supported via mongos」とエラーとなる為、shard(mongod)に設定をする。
mongodへ設定追加後、mongosで解析行う。

  • 以下対応フロー
    • mongodで起動パラメータ追加(ログにスロークエリを出力)
      • /etc/init.d/mongodへパラメータ設定
    • 設定ファイルの出力確認
    • 必要に応じて閾値調整
    • db.currentOp()
      • 実行中のクエリを確認
    • db.coll.find().explain()
      • 実行計画を確認

以下、設定環境:mongo version2.4

・起動時パラメータ追加方法

#vi /etc/init.d/mongod

//--profile
//プロファイルのレベル
//1: 閾値以上の時間のかかったクエリを残す
//2: 全てのクエリを残す
//
//--slowms
//閾値(ms)

#OPTIONS=" -f $CONFIGFILE"
OPTIONS=" -f $CONFIGFILE --profile=1 --slowms=100"

・出力サンプルイメージ

Thu Aug  9 14:10:09.069 [conn40] query testsite_mongo.offerwall_click_data query: { $query: { $and: [ { app_id_to: 100 }, { user_id: "10999999" } ] }, $orderby: { create_date: -1 } } planSummary: IXSCAN { create_date: 1.0 } ntoreturn:2 ntoskip:0 nscanned:128538 nscannedObjects:128538 keyUpdates:0 numYields:0 locks(micros) r:113561 nreturned:0 reslen:20 113ms

上記での嵌ったポイントとして設定ファイルへの出力方法がある。
mongo shellから実行する場合、オンライン(db.setProfilingLevel(1,20))にてON/OFF可能であるがファイル出力方法に時間を要した。
※出力先はsystem.profileというcollectionに収集され、ログ出力されない
起動時のパラメータを上記設定追加する事で正常稼働した。

ref)

MongoDBで実行したクエリをログに出力する

hiyokur.hatenablog.com

stackoverflow.com

https://www.mysoftkey.com/mongodb/profiling-for-slow-query-log-in-mongodb/

logrotate設定のエラー改善されない事象に関して

apacheaccess_logをカスタム出力設定後、ローテート設定したが正常稼働せず調査・対応を下記へ記載しておく。

  • 以下対応フロー
    • apacheaccess_logを稼働出力設定する
    • ログローテート設定に上記出力したファイルをローテート設定
    • ログローテート実行
    • ログローテートエラー発生
    • ログローテート設定修正
    • ログローテート再実行
      • ログローテートエラー改善されず
    • ログローテートスタータスファイル削除
    • ログローテート再実行
      • ログローテート正常稼働

以下ポイント部分のみ抜粋

// option
// d:dry run
// v:詳細出力
// f:強制実行

// ログローテートテスト実行
# /usr/sbin/logrotate -dv /etc/logrotate.d/apache

// ログローテート実行
# /usr/sbin/logrotate /etc/logrotate.d/apache

// ログローテート強制実行
# /usr/sbin/logrotate -f /etc/logrotate.d/apache

// ログローテート状況保存ファイル削除
# rm /var/lib/logrotate.status

上記での嵌ったポイントとして/var/lib/logrotate.statusファイル削除を行ってない場合エラー改善されない事がある

apacheのaccess_logをjson化しtd-agentで集約サーバへ収集する

apacheaccess_logを構築した集約サーバへ収集する必要があり、そのついでに解析しやすいようjson化設定したのでメモ。
apacheログをjson化する要件が発生した場合汎用性がありそうなので、調査・対応を下記へ記載しておく。

  • 以下構築要件
    • apacheaccess_logを集約サーバへ収集する
    • 集約サーバで解析しやすいようjson形式で送信
    • td-agentを使用し、送信時でのformatで正規表現指定しない

apacheのconfファイルにjsonフォーマット指定追加

# vi http.conf
// フォーマット指定
LogFormat "{\"timestamp\":\"%{%Y-%m-%dT%H:%M:%S}t.%{msec_frac}t%{%z}t\", \"remote_ip\":\"%a\", \"port\":%{local}p, \"local_ip\":\"%A\", \"httpHost\":\"%v\", \"reqTime\":\"%T\", \"reqStatus\":%>s, \"protocol\":\"%H\", \"bytes_sent\":%O, \"bytes_received\":%I, \"reqTime\":%D, \"file_path\":\"%f\", \"query\":\"%q\", \"method\":\"%m\", \"reqURL\":\"%U\", \"request\":\"%r\", \"ua\":\"%{User-Agent}i\", \"referrer\":\"%{Referer}i\"}" json

//カスタムログにてログ出力
CustomLog /usr/local/apache2/logs/test.com_ssl_access.443.json.log json

以下td-agent2環境fluentd v0.12以降のバージョン向けの設定例

# vi apache.access.conf
<source>
  @id in_apache_http_com_json
  @type   config_expander
   <config>
    @type tail
    path /usr/local/apache2/logs/test.com_ssl_access.443.json.log
    pos_file /var/log/td-agent/in.apache.com.access.443.json.pos

    read_from_head true
    keep_time_key true
    refresh_interval 1s
    format json
    time_key  time
    time_format %Y-%m-%dT%H:%M:%S
    keep_time_key true

    tag test.apache.access
    @label @testserver
  </config>
</source>

後は、上記ファイルをtd-agent.confで@includeし、送信設定(forward)すれば送信される。
※必要に応じてローテート設定行う

# vi /etc/logrotate.d/apache
/usr/local/apache2/logs/*.log {
    daily
    rotate 30
    compress
    missingok
    notifempty
    sharedscripts
    postrotate
        /usr/local/apache2/bin/apachectl graceful > /dev/null 2>/dev/null || true
    endscript
}

ref)

System 5 Admins 0: Apache Access Logs to JSON

カスタムログ形式 - ログのストリーム配信 | Fastly Help Guides

shで日時バックアップscript

shellで日時バックアップ処理を行う必要があった為、設定時メモ。

  • 以下構築条件
    • デイリーバックアップ
    • 圧縮必須
    • suffixが日時のみ処理
    • 開始・終了時刻が記載

本条件でshell scriptを書いた

#!/bin/sh

set -e

readonly SCRIPT_NAME=${0##*/}
readonly HISTORY=/root/logrotate/history.log

LOG_DATE_FORMAT(){
 echo `date '+%Y/%m/%d %H:%M:%S %a'`
}

echo  `LOG_DATE_FORMAT` `hostname -s` ${SCRIPT_NAME}[$$]: START >> ${HISTORY}


#: "テスト" && {
#    echo 今日の日付 … $(date +'%Y-%m-%d %H:%M')
#    echo 更新日が1日以上過去のファイルを探す: &&
#        find /data/td-agent/ -type f -mtime +0  -regextype posix-basic -regex ".*[0-9]\{4\}[0-9]\{1,2\}[0-9]\{1,2\}"
#        find /data/td-agent/ -not -name "*.gz" -type f -mtime +0  -regextype posix-basic -regex ".*[0-9]\{4\}[0-9]\{1,2\}[0-9]\{1,2\}.*"
#    echo 更新日が2日以上過去のファイルを探す: &&
#        find /data/td-agent/ -not -name "*.gz" -type f -mtime +1
#    echo 更新日が3日以上過去のファイルを探す: &&
#        find /data/td-agent/ -not -name "*.gz" -type f -mtime +2
#}


/bin/find /var/td-agent/store/ -not -name "*.gz" -type f -mtime +0 -regextype posix-basic -regex '.*[0-9]\{4\}[0-9]\{1,2\}[0-9]\{1,2\}' -exec gzip {} \;
/bin/find /var/td-agent/store/ -name "*.gz" -type f -mtime +1 -exec rm {} \;

echo  `LOG_DATE_FORMAT` `hostname -s` ${SCRIPT_NAME}[$$]: STOP >> ${HISTORY}

exit 0

後は、cronなりで起動設定すればバックアップされる。