PySpark DataFrameメモ

DataFrameの作成

SparkSession#cresateDataFrameを使用する
作成元のデータは

  • RDD
  • list
  • pandas.DataFrame
    のいずれか

pyspark.sql module — PySpark 2.4.5 documentation

select

リストカラムを展開して複数行にする

exploedeを使用する。

df_e = df.select(‘id', F.explode(F.col('tags’)).alias(‘tag'))

PySparkで1レコードを複数行にする – iMind Developers Blog

辞書カラムから特定の要素を抜き出す

df.select(
  col(“payload.weight”).alias(“weight”),
    col(“payload.unix_msec”).alias(“unix_msec”), “macAddress”, “sensorType”
)

filter

複数条件は |または&でつなぐ

df.filter((col("act_date”) >= “2016-10-01”) & (col(“act_date”) <= “2017-04-01”))

python – Multiple condition filter on dataframe – Stack Overflow

betweenは以上と以下になる

>>> df.select(df.name, df.age.between(2, 4)).show()
+——+—————————————+
| name|((age >= 2) AND (age <= 4))|
+——+—————————————+
|Alice|                       true|
|  Bob|                      false|
+——+—————————————+

udf

作り方

pysparkでデータハンドリングする時によく使うやつメモ – Qiita

udfのreturn typeの注意点

  • udfでは戻り値の型を指定してやる必要があるが、numpyと併用すると容易にnumpy.floatとかに化けるためエラーが起きる時がある。その時はfloatとかに変換してやればいい。
  • 戻り値をFloatとかにしてIntegerを返すと普通に動くが、適合していないとこはすべてNoneになる

変換

columnの型を変換する

changedTypedf = joindf.withColumn("label”, joindf[“show”].cast("double"))

how to change a Dataframe column from String type to Double type in pyspark

カラムのrename

withColumnRenamed(“old_name”, “new_name”)

from/to pandas

import numpy as np
import pandas as pd
# Enable Arrow-based columnar data transfers
spark.conf.set(“spark.sql.execution.arrow.enabled”, “true”)

# Generate a Pandas DataFrame
pdf = pd.DataFrame(np.random.rand(100, 3))

# Create a Spark DataFrame from a Pandas DataFrame using Arrow
df = spark.createDataFrame(pdf)

# Convert the Spark DataFrame back to a Pandas DataFrame using Arrow
result_pdf = df.select(“*”).toPandas()

unionとunionByNameの違い

カラム名を参照して結合してくれるかどうか

df1 = spark.createDataFrame([(1, 2, 3)], [‘x’, ‘y’, ‘z’])
df2 = spark.createDataFrame([(4, 5, 6)], [‘z’, ‘x’, ‘y’])

df_union = df1.union(df2)
df_unionByName = df1.unionByName(df2)

print(‘df1’)
df1.show()
print(‘df2’)
df2.show()

# df1
# +—+—+—+
# |  x|  y|  z|
# +—+—+—+
# |  1|  2|  3|
# +—+—+—+
#
# df2
# +—+—+—+
# |  z|  x|  y|
# +—+—+—+
# |  4|  5|  6|
# +—+—+—+

print(‘union’)
df_union.show()
print(‘unionByName’)
df_unionByName.show()

# union
# +—+—+—+
# |  x|  y|  z|
# +—+—+—+
# |  1|  2|  3|
# |  4|  5|  6|
# +—+—+—+
#
# unionByName
# +—+—+—+
# |  x|  y|  z|
# +—+—+—+
# |  1|  2|  3|
# |  5|  6|  4|
# +—+—+—+

次の行との計算を行いたい場合

Windowを使って次、または前の行のデータを書く行に連結する。
前の値は lag、次の行の値は leadで取得する。

from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.functions import lit

dfu = df.withColumn(‘user’, lit(‘tmoore’))

df_lag = dfu.withColumn(‘prev_day_price’,
                        func.lag(dfu[‘price’])
                                 .over(Window.partitionBy(“user”)))

result = df_lag.withColumn(‘daily_return’, 
          (df_lag[‘price’] - df_lag[‘prev_day_price’]) / df_lag[‘price’] )

>>> result.show()
+—+——+———+———————+——————————+
|day|price|   user|prev_day_price|        daily_return|
+---+-----+-------+--------------+--------------------+
|  1| 33.3| tmoore|          null|                null|
|  2| 31.1| tmoore|          33.3|-0.07073954983922816|
|  3| 51.2| tmoore|          31.1|         0.392578125|
|  4| 21.3| tmoore|          51.2|  -1.403755868544601|
+—+——+———+--------------+——————————+

spark dataframe – Applying a Window function to calculate differences in pySpark – Stack Overflow

groupby

groupbyでリストを出力する

pyspark collect_set or collect_list with groupby

groupbyで同じ行の複数カラムを辞書にしたリストを出力

Structメソッドを使う
https://www.programcreek.com/python/example/98235/pyspark.sql.functions.collect_list

groupbyしてaggでudfを使用する

まず collect_listでリスト出力した後にそのカラムに対してudfを適用する

エラー対応

expected zero arguments for construction of ClassDict (for numpy.dtype)

Udfのtype設定が間違っているときに出た
Spark Error:expected zero arguments for construction of ClassDict (for numpy.core.multiarray._reconstruct)

情報源

DataFrame基本操作

Spark DataframeのSample Code集 – Qiita

PythonとNewType Pythonのproperty based testingライブラリhypothesisの使い方 pythonでdynamo dbをORM風に扱えるpynamodbの使い方
View Comments
There are currently no comments.