Lachlan
发布于 2025-09-22 / 20 阅读
0

Canal 详解

感谢光老师的无私奉献~

参考资料:

  1. https://github.com/alibaba/canal Canal Github 项目

  2. https://gitee.com/mirrors/canal Canal Gitee 项目

1. 简述

canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

1.1. 常见应用场景

基于日志增量订阅和消费的业务包括

  • 数据库镜像

  • 数据库实时备份

  • 索引构建和实时维护(拆分异构索引、倒排索引等)

  • 业务 cache 刷新

  • 带业务逻辑的增量数据处理

1.2. 工作原理

1.2.1. MySQL主备复制原理

  1. MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)

  2. MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)

  3. MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

1.2.2. canal 工作原理

  1. canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议

  2. MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

  3. canal 解析 binary log 对象(原始为 byte 流)

1.3. 多语言

canal 特别设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑。

  1. canal java 客户端: https://github.com/alibaba/canal/wiki/ClientExample

  2. canal c# 客户端: https://github.com/dotnetcore/CanalSharp

  1. canal Python客户端:https://github.com/haozi3156666/canal-python

canal 作为 MySQL binlog 增量获取和解析工具,可将变更记录投递到 MQ 系统中,比如 Kafka/RocketMQ,可以借助于 MQ 的多语言能力。参考文档: Canal Kafka/RocketMQ QuickStart

1.4. 基于 canal 开发的工具

用Java写的 MySQL Binlog 解析工具,底层依赖了Canal。

canal2sql(基于binlog生成SQL) : [https://github.com/zhuchao941/canal2sql]

1.5. 相关开源&产品

canal 消费端开源项目: Otter https://github.com/alibaba/otter

2. 快速开始

  1. https://github.com/alibaba/canal/wiki/QuickStart 快速开始

  2. https://zhuanlan.zhihu.com/p/177001630 超详细canal入门,看这篇就够了

2.1. 搭建 Canal Server

2.1.1. 配置 MySQL
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
2.1.2. 创建 MySQL 用户
-- 使用命令登录:mysql -u root -p
-- 创建用户 用户名:canal 密码:Canal@123456
create user 'canal'@'%' identified by 'Canal@123456';
-- 授权 *.*表示所有库
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%';
FLUSH PRIVILEGES;
2.1.3. 下载 Canal

下载 Canal Deployer。

wget https://github.com/alibaba/canal/releases/download/canal-1.1.8/canal.deployer-1.1.8.tar.gz

如果无法访问 Github,可以通过 GitHub 文件加速 下载。

2.1.4. 配置 Canal Server
  1. 创建目录:mkdir -p /opt/app/canal

  2. 上传文件;

  3. 解压:tar zxvf canal.deployer-1.1.8.tar.gz -C ./

  4. 修改配置文件:vi conf/example/instance.properties

## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306 
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 
#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal  
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*
2.1.5. 启动

sh /opt/app/canal/bin/startup.sh

2.1.6. 查看日志
  • less logs/canal/canal.log

2025-02-06 11:10:25.649 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2025-02-06 11:10:25.657 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2025-02-06 11:10:25.668 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2025-02-06 11:10:25.709 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.48.1(192.168.48.1):11111]
2025-02-06 11:10:27.977 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
  • less logs/example/example.log

2025-02-06 11:10:27.080 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2025-02-06 11:10:27.944 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2025-02-06 11:10:27.945 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2025-02-06 11:10:27.949 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2025-02-06 11:10:28.352 [destination = example , address = /42.194.239.141:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2025-02-06 11:10:28.352 [destination = example , address = /42.194.239.141:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2025-02-06 11:10:28.369 [destination = example , address = /42.194.239.141:3306 , EventParser] WARN  c.a.otter.canal.parse.inbound.mysql.MysqlConnection - load MySQL @@version_comment : MySQL Community Server - GPL
2025-02-06 11:10:29.520 [destination = example , address = /42.194.239.141:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000004,position=4,serverId=1,gtid=<null>,timestamp=1738808993000] cost : 1153ms , the next step is binlog dump
2025-02-06 11:10:29.546 [destination = example , address = /42.194.239.141:3306 , EventParser] WARN  c.a.otter.canal.parse.inbound.mysql.MysqlConnection - load MySQL @@version_comment : MySQL Community Server - GPL

2.2. 使用 Canal Client(Java)

  1. https://zhuanlan.zhihu.com/p/177001630 超详细canal入门,看这篇就够了

  2. https://blog.csdn.net/b_just/article/details/108344205 SpringBoot 整合canal 实现数据同步

  3. https://developer.aliyun.com/article/1492657 Spring Boot整合canal实现数据一致性解决方案解析-部署+实战

  4. https://github.com/alibaba/canal/wiki/ClientExample ClientExample

2.2.1. 依赖

canal.client、canal.protocol 1.1.8

<!--Canal Client - Java 客户端-->
<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>1.1.8</version>
</dependency>
<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.protocol</artifactId>
  <version>1.1.8</version>
</dependency>
2.2.2. Java 类
package com.leon.canal_demo.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.List;

@Component
public class CannalClient implements InitializingBean {

    private final static int BATCH_SIZE = 1000;
    // private final static int BATCH_SIZE = 4 * 1024;

    @Override
    public void afterPropertiesSet() {
        // 创建链接
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("42.194.239.141", 11111), "example", "", "");
        try {
            //打开连接
            connector.connect();
            //订阅数据库表,全部表
            connector.subscribe(".*\\..*");
            //回滚到未进行ack的地方,下次fetch的时候,可以从最后一个没有ack的地方开始拿
            connector.rollback();
            while (true) {
                // 获取指定数量的数据
                Message message = connector.getWithoutAck(BATCH_SIZE);
                //获取批量ID
                long batchId = message.getId();
                //获取批量的数量
                int size = message.getEntries().size();
                //如果没有数据
                if (batchId == -1 || size == 0) {
                    try {
                        //线程休眠2秒
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    //如果有数据,处理数据
                    printEntry(message.getEntries());
                }
                //进行 batch id 的确认。确认之后,小于等于此 batchId 的 Message 都会被确认。
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }

    /**
     * 打印canal server解析binlog获得的实体类信息
     */
    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                //开启/关闭事务的实体类型,跳过
                continue;
            }
            //RowChange对象,包含了一行数据变化的所有特征
            //比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等
            RowChange rowChage;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            //获取操作类型:insert/update/delete类型
            EventType eventType = rowChage.getEventType();
            //打印Header信息
            System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            //判断是否是DDL语句
            if (rowChage.getIsDdl()) {
                System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());
            }
            //获取RowChange对象里的每一行数据,打印出来
            for (RowData rowData : rowChage.getRowDatasList()) {
                //如果是删除语句
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                    //如果是新增语句
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                    //如果是更新的语句
                } else {
                    //变更前的数据
                    System.out.println("------->; before");
                    printColumn(rowData.getBeforeColumnsList());
                    //变更后的数据
                    System.out.println("------->; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }

}
2.2.3. 启动测试

3. Canal 结合 MQ 使用的形态

  1. https://zhuanlan.zhihu.com/p/186035586 Canal+Kafka实现MySQL与Redis数据同步

  2. https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart Canal Kafka RocketMQ QuickStart

一句话总结:

架构:MySQL + Canal + MQ + DS(ES、Redis、HBase);

数据流向:CanalServer 配置 canal.serverMode = kafka,启动后拉取 MySQL 的 binlog 处理,直接将处理好的数据发送到 MQ 中。然后再用 MQ 的消费者(例如 Kafka Java 客户端的消费者组消费),将数据推到数据源中。

MQ 的作用:限流削峰、业务解耦。

4. 基于 canal 开发的工具 canal2sql

用Java写的 MySQL Binlog 解析工具,底层依赖了Canal。

canal2sql(基于binlog生成SQL) : [https://github.com/zhuchao941/canal2sql]

5. 应用场景

5.1. 数据库缓存双写一致性

  1. https://zhuanlan.zhihu.com/p/186035586 Canal+Kafka实现MySQL与Redis数据同步

  2. https://blog.csdn.net/qq_42889280/article/details/123995169 Canal + RocketMQ 同步 MySQL 数据到 Redis 实战

  3. https://developer.aliyun.com/article/770659 详细讲解!Canal+Kafka实现MySQL与Redis数据同步

  4. https://blog.csdn.net/mr_yaodadong/article/details/141018880 如何实现Redis和Mysql中数据双写一致性

一句话总结:

  1. 缓存策略 Cache Aside:采用读数据库写入缓存,修改数据库(删改)删除缓存;

  2. 架构:MySQL + Canal(Server + Client),根据业务场景看是否需要引入 MQ;

  3. 业务流程:Canal Server 读取 MySQL binlog,Canal Client 解析 binlog,如果是修改操作(删改),则操作缓存数据库(一般是 Redis)删除对应的记录(或者 Key)。

  1. 由于有删除失败的风险,所有需要将删除失败的记录存放起来进行重试

  1. 简单版是存放在一个集合中;

  2. 复杂版是将失败记录发到 MQ 中;

代码参考 canal-demo

5.1.1. Cache Aside 的实现
  • 读数据时,先读缓存,如果有就返回。没有再读数据源,将数据放到缓存

  • 写数据时,先写数据源,然后让缓存失效

5.1.2. 异步更新缓存(基于订阅binlog的同步机制)

5.1.2.1. 技术整体思路

MySQL binlog增量订阅消费+消息队列+增量数据更新到redis

1)读Redis:热数据基本都在Redis

2)写MySQL: 增删改都是操作MySQL

3)更新Redis数据:MySQ的数据操作binlog,来更新到Redis

5.1.2.2. Redis更新
  1. 数据操作主要分为两大块:

  • 一个是全量(将全部数据一次写入到redis)

  • 一个是增量(实时更新)

这里说的是增量,指的是mysql的update、insert、delate变更数据。

  1. 读取binlog后分析 ,利用消息队列,推送更新各台的redis缓存数据

这样一旦MySQL中产生了新的写入、更新、删除等操作,就可以把binlog相关的消息推送至Redis,Redis再根据binlog中的记录,对Redis进行更新。

其实这种机制,很类似MySQL的主从备份机制,因为MySQL的主备也是通过binlog来实现的数据一致性。

这里可以结合使用canal(阿里的一款开源框架),通过该框架可以对MySQL的binlog进行订阅,而canal正是模仿了mysql的slave数据库的备份请求,使得Redis的数据更新达到了相同的效果。

当然,这里的消息推送工具你也可以采用别的第三方:kafka、rabbitMQ等来实现推送更新Redis。