{"id":1731,"date":"2023-03-26T19:59:04","date_gmt":"2023-03-26T11:59:04","guid":{"rendered":"https:\/\/www.appblog.cn\/?p=1731"},"modified":"2023-04-23T21:33:18","modified_gmt":"2023-04-23T13:33:18","slug":"implement-mq-integration-with-spring-cloud-stream","status":"publish","type":"post","link":"https:\/\/www.appblog.cn\/index.php\/2023\/03\/26\/implement-mq-integration-with-spring-cloud-stream\/","title":{"rendered":"Implementing MQ Integration with Spring Cloud Stream"},"content":{"rendered":"<p><code>Spring Cloud Stream<\/code> is a framework for building message-driven microservices. An application interacts with the <code>binder<\/code> in <code>Spring Cloud Stream<\/code> through <code>input<\/code> or <code>output<\/code> channels, and each <code>binding<\/code> is set up through configuration. The <code>binder<\/code> in <code>Spring Cloud Stream<\/code> is responsible for talking to the middleware; message middleware includes <code>RabbitMQ<\/code>, <code>Kafka<\/code>, and <code>ActiveMQ<\/code>.<\/p>\n<p>This article uses <code>RabbitMQ<\/code> as the middleware for the walkthrough.<\/p>\n<p><!-- more --><\/p>\n<h2>Producer<\/h2>\n<p>(1) Create a new Spring Boot project with Spring Initializr in IDEA<\/p>\n<p>Add the dependencies<\/p>\n<pre><code class=\"language-xml\">&lt;dependency&gt;\n    &lt;groupId&gt;org.projectlombok&lt;\/groupId&gt;\n    &lt;artifactId&gt;lombok&lt;\/artifactId&gt;\n&lt;\/dependency&gt;\n&lt;dependency&gt;\n    &lt;groupId&gt;org.springframework.cloud&lt;\/groupId&gt;\n    &lt;artifactId&gt;spring-cloud-starter-stream-rabbit&lt;\/artifactId&gt;\n&lt;\/dependency&gt;<\/code><\/pre>\n<p>Configure the yml file<\/p>\n<pre><code class=\"language-yml\">server:\n  port: 8081\nspring:\n  application:\n    name: producer\n  cloud:\n    stream:\n      bindings:\n        output1:\n          destination: 
Theme-People # think of this as the channel shared with subscribers\n          group: Group-Boy\n          content-type: application\/json\n          producer:\n            partitionKeyExpression: headers[&#039;partitionKey&#039;]\n            partitionCount: 2<\/code><\/pre>\n<p>(2) Create an entity class to send<\/p>\n<pre><code class=\"language-java\">import lombok.AllArgsConstructor;\nimport lombok.Data;\nimport lombok.NoArgsConstructor;\nimport lombok.ToString;\n\n@Data\n@AllArgsConstructor\n@NoArgsConstructor\n@ToString\npublic class Person {\n\n    private String id;\n\n    private String name;\n\n    private int age;\n\n    private String password;\n}<\/code><\/pre>\n<p>(3) Create the output channel<\/p>\n<pre><code class=\"language-java\">import org.springframework.cloud.stream.annotation.Output;\nimport org.springframework.messaging.MessageChannel;\nimport org.springframework.stereotype.Component;\n\n@Component\npublic interface OutputInterface {\n    String OUTPUT1 = &quot;output1&quot;;\n    String OUTPUT2 = &quot;output2&quot;;\n    \/\/ Declare the output channel output1\n    @Output(OUTPUT1)\n    MessageChannel output1();\n}<\/code><\/pre>\n<p>(4) Create a Service to handle sending messages<\/p>\n<pre><code class=\"language-java\">import lombok.extern.slf4j.Slf4j;\nimport org.springframework.beans.factory.annotation.Autowired;\nimport org.springframework.beans.factory.annotation.Qualifier;\nimport org.springframework.messaging.MessageChannel;\nimport org.springframework.messaging.support.MessageBuilder;\nimport org.springframework.stereotype.Component;\n\n@Slf4j\n@Component\npublic class StreamService {\n\n    @Autowired\n    @Qualifier(OutputInterface.OUTPUT1)\n    MessageChannel channel1;\n\n    public void sendMessage(String channel) {\n     
   String message = &quot;Send Message Manually,From Channel:&quot; + channel;\n        log.info( &quot;Send Message from channel:&quot; + channel );\n        switch (channel) {\n            case OutputInterface.OUTPUT1:\n                channel1.send( MessageBuilder.withPayload( message ).build() );\n                return;\n            default:\n                log.info( &quot;Invalid parameter: &quot; + channel );\n                return;\n        }\n    }\n}<\/code><\/pre>\n<p>(5) Create a scheduled task class<\/p>\n<pre><code class=\"language-java\">import com.cloud.cloud_streaming.vo.Person;\nimport lombok.extern.slf4j.Slf4j;\nimport org.springframework.beans.factory.annotation.Autowired;\nimport org.springframework.beans.factory.annotation.Qualifier;\nimport org.springframework.cloud.stream.annotation.EnableBinding;\nimport org.springframework.messaging.Message;\nimport org.springframework.messaging.MessageChannel;\nimport org.springframework.messaging.support.MessageBuilder;\nimport org.springframework.scheduling.annotation.EnableScheduling;\nimport org.springframework.scheduling.annotation.Scheduled;\n\n@Slf4j\n@EnableBinding(value = {OutputInterface.class})\n@EnableScheduling\npublic class MessageSender {\n\n    @Autowired\n    private OutputInterface outputInterface;\n\n    \/**\n     * First approach: no output MessageChannel is injected directly; the concrete channel is obtained through OutputInterface.\n     * The partitionKey header is set for partitioning; messages can be partitioned by this partitionKey.\n     * Messages are sent by a scheduled task.\n     *\/\n    @Scheduled(initialDelay = 1000, fixedRate = 5000)\n    public void sendMessageMethod1() {\n        Message&lt;String&gt; message = MessageBuilder.withPayload( &quot;From sendMessageMethod1&quot; )\n                .setHeader( &quot;partitionKey&quot;, 1 )\n                
.build();\n\n        outputInterface.output1().send( message );\n    }\n\n    \/**\n     * Second approach: inject the output MessageChannel directly\n     *\/\n\/\/    @Autowired\n\/\/    @Qualifier(OutputInterface.OUTPUT2)\n\/\/    MessageChannel output;\n\/\/\n\/\/    @Scheduled(initialDelay = 2000, fixedRate = 4000)\n\/\/    public void sendMessageMethod2() {\n\/\/        Person p = new Person();\n\/\/        p.setName( &quot;Person2&quot; );\n\/\/        p.setAge( 1 );\n\/\/\n\/\/        output.send( MessageBuilder.withPayload( p )\n\/\/                .setHeader( &quot;partitionKey&quot;, 2 )\n\/\/                .build() );\n\/\/    }\n}<\/code><\/pre>\n<p>(6) Controller layer<\/p>\n<pre><code class=\"language-java\">import lombok.extern.slf4j.Slf4j;\nimport org.springframework.beans.factory.annotation.Autowired;\nimport org.springframework.web.bind.annotation.GetMapping;\nimport org.springframework.web.bind.annotation.RestController;\n\n@RestController\n@Slf4j\npublic class StreamController {\n    @Autowired\n    StreamService streamService;\n\n    @GetMapping(&quot;\/sendMessageByChannalName&quot;)\n    public void sendMessage(String channel) {\n        streamService.sendMessage( channel );\n    }\n}<\/code><\/pre>\n<h2>Consumer<\/h2>\n<p>(1) Add the dependencies<\/p>\n<pre><code class=\"language-xml\">&lt;dependency&gt;\n    &lt;groupId&gt;org.projectlombok&lt;\/groupId&gt;\n    &lt;artifactId&gt;lombok&lt;\/artifactId&gt;\n&lt;\/dependency&gt;\n&lt;dependency&gt;\n    &lt;groupId&gt;org.springframework.cloud&lt;\/groupId&gt;\n    &lt;artifactId&gt;spring-cloud-starter-stream-rabbit&lt;\/artifactId&gt;\n    &lt;version&gt;2.1.2.RELEASE&lt;\/version&gt;\n&lt;\/dependency&gt;<\/code><\/pre>\n<p>(2) Configure the yml<\/p>\n<pre><code class=\"language-yml\">spring:\n  application:\n    name: consumer\n  cloud:\n    stream:\n      bindings:\n        input1:\n          destination: Theme-People  
# the matching output destination, i.e. the same channel as the producer\n          group: Group-Boy\n          content-type: application\/json\n          consumer:\n            partitioned: true # whether partitioning is enabled\n      instance-index: 0  # which partition this instance receives messages from\n      instance-count: 2  # number of partitions<\/code><\/pre>\n<p>(3) Input interface<\/p>\n<pre><code class=\"language-java\">import org.springframework.cloud.stream.annotation.Input;\nimport org.springframework.messaging.SubscribableChannel;\nimport org.springframework.stereotype.Component;\n\n@Component\npublic interface InputInterface {\n    String INPUT1 = &quot;input1&quot;;\n\n    @Input(INPUT1)\n    SubscribableChannel input1();\n}<\/code><\/pre>\n<p>(4) Create an entity class here as well, because an entity is what gets sent<\/p>\n<pre><code class=\"language-java\">import lombok.AllArgsConstructor;\nimport lombok.Data;\nimport lombok.NoArgsConstructor;\nimport lombok.ToString;\n\n@Data\n@AllArgsConstructor\n@NoArgsConstructor\n@ToString\npublic class Person {\n    private String name;\n\n    private int age;\n}<\/code><\/pre>\n<p>(5) Handler layer that receives messages<\/p>\n<pre><code class=\"language-java\">import lombok.extern.slf4j.Slf4j;\nimport org.springframework.amqp.support.AmqpHeaders;\nimport org.springframework.cloud.stream.annotation.EnableBinding;\nimport org.springframework.cloud.stream.annotation.StreamListener;\nimport org.springframework.messaging.Message;\nimport org.springframework.messaging.handler.annotation.Header;\nimport org.springframework.messaging.handler.annotation.Payload;\n\n@Slf4j\n@EnableBinding(value = {InputInterface.class})\npublic class MessageReceiver {\n    @StreamListener(InputInterface.INPUT1)\n    public void receiveMessageFromChannel1(@Payload Message&lt;String&gt; payload, @Header(AmqpHeaders.CONSUMER_QUEUE) String 
partition) {\n        log.info( &quot;ReceiveMessageFrom INPUT1, message: {}, Queue:{}&quot;, payload.getPayload(), partition );\n    }\n}<\/code><\/pre>\n<h2>Startup<\/h2>\n<ul>\n<li>Start the <code>Producer<\/code> project<\/li>\n<li>Then start the <code>Consumer<\/code> project<\/li>\n<\/ul>\n","protected":false},"excerpt":{"rendered":"<p>Spring Cloud Stream is a framework for building message-driven microservices. Applications use the input or outpu [&hellip;]<\/p>\n","protected":false},"author":1,"featured_media":0,"comment_status":"open","ping_status":"open","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[42],"tags":[435],"class_list":["post-1731","post","type-post","status-publish","format-standard","hentry","category-spring-cloud","tag-mq"],"_links":{"self":[{"href":"https:\/\/www.appblog.cn\/index.php\/wp-json\/wp\/v2\/posts\/1731","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/www.appblog.cn\/index.php\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/www.appblog.cn\/index.php\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/www.appblog.cn\/index.php\/wp-json\/wp\/v2\/users\/1"}],"replies":[{"embeddable":true,"href":"https:\/\/www.appblog.cn\/index.php\/wp-json\/wp\/v2\/comments?post=1731"}],"version-history":[{"count":0,"href":"https:\/\/www.appblog.cn\/index.php\/wp-json\/wp\/v2\/posts\/1731\/revisions"}],"wp:attachment":[{"href":"https:\/\/www.appblog.cn\/index.php\/wp-json\/wp\/v2\/media?parent=1731"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/www.appblog.cn\/index.php\/wp-json\/wp\/v2\/categories?post=1731"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/www.appblog.cn\/index.php\/wp-json\/wp\/v2\/tags?post=1731"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}