您的位置:首页 > 其它

以图搜图实现六之web 在线查询服务

2019-08-01 17:13 495 查看
版权声明:本文为博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/weixin_42435657/article/details/97762099
  1. 创建一个python flask 项目
  2. 根据vgg16模型,将图片转为特征 extract_cnn_vgg16_keras.py
from keras.applications.vgg16 import VGG16
from keras.applications.vgg16 import preprocess_input
from keras.preprocessing import image
from numpy import linalg as LA

import keras
import numpy as np

class VGGNet:
    """Image feature extractor backed by a Keras VGG16 network.

    The network is built without its classification head
    (``include_top=False``) and with global max pooling, so one
    prediction yields a single feature vector per image.
    """

    def __init__(self):
        """Build the VGG16 model and run one warm-up prediction."""
        # weights: 'imagenet'
        # pooling: 'max' or 'avg'
        # input_shape: (width, height, 3), width and height should be >= 48
        self.input_shape = (224, 224, 3)
        self.weight = 'imagenet'
        self.pooling = 'max'
        self.model = VGG16(
            weights=self.weight,
            input_shape=self.input_shape,
            pooling=self.pooling,
            include_top=False,
        )
        # One dummy prediction on an all-zero batch of the input shape.
        # NOTE(review): presumably forces Keras to build its predict function
        # up front so the first real request does not pay for it - confirm.
        self.model.predict(np.zeros((1,) + self.input_shape))

    def extract_feat(self, img_path):
        """Extract a normalized VGG16 feature vector from an image file.

        Args:
            img_path: Path of the image to load; it is resized to the
                model's input width and height.

        Returns:
            The first (only) row of the model output divided by its norm.
            When the norm is zero the unscaled all-zero vector is returned
            instead of dividing by zero, which would produce NaN values.
        """
        img = image.load_img(img_path, target_size=self.input_shape[:2])
        batch = np.expand_dims(image.img_to_array(img), axis=0)
        batch = preprocess_input(batch)
        feat = self.model.predict(batch)[0]
        norm = LA.norm(feat)
        if norm == 0:
            # Degenerate input: nothing to scale.
            return feat
        return feat / norm
  3. uuid 生成工具类 strUtil.py
#-*-coding:utf-8-*-
import datetime
import random

class Pic_str:
    """Generate time-based identifier strings used to name uploaded images."""

    def create_uuid(self):
        """Return a fixed-width, digits-only identifier for an image.

        The identifier is the current local time formatted as
        ``%Y%m%d%H%M%S%f`` (20 digits, microsecond resolution) followed by
        a random integer in ``[0, 100]`` zero-padded to three digits, so
        every identifier is exactly 23 characters long.

        The earlier version padded only values ``<= 10`` with a single
        ``0``, which produced suffixes of mixed width ("05", "010", "57",
        "100") and offered just 101 distinct names per second.

        Returns:
            str: the identifier. It is time-based and random, not a
            guaranteed-unique UUID.
        """
        timestamp = datetime.datetime.now().strftime("%Y%m%d%H%M%S%f")
        suffix = random.randint(0, 100)
        return "{}{:03d}".format(timestamp, suffix)

  4. 服务查询类 picture_upload_vgg.py
#encoding:utf-8
#!/usr/bin/env python
from argparse import ArgumentParser
import base64
from itertools import cycle, product
import json
import os
import pdb
from pprint import pformat, pprint
import random
import sys
from time import time, sleep
import time
from urllib.parse import quote_plus

from PIL import Image
from elasticsearch import Elasticsearch, helpers
import elasticsearch
from flask import Flask, render_template, jsonify, request, make_response, send_from_directory, abort
from more_itertools import chunked

import requests
from sklearn.neighbors import NearestNeighbors
from werkzeug.utils import secure_filename

from strUtil import Pic_str
import numpy as np

from keras.applications.vgg16 import VGG16
from keras.applications.vgg16 import preprocess_input
from keras.preprocessing import image
from numpy import linalg as LA
import keras
import tensorflow as tf
from keras.models import load_model

import math

from extract_cnn_vgg16_keras import VGGNet
from fdfs_client.client import Fdfs_client

# Flask application object; __name__ lets Flask locate templates and static
# files relative to this module.
app = Flask(__name__)

# Name of the folder (relative to this script) that receives uploaded files.
UPLOAD_FOLDER = 'upload'
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER

# Absolute path of the directory that contains this script.
basedir = os.path.abspath(os.path.dirname(__file__))

# File extensions accepted for upload.
ALLOWED_EXTENSIONS = {'png', 'jpg', 'JPG', 'PNG', 'gif', 'GIF'}

def allowed_file(filename):
    """Tell whether *filename* carries an accepted image extension.

    The text after the last ``.`` is compared case-insensitively against
    ``ALLOWED_EXTENSIONS``, so mixed-case names such as ``photo.Jpg`` are
    accepted as well as the exact spellings listed there.

    Args:
        filename: File name as sent by the client.

    Returns:
        bool: True when the name contains a dot and its extension is allowed.
    """
    if '.' not in filename:
        return False
    ext = filename.rsplit('.', 1)[1].lower()
    return ext in {allowed.lower() for allowed in ALLOWED_EXTENSIONS}

@app.route('/')
@app.route('/upload')
def upload_test():
    """Render the ``up.html`` template for both ``/`` and ``/upload``."""
    return render_template('up.html')

# Upload a picture and search for similar images.
@app.route('/up_photo', methods=['POST'], strict_slashes=False)
def api_upload():
    """Handle an image upload and render its nearest neighbours.

    Steps, in order:
      1. Save the uploaded ``photo`` file under the upload folder with a
         generated name.
      2. Extract a VGG16 feature vector from it, then delete the file.
      3. POST the vector as a single document to the ``_aknn_index``
         endpoint of one randomly chosen Elasticsearch host.
      4. Call ``_aknn_search`` for that document, then delete the document.
      5. Render ``index.html`` with the first hit as the query image and the
         remaining hits as neighbours.

    Returns:
        The rendered ``index.html`` page on success, or a JSON error object
        (code 1001 when the upload is missing or has a disallowed
        extension, code 1002 when the search returns no hits).
    """
    file_dir = os.path.join(basedir, app.config['UPLOAD_FOLDER'])
    os.makedirs(file_dir, exist_ok=True)
    f = request.files['photo']

    if not (f and allowed_file(f.filename)):
        return jsonify({"error": 1001, "msg": "上传失败"})

    fname = secure_filename(f.filename)
    print(fname)
    # Take the extension from the raw name, which allowed_file() has just
    # validated. secure_filename() can drop the dot when the base name has
    # no ASCII characters (e.g. '图片.jpg' -> 'jpg'), which used to raise
    # IndexError here.
    ext = f.filename.rsplit('.', 1)[1]
    new_filename = Pic_str().create_uuid() + '.' + ext
    upload_path = os.path.join(file_dir, new_filename)

    f.save(upload_path)
    print("upload_file_path", upload_path)

    # (An optional FastDFS upload of the saved file used to live here and
    # was disabled in the original code.)

    try:
        # NOTE(review): the model is rebuilt on every request after clearing
        # the Keras session; kept as in the original - confirm whether a
        # shared instance is safe with the deployed Keras/TensorFlow version.
        keras.backend.clear_session()
        model = VGGNet()
        norm_feat = model.extract_feat(upload_path)
    finally:
        # Remove the temporary file even when feature extraction fails.
        if os.path.exists(upload_path):
            os.remove(upload_path)
            print("删除图片成功")
        else:
            print("未删除图片")

    img_name = new_filename.split(".")[0]

    img_dic = {
        '_id': img_name,
        '_source': {
            '_aknn_vector': norm_feat.tolist(),
            'url': new_filename,
        },
    }
    img_json = json.dumps(img_dic)
    print("img_json ", img_json)

    # Request body that indexes this single document.
    body_data = {
        '_index': 'twitter_image',
        '_type': 'twitter_image',
        '_aknn_uri': 'aknn_models/aknn_model/twitter_image_search',
        '_aknn_docs': [img_dic],
    }
    body_json = json.dumps(body_data)
    print(body_json)

    # Pick one of the Elasticsearch hosts at random.
    host = random.choice(['172.16.1.8', '172.16.1.9', '172.16.1.10'])
    base_url = 'http://' + host + ':9200'

    headers = {'Content-Type': 'application/json'}
    code = requests.post(base_url + '/_aknn_index', headers=headers, data=body_json)
    print('code', code)

    doc_url = base_url + '/twitter_image/twitter_image/' + img_name
    try:
        # Search for neighbours of the document that was just indexed.
        get_url = doc_url + '/_aknn_search?k1=10000&k2=10'
        print('get_url', get_url)
        req = requests.get(get_url)
        print('req', req)

        result = req.json()  # parse the response body once
        hits = result["hits"]["hits"]
        took_ms = result["took"]
    finally:
        # Delete the temporary document even when the search fails, so the
        # index does not accumulate query images.
        delete_code = requests.delete(doc_url)
        print('删除成功 ', delete_code)

    if not hits:
        # Previously hits[0] raised IndexError on an empty result.
        return jsonify({"error": 1002, "msg": "查询失败"})

    # The first hit is treated as the query image, the rest as neighbours.
    query_img, neighbor_imgs = hits[0], hits[1:]
    print(query_img, ' ', neighbor_imgs)

    # NOTE(review): query_img_url points at the upload path, but that file
    # has already been removed above - confirm what the template expects.
    return render_template(
        "index.html",
        es_index="twitter_image",
        es_type="twitter_image",
        took_ms=took_ms,
        query_img_url=upload_path,
        query_img=query_img,
        neighbor_imgs=neighbor_imgs)

@app.route('/download/<string:filename>', methods=['GET'])
def download(filename):
    """Send a file from the ``upload`` folder as an attachment.

    Args:
        filename: Name of the file, taken from the URL.

    Returns:
        The file as an attachment response. Aborts with HTTP 404 when the
        file does not exist; the earlier version fell through and returned
        ``None``, which Flask rejects as an invalid view result.
    """
    if not os.path.isfile(os.path.join('upload', filename)):
        abort(404)
    return send_from_directory('upload', filename, as_attachment=True)

@app.route('/show/<string:filename>', methods=['GET'])
def show_photo(filename):
    """Return the raw bytes of an uploaded file with an image/png header.

    Args:
        filename: Name of the file inside the upload folder, taken from the
            URL.

    Returns:
        A response whose body is the file content and whose Content-Type is
        always ``image/png`` (kept from the original, regardless of the
        actual file type). Aborts with HTTP 404 when the file is missing.
    """
    file_dir = os.path.join(basedir, app.config['UPLOAD_FOLDER'])
    image_path = os.path.join(file_dir, '%s' % filename)
    if not os.path.isfile(image_path):
        abort(404)
    # Context manager closes the handle; the original left it open.
    with open(image_path, "rb") as image_file:
        image_data = image_file.read()
    response = make_response(image_data)
    response.headers['Content-Type'] = 'image/png'
    return response

if __name__ == '__main__':
    # NOTE(review): debug=True enables the interactive Werkzeug debugger,
    # which allows arbitrary code execution; with a non-loopback host it is
    # reachable from the network. Kept as in the original - disable it for
    # any deployment outside a trusted development network.
    app.run(host='172.16.1.141', debug=True)

  5. 网页端查询
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: