
Batch-Uploading Images to HBase with Spark While Extracting SIFT Features with OpenCV

2016-12-16 21:17
I have recently been studying image classification and retrieval with Spark. The first step is to upload the image data (stored on the local file system) into HBase; the features extracted are SIFT descriptors, computed with the OpenCV library. Initially I wrote one job to upload the images and a second job to extract features from the images already in HBase. Since serializing and deserializing the images costs a lot of time, and the frequent disk I/O is also expensive, I merged the two jobs into a single one to cut the processing time. Along the way I ran into quite a few errors, which are collected at the end of this post.

Batch-uploading the data and extracting SIFT features

package com.fang.spark

import java.awt.image.{BufferedImage, DataBufferByte}
import java.io.ByteArrayInputStream
import javax.imageio.ImageIO
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.{SparkConf, SparkContext}
import org.opencv.core.{Core, CvType, Mat, MatOfKeyPoint}
import org.opencv.features2d.{DescriptorExtractor, FeatureDetector}

/**
 * Created by hadoop on 16-11-15.
 * Batch-uploads images into an HBase table and computes their SIFT descriptors.
 * Table layout:
 *   imagesTable: (image: imageBinary, sift)
 * RowKey design: the image file name
 */
object HBaseUpLoadImages {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("HBaseUpLoadImages")
      .setMaster("local[4]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sparkContext = new SparkContext(sparkConf)
    // TODO In local testing the job fails when there are too many image files and
    // dumps an hs_err_pid***_log; the exact cause is still unknown.
    val imagesRDD = sparkContext.binaryFiles("/home/fang/images/train/test")
    // Load the OpenCV native library; required before any OpenCV call.
    System.loadLibrary(Core.NATIVE_LIBRARY_NAME)
    //val imagesRDD = sparkContext.newAPIHadoopFile[Text, BufferedImage, ImmutableBytesWritable]("/home/fang/images/train/1")
    //val columnFamilys: Array[String] = Array("image")
    //createTable(tableName, columnFamilys, connection)
    imagesRDD.foreachPartition { iter =>
      // Create one HBase connection per partition rather than per record,
      // and never on the driver: Connection is not serializable.
      val hbaseConfig = HBaseConfiguration.create()
      hbaseConfig.set("hbase.zookeeper.property.clientPort", "2181")
      hbaseConfig.set("hbase.zookeeper.quorum", "fang-ubuntu,fei-ubuntu,kun-ubuntu")
      val connection: Connection = ConnectionFactory.createConnection(hbaseConfig)
      val tableName = "imagesTest"
      val table: Table = connection.getTable(TableName.valueOf(tableName))
      iter.foreach { imageFile =>
        //val bi: BufferedImage = ImageIO.read(new ByteArrayInputStream(imageFile._2.toArray()))
        //println(bi.getColorModel)
        // Use the file name (the last path segment) as the row key.
        val tempPath = imageFile._1.split("/")
        val len = tempPath.length
        val imageName = tempPath(len - 1)
        // TODO try to obtain the BufferedImage data directly to improve efficiency
        val imageBinary: scala.Array[Byte] = imageFile._2.toArray()
        val put: Put = new Put(Bytes.toBytes(imageName))
        put.addColumn(Bytes.toBytes("image"), Bytes.toBytes("binary"), imageBinary)
        put.addColumn(Bytes.toBytes("image"), Bytes.toBytes("path"), Bytes.toBytes(imageFile._1))
        // Only write the sift column when descriptors were actually found.
        val sift = getImageSift(imageBinary)
        if (sift.isDefined) {
          put.addColumn(Bytes.toBytes("image"), Bytes.toBytes("sift"), sift.get)
        }
        table.put(put)
      }
      table.close()
      connection.close()
    }
    sparkContext.stop()
  }

  // Extract the SIFT descriptors of an image; returns None when no keypoints are found.
  def getImageSift(image: Array[Byte]): Option[Array[Byte]] = {
    val bi: BufferedImage = ImageIO.read(new ByteArrayInputStream(image))
    //java.lang.UnsupportedOperationException:
    // Provided data element number (188000) should be multiple of the Mat channels count (3)
    // Changing CvType.CV_8UC3 to CvType.CV_8U fixes the error above.
    //val test_mat = new Mat(bi.getHeight, bi.getWidth, CvType.CV_8UC3)
    val test_mat = new Mat(bi.getHeight, bi.getWidth, CvType.CV_8U)
    val data = bi.getRaster.getDataBuffer.asInstanceOf[DataBufferByte].getData
    //if (bi.getColorModel.getNumComponents == 3) {
    test_mat.put(0, 0, data)
    val desc = new Mat
    val fd = FeatureDetector.create(FeatureDetector.SIFT)
    val mkp = new MatOfKeyPoint
    fd.detect(test_mat, mkp)
    val de = DescriptorExtractor.create(DescriptorExtractor.SIFT)
    // Compute the SIFT descriptors for the detected keypoints.
    de.compute(test_mat, mkp, desc)
    // Only serialize when descriptors exist; an empty Mat cannot be serialized.
    if (desc.rows() != 0) {
      Some(Utils.serializeMat(desc))
    } else {
      None
    }
  }
}
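The createTable call above is commented out, so the imagesTest table must already exist. A minimal sketch of creating it with the HBase 1.x admin API follows; the CreateImagesTable object is my naming, not part of the original code.

import org.apache.hadoop.hbase.{HBaseConfiguration, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object CreateImagesTable {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "fang-ubuntu,fei-ubuntu,kun-ubuntu")
    val connection = ConnectionFactory.createConnection(conf)
    val admin = connection.getAdmin
    val tableName = TableName.valueOf("imagesTest")
    if (!admin.tableExists(tableName)) {
      val descriptor = new HTableDescriptor(tableName)
      // A single column family "image" holding the binary, path and sift columns.
      descriptor.addFamily(new HColumnDescriptor("image"))
      admin.createTable(descriptor)
    }
    admin.close()
    connection.close()
  }
}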


The extracted Mat-typed descriptors need to be serialized before they can be stored in HBase:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import org.opencv.core.Mat;

public class Utils {
    public static byte[] serializeMat(Mat mat) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try {
            // SIFT descriptors are CV_32F, so read the Mat into a float array.
            float[] data = new float[(int) mat.total() * mat.channels()];
            mat.get(0, 0, data);
            ObjectOutput out = new ObjectOutputStream(bos);
            out.writeObject(data);
            out.close();
            // Return the bytes of the serialized float array.
            return bos.toByteArray();
        } catch (IOException ioe) {
            ioe.printStackTrace();
            return null;
        }
    }
}
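To read the descriptors back from HBase, the float array can be deserialized and repacked into a Mat. A minimal sketch, assuming standard 128-dimensional SIFT descriptors stored as CV_32F; the MatDeserializer object and deserializeMat name are mine, not part of the original code.

import java.io.{ByteArrayInputStream, ObjectInputStream}
import org.opencv.core.{CvType, Mat}

object MatDeserializer {
  def deserializeMat(bytes: Array[Byte]): Mat = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    val data = in.readObject().asInstanceOf[Array[Float]]
    in.close()
    val cols = 128                // SIFT descriptor dimensionality (assumption)
    val rows = data.length / cols // one row per keypoint
    val mat = new Mat(rows, cols, CvType.CV_32F)
    mat.put(0, 0, data)
    mat
  }
}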


Errors encountered and their fixes

java.lang.UnsupportedOperationException: Provided data element number (188000) should be multiple of the Mat channels count (3)

Changing CvType.CV_8UC3 to CvType.CV_8U resolves this error.
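A channel-aware variant (my sketch, not the fix actually used above) avoids the mismatch altogether by choosing the Mat type from the image's component count:

// Hypothetical alternative: match the Mat type to the BufferedImage channels
// so the data length is always a multiple of the channel count.
val channels = bi.getColorModel.getNumComponents
val matType = if (channels == 3) CvType.CV_8UC3 else CvType.CV_8U
val imageMat = new Mat(bi.getHeight, bi.getWidth, matType)
imageMat.put(0, 0, bi.getRaster.getDataBuffer.asInstanceOf[DataBufferByte].getData)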

org.apache.spark.SparkException: Task not serializable

This error appears when the HBase connection is created outside foreachPartition, so the non-serializable objects get captured by the task closure.
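A minimal sketch of the difference:

// Fails: a Connection created on the driver is captured by the foreach closure,
// and Spark cannot serialize it to ship it to the executors.
//val connection = ConnectionFactory.createConnection(hbaseConfig)
//imagesRDD.foreach { imageFile => /* table.put(...) uses the captured connection */ }

// Works: everything non-serializable is created inside foreachPartition,
// so it lives entirely on the executor.
imagesRDD.foreachPartition { iter =>
  val connection = ConnectionFactory.createConnection(HBaseConfiguration.create())
  // ... build and put one Put per record ...
  connection.close()
}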

UnsatisfiedLinkError: Native method not found: org.opencv.core.Mat.n_Mat:()J

The OpenCV native library was never loaded, or failed to load; make sure System.loadLibrary(Core.NATIVE_LIBRARY_NAME) runs before any OpenCV call.
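In local mode the single loadLibrary call on the driver is enough. On a real cluster (my assumption; this post only tests locally) each executor JVM needs the native library loaded as well, for example:

imagesRDD.foreachPartition { iter =>
  // Loading is per-JVM and safe to repeat: re-loading an already-loaded library is a no-op.
  System.loadLibrary(Core.NATIVE_LIBRARY_NAME)
  // ... OpenCV calls ...
}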

java.lang.UnsupportedOperationException: Mat data type is not compatible: 0

Thrown when serializing a Mat without first checking that it contains any data; hence the desc.rows() != 0 check in getImageSift above.

The OpenCV native libraries must be on the JVM library path when the program runs:

-Djava.library.path=/home/fang/BigDataSoft/opencv-2.4.13/release/lib
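When submitting to a cluster instead of running from the IDE, the same path can be passed through the Spark configuration (a sketch; the option values simply reuse the path above):

val sparkConf = new SparkConf()
  .set("spark.driver.extraJavaOptions",
    "-Djava.library.path=/home/fang/BigDataSoft/opencv-2.4.13/release/lib")
  .set("spark.executor.extraJavaOptions",
    "-Djava.library.path=/home/fang/BigDataSoft/opencv-2.4.13/release/lib")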

The job still fails when too many images are uploaded; my initial guess is that memory runs out. Solving this is the next step.
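Two untested directions (my assumptions, not a confirmed fix): give the executors more memory, and spread the files over more partitions so fewer images sit in memory at once:

// Untested: raise executor memory and increase the partition count.
val sparkConf = new SparkConf().set("spark.executor.memory", "4g")
val imagesRDD = sparkContext.binaryFiles("/home/fang/images/train/test", minPartitions = 32)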