一般来说,大数据系统都是由多个子系统集成构成的系统,考虑到大数据架构的设计原则以及大数据系统构建的经济性和稳定性,软件基础架构设计方案将全面采用成熟的开源项目成果,比如非结构化存储采用HDFS分布式文件系统,结构化数据存储采用HBase分布式数据库系统。每个子系统都采用主从模式,即由一个主节点和多个从节点构成。大数据系统逻辑架构是一个多层次的体系架构,采用“分布式平台+分析式应用”的模式进行设计。
该系统架构的目标是设计出一个能满足大数据系统关键特性的架构,包括有:高容错、低延时和可扩展等。该系统整合了离线计算和实时计算,集成了Hadoop、Kafka、Storm、HBase等各类大数据组件。
系统逻辑架构设计如图2.1所示。
该架构主要由Batch Layer、View Layer和Speed Layer组成,Batch Layer主要由数据采集系统、分布式文件存储系统HDFS以及分布式并行计算框架MapReduce组成;View Layer主要采用了分布式高速数据库HBase;Speed Layer主要由分布式消息队列服务系统Kafka以及流式实时计算框架Storm组成。
在该系统框架中,数据来源可以是应用系统、日志文件、网络舆情、智能终端以及传感器等。数据采集系统可以从数据源以及HBase中获得数据,将数据存储在HDFS上,再由MapReduce对这些数据进行分布式计算处理;Kafka和数据采集系统相同的是它也可以从数据源以及HBase中获得数据,不同的是它主要针对流式数据通道进行解耦,并交给Storm进行实时计算处理;MapReduce和Storm处理后的信息都在HBase中进行保存;当不同的应用需要显示结果时,只需要从HBase中取出处理以后的数据即可,不需要每次都对原始数据进行计算处理,提高了系统的响应时间和资源的使用率。
下面我们将对每个部分进行详细的介绍。
1.数据采集系统
图2.1 系统逻辑架构设计
针对目前互联网资源呈指数级增长,网络信息更新速度快等现象,实现对特定领域数据的实时监测和有效分析,将相关网络信息存储到本地,为后续的信息抽取、链接分析、知识库构建、可视化等工作提供重要的基础数据成为目前研究的热点话题,由此产生数据采集这一概念,数据采集又称数据获取,是从传感器和其他待测设备等的模拟和数字被测单元中自动采集数据并输入到系统内部的一个接口。
信息采集是数据采集的一种表现形式,指从大量的网页中将非结构化的信息抽取出来,采用正则函数,进行信息清洗过滤后保存到结构化的数据库中,同时提供个性化的信息定制及强大的全文检索能力。即对信息的收集和处理,其中非结构化数据(如网页)的采集过程主要是通过爬取系统Nutch技术来实现,而对关系型数据的抽取,转换和加载即ETL通过Sqoop技术和Flume技术来实现。
1)爬取系统Nutch
Nutch是由Java语言开发实现的开源式高扩展搜索引擎,其提供易用的接口和插件机制,为分布式搜索引擎的开发奠定了良好的基础。Nutch的实现主要是基于Hadoop平台,由若干个数据服务器和一个客户端组成,包含用于从网络上抓取网页并为网页建立索引的爬虫crawler和利用索引检索用户的查找关键词来产生查找结果的检索searcher两个部分。
其中爬虫crawler有几个基本概念,我们在这里进行简要的介绍。
crawler数据文件主要包括三类,分别是Web Database、多Segment和Index。
(1)Web Database。
Web Database即WebDB,用于存储爬虫所抓取网页之间的链接结构信息,存储page和link两种实体的信息。其中page实体描述的网页特征主要包括网页内的link数目,抓取此网页的时间等相关信息,对此网页的重要度评分等。link实体描述的是两个page实体之间的链接关系。
(2)多Segment。
各Segment内存储爬虫Crawler在单次抓取循环中抓到的网页以及网页索引。
(3)Index。
Index是Crawler通过对所有单个Segment中的索引进行合并处理抓取的所有网页索引。
下面对Nutch爬虫的运行机制进行简单的介绍。
Nutch爬虫的实现机制主要是通过与solr配合,采用宽度优先遍历的方式实现爬取,并将数据索引添加到solr数据库,由solr负责查询工作,由此提供了运行搜索引擎所需的全文搜索和Web爬虫工具。其工作的原理又称作“产生/抓取/更新”循环,在Nutch中,Crawler操作的实现是通过一系列子操作的实现来完成的。这些子操作Nutch都提供子命令行可以单独进行调用。Nutch的实现机制见图2.2,下面是这些子操作的功能描述。
第1步:执行create操作,创建一个新的WebDB;
第2步:建立初始URL集合,执行inject操作,并将其注入至存储URL集合的WebDB中;
第3步:执行generate操作,根据WebDB生成抓取列表fetchlist并写入相应的segment;
第4步:执行fetch操作,根据抓取列表fetchlist中的URL抓取网页数据;
第5步:执行parse操作,解析抓取的网页数据,抽取链接URL,提取文本信息至segments数据库;
第6步:执行updatedb操作,将抓取网页更新至WebDB;
循环进行3~6步直至达到预先设定的抓取深度;
第7步:执行updatesegs操作,根据WebDB得到的网页评分和links更新segments;
第8步:执行index操作,对所抓取的网页进行索引;
第9步:执行dedup操作,在索引中丢弃有重复内容的网页和重复的URLs;
第10步:执行merge操作,将segments中的索引进行合并生成用于检索的最终index。
图2.2 Nutch运行机制图
2)Sqoop
Sqoop是将关系型数据库中的数据交换到Hadoop平台,存储在HBase、Hive中,为日后分析处理使用的数据交换系统,实现将关系型数据库数据导入到Hadoop与其相关的系统或将数据从Hadoop系统里抽取并导出到关系型数据库,处理分析后供客户端使用,实现高效、可靠的数据传输。其经历Sqoop1和Sqoop2两个变迁历程,Sqoop1是客户端,以命令行控制台的形式实现数据交换,缺乏安全性,而Sqoop2引入基于角色的安全机制,实现对connector的管理,在安全性及访问方式方面均有较大改善。
Sqoop设计对象为大数据批量传输,能够分割数据集并创建Hadoop任务来处理每个区块,具有包括导入和导出、列出数据库和表信息、生成Java类来操纵数据、解析SQL命令以及其他一些更专门的命令。
下面对Sqoop连接字符串策略进行简单介绍。
Sqoop根据数据库连接字符串决定策略,包括JDBC导出策略和mysqlimport直接模式策略。多数情况下,Sqoop使用JDBC导出策略,即利用导出工具从序列化文件中读取对象,导出Mapreducejob输出的文本文件或导出序列化存储的记录到外部表中,之后直接发送给OutputCollector,将对象传递到数据库,导出Output-Format的过程。其具体实现过程为:首先构建INSERT语句并将把记录写到目标表格中,其次Sqoop生成一个基于目标表定义的Java类,从文本文件中解析记录并且向表中插入合适的数据类型,最后Mapreducejob单线程启动,从HDFS中读取原数据,用生成的代码解析记录。mysqlimport直接模式策略的实现过程为:每一个maptask产生一个mysqlimport进程,该进程通过FIFO管道数据以流的形式,到mysqlimport,然后再到数据库。
下面将对Sqoop的运行机制进行简单介绍。
Sqoop架构简单,通过Hadoop的Mapreduce把数据从关系型数据库导入到HDFS,同时整合Hive、Hbase和Oozie,通过实现map-reduce任务来传输数据,提供并发特性和容错功能。Sqoop的基本工作流程如图2.3所示。
图2.3 Sqoop的基本工作流程图
第1步:Sqoop在import时,需要生成运行类,制定split-by参数。
第2步:Sqoop根据不同的split-by参数值来进行切分,然后将切分出来的区域分配到不同的map中。
第3步:每个map中再处理数据库中获取的一行一行的值,写入到HDFS中,然后创建RecordReader从数据库中读取数据。
第4步:创建map,每个map获取各自SQL中的数据进行导入工作(基于JDBC的导出策略)。
3)Flume
Flume是Cloudera提供使用JRuby来构建的一个开源的、高可用的、高可靠的、容易管理的、支持客户扩展的分布式海量日志数据采集、聚合和传输的系统。Flume支持在日志系统中定制各类数据发送方,提供了从console(控制台)、RPC(Thrift-RPC)、text(文件)、tail(UNIX tail)、syslog(syslog日志系统)、exec(命令执行)等数据源上收集数据的能力和对数据进行简单处理,并写到各种数据接收方(可定制)的能力。Flume减少了时间复杂度,具有高效性和高可用性,在处理流数据事件中应用广泛。
Flume的功能是从数据源收集并缓存数据,再送到目的地而后删除缓存数据,其运行的核心是Agent。作为完整的数据收集工具,含有三个核心组件,分别是source、channel、sink。传输数据的基本单位是Event,Event代表着一个数据流的最小完整单元,从外部数据源来,向外部的目的地去。从Source流向Channel,再到Sink,作为byte数组,可携带headers信息。
如图2.4所示,下面对Flume中核心组件的基本概念进行介绍。
图2.4 Flume的核心组件
(1)source。
source用于接收外部源发送的数据,完成对日志数据的收集,分成transtion和event,不同的source接收不同的数据格式。直接读取文件Source的两种方式为:ExecSource和SpoolSource,其中ExecSource以运行Linux命令的方式,持续地输出最新的数据;SpoolSource用来监测配置的目录下新增的文件,并将文件中的数据读取出来。
(2)channel。
channe是存储地,对source中提供的数据进行简单的缓存,直到有sink消费掉channel中的数据。channel中的数据直到进入下一个channel中或者进入终端才会被删除。当sink写入失败后,可以自动重启,不会造成数据丢失,因此很可靠。Channel有MemoryChannel、JDBC、Channel、MemoryRecoverChannel、FileChannel等多种方式。其中,MemoryChannel可以实现高速的吞吐,但是无法保证数据的完整性。MemoryRecoverChannel已经建议使用FileChannel来替换。FileChannel保证数据的完整性与一致性。但建议FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。
(3)sink。
sink消费Channel中的数据,进行相应文件系统、数据库的存储,或者提交到远程服务器。对现有程序改动最小的使用方式是直接读取原来记录的日志文件,基本可以实现无缝接入,不需要对现有程序进行任何改动,在日志数据较少时,可以将数据存储在文件系统中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,以便于日后进行相应的数据分析。
Flume有两种工作模式,分别为Push Sources和Polling Sources。Push Sources:外部系统会主动地将数据推送到Flume中,如RPC、syslog。Polling Sources:Flume到外部系统中获取数据,一般使用轮询的方式,如text和exec。
下面将对Flume运行机制进行简单的介绍。
Flume的运行机制见图2.5,其中虚线代表的是节点间通信。作为收集日志的每个agent节点定期通过RPC方式向master节点发起心跳,master节点通过监听与回调处理RPC请求,并将配置的更改信息发送给agent,collector与master的通信原理与agent相同。实线代表的是数据流的流向。Agent节点按照配置定期地收集日志并将数据发送给collector节点,collector节点可以接收多个agent节点的数据并按照配置中指定的时间间隔,将汇集后的数据流转到目标节点HDFS。
图2.5 Flume运行机制图
上述流程中如果agent或者collector节点在指定的间隔时间内未能发送消息给master,master将其视为“死亡”,如果是collector A死亡,则master可以按照配置将collector A负责收集的agent A、agent B两个节点的数据发送给collector B。
2.分布式文件存储系统HDFS
Hadoop的分布式文件系统HDFS(Hadoop Distributed File System)可以部署在多台廉价的机器硬件上,能够安全可靠地存储TB级甚至PB级的海量非结构数据,一个HDFS实例的文件数目可以达到千万个。HDFS采用流式数据访问模式来读取数据,它的基本思想是一次写入,多次读取。存储超大文件的文件系统HDFS可以和MapReduce分布式编程模式相结合,能够为应用程序提供高吞吐量的数据访问,适用于大数据集应用,比如基于文件的分析和挖掘应用。
HDFS集群规模可以动态扩展,节点可以动态加入集群中,以满足不断增长的数据规模,一个集群里的节点数据可达数百个。当节点发生故障时,设计上需要考虑具备错误检测和快速自动恢复的功能。
HDFS有几个基本概念,我们在这里进行简单的介绍。
1)块
一个磁盘上读写的操作单位是块,最基本的操作方式是将一个块一次性地从磁盘中读取到内存中或写入到磁盘中。任何一个文件系统都是通过处理大小为一个磁盘块的整数倍数的数据块来运作这个磁盘,文件系统块一般为几千字节,而一个磁盘块一般为512个字节。这些值息对用户来说都是透明的,都由系统来维护。
HDFS是一个文件系统,它也遵循按块的方式进行文件操作的原则。在默认情况下,HDFS块的大小为64MB。也就是说,HDFS上的文件会被划分为多个大小为64MB(缺省时)的数据块。当一个文件小于HDFS块的大小时,HDFS不会让这个文件占据整个块的空间。一般来说,HDFS的块要比磁盘的块要大,因为如果HDFS块太小,那么大量的时间将会花在磁盘块的定位时间上,如果这个块足够大,那么就可以减少定位这个块开始端所需要的时间,即减小了寻址开销。
在分布式文件系统中使用块的方式会有以下三个好处。
(1)一个文件可以大于网络中任意一个磁盘的容量。文件是以块的方式进行存储,这些文件分块不需要存储在同一个磁盘上。这也正是使用分布式文件存储大数据的原因。
(2)使用块而不是文件可以简化存储子系统。因为块的内容和块的数据是分开存放和处理的,所以可以分而治之来管理。
(3)块适合于提供容错和复制操作。每个块都会以副本的形式在其他机器上进行存储,这样可以防止当磁盘发生故障时导致数据的丢失,当一个块损坏时,系统会从其他节点读取另一个副本。
2)名称节点与数据节点
HDFS集群中有两类节点,以主从模式运行。一类是名称节点(namenode),也是主服务器,另一类是数据节点(datanode),也是从服务器。一般在一个集群中只有一个名称节点和有多个数据节点。
名称节点管理文件系统的命名空间,它维护着这个文件系统树及这个树内所有的文件和索引目录。它以命名空间镜像和编辑日志两种形式将文件永久保存在本地磁盘上。名称节点也记录着每个文件的每个块所在的数据节点,但它并不永久保存块的位置,因为这些信息会在系统启动时由数据节点重建。
数据节点负责文件数据的具体存储。它们存储并提供定位块的服务,并且定时向名称节点发送它们存储的块的列表。
没有名称节点,文件系统将无法使用,如果运行名称节点的机器损坏,文件系统上所有的文件将会丢失。所以HDFS提供了两种机制来解决因为名称节点无法使用而给整个系统带来的风险。第一种方法是复制组成文件系统的元数据为持久化的文件,最常用的配置方法就是在本地磁盘写入的同时,写入到一个远程的NFS挂载的磁盘中。第二种方法是运行一个二级名称节点,它会保存通过编辑日志合并后的命名空间镜像副本,只能在名称节点失效后才能够使用。
下面对HDFS的运行机制进行简单的介绍。
HDFS采用流式数据访问模式来读取数据。其中客户端和名称节点(namenode)之间传递的是指令流;客户端和数据节点(datanode)之间传递的是数据流。下面通过客户端、名称节点和数据节点之间的指令流和数据流来分析HDFS的运行机制。
(1)文件读取。
文件读取的数据流和指令流如图2.6所示,该图所表示的是一个跨多节点的进行文件读取内部事件时序,对用户而言,这一切都是透明的。
图中对文件进行读取一共有5个步骤,下面分别对每个步骤的操作过程进行简单说明。
第1步:HDFS客户端调用DistributedFileSystem的open()函数打开指定的文件。
第2步:DistributedFileSystem用RPC调用元数据节点,得到文件的数据块信息。对于每一个数据块,元数据节点返回保存数据块的数据节点的地址。DistributedFileSystem返回FSDataInputStream给客户端,用来读取数据。
第3步:客户端调用DFSIuputStream的read()函数开始读取数据。
图2.6 文件读取过程
第4步:数据节点将块数据返回给客户端,块数据读取完毕后,DFSInput-Stream对象将关闭与该数据节点的连接,然后连接此文件下一个数据块的最近的数据节点,读取数据。重复此步骤至所有的数据块读取完毕。
第5步:HDFS客户端从流中读取数据时,当客户端数据读取完毕的时候,就调用FSDataInputStream的close()函数关闭该数据输入流。
在客户端读取数据时,如果客户端与数据节点的通信中出现错误,客户端便会尝试连接包含此块数据的下一个数据节点。客户端会记住故障节点,以保证不再对此块进行尝试。
这种由客户端直接联系名称节点指定的块所在的数据节点的设计模式,可以把数据流动分散在各数据节点上进行。这样设计的好处有:一是可以扩展客户端的并发数量,二是名称节点只采用指令流与客户端进行块位置信息的交互,而不提供数据,可以避免名称节点成为数据读写的“瓶颈”。
(2)文件写入。
文件写入的数据流和指令流如图2.7所示,它比数据读取的步骤要复杂一些,因为它需要在写入时进行副本的复制操作。文件写入的流程也是一个跨多节点的文件写入内部事件时序,对用户而言,这一切也都是透明的。
在图中,从客户端发起创建请求到关闭共有7个步骤,下面分别对每个步骤的操作过程进行简单说明。
第1步:HDFS客户端调用DistributedFileSystem对象的create方法,创建一个文件输出流(DFSOutputStream)对象。
第2步:通过DistributedFileSystem对象与Hadoop集群的名称节点进行一次RPC远程调用,在HDFS的Namespace中创建一个文件条目。
图2.7 文件写入过程
第3步:通过DFSOutputStream对象,向数据节点写入数据,数据首先被写入DFSOutputStream对象内部的Buffer中,然后数据被分割成一个个Packet数据包。
第4步:以Packet为最小单位,基于Socket连接发送到按特定算法选择的HDFS集群中一组数据节点(正常是3个,可能大于等于1个)中的一个节点上,然后在这组数据节点组成的管线上依次传输Packet数据包。
第5步:这组数据节点组成的管线反方向发送确认响应,最终由管线中的第一个数据节点将确认响应发送给客户端。
第6步:完成向文件写入后,客户端就会在文件输出流(DFSOutputStream)对象上调用close()函数,关闭数据流。
第7步:调用DistributedFileSystem对象的complete()函数,通知名称节点文件写入成功。
在数据写入期间,如果数据节点发生故障,那么管线首先会被关闭,并且把队列中的所有Packet数据包都退回到数据队列中,以确保不会由于节点故障而丢失任何数据包,故障节点会从管线中删除,余下块的数据会写入管线中剩余的两个好的数据节点。
3.分布式并行计算框架MapReduce
MapReduce是一种分布式软件编程框架,按照这个框架,分布式编程会变得很容易。无论是HDFS分布式文件系统还是HBase分布式数据库系统,其非结构化数据处理和结构化数据处理都是依赖于MapReduce来进行的,如文件的读写、数据库的增删改查都需要经MapReduce的函数处理才能完成。
MapReduce的运行依赖于分布式作业系统。分布式作业系统也是主从模式的,它由一个作业主节点(JobTracker)和多个任务从节点(TaskTracker)构成。MapReduce的客户端向作业主节点发出一个数据处理请求,JobTracker接收到请求后,会将MapReduce的程序代码通过网络传输到多个TaskTracker上,由多个作业节点调用MapReduce程序对本地的数据进行处理。
MapReduce的函数从代码层面来看,由一个map()函数和一个reduce()函数构成。map()函数用于数据的分析,reduce()函数用于对分析结果的归纳处理。map()函数和reduce()函数均采用<key,value>键值对作为数据输入输出的结构。
想要掌握MapReduce分布式编辑模型,必须要对几个重要的概念理解清楚,主要有map()函数、reduce()函数、<key,value>键值对、分布式作业系统等。下面将对这些内容进行简单的介绍。
1)map()函数
map()函数的输入数据是由任务节点分配的预先已分割成固定大小的数据片段(splits),也就是数据节点上的一个数据块(默认是64MB)。这个数据片段是由任务节点将其变为一组<key,value>键值对逐条传递给map()函数的,这组键值对叫作源键值对。
Hadoop会为每一个split创建一个map任务,这个任务是用map()函数对源键值对的键和值按程序定义的规则进行处理,生成中间键值对。比如,取出源键值对值中的一个单词作为中间键值对,将单词在值中的出现次数作为值。如果将源键值对抽象地看作<K1,V1>,那么中间键值对会被抽象地看作<K2,V2>。map()函数的数据流模型如图2.8所示。
图2.8 map()函数的数据流模型
对于中间结果,可以指定另一个处理函数进行排序处理,这个处理函数叫combine,其处理的输入是<K2,V2>,也就是map()函数的输出,处理方法是将K2值相同的V2值组合成一个数组,形成<K2,[V2-1,V2-2,…]>的键值对。这就形成了map阶段最终端的输出键值对。
在这里要注意map阶段、map任务、map()函数、combine()函数四者的不同。map阶段是由map()函数调用活动和combine()函数活动按先后时序构成的一个活动阶段;map任务是调用map()函数的组织工作,是分布式作业系统通过驱动map任务来调用map()函数的;map()函数是指对源键值对的处理函数。combine()函数是指对map()函数的输出结果进行组合的函数。
2)reduce()函数
任务节点首先会把不同map任务输出的中间数组整合起来并进行排序产生<K2,[V2-1,V2-2,…]>键值对,然后调用用户自定义的reduce()函数,对输入的<K2,[V2-1,V2-2,…]>键值对进行处理,得到键值对<K3,V3>,并将结果输出到HDFS上。<K3,V3>又叫目标键值对。reduce()函数的数据流模型如图2.9所示。
图2.9 reduce()函数的数据流模型
reduce任务的数量是由mapred-site.xml配置文件中的mapred.reduce.tasks的属性值来决定的,该属性值的默认值是1。开发人员可以通过调整配置文件或job.setNumReduceTasks()的方法进行设定。
将map任务输出的中间数组整合起来的工作称为shuffle过程,它将输出的结果按照key值分成N份(N是Reduce的任务数),其划分方法采用哈希函数,如“hask(key)mod N”。这样可以保证某一范围内的key由统一的Reduce来集中处理。
同map()函数一样,reduce()函数也要注意reduce阶段、reduce任务、reduce()函数这三者的不同。reduce阶段是由shuffle()函数调用活动和reduce()函数按先后时序构成的一个活动阶段;reduce任务是调用reduce()函数的组织工作,是分布式作业系统通过驱动reduce任务来调用reduce()函数的;reduce()函数是指对中间键值对的处理函数。
3)键值对
<key,value>是键值对,key本质上是一个广义数组的下标,而value是一个这种广义数组下标对应的值,所以大家可以把键值对理解成一个数组的下标和值。采用键值对作为原始数据、中间数据到目标数据的一种描述方式,这种方式去掉了数组名和数组值的型,只留其关键的部分。这种方式的好处,一是可以使MapReduce这种分布式编程模型适合非结构化、结构化、半结构化的开发,不用受到值类型的约束;二是也符合于布式作业系统遵循好莱坞原则的设计思想,无论是什么样的MapReduce应用,其传入传出参数方式对外都一样,而键和值的不同都封装在MapReduce的函数内部来处理;三是编程模式非常简单,各阶段的处理数据无论是否有健,也无论是否有值,对于分布式作业系统和MapReduce来说都不用改变规则来处理。
在MapReduce处理阶段中键值对分为源键值对、中间键值对、中间集合键值对和目标键值对四种类型。
(1)源键值对是由分布式作业系统基于数据节点上的数据块生成的键值对。其键可以是内容的位置序号,也可以是数据块本身已有的键。键值的生成方式则以在MapReduce提交任务运行时指定。
(2)中间键值对是map()函数处理后的键值对,其键和值将根据Map定义的规则来生成。
(3)中间集合键值对是由map阶段的combine()函数基于中间键值对按照key值相同的原则进行集合归类生成的。这样可以减少传输数据量和网络带宽的消耗。
(4)目标键值对是reduce()函数处理后的键值对,其键和值将根据Reduce的定义规则来生成。
下面对HDFS的运行机制进行简单的介绍。
分布式作业系统是一个复杂的系统,保障其高效、可靠、安全地运行是系统设计目标之一。分布式作业系统建立了MapReduce的作业从运行开始直到运行完成的作业运行机制,同时也建立了作业调度、任务执行故障处理的作业管理机制,通过这些机制的建立来确保设计目标的实现。在这里我们主要介绍作业运行机制。
MapReduce的map任务和reduce任务都是在JobTracker的统一调度下由TaskTracker来执行的。而任务从发起到执行返回,一切都源于MapReduce提交作业的那一刻。在作业提交到JobTracker服务端的过程中,JobTracker提供给MR客户端JobClient的runjob做了大量的工作。整个MapReduce的运行时序关系图如图2.10所示。
从图中可以看出,MapReduce程序的运行时序共有10个步骤,下面对每一个步骤进行具体分析。
第1步:申请作业。
MR客户端调用作业job接口,按照作业接口的要求设置作业名称、输入数据路径、输出数据路径、输入数据的格式、输出数据的格式、MapReduce的Jar类路径、map类的类名、reduce类的类名等信息,并调用Job接口发出作业申请。
第2步:获取作业ID。
JobClient与JobTracker建立通信连接,申请获取一个新的作业ID,然后Job-Tracker返回一个新的作业ID给MR客户端。
图2.10 MapReduce的运行时序关系
JobClient先检查数据输出路径是否设置,如果没有设置则返回出错信息给客户端。JobClient与NameNode建立通信连接,检查设置数据输出路径是否已经存在,如果已存在则返回出错信息给客户端。
JobClient再检查数据输入路径是否设置,如果没有设置则返回出错信息给客户端。JobClient与NameNode通信,获取输入路径指向的文件的分片信息,如果无法获取,则返回出错信息给客户端
第3步:上传MR资源。
JobClient调用HDFS提供的文件接收FileSystem,将作业所需的MapReduce的Jar文件、配置文件和计算所得的输入分片信息,上传到以作业ID命名的目录中。同时根据配置参数mapred.submit.replication的值,在目录中进行Jar副本的复制。
第4步:提交作业。
JobClient在第2步、第3步成功完成后,向JobTracker发出作业环境已准备好的信息。之后MR客户端将每秒轮询作业的执行进度,将作业过程中的信息输出到控制台。作业成功完成后,将显示作业的计数器信息。如果作业失败,则将作业失败信息显示在控制台上。
第5步:初始化作业。
JobTracker自此真正开始履行作业调度和任务分配的工作职责。JobTracker会将提交的作业放入一个内部队列,该队列由作业调度器进行调度。作业调度器会对该作业进行初始化,类似编制作业计划。这个工作包括创建一个维护该作业的上下文对象,对象由任务列表信息和记录信息构成,作业调度器将依据这些条目获取任务的状态和作业的完成进度信息。
第6步:获取分片信息。
JobTracker在编制任务列表信息时,作业调度器调用HDFS的文件系统访问接口依据作业ID去获取上传在HDFS的MR资源中的输入分片信息,也就是输入文件的数据节点上存放的数据块的信息。然后根据获取的分片信息,对每一个分片在任务列表创建一个map任务项。任务列表中reduce任务项的数量将会由配置文件中mapred.reduce.tasks的值来决定,作业调度器根据这个值在任务列表中创建相应数量的reduce任务。每个任务都会有个ID号,记录在任务列表中。
第7步:任务分配。(www.daowen.com)
TaskTracker会定期发送心跳信息给JobTracker,一方面是告诉JobTracker它是否还存活,另一方面也是双方的信息交互通道。每一次发送的心跳信息中包含有TaskTracker是否已经准备好运行新的任务,如果已经准备好,JobTracker将会为其分配一个任务列表中的任务。
针对map任务和reduce任务,TaskTracker有固定数量的槽,用以在说明任务节点的同时确定运行任务的数量,任务的数量将由TaskTracker设置的槽的数量和所在机器的内存情况来决定。
TaskTracker在执行任务时遵循这样的准则:在处理槽中的reduce任务之前,会先填满空的map任务槽。
要选择一个reduce任务,JobTracker只是简单地从尚未运行的reduce任务列表中选取下一个来执行;对于map任务,JobTracker会考虑TaskTracker的网络位置并选取一个离输入分片最近的TaskTracker。在理想情况下,map任务的执行都是数据本地化的,即输入分片与TaskTracker是在一个机器节点上。
第8步:取回MR资源。
TaskTracker已经被分配了任务,在任务执行前,它将通过HDFS的文件系统访问接口,依据作业ID去获取上传在HDFS的MR资源中的Jar文件,并将其复制到TaskTracker所在机器的本地磁盘中。同时将应用程序所需要的全部文件从分布式缓存中复制到本地磁盘中,在本地磁盘中为任务新建一个工作目录,将Jar文件解压到这个工作目录下。
第9步:环境加载。
TaskTracker会新建一个TaskRunner实例来运行分配的任务。TaskRunner是一个本地的Java虚拟机,它是TaskRunner的一个子进程,在这个子进程上来执行分配的任务。这样做的目的是屏蔽因为用户编制的map()函数和reduce()函数对TaskTracker的影响。
父子进程通过unbilical接口与父进程通信。子进程会每隔几秒告知父进程它的进度。
第10步:执行任务。
TaskRunner启动一个本地的Java虚拟机,作为它的一个子进程,在这个子进程上来执行分配的任务。该子进程会根据任务的性质来调用用户编制的map()函数和reduce()函数。
4.分布式数据库HBase
HBase是一个构建在HDFS之上的分布式面向列存储的数据库系统。尽管已经有很多数据存储、访问的策略和实现方法,但不可否认的是大多数关系型数据库系统都是重数据生产,轻数据应用的,具体表现为都没有考虑到大规模数据和分布式的特点。
许多关系型数据库系统通过复制和分区的办法来扩展数据库,使其突破单个节点的限制,一是这些功能通常是后加的,安装和维护复杂;二是原有的代码也需进行重构来适应变化。HBase是从另一个角度来解决处理伸缩性的问题的。它通过线性方式来不断增加节点进行规模的扩展,数据库表结构可以动态扩展,随着时间变化,同一个数据库表的表结构可以不同。
HBase在设计时主要需要考虑以下六个问题。
(1)HBase在读和写时要遵循一定的规则。HBase的文件管理是继承的HDFS文件系统管理,集成了HDFS文件系统的文件创建、存储、读写等方式,但所有HBase的文件中的内容是有统一结构的。HBase的文件内容格式是HDFS文件内容格式的一个特例,是一种有结构的内容存储格式。
(2)HBase要求结构统一。传统的关系型数据库是以关系型记录的方式进行存储的。这种方式会使数据库结构过于严格,当数据库表结构变化时,相应的整个表的存储也要跟着变化。对HBase而言,无论外部表现的结构字段是多少,其内部存储的结构始终不变,并且允许同一个表的结构也可以随时间变化而动态变化。
(3)HBase需要支持动态的数据库表访问。由用户来指定要访问的表名、表列、表行,然后HBase根据用户要求来组织对数据库表的访问,而不以强制约束的固定语言结构,如SQL来实现对数据库表的访问。
(4)HBase的查询性能不会因为数据量的不断增长而降低。而这点也是传统的数据库最为薄弱的地方。
(5)HBase还应该在数据量不断增长时,允许快速地、动态地加入物理节点。节点的加入不会影响数据库的性能,也不会带来数据处理逻辑的变动。
(6)HBase应提供各类接口供不同的客户端访问,如采用Web的方式访问接口、Java的API接口、直接REST协议的接口等。
分布式数据库也是一种数据库,数据库必然是由表、行、列、键、数据内容等构成。分布式数据库的表、行、列、键等内容名称与传统关系型数据库虽然相同,但其含义存在一定区别,并且由于分布式的特点,分布式数据库还引入了一些新的概念,要掌握分布式数据库,就必须从了解这些基本概念开始。
HBase的表和传统数据库表不同,它是一个多表集合的表,与主题库的概念类似。传统的数据库表只存储一个结构的所有数据,比如学生表、成绩表等。而分布式数据库表的概念是一个与主题相关的多个表都存在这张表中,比如一个学校主题库,学校主题库由学校的教职工信息、财务信息、学生信息等构成,这些表虽然结构不同,但有一个特征,就是主键(也即行键)都是学校的编号。
HBase的数据库表在逻辑上也是在表格里存储了多行数据。本质上每行也是一个关系型记录。HBase数据库表上的每行是由行关键字(row key)、数据的列(column)、时间戳(time stamp)三个部分构成,其中数据的列是动态可变的,如果假定其不变的话,用户可以将HBase表理解为由三个字段,即行ID、内容、时间构成的一个关系表。
(1)行关键字。
行关键字是在表中的唯一标识,这里的唯一是区别于其他行键的,不是说在这个表中只有唯一一行的意思,而是这个行关键字在表中物理存储时会有多行。在HBase中行键也作为索引的主键。在HBase中访问表中的行有三种方式,一是通过单个行键访问;二是给定行键范围访问;三是全表扫描。行键可以是任意不大于64KB的字符串,它不是按照数据存入数据库的时间顺序进行存储的,而是按照字典序进行顺序存储的。
(2)数据的列。
列定义为<family>:<label>(<列簇>:<标签>),通过列簇和标签唯一地确定一个数据存储的数据。列簇的定义和修改操作需要有管理员权限,而标签可以随时增加。用户可以简单地理解为列簇就是表名、标签就是字段名。HBase在磁盘上是按照列簇来存储数据的,一个列簇里的所有项都有相同的读写方式。
(3)时间戳。
时间戳是每次数据操作时系统产生的时间。HBase的更新操作有时间戳,对每个数据单元,只存储指定个数的最新版本,客户端可以指定查询某个时刻的数据,或者一次得到所有数据的版本。如果查询时没有指明时间戳,那么会返回指定最新的数据值,并且最新的值在表格里也是最先找到的,这也就是经常说的倒排方式。
如果将逻辑模型理解为HBase中一个行整体是由N个列簇构成的整体模型的话,那么物理模型就是HBase中实际存储的具体方式。一个行逻辑上由N个列簇构成,而在物理上,HBase表是由行+列簇+时间三列构成的N行。从物理上看,所有列簇成员在文件系统中被存储在一起。所以HBase是一种面向列的存储,确切地说是面向列簇的存储。下面,我们将在物理模型方面对HBase的基本概念进行简单的介绍。
(1)区域。
表在HBase上存储的地方即为域,每个区域包含表中所有行的一个子集。一个表的域初始只有一个域,这个域中存着表的数据,由于表会随着时间变化其数据规模将不断增大,一个域到了一定存储量时,将会划分成大小差不多的两个新区域。在第一次划分开始前,所有正在载入的数据将会存在主机服务器上。划分后,一张表就有了两个域,分别存放在两台机器上。区域是分散在HBase集群上的单元,也是程序运行处理的运算对象单元,因此对于任何一个服务器,再大的表都可以由服务器集群来处理。它们通过管理整个区域某部分的节点来管理整个表。原则上在一台机器上一个表只有一个域,一台机器上可以存放多张表。
(2)基本单元。
HBase分布式数据库中有四个基本单元,分别是表、行键、列簇、区域。对于客户端而言,用户的请求总是对应着一张表的,无论HBase的表内是否还有表,用户只知道外部表的存在。对于表中的列簇,通俗地说是用户访问这张表的内部表,用户通过指定外部表名和列簇就可以访问到表中列簇对应的数据内容,行与列定位的数据也叫单元数据。所以表是用户与HBase进行交互的一个基本单元之一。
表中行键的值是以字节数组的形式存在的,行键是确定行在表中的唯一标识。表用行键对表里的行进行排序,对于一些经常需要同时读取的行,在对HBase表进行设计时需要注意把它们集中存储在一起,以便能够快速读取数据。表中的行键对于用户而言,是定位表中行的一个概念单元;对于开发人员而言,是一个需要重点设计的单元;对于系统存储而言,是数据排序的唯一依据单元。
(3)Region服务器。
Region是HBase中分布式存储和负载均衡的最小单位,一个表的区域会分布在不同的Region服务器上。一个Region内的数据只会存储在一台服务器上。物理上所有数据都通过调用HDFS的文件系统接口存储在机器上,并由Region服务器来提供数据服务。通常一台机器上运行一个Region服务进程(RegionServer),每个进程管理多个Region实例。
一个RegionServer在机器上只维护一个HLog,它是用来做灾难备份的,它使用的是预写式日志。在集群模式下运行时,HLog日志的文件写在HDFS分布式文件系统中,而不是写在本地机器中。这样即使RegionServer所在机器发生故障,也不会丢失。当Master主服务器知道一个RegionServer失效后。它会按Region-Server的区域划分提交日志,即每个Region一个日志,在新的机器节点加入进来后,Master主服务器会将Region划分逐个恢复到机器上,然后重新运行。
来自不同表的Region日志写在单个文件中,这样做的好处是可以减少为了写多个文件而对磁盘的寻址次数,缺点是如果一台RegionServer失效,为了恢复其上的Region,就需要将HLog拆分,然后分发到其他服务器来恢复Region。Region-Server内部结构逻辑模型如图2.11所示。
(4)Master主服务器。
HBase在集群中某个时段内只有一个HMaster(主服务器)在运行,HMaster(主服务器)是一个内部管理者,它主要管理的对象是逻辑Region和Region服务器,它负责将Region分配给Region服务器,协调Region服务器的负载等;它主要的协同对象是Zookeeper,通过Zookeeper感知到HRegionServer的故障终止后,并处理相应的HLog文件,然后将失效的Region进行重新分配。它不和HBase客户端进行交互。这是它与HDFS客户端和MapReduce客户端的不同之处。
由于HMaster只维护表和Region的元数据而不与数据产生输入输出过程,HMaster失效仅会导致所有的元数据无法被修改,但表的数据读写还是可以正常进行的。
(5).META.元数据表。
一个表对应多个Region的元数据,如表名、表在Region的起始行、结束行、Region所在的机器IP地址等这些元数据都会被存在HBase创建的.META.表中。随着Region的增大,.META.表的数据也会增大,.META.表也会被分到多个Region。为了定位.META.表在哪些机器的Region上,又需要把.META.表对应的多个Region的元数据,如.META.表中涉及的表名、.META.表所在Region的机器的IP地址等保存到HBase创建的一个-ROOT-表中。.META.表的Region与各个表的Region之间的关系如图2.12所示。
图2.11 RegionServer内部结构逻辑模型
.META.表是用来说明各个表所在Region和各个表起始行键这些表的内部信息的。当用户要访问某张表的某个行的信息时,就需要在.META.表进行查询是否有此表,表所在的行是否存在,如果行存在是在哪些Region上的,以便用户可以快速定位Region,并与Region进行数据交互。
(6)-ROOT-数据表。
-ROOT-表是所有.META.表所在Region的统一代理,是用户访问表数据时必经的关口,只有从-ROOT-表中取到.META.表的信息,才能获悉Region所在机器的地址信息,才能与其建立联系,从而实现对数据的访问。-ROOT-表永远不会被分割成多个区域,它只存在于一个Region中,这样可以保证最多需要三次跳转——-ROOT-表、.META.、Region服务器,就可以定位所需的Region。
客户端会将查询过的位置信息缓存起来,且缓存不会主动失效。当客户端根据缓存信息访问不到数据时,则询问持有相关.META.表的Region服务器,试图获取位置信息,如果还是失效,则询问-ROOT-表相关的.META.表在哪里。最后如果前面的信息也失效,则通过Zookeeper重新定位Region信息,所以如果客户端上的缓存全部是失效的,则需要进行6次网络来回,才能定位到正确的Region。-ROOT-表与.META.表的关系模型如图2.13所示。
图2.12 .META.表的Region与各个表的Region之间的关系
-ROOT-表的位置信息是在Zookeeper中的一条记录。所有客户端访问用户数据前都需要先访问Zookeeper以获得-ROOT-表所在Region的位置信息,然后访问-ROOT-中.META.表的位置,最后根据.META.表中的信息确定用户数据所存放的RegionServer服务器地址。
下面对HBase的运行机制进行简单的介绍。
分布式数据库系统的操作请求主要是由客户端发起,在运行过程中HMaster不主动参与数据读写,而是由客户端与HRegionServer进行交互。客户端根据数据库的大小可以同时向多台HRegionServer发起请求,HRegionServer以分布式并行的方式来处理请求。
图2.13 -ROOT-表与.META.表的关系模型
HBase客户端与HBase系统的运行时序关系图如图2.14所示。
从图2.14所示的运行时序图可以看出整个运行过程共有7个步骤,下面对每一个步骤进行分析。
第1步:读写请求。
HBase客户端调用HBase的client端接口,告诉HBase系统要访问的数据库表名、列簇、行信息等。
第2步:缓存查询。
客户端会缓存它们已知的所有-ROOT-和.META.的地址,以及用户空间的起始行和结束行。HBase的client端先在自己的缓存中查询HRegionServer的主机节点,当查找不到时,则会向Zookeeper发起请求查询-ROOT-的位置信息。
第3步:获取-ROOT-。
客户端连接到Zookeeper后,获取到-ROOT-的地址。然后根据-ROOT-地址查找.META.区域的地址。在.META.表中存有所有要查询行的信息,客户端根据行信息在.META.表中查询行所对应的用户空间区域和HRegionServer的地址,然后客户端就可以与HRegionServer进行交互了。
图2.14 HBase客户端与HBase系统的运行时序关系图
第4步:提交请求。
客户端与HRegionServer建立连接,并提交客户的请求。客户端会根据.META.表中返回的HRegionServer机器数量来与这个机器建立连接,并提交请求。如果只是写入请求,则客户端只和一台HRegionServer机器建立连接,并发出写的请求。
第5步:状态检查。
当HRegionServer接收到请求后,如果是写请求,它会先把操作请求载入到提交日志中,将其加入内部缓存中进行处理;同时需要向Zookeeper发出写行锁的申请,将要写入的行进行加锁。
如果是读取操作,HRegionServer首先查询分布式缓存,如果满足客户要求则返回,否则要读取Region数据到内存进行查询。
第6步:返回数据。
当HRegionServer接收到请求并处理后,会将数据返回给客户端,至此用户的一个提交请求就结束了。结束后HRegionServer并不会将缓存文件清空,它可以加速下一次的操作请求。
第7步:日志提交。
日志数据先是存在每个HRegionServer的内存中,当达到一定阈值之后将其写入本地磁盘。然后由一个后台线程将其写到HDFS文件系统中,写完后告知HMaster主服务器,主服务器在监控HRegionServer失效后,可以将该域的日志文件从HDFS取出分配给新的节点。
很多用户会奇怪,在HBase集群中一个表的多个Region域在多台Region服务器上,但每个服务器上只有这个表的一个Region数据块,一旦这个Region服务器坏了,数据库表的数据不就丢失了吗?实际上日志数据就是做灾难备份的,存在HDFS上,而不存在HRegionServer中也就是这个道理。当一个Region服务器坏了,HMaster主服务器可以从HDFS取回日志来恢复Region服务器失效之前的数据,并将其写入新的HRegionServer中。
5.实时计算Storm
Hadoop主要专注于批处理,但是当我们需要处理一些实时信息的时候,Hadoop就显得力不从心。为了解决这个问题,一系列实时计算框架应运而生。Nathan Marz推出的Storm是应用最广泛的计算框架之一。Storm与Hadoop相比存在很多不同之处,首先,Hadoop主要处理大量的离线数据,这些离线数据必须已经存放在HDFS或者HBase中,而Storm是一个实时的流计算框架,处理的数据是在实时消息队列中。Hadoop关注的数据是一次写入,多次查询,而Storm关注的数据是多次处理,一次写入。Hadoop往往只在业务有需要时才调用数据,而Storm在系统运行起来后是持续不断调用数据的。
Storm具有很多的特点,如使用场景广泛、处理速度快、健壮性强、高可靠、可水平扩展、容错性好、快速、保证数据无丢失、拥有简单的编程模式等。这就确保了Storm包含比Hadoop更智能的处理方式,流程会由监管员来进行管理,以确保资源得到充分使用。
Storm是一套分布式的、可靠的、可容错的用于处理流式数据的系统,处理工作会被委派给不同类型的组件,每个组件负责一项简单的、特定的处理任务,如Storm集群的输入流由名为Spout的组件负责,Spout将数据传递给名为Bolt的组件,后者将以某种方式处理这些数据等。在这里我们将对这些概念进行简单的介绍。
1)主节点
主节点通常运行一个后台的守护进程,名为“Nimbus”,用于分配代码、布置任务、检测故障、响应分布在集群中的节点。它的作用类似Hadoop里面的JobTracker。
2)工作节点
每个工作节点同样会运行一个后台守护进程,名为“Supervisor”,用于监听主节点指派的任务并基于要求开始并终止工作进程。Nimbus和Supervisor都是快速失败和无状态的,这样一来它们就变得十分健壮。它的作用类似Hadoop里面的TaskTracker。
3)Zookeeper
Zookeeper是完成Supervisor和Nimbus之间协调的服务。而应用程序实现实时的逻辑则被封装进Storm中的“Topology”里。
4)Topology
Topology是Storm中运行的一个实时应用程序,因为各个组件间的消息流动形成逻辑上的一个拓扑结构。一个Topology是由Spouts(数据源)和Bolts(数据操作)组成的,通过Stream Groupings进行连接。一个Topology相当于一个MapReduce Job。
5)Tuple
Tuple是一次消息传递的基本单元。本来应该是一个key-value的map,但是由于各个组件间传递的Tuple的字段名称已经事先定义好,所以Tuple中只要按序填入各个value就行了,所以它就是一个Value List。
6)Stream
源源不断传递的Tuple组成的一条有向无界的数据流就是Stream。
7)Spout
Spout是在一个Topology中产生源数据流的组件。通常情况下Spout会从外部数据源中读取数据,然后转换为Topology内部的源数据。Spout分成可靠和不可靠两种。当Storm接收失败时,可靠的Spout会对Tuple进行重发;而不可靠的Spout不会考虑接收成功与否,只发射一次。
8)Bolt
Bolt是在一个Topology中接收数据然后执行处理的组件。Bolt执行过滤、合并、函数操作、访问文件/数据库等操作。Bolt从Spout中接收数据并进行处理,如果遇到复杂流的处理也可能将Tuple发送给另一个Bolt进行处理。
9)Stream Grouping
Stream Grouping定义了一个流在Bolt任务间该如何被切分。这里有Storm提供的6个Stream Grouping类型。
(1)随机分组(Shuffle Grouping):随机地把Tuple分发到Bolt的任务中,保证每个Bolt获得相等数量的Tuple。
(2)字段分组(Fields Grouping):根据指定字段分割数据流,并分组。例如,根据“user-id”字段,相同“user-id”的元组总是分发到同一个Bolt的任务中,不同“user-id”的元组可能分发到不同的Bolt的任务中。
(3)全部分组(All Grouping):Tuple被复制到所有的Bolt中。
(4)全局分组(Global Grouping):全部Tuple都分配到一个Bolt的同一个任务中。
(5)无分组(None Grouping):你不需要关心Stream是如何分组的。目前,无分组等效于Shuffle Grouping。但最终,Storm将把无分组的Bolt放到Bolt或Spout订阅它们的同一线程去执行(如果可能)。
(6)直接分组(Direct Grouping):这是一个特别的分组类型。消息发送者决定Tuple由消息接收者的哪个Task接收并处理。
Storm的工作流程如图2.15所示。
图2.15 Storm的工作流程
第1步:客户端提交Topology到Nimbus。
第2步:Nimbus针对该Topology建立本地的目录,根据Topology的配置计算Task,分配Task,在Zookeeper上建立Assignments节点存储Task和Supervisor机器节点中Woker的对应关系。
第3步:在Zookeeper上创建Taskbeats节点来监控Task的心跳,启动Topology。
第4步:Supervisor去Zookeeper上获取分配的Task,启动多个Woker进程,每个Woker生成一个Task,一个Task一个线程;根据Topology信息初始化建立Task之间的连接;Task和Task之间是通过ZeroMQ管理的。
所有Topology任务的提交必须在Storm客户端节点上进行(需要配置storm.yaml文件),由Nimbus节点分配给其他Supervisor节点进行处理。Nimbus节点首先将提交的Topology进行分片,分成一个个的Task,并将Task和Supervisor相关的信息提交到Zookeeper集群上,Supervisor会去Zookeeper集群上认领自己的Task,通知自己的Worker进程进行Task的处理。
和同样是计算框架的MapReduce相比,MapReduce集群上运行的是Job,而Storm集群上运行的是Topology。但是Job在运行结束之后会自行结束,Topology却只能被手动的Kill掉,否则会一直运行下去。
Storm不处理计算结果的保存,这是应用代码需要负责的事情,如果数据不大,可以简单地保存在内存里,也可以每次都更新数据库,还可以采用NoSQL存储。
数据存储之后的展现,也是用户需要自己处理的,Storm UI只提供对Topology的监控和统计。总体的Topology处理流程如图2.16所示。
图2.16 Topology处理流程
第1步:Storm提交后,会把代码首先存放到Nimbus节点的inbox目录下,之后,会把当前Storm运行的配置生成一个stormconf.ser文件放到Nimbus节点的Stormdist目录中,在此目录中同时还有序列化之后的Topology代码文件。
第2步:在设定Topology所关联的Spouts和Bolts时,可以同时设置当前Spout和Bolt的Executor数目和Task数目,默认情况下,一个Topology的Task的总和是和Executor的总和一致的。之后,系统根据Worker的数目,尽量平均地分配这些Task的执行任务。Worker在哪个Supervisor节点上运行是由Storm本身决定的。
第3步:任务分配好之后,Nimbus节点会将任务的信息提交到Zookeeper集群,同时在Zookeeper集群中会有Workerbeats节点,这里存储了当前Topology的所有Worker进程的心跳信息。
第4步:Supervisor节点会不断地轮询Zookeeper集群,在Zookeeper的assignments节点中保存了所有Topology的任务分配信息、代码存储目录、任务之间的关联关系等,Supervisor通过轮询此节点的内容,来领取自己的任务,启动Worker进程运行。
第5步:一个Topology运行之后,就会不断地通过Spouts来发送Stream流,通过Bolts来不断地处理接收到的Stream流,Stream流是无界的。最后一步会不间断地执行,除非手动结束Topology。
6.分布式消息队列服务系统Kafka
Kafka是用于日志处理的高吞吐量分布式发布订阅消息系统,具有高吞吐量、可通过磁盘数据结构提供消息的持久化、支持通过Kafka服务器和消费机集群来分区消息以及支持Hadoop并行数据加载等特点。其目的是通过Hadoop的并行加载机制来统一线上和离线的流数据消息处理,通过集群机来提供实时的消费。
Kafka集群由多个Kafka实例组成,Kafka对消息保存根据Topic进行归类,发送消息者成为Producer,消息接收者成为Consumer,都依赖于Zookeeper来保证系统的可用性,集群保存一些meta信息。Kafka的一些基本概念介绍如下。
1)Topics/logs
Topic表示消息,每个Topic将被分成多个Partition,每个Partition在存储层面是append log文件。每条消息在文件中的位置称为offset(偏移量),唯一标记一条消息,不支持对消息进行“随机读写”,Partitions的主要目的是通过分区可以将日志内容分散到多个server上,来避免文件尺寸达到单机磁盘的上限。
2)Distribution
一个Topic的多个Partitions被分布在Partitions集群中的多个server上;每个server负责Partitions中消息的读写操作;此外通过配置Partitions备份个数到多台机器上提高可用性。
3)Producers
Producer将消息发布到指定的Topic中,决定此消息归属的Partition。
4)Consumers
每个Consumer属于一个Consumer Group;发送到Topic的消息,只会被订阅此Topic的Group中的一个Consumer消费,Consumer端向Broker发送“fetch”请求,并告知其获取消息的offset;此后Consumer将会获得一定条数的消息;Consumer端也可以重置offset来重新消费消息。如果所有的Consumer都具有相同的Group,消息将会在Consumers之间负载均衡。如果所有的Consumer都具有不同的Group,消息将会广播给所有的消费者。
5)Guarantees
发送到Partitions中的消息将会按照它接收的顺序追加到日志中。对于消费者而言,它们消费消息的顺序和日志中消息的顺序一致。如果Topic的“replicationfactor”为N,那么允许N-1个Kafka实例失效。
6)Kafka的消息传送机制
(1)消息传输有且只有一次(Exactly-once):在Kafka中的表现形式。
(2)最多一次(At-most-once):最多一次,消费者fetch消息,保存offset,然后处理消息;当client保存offset之后,但是在消息处理过程中出现了异常,导致部分消息未能继续处理,那么此后“未处理”的消息将不能被fetch。
(3)消息至少发送一次(At-least-once):消息至少发送一次,消费者fetch消息,处理消息,然后保存offset。如果消息处理成功之后,但是在保存offset阶段Zookeeper异常导致保存操作未能执行成功,这就导致接下来再次fetch时可能获得上次已经处理过的消息。
(4)消息只会发送一次(Exactly-once)。
下面将对Kafka实现机制进行简单介绍。
第1步:启动Zookeeper的server。
第2步:启动Kafka的server。
第3步:Producer如果产生了数据,使用Zookeeper来“发现”Broker列表以及和Topic下每个Partition Leader建立socket连接并发送消息。
第4步:Broker端使用Zookeeper来注册Broker信息,以及监测Partition Leader的存活性。
第5步:Consumer如果要消费数据,使用Zookeeper来注册Consumer信息,其中包括Consumer消费的partition列表等,同时也用来发现Broker列表,并和partition leader建立socket连接,并获取消息。
Kafka消息交付实现具有如下特点。
(1)对消息的重复、丢失、错误以及顺序没有严格的要求。
(2)提供at-least-once delivery,即当Consumer宕机后,有些消息可能会被重复delivery。
(3)因每个partition只会被Consumer group内的一个Consumer消费,故Kafka保证每个partition内的消息会被顺序地订阅。
(4)为每条消息计算CRC校验,用于错误检测,CRC校验不通过的消息会直接被丢弃掉。
Kafka运行机制如图2.17所示。
图2.17 Kafka运行机制
免责声明:以上内容源自网络,版权归原作者所有,如有侵犯您的原创版权请告知,我们将尽快删除相关内容。