您的位置:首页 > 运维架构 > 网站架构

flume 高可用性 高可靠性 agent source

2015-01-21 17:54 671 查看
--------没有整理过的,稍后进行整理

生词表

consumes  消耗,

delivered   交付,传递

passive    被动

encapsulate  封装  enˈkaps(y)əˌlāt

riˈtrēvəl   retrieval  回复

ensures   保证 

temporarily  暂时的 

causing  造成

exponentially  指数型的增

 

An Event is a unit of data that flows through a Flume agent. The Event flows from Source to Channel to Sink,

 and is represented by an implementation of the Event interface. An Event carries a payload (byte array)

 that is accompanied by an optional set of headers (string attributes).

 A Flume agent is a process (JVM) that hosts the components that allow Events to flow from an external source to a external destination

 

 

 翻译:Flume agent(客户端)是把事件作为最小的数据单位,一个事件流从source(源端),传递到channel(通道),在到达sink(目标端),是基于事件接口的实现完成的,一个

 事件携带者必要的全部信息(以字节数据方式传递),最终被编译成可选的数据集头文件(字符串),Flume 的agent(客户端)是执行在JVM容器中的组建,同时也允许事件从外部的soure(源端)

 到外部的目标端.

 

 ------------------------------------------------------------------------------------------------------------------------

A Source consumes Events having a specific format, and those Events are delivered to the Source by an external

source like a web server. For example, an AvroSource can be used to receive Avro Events from clients or from other

 Flume agents in the flow. When a Source receives an Event, it stores it into one or more Channels.

  The Channel is a passive store that holds the Event until that Event is consumed by a Sink. One type of Channel

  available in Flume is the FileChannel which uses the local filesystem as its backing store. A Sink is responsible

  for removing an Event from the Channel and putting it into an external repository like HDFS (in the case of an HDFSEventSink)

   or forwarding it to the Source at the next hop of the flow. The Source and Sink within the given agent run asynchronously with the Events staged in the Channel.

  

  翻译:在源端所使用的事件有指定的格式,这些事件的产生是用外部的源提供或产生的如,一个web容器 ,举个例子来说 ,一个AvroSource类型的源能够用于 接受 Avro事件 ,可以

  是其他的agent上发出的流,当一个 源接受到 一个事件 ,就会将相关的事件信息存放到 一个 或者获得通道中 ,通道 是被动是的存储 ,被通道接收到的事件

  流可以被 sink 端进行消费

  ,在 通道 channel 中 文件通道是很有效的 ,将事件流信息存放到本地的文件系统作为备用的存储 ,一个 sink 组件也可以相应从 通道中移除一个事件 ,可以将流的信息存放到像分布式存储

  文件系统 ,HDFS 中(前提是将sink 配置只想HDFS,类型为HDFSEventSink),也可以将目标指向下一个流的源。对于源和目标(source and sink),通过异步的方式运行于管道上。

  -----------------------------------------------------------------------------------------------------------------------------

 

  Reliability

An Event is staged in a Flume agent’s Channel. Then it’s the Sink‘s responsibility to deliver the Event to the next agent or

 terminal repository (like HDFS) in the flow. The Sink removes an Event from the Channel only after the Event is stored into the Channel

 of the next agent or stored in the terminal repository. This is how the single-hop message delivery semantics in

  Flume provide end-to-end reliability of the flow. Flume uses a transactional approach to guarantee the reliable

   delivery of the Events. The Sources and Sinks encapsulate the storage/retrieval of the Events in a Transaction

    provided by the Channel. This ensures that the set of Events are reliably passed from point to point in the flow.

     In the case of a multi-hop flow, the Sink from the previous hop and the Source of the next hop both have their

     Transactions open to ensure that the Event data is safely stored in the Channel of the next hop.

 在一个agent(客户端)的通道内,事件在通道内承载运行对于目标(sink)响应被传送到下一个agent(客户端或代理)或者最终存储在向HDFS 上。只有当事件被存放到下一个agent 的通道或是

 最终存储到最终的资源库中 ,sink 的事件才被从通道内移除。这是为什么flume 能够保证端到端的传输过程高可靠性。完成的传递事件流。 Flume 使用传动的高可靠的传递事件。

 

 Flume 使用传统的方式,通过通道提供事务一致性的,并保证了source 和sink信息传递的高高可靠性。也确保了事件的集合数据从一个数据节点传到下一数据节点的可靠性传递

 ,事件的数据通过通道的方式从一端传到另一端,从前一个agent 的sink 到下一个agent 的source ,都在一个事务中,确保了可靠性。

 

 大白话的翻译: 在一个agent 上能够保证事务的一致性,通过通道实现的,当一个事件流被消费的时候或是移除(从sink ),在发一一个事件给channel ,这个时候才把事件从对应的channel 中删除

 

 对于从一个agent sink 我两一个 agent 源的情况,事务的一致性也是通过channel 来实现的。 从分保证了高可靠性

 

 ---------------------------------------------------------------------

 

 Getting the source  如何获取源代码

Check-out the code using Git. Click here for the git repository root.

点击链接获得git资源库的根目录,使用git 进行下载

The Flume 1.x development happens under the branch “trunk” so this command line can be used:

git clone https://git-wip-us.apache.org/repos/asf/flume.git
如果使用命令行的方式获取1.x的trunk下的开发版本,请执行命令 git clone https://git-wip-us.apache.org/repos/asf/flume.git
 ---------------------------------------------------------------------  ---------------------------------------------------------------------

Compile/test Flume  编译和测试 flume

 The Flume build is mavenized. You can compile Flume using the standard Maven commands:

因 flume是 maven项目 ,你可以编译 flume 使用标准的maven 命令

Compile only: mvn clean compile

仅仅编译 :  mvn clean compile

Compile and run unit tests: mvn clean test

编译并且运行 测试测试 :  mvn clean test

Run individual test(s): mvn clean test -Dtest=<Test1>,<Test2>,... -DfailIfNoTests=false

运行私人 的测试用例 :   mvn clean test -Dtest=<Test1>,<Test2>,... -DfailIfNoTests=false

Create tarball package: mvn clean install

创建 压缩包 :  执行 mvn clean install

Create tarball package (skip unit tests): mvn clean install -DskipTests

如果想跳过 测试但愿 执行 :  mvn clean install  -DskipTests

Please note that Flume builds requires that the Google Protocol Buffers compiler be in the path. You can download and insta
4000
ll it by following the instructions here.

记住 flume 的 构建是 必须在google 的protocol buffers的 jar 放到项目的环境变量中 ,你可以下载和初始化 按照下面的文档 。

-----------------------------------------------------------------------------

Developing custom components

Client

The client operates at the point of origin of events and delivers them to a Flume agent.

Clients typically operate in the process space of the application they are consuming data from.

Flume currently supports Avro, log4j, syslog, and Http POST (with a JSON body) as ways to transfer data from a external source.

 Additionally, there’s an ExecSource that can consume the output of a local process as input to Flume.

It’s quite possible to have a use case where these existing options are not sufficient.

 In this case you can build a custom mechanism to send data to Flume. There are two ways of achieving this.

  The first option is to create a custom client that communicates with one of Flume’s existing Sources like AvroSource or SyslogTcpSource.

  Here the client should convert its data into messages understood by these Flume Sources.

  The other option is to write a custom Flume Source that directly talks with your existing client application using some IPC or RPC protocol,

   and then converts the client data into Flume Events to be sent downstream.

    Note that all events stored within the Channel of a Flume agent must exist as Flume Events.

开发 客户端组件

客户端

对于原始的 事件流的操作和传输 给 flume 的agent ,客户端的典型的操作和传递消费的数据 是来自应用执行的时候 ,在当前的版本中 ,Flume支持的 AVRO,log4j,syslog,

和 HttpPOST(JSON bOGY)HTTP json 格式的请求 ,通用方式可以传输外部的数据

另外 本地的程序产生的输出也可以 作为flume 的的数据 。

如果在实际的案例中使用存在的选项可能是不完全能够满足需求的 ,在这种情况下你可以使用客户端机制 ,去发送数据到flume中,有两种方式可以实现这个功能

一个选则是 创建客户化的客户端 ,是哟功能以及功能存在 想 AVROSOURCE HUOSHI SYSLOG TCPsource 。 这样 客户端会转换为各自的数据 并发送flume 能读懂的消息或是接受 SOURCE

另一种选择是先客户端 flume source ,直接的和已经存在的客户端惊醒沟通,使用ipc 或是 rpc protocol这样也能够 吧clientdata 放到flume 的流中发送 。

注意 不是所有的事件在flume 的agent的通道中存储 ,必须是存在的flume 的事件流 。

----------------------------------------------------------------------------------------------------

Client SDK  客户端的开发工具

Though Flume contains a number of built-in mechanisms (i.e. Sources) to ingest data,

often one wants the ability to communicate with Flume directly from a custom application.

The Flume Client SDK is a library that enables applications to connect to Flume and send data into Flume’s data flow over RPC.

通常想要一个客户端的应用程序与flume进行数据通信的 ,尽管flume包含了通信的机制,通常情况下 ,flume 的client 端的开发工具以jarlib 的确保应用正常连接flume 和通过 RPC的方式

将数据传递到flume 的数据流中

RPC client interface

An implementation of Flume’s RpcClient interface encapsulates the RPC mechanism supported by Flume.

 The user’s application can simply call the Flume Client SDK’s append(Event) or appendBatch(List<Event>) to send data

  and not worry about the underlying message exchange details. The user can provide the required Event arg by either directly

   implementing the Event interface, by using a convenience implementation such as the SimpleEvent class, or

   by using EventBuilder‘s overloaded withBody() static helper methods.

  

   RPC 的客户端借口

  

 flume 支持通过实现Rpc客户端的接口的方式在完成RPC的沟通机制 ,用户可以使用简单的电泳flume 的sdk 的客户端的方式,把时间或是批量的事件集发送数据到 flume中

也不需要了解底层的转换的详细信息 ,用户可以通过向simpleEvet这样实现类来很方便的实现事件借口,通过时间加载器 的从在的withbody的方法 ,

 提供请求时间的数组通过通过实现事件的接口 。调用静态的帮助的方法 。冲在withbody()方法。

 

 ----------------------------------------------------------------------------------------------------------------------

 RPC clients - Avro and Thrift

As of Flume 1.4.0, Avro is the default RPC protocol.

The NettyAvroRpcClient and ThriftRpcClient implement the RpcClient interface.

 The client needs to create this object with the host and port of the target Flume agent,

 and can then use the RpcClient to send data into the agent.

 The following example shows how to use the Flume Client SDK API within a user’s data-generating application:

 

   RPC客户端--Avro和Thrift

  

   像Flume1.4.0 的版本 ,默认的Avro 是默认的RPC 协议 。,

  

   nettyavrorpclient 和 thriftrpc客户端都实现了 rpcclient 借口哦iu,

   这中类型的客户端都需要 创建带有主机和端口(目标地址)的对flumeagent 的对象

   就可以 使用Rpc客户端发送数据到 agent 中 ,接下来的例子讲像你闪失如何 让用户的应用生成数据 并调用sdkapi

  

   ----------------------------------------------------------------------------------------------------------

  

   代码就不翻译了代价都能看懂

  

   --------------------------------------------------------------------------------------------------------

  The remote Flume agent needs to have an AvroSource (or a ThriftSource if you are using a Thrift client)

   listening on some port. Below is an example Flume agent configuration that’s waiting for a connection from MyApp:

  

   远程的flume 的agent 需要有一个 avrosource (或是使用thriftclient),监听一些端口 ,下面是 flume agent 配置 ,waiting 从myapp 进行 连接的例子

  

   -------------------------------------------------------------------------------

  

   import org.apache.flume.Event;

import org.apache.flume.EventDeliveryException;

import org.apache.flume.api.RpcClient;

import org.apache.flume.api.RpcClientFactory;

import org.apache.flume.event.EventBuilder;

import java.nio.charset.Charset;

public class MyApp {

  public static void main(String[] args) {

    MyRpcClientFacade client = new MyRpcClientFacade();

    // Initialize client with the remote Flume agent's host and port

    client.init("host.example.org", 41414);

    // Send 10 events to the remote Flume agent. That agent should be

    // configured to listen with an AvroSource.

    String sampleData = "Hello Flume!";

    for (int i = 0; i < 10; i++) {

      client.sendDataToFlume(sampleData);

    }

    client.cleanUp();

  }

}

class MyRpcClientFacade {

  private RpcClient client;

  private String hostname;

  private int port;

  public void init(String hostname, int port) {

    // Setup the RPC connection

    建立 rpc 连接

    this.hostname = hostname;

    this.port = port;

    this.client = RpcClientFactory.getDefaultInstance(hostname, port);

    // Use the following method to create a thrift client (instead of the above line):

    使用下面的方法去创建一个thrift的客户端

    // this.client = RpcClientFactory.getThriftInstance(hostname, port);

  }

  public void sendDataToFlume(String data) {

    // Create a Flume Event object that encapsulates the sample data

    创建一个集成了例子数据段额事件对象

    Event event = EventBuilder.withBody(data, Charset.forName("UTF-8"));

    // Send the event

    try {

      client.append(event);

    } catch (EventDeliveryException e) {

      // clean up and recreate the client

      如果失败了,重新创建客户端

      client.close();

      client = null;

      client = RpcClientFactory.getDefaultInstance(hostname, port);

      // Use the following method to create a thrift client (instead of the above line):

      // this.client = RpcClientFactory.getThriftInstance(hostname, port);

    }

  }

  public void cleanUp() {

    // Close the RPC connection

    client.close();

  }

}

The remote Flume agent needs to have an AvroSource (or a ThriftSource if you are using a Thrift client) listening on some port.

 Below is an example Flume agent configuration that’s waiting for a connection from MyApp:

 调用远程的flume 的agent 需要有一个Avro source 或是一个 ThriftSOURCE ,要不停的对端口进行监听

 像下面的flume 的实例的agent 配置,能够连接到我app

a1.channels = c1

a1.sources = r1

a1.sinks = k1

a1.channels.c1.type = memory

a1.sources.r1.channels = c1

a1.sources.r1.type = avro

# For using a thrift source set the following instead of the above line.

# a1.source.r1.type = thrift

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 41414

a1.sinks.k1.channel = c1

a1.sinks.k1.type = logger

For more flexibility, the default Flume client implementations (NettyAvroRpcClient and ThriftRpcClient) can be configured with these properties:

为了提高适应性 ,默认的flume 客户端时间 被配置在属性文件中

client.type = default (for avro) or thrift (for thrift)

hosts = h1                           # default client accepts only 1 host

                                     # (additional hosts will be ignored)

hosts.h1 = host1.example.org:41414   # host and port must both be specified

                                     # (neither has a default)

batch-size = 100                     # Must be >=1 (default: 100)

connect-timeout = 20000              # Must be >=1000 (default: 20000)

request-timeout = 20000              # Must be >=1000 (default: 20000)

-----------------------------------------------------------------

Failover Client

This class wraps the default Avro RPC client to provide failover handling capability to clients. This takes a whitespace-separated list of <host>:<port> representing the Flume agents that make-up a failover group. The Failover RPC Client currently does not
support thrift. If there’s a communication error with the currently selected host (i.e. agent) agent, then the failover client automatically fails-over to the next host in the list. For example:

// Setup properties for the failover

Properties props = new Properties();

props.put("client.type", "default_failover");

// List of hosts (space-separated list of user-chosen host aliases)

props.put("hosts", "h1 h2 h3");

// host/port pair for each host alias

String host1 = "host1.example.org:41414";

String host2 = "host2.example.org:41414";

String host3 = "host3.example.org:41414";

props.put("hosts.h1", host1);

props.put("hosts.h2", host2);

props.put("hosts.h3", host3);

// create the client with failover properties

RpcClient client = RpcClientFactory.getInstance(props);

For more flexibility, the failover Flume client implementation (FailoverRpcClient) can be configured with these properties:

client.type = default_failover

hosts = h1 h2 h3                     # at least one is required, but 2 or

                                     # more makes better sense

hosts.h1 = host1.example.org:41414

hosts.h2 = host2.example.org:41414

hosts.h3 = host3.example.org:41414

max-attempts = 3                     # Must be >=0 (default: number of hosts

                                     # specified, 3 in this case). A '0'

                                     # value doesn't make much sense because

                                     # it will just cause an append call to

                                     # immmediately fail. A '1' value means

                                     # that the failover client will try only

                                     # once to send the Event, and if it

                                     # fails then there will be no failover

                                     # to a second client, so this value

                                     # causes the failover client to

                                     # degenerate into just a default client.

                                     # It makes sense to set this value to at

                                     # least the number of hosts that you

                                     # specified.

batch-size = 100                     # Must be >=1 (default: 100)

connect-timeout = 20000              # Must be >=1000 (default: 20000)

request-timeout = 20000              # Must be >=1000 (default: 20000)

---------------------------------------------------------------------------故障转移的配置----------------

Failover Client  故障转移客户端

This class wraps the default Avro RPC client to provide failover handling capability to clients.

This takes a whitespace-separated list of <host>:<port> representing the Flume agents that make-up a

 failover group. The Failover RPC Client currently does not support thrift.

 If there’s a communication error with the currently selected host (i.e. agent) agent,

 then the failover client automatically fails-over to the next host in the list. For example:

这类事包装了默认的avro rpc 的客户端程序,提供了对client端故障转移的处理

这个类用空白行在分割一个list列表  去代替flume的客户端建立一个故障转移组 ,出现故障的客户端不支持thrit 的方式,如果当前的版本出现

故障类的RPC,请选择主机 HOST ,这样出现故障的机器自动的加入了下一步的清单中

// Setup properties for the failover

Properties props = new Properties();

props.put("client.type", "default_failover");

// List of hosts (space-separated list of user-chosen host aliases)

props.put("hosts", "h1 h2 h3");

// host/port pair for each host alias

String host1 = "host1.example.org:41414";

String host2 = "host2.example.org:41414";

String host3 = "host3.example.org:41414";

adc8
props.put("hosts.h1", host1);

props.put("hosts.h2", host2);

props.put("hosts.h3", host3);

// create the client with failover properties

RpcClient client = RpcClientFactory.getInstance(props);

For more flexibility, the failover Flume client implementation (FailoverRpcClient) can be configured with these properties:

client.type = default_failover

hosts = h1 h2 h3                     # at least one is required, but 2 or

                                     # more makes better sense

hosts.h1 = host1.example.org:41414

hosts.h2 = host2.example.org:41414

hosts.h3 = host3.example.org:41414

max-attempts = 3                     # Must be >=0 (default: number of hosts

                                     # specified, 3 in this case). A '0'

                                     # value doesn't make much sense because

                                     # it will just cause an append call to

                                     # immmediately fail. A '1' value means

                                     # that the failover client will try only

                                     # once to send the Event, and if it

                                     # fails then there will be no failover

                                     # to a second client, so this value

                                     # causes the failover client to

                                     # degenerate into just a default client.

                                     # It makes sense to set this value to at

                                     # least the number of hosts that you

                                     # specified.

batch-size = 100                     # Must be >=1 (default: 100)

connect-timeout = 20000              # Must be >=1000 (default: 20000)

request-timeout = 20000              # Must be >=1000 (default: 20000)

-----------------------------------------------------------------------------------------------------------------

LoadBalancing RPC client  负载均衡的RPC 客户端的使用

The Flume Client SDK also supports an RpcClient which load-balances among multiple hosts.

 This type of client takes a whitespace-separated list of <host>:<port> representing the Flume agents that make-up a load-balancing group.

 This client can be configured with a load balancing strategy that either randomly selects

  one of the configured hosts, or selects a host in a round-robin fashion.

  You can also specify your own custom class that implements the LoadBalancingRpcClient$HostSelector interface so that a custom selection order is used.

  In that case, the FQCN of the custom class needs to be specified as the value of the host-selector property.

   The LoadBalancing RPC Client currently does not support thrift.

If backoff is enabled then the client will temporarily blacklist hosts that fail,

 causing them to be excluded from being selected as a failover host until a given timeout.

  When the timeout elapses, if the host is still unresponsive then this is considered a sequential failure,

  and the timeout is increased exponentially to avoid potentially getting stuck in long waits on unresponsive hosts.

如果 回退被启用了 ,客户端将暂时的放到到主机失败的链中将一直放到 返回timeout 的时候 ,否则将是作为故障转移的机器被选择

这个主机仍然是不可相应的,因为考虑到一个序列失败的问题 ,超时可以之属性的增长了为了避免对一个不可相应的主机的长时间潜在的无效的等待

The maximum backoff time can be configured by setting maxBackoff (in milliseconds).

最大的回相应的时间可以配置到 文件中

The maxBackoff default is 30 seconds (specified in the OrderSelector class that’s the superclass of both load balancing strategies).

默认的是30秒 ,可以指定提供其的响应的时间 通过超类中的负载存储策略

The backoff timeout will increase exponentially with each sequential failure up to the maximum possible backoff timeout.

The maximum possible backoff is limited to 65536 seconds (about 18.2 hours). For example:

flume 的sdk 也同样支持负载均衡在多个主机列表中 ,这种类型的client 需要 用空白行分开的列表来实现,

这样的client 能够通过配置的方式实现,夹在不同的负载存储,也可以支持随机的选择一个节点的配置主机列表,或者选择一个主机在整个国政中循环的调用一个host

你可以指定你自己的客户化的类来实现负载均衡器的借口,那样客户化的选择其别应用 ,

在这种情况下 ,fqcn 客户化的泪,需要填写 host选择其的属性文件 ,负载均很的RPC客户端 不支持 thrift 的方式

// Setup properties for the load balancing

Properties props = new Properties();

props.put("client.type", "default_loadbalance");

// List of hosts (space-separated list of user-chosen host aliases)

props.put("hosts", "h1 h2 h3");

// host/port pair for each host alias

String host1 = "host1.example.org:41414";

String host2 = "host2.example.org:41414";

String host3 = "host3.example.org:41414";

props.put("hosts.h1", host1);

props.put("hosts.h2", host2);

props.put("hosts.h3", host3);

props.put("host-selector", "random"); // For random host selection

// props.put("host-selector", "round_robin"); // For round-robin host

//                                            // selection

props.put("backoff", "true"); // Disabled by default.

props.put("maxBackoff", "10000"); // Defaults 0, which effectively

                                  // becomes 30000 ms

// Create the client with load balancing properties

RpcClient client = RpcClientFactory.getInstance(props);

For more flexibility, the load-balancing Flume client implementation (LoadBalancingRpcClient) can be configured with these properties:

client.type = default_loadbalance

hosts = h1 h2 h3                     # At least 2 hosts are required

hosts.h1 = host1.example.org:41414

hosts.h2 = host2.example.org:41414

hosts.h3 = host3.example.org:41414

backoff = false                      # Specifies whether the client should

                                     # back-off from (i.e. temporarily

                                     # blacklist) a failed host

                                     # (default: false).

maxBackoff = 0                       # Max timeout in millis that a will

                                     # remain inactive due to a previous

                                     # failure with that host (default: 0,

                                     # which effectively becomes 30000)

host-selector = round_robin          # The host selection strategy used

                                     # when load-balancing among hosts

                                     # (default: round_robin).

                                     # Other values are include "random"

                                     # or the FQCN of a custom class

                                     # that implements

                                     # LoadBalancingRpcClient$HostSelector

batch-size = 100                     # Must be >=1 (default: 100)

connect-timeout = 20000              # Must be >=1000 (default: 20000)

request-timeout = 20000              # Must be >=1000 (default: 20000)

--------------------------------------------------------------------------------
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息