您的位置:首页 > 编程语言 > Java开发

《Java 7 并发编程指南》学习概要 (7) 定制并发类

2015-09-28 16:28 507 查看
1、定制ThreadPoolExecutor类

public class MyExecutor extends ThreadPoolExecutor {

private ConcurrentHashMap<String, Date> startTimes;

public MyExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
startTimes = new ConcurrentHashMap<>();
}

@Override
public void shutdown() {
System.out.printf("MyExecutor: Going to shutdown.\n");
System.out.printf("MyExecutor: Executed tasks:%d\n",
getCompletedTaskCount());
System.out.printf("MyExecutor: Running tasks:%d\n", getActiveCount());
System.out.printf("MyExecutor: Pending tasks:%d\n", getQueue().size());
super.shutdown();
}

@Override
public List<Runnable> shutdownNow() {
System.out.printf("MyExecutor: Going to immediately shutdown.\n");
System.out.printf("MyExecutor: Executed tasks: %d\n",
getCompletedTaskCount());
System.out.printf("MyExecutor: Running tasks: %d\n", getActiveCount());
System.out.printf("MyExecutor: Pending tasks: %d\n", getQueue().size());
return super.shutdownNow();
}

@Override
protected void beforeExecute(Thread t, Runnable r) {
System.out.printf("MyExecutor: A task is beginning: %s : %s\n",
t.getName(), r.hashCode());
startTimes.put(String.valueOf(r.hashCode()), new Date());
}

@Override
protected void afterExecute(Runnable r, Throwable t) {
Future<?> result = (Future<?>) r;
try {
System.out.printf("*********************************\n");
System.out.printf("MyExecutor: A task is finishing.\n");
System.out.printf("MyExecutor: Result: %s\n", result.get());
Date startDate = startTimes.remove(String.valueOf(r.hashCode()));
Date finishDate = new Date();
long diff = finishDate.getTime() - startDate.getTime();
System.out.printf("MyExecutor: Duration: %d\n", diff);
System.out.printf("*********************************\n");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
MyExecutor myExecutor = new MyExecutor(2, 4, 1000,
TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>());
List<Future<String>> results = new ArrayList<>();
for (int i = 0; i < 10; i++) {
SleepTwoSecondsTask task = new SleepTwoSecondsTask();
Future<String> result = myExecutor.submit(task);
results.add(result);
}
for (int i = 0; i < 5; i++) {
try {
String result = results.get(i).get();
System.out.printf("Main: Result for Task %d %s\n", i, result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}

myExecutor.shutdown();

for (int i = 5; i < 10; i++) {
try {
String result = results.get(i).get();
System.out.printf("Main: Result for Task %d :%s\n", i, result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
try {
myExecutor.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}

System.out.printf("Main: End of the program.\n");
}
}

class SleepTwoSecondsTask implements Callable<String> {
public String call() throws Exception {
TimeUnit.SECONDS.sleep(2);
return new Date().toString();
}
}


2、实现一个基于优先级的Executor类

public class MyPriorityTask implements Runnable, Comparable<MyPriorityTask> {

private int priority;

private String name;

public MyPriorityTask(String name, int priority) {
this.name = name;
this.priority = priority;
}

public int getPriority() {
return priority;
}

@Override
public int compareTo(MyPriorityTask o) {
if (this.getPriority() < o.getPriority()) {
return 1;
}
if (this.getPriority() > o.getPriority()) {
return -1;
}
return 0;
}

@Override
public void run() {
System.out.printf("MyPriorityTask: %s Priority : %d\n", name, priority);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 2, 1,
TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>());
for (int i = 0; i < 4; i++) {
MyPriorityTask task = new MyPriorityTask("Task " + i, i);
executor.execute(task);
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
for (int i = 4; i < 8; i++) {
MyPriorityTask task = new MyPriorityTask("Task " + i, i);
executor.execute(task);
}
executor.shutdown();
try {
executor.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("Main: End of the program.\n");

}

}


3、实现ThreadFactory接口生成自定义的线程

public class MyThread extends Thread {

private Date creationDate;
private Date startDate;
private Date finishDate;

public MyThread(Runnable target, String name) {
super(target, name);
setCreationDate();
}

@Override
public void run() {
setStartDate();
super.run();
setFinishDate();
}

public void setCreationDate() {
creationDate = new Date();
}

public void setStartDate() {
startDate = new Date();
}

public void setFinishDate() {
finishDate = new Date();
}

public long getExecutionTime() {
return finishDate.getTime() - startDate.getTime();
}

@Override
public String toString() {
StringBuilder buffer = new StringBuilder();
buffer.append(getName());
buffer.append(": ");
buffer.append(" Creation Date: ");
buffer.append(creationDate);
buffer.append(" : Running time: ");
buffer.append(getExecutionTime());
buffer.append(" Milliseconds.");
return buffer.toString();
}

public static void main(String[] args) throws Exception {
MyThreadFactory myFactory = new MyThreadFactory("MyThreadFactory");
MyTask task = new MyTask();
Thread thread = myFactory.newThread(task);
thread.start();
thread.join();
System.out.printf("Main: Thread information.\n");
System.out.printf("%s\n", thread);
System.out.printf("Main: End of the example.\n");
}

}

class MyThreadFactory implements ThreadFactory {
private int counter;
private String prefix;

public MyThreadFactory(String prefix) {
this.prefix = prefix;
counter = 1;
}

@Override
public Thread newThread(Runnable r) {
MyThread myThread = new MyThread(r, prefix + "-" + counter);
counter++;
return myThread;
}

}

class MyTask implements Runnable {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}


4、在一个Executor对象中使用自定义的ThreadFactory

MyThreadFactory myFactory = new MyThreadFactory("MyThreadFactory");
ExecutorService executor=Executors.newCachedThreadPool(myFactory);
MyTask task = new MyTask();
executor.submit(task);
executor.shutdown();
executor.awaitTermination(1, TimeUnit.DAYS);


5、在计划好的线程池中定制运行任务

public class MyScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {

public MyScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize);
}

@Override
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable,
RunnableScheduledFuture<V> task) {
MyScheduledTask<V> myTask = new MyScheduledTask<V>(runnable, null,
task, this);
return myTask;
}

@Override
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay, long period, TimeUnit unit) {
ScheduledFuture<?> task = super.scheduleAtFixedRate(command,
initialDelay, period, unit);
MyScheduledTask<?> myTask = (MyScheduledTask<?>) task;
myTask.setPeriod(TimeUnit.MILLISECONDS.convert(period, unit));
return task;
}

public static void main(String[] args) throws Exception {

MyScheduledThreadPoolExecutor executor = new MyScheduledThreadPoolExecutor(
2);

Task task = new Task();
System.out.printf("Main: %s\n", new Date());

executor.schedule(task, 1, TimeUnit.SECONDS);

TimeUnit.SECONDS.sleep(3);

task = new Task();
System.out.printf("Main: %s\n", new Date());

executor.scheduleAtFixedRate(task, 1, 3, TimeUnit.SECONDS);

TimeUnit.SECONDS.sleep(10);

executor.shutdown();
executor.awaitTermination(1, TimeUnit.DAYS);

System.out.printf("Main: End of the program.\n");
}

}

class MyScheduledTask<V> extends FutureTask<V> implements
RunnableScheduledFuture<V> {

private RunnableScheduledFuture<V> task;

private ScheduledThreadPoolExecutor executor;

private long period;

private long startDate;

public MyScheduledTask(Runnable runnable, V result,
RunnableScheduledFuture<V> task,
ScheduledThreadPoolExecutor executor) {
super(runnable, result);
this.task = task;
this.executor = executor;
}

@Override
public long getDelay(TimeUnit unit) {
if (!isPeriodic()) {
return task.getDelay(unit);
} else {
if (startDate == 0) {
return task.getDelay(unit);
} else {
Date now = new Date();
long delay = startDate - now.getTime();
return unit.convert(delay, TimeUnit.MILLISECONDS);
}
}
}

@Override
public int compareTo(Delayed o) {
return task.compareTo(o);
}

@Override
public boolean isPeriodic() {
return task.isPeriodic();
}

@Override
public void run() {
if (isPeriodic() && (!executor.isShutdown())) {
Date now = new Date();
startDate = now.getTime() + period;
executor.getQueue().add(this);
}

System.out.printf("Pre-MyScheduledTask: %s\n", new Date());
System.out.printf("MyScheduledTask: Is Periodic:%s\n", isPeriodic());
super.runAndReset();
System.out.printf("Post-MyScheduledTask: %s\n", new Date());
}

public void setPeriod(long period) {
this.period = period;
}
}

class Task implements Runnable {

@Override
public void run() {
System.out.printf("Task: Begin.\n");
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.printf("Task: End.\n");
}

}


6、实现ThreadFactory接口生成自定义的线程给Fork/Join框架

public class MyWorkerThread extends ForkJoinWorkerThread {

private static ThreadLocal<Integer> taskCounter = new ThreadLocal<Integer>();

protected MyWorkerThread(ForkJoinPool pool) {
super(pool);
}

@Override
protected void onStart() {
super.onStart();
System.out.printf("MyWorkerThread %d: Initializing task counter.\n",
getId());
taskCounter.set(0);
}

@Override
protected void onTermination(Throwable exception) {
System.out
.printf("MyWorkerThread %d: %d\n", getId(), taskCounter.get());
super.onTermination(exception);
}

public void addTask() {
int counter = taskCounter.get().intValue();
counter++;
taskCounter.set(counter);
}

public static void main(String[] args) throws Exception {
MyWorkerThreadFactory factory=new MyWorkerThreadFactory();
ForkJoinPool pool=new ForkJoinPool(4, factory, null, false);
int array[]=new int[100000];
for (int i=0; i<array.length; i++){
array[i]=1;
}
MyRecursiveTask task=new MyRecursiveTask(array,0,array.length);
pool.execute(task);
task.join();
pool.shutdown();
pool.awaitTermination(1, TimeUnit.DAYS);
System.out.printf("Main: Result: %d\n",task.get());
}

}

class MyWorkerThreadFactory implements ForkJoinWorkerThreadFactory {
@Override
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return new MyWorkerThread(pool);
}

}

class MyRecursiveTask extends RecursiveTask<Integer> {

private int array[];
private int start, end;

public MyRecursiveTask(int array[], int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}

@Override
protected Integer compute() {
Integer ret;
MyWorkerThread thread = (MyWorkerThread) Thread.currentThread();
thread.addTask();
return ret;
}

private Integer addResults(Task task1, Task task2) {
int value;
try {
value = task1.get().intValue() + task2.get().intValue();
} catch (InterruptedException e) {
e.printStackTrace();
value = 0;
} catch (ExecutionException e) {
e.printStackTrace();
value = 0;
}
try {
TimeUnit.MILLISECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
return value;
}
}


6、自定义在 Fork/Join 框架中运行的任务


7、实现一个自定义的Lock类

class MyAbstractQueuedSynchronizer extends AbstractQueuedSynchronizer {

private AtomicInteger state;

public MyAbstractQueuedSynchronizer() {
state = new AtomicInteger(0);
}

@Override
protected boolean tryAcquire(int arg) {
return state.compareAndSet(0, 1);
}

@Override
protected boolean tryRelease(int arg) {
return state.compareAndSet(1, 0);
}

}

class MyLock implements Lock {
private AbstractQueuedSynchronizer sync;

public MyLock() {
sync = new MyAbstractQueuedSynchronizer();
}

@Override
public void lock() {
sync.acquire(1);
}

@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}

@Override
public boolean tryLock() {
try {
return sync.tryAcquireNanos(1, 1000);
} catch (InterruptedException e) {
e.printStackTrace();
return false;
}
}

@Override
public boolean tryLock(long time, TimeUnit unit)
throws InterruptedException {
return sync
.tryAcquireNanos(1, TimeUnit.NANOSECONDS.convert(time, unit));
}

@Override
public void unlock() {
sync.release(1);
}

@Override
public Condition newCondition() {
return sync.new ConditionObject();
}

}


8、实现一个基于优先级的传输队列

public class MyPriorityTransferQueue<E> extends PriorityBlockingQueue<E>
implements TransferQueue<E> {

public static void main(String[] args) throws Exception {

MyPriorityTransferQueue<Event> buffer = new MyPriorityTransferQueue<Event>();

Producer producer = new Producer(buffer);
Thread producerThreads[] = new Thread[10];
for (int i = 0; i < producerThreads.length; i++) {
producerThreads[i] = new Thread(producer);
producerThreads[i].start();
}

Consumer consumer = new Consumer(buffer);
Thread consumerThread = new Thread(consumer);
consumerThread.start();

System.out.printf("Main: Buffer: Consumer count: %d\n",
buffer.getWaitingConsumerCount());

Event myEvent = new Event("Core Event", 0);
buffer.transfer(myEvent);
System.out.printf("Main: My Event has ben transfered.\n");

for (int i = 0; i < producerThreads.length; i++) {
try {
producerThreads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

TimeUnit.SECONDS.sleep(1);

System.out.printf("Main: Buffer: Consumer count: %d\n",
buffer.getWaitingConsumerCount());

myEvent = new Event("Core Event 2", 0);
buffer.transfer(myEvent);

consumerThread.join();

System.out.printf("Main: End of the program\n");
}

private AtomicInteger counter;

private LinkedBlockingQueue<E> transfered;

private ReentrantLock lock;

public MyPriorityTransferQueue() {
counter = new AtomicInteger(0);
lock = new ReentrantLock();
transfered = new LinkedBlockingQueue<E>();
}

@Override
public boolean tryTransfer(E e) {
lock.lock();
boolean value;
if (counter.get() == 0) {
value = false;
} else {
put(e);
value = true;
}
lock.unlock();
return value;
}

@Override
public void transfer(E e) throws InterruptedException {
lock.lock();
if (counter.get() != 0) {
put(e);
lock.unlock();
} else {
transfered.add(e);
lock.unlock();
synchronized (e) {
e.wait();
}
}
}

@Override
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
lock.lock();
if (counter.get() != 0) {
put(e);
lock.unlock();
return true;
} else {
transfered.add(e);
long newTimeout = TimeUnit.MILLISECONDS.convert(timeout, unit);
lock.unlock();
e.wait(newTimeout);
lock.lock();

if (transfered.contains(e)) {
transfered.remove(e);
lock.unlock();
return false;
} else {
lock.unlock();
return true;
}
}
}

@Override
public boolean hasWaitingConsumer() {
return (counter.get() != 0);
}

@Override
public int getWaitingConsumerCount() {
return counter.get();
}

@Override
public E take() throws InterruptedException {
lock.lock();
counter.incrementAndGet();

E value = transfered.poll();
if (value == null) {
lock.unlock();
value = super.take();
lock.lock();

} else {
synchronized (value) {
value.notify();
}
}

counter.decrementAndGet();
lock.unlock();
return value;
}
}

class Event implements Comparable<Event> {

private String thread;

private int priority;

public Event(String thread, int priority) {
this.thread = thread;
this.priority = priority;
}

public String getThread() {
return thread;
}

public int getPriority() {
return priority;
}

public int compareTo(Event e) {
if (this.priority > e.getPriority()) {
return -1;
} else if (this.priority < e.getPriority()) {
return 1;
} else {
return 0;
}
}
}

class Producer implements Runnable {

private MyPriorityTransferQueue<Event> buffer;

public Producer(MyPriorityTransferQueue<Event> buffer) {
this.buffer = buffer;
}

public void run() {
for (int i = 0; i < 100; i++) {
Event event = new Event(Thread.currentThread().getName(), i);
buffer.put(event);
}
}
}

class Consumer implements Runnable {

private MyPriorityTransferQueue<Event> buffer;

public Consumer(MyPriorityTransferQueue<Event> buffer) {
this.buffer = buffer;
}

@Override
public void run() {
for (int i = 0; i < 1002; i++) {
try {
Event value = buffer.take();
System.out.printf("Consumer: %s: %d\n", value.getThread(),
value.getPriority());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

}


9、实现自定义的原子对象


public class ParkingCounter extends AtomicInteger {
private int maxNumber;

public ParkingCounter(int maxNumber) {
set(0);
this.maxNumber = maxNumber;
}

public boolean carIn() {
for (;;) {
int value = get();
if (value == maxNumber) {
System.out.printf("ParkingCounter: The parking lot is full.\n");
return false;
} else {
int newValue = value + 1;
boolean changed = compareAndSet(value, newValue);
if (changed) {
System.out.printf("ParkingCounter: A car has entered.\n");
return true;
}
}
}
}

public boolean carOut() {
for (;;) {
int value = get();
if (value == 0) {
System.out
.printf("ParkingCounter: The parking lot is empty.\n");
return false;
} else {
int newValue = value - 1;
boolean changed = compareAndSet(value, newValue);
if (changed) {
System.out.printf("ParkingCounter: A car has gone out.\n");
return true;
}
}
}
}

public static void main(String[] args) throws Exception {
ParkingCounter counter = new ParkingCounter(5);
Sensor1 sensor1 = new Sensor1(counter);
Sensor2 sensor2 = new Sensor2(counter);
Thread thread1 = new Thread(sensor1);
Thread thread2 = new Thread(sensor2);
thread1.start();
thread2.start();

thread1.join();
thread2.join();

System.out.printf("Main: Number of cars: %d\n", counter.get());

}

}

class Sensor1 implements Runnable {
private ParkingCounter counter;

public Sensor1(ParkingCounter counter) {
this.counter = counter;
}

@Override
public void run() {
counter.carIn();
counter.carIn();
counter.carIn();
counter.carIn();
counter.carOut();
counter.carOut();
counter.carOut();
counter.carIn();
counter.carIn();
counter.carIn();
}

}

class Sensor2 implements Runnable {
private ParkingCounter counter;

public Sensor2(ParkingCounter counter) {
this.counter = counter;
}

@Override
public void run() {
counter.carIn();
counter.carOut();
counter.carOut();
counter.carIn();
counter.carIn();
counter.carIn();
counter.carIn();
counter.carIn();
counter.carIn();
}

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: