Rdd操作

#####基本的RDD转换操作—————->将一个Rdd转换成宁一个Rdd

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
1.map()--->最基本的转换操作========================>对rdd的每一个元素做处理
word = sc.parallelize(['a b','c d'])
word.map(lambda x:x.split(' ')).collect()
返回结果:
[['a', 'b'], ['c', 'd']]

2.flatMap()--->先map然后flat平铺========>注意M是大写的
word = sc.parallelize(['a b','c d'])
word.flatMap(lambda x:x.split(' ')).collect()
返回结果:
['a', 'b', 'c', 'd']

3.filter()--->过滤操作===============>筛掉不符合条件的
word = sc.parallelize(['a b','c d'])
word.filter(lambda x:len(x) == 1).collect()
返回结果:
[]

4.distinct()---->去重操作============>去重rdd
word = sc.parallelize(['a b','c d','a b'])
word.distinct().collect()

5.groupBy()----->按照指定条件对RDD分组==============>B大写,且一般不使用这种方法(要求键对应的值可以全部放入内存中)==========>推荐更高效的aggregateByKey()和reduceByKey()等
word = sc.parallelize([('a',1),('a',2),('b',3)])
word.groupBy(lambda x:x[0]).collect()
返回结果:
[('b', <pyspark.resultiterable.ResultIterable object at 0x7f7064cfa7b8>), ('a', <pyspark.resultiterable.ResultIterable object at 0x7f7064cfa1d0>)]

6.sortBy()------>按照指定键进行排序
word = sc.parallelize(['a b','c d','a b','a c'])
word.sortBy(lambda x:x[1],ascending=True)

7. cartesian------>求两个rdd的笛卡尔乘积
a.cartesian(a).collect()

8. glom ----->以列表的形式查看各个分区的数据
a = sc.parallelize([1,2,3,4],2)
a.glom().collect()

9.coalesce---->降低rdd 的分区数(小于等于之前分区数)
a.coalesce(1).glom().collect()

10. cogroup ----将两个pairrdd 中相同键的值放在一起
a = sc.parallelize([("a",2)])
b = sc.parallelize([("a",3),("b",2)])
[(x,tuple([list(i) for i in y])) for x,y in a.cogroup(b).collect()]
输出:[('b', ([], [2])), ('a', ([2], [3]))]

10.1 groupWith ------将多个pair rdd中相同的键的值放一起,效果同cogroup
a = sc.parallelize([("a",2)])
b = sc.parallelize([("a",3),("b",2)])
c = sc.parallelize([("a",3),("b",2)])
[(x,tuple([list(i) for i in y])) for x,y in a.groupWith(b,c).collect()]
输出: [('a', ([2], [3], [3])), ('b', ([], [2], [2]))]


11. union ---->合并两个rdd
a = sc.parallelize([1,2,3])
b = sc.parallelize([3,4,5])
a.union(b)

12. countByValue() ---->返回RDD值出现的次数,而非pairrdd的键值对应的值
x = [('a', 2), ('a', 2), ('b', 1)]
x.countByValue() 输出:{('a', 2): 2, ('b', 1): 1}

13. countByKey() ----->针对pairrdd 按照key聚合===>这个得是pairrdd
x.countByKey()

14. getNumPartitions ----->获取rdd的分区个数,查看具体的分区可以使用glom

15. groupby ------>按照传入参数的[返回值]进行聚合
a = sc.parallelize([1,2,3,4,5])
a.groupBy(lambda x:x%2 == 0).map(lambda x:{x[0]:list(x[1])}).collect()
[{False: [1, 3, 5]}, {True: [2, 4]}]

16. groupByKey ----->pair rdd 按照键进行聚合,值聚合成迭代器
a = sc.parallelize([(1,2),(2,3),(1,4)])
a.groupByKey().mapValues(list).collect()
输出:[(1, [2, 4]), (2, [3])]

17. mapValues ------> 对pair rdd 的值操作
a = sc.parallelize([(1,2),(2,3),(1,4)])
a.mapValues(lambda x:x+2).collect()
输出:[(1, 4), (2, 5), (1, 6)]

18. intersection ----->求两个rdd的交集
a = sc.parallelize([(1,2),(2,3),(1,4)])
a.intersection(a).collect()

19. isEmpty ----->判断rdd 是否为空
sc.parallelize([]).isEmpty()

20. mapPartitions ------->对rdd 的每个分区做处理,返回各个分区的结果
a = sc.parallelize([1,2,3,4,5],2)
a.mapPartitions(lambda x:[len(list(x))]).collect()
返回结果:[3,2]

21. zip -----> 生成pair rdd,x和y的长度需要一致
x.zip(y).collect()

#####基本RDD的行动操作———>进行求值或计算或输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
1.count() ------>计数=======>rdd中元素的个数
word = sc.parallelize([('a',1),('a',2),('b',3)])
word.count()
返回结果:
3

2.collect() ------>输出======>返回rdd元素构成的列表
word = sc.parallelize([('a',1),('a',2),('b',3)])
word.collect()
输出结果:
[('a',1),('a',2),('b',3)]

3.take() ------->输出=====>返回RDD指定个数的元素,返回的元素不固定.选元素没特定的顺序
word = sc.parallelize([('a',1),('a',2),('b',3)])
word.take(1)
输出结果:
[('a',1)]

4.top() ------>输出======>返回降序排序好的前N个元素
word = sc.parallelize([('a',1),('a',2),('b',3),('b',1)])
word.top(1)
输出结果:
('b',3)

5.first() ----->输出====>同take一样返回结果不固定,不同的是take返回的是列表#无参数
word = sc.parallelize([('a',1),('a',2),('b',3),('b',1)])
word.first() #无参数
输出结果:
('a',1)

===================用以聚合的行动操作reduce和fold====================
6.reduce()------>聚合========>数据需要同一个类型,空的rdd会报错
word = sc.parallelize([('a',1),('a',2),('b',3),('b',1)])
word.reduce(lambda x,y:x+y)
输出结果
('a', 1, 'a', 2, 'b', 3, 'b', 1)

7.fold()------->聚合======>需要给定初始值,且空的rdd不会报错
word = sc.parallelize([('a',1),('a',2),('b',3),('b',1)])
word.fold((),lambda x,y:x+y) #加法0,乘法1等
输出结果:
('a', 1, 'a', 2, 'b', 3, 'b', 1)


8.foreach()----->把函数应用到所有元素上======>不允许对rdd进行操作
word = sc.parallelize([('a',1),('a',2),('b',3),('b',1)])
word.foreach(lambda x:print(x)) #加法0,乘法1等
输出结果:
('a', 1)
('b', 1)
('b', 3)
('a', 2)
8.1 foreachPartition() 按照分区执行foreach
a = sc.parallelize([1,2,3,4,5,6],2)

9.aggregate() ------>二次聚合,分成两步执行,第一次是分区执行,第二次是对上一步结果再聚合
求均值(函数解释详见https://blog.csdn.net/qingyang0320/article/details/51603243)
nums = sc.parallelize([1,2,3,4,5,21],2)
nums.aggregate((0,0),lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1])) 得到(36,6)

10. aggregateByKey()------>也是进行二次聚合,第一次是分区计算(相同key的),第二次是对不同分区处理后的结果进行相同key聚合处理
b = sc.parallelize([(1,2),(1,3),(2,3),(3,4),(1,4),(2,7),(3,6)],2) 分成了两个区
b.aggregateByKey(0,lambda x,y:max(x,y),lambda x,y:x+y).collect()
解释:各个分区相同key的先取出各个分区值的最大值,然后不同分区再把结果值相加
得到结果:[(2, 10), (1, 7), (3, 6)]
键值对Rdd的行动操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
1.keys()----->获取键
per_info = sc.parallelize([('city','hefei'),('age',25),('name','kuailiang')])
per_info.keys().collect()
输出结果:
['city', 'age', 'name']

2.values()----->获取值
per_info = sc.parallelize([('city','hefei'),('age',25),('name','kuailiang')])
per_info.values().collect()
输出结果:
['hefei', 25, 'kuailiang']

3.keyBy() ----->构建pairRDD类型的数据=======>指定元组中的指定索引数据作为键,原来的援助作为值构成新的pairRDD
per_info = sc.parallelize([('city','hefei'),('age',25),('name','kuailiang')])
per_info.keyBy(lambda x:x[0]).collect()
输出结果:
[('city', ('city', 'hefei')), ('age', ('age', 25)), ('name', ('name', 'kuailiang'))]

4.mapValues()---->对pairRdd的值统一处理==========>V大写,相当于普通rdd的map
per_info = sc.parallelize([('city','hefei'),('age',25),('name','kuailiang')])
per_info.mapValues(lambda x:str(x)+'1').collect()
输出结果:
[('city', 'hefei1'), ('age', '251'), ('name', 'kuailiang1')]

5.flatMapValues()---->对pairRdd的值统一处理,并拍平======>先把键对应的值拍开然后mapvalues==========>就是先对k,v中的value使用函数处理,然后相同的键按照value拆开,value通常为列表
y = sc.parallelize([('a', [1, 2, 3])])
y.flatMapValues(lambda x:[i**2 for i in x]).collect()
输出结果:
[('a', 1), ('a', 4), ('a', 9)]

6.groupByKey()----->按照键进行聚合=========>聚合后返回的是一个迭代器===>推荐使用reduceByKey和foldByKey
per_info = sc.parallelize([('city','hefei'),('age',25),('name','kuailiang'),('city','beijing'),('age',28)])
per_info.groupByKey().mapValues(lambda x:list(x)).collect()
输出结果:
[('name', ['kuailiang']), ('age', [25, 28]), ('city', ['hefei', 'beijing'])]

7.reduceByKey()------>按照键聚合===========>和reduce一样是一个一个处理的
per_info = sc.parallelize([('city','hefei'),('age',25),('name','kuailiang'),('city','beijing'),('age',28)])
per_info.reduceByKey(lambda x,y:x + [y] if type(x) == list else [x,y]).collect()
输出结果:
[('name', 'kuailiang'), ('age', [25, 28, 30]), ('city', ['hefei', 'beijing'])]

8.foldByKey()-------->按照键聚合======>需要给定初始值
per_info = sc.parallelize([('age',25),('name',11),('age',28),('name',22)])
per_info.foldByKey(0,lambda x,y:x+y).collect()
输出结果:
[('name', 33), ('age', 53)]

9.sortByKey()------>按照键排序
per_info = sc.parallelize([('age',25),('name',11),('age',28),('name',22)])
per_info.sortByKey(0,lambda x,y:x+y).collect()
输出结果:
[('age', 25), ('age', 28), ('name', 11), ('name', 22)]

10.collectAsMap()------>将pairrdd输出为字典
m = sc.parallelize([(1, 2), (3, 4)]).collectAsMap() ,相同键value为最后一次出现的值

11.reduceByKey()------>按照key聚合处理,总共三个参数,第一参数表示当遍历到的键从未出现过的时候执行第一个函数,当出现过的时候执行第二个参数,第三个参数表示对相同键的不同分区的结果进行聚合
x = sc.parallelize([("a", 1), ("b", 1), ("a", 2)],2)
x.combineByKey(lambda x:[x],lambda x,y:x.append(y),lambda x,y:x+y).collect()


12. 对pair rdd的各种join操作

①join:内连接,共有的键才会连接
②leftOuterJoin
③rightOuterJoin
④fullOuterJoin
⑤union: 合并两个rdd
数值运算
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
1.min()------>返回符合要求的最小值
test1 = sc.parallelize([1,2,4,56,123,123,23,5])
test1.min(lambda x:1/x)
输出结果:
123

2.max() ------>返回最大值
test1 = sc.parallelize([1,2,4,56,123,123,23,5])
test1.max(lambda x:x//2)
输出结果:
123

3.mean()------>返回均值
test1 = sc.parallelize([1,2,4,56,123,123,23,5])
test1.mean()
输出结果:
42.125

4.sum()--------->求和
test1 = sc.parallelize([1,2,4,56,123,123,23,5])
test1.sum()
输出结果:
337

5.stdev()------->求标准差
test1 = sc.parallelize([1,2,4,56,123,123,23,5])
test1.stdev()
输出结果:
49.71528311294224

6.variance()---->方差
test1 = sc.parallelize([1,2,4,56,123,123,23,5])
test1.variance()
输出结果:
2471.609375


7.stats()------>所有的结构,如min,max,sum等等
test1 = sc.parallelize([1,2,4,56,123,123,23,5])
test1.stats()
输出结果:
(count: 8, mean: 42.125, stdev: 49.71528311294224, max: 123.0, min: 1.0)