ForkJoin、BlockingDeque、ReentrantLock的使用(BAT-JUC笔试题)
1:有一个总任务A,分解为子任务A1 A2 A3 ...,任何一个子任务失败后要快速取消所有任务,请写程序模拟。
「请寻求最优解,不要只是粗暴wait()」
本题解题思路:Fork/Join
通常使用其更专门的类型之一 RecursiveTask(可以返回结果)或 RecursiveAction。
Oracle 官方文档:https://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html
主功能就是将一个大任务拆分成多个小任务进行处理。
处理过程中只要有个小任务失败报错,剩下的任务将可能被立即停止。
以下为代码实现:
1.1:实现代码
public class SonTask extends RecursiveAction { private static final Logger logger = LoggerFactory.getLogger(SonTask.class); /** * 总共任务量 **/ private final int taskCount; /** * 当前task被分配的任务量 **/ private final int taskMete; /** * 当前task序号 **/ private int taskRank; /** * 每个task最大可处理任务量 **/ private final int maxTask = 1; public SonTask(int taskCount) { this.taskCount = taskCount; this.taskMete = taskCount; } private SonTask(int taskCount, int taskMete, int taskRank) { this.taskCount = taskCount; this.taskMete = taskMete; this.taskRank = taskRank; } @Override protected void compute() { // 任务分配量是否满足处理条件,不满足则将任务再拆分 if (taskMete == maxTask) { printSelf(); } else { List<SonTask> sonTaskList = new ArrayList<>(); for (int i = 1; i <= taskCount; i++) { sonTaskList.add(new SonTask(taskCount, 1, i)); } // 执行所有任务 invokeAll(sonTaskList); } } /** * task 1 正常结束 -> * task 2 执行报错 -> * task 3 直接终止 **/ private void printSelf() { logger.info("SON TASK RANK [{}] START", taskRank); try { TimeUnit.SECONDS.sleep(taskRank * 3); if (taskRank == 2) { logger.error("eroor occured"); throw new RuntimeException("error"); } else { logger.info("TASK [{}] OVER", taskRank); } } catch (InterruptedException e) { e.printStackTrace(); } } }
1.2:测试
public class StartMain { public static void main(String[] args) { ForkJoinPool pool = new ForkJoinPool(10); SonTask sonTask = new SonTask(3); pool.invoke(sonTask); } }
在task 1结束后由于task 2报错了,task 3被取消执行。
看看ForkJoinTask#invokeAll(Collection tasks) 的源码注释中有这么一句话:
If any task 56c encounters an exception, others may be cancelled.
/** * Forks all tasks in the specified collection, returning when * {@code isDone} holds for each task or an (unchecked) exception * is encountered, in which case the exception is rethrown. If * more than one task encounters an exception, then this method * throws any one of these exceptions. If any task encounters an * exception, others may be cancelled. However, the execution * status of individual tasks is not guaranteed upon exceptional * return. The status of each task may be obtained using {@link * #getException()} and related methods to check if they have been * cancelled, completed normally or exceptionally, or left * unprocessed. * * @param tasks the collection of tasks * @param <T> the type of the values returned from the tasks * @return the tasks argument, to simplify usage * @throws NullPointerException if tasks or any element are null */ public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) { ... }
2:请用两个线程 56c 交替输出A1B2C3D4...,A线程输出字母,B线程输出数字,要求A线程首先执行,B线程其次执行!(多种同步机制的运用)
「请寻求最优解,不要简单的synchronized」
本题解题思路:ReentrantLock、Condtional
1:利用Conditon#await、Condition#signal 进行线程之间的通信,替代Object#wait、Object#notify。
2:勿使用Thread#join 这种阻塞主线程的方式,也达不到该题的需求。
Condition的类注释:
/** * {@code Condition} factors out the {@code Object} monitor * methods ({@link Object#wait() wait}, {@link Object#notify notify} * and {@link Object#notifyAll notifyAll}) into distinct objects to * give the effect of having multiple wait-sets per object, by * combining them with the use of arbitrary {@link Lock} implementations. * Where a {@code Lock} replaces the use of {@code synchronized} methods * and statements, a {@code Condition} replaces the use of the Object * monitor methods. * ... */
2.1:实现代码
public class StartMain { private static final Logger logger = LoggerFactory.getLogger(StartMain.class); private static final ReentrantLock lock = new ReentrantLock(); private static final String[] arr = new String[]{"A1", ad0 "B2", "C3", "D4"}; private static final AtomicInteger index = new AtomicInteger(0); public static void main(String[] args) { Condition conditionA = lock.newCondition(); Condition conditionB = lock.newCondition(); Thread threadA = new Thread(() -> { while (index.get() < arr.length) { try { lock.lock(); logger.info(arr[index.get()]); index.incrementAndGet(); conditionB.signal(); conditionA.await(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }, "thread-A"); Thread threadB = new Thread(() -> { while (index.get() < arr.length) { try { lock.lock(); conditionB.await(); logger.info(arr[index.get()]); index.incrementAndGet(); conditionA.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }, "thread-B"); threadB.start(); // 为了使测试更加逼真,先让B开始 try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } threadA.start(); } }
2.2:Condition#await
/** * Causes the current thread to wait until it is signalled or * {@linkplain Thread#interrupt interrupted}. * * <p>The lock associated with this {@code Condition} is atomically * released and the current thread becomes disabled for thread scheduling * purposes and lies dormant until <em>one</em> of four things happens: * * ... * @throws InterruptedException if the current thread is interrupted * (and interruption of thread suspension is supported) */ void await() throws InterruptedException;
**The lock associated with this {@code Condition} is atomically released and the current thread becomes disabled **
for thread scheduling purposes and lies dormant until ...
调用了Condition.await()方法后该线程所持有的Lock锁会被释放掉,并且当前线程会变得不可用(阻塞),直到调用了Condtion.signal()方法。
3:华为面试题
「请寻求最优解,不要简单的生产者-消费者模式」
< 2e73 p>有一个生产奶酪的厂家,每天需要生产100000份奶酪卖给超市,通过一辆货车发货,送货车每次送100份。厂家有一个容量为1000份的冷库,用于奶酪保鲜,生产的奶酪需要先存放在冷库,运输车辆从冷库取货。
厂家有三条生产线,分别是牛奶供应生产线,发酵剂制作生产线,奶酪生产线。
生产每份奶酪需要2份牛奶和一份发酵剂。
请设计生产系统?
本题解题思路: BlockingDeque阻塞队列、Atomic原子类
Oracle 官方文档:https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/BlockingDeque.html
3.1:api注释
1:BlockingDeque#take()
/** * Retrieves and removes the head of the queue represented by this deque * (in other words, the first element of this deque), waiting if * necessary until an element becomes available. * * <p>This method is equivalent to {@link #takeFirst() takeFirst}. * * @return the head of this deque * @throws InterruptedException if interrupted while waiting */ E take() throws InterruptedException;
该方法若从队列中取不到元素会造成当前线程阻塞,直到拿到元素为止。
2:BlockingDeque#put(E e)
/** * Inserts the specified element into the queue represented by this deque * (in other words, at the tail of this deque), waiting if necessary for * space to become available. * * <p>This method is equivalent to {@link #putLast(Object) putLast}. * * @param e the element to add * @throws InterruptedException {@inheritDoc} * @throws ClassCastException if the class of the specified element * prevents it from being added to this deque * @throws NullPointerException if the specified element is null * @throws IllegalArgumentException if some property of the specified * element prevents it from being added to this deque */ void put(E e) throws InterruptedException;
当队列容量达到上限时,其他元素无法入队且使当前线程阻塞,直到队有可用空间为止。
3.2:实现代码
public class ProductOnlineBus { private static final Logger logger = LoggerFactory.getLogger(ProductOnlineBus.class); /** * 生产奶酪数量 **/ private final int prodNum; /** * 牛奶=奶酪*2 **/ private final int milkMultiple = 2; /** * 发酵剂=奶酪*1 **/ private final int fjjMultiple = 1; /** * 奶酪仓库容量 **/ private final int cheeseCapacity = 1000; /** * 单词运输奶酪数量 **/ private final int truckCapacity = 100; /** * 总共需要运输多少次 **/ private final int needTruckTimes; /** * 生产线--阻塞队列 **/ private final BlockingDeque<MiikNode> milkNodeBlockingDeque; private final BlockingDeque<FJJNode> fjjNodeBlockingDeque; private final BlockingDeque<CheeseNode> cheeseNodeBlockingDeque; /** * 实际运输次数 **/ private final AtomicInteger trucked = new AtomicInteger(0); /** * 各生产线生产次数 **/ private final AtomicInteger milkProded = new AtomicInteger(0); private final AtomicInteger fjjProded = new AtomicInteger(0); private final AtomicInteger cheeseProded = new AtomicInteger(0); public ProductOnlineBus(int prodNum) { if ((prodNum % truckCapacity) != 0) { throw new RuntimeException("请输入truckCapacity的倍数"); } this.prodNum = prodNum; this.milkNodeBlockingDeque = new LinkedBlockingDeque<>(milkMultiple); this.fjjNodeBlockingDeque = new LinkedBlockingDeque<>(fjjMultiple); this.cheeseNodeBlockingDeque = new LinkedBlockingDeque<>(cheeseCapacity); this.needTruckTimes = prodNum / truckCapacity; } public void starter() { new Thread(() -> { int len = prodNum * milkMultiple; for (int i = 0; i < len; i++) { try { milkNodeBlockingDeque.put(new MiikNode(i)); milkProded.incrementAndGet(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "MilkThread").start(); new Thread(() -> { int len = prodNum * fjjMultiple; for (int i = 0; i < len; i++) { try { fjjNodeBlockingDeque.put(new FJJNode(i)); fjjProded.incrementAndGet(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "FJJThread").start(); new Thread(() -> { for (int i = 0; i < prodNum; i++) { try { for (int j = 0; j < milkMultiple; j++) { milkNodeBlockingDeque.take(); } for (int j = 0; j < fjjMultiple; j++) { fjjNodeBlockingDeque.take(); } cheeseNodeBlockingDeque.put(new CheeseNode(i)); cheeseProded.incrementAndGet(); } catch (InterruptedException e) { e.printStackTrace(); } } }, "CheeseThread").start(); new Thread(() -> { while (trucked.get() < needTruckTimes) { try { for (int i = 0; i < truckCapacity; i++) { cheeseNodeBlockingDeque.take(); } trucked.incrementAndGet(); } catch (InterruptedException e) { e.printStackTrace(); } } logger.info("over of->cheese:[{}],milk:[{}],fjj[{}],truck:[{}]", cheeseProded.get(), milkProded.get(), fjjProded.get(), trucked.get()); }, "TruckThread").start(); } /** * 牛奶 **/ private class MiikNode { public MiikNode(int seq) { logger.info("生产牛奶[{}]...", seq); } } /** * 发酵剂 **/ private class FJJNode { public FJJNode(int seq) { logger.info("生产发酵剂[{}]...", seq); } } /** * 奶酪 **/ private class CheeseNode { public CheeseNode(int seq) { logger.info("生产奶酪[{}]...", seq); } } }
3.3:运行
public class StartMain { public static void main(String[] args) { ProductOnlineBus pb = new ProductOnlineBus(100000); pb.starter(); } }
- JUC forkJoinPool
- 判断某个问题是否适合使用ForkJoin解决
- 线程池ThreadPoolExecutor 和 ForkJoinPool 的分析使用
- 【高并发10】JUC组件扩展(FutureTask, Fork / Join 并行框架 ,阻塞队列BlockingQueue)
- systemverilog在for循环中使用fork_join和fork_join_none的区别
- Java多线程 -- JUC包源码分析19 -- ForkJoinPool/ForkJoinTask
- ForkJoinPool 使用的错误写法
- JUC学习笔记五:四大函数式接口和流Stream; ForkJoinPool
- Java并发编程--Fork/Join框架使用
- java8中forkjoin和optional框架使用
- java并行计算Fork和Join的使用
- 关于Java ForkJoin框架的使用demo
- Java使用Fork/Join框架来并行执行任务
- Java使用Fork/Join框架来并行执行任务
- java-forkjoin框架的使用
- Fork/Join简单使用
- java8 - fork-join之CompletableFuture 使用总结
- (四)juc线程高级特性——线程池 / 线程调度 / ForkJoinPool
- 【Java高并发学习】使用Thread和ForkJoin解决特别消耗时间的操作
- Fork/Join框架原理和使用探秘