
A Simple Guide to Using ActiveMQ, a Java Message Queue

2016-08-03 14:39

activeMQ

This is a practice project for learning Java message queues, built quickly with jfinal + jfinal-ext + activeMQ + quartz.

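1. Message queues

A message queue is essentially a service built on the queue data structure: producers put messages in at one end and consumers take them out at the other. Among Java implementations, Apache's activeMQ is one of the more mainstream choices.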
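
To make the "service built on a queue data structure" idea concrete, here is a minimal in-process sketch that uses only the JDK's `java.util.concurrent.BlockingQueue`. It is not ActiveMQ and involves no broker; the class name `QueueIdeaSketch` and the message strings are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueIdeaSketch {

    // Producer side: append `count` text messages to the queue.
    static void produce(BlockingQueue<String> queue, int count) throws InterruptedException {
        for (int i = 1; i <= count; i++) {
            queue.put("message " + i);
        }
    }

    // Consumer side: take exactly `count` messages, in arrival (FIFO) order.
    static List<String> consume(BlockingQueue<String> queue, int count) throws InterruptedException {
        List<String> received = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            received.add(queue.take()); // blocks until a message is available
        }
        return received;
    }

    public static void main(String[] args) throws Exception {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // The producer runs on its own thread, much as a sender would run in its own process.
        Thread producer = new Thread(() -> {
            try {
                produce(queue, 3);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        System.out.println(consume(queue, 3)); // prints [message 1, message 2, message 3]
        producer.join();
    }
}
```

A broker such as activeMQ plays the role of the shared queue here, except that it lives in a separate server process and the two sides reach it over the network.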

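2. Environment setup

First download the apache-activeMQ-...-.zip package from the Apache website, unzip it, and run the activeMQ service found in the bin directory.
Then open http://localhost:8161/admin in a browser (8161 is ActiveMQ's default web console port) and log in with admin/admin when the login page appears.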




Next, create a queue named FirstQueue; the examples below use it.

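3. Basic activeMQ operations

Keep the activeMQ service running the whole time: both the sender and the receiver connect to the server over TCP, one to put messages on the queue and the other to fetch them.
[Screenshot: my server's cmd window.]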




3.1. First, create the sender, Sender.java

<code class="language-java hljs  has-numbering">package com.mg.demo;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {
    private static final int SEND_NUMBER = 5;

    public static void main(String[] args) {
        // ConnectionFactory: the factory JMS uses to create connections
        ConnectionFactory connectionFactory;
        // Connection: the connection from the JMS client to the JMS provider
        Connection connection = null;
        // Session: a single-threaded context for sending or receiving messages
        Session session;
        // Destination: where the messages are sent
        Destination destination;
        // MessageProducer: the message sender
        MessageProducer producer;
        // Build the ConnectionFactory using ActiveMQ's implementation
        connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
        try {
            // Get a connection from the factory
            connection = connectionFactory.createConnection();
            // Start the connection
            connection.start();
            // Create a transacted session (the acknowledge mode is ignored when transacted)
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            // "FirstQueue" is the queue created earlier in the ActiveMQ console
            destination = session.createQueue("FirstQueue");
            // Create the message producer (the sender)
            producer = session.createProducer(destination);
            // Non-persistent delivery is enough for this exercise; choose per project in real use
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // The messages are hard-coded here; a real project would pass them in or fetch them
            sendMessage(session, producer);
            // The session is transacted, so the messages are only delivered on commit
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }

    public static void sendMessage(Session session, MessageProducer producer) throws Exception {
        for (int i = 1; i <= SEND_NUMBER; i++) {
            String text = "Message sent by ActiveMq " + i;
            TextMessage message = session.createTextMessage(text);
            System.out.println("Sending message: " + text);
            // Send the message to the destination
            producer.send(message);
        }
    }
}
</code>

3.2. Next, create the receiver, Receiver.java

<code class="language-java hljs  has-numbering">package com.mg.demo;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {
    public static void main(String[] args) {
        // ConnectionFactory: the factory JMS uses to create connections
        ConnectionFactory connectionFactory;
        // Connection: the connection from the JMS client to the JMS provider
        Connection connection = null;
        // Session: a single-threaded context for sending or receiving messages
        Session session;
        // Destination: the queue the messages are read from
        Destination destination;
        // MessageConsumer: the message receiver
        MessageConsumer consumer;
        connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
        try {
            // Get a connection from the factory
            connection = connectionFactory.createConnection();
            // Start the connection
            connection.start();
            // Create a non-transacted session that acknowledges messages automatically
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // "FirstQueue" is the queue created earlier in the ActiveMQ console
            destination = session.createQueue("FirstQueue");
            consumer = session.createConsumer(destination);
            while (true) {
                // Wait up to 100 s for the next message; the long timeout makes testing easier
                TextMessage message = (TextMessage) consumer.receive(100000);
                if (null != message) {
                    System.out.println("Received message: " + message.getText());
                } else {
                    // Nothing arrived within the timeout, so stop
                    break;
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != connection)
                    connection.close();
            } catch (Throwable ignore) {
            }
        }
    }
}</code>
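
The receive loop above depends on one behaviour: `consumer.receive(timeout)` returns `null` when no message arrives within the timeout, and that is what ends the loop. The sketch below mimics the same pattern with the JDK's `BlockingQueue.poll(timeout, unit)`, which also returns `null` on timeout. It is a stand-in for illustration rather than JMS code, and `drainUntilIdle` is a hypothetical helper name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class ReceiveLoopSketch {

    // Keep polling until the queue stays empty for `timeoutMillis`, then stop.
    static List<String> drainUntilIdle(BlockingQueue<String> queue, long timeoutMillis)
            throws InterruptedException {
        List<String> received = new ArrayList<>();
        while (true) {
            String message = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (message == null) {
                break; // timed out: nothing arrived, so give up waiting
            }
            received.add(message);
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("first");
        queue.add("second");
        System.out.println(drainUntilIdle(queue, 200)); // prints [first, second]
    }
}
```

One consequence of this design is that the receiver exits by itself after one idle timeout, so with the 100 s value used above the sender has to be started within that window.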

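3.3. Test results

Run the receiver, Receiver.java, first, and then run Sender.java.
Each console prints its side of the exchange: the sender logs the five messages it sends and the receiver logs the five it receives.
[Screenshot: output of the two consoles.]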




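4. Working with activeMQ through the jms plugin in jfinal-ext

This part integrates the quartz job scheduling framework to send a message to the queue every 10 seconds.

4.1. Core code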

<code class="language-java hljs  has-numbering">public static void main(String[] args) throws InstantiationException, IllegalAccessException, ClassNotFoundException {
    // Start the JMS plugin with the settings in jms.properties
    JmsPlugin jp = new JmsPlugin("jms.properties");
    jp.start();
    // Load the job definitions from job.properties
    PropertyConfig pc = PropertyConfig.me();
    pc.loadPropertyFile("job.properties");
    QuartzPlugin qp = new QuartzPlugin();
    // Register job "a" with its cron expression only if it is enabled
    if (pc.getPropertyToBoolean("a.enable")) {
        qp.add(pc.getProperty("a.cron"), (Job) Class.forName(pc.getProperty("a.job")).newInstance());
    }
    qp.start();
}</code>
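
The `Class.forName(...).newInstance()` call above turns the class name stored under `a.job` into a live job object. The sketch below shows that step in isolation, with a hypothetical `Task` interface and `HelloTask` class standing in for Quartz's `Job` and the project's `JobA`. It uses `getDeclaredConstructor().newInstance()`, the replacement for `Class.newInstance()`, which is deprecated since Java 9.

```java
public class ReflectiveJobSketch {

    // Hypothetical stand-in for org.quartz.Job.
    public interface Task {
        String run();
    }

    // Hypothetical stand-in for the project's JobA; it needs a no-argument constructor.
    public static class HelloTask implements Task {
        @Override
        public String run() {
            return "hello from HelloTask";
        }
    }

    // Load a class by its fully qualified name and create an instance of it.
    static Task instantiate(String className) throws ReflectiveOperationException {
        Class<?> type = Class.forName(className);
        return (Task) type.getDeclaredConstructor().newInstance();
    }

    public static void main(String[] args) throws ReflectiveOperationException {
        // In the post this name would come from job.properties (key a.job).
        String className = HelloTask.class.getName();
        Task task = instantiate(className);
        System.out.println(task.run()); // prints hello from HelloTask
    }
}
```

Keeping the class name in a properties file means a job can be swapped or disabled without recompiling the launcher.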

4.2. Configuration file jms.properties

<code class="language-txt hljs coffeescript has-numbering">################################
#          server info         #
################################
# JMS server address
serverUrl=tcp://localhost:61616
username=admin
password=admin

################################
#          queue info          #
################################
# Names of the queues to send to, separated by ","
sendQueues=firstMQ

# Names of the queues to receive from, separated by ","
receiveQueues=firstMQ
# Message number of the message named a on queue firstMQ
queue.firstMQ.a=10000
# Resolver invoked when a message named a is received on queue firstMQ
queue.firstMQ.a.resolver=com.mg.jfinal.ext.demo.resolver.MGResolver</code>
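
How jfinal-ext's `JmsPlugin` parses this file internally is not shown in the post. The sketch below only illustrates the shape of the file, using the JDK's `java.util.Properties` to read the comma-separated queue list and to look up the `queue.<name>.a` keys. `JmsConfigSketch` and its methods are hypothetical names.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class JmsConfigSketch {

    // Parse properties-format text into a Properties object.
    static Properties parse(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    // Split a comma-separated property value into trimmed, non-empty names.
    static List<String> queueNames(Properties props, String key) {
        List<String> names = new ArrayList<>();
        for (String part : props.getProperty(key, "").split(",")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    public static void main(String[] args) throws IOException {
        String text = "serverUrl=tcp://localhost:61616\n"
                + "receiveQueues=firstMQ\n"
                + "queue.firstMQ.a=10000\n"
                + "queue.firstMQ.a.resolver=com.mg.jfinal.ext.demo.resolver.MGResolver\n";
        Properties props = parse(text);
        for (String queue : queueNames(props, "receiveQueues")) {
            System.out.println(queue + ": message a has number "
                    + props.getProperty("queue." + queue + ".a")
                    + ", resolver " + props.getProperty("queue." + queue + ".a.resolver"));
        }
    }
}
```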

4.3. Configuration file job.properties

<code class="language-txt hljs avrasm has-numbering">#JobA
a.job=com.mg.jfinal.task.JobA
a.cron=*/10 * * * * ?
a.enable=true</code>
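
In a Quartz cron expression the first field is seconds, so `*/10` in that position fires at seconds 0, 10, 20 and so on, which is how the job ends up running every 10 seconds. The sketch below expands such a step field by hand to show which values match. It is a teaching aid with a hypothetical `expandStep` helper, not Quartz's own parser, and it handles only the `*/n` and `start/n` forms.

```java
import java.util.ArrayList;
import java.util.List;

public class CronStepSketch {

    // Expand "*/n" or "start/n" into the matching values within [min, max].
    static List<Integer> expandStep(String field, int min, int max) {
        String[] parts = field.split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected <start>/<step>: " + field);
        }
        int start = parts[0].equals("*") ? min : Integer.parseInt(parts[0]);
        int step = Integer.parseInt(parts[1]);
        if (step <= 0) {
            throw new IllegalArgumentException("step must be positive: " + field);
        }
        List<Integer> values = new ArrayList<>();
        for (int v = start; v <= max; v += step) {
            values.add(v);
        }
        return values;
    }

    public static void main(String[] args) {
        String cron = "*/10 * * * * ?";
        String secondsField = cron.split(" ")[0];
        System.out.println(expandStep(secondsField, 0, 59)); // prints [0, 10, 20, 30, 40, 50]
    }
}
```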

4.4. Run result

[Screenshot: console output of the run.]




5. Code repository

http://git.oschina.net/mgang/activeMQ

6. bat


@echo Starting the program

set JAVA_HOME=F:\back\dev\jdk1.7.0_71

set path=%JAVA_HOME%\bin;%path%

java -cp classes;./lib/UserAgentUtils-1.9.jar;./lib/spring-tx-4.3.2.RELEASE.jar;./lib/spring-messaging-4.3.2.RELEASE.jar;./lib/spring-jms-4.3.2.RELEASE.jar;./lib/spring-expression-4.3.2.RELEASE.jar;./lib/spring-core-4.3.2.RELEASE.jar;./lib/spring-context-4.3.2.RELEASE.jar;./lib/spring-beans-4.3.2.RELEASE.jar;./lib/spring-aop-4.3.2.RELEASE.jar;./lib/slf4j-api-1.7.10.jar;./lib/quartz-jobs-2.2.1.jar;./lib/quartz-2.2.1.jar;./lib/mybatis-spring-1.2.2.jar;./lib/mybatis-3.2.8.jar;./lib/logback-core-1.1.2.jar;./lib/logback-classic-1.1.2.jar;./lib/log4j-over-slf4j-1.7.10.jar;./lib/log4jdbc-log4j2-jdbc4-1.15.jar;./lib/log4j-1.2.12.jar;./lib/jul-to-slf4j-1.7.10.jar;./lib/jstl-1.2.jar;./lib/jfinal-ext-3.1.3.jar;./lib/jfinal-1.9.jar;./lib/jcl-over-slf4j-1.7.10.jar;./lib/hawtbuf-1.11.jar;./lib/guava-18.0.jar;./lib/geronimo-jta_1.0.1B_spec-1.0.1.jar;./lib/geronimo-jms_1.1_spec-1.1.1.jar;./lib/geronimo-j2ee-management_1.1_spec-1.0.1.jar;./lib/commons-pool2-2.4.2.jar;./lib/commons-logging-1.2.jar;./lib/commons-lang3-3.3.2.jar;./lib/c3p0-0.9.1.1.jar;./lib/activemq-pool-5.13.4.jar;./lib/activemq-jms-pool-5.13.4.jar;./lib/activemq-client-5.13.4.jar;./lib/activemq-all-5.5.0.jar; com.mg.jfinal.ext.demo.TestJMS

pause

This post is reposted from: http://blog.csdn.net/mg0324/article/details/49666429