本記事では、AWS Glueを使ってCloudFrontのアクセスログを分析用に最適化する方法について解説します。Glueは、データの変換をサーバレスで行うためのサービスであり、PySparkをGlue上で実行することでアクセスログを最適化します。本記事では、PySparkのコード全体も記載しているため、コードをコピーして利用できます。
CloudFrontのアクセスログはそのまま分析しようとすると、次の課題があるため分析パフォーマンスとコスト効率が良くないことを第1回でお伝えしました。第2回の本記事では、AWS Glueを使ってこの課題を解決するための最適化を行います。
- パーティショニングされていない
- 全てのカラムが読み取られる
AWS Glueの概要
まず初めに、AWS Glueについて簡単に紹介します。
AWS Glueは、データ統合のための包括的なサーバーレスサービスで、データカタログ機能とETLジョブの実行が主な機能です。
- データカタログ機能
-
Glue Data Catalogは、様々なデータソースのメタデータを一元管理する永続的なメタデータストアです。AWS全体のデータを一元的にカタログ化し、検索・管理できるようになります。メタデータには、テーブル名、列名、データ型、パーティションキーなどのスキーマ情報と、データの実際の保存場所(S3のパスなど)への参照が含まれています。
- ETLジョブの実行機能
-
Glue Jobは、サーバーレスのApache Sparkエンジン上でETL(抽出、変換、ロード)ジョブを実行する機能です。Glue Jobでは、Python3(PySpark)でコードを記述し、Apache Sparkの処理を実行します。Glue Jobは、GUIで視覚的にデータ変換のワークフローを定義し、実行する機能も備えています。一般的なETL処理であれば、PySparkに習熟していなくてもGUIで使えることが魅力の一つです。
今回は後者のETL機能を使います。今回のケースでは、S3バケットからExtract(抽出)したCloudFrontのアクセスログ(CSV)をParquetフォーマットにTransform(変換)し、別のS3バケットにLoad(ロード)する処理をGlueで行います。
前者のデータカタログ機能は、第3回で使用します。 Amazon AthenaでS3のデータにアクセスする際に、Glue Data Catalogに保存されているメタデータを利用します。
CloudFrontのアクセスログのフォーマット
次に、CloudFrontのアクセスログのフォーマットを見ていきます。CloudFrontのアクセスログは、区切り文字がタブ(\t)のCSVです。一般的なCSVと異なるのはヘッダーが2行ある点です。このヘッダーが2行あるところが非常に厄介です。 Glueは一般的なETLの処理であれば、GUIでコードを作成できますが、2行あるヘッダーに対処するため、今回の用途ではGUIは使えず、PySparkのコードを書く必要があります。
#Version: 1.0
#Fields: date time x-edge-location sc-bytes c-ip cs-method cs(Host) cs-uri-stem sc-status cs(Referer) cs(User-Agent) cs-uri-query cs(Cookie) x-edge-result-type x-edge-request-id x-host-header cs-protocol cs-bytes time-taken x-forwarded-for ssl-protocol ssl-cipher x-edge-response-result-type cs-protocol-version fle-status fle-encrypted-fields c-port time-to-first-byte x-edge-detailed-result-type sc-content-type sc-content-len sc-range-start sc-range-end
2019-12-04 21:02:31 LAX1 392 192.0.2.100 GET d111111abcdef8.cloudfront.net /index.html 200 - Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36 - - Hit SOX4xwn4XV6Q4rgb7XiVGOHms_BGlTAC4KyHmureZmBNrjGdRLiNIQ== d111111abcdef8.cloudfront.net https 23 0.001 - TLSv1.2 ECDHE-RSA-AES128-GCM-SHA256 Hit HTTP/2.0 - - 11040 0.001 Hit text/html 78 - -
2019-12-04 21:02:31 LAX1 392 192.0.2.100 GET d111111abcdef8.cloudfront.net /index.html 200 - Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36 - - Hit k6WGMNkEzR5BEM_SaF47gjtX9zBDO2m349OY2an0QPEaUum1ZOLrow== d111111abcdef8.cloudfront.net https 23 0.000 - TLSv1.2 ECDHE-RSA-AES128-GCM-SHA256 Hit HTTP/2.0 - - 11040 0.000 Hit text/html 78 - -
2019-12-04 21:02:31 LAX1 392 192.0.2.100 GET d111111abcdef8.cloudfront.net /index.html 200 - Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36 - - Hit f37nTMVvnKvV2ZSvEsivup_c2kZ7VXzYdjC-GUQZ5qNs-89BlWazbw== d111111abcdef8.cloudfront.net https 23 0.001 - TLSv1.2 ECDHE-RSA-AES128-GCM-SHA256 Hit HTTP/2.0 - - 11040 0.001 Hit text/html 78 - -
2019-12-13 22:36:27 SEA19-C1 900 192.0.2.200 GET d111111abcdef8.cloudfront.net /favicon.ico 502 http://www.example.com/ Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36 - - Error 1pkpNfBQ39sYMnjjUQjmH2w1wdJnbHYTbag21o_3OfcQgPzdL2RSSQ== www.example.com http 675 0.102 - - - Error HTTP/1.1 - - 25260 0.102 OriginDnsError text/html 507 - -
2019-12-13 22:36:26 SEA19-C1 900 192.0.2.200 GET d111111abcdef8.cloudfront.net / 502 - Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36 - - Error 3AqrZGCnF_g0-5KOvfA7c9XLcf4YGvMFSeFdIetR1N_2y8jSis8Zxg== www.example.com http 735 0.107 - - - Error HTTP/1.1 - - 3802 0.107 OriginDnsError text/html 507 - -
2019-12-13 22:37:02 SEA19-C2 900 192.0.2.200 GET d111111abcdef8.cloudfront.net / 502 - curl/7.55.1 - - Error kBkDzGnceVtWHqSCqBUqtA_cEs2T3tFUBbnBNkB9El_uVRhHgcZfcw== www.example.com http 387 0.103 - - - Error HTTP/1.1 - - 12644 0.103 OriginDnsError text/html 507 - -
アクセスログの各フィールドの意味はAWSの公式ドキュメントを参照してください。
GlueでCloudFrontのアクセスログをETLする
それでは、Glueで動かすPySparkのコードを見ていきましょう。
PySparkのコード
下記がコード全体です。コードブロックはコメントで解説しています。
import os
import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.transforms import Filter
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)
# CloudFrontのアクセスログが置いてあるS3を指定
source_bucket = "source_backet_name"
s3_path = f"s3://{source_bucket}/"
# 変換後のアクセスログを置くS3を指定
destination_bucket = "destination_bucket_name"
# S3にあるCloudFrontのアクセスログを読み込む。
dyf = glueContext.create_dynamic_frame.from_options(
"s3",
{
"paths": [s3_path],
"compressionType": "gzip",
"groupFiles": "inPartition",
"groupSize": "1048576",
},
format="csv",
format_options={
"withHeader": False,
"separator": "\t",
},
transformation_ctx="glue_s3_input",
)
# コメント行(#で始まる)を除外するためのフィルター
def filter_metadata(rec):
return rec["col0"] and not rec["col0"].startswith("#")
# フィルターを適用し、各ファイルの最初の2行を削除する
filtered_dyf = Filter.apply(frame=dyf, f=filter_metadata, transformation_ctx="filter")
# 各カラムに名前を付ける
dyf = ApplyMapping.apply(
frame=filtered_dyf,
mappings=[
("col0", "string", "date", "date"),
("col1", "string", "time", "string"),
("col2", "string", "x_edge_location", "string"),
("col3", "string", "sc_bytes", "bigint"),
("col4", "string", "c_ip", "string"),
("col5", "string", "cs_method", "string"),
("col6", "string", "cs_host", "string"),
("col7", "string", "cs_uri_stem", "string"),
("col8", "string", "sc_status", "int"),
("col9", "string", "cs_referrer", "string"),
("col10", "string", "cs_user_agent", "string"),
("col11", "string", "cs_uri_query", "string"),
("col12", "string", "cs_cookie", "string"),
("col13", "string", "x_edge_result_type", "string"),
("col14", "string", "x_edge_request_id", "string"),
("col15", "string", "x_host_header", "string"),
("col16", "string", "cs_protocol", "string"),
("col17", "string", "cs_bytes", "bigint"),
("col18", "string", "time_taken", "float"),
("col19", "string", "x_forwarded_for", "string"),
("col20", "string", "ssl_protocol", "string"),
("col21", "string", "ssl_cipher", "string"),
("col22", "string", "x_edge_response_result_type", "string"),
("col23", "string", "cs_protocol_version", "string"),
("col24", "string", "fle_status", "string"),
("col25", "string", "fle_encrypted_fields", "int"),
("col26", "string", "c_port", "int"),
("col27", "string", "time_to_first_byte", "float"),
("col28", "string", "x_edge_detailed_result_type", "string"),
("col29", "string", "sc_content_type", "string"),
("col30", "string", "sc_content_len", "bigint"),
("col31", "string", "sc_range_start", "bigint"),
("col32", "string", "sc_range_end", "bigint"),
],
transformation_ctx="apply_mapping",
)
# S3にParquet化してdateをパーティションキーにして書き込む
s3 = glueContext.write_dynamic_frame.from_options(
frame=dyf,
connection_type="s3",
format="glueparquet",
connection_options={
"path": f"s3://{destination_bucket}/",
"partitionKeys": ["date"],
},
format_options={"compression": "snappy"},
transformation_ctx="s3_output",
)
# 変換前のアクセスログを削除する
glueContext.purge_s3_path(
s3_path, {"retentionPeriod": 72}, transformation_ctx="purge_s3_path"
)
job.commit()
Glueのジョブブックマーク機能を使う
ここでもう一工夫します。先ほどのコード全体の最後に次のようなコードがありました。
# 変換前のアクセスログを削除する
glueContext.purge_s3_path(
s3_path, {"retentionPeriod": 72}, transformation_ctx="purge_s3_path"
)
このコードは、変換前のCloudFrontのアクセスログを削除します。元のアクセスログにもS3の料金がかかりつづけるため、コスト削減のために削除しています。
このうち、retentionPeriod
に注目します。retentionPeriod
を72にしているため、元のログファイルは3日間保持されます。
https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-purge_s3_path
retentionPeriod
には、ファイルを保持する期間を時間単位で指定します。保存期間より新しいファイルは保持されます。デフォルトでは 168 時間(7 日)に設定されています。
この状態でGlueを通常通り実行すると、3日間は同じファイルが変換されることになり、変換後のファイルに重複が発生する上に無駄なコストが発生します。
これを防ぐためにGlueのジョブブックマーク機能を利用します。
AWS Glueのジョブブックマーク機能は、ETLジョブにおいて既に処理したデータを追跡し、再処理を防ぐための仕組みです。主な特徴は以下の通りです。
- 前回のジョブ実行以降に追加された新しいデータのみを処理できます
- S3の場合は、新しく追加されたオブジェクトやパーティションだけを処理できます
このジョブブックマークを正しく機能させるためには、以下の条件を満たす必要があります。上述したコード全体を改めて見て頂くとtransformation_ctx
が各メソッドで指定されていることが分かります。
- Glue Jobの設定でジョブのブックマークオプションが有効になっている
- 各処理ステップに
transformation_ctx
パラメータが指定されている job.init()
とjob.commit()
が呼び出されている
まとめ
本記事では、AWS Glueを使ってCloudFrontのアクセスログをETL処理し、Parquet化とパーティショニングを実現しました。Parquet化とパーティショニングにより、Amazon Athenaで必要な範囲のログだけスキャンできるようになるため、分析のパフォーマンスとコストが削減できます。
次回は、今回変換した効率の良いアクセスログに対して、Amazon Athenaで分析クエリを投げる方法を解説します。
シリーズの一覧は以下の通りです。
- 第1回: Amazon CloudFrontのログを分析するメリットとその課題
- 第2回: AWS GlueでParquet化とパーティショニングを行う
- 第3回: Amazon Athenaで最適化されたログを分析する
- 第4回: Amazon QuickSightでログを可視化する