10
06月
2025
JAVA 程序连接ActiveMq 发送接收消息
POM.xml
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
Controller
我测试的时候,采用的是浏览器输入url路径地址,执行代码,具体可根据情况选择启动方式
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import javax.jms.*;
import java.util.Date;
@Controller
@RequestMapping("/compare/jddjorder")
public class TestController{
@GetMapping("/send/{msg}")
public String send(@PathVariable("msg") String msg) throws Exception {
// 创建连接工厂
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://xxx:61616");
// 获取 connection (线上用户名密码,这一步在上篇文章中 提及的 后台管理入口的用户名 密码)
Connection connection = (Connection) connectionFactory.createConnection("admin", "admin");
connection.start();
// 创建会话 session,参数第一个是事务,第二个是签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//Topic topic = session.createTopic("topicName");
// 创建目的地,queue 或者 topic 具体区别看下面介绍
Queue queue=session.createQueue("testQueue");
// MessageProducer producer = session.createProducer(topic);
// 创建消息的生产者
MessageProducer producer = session.createProducer(queue);
TextMessage textMessage = session.createTextMessage("message--" +msg+ new Date());
for (int w=1;w<=10000;w++){
textMessage=session.createTextMessage("message"+w+"--" +msg+ new Date());
producer.send(textMessage);
}
producer.close();
session.close();
connection.close();
System.out.println("消息发送成功~");
return "";
}
@GetMapping("/accept")
public String accept() throws Exception {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://139.196.230.48:61616");
Connection connection = connectionFactory.createConnection("admin", "admin");
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Topic topic = session.createTopic("topicName");
// MessageConsumer consumer = session.createConsumer(topic);
Queue queue=session.createQueue("testQueue");
// 创建消费者
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener((message) -> {
if (message != null && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消费者接收到消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
System.in.read();
consumer.close();
session.close();
connection.close();
return "";
}
}
队列(Queue)点对点的模式:
主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接 收端是否正在接收,可以直接向 ActiveMQ 发送消息,发送的消息,将会先进入队列中,如
果有接收端在监听,则会发向接收端,如果没有接收端接收,则会保存在 activemq 服务器, 直到接收端接收消息,点对点的消息模式可以有多个发送端,多个接收
端,但是一条消息, 只会被一个接收端给接收到,哪个接收端先连上 ActiveMQ,则会先接收到,而后来的接收 端则接收不到那条消息。
主题 (Topic) 订阅-发布模式
消息的发送方称为发布者(Publisher),消息接收者称为订阅者(Subscriber),服务端存放消息的容器称为(Topic),发布者将消息发送到 Topic 中,订阅者需要订
阅主题,每个订阅者都可以接收到订阅之后主题发布所有的消息,生产者和消费者有时间上的相关性,生产者生产时,topic 不保存消息它是无状态的,应该先启动消费
者再启动生产者,当先启动服务提供方,在启动服务消费方,那么消费方是订阅不到自身启动前的数据
下一篇: Linux 系统解压 rar 压缩文件