CloudFrontのアクセスログを分析用に最適化して可視化する – 第2回: AWS GlueでParquet化とパーティショニングを行う

本記事では、AWS Glueを使ってCloudFrontのアクセスログを分析用に最適化する方法について解説します。Glueは、データの変換をサーバレスで行うためのサービスであり、PySparkをGlue上で実行することでアクセスログを最適化します。本記事では、PySparkのコード全体も記載しているため、コードをコピーして利用できます。

CloudFrontのアクセスログはそのまま分析しようとすると、次の課題があるため分析パフォーマンスとコスト効率が良くないことを第1回でお伝えしました。第2回の本記事では、AWS Glueを使ってこの課題を解決するための最適化を行います。

  • パーティショニングされていない
  • 全てのカラムが読み取られる
目次

AWS Glueの概要

まず初めに、AWS Glueについて簡単に紹介します。

AWS Glueは、データ統合のための包括的なサーバーレスサービスで、データカタログ機能とETLジョブの実行が主な機能です。

データカタログ機能

Glue Data Catalogは、様々なデータソースのメタデータを一元管理する永続的なメタデータストアです。AWS全体のデータを一元的にカタログ化し、検索・管理できるようになります。メタデータには、テーブル名、列名、データ型、パーティションキーなどのスキーマ情報と、データの実際の保存場所(S3のパスなど)への参照が含まれています。

ETLジョブの実行機能

Glue Jobは、サーバーレスのApache Sparkエンジン上でETL(抽出、変換、ロード)ジョブを実行する機能です。Glue Jobでは、Python3(PySpark)でコードを記述し、Apache Sparkの処理を実行します。Glue Jobは、GUIで視覚的にデータ変換のワークフローを定義し、実行する機能も備えています。一般的なETL処理であれば、PySparkに習熟していなくてもGUIで使えることが魅力の一つです。

今回は後者のETL機能を使います。今回のケースでは、S3バケットからExtract(抽出)したCloudFrontのアクセスログ(CSV)をParquetフォーマットにTransform(変換)し、別のS3バケットにLoad(ロード)する処理をGlueで行います。

前者のデータカタログ機能は、第3回で使用します。 Amazon AthenaでS3のデータにアクセスする際に、Glue Data Catalogに保存されているメタデータを利用します。

CloudFrontのアクセスログのフォーマット

次に、CloudFrontのアクセスログのフォーマットを見ていきます。CloudFrontのアクセスログは、区切り文字がタブ(\t)のCSVです。一般的なCSVと異なるのはヘッダーが2行ある点です。このヘッダーが2行あるところが非常に厄介です。 Glueは一般的なETLの処理であれば、GUIでコードを作成できますが、2行あるヘッダーに対処するため、今回の用途ではGUIは使えず、PySparkのコードを書く必要があります。

#Version: 1.0
#Fields: date time x-edge-location sc-bytes c-ip cs-method cs(Host) cs-uri-stem sc-status cs(Referer) cs(User-Agent) cs-uri-query cs(Cookie) x-edge-result-type x-edge-request-id x-host-header cs-protocol cs-bytes time-taken x-forwarded-for ssl-protocol ssl-cipher x-edge-response-result-type cs-protocol-version fle-status fle-encrypted-fields c-port time-to-first-byte x-edge-detailed-result-type sc-content-type sc-content-len sc-range-start sc-range-end
2019-12-04	21:02:31	LAX1	392	192.0.2.100	GET	d111111abcdef8.cloudfront.net	/index.html	200	-	Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36	-	-	Hit	SOX4xwn4XV6Q4rgb7XiVGOHms_BGlTAC4KyHmureZmBNrjGdRLiNIQ==	d111111abcdef8.cloudfront.net	https	23	0.001	-	TLSv1.2	ECDHE-RSA-AES128-GCM-SHA256	Hit	HTTP/2.0	-	-	11040	0.001	Hit	text/html	78	-	-
2019-12-04	21:02:31	LAX1	392	192.0.2.100	GET	d111111abcdef8.cloudfront.net	/index.html	200	-	Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36	-	-	Hit	k6WGMNkEzR5BEM_SaF47gjtX9zBDO2m349OY2an0QPEaUum1ZOLrow==	d111111abcdef8.cloudfront.net	https	23	0.000	-	TLSv1.2	ECDHE-RSA-AES128-GCM-SHA256	Hit	HTTP/2.0	-	-	11040	0.000	Hit	text/html	78	-	-
2019-12-04	21:02:31	LAX1	392	192.0.2.100	GET	d111111abcdef8.cloudfront.net	/index.html	200	-	Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36	-	-	Hit	f37nTMVvnKvV2ZSvEsivup_c2kZ7VXzYdjC-GUQZ5qNs-89BlWazbw==	d111111abcdef8.cloudfront.net	https	23	0.001	-	TLSv1.2	ECDHE-RSA-AES128-GCM-SHA256	Hit	HTTP/2.0	-	-	11040	0.001	Hit	text/html	78	-	-	
2019-12-13	22:36:27	SEA19-C1	900	192.0.2.200	GET	d111111abcdef8.cloudfront.net	/favicon.ico	502	http://www.example.com/	Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36	-	-	Error	1pkpNfBQ39sYMnjjUQjmH2w1wdJnbHYTbag21o_3OfcQgPzdL2RSSQ==	www.example.com	http	675	0.102	-	-	-	Error	HTTP/1.1	-	-	25260	0.102	OriginDnsError	text/html	507	-	-
2019-12-13	22:36:26	SEA19-C1	900	192.0.2.200	GET	d111111abcdef8.cloudfront.net	/	502	-	Mozilla/5.0%20(Windows%20NT%2010.0;%20Win64;%20x64)%20AppleWebKit/537.36%20(KHTML,%20like%20Gecko)%20Chrome/78.0.3904.108%20Safari/537.36	-	-	Error	3AqrZGCnF_g0-5KOvfA7c9XLcf4YGvMFSeFdIetR1N_2y8jSis8Zxg==	www.example.com	http	735	0.107	-	-	-	Error	HTTP/1.1	-	-	3802	0.107	OriginDnsError	text/html	507	-	-
2019-12-13	22:37:02	SEA19-C2	900	192.0.2.200	GET	d111111abcdef8.cloudfront.net	/	502	-	curl/7.55.1	-	-	Error	kBkDzGnceVtWHqSCqBUqtA_cEs2T3tFUBbnBNkB9El_uVRhHgcZfcw==	www.example.com	http	387	0.103	-	-	-	Error	HTTP/1.1	-	-	12644	0.103	OriginDnsError	text/html	507	-	-

アクセスログの各フィールドの意味はAWSの公式ドキュメントを参照してください。

GlueでCloudFrontのアクセスログをETLする

それでは、Glueで動かすPySparkのコードを見ていきましょう。

PySparkのコード

下記がコード全体です。コードブロックはコメントで解説しています。

import os
import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import *
from awsglue.transforms import Filter
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# CloudFrontのアクセスログが置いてあるS3を指定
source_bucket = "source_backet_name"
s3_path = f"s3://{source_bucket}/"

# 変換後のアクセスログを置くS3を指定
destination_bucket = "destination_bucket_name" 

# S3にあるCloudFrontのアクセスログを読み込む。
dyf = glueContext.create_dynamic_frame.from_options(
    "s3",
    {
        "paths": [s3_path],
        "compressionType": "gzip",
        "groupFiles": "inPartition",
        "groupSize": "1048576",
    },
    format="csv",
    format_options={
        "withHeader": False,
        "separator": "\t",
    },
    transformation_ctx="glue_s3_input",
)

# コメント行(#で始まる)を除外するためのフィルター
def filter_metadata(rec):
    return rec["col0"] and not rec["col0"].startswith("#")

# フィルターを適用し、各ファイルの最初の2行を削除する
filtered_dyf = Filter.apply(frame=dyf, f=filter_metadata, transformation_ctx="filter")

# 各カラムに名前を付ける
dyf = ApplyMapping.apply(
    frame=filtered_dyf,
    mappings=[
        ("col0", "string", "date", "date"),
        ("col1", "string", "time", "string"),
        ("col2", "string", "x_edge_location", "string"),
        ("col3", "string", "sc_bytes", "bigint"),
        ("col4", "string", "c_ip", "string"),
        ("col5", "string", "cs_method", "string"),
        ("col6", "string", "cs_host", "string"),
        ("col7", "string", "cs_uri_stem", "string"),
        ("col8", "string", "sc_status", "int"),
        ("col9", "string", "cs_referrer", "string"),
        ("col10", "string", "cs_user_agent", "string"),
        ("col11", "string", "cs_uri_query", "string"),
        ("col12", "string", "cs_cookie", "string"),
        ("col13", "string", "x_edge_result_type", "string"),
        ("col14", "string", "x_edge_request_id", "string"),
        ("col15", "string", "x_host_header", "string"),
        ("col16", "string", "cs_protocol", "string"),
        ("col17", "string", "cs_bytes", "bigint"),
        ("col18", "string", "time_taken", "float"),
        ("col19", "string", "x_forwarded_for", "string"),
        ("col20", "string", "ssl_protocol", "string"),
        ("col21", "string", "ssl_cipher", "string"),
        ("col22", "string", "x_edge_response_result_type", "string"),
        ("col23", "string", "cs_protocol_version", "string"),
        ("col24", "string", "fle_status", "string"),
        ("col25", "string", "fle_encrypted_fields", "int"),
        ("col26", "string", "c_port", "int"),
        ("col27", "string", "time_to_first_byte", "float"),
        ("col28", "string", "x_edge_detailed_result_type", "string"),
        ("col29", "string", "sc_content_type", "string"),
        ("col30", "string", "sc_content_len", "bigint"),
        ("col31", "string", "sc_range_start", "bigint"),
        ("col32", "string", "sc_range_end", "bigint"),
    ],
    transformation_ctx="apply_mapping",
)

# S3にParquet化してdateをパーティションキーにして書き込む
s3 = glueContext.write_dynamic_frame.from_options(
    frame=dyf,
    connection_type="s3",
    format="glueparquet",
    connection_options={
        "path": f"s3://{destination_bucket}/",
        "partitionKeys": ["date"],
    },
    format_options={"compression": "snappy"},
    transformation_ctx="s3_output",
)

# 変換前のアクセスログを削除する
glueContext.purge_s3_path(
    s3_path, {"retentionPeriod": 72}, transformation_ctx="purge_s3_path"
)

job.commit()

Glueのジョブブックマーク機能を使う

ここでもう一工夫します。先ほどのコード全体の最後に次のようなコードがありました。

# 変換前のアクセスログを削除する
glueContext.purge_s3_path(
    s3_path, {"retentionPeriod": 72}, transformation_ctx="purge_s3_path"
)

このコードは、変換前のCloudFrontのアクセスログを削除します。元のアクセスログにもS3の料金がかかりつづけるため、コスト削減のために削除しています。

このうち、retentionPeriodに注目します。retentionPeriodを72にしているため、元のログファイルは3日間保持されます。

retentionPeriodには、ファイルを保持する期間を時間単位で指定します。保存期間より新しいファイルは保持されます。デフォルトでは 168 時間(7 日)に設定されています。

https://docs.aws.amazon.com/ja_jp/glue/latest/dg/aws-glue-api-crawler-pyspark-extensions-glue-context.html#aws-glue-api-crawler-pyspark-extensions-glue-context-purge_s3_path

この状態でGlueを通常通り実行すると、3日間は同じファイルが変換されることになり、変換後のファイルに重複が発生する上に無駄なコストが発生します。

これを防ぐためにGlueのジョブブックマーク機能を利用します。

AWS Glueのジョブブックマーク機能は、ETLジョブにおいて既に処理したデータを追跡し、再処理を防ぐための仕組みです。主な特徴は以下の通りです。

  • 前回のジョブ実行以降に追加された新しいデータのみを処理できます
  • S3の場合は、新しく追加されたオブジェクトやパーティションだけを処理できます

このジョブブックマークを正しく機能させるためには、以下の条件を満たす必要があります。上述したコード全体を改めて見て頂くとtransformation_ctxが各メソッドで指定されていることが分かります。

  • Glue Jobの設定でジョブのブックマークオプションが有効になっている
  • 各処理ステップにtransformation_ctxパラメータが指定されている
  • job.init()job.commit()が呼び出されている

まとめ

本記事では、AWS Glueを使ってCloudFrontのアクセスログをETL処理し、Parquet化とパーティショニングを実現しました。Parquet化とパーティショニングにより、Amazon Athenaで必要な範囲のログだけスキャンできるようになるため、分析のパフォーマンスとコストが削減できます。

次回は、今回変換した効率の良いアクセスログに対して、Amazon Athenaで分析クエリを投げる方法を解説します。

シリーズの一覧は以下の通りです。

よかったらシェアしてね!

CD

目次