您的位置:首页 > 其它

使用ScheduledThreadPoolExecutor实现请求合并处理

2018-02-08 15:31 447 查看
需求:当系统产生大量请求,且对实时性要求不高时,可以将请求合并,达到阀值后再提交,以提高系统吞吐量。

实现请求合并功能需要考虑以下两点:

1.当请求收集到一定数量时提交请求;

2.超时后请求没有达到一定数量也提交请求;

代码实现:

/**
* @author wzx
* @time 2018/2/8
*/
public class FlushThread<Item> implements Runnable {

private final String name;
//队列大小
private final int bufferSize;
//操作间隔
private int flushInterval;
//上一次提交时间
private volatile long lastFlushTime;
private volatile Thread writer;
//阻塞队列
private final BlockingQueue<Item> queue;
//达到条件后具体的执行方法
private final Processor<Item> processor;

public FlushThread(String name, int bufferSize, int flushInterval, int queueSize, Processor<Item> processor) {
this.name = name;
this.bufferSize = bufferSize;
this.flushInterval = flushInterval;
this.queue = new ArrayBlockingQueue<>(queueSize);
this.processor = processor;
}

public boolean add(Item item) {
boolean result = queue.offer(item);
flushOnDemand();
return result;
}

private void flushOnDemand() {
if (queue.size() >= bufferSize) {
start();
}
}

private void start() {
LockSupport.unpark(writer);
}

public void timeOut() {
if (System.currentTimeMillis() - lastFlushTime >= flushInterval) {
start();
}
}

private boolean canFlush() {
return queue.size() > bufferSize || System.currentTimeMillis() - lastFlushTime > flushInterval;
}

//执行提交数据的方法
public void flush() {
lastFlushTime = System.currentTimeMillis();
List<Item> temp = new ArrayList<>(bufferSize);
int size = queue.drainTo(temp, bufferSize);
if (size > 0) {
try {
processor.process(temp);
} catch(Exception e) {
e.printStackTrace();
}
}
}

@Override
public void run() {
writer = Thread.currentThread();
writer.setName(name);
while (!writer.isInterrupted()) {
while (!canFlush()) {
//如果线程没有被打断,且达不到执行条件,则阻塞
LockSupport.park(this);
}
flush();
}
}
}


FlushThread类实现Runnable接口,实现原理是判断当前线程是否达到阀值或者超过超时,如果达到则执行flus;;;;

方法,执行Processor类的process方法,没有则使用LockSupport的park方法阻塞当前线程;

外部也可以调用timeOut方法提交请求;

/**
* @author wzx
* @time 2018/2/8
*/
public class Flusher<Item> {

private final FlushThread<Item>[] flushThreads;

private AtomicInteger index;

private static final Random random = new Random();

private static final int delta = 50;

private static ScheduledExecutorService TIMER = new ScheduledThreadPoolExecutor(1);

private static ExecutorService POOL = Executors.newCachedThreadPool();

public Flusher(String name,int bufferSiz,int flushInterval,int queueSize,int threads,Processor<Item> processor) {
this.flushThreads = new FlushThread[threads];
if (threads>1) {
index = new AtomicInteger();
}
for (int i = 0; i < threads; i++) {
final FlushThread thread = new FlushThread(name + "-" + i, bufferSiz, flushInterval, queueSize, processor);
flushThreads[i] = thread;
POOL.submit(thread);
//定时调用timeout方法
TIMER.scheduleAtFixedRate(thread::timeOut, random.nextInt(delta), flushInterval, TimeUnit.MILLISECONDS);
}
}

public boolean add(Item item) {
int len = flushThreads.length;
if (len == 1) {
return flushThreads[0].add(item);
}
int mod = index.incrementAndGet() % len;
return flushThreads[mod].add(item);
}

}


Flusher类构造方法创建了保存FlushThread数组,并将FlushThread提交到线程和定时线程池,超过指定时间间隔,定时线程池将调用timeOut方法,或者往FlushThread添加数据时判断是否大于指定数量,大于则开始执行process方法;

测试方法:

/**
* @author wzx
* @time 2018/2/8
*/
public class Test {
public static void main(String[] args) throws InterruptedException {
Flusher flusher = new Flusher("test", 5, 1000, 30, 1, new PrintOutProcessor());
int index = 1;
while (true) {
flusher.add(String.valueOf(index++));
Thread.sleep(100);
}
}
}


输出结果如下:

start flush
1
2
3
4
5
end flush
start flush
6
7
8
9
10
end flush


https://www.xilidou.com/2018/01/22/merge-request/
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐