
Reading and Writing Parquet


Write and read

// Required imports. These assume the org.apache.parquet artifacts
// (parquet-hadoop, parquet-column); older releases use the parquet.* packages.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.example.data.GroupFactory;
import org.apache.parquet.example.data.simple.SimpleGroupFactory;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.GroupReadSupport;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

// Parse a two-column schema from its textual representation.
MessageType schema = MessageTypeParser.parseMessageType(
        "message Pair {\n" +
        "  required binary left (UTF8);\n" +
        "  required binary right (UTF8);\n" +
        "}");

// Build a single record with both fields set.
GroupFactory factory = new SimpleGroupFactory(schema);
Group group = factory.newGroup()
        .append("left", "L")
        .append("right", "R");

Path path = new Path("data.parquet");

// The schema must be registered in the Configuration before writing;
// setSchema is a static method on GroupWriteSupport.
Configuration configuration = new Configuration();
GroupWriteSupport writeSupport = new GroupWriteSupport();
GroupWriteSupport.setSchema(schema, configuration);

ParquetWriter<Group> writer = new ParquetWriter<Group>(path, writeSupport,
        ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME,
        ParquetWriter.DEFAULT_BLOCK_SIZE,
        ParquetWriter.DEFAULT_PAGE_SIZE,
        ParquetWriter.DEFAULT_PAGE_SIZE,   /* dictionary page size */
        ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED,
        ParquetWriter.DEFAULT_IS_VALIDATING_ENABLED,
        ParquetProperties.WriterVersion.PARQUET_1_0,
        configuration);

writer.write(group);
writer.close();

// Reading needs only a read support; the schema comes from the file footer.
GroupReadSupport readSupport = new GroupReadSupport();
ParquetReader<Group> reader = new ParquetReader<Group>(path, readSupport);

Group result = reader.read();   // returns null once the file is exhausted
System.out.println(result);
reader.close();


Reading is simpler than writing: there is no need to supply the schema or the storage settings, because Parquet stores both in the file footer.
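When the file holds more than one record, the usual pattern is to loop until read() returns null. A minimal sketch, assuming a freshly opened reader and using the Group accessors from the example module (field names match the Pair schema above):

Group g;
while ((g = reader.read()) != null) {
    // getString(field, index): index 0 is the first value of that field
    System.out.println(g.getString("left", 0) + " -> " + g.getString("right", 0));
}
reader.close();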

Write Avro records

Writing as GenericRecord:

// Additional imports for the Avro path (avro and parquet-avro artifacts).
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.parquet.avro.AvroParquetWriter;

// Load the Avro schema and the CSV source lines.
Schema schema = new Schema.Parser().parse(
        new File("/data/workspace/hadoop/src/main/resources/Stock.avsc"));

List<String> list = Files.lines(
        new File("/data/workspace/hadoop/src/main/resources/stocks.txt").toPath())
        .collect(Collectors.toList());

Path path = new Path("stock_record.parquet");

AvroParquetWriter<GenericRecord> writer =
        new AvroParquetWriter<GenericRecord>(path, schema);

// One CSV line becomes one GenericRecord, populated field by field.
list.forEach(s -> {
    String[] arrays = s.split(",");
    GenericRecord record = new GenericData.Record(schema);
    record.put("symbol", arrays[0]);
    record.put("date", arrays[1]);
    record.put("open", Double.valueOf(arrays[2]));
    record.put("high", Double.valueOf(arrays[3]));
    record.put("low", Double.valueOf(arrays[4]));
    record.put("close", Double.valueOf(arrays[5]));
    record.put("volume", Integer.valueOf(arrays[6]));
    record.put("adjClose", Double.valueOf(arrays[7]));

    try {
        writer.write(record);
    } catch (IOException e) {
        e.printStackTrace();
    }
});

writer.close();


When using Avro, you can also generate Java model classes from the schema first and work with those; writing them to Parquet works just as well.
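For illustration, assuming Stock is the class the Avro compiler generates from Stock.avsc (so the builder and setters below simply mirror that schema), a record can be built like this:

// Hypothetical instance of the generated Stock class; field names and types
// follow the schema shown at the end of this post.
Stock stock = Stock.newBuilder()
        .setSymbol("AAPL")
        .setDate("2016-06-08")
        .setOpen(98.0)
        .setHigh(99.0)
        .setLow(97.5)
        .setClose(98.5)
        .setVolume(1000000)
        .setAdjClose(98.5)
        .build();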

import org.apache.parquet.hadoop.metadata.CompressionCodecName;

File input = new File("/data/workspace/hadoop/src/main/resources/stocks.txt");
Path out = new Path("stock.parquet");

// The generated class carries its own schema in the SCHEMA$ constant.
AvroParquetWriter<Stock> writer = new AvroParquetWriter<>(out, Stock.SCHEMA$,
        CompressionCodecName.SNAPPY,
        ParquetWriter.DEFAULT_BLOCK_SIZE,
        ParquetWriter.DEFAULT_PAGE_SIZE,
        true);                              // enable dictionary encoding

for (Stock stock : AvroStockUtils.fromCsvFile(input)) {
    writer.write(stock);
}
// close() must be called; otherwise the footer is never written and the
// file contains no data.
writer.close();


Reading, method 1:

AvroParquetReader<GenericRecord> reader =
        new AvroParquetReader<GenericRecord>(new Path("stock_record.parquet"));

// read() returns null at end of file.
GenericRecord record;
while ((record = reader.read()) != null) {
    System.out.println(record);
}
reader.close();


Reading, method 2:

Path path = new Path("stock.parquet");

AvroParquetReader<Stock> reader = new AvroParquetReader<>(path);

Stock stock;
while ((stock = reader.read()) != null) {
System.out.println(stock);
}

reader.close();


The two approaches are interchangeable: the Parquet files they produce carry the same schema, so either reader can read either file.

message com.hadoop2.data.Stock {
  required binary symbol (UTF8);
  required binary date (UTF8);
  required double open;
  required double high;
  required double low;
  required double close;
  required int32 volume;
  required double adjClose;
}


So the reverse direction should be consistent as well.
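As a quick sketch of that interchangeability (file names assume the two writers above; whether the specific reader resolves the generated class depends on it being on the classpath): read the specific-record file generically, and the generic-record file through the generated class.

// Generic reader over the file written with the generated Stock class.
AvroParquetReader<GenericRecord> generic =
        new AvroParquetReader<GenericRecord>(new Path("stock.parquet"));
GenericRecord r;
while ((r = generic.read()) != null) {
    System.out.println(r.get("symbol") + " " + r.get("close"));
}
generic.close();

// Specific reader over the file written with GenericRecords; since the
// schemas are identical, the records map straight onto the Stock class.
AvroParquetReader<Stock> specific =
        new AvroParquetReader<Stock>(new Path("stock_record.parquet"));
Stock s;
while ((s = specific.read()) != null) {
    System.out.println(s);
}
specific.close();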