多线程(二)
2017-09-30 17:03
106 查看
接下来,我们说 feature模式跟master-worker 模式,另外还有个知识点,Executors框架的一系列问题。
feature模式:之前稍微提到了一点,现在用代码细细实现下。所谓的feature模式,我的理解就是,当前用户执行一个任务,这个任务主线程执行时间很长很长,类似于财务系统的结算,这时候呢,我们首先让用户看见程序的执行结果(当然可以是伪造的),然后后台悄无声息的启动N个线程,分批的去执行用户刚才提交的那个任务,等到客户想看真正的数据的时候,这时候,多线程可能就执行完了,没执行完的话,就让用户wait,不多说,看伪代码
public classMain {
public static void main(String[] args) {
FeatureClient fc = new FeatureClient();
Data data = fc.request("请求");
System.out.println("请求成功");
List<RealData> msg = data.getRequest();
}
}
首先呢,fc类似于request的请求,data数据是用户想执行看到的结果,fc.request()。往下看这个方法
内部类里面启用了一个线程,去执行用户的请求,主函数执行返回伪结果,
这是setData的代码,feature模式就是这么简单。
-------------------------------
下面说下master-worker模式。
master-worker讲的是什么事呢,就是说,举个场景,当前有个消息队列,队列里面全都是任务,worker是打工者,需要执行这些任务,
master类似于一个中枢轴的概念,专门添加任务和解析任务吧,负责统计一些事情。这里需要用到concurrentMap这个保证线程安全的类
,还有个LinkedBlockingQueue,专门负责装载任务,下面看代码,
当前呢,有两个类Worker,Master。我初始化了100个任务,需要用到master的添加任务的方法,放到Queue里面,
Master里面有添加任务的方法(submit),有判断任务是否执行完毕(isComplete),有初始化执行任务的方法,然后呢,再看master的构造方法,两个传参,worker对象
和数量,有个hashMap,专门记录当前是哪个Thread。然后再看,Worker类代码,
里面有个实现了Runnable的方法,在run里面执行了while,我解释下poll方法,poll方法的意思是,取出当前Queue队列的第一个元素并删除,这里的伪代码,我简单的打印了id,(当然,这里可以统计Task类里面的属性,执行++操作。),把结果放到了concurrentMap的结果集里面。再看Main类,Master的构造函数,我初始化了worker对象,10个线程去执行,100个任务,直到Queue里面的任务执行完毕,返回到concurrentMap的结果集里面去。这就是Master-worker模式(需要掌握精髓)。实在不理解的可以动手写写代码,去debug下。好记性不如烂笔头。
----------------------------
最后一个知识点,Executors框架。
Executors框架 : Executors类,提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。比如
下面介绍下我理解的参数的意义,corePoolSize,核心线程数,就是初始化的线程数,maximumPoolSize,最大线程数,这两者的区别需要根据传入的Queue去介绍,举个例
子,
大家都知道,ArrayBlockingQueue是有界队列,LinkedBlockingQueue是无界队列(当然可以变成有界),看上面的代码,new ThreadPoolExecutor的构造参数,5代表,我初始化5个线程,10代表, 支持最大10个线程,60呢代表,这些线程的空闲时间,当然得有单位啦,下个参数就是60的参数,可以是天,可以是秒,也可以是小时,再下一个参数就是,Queue类的,这里的Queue可以是上面队列的任何一个,但是代表的意义都不一样。Queue后面其实还有个参数,叫做劝退策略,专门负责队列满了,剩下的任务也超过了最大线程数,就会执行劝退策略,jdk有默认的,所以不写也行,当然也可以自己写,自己写的话,必须要实现RejectedExecutionHandler接口
jdk提供了四种劝退策略,
AbortPolicy:直接抛出异常组织系统正常工作,
CallerRunsRolicy:只要线程池未关闭,该侧罗直接在调用者线程中,运行当前被丢弃的任务,
DiscardOldestPolicy:丢弃最老的一个请求,尝试再次提交当前任务,
DiscardPolicy:丢弃无法处理的任务,不给于任何处理。
上面的代码什么意思呢,稍后的博客会带来更详细的分析。今天的知识点,可以说有点难度,需要好好的理解理解,当然,我说的也不太好,尽量让大家明白。
题外话:多线程这个东西吧,面试经常考,但是呢,绝大公司技术是不会用到的,多线程呢,属于java基础,出去面试你想要更高的工资,20k,25k,更有30k的,面试官说了一个知识点(当然,面试官也可能只是懂点皮毛),你不知道,那你的身价可是大大打了折扣,如果你了解的更深入底层的代码的话,能说出来,那你在面试官心里的感觉就不一样了(工资也好要了),这只是个人的理解。也希望下回各位小伙伴跳槽工资涨的最少5000块吧、有兴趣的可以加我QQ
:82479297 我也是个技术成长道路上的小白,
可以交流交流。下次聊。
feature模式:之前稍微提到了一点,现在用代码细细实现下。所谓的feature模式,我的理解就是,当前用户执行一个任务,这个任务主线程执行时间很长很长,类似于财务系统的结算,这时候呢,我们首先让用户看见程序的执行结果(当然可以是伪造的),然后后台悄无声息的启动N个线程,分批的去执行用户刚才提交的那个任务,等到客户想看真正的数据的时候,这时候,多线程可能就执行完了,没执行完的话,就让用户wait,不多说,看伪代码
public classMain {
public static void main(String[] args) {
FeatureClient fc = new FeatureClient();
Data data = fc.request("请求");
System.out.println("请求成功");
List<RealData> msg = data.getRequest();
}
}
首先呢,fc类似于request的请求,data数据是用户想执行看到的结果,fc.request()。往下看这个方法
public class FeatureClient { public Data request(final String m) { final FeatureData featureData = new FeatureData(); new Runnable() { public void run() { List<RealData> realData = new ArrayList<RealData>(); //数据库操作realData featureData.setRequest(realData); } }; return featureData; } }
内部类里面启用了一个线程,去执行用户的请求,主函数执行返回伪结果,
public class FeatureClient { public Data request(final String m) { final FeatureData featureData = new FeatureData(); new Runnable() { public void run() { List<RealData> realData = new ArrayList<RealData>(); //数据库操作realData featureData.setRequest(realData); } }; return featureData; } }
public interface Data { List<RealData> getRequest(); }
public class RealData { }
这是setData的代码,feature模式就是这么简单。
-------------------------------
下面说下master-worker模式。
master-worker讲的是什么事呢,就是说,举个场景,当前有个消息队列,队列里面全都是任务,worker是打工者,需要执行这些任务,
master类似于一个中枢轴的概念,专门添加任务和解析任务吧,负责统计一些事情。这里需要用到concurrentMap这个保证线程安全的类
,还有个LinkedBlockingQueue,专门负责装载任务,下面看代码,
public class Main { public static void main(String[] args) { Worker worker = new Worker(); Master master = new Master(worker, 10); for (int i = 1; i <= 100; i++) { Task task = new Task(); task.setId(i); task.setPrice(new Random().nextInt(1000)); master.submit(task); } master.execute(); Long time = System.currentTimeMillis(); while (master.isComplete()) { Long time1 = System.currentTimeMillis() - time; System.out.println(time1); break; } } }
当前呢,有两个类Worker,Master。我初始化了100个任务,需要用到master的添加任务的方法,放到Queue里面,
public class Master { private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>(); private HashMap<String, Thread> workers = new HashMap<String, Thread>(); private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>(); public Master(Worker worker, int workerCount) { worker.setWorkQueue(workQueue); worker.setResultMap(resultMap); for (int i = 0; i < workerCount; i++) { workers.put("子节点" + Integer.toString(i), new Thread(worker)); } } public void submit(Task task) { workQueue.add(task); } public void execute() { for (Map.Entry<String, Thread> me : workers.entrySet()) { me.getValue().start(); } } public boolean isComplete() { for (Map.Entry<String, Thread> me : workers.entrySet()) { if (me.getValue().getState() != Thread.State.TERMINATED) { return false; } } return true; } }
Master里面有添加任务的方法(submit),有判断任务是否执行完毕(isComplete),有初始化执行任务的方法,然后呢,再看master的构造方法,两个传参,worker对象
和数量,有个hashMap,专门记录当前是哪个Thread。然后再看,Worker类代码,
public class Worker implements Runnable { private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>(); private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>(); public void run() { while (true) { Task task = this.workQueue.poll(); if (task == null) { break; } resultMap.put(Integer.toString(task.getId()), task); } } public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) { this.workQueue = workQueue; } public void setResultMap(ConcurrentHashMap<String, Object> resultMap) { this.resultMap = resultMap; } }
里面有个实现了Runnable的方法,在run里面执行了while,我解释下poll方法,poll方法的意思是,取出当前Queue队列的第一个元素并删除,这里的伪代码,我简单的打印了id,(当然,这里可以统计Task类里面的属性,执行++操作。),把结果放到了concurrentMap的结果集里面。再看Main类,Master的构造函数,我初始化了worker对象,10个线程去执行,100个任务,直到Queue里面的任务执行完毕,返回到concurrentMap的结果集里面去。这就是Master-worker模式(需要掌握精髓)。实在不理解的可以动手写写代码,去debug下。好记性不如烂笔头。
----------------------------
最后一个知识点,Executors框架。
Executors框架 : Executors类,提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。比如
newFixedThreadPool,newSingleThreadExecutor,newCachedThreadPool,newScheduledThreadPool.这四个方法jdk实现机制都是
ThreadPoolExecutor,利用了工厂方法,根据传入参数的不同,去实例化不同的ThreadPool对象。下面简单说下ThreadPoolExecutor的构
造、方法的各个参数。
corePoolSize, 核心线程数,
maximumPoolSize,最大线程数,
keepAliveTime,存活时间,
unit,存活时间的单位,
workQueue,需要放任务的队列,
RejectedExecutionHandler,放弃操作。
下面介绍下我理解的参数的意义,corePoolSize,核心线程数,就是初始化的线程数,maximumPoolSize,最大线程数,这两者的区别需要根据传入的Queue去介绍,举个例
子,
public class UseThreadExecutorPool implements Runnable { private static AtomicInteger count = new AtomicInteger(); @Override public void run() { int all = count.incrementAndGet(); System.out.println(all); } //自定义线程池 各个参数什么意思 corePoolSize 核心线程数,maximumPoolSize 最大线程数, //当 任务数量 <= corePoolSize 直接执行, 当 maximumPoolSize =>任务数量 > corePoolSize ,任务数量-corePoolSize 的任务 加到queue里面,
//拿corePoolSize 线程执行, //当 maximumPoolSize < 任务数量 在不超过 maximumPoolSize的情况下 新建线程执行 加满queue之后的 任务,当超过了最大线程数,就报错,
//执行劝退侧洛 public static void main(String[] args) { ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<UseThreadExecutorPool>(4); ExecutorService executorService = new ThreadPoolExecutor(5, 10, 60, TimeUnit.DAYS, arrayBlockingQueue); UseThreadExecutorPool useThreadExecutorPool0 = new UseThreadExecutorPool(); UseThreadExecutorPool useThreadExecutorPool1 = new UseThreadExecutorPool(); //UseThreadExecutorPool useThreadExecutorPool2 = new UseThreadExecutorPool(); //UseThreadExecutorPool useThreadExecutorPool3 = new UseThreadExecutorPool(); //UseThreadExecutorPool useThreadExecutorPool4 = new UseThreadExecutorPool(); //UseThreadExecutorPool useThreadExecutorPool5 = new UseThreadExecutorPool(); executorService.execute(useThreadExecutorPool0); executorService.execute(useThreadExecutorPool1); //executorService.execute(useThreadExecutorPool2); //executorService.execute(useThreadExecutorPool3); //executorService.execute(useThreadExecutorPool4); //executorService.execute(useThreadExecutorPool5); executorService.shutdown(); } }
大家都知道,ArrayBlockingQueue是有界队列,LinkedBlockingQueue是无界队列(当然可以变成有界),看上面的代码,new ThreadPoolExecutor的构造参数,5代表,我初始化5个线程,10代表, 支持最大10个线程,60呢代表,这些线程的空闲时间,当然得有单位啦,下个参数就是60的参数,可以是天,可以是秒,也可以是小时,再下一个参数就是,Queue类的,这里的Queue可以是上面队列的任何一个,但是代表的意义都不一样。Queue后面其实还有个参数,叫做劝退策略,专门负责队列满了,剩下的任务也超过了最大线程数,就会执行劝退策略,jdk有默认的,所以不写也行,当然也可以自己写,自己写的话,必须要实现RejectedExecutionHandler接口
jdk提供了四种劝退策略,
AbortPolicy:直接抛出异常组织系统正常工作,
CallerRunsRolicy:只要线程池未关闭,该侧罗直接在调用者线程中,运行当前被丢弃的任务,
DiscardOldestPolicy:丢弃最老的一个请求,尝试再次提交当前任务,
DiscardPolicy:丢弃无法处理的任务,不给于任何处理。
上面的代码什么意思呢,稍后的博客会带来更详细的分析。今天的知识点,可以说有点难度,需要好好的理解理解,当然,我说的也不太好,尽量让大家明白。
题外话:多线程这个东西吧,面试经常考,但是呢,绝大公司技术是不会用到的,多线程呢,属于java基础,出去面试你想要更高的工资,20k,25k,更有30k的,面试官说了一个知识点(当然,面试官也可能只是懂点皮毛),你不知道,那你的身价可是大大打了折扣,如果你了解的更深入底层的代码的话,能说出来,那你在面试官心里的感觉就不一样了(工资也好要了),这只是个人的理解。也希望下回各位小伙伴跳槽工资涨的最少5000块吧、有兴趣的可以加我QQ
:82479297 我也是个技术成长道路上的小白,
可以交流交流。下次聊。
相关文章推荐
- 简单的SHELL多线程脚本!!
- Java多线程编程模式实战指南(三):Two-phase Termination模式--转载
- 浅谈Java多线程的同步问题
- Java多线程之Condition
- 一、javaSE (二十四)多线程、设计模式
- java多线程挂起
- 多线程和多进程的区别(小结)
- java之十三篇:java中多线程
- 多线程中的join(),yield()与优先级Priority
- c++的对象计数问题的多线程实现
- 多线程题 3个线程各打印一种字母,按顺序打印若干次ABC 的2种实现
- iOS 多线程--NSThread
- 详解Linux多线程使用信号量同步
- JAVA 多线程的简单实现(runnable接口方式)
- iOS多线程编程之NSOperation和NSOperationQueue的使用
- C#多线程操作WPF GUI控件,串口数据接收发送
- Java 多线程深入分析(一)
- Java多线程
- 代码演示:如何消除多线程渲染与物理模拟间的相互影响
- 利用多线程实现对网站状态的监控