您的位置:首页 > 其它

ActiveMQ 入门helloworld

2017-08-31 17:57 330 查看
1.下载安装ActiveMQ

官网下载地址:http://activemq.apache.org/download.html

ActiveMQ 提供了Windows 和Linux、Unix 等几个版本,我选择了Windows 版本下进行开发。



下载完压缩包后,直接解压:



 

目录:

         bin存放的是脚本文件

         conf存放的是基本配置文件

        data存放的是日志文件

        docs存放的是说明文档

        examples存放的是简单的实例

        lib存放的是activemq所需jar包

        webapps用于存放项目的目录

 

2.启动ActiveMQ

可以在命令行中用命令启动,我这里就直接鼠标点击运行脚本启动了



ActiveMQ默认启动时,启动了内置的jetty服务器,提供一个用于监控ActiveMQ的admin应用。

admin:http://127.0.0.1:8161/admin/

我们在浏览器打开链接之后输入账号密码(默认账号密码都是admin),类似于访问tomcat。



 

登陆成功后页面显示:



ActiveMQ启动就完成了,关闭直接点击关闭脚本或用命令管就可以了。

 

3.不多bb,直接上helloworld代码

创建一个maven项目,最好是用jdk1.8,我用jdk1.7会报错,pom.xml依赖如下:

    <dependencies>

      <dependency>

          <groupId>org.apache.activemq</groupId>

          <artifactId>activemq-all</artifactId>

          <version>5.15.0</version>

      </dependency>

    </dependencies>

3.1创建生产者类:

package com.iflytek;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.JMSException;

import javax.jms.MessageProducer;

import javax.jms.Queue;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;

import org.apache.activemq.ActiveMQConnectionFactory;

/**

 *

 * @author cjf

 * 生产者

 */

public class Producter {

    //ActiveMq 的默认用户名

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

    //ActiveMq 的默认登录密码

    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    //ActiveMQ 的链接地址

    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    AtomicInteger count = new AtomicInteger(0);

    //链接工厂

    ConnectionFactory connectionFactory;

    //链接对象

    Connection connection;

    //事务管理

    Session session;

    ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();

    public void init(){

        try {

            //创建一个链接工厂

            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);

            //从工厂中创建一个链接

            connection  = connectionFactory.createConnection();

            //开启链接

            connection.start();

            //创建一个事务(这里通过参数可以设置事务的级别)

            session = connection.createSession(true,Session.SESSION_TRANSACTED);

        } catch (JMSException e) {

            e.printStackTrace();

        }

    }

    public void sendMessage(String disname){

        try {

            //创建一个消息队列

            Queue queue = session.createQueue(disname);

            //消息生产者

            MessageProducer messageProducer = null;

            if(threadLocal.get()!=null){

                messageProducer = threadLocal.get();

            }else{

                messageProducer = session.createProducer(queue);

                threadLocal.set(messageProducer);

            }

           while(true){

                Thread.sleep(1000);

                int num = count.getAndIncrement();

                //创建一条消息

                TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+

                        "productor:我是大帅哥,我现在正在生产东西!,count:"+num);

                System.out.println(Thread.currentThread().getName()+

                        "productor:我是大帅哥,我现在正在生产东西!,count:"+num);

                //发送消息

                messageProducer.send(msg);

                //提交事务

                session.commit();

            }

        } catch (JMSException e) {

            e.printStackTrace();

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

    }

}

 

3.2,创建消费者类:

package com.iflytek;

import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;

import javax.jms.ConnectionFactory;

import javax.jms.JMSException;

import javax.jms.MessageConsumer;

import javax.jms.Queue;

import javax.jms.Session;

import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;

import org.apache.activemq.ActiveMQConnectionFactory;

/**

 *

 * @author cjf

 * 消费者

 */

public class Comsumer {

    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

    private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

    ConnectionFactory connectionFactory;

    Connection connection;

    Session session;

    ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();

    AtomicInteger count = new AtomicInteger();

    public void init(){

        try {

            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);

            connection  = connectionFactory.createConnection();

            connection.start();

            session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

        } catch (JMSException e) {

            e.printStackTrace();

        }

    }

    public void getMessage(String disname){

        try {

            Queue queue = session.createQueue(disname);

            MessageConsumer consumer = null;

            if(threadLocal.get()!=null){

                consumer = threadLocal.get();

            }else{

                consumer = session.createConsumer(queue);

                threadLocal.set(consumer);

            }

            while(true){

                Thread.sleep(1000);

                TextMessage msg = (TextMessage) consumer.receive();

                if(msg!=null) {

                    msg.acknowledge();

                    System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());

                }else {

                    break;

                }

            }

        } catch (JMSException e) {

            e.printStackTrace();

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

    }

}

 

4.运行ActiveMQ项目,进行测试

4.1,生产者生产消息:

package com.iflytek;

/**

 *

 * @author cjf

 * 测试生产者生产消息

 */

public class TestProducter {

    public static void main(String[] args){

        Producter producter = new Producter();

        producter.init();

        TestProducter testMq = new TestProducter();

        try {

            Thread.sleep(1000);

        } catch (InterruptedException e) {

            e.printStackTrace();

        }

        //Thread 1

        new Thread(testMq.new ProductorMq(producter)).start();

        //Thread 2

        new Thread(testMq.new ProductorMq(producter)).start();

        //Thread 3

        new Thread(testMq.new ProductorMq(producter)).start();

        //Thread 4

        new Thread(testMq.new ProductorMq(producter)).start();

        //Thread 5

        new Thread(testMq.new ProductorMq(producter)).start();

    }

    private class ProductorMq implements Runnable{

        Producter producter;

        public ProductorMq(Producter producter){

            this.producter = producter;

        }

        @Override

        public void run() {

            while(true){

                try {

                    producter.sendMessage("Jaycekon-MQ");

                    Thread.sleep(10000);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

            }

        }

    }

}

运行结果:



4.2,测试消费者消费消息:

package com.iflytek;

/**

 *

 * @author cjf

 * activeMQ测试消费者消费消息

 */

public class TestConsumer {

    public static void main(String[] args){

        Comsumer comsumer = new Comsumer();

        comsumer.init();

        TestConsumer testConsumer = new TestConsumer();

        new Thread(testConsumer.new ConsumerMq(comsumer)).start();

        new Thread(testConsumer.new ConsumerMq(comsumer)).start();

        new Thread(testConsumer.new ConsumerMq(comsumer)).start();

        new Thread(testConsumer.new ConsumerMq(comsumer)).start();

    }

    private class ConsumerMq implements Runnable{

        Comsumer comsumer;

        public ConsumerMq(Comsumer comsumer){

            this.comsumer = comsumer;

        }

        @Override

        public void run() {

            while(true){

                try {

                    comsumer.getMessage("Jaycekon-MQ");

                    Thread.sleep(10000);

                } catch (InterruptedException e) {

                    e.printStackTrace();

                }

            }

        }

    }

}

运行结果:



 

 

5.ActiveMQ的helloworld就写完了,更多MQ其他的知识自己再好好学,我也是个菜鸡刚看了2天这东西,求关注啊,可以交流一下。

 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: