Spring Cloud Strenm 整合 Rabbit MQ

Spring Cloud Strenm 整合 Rabbit MQ

20190612172318397.png

spring:
  cloud:
    stream:
      bindings:
        my_output:  #自定义的输出通道
          destination: zzz  #交换机名
          content-type: application/json
        my_input:   #自定义的输入通道
          destination: zzz
          content-type: application/json
          group: one   #通过这个指定队列名,zzz.one
      rabbit:
        bindings:
          my_output:
            producer: #设置生产者的参数
              exchangeType: topic
              routing-key-expression: '''test.one'''
          my_input:
            consumer:  #设置消费者的参数
              bindingRoutingKey: test.one
              acknowledgeMode: manual





public interface StreamMessageChanel {
    String MY_OUTPUT = "my_output";
    String MY_INPUT = "my_input";

    @Output(StreamMessageChanel.MY_OUTPUT)
    MessageChannel sendMessage();

    @Input(StreamMessageChanel.MY_INPUT)
    SubscribableChannel dealMessage();

}





@Component
@EnableBinding(StreamMessageChanel.class)
public class InputOutBean {
    @Resource
    private StreamMessageChanel streamMessageChanelImpl;
    @StreamListener(StreamMessageChanel.MY_INPUT)
    @SendTo(StreamMessageChanel.MY_OUTPUT)
    public String input(Message<String> message, @Header(AmqpHeaders.CHANNEL) Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) {
        try {
            System.out.println("接收到消息!!!!!!!");
            channel.basicAck(deliveryTag, false);
            System.out.println(message.getPayload().toString());
        } catch (Exception e) {
            System.out.println("error");
        }
        return "循环!!!";
    }
    public void output(String data) {
        System.out.println("发送消息-------------------");
        streamMessageChanelImpl.sendMessage().send(MessageBuilder.withPayload(data).build());
    }

}





@RestController
@RequestMapping("/queue")
public class TestMqController {

    @Resource
    private InputOutBean inputOutBean;

    @GetMapping("/one")
    public KeyValue one() {
        inputOutBean.output("测试队列");
        return KeyValue.ok("over");
    }
}