准备工作

  1. 从下面官网下载ActiveMQ,本文使用5.7版本,写博时Maven库能找到的最高版本为5.7

  2. 解压缩下载的文件到你希望安装的目录

  3. 进入bin目录运行activemq.bat以启动ActiveMQ服务,启动后默认

    broker URL : tcp://localhost:61616

    Admin URL:http://localhost:8161/admin

  4. 打开Admin URL可以查看Queue、Topic等信息

  5. 关于ActiveMQ相关依赖包就交给Gradle工具,请参看demo配置

JMS框架基本角色和编程模块

  • JMS Message Producers : 消息生产者,向消息队列提供消息

  • JMS Provider(Broker) : JMS提供商,如ActiveMQ,提供JMS Queues/Topics服务

  • JMS Message listener/Consumer :接收并使用消息

JMS Provider有如数据库,Producers/Consumers发送/接收消息前需要先连接到Provider,与建立数据库连接相似,JMS ConnectionFactory负责创建JMS Connection,JMS Connection创建JMS Session

  • JMS ConnectionFactory : Producers/Consumers用于创建一个到Provider的连接

  • JMS Connection :封装一个到Provider的连接

  • JMS Session : 消息发送接收上下文

在JMS Provider上可以定义多个Queue和Topic,Producers发送消息到哪个Queue/Topic,称具体的那个Queue/Topic为Destination.

  • JMS Destination : 一对一的Queue或者一对多的Topic

关于JMS编程模型可以参考以下文章:

Spring集成配置

  • Queue消息发送者(JMS Message Producers) :

ConnectionFactory : 用于连接Provider,支持连接池配置

JmsTemplate : Spring封装类,可用于创造消息、发送消息、接收消息等

1
2
3
4
5
6
7
<
bean 
id
=
"connectionFactory" 
class
=
"org.apache.activemq.ActiveMQConnectionFactory"
    
p:brokerURL
=
"tcp://localhost:61616" 
/>
<
bean 
id
=
"jmsTemplate" 
class
=
"org.springframework.jms.core.JmsTemplate"
>
    
<
constructor-arg 
name
=
"connectionFactory" 
ref
=
"connectionFactory" 
/>
    
<
property 
name
=
"defaultDestinationName" 
value
=
"TestQueue" 
/>
</
bean
>
<
context:component-scan 
base-package
=
"com.stevex.demo" 
/>
  • 连接池配置示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<!-- a pooling based JMS provider -->
  
<
bean 
id
=
"jmsFactory" 
class
=
"org.apache.activemq.pool.PooledConnectionFactory" 
destroy-method
=
"stop"
>
    
<
property 
name
=
"connectionFactory"
>
      
<
bean 
class
=
"org.apache.activemq.ActiveMQConnectionFactory"
>
        
<
property 
name
=
"brokerURL"
>
          
<
value
>tcp://localhost:61616</
value
>
        
</
property
>
      
</
bean
>
    
</
property
>
  
</
bean
>
                                                                                                  
  
<!-- Spring JMS Template -->
  
<
bean 
id
=
"jmsTemplate" 
class
=
"org.springframework.jms.core.JmsTemplate"
>
    
<
property 
name
=
"connectionFactory"
>
      
<
ref 
local
=
"jmsFactory"
/>
    
</
property
>
  
</
bean
>
  • Queue消息接收者(JMS Message listener) :

ConnectionFactory : 用于连接Provider,支持连接池配置

Listener : 接收消息

jms:listener-container 简化了配置工作,destination只需要给queue/topic名称,不需要额外定义

1
2
3
4
5
6
7
8
<
bean 
id
=
"connectionFactory" 
class
=
"org.apache.activemq.ActiveMQConnectionFactory"
    
p:brokerURL
=
"tcp://localhost:61616" 
/>
<
jms:listener-container 
container-type
=
"default"
    
connection-factory
=
"connectionFactory" 
acknowledge
=
"auto"
>
    
<
jms:listener 
destination
=
"TestQueue" 
ref
=
"messageListener"
        
method
=
"onMessage" 
/>
</
jms:listener-container
>
<
bean 
id
=
"messageListener" 
class
=
"com.stevex.demo.SimpleMessageListener"
/>
  • Topic消息发送者(JMS Message Producers) :

ConnectionFactory : 用于连接Provider,支持连接池配置

JmsTemplate : Spring封装类,可用于创造消息、发送消息、接收消息等

JmsTemplate 的pubSubDomain属性值为true表示destination为Topic类型,默认false表示destination为Queue类型。

1
2
3
4
5
6
7
8
<
bean 
id
=
"connectionFactory" 
class
=
"org.apache.activemq.ActiveMQConnectionFactory"
    
p:brokerURL
=
"tcp://localhost:61616" 
/>
<
bean 
id
=
"jmsTemplate" 
class
=
"org.springframework.jms.core.JmsTemplate"
>
    
<
constructor-arg 
name
=
"connectionFactory" 
ref
=
"connectionFactory" 
/>
    
<
property 
name
=
"defaultDestinationName" 
value
=
"TestTopic" 
/>
    
<
property 
name
=
"pubSubDomain" 
value
=
"true" 
/>
</
bean
>
<
context:component-scan 
base-package
=
"com.stevex.demo" 
/>
  • Topic消息接收者(JMS Message listener) :

ConnectionFactory : 用于连接Provider,支持连接池配置

Listener : 接收消息

Topic类型的消息,所有订阅者都可以接收到,本文Demo定义了两个接收者。

jms:listener-container 的destination-type属性默认值为queue,如果是Topic需要显示指定。

1
2
3
4
5
6
7
8
9
<
bean 
id
=
"connectionFactory" 
class
=
"org.apache.activemq.ActiveMQConnectionFactory"
    
p:brokerURL
=
"tcp://localhost:61616" 
/>
<
bean 
id
=
"subscriber1" 
class
=
"com.stevex.demo.SimpleMessageListener" 
/>
<
jms:listener-container 
container-type
=
"default"
    
destination-type
=
"topic" 
connection-factory
=
"connectionFactory"
    
acknowledge
=
"auto"
>
    
<
jms:listener 
destination
=
"TestTopic" 
ref
=
"subscriber1"
        
method
=
"onMessage" 
/>
</
jms:listener-container
>
  • Producer类实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Component
(
"messageSender"
)
public 
class 
SimpleMessageSender 
implements 
MessageSender {
    
@Autowired
    
private 
JmsTemplate jmsTemplate;
    
@Override
    
public 
void 
sendMessage(
final 
String message) {
        
jmsTemplate.send(
new 
MessageCreator() {
                                                                                                   
            
public 
Message createMessage(Session session) 
throws 
JMSException {
                
return 
session.createTextMessage(message);
            
}
                                                                                                   
        
});
    
}
}
  • Listener类实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
(
"messageListener"
)
public 
class 
SimpleMessageListener 
implements 
MessageListener {
    
private 
static 
final 
Logger logger = LoggerFactory.getLogger(SimpleMessageListener.
class
);
    
public 
void 
onMessage(Message message) {
        
TextMessage textMessage = (TextMessage) message;
                                                                                             
        
try 
{
            
logger.info(
"Message received: " 
+ textMessage.getText());
        
catch 
(JMSException ex) {
            
logger.error(
"JMS error"
, ex);
        
}
    
}
}

应用跑起来后,ActiveMQ接收到连接后,如果请求的Queue/Topic不存在它会自动创建,我们也可以通过ActiveMQ Admin界面给Listener发消息。