您的位置:首页 > 其它

Nifi初探 - 创建一个简单的自定义Nifi Processor

2017-07-19 10:41 239 查看
网上关于Nifi自定义Processor的中文资料,要么是很古老的eclipse版本,要么太过于简单,学习Nifi的道路确实有点看不清楚,好在找到一篇从零构造一个JsonPROCESSOR的英文文章,单纯翻译外加心得记录,使用的Nifi版本是1.3.0,希望对大家有帮助。
源地址:http://www.nifi.rocks/developing-a-custom-apache-nifi-processor-json/


1. 开始

Nifi有很多可用的、文档化的Processor资源,但是某些时候你依然需要去开发属于你自己的Processor,例如从某些特殊的数据库中提取数据、提取不常见的文件格式,或者其他特殊情况。这篇文章我们创建了一个基础的json文件读取Processor,将内容转化为属性值,本片文章的代码位于
[GitHub](https://github.com/coco11563/jsonNifiExampleProcessor)


2. 项目依赖

目前搭建Nifi Processor项目环境使用较多的还是Maven + IDEA,与其他的组件开发不同,Nifi的要求是在/src/main/resources/META-INF/services/目录下新建一个文件org.apache.nifi.processor.Processor,这个类似于配置文件,指向该Processor所在的目录,比如我的配置文件内容就是

sha0w.pub.jsonNifi.processors.JsonProcessor


我的Maven POM文件的配置讯息如下所示,主要的几个包是

提供nifi api的nifi-api

提供基础工具的nifi-utils

提供Process抽象类接口的nifi-processor-utils

测试的nifi-mock以及junit

jsonpath包

common-io

附上POM文件详细内容

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion>

<groupId>sha0w.pub</groupId>
<artifactId>jsonNifi</a
4000
rtifactId>
<version>1.0-SNAPSHOT</version>
<packaging>nar</packaging>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<nifi.version>1.3.0</nifi.version>
</properties>

<build>
<plugins>
<plugin>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-nar-maven-plugin</artifactId>
<version>1.0.0-incubating</version>
<extensions>true</extensions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.15</version>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-api</artifactId>
<version>${nifi.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-utils</artifactId>
<version>${nifi.version}</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-processor-utils</artifactId>
<version>${nifi.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-io</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>com.jayway.jsonpath</groupId>
<artifactId>json-path</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
<version>${nifi.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>


POM内的plugin组件提供了一个将类打包成nifi组件的nar包打包方式(类似于war包),打包部分需要nifi-api依赖,其他组件在之后可以看到对应的作用。

3. JSON Processor

现在自定义Nifi Processor的前期准备工作都做完了,可以开始构建属于我们自己的Processor了。首先我们在我们之前META-INF中的文件中写入的地址处创建一个java class(比如说我的java class 就创建在sha0w.pub.jsonNifi.processors/下的 JsonProcessor.java)

@SideEffectFree
@Tags({"JSON","SHA0W.PUB"})
@CapabilityDescription("Fetch value from json path.")
public class JsonProcessor extends AbstractProcessor{
}


我们使其extends AbstractProcessor这个抽象类,@Tag标签是为了在web GUI中,能够使用搜索的方式快速找到我们自己定义的这个Processor。CapabilityDescription内的值会暂时在Processor选择的那个页面中,相当于一个备注。

一般来说只需要继承AbstractProcessor就可以了,但是某些复杂的任务可能需要去继承更底层的AbstractSessionFactoryProcessor这个抽象类。

private List<PropertyDescriptor> properties;
private Set<Relationship> relationships;

public static final String MATCH_ATTR = "match";

public static final PropertyDescriptor JSON_PATH = new PropertyDescriptor.Builder()
.name("Json Path")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();

public static final Relationship SUCCESS = new Relationship.Builder()
.name("SUCCESS")
.description("Succes relationship")
.build();


以上代码展示的是一些Processor内部变量以及一些初始化的Nifi变量,可见我们通过
PropertyDescriptor
以及
Relationship
中的模板方法定义了两个新的关系和属性描述值,这些值会出现在webUI中,多个选项型的属性值定义如下:

public static final AllowableValue EXTENSIVE = new AllowableValue("Extensive", "Extensive",
"Everything will be logged - use with caution!");
public static final AllowableValue VERBOSE = new AllowableValue("Verbose", "Verbose",
"Quite a bit of logging will occur");
public static final AllowableValue REGULAR = new AllowableValue("Regular", "Regular",
"Typical logging will occur");

public static final PropertyDescriptor LOG_LEVEL = new PropertyDescriptor.Builder()
.name("Amount to Log")
.description("How much the Processor should log")
.allowableValues(REGULAR, VERBOSE, EXTENSIVE)
.defaultValue(REGULAR.getValue())
...
.build();


这样定义的处理器的WEB UI的LOG_LEVEL选项内就会有三个选项。

@Override
public void init(final ProcessorInitializationContext context){
List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(JSON_PATH);
this.properties = Collections.unmodifiableList(properties);

Set<Relationship> relationships = new HashSet<>();
relationships.add(SUCCESS);
this.relationships = Collections.unmodifiableSet(relationships);
}

@Override
public Set<Relationship> getRelationships(){
return relationships;
}

@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors(){
return properties;
}


如上是初始化Nifi进程,由于Nifi是高度并发条件,所以在此处进行更改时需要很小心,同时这也是为什么
properties
relationship
是存储在一个不可变的集合中。

@Override
public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException {
final AtomicReference<String> value = new AtomicReference<>();

FlowFile flowFile = processSession.get();

processSession.read(flowFile, in -> {
try{
String json = IOUtils.toString(in);
String result = JsonPath.read(json, "$.hello");
value.set(result);
}catch(Exce
bc9f
ption ex){
ex.printStackTrace();
getLogger().error("Failed to read json string.");
}
});

String results = value.get();
if(results != null && !results.isEmpty()){
flowFile = processSession.putAttribute(flowFile, "match", results);
}

// To write the results back out ot flow file
flowFile = processSession.write(flowFile, out -> out.write(value.get().getBytes()));

processSession.transfer(flowFile, SUCCESS);

}


然后是整个Processor的核心部分 -> onTrigger 部分, onTrigger方法会在一个flow file被传入处理器时调用。为了读取以及改变传递来的FlowFile,Nifi提供了三个callback接口方法:

InputStreamCallback:

该接口继承细节如下:

session.read(flowfile, new InputStreamCallback() {
@Override
public void process(InputStream in) throws IOException {
try{
String json = IOUtils.toString(in);
String result = JsonPath.read(json, "$.hello");
value.set(result);
}catch(Exception ex){
ex.printStackTrace();
getLogger().error("Failed to read json string.");
}
}
});


使用了Apache Commons内的方法去读取输入流,并将其转换为String形式,使用JsonPath包去读取Json数据同时将值传递回来

- OutputStreamCallback

flowfile = session.write(flowfile, new OutputStreamCallback() {
@Override
public void process(OutputStream out) throws IOException {
out.write(value.get().getBytes());
}
});


可见这部分我们只是单纯把之前的读取内容写到值中

- StreamCallback

flowfile = session.write(flowfile, new OutputStreamCallback() {

@Override
public void process(OutputStream out) throws IOException {
out.write(value.get().getBytes());
}
});


StreamCallback同时是同一个FlowFile的读取以及写入流

然后下面是将读取json数据写入到flowFile中。

// Write the results to an attribute
String results = value.get();
if(results != null && !results.isEmpty()){
flowfile = session.putAttribute(flowfile, "match", results);
}


最后使用
transfer()
功能传递回这个flowFile以及成功标识。

最后使用mvn clean install功能去build出这个nar包

4. 部署

Copy the target/examples-1.0-SNAPSHOT.nar to $NIFI_HOME/lib

$NIFI_HOME/bin/nifi.sh stop

$NIFI_HOME/bin/nifi.sh start
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐