Java8学习计划--关于多核多线程并发编程-自定义Future并且增加监听器的实现
2018-03-09 17:16
417 查看
零零散散接近一个月的课余时间,学完Java8InAction和Guava,感触很多,收获也很大,特别开心,接下来会利用空余时间学习Spark,希望自己在技术上慢慢积累,越来越从容。对于Java8 最大的改变是lambda表达式 Collecotors CompletableFutures等 Funtional Programing.的思想真的很强大自定义Future的实现增加监听器 ,写完后更加理解明白guava中的Future的设计package com.company.LambdaExpressions.Futures;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
* Created by mengxiaopeng on 2018/3/9.
* com.company.LambdaExpressions.Futures
* MyFutureInAction2 会在MyFutureInAction基础上注册一个Listen监听 当存在值后通知调用者线程
* 好处:不阻塞主线程 Future执行完后会通知调用者 不用采用while方式获取Callable的返回值
*/
public class MyFutureInAction2 {
public static void main(String[] args) {
//######测试自定义 submit()
Future<String> submitFuture = submit(() -> {
try {
Thread.sleep(10000);
return "Success";
} catch (InterruptedException e) {
e.printStackTrace();
return "Error";
}
});
submitFuture.AddListener(new ListenBack<String>() {
@Override
public void onSuccess(String value) {
System.out.println("Get Result By Listener..Value=["+value+"]");
}
@Override
public void OnFailed(Throwable cause) {
cause.printStackTrace();
}
});
System.out.println("3==main=no==Blocker===");
//上面执行时 不需要将主线程Block住 因为执行submit时 不是守护进程
//但是对于Java 8 CompletableFuture.supplyAsync 执行时是守护进程 如果主线程一直在 不用采用其他线程池或者Join 可以获取执行的返回值
//否则需要采用其他的线程池或者Join 保证CompletableFuture.supplyAsync 在执行过程中主线程不会提前执行完
}
/**
* Executors.newCachedThreadPool().submit()
* 自定义实现了上面的功能
* @param callable
* @param <T>
* @return 更加理解Future底层采用的方式
*/
private static <T> Future<T> submit(Callable<T> callable) {
AtomicReference<T> result = new AtomicReference<T>();
AtomicBoolean isFlag = new AtomicBoolean(false);
//##先初始化Future
Future<T> future = new Future<T>() {
//后续通过Future.AddListener 回调函数传进来
private ListenBack<T> listenBack;
@Override
public T get() {
T t = result.get();
return t;
}
@Override
public boolean isDone() {
return isFlag.get();
}
@Override
public void AddListener(ListenBack<T> listenBack) {
this.listenBack=listenBack;
}
@Override
public ListenBack<T> getLister() {
return listenBack;
}
};
//## 正在处理任务
new Thread(() -> {
try {
T action = callable.action();
result.set(action);
isFlag.set(true);
if (future.getLister()!=null){
future.getLister().onSuccess(action);
}
//为空不做处理 因为不回调就不用管
} catch (Exception e) {
//处理异常
if (future.getLister()!=null){
future.getLister().OnFailed(e);
}
}
}).start();
return future;
}
/**
* 定义Future 将来由future返回
* @param <T>
*/
public interface Future<T> {
T get();
boolean isDone();
//增加一个回调监听
void AddListener(ListenBack<T> listenBack);
ListenBack<T> getLister();
}
//增加一个回调监听 里面存在两个方法 一个成功 一个Exception
private interface ListenBack<T>{
void onSuccess(T value);
void OnFailed(Throwable cause);
}
/**
* 定义Callable的接口 具体需要做的事情
* 可以采用lamdba表达式
* @param <T>
*/
private interface Callable<T> {
T action();
}
}
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
/**
* Created by mengxiaopeng on 2018/3/9.
* com.company.LambdaExpressions.Futures
* MyFutureInAction2 会在MyFutureInAction基础上注册一个Listen监听 当存在值后通知调用者线程
* 好处:不阻塞主线程 Future执行完后会通知调用者 不用采用while方式获取Callable的返回值
*/
public class MyFutureInAction2 {
public static void main(String[] args) {
//######测试自定义 submit()
Future<String> submitFuture = submit(() -> {
try {
Thread.sleep(10000);
return "Success";
} catch (InterruptedException e) {
e.printStackTrace();
return "Error";
}
});
submitFuture.AddListener(new ListenBack<String>() {
@Override
public void onSuccess(String value) {
System.out.println("Get Result By Listener..Value=["+value+"]");
}
@Override
public void OnFailed(Throwable cause) {
cause.printStackTrace();
}
});
System.out.println("3==main=no==Blocker===");
//上面执行时 不需要将主线程Block住 因为执行submit时 不是守护进程
//但是对于Java 8 CompletableFuture.supplyAsync 执行时是守护进程 如果主线程一直在 不用采用其他线程池或者Join 可以获取执行的返回值
//否则需要采用其他的线程池或者Join 保证CompletableFuture.supplyAsync 在执行过程中主线程不会提前执行完
}
/**
* Executors.newCachedThreadPool().submit()
* 自定义实现了上面的功能
* @param callable
* @param <T>
* @return 更加理解Future底层采用的方式
*/
private static <T> Future<T> submit(Callable<T> callable) {
AtomicReference<T> result = new AtomicReference<T>();
AtomicBoolean isFlag = new AtomicBoolean(false);
//##先初始化Future
Future<T> future = new Future<T>() {
//后续通过Future.AddListener 回调函数传进来
private ListenBack<T> listenBack;
@Override
public T get() {
T t = result.get();
return t;
}
@Override
public boolean isDone() {
return isFlag.get();
}
@Override
public void AddListener(ListenBack<T> listenBack) {
this.listenBack=listenBack;
}
@Override
public ListenBack<T> getLister() {
return listenBack;
}
};
//## 正在处理任务
new Thread(() -> {
try {
T action = callable.action();
result.set(action);
isFlag.set(true);
if (future.getLister()!=null){
future.getLister().onSuccess(action);
}
//为空不做处理 因为不回调就不用管
} catch (Exception e) {
//处理异常
if (future.getLister()!=null){
future.getLister().OnFailed(e);
}
}
}).start();
return future;
}
/**
* 定义Future 将来由future返回
* @param <T>
*/
public interface Future<T> {
T get();
boolean isDone();
//增加一个回调监听
void AddListener(ListenBack<T> listenBack);
ListenBack<T> getLister();
}
//增加一个回调监听 里面存在两个方法 一个成功 一个Exception
private interface ListenBack<T>{
void onSuccess(T value);
void OnFailed(Throwable cause);
}
/**
* 定义Callable的接口 具体需要做的事情
* 可以采用lamdba表达式
* @param <T>
*/
private interface Callable<T> {
T action();
}
}
相关文章推荐
- Java8学习计划--关于多核多线程并发编程-Java8-CompletableFuture 4的介绍
- Java8学习计划--关于多核多线程并发编程-JDK8之前的Future使用
- Java8学习计划--关于多核多线程并发编程-Java8-CompletableFuture 3的介绍
- Java8学习计划--关于多核多线程并发编程-Java8-CompletableFuture 1的介绍
- Java8学习计划--关于多核多线程并发编程-Java8-CompletableFuture 2的介绍
- 关于SQLServer2005的学习笔记——自定义分组的实现
- java学习笔记——自定义实现Stack集合
- java基础学习总结——关于Java中事件分发和监听机制实现的代码实例
- java 关于Map的key可不可以是自定义对象的学习
- java8学习之自定义收集器实现
- [JAVA][学习笔记]关于java.util.concurrent中 Future类的一些验证
- java学习笔记—自定义实现linkedList集合
- 关于java打印功能的最简单实现的学习笔记
- 【JAVA】系统中关于自定义比例选择的功能实现
- RadioButtton java代码实现左右带自定义的图片,并且控制字体与图片间距,同时控件长度对其功能。
- solr关于对文档的索引java 并且给文档添加自定义的域
- 关于SQLServer2005的学习笔记——自定义分组的实现
- Java关于链表的增加、删除、获取长度、打印数值的实现
- java 中关于自定义信号在linux下的实现