Nifi初探 - 创建一个简单的自定义Nifi Processor
2017-07-19 10:41
239 查看
网上关于Nifi自定义Processor的中文资料,要么是很古老的eclipse版本,要么太过于简单,学习Nifi的道路确实有点看不清楚,好在找到一篇从零构造一个JsonPROCESSOR的英文文章,单纯翻译外加心得记录,使用的Nifi版本是1.3.0,希望对大家有帮助。 源地址:http://www.nifi.rocks/developing-a-custom-apache-nifi-processor-json/
1. 开始
Nifi有很多可用的、文档化的Processor资源,但是某些时候你依然需要去开发属于你自己的Processor,例如从某些特殊的数据库中提取数据、提取不常见的文件格式,或者其他特殊情况。这篇文章我们创建了一个基础的json文件读取Processor,将内容转化为属性值,本片文章的代码位于[GitHub](https://github.com/coco11563/jsonNifiExampleProcessor)
2. 项目依赖
目前搭建Nifi Processor项目环境使用较多的还是Maven + IDEA,与其他的组件开发不同,Nifi的要求是在/src/main/resources/META-INF/services/目录下新建一个文件org.apache.nifi.processor.Processor,这个类似于配置文件,指向该Processor所在的目录,比如我的配置文件内容就是sha0w.pub.jsonNifi.processors.JsonProcessor
我的Maven POM文件的配置讯息如下所示,主要的几个包是
提供nifi api的nifi-api
提供基础工具的nifi-utils
提供Process抽象类接口的nifi-processor-utils
测试的nifi-mock以及junit
jsonpath包
common-io
附上POM文件详细内容
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>sha0w.pub</groupId> <artifactId>jsonNifi</a 4000 rtifactId> <version>1.0-SNAPSHOT</version> <packaging>nar</packaging> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <nifi.version>1.3.0</nifi.version> </properties> <build> <plugins> <plugin> <groupId>org.apache.nifi</groupId> <artifactId>nifi-nar-maven-plugin</artifactId> <version>1.0.0-incubating</version> <extensions>true</extensions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>2.15</version> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-api</artifactId> <version>${nifi.version}</version> </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-utils</artifactId> <version>${nifi.version}</version> </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-processor-utils</artifactId> <version>${nifi.version}</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-io</artifactId> <version>1.3.2</version> </dependency> <dependency> <groupId>com.jayway.jsonpath</groupId> <artifactId>json-path</artifactId> <version>1.2.0</version> </dependency> <dependency> <groupId>org.apache.nifi</groupId> <artifactId>nifi-mock</artifactId> <version>${nifi.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.10</version> <scope>test</scope> </dependency> </dependencies> </project>
POM内的plugin组件提供了一个将类打包成nifi组件的nar包打包方式(类似于war包),打包部分需要nifi-api依赖,其他组件在之后可以看到对应的作用。
3. JSON Processor
现在自定义Nifi Processor的前期准备工作都做完了,可以开始构建属于我们自己的Processor了。首先我们在我们之前META-INF中的文件中写入的地址处创建一个java class(比如说我的java class 就创建在sha0w.pub.jsonNifi.processors/下的 JsonProcessor.java)@SideEffectFree @Tags({"JSON","SHA0W.PUB"}) @CapabilityDescription("Fetch value from json path.") public class JsonProcessor extends AbstractProcessor{ }
我们使其extends AbstractProcessor这个抽象类,@Tag标签是为了在web GUI中,能够使用搜索的方式快速找到我们自己定义的这个Processor。CapabilityDescription内的值会暂时在Processor选择的那个页面中,相当于一个备注。
一般来说只需要继承AbstractProcessor就可以了,但是某些复杂的任务可能需要去继承更底层的AbstractSessionFactoryProcessor这个抽象类。
private List<PropertyDescriptor> properties; private Set<Relationship> relationships; public static final String MATCH_ATTR = "match"; public static final PropertyDescriptor JSON_PATH = new PropertyDescriptor.Builder() .name("Json Path") .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); public static final Relationship SUCCESS = new Relationship.Builder() .name("SUCCESS") .description("Succes relationship") .build();
以上代码展示的是一些Processor内部变量以及一些初始化的Nifi变量,可见我们通过
PropertyDescriptor以及
Relationship中的模板方法定义了两个新的关系和属性描述值,这些值会出现在webUI中,多个选项型的属性值定义如下:
public static final AllowableValue EXTENSIVE = new AllowableValue("Extensive", "Extensive", "Everything will be logged - use with caution!"); public static final AllowableValue VERBOSE = new AllowableValue("Verbose", "Verbose", "Quite a bit of logging will occur"); public static final AllowableValue REGULAR = new AllowableValue("Regular", "Regular", "Typical logging will occur"); public static final PropertyDescriptor LOG_LEVEL = new PropertyDescriptor.Builder() .name("Amount to Log") .description("How much the Processor should log") .allowableValues(REGULAR, VERBOSE, EXTENSIVE) .defaultValue(REGULAR.getValue()) ... .build();
这样定义的处理器的WEB UI的LOG_LEVEL选项内就会有三个选项。
@Override public void init(final ProcessorInitializationContext context){ List<PropertyDescriptor> properties = new ArrayList<>(); properties.add(JSON_PATH); this.properties = Collections.unmodifiableList(properties); Set<Relationship> relationships = new HashSet<>(); relationships.add(SUCCESS); this.relationships = Collections.unmodifiableSet(relationships); } @Override public Set<Relationship> getRelationships(){ return relationships; } @Override public List<PropertyDescriptor> getSupportedPropertyDescriptors(){ return properties; }
如上是初始化Nifi进程,由于Nifi是高度并发条件,所以在此处进行更改时需要很小心,同时这也是为什么
properties和
relationship是存储在一个不可变的集合中。
@Override public void onTrigger(ProcessContext processContext, ProcessSession processSession) throws ProcessException { final AtomicReference<String> value = new AtomicReference<>(); FlowFile flowFile = processSession.get(); processSession.read(flowFile, in -> { try{ String json = IOUtils.toString(in); String result = JsonPath.read(json, "$.hello"); value.set(result); }catch(Exce bc9f ption ex){ ex.printStackTrace(); getLogger().error("Failed to read json string."); } }); String results = value.get(); if(results != null && !results.isEmpty()){ flowFile = processSession.putAttribute(flowFile, "match", results); } // To write the results back out ot flow file flowFile = processSession.write(flowFile, out -> out.write(value.get().getBytes())); processSession.transfer(flowFile, SUCCESS); }
然后是整个Processor的核心部分 -> onTrigger 部分, onTrigger方法会在一个flow file被传入处理器时调用。为了读取以及改变传递来的FlowFile,Nifi提供了三个callback接口方法:
InputStreamCallback:
该接口继承细节如下:
session.read(flowfile, new InputStreamCallback() { @Override public void process(InputStream in) throws IOException { try{ String json = IOUtils.toString(in); String result = JsonPath.read(json, "$.hello"); value.set(result); }catch(Exception ex){ ex.printStackTrace(); getLogger().error("Failed to read json string."); } } });
使用了Apache Commons内的方法去读取输入流,并将其转换为String形式,使用JsonPath包去读取Json数据同时将值传递回来
- OutputStreamCallback
flowfile = session.write(flowfile, new OutputStreamCallback() { @Override public void process(OutputStream out) throws IOException { out.write(value.get().getBytes()); } });
可见这部分我们只是单纯把之前的读取内容写到值中
- StreamCallback
flowfile = session.write(flowfile, new OutputStreamCallback() { @Override public void process(OutputStream out) throws IOException { out.write(value.get().getBytes()); } });
StreamCallback同时是同一个FlowFile的读取以及写入流
然后下面是将读取json数据写入到flowFile中。
// Write the results to an attribute String results = value.get(); if(results != null && !results.isEmpty()){ flowfile = session.putAttribute(flowfile, "match", results); }
最后使用
transfer()功能传递回这个flowFile以及成功标识。
最后使用mvn clean install功能去build出这个nar包
4. 部署
Copy the target/examples-1.0-SNAPSHOT.nar to $NIFI_HOME/lib$NIFI_HOME/bin/nifi.sh stop
$NIFI_HOME/bin/nifi.sh start
相关文章推荐
- 在ArcGIS Server Java ADF中创建一个自定义task的简单步骤
- 创建一个简单的web服务器(二):使用自定义的类加载器来替换URLClassLoader
- 创建一个简单的表视图&自定义UITableView的表单元格
- sharepoint 2010 自定义字段开发(1) 创建一个简单的列表自定义字段
- 用自定义的module创建一个简单站点
- [原创]java WEB学习笔记40:简单标签概述(背景,使用一个标签,标签库的API,SimpleTag接口,创建一个自定义的标签的步骤 和简单实践)
- sharepoint 2010 创建一个简单的列表自定义字段
- 日期选取器、单滚轮选取器、多滚轮选取器、滚轮内容根据环境变化、自定义选取器创建一个简单游戏( 抽奖机 )
- android 创建一个简单的自定义对话框
- BASH 创建一个命令 f1 使之可以简单管理文件版本
- nodejs 利用express框架 创建一个简单的web项目
- 0002、node 之用express创建一个简单的服务器以及响应
- Django学习笔记(二)创建一个简单页面
- 第六章 创建一个基于Table的简单App(三)
- 在ASP.NET 2.0中操作数据之六十:创建一个自定义的Database-Driven Site Map Provider
- 使用Subversion创建一个简单的svn服务器
- 如何创建一个最简单的Windows桌面应用程序 (C++)
- Visual Studio DSL 入门 3---创建一个简单的DSL模型
- 如何创建一个最简单的Linux自启动服务?
- Box2D 物理引擎---创建一个简单的模拟物理世界