您的位置:首页 > 产品设计 > UI/UE

Spark Streaming + Kafka + Opencv + Face Recognizer + HDFS Sequence File + Mysql

2016-06-06 18:10 726 查看
<pre name="code" class="java">/**
* Created by lwc on 6/17/16.
*/

import java.io.*;
import java.sql.*;
import java.util.*;

import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;

import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.opencv.core.*;
import org.opencv.face.Face;
import org.opencv.face.FaceRecognizer;
import org.opencv.imgproc.Imgproc;
import scala.Tuple2;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;

import org.apache.hadoop.io.Text;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.opencv.imgcodecs.Imgcodecs;
import org.opencv.objdetect.CascadeClassifier;
import org.apache.spark.streaming.Durations;

/**
 * Serializable holder for the lookup tables that the driver shares with the
 * Spark executors through a broadcast variable.
 */
class GlobleData implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Person name -> reference image, stored as a JSON string. Starts out empty. */
    public Map<String, String> lableMat;

    /** Numeric label -> person name. Left {@code null} until a caller assigns it. */
    public Map<Integer, String> idToNameMapping;

    GlobleData() {
        this.lableMat = new HashMap<String, String>();
    }
}

/**
 * Spark Streaming job: main() first calls train() on labelled face images read
 * from an HDFS sequence file, then consumes raw frames from Kafka, detects a
 * face in each frame, predicts who it is and inserts the result into MySQL.
 */
public class AppMatSeq {
// Numeric label -> person name; rebuilt by train().
static Map<Integer, String> idToNameMapping;
// Recognizer created and trained by train().
static FaceRecognizer faceRecognizer;
// Label ids handed to FaceRecognizer.train(), one row per training image.
static MatOfInt labelsBuf;
// Decoded training images, filled by train() in the same order as labelsBuf.
static List<Mat> mats;
// Person name -> reference image serialised with matToJson().
static Map<String, String> lableMat = new HashMap<String, String>();
// Not referenced anywhere in the visible part of this class.
static String fzString;
// Holder that main() broadcasts to the executors; train() fills its maps.
static GlobleData globleData = new GlobleData();

/**
 * Reads the labelled face images from the HDFS sequence file, rebuilds the
 * label tables and trains a Fisherfaces recognizer on them.
 *
 * <p>Record keys look like {@code <name>_<suffix>}; everything before the first
 * underscore is the person name. One image may be split over several
 * consecutive records with the same key, so the record values are concatenated
 * per key before decoding.
 *
 * <p>Side effects: replaces {@code mats}, {@code idToNameMapping},
 * {@code labelsBuf} and {@code faceRecognizer}, adds to {@code lableMat}, and
 * publishes the two maps through {@code globleData}.
 *
 * @throws Exception if the sequence file cannot be read, an image cannot be
 *                   decoded, or training fails
 */
public static void train() throws Exception {
    String uri = "hdfs://10.75.161.88/newfaces.seq";
    mats = new ArrayList<Mat>();
    idToNameMapping = new HashMap<Integer, String>();
    Configuration conf = new Configuration();
    Path path = new Path(uri);
    System.out.println("0");

    // Built together with idToNameMapping so no O(n) containsValue() scan is needed per record.
    Map<String, Integer> nameToId = new HashMap<String, Integer>();
    // Insertion-ordered so the training order follows the file order.
    Map<Text, ByteArrayOutputStream> keyStream = new LinkedHashMap<Text, ByteArrayOutputStream>();

    SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
    try {
        System.out.println("1");
        Text key = new Text();
        Text value = new Text();
        while (reader.next(key, value)) {
            String name = key.toString().split("_")[0];
            if (!nameToId.containsKey(name)) {
                int id = nameToId.size();
                nameToId.put(name, id);
                idToNameMapping.put(id, name);
            }
            ByteArrayOutputStream chunks = keyStream.get(key);
            if (chunks == null) {
                chunks = new ByteArrayOutputStream(1024);
                // The reader reuses `key`, so the map needs its own copy.
                keyStream.put(new Text(key), chunks);
            }
            // Text.getBytes() may be longer than the payload; only getLength() bytes are valid.
            chunks.write(value.getBytes(), 0, value.getLength());
        }
    } finally {
        // Close the reader even when reading fails half-way.
        IOUtils.closeStream(reader);
    }

    labelsBuf = new MatOfInt(new int[keyStream.size()]);
    int counter = 0;
    for (Map.Entry<Text, ByteArrayOutputStream> entry : keyStream.entrySet()) {
        String name = entry.getKey().toString().split("_")[0];
        MatOfByte encoded = new MatOfByte(entry.getValue().toByteArray());
        // NOTE(review): CV_IMWRITE_JPEG_OPTIMIZE is a *write* parameter that is used here as a
        // decode flag; the original comment hints that CV_LOAD_IMAGE_GRAYSCALE was intended.
        // Kept unchanged so the training input stays the same - confirm before changing.
        Mat mat = Imgcodecs.imdecode(encoded, Imgcodecs.CV_IMWRITE_JPEG_OPTIMIZE);
        Mat matSave = Imgcodecs.imdecode(encoded, Imgcodecs.CV_LOAD_IMAGE_COLOR);
        encoded.release();
        if (mat.empty() || matSave.empty()) {
            throw new IOException("Cannot decode training image for key " + entry.getKey());
        }
        mats.add(mat);
        lableMat.put(name, matToJson(matSave));
        matSave.release();
        labelsBuf.put(counter++, 0, nameToId.get(name));
    }

    faceRecognizer = Face.createFisherFaceRecognizer();
    faceRecognizer.train(mats, labelsBuf);
    // The recognizer was just assigned above, so it cannot be null here.
    System.out.println("!!!!!!!!face rec is not null");
    // NOTE(review): the trained model is not saved here although main() loads
    // "/tmp/faceRec.yml" for prediction - verify where that file comes from.

    globleData.idToNameMapping = idToNameMapping;
    globleData.lableMat = lableMat;
}

/**
 * Entry point: trains the recognizer, then starts a Spark Streaming job that
 * reads raw frames from Kafka, recognizes the first detected face of each
 * frame and records the match in MySQL.
 *
 * @param args {@code args[0]} Kafka broker list, {@code args[1]} comma-separated topics
 * @throws IllegalArgumentException if fewer than two arguments are given
 * @throws Exception if training or the streaming context fails
 */
@SuppressWarnings("serial")
public static void main(String[] args) throws Exception {
    // Fail fast: the arguments used to be read only after the expensive training step.
    if (args.length < 2) {
        throw new IllegalArgumentException("Usage: AppMatSeq <brokers> <topics>");
    }
    String brokers = args[0];
    String topics = args[1];

    System.loadLibrary(Core.NATIVE_LIBRARY_NAME);//54
    System.out.println("train before");
    train();
    System.out.println("train after");

    // Create context with a 2 seconds batch interval
    SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaVideoData");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    // Executors cannot see the driver's static fields, so the lookup tables are broadcast.
    final Broadcast<GlobleData> bcVar = jsc.broadcast(globleData);
    JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(2));

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);
    kafkaParams.put("group.id", "groupid");
    kafkaParams.put("consumer.id", "consumerid");

    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, byte[]> messages = KafkaUtils.createDirectStream(
            jssc,
            String.class,
            byte[].class,
            StringDecoder.class,
            DefaultDecoder.class,
            kafkaParams,
            topicsSet
    );

    JavaDStream<String> content = messages.map(new Function<Tuple2<String, byte[]>, String>() {
        //@Override
        public String call(Tuple2<String, byte[]> tuple2) throws IOException {
            System.loadLibrary(Core.NATIVE_LIBRARY_NAME);
            // Tiny or missing payloads cannot be a frame; null keeps them in the batch count.
            if (tuple2 == null || tuple2._2() == null || tuple2._2().length < 1000) {
                return null;
            }

            // 16 == CV_8UC3: the payload is treated as a raw 640x480 3-channel frame.
            Mat image = new Mat(new Size(640, 480), 16);
            image.put(0, 0, tuple2._2());
            List<Mat> detectResults = detectFace(image);
            if (detectResults.isEmpty()) {
                return null;
            }

            Mat person = detectResults.get(0);
            // NOTE(review): the model is reloaded from disk for every frame; caching it per
            // executor would be cheaper but needs the recognizer's thread-safety confirmed.
            FaceRecognizer fz = Face.createFisherFaceRecognizer();
            fz.load("/tmp/faceRec.yml");
            GlobleData gd = bcVar.value();
            Map<Integer, String> id2Name = gd.idToNameMapping;
            Map<String, String> lm = gd.lableMat;

            int[] label = new int[1];
            double[] confidence = new double[1];
            fz.predict(person, label, confidence);

            // Kept from the original: presumably nudges the JVM to finalize the native
            // recognizer promptly - TODO confirm it is still needed.
            fz = null;
            System.gc();
            System.out.println("confidence: " + confidence[0]);
            String name = id2Name.get(label[0]);

            System.out.println("Predicted label: " + name);
            System.out.println("**********");
            System.out.println("**********");
            System.out.println("**********");

            storeMatch(image, confidence[0], name, lm.get(name));
            return name;
        }
    });
    content.count().print();

    // Start the computation
    jssc.start();
    jssc.awaitTermination();
}

/**
 * Inserts one recognition result into the {@code images.faces} table.
 * Failures are logged and swallowed so one bad row does not fail the Spark task.
 *
 * <p>Images are JPEG-encoded in memory; the previous fixed {@code /tmp/1.jpg}
 * and {@code /tmp/2.jpg} files were shared by concurrent tasks and their
 * streams were never closed.
 */
private static void storeMatch(Mat frame, double confidence, String name, String matchedJson) {
    try {
        Class.forName("com.mysql.jdbc.Driver");
        byte[] frameJpeg = encodeJpeg(frame);
        byte[] matchedJpeg = encodeJpeg(matFromJson(matchedJson));
        // NOTE(review): host and credentials are hard-coded; consider moving them to configuration.
        try (Connection connection =
                     DriverManager.getConnection("jdbc:mysql://10.75.161.87/", "devnet", "devnet");
             Statement statement = connection.createStatement()) {
            statement.executeUpdate("use images;");
            try (PreparedStatement preparedStatement =
                         connection.prepareStatement("INSERT INTO faces VALUES (NULL, ?, ?, ?, ?, ?)")) {
                preparedStatement.setBinaryStream(1, new ByteArrayInputStream(frameJpeg));
                preparedStatement.setString(2, String.valueOf(System.currentTimeMillis()));
                preparedStatement.setString(3, String.valueOf(confidence));
                preparedStatement.setString(4, name);
                preparedStatement.setBinaryStream(5, new ByteArrayInputStream(matchedJpeg));
                preparedStatement.execute();
            }
        }
        System.out.println("sql insert, name = " + name);
    } catch (Exception e) {
        System.out.println(e.toString());
    }
}

/** JPEG-encodes {@code image} in memory and returns the encoded bytes. */
private static byte[] encodeJpeg(Mat image) throws IOException {
    MatOfByte buffer = new MatOfByte();
    try {
        if (!Imgcodecs.imencode(".jpg", image, buffer)) {
            throw new IOException("JPEG encoding failed");
        }
        return buffer.toArray();
    } finally {
        buffer.release();
    }
}

/**
 * Runs the frontal-face Haar cascade over a BGR frame and returns every hit
 * as a grey, histogram-equalized 92x112 crop.
 *
 * @param inputFrame BGR frame to search; it is copied, not modified
 * @return one resized crop per detected face, empty when nothing is found
 */
public static List<Mat> detectFace(Mat inputFrame) {
    final Size faceSize = new Size((double) 92, (double) 112);
    List<Mat> faces = new ArrayList<Mat>();

    Mat colour = new Mat();
    Mat grey = new Mat();
    inputFrame.copyTo(colour);
    inputFrame.copyTo(grey);
    MatOfRect hits = new MatOfRect();
    Imgproc.cvtColor(colour, grey, Imgproc.COLOR_BGR2GRAY);
    Imgproc.equalizeHist(grey, grey);

    // The cascade definition is extracted from the classpath to a temp file on every call.
    String cascadePath = GetResourceFilePath("/haarcascade_frontalface_alt.xml");
    CascadeClassifier detector = new CascadeClassifier(cascadePath);
    detector.detectMultiScale(grey, hits);

    Rect[] regions = hits.toArray();
    for (int i = 0; i < regions.length; i++) {
        // The crop is a view into `grey`; resize() copies the pixels into `scaled`.
        Mat crop = new Mat(grey, regions[i]);
        Mat scaled = new Mat();
        Imgproc.resize(crop, scaled, faceSize);
        crop.release();
        faces.add(scaled);
    }

    colour.release();
    grey.release();
    hits.release();
    System.out.println("Get face: " + faces.size());
    return faces;
}

/**
 * Copies a classpath resource to {@code /tmp} so that native code, which needs
 * a real file path, can open it.
 *
 * @param filename resource name with a leading slash, e.g. {@code "/model.xml"}
 * @return the path of the extracted copy, {@code "/tmp" + filename}
 * @throws IllegalStateException if the resource is not on the classpath; the
 *         original fell through to a NullPointerException after already
 *         truncating the target file
 */
public static String GetResourceFilePath(String filename) {
    System.loadLibrary(Core.NATIVE_LIBRARY_NAME);
    String tempFilename = "/tmp" + filename;

    InputStream resource = BigData.KafkaSpark.App.class.getResourceAsStream(filename);
    if (resource == null) {
        System.out.println("empty streaming");
        // Checked before the output file is opened so an earlier good copy is left intact.
        throw new IllegalStateException("Classpath resource not found: " + filename);
    }

    // try-with-resources closes both streams, including when the output file cannot be opened.
    try (InputStream inputStream = resource;
         OutputStream outputStream = new FileOutputStream(tempFilename)) {
        byte[] bytes = new byte[102400];
        int read;
        while ((read = inputStream.read(bytes)) != -1) {
            outputStream.write(bytes, 0, read);
        }
        outputStream.flush();
        System.out.println("Load XML file, Done!");
    } catch (IOException e) {
        // Best effort, as before: the path is still returned and the caller fails on load.
        e.printStackTrace();
    }
    return tempFilename;
}

/**
 * Serialises a Mat to a JSON object with the properties {@code rows},
 * {@code cols}, {@code type} and {@code data} (the pixel bytes, Base64-encoded).
 *
 * <p>A non-continuous Mat (for example a sub-matrix view) is copied first.
 * The original returned {@code "{}"} for it, which {@code matFromJson} cannot
 * parse.
 *
 * @param mat matrix to serialise; the pixel data is read into a byte array
 * @return the JSON text
 */
public static String matToJson(Mat mat) {
    // clone() allocates a fresh, densely packed copy, so the bytes can be read in one call.
    Mat source = mat.isContinuous() ? mat : mat.clone();
    try {
        int cols = source.cols();
        int rows = source.rows();
        int elemSize = (int) source.elemSize();

        byte[] data = new byte[cols * rows * elemSize];
        source.get(0, 0, data);

        JsonObject obj = new JsonObject();
        obj.addProperty("rows", rows);
        obj.addProperty("cols", cols);
        obj.addProperty("type", source.type());

        // JSON cannot carry raw binary, so the pixel bytes are Base64-encoded.
        String dataString = new String(Base64.encodeBase64(data));
        obj.addProperty("data", dataString);

        return new Gson().toJson(obj);
    } finally {
        if (source != mat) {
            source.release();
        }
    }
}

/**
 * Rebuilds a Mat from JSON carrying the properties {@code rows}, {@code cols},
 * {@code type} and Base64-encoded {@code data}.
 *
 * @param json JSON text with those four properties
 * @return a new Mat of the recorded size and type, filled with the decoded bytes
 */
public static Mat matFromJson(String json) {
    JsonObject root = new JsonParser().parse(json).getAsJsonObject();

    int rows = root.get("rows").getAsInt();
    int cols = root.get("cols").getAsInt();
    int type = root.get("type").getAsInt();

    // Pixel bytes travel as Base64 text inside the JSON document.
    byte[] pixels = Base64.decodeBase64(root.get("data").getAsString().getBytes());

    Mat restored = new Mat(rows, cols, type);
    restored.put(0, 0, pixels);
    return restored;
}
}



import java.io.*;
import java.sql.*;
import java.util.*;

import kafka.serializer.DefaultDecoder;
import kafka.serializer.StringDecoder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.opencv.core.*;
import org.opencv.face.Face;
import org.opencv.face.FaceRecognizer;
import org.opencv.imgproc.Imgproc;
import scala.Tuple2;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/*import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;*/

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.opencv.imgcodecs.Imgcodecs;
import org.opencv.objdetect.CascadeClassifier;
import org.apache.spark.streaming.Durations;

/**
 * Local-mode variant of the face recognition streaming job: main() trains a
 * recognizer from an HDFS sequence file, reads raw frames from Kafka, predicts
 * the first detected face and stores the frame plus the matched reference
 * image in MySQL.
 */
public class App {
// Closed by the shutdown hook in main() when non-null; never assigned in the visible code.
static Producer<String, String> producer;
// Numeric label -> person name; rebuilt by train().
static Map<Integer, String> idToNameMapping;
// Recognizer created and trained by train(); used for prediction in main().
static FaceRecognizer faceRecognizer;
// Label ids handed to FaceRecognizer.train(), one row per training image.
static MatOfInt labelsBuf;
// Face crops used for training, filled by train().
static List<Mat> mats;
// Person name -> decoded colour reference image.
static Map<String, Mat> lableMat = new HashMap<String, Mat>();

/**
 * Wraps a {@code (String, byte[])} pair into Hadoop writables
 * ({@code Text}, {@code BytesWritable}).
 */
@SuppressWarnings("serial")
public static class ConvertToWritableTypes implements PairFunction<Tuple2<String, byte[]>, Text, BytesWritable> {
    // Explicit type arguments replace the raw Tuple2 and the warning suppression it needed.
    public Tuple2<Text, BytesWritable> call(Tuple2<String, byte[]> record) {
        return new Tuple2<Text, BytesWritable>(new Text(record._1), new BytesWritable(record._2));
    }
}
/**
 * Reads the labelled images from the HDFS sequence file, crops the first
 * detected face of each one and trains a Fisherfaces recognizer on the crops.
 *
 * <p>Record keys look like {@code <name>_<suffix>}; everything before the first
 * underscore is the person name. Records sharing a key are concatenated before
 * decoding. Images without a detectable face are skipped; previously a single
 * such image aborted the whole job.
 *
 * <p>Any failure is printed and terminates the JVM with exit code 100.
 */
public static void train(){
    try {
        System.loadLibrary(Core.NATIVE_LIBRARY_NAME);

        String uri = "hdfs://10.75.161.88/stars.seq";
        mats = new ArrayList<Mat>();
        idToNameMapping = new HashMap<Integer, String>();
        Configuration conf = new Configuration();
        Path path = new Path(uri);

        // Built together with idToNameMapping so no O(n) containsValue() scan is needed per record.
        Map<String, Integer> nameToId = new HashMap<String, Integer>();
        Map<Text, ByteArrayOutputStream> keyStream = new LinkedHashMap<Text, ByteArrayOutputStream>();

        SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
        try {
            Text key = new Text();
            Text value = new Text();
            while (reader.next(key, value)) {
                String name = key.toString().split("_")[0];
                if (!nameToId.containsKey(name)) {
                    int id = nameToId.size();
                    nameToId.put(name, id);
                    idToNameMapping.put(id, name);
                }
                ByteArrayOutputStream chunks = keyStream.get(key);
                if (chunks == null) {
                    chunks = new ByteArrayOutputStream(1024);
                    // The reader reuses `key`, so the map needs its own copy.
                    keyStream.put(new Text(key), chunks);
                }
                // Text.getBytes() may be longer than the payload; only getLength() bytes are valid.
                chunks.write(value.getBytes(), 0, value.getLength());
            }
        } finally {
            // Close the reader even when reading fails half-way.
            IOUtils.closeStream(reader);
        }

        // Labels are collected in a list because skipped images make the final count unknown.
        List<Integer> labels = new ArrayList<Integer>();
        for (Map.Entry<Text, ByteArrayOutputStream> entry : keyStream.entrySet()) {
            String name = entry.getKey().toString().split("_")[0];
            MatOfByte encoded = new MatOfByte(entry.getValue().toByteArray());
            // NOTE(review): CV_IMWRITE_JPEG_OPTIMIZE is a *write* parameter used as a decode flag.
            // Kept unchanged because detectElements() converts from BGR and needs colour input.
            Mat mat = Imgcodecs.imdecode(encoded, Imgcodecs.CV_IMWRITE_JPEG_OPTIMIZE);
            Mat matSave = Imgcodecs.imdecode(encoded, Imgcodecs.CV_LOAD_IMAGE_COLOR);
            encoded.release();

            // Detect once; the original ran the cascade twice for every image.
            List<Mat> faces = mat.empty() ? new ArrayList<Mat>() : detectElements(mat);
            if (faces.isEmpty()) {
                System.out.println("No face found in training image " + entry.getKey() + ", skipped");
                continue;
            }
            Mat face = faces.get(0);
            mats.add(face);
            // Debug dump of the crop, kept from the original (random name, may overwrite).
            Imgcodecs.imwrite("/tmp/" + new Random().nextInt(100) + ".jpg", face);
            labels.add(nameToId.get(name));
            lableMat.put(name, matSave);
        }
        labelsBuf = new MatOfInt();
        labelsBuf.fromList(labels);

        faceRecognizer = Face.createFisherFaceRecognizer();
        faceRecognizer.train(mats, labelsBuf);
        // The recognizer was just assigned above, so it cannot be null here.
        System.out.println("!!!!!!!!face rec is not null");
    } catch (Exception e) {
        System.out.println(e.toString());
        System.exit(100);
    }
}

/**
 * Entry point: starts a local Spark Streaming job that reads raw frames from
 * Kafka, recognizes the first detected face in each frame and stores the
 * frame together with the matched reference image in MySQL.
 *
 * @param args unused; brokers and topic are hard-coded
 * @throws Exception if the streaming context fails
 */
@SuppressWarnings("serial")
public static void main(String[] args) throws Exception{
    System.loadLibrary(Core.NATIVE_LIBRARY_NAME);//54
    System.out.println("train after");
    String brokers = "10.75.161.54:9092";
    String topics = "ddp_video_source";

    // Create context with a 2 seconds batch interval
    SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaVideoData");
    // Local mode: the map function below reads this class's static fields directly.
    sparkConf.setMaster("local");
    sparkConf.set("spark.testing.memory", "2147480000");

    final JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));

    // for graceful shutdown of the application ...
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            System.out.println("Shutting down streaming app...");
            if (producer != null)
                producer.close();
            jssc.stop(true, true);
            System.out.println("Shutdown of streaming app complete.");
        }
    });

    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topics.split(",")));
    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", brokers);
    kafkaParams.put("group.id", "groupid");
    kafkaParams.put("consumer.id", "consumerid");
    kafkaParams.put("zookeeper.connect", "cdh3:2181/kafka");

    // Create direct kafka stream with brokers and topics
    JavaPairInputDStream<String, byte[]> messages = KafkaUtils.createDirectStream(
            jssc,
            String.class,
            byte[].class,
            StringDecoder.class,
            DefaultDecoder.class,
            kafkaParams,
            topicsSet
    );

    train();
    JavaDStream<String> content = messages.map(new Function<Tuple2<String, byte[]>, String>() {
        //@Override
        public String call(Tuple2<String, byte[]> tuple2) throws IOException {
            System.loadLibrary(Core.NATIVE_LIBRARY_NAME);
            if (tuple2 == null) {
                System.out.println("null");
                return null;
            }
            byte[] payload = tuple2._2();
            if (payload == null || payload.length <= 1000) {
                System.out.println("tuple2._2().length < 1000");
                return null;
            }

            // 16 == CV_8UC3: the payload is treated as a raw 640x480 3-channel frame.
            Mat image = new Mat(new Size(640, 480), 16);
            image.put(0, 0, payload);
            System.out.println("tuple2._2().length > 1000");

            // Detect once; the original ran the cascade twice for every frame.
            List<Mat> faces = detectElements(image);
            if (faces.isEmpty()) {
                System.out.println("NO person");
                return null;
            }
            Mat person = faces.get(0);
            System.out.println(person.width() + "person.width");
            System.out.println(person.height() + "person.height");
            if (faceRecognizer == null) {
                // Previously logged and then dereferenced anyway.
                System.out.println("after tain, face rec is null");
                return null;
            }

            int predictedLabel = faceRecognizer.predict(person);
            String name = idToNameMapping.get(predictedLabel);
            System.out.println("Predicted label: " + name);
            System.out.println("**********");
            System.out.println("**********");
            System.out.println("**********");
            System.out.println("**********");

            saveMatch(image, name, lableMat.get(name));
            // The original always returned null; only count() consumes the stream, so
            // returning the name does not change the printed counts.
            return name;
        }
    });
    content.count().print();

    // Start the computation
    jssc.start();
    jssc.awaitTermination();
}

/**
 * Inserts one recognition result into {@code images.faces}, creating the
 * database and table on first use. Failures are logged and swallowed so one
 * bad row does not fail the Spark task.
 *
 * <p>Images are JPEG-encoded in memory. The original wrote {@code /tmp/2.jpg}
 * but then read {@code /tmp/2.pgm}, and never closed its file streams.
 */
private static void saveMatch(Mat frame, String name, Mat matched) {
    try {
        Class.forName("com.mysql.jdbc.Driver");
        byte[] frameJpeg = toJpegBytes(frame);
        byte[] matchedJpeg = toJpegBytes(matched);
        // NOTE(review): credentials are hard-coded; consider moving them to configuration.
        try (Connection connection = DriverManager.getConnection("jdbc:mysql://localhost/", "root", "root");
             Statement statement = connection.createStatement()) {
            statement.executeUpdate("CREATE DATABASE IF NOT EXISTS images;");
            statement.executeUpdate("use images;");
            statement.executeUpdate("CREATE TABLE IF NOT EXISTS faces (\n" +
                    "  id    INT          NOT NULL AUTO_INCREMENT,\n" +
                    "  originalImage MediumBlob  NOT NULL,\n" +
                    "  timeLabel VARCHAR(100) NOT NULL,\n" +
                    "  matchedImage MediumBlob         NOT NULL,\n" +
                    "  PRIMARY KEY (id)\n" +
                    ")\n" +
                    "  ENGINE = InnoDB;");
            try (PreparedStatement preparedStatement =
                         connection.prepareStatement("INSERT INTO faces VALUES (NULL, ?, ?, ?)")) {
                preparedStatement.setBinaryStream(1, new ByteArrayInputStream(frameJpeg));
                preparedStatement.setString(2, "Time:" + System.nanoTime() + ", name:" + name);
                preparedStatement.setBinaryStream(3, new ByteArrayInputStream(matchedJpeg));
                preparedStatement.execute();
            }
        }
        System.out.println("sql insert, name = " + name);
    } catch (Exception e) {
        System.out.println(e.toString());
    }
}

/** JPEG-encodes {@code image} in memory and returns the encoded bytes. */
private static byte[] toJpegBytes(Mat image) throws IOException {
    if (image == null) {
        throw new IOException("No image to encode");
    }
    MatOfByte buffer = new MatOfByte();
    try {
        if (!Imgcodecs.imencode(".jpg", image, buffer)) {
            throw new IOException("JPEG encoding failed");
        }
        return buffer.toArray();
    } finally {
        buffer.release();
    }
}

/**
 * Runs the frontal-face Haar cascade over a BGR frame and returns every hit
 * as a grey 92x112 crop with 64-bit float pixels.
 *
 * <p>Detection runs on the histogram-equalized grey frame, but the crops are
 * taken from the un-equalized input, as before.
 *
 * @param inputFrame BGR frame to search; it is not modified
 * @return one resized crop per detected face, empty when nothing is found
 */
public static List<Mat> detectElements(Mat inputFrame) {
    System.loadLibrary(Core.NATIVE_LIBRARY_NAME);
    List<Mat> detectedElements = new ArrayList<Mat>(10);
    Mat mGrey = new Mat();
    MatOfRect results = new MatOfRect();
    try {
        // cvtColor never writes to its source, so the two full-frame copies were unnecessary.
        Imgproc.cvtColor(inputFrame, mGrey, Imgproc.COLOR_BGR2GRAY);
        Imgproc.equalizeHist(mGrey, mGrey);
        CascadeClassifier cascadeClassifier =
                new CascadeClassifier(GetResourceFilePath("/haarcascade_frontalface_alt.xml"));
        cascadeClassifier.detectMultiScale(mGrey, results);
        Rect[] classifiedElements = results.toArray();
        System.out.println("Dectected person: " + classifiedElements.length);

        for (Rect rect : classifiedElements) {
            // A view into the frame; the original cloned the whole frame for every face.
            Mat colourFace = new Mat(inputFrame, rect);
            Mat greyFace = new Mat();
            Mat convert = new Mat();
            Imgproc.cvtColor(colourFace, greyFace, Imgproc.COLOR_BGR2GRAY);
            // convertTo() takes a depth, not a colour code. The original passed
            // Imgproc.COLOR_BGR2GRAY, whose value (6) equals CvType.CV_64F, so the
            // output type is unchanged - only the constant is now the right one.
            greyFace.convertTo(convert, CvType.CV_64F);
            detectedElements.add(resizeFace(convert));
            colourFace.release();
            greyFace.release();
            convert.release();
        }
    } finally {
        // Free the native buffers promptly instead of waiting for finalization.
        mGrey.release();
        results.release();
    }
    System.out.println("Get fave: " + detectedElements.size());
    return detectedElements;
}
/**
 * Scales an image to 92x112 pixels.
 *
 * @param originalImage image to scale; it is not modified
 * @return a new Mat holding the scaled image
 */
public static Mat resizeFace(Mat originalImage) {
    System.loadLibrary(Core.NATIVE_LIBRARY_NAME);
    final Size target = new Size((double) 92, (double) 112);
    Mat scaled = new Mat();
    Imgproc.resize(originalImage, scaled, target);
    return scaled;
}

/**
 * Copies a classpath resource to {@code /tmp} so that native code, which needs
 * a real file path, can open it.
 *
 * @param filename resource name with a leading slash, e.g. {@code "/model.xml"}
 * @return the path of the extracted copy, {@code "/tmp" + filename}
 * @throws IllegalStateException if the resource is not on the classpath; the
 *         original fell through to a NullPointerException after already
 *         truncating the target file
 */
public static String GetResourceFilePath(String filename) {
    String tempFilename = "/tmp" + filename;

    InputStream resource = App.class.getResourceAsStream(filename);
    if (resource == null) {
        System.out.println("empty streaming");
        // Checked before the output file is opened so an earlier good copy is left intact.
        throw new IllegalStateException("Classpath resource not found: " + filename);
    }

    // try-with-resources closes both streams, including when the output file cannot be opened.
    try (InputStream inputStream = resource;
         OutputStream outputStream = new FileOutputStream(tempFilename)) {
        byte[] bytes = new byte[102400];
        int read;
        while ((read = inputStream.read(bytes)) != -1) {
            outputStream.write(bytes, 0, read);
        }
        outputStream.flush();
        System.out.println("Load XML file, Done!");
    } catch (IOException e) {
        // Best effort, as before: the path is still returned and the caller fails on load.
        e.printStackTrace();
    }
    return tempFilename;
}
}
package kafkaCamera;

/**
* Created by lwc on 5/31/16.
*/

import java.io.UnsupportedEncodingException;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.*;
import org.opencv.core.Core;
import org.opencv.core.Size;
import org.opencv.core.Mat;
import org.opencv.videoio.VideoCapture;

/**
 * Grabs frames from a camera with OpenCV and publishes the raw pixel bytes of
 * each frame to a Kafka topic, one frame every two seconds.
 *
 * <p>Defaults: local camera device 0, broker {@code 10.75.161.54:9092}, topic
 * {@code ddp_video_source}. Call {@link #Init} before starting the thread to
 * override them, and {@link #stop()} to end the capture loop.
 */
public class KafkaDistributor implements Runnable {

    /** Upper bound on camera open attempts; the original retried forever in a busy loop. */
    private static final int MAX_OPEN_ATTEMPTS = 30;
    private static final long OPEN_RETRY_PAUSE_MS = 1000L;
    private static final long FRAME_INTERVAL_MS = 2000L;

    // volatile: written by stop() from another thread, read by the capture loop.
    private volatile boolean tracking = true;
    // This field was assigned in Init() but never declared, so the class did not compile.
    private String cameraUrl;
    private String brokerUrl = "10.75.161.54:9092";
    private String topic = "ddp_video_source";

    /**
     * Overrides the capture source and the Kafka target.
     *
     * @param cameraUrlIn stream URL to open; {@code null} or empty keeps local device 0
     * @param brokerUrlIn Kafka bootstrap server list
     * @param topicIn     topic the frames are published to
     */
    public void Init(String cameraUrlIn, String brokerUrlIn, String topicIn) {
        cameraUrl = cameraUrlIn;
        brokerUrl = brokerUrlIn;
        topic = topicIn;
    }

    /** Asks the capture loop to finish after the current iteration. */
    public void stop() {
        tracking = false;
    }

    /** Opens the camera, then publishes frames until stopped or interrupted. */
    public void run() {
        System.loadLibrary(Core.NATIVE_LIBRARY_NAME);

        VideoCapture capture = openCapture();
        if (capture == null) {
            System.out.println("not open the stream!");
            return;
        }

        // 16 == CV_8UC3; read() reallocates the Mat if the camera delivers another format.
        Mat frame = new Mat(new Size(640, 480), 16);

        Properties props = new Properties();
        props.put("bootstrap.servers", brokerUrl);
        props.put("metadata.broker.list", brokerUrl);
        props.put("acks", "all");
        props.put("client.id", "DemoProducer");
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");

        Producer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
        long frameCount = 0;
        try {
            while (tracking) {
                // read() reports failure through its return value; an empty frame cannot be sent.
                if (!capture.read(frame) || frame.empty()) {
                    System.out.println("Failed to grab a frame, retrying");
                    if (!pause(FRAME_INTERVAL_MS)) {
                        break;
                    }
                    continue;
                }

                // Assumes one byte per channel, as in the original sizing.
                byte[] frameArray = new byte[((int) frame.total() * frame.channels())];
                frame.get(0, 0, frameArray);

                System.out.println("FrameSize:" + frameArray.length);
                System.out.println("Mat:height " + frame.height());
                System.out.println("Mat: width " + frame.width());
                System.out.println("channels: " + frame.channels());
                System.out.println("type: " + frame.type());

                // The counter was never advanced before, so every record carried the key "0".
                Future<RecordMetadata> a = producer.send(new ProducerRecord<String, byte[]>(
                        topic, Long.toString(frameCount++), frameArray));

                System.out.println("Send one frame" + a.isDone());
                if (!pause(FRAME_INTERVAL_MS)) {
                    break;
                }
            }
        } finally {
            // Release Kafka and native camera resources even when the loop fails.
            producer.close();
            capture.release();
            frame.release();
        }
    }

    /**
     * Tries to open the configured source, pausing between attempts.
     *
     * @return an opened capture, or {@code null} when every attempt failed or the
     *         thread was interrupted
     */
    private VideoCapture openCapture() {
        for (int attempt = 1; attempt <= MAX_OPEN_ATTEMPTS; attempt++) {
            VideoCapture capture = new VideoCapture();
            boolean isOpen = (cameraUrl == null || cameraUrl.isEmpty())
                    ? capture.open(0)
                    : capture.open(cameraUrl);
            System.out.println("Try to open times: " + attempt);
            if (isOpen) {
                return capture;
            }
            // Free the native handle of the failed attempt before retrying.
            capture.release();
            if (!pause(OPEN_RETRY_PAUSE_MS)) {
                break;
            }
        }
        return null;
    }

    /**
     * Sleeps for {@code millis}.
     *
     * @return {@code false} if the thread was interrupted; the interrupt flag is restored
     */
    private static boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Starts the distributor on its own thread.
     *
     * @param args optional: camera URL, broker list, topic; defaults apply when fewer
     *             than three arguments are given
     */
    public static void main(String[] args) throws UnsupportedEncodingException {
        KafkaDistributor distributor = new KafkaDistributor();
        if (args.length >= 3) {
            distributor.Init(args[0], args[1], args[2]);
        }
        Thread producerProcess = new Thread(distributor);
        producerProcess.start();
    }

}

package opencvImageSeq;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

import java.io.*;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;

/**
 * Packs every file of a local directory into one HDFS sequence file.
 *
 * <p>Each file is written as a run of records that share the file name as key;
 * every record value holds at most {@code CHUNK_SIZE} bytes of the file, in
 * order. Readers rebuild a file by concatenating the values of equal keys.
 *
 * Created by lwc on 5/23/16.
 */
public class ImageSeqWriter {

    private static final int CHUNK_SIZE = 1024;

    /**
     * @param args unused; input directory and target URI are hard-coded
     * @throws Exception if the input directory is missing or any read/write fails
     */
    public static void main(String[] args) throws Exception {
        File inputDir = new File("/home/test/faceData");
        if(!inputDir.isDirectory()) {
            throw new Exception("input dir is wrong");
        }
        File[] inputFiles = inputDir.listFiles();
        if (inputFiles == null) {
            // listFiles() returns null on an I/O error even for an existing directory.
            throw new IOException("Cannot list " + inputDir);
        }
        List<String> imageNames = new ArrayList<>();

        String uri = "hdfs://10.75.161.88/stars.seq";
        Configuration conf = new Configuration();
        Path path = new Path(uri);

        // try-with-resources: a failing close() now surfaces instead of being swallowed,
        // which matters because close() flushes the last records.
        try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                SequenceFile.Writer.file(path), SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(Text.class))) {
            for (File file : inputFiles) {
                if (!file.isFile()) {
                    // Sub-directories cannot be opened as a stream; skip them.
                    continue;
                }
                String imageName = file.getName();
                imageNames.add(imageName);
                appendFile(writer, file, new Text(imageName));
            }
        }

        for (String name : imageNames) {
            System.out.println(name);
        }
    }

    /** Appends the content of {@code file} to {@code writer} as chunked records under {@code key}. */
    private static void appendFile(SequenceFile.Writer writer, File file, Text key) throws IOException {
        // Each input stream is closed here; previously only the last file's stream was closed.
        try (InputStream inputStream = new BufferedInputStream(new FileInputStream(file))) {
            Text value = new Text();
            byte[] buffer = new byte[CHUNK_SIZE];
            int read;
            while ((read = inputStream.read(buffer)) != -1) {
                if (read == 0) {
                    continue;
                }
                // Store only the bytes actually read. The original stored the whole buffer,
                // padding the last chunk of every file with stale bytes from the chunk before.
                value.set(buffer, 0, read);
                writer.append(key, value); // append each record to the end of the sequence file
                value.clear();
            }
        }
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息