spring cloud开发Greenwich版:消息服务(十七)

本文利用spring cloud集成消息服务,建立消息生产者与消息消费者,与消息服务器通信

首先需要搭建rabbit消息服务器,这里作为测试,利用在线服务https://www.cloudamqp.com

然后建立消息生产者微服务cloud-stream-producer-rabbitmq

消息生产者微服务需要引入依赖spring-cloud-stream、spring-cloud-stream-binder-rabbit,其pom.xml文件如下;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.8.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.ralab</groupId>
<artifactId>cloud-stream-producer-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>cloud-stream-producer-rabbitmq</name>
<description>Demo project for Spring Boot</description>

<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Greenwich.SR3</spring-cloud.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>

User消息类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.ralab.cloudstreamproducerrabbitmq;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

@JsonIgnoreProperties(ignoreUnknown = true)
public class User {
private String userName;
private String addr;
private int age;

public User(){}
public User(String userName, String addr, int age) {
this.userName = userName;
this.addr = addr;
this.age = age;
}

public String getUserName() {
return userName;
}

public void setUserName(String userName) {
this.userName = userName;
}

public String getAddr() {
return addr;
}

public void setAddr(String addr) {
this.addr = addr;
}

public int getAge() {
return age;
}

public void setAge(int age) {
this.age = age;
}
}

ProducerController3消息生产类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.ralab.cloudstreamproducerrabbitmq;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
@EnableBinding(Source.class)
public class ProducerController3 {

@Autowired
@Qualifier(Source.OUTPUT)
private MessageChannel userChannel;

@PostMapping("/produce")
public void publish3(@RequestBody User user) {

Message<User> msg = MessageBuilder.withPayload(user).build();
this.userChannel.send(msg);
//this.binding.greeting().send(msg);
}
}

application.properties配置

1
2
3
4
spring.rabbitmq.addresses=amqp://byqzaxwz:Tlrt32CSQYMHEeitFVUB43wkCSsiUlwo@hornet.rmq.cloudamqp.com/byqzaxwz
spring.cloud.stream.bindings.output.destination = user

server.port=8080

运行该应用CloudStreamProducerRabbitmqApplication

同时建立消息消费者微服务cloud-stream-consumer-rabbitmq,其pom.xml文件如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.8.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.ralab</groupId>
<artifactId>cloud-stream-consumer-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>cloud-stream-consumer-rabbitmq</name>
<description>Demo project for Spring Boot</description>

<properties>
<java.version>1.8</java.version>
<spring-cloud.version>Greenwich.SR3</spring-cloud.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>

同样需要添加User消息类,与消息生产者微服务相同
添加消息消费类UserListener

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.ralab.cloudstreamconsumerrabbitmq;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;


@EnableBinding(Sink.class)
public class UserListener {

@StreamListener(target = Sink.INPUT)
public void processUserChannelGreeting(User user) {
System.out.println(user.getUserName()+":"+user.getAddr()+":"+user.getAge());
}
}

application.properties配置

1
2
3
spring.rabbitmq.addresses=amqp://byqzaxwz:Tlrt32CSQYMHEeitFVUB43wkCSsiUlwo@hornet.rmq.cloudamqp.com/byqzaxwz
spring.cloud.stream.bindings.input.destination = user
server.port=9090

运行CloudStreamConsumerRabbitmqApplication
查看rabbitmq消息服务器

测试发送消息

然后就可以看到在消息消费者微服务的控制台输出消息

坚持原创技术分享,您的支持是我前进的动力!