Java消息中间件

学习这个的契机是在做练手项目的时候用到了兔子(RabbitMQ),它遵循 AMQP 协议,属于消息中间件实现的一种,既然这样就来看看什么是消息中间件。
正好看到慕课网有相关的课程就顺便学习下吧,为什么使用消息中间件?
解耦、异步、横向扩展、安全可靠、顺序保证,这些还不够么,回想 Rabbit 官网的那几幅图吧
如果忘记了就再看看吧:飞机

关于JMS

Java消息服务(Java Message Service,JMS)应用程序接口是一个 Java 平台中关于面向消息中间件(MOM)的 API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。
Java 消息服务是一个与具体平台无关的 API,绝大多数 MOM 提供商都对 JMS 提供支持。

JSM 的组成有:

  • JMS提供者
    连接面向消息中间件的,JMS接口的一个实现。提供者可以是Java平台的JMS实现,也可以是非Java平台的面向消息中间件的适配器。
  • JMS客户
    生产或消费消息的基于Java的应用程序或对象。
  • JMS生产者
    创建并发送消息的JMS客户。
  • JMS消费者
    接收消息的JMS客户。
  • JMS消息
    包括可以在JMS客户之间传递的数据的对象
  • JMS队列
    一个容纳那些被发送的等待阅读的消息的区域。队列暗示,这些消息将按照顺序发送。一旦一个消息被阅读,该消息将被从队列中移走。
  • JMS主题

    一种支持发送消息给多个订阅者的机制。

Java 消息服务应用程序结构支持两种模型:

  1. 点对点或队列模型
    包含生产者和消费者,队列中的消息只能被一个消费者消费,消费者可以随时消费消息
    每一个连接都依次平均分担消息队列中的消息(即使一个应用建立了两个连接)
  2. 发布/订阅模型
    包括发布者和订阅者,主题中的消息会被所有的订阅者消费,消费者不能消费在订阅之前的消息
    每一个连接都会收到主题中完整的消息

关于架构等详细信息可参考 wiki :
https://zh.wikipedia.org/wiki/Java%E6%B6%88%E6%81%AF%E6%9C%8D%E5%8A%A1

中间件&AMQP

中间件(英语:Middleware),是提供系统软件和应用软件之间连接的软件,以便于软件各部件之间的沟通,特别是应用软件对于系统软件的集中的逻辑,在现代信息技术应用框架如Web服务、面向服务的体系结构等中应用比较广泛。
如数据库、Apache的Tomcat,IBM公司的WebSphere,BEA公司的WebLogic应用服务器,东方通公司的Tong系列中间件,以及Kingdee公司的等都属于中间件。
或者简单说就是:非底层操作系统软件,非业务应用软件,不是直接给最终用户使用的,不能直接给用户带来价值的软件统称为中间件


高级消息队列协议(AMQP)是一个异步消息传递所使用的应用层协议规范。
作为线路层协议,而不是 API(例如 JMS),AMQP 客户端能够无视消息的来源任意发送和接受信息。现在,已经有相当一部分不同平台的服务器和客户端可以投入使用。

注意:JMS 是规范(针对 Java),AMQP 是协议

然后来张图来比较,JMS 和 AMQP:

然后是市面上常见的一些 MQ 方案:
MQ.png

ActiveMQ使用

导入依赖就不用多说了,下面的 Java 代码中的使用(以主题订阅模式为例),先来消息发布者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private static final String URL = "tcp://127.0.0.1:61616";
private static final String TOPIC_NAME = "topic-test";

public static void main(String[] args) throws JMSException {
// 1.创建 ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

// 2.创建连接
Connection connection = connectionFactory.createConnection();

// 3.启动连接
connection.start();

// 4.创建会话
// 第一个参数是是否在事务中,第二个是自动提交
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 5.创建一个目标(只需要修改这里)
Destination destination = session.createTopic(TOPIC_NAME);

// 6.创建一个生产者
MessageProducer producer = session.createProducer(destination);

// 7.创建消息/发送消息
TextMessage message = session.createTextMessage("testMessage");
producer.send(message);

// 8.关闭连接
connection.close();
}

然后是消息订阅者,在此模式下订阅之前发的消息是没法接收到的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private static final String URL = "tcp://127.0.0.1:61616";
private static final String TOPIC_NAME = "topic-test";

public static void main(String[] args) throws JMSException {
// 1.创建 ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

// 2.创建连接
Connection connection = connectionFactory.createConnection();

// 3.启动连接
connection.start();

// 4.创建会话
// 第一个参数是是否在事务中,第二个是自动提交
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

// 5.创建一个目标
Destination destination = session.createTopic(TOPIC_NAME);

// 6.创建一个消费者
MessageConsumer consumer = session.createConsumer(destination);

// 7.创建监听器(异步监听)
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage msg = (TextMessage) message;
try {
System.out.println(msg.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});

// 9.关闭连接(监听器异步,应该在程序退出时关闭)
// connection.close();
}

并且接收消息的过程是异步的,所以不要马上 close;
完整的代码见 GitHub

关于集群

集群总的来说就是为了实现高可用和负载均衡,以 ActiveMQ 为例,集群不只是在服务器端配置,客户端也需要支持;在ActiveMQ 中提供了失效转移的支持,URL 类似于:
failover:(tcp://127.0.0.1:61617,tcp://192.168.1.11:61617)?randomize=true
在服务器方面,想要实现负载均衡就要保证服务器之间的消息同步,采用的是 “网络连接器”的方式,用于服务器的透传消息,分为静态连接器和动态连接器(用网址来代替写死的 IP 地址)。
对于高可用服务器的方案一般有两种:

  • 共享存储集群
    采用集群共享一份持久化数据(使用 NAS 或者 JDBC),服务器获取排它锁来独占资源的方式,当此服务器宕机时会释放(配合客户的的失效转移机制)备用服务器会获取锁成为新的 Master
  • 基于复制的 LeveIDB Store
    至少需要三台来保证稳定性,还用到了 ZooKeeper(ZK 本身也需要三台来保证自己的稳定性)
    也就是 Master 由 ZK 来选举,然后通过 ZK 来实现各服务器之间的消息同步

他们能达到高可用,但是实现不了负载均衡,想要同时实现就需要进行一些改造。
还有一些其他的问题解决方案:
实现每个系统消费各自的消息可以使用 ActiveMQ 提供的虚拟主题功能;
解决消息发送的一致性问题可以使用 JMS 中的 XA 系列接口;
解决幂等性的问题,方案和上面一样,使用本地事务或者内存日志

JMS 中的 XA 协议常用于分布式事务,因为效率较低所以不太使用,或者还可以使用本地事务、内存日志解决(都要配合消息补偿机制)
解决这些问题一般分段考虑比较好。

幂等性就是指处理一次和多次的消息最终的效果是一样的。
HTTP方法的幂等性是指一次和多次请求某一个资源应该具有同样的副作用

为了解决代码过于复杂和复用,可以使用“基于消息机制的事件总线”,简单说 EDA (事件驱动架构)就是:有事你叫我,没事别烦我,这样一般就需要先在事件总线上注册,事件总线一般还需要包含消息提供者(各种 MQ 的实现)。
可以尝试面向服务的架构

Spring集成

主要涉及的有:
ConnectionFactory:用于管理连接的连接工厂;Spring 提供了两个具体的类,SingleConnectionFactory 和 CachingConnectionFactory ,后者是带有缓存功能的。
JmsTemplate:用于发送和接收消息的模板类,它是线程安全的
MessageListerner:消息监听器
使用时记得引入 Spring-jms 依赖,具体的相关依赖有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<!-- Java JMS 原生API -->
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
<version>2.0</version>
</dependency>
<!-- spring-jms API -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- active-mq核心包 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>

接下来就是在 Spring 的配置文件中进行配置 Bean 了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
<!-- 配置连接ActiveMQ的ConnectionFactory -->
<bean id="amqConnectionFactory"
class="org.apache.activemq.ActiveMQConnectionFactory"
p:brokerURL="tcp://localhost:61616"/>

<!--为了提高效率,配置一个连接池-->
<bean id="cachedConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory"
p:targetConnectionFactory-ref="amqConnectionFactory"
p:sessionCacheSize="10"/>

<!--配置队列-->
<bean id="destination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="${queue.name}"/>
</bean>

<!--配置主题-->
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="${topic.name}"/>
</bean>

<!-- **************配置消息生产者************* -->
<!--点对点模型-->
<bean id="queueJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory"/>
<!--消息持久-->
<property name="deliveryPersistent" value="true"/>
<property name="defaultDestination" ref="destination"/>
<property name="pubSubDomain" value="false"/>
</bean>

<!--发布/订阅模型-->
<bean id="topicJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory"/>
<!--消息持久-->
<property name="deliveryPersistent" value="true"/>
<!--目的地-->
<property name="defaultDestination" ref="destinationTopic"/>
<!--订阅模型 -->
<property name="pubSubDomain" value="true"/>
</bean>

<!-- **************配置消息消费者************* -->
<!-- 配置消息队列监听者(Queue) -->
<bean id="queueMessageListener" class="com.bfchengnuo.Filter.QueueMessageListener" />

<!-- 显示注入消息监听容器(Queue),配置连接工厂,监听器是上面定义的监听器 -->
<bean id="queueListenerContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="cachedConnectionFactory" />
<property name="destination" ref="destination" />
<property name="messageListener" ref="queueMessageListener" />
</bean>

消费者和生产者的配置最好是分开来放,可以抽取相同的配置到独立的文件再利用 include 导入
Java 代码的使用,也分为两个角色,写在一起了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 接收消息,记得配置到 IOC 容器中
public class MyQueueMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("MyQueueMessageListener收到消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}

// 发送消息 Destination 已注入
public void sendMessage(final String msg){
// String destination = jmsTemplate.getDefaultDestinationName();
System.out.println(Thread.currentThread().getName()+" 向队列"+destination+"发送消息--->"+msg);
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
return session.createTextMessage(msg);
}
});
}

实际使用时记得把它们给分开,更改模式只需要在 Destination 注入的时候选择合适的模式即可,其他的地方不需要修改

更多内容:
https://my.oschina.net/thinwonton/blog/889805
http://www.cnblogs.com/jaycekon/p/ActiveMq.html

评论框加载失败,无法访问 Disqus

你可能需要魔法上网~~