您的位置:首页 > 其它

线程池的实现(生产者消费者),借助队列实现

2014-10-10 23:34 375 查看
线程的操作最原始的例子就是生存者消费者模式。

利用的数据结构就是队列

线程协作(信号量机制)
==================================================================================

1、线程是CPU调度的单位

  进程是资源分配的单位、线程是资源调度单位

2、线程的状态:就绪、等待、睡眠、结束

  等待状态是基于共享资源被其他线程加锁,需要等待

3、线程之间的协作

  通知机制、协作机制(信号量机制)

4、线程资源的共享

  多线程的目的是为了并发操作,但是并发的前提是共享资源要加锁,保证数据的一致性。

5、线程锁

  锁是针对资源的,而资源上有一个容器,操作次资源的线程必须获得资源的锁,如果获取不到就去资源对象的容器里面等待

  锁有优先级FIFO、优先级的锁等

6、阻塞与无阻塞

  1、阻塞就是处于等待状态,其他什么事情都不能干。悲观锁就是这种机制

  2、非阻塞采用的轮询机制,才有cpu指令的CAS,实现非阻塞的加锁机制。jdk1.5新特性里面的显示锁就是cas机制的

7、线程中断

  实际上,如果线程运行起来,用户只能发送中断请求,CPU绝对是不是中断,所以中断是不可控制的。

8、网络io慢的原因

  1、数据的多次拷贝(用户空间到内核空间,再到网卡的缓存区)

  2、线程开销 

  3、解决办法就是实现IO多路服用,就是基于线程池的模式以及NIO(减少数据拷贝) 

==================================================================================

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Scanner;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
* 模拟线程池的工作原理
* 基于队列+线程实现
* 原理:生成者消费者模式
*
*/
public class App {
private static Scanner scanner;

public static void main(String[] args) throws InterruptedException {
CostThreadPools costThreadPools = new App.CostThreadPools(10);
scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String cmd = scanner.nextLine();
System.out.println(cmd);
int count = Integer.parseInt(cmd);
while (count-- > 0) {
final int index = count;
costThreadPools.submitTask(new Runnable() {
public void run() {
System.out.println(Thread.currentThread().getName() + " | task" + "-" + index + "|"
+ new Date().toString());
try {
Thread.currentThread().sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}

}

static class CostThreadPools {
List<MyThreadPool> pools = new ArrayList<App.CostThreadPools.MyThreadPool>();
private static Queue<Runnable> workQueue;
private AtomicInteger count = new AtomicInteger(0);
int size;

public CostThreadPools(int size) {
this.size = size;
initPools();
}

private void initPools() {
this.workQueue = new ArrayBlockingQueue<Runnable>(100);
int i = 0;
while (i < size) {
App.CostThreadPools.MyThreadPool myThreadPool = new App.CostThreadPools.MyThreadPool(
"MYTHREAD" + "@" + i, new HashMap<String, Integer>());
pools.add(myThreadPool);
myThreadPool.start();
System.out.println(
"线程初始化启动,已经创建线程:" + myThreadPool.getName() + ",state=" + myThreadPool.getState().toString());
i++;
}
hearBeatThreadStatue();
}

void submitTask(Runnable task) {
workQueue.offer(task);
}

void hearBeatThreadStatue() {
new Thread(new Runnable() {

public void run() {
while (true) {
for (MyThreadPool myThreadPool : pools) {
System.out.println(
myThreadPool.name + "已经执行了" + myThreadPool.map.get(myThreadPool.name) + "个任务");
}
try {
Thread.currentThread().sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "监控线程").start();

}

static class MyThreadPool extends Thread {
private String name;
private volatile Map<String, Integer> map;

public MyThreadPool(String name, Map<String, Integer> map) {
this.map = map;
this.name = name;
}

@Override
public void run() {
while (true) {
while (!workQueue.isEmpty()) {
int count = 0;
while (true) {
Runnable task = workQueue.poll();
if (task == null) {
try {
Thread.currentThread().sleep(15000);
System.out.println(Thread.currentThread().getName() + "无任务");
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
System.out.println(Thread.currentThread().getName() + "获取到任务");
task.run();
count++;
map.put(name, count);
}
}
}
}
}
}
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: