spark入门

Apache重要的三个基金会项目(hadoop,spark,storm)

Spark提供了内存计算,减少了迭代计算时的IO开销;虽然,Hadoop已成为大数据的事实标准,但其MapReduce分布式计算模型仍存在诸多缺陷,而Spark不仅具备Hadoop MapReduce所具有的优点,且解决了Hadoop MapReduce的缺陷。

Spark:可以作为一个更加快速、高效的大数据计算平台。基于内存的大数据并行计算框架.将计算分解成多个任务在不同的机器上运行.

别人的学习总结:https://blog.csdn.net/qq_33247435/article/details/83653584#8Spark_71

spark的概念

​ 1.$RDD$(resilient distribute dataset 弹性分布式训练集).是分布式内存里的一个抽象概念,表示的是高度受限的共享内存模型

​ 2.$DAG$(directed acyclic gragh),有向无环图,表明了RDD之间的依赖关系

​ 3.$EXECUTOR:$运行在工作节点上的一个进程,负责运行任务,以及应用程序存储数据

​ 4.$程序:$编写的spark程序

​ 5.$任务:$运行在executor上的工作单元

​ 6.$作业:$包含多个RDD及对应RDD上的操作

​ 7.$阶段:$作业的基本调度单位,一个作业会分成多组任务,每组任务称为阶段

​ 8.$shuffle过程:$简单认为就是将不同节点上的相同key拉到同一个节点上计算

​ 9.$SparkSession:$代表了spark集群中的一个连接,在应用程序实例化的时候启动

====>spark的入口,2.0之前spark core是sparkcontext,spark sql是sqlcontext,sparkstreaming应用使用streamingContext.2.0之后,sparksession对象把所有的对象组合到一起.称为所有程序统一的入口

RDD详解(待补充)

pass

spark运行架构

1609144230200

​ $driver:$每个应用的任务控制节点

​ $cluster manager:$集群资源管理器

​ $node:$运行作业任务的工作节点

​ $Executor$:每个工作节点上负责具体任务的执行进程

​ $\textcolor{red}{关系:}$

​ 一个应用由一个一个控制节点(driver)和若干个作业构成,一个作业由若干个阶段构成,一个阶段由多个任务构成.当执行一个应用时,任务控制节点会向集群管理器申请资源,启动executor,并向executor发送应用程序和代码和文件.然后在executor上执行任务,运行结束后,执行结果会返回给任务控制节点,或者写到数据库中.

​ $\textcolor{red}{Executor优点:}$

​ 1.采用的是多线程(map reduce 使用的是进程模型),减少了开销

​ 2.executor中有一个blockmanager存储模块,会将内存和磁盘作为存储模块,当需要多轮迭代计算的时候,可以将数据存储到这个模块.有效减少了IO开销;或者在交互式查询场景下,预先将表缓存到该存储系统上,从而可以提高读写IO性能。

spark运行基本流程
1.当一个spark应用被提交时,需要为这个应用提供基本的运行环境,即有任务控制节点(driver)创建一个sparkcontext,负责和资源管理器的通信以及资源的申请和任务的分配和监控等.sparkcontext会向资源管理器注册并申请运行Executor的资源.

​ 2.资源管理器为Executor分配资源,并启动Executor进程,Executor运行情况将随着心跳发送到资源管理器上

​ 3.任务在Executor上运行,并将结果返回给任务调度器,然后反馈给DAG调度器,运行完毕,写入资源并释放所有资源.

​ $\textcolor{orange}{详解}$

​ 1.构建spark applicantion的运行环境,启动SparkContext

​ 2.sparkcontext向资源管理器申请运行Executor

​ 3.Executor向Sparkcontext申请Task

​ 4.SparkContext将应用程序分发给Executor

​ 5.sparkcontext构建DAG图,将DAG图分解成Stage,将tasket发送给Task Scheduler,最后由Task Scheduler将Task发送给Executor运行

​ 6.Task在Executor上运行,运行完释放所有资源

​ $\textcolor{red}{SparkContext原理:}$

​ 依据RDD的依赖关系构建DAG图,然后将DAG图提交给DAG调度器进行解析,将DAG图分解成多个阶段,并计算出各个阶段的依存关系,然后把一个个任务集提交给底层的调度器进行处理.Executor向sparkcontext申请人无,任务调度器将任务发送给Executor并将应用程序代码发送给Executor

1609221852735

spark on standalone流程
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
1、我们提交一个任务,任务就叫Application
2、初始化程序的入口SparkContext,
  2.1 初始化DAG Scheduler
  2.2 初始化Task Scheduler
3、Task Scheduler向master去进行注册并申请资源(CPU Core和Memory)
4、Master根据SparkContext的资源申请要求和Worker心跳周期内报告的信息决定在哪个Worker上分配资源,然后在该Worker上获取资源,然后启动StandaloneExecutorBackend;顺便初
始化好了一个线程池
5、StandaloneExecutorBackend向Driver(SparkContext)注册,这样Driver就知道哪些Executor为他进行服务了。
  到这个时候其实我们的初始化过程基本完成了,我们开始执行transformation的代码,但是代码并不会真正的运行,直到我们遇到一个action操作。生产一个job任务,进行stage的划分
6、SparkContext将Applicaiton代码发送给StandaloneExecutorBackend;并且SparkContext解析Applicaiton代码,构建DAG图,并提交给DAG Scheduler分解成Stage(当碰到Action操作 时,就会催生Job;每个Job中含有1个或多个Stage,Stage一般在获取外部数据和shuffle之前产生)。
7、将Stage(或者称为TaskSet)提交给Task Scheduler。Task Scheduler负责将Task分配到相应的Worker,最后提交给StandaloneExecutorBackend执行;
8、对task进行序列化,并根据task的分配算法,分配task
9、对接收过来的task进行反序列化,把task封装成一个线程
10、开始执行Task,并向SparkContext报告,直至Task完成。
11、资源注销



心跳是分布式技术的基础,我们知道在Spark中,是有一个Master和众多的Worker,那么Master怎么知道每个Worker的情况呢,这就需要借助心跳机制了。心跳除了传输信息,另一个主要的作用就是Worker告诉Master它还活着,当心跳停止时,方便Master进行一些容错操作,比如数据转移备份等等。
spark部署

​ $\textcolor{red}{三种部署方式:}$1.standalone 2.spark on Mesos 3.spark on YARN

​ $\textcolor{green}{standalone:}$

​ 分布式集群服务,自带的完整服务,Spark自己进行资源管理和任务监控,一定程度上来说,此模式是其他两个模式的基础

​ $\textcolor{green}{Sapark on Mesos:}$

​ 官方推荐(都是apache的),spark设计之初就考虑支持mesos,spark在mesos上比在YARN上更灵活

​ $Mesos$是一种资源调度管理框架

​ $两种调度模式:$

​ 1.粗粒度模式

​ 每个应用程序的运行环境由一个driver和若干个executor组成.其中,每个executor占用若干资源,内部可运行多个Task,应用程序在开始之前需要将运行环境的资源全部申请好,且运行过程中要一直占用这些资源,即使不用.当程序结束时,会进行回收.

​ 2.细粒度模式

​ 粗粒度会造成很大的资源浪费,动态分配

​ $\textcolor{red}{Spark on Yarn}$

​ 是一种最有前景的部署模式。但限于YARN自身的发展,目前仅支持粗粒度模式

​ Spark可运行于YARN之上,与Hadoop进行统一部署

​ 分布式部署集群,这是由于YARN(资源管理器)上的Container资源是不可以动态伸缩的,一旦Container启动之后,可使用的资源不能再发生变化.

spark相对hadoop的优势

​ hadoop的mapreduce计算模型延迟过高,无法胜任$\textcolor{red}{实时}$和$\textcolor{red}{快速}$计算,只适用离线批处理

​ $hadoop的缺点$:

​ 1.表达能力有限.分为map阶段和reduce阶段.难以描述复杂数据处理过程.

​ 2.磁盘io开销大.每次执行都需要从磁盘读取数据,且存的时候需要将中间数据存到磁盘中.

​ 3.延迟高,一次计算可能需要分解成一系列按顺序执行的MapReduce任务,任务之间的衔接涉及到IO开销,会产生较高的延迟.

​ $Spark的优点$:

​ 1.计算模式也属于MapReduce,但不局限于map和reduce操作,提供多种数据类型操作,比MapReduce更灵活

​ 2.spark提供了内存计算,中间结果直接放到内存中

​ 3.spark使用的是DAG进行任务调度,比MapReduce的迭代执行机制强

​ $整体:$

​ spark最大的优点就是将计算结果,中间数据存储到内存中,大大减少了IO开销.因为spark更适合迭代运算多的数据挖掘和机器学习运算

​ $总:$尽管整体上spark比hadoop要好,但是无法完全替代hadoop,通常是用来替代hadoop的MapReduce部分

spark生态

spark生态

​ spark生态主要包含$\textcolor{red}{Spark Core}$ ,$\textcolor{red}{Spark Sql}$,$\textcolor{red}{Spark Screaming}$,$\textcolor{red}{MLlib}$,$\textcolor{red}{Graphx}$

​ $\textcolor{red}{Spark Core:}$Spark Core包含Spark的基本功能,如内存计算、任务调度、部署模式、故障恢复、存储管理等。通常所说的Apache Spark,就是指Spark Core

​ $\textcolor{red}{Spark Sql:}$Spark SQL允许直接处理RDD,同时也可查询Hive、HBase等外部数据源。Spark SQL的一个重要特点是其能够统一处理关系表和RDD,使得开发人员可以轻松地使用SQL命令进行查询,并进行更复杂的数据分析;

​ $\textcolor{red}{Spark Screaming:}$ Spark Streaming支持高吞吐量、可容错处理的实时流数据处理,其核心思路是将流式计算分解成一系列短小的批处理作业。

​ $\textcolor{red}{Graphx:}$GraphX是Spark中用于图计算的API

​ $\textcolor{red}{MLlib:}$MLlib提供了常用机器学习算法的实现,包括聚类、分类、回归、协同过滤等

在pyspark中执行词频统计

​ $案例1.词频统计$

​ ①首先创建一个worldcount目录(shell 命令下)

​ ②然后创建一个txt文件,里面随便写一些文字.作为统计的原材料

​ ③词频统计需要启动pyspark.cd /usr/local/spark 然后 ./bin/pyspark

​ ④加载文件—>确定是在本地还是在分布式的hdfs上

​ $本地$:

​ textFile = sc.textFile(‘file:///usr/local/spark/mycode/wordcount/word.txt’)(要加载本地文件,必须采用“file:///”开头的这种格式)–>惰性的,需要first()这种才能打印出数据

​ textFile.first()(打印第一行数据,文件不存在会显示拒绝连接)

​ $HDFS$:

​ 需要首先启动Hadoop中的HDFS组件

​ 1. cd /usr/local/hadoop

​ 2. ./sbin/start-dfs.sh

​ $\textcolor{red}{上传文件到hdfs上}$./bin/hdfs dfs -put /usr/local/spark/mycode/wordcount/word.txt .

​ 3.加载文件textFile = sc.textFile(“hdfs://localhost:9000/user/hadoop/word.txt”)–>惰性的

​ ⑤写代码,在pyspark窗口(类似于ipython那种>>>),代码如下

​ textFile = sc.textFile(“file:///usr/local/spark/mycode/wordcount/word.txt”)

​ wordCount = textFile.flatMap(lambda line: line.split(“ “)).map(lambda word: (word,1)).reduceByKey(lambda a, b : a + b)

​ wordCount.collect()

​ $\textcolor{red}{代码解释:}$

​ 1.第一行即从本地加载文件数据

​ 2.第二行textFile.flatMap(lambda line: line.split(“ “))表示按行处理,每行按照空白符分割.这样每行得到一个单词集合.textFile.flatMap()操作就把这多个单词集合“拍扁”得到一个大的单词集合.map(lambda word: (word,1))会遍历单词集合中的每一个单词,并执行Lamda表达式word : (word, 1).

​ 程序执行到这里,已经得到一个RDD,这个RDD的每个元素是(key,value)形式的tuple。最后,针对这个RDD,执行reduceByKey(labmda a, b : a + b)操作,这个操作会把所有RDD元素按照key进行分组,然后使用给定的函数(这里就是Lamda表达式:a, b : a + b)

​ 如:(“hadoop”,1)和(“hadoop”,1)—–>(“hadoop”,2)

编写独立应用程序执行词频统计

​ 1.创建test.py文件内容如下

​ from pyspark import SparkContext

​ sc = SparkContext( ‘local’, ‘test’)

​ textFile = sc.textFile(“file:///usr/local/spark/mycode/wordcount/word.txt”)

​ wordCount = textFile.flatMap(lambda line: line.split(“ “)).map(lambda word: (word,1)).reduceByKey(lambda a, b : a + b)

​ wordCount.foreach(print)

在集群上运行spark
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
1.启动hadoop集群
①cd /usr/local/hadoop/
②sbin/start-all.sh
2.启动spark的master节点和索引slaves节点
①cd /usr/local/spark/
②sbin/start-master.sh
③sbin/start-slaves.sh
3.介绍两种资源管理方式standalone 和 spark on yarn
独立资源管理器:
1>安装jar包
向独立集群管理器提交应用,需要把spark://master:7077作为主节点参数传递给spark-submit.
eg:bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://master:7077 examples/jars/spark-examples_2.11-2.0.2.jar 100 2>&1 | grep "Pi is roughly"
2>在集群中运行pyspark
①在shell中输入命令进入pyspark中
cd /usr/local/spark/
bin/pyspark --master spark://master:7077
spark on yarn:
1>安装应用程序jar包
需要把yarn-cluster作为主节点参数传递给spark-submit
2>在集群中运行pyspark
①bin/pyspark --master yarn

$\textcolor{red}{spark-submit}$ 可以提交任务到 spark 集群执行,也可以提交到 hadoop 的 yarn 集群执行。

bin/spark-submit –class org.apache.spark.examples.SparkPi –master表示以集群模式启动spark

Jupyter Notebook调试PySpark
RDD的弹性
1
2
3
1.自动的进行内存和磁盘的存储切换
2.Task如果失败,会自动进行特定次数的重试
3.数据分片的高度弹性(coalesce),优先内存,内存不够才放磁盘

主从架构 和 P2P架构

宽依赖 债依赖
shuffle操作

fork和join

RDD运行原理
1
2
3
4
5
6
7
1.RDD无法直接更改数据,每次操作都会生成一个新的RDD
2.每个RDD都会分成很多个分区,每个分区都是部分数据集片段,一个RDD的不同分区可以保存到不同集群的不同节点上,从而可以实现在不同节点的并行计算
3.执行过程:读入外部的数据源(或者内存中的集合)进行 RDD 创建;
RDD 经过一系列的 “转换” 操作,每一次都会产生不同的 RDD,供给下一个转换使用;
最后一个 RDD 经过 “行动” 操作进行处理,并输出指定的数据类型和值。
RDD 采用了惰性调用,即在 RDD 的执行过程中,所有的转换操作都不会执行真正的操作,只会记录依赖关 系,而只有遇到了行动操作,才会触发真正的计算,并根据之前的依赖关系得到最终的结果。
4.RDD发生行为操作并生成输出数据时,Spark 才会根据 RDD 的依赖关系生成有向无环图(DAG),并从起点开始执行真正的计算。正是 RDD 的这种惰性调用机制,使得转换操作得到的中间结果不需要保存,而是直接管道式的流入到下一个操作进行处理
RDD可以实现高效计算的原因
1
2
3
4
5
1.高效的容错性。可以直接利用 RDD 之间的依赖关系来重新计算得到丢失的分区。
2.中间结果持久化到内存。不需要存储到磁盘,降低了IO.
3.存放的数据可以是 Java 对象,避免了不必要的对象序列化和反序列化开销。
序列化:将数据转换为字节存储的过程(存储数据到磁盘)
反序列化:将二进制字节码转换成java对象(从磁盘读数据)
RDD之间的依赖关系
1
2
3
4
5
6
宽依赖:父 RDD 与子 RDD 之间的一对多关系,即一个父 RDD 转换成多个子 RDD

窄依赖:父 RDD 和子 RDD 之间的一对一关系或者多对一关系,主要包括的操作有 map、filter、union 等

宽依赖的RDD通常伴随着Shuffle操作(非常复杂且昂贵的操作,包含在executors和machines上的数据复制)
首先需要计算好所有父分区数据,然后在节点之间进行 Shuffle.在进行数据恢复时,窄依赖只需要根据父 RDD 分区重新计算丢失的分区即可,而且可以并行地在不同节点进行重新计算。而对于宽依赖而言,单个节点失效通常意味着重新计算过程会涉及多个父 RDD 分区,开销较大。

360截图17290508434977

RDD编程

######1.通过加载数据创建RDD

1
2
3
4
5
6
7
8
9
from pyspark import SparkContext
sc = SparkContext( 'local', 'test')
lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt") #hdfs,HBase、Cassandra、Amazon S3等外部数据源中加载数据集等文件系统加载
textFile = sc.textFile('file:///usr/local/spark/mycode/wordcount/word.txt') #本地节点加载

注意:
1.如果使用了本地文件系统的路径,必须保证在所有的worker节点上,也可以采用相同的路径访问到改文件
(既可以复制到每个worker节点上,也可以使用网络挂载共享文件系统)
2.textFile输入的参数可以是文件名,可以说目录,也可以说压缩文件等.
2.通过并行集合(数组)创建RDD
1
2
3
调用sparkcontext的parallelize方法,在Driver中一个已存在的集合(数组)上创建
nums = [1,2,3,4]
rdd = sc.parallelize(nums)
RDD操作
1
2
1.转换:基于现有的数据集创建一个新的数据集
2.行动:在数据集上进行运算,返回计算值
1.转换操作
1
2
3
4
5
6
7
8
对于RDD而言,每一次转换操作都会产生不同的RDD,供给下一个“转换”使用。转换得到的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作。

下面列出一些常见的转换操作(Transformation API):
* filter(func):筛选出满足函数func的元素,并返回一个新的数据集
* map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集
* flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果
* groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集
* reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合
2.行动操作
1
2
3
4
5
6
7
8
行动操作是真正触发计算的地方。Spark程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。
下面列出一些常见的行动操作(Action API):
* count() 返回数据集中的元素个数
* collect() 以数组的形式返回数据集中的所有元素
* first() 返回数据集中的第一个元素
* take(n) 以数组的形式返回数据集中的前n个元素
* reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
* foreach(func) 将数据集中的每个元素传递到函数func中运行*

######3.惰性机制解释

1
2
3
4
5
6
7
lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s : len(s))
totalLength = lineLengths.reduce( lambda a, b : a + b)

1.第一行textFile读取文件构建一个RDD,textFile()只是一个转换操作,并不会直接将数据读到内存中,这时的lines只是一个指向这个文件的指针.
2.map是一个转换操作,并不会立即计算每行的长度
3.reduce是一个动作,这时就会触发真正的计算.spark会把计算分解和产能很多个小任务在不同的机器上运行,每台机器运行位于属于它的map和reduce.最后把结果返回给Driver.
4.持久化
1
2
3
4
5
6
7
8
9
10
11
由于RDD采用的是惰性求值的方法,每次遇到行动操作都会从头开始计算,当程序有多个行动时,这样的代价就会很大.

为了解决这个问题,通过持久化(缓存)机制避免重复计算的开销.通过persisit()方法对RDD标记为持久化(当触发第一个行动操作后,会将计算结果持久化,持久化的后的RDD将会保留在计算节点的内存中被后面的行动操作重复使用

unpersist()方法手动地把持久化的RDD从缓存中移除。

list = ["Hadoop","Spark","Hive"]
rdd = sc.parallelize(list)
rdd.cache() //会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,这是rdd还没有被计算生成
print(rdd.count()) //第一次行动操作,触发一次真正从头到尾的计算,这时才会执行上面的rdd.cache(),把这个rdd放到缓存中
print(','.join(rdd.collect())) //第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd

$rdd.foreach(print)或者rdd.map(print)打印输出$

$rdd.collect().foreach(print):将所有节点的数据打印,容易爆内存$

$rdd.take(100).foreach(print):打印RDD部分数据$

5,键值对RDD
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#首先创建键值对RDD---两种方法
##方法1.读文件
lines = sc.textFile("file:///usr/local/spark/mycode/pairrdd/word.txt")
pairRDD = lines.flatMap(lambda line : line.split(" ")).map(lambda word : (word,1))
pairRDD.foreach(print)
##方法2.通过列表
list = ["Hadoop","Spark","Hive","Spark"]
rdd = sc.parallelize(list)
pairRDD = rdd.map(lambda word : (word,1))
pairRDD.foreach(print)

#输出
'''
(Hadoop,1)
(Spark,1)
(Hive,1)
(Spark,1)
'''

#-----------------------------常用的键值对转换操作------------------------------
#reduceByKey()、groupByKey()、sortByKey()、join()、cogroup()等

#==============================reduceByKey()==================================
#用func函数合并具有相同键的值
pairRDD.reduceByKey(lambda a,b : a+b).foreach(print) #a,b都表示键对应的value,表示按照键合并,将值相加
'''
输出
(Spark,2)
(Hive,1)
(Hadoop,1)
'''

#==============================groupByKey()==================================
#按照键进行分组
pairRDD.groupByKey().foreach(print)
'''
输出
(Spark,(1,1))
(Hive,(1,))
(Hadoop,(1,))
'''

#==============================keys()==================================
pairRDD.keys().foreach(print) #{“spark”,”spark”,”hadoop”,”hadoop”}
'''
输出
hadoop
spark
hive
spark
'''

#==============================values()==================================
pairRDD.values().foreach(print)#{1,2,3,5}
'''
输出
1
2
3
5
'''

#==============================sortByKey()==================================
pairRDD.sortByKey() #返回根据键排序的RDD
'''
输出
(Hadoop,1)
(Hive,1)
(Spark,1)
(Spark,1)
'''

#==============================mapValues()==================================
pairRDD.mapValues(lambda x : x+1) #对RDD键值对的所有value做相同的处理
'''
输出
(Hadoop,2)
(Spark,2)
(Hive,2)
(Spark,2)
'''

#==============================join()==================================
pairRDD1 = sc.parallelize([('spark',1),('spark',2),('hadoop',3),('hadoop',5)])
pairRDD2 = sc.parallelize([('spark','fast')])
pairRDD1.join(pairRDD2).foreach(print) #join默认内连接,相同的键才会返回
'''
输出
('spark',1,'fast')
('spark',2,'fast')
'''

#===================================实例===================================
#计算每个键对应的平均值
rdd = sc.parallelize([("spark",2),("hadoop",6),("hadoop",4),("spark",6)])
rdd.mapValues(lambda x : (x,1)).reduceByKey(lambda x,y : (x[0]+y[0],x[1] + y[1])).mapValues(lambda x : (x[0] / x[1])).collect()
'''
输出:
[('hadoop', 5.0), ('spark', 4.0)]
'''

collect()是一个行动操作,功能是以数组的形式返回数据集中的所有元素,当我们要实时查看一个RDD中的元素内容时,就可以调用collect()函数。

#####共享变量

1
2
3
4
目的:要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量
Spark提供了两种类型的变量:广播变量(broadcast variables)和累加器(accumulators)
广播变量:用来把变量在所有节点的内存之间进行共享。
累加器:则支持在所有不同节点之间进行累加计算(比如计数或者求和)。
1.广播变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
通过广播方式进行传播的变量,会经过序列化,然后在被任务使用时再进行反序列化。

broadcastVar = sc.broadcast([1, 2, 3])
broadcastVar.value

一旦广播变量创建后,普通变量v的值就不能再发生修改,从而确保所有节点都获得这个广播变量的相同的值。


用途:经常需要把两个数据集组合起来获取结果数据集
方法1:可以直接以rdd形式连接两个数据集====>但是可能会导致数据混洗(shuffle),代价很大
方法2:将小的数据集初始化为广播变量,原理是将将变量复制到所有节点上

目的:进程间共享数据


要点:
1.使用广播变量避免了数据混洗
2.每个节点复制一份数据而非每个任务复制一次
3.广播变量可以被多个任务多次使用(广播变量的好处,不需要每个task带上一份变量副本,而是变成每个节点的executor才一份副本)
2.累加器
1
2
3
4
通常可以被用来实现计数器(counter)和求和(sum)
accum = sc.accumulator(0)
sc.parallelize([1, 2, 3, 4]).foreach(lambda x : accum.add(x))
accum.value
RDD打印数据
1
2
3
1.rdd.foreach(print)
2.rdd.collect() ==可能导致内存溢出
3.rdd.take(100)

#####

RDD运行原理—-阶段的划分和RDD的运行过程

1
2
3
4
宽依赖窄依赖
宽依赖:父RDD的分区被子RDD的多个分区使用(一对一,多对一)
窄依赖:父RDD的每个分区都只被子RDD的一个分区使用 (一对多)
shuffle操作:洗牌

fork and join机制

####standalone环境配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
Spark standalone部署配置
1.在home路径下创建文件夹spark,然后cd spark----->这个文件夹需要所有用户都可以使用
1.配置JAVA环境
(1)在spark文件夹下,创建java文件夹,然后cd java
(2)将压缩包jdk-15.0.1_linux-x64_bin.tar.gz 上传到java目录下
(3)解压缩jdk包.使用命令tar -xvf jdk-15.0.1_linux-x64_bin.tar.gz 会出现一个文件夹jdk-15.0.1
(4)cd jdk-15.0.1 进入文件夹下,使用pwd命令查看当前路径,并记住路径,下面要用
(5)打开配置文件 vim /etc/profile(非管理 ~/.bashrc),加入下面配置,JAVA_HOME为(4)输出的路径
JAVA_HOME=/home/spark/java/jdk-15.0.1
CLASSPATH=$JAVA_HOME/lib:$JAVA_HOME/jre/lib
PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/jre/bin

(export JAVA_HOME=/home/spark/java/jdk-15.0.1
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar 上面那个如果没成功试试这个)
(6)source /etc/profile 保存修改
(7)输入javac,未报错即成功配置java环境
(8)执行两次cd .. 进入spark目录下

2.配置spark环境
(1)在spark目录下创建pyspark文件夹,mkdir pyspark
(2)cd pyspark,然后将压缩包spark-3.0.1-bin-hadoop3.2.tgz 上传到pyspark目录下
(3)解压缩tar -xvf spark-3.0.1-bin-hadoop3.2.tgz
(4)cd spark-3.0.1-bin-hadoop3.2/conf ,进入文件夹,可看到以下文件

(5)cp spark-env.sh.template spark-env.sh ,复制一个spark-env.sh文件
(6)然后vim spark-env.sh,在末尾加入配置
export PYSPARK_PYTHON=/usr/bin/python3
(7)source spark-env.sh

(8)打开配置文件 vim /etc/profile(非管理vim ~/.bashrc vi ~/.bash_profile),加入下面配置,SPARK_HOME为spark的安装路径
SPARK_HOME=/home/spark/pyspark/spark-3.0.1-bin-hadoop3.2
export PATH=$PATH:$SPARK_HOME/bin
export PYSPARK_PYTHON=python3
(9)source /etc/profile
(10)pip3 install pyspark -i https://pypi.tuna.tsinghua.edu.cn/simple
(11)输入pyspark,进入下面的页面即成功


注意:
1.需要在227,231和217分别进行相同的配置
2.创建的文件夹spark,需要所有用户均可操作(用以启动服务,跑代码)

当三台全部配置好的时候:
进入217:
1.cd /home/spark/pyspark/spark-3.0.1-bin-hadoop3.2/sbin
2.执行 ./start-master.sh
进入231:
1. cd /home/spark/pyspark/spark-3.0.1-bin-hadoop3.2/sbin
2. 执行 ./start-slave.sh spark://192.168.0.217:7077
进入227:
1. cd /home/spark/pyspark/spark-3.0.1-bin-hadoop3.2/sbin
2. 执行 ./start-slave.sh spark://192.168.0.217:7077

export JAVA_HOME=/home/kuailiang/2020/java/jdk1.8.0_281
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
问题
1
2
3
4
5
6
7
8
9
10
11
12
1.Spark在计算的过程中,是不是特别消耗内存?
不是。Spark是在管道中计算的,而管道中不是特别耗内存。即使有很多管道同时进行,也不是特别耗内存。
2.什么样的场景最耗内存?
使用控制类算子的时候耗内存,特别是使用cache时最耗内存。
3.如果管道中有cache逻辑,他是如何缓存数据的?
有cache时,会在一个task运行成功时(遇到action类算子时),将这个task的运行结果缓存到内存中
4.RDD(弹性分布式数据集),为什么他不存储数据还叫数据集?
虽然RDD不具备存储数据的能力,但是他具备操作数据的能力。
5.如果有1T数据,单机运行需要30分钟,但是使用Saprk计算需要两个小时(4node),为什么?
1)、发生了计算倾斜。大量数据给少量的task计算。少量数据却分配了大量的task。
2)、开启了推测执行机制
6.
运行spark程序
1
2
3
4
5
sc.master可以查看当前的运行模式
1.本地运行pyspark程序
pyspark --master local[4] ====>local[4]表示在本地运行,使用四个线程,local[*]表示尽可能多的使用核心

pyspark --master spark://192.168.0.217:7077 暂时有问题不知道是不是231ip问题,后面重启后在尝试一下
pandas的DF和spark的DF对比
1
2
3
4
5
6
两者的异同:

Pyspark DataFrame是在分布式节点上运行一些数据操作,而pandas是不可能的;
Pyspark DataFrame的数据反映比较缓慢,没有Pandas那么及时反映;
Pyspark DataFrame的数据框是不可变的,不能任意添加列,只能通过合并进行;
pandas比Pyspark DataFrame有更多方便的操作以及很强大
1
2
3
4
有效专利:针对一件专利,从申请日期开始,一直到失效日期都是有效的,若专利没有失效日期,需要结合当前状态去判断,如果当前状态为授权状态/再审状态,按照最大年限去认定失效日期,发明为20年,新型为10年.如果当前状态为失效,则判断不了具体失效时间,不做统计(总共只有一条).


,没有失效日期的按照最大年限去考虑,如发明专利,从申请日期开始往后20年都是有效的
spark sql架构
1
2
3
4
5
1.列式存储

2.dataframe api

3.DAG部分执行(pde),让我们在执行时根据处理过程中发现的一些数据动态修改和优化DAG.

jupyter notebook连接spark

1
2
3
4
5
6
import  os
import sys
spark_name = '/home/spark/pyspark/spark-3.0.1-bin-hadoop3.2'
sys.path.insert(0,os.path.join(spark_name,'python'))
sys.path.insert(0,os.path.join(spark_name,'python/lib/py4j-0.10.9-src.zip'))
exec(open(os.path.join(spark_name,'python/pyspark/shell.py')).read())