本文利用Spring Cloud Stream集成消息服务,分别建立消息生产者与消息消费者两个微服务,并通过RabbitMQ消息服务器进行通信。
首先需要搭建RabbitMQ消息服务器。这里仅作为测试,直接使用在线服务: https://www.cloudamqp.com
然后建立消息生产者微服务cloud-stream-producer-rabbitmq
<?xml version="1.0" encoding="UTF-8"?>
<!--
  pom.xml of the message producer microservice (cloud-stream-producer-rabbitmq).
  The producer needs the spring-cloud-stream and
  spring-cloud-stream-binder-rabbit dependencies declared below.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.8.RELEASE</version>
        <relativePath/>
    </parent>

    <groupId>com.ralab</groupId>
    <artifactId>cloud-stream-producer-rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>cloud-stream-producer-rabbitmq</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <!-- Referenced by the spring-cloud-dependencies import in dependencyManagement. -->
        <spring-cloud.version>Greenwich.SR3</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>

        <!-- Test-scoped dependencies. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
User消息类1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package com.ralab.cloudstreamproducerrabbitmq;import com.fasterxml.jackson.annotation.JsonIgnoreProperties;@JsonIgnoreProperties (ignoreUnknown = true )public class User { private String userName; private String addr; private int age; public User () {} public User (String userName, String addr, int age) { this .userName = userName; this .addr = addr; this .age = age; } public String getUserName () { return userName; } public void setUserName (String userName) { this .userName = userName; } public String getAddr () { return addr; } public void setAddr (String addr) { this .addr = addr; } public int getAge () { return age; } public void setAge (int age) { this .age = age; } }
ProducerController3消息生产类1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package com.ralab.cloudstreamproducerrabbitmq;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.cloud.stream.messaging.Source;import org.springframework.messaging.Message;import org.springframework.messaging.MessageChannel;import org.springframework.messaging.support.MessageBuilder;import org.springframework.web.bind.annotation.PostMapping;import org.springframework.web.bind.annotation.RequestBody;import org.springframework.web.bind.annotation.RestController;@RestController @EnableBinding (Source.class)public class ProducerController3 { @Autowired @Qualifier (Source.OUTPUT) private MessageChannel userChannel; @PostMapping ("/produce" ) public void publish3 (@RequestBody User user) { Message<User> msg = MessageBuilder.withPayload(user).build(); this .userChannel.send(msg); } }
# application.properties of the producer service.

# Broker URI, including user, password, host and virtual host. It is read from
# the CLOUDAMQP_URL environment variable so that credentials are not hard-coded
# in a published file. Expected form: amqp://<user>:<password>@<host>/<vhost>
spring.rabbitmq.addresses=${CLOUDAMQP_URL}

# Destination of the "output" binding.
spring.cloud.stream.bindings.output.destination=user

server.port=8080
运行该应用CloudStreamProducerRabbitmqApplication
<?xml version="1.0" encoding="UTF-8"?>
<!--
  pom.xml of the message consumer microservice (cloud-stream-consumer-rabbitmq),
  which is created alongside the producer service.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.8.RELEASE</version>
        <relativePath/>
    </parent>

    <groupId>com.ralab</groupId>
    <artifactId>cloud-stream-consumer-rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>cloud-stream-consumer-rabbitmq</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <!-- Referenced by the spring-cloud-dependencies import in dependencyManagement. -->
        <spring-cloud.version>Greenwich.SR3</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>

        <!-- Test-scoped dependencies. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
同样需要添加User消息类,与消息生产者微服务相同 添加消息消费类UserListener1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 package com.ralab.cloudstreamconsumerrabbitmq;import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.cloud.stream.annotation.StreamListener;import org.springframework.cloud.stream.messaging.Sink;@EnableBinding (Sink.class)public class UserListener { @StreamListener (target = Sink.INPUT) public void processUserChannelGreeting (User user) { System.out.println(user.getUserName()+":" +user.getAddr()+":" +user.getAge()); } }
# application.properties of the consumer service.

# Broker URI, including user, password, host and virtual host. It is read from
# the CLOUDAMQP_URL environment variable so that credentials are not hard-coded
# in a published file. Expected form: amqp://<user>:<password>@<host>/<vhost>
spring.rabbitmq.addresses=${CLOUDAMQP_URL}

# Destination of the "input" binding.
spring.cloud.stream.bindings.input.destination=user

server.port=9090
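运行CloudStreamConsumerRabbitmqApplication,然后查看RabbitMQ消息服务器。接着测试发送消息:向消息生产者微服务的 http://localhost:8080/produce 以POST方式提交一个User的JSON,例如 {"userName":"tom","addr":"beijing","age":20},然后就可以看到在消息消费者微服务的控制台输出消息(例如 tom:beijing:20)。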