
Flume HTTP Source

2016-07-06 14:00

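I. Introduction

Flume's built-in HTTP Source accepts events via HTTP POST.

Use case: some application environments cannot deploy the Flume SDK and its dependencies, or the client code prefers to send data over HTTP rather than Flume's RPC. In those cases the HTTP Source can be used to get data into Flume.

From the client's point of view, the HTTP Source behaves like a web server that accepts Flume events.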

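II. Parameters

Parameter         Default      Description
type              -            Component type; must be http (org.apache.flume.source.http.HTTPSource)
bind              -            IP address or hostname to bind to
port              -            Port to bind to
enableSSL         false        Whether to enable SSL
keystore          -            Path to the keystore file to use
keystorePassword  -            Password for accessing the keystore
handler           JSONHandler  Handler class used by the HTTP Source
handler.*         -            Any parameters to pass to the handler class

For secure transport, the HTTP Source also supports SSL. For an SSL example, see my post on the Flume Avro Source.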

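Incoming requests are converted into Flume events by a pluggable "handler", which must implement the HTTPSourceHandler interface. The handler takes an HttpServletRequest and returns a list of Flume events. The default is JSONHandler.
For example: xxx.handler=org.pq.demo.HTTPSourceXmlHandler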

To pass parameters to a custom handler, use the handler.* settings.
For example: xxx.handler.myparam=zhangsan
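
As a rough illustration of how handler.* settings reach the handler: to my understanding, the source strips the handler. prefix and hands the remaining keys to the handler's configure(Context), so handler.myparam arrives as myparam. The sketch below imitates that prefix stripping with plain JDK maps. HandlerParams and subProperties are illustrative names chosen for this example, not Flume APIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class HandlerParams {
    // Returns the entries whose key starts with the prefix, with the prefix removed.
    // This only imitates the behaviour described above; it is not Flume code.
    static Map<String, String> subProperties(Map<String, String> props, String prefix) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Settings of one source, with the "xxx." part already removed.
        Map<String, String> source = new LinkedHashMap<>();
        source.put("type", "http");
        source.put("handler", "org.pq.demo.HTTPSourceXmlHandler");
        source.put("handler.myparam", "zhangsan");

        // Prints {myparam=zhangsan}: only the handler.* key survives.
        System.out.println(subProperties(source, "handler."));
    }
}
```

A setting written without the handler. prefix would not show up in the map the handler receives.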

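If no handler is specified in the configuration, the HTTP Source uses the handler bundled with Flume, JSONHandler, which processes events in JSON format. A single request can carry several events wrapped in an array, although the channel the source writes to may have a limited transaction capacity.
The handler accepts JSON data encoded as UTF-8, UTF-16, or UTF-32 and converts it into a list of events.
Format: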

[
  {
    "headers": {"key1": "value1", "key2": "value2"},
    "body": "the first event"
  },
  {
    "headers": {"key1": "value1", "key2": "value2"},
    "body": "the second event"
  }
]
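
To make the format concrete, here is a minimal sketch that builds such a payload using only the JDK. JsonEventPayload and its methods are hypothetical helpers written for this example. A real client would more likely use a JSON library, and the hand-written escaping here covers only what JSON string literals require.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class JsonEventPayload {
    // Wraps a string in quotes and escapes the characters JSON requires to be escaped.
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters become a four-digit hex escape.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    // Builds one event object with a "headers" map and a "body" string.
    static String event(Map<String, String> headers, String body) {
        StringBuilder sb = new StringBuilder("{\"headers\":{");
        boolean first = true;
        for (Map.Entry<String, String> e : headers.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append(quote(e.getKey())).append(':').append(quote(e.getValue()));
            first = false;
        }
        return sb.append("},\"body\":").append(quote(body)).append('}').toString();
    }

    // Wraps events in an array; even a single event is sent as an array.
    static String batch(String... events) {
        return "[" + String.join(",", events) + "]";
    }

    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("h1", "v1");
        headers.put("h2", "v2");
        // Prints [{"headers":{"h1":"v1","h2":"v2"},"body":"hello body"}]
        System.out.println(batch(event(headers, "hello body")));
    }
}
```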

III. Usage

1. A simple HTTP Source example

1) Add an http_test.conf file under the conf directory

a1.sources=r1
a1.sinks=k1
a1.channels=c1

a1.sources.r1.type=http
a1.sources.r1.bind=192.168.1.102
a1.sources.r1.port=50000
a1.sources.r1.channels=c1

a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100

2) Start the agent:

$ bin/flume-ng agent -c conf -f conf/http_test.conf -n a1 -Dflume.root.logger=INFO,console

3) Test:

$ curl -X POST -d'[{"headers":{"h1":"v1","h2":"v2"},"body":"hello body"}]' http://192.168.1.102:50000
4) Server-side output:

......
2015-11-30 11:34:52,451 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: }
2015-11-30 11:34:52,452 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: }
2015-11-30 11:45:14,951 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{h1=v1, h2=v2} body: 68 65 6C 6C 6F 20 62 6F 64 79 hello body }

2. A custom HTTP Source handler example

Assume requests arrive as XML in the following format:

<events>
<event>
<headers><header1>value1</header1></headers>
<body>test</body>
</event>
<event>
<headers><header1>value1</header1></headers>
<body>test2</body>
</event>
</events>
The goal is for the Flume HTTP Source to handle requests in this XML format.

Steps:

1) Create a Maven project with the following pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.pq</groupId>
<artifactId>flume-demo</artifactId>
<packaging>jar</packaging>
<version>1.0</version>
<name>flume-demo Maven jar</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.8.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.6.0</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<finalName>flume-demo</finalName>
</build>
</project>

2) Write the code: a custom handler class

package org.pq.flumeDemo.sources;

import com.google.common.base.Preconditions;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.http.HTTPBadRequestException;
import org.apache.flume.source.http.HTTPSourceHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

import javax.servlet.http.HttpServletRequest;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HTTPSourceXMLHandler implements HTTPSourceHandler {
    private static final String ROOT = "events";
    private static final String EVENT_TAG = "event";
    private static final String HEADERS_TAG = "headers";
    private static final String BODY_TAG = "body";

    private static final String CONF_INSERT_TIMESTAMP = "insertTimestamp";
    private static final String TIMESTAMP_HEADER = "timestamp";
    // Charset used when the request does not declare one.
    private static final String DEFAULT_CHARSET = "UTF-8";

    private static final Logger LOG = LoggerFactory.getLogger(HTTPSourceXMLHandler.class);

    private final DocumentBuilderFactory documentBuilderFactory
            = DocumentBuilderFactory.newInstance();

    // Document builders are not thread-safe.
    // So make sure we have one for each thread.
    private final ThreadLocal<DocumentBuilder> docBuilder
            = new ThreadLocal<DocumentBuilder>();

    private boolean insertTimestamp;

    public List<Event> getEvents(HttpServletRequest httpServletRequest)
            throws HTTPBadRequestException, Exception {
        if (docBuilder.get() == null) {
            docBuilder.set(documentBuilderFactory.newDocumentBuilder());
        }
        // getCharacterEncoding() returns null when the Content-Type carries no
        // charset; fall back to UTF-8 instead of failing in getBytes(null).
        String charset = httpServletRequest.getCharacterEncoding();
        if (charset == null) {
            charset = DEFAULT_CHARSET;
        }
        Document doc;
        final List<Event> events;
        try {
            doc = docBuilder.get().parse(httpServletRequest.getInputStream());
            Element root = doc.getDocumentElement();

            root.normalize();
            // Verify that the root element is "events"
            Preconditions.checkState(
                    ROOT.equalsIgnoreCase(root.getTagName()));

            NodeList nodes = root.getElementsByTagName(EVENT_TAG);
            LOG.info("get nodes={}", nodes);

            int eventCount = nodes.getLength();
            events = new ArrayList<Event>(eventCount);
            for (int i = 0; i < eventCount; i++) {
                Element event = (Element) nodes.item(i);
                // Get all headers. If there are multiple header sections,
                // combine them.
                NodeList headerNodes
                        = event.getElementsByTagName(HEADERS_TAG);
                Map<String, String> eventHeaders
                        = new HashMap<String, String>();
                for (int j = 0; j < headerNodes.getLength(); j++) {
                    Node headerNode = headerNodes.item(j);
                    NodeList headers = headerNode.getChildNodes();
                    for (int k = 0; k < headers.getLength(); k++) {
                        Node header = headers.item(k);

                        // Read only element nodes
                        if (header.getNodeType() != Node.ELEMENT_NODE) {
                            continue;
                        }
                        // Make sure a header is inserted only once,
                        // else the event is malformed
                        Preconditions.checkState(
                                !eventHeaders.containsKey(header.getNodeName()),
                                "Header expected only once " + header.getNodeName());
                        eventHeaders.put(
                                header.getNodeName(), header.getTextContent());
                    }
                }
                Node body = event.getElementsByTagName(BODY_TAG).item(0);
                if (insertTimestamp) {
                    eventHeaders.put(TIMESTAMP_HEADER,
                            String.valueOf(System.currentTimeMillis()));
                }
                events.add(EventBuilder.withBody(
                        body.getTextContent().getBytes(charset),
                        eventHeaders));
            }
        } catch (SAXException ex) {
            throw new HTTPBadRequestException(
                    "Request could not be parsed into valid XML", ex);
        } catch (Exception ex) {
            // Covers a wrong root element, duplicate headers, or a missing body.
            throw new HTTPBadRequestException(
                    "Request is not in expected format. " +
                            "Please refer documentation for expected format.", ex);
        }
        return events;
    }

    public void configure(Context context) {
        // Receives the handler.* settings of the source, with the prefix removed.
        insertTimestamp = context.getBoolean(CONF_INSERT_TIMESTAMP,
                false);
    }
}
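
The parsing logic can be tried outside Flume. The following is a simplified sketch, assuming only the JDK's DOM parser: XmlEventsDemo and ParsedEvent are made-up names standing in for the handler and for Flume's Event, and the duplicate-header check and timestamp option are left out.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

final class XmlEventsDemo {
    /** One parsed event: header map plus body text (a stand-in for a Flume Event). */
    static final class ParsedEvent {
        final Map<String, String> headers = new LinkedHashMap<>();
        String body = "";
    }

    static List<ParsedEvent> parse(String xml) {
        Document doc;
        try {
            doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        } catch (Exception e) {
            throw new IllegalArgumentException("request is not well-formed XML", e);
        }
        Element root = doc.getDocumentElement();
        if (!"events".equalsIgnoreCase(root.getTagName())) {
            throw new IllegalArgumentException("root element must be <events>");
        }
        List<ParsedEvent> result = new ArrayList<>();
        NodeList eventNodes = root.getElementsByTagName("event");
        for (int i = 0; i < eventNodes.getLength(); i++) {
            Element event = (Element) eventNodes.item(i);
            ParsedEvent parsed = new ParsedEvent();
            // Collect the element children of every <headers> section.
            NodeList headerSections = event.getElementsByTagName("headers");
            for (int j = 0; j < headerSections.getLength(); j++) {
                NodeList headers = headerSections.item(j).getChildNodes();
                for (int k = 0; k < headers.getLength(); k++) {
                    Node header = headers.item(k);
                    if (header.getNodeType() == Node.ELEMENT_NODE) {
                        parsed.headers.put(header.getNodeName(), header.getTextContent());
                    }
                }
            }
            Node body = event.getElementsByTagName("body").item(0);
            if (body != null) {
                parsed.body = body.getTextContent();
            }
            result.add(parsed);
        }
        return result;
    }

    public static void main(String[] args) {
        String xml = "<events>"
                + "<event><headers><header1>value1</header1></headers><body>test</body></event>"
                + "<event><headers><header1>value1</header1></headers><body>test2</body></event>"
                + "</events>";
        // Prints "{header1=value1} test" and then "{header1=value1} test2".
        for (ParsedEvent e : parse(xml)) {
            System.out.println(e.headers + " " + e.body);
        }
    }
}
```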

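3) Run mvn package in the project's flume-demo directory. This builds the project into a jar and creates a target directory, where you will find flume-demo.jar.

PS: Apache Maven must be installed on the system and its environment variables configured.

4) Create the plugin directories under $FLUME_HOME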

$ mkdir plugins.d
$ mkdir plugins.d/pengqiang
$ cd plugins.d/pengqiang
$ mkdir lib
$ mkdir libext
$ mkdir native

5) Copy flume-demo.jar into plugins.d/pengqiang/lib

6) Configure the Flume file http_test.conf

a1.sources=r1
a1.sinks=k1
a1.channels=c1

a1.sources.r1.type=http
a1.sources.r1.bind=localhost
a1.sources.r1.port=50000
a1.sources.r1.channels=c1
a1.sources.r1.handler=org.pq.flumeDemo.sources.HTTPSourceXMLHandler
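# Handler parameters must carry the handler. prefix to reach configure();
# a key written as a1.sources.r1.insertTimestamp is ignored by the handler.
a1.sources.r1.handler.insertTimestamp=true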

a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1

a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100


7) Start the agent

$ bin/flume-ng agent -c conf -f conf/http_test.conf -n a1 -Dflume.root.logger=INFO,console

8) Test

Tool: Firefox with the HttpRequester add-on
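
If a browser add-on is not at hand, the same request can be sent from a small Java program. This is a minimal sketch using the JDK's HttpURLConnection. XmlPostClient is a hypothetical class name, and the URL assumes the localhost:50000 binding from the configuration in step 6. The Content-Type header declares the charset explicitly because the handler reads the request's character encoding when it converts the body to bytes.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

final class XmlPostClient {
    // POSTs the XML as UTF-8 and returns the HTTP status code of the response.
    static int post(String url, String xml) throws IOException {
        byte[] payload = xml.getBytes(StandardCharsets.UTF_8);
        HttpURLConnection conn =
                (HttpURLConnection) URI.create(url).toURL().openConnection();
        try {
            conn.setRequestMethod("POST");
            conn.setDoOutput(true);
            conn.setRequestProperty("Content-Type", "application/xml; charset=UTF-8");
            conn.setFixedLengthStreamingMode(payload.length);
            try (OutputStream out = conn.getOutputStream()) {
                out.write(payload);
            }
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        String xml = "<events>"
                + "<event><headers><header1>value1</header1></headers><body>test</body></event>"
                + "<event><headers><header1>value1</header1></headers><body>test2</body></event>"
                + "</events>";
        // Assumes the agent from step 7 is running and listening on localhost:50000.
        System.out.println("HTTP " + post("http://localhost:50000", xml));
    }
}
```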



9) Server-side output (captured without the timestamp header enabled):

.....
2015-12-02 14:35:53,809 (1214826834@qtp-1250857451-0) [INFO - org.pq.flumeDemo.sources.HTTPSourceXMLHandler.getEvents(HTTPSourceXMLHandler.java:64)] get nodes=org.apache.xerces.dom.DeepNodeListImpl@1beaed6
2015-12-02 14:35:54,490 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{header1=value1} body: 74 65 73 74 test }
2015-12-02 14:35:54,491 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{header1=value1} body: 74 65 73 74 32 test2 }