# Spring Cloud Stream bindings for the RabbitMQ binder.
# Both channels use the same destination (exchange) "zzz"; the producer's
# routing key 'test.one' matches the consumer's bindingRoutingKey.
spring:
  cloud:
    stream:
      bindings:
        my_output: # custom output channel
          destination: zzz # exchange name
          content-type: application/json
        my_input: # custom input channel
          destination: zzz
          content-type: application/json
          group: one # determines the queue name: zzz.one
      rabbit:
        bindings:
          my_output:
            producer: # producer-side settings
              exchangeType: topic
              # SpEL expression; the doubled quotes yield the literal 'test.one'
              routing-key-expression: '''test.one'''
          my_input:
            consumer: # consumer-side settings
              bindingRoutingKey: test.one
              # the listener must acknowledge each delivery itself
              acknowledgeMode: manual
/**
 * Declares the message channels used by this example.
 *
 * <p>The channel names are the same strings used as binding keys
 * ({@code my_output}, {@code my_input}) in the stream configuration.
 */
public interface StreamMessageChanel {

    /** Name of the outbound channel. */
    String MY_OUTPUT = "my_output";

    /** Name of the inbound channel. */
    String MY_INPUT = "my_input";

    /**
     * @return the outbound channel registered under {@link #MY_OUTPUT}
     */
    @Output(MY_OUTPUT)
    MessageChannel sendMessage();

    /**
     * @return the inbound channel registered under {@link #MY_INPUT}
     */
    @Input(MY_INPUT)
    SubscribableChannel dealMessage();
}
/**
 * Consumes messages from {@link StreamMessageChanel#MY_INPUT} and publishes
 * messages to {@link StreamMessageChanel#MY_OUTPUT}.
 */
@Component
@EnableBinding(StreamMessageChanel.class)
public class InputOutBean {

    /** Channels declared by {@link StreamMessageChanel}; injected by the container. */
    @Resource
    private StreamMessageChanel streamMessageChanelImpl;

    /**
     * Handles one inbound message, acknowledges it manually, and returns a
     * reply that {@code @SendTo} forwards to the output channel.
     *
     * <p>NOTE(review): with an output binding that targets the same exchange
     * and routing key as the input binding, every reply is routed back to this
     * listener, producing an endless loop - confirm that this is intended.
     *
     * @param message     the received message
     * @param channel     the AMQP channel the delivery arrived on, used for the ack
     * @param deliveryTag the delivery tag identifying the message to acknowledge
     * @return the fixed reply payload sent to the output channel
     */
    @StreamListener(StreamMessageChanel.MY_INPUT)
    @SendTo(StreamMessageChanel.MY_OUTPUT)
    public String input(Message<String> message,
                        @Header(AmqpHeaders.CHANNEL) Channel channel,
                        @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) {
        try {
            System.out.println("接收到消息!!!!!!!");
            System.out.println(message.getPayload());
            // Acknowledge only after the payload has been handled, so a failure
            // before this point leaves the delivery unacknowledged.
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            // Report the cause instead of discarding it; a failed ack is
            // otherwise impossible to diagnose from the output.
            System.out.println("error: " + e);
        }
        return "循环!!!";
    }

    /**
     * Publishes {@code data} as a message payload on the output channel.
     *
     * @param data the payload to send
     */
    public void output(String data) {
        System.out.println("发送消息-------------------");
        streamMessageChanelImpl.sendMessage().send(MessageBuilder.withPayload(data).build());
    }
}
/**
 * REST endpoints under {@code /queue} for triggering a test message.
 */
@RestController
@RequestMapping("/queue")
public class TestMqController {

    /** Bean used to publish the test message; injected by the container. */
    @Resource
    private InputOutBean inputOutBean;

    /**
     * Handles {@code GET /queue/one}: publishes a fixed test payload through
     * {@link InputOutBean#output(String)}.
     *
     * @return the result of {@code KeyValue.ok("over")}
     */
    @GetMapping("/one")
    public KeyValue one() {
        inputOutBean.output("测试队列");
        return KeyValue.ok("over");
    }
}

Spring Cloud Stream 整合 RabbitMQ
© 本文著作权归作者所有,转载前请务必署名