:>/dev/null

ラガードエンジニアの不撓不屈の精神

fluentdで送信recordに文字列を代入してから送信する

タイトル通り実装する要件があり、調査・設定行った時の勉強メモ。
fluentdを使用したログ集約環境で、送信データの特定フィールドの値をごにょごにょ加工したい(次工程に渡す)っていう要件があります。
調査の結果、fluentdのfilter_record_transformerプラグインで実装可能との事で設定してみた。

record_transformer Filter Plugin | Fluentd

  • 構築手順
    • td-agent-gem install fluentd -v 0.12.x
      • ※今回はVersion0.12.xを使用
      • ※fluentdに内包されているのでプラグインのインストールは必要ありません
    • out.conf(送信設定conf)作成
    • td-agent再起動(設定反映)

書き方は以下ようになります。ディレクティブの中にフィールド名とその値を書きます。
タイトルで代入と記載してますが、代入時に同名のフィールドがあればその値が代入項目にマップされる動作になっています。

<filter foo.bar>
  @type record_transformer
  <record>
    hostname "#{Socket.gethostname}"
    tag ${tag}
  </record>
</filter>

また、enable_rubyオプションを有効にした場合、Ruby式の評価結果を値にする事が可能です。 計算処理や文字列操作なんかで使えそうで便利です。
以下にサンプル処理を記載します。
特定recordに新規項目代入・含まれる特定の文字列削除(gsubメソッド使用)・更新等で使用可能です。

#  ### sample log scrubbing filters
#  #replace social security numbers
# <filter reform.**>
#   @type record_transformer
#   enable_ruby true
#   <record>
#     log ${record["log"].gsub(/[0-9]{3}-*[0-9]{2}-*[0-9]{4}/,"xxx-xx-xxxx")}
#   </record>
# </filter>
# # replace credit card numbers that appear in the logs
# <filter reform.**>
#   @type record_transformer
#   enable_ruby true
#   <record>
#      log ${record["log"].gsub(/[0-9]{4} *[0-9]{4} *[0-9]{4} *[0-9]{4}/,"xxxx xxxx xxxx xxxx")}
#   </record>
# </filter>
# # replace email addresses that appear in the logs
# <filter reform.**>
#   @type record_transformer
#   enable_ruby true
#   <record>
#     log ${record["log"].gsub(/[\w+\-]+@[a-z\d\-]+(\.[a-z\d\-]+)*\.[a-z]+/i,"user@email.tld")}
#   </record>
# </filter>
# # replace url addresses that appear in the logs
# <filter reform.**>
#   @type record_transformer
#   enable_ruby true
#   <record>
#     log ${record["log"].gsub(/\.sub\.test\.com|\.sub\.test\.co\.jp/,"")}}
#   </record>
# </filter>
# ### end sample log scrubbing filters

これでログ集約時、送信前に加工処理し送信可能です。

amazon linux2でのpython3環境構築

Lambdaをpython3で書く必要があり、まずは環境構築を行った時の勉強メモ。

  • 構築手順
    • python3 install
    • pip3 install
    • reqest package作成
    • boto3 package作成
    • Token取得
      • ※取得したTokenが正常か確認
・python3 install
$ sudo yum install python36 python36-virtualenv python36-pip
$ python3 -V
Python 3.6.5

・pip3 install
install python3-pip

・reqest package
pip3 install requests -t ./
rm -rf *.dist-info bin
pip3 install pipdeptree
sudo pip3 install pipdeptree
zip -r reqest.zip ./*

・boto3 package
pip install -t ./python boto3
zip -r boto3-1.9.67.zip python
cp boto3-1.9.67.zip /home/ec2-user/

・Token取得について
https://api.slack.com/apps/AE******/oauth? ->[OAuth&Permission] ->「Bot User OAuth Access Token」

Tokenが正常認証許可するか確認
https://slack.com/api/chat.postMessage?token=xoxb-xxxxxx-xxxxxx-xxxxxx&channel=test&text=%22Hello%22
↓結果
{"ok":true,"channel":"CERG265JT","ts":"1545287527.000100","message":{"type":"message","subtype":"bot_message","text":"\"Hello\"","ts":"1545287527.000100","username":"slack-test-func","bot_id":"BE9TMPEUQ"}}

まず、python3→Token取得まで確認した。
サーバレスアーキテクチャでLambdaを選択する場合に使えそう。

ref)
https://goodbyegangster.hatenablog.com/entry/2018/05/22/024513
https://qiita.com/SHASE03/items/16fd31d3698f207b42c9
https://qiita.com/hayao_k/items/b9750cc8fa69d0ce91b0
https://qiita.com/ykhirao/items/3b19ee6a1458cfb4ba21

aws cuiを使用してKinesis streamデータ登録・確認

Kinesisはライブラリとして使用する事が多い為、実際の挙動を把握せず使ってるので調査した時の勉強メモ。
実際の挙動を把握する事で障害対応等で効率的に行動出来るようにする。

■ストリーム作成

aws kinesis create-stream --stream-name <ストリーム名> --shard-count <シャード数>

■レコード登録

# "ShardId": "shardId-000000000000" と出力されればOK
aws kinesis put-record --stream-name <ストリーム名> --partition-key 123 --data <レコードに保存したい文字列>

■レコード確認

aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name <ストリーム名> --query 'ShardIterator'
{
    "ShardIterator": "AAAAAAAAAAE2R*******************=="
}

aws kinesis get-records --shard-iterator "AAAAAAAAAAE2R*******************=="
{
    "Records": [
        {
            "Data": "aG9nZWhvZ2U=",
            "PartitionKey": "123",
            "ApproximateArrivalTimestamp": 1546849899.19,
            "SequenceNumber": "49591791737946085028906975044094438871988285374419959810"
        },
        {
            "Data": "aG9nZTE=",
            "PartitionKey": "123",
            "ApproximateArrivalTimestamp": 1546851156.579,
            "SequenceNumber": "49591791737946085028906975051711880461380150184625831938"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAFQkY****************==",
    "MillisBehindLatest": 0
}

■簡易確認の為、上記手順をスクリプト

$ cat chk-kinesis-record.sh
#!/bin/bash

KINESIS_NAME=<ストリーム名>

# 引数チェック
#echo "args = "$#
ARGS1=${1:?}
#echo $ARGS1

# レコード登録
aws kinesis put-record --stream-name $KINESIS_NAME --partition-key 123 --data $ARGS1

# shardIdからiterator値取得
SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name $KINESIS_NAME --query 'ShardIterator')

# レコード取得
aws kinesis get-records --shard-iterator $SHARD_ITERATOR

kinesis→レコード登録・確認

$ ./chk-kinesis-record.sh hoge1  
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49591791737946085028906975051711880461380150184625831938"
}
{
    "Records": [
        {
            "Data": "aG9nZWhvZ2U=",
            "PartitionKey": "123",
            "ApproximateArrivalTimestamp": 1546849899.19,
            "SequenceNumber": "49591791737946085028906975044094438871988285374419959810"
        },
        {
            "Data": "aG9nZTE=",
            "PartitionKey": "123",
            "ApproximateArrivalTimestamp": 1546851156.579,
            "SequenceNumber": "49591791737946085028906975051711880461380150184625831938"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAFQkY****************==",
    "MillisBehindLatest": 0
}

指定データをKinesisBase64 エンコーディング後、Dataに文字列が格納される。
Base64 デコーディングする事により元データ参照可能。
Base64 エンコーディング・デコーディングサイト: Base64 Decode and Encode - Online

また、時刻に関してはtimestamp形式で格納される。
UNIX時間(UNIX時刻)から日付に変換サイト: UNIX時間⇒日付変換 - 高精度計算サイト

■ストリーム削除

aws kinesis delete-stream --stream-name <ストリーム名>

AWS WEBコンソールからKinesisのData Streamの挙動モニタリング
AWS Kinesis → Data Stream → ストリーム名 → モニタリングへ移動

  • 下記図を確認
    • GetRecords.Bytes
    • GetRecords.Records
    • PutRecord.Bytes
    • PutRecords.Records

図の中に青い丸(単一データ)、又は青い線(複数データ)でグラフ化されているば正常

  • ※ 上部赤い線は制限値(Maxの値)なので無視で問題ない

CloudWatchでCustomMetrix設定

表記の通りCloudWatchに監視項目を追加する必要が発生したので調査した時の勉強メモ。

  • CustomMetrix追加手順
    • aws configureがインストールされてない環境での手順
    • cloudwatchのsrcファイル(今回は「CloudWatch-1.0.20.0」)をインストール
    • カスタムメトリクス取得するファイル作成
    • 下記スクリプト内容を記述
    • cron設定にて定期監視
      • ※crontabにて設定した場合環境変数が読み込まれない為各実行コマンド位置までフルパスを指定する必要あり
    • 「cloudwatch→すべて→CustomMetrix→InstanceId→対象インスタンス」の手順にてグラフ化
    • 必要に応じてcloudwatch→ダッシュボード追加

■CustomMetrix監視項目

#!/bin/bash

FILTER="*testItem*"

export AWS_CLOUDWATCH_HOME=/usr/local/bin/CloudWatch-1.0.20.0
export AWS_CREDENTIAL_FILE=$AWS_CLOUDWATCH_HOME/credential
export PATH=$AWS_CLOUDWATCH_HOME/bin:$PATH
export EC2_REGION=ap-northeast-1
export JAVA_HOME=/usr/lib/jvm/jre

InstanceId=$(curl -s http://169.254.169.254/latest/meta-data/instance-id)

NodeCnt=`~/.local/bin/aws ec2 describe-instances --filters "Name=tag:Name,Values=$FILTER" "Name=instance-state-name,Values=running" --query "Reservations[].Instances[].[InstanceId]" --output text |wc -l`

echo $NodeCnt

/usr/local/bin/CloudWatch-1.0.20.0/bin/mon-put-data --metric-name "CustomMetrix Item" --namespace "CustomMetrix" --dimensions "InstanceId=$InstanceId" --value "$NodeCnt"

■crontab設定

$ crontab -l
*/5 * * * * /bin/sh /home/centos/chk-item 1> /dev/null

設定に問題無ければ、5分間隔でグラフ描写される。
監視項目(CustomMetrix)の追加が簡易に行えるのawsは便利。

terminal is not big enough

sshターミナル環境でtopコマンド発行時、terminal is not big enoughが表示され出力されないので調査した時の勉強メモ。

  • sshターミナルでtopコマンド発行
    • 発生内容:Sorry, terminal is not big enough
    • 対応:代替えコマンド発行(mpstat -P ALL)
[root@**** ~]# top
top - 12:29:03 up 67 days, 47 min,  1 user,  load average: 0.17, 0.15, 0.15
Tasks: 648 total,   1 running, 647 sleeping,   0 stopped,   0 zombie
Cpu(s):  1.7%us,  0.4%sy,  0.0%ni, 97.9%id,  0.0%wa,  0.0%hi,  0.1%si,  0.0%st
Mem:  132111780k total, 85066756k used, 47045024k free,   370724k buffers
Swap:   524284k total,   250892k used,   273392k free, 75117400k cached
 Sorry, terminal is not big enough
  PID USER      PR  NI  VIRT  RES  SHR S %CPU %MEM    TIME+  COMMAND
 3256 root      20   0 15.2g 4.2g 5212 S 50.2  3.3  16477:55 ruby
17742 root      20   0 15424 1564  832 R  3.9  0.0   0:00.02 top
    1 root      20   0 19340 1348 1124 S  0.0  0.0   0:01.96 init

↑でSorry, terminal is not big enough発生。

■代替えコマンド発行

[root@**** ~]# mpstat -P ALL
Linux 2.6.32-696.16.1.el6.x86_64 (www.test.co.jp)   12/11/2018  _x86_64_    (32 CPU)

12:24:03 PM  CPU    %usr   %nice    %sys %iowait    %irq   %soft  %steal  %guest   %idle
12:24:03 PM  all    1.65    0.00    0.39    0.00    0.00    0.07    0.00    0.00   97.88
12:24:03 PM    0    1.80    0.00    0.60    0.01    0.00    0.06    0.00    0.00   97.53
12:24:03 PM    1    1.71    0.00    0.31    0.00    0.00    0.00    0.00    0.00   97.97
...

CPUコア毎に確認出来た。
さまざまなアプローチ方法を知っているかがポイントになる。

aws環境でfluentd経由を用いたログ基盤構築

aws環境でログ基盤を構築する必要があり、周辺関連の知識がたりなさすぎたので調査した時の勉強メモ。

  • lamda関数
    • 処理フロー
    • クラアント(td-agent)→Kinesis firehose→lamdba→s3 f:id:oguguman:20181103205729p:plain
#
# lamdba
#

import boto3
import json
import base64

import time
import sys 
import pprint
from datetime import datetime

def lambda_handler(event, context):
    firehose = boto3.client('firehose')
    s3 = boto3.resource('s3')
    output = []
    
#
# firehose
# 

        import pprint; pprint.pprint('firehose------')
        import pprint; pprint.pprint(firehose)
        
        strjson = json.dumps('ccc')
        encodedata = base64.b64encode(strjson.encode())
        output_json = {
            'recordId': event['records'][num]['recordId'],
            'result': 'Ok',
            'data': event['records'][num]['data'],
        }
        output.append(output_json)
        
        import pprint; pprint.pprint(event)
        import pprint; pprint.pprint(event['records'])
        import pprint; pprint.pprint(event['records'][num]['recordId'])

#
# s3
#

        bucket = 'testfirehosetokyo'
        key = 'test/test_' + datetime.now().strftime('%Y-%m-%d-%H-%M-%S') + '.txt'
        file_contents = str(output)
    
        time_info = []
        stime = time.time()
        s3_obj = s3.Object(bucket, key)
        result = s3_obj.put( Body=file_contents )
        etime = time.time()
        time_info.append(etime - stime)


    return {
        'records': output
    }      
  • GlueでETL処理
    • 処理フロー
    • s3→Glueで既存項目にColumn追加→s3(指定パスへ出力)
      f:id:oguguman:20181104132207p:plain
#
# Glue
#

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import *
from pyspark.sql.functions import lit

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

# 引数からパスを生成
basepath = "s3://testfirehosetokyo/sample_glue_for_result"
s3path = basepath + "/test"

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @args: [database = "sample_glue_db", table_name = "10", transformation_ctx = "datasource0"]
## @return: datasource0
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "sample_glue_db", table_name = "10", transformation_ctx = "datasource0")
## @type: ApplyMapping
## @args: [mapping = [("time", "string", "time", "string"), ("message", "string", "message", "string"), ("worker", "int", "worker", "int")], transformation_ctx = "applymapping1"]
## @return: applymapping1
## @inputs: [frame = datasource0]
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("time", "string", "time", "string"), ("message", "string", "message", "string"), ("worker", "int", "worker", "int"),("time", "string", "testcol", "string")], transformation_ctx = "applymapping1")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://testfirehosetokyo/sample_glue_for_result"}, format = "json", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
#datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://testfirehosetokyo/sample_glue_for_result"}, format = "json", transformation_ctx = "datasink2")

df = applymapping1.toDF()
#Column追加
df = df.withColumn('pint2', lit(123))
dyf = DynamicFrame.fromDF(df, glueContext, 'dyf')

# DynamicFrameをJSON形式でS3に書き出す 
glueContext.write_dynamic_frame.from_options( 
    frame=dyf, 
    connection_type='s3', 
    connection_options={'path':s3path}, 
    format='json')
  • ログ送信側クラアント設定
    • 処理フロー
    • クラアント(td-agent)→Kinesis firehose
      f:id:oguguman:20181104132054p:plain
#
# td-agent
#

yum install redhat-lsb-core
yum install aws-kinesis-agent
td-agent-gem install fluent-plugin-kinesis

※1x系(version指定)
td-agent-gem install fluent-plugin-kinesis --no-document -V -v 1.3.0


[root@vm**** td-agent]# cat teststream.conf
<source>
  type tail
  path /tmp/testlog.log
  pos_file /tmp/testlog.pos
  format none
  message_key log_line
  tag teststream.test.test
</source>

<match teststream.**>
  type kinesis_firehose

  region ap-northeast-1
  delivery_stream_name teststream
  flush_interval 1s
  include_time_key true
  include_tag_key true
  buffer_chunk_limit 1m
  try_flush_interval 0.1
  queued_chunk_flush_interval 0.01
  num_threads 1
</match>

まずは触ってみたが関連知識が不足しているのが実感できた。。
1歩ずつでも触ってアウトプットして行こう。

td-agentでログ欠損が発生した

fluent(td-agent)で、Buffer溢れにによるログ欠損が発生したのでその時の調査・対応を下記へ記載しておく。

・対応フロー

  • コンソールからtd-agentのログ状況確認
  • Bufferの蓄積状況確認
  • td-agent.conf確認
  • td-agent.conf設定変更
  • td-agnetサービス再起動

以下、設定環境:td-agent version0.12

□ Kibana上でログ欠損が発生

・コンソールからtd-agentのログ状況確認

# less /var/log/td-agent/td-agent.log
2018-09-11 03:28:43 +0900 [warn]: emit transaction failed: error_class=Fluent::BufferQueueLimitError error="queue size exceeds limit" tag="www.nginx.access-json"
  2018-09-11 03:28:43 +0900 [warn]: /opt/td-agent/embedded/lib/ruby/gems/2.1.0/gems/fluentd-0.12.40/lib/fluent/buffer.rb:204:in `block in emit'

queue size exceeds limitが発生してる。

・Bufferの蓄積状況確認

# cd {buffer_dir}
# ll -h /data/buffer/secure-forward.* |wc -l
2054

・td-agent.conf設定確認 (一部抜粋)

# less td-agent.conf
        buffer_chunk_limit  2m
        buffer_queue_limit  2048

・td-agent.conf設定変更 (一部抜粋)

# less td-agent.conf
        buffer_chunk_limit  2m
        buffer_queue_limit  4096

↑の設定変更によりログが流れた。
※今回、master -> standby構成を組んだがmaster側でBuffer溢れが発生している状況においても、standby側に流れない状況

また、今回設定した送信部分のconf内容を抜粋して記載。

  <match **>
    @id testid.match
    @type forest
    subtype copy
    <template>
      <store>
        @id secure_forward
        @type secure_forward
        self_hostname test123.co.jp
        shared_key testkey
        secure yes
        ca_cert_path /etc/td-agent/ca/ca-cert.pem
        keep_alive 300
        num_threads 1
        <server>
          host testbudder.co.jp
          port 24284
          username test
          password ******
        </server>
        flush_interval 0s
        try_flush_interval  0.1
        queued_chunk_flush_interval 0.01
        buffer_type file
        buffer_path /data/buffer/secure-forward_${tag}.*
        buffer_chunk_limit  2m
        buffer_queue_limit  6000000 # 2 * 6000000 = 120T
        retry_wait 5s # default: 1.0
#        retry_limit 10
        reload_connections true
        reload_on_failure true
        heartbeat_type tcp
        heartbeat_interval 10s

        phi_threshold 100 # default: 16
        hard_timeout 200s # default: equals to send_timeout
        require_ack_response true
        send_timeout 60s # default 60s
        ack_response_timeout 61s # default 190s

        max_retry_wait 1h # default: inifinity
        disable_retry_limit true # DO NOT discard buffer anyway
        require_ack_response true # Qos 1 (At least once)
        flush_at_shutdown true
      </store>
    </template>
  </match>

上記設定でログ欠損発生せず稼働している。

ref

fluentd 1.0 でログの欠損を防ぐ · the world as code