Flume, part 3: writing JSON data to CarbonData (flume-carbondata-sink)
The same approach works for non-JSON data: have the interceptor join the fields into a comma-separated string and send that on instead.
Without further ado, here is the working setup.
I. Custom interceptor
1. Interceptor requirements: create a new, separate project and package it on its own, so that each Flume interceptor ships as its own jar and a change to one interceptor never affects other Flume jobs. The pom:
```xml
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <scala.version>2.10.4</scala.version>
    <flume.version>1.8.0</flume.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>${flume.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>commons-net</groupId>
        <artifactId>commons-net</artifactId>
        <version>3.3</version>
    </dependency>
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.4</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.testng</groupId>
        <artifactId>testng</artifactId>
        <version>6.1.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.carbondata</groupId>
        <artifactId>carbondata-store-sdk</artifactId>
        <version>1.5.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.carbondata</groupId>
        <artifactId>carbondata-core</artifactId>
        <version>1.5.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.carbondata</groupId>
        <artifactId>carbondata-common</artifactId>
        <version>1.5.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.carbondata</groupId>
        <artifactId>carbondata-format</artifactId>
        <version>1.5.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.carbondata</groupId>
        <artifactId>carbondata-hadoop</artifactId>
        <version>1.5.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.carbondata</groupId>
        <artifactId>carbondata-processing</artifactId>
        <version>1.5.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.carbondata</groupId>
        <artifactId>carbonata</artifactId>
        <version>1.5.3</version>
        <scope>system</scope>
        <systemPath>${project.basedir}/lib/apache-carbondata-1.5.3-bin-spark2.3.2-hadoop2.6.0-cdh5.16.1.jar</systemPath>
    </dependency>
    <dependency>
        <groupId>org.apache.mina</groupId>
        <artifactId>mina-core</artifactId>
        <version>2.0.9</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.mockito</groupId>
        <artifactId>mockito-all</artifactId>
        <version>1.9.5</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.sshd</groupId>
        <artifactId>sshd-core</artifactId>
        <version>0.14.0</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.jcraft</groupId>
        <artifactId>jsch</artifactId>
        <version>0.1.54</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.12</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.5</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>16.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.46</version>
        <scope>compile</scope>
    </dependency>
</dependencies>
```
2. The interceptor code (its job: pull the individual fields out of a body JSON nested two layers deep and join them into a comma-separated string):
```java
package com.extracting.flume.interceptor.tc;

import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Map;

public class XyAccessInterceptorTC_SK implements Interceptor {

    private static final Logger logger = LoggerFactory.getLogger(XyAccessInterceptorTC_SK.class);
    private SimpleDateFormat dataFormat;

    @Override
    public void initialize() {
        dataFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    }

    @Override
    public Event intercept(Event event) {
        String body = new String(event.getBody());
        try {
            // The payload is a JSON object whose "body" field holds another
            // JSON object, which again has a "body" field with the real record;
            // unwrap both layers.
            Map outerMap = (Map) JSON.parse(body);
            Object innerBody = outerMap.get("body");
            Map<String, Object> innerMap = JSON.parseObject(innerBody.toString(), Map.class);
            Map payloadMap = (Map) JSON.parse(innerMap.get("body").toString());

            // Join the fields into a comma-separated string; "time" arrives
            // as epoch millis and is formatted as yyyy-MM-dd HH:mm:ss.
            StringBuilder value = new StringBuilder();
            value.append(payloadMap.get("id")).append(",");
            value.append(payloadMap.get("name")).append(",");
            value.append(payloadMap.get("age")).append(",");
            value.append(dataFormat.format(payloadMap.get("time")));

            logger.info("interceptor output value: {}", value);
            event.setBody(value.toString().getBytes());
            return event;
        } catch (Exception e) {
            // Malformed records are logged and dropped.
            logger.info("record with bad format dropped: " + body);
            return null;
        }
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        List<Event> resultList = Lists.newArrayList();
        for (Event event : events) {
            Event result = intercept(event);
            if (result != null) {
                resultList.add(result);
            }
        }
        return resultList;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new XyAccessInterceptorTC_SK();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
```
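For illustration, this is the kind of event the interceptor expects and what it emits. The record below is made up; each "body" wrapper is a JSON string that the interceptor unwraps in turn:

```text
incoming event body (two nested "body" layers):
{"body":"{\"body\":\"{\\\"id\\\":\\\"1001\\\",\\\"name\\\":\\\"zhangsan\\\",\\\"age\\\":25,\\\"time\\\":1575907200000}\"}"}

event body after the interceptor (comma-separated, epoch millis formatted):
1001,zhangsan,25,2019-12-10 00:00:00
```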
3. Package the jar and upload it to Flume's lib directory; on CDH that is /opt/cloudera/parcels/CDH/lib/flume-ng/lib/.
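As a minimal sketch of that step (the jar name here is a placeholder; use whatever your pom actually produces):

```bash
# build the interceptor jar in the interceptor project root
mvn clean package -DskipTests

# copy it to Flume's lib directory on the CDH node
cp target/flume-interceptor-1.0.jar /opt/cloudera/parcels/CDH/lib/flume-ng/lib/
```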
II. Custom flume-carbondata-sink
1. An official flume-carbondata-sink is not available yet, so this is a generic one I built myself; create a new project for it.
2. The pom is below. Mine carries more dependencies than strictly necessary, so trim it to your situation; apache-carbondata-1.5.3-bin-spark2.3.2-hadoop2.6.0-cdh5.16.1.jar is a spark-carbondata bundle I compiled myself and does not seem to be needed here, so you can drop it. Only the Flume and CarbonData dependencies are required:
```xml
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.7</maven.compiler.source>
    <maven.compiler.target>1.7</maven.compiler.target>
    <scala.version>2.10.4</scala.version>
    <flume.version>1.8.0</flume.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>${flume.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>commons-net</groupId>
        <artifactId>commons-net</artifactId>
        <version>3.3</version>
    </dependency>
    <dependency>
        <groupId>commons-io</groupId>
        <artifactId>commons-io</artifactId>
        <version>2.4</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.testng</groupId>
        <artifactId>testng</artifactId>
        <version>6.1.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.carbondata</groupId>
        <artifactId>carbondata-store-sdk</artifactId>
        <version>1.5.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.carbondata</groupId>
        <artifactId>carbondata-core</artifactId>
        <version>1.5.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.carbondata</groupId>
        <artifactId>carbondata-common</artifactId>
        <version>1.5.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.carbondata</groupId>
        <artifactId>carbondata-format</artifactId>
        <version>1.5.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.carbondata</groupId>
        <artifactId>carbondata-hadoop</artifactId>
        <version>1.5.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.carbondata</groupId>
        <artifactId>carbondata-processing</artifactId>
        <version>1.5.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.mina</groupId>
        <artifactId>mina-core</artifactId>
        <version>2.0.9</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.mockito</groupId>
        <artifactId>mockito-all</artifactId>
        <version>1.9.5</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.sshd</groupId>
        <artifactId>sshd-core</artifactId>
        <version>0.14.0</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>com.jcraft</groupId>
        <artifactId>jsch</artifactId>
        <version>0.1.54</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.12</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.5</version>
    </dependency>
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>16.0.1</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.46</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.carbondata</groupId>
        <artifactId>carbonata</artifactId>
        <version>1.5.3</version>
        <scope>system</scope>
        <systemPath>${project.basedir}/lib/apache-carbondata-1.5.3-bin-spark2.3.2-hadoop2.6.0-cdh5.16.1.jar</systemPath>
    </dependency>
</dependencies>
```
3. The sink code:
```java
package com.flume.carbondata.sink;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.sdk.file.CarbonWriter;
import org.apache.carbondata.sdk.file.Field;
import org.apache.carbondata.sdk.file.Schema;
import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.instrumentation.SinkCounter;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CarbondataSink extends AbstractSink implements Configurable {

    private static final Logger logger = LoggerFactory.getLogger(CarbondataSink.class);

    private SinkCounter sinkCounter;
    private int batchSize = 100;
    private String splitType;
    private long rollInterval;
    private ScheduledExecutorService rollService;
    private volatile boolean shouldFlush;
    private CarbonWriter carbonWriter;
    private SimpleDateFormat dataFormat;
    private String path;
    private Schema schema;

    @Override
    public Status process() throws EventDeliveryException {
        String dateString = this.dataFormat.format(new Date());

        // When the roll timer fires, close the current writer so CarbonData
        // flushes its files to HDFS; the next batch opens a writer for the
        // current day's directory.
        if (this.shouldFlush && this.carbonWriter != null) {
            logger.info("Roll interval reached, closing current CarbonWriter");
            try {
                this.carbonWriter.close();
            } catch (IOException e) {
                logger.error("Failed to close CarbonWriter", e);
            } finally {
                this.carbonWriter = null;
                this.shouldFlush = false;
            }
            this.sinkCounter.incrementConnectionClosedCount();
        }

        if (this.carbonWriter == null) {
            try {
                // Write into a per-day sub-directory: <path>/yyyyMMdd
                this.carbonWriter = CarbonWriter.builder()
                        .outputPath(this.path + "/" + dateString)
                        .withCsvInput(this.schema)
                        .writtenBy("SDK")
                        .build();
                this.sinkCounter.incrementConnectionCreatedCount();
            } catch (InvalidLoadOptionException | IOException e) {
                throw new EventDeliveryException("Failed to create CarbonWriter", e);
            }
        }

        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        Status result = Status.READY;
        try {
            transaction.begin();
            int eventAttemptCounter = 0;
            for (int i = 0; i < this.batchSize; i++) {
                Event event = channel.take();
                if (event == null) {
                    result = Status.BACKOFF;
                    break;
                }
                this.sinkCounter.incrementEventDrainAttemptCount();
                eventAttemptCounter++;
                // The interceptor has already flattened the event into a
                // comma-separated string; split it into CSV columns.
                String body = new String(event.getBody());
                this.carbonWriter.write(body.split(this.splitType));
            }
            transaction.commit();
            this.sinkCounter.addToEventDrainSuccessCount(eventAttemptCounter);
        } catch (Exception ex) {
            transaction.rollback();
            throw new EventDeliveryException("Failed to process transaction", ex);
        } finally {
            transaction.close();
        }
        return result;
    }

    @Override
    public void configure(Context context) {
        this.batchSize = context.getInteger("sink.batchSize", 100);
        this.rollInterval = context.getLong("sink.rollInterval", 60L);
        this.path = context.getString("sink.path", "");
        this.splitType = context.getString("splitType", ",");
        this.dataFormat = new SimpleDateFormat("yyyyMMdd");

        // Build the Carbon schema from the comma-separated field names and
        // field types given in the sink configuration.
        String[] fieldNames = context.getString("sink.fields").split(",");
        String[] fieldTypes = context.getString("sink.fieldType").split(",");

        Map<String, DataType> typeMap = new HashMap<>();
        typeMap.put("int", DataTypes.INT);
        typeMap.put("string", DataTypes.STRING);
        typeMap.put("date", DataTypes.DATE);
        typeMap.put("timestamp", DataTypes.TIMESTAMP);
        typeMap.put("boolean", DataTypes.BOOLEAN);
        typeMap.put("long", DataTypes.LONG);
        typeMap.put("double", DataTypes.DOUBLE);
        typeMap.put("short", DataTypes.SHORT);
        typeMap.put("null", DataTypes.NULL);
        typeMap.put("byte", DataTypes.BYTE);

        List<Field> fieldList = new ArrayList<>();
        for (int i = 0; i < fieldNames.length; i++) {
            fieldList.add(new Field(fieldNames[i], typeMap.get(fieldTypes[i])));
        }
        this.schema = new Schema(fieldList.toArray(new Field[fieldList.size()]));

        if (this.rollInterval <= 0L) {
            this.rollInterval = 60L;
        }
        if (this.sinkCounter == null) {
            this.sinkCounter = new SinkCounter(getName());
        }
    }

    @Override
    public void start() {
        logger.info("Starting {}...", this);
        this.sinkCounter.start();
        super.start();
        if (this.rollInterval > 0L) {
            this.rollService = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder()
                    .setNameFormat("carbondataSink-roller-" + Thread.currentThread().getId() + "-%d")
                    .build());
            // Periodically mark the writer for rotation; process() performs
            // the actual close on the sink's own thread.
            this.rollService.scheduleAtFixedRate(new Runnable() {
                public void run() {
                    CarbondataSink.this.shouldFlush = true;
                }
            }, this.rollInterval, this.rollInterval, TimeUnit.SECONDS);
        } else {
            logger.info("RollInterval is not valid, file rolling will not happen.");
        }
        logger.info("CarbondataSink {} started.", getName());
    }

    @Override
    public void stop() {
        logger.info("CarbondataSink {} stopping...", getName());
        this.sinkCounter.stop();
        super.stop();
        if (this.carbonWriter != null) {
            try {
                this.carbonWriter.close();
            } catch (IOException e) {
                logger.error("Failed to close CarbonWriter on stop", e);
            }
        }
        if (this.rollInterval > 0L) {
            this.rollService.shutdown();
            while (!this.rollService.isTerminated()) {
                try {
                    this.rollService.awaitTermination(1L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    logger.debug("Interrupted while waiting for roll service to stop.", e);
                }
            }
        }
        logger.info("CarbondataSink {} stopped. Event metrics: {}", getName(), this.sinkCounter);
    }
}
```
4. Package and upload the flume-carbondata-sink jars.
What this sink does: it takes CSV-style, comma-separated records and writes them to HDFS through the CarbonData SDK, one directory per day, so each day's data lands in its own partition directory. The jars to upload are the custom flume-carbondata-sink jar plus the CarbonData jars referenced in the pom (carbondata-core, carbondata-store-sdk, and so on; I won't list them all here).
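To make the write path concrete, here is the bare CarbonData SDK sequence the sink performs for each day's directory, pulled out as a standalone sketch (the class name, path, and record values below are made up for illustration):

```java
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.sdk.file.CarbonWriter;
import org.apache.carbondata.sdk.file.Field;
import org.apache.carbondata.sdk.file.Schema;

public class CarbonSdkSketch {
    public static void main(String[] args) throws Exception {
        // schema matching sink.fields / sink.fieldType in the conf below
        Schema schema = new Schema(new Field[] {
                new Field("id", DataTypes.STRING),
                new Field("name", DataTypes.STRING),
                new Field("age", DataTypes.INT),
                new Field("time", DataTypes.STRING)
        });

        // one writer per day directory, exactly as the sink does
        CarbonWriter writer = CarbonWriter.builder()
                .outputPath("hdfs://cdh01:8020/user/hive/warehouse/carbon.store/carbondata/xytest/20191210")
                .withCsvInput(schema)
                .writtenBy("SDK")
                .build();

        // each event body, already comma-separated by the interceptor
        writer.write("1001,zhangsan,25,2019-12-10 00:00:00".split(","));

        // closing the writer is what actually flushes the carbon files
        writer.close();
    }
}
```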
III. Configure the Flume conf
1. In the /opt/cloudera/parcels/CDH/lib/flume-ng/conf directory, create the agent config:

vi carbondata.conf

and add the following:
```properties
ng.sources = kafkaSource
ng.channels = spillablechannel
ng.sinks = carbondata

ng.sources.kafkaSource.type = org.apache.flume.source.kafka.KafkaSource
ng.sources.kafkaSource.kafka.bootstrap.servers = cdh01:9092,cdh02:9092,cdh03:9092
ng.sources.kafkaSource.kafka.consumer.group.id = zhsq_ry33
ng.sources.kafkaSource.kafka.topics = pd_ry_txjl
ng.sources.kafkaSource.batchSize = 1000
ng.sources.kafkaSource.channels = spillablechannel
ng.sources.kafkaSource.kafka.consumer.auto.offset.reset = latest
# the custom interceptor built in part I
ng.sources.kafkaSource.interceptors = i2
ng.sources.kafkaSource.interceptors.i2.type = com.extracting.flume.interceptor.tc.XyAccessInterceptorTC_SK$Builder

ng.channels.spillablechannel.type = memory
ng.channels.spillablechannel.memoryCapacity = 20000
ng.channels.spillablechannel.transactionCapacity = 20000
ng.channels.spillablechannel.capacity = 20000

# the custom CarbondataSink built in part II
ng.sinks.carbondata.type = com.flume.carbondata.sink.CarbondataSink
ng.sinks.carbondata.sink.fields = id,name,age,time
# long and the other Carbon data types are supported as well
ng.sinks.carbondata.sink.fieldType = string,string,int,string
# HDFS base directory; data actually lands under .../xytest/yyyyMMdd
ng.sinks.carbondata.sink.path = hdfs://cdh01:8020/user/hive/warehouse/carbon.store/carbondata/xytest
ng.sinks.carbondata.sink.rollInterval = 300
ng.sinks.carbondata.sink.batchSize = 1000
ng.sinks.carbondata.channel = spillablechannel
```
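Once the agent is running (next step), you can smoke-test the whole pipeline by hand-feeding the topic a record shaped like the nested sample from part I. On a plain Apache Kafka install the script is kafka-console-producer.sh; on CDH parcels the wrapper is typically kafka-console-producer:

```bash
kafka-console-producer --broker-list cdh01:9092 --topic pd_ry_txjl
# then paste a nested-JSON test record and press Enter
```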
2. Start Flume.
First run the agent in the foreground for debugging:

```bash
bin/flume-ng agent -n ng -c conf -f conf/carbondata.conf
```

On CDH, Flume logs to /var/log/flume/flume.log by default. Once you have confirmed that data is reaching CarbonData correctly, launch the agent in the background:

```bash
nohup bin/flume-ng agent -n ng -c conf -f conf/carbondata.conf &
```

To stop the job:

```bash
jcmd | grep carbondata.conf   # find the process running with carbondata.conf
kill <pid>                    # then kill that process id
```

Now open the HDFS web UI (port 50070) and check whether files are being written, e.g. under hdfs://cdh01:8020/user/hive/warehouse/carbon.store/carbondata/xytest/20191210. Then create an external table in Hive (Spark SQL):

```sql
create external table table_flume
stored as carbondata
location '/user/hive/warehouse/carbon.store/carbondata/xytest/';
```
IV. Optimization
Used this way, the Flume sink produces a large number of small files. The remedy is a partitioned CarbonData table plus a daily job that merges the previous day's small files from the flume table into that day's partition:
```scala
// create the partitioned target table: table_day, partitioned by rq (date, yyyyMMdd)
val createTable =
  s"""
     | CREATE TABLE IF NOT EXISTS `table_day` (
     |   id string,
     |   name string,
     |   age int,
     |   time string
     | )
     | partitioned by (rq string)
     | stored as carbondata
     | """.stripMargin
sparkSession.sql(createTable)

// pull yesterday's records out of the flume table
val data_flume = sparkSession.sql(
  s"""
     | select * from table_flume
     | where time <= '2019-12-10 23:59:59' and time >= '2019-12-10 00:00:00'
     | """.stripMargin)

// write a DataFrame into one partition of a CarbonData table via Spark SQL
def writeTable2CarbondataPartition(sparkSession: SparkSession, datas: DataFrame,
                                   tableName: String, rq: String): Unit = {
  import org.apache.spark.sql.functions._
  datas.coalesce(1).withColumn("RQ", lit(rq))
    .write.format("carbondata")
    .option("BAD_RECORDS_LOGGER_ENABLE", "true")
    .option("BAD_RECORD_PATH", "hdfs://cdh01:8020/user/hive/warehouse/carbondata.db")
    .option("BAD_RECORDS_ACTION", "FORCE")
    // .saveAsTable(tableName)
    .insertInto(tableName)
}

// merge yesterday's data from table_flume into the partitioned table_day
writeTable2CarbondataPartition(sparkSession, data_flume, "table_day", rq)

// finally expose both tables through a single view
val createView =
  s"""
     | CREATE VIEW IF NOT EXISTS `table_view`
     | as
     | select id, name, age, time from table_flume
     | union all
     | select id, name, age, time from table_day
     | """.stripMargin
sparkSession.sql(createView)
```
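To run the merge every day, one option is a cron entry that spark-submits the job shortly after midnight; the class and jar names below are placeholders for however you package the Scala code above:

```bash
# crontab: at 01:00 every day, merge yesterday's flume data into table_day
0 1 * * * spark-submit --master yarn --class com.example.MergeFlumeToDay /opt/jobs/carbondata-merge.jar >> /var/log/carbondata-merge.log 2>&1
```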