在springboot中Redis数据与MySQL数据的一致性方案思考和案例

news/2024/5/30 17:34:01/文章来源:https://blog.csdn.net/txhlxy/article/details/136664770

文章目录

  • 前言
  • 一、双写一致性模式(同步)
    • Redis->MySQL
    • MySQL->Redis
  • 二、数据监听模式(异步)
    • Redis->MySQL
    • MySQL -> Redis
  • 总结


前言

Redis和MySQL之间保持数据一致性是个复杂的问题,搜索资料发现大部分也只做了理论的说明。主流的方案大概是两种,一种是同步,一种是异步。下面我们来分析这两种模式。


一、双写一致性模式(同步)

双写就是在插入Redis数据的同时再向MySQL写入,或者在写入MySQL的同时再向Redis写入。这种方式的优点是数据高度一致,而且实时同步。但缺点也很明显,侵入性太强,需要时刻编码,同时还需要考虑各自的事务控制。具体实现方案如下:

Redis->MySQL

这种方式需要Redis来显示控制事务,当然数据库事务也必须要有

package com.test.spring;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@SpringBootApplication
@RestController
@EnableTransactionManagement
public class TestRedisToMysqlApplication {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Autowiredprivate JdbcTemplate jdbcTemplate;@RequestMapping("/test")@Transactionalpublic String test1(){stringRedisTemplate.execute(new RedisCallback<Boolean>() {@Overridepublic Boolean doInRedis(RedisConnection connection) throws DataAccessException {connection.multi();connection.commands().set("k1".getBytes(),"1".getBytes());connection.commands().set("k2".getBytes(),"2".getBytes());jdbcTemplate.update("insert into t_user (k1,k2) values (?,?)","1","2");connection.exec();return true;}});return "success";}public static void main(String[] args) {SpringApplication.run(TestRedisToMysqlApplication.class, args);}}

MySQL->Redis

这种方式,只需要控制jdbc事务即可:

package com.test.spring;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@SpringBootApplication
@RestController
@EnableTransactionManagement
public class TestMysqlToRedisApplication {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Autowiredprivate JdbcTemplate jdbcTemplate;@RequestMapping("/test")@Transactionalpublic String test1(){jdbcTemplate.update("insert into t_user (k1,k2) values (?,?)","1","2");stringRedisTemplate.opsForValue().set("k1","1");stringRedisTemplate.opsForValue().set("k2","2");return "success";}public static void main(String[] args) {SpringApplication.run(TestMysqlToRedisApplication.class, args);}}

二、数据监听模式(异步)

异步模式是通过对Redis的监听或者对MySQL的监听来实现,这种方式具有一定延迟,但是对原有代码无侵入性,可以单独开发程序来独立执行,并且无需关心各自的事务操作。在不需要绝对实时性的情况下,是不错的选择。

Redis->MySQL

这种模式需要在Redis的配置文件redis.conf中修改:

notify-keyspace-events "KEA"
package com.test.spring;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.event.EventListener;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.Topic;
import org.springframework.jdbc.core.JdbcTemplate;import java.nio.charset.StandardCharsets;
import java.util.Objects;@SpringBootApplication
public class TestRedisApplication {@Autowiredprivate StringRedisTemplate stringRedisTemplate;@Autowiredprivate JdbcTemplate jdbcTemplate;@Beanpublic MessageListener redisMessageListener() {return (Message message, byte[] pattern)->{String key = new String(message.getBody(), StandardCharsets.UTF_8);String value=stringRedisTemplate.opsForValue().get(key);System.out.println("key:" + key+"  发生变化。变化的值:"+value);//下面进行数据库操作,具体的逻辑需要根据你的设计来编写jdbcTemplate.update("insert into t_user ("+key+") values (?)",key,value);};}@Beanpublic RedisMessageListenerContainer redisMessageListenerContainer() {final RedisMessageListenerContainer container = new RedisMessageListenerContainer();container.setConnectionFactory(Objects.requireNonNull(stringRedisTemplate.getConnectionFactory()));return container;}@EventListenervoid listener(ApplicationReadyEvent event) {Topic topic = new PatternTopic("__keyevent@*");// 监听 整个redis数据库 的所有事件;RedisMessageListenerContainer redisMessageListenerContainer = event.getApplicationContext().getBean(RedisMessageListenerContainer.class);MessageListener redisMessageListener = event.getApplicationContext().getBean(MessageListener.class);redisMessageListenerContainer.addMessageListener(redisMessageListener, topic);}public static void main(String[] args) {SpringApplication.run(TestRedisApplication.class, args);}}

MySQL -> Redis

监听MySQL最方便的方式是监听MySQL的二进制文件,这种方式对原有数据无侵入。关于二进制文件的监听方案有很多,比如:Canal ,但是Canal再和Java集成上稍显复杂,这里给大家介绍另外一款工具:Debezium,在集成上很方便,具体操作如下:
加入maven依赖:

<dependency><groupId>io.debezium</groupId><artifactId>debezium-api</artifactId><version>1.6.0.Final</version>
</dependency>
<dependency><groupId>io.debezium</groupId><artifactId>debezium-embedded</artifactId><version>1.6.0.Final</version>
</dependency>
<dependency><groupId>io.debezium</groupId><artifactId>debezium-connector-mysql</artifactId><version>1.6.0.Final</version>
</dependency>

编写DebeziumServerBootstrap用作启动Debezium

package com.test.spring;import io.debezium.engine.DebeziumEngine;
import lombok.Data;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.SmartLifecycle;
import org.springframework.util.Assert;@Data
@Slf4j
public class DebeziumServerBootstrap implements InitializingBean, SmartLifecycle {private final Thread.UncaughtExceptionHandler handler = new Thread.UncaughtExceptionHandler() {public void uncaughtException(Thread t, Throwable e) {log.error("解析事件有一个错误 ", e);}};private Thread thread = null;private boolean running = false;private DebeziumEngine<?> debeziumEngine;@Overridepublic void start() {thread=new Thread(debeziumEngine);thread.setName("debezium-server-thread");thread.setUncaughtExceptionHandler(handler);thread.start();running = true;}@SneakyThrows@Overridepublic void stop() {debeziumEngine.close();this.running=false;thread.join();log.info("DebeziumServerBootstrap stop ");}@Overridepublic boolean isRunning() {return running;}@Overridepublic void afterPropertiesSet() throws Exception {Assert.notNull(debeziumEngine, "debeziumEngine must not be null");}
}

编写DebeziumConfiguration配置

package com.test.spring;import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.data.Envelope;
import io.debezium.embedded.Connect;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.RecordChangeEvent;
import io.debezium.engine.format.ChangeEventFormat;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.data.Field;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
import org.apache.commons.lang3.tuple.Pair;import java.util.List;
import java.util.Map;import static io.debezium.data.Envelope.FieldName.AFTER;
import static io.debezium.data.Envelope.FieldName.BEFORE;
import static io.debezium.data.Envelope.FieldName.OPERATION;
import static java.util.stream.Collectors.toMap;@Slf4j
public class DebeziumConfiguration {private static final String serverName="debecontrol";/*** Debezium 配置.** @return configuration*/@Beanpublic io.debezium.config.Configuration debeziumConfig(Environment environment) {String username=environment.getProperty("spring.datasource.username");String password=environment.getProperty("spring.datasource.password");String dir=environment.getProperty("canal.conf.dir");String defaultDatabaseName=environment.getProperty("canal.defaultDatabaseName");String slaveId=environment.getProperty("canal.slaveId");String url=environment.getProperty("canal.address");String[] urls=url.split("[:]");return io.debezium.config.Configuration.create()
//            连接器的Java类名称.with("connector.class", MySqlConnector.class.getName())
//            偏移量持久化,用来容错 默认值.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
//                捕获偏移量的周期.with("offset.flush.interval.ms", "6000")
//               连接器的唯一名称.with("name", "mysql-connector")
//                数据库的hostname.with("database.hostname", urls[0])
//                端口.with("database.port", urls[1])
//                用户名.with("database.user", username)
//                密码.with("database.password", password)
//                 包含的数据库列表.with("database.include.list", defaultDatabaseName)
//                是否包含数据库表结构层面的变更,建议使用默认值true.with("include.schema.changes", "false")
//                mysql.cnf 配置的 server-id.with("database.server.id", slaveId)
//                	MySQL 服务器或集群的逻辑名称.with("database.server.name", serverName)
//                历史变更记录.with("database.history", "io.debezium.relational.history.FileDatabaseHistory").build();}/*** Debezium server bootstrap debezium server bootstrap.** @param configuration the configuration* @return the debezium server bootstrap*/@Beanpublic DebeziumServerBootstrap debeziumServerBootstrap(io.debezium.config.Configuration configuration) {DebeziumServerBootstrap debeziumServerBootstrap = new DebeziumServerBootstrap();DebeziumEngine<RecordChangeEvent<SourceRecord>> debeziumEngine = DebeziumEngine.create(ChangeEventFormat.of(Connect.class)).using(configuration.asProperties()).notifying(this::handlePayload).build();debeziumServerBootstrap.setDebeziumEngine(debeziumEngine);return debeziumServerBootstrap;}private void handlePayload(List<RecordChangeEvent<SourceRecord>> recordChangeEvents, DebeziumEngine.RecordCommitter<RecordChangeEvent<SourceRecord>> recordCommitter) {recordChangeEvents.forEach(r -> {SourceRecord sourceRecord = r.record();Struct sourceRecordChangeValue = (Struct) sourceRecord.value();if(sourceRecordChangeValue==null) return;this.handlePayload1(sourceRecordChangeValue);});}private void handlePayload1(Struct sourceRecordChangeValue){try{// 判断操作的类型 过滤掉读 只处理增删改   这个其实可以在配置中设置Envelope.Operation operation = Envelope.Operation.forCode((String) sourceRecordChangeValue.get(OPERATION));if(operation==Envelope.Operation.READ) return;//customer_mysql_db_server.control.t_dic.Envelope//debecontrol.control.t_dic.EnvelopeString name = sourceRecordChangeValue.schema().name();String[] names=name.split("[.]");String talbe=names[2];// 获取增删改对应的结构体数据Struct before_struct = (Struct) sourceRecordChangeValue.get(BEFORE);// 将变更的行封装为MapMap<String, Object> before_payload =null;if(before_struct!=null){before_payload = before_struct.schema().fields().stream().map(Field::name).filter(fieldName -> before_struct.get(fieldName) != null).map(fieldName -> Pair.of(fieldName, before_struct.get(fieldName))).collect(toMap(Pair::getKey, Pair::getValue));}// 获取增删改对应的结构体数据Struct after_struct = (Struct) sourceRecordChangeValue.get(AFTER);Map<String, Object> after_payload =null;if(after_struct!=null){// 将变更的行封装为Mapafter_payload = after_struct.schema().fields().stream().map(Field::name).filter(fieldName -> after_struct.get(fieldName) != null).map(fieldName -> Pair.of(fieldName, after_struct.get(fieldName))).collect(toMap(Pair::getKey, Pair::getValue));}//在这里进行Redis操作if(operation==Envelope.Operation.CREATE){//数据库插入}else if(operation==Envelope.Operation.UPDATE){//数据库更新}else if(operation==Envelope.Operation.DELETE){//数据库删除}}catch (Exception e){log.warn("解析数据错误:"+e.getMessage());}}}

入口类

package com.test.spring;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;
@SpringBootApplication
@Import(DebeziumConfiguration.class)
public class TestMysqlApplication {public static void main(String[] args) {SpringApplication.run(TestMysqlApplication.class, args);}}

这里我们需要开启MySQL的二进制日志,需要修改my.cnf文件,增加如下配置:

log-bin=mysql-bin
binlog_format=row
server-id=1
log_bin_trust_function_creators=1

总结

关于Redis与MySQL数据一致性,我觉得还需要考虑各自的数据结构如何设计,因为这两种存储方式完全不一样。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_1006133.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

tongweb7部署应用后应用卡顿的参考思路(by lqw)

文章目录 1.优化jvm和openfile相关参数2.排除网络延迟&#xff08;仅供参考&#xff09;3 查看服务器资源的使用情况3.1查看方式3.1.1cpu占用过高方法1&#xff1a;使用脚本show-busy-java-threads.sh进行分析方法2&#xff1a;使用jstack 3.1.2内存占用过高3.1.1线程阻塞 3 数…

【Python使用】嘿马头条完整开发md笔记第1篇:课程简介,ToutiaoWeb虚拟机使用说明【附代码文档】

嘿马头条项目从到完整开发笔记总结完整教程&#xff08;附代码资料&#xff09;主要内容讲述&#xff1a;课程简介&#xff0c;ToutiaoWeb虚拟机使用说明&#xff0c;Pycharm远程开发&#xff0c;产品与开发&#xff0c;数据库1 产品介绍,2 原型图与UI图,3 技术架构,4 开发。OS…

鸿蒙开发学习:【媒体引擎组件】

简介 HiStreamer是一个轻量级的媒体引擎组件&#xff0c;提供播放、录制等场景的媒体数据流水线处理。 播放场景分为如下几个节点&#xff1a;数据源读取、解封装、解码、输出&#xff1b;录制场景分为如下几个节点&#xff1a;数据源读取、编码、封装、输出。 这些节点的具…

云原生消息流系统 Apache RocketMQ 在腾讯云的大规模生产实践

导语 随着云计算技术的日益成熟&#xff0c;云原生应用已逐渐成为企业数字化转型的核心驱动力。在这一大背景下&#xff0c;高效、稳定、可扩展的消息流系统显得尤为重要。腾讯云高级开发工程师李伟先生&#xff0c;凭借其深厚的技术功底和丰富的实战经验&#xff0c;为我们带…

错误: 找不到或无法加载主类 Hello.class

在运行这串代码 public class Hello{ public static void main(String[] args){ System.out.println("Hello world!"); } } 的时候出现报错&#xff1a;错误: 找不到或无法加载主类 Hello.class 入门级错误 1.公共类的文件名和类名不一致 hello.j…

【LeetCode热题100】240. 搜索二维矩阵 II

一.题目要求 编写一个高效的算法来搜索 m x n 矩阵 matrix 中的一个目标值 target 。该矩阵具有以下特性&#xff1a; 每行的元素从左到右升序排列。 ‘每列的元素从上到下升序排列。 二.题目难度 中等 三.输入样例 示例 1&#xff1a; 输入&#xff1a;matrix [[1,4,7…

搭建Hadoop3.x完全分布式集群

零、资源准备 虚拟机相关&#xff1a; VMware workstation 16&#xff1a;虚拟机 > vmware_177981.zipCentOS Stream 9&#xff1a;虚拟机 > CentOS-Stream-9-latest-x86_64-dvd1.iso Hadoop相关 jdk1.8&#xff1a;JDK > jdk-8u261-linux-x64.tar.gzHadoop 3.3.6&am…

17、设计模式之策略模式(Strategy)

一、什么是策略模式 策略模式属于行为型设计模式。定义了一系列算法&#xff0c;并将这些算法封装到一个类中&#xff0c;使得他们可以相互替换。这样&#xff0c;我们可以在改变某个对象使用的算法的情况下&#xff0c;选择一个合适的算法来处理特定的任务&#xff0c;主要解决…

全球首位AI软件工程师诞生,未来程序员会被取代吗?

今天早上看到一条消息&#xff0c;Cognition发布了世界首位AI程序员Devin&#xff0c;直接把我惊呆了&#xff0c;难道程序员是真要失业了吗&#xff1f; 全球首位AI软件工程师一亮相&#xff0c;直接引爆整个互联网圈。只需要一句指令&#xff0c;Devin就可以通过使用自己的s…

摄像机内存卡删除的视频如何恢复?恢复指南来袭

在现代社会&#xff0c;摄像机已成为记录生活、工作和学习的重要设备。然而&#xff0c;随着使用频率的增加&#xff0c;误删或意外丢失视频的情况也时有发生。面对这样的情况&#xff0c;许多用户可能会感到无助和困惑。那么&#xff0c;摄像机内存卡删除的视频真的无法恢复吗…

【05】消失的数字

hellohello~这里是土土数据结构学习笔记&#x1f973;&#x1f973; &#x1f4a5;个人主页&#xff1a;大耳朵土土垚的博客 &#x1f4a5;所属专栏&#xff1a;C语言函数实现 感谢大家的观看与支持&#x1f339;&#x1f339;&#x1f339; 有问题可以写在评论区或者私信我哦…

数据结构-链表(二)

1.两两交换列表中的节点 给你一个链表&#xff0c;两两交换其中相邻的节点&#xff0c;并返回交换后链表的头节点。你必须在不修改节点内部的值的情况下完成本题&#xff08;即&#xff0c;只能进行节点交换&#xff09;。 输入&#xff1a;head [1,2,3,4] 输出&#xff1a;[2…

ASP.NET排课实验室排课,生成班级课表实验室课表教师课表(vb.net)-214-(代码+说明)

转载地址: http://www.3q2008.com/soft/search.asp?keyword214 要看成品演示 请联系客服发给您成品演示 课题&#xff1a;实验课排课系统 计算机 上机课 一周上5天课&#xff0c;周一到周五 一周上5天课&#xff0c;周一到周五 因为我排的是实验课&#xff0c;最好1&#xf…

GPT-4.5 Turbo意外曝光,最快明天发布?OpenAI终于要放大招了!

大家好&#xff0c;我是木易&#xff0c;一个持续关注AI领域的互联网技术产品经理&#xff0c;国内Top2本科&#xff0c;美国Top10 CS研究生&#xff0c;MBA。我坚信AI是普通人变强的“外挂”&#xff0c;所以创建了“AI信息Gap”这个公众号&#xff0c;专注于分享AI全维度知识…

Java基于微信小程序的童装商城

博主介绍&#xff1a;✌程序员徐师兄、7年大厂程序员经历。全网粉丝12w、csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ &#x1f345;文末获取源码联系&#x1f345; &#x1f447;&#x1f3fb; 精彩专栏推荐订阅&#x1f447;…

【MySQL 系列】MySQL 索引篇

在 MySQL 中&#xff0c;索引是一种帮助存储引擎快速获取数据的数据结构&#xff0c;形象的说就是索引是数据的目录。它一般是以包含索引键值和一个指向索引键值对应数据记录物理地址的指针的节点的集合的清单的形式存在。通过使用索引&#xff0c; MySQL 可以在不需要扫描整个…

K-means算法(一篇文章讲透)

目录 一、引言 二、K-means算法的基本原理 三、优缺点 优点&#xff1a; 1 简单易懂 2 收敛速度快 3 聚类效果好 4 优化迭代功能 缺点&#xff1a; 1 对初始值敏感 2 局部最优问题 3 对非凸形状聚类效果不佳 4 易受噪声和异常值影响 5 K值难以确定 6 数据类型限…

OCR-free相关论文梳理

⚠️注意&#xff1a;暂未写完&#xff0c;持续更新中 引言 通用文档理解&#xff0c;是OCR任务的终极目标。现阶段的OCR各种垂类任务都是通用文档理解任务的子集。这感觉就像我们一下子做不到通用文档理解&#xff0c;退而求其次&#xff0c;先做各种垂类任务。 现阶段&…

Redis 哨兵集群如何实现高可用?(1)

目录 1.哨兵的介绍 2.哨兵的核心知识 3.Redis 哨兵主备切换的数据丢失问题 &#xff08;1&#xff09;异步复制导致的数据丢失 &#xff08;2&#xff09;脑裂导致的数据丢失 4.数据丢失问题的解决方案 &#xff08;1&#xff09;减少异步复制数据的丢失 &#xff08;2&…

6、设计模式之适配器模式(Adapter)

一、什么是适配器模式 适配器模式是一种结构型设计模式&#xff0c;它允许将不兼容的对象转换成可兼容的接口。主要目的是解决在不改变现有代码的情况下&#xff0c;使不兼容的接口之间能够正常工作&#xff0c;通过创建一个中间转换的适配器来将一个对象转换成我们所需要的接口…