10 06 2025

JAVA 程序连接ActiveMq 发送接收消息


POM.xml


<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-core</artifactId>
    <version>5.7.0</version>
</dependency>


Controller


我测试的时候,采用的是浏览器输入url路径地址,执行代码,具体可根据情况选择启动方式


import org.apache.activemq.ActiveMQConnectionFactory; 
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import javax.jms.*;
import java.util.Date;

@Controller
@RequestMapping("/compare/jddjorder")
public class TestController{
    @GetMapping("/send/{msg}")
    public String send(@PathVariable("msg") String msg) throws Exception {
        // 创建连接工厂
         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://xxx:61616");
        // 获取 connection  (线上用户名密码,这一步在上篇文章中 提及的 后台管理入口的用户名 密码)
        Connection connection = (Connection) connectionFactory.createConnection("admin", "admin");
        connection.start();
        // 创建会话 session,参数第一个是事务,第二个是签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //Topic topic = session.createTopic("topicName");
       // 创建目的地,queue 或者 topic  具体区别看下面介绍
        Queue queue=session.createQueue("testQueue");
//        MessageProducer producer = session.createProducer(topic);
       // 创建消息的生产者
        MessageProducer producer = session.createProducer(queue);
        TextMessage textMessage = session.createTextMessage("message--" +msg+ new Date());
        for (int w=1;w<=10000;w++){
            textMessage=session.createTextMessage("message"+w+"--" +msg+ new Date());
            producer.send(textMessage);
        }

        producer.close();
        session.close();
        connection.close();
        System.out.println("消息发送成功~");
        return "";
    }
    @GetMapping("/accept")
    public String accept() throws Exception {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://139.196.230.48:61616");
        Connection connection = connectionFactory.createConnection("admin", "admin");
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//        Topic topic = session.createTopic("topicName");
//        MessageConsumer consumer = session.createConsumer(topic);
        Queue queue=session.createQueue("testQueue");
       // 创建消费者
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener((message) -> {
            if (message != null && message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("消费者接收到消息:" + textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        System.in.read();
        consumer.close();
        session.close();
        connection.close();
        return "";
    }
}


队列(Queue)点对点的模式:


主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接 收端是否正在接收,可以直接向 ActiveMQ 发送消息,发送的消息,将会先进入队列中,如


果有接收端在监听,则会发向接收端,如果没有接收端接收,则会保存在 activemq 服务器, 直到接收端接收消息,点对点的消息模式可以有多个发送端,多个接收


端,但是一条消息, 只会被一个接收端给接收到,哪个接收端先连上 ActiveMQ,则会先接收到,而后来的接收 端则接收不到那条消息。



主题 (Topic) 订阅-发布模式


消息的发送方称为发布者(Publisher),消息接收者称为订阅者(Subscriber),服务端存放消息的容器称为(Topic),发布者将消息发送到 Topic 中,订阅者需要订


阅主题,每个订阅者都可以接收到订阅之后主题发布所有的消息,生产者和消费者有时间上的相关性,生产者生产时,topic 不保存消息它是无状态的,应该先启动消费


者再启动生产者,当先启动服务提供方,在启动服务消费方,那么消费方是订阅不到自身启动前的数据