ELK(分布式大数据搜索和日志挖掘及可视化)之实战(先不要审核只是保存一些记录)
2017-09-18 09:34
543 查看
logstash 配置
input{
file{
path =>"opt/logs/*.sql"
type =>"logjson"
start_position => "beginning"
sincedb_path =>"/dev/null"
}
}
filter{
json{
#将默认中的message内容转换成json内容,并删除message域
source => "message"
remove_field =>"message"
}
}
output{
elasticsearch{
hosts =>["172.16.117.93:9200"]
index =>"query"
document_type=> "%{type}"
flush_size=>20000
idle_flush_time =>10
}
}
es
public class ESsearch{
static Essearch ts=new ESearch();
static Client client =ts.TransportClientContect();
//返回client对象
public Client TransportClientContect(){
Settings settings =Setting.settingsBuilder().put("cluster.name","topic").build();
Client client=null;
try{
client =TransportClient.builder().setting(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("172.16.117.93"),9300));
}catch(){}
return client;
}
}
}
class ESmain{
static ESearch ts=new ESearch();
static Client client=ts.TransportClientContect();
public static void main(String[] args) throws IOException{
Excel excel =new Excel();
Instant startClock=Instant.now();
System.out.println("开始时间"+startClock);
String index="logstash-sql---3p";
String type="loghson";
String value;
List<String> stringList=new ArrayList<String>();
//scroll模式启动 每次50000
SearchResponse scrollResponse=client.prepareSearch(index)
.setSearchType(SearchType.SCAN).setSize(10000)
.setQuery(QueryBuilders.matchAllQuery())
.setQuery(QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery("q1","q1v"))
.operator(prg.elasticsearch.index.query.MatchQueryBuilder.Operator.AND)
.must(QueryBuilders.matchQuery("q2","q2v"))
setScroll(TimeVakue.timeValueMinutes(1))
.execute().actionGet();
int count =(int)scrollResponse.getHits().getTotalHits();//第一次不返回数据
for(int i=0,sum=0;sum<count;i++){
scrollResponse=client.prepareSearchScroll(scrollResponse.getScrollId())
.setScroll(Timevalue.timeValueMinutes(8))
.execute().actionGet();
sum+=scrollResponse.getHits().hits().length;
for(SearchHit hit:srollResponse.getHits()){
value=hits.getSource.get("param").toString();
stringList.add(value);
}
File destFile=new File("./output/"+i+".xls");
try{
excel.createStringExcelFile(stringList,destFile);
}catch(){}
StringList.clear();
System.out.println("总数:"+count+"已查到:"+sum);
}
}
}
input{
file{
path =>"opt/logs/*.sql"
type =>"logjson"
start_position => "beginning"
sincedb_path =>"/dev/null"
}
}
filter{
json{
#将默认中的message内容转换成json内容,并删除message域
source => "message"
remove_field =>"message"
}
}
output{
elasticsearch{
hosts =>["172.16.117.93:9200"]
index =>"query"
document_type=> "%{type}"
flush_size=>20000
idle_flush_time =>10
}
}
es
public class ESsearch{
static Essearch ts=new ESearch();
static Client client =ts.TransportClientContect();
//返回client对象
public Client TransportClientContect(){
Settings settings =Setting.settingsBuilder().put("cluster.name","topic").build();
Client client=null;
try{
client =TransportClient.builder().setting(settings).build()
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("172.16.117.93"),9300));
}catch(){}
return client;
}
}
}
class ESmain{
static ESearch ts=new ESearch();
static Client client=ts.TransportClientContect();
public static void main(String[] args) throws IOException{
Excel excel =new Excel();
Instant startClock=Instant.now();
System.out.println("开始时间"+startClock);
String index="logstash-sql---3p";
String type="loghson";
String value;
List<String> stringList=new ArrayList<String>();
//scroll模式启动 每次50000
SearchResponse scrollResponse=client.prepareSearch(index)
.setSearchType(SearchType.SCAN).setSize(10000)
.setQuery(QueryBuilders.matchAllQuery())
.setQuery(QueryBuilders.boolQuery()
.must(QueryBuilders.matchQuery("q1","q1v"))
.operator(prg.elasticsearch.index.query.MatchQueryBuilder.Operator.AND)
.must(QueryBuilders.matchQuery("q2","q2v"))
setScroll(TimeVakue.timeValueMinutes(1))
.execute().actionGet();
int count =(int)scrollResponse.getHits().getTotalHits();//第一次不返回数据
for(int i=0,sum=0;sum<count;i++){
scrollResponse=client.prepareSearchScroll(scrollResponse.getScrollId())
.setScroll(Timevalue.timeValueMinutes(8))
.execute().actionGet();
sum+=scrollResponse.getHits().hits().length;
for(SearchHit hit:srollResponse.getHits()){
value=hits.getSource.get("param").toString();
stringList.add(value);
}
File destFile=new File("./output/"+i+".xls");
try{
excel.createStringExcelFile(stringList,destFile);
}catch(){}
StringList.clear();
System.out.println("总数:"+count+"已查到:"+sum);
}
}
}
相关文章推荐
- python数据分析与挖掘项目实战记录
- 开源分布式搜索平台ELK(Elasticsearch+Logstash+Kibana)+Redis+Syslog-ng实现日志实时搜索
- 开源分布式搜索平台ELK+Redis+Syslog-ng实现日志实时搜索
- 开源分布式搜索平台ELK+REDIS+SYSLOG-NG实现日志实时搜索(转载)
- CentOS7搭建开源分布式搜索平台ELK实现日志实时搜索并展示图表 推荐
- Mariadb 分布式事务两阶段提交 binlog日志 查询日志 都记录了一些什么内容 以及恢复被丢失数据方式
- 基于网站日志数据挖掘的用户访问行为模式可视化研究
- 收集、分析线上日志数据实战——ELK
- 一共81个,开源大数据处理工具汇总:查询引擎、流式计算、迭代计算、离线计算、键值存储、表格存储、文件存储、资源管理、日志收集系统、消息系统、分布式服务、集群管理、基础设施、搜索引擎、数据挖掘=监控
- 基于用户行为的数据分析与挖掘+分布式日志管理系统
- 【D3.JS数据可视化实战记录】绘制力学图
- 收集、分析线上日志数据实战——ELK
- 一共81个,开源大数据处理工具汇总:查询引擎、流式计算、迭代计算、离线计算、键值存储、表格存储、文件存储、资源管理、日志收集系统、消息系统、分布式服务、集群管理、基础设施、搜索引擎、数据挖掘=监控
- 开源分布式搜索平台ELK+Redis+Syslog-ng实现日志实时搜索
- 【D3.JS数据可视化实战记录】绘制动态状态变化趋势图
- 实战Kibana的日志关键词搜索和日志可视化
- 【D3.JS数据可视化实战记录】绘制条形图
- 数据仓库与数据挖掘的一些基本概念
- 一起谈.NET技术,.NET 分布式架构开发实战之三 数据访问深入一点的思考
- 数据挖掘工程师如何选择数据可视化工具?