adrien-converse-kCrrUx7US04-unsplash.jpg

AWS Glue色々

 
0
このエントリーをはてなブックマークに追加
Kazuki Moriyama
Kazuki Moriyama (森山 和樹)

GUI 操作

テーブルの分類の変更方法

テーブルページで編集したいテーブルを選択したあと、 テーブルの編集ボタンを押す。
モーダルが出てくるので、 テーブルのプロパティ 項目の classification を json に変更する.

ジョブの登録方法

  1. ジョブで走らせたいスクリプトを作成する
    1. GlueApp というオブジェクトの中に走らせたいコードをすべて配置する
  2. s3 に走らせたいスクリプトファイルを配置する
  3. Glue 内のジョブページで新規作成する
    1. ほとんどデフォルト
    2. Role は GlueJobRole に設定
    3. Glue version は Spark 2.4, Scala2(Glue Version 1.0)
    4. このジョブ実行はユーザ提供の既存のスクリプトを選択
    5. クラス名を GlueApp に設定
    6. スクリプトの S3 パスをさっきのスクリプトのパスに設定
  4. 保存を連打して完了
    1. 完了したらスクリプト編集画面が出てくる

オンデマンドトリガーの実行方法

Glue のトリガータブで作成したオンデマンドトリガーを選択し、アクションからトリガーの実行を選択する。 トリガーによって実行されたかどうかはジョブタブでお目当てのジョブを選択すると下にログが表示される。

ジョブパラメータの注入方法

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
args = getResolvedOptions(sys.argv, ['JOB_NAME','VAL1','VAL2','VAL3','DEST_FOLDER'])
job.init(args['JOB_NAME'], args)

v_list=[{"VAL1":args['VAL1'],"VAL2":args['VAL2'],"VAL3":args['VAL3']}]

df=sc.parallelize(v_list).toDF()
df.repartition(1).write.mode('overwrite').format('csv').options(header=True, delimiter = ';').save("s3://"+ args['DEST_FOLDER'] +"/")

job.commit()

amazon web services - AWS Glue Job Input Parameters - Stack Overflow

コードを書くときの注意点

glue の 1 カラムに複数の型が入っていたときの対応

1 カラムに複数型が入ると choice 型になってネストした辞書っぽくなる。
DynamicFrame#resolveChoice メソッドを使えば解決できる。
aws-samples/aws-glue-samples
以下は glue のテーブルのスキーマにカラムスキーマをあわせる例。

import com.amazonaws.services.glue.ChoiceOption
val resolvedDf = logs.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")),
database = Some("test"),
tableName = Some("logs")).toDF()

AWS Glue で新しく Scala がサポートされました | cloudpack.media

polynote から Glue スクリプトに移植するときの注意点

  1. poly で必要な import spark.* 系の import は Glue では全部いらない
  2. polynote では spark.SparkContext() で SparkContext を作成しているが、Glue では new SparkContext() で作成するように変更する
  3. polynote ではスクリプトベタ書きだが、Glue では GlueApp オブジェクト内に作成する
object GlueApp { def main(args: Array[String]) = { // Glue ではここにコードを書いていく } }
  1. poly では spark.createDataFrame でデータフレームを作成しているが、Glue では spark が使えないので sqlContext.createDataFrame で作成する
  2. polynote では case class を Dataset に詰め替えるときに import spark.implicits._ でエンコーダを用意できるが、Glue では自前で implicit encoder を用意してあげなければならない
// Glue では implicit Encoder を用意する
case class Test(mean_cost: Double, median_cost: Double, mean_time: Double, median_time: Double, hour: String)
implicit val testEnc: Encoder[Test] = Encoders.product[Test]()

scala - Why is “Unable to find encoder for type stored in a Dataset” when creating a dataset of custom case class? - Stack Overflow

例外対応

toDF 無いよ的なこと言われた時
sqlContrext を使用するときは implicits も同時に import しないと toDF などが使えない value toDF is not a member of org.apache.spark.rdd.RDD(Long, org.apache.spark.ml.linalg.Vector) - Stack Overflow

val sqlContext= new org.apache.spark.sql.SQLContext(sc)

import sqlContext.implicits._

DynamicFrame

  • Spark や pandas の DataFrame と似た挙動をする
  • Spark の Row や pandas の Series に当たるものとして内部的に DynamicRecord というものがある
  • AWS の他のサービスと接続性がある
    • 自身を S 3に吐き出したり
  • toDF は同列に複数型がある場合は Spark の DataFrame に変換できない

情報源

開発エンドポイントと glue job の差異

  • 開発エンドポイントだと sparkContext がもともと与えられているが、glue job だと自分で作成しなければいけないのでそこで差異が生じる。具体的には foreach などの中で sc を使った処理を行うと Task not Serializable が出る

PySpark を使用するときの tips

dynamic frame 系の optional 引数 transformation_ctx

  • glue は生データの重複処理を防ぐための仕組みとして job bookmark というものを持っている
  • transformation_ctx 引数は job bookmark を制御するためのもので、詳しくはよくわからんがとりあえず入れとくのをすすめる What is transformation_ctx used for in aws glue?

カタログからのデータの読み込み

pre_logs = glueContext.create_dynamic_frame.from_catalog(
database = "db_name",
table_name = "table_name",
transformation_ctx = "anything"
)

choice の解決方法

logs = ResolveChoice.apply(
frame=pre_logs, choice="MATCH_CATALOG", database="test", table_name="logs", transformation_ctx="logs"
)

既存の python ライブラリを読み込む方法

下のやつで行ける。
注意点はルートに init.py を作成すること。
AWS Glue で S3 上にある Python の外部ライブラリをインポートして利用する - YOMON8.NET

info-outline

お知らせ

K.DEVは株式会社KDOTにより運営されています。記事の内容や会社でのITに関わる一般的なご相談に専門の社員がお答えしております。ぜひお気軽にご連絡ください。