您的位置:首页 > 产品设计 > UI/UE

【JAVA】JAVA线程及锁基础知识-niyuelin

2016-10-11 14:07 169 查看

JAVA线程及锁基础知识-niyuelin

1、线程实现方式

2、并发关键字

3、Lock锁

4、并发工具类

5、线程池Exector

6、问题及实践

1、线程实现方式

1.Thread
继承Thread类,使用new Thread().start()实现


/**
* 线程死锁  用jstack pid查看死锁
* @author niyuelin
*
*/
public class DeadLock extends Thread {
protected Object tool;
static Object fork1 = new Object();
static Object fork2 = new Object();

public DeadLock(Object tool) {
this.tool = tool;
if (tool == fork1) {
this.setName("哲学家A");
}
if (tool == fork2) {
this.setName("哲学家B");
}
}

@Override
public void run() {
if(tool == fork1){
synchronized (fork1) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (fork2) {
System.out.println("哲学家A开始吃饭了");
}
}
}
if(tool == fork2){
synchronized (fork2) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (fork1) {
System.out.println("哲学家B开始吃饭了");
}
}
}
}
public static void main(String[] args) throws InterruptedException {
DeadLock A = new DeadLock(fork1);
DeadLock B = new DeadLock(fork2);
A.start();
B.start();
Thread.sleep(1000);
}

}


2.Runable
实现接口, new Thread(new Runable()).start()实现


/**
* 线程中断
* @author niyuelin
*
*/
public class ThreadInterrupt implements Runnable {

public void run() {
try {
System.out.println("in run() - about to work2()");
work();
System.out.println("in run() - back from  work2()");
} catch (InterruptedException x) {
System.out.println("in run() -  interrupted in work2()");
return;
}
System.out.println("in run() - doing stuff after nap");
System.out.println("in run() - leaving normally");
}

public void work() throws InterruptedException {
while (true) {
if (Thread.currentThread().isInterrupted()) {
System.out.println("C isInterrupted()=" + Thread.currentThread().isInterrupted());
Thread.sleep(2000);
System.out.println("D isInterrupted()=" + Thread.currentThread().isInterrupted());
}
}
}

public static void main(String[] args) {
ThreadInterrupt si = new ThreadInterrupt();
Thread t = new Thread(si);
t.start();
try {
Thread.sleep(2000);
} catch (InterruptedException x) {
}
System.out.println("in main() - interrupting other thread");
t.interrupt();
System.out.println("in main() - leaving");
}

}


3.Callable
带返回值的线程


/**
* Callable举例
* @author niyuelin
*
*/
public class CallableExample {
public static void main(String[] args) {
Callable<Integer> callable = new Callable<Integer>() {
public Integer call() throws Exception {
return new Random().nextInt(100);
}
};
FutureTask<Integer> future = new FutureTask<Integer>(callable);
new Thread(future).start();
try {
Thread.sleep(1000);// 可能做一些事情
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}


2、并发关键字

1.Synchronized

使用范围:1.对于普通同步方法,锁水当前实例对象

2.对于静态同步方法,锁是当前类的class对象

3。对于同步方法快,锁是synchonized内配置的对象

实现原理:JVM要保证每个monitorenter必须有对应的monitorexit与之配对。

任何对象都有一个monitor与之关联,当且一个monitor被持有后,它将处于锁定状态。

线程执行到monitornter

/**
* Synchronized 举例
* @author niyuelin
*
*/
public class SynchronizedExample {
public Integer inc = 0;

/**
* 方法锁
*/
public synchronized void increase() {
inc++;
}

/**
* 代码块锁
*/
public void increase2() {
synchronized(this){
inc++;
}
}

/**
* 对象锁
*/
public void increase3() {
synchronized(inc.getClass()){
inc++;
}
}

public static void main(String[] args) {
final SynchronizedExample test = new SynchronizedExample();
for (int i = 0; i < 10; i++) {
new Thread() {
public void run() {
for (int j = 0; j < 1000; j++)
test.increase3();
};
}.start();
}

while (Thread.activeCount() > 1) // 保证前面的线程都执行完
Thread.yield();
System.out.println(test.inc);
}
}


2.Volatile

内存语义:1)保证了不同线程对这个变量进行操作时的可见性,即一个线程修改了某个变量的值,这新值对其他线程来说是立即可见的。

2)禁止进行指令重排序。

实现原理: 1.lock前缀指令会引起处理器缓存回写到内存

2.一个处理器的缓存回写到内存回导致其他处理器的缓存无效

例子1:

/**
* -server模式下 保证了不同线程对这个变量进行操作时的可见性
* @author niyuelin
*
*/
public class VolatileExample {
private static volatile boolean stop = false;
public static void main(String[] args) throws Exception {
Thread t = new Thread(new Runnable() {
public void run() {
int i = 0;
while (!stop) {
i++;
System.out.println("hello");
}
}
});
t.start();

Thread.sleep(1000);
TimeUnit.SECONDS.sleep(1);
System.out.println("Stop Thread");
stop = true;
}
}


例子2:

/**
* volatile不保证原子性
* @author niyuelin
*
*/
public class VolatileExample2 {
public volatile int inc = 0;

public void increase() {
inc++;
}

public static void main(String[] args) {
final VolatileExample2 test = new VolatileExample2();
for(int i=0;i<10;i++){
new Thread(){
public void run() {
for(int j=0;j<1000;j++)
test.increase();
};
}.start();
}

while(Thread.activeCount()>1)  //保证前面的线程都执行完
Thread.yield();
System.out.println(test.inc);
}
}


3、 Lock锁

1.ReentrantLock

ReentrantLock 类实现了 Lock ,它拥有与 synchronized 相同的并发性和内存语义,但是添加了类似锁投票、定时锁等候和可中断锁等候的一些特性。此外,它还提供了在激烈争用情况下更佳的性能。(换句话说,当许多线程都想访问共享资源时,JVM 可以花更少的时候来调度线程,把更多时间用在执行线程上。)

/**
* lock
* @author niyuelin
*
*
*/
public class ReentrantLockTest {
public int inc = 0;
Lock lock = new ReentrantLock();

public void increase() {
lock.lock();
try {
inc++;
} finally {
lock.unlock();
}
}

public static void main(String[] args) {
final ReentrantLockTest test = new ReentrantLockTest();
for (int i = 0; i < 10; i++) {
new Thread() {
public void run() {
for (int j = 0; j < 1000; j++)
test.increase();
};
}.start();
}
while (Thread.activeCount() > 1) // 保证前面的线程都执行完
Thread.yield();
System.out.println(test.inc);
}
}


2.ReentrantReadWriteLock

读写锁:分为读锁和写锁,多个读锁不互斥,读锁与写锁互斥,这是由jvm自己控制的,你只要上好相应的锁即可。如果你的代码只读数据,可以很多人同时读,但不能同时写,那就上读锁;如果你的代码修改数据,只能有一个人在写,且不能同时读取,那就上写锁。总之,读的时候上读锁,写的时候上写锁!

/**
* ReadWriteLock
* @author niyuelin
*
*/
public class ReentrantReadWriteLockTest {
static class MyObject {
private Object object;
private ReadWriteLock lock = new ReentrantReadWriteLock();

public void get() {
lock.readLock().lock();
System.out.println(Thread.currentThread().getName() + "准备读数据!!");

try {
Thread.sleep(new Random().nextInt(1000));
System.out.println(Thread.currentThread().getName() + "读数据为:" + this.object);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.readLock().unlock();
}
}

public void put(Object object) {
lock.writeLock().lock();
System.out.println(Thread.currentThread().getName() + "准备写数据");

try {
Thread.sleep(new Random().nextInt(1000));
this.object = object;
System.out.println(Thread.currentThread().getName() + "写数据为" + this.object);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.writeLock().unlock();
}
}
}

public static void main(String[] args) throws InterruptedException {
final MyObject myObject = new MyObject();
ExecutorService executor = Executors.newCachedThreadPool();

for (int i = 0; i < 3; i++) {
executor.execute(new Runnable() {

@Override
public void run() {
for (int j = 0; j < 5; j++)
myObject.put(new Random().nextInt(1000));
}
});
}

for (int i = 0; i < 3; i++) {
executor.execute(new Runnable() {

@Override
public void run() {
for (int j = 0; j < 5; j++)
myObject.get();
}
});
}

executor.shutdown();
}
}


3.Condition 条件变量

条件 (也称为 条件队列 或 条件变量 )为线程提供了一个含义,以便在某个状态条件现在可能为 true 的另一个线程通知它之前,一直挂起该线程(即让其“等待”)。因为访问此共享状态信息发生在不同的线程中,所以它必须受保护,因此要将某种形式的锁与该条件相关联

/**
* ReentrantLock
* @author niyuelin
*
*/
public class ReeterLockCondition implements Runnable{
public static ReentrantLock lock = new ReentrantLock();
public static Condition condition = lock.newCondition();
@Override
public void run() {
lock.lock();
try{
condition.await();
System.out.println("Thread is going on");
}catch(InterruptedException e){
e.printStackTrace();
}finally {
lock.unlock();
}
}

public static void main(String[] args) throws InterruptedException {
ReeterLockCondition r1 = new ReeterLockCondition();
Thread t1 = new Thread(r1);
t1.start();
Thread.sleep(2000);
lock.lock();
condition.signal();
lock.unlock();
System.out.println(Runtime.getRuntime().availableProcessors());
}

}


4、并发工具类

1.CountDownLatch

等待多线程完成

例子1:

/**
* CountDownLatch
* @author niyuelin
*
*/
public class CountDownLatchDemo {
final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(2);// 两个工人的协作
Worker worker1 = new Worker("zhang san", 5000, latch);
Worker worker2 = new Worker("li si", 8000, latch);
worker1.start();//
worker2.start();//
latch.await();// 等待所有工人完成工作
System.out.println("all work done at " + sdf.format(new Date()));
}

static class Worker extends Thread {
String workerName;
int workTime;
CountDownLatch latch;

public Worker(String workerName, int workTime, CountDownLatch latch) {
this.workerName = workerName;
this.workTime = workTime;
this.latch = latch;
}

public void run() {
System.out.println("Worker " + workerName + " do work begin at " + sdf.format(new Date()));
doWork();// 工作了
System.out.println("Worker " + workerName + " do work complete at " + sdf.format(new Date()));
latch.countDown();// 工人完成工作,计数器减一

}

private void doWork() {
try {
Thread.sleep(workTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}


例子2:

/**
* CountDownLatch
* @author niyuelin
*
*/
public class CountDownLatchDemo2 {
final static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
static List<String> returnList = new ArrayList<String>();

public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(2);// 两个工人的协作
Worker worker1 = new Worker("zhang san", 5000, latch);
Worker worker2 = new Worker("li si", 8000, latch);
worker1.start();//
worker2.start();//
latch.await();// 等待所有工人完成工作
System.out.println("all work done at " + sdf.format(new Date()));
System.out.println(JSON.toJSON(returnList));
}

static class Worker extends Thread {
String workerName;
int workTime;
CountDownLatch latch;

public Worker(String workerName, int workTime, CountDownLatch latch) {
this.workerName = workerName;
this.workTime = workTime;
this.latch = latch;
}

public void run() {
System.out.println("Worker " + workerName + " do work begin at " + sdf.format(new Date()));
doWork();// 工作了
System.out.println("Worker " + workerName + " do work complete at " + sdf.format(new Date()));
returnList.add(workerName);
latch.countDown();// 工人完成工作,计数器减一

}

private void doWork() {
try {
Thread.sleep(workTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}


2.CyclicBarrier

同步屏障

例子1:

/**
* CyclicBarrier
* @author niyuelin
*
*/
public class CyclicBarrierDemo {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N);
for (int i = 0; i < N; i++)
new Writer(barrier).start();
}

static class Writer extends Thread {
private CyclicBarrier cyclicBarrier;

public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");
try {
Thread.sleep(5000); // 以睡眠来模拟写入数据操作
System.out.println("线程" + Thread.currentThread().getName() + "写入数据完毕,等待其他线程写入完毕");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("所有线程写入完毕,继续处理其他任务...");
}
}
}


例子2:

/**
* CyclicBarrier
* @author niyuelin
*
*/
public class CyclicBarrierDemo2 {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N, new Runnable() {
@Override
public void run() {
System.out.println("当前线程" + Thread.currentThread().getName());
}
});

for (int i = 0; i < N; i++)
new Writer(barrier).start();
}

static class Writer extends Thread {
private CyclicBarrier cyclicBarrier;

public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}

@Override
public void run() {
System.out.println("线程" + Thread.currentThread().getName() + "正在写入数据...");
try {
Thread.sleep(5000); // 以睡眠来模拟写入数据操作
System.out.println("线程" + Thread.currentThread().getName() + "写入数据完毕,等待其他线程写入完毕");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
System.out.println("所有线程写入完毕,继续处理其他任务...");
}
}
}


3.Semaphore

控制并发线程数

/**
* Semaphore
* @author niyuelin
*
*/
public class SemaphoreTest {

private static final int THREAD_COUNT = 30;
private static ExecutorService executorService = Executors.newFixedThreadPool(30);

private static Semaphore s = new Semaphore(10);
private static int in = 0;

public static synchronized void add(){
in++;
}

public static void main(String[] args) throws InterruptedException {
for(int i =0; i<THREAD_COUNT; i++){
executorService.execute(new Runnable() {
public void run() {
try {
s.acquire();
Thread.sleep(1000);
add();
System.out.println("save data");
s.release();
} catch (InterruptedException e) {
e.printStackTrace();
}

}
});
}
Thread.sleep(1500);
System.out.println(in);
executorService.shutdown();
}
}


4.Exchanger

线程间交换

/**
* Exchanger
* @author niyuelin
*
*/
public class ExChangerTest {
private static final Exchanger<String> exgr = new Exchanger<String>();
private static ExecutorService executorService = Executors.newFixedThreadPool(2);

public static void main(String[] args) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
String A = "银行流水A";
String c = exgr.exchange(A);
System.out.println("c: "+ c);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
executorService.execute(new Runnable() {
public void run() {
try {
String B = "银行流水B";
String A = exgr.exchange(B);
System.out.println("A:"+A+" ,B:"+B);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
executorService.shutdown();
}
}


5、线程池Exector

1.newFixedThreadPool创建一个指定工作线程数量的线程池。

每当提交一个任务就创建一个工作线程,如果工作线程数量达到线程池初始的最大数,则将提交的任务存入到池队列中。

2.newCachedThreadPool创建一个可缓存的线程池。

1).工作线程的创建数量几乎没有限制(其实也有限制的,数目为Interger. MAX_VALUE), 这样可灵活的往线程池中添加线程。 2).如果长时间没有往线程池中提交任务,即如果工作线程空闲了指定的时间(默认为1分钟),则该工作线程将自动终止。终止后,如果你又提交了新的任务,则线程池重新创建一个工作线程。

3.newSingleThreadExecutor创建一个单线程化的Executor

即只创建唯一的工作者线程来执行任务,如果这个线程异常结束,会有另一个取代它,保证顺序执行(我觉得这点是它的特色)。单工作线程最大的特点是可保证顺序地执行各个任务,并且在任意给定的时间不会有多个线程是活动的 。

4.ThreadPoolExector 自定义线程池工作线程大小,和队列形式。

5.newScheduleThreadPool创建一个定长的线程池,而且支持定时的以及周期性的任务执行,类似于Timer。

public class ThreadPoolExample {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5));
for (int i = 0; i < 15; i++) {
MyTask myTask = new MyTask(i);
executor.execute(myTask);
System.out.println("线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" + executor.getQueue().size()
+ ",已执行玩别的任务数目:" + executor.getCompletedTaskCount());
}
executor.shutdown();
}
}

class MyTask implements Runnable {
private int taskNum;

public MyTask(int num) {
this.taskNum = num;
}

@Override
public void run() {
System.out.println("正在执行task " + taskNum);
try {
Thread.currentThread().sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task " + taskNum + "执行完毕");
}
}


6、问题及实践

1.hashMap及ConcurrentHashMap

1)ConcurrentHashMap对整个桶数组进行了分段,而HashMap则没有

2)ConcurrentHashMap在每一个分段上都用锁进行保护,从而让锁的粒度更精细一些,并发性能更好,而HashMap没有锁机制,不是线程安全的

/**
* ConcurrentHashMap 不会有问题,HashMap会有问题
* @author niyuelin
*
*/
public class ConcurrentMapExample {
private static Map<Integer, Integer> map = new ConcurrentHashMap<Integer, Integer>();
private static Map<Integer, Integer> map2 = new HashMap<Integer, Integer>();

public static void main(String[] args) {
concurentMap(map2);
}

public static void concurentMap(Map<Integer, Integer> newMap){
ExecutorService executors = Executors.newCachedThreadPool();
for(int i =1; i <=20; i++){
executors.execute(new MapSet(newMap, i));
}
executors.shutdown();
System.out.println(newMap);
}

static class MapSet implements Runnable {
private Map<Integer, Integer> map;
private Integer value;

public MapSet(Map<Integer, Integer> map, Integer value) {
super();
this.map = map;
this.value = value;
}

public void run() {
map.put(value, value);
}
}
}


2.线程消费者生产者

/**
* 生产者消费者
* @author niyuelin
*
*/
public class ProducerConsumer {
public static void main(String[] args) {
ProducerConsumer pc = new ProducerConsumer();

Storage s = pc.new Storage();

ExecutorService service = Executors.newCachedThreadPool();
Producer p = pc.new Producer("张三", s);
Consumer c = pc.new Consumer("王五", s);
service.submit(p);
service.submit(c);
}

/**
* 消费者
*/
class Consumer implements Runnable {
private String name;
private Storage s = null;

public Consumer(String name, Storage s) {
this.name = name;
this.s = s;
}

public void run() {
try {
while (true) {
System.out.println(name + "准备消费产品.");
Product product = s.pop();
System.out.println(name + "已消费(" + product.toString() + ").");
System.out.println("===============");
Thread.sleep(500);
}
} catch (InterruptedException e) {
e.printStackTrace();
}

}

}

/**
* 生产者
*/
class Producer implements Runnable {
private String name;
private Storage s = null;

public Producer(String name, Storage s) {
this.name = name;
this.s = s;
}

public void run() {
try {
while (true) {
Product product = new Product((int) (Math.random() * 10000)); // 产生0~9999随机整数
System.out.println(name + "准备生产(" + product.toString() + ").");
s.push(product);
System.out.println(name + "已生产(" + product.toString() + ").");
System.out.println("===============");
Thread.sleep(500);
}
} catch (InterruptedException e1) {
e1.printStackTrace();
}

}
}

/**
* 仓库,用来存放产品
*/
public class Storage {
BlockingQueue<Product> queues = new LinkedBlockingQueue<Product>(10);

/**
* 生产
*
* @param p
*            产品
* @throws InterruptedException
*/
public void push(Product p) throws InterruptedException {
queues.put(p);
}

/**
* 消费
*
* @return 产品
* @throws InterruptedException
*/
public Product pop() throws InterruptedException {
return queues.take();
}
}

/**
* 产品
*/
public class Product {
private int id;

public Product(int id) {
this.id = id;
}

public String toString() {// 重写toString方法
return "产品:" + this.id;
}
}
}


3.线上问题排查方法

jstack命令的语法格式: jstack 。可以用jps查看java进程id

参考:

1:http://www.cnblogs.com/dolphin0520/p/3920373.html

2:http://www.tuicool.com/articles/6vANna

3:http://www.cnblogs.com/dolphin0520/p/3932921.html

4:http://itindex.net/detail/48955-jstack-dump-%E6%97%A5%E5%BF%97

5:《Java并发编程的艺术》
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  java 线程 线程池 并发