您的位置:首页 > 编程语言 > Java开发

java 使用comet4j向客户端主动推送例子

2016-07-01 19:03 531 查看

1.准备工作:

1、下载comet4j.js

2、下载comet4j-tomcat7.jar  这个现在只支持tomcat6和7两个版本  一定要对应上了,我这边测试的  在tomcat8下面是用comet4j-tomcat7.jar这个jar文件也是可以推送的

2.maven配置

因为comet4j-tomcat6.jar这个jar文件在maven远程仓库中不存在,所以需要将jar包放到lib下面,但是maven打包的时候会找不到这个jar文件  所以在pom.xml中加入下面代码即可

<dependency>

            <groupId>comet4j-tomcat7</groupId>

            <artifactId>test</artifactId>

            <version>1.0</version>

            <scope>system</scope>

            <systemPath>${basedir}/src/main/webapp/WEB-INF/lib/comet4j-tomcat7.jar</systemPath>

</dependency>

3.修改tomcat配置文件conf/server.xml

修改之前为:

<Connector connectionTimeout="20000" port="8080" protocol="HTTP/1.1" redirectPort="8443"/>

修改之后为:

<Connector connectionTimeout="20000" port="8080"  protocol="org.apache.coyote.http11.Http11NioProtocol" redirectPort="8443" URIEncoding="UTF-8"/>

4.修改web.xml配置

<listener>

        <description>Comet4J容器侦听</description>

        <listener-class>org.comet4j.core.CometAppListener</listener-class>

    </listener>

    

    <listener>

        <description>监听我们自己的推送类</description>

        <listener-class>com.util.CometUtil</listener-class>

    </listener>

    <servlet>

        <description>客户端访问入口</description>

        <servlet-name>CometServlet</servlet-name>

        <servlet-class>org.comet4j.core.CometServlet</servlet-class>

    </servlet>

    <servlet-mapping>

        <servlet-name>CometServlet</servlet-name>

        <url-pattern>/conn</url-pattern>

    </servlet-mapping>

5.java后端推送工具类

CometUtil.java

import javax.servlet.ServletContextEvent;

import javax.servlet.ServletContextListener;

import org.comet4j.core.CometConnection;

import org.comet4j.core.CometContext;

import org.comet4j.core.CometEngine;

import org.comet4j.core.event.ConnectEvent;

import org.comet4j.core.listener.ConnectListener;

import com.Comet;

public class CometUtil extends ConnectListener implements ServletContextListener {

     /**

      * 初始化上下文

      */

     public void contextInitialized(ServletContextEvent arg0) {

             // CometContext : Comet4J上下文,负责初始化配置、引擎对象、连接器对象、消息缓存等。

             CometContext cc = CometContext.getInstance();

             // 注册频道,即标识哪些字段可用当成频道,用来作为向前台传送数据的“通道”

             cc.registChannel(Constant.CHANNEL_MSGCOUNT);

             cc.registChannel(Constant.CHANNEL_MSG_DATA);

             //添加监听器  

             CometEngine engine = CometContext.getInstance().getEngine();  

             engine.addConnectListener(this);

     }

    @Override

    public void contextDestroyed(ServletContextEvent sce) {

        // TODO Auto-generated method stub

    
4000
}

    @Override

    public boolean handleEvent(ConnectEvent connEvent){

        // TODO Auto-generated method stub

        final CometConnection conn = connEvent.getConn();

           Object userId = conn.getRequest().getSession().getAttribute("currentUserId");

        CacheManager.putContent(userId.toString(), connEvent);

        return true;

    }

    private void doCache(final CometConnection conn,String userId) {  

        if (userId != null) {  

            CacheManager.putContent(conn.getId(), String.valueOf(userId), Constant.EXPIRE_AFTER_ONE_HOUR);  

        }  

    }

    /**

     * 推送给所有的客户端

     * @param comet

     */

    public void pushToAll(Comet comet){

        try {

            CometEngine engine = CometContext.getInstance().getEngine();

               //推送到所有客户端  

               engine.sendToAll(Constant.CHANNEL_MSGCOUNT,comet.getMsgCount());

               engine.sendToAll(Constant.CHANNEL_MSG_DATA,comet.getMsgData());

        } catch (Exception e) {

            // TODO: handle exception

            System.out.println(e.getMessage());

        }

           

    }

    /**

     * 推送给指定客户端

     * @param comet

     */

    public void pushTo(Comet comet){

        try {

            ConnectEvent connEvent = (ConnectEvent) CacheManager.getContent(comet.getUserId()).getValue();

            final CometConnection conn = connEvent.getConn();

               //建立连接和用户的关系  

               doCache(conn,comet.getUserId());

               final String connId = conn.getId();

               CometEngine engine = CometContext.getInstance().getEngine();

               if (CacheManager.getContent(connId).isExpired()) {  

                   doCache(conn,comet.getUserId());  

               }

               //推送到指定的客户端  

              engine.sendTo(Constant.CHANNEL_MSGCOUNT, engine.getConnection(connId), comet.getMsgCount());

              engine.sendTo(Constant.CHANNEL_MSG_DATA, engine.getConnection(connId), comet.getMsgData());

        } catch (Exception e) {

            // TODO: handle exception

            System.out.println(e.getMessage());

        }

    }

Constant.java

public class Constant {

    public static long EXPIRE_AFTER_ONE_HOUR = 30; //cache过期时间

    public static String CHANNEL_MSGCOUNT= "msgCount";

    public static String CHANNEL_MSG_DATA= "msgData";

}

Comet.java

import java.util.List;

import java.util.Map;

public class Comet {

    private String userId;

    private String msgCount;

    private List<Map> msgData;

    public String getUserId() {

        return userId;

    }

    public void setUserId(String userId) {

        this.userId = userId;

    }

    public String getMsgCount() {

        return msgCount;

    }

    public void setMsgCount(String msgCount) {

        this.msgCount = msgCount;

    }

    public List<Map> getMsgData() {

        return msgData;

    }

    public void setMsgData(List<Map> msgData) {

        this.msgData = msgData;

    }

}

Cache.java

public class Cache {

     private String key;

     private Object value;

     private long timeOut;

     private boolean expired;

     public Cache() {

             super();

     }

              

     public Cache(String key, String value, long timeOut, boolean expired) {

             this.key = key;

             this.value = value;

             this.timeOut = timeOut;

             this.expired = expired;

     }

 

     public String getKey() {

             return key;

     }

 

     public long getTimeOut() {

             return timeOut;

     }

 

     public Object getValue() {

             return value;

     }

 

     public void setKey(String string) {

             key = string;

     }

 

     public void setTimeOut(long l) {

             timeOut = l;

     }

 

     public void setValue(Object object) {

             value = object;

     }

 

     public boolean isExpired() {

             return expired;

     }

 

     public void setExpired(boolean b) {

             expired = b;

     }

}

CacheManager.java

public class CacheManager {

    private static HashMap cacheMap = new HashMap();

    

    /**

     * This class is singleton so private constructor is used.

     */

    private CacheManager() {

            super();

    }

    /**

     * returns cache item from hashmap

     * @param key

     * @return Cache

     */

    private synchronized static Cache getCache(String key) {

            return (Cache)cacheMap.get(key);

    }

    /**

     * Looks at the hashmap if a cache item exists or not

     * @param key

     * @return Cache

     */

    private synchronized static boolean hasCache(String key) {

            return cacheMap.containsKey(key);

    }

    /**

     * Invalidates all cache

     */

    public synchronized static void invalidateAll() {

            cacheMap.clear();

    }

    /**

     * Invalidates a single cache item

     * @param key

     */

    public synchronized static void invalidate(String key) {

            cacheMap.remove(key);

    }

    /**

     * Adds new item to cache hashmap

     * @param key

     * @return Cache

     */

    private synchronized static void putCache(String key, Cache object) {

       cacheMap.put(key, object);

    }

    /**

     * Reads a cache item's content

     * @param key

     * @return

     */

    public static Cache getContent(String key) {

             if (hasCache(key)) {

                    Cache cache = getCache(key);

                    if (cacheExpired(cache)) {

                            cache.setExpired(true);

                    }

                    return cache;

             } else {

                     return null;

             }

    }

    /**

     *

     * @param key

     * @param content

     * @param ttl

     */

    public static void putContent(String key, Object content, long ttl) {

            Cache cache = new Cache();

            cache.setKey(key);

            cache.setValue(content);

            cache.setTimeOut(ttl + new Date().getTime());

            cache.setExpired(false);

            putCache(key, cache);

    }

    public static void putContent(String key, Object content) {

        Cache cache = new Cache();

        cache.setKey(key);

        cache.setValue(content);

        cache.setExpired(false);

        putCache(key, cache);

}

     

    /** @modelguid {172828D6-3AB2-46C4-96E2-E72B34264031} */

    private static boolean cacheExpired(Cache cache){

            if (cache == null) {

                    return false;

            }

            long milisNow = new Date().getTime();

            long milisExpire = cache.getTimeOut();

            if (milisExpire < 0) {                // Cache never expires  

                    return false;

            } else if (milisNow >= milisExpire) {

                    return true;

            } else {

                    return false;

            }

    }

}

6、前端jsp代码

在前段要显示推送的页面引入js

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8" %>

<%@taglib prefix="c" uri="http://java.sun.com/jsp/jstl/core" %>

<%@taglib prefix="fn" uri="http://java.sun.com/jsp/jstl/functions" %>

<c:set var="ctx" value="${pageContext.request.contextPath}" />

<script type="text/javascript" src="${ctx }/js/comet4j.js"></script>

<script type="text/javascript">

var count = 0;

window.onload = function(){

    // 建立连接,conn 即web.xml中 CometServlet的<url-pattern>

    JS.Engine.start('${ctx}/conn');

    <% 

         //保存用户id到session中

         session.setAttribute("currentUserId",user.getId().toString());

    %>  

    // 监听后台某个频道

    JS.Engine.on(

         {

            // 对应服务端 “频道1” 的值 msgCount

            msgCount : function(msgCount){

                $("#msgCount").html(msgCount);

            },

            // 对应服务端 “频道2” 的值 msgData

            msgData : function(msgData){

                $("#msgData").html(msgData);

            },

        }

    );

}

</script>

<body>

消息数量:<span id="msgCount"></span>

消息数据:<span id="msgData"></span>

</body>

经过以上的工作,我们就可以实现推送了 ,项目启动后,在任何类中调用下面的代码就可以推送给前端了,例如:

//所有客户端推送:

Comet comet = new Comet();

comet.setMsgCount(String.valueOf(msgCount));

comet.setMsgData(resultList);

new CometUtil()..pushToAll(comet);

//精准推送给某个客户端

Comet comet = new Comet();

comet.setUserId("1");//前端到session中的用户id

comet.setMsgCount(String.valueOf(msgCount));

comet.setMsgData(resultList);

new CometUtil()..pushTo(comet);

那么怎么实现实时推送呢 ,我用的是spring定时任务  定时扫描需要推送的内容  然后在调用推送工具类

spring定时器配置

1、pom.xml中引入依赖

<dependency>

            <groupId>org.quartz-scheduler</groupId>

            <artifactId>quartz</artifactId>

            <version>2.2.3</version>

        </dependency>

2、spring.xml文件加入下面代码

<!--     消息推送定时任务 -->

    <!-- 定时器配置 -->

    <bean id="pushMessageTask" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">

        <property name="targetObject">

            <bean class="com.util.timeTask.PushMessageTimer"/>        

        </property><!-- 指定任务类 -->

        <property name="targetMethod" value="doit"></property><!-- 指定任务方法 -->

    </bean>

    <!-- 定义时间间隔触发器 -->

    <bean id="pushMessageTigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">

        <property name="jobDetail" ref="pushMessageTask"></property>

        <property name="cronExpression">

            <value>*/10 * * * * ?</value><!-- 10秒执行一次 -->

        </property>

    </bean>

  <!-- 启动定时器 -->

    <bean id="startQuertz" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">

    <property name="triggers">

        <list>

            <ref bean="pushMessageTigger"/> 

        </list>

    </property>

3、PushMessageTimer.java

import java.util.List;

import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;

import com.MessageMapper;

import com.Comet;

import com.util.CometUtil;

public class PushMessageTimer {

    @Autowired

    MessageMapper messageMapper;

    

    private static CometUtil cometUtil = null;

    public void doit(){

        if(cometUtil == null){

            cometUtil = new CometUtil();

        }

        List<Map> resultList = null;

        int msgCount = 0;

        try {

//            String userId = (String) request.getSession().getAttribute("currentUserId");//精准推送需要获取前端保存到session的用户id

            resultList = messageMapper.getUnReadMessageList(null);

            msgCount   = messageMapper.getUnReadMessageCount(null);

            //推送消息

            Comet comet = new Comet();

//            comet.setUserId(userId);

            comet.setMsgCount(String.valueOf(msgCount));

            comet.setMsgData(resultList);

            cometUtil.pushToAll(comet);

        } catch (Exception e) {

            // TODO Auto-generated catch block

            e.printStackTrace();

            System.out.println(e.getMessage());

        }

    }

}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: