GUI操作
テーブルの分類の変更方法
テーブルページで編集したいテーブルを選択したあと、 テーブルの編集ボタンを押す。
モーダルが出てくるので、 テーブルのプロパティ 項目の classification を json に変更する。
ジョブの登録方法
- ジョブで走らせたいスクリプトを作成する
- GlueAppというオブジェクトの中に走らせたいコードをすべて配置する
- s3に走らせたいスクリプトファイルを配置する
- Glue内のジョブページで新規作成する
- ほとんどデフォルト
- RoleはGlueJobRoleに設定
- Glue versionはSpark 2.4, Scala2(Glue Version 1.0)
- このジョブ実行はユーザ提供の既存のスクリプトを選択
- クラス名をGlueAppに設定
- スクリプトのS3パスをさっきのスクリプトのパスに設定
- 保存を連打して完了
- 完了したらスクリプト編集画面が出てくる
オンデマンドトリガーの実行方法
Glueのトリガータブで作成したオンデマンドトリガーを選択し、アクションからトリガーの実行を選択する。 トリガーによって実行されたかどうかはジョブタブでお目当てのジョブを選択すると下にログが表示される。
ジョブパラメータの注入方法
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, [‘JOB_NAME’]) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) args = getResolvedOptions(sys.argv, [‘JOB_NAME’,’VAL1’,’VAL2’,’VAL3’,’DEST_FOLDER’]) job.init(args[‘JOB_NAME’], args) v_list=[{“VAL1”:args[‘VAL1’],”VAL2”:args[‘VAL2’],”VAL3”:args[‘VAL3’]}] df=sc.parallelize(v_list).toDF() df.repartition(1).write.mode(‘overwrite’).format(‘csv’).options(header=True, delimiter = ‘;’).save(“s3://“+ args[‘DEST_FOLDER’] +”/“) job.commit()
amazon web services – AWS Glue Job Input Parameters – Stack Overflow
コードを書くときの注意点
glueの1カラムに複数の型が入っていたときの対応
1カラムに複数型が入るとchoice型になってネストした辞書っぽくなる。
DynamicFrame#resolveChoiceメソッドを使えば解決できる。
aws-samples/aws-glue-samples
以下はglueのテーブルのスキーマにカラムスキーマをあわせる例。
import com.amazonaws.services.glue.ChoiceOption val resolvedDf = logs.resolveChoice(choiceOption = Some(ChoiceOption(“match_catalog”)), database = Some(“test”), tableName = Some(“logs”)).toDF()
AWS Glueで新しくScalaがサポートされました | cloudpack.media
polynoteからGlueスクリプトに移植するときの注意点
- polyで必要なimport spark.*系のimportはGlueでは全部いらない
- polynoteではspark.SparkContext()でSparkContextを作成しているが、Glueではnew SparkContext()で作成するように変更する
- polynoteではスクリプトベタ書きだが、GlueではGlueAppオブジェクト内に作成する
object GlueApp { def main(args: Array[String]) = { // Glueではここにコードを書いていく } }
- polyではspark.createDataFrameでデータフレームを作成しているが、Glueではsparkが使えないのでsqlContext.createDataFrameで作成する
- polynoteではcase classをDatasetに詰め替えるときにimport spark.implicits._でエンコーダを用意できるが、Glueでは自前でimplicit encoderを用意してあげなければならない
// Glueではimplicit Encoderを用意する case class Test(mean_cost: Double, median_cost: Double, mean_time: Double, median_time: Double, hour: String) implicit val testEnc: Encoder[Test] = Encoders.product[Test]
例外対応
toDF無いよ的なこと言われた時
sqlContrextを使用するときはimplicitsも同時にimportしないとtoDFなどが使えない value toDF is not a member of org.apache.spark.rdd.RDD(Long, org.apache.spark.ml.linalg.Vector) – Stack Overflow
val sqlContext= new org.apache.spark.sql.SQLContext(sc) import sqlContext.implicits._
DynamicFrame
- SparkやpandasのDataFrameと似た挙動をする
- SparkのRowやpandasのSeriesに当たるものとして内部的にDynamicRecordというものがある
- AWSの他のサービスと接続性がある
- 自身をS3に吐き出したり
- toDFは同列に複数型がある場合はSparkのDataFrameに変換できない
情報源
- 公式
- AWS GlueのDynamicFrameの動きを見てみる | Developers.IO
- AWS Glue
- AWS Glue がScala をサポートしました | Amazon Web Services ブログ
開発エンドポイントとglue jobの差異
- 開発エンドポイントだと sparkContextがもともと与えられているが、glue jobだと自分で作成しなければいけないのでそこで差異が生じる。具体的にはforeachなどの中でscを使った処理を行うと Task not Serializableが出る
PySparkを使用するときのtips
dynamic frame系のoptional引数transformation_ctx
- glueは生データの重複処理を防ぐための仕組みとしてjob bookmarkというものを持っている
- transformation_ctx引数はjob bookmarkを制御するためのもので、詳しくはよくわからんがとりあえず入れとくのをすすめる What is transformation_ctx used for in aws glue?
カタログからのデータの読み込み
pre_logs = glueContext.create_dynamic_frame.from_catalog( database = “db_name”, table_name = “table_name”, transformation_ctx = “anything” )
choiceの解決方法
logs = ResolveChoice.apply( frame=pre_logs, choice=“MATCH_CATALOG”, database=“test”, table_name=“logs”, transformation_ctx=“logs” )
既存のpythonライブラリを読み込む方法
下のやつで行ける。
注意点はルートに init.pyを作成すること。
AWS GlueでS3上にあるPythonの外部ライブラリをインポートして利用する – YOMON8.NET
View Comments