您的位置:首页 > 编程语言 > Java开发

java web解决瞬间高并发的策略

2017-03-08 09:13 225 查看
1、任何的高并发,请求总是会有一个顺序的

2、java的队列的数据结构是先进先出的取值顺序

3、BlockingQueue类(线程安全)(使用方法可以百度)



一般使用LinkedBlockingQueue

利用以上几点,我们可以把高并发时候的请求放入一个队列,队列的大小可以自己定义,比如队列容量为1000个数据,那么可以利用过滤器或者拦截器把当前的请求放入队列,如果队列的容量满了,其余的请求可以丢掉或者作出相应回复

具体实施:

利用生产者、消费者模型:



将队列的请求一一处理完。

上代码:

1 /**
2  * @author fuguangli
3  * @description 前沿消费者类
4  * @Create date:    2017/3/7
5  * @using   EXAMPLE
6  */
7 public class Customer implements Runnable{
8
9
10     /**
11      *         抛出异常    特殊值        阻塞         超时
12      插入    add(e)    offer(e)    put(e)    offer(e, time, unit)
13      移除    remove()    poll()    take()    poll(time, unit)
14      检查    element()    peek()    不可用    不可用
15
16      */
17     private BlockingQueue blockingQueue;
18     private AtomicInteger count = new AtomicInteger();
19     public Customer(BlockingQueue blockingQueue) {
20         this.blockingQueue = blockingQueue;
21     }
22
23     /**
24      * When an object implementing interface <code>Runnable</code> is used
25      * to create a thread, starting the thread causes the object's
26      * <code>run</code> method to be called in that separately executing
27      * thread.
28      * <p/>
29      * The general contract of the method <code>run</code> is that it may
30      * take any action whatsoever.
31      *
32      * @see Thread#run()
33      */
34     @Override
35     public void run() {
36         System.out.println("消费者线程启动...");
37         LockFlag.setCustomerRunningFlag(true);
38         try {
39             while (LockFlag.getProducerRunningFlag()){
40                 System.out.println(Thread.currentThread().getId()+"I'm Customer.Queue current size="+blockingQueue.size());
41                 String data = (String) blockingQueue.poll(10, TimeUnit.SECONDS);
42                 if(data!=null){
43                     System.out.println(Thread.currentThread().getId()+"*************正在消费数据 data="+data);
44                 }else{
45                     //表示超过取值时间,视为生产者不再生产数据
46                     System.out.println(Thread.currentThread().getId()+"队列为空无数据,请检查生产者是否阻塞");
47                 }
48                 Thread.sleep(50);
49             }
50             System.err.println("消费者程序执行完毕");
51         } catch (InterruptedException e) {
52             e.printStackTrace();
53             System.err.println("消费者程序退出");
54             LockFlag.setCustomerRunningFlag(false);//异常退出线程
55             Thread.currentThread().interrupt();
56         }
57     }
58 }


1 package com.qysxy.framework.queue;
2
3 import java.util.concurrent.BlockingQueue;
4 import java.util.concurrent.TimeUnit;
5 import java.util.concurrent.atomic.AtomicInteger;
6
7 /**
8  * @author fuguangli
9  * @description 队列生产者类
10  * @Create date:    2017/3/7
11  * @using       EXAMPLE
12  */
13 public class Producer implements Runnable{
14
15
16     /**
17      *         抛出异常    特殊值        阻塞         超时
18      插入    add(e)    offer(e)    put(e)    offer(e, time, unit)
19      移除    remove()    poll()    take()    poll(time, unit)
20      检查    element()    peek()    不可用    不可用
21
22      */
23     private BlockingQueue blockingQueue;
24     private AtomicInteger count = new AtomicInteger();
25     public Producer(BlockingQueue blockingQueue) {
26         this.blockingQueue = blockingQueue;
27     }
28
29     /**
30      * When an object implementing interface <code>Runnable</code> is used
31      * to create a thread, starting the thread causes the object's
32      * <code>run</code> method to be called in that separately executing
33      * thread.
34      * <p/>
35      * The general contract of the method <code>run</code> is that it may
36      * take any action whatsoever.
37      *
38      * @see Thread#run()
39      */
40     @Override
41     public void run() {
42         System.out.println("生产者线程启动...");
43         LockFlag.setProducerRunningFlag(true);
44         try {
45             while (LockFlag.getProducerRunningFlag()){
46                 String data = "data:"+count.incrementAndGet();
47                 if(blockingQueue.offer(data,10, TimeUnit.SECONDS)){
48                     //返回true表示生产数据正确
49                     System.out.println("^^^^^^^^^^^^^^正在生产数据 data="+data);
50                 }else {
51                     //表示阻塞时间内还没有生产者生产数据
52                     System.out.println("生产者异常,无法生产数据");
53                 }
54                 Thread.sleep(50);
55
56             }
57         } catch (InterruptedException e) {
58             e.printStackTrace();
59             System.err.println("生产者程序退出");
60             LockFlag.setProducerRunningFlag(false);//异常退出线程
61             Thread.currentThread().interrupt();
62         }
63     }
64 }


1 package com.qysxy.framework.queue;
2
3 /**
4  * @author fuguangli
5  * @description 前沿生产者消费者模型的锁类
6  * @Create date:    2017/3/7
7  */
8 public class LockFlag {
9     /**
10      * 生产者互斥锁
11      */
12     private static Boolean producerRunningFlag = false;
13     /**
14      * 消费者互斥锁
15      */
16     private static Boolean customerRunningFlag = false;
17
18     public static Boolean getProducerRunningFlag() {
19         return producerRunningFlag;
20     }
21
22     public static void setProducerRunningFlag(Boolean producerRunningFlag) {
23         LockFlag.producerRunningFlag = producerRunningFlag;
24     }
25
26     public static Boolean getCustomerRunningFlag() {
27         return customerRunningFlag;
28     }
29
30     public static void setCustomerRunningFlag(Boolean customerRunningFlag) {
31         LockFlag.customerRunningFlag = customerRunningFlag;
32     }
33 }


1 package com.qysxy.framework.queue;
2
3 import javax.servlet.http.HttpServletRequest;
4 import javax.servlet.http.HttpServletResponse;
5 import java.util.Queue;
6 import java.util.concurrent.*;
7
8 /**
9  * @author fuguangli
10  * @description 前沿队列实用类,用于大量并发用户
11  * @Create date:    2017/3/7
12  */
13 public class BlockingQueueHelper {
14
15
16     private static final Integer maxQueueSize = 1000;
17     private static BlockingQueue blockingQueue = new LinkedBlockingQueue(maxQueueSize);
18     private static ExecutorService threadPool = Executors.newCachedThreadPool();
19
20
21     public static BlockingQueue getBlockingQueue() {
22         if (blockingQueue == null) {
23             blockingQueue = new LinkedBlockingQueue(maxQueueSize);
24         }
25         return blockingQueue;
26     }
27
28     /**
29      * @param o 队列处理对象(包含request,response,data)
30      */
31     public static void requestQueue(Object o) {
32         //检测当前的队列大小
33         if (blockingQueue != null && blockingQueue.size() < maxQueueSize) {
34             //可以正常进入队列
35             if (blockingQueue.offer(o)) {
36                 //添加成功,检测数据处理线程是否正常
37                 if (LockFlag.getCustomerRunningFlag()) {
38                     //说明处理线程类正常运行
39                 } else {
40                     //说明处理线程类停止,此时,应重新启动线程进行数据处理
41                     LockFlag.setCustomerRunningFlag(true);
42
43                     //example:run
44                     Customer customer = new Customer(blockingQueue);
45                     threadPool.execute(customer);
46
47                 }
48
49             } else {
50                 //进入队列失败,做出相应的处理,或者尝试重新进入队列
51
52             }
53         } else {
54             //队列不正常,或队列大小已达上限,做出相应处理
55
56         }
57
58     }
59 }


好了,这时候,利用过滤器或者拦截器将每个请求封装成队列元素进行处理就行。

当然了,对于多应用服务器的部署架构来说,数据库也需要加锁,数据库隔离级别下篇再说。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: