微服务篇-C 深入理解第一代微服务(SpringCloud)_IX 深入理解Stream消息驱动

news/2024/7/27 8:01:03/文章来源:https://blog.csdn.net/qq_30056341/article/details/137169087

原创作者:田超凡(程序员田宝宝)

版权所有,引用请注明原作者,严禁复制转载

Part 1 理论部分

1 什么是SpringCloud Stream消息驱动?

SpringCloud Stream消息驱动可以简化开发人员对消息中间件使用的复杂度和耦合度,让开发人员更专注于核心业务逻辑的开发,SpringCloud Stream基于SpringBoot实现,具有自动化配置的功能,类似一些ORM框架,可以平滑切换多种不同的数据库,目前SpringCloud Stream消息驱动仅支持整合RabbitMQ和Kafka消息中间件。

2 SpringCloud Stream消息驱动实现的原理?

通过定义Binder绑定器作为中间层,实现了应用程序和消息中间件之间实现细节的隔离,通过向应用程序暴露统一的Channel通道,可以让应用程序不再需要考虑各种不同的消息中间件实现的兼容性问题,当需要升级消息中间件,或者更换其他的消息中间件产品时,我们需要做的就只是更换对应的Binder绑定器即可,不需要再修改任何应用中对接消息中间件的实现逻辑。

Stream消息驱动中有以下几个核心概念:

1 Source:当需要发送消息时,就需要使用Source来实现,Source会把需要发送的消息(POJO对象)进行序列化(默认转换成JSON格式的字符串),然后将这些数据发送到Channel中。

2 Channel:消息通道是Stream消息驱动的抽象之一,通常我们向消息中间件中发送消息或者消费消息的时候需要指定主题(Topic)名称或消息队列名称,但这样一来,当我们需要变更主题名称的时候就需要修改大量的消息发起方和消息消费方的代码,但是通过使用Channel消息通道,消息发起方和消息消费方的业务代码只需要连接到Channel消息通道就可以了,具体这个Channel消息通道对应的是哪个主题,就可以在配置文件中指定,这样当主题变更的时候我们不需要对代码做任何修改,就实现了业务代码和具体消息中间件的解耦。

3 Binder:Stream消息驱动中的另外一个抽象层,通过不同的Binder可以实现和不同的消息中间件整合,比如针对Kafka的Binder等等,通过Binder提供统一的消息收发接口,我们可以根据实际需要部署不同的消息中间件,或者根据实际生产环境中部署的消息中间件来调整我们的配置。

4 Sink:当需要监听消息时,就需要使用Sink来实现,Sink负责从Channel消息通道中获取消息,并将消息反序列化成消息对象(POJO对象),然后交给具体的消息消费方处理相应的业务逻辑。

Part 2 实践部分

消息驱动环境搭建

生产者环境

Maven依赖信息

    <parent>

        <groupId>org.springframework.boot</groupId>

        <artifactId>spring-boot-starter-parent</artifactId>

        <version>2.0.1.RELEASE</version>

    </parent>

    <dependencies>

        <!-- SpringBoot整合Web组件 -->

        <dependency>

             <groupId>org.springframework.boot</groupId>

             <artifactId>spring-boot-starter-web</artifactId>

        </dependency>

        <dependency>

             <groupId>org.springframework.cloud</groupId>

             <artifactId>spring-cloud-starter-stream-rabbit</artifactId>

             <version>2.0.1.RELEASE</version>

        </dependency>

    </dependencies>

application.yml信息

server:

  port: 9000

spring:

  application:

    name: spingcloud-stream-producer

#  rabbitmq:

#    host: 192.168.112.111

#    port: 5672

#    username: guest

#    password: guest

创建管道

// 创建管道接口

public interface SendMessageInterface {

     // 创建一个输出管道,用于发送消息

     @Output("my_msg")

     SubscribableChannel sendMsg();

}

发送消息

@RestController

public class SendMsgController {

      @Autowired

      private SendMessageInterface sendMessageInterface;

      @RequestMapping("/sendMsg")

      public String sendMsg() {

            String msg = UUID.randomUUID().toString();

            System.out.println("生产者发送内容msg:" + msg);

            Message build = MessageBuilder.withPayload(msg.getBytes()).build();

            sendMessageInterface.sendMsg().send(build);

            return "success";

      }

}

启动服务

@SpringBootApplication

@EnableBinding(SendMessageInterface.class) // 开启绑定

public class AppProducer {

      public static void main(String[] args) {

            SpringApplication.run(AppProducer.class, args);

      }

}

消费者环境

Maven

<parent>

            <groupId>org.springframework.boot</groupId>

            <artifactId>spring-boot-starter-parent</artifactId>

            <version>2.0.1.RELEASE</version>

      </parent>

      <dependencies>

            <!-- SpringBoot整合Web组件 -->

            <dependency>

                  <groupId>org.springframework.boot</groupId>

                  <artifactId>spring-boot-starter-web</artifactId>

            </dependency>

            <dependency>

                  <groupId>org.springframework.cloud</groupId>

                  <artifactId>spring-cloud-starter-stream-rabbit</artifactId>

                  <version>2.0.1.RELEASE</version>

            </dependency>

      </dependencies>

application.yml

server:

  port: 9000

spring:

  application:

    name: spingcloud-stream-consumer

#  rabbitmq:

#    host: 192.168.112.111

#    port: 5672

#    username: guest

#    password: guest

管道中绑定消息

public interface RedMsgInterface {

      // 从管道中获取消息

      @Input("my_msg")

      SubscribableChannel redMsg();

}

消费者获取消息

@Component

public class Consumer {

      @StreamListener("my_msg")

      public void listener(String msg) {

            System.out.println("消费者获取生产消息:" + msg);

      }

}

启动消费者

@SpringBootApplication

@EnableBinding(RedMsgInterface.class)

public class AppConsumer {

      public static void main(String[] args) {

            SpringApplication.run(AppConsumer.class, args);

      }

}

消费组

在现实的业务场景中,每一个微服务应用为了实现高可用和负载均衡,都会集群部署,按照上面我们启动了两个应用的实例,消息被重复消费了两次。为解决这个问题,Spring Cloud Stream 中提供了消费组,通过配置 spring.cloud.stream.bindings.myInput.group 属性为应用指定一个组名,下面修改下配置文件

server:

  port: 8001

spring:

  application:

    name: spring-cloud-stream

#  rabbitmq:

#    host: 192.168.112.111

#    port: 5672

#    username: guest

#    password: guest

  cloud:

    stream:

      bindings:

        mymsg: ###指定 管道名称

          #指定该应用实例属于 stream 消费组

          group: stream

修改消费者

@Component

public class Consumer {

      @Value("${server.port}")

      private String serverPort;

      @StreamListener("my_msg")

      public void listener(String msg) {

            System.out.println("消费者获取生产消息" + msg + "端口:" + serverPort);

      }

}

更改环境为kafka

Maven依赖

            <dependency>

                  <groupId>org.springframework.cloud</groupId>

                  <artifactId>spring-cloud-starter-stream-kafka</artifactId>

                  <version>2.0.1.RELEASE</version>

            </dependency>

生产者配置

server:

  port: 9000

spring:

  cloud:

    stream:

      # 设置成使用kafka

      kafka:

        binder:

          # Kafka的服务端列表,默认localhost

          brokers: 192.168.212.111:9092,192.168.212.112:9092,192.168.212.113:9092

          # Kafka服务端连接的ZooKeeper节点列表,默认localhost

          zkNodes: 192.168.212.111:2181,192.168.212.112:2181,192.168.212.113:2181

          minPartitionCount: 1

          autoCreateTopics: true

          autoAddPartitions: true

消费者配置

server:

  port: 8000

spring:

  application:

    name: springcloud_kafka_consumer

  cloud:

     instance-count: 1

     instance-index: 0

     stream:

        kafka:

          binder:

            brokers: 192.168.212.111:9092,192.168.212.112:9092,192.168.212.113:9092

            zk-nodes: 192.168.212.111:2181,192.168.212.112:2181,192.168.212.113:2181

            auto-add-partitions: true

            auto-create-topics: true

            min-partition-count: 1

        bindings:

          input:

            destination: my_msg

            group: s1

            consumer:

              autoCommitOffset: false

              concurrency: 1

              partitioned: false

本文部分素材转载自蚂蚁课堂

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_1035385.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python中批量修改文件名,去除某些内容

环境&#xff1a;Window10 Python3.9 PyCharm(2023.1.3) -------------------------------------****************** ** *********************----------------------------------------- 这是在Python中批量将指定文件夹下相似的文件名&#xff0c;提取文件名有效信息&am…

[RK3588-Android12] 调试MIPI-双通道-压缩屏(Video Mode/MIPI Dphy 8Lane/DSC 144HZ)

问题描述 被测屏幕&#xff1a;小米Pad6 分辨率&#xff1a;1800X2880 模式&#xff1a;Video Mode/MIPI Dphy 8Lane/DSC 144HZ PPS: 11 00 00 89 30 80 0B 40 03 84 00 14 01 C2 01 C2 02 00 01 F4 00 20 01 AB 00 06 00 0D 05 7A 06 1A 18 00 10 F0 03 0C 20 00 06 0B 0B 33…

SpringBoot+Prometheus+Grafana实现应用监控和报警

一、背景 SpringBoot的应用监控方案比较多&#xff0c;SpringBootPrometheusGrafana是目前比较常用的方案之一。它们三者之间的关系大概如下图&#xff1a; 关系图 二、开发SpringBoot应用 首先&#xff0c;创建一个SpringBoot项目&#xff0c;pom文件如下&#xff1a; <…

Sy6 编辑器vi的应用(+shell脚本3例子)

实验环境&#xff1a; 宿主机为win11&#xff0c;网络&#xff1a;10.255.50.5 6389 WSL2 ubuntu 目标机的OS&#xff1a;Ubuntu 内核、版本如下&#xff1a; linuxpeggy0223:/$ uname -r 5.15.146.1-microsoft-standard-WSL2 linuxpeggy0223:/$ cat /proc/version Linux vers…

MYSQL数据库:告别慢查询,优化性能大揭秘

​&#x1f308; 个人主页&#xff1a;danci_ &#x1f525; 系列专栏&#xff1a;《设计模式》《MYSQL应用》 &#x1f4aa;&#x1f3fb; 制定明确可量化的目标&#xff0c;坚持默默的做事。 MYSQL数据库&#xff1a;告别慢查询&#xff0c;优化性能大揭秘 文章目录 一、揭秘…

Redis慢日志

SLOWLOG 是用来读取和重置 Redis 慢查询日志的命令&#xff0c;Redis 2.2.12 版本开始支持 1.Redis 慢查询日志概述 客户端从发送命令到获取返回结果经过了以下几个步骤&#xff1a; 1. 客户端发送命令 2. 该命令进入 Redis 队列排队等待执行 3. Redis 开始执行命令 - Red…

MySQL 数据库基础操作详解

文章目录 MySQL 数据库基础操作详解1. 基本概念2. 库的操作3. 表的操作4. 数据操作5. 示例示例一&#xff1a;创建表和插入数据示例二&#xff1a;查询数据示例三&#xff1a;更新数据示例四&#xff1a;删除数据 MySQL 数据库基础操作详解 MySQL 是一种常用的关系型数据库管理…

设计模式-结构型-享元模式Flyweight

享元模式的特点&#xff1a; 享元模式可以共享相同的对象&#xff0c;避免创建过多的对象实例&#xff0c;从而节省内存资源 使用场景&#xff1a; 常用于需要创建大量相似的对象的情况 享元接口类 public interface Flyweight { void operate(String extrinsicState); } 享…

计算机网络-TCP/IP 网络模型

TCP/IP网络模型各层的详细描述&#xff1a; 应用层&#xff1a;应用层为应用程序提供数据传输的服务&#xff0c;负责各种不同应用之间的协议。主要协议包括&#xff1a; HTTP&#xff1a;超文本传输协议&#xff0c;用于从web服务器传输超文本到本地浏览器的传送协议。FTP&…

计算机视觉之三维重建(5)---双目立体视觉

文章目录 一、平行视图1.1 示意图1.2 平行视图的基础矩阵1.3 平行视图的极几何1.4 平行视图的三角测量 二、图像校正三、对应点问题3.1 相关匹配法3.2 归一化相关匹配法3.3 窗口问题3.4 相关法存在的问题3.5 约束问题 一、平行视图 1.1 示意图 如下图即是一个平行视图。特点&a…

怎样在Linux搭建NTP服务器

搭建 NTP&#xff08;Network Time Protocol&#xff09;服务器可以帮助你在局域网内提供时间同步服务&#xff0c;让网络中的设备都使用统一的时间。以下是在 Linux 系统上搭建 NTP 服务器的基本步骤&#xff1a; 安装 NTP 服务器软件&#xff1a; 在终端中执行以下命令安装 N…

第二十章 红黑树

大家应该都接触过平衡二叉树(AVLTree)&#xff0c;了解到 AVL 树的性质&#xff0c;其实平衡二叉树最大的作用就是查找&#xff0c;AVL 树的查找、插入和删除在平均和最坏情况下都是 O(logn)。AVL 树的效率就是高在这个地方。如果在 AVL 树中插入或删除节点后&#xff0c;使得高…

JUC:synchronized优化——锁的升级过程(偏向锁->轻量级锁->重量级锁)以及内部实现原理

文章目录 锁的类型轻量级锁重量级锁自旋优化偏向锁偏向锁的细节偏向锁的撤销批量重偏向批量撤销锁消除 锁的类型 重量级锁、轻量级锁、偏向锁。 加锁过程&#xff1a;偏向->轻量级->重量级 轻量级锁 轻量级锁的使用场景&#xff1a;如果一个对象虽然有多线程要加锁&am…

HarmonyOS 应用开发之Actor并发模型对比内存共享并发模型

内存共享并发模型指多线程同时执行复数任务&#xff0c;这些线程依赖同一内存并且都有权限访问&#xff0c;线程访问内存前需要抢占并锁定内存的使用权&#xff0c;没有抢占到内存的线程需要等待其他线程释放使用权再执行。 Actor并发模型每一个线程都是一个独立Actor&#xf…

IDEA无法连接虚拟机中的Redis的解决方案,无法连接Jedis,无法ping通虚拟机的解决方案

首先&#xff0c;笔者先说明一下自身的情况&#xff0c;怎么连接都连不上&#xff0c;网上的教程全部都看了一遍&#xff0c;基本上没用得上的&#xff0c;这篇文章里面的解决方案包括了笔者能在网上找到了最全面的办法总结&#xff0c;最后终于是连上了 目录 一.连接Jedis出错…

大数据学习第十一天(复习linux指令3)

1、su和exit su命令就是用于账户切换的系统命令 基本语法&#xff1a;su[-] [用户名] 1&#xff09;-表示是否在切换用户后加载变量&#xff0c;建议带上 2&#xff09;参数&#xff1a;用户名&#xff0c;表示切换用户 3&#xff09;切换用户后&#xff0c;可以通过exit命令退…

数据结构——lesson12排序之归并排序

&#x1f49e;&#x1f49e; 前言 hello hello~ &#xff0c;这里是大耳朵土土垚~&#x1f496;&#x1f496; &#xff0c;欢迎大家点赞&#x1f973;&#x1f973;关注&#x1f4a5;&#x1f4a5;收藏&#x1f339;&#x1f339;&#x1f339; &#x1f4a5;个人主页&#x…

【CANN训练营笔记】AscendCL图片分类应用(C++实现)

样例介绍 基于PyTorch框架的ResNet50模型&#xff0c;对*.jpg图片分类&#xff0c;输出各图片所属分类的编号、名称。 环境介绍 华为云AI1s CPU&#xff1a;Intel Xeon Gold 6278C CPU 2.60GHz 内存&#xff1a;8G NPU&#xff1a;Ascend 310 环境准备 下载驱动 wget ht…

小折叠手机无法使用车上的无线充电?车和手机都没问题

最近看到一个案例——一位新入手Pocket 2的机主&#xff0c;发现自己的手机无法在车上进行无线充电。检查了手机和汽车都没问题&#xff0c;折腾大半天结果发现是电磁线圈没对准无线充电的位置。 无线充电的原理是手机的无线充电电磁线圈对准电磁线圈&#xff0c;通过电磁波感…

Wireshark TS | HTTP 传输文件慢问题

问题背景 之前有几篇文章写过关于应用传输慢的问题&#xff0c;延用之前的老套话&#xff0c;应用传输慢是一种比较常见的问题&#xff0c;慢在哪&#xff0c;为什么慢&#xff0c;有时候光从网络数据包分析方面很难回答的一清二楚&#xff0c;毕竟应用的定义范围实在太广&…