PySpark on AWS Glue

dynamic frame系のoptional引数transformation_ctx

  • glueは生データの重複処理を防ぐための仕組みとしてjob bookmarkというものを持っている
  • transformation_ctx引数はjob bookmarkを制御するためのもので、詳しくはよくわからんがとりあえず入れとくのをすすめる What is transformation_ctx used for in aws glue?

カタログからのデータの読み込み

pre_logs = glueContext.create_dynamic_frame.from_catalog(
    database = “db_name”,
    table_name = “table_name”,
    transformation_ctx = “anything”
)

choiceの解決方法

logs = ResolveChoice.apply(
    frame=pre_logs, choice=“MATCH_CATALOG”, database=“test”, table_name=“logs”, transformation_ctx=“logs”
)

既存のpythonライブラリを読み込む方法

下のやつで行ける。
注意点はルートに init.pyを作成すること。
AWS GlueでS3上にあるPythonの外部ライブラリをインポートして利用する – YOMON8.NET

Apiを用いたjobの取り扱い

コードからのjobの呼び出し

response = client.start_job_run(
             JobName = 'my_test_Job',
             Arguments = {
               '--day_partition_key':   'partition_0',
               '--hour_partition_key':  'partition_1',
               '--day_partition_value':  day_partition_value,
               '--hour_partition_value': hour_partition_value } )

jobRun

コードからのパラメタの取得方法

import sys
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv,
                          ['JOB_NAME',
                           'day_partition_key',
                           'hour_partition_key',
                           'day_partition_value',
                           'hour_partition_value'])
print "The day-partition key is: ", args['day_partition_key']
print "and the day-partition value is: ", args['day_partition_value']

Accessing Parameters Using getResolvedOptions

APIを用いたworkflowの取り扱い

workflowの実行

jobからのworkflowプロパティの取得

import sys
import boto3
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from pyspark.context import SparkContext

glue_client = boto3.client("glue")
args = getResolvedOptions(sys.argv, ['JOB_NAME','WORKFLOW_NAME', 'WORKFLOW_RUN_ID'])
workflow_name = args['WORKFLOW_NAME']
workflow_run_id = args['WORKFLOW_RUN_ID']
workflow_params = glue_client.get_workflow_run_properties(Name=workflow_name,
                                        RunId=workflow_run_id)["RunProperties"]

target_database = workflow_params['target_database']
target_s3_location = workflow_params['target_s3_location']

ワークフローの実行プロパティの取得と設定

pythonでdynamo dbをORM風に扱えるpynamodbの使い方 PySpark DataFrameメモ TypedDictでpythonのtype hintでdictのkeyとvalueに厳格に型をつける
View Comments
There are currently no comments.