elasticsearch & MySQL 数据同步。
文章目录
- elasticsearch & MySQL 数据同步。
- 3. 数据同步。
- 3.1. 思路分析。
- 3.1.1. 同步调用。
- 3.1.2. 异步通知。
- 3.1.3. 监听 binlog。
- 3.1.4. 选择。
- 3.2. 实现数据同步。
- 3.2.1. 思路。
- 3.2.2. 导入 demo。
- 3.2.3. 声明交换机、队列。
- 1)引入依赖。
- 2)声明队列交换机名称。
- 3)声明队列交换机。
- 3.2.4. 发送 MQ 消息。
- 3.2.5. 接收 MQ 消息。
3. 数据同步。
elasticsearch 中的酒店数据来自于 mysql 数据库,因此 mysql 数据发生改变时,elasticsearch 也必须跟着改变,这个就是 elasticsearch 与 mysql 之间的数据同步。
在微服务中,负责酒店管理(操作 MySQL)的业务与负责酒店搜索(操作 elasticsearch)的业务可能在两个不同的微服务上,两者数据该如克如何保持同步?
3.1. 思路分析。
常见的数据同步方案有三种。
-
同步调用。
-
异步通知。
-
监听 binlog。
3.1.1. 同步调用。
方案一:同步调用。
基本步骤如下。
-
hotel-demo 对外提供接口,用来修改 elasticsearch 中的数据。
-
酒店管理服务在完成数据库操作后,直接调用 hotel-demo 提供的接口,
3.1.2. 异步通知。
方案二:异步通知。
流程如下。
-
hotel-admin 对 mysql 数据库数据完成增、删、改后,发送 MQ 消息。
-
hotel-demo 监听 MQ,接收到消息后完成 elasticsearch 数据修改。
3.1.3. 监听 binlog。
方案三:监听 binlog。
流程如下。
-
给 mysql 开启 binlog 功能。
-
mysql 完成增、删、改操作都会记录在 binlog 中。
-
hotel-demo 基于 canal 监听 binlog 变化,实时更新 elasticsearch 中的内容。
3.1.4. 选择。
方式一:同步调用。
-
优点:实现简单,粗暴。
-
缺点:业务耦合度高。
方式二:异步通知。
-
优点:低耦合,实现难度一般。
-
缺点:依赖 mq 的可靠性。
方式三:监听 binlog。
-
优点:完全解除服务间耦合。
-
缺点:开启 binlog 增加数据库负担、实现复杂度高。
3.2. 实现数据同步。
3.2.1. 思路。
开发 hotel-admin 项目作为酒店管理的微服务。当酒店数据发生增、删、改时,要求对 elasticsearch 中数据也要完成相同操作。
步骤。
-
导入 hotel-admin 项目,启动并测试酒店数据的 CRUD。
-
声明 exchange、queue、routingKey。
-
在 hotel-admin 中的增、删、改业务中完成消息发送。
-
在 hotel-demo 中完成消息监听,并更新 elasticsearch 中数据。
-
启动并测试数据同步功能。
3.2.2. 导入 demo。
导入 hotel-admin 项目。
运行后,访问 http://localhost:8099。
其中包含了酒店的 CRUD 功能。
3.2.3. 声明交换机、队列。
MQ 结构如图。
1)引入依赖。
在 hotel-admin、hotel-demo 中引入 rabbitmq 的依赖。
<!-- amqp。-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2)声明队列交换机名称。
在 hotel-admin 和 hotel-demo 中的 com.geek.elasticsearchgeek.hotel.constatnts
包下新建一个类 MqConstants
。
package com.geek.elasticsearchgeek.hotel.constant;/*** @author geek*/
public interface IMqConstants {/*** 交换机。*/String TOPIC_EXCHANGE_HOTEL = "topic.exchange.hotel";/*** 监听新增和修改的队列。*/String QUEUE_HOTEL_INSERT = "queue.hotel.insert";/*** 监听删除的队列。*/String QUEUE_HOTEL_DELETE = "queue.hotel.delete";/*** 新增或修改的 RoutingKey。*/String ROUTING_KEY_HOTEL_INSERT = "routing.key.hotel.insert";/*** 删除的 RoutingKey。*/String ROUTING_KEY_HOTEL_DELETE = "routing.key.hotel.delete";}
3)声明队列交换机。
在 hotel-demo 中,定义配置类,声明队列、交换机。
package com.geek.elasticsearchgeek.hotel.config;import com.geek.elasticsearchgeek.hotel.constant.IMqConstants;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** @author geek*/
@Configuration
public class MqConfig {@Beanpublic TopicExchange topicExchange() {return new TopicExchange(IMqConstants.TOPIC_EXCHANGE_HOTEL, true, false);}@Beanpublic Queue queueHotelInsert() {return new Queue(IMqConstants.QUEUE_HOTEL_INSERT, true);}@Beanpublic Queue queueHotelDelete() {return new Queue(IMqConstants.QUEUE_HOTEL_DELETE, true);}@Beanpublic Binding hotelInsertQueueRoutingKeyBinding() {return BindingBuilder.bind(queueHotelInsert()).to(topicExchange()).with(IMqConstants.ROUTING_KEY_HOTEL_INSERT);}@Beanpublic Binding hotelDeleteQueueRoutingKeyBinding() {return BindingBuilder.bind(queueHotelDelete()).to(topicExchange()).with(IMqConstants.ROUTING_KEY_HOTEL_DELETE);}}
3.2.4. 发送 MQ 消息。
在 hotel-admin 中的增、删、改业务中分别发送 MQ 消息。
package com.geek.elasticsearch.hotel.admin.controller;import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.geek.elasticsearch.hotel.admin.constant.IMqConstants;
import com.geek.elasticsearch.hotel.admin.dataobject.Hotel;
import com.geek.elasticsearch.hotel.admin.dto.PageResult;
import com.geek.elasticsearch.hotel.admin.service.IHotelService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;import java.security.InvalidParameterException;/*** @author geek*/
@Slf4j
@RestController
@RequestMapping("/hotel")
public class HotelController {@Autowiredprivate IHotelService hotelService;@Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("/{id}")public Hotel queryById(@PathVariable("id") Long id) {return this.hotelService.getById(id);}@GetMapping("/list")public PageResult hotelList(@RequestParam(value = "page", defaultValue = "1") Integer page,@RequestParam(value = "size", defaultValue = "1") Integer size) {Page<Hotel> result = this.hotelService.page(new Page<>(page, size));return new PageResult(result.getTotal(), result.getRecords());}@PostMappingpublic void saveHotel(@RequestBody Hotel hotel) {this.hotelService.save(hotel);this.rabbitTemplate.convertAndSend(IMqConstants.TOPIC_EXCHANGE_HOTEL,IMqConstants.ROUTING_KEY_HOTEL_INSERT,hotel.getId());}@DeleteMapping("/{id}")public void deleteById(@PathVariable("id") Long id) {this.hotelService.removeById(id);this.rabbitTemplate.convertAndSend(IMqConstants.TOPIC_EXCHANGE_HOTEL,IMqConstants.ROUTING_KEY_HOTEL_DELETE,id);}@PutMappingpublic void updateById(@RequestBody Hotel hotel) {if (hotel.getId() == null) {throw new InvalidParameterException("id 不能为空。");}this.hotelService.updateById(hotel);this.rabbitTemplate.convertAndSend(IMqConstants.TOPIC_EXCHANGE_HOTEL,IMqConstants.ROUTING_KEY_HOTEL_INSERT,hotel.getId());}}
3.2.5. 接收 MQ 消息。
hotel-demo 接收到 MQ 消息要做的事情包括。
-
新增消息:根据传递的 hotel 的 id 查询 hotel 信息,然后新增一条数据到索引库。
-
删除消息:根据传递的 hotel 的 id 删除索引库中的一条数据。
1)首先在 hotel-demo 的 com.geek.elasticsearchgeek.hotel.service
包下的 IHotelService
中新增新增、删除业务。
void deleteById(Long id);void insertById(Long id);
2)给 hotel-demo 中的 com.geek.elasticsearchgeek.hotel.service.impl
包下的 HotelService 中实现业务。
@Overridepublic void insertById(Long id) {// 根据 id 查询酒店数据。Hotel hotel = getById(id);// 转换为文档类型。HotelDoc hotelDoc = new HotelDoc(hotel);// 准备 Request 对象。IndexRequest indexRequest = new IndexRequest("hotel").id(hotel.getId().toString());// 准备 Json 文档。indexRequest.source(JSON.toJSONString(hotelDoc), XContentType.JSON);// 发送请求。try {this.restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void deleteById(Long id) {// 准备 Request。DeleteRequest deleteRequest = new DeleteRequest("hotel", id.toString());// 发送请求。try {this.restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);} catch (IOException e) {throw new RuntimeException(e);}}
3)编写监听器。
在 hotel-demo 中的com.geek.elasticsearchgeek.hotel.mq
包新增一个类。
package com.geek.elasticsearchgeek.hotel.mq;import com.geek.elasticsearchgeek.hotel.constant.IMqConstants;
import com.geek.elasticsearchgeek.hotel.service.IHotelService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @author geek*/
@Component
public class HotelMqListener {@Autowiredprivate IHotelService hotelService;/*** 监听酒店新增或修改的业务。** @param id 酒店 id。*/@RabbitListener(queues = IMqConstants.QUEUE_HOTEL_INSERT)public void listenHotelInsertOrUpdate(Long id) {this.hotelService.insertById(id);}/*** 监听酒店删除的业务。** @param id 酒店 id。*/@RabbitListener(queues = IMqConstants.QUEUE_HOTEL_DELETE)public void listenHotelDelete(Long id) {this.hotelService.deleteById(id);}}