您的位置:首页 > 大数据

《storm实战-构建大数据实时计算读书笔记》

2017-01-05 10:24 429 查看

自己的思考:

1、接收任务到任务的分发和协调 nimbus、supervisor、zookeeper

2、高容错性 各个组件都是无状态的,状态要自己去处理

3、消息 消息在流式框架的作用和可靠性处理,消息可靠处理的原理

4、事务消息 1、finishbatch 2、commit的强顺序性 3、事务性spout分为协调器和分发器,协调器把batch元数据写到zookeeper,分发器负责从zokeeper不断重试分派任务

第1章 storm基础

1.1 storm能做什么

1.2 storm特性

 1、编程模型简单

2、可扩展

进程、线程、任务

3、高可靠性

1、spout发出的消息可能会触发成千上万条消息,ACK组件可以跟踪以spout发出消息为树根的消息树,只有这棵消息树上的消息都处理完了,storm才认为这个消息已经被完全处理,如果没有完全处理或者超时(默认为30s)则会重发这个消息。

如果消息树被成功处理,则调用spout组件的ack;如果失败或者超时则调用fail。

   2、storm保证每个消息至少被处理一次,但是对于有些计算场合,严格要求每个消息只能被处理一次,storm0.7.0引入了事务性拓扑,解决了这个问题。

4、高容错性

storm的spout和blot都是无状态的,nimbus和supervisor的状态也都是持久化在zookeeper和磁盘上的,所以如果有东西挂了,可以很容器的启动。因为storm是无状态的,所以使用storm处理业务逻辑或者重启或者错误恢复时都需要自己处理中间状态,一般

    是通过mysql、hbase等存储中间状态。

5、支持多种编程语言

storm允许spout或者blot使用标准输入和标准输出来进行消息传递,传递的消息为单行的文本或者json编码的多行

storm支持多语言编程主要是同ShellBlot、ShellSpout和ShellProcess这些类实现的。

6、支持本地模式

一个进程里面通过多线程方式模拟了spout和blot

7、高效

用zeroMQ作为底层消息队列,保证消息能被快速处理

8、运维和部署简单

9、图形化监控

1.3其他流计算系统

1.4 应用模式

1、海量数据处理

主要是实时的流式处理

2、中间状态存储和查询

storm的各个组件可以认为都是无状态的,一方面无状态保证了storm的高效和高容错性,另一方面也导致了用户需要自己处理中间状态的存储和查询。用户需要通过自己处理中间状态进而处理业务上、错误恢复、重启、消息重发等问题。

3、数据增量更新

用户需要自己处理中间状态,那么中间状态的处理很可能成为瓶颈。那么对于数据的增量更新,可以通过批次的更新中间状态(如hbase或mysql),进而减轻hbase或者mysql的压力,当然如果部分处理单元处理失败,那么增量可能没有及时的更新到hbase或者

mysql上,那么会丢失这部分增量。尽可能做到崩溃时数据的快速恢复和误差可控。

4、结合概率算法实时计算复杂指标

用户需要自己处理中间状态,那么中间状态的处理很可能成为瓶颈。对于实时计算的指标,有时不需要完全精确,因此,可以利用一些概率算法来解决这种问题。

第2章 storm初体验

2.1 本地环境搭建

讲解了以本地模式运行storm-starter里面的wordcount里的例子

2.2 storm集群

2.2.1 storm集群组件

  主控节点:Numbus后台程序 负责接收任务,以及在storm集群内分发代码,分配任务给工作节点,并负责监控集群的运行状态。Nimbus的作用类似于hadoop里面JobTracker的角色。

工作节点:supervisor后台程序 从Nimbus接收任务,并启动或者停止任务的工作进程。每一个工作进程执行一个Topology的子集(不会同时执行两个),一个运行中的Topology由分布在不同节点上的多个工作进程组成。

NimBus和Supervisor是通过Zookeeper来协调的,都是无状态的,storm集群里面的所有状态要么存储在zookeeper上,要么存储在本地磁盘上,都是可以快速失败的,同时可以使用kill -9毙掉,然后再重启。

2.2.2安装storm集群

搭建zookeeper集群

安装storm依赖库

    ZeroMQ

   JAVA

    Python

    Unzip

下载并解压storm发行版本

修改storm.yaml配置文件

启动storm的各个后台进程

2.2.3向集群提交任务

  1、提交 Topology

2、停止 Topology

第3章 构建Topology

3.1 storm基本概念

MapReduce job最终会结束,而Topology会永远运行(除非你手动结束)

3.1.1 Topologies

Nimbus负责处理用户的提交Topology请求,负责接收请求和jar包的上传。

Topology的定义是一个Thrift服务,并且Nimbus就是一个Thrift服务,你可以提交由任何语言创建的Topology。

3.1.2 Streams

流,是没有边界的tuple序列,通过对tuple中的每个字段命名来定义stream。在默认情况下,tuple中的字段类型可以是integer、long、short、byte、string、double、float、boolean、byte array。也可以自定义类型(只要实现相对应的序列化器)。

每个流在定义的时候会被分配一个id,默认是default。

3.1.3 Spouts

Spout是Topology中的消息产生者,负责外部数据源读取并且向Topology里面发出消息。Spout可以是可靠的也可以是不可靠的,可靠的会重发消息,不可靠的一旦发出stream就不能重发了。

Spout 可以发射多个消息流tuple。

nextTuple 该方法不能阻塞,因为storm在同一个线程上面调用所有消息源spout的方法。

ack 和 fail ACKer组件会监控从spout发出的tuple为根的消息树,当这个树被完全处理之后,Acker会通知storm调用发出该消息树根的spout调用ack。如果失败或者超时则调用fail。storm只对可靠的Spout调用ack和fail。

3.1.4 Blots

消息处理逻辑封装在blots里面。blots可以做很多事情,过滤、聚合、查询数据库等。

Blots可以发出多个数据流。

Blots对消息的一般处理逻辑是:处理一个输入tuple,发射0或者多个tuple,然后调用ack通知storm自己已经处理过这个tuple了。storm提供了一个IBasicBlot会自动调用ACK。

3.1.5 Stream Groupings

Stream grouping就是用来定义一个stream应该如何分配数据给blots上面的多个tasks的。

1)shuffleGrouping(随机分组)

2)fieldsGrouping(按照字段分组,在这里即是同一个单词只能发送给一个Bolt)

3)allGrouping(广播发送,即每一个Tuple,每一个Bolt都会收到)

4)globalGrouping(全局分组,将Tuple分配到task id值最低的task里面)

5)noneGrouping(随机分派)

6)directGrouping(直接分组,指定Tuple与Bolt的对应发送关系)

7)Local or shuffle Grouping

8)customGrouping (自定义的Grouping)

3.1.6 Reliability

主要是消息的可靠性。

Storm会通过Acker追踪消息树,如果消息树成功调用spout的ack,失败或者超时调用spout的fail。用户通过OutputCollector来通知storm消息的产生和处理,emit通知storm一个新的tuple的产生,ack通知storm一个tuple处理完毕。

3.1.7 Tasks

Spout或者Blot实例

一个线程可能运行多个Task(循环调用)

3.1.8 workers

3.1.9 configuration

default.yaml>storm.yaml(自己的classpath里面定义的)>代码里配置的

第4章 Yopology的并行度

一个进程只属于一个Topology。一个线程同时只能为一个组件服务,也就是说要么为spoutyaome为blot服务。

task在整个topology运行周期内是不变的,但是executor是可以修改。

线程数<=任务数,默认情况下,storm的每个线程执行一个任务。

4.2配置并行度

default.yaml<storm.yaml<Topology的具体配置<内部组件的具体配置<外部组件的具体配置。

4.2.2配置Executors数

默认线程数为1

4.2.3 配置任务数

在Topology的整个生命周期里,component的任务总数总是相同的,但是component的Executor的数量时可以修改的。

4.4如何更新运行中的Topology的并行度

可以不用重启Topology,可以通过动态的增加或者减少工作进程数或者Executor的数量,叫做rebalancing。

有两种方法rebalance一个Topology:

1、使用web界面

2、使用命令行工具

storm balance ....

第5章 消息的可靠处理

5.2理解消息被完整处理

1、uple tree不再生长 2、tuple tree的任何消息被标识为“已处理”

当在一定时间内(默认为30s)满足上面两个条件才认为tuple tree被完整处理。

5.3消息的生命周期

一个消息只会由发送它的哪个Spout任务调用ack或者fail。如果系统中某个spout由多个任务执行,消息也只会由创建它的Spout任务来应答(ack或者fail),绝不会由其他的spout任务来应答。

5.4 可靠的相关API

锚定:为tuple tree中指定的节点增加一个新的节点,我们称之为锚定。锚定是在我们发送消息的同时进行的。

一个输出消息可以被锚定在一个或者对哦个输入消息上,这在做join或者聚合的时候是很有用的。一个被多重锚定的消息处理失败,会导致与之关联的多个Spout消息被重新发送。

多重锚定可能会破坏传统的树形结构,从而形成一个DAGs

storm用内存跟踪每个消息的处理情况,如果被处理的消息没有应答的话,内存迟早会被耗尽。

BasicBlot接口处理的消息会被自动锚定到输入消息中去,而且当execute执行完毕时,会自动应答输入消息。

很多时候,一个消息因为聚合或者join时需要延迟应答,那么这些特性不是IBasicBlot能解决的。

5.5 高效的实现tuple tree

tuple tree 由acker负责跟踪,默认acker的并行度为1,当系统中有大量的消息时,应当适当提高acker任务的并行度。

每个消息产生时,除了用户会给消息赋予一个id(默认为default)外,storm还为该消息产生一个64bit的随机值作为随机id,这些随机id是acker用来跟踪由Spout消息派生出来的tuple tree的。

档blot产生一个新的消息时,对应的tuple tree的根消息的随机id就被复制到这个消息中。每当消息应答的时候,就把消息的变化情况报告给跟踪这棵tuple tree的acker。

系统使用一种哈希算法根据spout消息的id确定由哪个acker跟踪消息派生出来的tuple tree。因为每个消息都知道与之对应的根消息的id,因此它知道应该与哪个acker通信。

acker保存了spout消息id到2个值的映射。第一个值就是spout的任务id(以便调用spout的ack和fail),第二个值是一个64bit的数字,我们称之为"ack val",它是树种所有消息的随机id的异或结果。当消息被创建或者应答的时候,就把消息的随机id和这个值做异或。 当acker发现一棵树的ack val的值为0时,就知道这棵树被完全处理了。因为消息的随机id是一个64bit的值,因此ack val在树处理完之前被置为0的概率非常小,至少需要50000000年才有机会发生一次错误。通过这种方式,storm只需要固定的内存(大约20字节) 就可以跟踪一棵树。这个算法是storm正确运行的核心,也是storm最大的突破。

当spout发送一个消息的时候,就通知对应的acker一个新的根消息产生了,这时acker会跟踪和产生一个新的tuple tree。当acker发现这棵树被完全处理了之后,就会通知对应的spout任务。

5.6选择合适的可靠性级别

acker任务是轻量级的,所以在拓扑中并不需要太多的acker存在,如果再UI中发现acker任务的吞吐量不够的话,可以加大acker的并行度。

如果不需要每个消息被完全的处理(允许一些消息被丢失),可以关闭消息处理机制,从而获得更好的性能。1:消息数会减半,因为不需要ack消息了。2:因为每个消息不需要存储对应的根消息的id,所以会减小内存。

有三种方法可以调整消息的可靠传输机制:

1、Config.TOPOLOGY.ACKERS设置为0,那么spout发送一个消息时,ack方法会被立刻调用。

2、发送一个消息时,不指定消息的id。

3、子消息不做锚定。因为子消息没有被锚定在任何的tuple tree中,因此他们的失败不会引起spout重新发送消息。

5.7 集群的各级容错

第6章 一致性事务

保证出错的消息只被处理一次

1、批次处理的能力,只有一批处理完毕才会提交

2、commit的强顺序性,只有前面的处理完毕后面的才会处理。

6.1 简单设计1:强顺序流

6.2 简单设计2: 强顺序batch流

6.3 CoordinateBlot的原理

6.4 Transactional Topology

分为process和commit两个阶段,process阶段可以并行处理多个batch,commit阶段则保证batch的强顺序性(前一个必须处理成功后一个才能处理)。

一个TransactionalTopology中只能有一个TransactionSpout,TransactionSpout需要实现协调器和分发器。

协调器:负责生成批次元数据,Storm把协调器生成的批次元数据跟txid一起保存在zookeeper。这样就确保了一旦发生故障,Storm可以利用分发器(Emitter)重新发送批次。

分发器:分发器从数据源读取数据并从数据流组发送数据。分发器应当问题能够为相同的事务id和事务元数据发送相同的批次(不断的从zk读取该事务的批次元数据,直到该批次数据被成功commit)。

TransactionAttempt包含两个值:1:transactionId 事务的唯一标识

2:attemptid 该batch的tuple每重发一次就加1,可以理解为replay-times

TransactionSpout只能有一个,负责将tuple分成一个个batch,而且保证同一个batch的transaction Id始终一样,还要维护attemptId

BatchBlot:对于每个tuple调用execute方法,当整个batch的tuple都被处理完之后就调用finishBatch方法。

Commiter:被标记成Commiter的BatchBlot,只能在commit时调用finishBatch。一个batch的commit阶段由storm保证只有在前一个batch成功提交之后才会执行,并且它会重试直到Topology里面的所有Blot在commit完成提交。

第7章 DRPC

DRPC通过DRPC Server来实现,Drpc Server的整体工作过程如下:

1、接收到一个RPC调用请求

2、发送请求到storm上的Topology

3、从storm上接收计算结果

4、将计算结果返回给客户端

DRPC的内部工作流程如下:

1、client向DRPC Server发送被调用执行的DRPC函数名称和参数

2、storm上的Topology通过DRPCSpout实现这一函数,从DRPC Server接收到函数调用流。(函数名称、参数、id)

3、DRPC Server会为每次函数调用生成唯一的id。

4、storm上运行的Topology开始计算结果,最好通过一个ReturnResults的Blot连接到DRPC Server,发送指定id的计算结果。

5、DRPC Server通过使用之前为每个函数调用生成的id,将结果关联到对应的发起调用的client,将计算结果返回给client。

7.3 LinearDRPCTopologyBuilder

7.4 本地模式DRPC

7.5 远程模式DRPC

7.6 一个复杂的例子

7.7 非线性DRPC拓扑

7.8LinearDRPCTopologyBuilder的工作过程

第8章 Trident的特性

Trident是基于storm进行实时流处理的高级抽象,提供了实时流的聚集、投影、过滤操作,从而大大简化了开发storm的工作量。另外Trident提供了原语处理针对数据库或其他持久化存储的有状态的、增量的更新操作。

Trident是按批处理数据的,同时提供了事务支持。

8.2 结合多个Trident任务

8.3 消费和生产Field

8.4 State 状态保存

然后是各个spout...

8.5 Trident Topology的执行过程

Trident Topology会被编译成高效的storm Topology。只有当数据需要进行重新分区时,如group by或者shuffle,才会通过网络传输tuples。

Trident为我们提供了一种开发storm的更便捷的方式,而且其对事务的支持也使得使用storm开发可靠的分布式应用成为可能。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: