直接用pyspark 的配置
1
2
3
4
5
6
7
8
9
1.vim ~/.bashrc
2.spark_home补充上 SPARK_HOME=/home/kuailiang/spark/spark-3.0.1-bin-hadoop3.2
3.export PATH=$PATH:$SPARK_HOME/bin

4.export PYSPARK_PYTHON=python3
source ~/.bashrc

3.spark-env.sh----->conf下
4.export PYSPARK_PYTHON=/usr/bin/python3
运行代码
1
2
3
4
spark-submit new.py
spark-submit --master spark://192.168.0.217:7077 pi.py 2000
spark-submit wordcount.py file:///home/tst #运行本地文件
./bin/spark-submit examples/src/main/python/pi.py
基本须知
1
2
3
4
5
6
1.spark程序必须做的第一件事就是创建一个sparkcontext对象(Spark如何访问集群)
2.数据还要使用则lineLengths.persist()
export PATH
export JAVA_HOME=/home/kuailiang/2020/java/jdk-15.0.1
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
RDD操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
转换:
map
filter
flatmap
sample
groupByKey
reduceByKey
union
join
cogroup
crossProduct
mapValues
sort
partitionBy
操作:
count
collect
reduce
lookup
save
在转换方法中的函数执行完后生成的还是一个RDD结构
自带案例1
1
2
3
4
textFile = spark.read.text("README.md")
textFile.count()
textFile.first()
linesWithSpark = textFile.filter(textFile.value.contains("Spark"))
查找包含最多单词的行
1
2
3
4
5
6
from pyspark.sql.functions import *
textFile.select(size(split(textFile.value,"\s+")).name("numWords")).agg(max(col("numWords"))).collect()

wordCounts = textFile.select(explode(split(textFile.value,"\s+")).alias("word")).groupBy("word").count()

wordCounts.collect()
spark操作mysql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
####注意对应版本的jar包要放在jars文件夹下https://mvnrepository.com/artifact/mysql/mysql-connector-java
clickhouse包 https://mvnrepository.com/artifact/ru.yandex.clickhouse/clickhouse-jdbc/0.2.4
============================读数据==================
jdbcDF = spark.read.format("jdbc").\
option("url", "jdbc:mysql://192.168.0.251:3306/pre_formal_2").\
option("driver","com.mysql.jdbc.Driver").\
option("dbtable", "ipc_split_10000_c_list_v3").\
option("user", "user_rw").\
option("password", "1a2s3d4f").load()
####读指定字段
industry_coms = spark.read.format("jdbc").\
option("url", "jdbc:mysql://192.168.0.251:3306/pre_formal_1").\
option("driver","com.mysql.jdbc.Driver").\
option("dbtable", "(select industryId,companyee_num from industry_company_wilson_20210112) t").\
option("user", "user_rw").\
option("password", "1a2s3d4f").load()
#返回的是dataframe
jdbcDF.show()
============================存数据===================
mysql_url = "jdbc:mysql://192.168.0.251:3306/pre_formal_2?user=user_rw&password=1a2s3d4f"
mysql_table = "people"
jdbcDF.write.mode("append").jdbc(mysql_url, mysql_table)
spark操作clickhouse
1
2
3
4
5
6
companys = spark.read.format("jdbc"). \
option("url", "jdbc:clickhouse://192.168.0.246:8123/pre_formal_1"). \
option("driver", "ru.yandex.clickhouse.ClickHouseDriver"). \
option("dbtable", f"(select applicant_name as applicant_other from company_20210208) t"). \
option("user", "default"). \
option("password", "123456").load().cache()
spark存成不同格式(csv,json,text,parquet)
1
2
3
4
5
6
7
jdbcDF.select("names").write.text("/root/mimo/people_text")

jdbcDF.write.csv("/root/mimo/people_text/people_csv", sep=':')

jdbcDF.write.json("/root/mimo/people_text/people_json", mode='overwrite')

peopledf.write.parquet("/root/mimo/people_text/people_parquet", mode='append')
Spark包
1
2
pyspark.SparkContext:
SparkContext表示与Spark集群的连接,可用于RDD在该集群上创建和广播变量
pyspark.sql
统计成绩案例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
#studentExample 例子 练习
def map_func(x):
s = x.split()
return (s[0], [int(s[1]),int(s[2]),int(s[3])]) #返回为(key,vaklue)格式,其中key:x[0],value:x[1]且为有三个元素的列表
#return (s[0],[int(s[1],s[2],s[3])]) #注意此用法不合法
def has100(x):
for y in x:
if(y == 100): #把x、y理解为 x轴、y轴
return True
return False
def allis0(x):
if(type(x)==list and sum(x) == 0): #类型为list且总分为0 者为true;其中type(x)==list :判断类型是否相同
return True
return False
def subMax(x,y):
m = [x[1][i] if(x[1][i] > y[1][i]) else y[1][i] for i in range(3)]
return('Maximum subject score', m)
def sumSub(x,y):
n = [x[1][i]+y[1][i] for i in range(3)]
#或者 n = ([x[1][0]+y[1][0],x[1][1]+y[1][0],x[1][2]+y[1][2]])
return('Total subject score', n)
def sumPer(x):
return (x[0],sum(x[1]))#停止之前的SparkContext,不然重新运行或者创建工作会失败;另外,只有 sc.stop()也可以,但是首次运行会有误
try:
sc.stop()
except:
pass
from pyspark import SparkContext #导入模块
sc=SparkContext(appName='Student') #命名
lines=sc.textFile("/home/kuailiang/spark/code/dtudent.txt").map(lambda x:map_func(x)).cache() #导入数据且保持在内存中,其中cache():数据保持在内存中
count=lines.count() #对RDD中的数据个数进行计数;其中,RDD一行为一个数据集#RDD'转换'运算 (筛选 关键字filter)
whohas100 = lines.filter(lambda x: has100(x[1])).collect() #注意:处理的是value列表,也就是x[1]
whois0 = lines.filter(lambda x: allis0(x[1])).collect()
sumScore = lines.map(lambda x: (x[0],sum(x[1]))).collect()
#‘动作’运算
maxScore = max(sumScore,key=lambda x: x[1]) #总分最高者
minScore = min(sumScore,key=lambda x: x[1]) #总分最低者
sumSubScore = lines.reduce(lambda x,y: sumSub(x,y))
avgScore = [x/count for x in sumSubScore[1]]#单科成绩平均值
#RDD key-value‘转换’运算
subM = lines.reduce(lambda x,y: subMax(x,y))
redByK = lines.reduceByKey(lambda x,y: [x[i]+y[i] for i in range(3)]).collect() #合并key相同的value值x[0]+y[0],x[1]+y[1],x[2]+y[2]
#RDD'转换'运算
sumPerSore = lines.map(lambda x: sumPer(x)).collect() #每个人的总分 #sumSore = lines.map(lambda x: (x[0],sum(x[1]))).collect()
sorted = lines.sortBy(lambda x: sum(x[1])) #总成绩低到高的学生成绩排序
sortedWithRank = sorted.zipWithIndex().collect()#按总分排序
first3 = sorted.takeOrdered(3,key=lambda x:-sum(x[1])) #总分前三者#限定以空格的形式输出到文件中
first3RDD = sc.parallelize(first3)\
.map(lambda x:str(x[0])+' '+str(x[1][0])+' '+str(x[1][1])+' '+str(x[1][2])).saveAsTextFile("result")

#print(lines.collect())
print("数据集个数(行):",count)
print("单科满分者:",whohas100)
print("单科零分者:",whois0)
print("单科最高分者:",subM)
print("单科总分:",sumSubScore)
print("合并名字相同的分数:",redByK)
print("总分/(人)",sumPerSore)
print("最高总分者:",maxScore)
print("最低总分者:",minScore)
print("每科平均成绩:",avgScore)
# print("总分倒序:",sortedWithRank)
print("总分前三者:",first3)
print(first3RDD)
sc.stop()
saprk sql和dataframe
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
df.show() #展示数据
df.select("name").show() #挑选指定的列
df.select(df['name'], df['age'] + 1).show() #展示name和age字段并将age字段+1
df.filter(df['age'] > 21).show() #过滤,展示age字段大于21的数据
df.groupBy("age").count().show()
import pyspark.sql.functions as func
df.groupBy("department").agg(df["department"], func.max("age"), func.sum("expense"))
df.groupBy("department").agg(func.max("age"), func.sum("expense"))
sqlContext.setConf("spark.sql.retainGroupColumns", "false")


from pyspark.sql import Row
sc = spark.sparkContext
# Load a text file and convert each line to a Row.
lines = sc.textFile("examples/src/main/resources/people.txt")
parts = lines.map(lambda l: l.split(","))
people = parts.map(lambda p: Row(name=p[0], age=int(p[1])))

# Infer the schema, and register the DataFrame as a table.
schemaPeople = spark.createDataFrame(people) #创建dataframe
schemaPeople.createOrReplaceTempView("people")

# SQL can be run over DataFrames that have been registered as a table.
teenagers = spark.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

# The results of SQL queries are Dataframe objects.
# rdd returns the content as an :class:`pyspark.RDD` of :class:`Row`.
teenNames = teenagers.rdd.map(lambda p: "Name: " + p.name).collect()
for name in teenNames:
print(name)


Spark SQL可以将Row对象的RDD转换为DataFrame,从而推断数据类型。通过将键/值对列表作为kwargs传递给Row类来构造行。该列表的键定义表的列名,并且通过对整个数据集进行采样来推断类型,类似于对JSON文件执行的推断。
createDataFrame
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
DataFrame从RDD,列表或创建一个pandas.DataFrame。
from pyspark.sql.types import Row
def f(x):
rel = {}
rel['name'] = x[0]
rel['age'] = x[1]
return rel
peopleDF = sc.textFile("examples/src/main/resources/people.txt").map(lambda line : line.split(',')).map(lambda x: Row(**f(x))).toDF()

peopleDF.createOrReplaceTempView("people") #必须注册为临时表才能供下面的查询使用
personsDF = spark.sql("select * from people")

#########创建DataFrame########
=====1
l = [('Alice', 1)]
spark.createDataFrame(l, ['name', 'age']).collect()
=====2
d = [{'name': 'Alice', 'age': 1}]
spark.createDataFrame(d).collect()
=====3
l = [('Alice', 1)]
rdd = sc.parallelize(l)
spark.createDataFrame(rdd).collect()
spark.createDataFrame(rdd, ['name', 'age']).collect()
=====4
from pyspark.sql import Row
l = [('Alice', 1)]
rdd = sc.parallelize(l)
Person = Row('name', 'age')
person = rdd.map(lambda r: Person(*r))
spark.createDataFrame(person).collect()
=====5
from pyspark.sql.types import *
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True)])
spark.createDataFrame(rdd, schema).collect()
=====6
spark.createDataFrame(df.toPandas()).collect() #将 pandas的dataframe转换成spark的
spark.createDataFrame(pandas.DataFrame([[1, 2]])).collect()
=====7
spark.createDataFrame(rdd, "a: string, b: int").collect()

#####DataFrame操作

1
2
3
4
5
6
#按照一列进行排序
df.sort(df.age.desc()).show()
#多列排序
df.sort(df.age.desc(), df.name.asc()).show()
#对列进行重命名
df.select(df.name.alias("username"),df.age).show()
SparkConf====>资源控制,配置spark
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
sc_conf = SparkConf()
sc_conf.setMaster('spark://192.168.0.217:7077')
sc_conf.setAppName('my-app')
sc_conf.set('spark.executor.memory', '60g') #executor memory是每个节点上占用的内存。每一个节点可使用内存
sc_conf.set("spark.executor.cores", '4') #spark.executor.cores:顾名思义这个参数是用来指定executor的cpu内核个数,分配更多的内核意味着executor并发能力越强,能够同时执行更多的task
sc_conf.set('spark.cores.max', 40) #spark.cores.max:为一个application分配的最大cpu核心数,如果没有设置这个值默认为spark.deploy.defaultCores
sc_conf.set('spark.logConf', True) #当SparkContext启动时,将有效的SparkConf记录为INFO。
print(sc_conf.getAll())

=============================大类下的方法======
sc_conf.contains('spark.executor.memory') ====>返回True和False
sc_conf.getAll() ====>获取配置信息
sc_conf.set('spark.executor.memory', '2g') ======>设置spark的配置
sc_conf.setAll([('spark.executor.memory', '2g'),('spark.cores.max', 40)]) ===>一次设置多个配置
sc_conf.setMaster() 设置连接的主URL


动态资源分配:============还不懂==================>反正很重要
def conf(self):
conf = super(TbtestStatisBase, self).conf
conf.update({
'spark.shuffle.service.enabled': 'true',
'spark.dynamicAllocation.enabled': 'false',
'spark.dynamicAllocation.initialExecutors': 50,
'spark.dynamicAllocation.minExecutors': 1,
'spark.dynamicAllocation.maxExecutors': 125,
'spark.sql.parquet.compression.codec': 'snappy',
'spark.yarn.executor.memoryOverhead': 4096,
"spark.speculation": 'true',
'spark.kryoserializer.buffer.max': '512m',
})
SparkContext
1
2
3
SparkContext每个JVM仅应激活一个。在创建新 的活动目录之前,必须先停止活动目录
getOrCreate:获取或实例化SparkContext并将其注册为单例对象
glom()返回通过将每个分区内的所有元素合并到列表中而创建的RDD

#####其他方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
parallelize([1,2,3],3)  分发本地Python集合以形成RDD
pickleFile() 使用saveAsPickleFile()保存的RDD
sc.runJob(sc.parallelize(range(6), 3), lambda part: [x * x for x in part], [0, 2], True) 对指定的分区进行指定的操作,未指定分区则在全部分区执行
sc.sparkUser() 获取正在使用spark_context的用户
sc.stop() 关闭sparkcontext
sc.TextFile() 从对于路径读取文件
sc.union([rdd1,rdd2]) 建立rdd列表的并集
rdd1.intersection(rdd2) 建立rdd的交集
rdd1.subtract(rdd2) 建立rdd的差集
rdd1.distinct() RDD去重
rdd1.takeOrdered(3,key=lambda x:-x) 从大到小排序,从小到大不用lambda
rdd1.randomsplit([0.4,0.6]) rdd等比例分割
sc.parallelize([100, 200, 300, 400, 500], 5).aggregate((1, 1), seqOp1, combOp1)
rdd1.cache() rdd放到内存中
rdd1.cartesian(rdd2) 计算两个rdd的笛卡尔乘积
rdd.getNumPartitions() 获取分区数量
sc.parallelize([1, 2, 3, 4, 5], 3).coalesce(1).glom().collect() 减少rdd的分区
rdd1.collect() 返回rdd的所有元素列表
sorted(sc.parallelize([("a", 1), ("b", 4)]).cogroup(sc.parallelize([("a", 2)])).collect()) 返回一个元组,其中包含key的所有值的列表(类似按照键聚合)
sc.parallelize([(1, 2), (3, 4)]).collectAsMap() 构建字典,数据较小时(放到内存中)
sc.parallelize([2, 3, 4]).count()
sc.parallelize([("a", 1), ("b", 1), ("a", 1)]).countByKey().items() #计算每个键的元素数
sc.parallelize([1, 2, 1, 2, 2]).countByValue().items() #返回值的计数
sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect()) #返回去重后的RDD
sc.parallelize([1, 2, 3, 4, 5]).filter(lambda x: x % 2 == 0).collect() #筛选符合条件的构建新的rdd
sc.parallelize([2, 3, 4]).first() #返回rdd的第一条数据,空的rdd会报错
sorted(sc.parallelize([2, 3, 4]).flatMap(lambda x: range(1, x)).collect()) #对rdd的每一个元素处理,然后将结果展平
sc.parallelize([]).isEmpty() #判断RDD是否为空
sc.parallelize(range(0,3)).keyBy(lambda x: x*x).collect() 创建元组的键[(0, 0), (1, 1), (4, 2)]
sc.parallelize([(1, 2), (3, 4)]).keys() 返回rdd的键
rdd1.max() 返回最大值,存在参数key
rdd1.is_cached 判断是否是缓存
sc.broadcast(array([1,2,3,4])) 广播变量
sc.parallelize([2, 3, 4, 5, 6]).take(3) 获取前三个元素
sc.parallelize([2, 3, 4, 5, 6]).takeOrdered(3) 排序后取前三个
sc.parallelize([10, 4, 2, 12, 3]).top(3) 取前3个数据,适用于较小的数据
sc.parallelize([1, 2, 3]).variance() 计算rdd的方差
sc.parallelize([1.0, 2.0, 3.0]).sum() 求和



================foreach==============
def f(x): print(x)
sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
============flatMapValues================
x = sc.parallelize([("a", ["x", "y", "z"]), ("b", ["p", "r"])])
def f(x): return x
x.flatMapValues(f).collect()
sc.parallelize([1,2,3,4]).fold(0,lambda x,y:x+y) 类似于reduce的聚合,0 表示初始聚合值和聚合类型。
==============join,leftOuterjoin,rightOuterjoin========
kvRDD1 = sc.parallelize([(3,4),(3,6),(5,6),(1,2)])
kvRDD2 = sc.parallelize([(3,8)])
kvRDD1.join(kvRDD2) #按照相同的key值拼接
kvRDD1.leftOuterJoin(kvRDD2) #左侧认rdd为准,没有的为None==>[(1,(2,None)),(3,(4,8)),(3,(6,8)),(5,(6,None))]
kvRDD1.rightOuterJoin(kvRDD2) 右连接
kvRDD1.fullOuterJoin(kvRDD2) 外连接
===========subtractByKey===== 删除相同key值的数据
kvRDD1.subtractByKey(kvRDD2)
===============groupBy==============将RDD中每个键的值分组为单个序列
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.groupByKey().mapValues(len).collect())
sorted(rdd.groupByKey().mapValues(list).collect())
=================mapPartitions==============RDD的每个分区应用函数
rdd = sc.parallelize([1, 2, 3, 4], 2)
def f(iterator): yield sum(iterator)
rdd.mapPartitions(f).collect()
=================mapValues============不更改键,对值操作
x = sc.parallelize([("a", ["apple", "banana", "lemon"]), ("b", ["grapes"])])
def f(x): return len(x)
x.mapValues(f).collect()
===================zip==================
x = sc.parallelize(range(0,5))
y = sc.parallelize(range(1000, 1005))
x.zip(y).collect()
aggregate
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
1、>>>  seqOp  =  ( lambda  x, y: (x[ 0 ]  +  y, x[ 1 ]  +   1 ))
2、>>> combOp = ( lambda x, y: (x[ 0 ] + y[ 0 ], x[ 1 ] + y[ 1 ]))
3、>>> sc . parallelize([ 1 , 2 , 3 , 4 ],4) . aggregate(( 0 , 0 ), seqOp, combOp)
(10, 4)
4、>>> sc.parallelize([ 1, 2, 3, 4,5 ],3).aggregate((1,1),seqOp,combOp)
(19,9

依次解释上述函数
1、建立各分区内的聚集函数,又初始值依次与分区内的函数做操作
2、建立各分区间的组合函数
3、使用aggregate 样例1
4、使用aggregate 样例2

样例1 解释:
分区数 : 4
01
12
23
34

利用zerovalue (0,0) 和 seqOp 对各分区进行聚集 : ---->看成x为(0,0) y为分区的
0 : (11
1 : (2 , 1
2 : (31
3: (4 , 1

利用 zerovalue和combOp 进行各分区间的聚合 :
0,0) + (1,1)+ (2,1)+ (3,1)+ (4,1) = (10,4

样例2 解释:
分区数 : 3
01
12,3
24,5

利用zerovalue (0,0) 和 seqOp 对各分区进行聚集 :
0 : (2,2)
1 : (6,3)===>来源(1,1)+(2,1)=(3,2)===>(3,2)+(3,1)====>(6,3)
2 : (10,3)

利用 zerovalue和combOp 进行各分区间的聚合 :
(1,1) + (2,2) + (6,3) + (10,3) = (19,9)
1
2
3
4
5
6
7
Application来说,资源是Executor。对于Executor来说资源是内存、core

standalone 集群模式当前只支持一个简单的跨应用程序的 FIFO 调度。然而,为了允许多个并发的用户,您可以控制每个应用程序能用的最大资源数。默认情况下,它将获取集群中的 all cores(核),这只有在某一时刻只允许一个应用程序运行时才有意义, 因为如果此时其他的核被占用, 自然无法获取资源, 运行程序, 此时是有多少核用多少核.

Spark中的调度模式主要有两种:FIFO和FAIR。
默认情况下Spark的调度模式是FIFO(先进先出),谁先提交谁先执行,后面的任务需要等待前面的任务执行。
而FAIR(公平调度)模式支持在调度池中为任务进行分组,不同的调度池权重不同,任务可以按照权重来决定执行顺序。
提交代码优化
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
https://www.cnblogs.com/hd-zg/p/6089207.html 原文
资源参数设置的不合理,可能会导致没有充分利用集群资源,作业运行会极其缓慢;或者设置的资源过大,队列没有足够的资源来提供,进而导致各种异常。总之,无论是哪种情况,都会导致Spark作业的运行效率低下,甚至根本无法运行。因此我们必须对Spark作业的资源使用原理有一个清晰的认识,并知道在Spark作业运行过程中,有哪些资源参数是可以设置的,以及如何设置合适的参数值。

启动原理:
我们使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程。根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动。Driver进程本身会根据我们设置的参数,占有一定数量的内存和CPU core。而Driver进程要做的第一件事情,就是向集群管理器申请运行Spark作业需要使用的资源,这里的资源指的就是Executor进程。
在申请到了作业执行所需的资源之后,Driver进程就会开始调度和执行我们编写的作业代码了。Driver进程会将我们编写的Spark作业代码分拆为多个stage,每个stage执行一部分代码片段,并为每个stage创建一批task,然后将这些task分配到各个Executor进程中执行。
task是最小的计算单元,负责执行一模一样的计算逻辑(也就是我们自己编写的某个代码片段),只是每个task处理的数据不同而已。一个stage的所有task都执行完毕之后,会在各个节点本地的磁盘文件中写入计算中间结果,然后Driver就会调度运行下一个stage。下一个stage的task的输入数据就是上一个stage输出的中间结果。如此循环往复,直到将我们自己编写的代码逻辑全部执行完,并且计算完所有的数据,得到我们想要的结果为止。

  Spark是根据shuffle类算子来进行stage的划分。如果我们的代码中执行了某个shuffle类算子(比如reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。可以大致理解为,shuffle算子执行之前的代码会被划分为一个stage,shuffle算子执行以及之后的代码会被划分为下一个stage。因此一个stage刚开始执行的时候,它的每个task可能都会从上一个stage的task所在的节点,去通过网络传输拉取需要自己处理的所有key,然后对拉取到的所有相同的key使用我们自己编写的算子函数执行聚合操作(比如reduceByKey()算子接收的函数)。这个过程就是shuffle。

--total-executor-cores参数指定用的总core数量。若不指定则会用光所有剩下的cores。
--executor-memory
每个executor分配内存,若超过worker可用剩余内存则不会提交给此worker,若不可提交给任意worker则报错
--driver-memory
--driver-cores
--total-executor-cores
资源参数调优
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
https://www.cnblogs.com/hd-zg/p/6089207.html
☆☆☆☆num-executors:设置spark任务总共需要多少个Executors
建议:一般设置50-100个,设置太少,集群资源得不到利用.设置太大,大部分无法给与足够的资源
☆☆☆☆☆executor-memory:每个Executor内存大小
建议:直接决定了spark作业的性能,num-executors*executor-memory不能超过总内存,同时因为要是共享资源,所以通常是总量1/3-1/2
☆☆☆☆☆executor-cores:每个executor进程的CPU核心数
建议:num-executors * executor-cores不要超过队列总CPU core的1/3~1/2
driver-memory:设置Driver进程的内存
建议:通常不设置或者为1G,当报OOM内存溢出错误的时候,可能跟这个参数有关
☆☆☆☆☆spark.default.parallelism:设置每个stage的task数量,直接影响性能
建议:Spark作业的默认task数量为500~1000个较为合适,设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。
☆☆☆spark.storage.memoryFraction:RDD持久化数据在Executor内存中可占的比例,不够时候,会写入磁盘
建议:通常默认为0.6,尽量大点,因为spark作业中会有很多的持久化操作,尽量避免写入磁盘
☆☆☆spark.shuffle.memoryFraction:shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2
建议:RDD持久化操作较少,shuffle操作较多时,可提高占比.避免溢出写磁盘
开发调优
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
https://blog.csdn.net/u012102306/article/details/51322209
1.避免创建重复的RDD
2.尽可能复用同一个RDD
3.对多次使用的RDD进行持久化
4.尽量避免使用shuffle类算子===>shuffle过程,简单来说,就是将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合或join等操作.===>很耗时间
5.使用map-side预聚合的shuffle操作
6.使用高性能的算子
7.广播大变量
8.使用Kryo优化序列化性能
9.优化数据结构


*****************************高性能的算子*******************************
reduceByKey/aggregateByKey替代groupByKey===>这样实现了现在分区按照key聚合,在整体
使用mapPartitions替代普通map
使用foreachPartitions替代foreach
使用filter之后进行coalesce操作
使用repartitionAndSortWithinPartitions替代repartition与sort类操作

***********************序列化********************
Spark默认使用的是Java的序列化机制
Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多

*****************************优化数据结构******************************
尽量避免使用耗内存的数据结构如以下三种
1.对象
2.集合,如hashmap和链表等.
3.字符串,每个字符串内部都有一个字符数组以及长度等额外信息
数据倾斜调优
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
整个Spark作业的运行进度是由运行时间最长的那个task决定的。数据倾斜只会发生在shuffle过程中 

现象:1.绝大多数task执行得都非常快,但个别task执行极慢。
2.某个正常运行的spark作业,突然出现oom.(内存溢出)

原理:在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理,比如按照key进行聚合或join等操作。

会触发shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等

解决方案:

解决方案1:过滤少数导致倾斜的key
将导致数据倾斜的key给过滤掉之后,这些key就不会参与计算了,自然不可能产生数据倾斜。
前提:发现导致倾斜的key就少数几个,而且对计算本身的影响并不大的话,那么很适合使用这种方案
优点:实现简单,而且效果也很好,可以完全规避掉数据倾斜。
缺点:适用场景不多,大多数情况下,导致倾斜的key还是很多的,并不是只有少数几个。

解决方案2:提高shuffle操作的并行度
建议优先使用这种方案,因为这是处理数据倾斜最简单的一种方案
在对RDD执行shuffle算子时,给shuffle算子传入一个参数,比如reduceByKey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量。对于Spark SQL中的shuffle类语句,比如group by、join等,需要设置一个参数,即spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,该值默认是200,对于很多场景来说都有点过小。
原理:更多的task分配更多的key,避免集中.
方案优点:实现起来比较简单,可以有效缓解和减轻数据倾斜的影响。
方案缺点:只是缓解了数据倾斜而已,没有彻底根除问题,根据实践经验来看,其效果有限。

解决方案3:两阶段聚合=====>适用于聚合类shuffle
(局部聚合+全局聚合)
原理:分次聚合,将原本相同的key通过附加随机前缀的方式,变成多个不同的key
方案优点:对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将Spark作业的性能提升数倍以上。
方案缺点:仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案
案例:比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着对打上随机数后的数据,执行reduceByKey等聚合操作,进行局部聚合,那么局部聚合结果,就会变成了(1_hello, 2) (2_hello, 2)。

解决方案4:将reduce join转为map join======>join
join操作中的一个RDD或表的数据量比较小(比如几百M或者一两G)
原理:普通的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。但是如果一个RDD是比较小的,则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join.
方案优点:对join操作导致的数据倾斜,效果非常好,因为根本就不会发生shuffle,也就根本不会发生数据倾斜。
方案缺点:适用场景较少,因为这个方案只适用于一个大表和一个小表的情况。毕竟我们需要将小表进行广播,此时会比较消耗内存资源,driver和每个Executor内存中都会驻留一份小RDD的全量数据。如果我们广播出去的RDD数据比较大,比如10G以上,那么就可能发生内存溢出了。因此并不适合两个都是大表的情况。

解决方案5:采样倾斜key并分拆join操作======>join
一个表的key较均匀,而另外一个表的少数几个key数据量较大
实现思路:对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下 每个key的数量,计算出来数据量最大的是哪几个key。
然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n 以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。
接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每 条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也 形成另外一个RDD。
再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的 key打散成n份,分散到多个task中去进行join了。
而另外两个普通的RDD就照常join即可。
最后将两次join的结果使用union算子合并起来即可,就是最终的join结果
方案优点:对于join导致的数据倾斜,如果只是某几个key导致了倾斜,采用该方式可以用最有效的方式打散key进行join。而且只需要针对少数倾斜key对应的数据进行扩容n倍,不需要对全量数据进行扩容。避免了占用过多内存。
方案缺点:如果导致倾斜的key特别多的话,比如成千上万个key都导致数据倾斜,那么这种方式也不适合。

解决方案6:使用随机前缀和扩容RDD进行join ======>join
如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没什么意义
实现思路:首先查看RDD/Hive表中的数据分布情况,找到那个造成数据倾斜的RDD/Hive表,比如有多个key 都对应了超过1万条数据。
然后将该RDD的每条数据都打上一个n以内的随机前缀。
同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打 上一个0~n的前缀。
最后将两个处理后的RDD进行join即可。
原理:将原先一样的key通过附加随机前缀变成不一样的key,然后就可以将这些处理后的“不同key”分散到 多个task中去处理,而不是让一个task处理大量的相同key。
方案优点:对join类型的数据倾斜基本都可以处理,而且效果也相对比较显著,性能提升效果非常不错。
方案缺点:该方案更多的是缓解数据倾斜,而不是彻底避免数据倾斜。而且需要对整个RDD进行扩容,对内存资源要求很高。

解决方案7:多种方案组合使用
如果只是处理较为简单的数据倾斜场景,那么使用上述方案中的某一种基本就可以解决。但是如果要处理一个较为复杂的数据倾斜场景,那么可能需要将多种方案组合起来使用
shuffle调优====>相比其他三个较为次要
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
  大多数Spark作业的性能主要就是消耗在了shuffle环节,因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作。
spark.shuffle.file.buffer
默认值:32k
参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

spark.reducer.maxSizeInFlight
默认值:48m
参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。

spark.shuffle.io.retryWait
默认值:5s
参数说明:具体解释同上,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。

spark.shuffle.memoryFraction
默认值:0.2
参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。

spark.shuffle.manager
默认值:sort
参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。

spark.shuffle.sort.bypassMergeThreshold
默认值:200
参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。


spark.shuffle.consolidateFiles
默认值:false
参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。
调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。
1
2
3
4
5
6
7
8
9
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> df = spark.createDataFrame(
... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
... ("id", "v"))
>>> :pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP) # doctest: +SKIP
... def normalize(pdf):
... v = pdf.v
... return pdf.assign(v=(v - v.mean()) / v.std())
>>> df.groupby("id").apply(normalize).show() # doctest: +SKIP

下面是使用RDD的场景和常见案例:

  • 你希望可以对你的数据集进行最基本的转换、处理和控制;
  • 你的数据是非结构化的,比如流媒体或者字符流;
  • 你不希望像进行列式处理一样定义一个模式,通过名字或字段来处理或访问数据属性;
  • 你并不在意通过DataFrame和Dataset进行结构化和半结构化数据处理所能获得的一些优化和性能上的好处;

该什么时候使用DataFrame或Dataset呢?

  • 如果你需要丰富的语义、高级抽象和特定领域专用的API,那就使用DataFrame或Dataset;

  • 如果你的处理需要对半结构化数据进行高级处理,如filter、map、aggregation、average、sum、SQL查询、列式访问或使用lambda函数,那就使用DataFrame或Dataset;

  • 如果你想在编译时就有高度的类型安全,想要有类型的JVM对象,用上Catalyst优化,并得益于Tungsten生成的高效代码,那就使用Dataset;

  • 如果你想在不同的Spark库之间使用一致和简化的API,那就使用DataFrame或Dataset;

  • 如果你是R语言使用者,就用DataFrame;

  • 如果你是Python语言使用者,就用DataFrame,在需要更细致的控制时就退回去使用RDD;

DataFrame与RDD相同之处,都是不可变分布式弹性数据集。不同之处在于,DataFrame的数据集都是按指定列存储,即结构化数据。相似于传统数据库中的表。DataFrame的设计是为了让大数据解决起来更容易。

RDD适合需要low-level函数式编程和操作数据集的情况;DataFrame和Dataset适合结构化数据集,用high-level和特定领域语言(DSL)编程,空间效率高和速度快。

在正常情况下都不推荐使用 RDD 算子
  • 在某种抽象层面来说,使用 RDD 算子编程相当于直接使用最底层的 Java API 进行编程
  • RDD 算子与 SQL、DataFrame API 和 DataSet API 相比,更偏向于如何做,而非做什么,这样优化的空间很少
  • RDD 语言不如 SQL 语言友好
仅在一些特殊情况下可以使用 RDD
  • 你希望可以对你的数据集进行最基本的转换、处理和控制;
  • 你的数据是非结构化的,比如流媒体或者字符流;
  • 你想通过函数式编程而不是特定领域内的表达来处理你的数据;
  • 你不希望像进行列式处理一样定义一个模式,通过名字或字段来处理或访问数据属性(更高层次抽象);
  • 你并不在意通过 DataFrame 和 Dataset 进行结构化和半结构化数据处理所能获得的一些优化和性能上的好处;

DataFrame

与 RDD 相似,DataFrame 也是数据的一个不可变分布式集合。但与 RDD 不同的是,数据都被组织到有名字的列中,就像关系型数据库中的表一样。设计 DataFrame 的目的就是要让对大型数据集的处理变得更简单,它让开发者可以为分布式的数据集指定一个模式,进行更高层次的抽象。它提供了特定领域内专用的 API 来处理你的分布式数据,并让更多的人可以更方便地使用 Spark,而不仅限于专业的数据工程师。

Spark 2.0 中,DataFrame 和 Dataset 的 API 融合到一起,完成跨函数库的数据处理能力的整合。在整合完成之后,开发者们就不必再去学习或者记忆那么多的概念了,可以通过一套名为 Dataset 的高级并且类型安全的 API 完成工作。

补充

1
2
3
4
5
driver主要负责向excetuor分发task和代码,负责计算的调度,cluster manager负责资源的调度(可以是standalone,yarn,mesos),driver会向cluster申请资源

executor负责代码的执行,内部包含很多个executor进程,每个stage计算完成后会将结果写入磁盘进行存储,输入下一个stage,stage的划分是按照shuffle算子

driver:主要功能是创建sparkcontext,是一切程序的入口,同时负责和clustermanager进行通信,进行资源的申请和任务的分配,当所有任务完成后会将sparkcontext关闭