canal

什么是canal

canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。说白了,canal 就是一个同步增量数据的工具

canal过binlog同步拿到变更数据再发送到存储目的地,比如MySQLKafkaElasticSearch等多源同步。

canal使用场景

  • 场景1:原始场景, 阿里otter中间件的一部分

  • 场景2:更新缓存

  • 场景3:抓取业务数据新增变化表,用于制作拉链表。( 拉链表:记录每条信息的生命周期,一旦一条记录的生命周期结束,就要重新开始一条新的记录,并把当前日期放入生效的开始日期 )

  • 场景4:抓取业务表的新增变化数据,用于制作实时统计。

canal运行原理

复制过程分成三步:

  • Master主库将改变记录,写到二进制日志(binary log)中

  • Slave从库向mysql master发送dump协议,将master主库的binary log events拷贝到它的中继日志(relay log)

  • Slave从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库

canal的工作原理很简单,就是把自己伪装成slave假装从master复制数据

Spring Boot集成Canal

  1. 首先,在pom.xml文件中添加Canal和RocketMQ的依赖:

    <!-- Canal依赖 -->
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.5</version>
    </dependency>
    ​
    <!-- RocketMQ依赖 -->
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.0.2</version>
    </dependency>
  2. application.propertiesapplication.yml中进行Canal和RocketMQ的配置。示例配置如下:

    # Canal配置
    canal.server.host=127.0.0.1
    canal.server.port=11111
    canal.instance.destination=my_instance
    canal.instance.username=username
    canal.instance.password=password
    ​
    # RocketMQ配置
    rocketmq.name-server=127.0.0.1:9876
    rocketmq.consumer.group=my_consumer_group
    rocketmq.consumer.topic=my_topic

    在这个示例配置中,我们指定了Canal服务器的主机和端口,以及Canal实例的目标、用户名和密码。同时,我们也指定了RocketMQ的Name Server地址、Consumer的组名和Topic名称。

  3. 创建一个Canal的监听器类,用于监听Canal收到的数据库变更事件,并将其发送到RocketMQ。示例代码如下:

    import com.alibaba.otter.canal.protocol.CanalEntry;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    ​
    @Component
    public class CanalListener {
    ​
        private final RocketMQTemplate rocketMQTemplate;
    ​
        @Autowired
        public CanalListener(RocketMQTemplate rocketMQTemplate) {
            this.rocketMQTemplate = rocketMQTemplate;
        }
    ​
        public void handleMessage(CanalEntry.RowChange rowChange) {
            // 处理数据库变更事件,将其发送到RocketMQ
            // 示例代码省略...
        }
    }

    在这个示例中,我们创建了一个名为CanalListener的监听器类,通过RocketMQTemplate将数据库变更事件发送到RocketMQ。

  4. 创建一个Canal的客户端类,用于连接到Canal服务器,并注册我们定义的监听器。示例代码如下:

    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.Message;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    ​
    @Component
    public class CanalClient {
    ​
        private final CanalListener canalListener;
    ​
        @Autowired
        public CanalClient(CanalListener canalListener) {
            this.canalListener = canalListener;
        }
    ​
        public void start() {
            CanalConnector connector = CanalConnectors.newSingleConnector(
                "127.0.0.1", 11111, "username", "password");
    ​
            connector.connect();
            connector.subscribe("my_instance.*");
    ​
            while (true) {
                Message message = connector.getWithoutAck(100);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId != -1 && size > 0) {
                    // 处理Canal收到的数据变更事件
                    message.getEntries().forEach(entry -> {
                        if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) {
                            try {
                                CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                                canalListener.handleMessage(rowChange);
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    });
    ​
                    connector.ack(batchId);
                }
            }
        }
    }

    在这个示例中,我们创建了一个名为CanalClient的客户端类,通过Canal连接器连接到Canal服务器,并注册了我们定义的监听器。在一个循环中,我们从Canal服务器获取数据变更事件,并调用监听器的handleMessage方法处理这些事件。

  5. 创建一个RocketMQ的消息消费者类,用于消费从Canal接收到的数据变更事件。示例代码如下:

    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.stereotype.Component;
    ​
    @Component
    @RocketMQMessageListener(topic = "my_topic", consumerGroup = "my_consumer_group")
    public class RocketMQConsumer implements RocketMQListener<String> {
    ​
        @Override
        public void onMessage(String message) {
            // 处理从RocketMQ接收到的消息
            // 示例代码省略...
        }
    }

    在这个示例中,我们创建了一个名为RocketMQConsumer的消息消费者类,通过@RocketMQMessageListener注解指定了消费的Topic和Consumer Group,并实现了RocketMQListener接口来处理从RocketMQ接收到的消息。

  6. 在Spring Boot的启动类中,创建一个启动方法来启动Canal客户端。示例代码如下:

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    ​
    @SpringBootApplication
    public class Application {
    ​
        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }
    ​
        @Bean
        public CanalClient canalClient(CanalListener canalListener) {
            CanalClient canalClient = new CanalClient(canalListener);
            canalClient.start();
            return canalClient;
        }
    }

    在这个示例中,我们创建了一个名为canalClient的Bean,启动Canal客户端。

通过以上步骤,您已经成功配置了Canal将数据同步到MQ,并使用Spring Boot进行消费。Canal通过监听数据库变更事件并发送到RocketMQ,Spring Boot应用通过RocketMQ的消费者接收并处理这些事件。