DataFrameの作成
SparkSession#cresateDataFrame
を使用する
作成元のデータは
- RDD
- list
- pandas.DataFrame
のいずれか
pyspark.sql module — PySpark 2.4.5 documentation
select
リストカラムを展開して複数行にする
exploedeを使用する。
df_e = df.select(‘id', F.explode(F.col('tags’)).alias(‘tag'))
PySparkで1レコードを複数行にする – iMind Developers Blog
辞書カラムから特定の要素を抜き出す
df.select( col(“payload.weight”).alias(“weight”), col(“payload.unix_msec”).alias(“unix_msec”), “macAddress”, “sensorType” )
filter
複数条件は |
または&
でつなぐ
df.filter((col("act_date”) >= “2016-10-01”) & (col(“act_date”) <= “2017-04-01”))
python – Multiple condition filter on dataframe – Stack Overflow
betweenは以上と以下になる
>>> df.select(df.name, df.age.between(2, 4)).show() +——+—————————————+ | name|((age >= 2) AND (age <= 4))| +——+—————————————+ |Alice| true| | Bob| false| +——+—————————————+
udf
作り方
pysparkでデータハンドリングする時によく使うやつメモ – Qiita
udfのreturn typeの注意点
- udfでは戻り値の型を指定してやる必要があるが、numpyと併用すると容易にnumpy.floatとかに化けるためエラーが起きる時がある。その時はfloatとかに変換してやればいい。
- 戻り値をFloatとかにしてIntegerを返すと普通に動くが、適合していないとこはすべてNoneになる
変換
columnの型を変換する
changedTypedf = joindf.withColumn("label”, joindf[“show”].cast("double"))
how to change a Dataframe column from String type to Double type in pyspark
カラムのrename
withColumnRenamed(“old_name”, “new_name”)
from/to pandas
import numpy as np import pandas as pd # Enable Arrow-based columnar data transfers spark.conf.set(“spark.sql.execution.arrow.enabled”, “true”) # Generate a Pandas DataFrame pdf = pd.DataFrame(np.random.rand(100, 3)) # Create a Spark DataFrame from a Pandas DataFrame using Arrow df = spark.createDataFrame(pdf) # Convert the Spark DataFrame back to a Pandas DataFrame using Arrow result_pdf = df.select(“*”).toPandas()
unionとunionByNameの違い
カラム名を参照して結合してくれるかどうか
df1 = spark.createDataFrame([(1, 2, 3)], [‘x’, ‘y’, ‘z’]) df2 = spark.createDataFrame([(4, 5, 6)], [‘z’, ‘x’, ‘y’]) df_union = df1.union(df2) df_unionByName = df1.unionByName(df2) print(‘df1’) df1.show() print(‘df2’) df2.show() # df1 # +—+—+—+ # | x| y| z| # +—+—+—+ # | 1| 2| 3| # +—+—+—+ # # df2 # +—+—+—+ # | z| x| y| # +—+—+—+ # | 4| 5| 6| # +—+—+—+ print(‘union’) df_union.show() print(‘unionByName’) df_unionByName.show() # union # +—+—+—+ # | x| y| z| # +—+—+—+ # | 1| 2| 3| # | 4| 5| 6| # +—+—+—+ # # unionByName # +—+—+—+ # | x| y| z| # +—+—+—+ # | 1| 2| 3| # | 5| 6| 4| # +—+—+—+
次の行との計算を行いたい場合
Windowを使って次、または前の行のデータを書く行に連結する。
前の値は lag
、次の行の値は lead
で取得する。
from pyspark.sql.window import Window import pyspark.sql.functions as func from pyspark.sql.functions import lit dfu = df.withColumn(‘user’, lit(‘tmoore’)) df_lag = dfu.withColumn(‘prev_day_price’, func.lag(dfu[‘price’]) .over(Window.partitionBy(“user”))) result = df_lag.withColumn(‘daily_return’, (df_lag[‘price’] - df_lag[‘prev_day_price’]) / df_lag[‘price’] ) >>> result.show() +—+——+———+———————+——————————+ |day|price| user|prev_day_price| daily_return| +---+-----+-------+--------------+--------------------+ | 1| 33.3| tmoore| null| null| | 2| 31.1| tmoore| 33.3|-0.07073954983922816| | 3| 51.2| tmoore| 31.1| 0.392578125| | 4| 21.3| tmoore| 51.2| -1.403755868544601| +—+——+———+--------------+——————————+
spark dataframe – Applying a Window function to calculate differences in pySpark – Stack Overflow
groupby
groupbyでリストを出力する
pyspark collect_set or collect_list with groupby
groupbyで同じ行の複数カラムを辞書にしたリストを出力
Structメソッドを使う
https://www.programcreek.com/python/example/98235/pyspark.sql.functions.collect_list
groupbyしてaggでudfを使用する
まず collect_listでリスト出力した後にそのカラムに対してudfを適用する
エラー対応
expected zero arguments for construction of ClassDict (for numpy.dtype)
Udfのtype設定が間違っているときに出た
Spark Error:expected zero arguments for construction of ClassDict (for numpy.core.multiarray._reconstruct)