您的位置:首页 > 数据库 > Memcache

基于memcache的java分布式队列实现。

2020-07-14 05:33 1161 查看

主要有两个类,一个队列类和一个job的抽象类。

保证队列类中的key的唯一性,就可以用spring配置多个实例。水平有限,欢迎吐槽。

上代码:

1、队列类

import net.spy.memcached.MemcachedClient;
import net.spy.memcached.internal.OperationFuture;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;

import com.izx.services.common.Constant;

/**
 * 
* @ClassName: MemCacheQueue 
* @Description: 基于memcache的消息队列的实现
* @author hai.zhu
* @date 2016-3-31 下午3:29:00 
*
 */
public class MemCacheQueue implements InitializingBean, DisposableBean,ApplicationContextAware {
    private static final Log log = LogFactory.getLog(MemCacheQueue.class);
    
    /**
     * 队列名
     */
    private String key;
    
    /**
     * 队列锁失效分钟
     */
    private Integer lockExpireMinite = 3;
    
    private MemcachedClient memcachedClient;
    
    private ApplicationContext applicationContext;
    
    ListenerThread listenerThread = new ListenerThread();
    
    public void setKey(String key) {
        this.key = key;
    }
    
    public void setMemcachedClient(MemcachedClient memcachedClient) {
        this.memcachedClient = memcachedClient;
    }
    
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @Override
    public void destroy() throws Exception {
        try {
            this.sign = false;
            listenerThread.interrupt();
        } catch (Exception e) {
            log.error(e);
        }
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        //初始化队列,用add防止重启覆盖
        memcachedClient.add(Constant.MEMCACHE_GLOBAL_QUEUE_STARTKEY + key, 0, "0");
        memcachedClient.add(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY + key, 0, "0");
        //设置任务线程
        listenerThread.setDaemon(true);
        listenerThread.start();
    }
    
    /**
     * 
    * @Title: push 
    * @Description: 唯一对外方法,放入要执行的任务
    * @param @param value
    * @param @throws Exception    设定文件 
    * @return void    返回类型 
    * @throws
     */
    public synchronized void push(MemCacheQueueJobAdaptor value) throws Exception {
        //分布加锁
        queuelock();
        //放入队列
        memcachedClient.incr(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY + key, 1);
        Object keyorder = memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY + key);
        memcachedClient.set(Constant.MEMCACHE_GLOBAL_QUEUE_VARIABLE + key + "_" + keyorder, 0, value);
        //分布解锁
        queueUnLock();
    }
    
    /**
     * 
    * @Title: pop 
    * @Description: 取出要执行的任务
    * @param @return
    * @param @throws Exception    设定文件 
    * @return MemCacheQueueJobAdaptor    返回类型 
    * @throws
     */
    private synchronized MemCacheQueueJobAdaptor pop() throws Exception {
        Object keyorderstart = memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_STARTKEY + key);
        Object keyorderend = memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_ENDKEY + key);
        if(keyorderstart.equals(keyorderend)){
            return null;
        }
        MemCacheQueueJobAdaptor adaptor = (MemCacheQueueJobAdaptor)memcachedClient.get(Constant.MEMCACHE_GLOBAL_QUEUE_VARIABLE + key + "_" + keyorderstart);
        memcachedClient.incr(Constant.MEMCACHE_GLOBAL_QUEUE_STARTKEY + key, 1);
        memcachedClient.delete(Constant.MEMCACHE_GLOBAL_QUEUE_VARIABLE + key + "_" + keyorderstart);
        return adaptor;
    }
    
    /**
     * 
    * @Title: queuelock 
    * @Description: 加锁
    * @param @throws InterruptedException    设定文件 
    * @return void    返回类型 
    * @throws
     */
    private void queuelock() throws Exception {
        do {
            OperationFuture<Boolean> sign = memcachedClient.add(Constant.MEMCACHE_GLOBAL_QUEUE_LOCK + key, lockExpireMinite * 60, key);
            if(sign.get()){
                return;
            } else {
                log.debug("key: " + key + " locked by another business");
            }
            Thread.sleep(300);
        } while (true);
    }
    
    /**
     * 
    * @Title: queueUnLock 
    * @Description: 解锁
    * @param     设定文件 
    * @return void    返回类型 
    * @throws
     */
    private void queueUnLock() {
        memcachedClient.delete(Constant.MEMCACHE_GLOBAL_QUEUE_LOCK + key);
    }
    
    private boolean sign = true;
    private long THREAD_SLEEP = 10;
    class ListenerThread extends Thread {
        @Override
        public void run(){
            log.error("队列["+key+"]开始执行");
            while(sign){
                try {
                    Thread.sleep(THREAD_SLEEP);
                    dojob();
                } catch (Exception e) {
                    log.error(e);
                }
            }
        }
        
        private void dojob(){
            try{
                queuelock();
                MemCacheQueueJobAdaptor adaptor = pop();
                //逐个执行
                if(adaptor != null){
                    THREAD_SLEEP = 10;
                    try {
                        adaptor.setApplicationContext(applicationContext);
                        adaptor.onMessage();
                    } catch (Exception e) {
                        log.error(e);
                    }
                }else{
                    THREAD_SLEEP = 5000;
                }
            }catch(Exception e){
                log.error(e);
            }finally{
                queueUnLock();
            }
        }
    }
}
[/code]

2、job抽象类

import org.springframework.context.ApplicationContext;
import java.io.Serializable;

/**
 * 
 * @ClassName: MemCacheQueueJobAdaptor 
 * @Description: 基于memcache队列的任务适配器
 * @author hai.zhu
 * @date 2015-12-11 上午11:48:26 
 * @param <T>
 */
public abstract class MemCacheQueueJobAdaptor implements Serializable{
    private static final long serialVersionUID = -5071415952097756327L;
    
    private ApplicationContext applicationContext;
    
    public ApplicationContext getApplicationContext() {
        return applicationContext;
    }

    public void setApplicationContext(ApplicationContext applicationContext) {
        this.applicationContext = applicationContext;
    }
    
    /**
     * 
     * @Title: onMessage 
     * @Description: 异步执行任务接口
     * @author hai.zhu
     * @param @param value 设定文件 
     * @return void 返回类型 
     * @throws
     */
    public abstract void onMessage();
}
[/code]

3、部分放在constant的常量

/**
     * 基于memcache的队列存放前缀
     */
    public static String MEMCACHE_GLOBAL_QUEUE_VARIABLE = "MEMCACHE_GLOBAL_QUEUE_VARIABLE_";
    
    /**
     * 基于memcache的队列锁的前缀
     */
    public static String MEMCACHE_GLOBAL_QUEUE_LOCK = "MEMCACHE_GLOBAL_QUEUE_LOCK_";
    
    /**
     * 基于memcache的队列锁的开始元素
     */
    public static String MEMCACHE_GLOBAL_QUEUE_STARTKEY = "MEMCACHE_GLOBAL_QUEUE_STARTKEY_";
    
    /**
     * 基于memcache的队列锁的结束元素
     */
    public static String MEMCACHE_GLOBAL_QUEUE_ENDKEY = "MEMCACHE_GLOBAL_QUEUE_ENDKEY_";
[/code]

4、spring配置,保证队列名的唯一性就可以配置多个队列

<!-- 枚举类型需要转换 -->
    <bean id="KETAMA_HASH" class="org.springframework.beans.factory.config.FieldRetrievingFactoryBean">    
        <property name="staticField" value="net.spy.memcached.DefaultHashAlgorithm.KETAMA_HASH" />    
    </bean>
    
    <!-- memcache客户端 -->
    <bean id="memcachedClient" class="net.spy.memcached.spring.MemcachedClientFactoryBean">
        <property name="servers" value="192.168.75.154:11277,192.168.75.154:11277,192.168.75.154:11277"/>
        <property name="protocol" value="BINARY"/>
        <property name="transcoder">
            <bean class="net.spy.memcached.transcoders.SerializingTranscoder">
                <property name="compressionThreshold" value="1024"/>
            </bean>
        </property>
        <property name="opTimeout" value="1000"/>
        <property name="timeoutExceptionThreshold" value="1998"/>
        <property name="hashAlg" ref="KETAMA_HASH"/>
        <property name="locatorType" value="CONSISTENT"/> 
        <property name="failureMode" value="Redistribute"/>
        <property name="useNagleAlgorithm" value="false"/>
    </bean>
    
    <!-- 队列配置 -->
    <bean id="onequeue" class="com.izx.services.queque.MemCacheQueue">
        <property name="memcachedClient" ref="memcachedClient"/>
        <property name="key" value="onequeue"/>
    </bean>


转载于:https://my.oschina.net/zhuxuan/blog/650935

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: