aws環境でfluentd経由を用いたログ基盤構築
aws環境でログ基盤を構築する必要があり、周辺関連の知識がたりなさすぎたので調査した時の勉強メモ。
- lamda関数
- 処理フロー
- クラアント(td-agent)→Kinesis firehose→lamdba→s3
# # lamdba # import boto3 import json import base64 import time import sys import pprint from datetime import datetime def lambda_handler(event, context): firehose = boto3.client('firehose') s3 = boto3.resource('s3') output = [] # # firehose # import pprint; pprint.pprint('firehose------') import pprint; pprint.pprint(firehose) strjson = json.dumps('ccc') encodedata = base64.b64encode(strjson.encode()) output_json = { 'recordId': event['records'][num]['recordId'], 'result': 'Ok', 'data': event['records'][num]['data'], } output.append(output_json) import pprint; pprint.pprint(event) import pprint; pprint.pprint(event['records']) import pprint; pprint.pprint(event['records'][num]['recordId']) # # s3 # bucket = 'testfirehosetokyo' key = 'test/test_' + datetime.now().strftime('%Y-%m-%d-%H-%M-%S') + '.txt' file_contents = str(output) time_info = [] stime = time.time() s3_obj = s3.Object(bucket, key) result = s3_obj.put( Body=file_contents ) etime = time.time() time_info.append(etime - stime) return { 'records': output }
- GlueでETL処理
- 処理フロー
- s3→Glueで既存項目にColumn追加→s3(指定パスへ出力)
# # Glue # import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.dynamicframe import DynamicFrame from pyspark.sql.functions import * from pyspark.sql.functions import lit ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) # 引数からパスを生成 basepath = "s3://testfirehosetokyo/sample_glue_for_result" s3path = basepath + "/test" sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "sample_glue_db", table_name = "10", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "sample_glue_db", table_name = "10", transformation_ctx = "datasource0") ## @type: ApplyMapping ## @args: [mapping = [("time", "string", "time", "string"), ("message", "string", "message", "string"), ("worker", "int", "worker", "int")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("time", "string", "time", "string"), ("message", "string", "message", "string"), ("worker", "int", "worker", "int"),("time", "string", "testcol", "string")], transformation_ctx = "applymapping1") ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3://testfirehosetokyo/sample_glue_for_result"}, format = "json", transformation_ctx = "datasink2"] ## @return: datasink2 ## @inputs: [frame = applymapping1] #datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://testfirehosetokyo/sample_glue_for_result"}, format = "json", transformation_ctx = "datasink2") df = applymapping1.toDF() #Column追加 df = df.withColumn('pint2', lit(123)) dyf = DynamicFrame.fromDF(df, glueContext, 'dyf') # DynamicFrameをJSON形式でS3に書き出す glueContext.write_dynamic_frame.from_options( frame=dyf, connection_type='s3', connection_options={'path':s3path}, format='json')
- ログ送信側クラアント設定
- 処理フロー
- クラアント(td-agent)→Kinesis firehose
# # td-agent # yum install redhat-lsb-core yum install aws-kinesis-agent td-agent-gem install fluent-plugin-kinesis ※1x系(version指定) td-agent-gem install fluent-plugin-kinesis --no-document -V -v 1.3.0 [root@vm**** td-agent]# cat teststream.conf <source> type tail path /tmp/testlog.log pos_file /tmp/testlog.pos format none message_key log_line tag teststream.test.test </source> <match teststream.**> type kinesis_firehose region ap-northeast-1 delivery_stream_name teststream flush_interval 1s include_time_key true include_tag_key true buffer_chunk_limit 1m try_flush_interval 0.1 queued_chunk_flush_interval 0.01 num_threads 1 </match>
まずは触ってみたが関連知識が不足しているのが実感できた。。
1歩ずつでも触ってアウトプットして行こう。
td-agentでログ欠損が発生した
fluent(td-agent)で、Buffer溢れにによるログ欠損が発生したのでその時の調査・対応を下記へ記載しておく。
・対応フロー
- コンソールからtd-agentのログ状況確認
- Bufferの蓄積状況確認
- td-agent.conf確認
- td-agent.conf設定変更
- td-agnetサービス再起動
以下、設定環境:td-agent version0.12
□ Kibana上でログ欠損が発生
・コンソールからtd-agentのログ状況確認
# less /var/log/td-agent/td-agent.log 2018-09-11 03:28:43 +0900 [warn]: emit transaction failed: error_class=Fluent::BufferQueueLimitError error="queue size exceeds limit" tag="www.nginx.access-json" 2018-09-11 03:28:43 +0900 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.40/lib/fluent/buffer.rb:204:in `block in emit'
queue size exceeds limit
が発生してる。
・Bufferの蓄積状況確認
# cd {buffer_dir} # ll -h /data/buffer/secure-forward.* |wc -l 2054
・td-agent.conf設定確認 (一部抜粋)
# less td-agent.conf buffer_chunk_limit 2m buffer_queue_limit 2048
・td-agent.conf設定変更 (一部抜粋)
# less td-agent.conf buffer_chunk_limit 2m buffer_queue_limit 4096
↑の設定変更によりログが流れた。
※今回、master -> standby構成を組んだがmaster側でBuffer溢れが発生している状況においても、standby側に流れない状況
。
また、今回設定した送信部分のconf内容を抜粋して記載。
<match **> @id testid.match @type forest subtype copy <template> <store> @id secure_forward @type secure_forward self_hostname test123.co.jp shared_key testkey secure yes ca_cert_path /etc/td-agent/ca/ca-cert.pem keep_alive 300 num_threads 1 <server> host testbudder.co.jp port 24284 username test password ****** </server> flush_interval 0s try_flush_interval 0.1 queued_chunk_flush_interval 0.01 buffer_type file buffer_path /data/buffer/secure-forward_${tag}.* buffer_chunk_limit 2m buffer_queue_limit 6000000 # 2 * 6000000 = 120T retry_wait 5s # default: 1.0 # retry_limit 10 reload_connections true reload_on_failure true heartbeat_type tcp heartbeat_interval 10s phi_threshold 100 # default: 16 hard_timeout 200s # default: equals to send_timeout require_ack_response true send_timeout 60s # default 60s ack_response_timeout 61s # default 190s max_retry_wait 1h # default: inifinity disable_retry_limit true # DO NOT discard buffer anyway require_ack_response true # Qos 1 (At least once) flush_at_shutdown true </store> </template> </match>
上記設定でログ欠損発生せず稼働している。
ref
ansible playbook performance tuning
ansibleでplaybookを書いていたが、大規模化するにつれ実行時間を要するようになり調査・対応を下記へ記載しておく。
以下、設定環境:ansible version2.1
・ansible.confパラメータチューニング
# cat .ansible.cfg [defaults] transport=ssh pipelining=True forks=100 fact_caching = jsonfile fact_caching_connection = /tmp/cache fact_caching_timeout = 86400 # seconds [ssh_connection] ssh_args=-o ControlMaster=auto -o ControlPersist=30m scp_if_ssh=True control_path=%(directory)s/%%h-%%r
・strategy設定(linear, free)
- linear
- 今まで(〜1.9)と同じ動作
- 全てのホストでそれぞれのタスクを次タスクに行く前に終わらせる
- 並列処理で各ホストのタスクを実施する
- デフォルトは5並列で並列数の設定が可能
- 1.9以前と同様、serialパラメータとの併用で並列数が設定出来る
- free
- ホスト間においてタイミングの調整など取らずとにかく早く終わるように実行する
playbookにstrategyを記載
# cat test.yml
---
- hosts: testhosts
become: yes
strategy: free
become_method: su
become_user: root
roles:
- td-agent
規模が大きくなる場合、playbook実行に時間を要するようになる。
上記で設定でパフォーマンス改善が見込めそう。cacheをMWにする事によりより汎用性が高まりそうではある。
mongodbでのslowlog設定
Mongodb に関してslowlog出力設定での調査・対応を下記へ記載しておく。
mongosに設定追加は「profile currently not supported via mongos」とエラーとなる為、shard(mongod)に設定をする。
mongodへ設定追加後、mongosで解析行う。
- 以下対応フロー
- mongodで起動パラメータ追加(ログにスロークエリを出力)
- /etc/init.d/mongodへパラメータ設定
- 設定ファイルの出力確認
- 必要に応じて閾値調整
- db.currentOp()
- 実行中のクエリを確認
- db.coll.find().explain()
- 実行計画を確認
- mongodで起動パラメータ追加(ログにスロークエリを出力)
以下、設定環境:mongo version2.4
・起動時パラメータ追加方法
#vi /etc/init.d/mongod //--profile //プロファイルのレベル //1: 閾値以上の時間のかかったクエリを残す //2: 全てのクエリを残す // //--slowms //閾値(ms) #OPTIONS=" -f $CONFIGFILE" OPTIONS=" -f $CONFIGFILE --profile=1 --slowms=100"
・出力サンプルイメージ
Thu Aug 9 14:10:09.069 [conn40] query testsite_mongo.offerwall_click_data query: { $query: { $and: [ { app_id_to: 100 }, { user_id: "10999999" } ] }, $orderby: { create_date: -1 } } planSummary: IXSCAN { create_date: 1.0 } ntoreturn:2 ntoskip:0 nscanned:128538 nscannedObjects:128538 keyUpdates:0 numYields:0 locks(micros) r:113561 nreturned:0 reslen:20 113ms
上記での嵌ったポイントとして設定ファイルへの出力方法
がある。
mongo shellから実行する場合、オンライン(db.setProfilingLevel(1,20))にてON/OFF可能であるがファイル出力方法に時間を要した。
※出力先はsystem.profileというcollectionに収集され、ログ出力されない
起動時のパラメータを上記設定追加する事で正常稼働した。
ref)
https://www.mysoftkey.com/mongodb/profiling-for-slow-query-log-in-mongodb/
logrotate設定のエラー改善されない事象に関して
apacheのaccess_logをカスタム出力設定後、ローテート設定したが正常稼働せず調査・対応を下記へ記載しておく。
- 以下対応フロー
以下ポイント部分のみ抜粋
// option // d:dry run // v:詳細出力 // f:強制実行 // ログローテートテスト実行 # /usr/sbin/logrotate -dv /etc/logrotate.d/apache // ログローテート実行 # /usr/sbin/logrotate /etc/logrotate.d/apache // ログローテート強制実行 # /usr/sbin/logrotate -f /etc/logrotate.d/apache // ログローテート状況保存ファイル削除 # rm /var/lib/logrotate.status
上記での嵌ったポイントとして/var/lib/logrotate.status
ファイル削除を行ってない場合エラー改善されない事がある
apacheのaccess_logをjson化しtd-agentで集約サーバへ収集する
apacheのaccess_logを構築した集約サーバへ収集する必要があり、そのついでに解析しやすいようjson化設定したのでメモ。
apacheログをjson化する要件が発生した場合汎用性がありそうなので、調査・対応を下記へ記載しておく。
apacheのconfファイルにjsonフォーマット指定追加
# vi http.conf // フォーマット指定 LogFormat "{\"timestamp\":\"%{%Y-%m-%dT%H:%M:%S}t.%{msec_frac}t%{%z}t\", \"remote_ip\":\"%a\", \"port\":%{local}p, \"local_ip\":\"%A\", \"httpHost\":\"%v\", \"reqTime\":\"%T\", \"reqStatus\":%>s, \"protocol\":\"%H\", \"bytes_sent\":%O, \"bytes_received\":%I, \"reqTime\":%D, \"file_path\":\"%f\", \"query\":\"%q\", \"method\":\"%m\", \"reqURL\":\"%U\", \"request\":\"%r\", \"ua\":\"%{User-Agent}i\", \"referrer\":\"%{Referer}i\"}" json //カスタムログにてログ出力 CustomLog /usr/local/apache2/logs/test.com_ssl_access.443.json.log json
以下td-agent2環境fluentd v0.12以降のバージョン
向けの設定例
# vi apache.access.conf <source> @id in_apache_http_com_json @type config_expander <config> @type tail path /usr/local/apache2/logs/test.com_ssl_access.443.json.log pos_file /var/log/td-agent/in.apache.com.access.443.json.pos read_from_head true keep_time_key true refresh_interval 1s format json time_key time time_format %Y-%m-%dT%H:%M:%S keep_time_key true tag test.apache.access @label @testserver </config> </source>
後は、上記ファイルをtd-agent.confで@includeし、送信設定(forward)すれば送信される。
※必要に応じてローテート設定行う
# vi /etc/logrotate.d/apache /usr/local/apache2/logs/*.log { daily rotate 30 compress missingok notifempty sharedscripts postrotate /usr/local/apache2/bin/apachectl graceful > /dev/null 2>/dev/null || true endscript }
ref)
shで日時バックアップscript
shellで日時バックアップ処理を行う必要があった為、設定時メモ。
- 以下構築条件
- デイリーバックアップ
- 圧縮必須
- suffixが日時のみ処理
- 開始・終了時刻が記載
本条件でshell scriptを書いた
#!/bin/sh set -e readonly SCRIPT_NAME=${0##*/} readonly HISTORY=/root/logrotate/history.log LOG_DATE_FORMAT(){ echo `date '+%Y/%m/%d %H:%M:%S %a'` } echo `LOG_DATE_FORMAT` `hostname -s` ${SCRIPT_NAME}[$$]: START >> ${HISTORY} #: "テスト" && { # echo 今日の日付 … $(date +'%Y-%m-%d %H:%M') # echo 更新日が1日以上過去のファイルを探す: && # find /data/td-agent/ -type f -mtime +0 -regextype posix-basic -regex ".*[0-9]\{4\}[0-9]\{1,2\}[0-9]\{1,2\}" # find /data/td-agent/ -not -name "*.gz" -type f -mtime +0 -regextype posix-basic -regex ".*[0-9]\{4\}[0-9]\{1,2\}[0-9]\{1,2\}.*" # echo 更新日が2日以上過去のファイルを探す: && # find /data/td-agent/ -not -name "*.gz" -type f -mtime +1 # echo 更新日が3日以上過去のファイルを探す: && # find /data/td-agent/ -not -name "*.gz" -type f -mtime +2 #} /bin/find /var/td-agent/store/ -not -name "*.gz" -type f -mtime +0 -regextype posix-basic -regex '.*[0-9]\{4\}[0-9]\{1,2\}[0-9]\{1,2\}' -exec gzip {} \; /bin/find /var/td-agent/store/ -name "*.gz" -type f -mtime +1 -exec rm {} \; echo `LOG_DATE_FORMAT` `hostname -s` ${SCRIPT_NAME}[$$]: STOP >> ${HISTORY} exit 0
後は、cronなりで起動設定すればバックアップされる。