エラーメッセージ
AWS Glue ETLジョブが順調に動作していたのに、突然停止してしまいました。CloudWatchログを確認すると、以下の特定のエラーが発生しています。
Command failed with exit code 1. java.lang.OutOfMemoryError: Java heap space
このクラッシュは、SparkのドライバーまたはエグゼキューターのJVMメモリが不足したときに発生します。本質的には、処理しようとしているデータが、Glueワーカーノードで利用可能なRAMよりも物理的に大きいことが原因です。
Glueジョブでヒープスペースが不足する理由
Sparkのメモリ問題が偶然発生することは稀です。通常、データの構造やコードでの処理方法に起因します。主な原因は以下の通りです。
- データスキュー(Data Skew): 特定のパーティションが他のパーティションより10倍以上大きい場合です。これにより、他のワーカーがアイドル状態である一方で、単一のワーカーにすべての負荷が集中し、最終的にその過負荷なノードがクラッシュします。
- 大規模なブロードキャストジョイン: Sparkは「小さな」テーブルをすべてのワーカーに送信することで、ブロードキャストジョインを実行しようとします。そのテーブルが実際には500MBや1GBある場合、デフォルトの10MBというブロードキャストのしきい値をすぐに超え、ヒープを使い果たしてしまいます。
- スモールファイル問題: それぞれ10KBしかないファイルを50,000個読み込むような場合、Sparkドライバーはすべてのファイルのメタデータを追跡しようとしてパンクしてしまいます。
- メモリ負荷の高いオペレーション:
.collect()や.toPandas()を使用すると、データセット全体がドライバーノードのメモリに読み込まれます。データセットが5GBで、ドライバーのヒープスペースが4GBしかない場合、即座に失敗します。
ステップ1:ワーカータイプのスケールアップ
時には、より大きなリソースが必要なこともあります。G.1Xワーカー(16GB RAM)で失敗する場合は、G.2X(32GB RAM)にアップグレードするのがジョブを安定させる最短の方法です。これにより、シャッフルやジョインの際にSparkが利用できるメモリ領域が広がります。
AWSコンソールでこれを変更するには:
- Glueジョブの構成に移動します。
- **Job details(ジョブの詳細)タブの下にあるWorker type(ワーカータイプ)**を探します。
G.1XからG.2Xに切り替えます。- 緊急ではないジョブについては、**Flex execution(Flex実行)**を有効にすることを検討してください。これにより、大きなワーカーを使用しながらコストを最大34%削減できます。
ステップ2:データスキューとスモールファイルの修正
スケールアップしても解決しない場合は、データが不均等に分散している可能性があります。カーディナリティの高いカラムに基づいてデータを再パーティショニングすることで、Sparkに負荷を分散させることができます。
# データを200個の均等なパーティションに分散させる
df = df.repartition(200, "user_id")
S3のスモールファイル問題を解決するには、Glueの groupFiles 機能を使用します。これにより、Sparkが処理を開始する前に、小さなファイルを管理しやすい大きなチャンク(例:128MB)に結合します。
datasource = glueContext.create_dynamic_frame.from_catalog(
database = "sales_db",
table_name = "transactions",
additional_options = {"groupFiles": "inPartition", "groupSize": "134217728"} # 128MBのグループ
)
ステップ3:Sparkメモリの微調整
デフォルトのSpark設定が、すべてのETLパターンに最適であるとは限りません。Glueジョブにパラメータを追加することで、これらを上書きできます。これらの設定は、ドライバーが一度に多くの処理を行おうとするのを防ぐのに役立ちます。
--conf spark.sql.autoBroadcastJoinThreshold: 大きなルックアップテーブルがある場合、自動ブロードキャストを無効にするために-1に設定します。--conf spark.driver.maxResultSize: 最終的なデータ収集時にドライバーがクラッシュする場合は、これを4g以上に増やします。
これらをGlue構成の**Job parameters(ジョブパラメータ)**セクションに追加します:
Key: --conf
Value: spark.sql.autoBroadcastJoinThreshold=-1 --conf spark.driver.maxResultSize=4g
ステップ4:Glue Dynamic Frameの活用
ネイティブのSpark DataFrameは固定スキーマを必要とするため、ネストされたJSONや変動するフィールドを扱う際にメモリを大量に消費することがあります。Glueの Dynamic Frame は遅延評価(lazy evaluation)され、スキーマの変更をより柔軟に処理できます。複雑な計算のためにSpark DataFrameに変換する前の、最初の重い処理にはこれを使用してください。
# Spark SQLの代わりにGlueネイティブのFilter変換を使用する
dynamic_frame = glueContext.create_dynamic_frame.from_options(...)
filtered_frame = Filter.apply(frame = dynamic_frame, f = lambda x: x["status"] == "active")
検証
実際に修正されたかどうかをどのように確認すればよいでしょうか?以下の3つの項目をチェックしてください:
- CloudWatchメトリクス:
glue.driver.jvm.heap.usageを監視します。70〜80%以下に保たれていれば、ジョブは正常です。95%まで急上昇してそのまま推移している場合は、まだリスクがあります。 - Spark UI: Spark History Serverを開きます。他のタスクよりも大幅に時間がかかっている単一のタスクを探してください。それがあれば、依然としてデータスキューの問題があることを示しています。
- ログステータス: ジョブが
Succeededステータスで終了し、ログに "Exit Code 1" が出ていないことを確認します。
長期的な安定性のためのプロのヒント
- オートスケーリングを有効にする: ワーカー数の管理をGlueに任せます。メモリ負荷が高まるとノードを追加し、ジョブが終盤に差し掛かるとノードを削減します。
--enable-auto-scaling: true
- .collect() の使用をやめる: これはドライバーのOOM(メモリ不足)の最大の原因です。データを確認する必要がある場合は、データセット全体をドライバーに読み込むのではなく、
df.show(10)を使用してください。 - プッシュダウン述語(Pushdown Predicates): S3レベルでデータをフィルタリングし、不要な行をメモリにロードしないようにします。

