您的位置:首页 > 其它

2013.1.30 作业第二题

2013-01-30 20:20 274 查看
使用线程池

* 什么样的任务需要特殊的执行策略呢?
1) 有依赖关系的任务.
    想想如果提交给线程池的任务之间是有依赖关系的, 怎么办?
2) 在原本单线程访问外部资源的情况下, 如果突然改成了线程池呢?
3) 考虑即时响应性的程序.
    如果在用户界面提交了一个长时间运行的任务而线程池是单线程的 , 或者提交N个长时间运行的任务, 而线程池只有N-1的线程数, 会怎样?
4) 不要在线程池的线程里面使用ThreadLocal对象
    线程池可以自由使用/重用/替换某个工作线程, 因而可能造成工作线程所访问的ThreadLocal风格的对象被调换或重置或清除. 这一点很严重. 除非ThreadLocal的对象有特定的生命周期, 并且可以在线程池对线程的"借出"周期内完成, 才可以谨                                                  慎考虑使用.
   
    考虑下Hibernate? 很多情况下, 如果用EJB容器+Hibernate, 情况会怎样? hibernate是否会在session退出的时候清理ThreadLocal对象?

* 线程池最理想的工作情景是: 所处理的任务全部是同类的, 或者全部是不相关的

* 考虑线程死锁
    当然这发生在有相互关系的线程身上. 如果提交的任务之间有结果依赖关系的时候, 要考虑进去
public class ThreadDeadlock {
    ExecutorService exec = Executors.newSingleThreadExecutor();

    public class RenderPageTask implements Callable<String> {
        public String call() throws Exception {
            Future<String> header, footer;
            header = exec.submit(new LoadFileTask("header.html"));
            footer = exec.submit(new LoadFileTask("footer.html"));
            String page = renderBody();

            // Will deadlock -- task waiting for result of subtask
            // get()是取自一个阻塞队列的数据?
            return header.get() + page + footer.get();
        }
    }
}

* 如果确实有需要长时间运行的任务, 并且客户端需要及时的响应, 可以考虑使用有时间限制版本的线程API, 很多API如Thread.join, BlockingQueue.put, CountDownLatch.await, and Selector.select都有不限时和定时的版本. 对于超时的情况可以及时反馈给客户端

* 考虑线程池的大小, 太大造成资源过度消耗, 太小造成吞吐量不足
1) 计算机的cpu数量, 内存
2) 任务分为哪些种类? (可以为不同类型的任务使用不同的线程池)
3) cpu利用率

@? EJB规范对线程的限制? 在session bean调用之后不能启动新的线程?

对于可能出现阻塞操作的线程, 线程池的容量应该要大一些, 以适应吞吐量的要求. 比如使用N[cpu]+1个线程的线程池, 就可以在"某一个"线程被阻塞或者出现问题的时候, 仍有空闲的线程来处理"额外"的请求.

要更准确的估计线程池容量的大小, 就要考虑到线程执行所需大概的时间, 这个时间不一定很精确, 但是可以通过性能测试来比较, 试着调整线程池的容量, 在高并发的情况下, 吞吐量和cpu,内存使用率之间需要一个折中的结果.

给出一个估算的"公式"
N[cpu] : Number of cpu, 表示cpu个数(从Runtime.availableProcessors获得)
U[cpu] : Utilization of cpu, 表示期望的cpu使用率, 1%~100%
W/C : Wait time/Computing time, 可能出现的等待时间除以实际计算时间
在这三个因素的影响下, 线程数的"公式"为
N[thread] = N[cpu] * U[cpu] * (1+W/C)

比如一个双核CPU的计算机且希望cpu利用率为30%, 且某一个任务需要等待数据库连接返回结果0.2s, 实际计算时间0.1s
N[thread] = 2 * 30% * (1+0.2/0.1) = 1.8, 即为2线程.

但是如果我们期待数据库的等待时间为0.05, 那么我们就要把这个代价分摊开, 需要提高响应的线程数(当然同时提高了CPU占用率).

还有些有趣的情况, 如果这些任务每一个都需要使用一个从连接池获取的连接, 那么其实线程池的容量已经被连接池的容量所限制了, 连接池的容量便成为了线程池容量的上限, 因为如果过多的线程也无法获取连接, 只能阻塞.

* 核心线程数(core pool size), 最大线程数
核心线程数其实就是目标线程数,即使没有任务可以执行, 线程池也尝试维护这个数量的线程. 如果工作队列的满了再创建更多的线程. 最大线程数是不言而喻的, 当任务超过这个数量的时候, 多余的任务讲被挂起. 如果一个线程空闲下来, 超过了空闲时间限制, 就有可能被回收.

Whena ThreadPoolExecutor is initially created, the core threads are not started immediately but instead as tasks are submitted, unless you call prestartAllCoreThreads

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) { ... }

Executors.newFixedThreadPool: 核心线程数=最大线程数=设定的线程数
newCachedThreadPool :   核心线程数=设定的线程数
                        最大线程数=Integer.MAX_VALUE
                        超时时间=1分钟, 且会自动收缩

* 使用请求队列的好处: 防止大量并发的请求对服务器端造成冲击, 并且在压力过大的情况下抛弃某些请求.

ThreadPoolExecutor allows you to supply a BlockingQueue to hold tasks awaiting execution. There are three basic approaches to task queueing: unbounded queue, bounded queue, and synchronous handoff

newFixedThreadPool和newSingleThreadExecutor 固定了线程池的线程数, 但是用的任务队列是"无限"的队列, 可以提交任意多的任务, 但这样带来的问题就是内存使用率的直线上升

使用有容量限制的ArrayBlockingQueue, LinkedBlockingQueue , Priority-BlockingQueue利于降低内存,cpu使用率

使用SynchronousQueue其实是忽略了在producer/consumer之间的队列,这是一种在线程间传递请求的机制. 要求是必须有可用的线程, 或者能按需创建新线程, 否则请求会被拒绝. 实际上, SynchronousQueue适用于线程池是无限大或者可以拒绝请求的情况.

* 再提一次:
LinkedBlockingQueue or ArrayBlockingQueue 按顺序处理
PriorityBlockingQueue: 要求任务实现Comparable或提供一个Comparator

* 饱和策略
说的是, 当任务队列满了的时候, 该怎么办? 通常是拒绝, 但是具体怎么拒绝, 也有一些讲究
ThreadPoolExecutor.setRejectedExecutionHandler 可用的有 AbortPolicy, CallerRunsPolicy, DiscardPolicy, and DiscardOldestPolicy
AbortPolicy: 抛出异常 RejectedExecutionException
CallerRunsPolicy: "借用"调用者的线程来执行, 即不使用线程池内的线程
DiscardPolicy: 忽略任务
DiscardOldestPolicy: 忽略即将执行的下一个任务
    如果同PriorityBlockingQueue合用, 那么被忽略的将是优先级最高的.

ThreadPoolExecutor executor= new ThreadPoolExecutor(
        N_THREADS, N_THREADS,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(CAPACITY));
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

也可以使用Semaphore来代替有界队列
public class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    public BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    public void submitTask(final Runnable command)
            throws InterruptedException {
        semaphore.acquire();
        try {
            exec.execute(new Runnable() {
                public void run() {
                    try {
                        command.run();
                    } finally {
                        semaphore.release();
                    }
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release();
        }
    }
}

* Thread Factories
线程池是使用Thread Factory来提供线程的, 当有需要的时候, 会调用ThreadFactory.newThread
public interface ThreadFactory {
    Thread newThread(Runnable r);
}
显然是为了自己一些目的才实现自己的ThreadFactory

public class MyThreadFactory implements ThreadFactory {
    private final String poolName;

    public MyThreadFactory(String poolName) {
        this.poolName = poolName;
    }

    public Thread newThread(Runnable runnable) {
        return new MyAppThread(runnable, poolName);
    }
}

如果在线程池里面创建一个这样的线程, 那么这个线程池就有了log和异常处理的功能
public class MyAppThread extends Thread {
    public static final String DEFAULT_NAME = "MyAppThread";
    private static volatile boolean debugLifecycle = false;
    private static final AtomicInteger created = new AtomicInteger();
    private static final AtomicInteger alive = new AtomicInteger();
    private static final Logger log = Logger.getAnonymousLogger();

    public MyAppThread(Runnable r) { this(r, DEFAULT_NAME); }

    public MyAppThread(Runnable runnable, String name) {
        super(runnable, name + "-" + created.incrementAndGet());
        setUncaughtExceptionHandler(
            new Thread.UncaughtExceptionHandler() {
                public void uncaughtException(Thread t,
                                              Throwable e) {
                    log.log(Level.SEVERE,
                        "UNCAUGHT in thread " + t.getName(), e);
                }
            });
    }

    public void run() {
        // Copy debug flag to ensure consistent value throughout.
        boolean debug = debugLifecycle;
        if (debug) log.log(Level.FINE, "Created "+getName());
        try {
            alive.incrementAndGet();
            super.run();
        } finally {
            alive.decrementAndGet();
            if (debug) log.log(Level.FINE, "Exiting "+getName());
        }
    }

    public static int getThreadsCreated() { return created.get(); }
    public static int getThreadsAlive() { return alive.get(); }
    public static boolean getDebug() { return debugLifecycle; }
    public static void setDebug(boolean b) { debugLifecycle = b; }
}

当然, ThreadExecutor创建后还是可以做一些额外的配置
ExecutorService exec = Executors.newCachedThreadPool();
if (exec instanceof ThreadPoolExecutor)
    ((ThreadPoolExecutor) exec).setCorePoolSize(10);
else
    throw new AssertionError("Oops, bad assumption");

不过注意, Executors.newFixedThreadPool()和newSingleThreadPool()不能重新配置

* ThreadPoolExecutor的扩展
可覆盖的方法有beforeExecute, afterExecute, and terminate
很简单, 看代码就行
public class TimingThreadPool extends ThreadPoolExecutor {
    private final ThreadLocal<Long> startTime
            = new ThreadLocal<Long>();
    private final Logger log = Logger.getLogger("TimingThreadPool");
    private final AtomicLong numTasks = new AtomicLong();
    private final AtomicLong totalTime = new AtomicLong();

    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        log.fine(String.format("Thread %s: start %s", t, r));
        startTime.set(System.nanoTime());
    }

    protected void afterExecute(Runnable r, Throwable t) {
        try {
            long endTime = System.nanoTime();
            long taskTime = endTime - startTime.get();
            numTasks.incrementAndGet();
            totalTime.addAndGet(taskTime);
            log.fine(String.format("Thread %s: end %s, time=%dns",
                    t, r, taskTime));
        } finally {
            super.afterExecute(r, t);
        }
    }

    protected void terminated() {
        try {
            log.info(String.format("Terminated: avg time=%dns",
                    totalTime.get() / numTasks.get()));
        } finally {
            super.terminated();
        }
    }
}

* 并行算法, 有点意思
串行方式:
void processSequentially(List<Element> elements) {
    for (Element e : elements)
        process(e);
}
最简单的并行, 不过只适用于各个任务是相对独立的, 并且每个任务不需要返回值:
void processInParallel(Executor exec, List<Element> elements) {
    for (final Element e : elements)
        exec.execute(new Runnable() {
            public void run() { process(e); }
        });
}

* 递归算法的简单并行化
这是一种很简单的情况, 每个递归不需要返回值. 当然, 在需要返回值的情况下, 可以考虑把值存放在其他地方
原始算法:
public<T> void sequentialRecursive(List<Node<T>> nodes,Collection<T> results) {
    for (Node<T> n : nodes) {
        results.add(n.compute());
        sequentialRecursive(n.getChildren(), results);
    }
}

并行化:
public<T> void parallelRecursive(final Executor exec, List<Node<T>> nodes, final Collection<T> results) {
    for (final Node<T> n : nodes) {
        exec.execute(new Runnable() {
            public void run() {
                results.add(n.compute());
            }
        });
        parallelRecursive(exec, n.getChildren(), results);
    }
}
我原来就做过类似这样的产品, 涉及到这类递归, 但是更复杂一些, 因为递归的对象不是一棵树, 而是一张图, 也就是说有着多个起点, 每一个节点可以有多个父节点... 想来兴许可以考虑把它并行化

看一个例子, 搜索迷宫出口的代码, 用并发的方式来实现
public class ConcurrentPuzzleSolver<P, M> {
    private final Puzzle<P, M> puzzle;
    private final ExecutorService exec;
    private final ConcurrentMap<P, Boolean> seen;
    final ValueLatch<Node<P, M>> solution
            = new ValueLatch<Node<P, M>>();
    ...
    public List<M> solve() throws InterruptedException {
        try {
            P p = puzzle.initialPosition();
            exec.execute(newTask(p, null, null));
            // block until solution found
            /*留意这里是一个阻塞方法, 如果这个迷宫没有出路, 这里就永远阻塞了*/
            Node<P, M> solnNode = solution.getValue();
            return (solnNode == null) ? null : solnNode.asMoveList();
        } finally {
            exec.shutdown();
        }
    }

    protected Runnable newTask(P p, M m, Node<P,M> n) {
        return new SolverTask(p, m, n);
    }

    class SolverTask extends Node<P, M> implements Runnable {
        ...
        public void run() {
            if (solution.isSet()
                    || seen.putIfAbsent(pos, true) != null)
                return; // already solved or seen this position
            if (puzzle.isGoal(pos))
                solution.setValue(this);
            else
                for (M m : puzzle.legalMoves(pos))
                    exec.execute(
                        newTask(puzzle.move(pos, m), m, this));
        }
    }
}

当然, 在上面的代码里面, 为了解决可能出现的阻塞问题, 可以使用一个计数器,保存每个任务运行的状态.
private final AtomicInteger taskCount = new AtomicInteger(0);
每创建一个线程, 就增加1, 每执行完毕一次, 就减1, 当taskCount为0的时候,清空solution即可
if (taskCount.decrementAndGet() == 0)
                    solution.setValue(null);

转自:http://hi.baidu.com/iwishyou2/blog/item/479388649e96fbf8f6365404.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: