2.canal服务器配置及java客户端

news/2024/5/3 2:48:05/文章来源:https://blog.csdn.net/PacosonSWJTU/article/details/126903380

【README】

1.本文总结自 B站《尚硅谷-canal》;

2.canal 介绍,可以参考 GitHub - alibaba/canal: 阿里巴巴 MySQL binlog 增量订阅&消费组件

3. canal服务器配置包括 mysql配置,canal配置等;

4.mysql服务器,canal服务器,canal客户端架构如下


 【1】mysql binlog日志

【1.1】定义

1)binglog日志:mysql master节点可以开启biglog日志记录功能,开启后每次向mysql服务端发送写操作命令,会把命令记录在一种特殊的文件中这个特殊的文件称为biglog日志

2)biglog日志作用:若服务器异常退出,借用binlog可以恢复数据!

3)二进制日志包括两类文件:

  • 二进制日志索引文件(文件名后缀为.index)用于记录所有的二进制文件;
  • 二进制日志文件(文件名后缀为.00000*)记录数据库所有的 DDL 和 DML(除了数据查询语句)语句事件;

4)查看日志文件及日志索引文件

[root@centos201 ~]# vim /etc/my.cnf// 内容如下;
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.socklog-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid
bind-address=192.168.163.201
log-bin=mysql-bin
binlog_format=row
binlog-do-db=trcanal

显然日志文件目录为: /var/lib/mysql

4.1)打开 /var/lib/mysql


 【1.2】binlog日志分类

1)binlog日志有三种:

  • statement:语句级;
  • mixed:混合;
  • row:行级;

在配置文件中可以选择配置  binlog_format= statement | mixed | row


【1.2.1】statement-语句级binlog日志

statement-语句级:binlog 会记录每次执行写操作的语句。相对 row 模式节省空间,但是可能产生不一致性,比如“update tt set create_date=now()”,如果用 binlog 日志进行恢复,由于执行时间不同可能产生的数据就不同。

  • ① 优点:节省空间。
  • ② 缺点:有可能造成数据不一致

【1.2.2】row-行级binlog日志 (推荐作为日志增量解析的binlog日志类型)

行级, binlog 会记录每次操作后每行记录的变化。

  • ① 优点:保持数据的绝对一致性。因为不管 sql 是什么,引用了什么函数,他只记录执行后的效果。
  • ② 缺点:占用较大空间。

【1.2.3】mixed-混合级binlog日志

statement 的升级版,一定程度上解决了,因为一些情况而造成的 statement模式不一致问题,默认还是 statement,在某些情况下譬如:当函数中包含 UUID() 时;包含AUTO_INCREMENT 字段的表被更新时;执行 INSERT DELAYED 语句时;用 UDF 时;会按照ROW 的方式进行处理:

  • ① 优点:节省空间,同时兼顾了一定的一致性。
  • ② 缺点:还有些极个别情况依旧会造成不一致,另外 statement 和 mixed 对于需要对binlog 的监控的情况都不方便。

小结:综合上面对比,Canal 想做监控分析,选择 row 格式比较合适


【2】canal服务器配置

【2.1】mysql 开启binlog日志

1)进入 mysql服务器的配置文件  /etc/my.cnf

打开binlog日志开关,并设置日志类型为 row,如下:

[mysqld]
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.socklog-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid
bind-address=192.168.163.201 // 绑定的本机的ip地址(对外)
log-bin=mysql-bin // 开启binlog日志 
binlog_format=row // 设置日志级别 
binlog-do-db=trcanal // 设置插入binlog日志的数据库,如果不设置,则所有数据库都要插入binlog日志 

2)如何证明mysql开启 binlog是否成功?

[root@centos201 mysql]# pwd
/var/lib/mysql

 3)创建用户:专门用于抽取日志的 用户canal,并赋权,如下(当然了,可以使用root):

mysql> set global validate_password.length=4;
mysql> set global validate_password.policy=0;
mysql> CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; 

4)重启mysql服务生效  

sudo systemctl restart mysqld


【2.2】canal 服务器配置

1)下载 canal

到  Releases · alibaba/canal · GitHub

2)解压 canal 压缩包,目录结构如下:

[root@centos201 software]# ll canal/
total 104648
drwxr-xr-x. 2 root root        93 Sep 17 13:40 bin
-rwxr-xr-x. 1 root root 107152758 Sep 17 02:42 canal.deployer-1.1.6.tar.gz
drwxr-xr-x. 5 root root       123 Sep 17 13:06 conf
drwxr-xr-x. 2 root root      4096 Sep 17 10:43 lib
drwxrwxrwx. 4 root root        34 Sep 17 12:50 logs
drwxrwxrwx. 2 root root       235 Aug 11 10:52 plugin

3)修改 canal/conf/canal.properties

canal服务器端口号默认为11111,服务器模式设置为tcp;(用于java客户端连接)

 4)修改 example/instance.properties 文件

  • example是canal服务器的一个实例;
  • canal服务器可以有多个实例,如example2;在 conf下新建文件夹 example2 即可
[root@centos201 example]# pwd
/usr/software/canal/conf/example
[root@centos201 example]# vim instance.properties 

canal服务器的example实例 属性修改如下:

5)启动canal服务器,模式为tcp ;接收socket客户端连接,如java的socket客户端;

  • 服务器地址: 192.168.163.201;
  • 端口:  11111 ;
[root@centos201 canal]# bin/startup.sh 

  【小结】

canal服务器配置修改完成,canal服务器启动成功; 


【3】java版的canal客户端

1)创建maven项目,引入 canal 依赖;

<dependencies><dependency><groupId>com.alibaba.otter</groupId><artifactId>canal.client</artifactId><version>1.1.2</version></dependency></dependencies>

2)java客户端:

/*** @Description canal客户端* @author xiao tang* @version 1.0.0* @createTime 2022年09月17日*/
public class MyCanalClient {public static void main(String[] args) throws Exception {// 获取canal服务的连接CanalConnector canalConnector =CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.163.201", 11111), "example", "", "");// 尝试读取服务端是否有新数据while (true) {// 连接canalConnector.connect();// 订阅数据库,监控数据库 trcanal所有表canalConnector.subscribe("trcanal.*");// 获取数据,每次获取100条Message message = canalConnector.get(100);// 获取 Entry 集合List<CanalEntry.Entry> entries = message.getEntries();// 判断集合是否为空,如果为空,则等待继续拉取if (entries == null || entries.isEmpty()) {System.out.println("没有数据,休息3s");Thread.sleep(5000);continue;}// 遍历 entries 单条解析for (CanalEntry.Entry entry : entries) {// 获取表名String tableName = entry.getHeader().getTableName();// 获取类型CanalEntry.EntryType entryType = entry.getEntryType();// 获取序列化后的数据ByteString storeValue = entry.getStoreValue();// 判断当前entryType类型是是否为 RowData 类型if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {// 反序列化数据CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);// 获取当前事件的操作类型CanalEntry.EventType eventType = rowChange.getEventType();// 获取数据集 List<CanalEntry.RowData> rowDatasList = rowChange.getRowDatasList();// 遍历并打印数据集for (CanalEntry.RowData rowData : rowDatasList) {// 获取修改前的数据JSONObject beforeData = new JSONObject();List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();for (CanalEntry.Column column : beforeColumnsList) {beforeData.put(column.getName(), column.getValue());}// 获取修改后的数据JSONObject afterData = new JSONObject();List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();for (CanalEntry.Column column : afterColumnsList) {afterData.put(column.getName(), column.getValue());}// 打印System.out.println("table = " + tableName + ", eventType=" + eventType + " before= " + beforeData + "after " + afterData);}}}}}
}

3)运行结果:

3.1)插入数据,触发binlog日志;

INSERT INTO trcanal.user_inf_tbl (id, name, sex) VALUES 
('20220917_0026', 'zhangsan0026', 'male26')
;

3.2)canal客户端打印日志:

table = user_inf_tbl, eventType=INSERT before= {}after {"sex":"male26","name":"zhangsan0026","id":"20220917_0026"}

【4】 canal服务器运维(如canal抽数失败):

1)查看日志

以 example 实例为例,查看它的运行日志,如下(example.log):

[root@centos201 example]# pwd
/usr/software/canal/logs/example
[root@centos201 example]# ll
total 1796
-rw-r--r--. 1 root root 1797729 Sep 17 17:13 example.log
-rw-r--r--. 1 root root    1399 Sep 17 17:10 meta.log

2)查看 example.log

Caused by: java.io.IOException: handshake exception:
ErrorPacket [errorNumber=1129, fieldCount=-1, message=192.168.163.201' is blocked because of many connection errors; unblock with 'mysqladmin flush-hosts', sqlState=ost ', sqlStateMarker=H]at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.negotiate(MysqlConnector.java:168) ~[canal.parse.driver-1.1.6.jar:na]at com.alibaba.otter.canal.parse.driver.mysql.MysqlConnector.connect(MysqlConnector.java:82) ~[canal.parse.driver-1.1.6.jar:na]... 4 common frames omitted

message=192.168.163.201' is blocked because of many connection errors; unblock with 'mysqladmin flush-hosts' 

【日志解释】

  • 大致说 192.168.163.201 被阻塞了,需要执行 mysqladmin flush-hosts 接触阻塞;
  • 在mysql上执行 FLUSH HOSTS; 即可。

 参考自: mysql - How to unblock with mysqladmin flush hosts - Stack Overflow


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.luyixian.cn/news_show_9298.aspx

如若内容造成侵权/违法违规/事实不符,请联系dt猫网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

完美且简要,如此输出风控中的重要数据指标曲线(如KS等)

先前&#xff0c;我们用excel给大家演示过一个KS的计算方式。 ks值是在模型中用于区分预测正负样本分隔程度的评价指标。每个样本的预测结果化对应的一个个分数&#xff0c;从最低分到最高分&#xff0c;输出为正负样本的累积分布。Ks值为这个两个正负样本中&#xff0c;最大差…

听吧音乐项目测试

听吧音乐项目 听吧音乐测试1.项目背景2.需求分析2.1 用户需求2.2 软件需求3. 测试点分析及测试用例4. 自动化测试代码4.1 注册登录注销模块自动化测试代码4.2 专辑播放自动化测试代码5. 测试报告1.项目背景 听吧音乐是一个在线听歌网站&#xff0c;游客通过首页可以在线收听其…

WinUI 3 踩坑记:第一个窗口

本文是 WinUI 3 踩坑记 的一部分,该系列发布于 GitHub@Scighost/WinUI3Keng,文中的代码也在此仓库中,若内容出现冲突以 GitHub 上的为准。WinUI 3 应用的入口和 UWP 类似,也是继承自 Application 的一个类,略有不同的是没有 UWP 那么多的启动方式可供重写,只有一个 OnLau…

python计算离散积分

前言 本文是傅立叶及其python应用系列的第一篇文章对应的仓库地址为https://github.com/yuanzhoulvpi2017/tiny_python/tree/main/Fourier_Series 介绍 本篇文章将要介绍一个非常小众的scipy函数&#xff1a;simpson. 这个函数的一大功能就是可以对离散数据积分。之所以要介…

P39 事件处理

P39 事件处理1.事件模型的流程2.事件监听器2.1 动作监听器&#xff08;ActionListener&#xff09;2.2 焦点监听器&#xff08;FocusListener&#xff09;2.3 鼠标监听器&#xff08;MouseListener&#xff09;2.4 鼠标移动/拖动监听器&#xff08;MouseMotionListener&#xf…

SpringAOP的概述与实现

目录 SpringAOP的概述 什么是AOP AOP能干什么 AOP的特点 AOP底层实现 AOP基本概念 连接点 切入点 通知 切面 目标对象 织入 引入 谈谈你对AOP的理解&#xff1f; SpringAOP的实现 依赖引用 spring.xml配置 注解实现 1.定义切面 设置通知 2.开启aop 3.测试 …

金仓数据库KingbaseES客户端编程开发框架-MyBatis(2. 概述 3. MyBatis配置说明)

2. 概述 MyBatis 是一款优秀的持久层框架&#xff0c;它支持定制化 SQL、存储过程以及高级映射。MyBatis 避免了几乎所有的 JDBC 代码和手动设置参数以及获取结果集。MyBatis 可以使用简单的 XML 或注解来配置和映射原生信息&#xff0c; 将接口和 Java 的 POJOs(Plain Old Ja…

Sourcemap 配置详解

前言 之前在脚手架工具内整合将sourcemap上传sentry能力的时候&#xff0c;考虑bundle分割对.map文件关联的限制&#xff1a;比如TerserWebpackPlugin&#xff08;JS压缩&#xff09;只对 devtool 选项的 source-map&#xff0c;inline-source-map&#xff0c;hidden-source-m…

后端研发工程师面经——JAVA语言

文章目录2. JAVA语言2.1 面向对象的三大特性2.2 JAVA异常2.2.1 异常产生的原因2.2.2 异常的分类2.2.3 异常的处理方式2.3 序列化和反序列化2.3.1 概念2.3.2 JAVA中的序列化和反序列化2.3.3 序列化和反序列化的接口2.3.4 Serialization接口详解2.3.5 Externalizable接口详解2.3.…

3D建模师做多了女人会不会找不到老婆?次世代美少女战士建模流程讲解

什么是次世代&#xff1f; 次世代是个舶来语&#xff0c;“次世代游戏”指代和同类游戏相比下更加先进的游戏&#xff0c;即“下一代游戏”。 次世代是利用高模烘焙的法线贴图回帖到低模上&#xff0c;让低模在游戏引擎里可以及时显示高模的视觉效果。模型面数比较高&#xf…

算法 - 计数排序(Counting_Sort)

目录 引言&#xff1a; 学习&#xff1a; 什么是计数排序&#xff08;Counting_sort&#xff09;&#xff1f; 定义&#xff1a; 算法思想&#xff1a; 排序过程&#xff1a; Step 1 &#xff1a; Step 2 &#xff1a; Step 3 &#xff1a; Step 4 : Step 5 &…

单片机项目式实训总篇

采取新方法&#xff0c;让自己尽快变强&#xff0c;为了更好的再次见面。停止大脑内斗。 总学习目标&#xff1a;&#xff08;完成后此文字支持跳转&#xff09; 基础知识 端口操作 显示 高级输入 时间控制 综合 Flag: 一周破解C51程序 学习内容&#xff1a; 了解单片…

DeepExploit——当Metasploit遇上机器学习

Metasploit Meets Machine Learning 文章目录Metasploit Meets Machine Learning1. Metasploit准备1.1 与外部项目的合作1.1.1 启用RPC API1.1.2 使用RPC API操作Metasploit2. 创建机器学习模型2.1 DQN2.2 A3C2.2.1 CartPole2.2.2 分布式学习机制3. 深度利用3.1 代理任务3.2 当…

JVM——GC垃圾回收机制

文章目录JVM——GC垃圾回收机制一、如何判断哪些对象应该被回收——对象判活算法引用计数算法可达性分析算法引用最终判定二、对象应该怎么被回收——垃圾回收算法分代收集理论标记-清除算法标记-复制算法标记-整理算法三、内存对象什么时候被回收——触发条件年轻代GC(Minor G…

如期而至的SVN服务器迁移引来一个大瓜XAMPP

文章目录前言方案评估前奏XAMMP搭建svn服务准备软件包安装必要环境和工具安装xampp运行xampp编辑xampp访问xampp安装subversion安装svnmanager创建svn仓库目录修改配置文件为svnmanager创建MySQL用户重启xammp服务访问svnmanager登录svnmanager可能遇到的错误查看服务器目录信息…

10 nginx 中的 slab

前言 这里主要是描述 nginx 中的 slab 内存分配相关 slab 在很多的地方都有使用, 比如 linux, nginx, netty 等等 主要的作用是 内存管理, 复用 简略 nginx 中的 slab 的流程 # slab relatedvoid* poolPtr malloc(2048);ngx_slab_pool_t *pool (ngx_slab_pool_t *)poo…

Pytorch深度学习——线性回归实现 04(未完)

文章目录1 问题假设2 步骤3 学习使用Pytorch的API来搭建模型3.1 nn.Model3.2 优化器类3.3 评估模式和训练模式3.4 使用GPUdata和item的区别1 问题假设 假设我们的基础模型就是y wxb&#xff0c;其中w和b均为参数&#xff0c;我们使用y 3x0.8来构造数据x、y,所以最后通过模型…

0.django部署(基础知识)

我们前面的代码都是在我们自己的电脑&#xff08;通常是Windows操作系统&#xff09;上面运行的&#xff0c;因为我们还处于开发过程中。 当我们完成一个阶段的开发任务后&#xff0c;就需要把我们开发的网站服务&#xff0c;给真正的用户使用了。 那就需要我们的 网站 部署在…

【二次分配问题】基于遗传算法 (GA)、粒子群优化 (PSO) 和萤火虫算法 (FA) 求解二次分配( QAP)问题(MATLAB 实现)

目录 1 概述 3 Matlab代码及文章阅读 4 运行结果 4.1 萤火虫算法 4.2 粒子群优化算法 4.3 遗传算法 5 参考文献 1 概述 目前&#xff0c;该问题已经得到深入的研究&#xff0c;进化策略(evolutionstrategies)、遗传算法(genetic algorithms)、遗传规划(geneticprogramm…

警惕利用「以太坊合并」的 3 种骗局

原文作者&#xff1a;茉莉 距离以太坊合并还有不到 6 小时&#xff0c;这条被视作下一代互联网 Web3.0 底层基础设施的区块链网络将彻底改变共识机制&#xff0c;从工作量证明的 PoW 机制转向权益证明的 PoS。 在合并即将到来前&#xff0c;去中心化安全网络市场 PolySwarm 创…