使用ScheduledThreadPoolExecutor实现请求合并处理
2018-02-08 15:31
447 查看
需求:当系统产生大量请求,且对实时性要求不高时,可以将请求合并,达到阀值后再提交,以提高系统吞吐量。
实现请求合并功能需要考虑以下两点:
1.当请求收集到一定数量时提交请求;
2.超时后请求没有达到一定数量也提交请求;
代码实现:
FlushThread类实现Runnable接口,实现原理是判断当前线程是否达到阀值或者超过超时,如果达到则执行flus;;;;
方法,执行Processor类的process方法,没有则使用LockSupport的park方法阻塞当前线程;
外部也可以调用timeOut方法提交请求;
Flusher类构造方法创建了保存FlushThread数组,并将FlushThread提交到线程和定时线程池,超过指定时间间隔,定时线程池将调用timeOut方法,或者往FlushThread添加数据时判断是否大于指定数量,大于则开始执行process方法;
测试方法:
输出结果如下:
https://www.xilidou.com/2018/01/22/merge-request/
实现请求合并功能需要考虑以下两点:
1.当请求收集到一定数量时提交请求;
2.超时后请求没有达到一定数量也提交请求;
代码实现:
/** * @author wzx * @time 2018/2/8 */ public class FlushThread<Item> implements Runnable { private final String name; //队列大小 private final int bufferSize; //操作间隔 private int flushInterval; //上一次提交时间 private volatile long lastFlushTime; private volatile Thread writer; //阻塞队列 private final BlockingQueue<Item> queue; //达到条件后具体的执行方法 private final Processor<Item> processor; public FlushThread(String name, int bufferSize, int flushInterval, int queueSize, Processor<Item> processor) { this.name = name; this.bufferSize = bufferSize; this.flushInterval = flushInterval; this.queue = new ArrayBlockingQueue<>(queueSize); this.processor = processor; } public boolean add(Item item) { boolean result = queue.offer(item); flushOnDemand(); return result; } private void flushOnDemand() { if (queue.size() >= bufferSize) { start(); } } private void start() { LockSupport.unpark(writer); } public void timeOut() { if (System.currentTimeMillis() - lastFlushTime >= flushInterval) { start(); } } private boolean canFlush() { return queue.size() > bufferSize || System.currentTimeMillis() - lastFlushTime > flushInterval; } //执行提交数据的方法 public void flush() { lastFlushTime = System.currentTimeMillis(); List<Item> temp = new ArrayList<>(bufferSize); int size = queue.drainTo(temp, bufferSize); if (size > 0) { try { processor.process(temp); } catch(Exception e) { e.printStackTrace(); } } } @Override public void run() { writer = Thread.currentThread(); writer.setName(name); while (!writer.isInterrupted()) { while (!canFlush()) { //如果线程没有被打断,且达不到执行条件,则阻塞 LockSupport.park(this); } flush(); } } }
FlushThread类实现Runnable接口,实现原理是判断当前线程是否达到阀值或者超过超时,如果达到则执行flus;;;;
方法,执行Processor类的process方法,没有则使用LockSupport的park方法阻塞当前线程;
外部也可以调用timeOut方法提交请求;
/** * @author wzx * @time 2018/2/8 */ public class Flusher<Item> { private final FlushThread<Item>[] flushThreads; private AtomicInteger index; private static final Random random = new Random(); private static final int delta = 50; private static ScheduledExecutorService TIMER = new ScheduledThreadPoolExecutor(1); private static ExecutorService POOL = Executors.newCachedThreadPool(); public Flusher(String name,int bufferSiz,int flushInterval,int queueSize,int threads,Processor<Item> processor) { this.flushThreads = new FlushThread[threads]; if (threads>1) { index = new AtomicInteger(); } for (int i = 0; i < threads; i++) { final FlushThread thread = new FlushThread(name + "-" + i, bufferSiz, flushInterval, queueSize, processor); flushThreads[i] = thread; POOL.submit(thread); //定时调用timeout方法 TIMER.scheduleAtFixedRate(thread::timeOut, random.nextInt(delta), flushInterval, TimeUnit.MILLISECONDS); } } public boolean add(Item item) { int len = flushThreads.length; if (len == 1) { return flushThreads[0].add(item); } int mod = index.incrementAndGet() % len; return flushThreads[mod].add(item); } }
Flusher类构造方法创建了保存FlushThread数组,并将FlushThread提交到线程和定时线程池,超过指定时间间隔,定时线程池将调用timeOut方法,或者往FlushThread添加数据时判断是否大于指定数量,大于则开始执行process方法;
测试方法:
/** * @author wzx * @time 2018/2/8 */ public class Test { public static void main(String[] args) throws InterruptedException { Flusher flusher = new Flusher("test", 5, 1000, 30, 1, new PrintOutProcessor()); int index = 1; while (true) { flusher.add(String.valueOf(index++)); Thread.sleep(100); } } }
输出结果如下:
start flush 1 2 3 4 5 end flush start flush 6 7 8 9 10 end flush
https://www.xilidou.com/2018/01/22/merge-request/
相关文章推荐
- spring camel 使用一个实现类去处理web service的请求
- MSSQL 使用临时表实现字符串合并处理
- Ajax详解及其案例分析之如何获得Ajax对象,使用Ajax对象发送GET和POST请求,校验用户名,POST和GET请求时的乱码处理,实现级联的下拉列表
- 在ASP.NET中使用IHttpHandler处理请求(如自实现AJAX)时,无法获得Session(或者说是Session 为 null)的原因及解决方法
- 自己写的使用聚集函数实现多行字串合并处理
- 公文处理方案实现之使用模板新建文档并合并正文内容
- 使用Apache通过JK实现多Tomcat负载均衡集群时,Apache不能将请求分发给Tomcat处理(即Apache反向代理不成功)的问题
- 这是一个秒杀系统,即大量用户抢有限的商品,先到先得 用户并发访问流量非常大,需要分布式的机器集群处理请求 系统实现使用Java
- 前端控制器是整个MVC框架中最为核心的一块,它主要用来拦截符合要求的外部请求,并把请求分发到不同的控制器去处理,根据控制器处理后的结果,生成相应的响应发送到客户端。前端控制器既可以使用Filter实现(Struts2采用这种方式),也可以使用Servlet来实现(spring MVC框架)。
- 使用MVP+Retrofit+RxJava实现的的Android Demo (下)使用Retrofit+RxJava处理网络请求
- PHP网页服务实现注册和商店NPC管理(安装+搭建+使用+问题处理)
- 使用Ext.UpdateManager实现页面任意部分自动刷新处理
- 使用SDK实现MFC模式的处理方法(一)
- 使用python实现文本文件合并功能
- C#图像处理类(使用此类可实现生成锐化效果、黑白效果和灰度效果)
- ExtJS之ASP.NET服务器端实现ashx和asmx处理客户端请求
- 在MFC GridCtrl中使用CGridCellCheck类根据GetCheck()状态做不同处理的实现
- [导入]使用Ext.UpdateManager实现页面任意部分自动刷新处理
- aspx页面使用资源文件实现多语言时,对HTML标记的处理
- 使用ADO.NET 实现事务处理