pyspark_dataframe

Creating a DataFrame
Converting between pandas and Spark DataFrames:
pandas_df = spark_df.toPandas()   # collects all data onto the driver; large datasets can cause a memory error
spark_df = sqlContext.createDataFrame(pandas_df)
Converting between a DataFrame and an RDD:
rdd_df = df.rdd        # DataFrame -> RDD of Row objects
rdd_df.toDF()          # RDD -> DataFrame
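A minimal round-trip sketch, assuming an active SparkSession named spark (the Arrow setting is optional, uses the Spark 3.x config name, and only speeds up the conversion):

import pandas as pd
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")   # optional: Arrow-accelerated conversion
pdf = pd.DataFrame({"a": [1, 2], "b": ["x", "y"]})
sdf = spark.createDataFrame(pdf)     # pandas -> Spark
pdf_back = sdf.toPandas()            # Spark -> pandas (collects everything to the driver)
rdd = sdf.rdd                        # DataFrame -> RDD of Row objects
sdf2 = rdd.toDF(["a", "b"])          # RDD -> DataFrame, optionally naming the columns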


#### Empty DataFrame
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([
    StructField("col1", StringType(), True),
    StructField("col2", StringType(), True),
    StructField("col3", StringType(), True),
    StructField("col4", StringType(), True)
])
df_new = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema)

df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],("id", "v"))
df.collect() gathers all of the data onto the driver; if the dataset is too large this causes a memory error, so use it sparingly and prefer take or tail (see the sketch below).
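A small sketch of driver-friendly alternatives, using the df defined above (tail requires Spark 3.0+):

rows = df.take(3)                    # at most 3 Row objects on the driver
last = df.tail(2)                    # the last 2 rows (Spark 3.0+)
small = df.limit(100)                # still a distributed DataFrame; collect only this slice if needed
for row in df.toLocalIterator():     # streams one partition at a time to the driver
    pass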
Printing data
df.show()     # shows 20 rows by default
df.show(30)
Viewing the first few rows
df.head(3)
df.take(3)
Counting the total number of rows
df.count()
Listing column names
df.columns
Renaming columns
df.select(df.age.alias('age_value'), 'name')
df.withColumnRenamed("old_name", "new_name")
Dropping duplicates
df.drop_duplicates(['app_text'])
df.select('app_texts').distinct()
Random sampling
df.sample(False, 0.5, 0)   # withReplacement, fraction, seed
Selecting columns
df.age
df['age']
df.select('age')
df.select(df['age'])
df.select(df.age, df.name)   # select multiple columns
Sorting
df.orderBy('group_name', ascending=False)   # sort by the given column in descending order
df.sort('age', ascending=True)              # ascending
Conditional expressions (when/otherwise, between)
from pyspark.sql import functions
df.select(df.name,functions.when(df.age > 4,1).when(df.age<3,-1).otherwise(0))

df.select(df.name, df.age.between(2, 4))
Filtering rows (the two forms are equivalent)
df.filter(df.age>21)  
df.where(df.age>21)
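Compound conditions use & / | / ~ with each condition in parentheses; a hedged sketch assuming df has the age and name columns used in the surrounding examples:

from pyspark.sql import functions as F
df.filter((df.age > 21) & (df.age < 65))                          # AND
df.where((F.col('age') < 18) | F.col('name').isin('Alice'))       # OR
df.filter(~F.col('name').isin('Alice', 'Bob'))                    # NOT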
Splitting a column
https://blog.csdn.net/intersting/article/details/84500978
from pyspark.sql.functions import split

df = spark.createDataFrame([
    (1, 144.5, '5.9 2032', 33, 'M'),
    (2, 167.2, '5.4 2012', 45, 'M'),
    (3, 124.1, '5.2 2013', 23, 'F'),
    (4, 144.5, '5.9 2014', 33, 'M'),
    (5, 133.2, '5.7 2015', 54, 'F'),
    (3, 124.1, '5.2 2011', 23, 'F'),
    (5, 129.2, '5.3 2010', 42, 'M'),
], ['id', 'weight', 'height', 'age', 'gender'])
df = df.withColumn("s", split(df['height'], " "))   # if the column already exists, the new values replace it
df.show()
Note: df.withColumn adds (or replaces) a column, but its second argument must be a Column expression.
Matching column names with a regular expression: colRegex
df = spark.createDataFrame([("a", 1), ("b", 2), ("c",  3)], ["Col1", "Col2"])
df.select(df.colRegex("`(Col)?.+`")).show()
collect
df.collect()
# returns the data as a list of Row objects
Correlation
df_as2.corr('v1','v2',method='pearson')
Covariance
df.cov('a','b')
Extracting an element from a list-valued column
https://blog.csdn.net/intersting/article/details/84500978
df.s.getItem(0)   # when every value in column s is a list, take element 0 of each list
df.withColumn("ss",df.s.getItem(0))
Splitting one row into multiple rows (like pandas .str.split() followed by .stack())
https://blog.csdn.net/intersting/article/details/84500978
from pyspark.sql.functions import explode,split
df_new.withColumn("res",explode(split(df_new.height,' '))).show()
Rounding to a fixed number of decimals
from pyspark.sql.functions import bround
bround(df_join.count1/df_join['count'],4)
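round and bround differ in how they treat ties: round uses HALF_UP while bround uses HALF_EVEN (banker's rounding). A small sketch with an illustrative DataFrame df_r:

from pyspark.sql import functions as F
df_r = spark.createDataFrame([(2.5,), (3.5,)], ["x"])
df_r.select(F.round("x", 0), F.bround("x", 0)).show()   # round: 2.5 -> 3.0; bround: 2.5 -> 2.0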
Concatenating columns
from pyspark.sql.functions import concat, concat_ws
1. Without a separator
df_new.withColumn('concat_res', concat(df_new.gender, df_new.age))
2. With a separator
df_new.withColumn("concat_res", concat_ws(' ', df_new.gender, df_new.age))
Collapsing all rows of a column into a list
from pyspark.sql.functions import collect_list


df = spark.createDataFrame([('abcd','123'),('xyz','123')], ['s', 'd'])
df.show()
df.groupBy("d").agg(collect_list('s').alias('newcol')).show()
Pivoting rows into columns
df = spark.sparkContext.parallelize([[15, 399, 2],
                                     [15, 1401, 5],
                                     [15, 1608, 4],
                                     [15, 20, 4],
                                     [18, 100, 3],
                                     [18, 1401, 3],
                                     [18, 399, 1]]) \
    .toDF(["userID", "movieID", "rating"])
df.groupby('userID').pivot('movieID').sum('rating').na.fill(-1)
Dropping columns
df.drop('age')
df.drop(df.age)
df = df.na.drop()   # drop any row that contains a null in any column
Taking a substring of a column
df.name.substr(1,2)

##### Creating a literal Column

from pyspark.sql.functions import lit
lit(2)                          # a Column holding the constant 2
df.withColumn('xx', lit(0))     # add a column that is 0 everywhere
Column operations with withColumn
df.withColumn('xx', df.xx.cast("int"))   # change the column's type
Joining two DataFrames
df_left.join(df_right, df_left.key == df_right.key, "inner")
df.join(df4, ['name', 'age']).select(df.name, df.age).collect()
Stacking DataFrames vertically
df1.unionAll(df2)   # alias of union; keeps duplicates
df1.union(df2)      # also keeps duplicates; append .distinct() for SQL UNION semantics
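A minimal sketch, assuming df1 and df2 have compatible schemas (unionByName is Spark 2.3+):

df_all   = df1.union(df2)               # positional union, duplicates kept
df_dedup = df1.union(df2).distinct()    # SQL UNION semantics (duplicates removed)
df_named = df1.unionByName(df2)         # matches columns by name rather than position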
Inspecting column types
df.dtypes
groupBy (groupby is an alias)
df.groupBy("userID").avg("movieID").show()
# apply multiple aggregate functions
df.groupBy("userID").agg(functions.avg("movieID"), functions.min("rating")).show()

### The apply function
GroupedData.apply and applyInPandas do the same job under different names: apply takes a pyspark.sql.functions.pandas_udf(), while applyInPandas takes a plain Python function plus an output schema (see the sketch below).
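A minimal applyInPandas sketch (group-wise mean subtraction, the standard grouped-map pattern), assuming an active SparkSession named spark:

import pandas as pd
df_g = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

def subtract_mean(pdf: pd.DataFrame) -> pd.DataFrame:
    # receives one group as a pandas DataFrame and must return a pandas DataFrame
    return pdf.assign(v=pdf.v - pdf.v.mean())

df_g.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()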
Finding null values
from pyspark.sql.functions import isnull, isnan
df.filter(isnull("userID"))   # rows where userID is null
df.where(isnan("userID"))     # rows where userID is NaN
Parsing a column of JSON strings
sql_context = SQLContext(sc)
sql_context.read.json(df.rdd.map(lambda r: r.json))
col
from pyspark.sql.functions import col
df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner')
Contains
df.json.contains('a')
Pattern matching with LIKE
df.filter(df.gender.like('%M')).show()   # SQL-style LIKE matching
Starts with / ends with
df.filter(df.gender.startswith('M'))
df.filter(df.gender.endswith('M'))
Conditional values with when/otherwise
from pyspark.sql import functions as F
df.select(df.id, F.when(df.age > 34, 1).when(df.age < 34, -1).otherwise(0)).show()
Replacing values
df.replace(1, 2).show()        # replace 1 with 2
df.na.replace(22, 33).show()   # na.replace needs both the value to replace and its replacement
Row
Usage:
Row(name="Alice", age=11)   # builds one row of data with fields name and age
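A short sketch of how Row objects behave (field access by attribute or key) and how a list of them becomes a DataFrame:

from pyspark.sql import Row
row = Row(name="Alice", age=11)
row.name, row['age']                  # 'Alice', 11
people = spark.createDataFrame([Row(name="Alice", age=11), Row(name="Bob", age=12)])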
pandas_udf
Source:
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.pandas_udf.html#pyspark.sql.functions.pandas_udf
####################################### Example 1 ###################################################
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf('long')   # data type of the result; a struct result can be declared as "col1 string, col2 long"
def get_add(a: pd.Series) -> pd.Series:
    return a + 1

df.withColumn("one_processed", get_add(df["a"])).show()
####################################### Example 2 ####################################################
import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf("col1 string, col2 long")   # type of the returned columns
def func(s1: pd.Series, s2: pd.Series, s3: pd.DataFrame) -> pd.DataFrame:   # argument and return types
    s3['col2'] = s1 + s2.str.len()   # when the result is a struct, use pd.DataFrame
    return s3

df = spark.createDataFrame(
    [[1, "a string", ("a nested string",)]],
    "long_col long, string_col string, struct_col struct<col1:string>")
df.printSchema()
df.select(func("long_col", "string_col", "struct_col")).printSchema()
####################################### Example 3 ####################################################
@pandas_udf("string")
def to_upper(s: pd.Series) -> pd.Series:
    return s.str.upper()

df = spark.createDataFrame([("John Doe",)], ("name",))
df.select(to_upper("name")).show()
####################################### Example 4 ####################################################
@pandas_udf("first string, last string")
def split_expand(s: pd.Series) -> pd.DataFrame:
    return s.str.split(expand=True)

df = spark.createDataFrame([("John Doe",)], ("name",))
df.select(split_expand("name")).show()
####################################### Example 5 ####################################################
from typing import Iterator

@pandas_udf("long")
def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Do some expensive initialization with a state
    # (placeholder functions taken from the Spark docs)
    state = very_expensive_initialization()
    for x in iterator:
        yield calculate_with_state(x, state)

df.select(calculate("value")).show()
####################################### Example 6 ####################################################
from typing import Iterator

@pandas_udf("long")
def plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for s in iterator:
        yield s + 1

df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["v"]))
df.select(plus_one(df.v)).show()

####################################### Example 7 ####################################################
from typing import Iterator, Tuple
from pyspark.sql.functions import struct, col

@pandas_udf("long")
def multiply(iterator: Iterator[Tuple[pd.Series, pd.DataFrame]]) -> Iterator[pd.Series]:
    for s1, df in iterator:
        yield s1 * df.v

df = spark.createDataFrame(pd.DataFrame([1, 2, 3], columns=["v"]))
df.withColumn('output', multiply(col("v"), struct(col("v")))).show()

####################################### Example 8 ####################################################
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
df.groupby("id").agg(mean_udf(df['v'])).show()
mapInPandas
############################################ Example 1 #######################################
def pandas_filter_func(iterator):
    for pandas_df in iterator:
        yield pandas_df[pandas_df.a == 1]

df.mapInPandas(pandas_filter_func, schema=df.schema).show()

############################################ Example 2 #######################################
df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])

def plus_mean(pandas_df):
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())

df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()

############################################ Example 3 #######################################
import pandas as pd

df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ('time', 'id', 'v1'))

df2 = spark.createDataFrame(
    [(20000101, 1, 'x'), (20000101, 2, 'y')],
    ('time', 'id', 'v2'))

def asof_join(l, r):
    return pd.merge_asof(l, r, on='time', by='id')

df1.groupby('id').cogroup(df2.groupby('id')).applyInPandas(
    asof_join, schema='time int, id int, v1 double, v2 string').show()
UDF
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import udf

df = spark.createDataFrame([(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

def plus_one(v):
    return v + 1

plus_one_udf = udf(plus_one, returnType=DoubleType())
df.withColumn("one_processed", plus_one_udf(df["v"]))

################
from pyspark.sql import functions as F
from pyspark.sql import types as T

a = sc.parallelize([[1, 'a'],
                    [1, 'b'],
                    [1, 'b'],
                    [2, 'c']]).toDF(['id', 'value'])
a.groupBy('id').agg(F.collect_list('value').alias('value_list')).show()

def find_a(x):
    """Count 'a's in list."""
    output_count = 0
    for i in x:
        if i == 'a':
            output_count += 1
    return output_count

find_a_udf = F.udf(find_a, T.IntegerType())

a.groupBy('id').agg(find_a_udf(F.collect_list('value')).alias('a_count')).show()

##################
from pyspark.sql import functions as F
from pyspark.sql import types as T

a = sc.parallelize([[1, 1, 'a'],
                    [1, 2, 'a'],
                    [1, 1, 'b'],
                    [1, 2, 'b'],
                    [2, 1, 'c']]).toDF(['id', 'value1', 'value2'])

a.groupBy('id').agg(find_a_udf(F.collect_list(F.when(F.col('value1') == 1, F.col('value2')))).alias('a_count')).show()


#################################

def sum_func(key, pdf):
    # key is a tuple of two numpy.int64s, which is the values
    # of 'id' and 'ceil(df.v / 2)' for the current group
    return pd.DataFrame([key + (pdf.v.sum(),)])

df.groupby(df.id, F.ceil(df.v / 2)).applyInPandas(
    sum_func, schema="id long, `ceil(v / 2)` long, v double").show()

################################
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def normalize(pdf):
    v = pdf.v
    return pdf.assign(v=(v - v.mean()) / v.std())

df.groupby("id").apply(normalize).show()

################
@pandas_udf('group string, year int, app_texts string, app_num int, high_apps string, high_num int', PandasUDFType.GROUPED_MAP)
def deal_func(df):
    ls = []
    year_early = min(df['app_date'])
    gp = df['group'].values[0]
    for year in range(int(year_early), 2022):
        res_year = df[(df.app_date <= year) & (df.ceased_date >= year)]
        # core patents still in force in that year
        high_value_year = res_year[res_year.xingji >= 4]['app_text'].unique()
        ls.append([gp, year, str(list(res_year['app_text'].unique())), len(res_year['app_text'].unique()),
                   str(list(high_value_year)), len(high_value_year)])
    return pd.DataFrame(ls)

df_sql_res2.groupby('group').apply(deal_func).show()

##### Comparing pandas and Spark DataFrames

| Operation | pandas | Spark |
|---|---|---|
| Row structure | Series | Row (part of the Spark DataFrame) |
| Column structure | Series | Column |
| Column names | duplicates not allowed | duplicates allowed; rename via alias |
| Adding a column | df["xx"] = 0 | df.withColumn("xx", functions.lit(0)).show() |
| Sorting | df.sort() | df.sort() |
| First / last rows | df.head(2), df.tail(2) | df.head(2) or df.take(2) |
| Filtering | df[df['age'] > 21] | df.filter(df['age'] > 21) or df.where(df['age'] > 21) |
| Grouping | df.groupby("age"), df.groupby("A").avg("B") | df.groupBy("age"), df.groupBy("A").avg("B").show(), df.groupBy("A").agg(functions.avg("B"), functions.min("B"), functions.max("B")).show() |
| count | df.count() returns the non-null count of each column | df.count() returns the total number of rows |
| describe | df.describe() reports count, mean, std, min, 25%, 50%, 75%, max | df.describe() reports count, mean, stddev, min, max |
| Combining | concat / merge / append / join | df.join() |
| fillna | df.fillna() | df.na.fill() |
| dropna | df.dropna() | df.na.drop() |
| Conversion | pandas_df = spark_df.toPandas() | spark_df = sqlContext.createDataFrame(pandas_df) |
| Applying a function | df.apply(f) | df.foreach(f) or df.rdd.foreach(f) applies f to every row; df.foreachPartition(f) or df.rdd.foreachPartition(f) applies f to every partition (see the sketch below) |
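A hedged sketch of the foreach / foreachPartition pattern referenced in the last table row, assuming df has a name column; both functions run on the executors and return nothing:

def print_name(row):
    # executed once per row on the executors (output goes to the executor logs)
    print(row.name)

def handle_partition(rows):
    # executed once per partition, e.g. to reuse one connection for many rows
    for row in rows:
        pass

df.foreach(print_name)
df.foreachPartition(handle_partition)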

##### Getting a column's distinct values as a list

from pyspark.sql import functions as F
df.select(F.collect_set('applicant_name').alias('applicant')).first()['applicant']
Converting rows to dictionaries
data_select4.rdd.map(lambda row: row.asDict(True))   # same as pandas to_dict(orient='records')
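To actually materialise the dictionaries on the driver (a small sketch; data_select4 is the DataFrame from the line above):

dicts = data_select4.rdd.map(lambda row: row.asDict(True)).collect()
# or, for small results, without going through the RDD:
dicts = [row.asDict(True) for row in data_select4.collect()]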

Column method summary

df.name.startswith('Al')
df.name.substr(1, 3)
df.name.rlike('ice$')
df.name.like('Al%')
df.age.isin([1, 2, 3])
df.height.isNull()
df.height.isNotNull()
df.d.getItem("key")
df.r.getField("b")
df.name.endswith('ice')
df.name.desc()
df.name.contains('o')
df.age.cast("string").alias('ages')
df.age.between(2, 4)
df.name.asc()=====>df.select(df.name).orderBy(df.name.asc()).collect()
df4.na.fill(50)
df4.na.fill({'age': 50, 'name': 'unknown'}).show()
df4.na.replace(10, 20)


Second-pass notes

agg -----> aggregate
df.groupby('a').agg({"b": "min"})

alias -------> give the DataFrame an alias; it still points at the same underlying data
df2 = df.alias('df2')   # effectively equivalent to df2 = df

cache ------> keep the DataFrame in memory; equivalent to persist()
df.cache()

coalesce ------> reduce the number of partitions, same as the RDD coalesce
df.coalesce(1).rdd.getNumPartitions()

colRegex --------> select the columns whose names match a regular expression
df.select(df.colRegex("`(Col1)?+.+`")).show()

collect --------> return all of the data as a list of Row objects
df.collect()

corr() -----> correlation between two columns; only Pearson correlation is supported
df.corr('col1', 'col2')

## Creating views (see the temp-view sketch at the end of this section)
createGlobalTempView()             # create a global view; fails if the name already exists
createOrReplaceGlobalTempView()    # create a global view; replaces an existing one with the same name
createOrReplaceTempView()          # create a session-local view; replaces an existing one with the same name
createTempView()                   # create a session-local view; fails if the name already exists

distinct -----> de-duplicate the whole DataFrame

drop_duplicates (alias of dropDuplicates) ------> de-duplicate on the given columns
mapInPandas
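A minimal temp-view sketch for the create*View family listed above, assuming an active SparkSession named spark and an illustrative people DataFrame:

people = spark.createDataFrame([("Alice", 11), ("Bob", 12)], ["name", "age"])
people.createOrReplaceTempView("people")            # session-scoped view
spark.sql("SELECT name FROM people WHERE age > 11").show()

people.createOrReplaceGlobalTempView("people_g")    # global views live in the global_temp database
spark.sql("SELECT * FROM global_temp.people_g").show()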