生成dataframe
1 | Pandas和Spark的DataFrame两者互相转换: |
1 | df.collect(),会将所有程序的数据收集到驱动上,如果数据集过大会出现memory error -->所以尽量少用,推荐take,tail |
打印数据
1 | df.show() #默认20行 |
查看前几行
1 | df.head(3) |
查看dataframe的总行数
1 | df.count() |
查看列名
1 | df.columns |
重新设置列名
1 | df.select(df.age.alias('age_value'),'name') |
去重
1 | df.drop_duplicates(['app_text']) |
随机抽样
1 | df.sample(False,0.5,0) |
选取列
1 | df.age |
排序
1 | df.orderBy('group_name',ascending=False) 按指定字段升序 |
按条件筛选
1 | from pyspark.sql import functions |
过滤数据=两者等价
1 | df.filter(df.age>21) |
数据分割
1 | https://blog.csdn.net/intersting/article/details/84500978 |
正则表达式匹配列名colRegex
1 | df = spark.createDataFrame([("a", 1), ("b", 2), ("c", 3)], ["Col1", "Col2"]) |
collect
1 | df.collect() |
相关性
1 | df_as2.corr('v1','v2',method='pearson') |
协方差
1 | df.cov('a','b') |
列表中取出
1 | https://blog.csdn.net/intersting/article/details/84500978 |
一行分成多行===>类似于df的.split().stack()
1 | https://blog.csdn.net/intersting/article/details/84500978 |
保留位数
1 | from pyspark.sql.functions import bround |
列数据合并
1 | 1.不添加分隔符 |
把一列的所有行合并
1 | from pyspark.sql.functions import collect_list |
多行转多列
1 | df=spark.sparkContext.parallelize([[15,399,2], \ |
删除列
1 | df.drop('age') |
列截取字符串
1 | df.name.substr(1,2) |
#####创建column
1 | lit(2) |
列操作withColumn
1 | df.withColumn('xx',df.xx.cast("Int")) #修改列的类型 |
合并两个df==join
1 | df_left.join(df_right,df_left.key = df_right.key,"inner") |
DF上下拼接
1 | df1.unionALL(df2) #不删除重复数据 |
查看数据类型
1 | df.dtypes |
groupBy===groupby
1 | df.groupBy("userID").avg("movieID").show() |
查询空值
1 | from pyspark.sql.functions import isnull |
转json内容
1 | sql_context = SQLContext(sc) |
col
1 | df_as1.join(df_as2, col("df_as1.name") == col("df_as2.name"), 'inner') |
包含
1 | df.json.contains('a') |
模糊匹配
1 | df.filter(df.gender.like('%M')).show()---->sql的模糊匹配 |
以什么..开始,以什么..结束
1 | df.filter(df.gender.startswith('M')) |
条件筛选
1 | from pyspark.sql import functions as F |
替换
1 | df.replace(1,2).show() |
Row
1 | ====使用方法==== |
pandas_udf
1 | 摘自: |
mapInPandas
1 | ############################################案例1####################################### |
UDF
1 | from pyspark.sql.types import DoubleType |
#####spark与pandas的dataframe对比,列举
Pandas | Spark | |
---|---|---|
行结构 | Series结构 | Row结构,属于Spark DataFrame结构 |
列结构 | Series结构 | Column结构 |
列名称 | 不允许重名 | 允许重名修改列名,采用alias方法 |
列添加 | df[“xx”] = 0 | df.withColumn(“xx”, functions.lit(0)).show() |
排序 | df.sort() | df.sort() |
df.head(2) | df.head(2)或者df.take(2) | |
df.tail(2) | ||
过滤 | df[df[‘age’]>21] | df.filter(df[‘age’]>21) 或者 df.where(df[‘age’]>21) |
df.groupby(“age”) df.groupby(“A”).avg(“B”) | df.groupBy(“age”) df.groupBy(“A”).avg(“B”).show() df.groupBy(“A”).agg(functions.avg(“B”), functions.min(“B”), functions.max(“B”)).show() | |
df.count() 输出每一列的非空行数 | df.count() 输出总行数 | |
df.describe() 描述某些列的count, mean, std, min, 25%, 50%, 75%, max | df.describe() 描述某些列的count, mean, stddev, min, max | |
合并 | concat | |
合并 | merge | |
合并 | join | df.join() |
合并 | append | |
fillna | df.fillna() | df.na.fill() |
dropna | df.dropna() | df.na.drop() |
两者互相转换 | pandas_df = spark_df.toPandas() | spark_df = sqlContext.createDataFrame(pandas_df) |
函数应用 | df.apply(f) | df.foreach(f) 或者 df.rdd.foreach(f) 将df的每一列应用函数fdf.foreachPartition(f) 或者 df.rdd.foreachPartition(f) 将df的每一块应用函数f |
#####某一列去重返回列表
1 | from pyspark.sql import Functions as F |
按行转成字典
1 | data_select4.rdd.map(lambda row: row.asDict(True)====>跟pandas的to_dict(orient='records')一致 |
列总结
1 | df.name.startswith('Al') |
二次学习
1 | agg ----->聚合操作 |