Parquet Read and Write
2016-06-08 17:52
write and read
MessageType schema = MessageTypeParser.parseMessageType(
        "message Pair {\n" +
        " required binary left (UTF8);\n" +
        " required binary right (UTF8);\n" +
        "}");
GroupFactory factory = new SimpleGroupFactory(schema);
Group group = factory.newGroup().append("left", "L").append("right", "R");

Path path = new Path("data.parquet");
Configuration configuration = new Configuration();
GroupWriteSupport writeSupport = new GroupWriteSupport();
writeSupport.setSchema(schema, configuration);

ParquetWriter<Group> writer = new ParquetWriter<Group>(path, writeSupport,
        ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME,
        ParquetWriter.DEFAULT_BLOCK_SIZE,
        ParquetWriter.DEFAULT_PAGE_SIZE,
        ParquetWriter.DEFAULT_PAGE_SIZE,        /* dictionary page size */
        ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
        ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
        ParquetProperties.WriterVersion.PARQUET_1_0,
        configuration);
writer.write(group);
writer.close();

GroupReadSupport readSupport = new GroupReadSupport();
ParquetReader<Group> reader = new ParquetReader<Group>(path, readSupport);
Group result = reader.read();
System.out.println(result);
Reading is simpler than writing: there is no need to specify the schema or the storage settings.
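As a small extension of the read side above (a sketch, assuming the ParquetReader.builder API is available in your parquet-hadoop version), looping until read() returns null retrieves every record, and the reader should be closed afterwards:

GroupReadSupport readSupport = new GroupReadSupport();
ParquetReader<Group> reader = ParquetReader.builder(readSupport, new Path("data.parquet")).build();
Group g;
while ((g = reader.read()) != null) {   // read() returns null once the file is exhausted
    System.out.println(g);
}
reader.close();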
write Avro record
Writing as GenericRecord:

Schema schema = new Schema.Parser().parse(
        new File("/data/workspace/hadoop/src/main/resources/Stock.avsc"));
List<String> list = Files.lines(
        new File("/data/workspace/hadoop/src/main/resources/stocks.txt").toPath())
        .collect(Collectors.toList());

Path path = new Path("stock_record.parquet");
AvroParquetWriter<GenericRecord> writer = new AvroParquetWriter<GenericRecord>(path, schema);

list.stream().forEach(s -> {
    String[] arrays = s.split(",");
    GenericRecord record = new GenericData.Record(schema);
    record.put("symbol", arrays[0]);
    record.put("date", arrays[1]);
    record.put("open", Double.valueOf(arrays[2]));
    record.put("high", Double.valueOf(arrays[3]));
    record.put("low", Double.valueOf(arrays[4]));
    record.put("close", Double.valueOf(arrays[5]));
    record.put("volume", Integer.valueOf(arrays[6]));
    record.put("adjClose", Double.valueOf(arrays[7]));
    try {
        writer.write(record);
    } catch (IOException e) {
        e.printStackTrace();
    }
});
writer.close();
When using Avro, you can also generate the Java model classes first and work with those; that approach works for writing Parquet as well.
File input = new File("/data/workspace/hadoop/src/main/resources/stocks.txt");
Path out = new Path("stock.parquet");

AvroParquetWriter<Stock> writer = new AvroParquetWriter<>(out, Stock.SCHEMA$,
        CompressionCodecName.SNAPPY,
        ParquetWriter.DEFAULT_BLOCK_SIZE,
        ParquetWriter.DEFAULT_PAGE_SIZE,
        true);

for (Stock stock : AvroStockUtils.fromCsvFile(input)) {
    writer.write(stock);
}

// close() must be called, otherwise no data gets written
writer.close();
return 0;
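For reference, the Stock class used here is an Avro-generated model. One way to produce it (a sketch, assuming org.apache.avro.compiler.specific.SpecificCompiler from the avro-compiler artifact is on the classpath; the avro-maven-plugin or the avro-tools jar does the same at build time):

// Generate the Stock class from the Avro schema; the package and class name
// come from the schema's namespace and name fields.
File schemaFile = new File("/data/workspace/hadoop/src/main/resources/Stock.avsc");
File outputDir = new File("/data/workspace/hadoop/src/main/java");
SpecificCompiler.compileSchema(schemaFile, outputDir);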
Reading, method one:
AvroParquetReader<GenericRecord> reader =
        new AvroParquetReader<GenericRecord>(new Path("stock_record.parquet"));
GenericRecord record;
while ((record = reader.read()) != null) {
    System.out.println(record);
}
Reading, method two:
Path path = new Path("stock.parquet");
AvroParquetReader<Stock> reader = new AvroParquetReader<>(path);
Stock stock;
while ((stock = reader.read()) != null) {
    System.out.println(stock);
}
reader.close();
These two methods can read each other's output, because the Parquet files they produce have the same schema:
message com.hadoop2.data.Stock {
  required binary symbol (UTF8);
  required binary date (UTF8);
  required double open;
  required double high;
  required double low;
  required double close;
  required int32 volume;
  required double adjClose;
}
So the reverse direction should be consistent as well!
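A quick way to confirm that both writers really produce the same schema (a sketch, assuming ParquetFileReader from parquet-hadoop and the two files written above) is to dump each file's footer:

Configuration conf = new Configuration();
for (String file : new String[] {"stock_record.parquet", "stock.parquet"}) {
    // readFooter parses only the file metadata, without scanning the row groups
    ParquetMetadata footer = ParquetFileReader.readFooter(conf, new Path(file));
    System.out.println(file + ":");
    System.out.println(footer.getFileMetaData().getSchema());
}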