fluentdで送信recordに文字列を代入してから送信する
タイトル通り実装する要件があり、調査・設定行った時の勉強メモ。
fluentdを使用したログ集約環境で、送信データの特定フィールドの値をごにょごにょ加工したい(次工程に渡す)っていう要件があります。
調査の結果、fluentdのfilter_record_transformerプラグインで実装可能との事で設定してみた。
record_transformer Filter Plugin | Fluentd
- 構築手順
- td-agent-gem install fluentd -v 0.12.x
- ※今回はVersion
0.12.x
を使用 - ※fluentdに内包されているのでプラグインのインストールは必要ありません
- ※今回はVersion
- out.conf(送信設定conf)作成
- td-agent再起動(設定反映)
- td-agent-gem install fluentd -v 0.12.x
書き方は以下ようになります。
タイトルで代入と記載してますが、代入時に同名のフィールドがあればその値が代入項目にマップされる動作になっています。
<filter foo.bar> @type record_transformer <record> hostname "#{Socket.gethostname}" tag ${tag} </record> </filter>
また、enable_rubyオプションを有効にした場合、Ruby式の評価結果を値にする事が可能です。
計算処理や文字列操作なんかで使えそうで便利です。
以下にサンプル処理を記載します。
特定recordに新規項目代入・含まれる特定の文字列削除(gsubメソッド使用)・更新等で使用可能です。
# ### sample log scrubbing filters # #replace social security numbers # <filter reform.**> # @type record_transformer # enable_ruby true # <record> # log ${record["log"].gsub(/[0-9]{3}-*[0-9]{2}-*[0-9]{4}/,"xxx-xx-xxxx")} # </record> # </filter> # # replace credit card numbers that appear in the logs # <filter reform.**> # @type record_transformer # enable_ruby true # <record> # log ${record["log"].gsub(/[0-9]{4} *[0-9]{4} *[0-9]{4} *[0-9]{4}/,"xxxx xxxx xxxx xxxx")} # </record> # </filter> # # replace email addresses that appear in the logs # <filter reform.**> # @type record_transformer # enable_ruby true # <record> # log ${record["log"].gsub(/[\w+\-]+@[a-z\d\-]+(\.[a-z\d\-]+)*\.[a-z]+/i,"user@email.tld")} # </record> # </filter> # # replace url addresses that appear in the logs # <filter reform.**> # @type record_transformer # enable_ruby true # <record> # log ${record["log"].gsub(/\.sub\.test\.com|\.sub\.test\.co\.jp/,"")}} # </record> # </filter> # ### end sample log scrubbing filters
これでログ集約時、送信前に加工処理し送信可能です。
amazon linux2でのpython3環境構築
Lambdaをpython3で書く必要があり、まずは環境構築を行った時の勉強メモ。
- 構築手順
- python3 install
- pip3 install
- reqest package作成
- boto3 package作成
- Token取得
- ※取得したTokenが正常か確認
・python3 install $ sudo yum install python36 python36-virtualenv python36-pip $ python3 -V Python 3.6.5 ・pip3 install install python3-pip ・reqest package pip3 install requests -t ./ rm -rf *.dist-info bin pip3 install pipdeptree sudo pip3 install pipdeptree zip -r reqest.zip ./* ・boto3 package pip install -t ./python boto3 zip -r boto3-1.9.67.zip python cp boto3-1.9.67.zip /home/ec2-user/ [2019.02.14 add] aws linux(ec2/旧環境)で「No package」対応 ># yum install python3 >Loaded plugins: priorities, update-motd, upgrade-helper >amzn-main/latest | 2.1 kB 00:00 >amzn-updates/latest | 2.5 kB 00:00 >21 packages excluded due to repository priority protections >No package python3 available. >Error: Nothing to do ・現環境確認 pip --version ・Repository確認 yum list available | grep "python" | grep "libs" ・python3インストール yum install python36 python36-virtualenv python36-pip pip-3.6 -V ・必要パケージ収集 mkdir lambda_test/ cd lambda_test/ pip install --upgrade pip pip-3.6 install requests bs4 slackweb -t . rm -rf *.dist-info *.egg-info bin ・パケージ化 zip -r scraping_pkg.zip ./*
・Token取得について
https://api.slack.com/apps/AE******/oauth? ->[OAuth&Permission] ->「Bot User OAuth Access Token」
Tokenが正常認証許可するか確認
https://slack.com/api/chat.postMessage?token=xoxb-xxxxxx-xxxxxx-xxxxxx&channel=test&text=%22Hello%22
↓結果
{"ok":true,"channel":"CERG265JT","ts":"1545287527.000100","message":{"type":"message","subtype":"bot_message","text":"\"Hello\"","ts":"1545287527.000100","username":"slack-test-func","bot_id":"BE9TMPEUQ"}}
まず、python3→Token取得まで確認した。
サーバレスアーキテクチャでLambdaを選択する場合に使えそう。
ref)
https://goodbyegangster.hatenablog.com/entry/2018/05/22/024513
https://qiita.com/SHASE03/items/16fd31d3698f207b42c9
https://qiita.com/hayao_k/items/b9750cc8fa69d0ce91b0
https://qiita.com/ykhirao/items/3b19ee6a1458cfb4ba21
aws cuiを使用してKinesis streamデータ登録・確認
Kinesisはライブラリとして使用する事が多い為、実際の挙動を把握せず使ってるので調査した時の勉強メモ。
実際の挙動を把握する事で障害対応等で効率的に行動出来るようにする。
■ストリーム作成
aws kinesis create-stream --stream-name <ストリーム名> --shard-count <シャード数>
■レコード登録
# "ShardId": "shardId-000000000000" と出力されればOK aws kinesis put-record --stream-name <ストリーム名> --partition-key 123 --data <レコードに保存したい文字列>
■レコード確認
aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name <ストリーム名> --query 'ShardIterator' { "ShardIterator": "AAAAAAAAAAE2R*******************==" } aws kinesis get-records --shard-iterator "AAAAAAAAAAE2R*******************==" { "Records": [ { "Data": "aG9nZWhvZ2U=", "PartitionKey": "123", "ApproximateArrivalTimestamp": 1546849899.19, "SequenceNumber": "49591791737946085028906975044094438871988285374419959810" }, { "Data": "aG9nZTE=", "PartitionKey": "123", "ApproximateArrivalTimestamp": 1546851156.579, "SequenceNumber": "49591791737946085028906975051711880461380150184625831938" } ], "NextShardIterator": "AAAAAAAAAAFQkY****************==", "MillisBehindLatest": 0 }
■簡易確認の為、上記手順をスクリプト化
$ cat chk-kinesis-record.sh #!/bin/bash KINESIS_NAME=<ストリーム名> # 引数チェック #echo "args = "$# ARGS1=${1:?} #echo $ARGS1 # レコード登録 aws kinesis put-record --stream-name $KINESIS_NAME --partition-key 123 --data $ARGS1 # shardIdからiterator値取得 SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name $KINESIS_NAME --query 'ShardIterator') # レコード取得 aws kinesis get-records --shard-iterator $SHARD_ITERATOR
■kinesis→レコード登録・確認
- スクリプト引数として、登録レコード文字列指定
$ ./chk-kinesis-record.sh hoge1 { "ShardId": "shardId-000000000000", "SequenceNumber": "49591791737946085028906975051711880461380150184625831938" } { "Records": [ { "Data": "aG9nZWhvZ2U=", "PartitionKey": "123", "ApproximateArrivalTimestamp": 1546849899.19, "SequenceNumber": "49591791737946085028906975044094438871988285374419959810" }, { "Data": "aG9nZTE=", "PartitionKey": "123", "ApproximateArrivalTimestamp": 1546851156.579, "SequenceNumber": "49591791737946085028906975051711880461380150184625831938" } ], "NextShardIterator": "AAAAAAAAAAFQkY****************==", "MillisBehindLatest": 0 }
指定データをKinesisがBase64 エンコーディング後、Dataに文字列が格納される。
Base64 デコーディングする事により元データ参照可能。
Base64 エンコーディング・デコーディングサイト:
Base64 Decode and Encode - Online
また、時刻に関してはtimestamp形式で格納される。
UNIX時間(UNIX時刻)から日付に変換サイト:
UNIX時間⇒日付変換 - 高精度計算サイト
■ストリーム削除
aws kinesis delete-stream --stream-name <ストリーム名>
■AWS WEBコンソールからKinesisのData Streamの挙動モニタリング
AWS Kinesis → Data Stream → ストリーム名 → モニタリングへ移動
- 下記図を確認
- GetRecords.Bytes
- GetRecords.Records
- PutRecord.Bytes
- PutRecords.Records
図の中に青い丸(単一データ)、又は青い線(複数データ)でグラフ化されているば正常
- ※ 上部赤い線は制限値(Maxの値)なので無視で問題ない
CloudWatchでCustomMetrix設定
表記の通りCloudWatchに監視項目を追加する必要が発生したので調査した時の勉強メモ。
- CustomMetrix追加手順
■CustomMetrix監視項目
#!/bin/bash FILTER="*testItem*" export AWS_CLOUDWATCH_HOME=/usr/local/bin/CloudWatch-1.0.20.0 export AWS_CREDENTIAL_FILE=$AWS_CLOUDWATCH_HOME/credential export PATH=$AWS_CLOUDWATCH_HOME/bin:$PATH export EC2_REGION=ap-northeast-1 export JAVA_HOME=/usr/lib/jvm/jre InstanceId=$(curl -s http://169.254.169.254/latest/meta-data/instance-id) NodeCnt=`~/.local/bin/aws ec2 describe-instances --filters "Name=tag:Name,Values=$FILTER" "Name=instance-state-name,Values=running" --query "Reservations[].Instances[].[InstanceId]" --output text |wc -l` echo $NodeCnt /usr/local/bin/CloudWatch-1.0.20.0/bin/mon-put-data --metric-name "CustomMetrix Item" --namespace "CustomMetrix" --dimensions "InstanceId=$InstanceId" --value "$NodeCnt"
■crontab設定
$ crontab -l */5 * * * * /bin/sh /home/centos/chk-item 1> /dev/null
設定に問題無ければ、5分間隔でグラフ描写される。
監視項目(CustomMetrix)の追加が簡易に行えるのawsは便利。
terminal is not big enough
sshターミナル環境でtopコマンド発行時、terminal is not big enough
が表示され出力されないので調査した時の勉強メモ。
- sshターミナルでtopコマンド発行
- 発生内容:Sorry, terminal is not big enough
- 対応:代替えコマンド発行(mpstat -P ALL)
[root@**** ~]# top top - 12:29:03 up 67 days, 47 min, 1 user, load average: 0.17, 0.15, 0.15 Tasks: 648 total, 1 running, 647 sleeping, 0 stopped, 0 zombie Cpu(s): 1.7%us, 0.4%sy, 0.0%ni, 97.9%id, 0.0%wa, 0.0%hi, 0.1%si, 0.0%st Mem: 132111780k total, 85066756k used, 47045024k free, 370724k buffers Swap: 524284k total, 250892k used, 273392k free, 75117400k cached Sorry, terminal is not big enough PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND 3256 root 20 0 15.2g 4.2g 5212 S 50.2 3.3 16477:55 ruby 17742 root 20 0 15424 1564 832 R 3.9 0.0 0:00.02 top 1 root 20 0 19340 1348 1124 S 0.0 0.0 0:01.96 init
↑でSorry, terminal is not big enough
発生。
■代替えコマンド発行
[root@**** ~]# mpstat -P ALL Linux 2.6.32-696.16.1.el6.x86_64 (www.test.co.jp) 12/11/2018 _x86_64_ (32 CPU) 12:24:03 PM CPU %usr %nice %sys %iowait %irq %soft %steal %guest %idle 12:24:03 PM all 1.65 0.00 0.39 0.00 0.00 0.07 0.00 0.00 97.88 12:24:03 PM 0 1.80 0.00 0.60 0.01 0.00 0.06 0.00 0.00 97.53 12:24:03 PM 1 1.71 0.00 0.31 0.00 0.00 0.00 0.00 0.00 97.97 ...
CPUコア毎に確認出来た。
さまざまなアプローチ方法を知っているかがポイントになる。
aws環境でfluentd経由を用いたログ基盤構築
aws環境でログ基盤を構築する必要があり、周辺関連の知識がたりなさすぎたので調査した時の勉強メモ。
- lamda関数
- 処理フロー
- クラアント(td-agent)→Kinesis firehose→lamdba→s3
# # lamdba # import boto3 import json import base64 import time import sys import pprint from datetime import datetime def lambda_handler(event, context): firehose = boto3.client('firehose') s3 = boto3.resource('s3') output = [] # # firehose # import pprint; pprint.pprint('firehose------') import pprint; pprint.pprint(firehose) strjson = json.dumps('ccc') encodedata = base64.b64encode(strjson.encode()) output_json = { 'recordId': event['records'][num]['recordId'], 'result': 'Ok', 'data': event['records'][num]['data'], } output.append(output_json) import pprint; pprint.pprint(event) import pprint; pprint.pprint(event['records']) import pprint; pprint.pprint(event['records'][num]['recordId']) # # s3 # bucket = 'testfirehosetokyo' key = 'test/test_' + datetime.now().strftime('%Y-%m-%d-%H-%M-%S') + '.txt' file_contents = str(output) time_info = [] stime = time.time() s3_obj = s3.Object(bucket, key) result = s3_obj.put( Body=file_contents ) etime = time.time() time_info.append(etime - stime) return { 'records': output }
- GlueでETL処理
- 処理フロー
- s3→Glueで既存項目にColumn追加→s3(指定パスへ出力)
# # Glue # import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.dynamicframe import DynamicFrame from pyspark.sql.functions import * from pyspark.sql.functions import lit ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) # 引数からパスを生成 basepath = "s3://testfirehosetokyo/sample_glue_for_result" s3path = basepath + "/test" sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "sample_glue_db", table_name = "10", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "sample_glue_db", table_name = "10", transformation_ctx = "datasource0") ## @type: ApplyMapping ## @args: [mapping = [("time", "string", "time", "string"), ("message", "string", "message", "string"), ("worker", "int", "worker", "int")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("time", "string", "time", "string"), ("message", "string", "message", "string"), ("worker", "int", "worker", "int"),("time", "string", "testcol", "string")], transformation_ctx = "applymapping1") ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3://testfirehosetokyo/sample_glue_for_result"}, format = "json", transformation_ctx = "datasink2"] ## @return: datasink2 ## @inputs: [frame = applymapping1] #datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://testfirehosetokyo/sample_glue_for_result"}, format = "json", transformation_ctx = "datasink2") df = applymapping1.toDF() #Column追加 df = df.withColumn('pint2', lit(123)) dyf = DynamicFrame.fromDF(df, glueContext, 'dyf') # DynamicFrameをJSON形式でS3に書き出す glueContext.write_dynamic_frame.from_options( frame=dyf, connection_type='s3', connection_options={'path':s3path}, format='json')
- ログ送信側クラアント設定
- 処理フロー
- クラアント(td-agent)→Kinesis firehose
# # td-agent # yum install redhat-lsb-core yum install aws-kinesis-agent td-agent-gem install fluent-plugin-kinesis ※1x系(version指定) td-agent-gem install fluent-plugin-kinesis --no-document -V -v 1.3.0 [root@vm**** td-agent]# cat teststream.conf <source> type tail path /tmp/testlog.log pos_file /tmp/testlog.pos format none message_key log_line tag teststream.test.test </source> <match teststream.**> type kinesis_firehose region ap-northeast-1 delivery_stream_name teststream flush_interval 1s include_time_key true include_tag_key true buffer_chunk_limit 1m try_flush_interval 0.1 queued_chunk_flush_interval 0.01 num_threads 1 </match>
まずは触ってみたが関連知識が不足しているのが実感できた。。
1歩ずつでも触ってアウトプットして行こう。
td-agentでログ欠損が発生した
fluent(td-agent)で、Buffer溢れにによるログ欠損が発生したのでその時の調査・対応を下記へ記載しておく。
・対応フロー
- コンソールからtd-agentのログ状況確認
- Bufferの蓄積状況確認
- td-agent.conf確認
- td-agent.conf設定変更
- td-agnetサービス再起動
以下、設定環境:td-agent version0.12
□ Kibana上でログ欠損が発生
・コンソールからtd-agentのログ状況確認
# less /var/log/td-agent/td-agent.log 2018-09-11 03:28:43 +0900 [warn]: emit transaction failed: error_class=Fluent::BufferQueueLimitError error="queue size exceeds limit" tag="www.nginx.access-json" 2018-09-11 03:28:43 +0900 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.40/lib/fluent/buffer.rb:204:in `block in emit'
queue size exceeds limit
が発生してる。
・Bufferの蓄積状況確認
# cd {buffer_dir} # ll -h /data/buffer/secure-forward.* |wc -l 2054
・td-agent.conf設定確認 (一部抜粋)
# less td-agent.conf buffer_chunk_limit 2m buffer_queue_limit 2048
・td-agent.conf設定変更 (一部抜粋)
# less td-agent.conf buffer_chunk_limit 2m buffer_queue_limit 4096
↑の設定変更によりログが流れた。
※今回、master -> standby構成を組んだがmaster側でBuffer溢れが発生している状況においても、standby側に流れない状況
。
また、今回設定した送信部分のconf内容を抜粋して記載。
<match **> @id testid.match @type forest subtype copy <template> <store> @id secure_forward @type secure_forward self_hostname test123.co.jp shared_key testkey secure yes ca_cert_path /etc/td-agent/ca/ca-cert.pem keep_alive 300 num_threads 1 <server> host testbudder.co.jp port 24284 username test password ****** </server> flush_interval 0s try_flush_interval 0.1 queued_chunk_flush_interval 0.01 buffer_type file buffer_path /data/buffer/secure-forward_${tag}.* buffer_chunk_limit 2m buffer_queue_limit 6000000 # 2 * 6000000 = 120T retry_wait 5s # default: 1.0 # retry_limit 10 reload_connections true reload_on_failure true heartbeat_type tcp heartbeat_interval 10s phi_threshold 100 # default: 16 hard_timeout 200s # default: equals to send_timeout require_ack_response true send_timeout 60s # default 60s ack_response_timeout 61s # default 190s max_retry_wait 1h # default: inifinity disable_retry_limit true # DO NOT discard buffer anyway require_ack_response true # Qos 1 (At least once) flush_at_shutdown true </store> </template> </match>
上記設定でログ欠損発生せず稼働している。
ref