elasticsearch MySQL 数据同步。

news/2024/5/1 18:25:55/文章来源:https://blog.csdn.net/lyfGeek/article/details/130094184

elasticsearch & MySQL 数据同步。


文章目录

    • elasticsearch & MySQL 数据同步。
      • 3. 数据同步。
          • 3.1. 思路分析。
            • 3.1.1. 同步调用。
            • 3.1.2. 异步通知。
            • 3.1.3. 监听 binlog。
            • 3.1.4. 选择。
          • 3.2. 实现数据同步。
            • 3.2.1. 思路。
            • 3.2.2. 导入 demo。
            • 3.2.3. 声明交换机、队列。
            • 1)引入依赖。
            • 2)声明队列交换机名称。
            • 3)声明队列交换机。
            • 3.2.4. 发送 MQ 消息。
            • 3.2.5. 接收 MQ 消息。


3. 数据同步。

elasticsearch 中的酒店数据来自于 mysql 数据库,因此 mysql 数据发生改变时,elasticsearch 也必须跟着改变,这个就是 elasticsearch 与 mysql 之间的数据同步

在微服务中,负责酒店管理(操作 MySQL)的业务与负责酒店搜索(操作 elasticsearch)的业务可能在两个不同的微服务上,两者数据该如克如何保持同步?



3.1. 思路分析。

常见的数据同步方案有三种。

  • 同步调用。

  • 异步通知。

  • 监听 binlog。



3.1.1. 同步调用。

方案一:同步调用。

在这里插入图片描述
基本步骤如下。

  • hotel-demo 对外提供接口,用来修改 elasticsearch 中的数据。

  • 酒店管理服务在完成数据库操作后,直接调用 hotel-demo 提供的接口,



3.1.2. 异步通知。

方案二:异步通知。

在这里插入图片描述
流程如下。

  • hotel-admin 对 mysql 数据库数据完成增、删、改后,发送 MQ 消息。

  • hotel-demo 监听 MQ,接收到消息后完成 elasticsearch 数据修改。



3.1.3. 监听 binlog。

方案三:监听 binlog。

在这里插入图片描述
流程如下。

  • 给 mysql 开启 binlog 功能。

  • mysql 完成增、删、改操作都会记录在 binlog 中。

  • hotel-demo 基于 canal 监听 binlog 变化,实时更新 elasticsearch 中的内容。



3.1.4. 选择。

方式一:同步调用。

  • 优点:实现简单,粗暴。

  • 缺点:业务耦合度高。

方式二:异步通知。

  • 优点:低耦合,实现难度一般。

  • 缺点:依赖 mq 的可靠性。

方式三:监听 binlog。

  • 优点:完全解除服务间耦合。

  • 缺点:开启 binlog 增加数据库负担、实现复杂度高。



3.2. 实现数据同步。
3.2.1. 思路。

开发 hotel-admin 项目作为酒店管理的微服务。当酒店数据发生增、删、改时,要求对 elasticsearch 中数据也要完成相同操作。

步骤。

  • 导入 hotel-admin 项目,启动并测试酒店数据的 CRUD。

  • 声明 exchange、queue、routingKey。

  • 在 hotel-admin 中的增、删、改业务中完成消息发送。

  • 在 hotel-demo 中完成消息监听,并更新 elasticsearch 中数据。

  • 启动并测试数据同步功能。



3.2.2. 导入 demo。

导入 hotel-admin 项目。

运行后,访问 http://localhost:8099。

其中包含了酒店的 CRUD 功能。



3.2.3. 声明交换机、队列。

MQ 结构如图。

在这里插入图片描述



1)引入依赖。

在 hotel-admin、hotel-demo 中引入 rabbitmq 的依赖。

<!-- amqp。-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>


2)声明队列交换机名称。

在 hotel-admin 和 hotel-demo 中的 com.geek.elasticsearchgeek.hotel.constatnts 包下新建一个类 MqConstants

package com.geek.elasticsearchgeek.hotel.constant;/*** @author geek*/
public interface IMqConstants {/*** 交换机。*/String TOPIC_EXCHANGE_HOTEL = "topic.exchange.hotel";/*** 监听新增和修改的队列。*/String QUEUE_HOTEL_INSERT = "queue.hotel.insert";/*** 监听删除的队列。*/String QUEUE_HOTEL_DELETE = "queue.hotel.delete";/*** 新增或修改的 RoutingKey。*/String ROUTING_KEY_HOTEL_INSERT = "routing.key.hotel.insert";/*** 删除的 RoutingKey。*/String ROUTING_KEY_HOTEL_DELETE =  "routing.key.hotel.delete";}


3)声明队列交换机。

在 hotel-demo 中,定义配置类,声明队列、交换机。

package com.geek.elasticsearchgeek.hotel.config;import com.geek.elasticsearchgeek.hotel.constant.IMqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author geek*/
@Configuration
public class MqConfig {@Beanpublic TopicExchange topicExchange() {return new TopicExchange(IMqConstants.TOPIC_EXCHANGE_HOTEL, true, false);}@Beanpublic Queue queueHotelInsert() {return new Queue(IMqConstants.QUEUE_HOTEL_INSERT, true);}@Beanpublic Queue queueHotelDelete() {return new Queue(IMqConstants.QUEUE_HOTEL_DELETE, true);}@Beanpublic Binding hotelInsertQueueRoutingKeyBinding() {return BindingBuilder.bind(queueHotelInsert()).to(topicExchange()).with(IMqConstants.ROUTING_KEY_HOTEL_INSERT);}@Beanpublic Binding hotelDeleteQueueRoutingKeyBinding() {return BindingBuilder.bind(queueHotelDelete()).to(topicExchange()).with(IMqConstants.ROUTING_KEY_HOTEL_DELETE);}}


3.2.4. 发送 MQ 消息。

在 hotel-admin 中的增、删、改业务中分别发送 MQ 消息。

package com.geek.elasticsearch.hotel.admin.controller;import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.geek.elasticsearch.hotel.admin.constant.IMqConstants;
import com.geek.elasticsearch.hotel.admin.dataobject.Hotel;
import com.geek.elasticsearch.hotel.admin.dto.PageResult;
import com.geek.elasticsearch.hotel.admin.service.IHotelService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;import java.security.InvalidParameterException;/*** @author geek*/
@Slf4j
@RestController
@RequestMapping("/hotel")
public class HotelController {@Autowiredprivate IHotelService hotelService;@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/{id}")public Hotel queryById(@PathVariable("id") Long id) {return this.hotelService.getById(id);}@GetMapping("/list")public PageResult hotelList(@RequestParam(value = "page", defaultValue = "1") Integer page,@RequestParam(value = "size", defaultValue = "1") Integer size) {Page<Hotel> result = this.hotelService.page(new Page<>(page, size));return new PageResult(result.getTotal(), result.getRecords());}@PostMappingpublic void saveHotel(@RequestBody Hotel hotel) {this.hotelService.save(hotel);this.rabbitTemplate.convertAndSend(IMqConstants.TOPIC_EXCHANGE_HOTEL,IMqConstants.ROUTING_KEY_HOTEL_INSERT,hotel.getId());}@DeleteMapping("/{id}")public void deleteById(@PathVariable("id") Long id) {this.hotelService.removeById(id);this.rabbitTemplate.convertAndSend(IMqConstants.TOPIC_EXCHANGE_HOTEL,IMqConstants.ROUTING_KEY_HOTEL_DELETE,id);}@PutMappingpublic void updateById(@RequestBody Hotel hotel) {if (hotel.getId() == null) {throw new InvalidParameterException("id 不能为空。");}this.hotelService.updateById(hotel);this.rabbitTemplate.convertAndSend(IMqConstants.TOPIC_EXCHANGE_HOTEL,IMqConstants.ROUTING_KEY_HOTEL_INSERT,hotel.getId());}}


3.2.5. 接收 MQ 消息。

hotel-demo 接收到 MQ 消息要做的事情包括。

  • 新增消息:根据传递的 hotel 的 id 查询 hotel 信息,然后新增一条数据到索引库。

  • 删除消息:根据传递的 hotel 的 id 删除索引库中的一条数据。

1)首先在 hotel-demo 的 com.geek.elasticsearchgeek.hotel.service 包下的 IHotelService 中新增新增、删除业务。

void deleteById(Long id);void insertById(Long id);

2)给 hotel-demo 中的 com.geek.elasticsearchgeek.hotel.service.impl 包下的 HotelService 中实现业务。

@Overridepublic void insertById(Long id) {// 根据 id 查询酒店数据。Hotel hotel = getById(id);// 转换为文档类型。HotelDoc hotelDoc = new HotelDoc(hotel);// 准备 Request 对象。IndexRequest indexRequest = new IndexRequest("hotel").id(hotel.getId().toString());// 准备 Json 文档。indexRequest.source(JSON.toJSONString(hotelDoc), XContentType.JSON);// 发送请求。try {this.restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void deleteById(Long id) {// 准备 Request。DeleteRequest deleteRequest = new DeleteRequest("hotel", id.toString());// 发送请求。try {this.restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}}

3)编写监听器。

在 hotel-demo 中的com.geek.elasticsearchgeek.hotel.mq包新增一个类。

package com.geek.elasticsearchgeek.hotel.mq;import com.geek.elasticsearchgeek.hotel.constant.IMqConstants;
import com.geek.elasticsearchgeek.hotel.service.IHotelService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @author geek*/
@Component
public class HotelMqListener {@Autowiredprivate IHotelService hotelService;/*** 监听酒店新增或修改的业务。** @param id 酒店 id。*/@RabbitListener(queues = IMqConstants.QUEUE_HOTEL_INSERT)public void listenHotelInsertOrUpdate(Long id) {this.hotelService.insertById(id);}/*** 监听酒店删除的业务。** @param id 酒店 id。*/@RabbitListener(queues = IMqConstants.QUEUE_HOTEL_DELETE)public void listenHotelDelete(Long id) {this.hotelService.deleteById(id);}}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_96756.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ChatGLM-6B论文代码笔记

ChatGLM-6B 文章目录ChatGLM-6B前言一、原理1.1 优势1.2 实验1.3 特点&#xff1a;1.4 相关知识点二、实验2.1 环境基础2.2 构建环境2.3 安装依赖2.4 运行2.5 数据2.6 构建前端页面3 总结前言 Github&#xff1a;https://github.com/THUDM/ChatGLM-6B 参考链接&#xff1a; ht…

GPSS【实践 01】Developing a Greenplum Streaming Server Client 自定义GPSS客户端开发实例

自定义GPSS客户端开发流程1.GPSS是什么2.架构3.组件下载安装4.自定义客户端4.1 GPSS Batch Data API Service Definition4.2 Setting up a Java Development Environment4.3 Generating the Batch Data API Client Classes4.4 Coding the GPSS Batch Data Client4.4.1 Connect …

精准关键词获取-行业搜索词分析

SEO关键词的收集通常可以通过以下几种方法&#xff1a; 根据市场价值、搜索词竞争性和企业实际产品特征进行筛选&#xff1a;确定您的关键词列表之前&#xff0c;建议先进行市场分析&#xff0c;了解您的竞争对手、行业状况和目标受众等信息&#xff0c;以更好的了解所需的特定…

为何ChatGPT如此擅长编造故事?

“幻觉”——人工智能中的一个偏见性术语 AI聊天机器人(如OpenAI的ChatGPT)依赖于一种称为“大型语言模型”(LLM)的人工智能来生成它们的响应。LLM是一种计算机程序&#xff0c;经过数百万文本源的训练&#xff0c;可以阅读并生成“自然语言”文本语言&#xff0c;就像人类自然…

HTTP协议概述 | 简析HTTP请求流程 | HTTP8种请求方法

目录 &#x1f30f; HTTP的简单介绍 何为HTTP HTTP1.0与HTTP1.1 &#x1f30f; HTTP的请求方法 1、OPTIONS 2、HEAD 3、GET 4、POST 5、PUT 6、DELETE 7、TRACE 8、CONNECT &#x1f30f; HTTP的工作原理 &#x1f30f; HTTP请求/响应的步骤 1、客户端连接到Web…

【Linux】用户命令(创建,修改,切换,删除,密码)

目录 1.创建 查看用户信息 查看id 2.修改 修改用户名 修改用户uid 操作前&#xff1a; 操作后 修改组名 操作前&#xff1a; 操作后: 修改组id 操作前&#xff1a; 操作后&#xff1a; 操作前&#xff1a; 操作后: 3.切换用户 4.删除 操作前&#xff1a; 操作…

LeetCode:376. 摆动序列——说什么贪心和动规~

&#x1f34e;道阻且长&#xff0c;行则将至。&#x1f353; &#x1f33b;算法&#xff0c;不如说它是一种思考方式&#x1f340;算法专栏&#xff1a; &#x1f449;&#x1f3fb;123 一、&#x1f331;376. 摆动序列 题目描述&#xff1a;如果连续数字之间的差严格地在正数和…

php7类型约束,严格模式

在PHP7之前&#xff0c;函数和类方法不需要声明变量类型 &#xff0c;任何数据都可以被传递和返回&#xff0c;导致几乎大部分的调用操作都要判断返回的数据类型是否合格。 为了解决这个问题&#xff0c;PHP7引入了类型声明。 目前有两类变量可以声明类型&#xff1a; 形参&a…

拼多多运营中需要采集淘宝天猫京东平台商品详情页面数据上架拼多多店铺,如何使用技术封装接口实现

业务背景&#xff1a;电商平台趋势&#xff0c;平台化。大家可以看到大的电商都开始有自己的平台&#xff0c;其实这个道理很清楚&#xff0c;就是因为这是充分利用自己的流量、自己的商品和服务大效益化的一个过程&#xff0c;因为有平台&#xff0c;可以利用全社会的资源弥补…

RPC调用框架简单介绍

一.Thrift Apache Doris目前使用的RPC调度框架。Thrift是一款基于CS&#xff08;client -server&#xff09;架构的RPC通信框架&#xff0c;开发人员可以根据定义Thrift的IDL(interface decription language)文件来定义数据结构和服务接口&#xff0c;灵活性高&#xff0c;支持…

项目5:实现数据字典的上传下载

项目5&#xff1a;实现数据字典的上传下载 1.什么是数据字典&#xff1f;如何设计&#xff1f; 2.业务流程逻辑 3.数据库表的设计 4.实现上传下载逻辑&#xff08;前端&#xff09; 5.实现上传逻辑&#xff08;后端&#xff09; 6.实现下载依赖&#xff08;后端&#xff…

代码随想录Day49

今天继续学习动规解决完全背包问题。 322.零钱兑换 给你一个整数数组 coins &#xff0c;表示不同面额的硬币&#xff1b;以及一个整数 amount &#xff0c;表示总金额。 计算并返回可以凑成总金额所需的最少的硬币个数 。如果没有任何一种硬币组合能组成总金额&#xff0c;…

vuex中的 mapState, mapMutations

vuex中的 mapState&#xff0c; mapMutations Start 今天使用vuex的过程中&#xff0c;遇到 mapState&#xff0c; mapMutations 这么两个函数&#xff0c;今天学习一下这两个函数。 本文介绍的vuex基于 vuex3.0 1. 官方文档说明 1.1 mapState 官方解释 点击这里&#xff1…

【JUC进阶】详解synchronized锁升级

文章目录1. synchronized概述2. synchronized 的实现原理2.1 Java对象组成2.2 Monitor2.3 从字节码角度看synchronized3. 锁升级3.1 偏向锁3.2 轻量级锁1. synchronized概述 synchronized是一个悲观锁&#xff0c;可以实现线程同步&#xff0c;在多线程的环境下&#xff0c;需…

DIN35电压电流转频率单位脉冲输出信号变换器集电极开路隔离变送器

主要特性 将直流电压或电流信号转换成单位脉冲信号。 精度等级&#xff1a;0.1 级、0.2 级。产品出厂前已检验校正&#xff0c;用户可以直接使用。 国际标准信号输入:0-5V/0-10V/1-5V 等电压信号,0-10mA/0-20mA/4-20mA 等电流信号。 输出标准信号&#xff1a;0-5KHz/0-…

Flink CDC 在京东的探索与实践

摘要&#xff1a;本文整理自京东资深技术专家韩飞&#xff0c;在 Flink Forward Asia 2022 数据集成专场的分享。本篇内容主要分为四个部分&#xff1a; 京东自研 CDC 介绍京东场景的 Flink CDC 优化业务案例未来规划点击查看直播回放和演讲 PPT 一、京东自研 CDC 介绍 京东自研…

小白学Pytorch系列- -torch.distributions API Distributions (1)

小白学Pytorch系列- -torch.distributions API Distributions (1) 分布包包含可参数化的概率分布和抽样函数。这允许构造用于优化的随机计算图和随机梯度估计器。这个包通常遵循TensorFlow分发包的设计。 不可能通过随机样本直接反向传播。但是&#xff0c;有两种主要方法可以…

tomcat中出现RFC7230和RFC3986问题解析

问题截图 问题分析 出现上述问题&#xff0c;是因为各版本tomcat中对特殊字符和请求路径中携带中文参数而产生的错误提示。 解决办法 1、调整tomcat版本 tomcat 7.0.76之前的版本不会出现类似问题 2、tomcat9之前&#xff0c;修改tomcat目录底下的/conf/catalina.properti…

chapter-5 数据库设计

以下课程来源于MOOC学习—原课程请见&#xff1a;数据库原理与应用 考研复习 引言 设计的时候: 我们为什么不能设计成R&#xff08;学号&#xff0c;课程号&#xff0c;姓名&#xff0c;所咋系&#xff0c;系主任&#xff0c;成绩&#xff09;&#xff1f; 因为存在数据冗余…

C++算法初级7——二分查找

C算法初级7——二分查找 文章目录C算法初级7——二分查找在升序的数组上进行二分查找总结应用范围应用二分查找的原理&#xff1a;每次排除掉一半答案&#xff0c;使可能的答案区间快速缩小。 二分查找的时间复杂度&#xff1a;O(log n)&#xff0c;因为每次询问会使可行区间的…