Flume采用zookeeper管理配置
2015-10-27 11:05
337 查看
Flume支持通过zookeeper来管理Agent的配置,但是这是一个实验性的功能。配置文件必须先上传到zookeeper中。以下Agent在Zookeeper节点树的结构:
处理配置文件的类:
org.apache.flume.node.PollingZooKeeperConfigurationProvider : 如果zookeeper指定的路径有变更,就从Zookeeper重新获取配置文件。
org.apache.flume.node.StaticZooKeeperConfigurationProvider : 启动Flume后,不会重新加载配置文件,即使Zookeeper的配置文件有变更。
org.apache.flume.agent.embedded.MemoryConfigurationProvider : 从存储中读取配置文件。传入数据格式是Map。
org.apache.flume.node.PollingPropertiesFileConfigurationProvider : 定时冲硬盘读取配置文件。
org.apache.flume.node.AbstractZooKeeperConfigurationProvider创建Zookeeper客户端:
Flume采用Curator作为zookeeper的客户端,Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。
Curator的maven配置:
Zookeeper还有一个原生态的客户端,maven配置:
使用原生态的客户端,上传配置文件flume-zookeeper.properties到zookeeper集群:
flume-zookeeper.properties配置文件内容:
avro获取数据,通过Memory Channel写入文件中。
配置文件上传到Zookeeper后,通过如下命令启动Flume:
zookeeper中的配置如果有更新,Flume会通过PollingZooKeeperConfigurationProvider类的refreshConfiguration方法重新加载配置:
- /flume |- /a1 [Agent配置文件] |- /a2 [Agent配置文件]
处理配置文件的类:
org.apache.flume.node.PollingZooKeeperConfigurationProvider : 如果zookeeper指定的路径有变更,就从Zookeeper重新获取配置文件。
org.apache.flume.node.StaticZooKeeperConfigurationProvider : 启动Flume后,不会重新加载配置文件,即使Zookeeper的配置文件有变更。
org.apache.flume.agent.embedded.MemoryConfigurationProvider : 从存储中读取配置文件。传入数据格式是Map。
org.apache.flume.node.PollingPropertiesFileConfigurationProvider : 定时冲硬盘读取配置文件。
org.apache.flume.node.AbstractZooKeeperConfigurationProvider创建Zookeeper客户端:
protected CuratorFramework createClient() { return CuratorFrameworkFactory.newClient(zkConnString, new ExponentialBackoffRetry(1000, 1)); }
Flume采用Curator作为zookeeper的客户端,Curator是Netflix公司开源的一个Zookeeper客户端,与Zookeeper提供的原生客户端相比,Curator的抽象层次更高,简化了Zookeeper客户端的开发量。
Curator的maven配置:
<dependency> <groupId>org.apache.curator</groupId> <artifactId>apache-curator</artifactId> <version>2.9.0</version> </dependency>
Zookeeper还有一个原生态的客户端,maven配置:
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency>
使用原生态的客户端,上传配置文件flume-zookeeper.properties到zookeeper集群:
@Test public void uploadFileToZK() throws KeeperException, InterruptedException { String propFilePath = "D:\\flume-zookeeper.properties"; ZooKeeper zk = null; try { zk = new ZooKeeper("10.0.1.85:2181,10.0.1.86:2181,10.0.1.87:2181", 300000, new Watcher() { // 监控所有被触发的事件 public void process(WatchedEvent event) { System.out.println("已经触发了" + event.getType() + "事件!"); } }); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } if (zk.exists("/flume", true) == null) { zk.create("/flume", null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } InputStream is = null; ByteArrayOutputStream bytestream = null; byte[] data = null; try { is = new FileInputStream(propFilePath); bytestream = new ByteArrayOutputStream(); int ch; while ((ch = is.read()) != -1) { bytestream.write(ch); } data = bytestream.toByteArray(); System.out.println(new String(data)); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { try { bytestream.close(); is.close(); } catch (IOException e) { e.printStackTrace(); } } // 创建一个目录节点 Stat stat = zk.exists("/flume/a1", true); if (stat == null) { zk.create("/flume/a1", data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } else { zk.delete("/flume/a1", stat.getVersion()); zk.create("/flume/a1", data, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } @Test public void get() throws KeeperException, InterruptedException { ZooKeeper zk = null; try { zk = new ZooKeeper("10.0.1.85:2181,10.0.1.86:2181,10.0.1.87:2181", 300000, new Watcher() { // 监控所有被触发的事件 public void process(WatchedEvent event) { System.out.println("已经触发了" + event.getType() + "事件!"); } }); } catch (IOException e) { e.printStackTrace(); } System.out.println(new String(zk.getData("/flume/a1", true, null))); }
flume-zookeeper.properties配置文件内容:
a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 a1.sources.r1.channels = c1 a1.sources.r1.type = AVRO a1.sources.r1.bind = 0.0.0.0 a1.sources.r1.port = 41414 a1.sinks.k1.channel = c1 a1.sinks.k1.type = file_roll a1.sinks.k1.sink.directory = /logs/ a1.sinks.k1.sink.rollInterval = 0 a1.channels = c1 a1.sources = r1 a1.sinks = k1
avro获取数据,通过Memory Channel写入文件中。
配置文件上传到Zookeeper后,通过如下命令启动Flume:
$ bin/flume-ng agent –conf conf -z 10.0.1.85:2181,10.0.1.86:2181,10.0.1.87:2181 -p /flume –name a1 -Dflume.root.logger=INFO,console
参数名称 | 默认值 | 描述 |
---|---|---|
z | – | Zookeeper的连接字符串hostname:port列表通过逗号分隔。 |
p | /flume | Agent配置文件的根路径。 |
private void refreshConfiguration() throws IOException { LOGGER.info("Refreshing configuration from ZooKeeper"); byte[] data = null; ChildData childData = agentNodeCache.getCurrentData(); if (childData != null) { data = childData.getData(); } flumeConfiguration = configFromBytes(data); eventBus.post(getConfiguration()); }
相关文章推荐
- asp.net操作Word实现批量替换
- android.support
- C++中的extern "C" {}
- 腾讯设计导航
- DIY Ruby CPU 分析 Part II
- arcgis-viewer-flex-3.6 入门部署
- 上海派出所地图接口演示示例
- 网站如何数据备份
- 可调用对象,列表和字典的原处修改特征16
- Android的NDK开发
- 面试sohu
- DataSet与DataType
- java求解一元一次方程组
- IOS --- 日期时间格式 更改
- 税友集团java面试题
- Ubuntu安装QQ
- DIY Ruby CPU 分析 Part II
- 网站文件备份
- [转]在Eclipse中使用JUnit4进行单元测试(初级篇)
- Android动态注册监听广播