hadoop入门

hadoop基础知识
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
hadoop:分为1.02.03.0版本.目前企业最常见的是2.0版本. 1.0版本由hdfs和mapreduce构成,2.0在hdfs的上层添加了yarn,并且计算框架也由单一的mapreduce,添加了其他计算框架如spark,Mr等

hadoop 是在nutch distributed file System的基础上开发的.


1.hdfs架构的组成:namenode和datanode,然后数据是分块的存储在datanode中(block),namenode相当于一个大管家进行整体的控制.除此之外进行的备份策略,有个secondary namenode,定期和namenode联系进行备份,防止节点挂了影响整体功能.secondnameode 会定期整合fsimage和fsedits发送给namenode)
fsimage:元数据的镜像文件
fsedits:元数据的操作日志
(并非 NameNode 的热备。当NameNode 挂掉的时候,它并不能马上替换 NameNode 并提供服务。)

namenode:
1.管理hdfs的命名空间
2.控制数据块的映射
3.配置备份策略
4.管理读写语句

datanode:
1.存储分块数据
2.进行数据的读写操作


hadoop本身是具有高吞吐量,高扩展性等优点,但是不具备快速查询功能,可使用hbase实现

secondnamenode 是namenode 的冷备份
热备份:一个节点坏了后,宁一个节点可以直接替换这个节点继续工作,称为热节点
冷备份:一个节点坏了后,宁一个节点不能直接替换工作,只是为了减少损失

每次重启namenode都会从fsimage读取元文件,然后经过edits中记录的操作,当edits中记录很多时候,namenode启动会非常慢,因而需要定时的合并fsimage文件和edits文件,同时清空edits文件.

合并fsimage和edits文件是在secondaryname中实现的,会让namenode暂停写入edits,然后从namenode中获取fsimage和edits文件,合并后得到新的fsimage和edits传输给namenode,


视频学习流程:
hadoop发展---->hadoop架构解释及面试问题---->hadoop命令---->namenode和datenode详解--->edits文件详解--->fsimage文件详解--->hdfsApi操作----->hdfs流程图(写入,写出,删除操作流程)---->hadoop源码详解---->租约锁和hadoop特点(见:https://blog.csdn.net/sinat_35667067/article/details/104250251?utm_medium=distribute.pc_relevant.none-task-blog-2~default~baidujs_title~default-0.pc_relevant_default&spm=1001.2101.3001.4242.1&utm_relevant_index=2)

hive是建立在hadoop基础上的数据仓库,会将hdfs中的文件映射到表中,提供sql查询和mapreduce功能.Hive本身不存储和计算数据,依靠于hadoop的hdfs和mapreduce,只是一种映射
hbase是物理表,简单理解为,hive是文件的视图,hbase是建了索引的key-value表。
hadoop API
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package HDFS.learnself;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Hdfs;
import org.apache.hadoop.fs.Path;

public class Write {
public static void main(String[] args) throws IOException, InterruptedException, URISyntaxException {
//1.加载hdfs的配置文件
Configuration conf=new Configuration();
//2.获取hdfs的操作对象
FileSystem fs=FileSystem.get(new URI("hdfs://hdp02:9000"), conf, "hdp02");
//3.文件路径
Path File=new Path("hdfs://hdp02:9000/test/cao.txt");
//4.创建FSDataOutputStream对象
FSDataOutputStream out=fs.create(File);
//5.写入数据
out.writeUTF("Hello world!");
out.close();
System.out.println("数据写入成功!");
}
}

hadoop特点

1
2
3
4
5
6
7
1.分布式存储架构,支持海量数据(GB,PB,TB级别)
2.高容错性,数据块存在多个副本,副本丢失自动恢复
3.低成本部署,可在廉价机器上搭建
4.能够检测和快速应对硬件故障,通过rpc心跳机制来实现
5.hdfs不能实现低延迟数据访问(毫秒级),hadoop的优势是高吞吐量(单位时间内产生的数据流),低延迟可通过hbase实现
6.hadoop不支持数据修改,适用于:一次写入,多次读取.但是允许追加数据
7.hadoop不适合存储海量小文件,会浪费namenode 的内存空间
map reduce
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
任务分成:map任务和reduce任务。map操作完后会进行shuffle操作,将相同key的分发到一起,同一个reduce任务中
mapreduce阶段:泛型对应 String ---->Text,Integer---->Intwritable,Long----->LongWritable
原因: 在map完了之后,需要将数据传递给reduce,这时候涉及到了序列化和反序列化,但是jdk自带的效率较慢,因而hadoop自己实现了序列化接口.
* hadoop为jdk中的常用基本类型Long String Integer Float等数据类型封住了自己的实现了hadoop序列化接口的类型:LongWritable,Text,IntWritable,FloatWritable


####map阶段
1.明确输入的键值是什么类型,输出的键值是什么类型
案例:
* KEYIN :是map task读取到的数据的key的类型,是一行的起始偏移量Long
* VALUEIN:是map task读取到的数据的value的类型,是一行的内容String
*
* KEYOUT:是用户的自定义map方法要返回的结果kv数据的key的类型,在wordcount逻辑中,我们需要返回的是单词String
* VALUEOUT:是用户的自定义map方法要返回的结果kv数据的value的类型,在wordcount逻辑中,我们需要返回的是整数Integer

map_task的数量会根据切片的大小自动分配,reduce_task的数量可以指定,默认1

Map是对分块的每一行操作还是需要遍历?是对每一行操作的,不需要遍历

map和reduce间的分区,默认的是键的hash值与reducetask的数量取模得到,也可以通过partitioner自定义划分到哪一个分区(分区数等于reduce_task数量),通常一个reduce任务中数据已经经过键排序过了,当需要全局排序,一方面可通过一个reduce task实现,但是会影响整体的效率,宁一方面就是重构parpartitioner来实现

每个reduce_task 生成一个文件

要点:输出对象,分区指定,序列化指定

####################################job任务的执行流程##########################


多进程和多线程的区别
1.多进程占用内存多,cpu利用率低;多线程占用内存少,cpu利用率高
2.多线程进程间不会互相影响,多线程一个断了则全断
Yarn架构

1
2
3
4
5
6
7
8
9
10
11
12
13
yarn:是在hadoop1.0的基础上衍生出来的,为了缓解jobtracker的压力.主要进行资源调度和任务分配.
yarn的三大组件:resourceManager(rm),nodemanager(nm),applicantionmaster
rm主要进行资源分配,applicationmaster主要进行任务调度

ResourceManager主要有两个组件:Scheduler和ApplicationManager
Scheduler:负责资源的分配,纯调度
ApplicationManager:为应用分配container来运行applicantionmaster,并且负责监控applicantionmaster

NodeManager是yarn节点的一个“工作进程”代理,管理hadoop集群中独立的计算节点,主要负责与ResourceManager通信,负责启动和管理应用程序的container的生命周期,监控它们的资源使用情况(cpu和内存),跟踪节点的监控状态,管理日志等。并报告给RM。

container:一个抽象资源对象,封装了程序运行需要的资源(如内存,cpu,磁盘等)

ApplicationMaster: ApplicationMaster负责与scheduler协商合适的container,跟踪应用程序的状态,以及监控它们的进度,ApplicationMaster是协调集群中应用程序执行的进程。每个应用程序都有自己的ApplicationMaster,负责与ResourceManager协商资源(container)和NodeManager协同工作来执行和监控任务 。
yarn的调度流程

Flume
1
2
3
4
5
6
7
8
9
10
Flume:海量日志采集、聚合和传输的系统。最主要的作用是实时读取服务器本地磁盘的数据,将数据写入到HDFS中。
Flume架构:webserver--->Agent---->HDFS
Agent(Source、channel、sink)
Agent:Jvm的一个进程,以事务的形式将数据从源头送到目的
Source:责任是将数据传输到Flume Agent的一个组件;
channel: 解耦合,具有缓冲的作用.Flume自带两个channel,一个memory channel 和File channel,前者存在内存,存在数据丢失风险,速度快.后者存在磁盘上,安全性高,速度慢
sink:不断轮询channel并删除,然后将这些事件批量写入存储或者索引系统(如hdfs,hbase,avro等)
一个source对应多个channel,一个sink只能对应一个channel,一个channel对应多个sink
channel功能:一个是数据传输的可靠性,一个是数据流入流出的异步执行.前面一句是因为只有当数据确认到到目的地或者下一个agent时才会删除channel内对应的event.
source可实现扇出,即一个source对应多个channel,然后方式有两种,一个是复制,即每个channel的数据都一样.一个是选择(多路复用),即event有选择的流入哪个channel

1646213144225

Kafka
1
2
3
4
5
6
7
8
9
10
快速了解:https://max.book118.com/html/2021/1027/8005064135004025.shtm
定义:一个分布式的流媒体平台(需要具备三个功能)
1.发布和订阅记录流
2.以容错的方式永久订阅记录流
3.记录发生时处理流
通用:用在实时数据平台上,由flume到kafka到storm
kafka应用场景:
kafaka特点:
1. poll模式.消费的速率由下游的消费者(计算框架决定)
2. Kafka提供了消费的持久化机制,无论消费者是否消费都会存储,通过副本冗余机制提供数据的冗余机制

hive

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
hive是数据仓库的一种
数据仓库的特点:
1.存放的是历史数据
2. 存储大容量数据,gb,tb甚至pb
hive本身并不存储数据,而是hadoop中hdfs的映射,进行hql操作时候实际操作的是hdfs的文件夹,且数据源需要配置mysql,hql默认会使用mr,针对数据倾斜问题,一个是group by 一个是join,只需要在当前session改两个配置即可.然后分成外部表和内部表.分区表既可以是外部表也可以是内部表,通常是以时间来作为分区依据,分区的好处是避免全表扫描,(实际上是每个分区键都有对应的文件夹,只到对应文件夹下去读取数据.

hive知识点梳理
1. 内部表、外部表 和分桶表(用来随机抽样用的)
2. metastore: hive元数据,存放在关系型数据库中,如derby和mysql
3. hive复杂数据结构:array、map、struct
4. hive常用的字符串操作函数
5. hive 的udf函数
6. hive数据倾斜问题及优化(①group by ②join ③count)
7. hive 的优化:大概七八个吧①map参数 ②reduce参数 ③jvm重用 ④严格模式
8. sqoop mysql<->hdfs
9. hive api接口:①client ②代码jdbc ③可视化 webui

sqoop既可以实现数仓到关系型数据库,也可以实现关系型数据库到数仓

大数据创建总共分成两种一种是离线批处理一种是实时处理

离线批处理流程:

  1. 日志通过flume进行采集
2. 将数据存入hdfs中
  3. 通过hive进行处理
    1. 建立总表
  4. 建立清洗表,实现etl
  5. 建立业务表,清洗业务字段
  6. 建立多维度表
  7. sqoop将清洗完的数据导入mysql,进行可视化

####Scala

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
函数定义:
var f1 = (x:Int) => x*2
var f2 : (Int,Int)=>(Int) = (x,y) => x*y
def fu(name:String):Unit = {println(name)}

match case break,continue (导包:import util.control.Breaks._)
kl match{case "kl" => println(2) case "sd" => println(3)}
break写法:
Breaks.breakable(
| for (i <- 1 to 10){
| if (i==5){
| Breaks.break()
| }
| println(i)
| }
| )
continue写法:
for (i<- 1 to 5)
| {Breaks.breakable(if (i==3) Breaks.break() else println(i))}
区别就是break是写在循环外的,cntinue是写在循环里的,写法是一样的
try catch finally

//==============================函数===========================
可变参数
def people(name:String*)===>在类型后加一个星号表示可变,实际是个容器,放了同一类型的多个数据,可通过
遍历获取出来
函数返回值:Unit表示没有返回值,可以不写返回值的类型,会自己推断(前提是有等号,等号不写,全都默认没有返回值)
scala的泛型使用中括号[],不同于java的尖括号<>
函数体只有一行的时候可以简写
匿名函数 var a = (x:Int,y:Int)=>x*y (x,y)=>x*y x=>x**2
函数支持公开
函数中调用匿名函数
def f2(a:Int,b:Int,f:(Int,Int)=>{Int}) = {f(a,b)}

f2(2,3,(x,y)=>x+y)
最简化版本: f2(2,3,_*_) 其中_指代的是参数
匿名函数和高阶函数

println(s"Lines with a: $numAs, Lines with b: $numBs")


scala完全面向对象,因而去掉了java中的非面向对象的元素,如static和void
scala无static关键字,通过object实现类似静态方法的功能
void 无返回值scala通过Unit实现
类中使用BeanProperty修饰表示该变量会自动生成get和set方法 @BeanProperty var age = 20

伴生类和伴生对象,名称相同,伴生对象可以使用伴生类的所有东西(即object和class),又因为不支持静态方法,这样在编译时候就会生成两个同名的文件,加$符号的是伴生对象的生成类,不加$是入口类

scala的数据类型:Any:包含AnyvalAnyref(即引用和数值),anyref包含:scala classes 和 other scala classes 和 all java classes.然后all java classes又包含null,null的下一级为Nothing
anyval:包含Unit、stringops(java中string的增强)、char、boolean、Double(下属float->long->int>short->byte)下一级为Nothing
通常Unit表示函数没有返回值,Null表示的是对象为空(必须是引用类型),Nothing表示没有任何返回值,不同于nullUnit,通常用在函数内抛出异常时使用
数据类型转化分为隐式转换和显示转换,隐式转换表名低阶可以自动转换成高阶,高阶转换成低阶需要调用方法如toInt等

数值转换成字符串 ①2.toString ②2+""

scala循环推荐使用for循环而非while和do while,且scala中没有breakcontinue关键字,要想实现需要调用方法:scala终止循环实际上是通过抛出异常实现的,不同的是调方法是进行了封装
try{
for(i <- 1 to 10; j = 20 - i){
if (i == 9){
throw new RuntimeException
}
println("i+j="+i+j)
}} catch {case exception: Exception =>}
println("结束了")

Breaks.breakable(
for (i <- 1 until 10){
if (i ==6){
Breaks.break()
}
println(i)
})

匿名函数化简:
1.参数类型可以省略 (x:Int)=>{x+1} 变成(x)=>{x+1}
2.参数只有一个时候括号可以省略 x=>{x+1}
3.函数体只有一行,花括号可以省略 x => x+1
4.如果参数只出现一次可以使用_代替 (x,y) => x+y 变成 _ + _
闭包和柯里化
scala导包 com.公司名.项目名.模块名 或者package{}三层嵌套也可以

面向对象:
scala的构造器分为主构造器和辅助构造器,主构造器是在类名上加参数,辅助构造器是重载this方法
构造器参数修饰,var可变参数,val不可变参数,只能访问不能修改


集合:
Array:
可变的添加:a.+=(2) 或者a.append(2)
插入:a.insert(0,2)
赋值:a(1) = 3
可变与不可变的转换a.toBuffer a.toArray
创建多维矩阵 Array.ofDim[Double](3,4)

spark操作

1
2
3
4
5
val a = sc.parallelize(Array((1,"xaioming"),(2,"dsada"),(3,"sda")))
val b = a.toDF("ID","name")
b.filter($"name" === "sda").show()
b.filter($"name".isin("sads","dsda"))
b.filter($"name".contains("sads"))
自定义Spark累加器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.benying.spark

import org.apache.spark.util.AccumulatorV2
import org.apache.spark.{SparkConf, SparkContext}

import java.lang
import scala.collection.mutable

/*accumulator自定义操作*/
object Accumulator_learning {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sc = new SparkContext(conf)
val rdd1 = sc.makeRDD(List("Hello","Spark","Scala","hello","Spark"),2)
//自定义累加器
val myacc = new MyAccumulator_()
sc.register(myacc,"spark_ACC")

rdd1.foreach(word=>myacc.add(word))

println(myacc.value)
//关闭
sc.stop()

}
class MyAccumulator_ extends AccumulatorV2[String, mutable.Map[String,Long]]{
private val word_map = mutable.Map[String,Long]()

override def isZero: Boolean = {
word_map.isEmpty
}

override def copy(): AccumulatorV2[String, mutable.Map[String,Long]] = {
new MyAccumulator_()
}

override def reset(): Unit = {
word_map.clear()
}

override def add(word: String): Unit = {
val num:Long = word_map.getOrElse(word,0L) + 1
word_map.update(word,num)
}

override def merge(other: AccumulatorV2[String, mutable.Map[String,Long]]): Unit = {
val other_value: mutable.Map[String, Long] = other.value
other_value.foreach{
case (word,num) =>
val new_num:Long = word_map.getOrElse(word,0L) + num
word_map.update(word,new_num)
}
}

override def value: mutable.Map[String,Long] = {
word_map
}
}
}

FLUME–HIVE–KAFKA–HBASE–STORM–SPARK

zookeeper:贯穿全部