SpringCloud 实战二十四:Stream消息驱动

  作者:记性不好的阁主

生产者:cloud-stream-rabbitmq-provider8801


1、引入依赖:


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloud2020</artifactId>
<groupId>com.laoxu.springcloud</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>cloud-stream-rabbitmq-provider8801</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.laoxu.springcloud</groupId>
<artifactId>cloud-api-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>

</project>


2、配置端口


server:
port: 8801



spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 47.56.236.87
port: 5672
username: admin
password: admin
bindings:
output:
destination: studyExchange
content-type: applicaion/json
binder: defaultRabbit

eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka, http://eureka7002.com:7002/eureka






3、启动类


package com.laoxu.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StreamMQMain8801 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8801.class, args);
}
}


4、创建消息发送接口


package com.laoxu.springcloud.service;

public interface IMessageProvider {
String send();
}


package com.laoxu.springcloud.service.Impl;

import com.laoxu.springcloud.service.IMessageProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;


import java.util.UUID;

@EnableBinding(Source.class) // 定义消息的推送管道
public class MessageProviderImpl implements IMessageProvider {

@Autowired
private MessageChannel output; //消息发送管道

public String send() {
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("******serial " + serial);
return null;
}
}



5、控制器测试


package com.laoxu.springcloud.controller;

import com.laoxu.springcloud.service.IMessageProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SendMessageController {

@Autowired
private IMessageProvider messageProvider;

@GetMapping("/sendMessage")
public String sendMessage(){
return messageProvider.send();
}
}



6、访问:http://localhost:8801/sendMessage



可以看到发送消息到rabbitmq成功


消费者:cloud-stream-rabbitmq-consumer8802


1、引入依赖


<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloud2020</artifactId>
<groupId>com.laoxu.springcloud</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>cloud-stream-rabbitmq-consumer8802</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.laoxu.springcloud</groupId>
<artifactId>cloud-api-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>

</project>



2、配置端口


server:
port: 8802



spring:
application:
name: cloud-stream-consumer

cloud:
stream:
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: 47.56.236.87
port: 5672
username: admin
password: admin
bindings:
input:
destination: studyExchange
content-type: application/json
binder: defaultRabbit


eureka:
client:
service-url:
defaultZone: http://eureka7001.com:7001/eureka, http://eureka7002.com:7002/eureka



3、编写业务类接收消息生产者发送的消息


package com.laoxu.springcloud.controller;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;

@StreamListener(Sink.INPUT)
public void input(Message<String> message){
System.out.println("消费者1号,------>接收到消息:" + message.getPayload() + "\t" + "port: "+serverPort);
}

}


4、启动服务




5、访问:http://localhost:8801/sendMessage




可以在8802消费者端看到日志输出!



相关推荐

评论 抢沙发

表情

分类选择