您的位置:首页 > 编程语言

并发编程之基础(二)

2016-03-20 11:49 519 查看

java并发编程中对共享资源的访问

1.解决共享资源竞争问题

  设想一种极端状况,A、B两人都想拿篮子中的苹果,但是现在篮子中只剩一个苹果,如果我们不加管理让A、B自己去争,可能会导致A、B为此掐架。所以我们应该有个规定,比如:当A、B都想拿苹果时,谁先碰到篮子谁先拿苹果(同时碰到不做考虑),并且当其中一个人拿完之后归还篮子,另个一人才能去取。这个篮子中的苹果就相当于共享资源,篮子就相当锁。谁先碰到篮子就意味着谁先拿到锁,只有持有锁的人才能对资源进行访问,归还篮子就相当于释放锁,这样就又进行下一轮的资源竞争。

java中提供了synchronized关键字来对共享资源(即所谓的域)进行控制访问 关于该关键字的使用方法详见该博文 /article/5084362.html

synchronized是可重入锁。同一线程在调用自己类中其他synchronized方法/块或调用父类的synchronized方法/块都不会阻碍该线程的执行,就是说同一线程对同一个对象锁是可重入的,而且同一个线程可以获取同一把锁多次,也就是可以多次重入。其实现方法是为每个锁关联一个线程持有者和计数器,当计数器为0时表示该锁没有被任何线程持有,那么任何线程都可能获得该锁而调用相应的方法;当某一线程请求成功后,JVM会记下锁的持有线程,并且将计数器置为1;此时其它线程请求该锁,则必须等待;而该持有锁的线程如果再次请求这个锁,就可以再次拿到这个锁,同时计数器会递增;当线程退出同步代码块时,计数器会递减,如果计数器为0,则释放该锁。

使用显示的Lock对象,但是该对象必须被显示的创建、锁定和释放 使用方法 /article/4719064.html

2.原子类

  2.1 原子性操作

  原子性操作可用于longdouble之外的所有基本类型上的“简单操作”(即不可分割操作)。当定义longdouble变量时,如果使用volatile关键字,就会获得原子性。原子性操作虽然可以不可被中断,但也存在不安全的问题。在多处理器系统中,一个任务做出的修改,即使在不中断的意义上讲是原子性的,但对其它任务可能是不可视的(例如:修改只是暂时性的存储到处理器的cache中去,并没有同步到内存中),即其它任务可能看不到这个修改操作。如果将一个域(其实就是变量)声明为volatile,那么对这个域写操作的结果会立即被写到主存,即便用了本地缓存机制(java中的线程访问堆中的某个对象的成员属性时,会在本地缓存一个副本,只是对副本进行操作),所有线程的读操作都可以看到这个修改,所以volatile可以确保应用中的可视性。如果多个任务同时访问某个域,那么这个域应该是volatile,否则,这个域应该只能经由同步来访问。同步会导致向主存中刷新,因此如果使用了synchronized,就不必使用volatile。但是,volatile有其局限性,当一个域的值依赖于它之前的值时(例如一个计数器),volatile就无法正常工作,所以首选应该是[b]synchronized。[/b]

  我们希望下面代码一直是死循环状态,但是在实际运行中并不是这样,如何更改呢?

public class Test implements Runnable {
private volatile int i = 0;
public static void main(String[] args) {
Test test = new Test();
Thread t = new Thread(test);
t.start();
while (true) {
int val = test.getValue();
if (val % 2 != 0) {
System.out.println(val);
System.exit(0);
}
}
}

@Override
public void run() {
while (true) {
incr();
}
}

public int getValue() {
return i;
}

public synchronized void incr() {
i++;
i++;
}
}


  2.2 原子类

  java中引入了如AtomicInteger、AtomicLong、AtomicReference等特殊原子性变量类,并提供了相应的更新操作,对于常规编程来说,它们很少会派上用场,但是涉及性能调优时,就会有用武之地。下面只是一个简单的事例程序。

import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicInteger;

public class Test {
public static void main(String[] args) {
new Timer().schedule(new TimerTask() {
public void run() {
System.out.println("Aborting");
System.exit(0);
}
}, 10000);
AtomicIntegerTest a = new AtomicIntegerTest();
Thread t = new Thread(a);
t.start();
while (true) {
int val = a.getValue();
if (val % 2 != 0) {
System.out.println(val);
System.exit(0);
}
}
}
}

class AtomicIntegerTest implements Runnable {
private AtomicInteger i = new AtomicInteger(0);
public int getValue() {
return i.get();
}
private void evenIncrement() {
i.addAndGet(2);
}

@Override
public void run() {
while (true)
evenIncrement();
}

}


3.线程本地存储

线程本地存储可以为使用同一种数据类型的不同线程创建不同的存储,可以简单的理解为为每个线程增加了一个类似线程名、优先级这样的属性,我们可以随时随地的进行访问,并且不用担心线程冲突的问题。关于本地存储的详细说明可以参考以下链接 链接1 链接2

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class Test {

private static ThreadLocal<Integer> value = new ThreadLocal<Integer>() {
private Random rand = new Random(47);
protected synchronized Integer initialValue() {
return rand.nextInt(1000);
}
};

public static void increment() {
value.set(value.get() + 1);
}

public static int get() {
return value.get();
}

public static void main(String[] args) throws InterruptedException {
ExecutorService exe = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
exe.execute(new Accessor(i));
}
TimeUnit.SECONDS.sleep(3);
exe.shutdown();
}

}

class Accessor implements Runnable {
private final int id;
public Accessor(int idn) {
id = idn;
}
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
Test.increment();
System.out.println(this);
Thread.yield();
}

}

public String toString() {
return "#" + id + ": " + Test.get();
}
}


4.阻塞

  线程可以通过一下进入阻塞态

调用sleep函数 Thread.sleep(1000)或TimeUnit.SECONDS.sleep(1)睡眠1S

调用wait()挂起 wait()函数是每个线程都已经实现的函数,并且只能通过notify()或notifyAll()来唤醒

I/O操作

等待锁

5.中断

  每个线程有个中断标志位,当调用该线程的Interrupt()方法时,会把该中断标志位置为1,可以在该线程内部通过isInterrupted()方法查看该位是否被置为1,但是标志位不会被清零。使用Thread.interrupted方法也可查看该位是否为1,并具有清零效果。当然在有些情况下,触发中断会直接抛出异常。下面的事例代码是在睡眠阻塞、I/O阻塞和锁阻塞的情况下使用中断。

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class SleepBlocked implements Runnable {

@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(100);
} catch (InterruptedException e) {
System.out.println("InterruptionExecption");

}
System.out.println("Exiting SleepBlocked.run()");
}
}

class IOBlocked implements Runnable {
private InputStream in;
public IOBlocked(InputStream is) {
in = is;
}
@Override
public void run() {
try {
System.out.println("Waiting for read():");
in.read(); //等待输入而阻塞
} catch (IOException e) {
if (Thread.currentThread().isInterrupted())
System.out.println("Interrupted from blocked I/O");
else
throw new RuntimeException();
}
System.out.println("Exiting IOBlocked.run()");
}
}

class SynchronizedBlocked implements Runnable {
public synchronized void f() {
while (true) //永久性的获取锁,并不释放
Thread.yield();
}

public SynchronizedBlocked() {
new Thread() {
public void run() {
f();
}
}.start();
}

@Override
public void run() {
System.out.println("Trying to call f()");
f(); //因等待锁而阻塞
System.out.println("Exiting SychronizedBlocked.run()");
}
}

class NormalRun implements Runnable {

@Override
public void run() {
while (true) {
if (Thread.interrupted()) {
System.out.println("fdfdf");
break;
}

}
}

}
public class Interrupting {
private static ExecutorService exec = Executors.newCachedThreadPool();

static void test(Runnable r) throws InterruptedException {
Future<?> f = exec.submit(r);
TimeUnit.MILLISECONDS.sleep(100);
System.out.println("Interrupting " + r.getClass().getName());
f.cancel(true);
System.out.println("Interrupting sent to " + r.getClass().getName());
}

public static void main(String[] args) throws Exception {
test(new SleepBlocked());
test(new IOBlocked(System.in));
test(new SynchronizedBlocked());
test(new NormalRun());
TimeUnit.MILLISECONDS.sleep(3);
System.out.println("Aborting with System.exit(0)");
System.exit(0);
}
}




运行结果分析:SleepBlocked线程是可中断的,并会捕获中断异常。而IOBlocked和SynchronizedBlocked是不可中断的阻塞示例,即使给他们发中断信信号,它们也置之不顾。代码中也表明在一个非阻塞的线程中也可以通过查看中断标志位来查看是否被中断(没有实际用途)。

6.中断因I/O而发生阻塞的任务

如果我们的确需要中断阻塞的IO操作可以通过关闭底层资源的方式来实现。

package com.dy.xidian;

import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class IOBlocked implements Runnable {
private InputStream in;
public IOBlocked(InputStream is) {
in = is;
}
@Override
public void run() {
System.out.println("Waiting for read()");
try {
in.read();
} catch (IOException e) {
if (Thread.currentThread().isInterrupted())
System.out.println("Interrupted from blocked I/O");
else
throw new RuntimeException();
}
System.out.println("Exiting IOBlock.run()");
}

}
public class CloseResource {
public static void main(String[] args) throws Exception {
ExecutorService exec = Executors.newCachedThreadPool();
@SuppressWarnings("resource")
InputStream socketInput = new Socket("localhost", 80).getInputStream();
exec.execute(new IOBlocked(socketInput));
exec.execute(new IOBlocked(System.in));
TimeUnit.MILLISECONDS.sleep(111);
System.out.println("Shutting down all threads");
exec.shutdownNow();
TimeUnit.SECONDS.sleep(1);
System.out.println("Closing " + socketInput.getClass().getName());
socketInput.close();
TimeUnit.SECONDS.sleep(1);
System.out.println("Closing " + System.in.getClass().getName());
System.in.close();
}
}




7.中断因锁阻塞的任务

java中提供了ReentranLock类,在该类上的阻塞任务具备可以被中断的能力。

package com.dy.xidian;

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class BlockedMutex {
private Lock lock = new ReentrantLock();
public BlockedMutex() {
lock.lock(); //获取锁
}

public void f() {
InputStream is = System.in;
try {
lock.lockInterruptibly(); //如果该线程未被中断,则获取锁;若未拿到锁,则睡眠,直到锁被释放或中断发生
System.out.println("lock acquired in f()");
} catch (InterruptedException e) {
System.out.println("Interrupted from lock acquisition in f()");
try {
is.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}

class Blocked2 implements Runnable {
BlockedMutex blokced = new BlockedMutex();
@Override
public void run() {
System.out.println("Waiting for f() in BlockedMutex");
blokced.f();
System.out.println("Broken out of blocked call");
}
}

public class Test {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(new Blocked2()); // 在创建该对象时主线程已经持有BlockedMutex对象锁
t.start();
TimeUnit.SECONDS.sleep(2);
System.out.println("Issuing t.intrrupt()");
t.interrupt();
}
}


内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: