我报名参与金石方案1期挑战——分割10万奖池,这是我的第20篇文章,点击检查活动详情

大数据开发!Pandas转spark无痛指南!
  • 作者:韩信子@ShowMeAI
  • 大数据技能◉技能提高系列:www.showmeai.tech/tutorials/8…
  • 数据剖析实战系列:www.showmeai.tech/tutorials/4…
  • 本文地址:www.showmeai.tech/article-det…
  • 声明:版权一切,转载请联络平台与作者并注明出处
  • 保藏ShowMeAI检查更多精彩内容
大数据开发!Pandas转spark无痛指南!

Pandas 是每位数据科学家和 Python 数据剖析师都了解的东西库,它灵敏且强壮具备丰厚的功用,但在处理大型数据集时,它是非常受限的。

这种情况下,咱们会过渡到 PySpark,结合 Spark 生态强壮的大数据处理才干,充分利用多机器并行的核算才干,能够加速核算。不过 PySpark 的语法和 Pandas 差异也比较大,许多开发人员会感觉这很让人头大。

大数据开发!Pandas转spark无痛指南!

在本篇内容中, ShowMeAI 将对最核心的数据处理和剖析功用,整理 PySpark 和 Pandas 相对应的代码片段,以便咱们能够无痛地完结 Pandas 到大数据 PySpark 的转化

大数据开发!Pandas转spark无痛指南!

大数据处理剖析及机器学习建模相关常识,ShowMeAI制作了具体的教程与东西速查手册,咱们能够经过如下内容打开学习或许回顾相关常识。

  • 图解数据剖析:从入门到通晓系列教程
  • 图解大数据技能:从入门到通晓系列教程
  • 图解机器学习算法:从入门到通晓系列教程
  • 数据科学东西库速查表 | Spark RDD 速查表
  • 数据科学东西库速查表 | Spark SQL 速查表

导入东西库

在运用具体功用之前,咱们需求先导入所需的库:

# pandas vs pyspark,东西库导入
import pandas as pd
import pyspark.sql.functions as F

PySpark 一切功用的进口点是 SparkSession 类。经过 SparkSession 实例,您能够创立spark dataframe、运用各种转化、读取和写入文件等,下面是界说 SparkSession的代码模板:

from pyspark.sql import SparkSession
spark = SparkSession\
.builder\
.appName('SparkByExamples.com')\
.getOrCreate()

创立 dataframe

在 Pandas 和 PySpark 中,咱们最便利的数据承载数据结构都是 dataframe,它们的界说有一些不同,咱们来比照一下看看:

Pandas

columns = ["employee","department","state","salary","age"]
data = [("Alain","Sales","Paris",60000,34),
        ("Ahmed","Sales","Lyon",80000,45),
        ("Ines","Sales","Nice",55000,30),
        ("Fatima","Finance","Paris",90000,28),
        ("Marie","Finance","Nantes",100000,40)]

创立DataFrame的 Pandas 语法如下:

df = pd.DataFrame(data=data, columns=columns)
# 检查头2行
df.head(2)

PySpark

创立DataFrame的 PySpark 语法如下:

df = spark.createDataFrame(data).toDF(*columns)
# 检查头2行
df.limit(2).show()

指定列类型

Pandas

Pandas 指定字段数据类型的办法如下:

types_dict = {
    "employee": pd.Series([r[0] for r in data], dtype='str'),
    "department": pd.Series([r[1] for r in data], dtype='str'),
    "state": pd.Series([r[2] for r in data], dtype='str'),
    "salary": pd.Series([r[3] for r in data], dtype='int'),
    "age": pd.Series([r[4] for r in data], dtype='int')
}
df = pd.DataFrame(types_dict)

Pandas 能够经过如下代码来检查数据类型:

df.dtypes

PySpark

PySpark 指定字段数据类型的办法如下:

from pyspark.sql.types import StructType,StructField, StringType, IntegerType
schema = StructType([ \
    StructField("employee",StringType(),True), \
    StructField("department",StringType(),True), \
    StructField("state",StringType(),True), \
    StructField("salary", IntegerType(), True), \
    StructField("age", IntegerType(), True) \
  ])
df = spark.createDataFrame(data=data,schema=schema)

PySpark 能够经过如下代码来检查数据类型:

df.dtypes
# 检查数据类型 
df.printSchema() 

读写文件

Pandas 和 PySpark 中的读写文件方式非常相似。 具体语法比照如下:

Pandas

df = pd.read_csv(path, sep=';', header=True)
df.to_csv(path, ';', index=False)

PySpark

df = spark.read.csv(path, sep=';')
df.coalesce(n).write.mode('overwrite').csv(path, sep=';')

留意 ①

PySpark 中能够指定要分区的列:

df.partitionBy("department","state").write.mode('overwrite').csv(path, sep=';')

留意 ②

能够经过上面一切代码行中的 parquet 更改 CSV 来读取和写入不同的格局,例如 parquet 格局

数据挑选 – 列

Pandas

在 Pandas 中挑选某些列是这样完结的:

columns_subset = ['employee', 'salary']
df[columns_subset].head()
df.loc[:, columns_subset].head()

PySpark

在 PySpark 中,咱们需求运用带有列名列表的 select 办法来进行字段挑选:

columns_subset = ['employee', 'salary']
df.select(columns_subset).show(5)

数据挑选 – 行

Pandas

Pandas能够运用 iloc对行进行挑选:

# 头2行
df.iloc[:2].head()

PySpark

在 Spark 中,能够像这样挑选前 n 行:

df.take(2).head()
# 或许
df.limit(2).head()

留意:运用 spark 时,数据可能分布在不同的核算节点上,因此“第一行”可能会跟着运转而变化。

条件挑选

Pandas

Pandas 中根据特定条件过滤数据/挑选数据的语法如下:

# First method
flt = (df['salary'] >= 90_000) & (df['state'] == 'Paris')
filtered_df = df[flt]
# Second Method: Using query which is generally faster
filtered_df = df.query('(salary >= 90_000) and (state == "Paris")')
# Or
target_state = "Paris"
filtered_df = df.query('(salary >= 90_000) and (state == @target_state)')

PySpark

在 Spark 中,运用 filter办法或执行 SQL 进行数据挑选。 语法如下:

# 办法1:根据filter进行数据挑选
filtered_df = df.filter((F.col('salary') >= 90_000) & (F.col('state') == 'Paris'))
# 或许
filtered_df = df.filter(F.expr('(salary >= 90000) and (state == "Paris")'))
# 办法2:根据SQL进行数据挑选
df.createOrReplaceTempView("people")
filtered_df = spark.sql("""
SELECT * FROM people
WHERE (salary >= 90000) and (state == "Paris")
""") 

添加字段

Pandas

在 Pandas 中,有几种添加列的办法:

seniority = [3, 5, 2, 4, 10]
# 办法1
df['seniority'] = seniority
# 办法2
df.insert(2, "seniority", seniority, True)

PySpark

在 PySpark 中有一个特定的办法withColumn可用于添加列:

seniority = [3, 5, 2, 4, 10]
df = df.withColumn('seniority', seniority)

dataframe拼接

2个dataframe – pandas

# pandas拼接2个dataframe
df_to_add = pd.DataFrame(data=[("Robert","Advertisement","Paris",55000,27)], columns=columns)
df = pd.concat([df, df_to_add], ignore_index = True)

2个dataframe – PySpark

# PySpark拼接2个dataframe
df_to_add = spark.createDataFrame([("Robert","Advertisement","Paris",55000,27)]).toDF(*columns)
df = df.union(df_to_add)

多个dataframe – pandas

# pandas拼接多个dataframe
dfs = [df, df1, df2,...,dfn]
df = pd.concat(dfs, ignore_index = True)

多个dataframe – PySpark

PySpark 中 unionAll 办法只能用来连接两个 dataframe。咱们运用 reduce 办法合作unionAll来完结多个 dataframe 拼接:

# pyspark拼接多个dataframe
from functools import reduce
from pyspark.sql import DataFrame
def unionAll(*dfs):
    return reduce(DataFrame.unionAll, dfs)
dfs = [df, df1, df2,...,dfn]
df = unionAll(*dfs)

简单核算

Pandas 和 PySpark 都供给了为 dataframe 中的每一列进行核算核算的办法,能够轻松对下列核算值进行核算核算:

  • 列元素的计数
  • 列元素的平均值
  • 最大值
  • 最小值
  • 标准差
  • 三个分位数:25%、50% 和 75%

Pandas 和 PySpark 核算这些核算值的办法很相似,如下:

Pandas & PySpark

df.summary()
#或许
df.describe()

数据分组聚合核算

Pandas 和 PySpark 分组聚合的操作也是非常相似的:

Pandas

df.groupby('department').agg({'employee': 'count', 'salary':'max', 'age':'mean'})

PySpark

df.groupBy('department').agg({'employee': 'count', 'salary':'max', 'age':'mean'})

可是,终究显现的结果需求一些调整才干一致。

在 Pandas 中,要分组的列会主动成为索引,如下所示:

大数据开发!Pandas转spark无痛指南!

要将其作为列康复,咱们需求运用 reset_index办法:

df.groupby('department').agg({'employee': 'count', 'salary':'max', 'age':'mean'}).reset_index()
大数据开发!Pandas转spark无痛指南!

在 PySpark 中,列名会在结果dataframe中被重命名,如下所示:

大数据开发!Pandas转spark无痛指南!

要康复列名,能够像下面这样运用别号办法:

df.groupBy('department').agg(F.count('employee').alias('employee'), F.max('salary').alias('salary'), F.mean('age').alias('age'))
大数据开发!Pandas转spark无痛指南!

数据转化

在数据处理中,咱们常常要进行数据改换,最常见的是要对「字段/列」运用特定转化,在Pandas中咱们能够轻松根据apply函数完结,但在PySpark 中咱们能够运用udf(用户界说的函数)封装咱们需求完结的改换的Python函数。

例如,咱们对salary字段进行处理,如果薪酬低于 60000,咱们需求添加薪酬 15%,如果超越 60000,咱们需求添加 5%。

Pandas

Pandas 中的语法如下:

df['new_salary'] = df['salary'].apply(lambda x: x*1.15 if x<= 60000 else x*1.05)

Pyspark

PySpark 中的等价操作下:

from pyspark.sql.types import FloatType
df.withColumn('new_salary', F.udf(lambda x: x*1.15 if x<= 60000 else x*1.05, FloatType())('salary'))

⚠️ 请留意, udf办法需求清晰指定数据类型(在咱们的比如中为 FloatType)

总结

本篇内容中, ShowMeAI 给咱们总结了Pandas和PySpark对应的功用操作细节,咱们能够看到Pandas和PySpark的语法有许多相似之处,可是要留意一些细节差异。

另外,咱们仍是要根据场景进行合适的东西挑选:

  • 在处理大型数据集时,运用 PySpark 能够为您供给很大的优势,由于它答应并行核算。
  • 如果您正在运用的数据集很小,那么运用Pandas会很快和灵敏。

参考资料

  • 图解数据剖析:从入门到通晓系列教程:www.showmeai.tech/tutorials/3…
  • 图解大数据技能:从入门到通晓系列教程:www.showmeai.tech/tutorials/8…
  • 图解机器学习算法:从入门到通晓系列教程:www.showmeai.tech/tutorials/3…
  • 数据科学东西库速查表 | Spark RDD 速查表:www.showmeai.tech/article-det…
  • 数据科学东西库速查表 | Spark SQL 速查表:www.showmeai.tech/article-det…

大数据开发!Pandas转spark无痛指南!