您的位置:首页 > Web前端 > JavaScript

flume篇3:flume把json数据写入carbondata(flume-carbondata-sink)

2020-01-13 21:44 274 查看

flume篇3:flume把json数据写入carbondata(flume-carbondata-sink)
对应非json数据同样适用,可以把非json数据通过拦截器拼接成一个以 ,分隔的string,然后send出去,这样也是ok的
废话不多说,直接上干货

一、 自定义拦截器:
1 拦截器要求:新建一个新的工程,单独打包,保证每个flume的的拦截器都是单独的一个工程打的包,这样保证每次对拦截器修改的时候不影响其他flume业务

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<scala.version>2.10.4</scala.version>
<flume.version>1.8.0</flume.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>${flume.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
<version>3.3</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.1.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-store-sdk</artifactId>
<version>1.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-core</artifactId>
<version>1.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-common</artifactId>
<version>1.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-format</artifactId>
<version>1.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-hadoop</artifactId>
<version>1.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-processing</artifactId>
<version>1.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbonata</artifactId>
<version>1.5.3</version>
<scope>system</scope>
<systemPath>${project.basedir}/lib/apache-carbondata-1.5.3-bin-spark2.3.2-hadoop2.6.0-cdh5.16.1.jar</systemPath>
</dependency>
<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId>
<version>2.0.9</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.sshd</groupId>
<artifactId>sshd-core</artifactId>
<version>0.14.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>0.1.54</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.12</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.5</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>16.0.1</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.46</version>
<scope>compile</scope>
</dependency>
</dependencies>

2 拦截器代码如下:
(以下拦截器主要目的是:把一个嵌套2层的body Json中的各个字段取出来,并拼接成一个以 ,作为分隔符的string)

package com.extracting.flume.interceptor.tc;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Map;

public class XyAccessInterceptorTC_SK implements Interceptor {

private static final Logger logger = LoggerFactory.getLogger(XyAccessInterceptorTC_SK.class);

private SimpleDateFormat dataFormat;
@Override
public void initialize() {
dataFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
}

@Override
public Event intercept(Event event) {
String body = new String(event.getBody());
try {
String value="";
Map maps = (Map)JSON.parse(body);
Object body1 = maps.get("body");
//logger.info("@@@@@@@@@@@@@@@@@@@@打印body {}",body1);
Map<String, Object> map2=JSON.parseObject(body1.toString(), Map.class);
Object body2 = map2.get("body");
Map<String,String> maps3 = (Map)JSON.parse(body2.toString());
Map maps2 = (Map)JSON.parse(maps3.toString());
value = value+maps2.get("id")+",";
value = value+maps2.get("name")+",";
value = value+maps2.get("age")+",";
value = value+dataFormat.format(maps2.get("time"));
logger.info("****************打印value {}",value);
event.setBody(value.getBytes());
logger.info("拦截器最hou输出结果为resObject:",value);
event.setBody(value.getBytes());
return event;
} catch (Exception e) {
logger.info("ERROR格式数据" + body.toString());
return null;
}
}

@Override
public List<Event> intercept(List<Event> events) {
List<Event> resultList = Lists.newArrayList();
for (Event event : events) {
Event result = intercept(event);
if (result != null) {
resultList.add(result);
}
}
return resultList;
}

@Override
public void close() {

}

public static class Builder implements Interceptor.Builder {

@Override
public Interceptor build() {
return new XyAccessInterceptorTC_SK();
}

@Override
public void configure(Context context) {
}

}
public static void main(String[] args) {
}

}

3 打包上传,到flume的lib位置,cdh位置如下:/opt/cloudera/parcels/CDH/lib/flume-ng/lib/

二、自定义flume-carbondata-sink
1 flume-carbondata-sink目前官网还没有提供,这是我自定义的一个通用的flume-carbondata-sink包,需要新建一个工程

2 pom 如下:我的pom有点多了,你可以更据情况去除,像apache-carbondata-1.5.3-bin-spark2.3.2-hadoop2.6.0-cdh5.16.1.jar包是自己编译的spark-carbondata包,这里好像用不上,你们可以去除,这里只要保留flume和carbondata相关包即可

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<scala.version>2.10.4</scala.version>
<flume.version>1.8.0</flume.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>${flume.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
<version>3.3</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
<version>6.1.1</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-store-sdk</artifactId>
<version>1.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-core</artifactId>
<version>1.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-common</artifactId>
<version>1.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-format</artifactId>
<version>1.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-hadoop</artifactId>
<version>1.5.3</version>
</dependency>
<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbondata-processing</artifactId>
<version>1.5.3</version>
</dependency>

<dependency>
<groupId>org.apache.mina</groupId>
<artifactId>mina-core</artifactId>
<version>2.0.9</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.sshd</groupId>
<artifactId>sshd-core</artifactId>
<version>0.14.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>0.1.54</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.12</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.47</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.5</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>16.0.1</version>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.46</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.apache.carbondata</groupId>
<artifactId>carbonata</artifactId>
<version>1.5.3</version>
<scope>system</scope>
<systemPath>${project.basedir}/lib/apache-carbondata-1.5.3-bin-spark2.3.2-hadoop2.6.0-cdh5.16.1.jar</systemPath>
</dependency>
</dependencies>
</project>

3 代码如下

package com.flume.carbondata.sink;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.sdk.file.CarbonWriter;
import org.apache.carbondata.sdk.file.CarbonWriterBuilder;
import org.apache.carbondata.sdk.file.Field;
import org.apache.carbondata.sdk.file.Schema;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.apache.flume.sink.RollingFileSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CarbondataSink extends AbstractSink
implements Configurable
{
private static final Logger logger = LoggerFactory.getLogger(RollingFileSink.class);
private SinkCounter sinkCounter;
private static final long defaultRollInterval = 60L;
private static final int defaultBatchSize = 100;
private int batchSize = 100;
private String splitType;
private long rollInterval;
private ScheduledExecutorService rollService;
private volatile boolean shouldFlush;
private CarbonWriter carbonWriter;
private CarbonWriterBuilder carbonWriterBuilder;
private SimpleDateFormat dataFormat;
private String path;
private Schema schema;

public Sink.Status process()
throws EventDeliveryException
{
Date data = new Date();
String dateString = this.dataFormat.format(data);
if (this.shouldFlush) {
logger.error("@@@@@@@@@@@@@@@@@@@@@@@@Time to rotate {}");

if (this.carbonWriter != null) {
logger.error("@@@@@@@@@@@@@@@@@开始关闭文件Closing file {}");
try {
logger.info("***********carbonWriter {}", this.carbonWriter);
this.carbonWriter.close();
logger.info("***********carbonWriter {}", this.carbonWriter);
}
catch (IOException e) {
e.printStackTrace();
} finally {
this.carbonWriter = null;
this.shouldFlush = false;
}
this.sinkCounter.incrementConnectionClosedCount();
}
}
if (this.carbonWriter == null) {
try {
logger.info("***********carbonWriter为null开始创建");
this.carbonWriter = this.carbonWriterBuilder.build();
logger.info("***********carbonWriter {}", this.carbonWriter);
}
catch (InvalidLoadOptionException e) {
e.printStackTrace();
}
catch (IOException e) {
e.printStackTrace();
}
}
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
Event event = null;
Sink.Status result = Sink.Status.READY;
try {
transaction.begin();
int eventAttemptCounter = 0;

for (int i = 0; i < this.batchSize; i++) {
event = channel.take();
logger.info("@@@@##判断event是否为空 {}", event);
if (event != null) {
this.sinkCounter.incrementEventDrainAttemptCount();

String body = new String(event.getBody());

String[] values = body.split(this.splitType);

this.carbonWriter.write(values);
logger.info("***********carbonWriter {}", this.carbonWriter);
logger.info("@@@@##****打印出values {}", values);
}
else
{
result = Sink.Status.BACKOFF;
break;
}
}
transaction.commit();
this.sinkCounter.addToEventDrainSuccessCount(eventAttemptCounter);
}
catch (Exception ex) {
throw new EventDeliveryException("Failed to process transaction", ex);
} finally {
transaction.close();
}
return result;
}

public void configure(Context context)
{
this.batchSize = context.getInteger("sink.batchSize", Integer.valueOf(100)).intValue();
this.rollInterval = context.getLong("sink.rollInterval", Long.valueOf(60L)).longValue();
this.path = context.getString("sink.path", "");
String ziduan = context.getString("sink.fields");
String zdlx = context.getString("sink.fieldType");
this.splitType = context.getString("splitType", ",");
this.dataFormat = new SimpleDateFormat("yyyyMMdd");

String[] zds = ziduan.split(",");

String[] zdlxs = zdlx.split(",");
Map map = new HashMap();
map.put("int", DataTypes.INT);
map.put("string", DataTypes.STRING);
map.put("date", DataTypes.DATE);
map.put("timestamp", DataTypes.TIMESTAMP);
map.put("boolean", DataTypes.BOOLEAN);
map.put("long", DataTypes.LONG);
map.put("double", DataTypes.DOUBLE);
map.put("short", DataTypes.SHORT);
map.put("null", DataTypes.NULL);
map.put("byte", DataTypes.BYTE);
List list = new ArrayList();
for (int i = 0; i < zds.length; i++) {
list.add(new Field(zds[i], (DataType)map.get(zdlxs[i])));
}
Field[] fields = new Field[list.size()];
list.toArray(fields);
logger.info("@@@@数组内容 {}", fields[2]);
this.schema = new Schema(fields);
this.carbonWriterBuilder = CarbonWriter.builder().outputPath(this.path+"22222").withCsvInput(this.schema).writtenBy("SDK");

if (this.rollInterval == 0L)
this.rollInterval = 60L;
else {
this.rollInterval = this.rollInterval;
}

if (this.sinkCounter == null)
this.sinkCounter = new SinkCounter(getName());
}

public void start()
{
logger.info("Starting {}...", this);
this.sinkCounter.start();
super.start();

if (this.rollInterval > 0L)
{
this.rollService = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
.setNameFormat("rollingFileSink-roller-" +
Thread.currentThread().getId() + "-%d").build());

this.rollService.scheduleAtFixedRate(new Runnable()
{
public void run() {
CarbondataSink .logger.debug("Marking time to flush batch {}", "执行批处理");
CarbondataSink .this.shouldFlush = true;
}
}
, this.rollInterval, this.rollInterval, TimeUnit.SECONDS);
}
else
{
logger.info("RollInterval is not valid, file rolling will not happen.");
}
logger.info("CarbondataSink {} started.", getName());
}

public void stop() {
logger.info("Ftp sink {} stopping...", getName());
this.sinkCounter.stop();
super.stop();
try {
this.carbonWriter.close();
}
catch (IOException e1) {
e1.printStackTrace();
}

if (this.rollInterval > 0L) {
this.rollService.shutdown();

while (!this.rollService.isTerminated()) {
try {
this.rollService.awaitTermination(1L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
logger.debug("Interrupted while waiting for roll service to stop. Please report this.", e);
}
}
}
logger.info("Ftp sink {} stopped. Event metrics: {}", getName(), this.sinkCounter);
}
}

3 打包上传flume-carbondata-sink相关jar包

解释一下这个sink:主要是能把csv或者,分隔的数据,依据carbondata sdk的方式,按天写入hdfs的目录,每天都写在不同的分区当中
上传的jar包有:自定义的flume-carbondata-sink包
以及pom里面涉及的carbondata相关jar包:比如carbondata-core,carbondata-sdk等,我这里不一一列举了

三、配置flume的conf
1 在 /opt/cloudera/parcels/CDH/lib/flume-ng/conf目录下,
vi carbondata.conf
输入以下内容:

ng.sources = kafkaSource
ng.channels = spillablechannel
ng.sinks =  carbondata

ng.sources.kafkaSource.type= org.apache.flume.source.kafka.KafkaSource
ng.sources.kafkaSource.kafka.bootstrap.servers=cdh01:9092,cdh02:9092,cdh03:9092
ng.sources.kafkaSource.kafka.consumer.group.id=zhsq_ry33
ng.sources.kafkaSource.kafka.topics= pd_ry_txjl
ng.sources.kafkaSource.batchSize=1000
ng.sources.kafkaSource.channels= spillablechannel
ng.sources.kafkaSource.kafka.consumer.auto.offset.reset=latest
ng.sources.kafkaSource.interceptors= i2
ng.sources.kafkaSource.interceptors.i2.type=com.iflytek.extracting.flume.interceptor.XyAccessInterceptorTC_SK $Builder
#前面自定义的拦截器

ng.channels.spillablechannel.type =memory
ng.channels.spillablechannel.memoryCapacity=20000
ng.channels.spillablechannel.transactionCapacity=20000
ng.channels.spillablechannel.capacity = 20000

ng.sinks.carbondata.type = com.iflytek.CarbonDataSink   #自定义的CarbonDataSink
ng.sinks.carbondata.sink.fields =id,name,age,time
ng.sinks.carbondata.sink.fieldType = string,string,int,string  # long以及其他格式都是支持的
ng.sinks.carbondata.sink.path = hdfs://cdh01:8020/user/hive/warehouse/carbon.store/carbondata/xytest
#hdfs的目录,数据真正会写在/user/hive/warehouse/carbon.store/carbondata/xytest/yyyyMMdd目录里面
ng.sinks.carbondata.sink.rollInterval = 300
ng.sinks.carbondata.sink.batchSize = 1000
ng.sinks.carbondata.channel = spillablechannel

2 启动flume:

然后前台启动flume,进行调试:
bin/flume-ng agent -n ng -c conf -f conf/carbondata.conf
cdh版本的flume默认的日志打在 /var/log/flume/flume.log里面
查看数据已经接入kudu,并确定没问题可以使用后台提交:
nohup bin/flume-ng agent -n ng -c conf -f conf/carbondata.conf &
任务停止:
jcmd | grep hbase.conf   # 找到含有 hbase.conf的任务
然后kill 任务id即可

此时打开hdfs的50070目录查看数据是否写入
比如hdfs://cdh01:8020/user/hive/warehouse/carbon.store/carbondata/xytest/20191210目录下是否有文件写入

然后在hive(spark sql)中建立外部表
建表语句如下
create external table table_flume stored as carbondata localtion '/user/hive/warehouse/carbon.store/carbondata/xytest/'

四、优化
这样利用flume sink会生成大量小文件,需要我们再建立一个carbondata 的分区表,每天定时把前一天的flume抽来的小文件不断抽取合并到昨天的分区表中:

val createTable=
s"""
| CREATE TABLE IF NOT EXISTS `table_day` (
|   id string,
|   name string,
|   age int,
|   time string
|  )
|  partitioned by(rq string)
|  stored as carbondata
|  """.stripMargin
sparkSession.sql(createTable)
//已经建立好了table_day安装rq(时间yyyyMMdd)作为分区

val data_flume =sparkSession.sql(s"""
select * from table_flume where time <= '2019-12-10 23:59:59' and time >= '2019-12-10 00:00:00'
""")

//sparksql把数据保存到carbondata
def writeTable2CarbondataPartition(sparkSession: SparkSession, datas: DataFrame, tableName: String, rq: String) {
import org.apache.spark.sql.functions._
datas.coalesce(1).withColumn("RQ", lit(rq))
.write.format("carbondata")
.option("BAD_RECORDS_LOGGER_ENABLE", "true")
.option("BAD_RECORD_PATH", "hdfs://cdh01:8020/user/hive/warehouse/carbondata.db")
.option("BAD_RECORDS_ACTION", "FORCE")
//      .saveAsTable(tableName)
.insertInto(tableName)
}
//把昨天table_flume中的数据合并分区抽取到table_day这张分区表中
writeTable2CarbondataPartition(sparkSession,data_flume ,"table_day",rq)

//然后创建视图
val createView=
s"""
| CREATE VIEW IF NOT EXISTS `table_view`
|  as
|  select id ,name,age,time from table_flume
|  union all
|  select id ,name,age,time from table_day
|  """.stripMargin
sparkSession.sql(createView)
  • 点赞 1
  • 收藏
  • 分享
  • 文章举报
合肥菜鸟许某人 发布了11 篇原创文章 · 获赞 4 · 访问量 233 私信 关注
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐