java操作nsq数据插入elasticsearch
2017-09-21 11:08
846 查看
在网上找了很多资料关于怎么从nsq中通过java拉取数据存到es中去,因为之前没有接触过nsq和elasticsearch。所以没有一套完整的流程案例,也折腾了好几天。写下来给大家借鉴,也给自己做个记录,以后忘了拿出来看看。
要通过java调用nsq消息服务,nsq官方提供了很多的client。
用maven构建项目,在Pom添加依赖:
javaNSQclient:
然后根据例子建立生产者和消费者:
生产者:
消费者:
添加相应信息后,就可以消费数据啦。
这是我的目录结构。
实体类
其中实体类中必须要有id属性(案例中创建了一个id属性,在插入时并不赋值,es会自动生成唯一键的id),如果在原来的数据中有一个类似于唯一键的,直接在该属性上添加@ID 注解(如上的uuid,假设是唯一键),如果没有就添加一个ID属性并get,set。不然运行时会报错。
在service实现类中注入repository。
调用其save方法进行插入。
最后在controller控制层中调用即可,如果nsq中拉取的数据没有唯一键的,在最后的save方法前,通过map来代替bean进行转换就可以啦。
java调用Nsq消息服务
首先,要了解一下nsq是一个什么东西,百度一下就可以稍做了解,关于安装可以参照以下博客(在windows下安装,其他系统还没了解):http://blog.csdn.net/tian_lai_yuyuh/article/details/52700174。要通过java调用nsq消息服务,nsq官方提供了很多的client。
这里我使用的是javaNSQclient
在github上也有案例,可以参考https://github.com/brainlag/JavaNSQClient用maven构建项目,在Pom添加依赖:
javaNSQclient:
<dependency> <groupId>com.github.brainlag</groupId> <artifactId>nsq-client</artifactId> <version>1.0.0.RC4</version> </dependency>
然后根据例子建立生产者和消费者:
生产者:
NSQProducer producer = new NSQProducer().addAddress("localhost", 4150).start(); producer.produce("TestTopic", ("this is a message").getBytes());
消费者:
NSQLookup lookup = new DefaultNSQLookup(); lookup.addLookupAddress("localhost", 4161); NSQConsumer consumer = new NSQConsumer(lookup, "speedtest", "dustin", (message) -> { System.out.println("received: " + message); //now mark the message as finished. message.finished(); //or you could requeue it, which indicates a failure and puts it back on the queue. //message.requeue(); }); consumer.start();
添加相应信息后,就可以消费数据啦。
插入到elasticsearch中。
这是我的目录结构。
实体类
private String id; private String ip; private String level; private String push_time; private String body; private String uuid; private String service_name; @ID private String uuid;
其中实体类中必须要有id属性(案例中创建了一个id属性,在插入时并不赋值,es会自动生成唯一键的id),如果在原来的数据中有一个类似于唯一键的,直接在该属性上添加@ID 注解(如上的uuid,假设是唯一键),如果没有就添加一个ID属性并get,set。不然运行时会报错。
repository类
@Repository public interface DataRepository extends ElasticsearchRepository<Data,String>{ //继承ElasticsearchRepository的操作功能 //ElasticsearchRepository接口中有相应的增删改差的功能。如有其//他需求可以重写接口中的方法。 }
配置elasticsearch地址:
.yml和properties文件都可以,此处我选用的是yml格式:spring: data: 4000 elasticsearch: cluster-nodes: 192.168.1.8:9300 cluster-name: poch-test repositories: enabled: true properties: client.transport.ignore_cluster_name: true
service层
先写service接口(此处省略).在service实现类中注入repository。
调用其save方法进行插入。
最后在controller控制层中调用即可,如果nsq中拉取的数据没有唯一键的,在最后的save方法前,通过map来代替bean进行转换就可以啦。
第一次写博客,如有写的不好的地方请指教!
相关文章推荐
- Java 数据结构之数组的操作二:数据插入与二分查找法
- java操作elasticsearch使用QueryBuilders进行数据查询
- 大数据学习[11]:JAVA连接elasticsearch5.6.1操作|问题|分析
- elasticsearch__1__java操作之连接es,创建Mapping,保存数据
- java操作elasticsearch 5.6.0查询、插入
- Java操作ElasticSearch之Get数据
- 以CSV文件导入MySQL的批量数据插入操作之Java操作
- JAVA对数据库进行操作,实现数据库中数据的插入,查询,更改,删除操作
- elasticsearch Insert 插入数据和delete 删除数据(Java)
- Java在数据库操作中批量插入数据
- Elasticsearch通过JAVA创建索引、Mapping以及数据的增删该查操作
- elasticsearch__3__java操作之Facets 数据分组统计处理
- Java JDBC批处理插入数据操作
- 大数据下的日志--ElasticSearch部分(二)--结合Java基本操作
- Java操作ElasticSearch之Update数据 and Java操作ElasticSearch之Delete数据
- JAVA对数据库进行操作,实现数据库中数据的插入,查询,更改,删除操作
- elasticsearch__2__java操作之数据搜索查询
- Elasticsearch教程(七) elasticsearch Insert 插入数据(Java)
- 实时数据之python操作elasticsearch监控数据插入图表分析
- 传参数无返回值的 java 调oracle的存储过程.(向数据库表中插入数据)