Simple use of the Java message queue ActiveMQ
2016-08-03 14:39
activeMQ is a project for learning how Java message queues are implemented, built quickly with jfinal + jfinal-ext + activeMQ + quartz.
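1. Message queues
A message queue is essentially a service built on top of the queue data structure. In the Java world, Apache ActiveMQ is one of the more mainstream implementations.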
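To make the "service built on a data structure" idea concrete, here is a minimal in-memory sketch that uses only the JDK's `BlockingQueue`. It is not ActiveMQ and involves no broker; the class name `InMemoryQueueSketch` and the message text are assumptions made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration: a queue shared by a producer and a consumer in one JVM.
public class InMemoryQueueSketch {

    // Puts `count` text messages on the queue, the way a sender would.
    public static void produce(BlockingQueue<String> queue, int count) {
        for (int i = 1; i <= count; i++) {
            queue.offer("message " + i);
        }
    }

    // Takes messages in FIFO order and stops once none arrives within the timeout.
    public static List<String> consume(BlockingQueue<String> queue, long timeoutMillis) {
        List<String> received = new ArrayList<>();
        try {
            String message;
            while ((message = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS)) != null) {
                received.add(message);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return what was received so far.
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        produce(queue, 3);
        System.out.println(consume(queue, 100));
    }
}
```

A broker such as ActiveMQ adds what this sketch lacks: the queue lives in a separate server process, so the sender and receiver can run on different machines and at different times.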
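2. Environment setup
First download the apache-activeMQ-...-.zip package from the Apache website, unzip it, and run the activeMQ service in the bin directory. Open http://localhost:8161/admin in a browser (8161 is the default web console port; adjust it if your installation uses a different one) and log in with admin/admin at the login screen.
Then create a queue named FirstQueue, which the examples below use.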
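3. Basic activeMQ operations
Remember that the activeMQ service must stay running the whole time: both the sender and the receiver connect to the server over TCP to access the messages in the queue.
[Screenshot: cmd window of the running activeMQ server]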
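Since both programs depend on the broker being up, it can help to check that its TCP port accepts connections before running them. The following is a minimal sketch using only `java.net`; the class name `BrokerPortCheck` and the one-second timeout are assumptions, and 61616 is taken from the `tcp://localhost:61616` URL used in the examples. It only verifies that something is listening on the port, not that it is actually ActiveMQ.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper: checks whether a TCP port accepts connections.
public class BrokerPortCheck {

    // Returns true if a TCP connection to host:port can be opened within the timeout.
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unknown host.
            return false;
        }
    }

    public static void main(String[] args) {
        // 61616 is the port from the broker URL used by Sender and Receiver.
        System.out.println("broker reachable: " + isReachable("localhost", 61616, 1000));
    }
}
```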
3.1. First, create the sender, Sender.java
<code class="language-java hljs has-numbering">
package com.mg.demo;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {
    private static final int SEND_NUMBER = 5;

    public static void main(String[] args) {
        // ConnectionFactory: the factory JMS uses to create connections
        ConnectionFactory connectionFactory;
        // Connection: the connection from the JMS client to the JMS provider
        Connection connection = null;
        // Session: a single-threaded context for sending or receiving messages
        Session session;
        // Destination: where the messages are sent
        Destination destination;
        // MessageProducer: the message sender
        MessageProducer producer;
        // Build the ConnectionFactory using the ActiveMQ implementation
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try {
            // Obtain a connection from the factory
            connection = connectionFactory.createConnection();
            // Start the connection
            connection.start();
            // Create a transacted session; sends take effect on commit()
            session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // FirstQueue is the queue created in the ActiveMQ console earlier
            destination = session.createQueue("FirstQueue");
            // Create the message producer (the sender)
            producer = session.createProducer(destination);
            // Non-persistent delivery, for learning only; choose per project in practice
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // The messages are hard-coded here; a real project would take them
            // from parameters or another method
            sendMessage(session, producer);
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }

    public static void sendMessage(Session session, MessageProducer producer) throws Exception {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            TextMessage message = session.createTextMessage("Message sent by ActiveMQ " + i);
            System.out.println("Sending message: " + "Message sent by ActiveMQ " + i);
            // Send the message to the destination
            producer.send(message);
        }
    }
}
</code>
3.2. Next, create the receiver, Receiver.java
<code class="language-java hljs has-numbering">
package com.mg.demo;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {
    public static void main(String[] args) {
        // ConnectionFactory: the factory JMS uses to create connections
        ConnectionFactory connectionFactory;
        // Connection: the connection from the JMS client to the JMS provider
        Connection connection = null;
        // Session: a single-threaded context for sending or receiving messages
        Session session;
        // Destination: where the messages are read from
        Destination destination;
        // MessageConsumer: the message receiver
        MessageConsumer consumer;
        connectionFactory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD,
                "tcp://localhost:61616");
        try {
            // Obtain a connection from the factory
            connection = connectionFactory.createConnection();
            // Start the connection
            connection.start();
            // Create a non-transacted session with automatic acknowledgement
            session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            // FirstQueue is the queue created in the ActiveMQ console earlier
            destination = session.createQueue("FirstQueue");
            consumer = session.createConsumer(destination);
            while (true) {
                // Wait up to 100 s for each message, to make testing easier
                TextMessage message = (TextMessage) consumer.receive(100000);
                if (null != message) {
                    System.out.println("Received message: " + message.getText());
                } else {
                    // Nothing arrived within the timeout; stop receiving
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }
}
</code>
3.3. Test result
Run the receiver, Receiver.java, first, and then run Sender.java. Both consoles print the messages.
[Screenshot: console output of the two programs]
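4. Using the jms plugin in jfinal-ext to work with activeMQ
This part integrates the quartz job scheduling framework to send a message to the queue every 10 seconds.
4.1. Core code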
<code class="language-java hljs has-numbering">
public static void main(String[] args)
        throws InstantiationException, IllegalAccessException, ClassNotFoundException {
    // Start the JMS plugin with the settings in jms.properties
    JmsPlugin jp = new JmsPlugin("jms.properties");
    jp.start();
    // Load the job definitions
    PropertyConfig pc = PropertyConfig.me();
    pc.loadPropertyFile("job.properties");
    QuartzPlugin qp = new QuartzPlugin();
    // Register job "a" only if it is enabled
    if (pc.getPropertyToBoolean("a.enable")) {
        qp.add(pc.getProperty("a.cron"),
                (Job) Class.forName(pc.getProperty("a.job")).newInstance());
    }
    qp.start();
}
</code>
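The core code creates the job by reflection from a class name stored in a properties file. The sketch below isolates that one step with a JDK-only stand-in: `Runnable` plays the role of quartz's `Job`, and `JobLoaderSketch` and `HelloTask` are hypothetical names. It uses `getDeclaredConstructor().newInstance()`, because `Class.newInstance()` has been deprecated since Java 9.

```java
// Hypothetical illustration of instantiating a task from a configured class name.
public class JobLoaderSketch {

    // Stand-in for a job class; a real quartz job would implement org.quartz.Job.
    public static class HelloTask implements Runnable {
        public static int runs = 0;

        @Override
        public void run() {
            runs++;
        }
    }

    // Loads the named class and creates an instance through its no-arg constructor.
    public static Runnable load(String className) {
        try {
            Class<?> type = Class.forName(className);
            return (Runnable) type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("Cannot create task: " + className, e);
        }
    }

    public static void main(String[] args) {
        // In the article the class name comes from a.job in job.properties.
        Runnable task = load(HelloTask.class.getName());
        task.run();
        System.out.println("runs = " + HelloTask.runs);
    }
}
```

Wrapping the reflective exceptions in one unchecked exception keeps a misconfigured class name from surfacing as three separate checked exceptions on `main`.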
4.2. Configuration file jms.properties
<code class="language-txt hljs coffeescript has-numbering">
################################
# server info #
################################
# JMS server URL
serverUrl=tcp://localhost:61616
username=admin
password=admin
################################
# queue info #
################################
# Names of the queues to send to, separated by ","
sendQueues=firstMQ
# Names of the queues to receive from, separated by ","
receiveQueues=firstMQ
# Message number of the message named a on queue firstMQ
queue.firstMQ.a=10000
# Resolver invoked when a message named a is received on queue firstMQ
queue.firstMQ.a.resolver=com.mg.jfinal.ext.demo.resolver.MGResolver
</code>
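To show how a file in this format can be read, here is a minimal sketch using `java.util.Properties`. It is not how JmsPlugin itself parses the file (that code is not shown in this post); the class name `JmsPropertiesSketch` and the second queue name `secondMQ` are assumptions for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical illustration of reading comma-separated queue names from properties text.
public class JmsPropertiesSketch {

    // Parses properties-format text into a Properties object.
    public static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        return props;
    }

    // Splits the value under `key` on "," and drops empty entries.
    public static List<String> queueNames(Properties props, String key) {
        List<String> names = new ArrayList<>();
        for (String part : props.getProperty(key, "").split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // secondMQ is a made-up name to show the "," separator.
        Properties props = parse("serverUrl=tcp://localhost:61616\nsendQueues=firstMQ,secondMQ\n");
        System.out.println(props.getProperty("serverUrl"));
        System.out.println(queueNames(props, "sendQueues"));
    }
}
```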
4.3. Configuration file job.properties
<code class="language-txt hljs avrasm has-numbering">
#JobA
a.job=com.mg.jfinal.task.JobA
a.cron=*/10 * * * * ?
a.enable=true
</code>
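The `a.cron` value is a quartz cron expression, which has six required fields (seconds, minutes, hours, day-of-month, month, day-of-week) and an optional seventh (year). `*/10` in the seconds field is what gives the 10-second interval. The sketch below only labels the fields of an expression; it does not validate or schedule anything, and the class name `CronFieldsSketch` is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration: names the fields of a quartz-style cron expression.
public class CronFieldsSketch {

    private static final String[] FIELD_NAMES = {
        "seconds", "minutes", "hours", "day-of-month", "month", "day-of-week", "year"
    };

    // Splits the expression on whitespace and maps each part to its field name.
    public static Map<String, String> describe(String cron) {
        String[] parts = cron.trim().split("\\s+");
        if (parts.length < 6 || parts.length > 7) {
            throw new IllegalArgumentException("Expected 6 or 7 fields, got " + parts.length);
        }
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            fields.put(FIELD_NAMES[i], parts[i]);
        }
        return fields;
    }

    public static void main(String[] args) {
        // The expression from job.properties above.
        System.out.println(describe("*/10 * * * * ?"));
    }
}
```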
4.4. Run result
[Screenshot: run result]
5. Code repository
http://git.oschina.net/mgang/activeMQ
6. bat
@echo Starting the program
set JAVA_HOME=F:\back\dev\jdk1.7.0_71
set path=%JAVA_HOME%\bin;%path%
java -cp classes;./lib/UserAgentUtils-1.9.jar;./lib/spring-tx-4.3.2.RELEASE.jar;./lib/spring-messaging-4.3.2.RELEASE.jar;./lib/spring-jms-4.3.2.RELEASE.jar;./lib/spring-expression-4.3.2.RELEASE.jar;./lib/spring-core-4.3.2.RELEASE.jar;./lib/spring-context-4.3.2.RELEASE.jar;./lib/spring-beans-4.3.2.RELEASE.jar;./lib/spring-aop-4.3.2.RELEASE.jar;./lib/slf4j-api-1.7.10.jar;./lib/quartz-jobs-2.2.1.jar;./lib/quartz-2.2.1.jar;./lib/mybatis-spring-1.2.2.jar;./lib/mybatis-3.2.8.jar;./lib/logback-core-1.1.2.jar;./lib/logback-classic-1.1.2.jar;./lib/log4j-over-slf4j-1.7.10.jar;./lib/log4jdbc-log4j2-jdbc4-1.15.jar;./lib/log4j-1.2.12.jar;./lib/jul-to-slf4j-1.7.10.jar;./lib/jstl-1.2.jar;./lib/jfinal-ext-3.1.3.jar;./lib/jfinal-1.9.jar;./lib/jcl-over-slf4j-1.7.10.jar;./lib/hawtbuf-1.11.jar;./lib/guava-18.0.jar;./lib/geronimo-jta_1.0.1B_spec-1.0.1.jar;./lib/geronimo-jms_1.1_spec-1.1.1.jar;./lib/geronimo-j2ee-management_1.1_spec-1.0.1.jar;./lib/commons-pool2-2.4.2.jar;./lib/commons-logging-1.2.jar;./lib/commons-lang3-3.3.2.jar;./lib/c3p0-0.9.1.1.jar;./lib/activemq-pool-5.13.4.jar;./lib/activemq-jms-pool-5.13.4.jar;./lib/activemq-client-5.13.4.jar;./lib/activemq-all-5.5.0.jar; com.mg.jfinal.ext.demo.TestJMS
pause
This post was reposted from: http://blog.csdn.net/mg0324/article/details/49666429