Canal数据同步配置

news/2024/5/19 10:30:36/文章来源:https://blog.csdn.net/qq_35708970/article/details/129306584

文章目录

    • Canal数据同步配置
      • 0.canal工作原理
      • 1.**检查binlog功能是否有开启**
      • 2.如果显示状态为OFF表示该功能未开启,开启binlog功能
      • 3.**在mysql里面添加以下的相关用户和权限**
      • 4.下载安装Canal服务
      • 5.修改配置文件
      • 6.进入bin目录下启动
      • 7.idea中配置

Canal数据同步配置

canal可以用来监控数据库数据的变化,从而获得新增数据,或者修改的数据。
canal是应阿里巴巴存在杭州和美国的双机房部署,存在跨机房同步的业务需求而提出的。
阿里系公司开始逐步的尝试基于数据库的日志解析,获取增量变更进行同步,由此衍生出了增量订阅&消费的业务。

0.canal工作原理

在这里插入图片描述

原理相对比较简单:

canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
mysql master收到dump请求,开始推送binary log给slave(也就是canal)
canal解析binary log对象(原始为byte流)

1.检查binlog功能是否有开启

mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | OFF    |
+---------------+-------+
1 row in set (0.00 sec)

2.如果显示状态为OFF表示该功能未开启,开启binlog功能

1,修改 mysql 的配置文件 my.cnf
vi /etc/my.cnf    
这里使用的是docker安装的mysql,my.cnf已经挂载到了本机的 /mydata/mysql/conf/my.cnf
追加内容:
log-bin=mysql-bin     #binlog文件名
binlog_format=ROW     #选择row模式
server_id=1           #mysql实例id,不能和canal的slaveId重复2,重启 mysql:
service mysql restart	3,登录 mysql 客户端,查看 log_bin 变量
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON|
+---------------+-------+
1 row in set (0.00 sec)
————————————————
如果显示状态为ON表示该功能已开启

3.在mysql里面添加以下的相关用户和权限

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

使用docker操作,先启动mysql服务,启动mysql后,如果想进入mysql的命令行,执行如下命令

[root@jack conf]# docker exec -it mysql_3307 bash  //mysql_3307是我启动的mysql服务的name
root@96d65412b303:/# mysql -uroot -p密码

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-VRPMkUOz-1677753459490)(../.assets/images/image-20220511205849713.png)]

会在mysql的mysql数据库的user表中插入上图中的canal用户

4.下载安装Canal服务

下载地址:

https://github.com/alibaba/canal/releases

(1)下载之后,放到目录中,解压文件

[root@jack opt]# cd /opt/
[root@jack opt]# ls
canal.deployer-1.1.5.tar.gz  containerd  redis-5.0.7.tar.gz[root@jack opt]# tar -zxvf canal.deployer-1.1.5.tar.gz 

5.修改配置文件

vi conf/example/instance.properties
#需要改成自己的数据库信息
canal.instance.master.address=192.168.44.132:3306
#需要改成自己的数据库用户名与密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
#需要改成同步的数据库表规则,例如只是同步一下表
#canal.instance.filter.regex=.*\\..*
canal.instance.filter.regex=guli_ucenter.ucenter_member  #这个可以不设置

6.进入bin目录下启动

sh bin/startup.sh

7.idea中配置

1.引入依赖

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><!--mysql--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.26</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency><dependency><groupId>commons-dbutils</groupId><artifactId>commons-dbutils</artifactId><version>1.7</version></dependency><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.0.12</version></dependency></dependencies>

2.配置类

package jack.java.canal.client;import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.dbutils.DbUtils;
import org.apache.commons.dbutils.QueryRunner;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import javax.sql.DataSource;
import java.net.InetSocketAddress;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;@Component
public class CanalClient {//sql队列private Queue<String> SQL_QUEUE = new ConcurrentLinkedQueue<>();@Resourceprivate DataSource dataSource;/*** canal入库方法*/public void run() {//写远程的数据库地址CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.137.10",
//        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("47.97.99.28",11111), "example", "", "");int batchSize = 1000;try {connector.connect();connector.subscribe(".*\\..*");connector.rollback();try {while (true) {//尝试从master那边拉去数据batchSize条记录,有多少取多少Message message = connector.getWithoutAck(batchSize);long batchId = message.getId();int size = message.getEntries().size();if (batchId == -1 || size == 0) {Thread.sleep(1000);} else {dataHandle(message.getEntries());}connector.ack(batchId);//当队列里面堆积的sql大于一定数值的时候就模拟执行if (SQL_QUEUE.size() >= 1) {executeQueueSql();}}} catch (InterruptedException e) {e.printStackTrace();} catch (InvalidProtocolBufferException e) {e.printStackTrace();}} finally {connector.disconnect();}}/*** 模拟执行队列里面的sql语句*/public void executeQueueSql() {int size = SQL_QUEUE.size();for (int i = 0; i < size; i++) {String sql = SQL_QUEUE.poll();System.out.println("[sql]----> " + sql);this.execute(sql.toString());}}/*** 数据处理** @param entrys*/private void dataHandle(List<Entry> entrys) throws InvalidProtocolBufferException {for (Entry entry : entrys) {if (EntryType.ROWDATA == entry.getEntryType()) {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());EventType eventType = rowChange.getEventType();if (eventType == EventType.DELETE) {saveDeleteSql(entry);} else if (eventType == EventType.UPDATE) {saveUpdateSql(entry);} else if (eventType == EventType.INSERT) {saveInsertSql(entry);}}}}/*** 保存更新语句** @param entry*/private void saveUpdateSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> newColumnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("update " + entry.getHeader().getTableName() + " set ");for (int i = 0; i < newColumnList.size(); i++) {sql.append(" " + newColumnList.get(i).getName()+ " = '" + newColumnList.get(i).getValue() + "'");if (i != newColumnList.size() - 1) {sql.append(",");}}sql.append(" where ");List<Column> oldColumnList = rowData.getBeforeColumnsList();for (Column column : oldColumnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 保存删除语句** @param entry*/private void saveDeleteSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> columnList = rowData.getBeforeColumnsList();StringBuffer sql = new StringBuffer("delete from " + entry.getHeader().getTableName() + " where ");for (Column column : columnList) {if (column.getIsKey()) {//暂时只支持单一主键sql.append(column.getName() + "=" + column.getValue());break;}}SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 保存插入语句** @param entry*/private void saveInsertSql(Entry entry) {try {RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());List<RowData> rowDatasList = rowChange.getRowDatasList();for (RowData rowData : rowDatasList) {List<Column> columnList = rowData.getAfterColumnsList();StringBuffer sql = new StringBuffer("insert into " + entry.getHeader().getTableName() + " (");for (int i = 0; i < columnList.size(); i++) {sql.append(columnList.get(i).getName());if (i != columnList.size() - 1) {sql.append(",");}}sql.append(") VALUES (");for (int i = 0; i < columnList.size(); i++) {sql.append("'" + columnList.get(i).getValue() + "'");if (i != columnList.size() - 1) {sql.append(",");}}sql.append(")");SQL_QUEUE.add(sql.toString());}} catch (InvalidProtocolBufferException e) {e.printStackTrace();}}/*** 入库* @param sql*/public void execute(String sql) {Connection con = null;try {if(null == sql) return;con = dataSource.getConnection();QueryRunner qr = new QueryRunner();int row = qr.execute(con, sql);System.out.println("update: "+ row);} catch (SQLException e) {e.printStackTrace();} finally {DbUtils.closeQuietly(con);}}
}

3.启动类

package jack.java.canal;import jack.java.canal.client.CanalClient;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;import javax.annotation.Resource;@SpringBootApplication
public class CanalApplication implements CommandLineRunner {@Resourceprivate CanalClient canalClient;public static void main(String[] args) {SpringApplication.run(CanalApplication.class, args);}@Overridepublic void run(String... strings) throws Exception {//项目启动,执行canal客户端监听canalClient.run();}
}

4.properties配置文件

# 服务端口
server.port=8100
# 服务名
spring.application.name=canal-client# mysql数据库连接
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/guli_edu?serverTimezone=GMT%2B8&useSSL=false
spring.datasource.username=root
spring.datasource.password=971107

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_76417.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Java接口专题

基本介绍 接口给出一些没有实现的方法&#xff0c;封装到一起&#xff0c;到某个类使用时再根据具体情况把这些方法写出来。 注意&#xff1a;在jdk7之前&#xff0c;接口里所有的方法都是抽象方法。在jdk8之后接口中可以有静态方法&#xff0c;默认方法 interface 接口名{/…

MySQL 数据库创建不了外键约束

在数据库的表里面创建不了外键约束❓❓❓ 没错&#xff0c;以我名侦探 q 的分析&#xff08;狗屁&#xff01;&#xff01;&#xff01;&#xff09;&#xff0c;真相只有一个❗❗❗ 那就是&#xff1a;你表的存储引擎非 InnoDB&#xff0c;外键约束只有存储引擎是 InnoDB 才…

flutter window安装过程

这里写自定义目录标题#下载相关官网地址&#xff1a;https://flutter.cn/docs/get-started/install/windows 根据官网下载相关包flutter_windows_3.7.5-stable.zip 解压到c盘&#xff0c;在path配置相关解压路径(c:\flutter)。 执行 where flutter dart &#xff0c;发现没有提…

APP测试面试题汇总(基础篇、进阶篇)

一、基础篇1、请介绍一下&#xff0c;APP测试流程&#xff1f;APP测试流程与web测试流程类似&#xff0c;分为如下七个阶段&#xff1a;1.根据需求说明书编写测试计划&#xff1b;2.制定测试方案&#xff0c;主要是测试任务、测试人员和测试时间的分配&#xff1b;3.测试准备&a…

离散事件动态系统

文章目录离散事件动态系统ppt离散事件系统建模离散事件动态系统的基本组成元素离散事件动态系统仿真具体建模petri建模实例离散事件动态系统 ppt ppt 仿真建模步骤 离散事件系统建模 from&#xff1a;离散事件系统建模 离散事件动态系统的基本组成元素 &#xff08;1&am…

【备战面试】每日10道面试题打卡-Day2

本篇总结的是Java基础知识相关的面试题&#xff0c;后续也会更新其他相关内容 文章目录1、 和 equals 的区别是什么&#xff1f;2、你重写过 hashcode 和 equals 吗&#xff0c;为什么重写equals时必须重写hashCode方法&#xff1f;3、为什么Java中只有值传递&#xff1f;4、BI…

工业机器人有哪些类型?如何利用工业网关集中监测管理?

工业机器人在制造业中的应用与日俱增&#xff0c;使用工业机器人&#xff0c;不仅提高了设备和场地的利用率&#xff0c;还能保持稳定的产品水平。随着工业机器人的大规模部署&#xff0c;对于数量众多、类型各异、功能不一的机器人的监测、管理和维护&#xff0c;也成为企业面…

如何提高软件测试效率 降低开发成本?

1、单元测试以开发人员为主 测试分工需根据测试人员的特点进行&#xff0c;而单元测试应以开发人员为主&#xff0c;以保障每个单元能够完成设计的功能。集成测试也可以以开发人员为主进行。当软件体系结构完成后&#xff0c;独立测试人员应尽量选择比较熟悉相关领域的人员。​…

Webpack-好文

webpack是一个前端资源加载/打包工具&#xff0c;会根据模块的依赖关系进行静态分析&#xff0c;然后将这些模块按照指定的规则生成对应的静态资源Webpack打包js文件创建一个文件夹&#xff0c;cmd进入到终端&#xff0c;运行npm install -g webpack webpack-cli安装webpack we…

原生微信小程序引入npm和安装Vant Weapp

目录一、引入npm安装Vant Weapp1、引入npm2、安装Vant Weapp3、修改 app.json4、修改 project.config.json二、构建npm一、引入npm安装Vant Weapp 环境&#xff1a;Windows10 开发工具&#xff1a;微信开发者工具 本地环境&#xff1a;已安装过node.js 1、引入npm cmd进入到你…

动态内存基础(一)

动态内存基础 ● 栈内存 堆内存 V.S. – 栈内存的特点&#xff1a;更好的局部性&#xff0c;对象自动销毁 – 堆内存的特点&#xff1a;运行期动态扩展&#xff0c;需要显式释放 ● 在 C 中通常使用 new 与 delete 来构造、销毁对象 int* fun() {int res;return &res; //…

|干货 | 五种常用类型之String字符串详解

一. 背景说明小白&#xff1a;哥&#xff0c;java中String是最常用类型&#xff0c;Redis中也是吗?哥&#xff1a;差不多&#xff0c;我给你稍微讲一下。二. 数据类型依据Redis官网&#xff0c;目前Redis数据类型共计九种。具体整理如下&#xff1a;常用的数据类型有&#xff…

Windows 11 安装 Docker Desktop

Windows 环境安装 WSL2 WSL 简介 WSL 全称是 Windows Subsystem for Linux &#xff0c;适用于 Linux 的 Windows 子系统&#xff0c;可让开发人员按原样运行 GNU/Linux 环境&#xff0c;包括大多数命令行工具、实用工具和应用程序&#xff0c;且不会产生传统虚拟机或双启动设…

九州云出席全球人工智能开发者先锋大会,圆桌论道开源未来

2月25日-26日&#xff0c;2023年全球人工智能开发者先锋大会&#xff08;GAIDC&#xff09;在临港成功召开。本届盛会以“向光而行的开发者”为主题&#xff0c;汇集政府职能部门领导、国内外知名专家学者、具有国际影响力的开源创业者&#xff0c;聚焦前瞻探索、开源开放、人才…

运动控制器PSO视觉飞拍与精准输出的C++开发(二):多轴PSO等距/周期输出

本文主要介绍正运动技术EtherCAT控制器在VS平台采用C语言实现的各种PSO功能。正运动提供多种PSO模式供用户搭配不同的场景使用。 上节讲解了采用TABLE寄存器存储的数据表触发比较&#xff0c;本节主要讲解矢量比较两种模式&#xff1a;等距周期比较输出&#xff0c;固定时间周…

python海龟绘图

一、基础 &#xff08;一&#xff09;介绍 海龟绘图&#xff08;Turtle Graphics&#xff09;&#xff1a;“小海龟”turtle是Python语言中一个很流行的绘制图像的函数库&#xff0c;想象一个小乌龟&#xff0c;在一个横轴为x、纵轴为y的坐标系原点&#xff0c;(0,0)位置开始…

JAVA版B2B2C商城源码多商户入驻商城

三勾商城多商户是开发友好的微信小程序商城&#xff0c;框架支持SAAS&#xff0c;支持发布 iOS Android 公众号 H5 各种小程序&#xff08;微信/支付宝/百度/头条/QQ/钉钉/淘宝&#xff09;等多个平台&#xff0c;不可多得的二开神器&#xff0c; 为大中小企业提供极致的移…

基于龙芯+国产FPGA 的VPX以太网交换板设计(一)

“棱镜门”的曝光&#xff0c;暴露出我国的信息安全存在极大的安全隐患&#xff0c;作为信息传输 载体的网络设备&#xff0c;其国产化需求迫切&#xff0c;国产处理器、国产可编程逻辑器件、以太网交 换芯片等具有良好的应用前景&#xff0c;另一方面&#xff0c;宽带数据业务…

“高退货率”标签引热议,亚马逊跨境电商是好是坏?

在多数卖家不知情的情况下&#xff0c;亚马逊“高退货率”标签上线&#xff0c;该消息已被官方证实&#xff0c;目的是为了践行以客户为中心的理念和推动卖家提升服务。 官方确认上线“高退货率”标签 近期&#xff0c;有亚马逊卖家发现产品详情页出现了“高退货率”标签&…

【项目实战】为什么我选择使用CloseableHttpClient,而不是HttpClient,他们俩有什么区别?

一、HttpClient介绍 HttpClient是Commons HttpClient的老版本&#xff0c;已被抛弃&#xff0c;不推荐使用&#xff1b; HttpClient是一个接口&#xff0c;定义了客户端HTTP协议的操作方法。 它可以用于发送HTTP请求和接收HTTP响应。 HttpClient接口提供了很多方法来定制请求…