AWS Glue色々

GUI操作

テーブルの分類の変更方法

テーブルページで編集したいテーブルを選択したあと、 テーブルの編集ボタンを押す。
モーダルが出てくるので、 テーブルのプロパティ 項目の classification を json に変更する。

ジョブの登録方法

  1. ジョブで走らせたいスクリプトを作成する
    1. GlueAppというオブジェクトの中に走らせたいコードをすべて配置する
  2. s3に走らせたいスクリプトファイルを配置する
  3. Glue内のジョブページで新規作成する
    1. ほとんどデフォルト
    2. RoleはGlueJobRoleに設定
    3. Glue versionはSpark 2.4, Scala2(Glue Version 1.0)
    4. このジョブ実行はユーザ提供の既存のスクリプトを選択
    5. クラス名をGlueAppに設定
    6. スクリプトのS3パスをさっきのスクリプトのパスに設定
  4. 保存を連打して完了
    1. 完了したらスクリプト編集画面が出てくる

オンデマンドトリガーの実行方法

Glueのトリガータブで作成したオンデマンドトリガーを選択し、アクションからトリガーの実行を選択する。 トリガーによって実行されたかどうかはジョブタブでお目当てのジョブを選択すると下にログが表示される。

ジョブパラメータの注入方法

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, [‘JOB_NAME’])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
args = getResolvedOptions(sys.argv, [‘JOB_NAME’,’VAL1’,’VAL2’,’VAL3’,’DEST_FOLDER’])
job.init(args[‘JOB_NAME’], args)

v_list=[{“VAL1”:args[‘VAL1’],”VAL2”:args[‘VAL2’],”VAL3”:args[‘VAL3’]}]

df=sc.parallelize(v_list).toDF()
df.repartition(1).write.mode(‘overwrite’).format(‘csv’).options(header=True, delimiter = ‘;’).save(“s3://“+ args[‘DEST_FOLDER’] +”/“)

job.commit()

amazon web services – AWS Glue Job Input Parameters – Stack Overflow

コードを書くときの注意点

glueの1カラムに複数の型が入っていたときの対応

1カラムに複数型が入るとchoice型になってネストした辞書っぽくなる。
DynamicFrame#resolveChoiceメソッドを使えば解決できる。
aws-samples/aws-glue-samples
以下はglueのテーブルのスキーマにカラムスキーマをあわせる例。

import com.amazonaws.services.glue.ChoiceOption
val resolvedDf = logs.resolveChoice(choiceOption = Some(ChoiceOption(“match_catalog”)),
                   database = Some(“test”),
                   tableName = Some(“logs”)).toDF()

AWS Glueで新しくScalaがサポートされました | cloudpack.media

polynoteからGlueスクリプトに移植するときの注意点

  1. polyで必要なimport spark.*系のimportはGlueでは全部いらない
  2. polynoteではspark.SparkContext()でSparkContextを作成しているが、Glueではnew SparkContext()で作成するように変更する
  3. polynoteではスクリプトベタ書きだが、GlueではGlueAppオブジェクト内に作成する
object GlueApp { def main(args: Array[String]) = { // Glueではここにコードを書いていく } }
  1. polyではspark.createDataFrameでデータフレームを作成しているが、Glueではsparkが使えないのでsqlContext.createDataFrameで作成する
  2. polynoteではcase classをDatasetに詰め替えるときにimport spark.implicits._でエンコーダを用意できるが、Glueでは自前でimplicit encoderを用意してあげなければならない
// Glueではimplicit Encoderを用意する case class Test(mean_cost: Double, median_cost: Double, mean_time: Double, median_time: Double, hour: String) implicit val testEnc: Encoder[Test] = Encoders.product[Test]

scala – Why is “Unable to find encoder for type stored in a Dataset” when creating a dataset of custom case class? – Stack Overflow

例外対応

toDF無いよ的なこと言われた時
sqlContrextを使用するときはimplicitsも同時にimportしないとtoDFなどが使えない value toDF is not a member of org.apache.spark.rdd.RDD(Long, org.apache.spark.ml.linalg.Vector) – Stack Overflow

val sqlContext= new org.apache.spark.sql.SQLContext(sc)

import sqlContext.implicits._

DynamicFrame

  • SparkやpandasのDataFrameと似た挙動をする
  • SparkのRowやpandasのSeriesに当たるものとして内部的にDynamicRecordというものがある
  • AWSの他のサービスと接続性がある
    • 自身をS3に吐き出したり
  • toDFは同列に複数型がある場合はSparkのDataFrameに変換できない

情報源

開発エンドポイントとglue jobの差異

  • 開発エンドポイントだと sparkContextがもともと与えられているが、glue jobだと自分で作成しなければいけないのでそこで差異が生じる。具体的にはforeachなどの中でscを使った処理を行うと Task not Serializableが出る

PySparkを使用するときのtips

dynamic frame系のoptional引数transformation_ctx

  • glueは生データの重複処理を防ぐための仕組みとしてjob bookmarkというものを持っている
  • transformation_ctx引数はjob bookmarkを制御するためのもので、詳しくはよくわからんがとりあえず入れとくのをすすめる What is transformation_ctx used for in aws glue?

カタログからのデータの読み込み

pre_logs = glueContext.create_dynamic_frame.from_catalog(
    database = “db_name”,
    table_name = “table_name”,
    transformation_ctx = “anything”
)

choiceの解決方法

logs = ResolveChoice.apply(
    frame=pre_logs, choice=“MATCH_CATALOG”, database=“test”, table_name=“logs”, transformation_ctx=“logs”
)

既存のpythonライブラリを読み込む方法

下のやつで行ける。
注意点はルートに init.pyを作成すること。
AWS GlueでS3上にあるPythonの外部ライブラリをインポートして利用する – YOMON8.NET

PySpark DataFrameメモ VPC内で動くGlue開発エンドポイントでPyCharmを使用する方法 VPCエンドポイント
View Comments
There are currently no comments.