大数据技术基础

HDFS 读写操作

分布式系统的产生原因

对于现有存储系统来说,面临着以下四点问题

  • 海量数据存储、统一管理、统一调度问题

  • 大容量并发读写,使得设备性能成为瓶颈

  • 需要及时修复的设备故障,似的维护成本激增

  • 不同的应用系统之间数据难以融合

解决上述问题的方式是分布式/云存储技术,用户按需使用存储资源,无须关心存储设备身在何处,如何构建,提供的解决方案是

  • 提供分布式文件系统,数据调度与存储分离

  • 将单打独斗升级为团队作战

  • 由及时维护转变为定期维护

  • 将各种数据融合,提供统一接口

分布式系统架构

云存储的系统架构分为“控制节点”和存储节点两个部分。控制节点采用集群架构,存储元数据,对存储节点进行调度。存储节点堆数据进行分布式存储,可进行复制或者RAID备份

云存储的工作流程

业务系统向控制节点申请存储资源,控制节点回复调度信息

数据经由网络,到达存储节点进行分布式存储

控制节点与存储节点间同步调度与存储信息

控制节点进行元数据同步备份

存储节点的数据进行复制或RAID备份

流程图如下:

Hadoop介绍

Hadoop是Apache Software Foundation开发的一个分布式系统基础构架,应用JAVA语言进行开发。Hadoop能够在大规模计算机集群中对海量数据进行分布式计算。它的核心设计为两个模块:

  • 分布式文件系统Hadoop Distributed File System

  • HDFS分布式计算框架MapReduce

HDFS介绍

HDFS是一个分布式文件系统,是Hadoop体系中数据存储管理的基础。采用的是Master/Slave架构,集群主要有NameNode,DataNode,Secondary NameNode,JobTracker,TaskTracker组成

HDFS结构

物理结构是由计算机集群的多个节点构成,节点分为两类

  • Master Node 或称为 NameNode。用于管理文件系统的命名空间,又叫元数据节点

  • Slave Node 或称为 DataNode。用于存储实际的数据

HDFS集群由一个NameNode和若干个DataNode构成。

NameNodeDataNode
存储元数据存储文件内容
元数据保存在内存中文件内容保存在磁盘
保存文件,block,datanide之间的映射关系维护block id到datanode本地文件的映射关系

HDFS的NameSpace管理和Block块管理

NameSpace支持创建、删除、修改、列举命名空间相关系统的操作

NameSpace命名空间以层次结构组织存储着文件名和BlockID的对应关系、BlockID和具体Block位置的对应关系

在NameNode中,维护块的位置信息创建,修改,删除,查询块管理副本和副本位置

NameNode

NameNode负责:文件元数据信息的操作(对整个文件系统的命名空间进行管理)以及处理客户端的请求(对外界的文件访问请求进行处理)

将所有的文件和文件夹的元数据保存在一个文件系统树中

NameNode也负责向DataNode分配数据块并建立数据块和 DataNode的对应关系

NameNode维护:文件系统树(FileSystem)以及文件树中所有的文件和文件夹的元数据信息维护文件到块的对应关系和块到节点的对应关系

NameNode文件:NameSpace镜像文件(FsImage)操作日志文件(EditLog)这些信息被Cache在RAM内存中,这两个文件也会被持久化存储在本地硬盘

NameNode记录:每个文件中各个块所在的数据节点的位置信息。但它并不永久保存块的位置信息,因为这些信息在系统启动时由数据节点重建

从DataNode数据节点重建:在NameNode启动时,DataNode向NameNode进行注册时发送给NameNode

DataNode

DataNode是文件系统的工作节点,同时也是文件存储的基本单元

DataNode将block存储在本地文件系统中保存了block的Meta-data,同时周期性地将所有存在的block信息发送给NameNode

Block是用来存储数据的最小单元,通常一个文件会存储在一个或者多个Block中默认Block大小为64MB

每个DataNode会周期性的向Namenode发送心跳消息,报告自己所在DataNode的使用状态

HDFS架构的部署

HDFS通常的部署是NameNode程序单独运行于一台服务器节点上(主服务器),其余的服务器节点每台运行一个DataNode程序

HDFS在进行基本文件访问时,首先由用户的应用程序经过HDFS的客户端将文件发送到NameNode

HDFS所有的通讯协议都是基于TCP/IP协议

HDFS为了提高NameNode的可靠性,引入了Secondary NameNode节点,辅助NameNode对映像文件和事物日志进行处理

SecondaryNameNode

作用:定期将Namespace镜像FsImage与操作日志文件(EditLog )合并SecondaryNamenode并不能被用作Namenode

SecondaryNamenode通常运行在一个单独的物理机上,因为合并操作需要占用大量的CPU时间以及和Namenode相当的内存

工作流程

  • 将hdfs更新记录写入一个新的文edits.new

  • 将fsimage和editlog通过http协议发送至Secondary NameNode

  • 在Secondary NameNode中将fsimage与editlog合并,生成一个新的文件fsimage.ckpt

  • 将生成的fsimage.ckpt通过http协议发送至Namenode

  • 重命名fsimage.ckpt为fsimageedits.newJedits

HDFS文件读写操作

HDFS文件写入数据流程

  • 客户端通过调用FileSystem对象的create()来创建文件;客户端发起文件写入请求,通过RPC与NameNode建立通讯;NameNode检查目标文件,返回是否可以上传

  • client请求第一个block该传输到哪些DataNode服务器上

  • NameNode根据配置文件中指定的备份数量及机架感知原理进行文件分配,返回可用的DN的地址

  • Client 端和NameNode分配的多个DataNode构成pipeline管道

  • Client 调用FSDataOutputStream的write方法往pipeline管道里写大小为64K的packet数据,packet数据通过pipeline管道不断流向对应的DN节点上进行存储

  • 当一个bock块写入完成之后,客户端继续向NameNode获取下一个block块的位置信息,继续写入

  • 当把所有块写入完成后,Client调用FSDataOutputStream.close()方法,关闭输出流

  • 最后调用FileSystem.complete()方法,告诉NN节点写入成功

HDFS文件读出数据流程

  • 客户端通过调用FileSystem对象的open()来读取希望打开的文件

  • 客户端向NameNode发起RPC请求,来确定请求文件block所在的位置

  • NameNode会视情况返回文件的部分或者全部block列表,对于每个block, NameNode都会返回含有该block副本的DataNode地址;这些返回的DataNode地址,会按照集群拓扑结构得出 DN与客户端的距离,然后进行排序,排序两个规则:网络拓扑结构中距离 Client近的排靠前;心跳机制中超时汇报的DataNode的排靠后

  • Client调用FSDataInputStream的read方法读取数据,进行读取数据的的Client 选取排序靠前的DataNode来读取block,如果客户端本身就是DataNode,那么将从本地直接获取数据(短路读取特性)

  • 读完一批 bock块后,若文件读取还没结束,客户端会继续向NN获取下一批的 block 列表,继续读取

  • 所有block块读取完成后,Client调用FSDatalnputStream.close()方法,关闭输入流,并将读取来所有的 block块合并成一个完整的最终文件

实验注意事项

  • 每次重启服务器后,需要将/etc/hosts/里serverName 127.0.0.1部分删除

  • 需要打开防火墙的DataNode对应端口

代码

HBase应用实践

HBase介绍

HBase是Big Table的开源实现,存储半结构话的数据,通过ZooKeeper协同管理服务。

HBase存储采用面向聚合思想,以订单数据为例:若查询一段时间某个用户浏览的商品

某个用户ID行键-浏览记录列族-时间范围过滤,一次读取

HBase列式存储模型

HBase 中的一个表有若干行,每行有多个列族,每个列族中包含多个列,而列中的值有多个版本

行键、列族、列限定符和时间戳来确定一个单元格

[行键,列族,列限定符,时间戳]: 视为一个“四维坐标”

行键:一个行可以有一个行键和任意多个列

列族:列族就是列的家庭,列是家庭成员,通常家庭成员有多个,一个列族包含多个列

列限定符:列修饰符位于列族里,用来标识一条条数据

HBase物理存储模型

Client:通过库函数与Master进行管理操作通信;与RegionServer进行数据读写操作通信

HBase Client使用HBase的RPC机制与HMaster和HRegionServer进行通信

对于管理类操作,Cient与HMaster进行RPC

对于数据读写类操作,Client与HRegionServer进行RPC

Master:RegionServer的管理;RegionServer的负载均衡;Region分布调整

RegionServer: 负责存储不同的Region提供表数据读写等服务;是HBase数据处理和计算单元

Zookeeper:维护元数据总入口,记录Master地址,监控集群,对故障节点处理

Zookeeper是HBase体系中的协同管理节点,提供分布式协作,分布式同步,配置管理等功能

ZookeeperOuorum中存储的信息包括:

存储-ROOT表的地址: /hbase/root-region-server
存储HMaster的地址: /hbase/master

存储所有HRegionServer的状态:HRegionServer会把自己以短暂的方式注册到 Zookeeper中: /hbase/rs

HBase的使用

实验注意事项

  1. 当集群中所有服务器的ZooKeeoer均启动时,./zkServer.sh status才会正常显示Mode

代码

MapReduce分布式数据处理

MapReduce简介

面对海量数据,需要行之有效的并行处理技术,MapReduce提供了一个分布式计算环境和框架,让程序员仅需关注问题本身,编写很少的程序代码即可完成看似很复杂的任务

典型流式大数据问题的特征

Map => 大量数据记录/元素进行重复处理;对每个数据记录/元素做感兴趣的处理/获取感兴趣的中间结果信息

Reduce => 收集整理中间结果/产生最终输出

关键思想:伟大数据处理过程中的两个主要要处理操作提供一种抽象机制

基于Map和Reduce的并行计算模型

基于Map和Reduce的并行计算模型

各个map函数对所划分的数据并行处理,从不同的输入数据产生不同的中间结果输出

各个reduce也各自并行计算,各自负责处理不同的中间结果数据集合

进行reduce处理之前,必须等到所有的map函数做完,因此,在进入reduce前需要有一个同步障(barrier);这个阶段也负责对map的中间结果数据进行收集整理(aggregation & shufe)处理,以便reduce更有效地计算最终结果

最终汇总所有reduce的输出结果即可获得最终结果

MapReduce并行处理的基本过程

  1. 有一个待处理的大数据,被划分为大小相同的数据块,以及与此相应的用户作业模块

  2. 系统中有一个负责调度的主节点,以及数据Map和Reduce工作节点

  3. 用户作业程序提交给主节点

  4. 主节点为作业程序寻找和配备可用的Map节点,并将程序传送给map节点

  5. 主节点页为作业程序寻找和配备可用的Reduce节点,并将程序传送给Reduce节点

  6. 主节点启动每个Map节点执行程序,每个map节点尽可能读取本地或本机架的数据进行计算

  7. 每个Map节点处理读取的数据块,并做一些数据整理工作(combining, sorting等)并将中间结果存放在本地;同时通知主节点计算任务完成并告知中间结果数据存储位置

  8. 主节点等所有Map节点计算完成后,开始启动Reduce节点运行;Reduce节点从主节点所掌握的中间结果数据位置信息,远程读取这些数据

  9. Reduce节点计算结果汇总输出到一个结果文件即获得整个处理结果

MapReduce运行原理

一个MapReduce程序可对应若干个作业,而每个作业会被分解成若干个Map/Reduce任务(Task)

Task分为Map Task和Reduce Task两种,slot分为Map slot和Reduce slot两种,分别供Map Task和Reduce Task使用

TaskTracker通过slot数目(可配置参数)限定Task的并发度

MapReduce的详细架构

Client:用户可以编写MapReduce程序,通过Client提交到JobTracker端

每一个 Job 都会在用户端,将应用程序以及配置參数 Configuration 打包成 JAR文件

将JAR 文件存储在HDFS,并把路径提交到JobTracker的master 服务

由 master 创建每一个Task(即 MapTask 和 ReduceTask)将它们分发到各个TaskTracker 服务中去执行

Map阶段数据处理单位split

HDFS以block为基本单位存储数据

MapReduce的Map阶段处理单位是split

split分片:将数据源根据用户自定义规则分成若干个分片数据集;一个split分片对应一个Map Task

split是一个逻辑概念,它只包含一些元数据信息,比如数据起始位置、数据长度、数据所在节点等;划分方法由用户决定

输入分片不是物理分片,而是逻辑分片:不是把原来的一个大文件,如10MB的文件,切分成10个1MB的小文件

逻辑分片就是根据文件的字节索引进行分割,比如01MB位置定义为第一个分片,1MB2MB定义为为第二个分片,依次类推…

输入分片(inputsplit)存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组

Map Task的个数等于split的个数

mapreduce在处理大文件的时候会根据一定的规则,把大文件划分成多个分片,提高map的并行度

划分出来的就是ImputSplit,每个map处理一个InputSplit。

多少个InputSplit,对应多少个Map task

MapReduce执行

第一阶段分片

分片主要是InputFormat类来负责划分Split

FileInputFormat是InputFormat的子类,是使用比较广泛的类

每个输入分片的大小是固定的,默认情况下,输入片的大小与数据块的大小相同

第二阶段Map

每个Mapper任务是一个Java进程,它会读取HDFS文件中自己对应的输入切片

Map Task先将对应的split迭代解析成一个个key/value对

依次调用用户自定义的map0函数进行处理

将切片中记录按照一定的规则解析成很多的键值对

第三阶段Shuffle

Shuffle阶段也成为洗牌阶段,该阶段是将输出的<k2,v2/>传给Shuffle,Shuffle完成对数据的分区、排序和合并等操作

Shuffle过程包含在Map和Reduce两段,即Map Shuffle和Reduce shuffle,Shuffle描述着数据从map task流向reduce task这段过程

第四阶段Reduce

依次读取<key,value list>调用用户自定义的reduce()函数处理,并将最终结果存到HDFS上

完整生命周期

当用户(Application)提交作业后JobClient 将作业所需要的相关文件上传到HDFS,并通过RPC(远程调用协议)通知 JobTracker

JobTracker 内的任务调度模块为作业创建一个JobInProgress对象,用来跟踪作业的实时运行状况

JobInProgress 为每个Task创建一个TaskInProgress 对象,用来跟踪子任务的运行状态。

TaskTracker为任务运行准备环境每个TaskTracker可以运行多个TaskTaskTracher会为每个Task启动一个独立的JVM以避免Task之间的相互影响环境准备完毕后,TaskTracker便会启动Task。

所有的Task执行完毕,MapReduce作业生命周期结束

实验注意事项

代码