您的位置:首页 > 编程语言 > Java开发

多线程(二)

2017-09-30 17:03 106 查看
接下来,我们说 feature模式跟master-worker 模式,另外还有个知识点,Executors框架的一系列问题。

feature模式:之前稍微提到了一点,现在用代码细细实现下。所谓的feature模式,我的理解就是,当前用户执行一个任务,这个任务主线程执行时间很长很长,类似于财务系统的结算,这时候呢,我们首先让用户看见程序的执行结果(当然可以是伪造的),然后后台悄无声息的启动N个线程,分批的去执行用户刚才提交的那个任务,等到客户想看真正的数据的时候,这时候,多线程可能就执行完了,没执行完的话,就让用户wait,不多说,看伪代码

public classMain {
public static void main(String[] args) {

FeatureClient fc = new FeatureClient();

Data data = fc.request("请求");

System.out.println("请求成功");
List<RealData> msg = data.getRequest();

}

}
首先呢,fc类似于request的请求,data数据是用户想执行看到的结果,fc.request()。往下看这个方法

public class FeatureClient {

public Data request(final String m) {
final FeatureData featureData = new FeatureData();
new Runnable() {
public void run() {
List<RealData> realData = new ArrayList<RealData>();
//数据库操作realData
featureData.setRequest(realData);
}
};
return featureData;
}
}


内部类里面启用了一个线程,去执行用户的请求,主函数执行返回伪结果,

public class FeatureClient {

public Data request(final String m) {
final FeatureData featureData = new FeatureData();
new Runnable() {
public void run() {
List<RealData> realData = new ArrayList<RealData>();
//数据库操作realData
featureData.setRequest(realData);
}
};
return featureData;
}
}

public interface Data {
List<RealData> getRequest();
}
public class RealData {

}


这是setData的代码,feature模式就是这么简单。

-------------------------------

下面说下master-worker模式。

master-worker讲的是什么事呢,就是说,举个场景,当前有个消息队列,队列里面全都是任务,worker是打工者,需要执行这些任务,

master类似于一个中枢轴的概念,专门添加任务和解析任务吧,负责统计一些事情。这里需要用到concurrentMap这个保证线程安全的类

,还有个LinkedBlockingQueue,专门负责装载任务,下面看代码,

public class Main {
public static void main(String[] args) {
Worker worker = new Worker();
Master master = new Master(worker, 10);
for (int i = 1; i <= 100; i++) {
Task task = new Task();
task.setId(i);
task.setPrice(new Random().nextInt(1000));
master.submit(task);
}
master.execute();
Long time = System.currentTimeMillis();
while (master.isComplete()) {
Long time1 = System.currentTimeMillis() - time;
System.out.println(time1);
break;
}
}
}

当前呢,有两个类Worker,Master。我初始化了100个任务,需要用到master的添加任务的方法,放到Queue里面,

public class Master {
private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();

private HashMap<String, Thread> workers = new HashMap<String, Thread>();

private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();

public Master(Worker worker, int workerCount) {
worker.setWorkQueue(workQueue);
worker.setResultMap(resultMap);
for (int i = 0; i < workerCount; i++) {
workers.put("子节点" + Integer.toString(i), new Thread(worker));
}
}

public void submit(Task task) {
workQueue.add(task);
}

public void execute() {
for (Map.Entry<String, Thread> me : workers.entrySet()) {
me.getValue().start();
}
}

public boolean isComplete() {
for (Map.Entry<String, Thread> me : workers.entrySet()) {
if (me.getValue().getState() != Thread.State.TERMINATED) {
return false;
}
}
return true;
}
}


Master里面有添加任务的方法(submit),有判断任务是否执行完毕(isComplete),有初始化执行任务的方法,然后呢,再看master的构造方法,两个传参,worker对象

和数量,有个hashMap,专门记录当前是哪个Thread。然后再看,Worker类代码,

public class Worker implements Runnable {
private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<Task>();

private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();

public void run() {
while (true) {
Task task = this.workQueue.poll();
if (task == null) {
break;
}
resultMap.put(Integer.toString(task.getId()), task);
}
}

public void setWorkQueue(ConcurrentLinkedQueue<Task> workQueue) {
this.workQueue = workQueue;
}

public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
this.resultMap = resultMap;
}
}

里面有个实现了Runnable的方法,在run里面执行了while,我解释下poll方法,poll方法的意思是,取出当前Queue队列的第一个元素并删除,这里的伪代码,我简单的打印了id,(当然,这里可以统计Task类里面的属性,执行++操作。),把结果放到了concurrentMap的结果集里面。再看Main类,Master的构造函数,我初始化了worker对象,10个线程去执行,100个任务,直到Queue里面的任务执行完毕,返回到concurrentMap的结果集里面去。这就是Master-worker模式(需要掌握精髓)。实在不理解的可以动手写写代码,去debug下。好记性不如烂笔头。

----------------------------

最后一个知识点,Executors框架。

Executors框架 : Executors类,提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。比如

newFixedThreadPool,newSingleThreadExecutor,newCachedThreadPool,newScheduledThreadPool.这四个方法jdk实现机制都是


ThreadPoolExecutor,利用了工厂方法,根据传入参数的不同,去实例化不同的ThreadPool对象。下面简单说下ThreadPoolExecutor的构

造、方法的各个参数。


corePoolSize, 核心线程数,

maximumPoolSize,最大线程数,
keepAliveTime,存活时间,
unit,存活时间的单位,
workQueue,需要放任务的队列,
RejectedExecutionHandler,放弃操作。


下面介绍下我理解的参数的意义,corePoolSize,核心线程数,就是初始化的线程数,maximumPoolSize,最大线程数,这两者的区别需要根据传入的Queue去介绍,举个例
子,

public class UseThreadExecutorPool implements Runnable {
private static AtomicInteger count = new AtomicInteger();

@Override
public void run() {
int all = count.incrementAndGet();
System.out.println(all);
}

//自定义线程池  各个参数什么意思 corePoolSize 核心线程数,maximumPoolSize 最大线程数,
//当 任务数量 <= corePoolSize 直接执行, 当   maximumPoolSize =>任务数量 > corePoolSize ,任务数量-corePoolSize 的任务 加到queue里面,

//拿corePoolSize 线程执行,
//当 maximumPoolSize < 任务数量  在不超过 maximumPoolSize的情况下 新建线程执行 加满queue之后的 任务,当超过了最大线程数,就报错,

//执行劝退侧洛
public static void main(String[] args) {
ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue<UseThreadExecutorPool>(4);
ExecutorService executorService = new ThreadPoolExecutor(5, 10, 60, TimeUnit.DAYS, arrayBlockingQueue);

UseThreadExecutorPool useThreadExecutorPool0 = new UseThreadExecutorPool();
UseThreadExecutorPool useThreadExecutorPool1 = new UseThreadExecutorPool();
//UseThreadExecutorPool useThreadExecutorPool2 = new UseThreadExecutorPool();
//UseThreadExecutorPool useThreadExecutorPool3 = new UseThreadExecutorPool();
//UseThreadExecutorPool useThreadExecutorPool4 = new UseThreadExecutorPool();
//UseThreadExecutorPool useThreadExecutorPool5 = new UseThreadExecutorPool();
executorService.execute(useThreadExecutorPool0);
executorService.execute(useThreadExecutorPool1);
//executorService.execute(useThreadExecutorPool2);
//executorService.execute(useThreadExecutorPool3);
//executorService.execute(useThreadExecutorPool4);
//executorService.execute(useThreadExecutorPool5);

executorService.shutdown();
}
}

大家都知道,ArrayBlockingQueue是有界队列,LinkedBlockingQueue是无界队列(当然可以变成有界),看上面的代码,new ThreadPoolExecutor的构造参数,5代表,我初始化5个线程,10代表, 支持最大10个线程,60呢代表,这些线程的空闲时间,当然得有单位啦,下个参数就是60的参数,可以是天,可以是秒,也可以是小时,再下一个参数就是,Queue类的,这里的Queue可以是上面队列的任何一个,但是代表的意义都不一样。Queue后面其实还有个参数,叫做劝退策略,专门负责队列满了,剩下的任务也超过了最大线程数,就会执行劝退策略,jdk有默认的,所以不写也行,当然也可以自己写,自己写的话,必须要实现RejectedExecutionHandler接口

jdk提供了四种劝退策略,

AbortPolicy:直接抛出异常组织系统正常工作,

CallerRunsRolicy:只要线程池未关闭,该侧罗直接在调用者线程中,运行当前被丢弃的任务,

DiscardOldestPolicy:丢弃最老的一个请求,尝试再次提交当前任务,

DiscardPolicy:丢弃无法处理的任务,不给于任何处理。

上面的代码什么意思呢,稍后的博客会带来更详细的分析。今天的知识点,可以说有点难度,需要好好的理解理解,当然,我说的也不太好,尽量让大家明白。

题外话:多线程这个东西吧,面试经常考,但是呢,绝大公司技术是不会用到的,多线程呢,属于java基础,出去面试你想要更高的工资,20k,25k,更有30k的,面试官说了一个知识点(当然,面试官也可能只是懂点皮毛),你不知道,那你的身价可是大大打了折扣,如果你了解的更深入底层的代码的话,能说出来,那你在面试官心里的感觉就不一样了(工资也好要了),这只是个人的理解。也希望下回各位小伙伴跳槽工资涨的最少5000块吧、有兴趣的可以加我QQ
:82479297  我也是个技术成长道路上的小白,

可以交流交流。下次聊。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息