Flink之异步I/O案例(一)HttpClient异步查询高德API
2020-04-04 07:13
3847 查看
文章目录
一、异步IO简单释义
- 数据在请求的时候,不是同步的。
- 请求丢线程池,多线程同时进行,并发能力强。
- 在访问外部API或者内部数据库时,可大大提高查询效率,但是也会占用大量的计算资源。
二、需求
- 输入数据类型:Json
- 将下列数据,提取出经纬度,通过查询高德API,关联上省市区
{"oid":"o001","uid":"u001","money":99.99,"longitude":115.690417, "latitude":36.239344} {"oid":"o002","uid":"u002","money":"1009","longitude":121.51417, "latitude":31.220616} {"oid":"o003","uid":"u003","money":"89.99","longitude":106.550464, "latitude":29.563761} {"oid":"o005","uid":"u005","money":"125.99","longitude":91.15476608, "latitude":29.67172188} {"oid":"o006","uid":"u006","money":"799","longitude":113.274379, "latitude":34.445122}
三、代码实现
1、OrderBean
class OrderBean { private String oid; private String uid; private double money; private double longitude; private double latitude; private String province; private String city; private String district; public OrderBean() { } public OrderBean(String oid, String uid, double money, double longitude, double latitude, String province, String city, String district) { this.oid = oid; this.uid = uid; this.money = money; this.longitude = longitude; this.latitude = latitude; this.province = province; this.city = city; this.district = district; } public String getOid() { return oid; } public void setOid(String oid) { this.oid = oid; } public String getUid() { return uid; } public void setUid(String uid) { this.uid = uid; } public double getMoney() { return money; } public void setMoney(double money) { this.money = money; } public double getLongitude() { return longitude; } public void setLongitude(double longitude) { this.longitude = longitude; } public double getLatitude() { return latitude; } public void setLatitude(double latitude) { this.latitude = latitude; } public String getProvince() { return province; } public void setProvince(String province) { this.province = province; } public String getCity() { return city; } public void setCity(String city) { this.city = city; } public String getDistrict() { return district; } public void setDistrict(String district) { this.district = district; } @Override public String toString() { return "OrderBean{" + "oid='" + oid + '\'' + ", uid='" + uid + '\'' + ", money=" + money + ", longitude=" + longitude + ", latitude=" + latitude + ", province='" + province + '\'' + ", city='" + city + '\'' + ", district='" + district + '\'' + '}'; } }
2、主代码
- 导入高效异步的HttpClient依赖
<dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpasyncclient</artifactId> <version>4.1.4</version> </dependency>
import org.apache.flink.streaming.api.datastream.AsyncDataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.concurrent.TimeUnit; public class HttpAsyncQueryGaoDe { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> lines = env.socketTextStream("linux01", 8888); SingleOutputStreamOperator<OrderBean> result = AsyncDataStream.unorderedWait( lines, //输入的数据流 new AsyncHttpQueryFunction(), //异步查询的Function实例 2000, //超时时间 TimeUnit.MILLISECONDS, //时间单位 10 //最大异步并发请求数量(并发的线程队列数) ); result.print(); env.execute("HttpAsyncQueryGaoDe"); } }
3、异步查询的Function实例
- 异步HttpClient官网
- HttpClient支持异步查询,自带连接池
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.async.ResultFuture; import org.apache.flink.streaming.api.functions.async.RichAsyncFunction; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.methods.HttpGet; import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; import org.apache.http.impl.nio.client.HttpAsyncClients; import org.apache.http.util.EntityUtils; import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; import java.util.function.Consumer; import java.util.function.Supplier; public class AsyncHttpQueryFunction extends RichAsyncFunction<String, OrderBean> { private static final String key = "da02bb7d98d5104958087f144e3b4432"; CloseableHttpAsyncClient httpclient = null; @Override public void open(Configuration parameters) throws Exception { //创建一个异步的HttpClient连接池 //初始化异步的HttpClient RequestConfig requestConfig = RequestConfig.custom() .setSocketTimeout(3000) .setConnectTimeout(3000) //设置HttpClient连接池超时时间 .build(); httpclient = HttpAsyncClients.custom() .setMaxConnTotal(20) //连接池最大连接数 .setDefaultRequestConfig(requestConfig) .build(); httpclient.start(); } // 异步处理函数的主执行方法 @Override public void asyncInvoke(String line, ResultFuture<OrderBean> resultFuture) throws Exception { // try 处理异常的json解析 try { // 1、解析JSON,拿到经纬度信息 OrderBean orderBean = JSON.parseObject(line, OrderBean.class); double longitude = orderBean.getLongitude(); double latitude = orderBean.getLatitude(); // 创建httpGet请求 HttpGet httpGet = new HttpGet("https://restapi.amap.com/v3/geocode/regeo?&location="+ longitude+"," + latitude +"&key=" + key); // 2、提交httpclient异步请求,获取异步请求的future对象 Future<HttpResponse> future = httpclient.execute(httpGet, null);//callback是回调函数(也可通过回调函数拿结果) // 3、从成功的Future中取数据,返回的是orderBean CompletableFuture<OrderBean> orderBeanAndPCD = CompletableFuture.supplyAsync(new Supplier<OrderBean>() { @Override public OrderBean get() { // 用try包住,处理get不到值时的报错程序 try { HttpResponse response = future.get(); String province = null; String city = null; String district = null; if (response.getStatusLine().getStatusCode() == 200) { //拿出响应的实例对象 HttpEntity entity = response.getEntity(); //将对象toString String result = EntityUtils.toString(entity); JSONObject jsonObject1 = JSON.parseObject(result); JSONObject regeocodeObject = jsonObject1.getJSONObject("regeocode"); if (regeocodeObject != null && !regeocodeObject.isEmpty()) { JSONObject addObject = regeocodeObject.getJSONObject("addressComponent"); district = addObject.getString("district"); city = addObject.getString("city"); province = addObject.getString("province"); } } orderBean.setProvince(province); orderBean.setCity(city); orderBean.setDistrict(district); return orderBean; } catch (Exception e) { // 拿不到的返回null(还没有查询到结果,就从future取了) return null; } } }); // 4、将取出的数据,存入ResultFuture,返回给方法 orderBeanAndPCD.thenAccept(new Consumer<OrderBean>() { @Override public void accept(OrderBean resultOrderBean) { //complete()里面需要的是Collection集合,但是一次执行只返回一个结果 //所以集合使用singleton单例模式,集合中只装一个对象 resultFuture.complete(Collections.singleton(resultOrderBean)); } }); } catch (Exception e) { resultFuture.complete(Collections.singleton(null)); } } // 关闭连接 @Override public void close() throws Exception { httpclient.close(); } }
四、技术点
- Flink的异步I/O
- 异步HttpClient的使用
- Json格式文本的处理
- 高德地图API的调用
- 点赞
- 收藏
- 分享
- 文章举报
相关文章推荐
- Flink之异步I/O案例(二)异步查询MySQL数据库
- Apache HttpClient Fluent API:在后台线程中异步执行多个请求
- Jersey Client api 异步请求
- Chrome浏览器开发者扩展工具:Postman - REST Client 在线测试rest api 或者普通http web m模拟接口的最佳工具。
- AsyncHttpClient异步任务
- HttpClient异步,及连接池应用
- 服务查询的API(Weblogic)编写的client实例
- 用HttpClient类实现查询申通快递的功能
- Android OkHttpClient的get和post的同步和异步请求
- Jenkins系列:使用HttpClient操作Jenkins API
- 网络编程-HttpClient、异步http、Gson、多线程下载
- Android Asynchronous Http Client-Android异步网络请求客户端接口
- 07_android入门_採用HttpClient的POST方式、GET方式分别实现登陆案例
- 开源异步请求框架AndroidHttpClient的使用简介
- Httpclient 请求带Authorization(授权)的REST API 返回JSON数据
- Android Asynchronous Http Client--Android 开源的网络异步加载类
- httpclient中文API
- 高德地图模糊查询---JavaScript API
- 使用HTTP POST请求12306网站接口查询火车车次API
- Part2-HttpClient官方教程-Chapter5-流利的API