Canal 原理、使用
canal
什么是canal
canal是用java开发的基于数据库增量日志解析,提供增量数据订阅&消费的中间件。目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client 用来处理获得的相关数据。说白了,canal 就是一个同步增量数据的工具。
canal通过binlog同步拿到变更数据,再发送到存储目的地,比如MySQL,Kafka,ElasticSearch等多源同步。
canal使用场景
场景1:原始场景, 阿里otter中间件的一部分
场景2:更新缓存
场景3:抓取业务数据新增变化表,用于制作拉链表。( 拉链表:记录每条信息的生命周期,一旦一条记录的生命周期结束,就要重新开始一条新的记录,并把当前日期放入生效的开始日期 )
场景4:抓取业务表的新增变化数据,用于制作实时统计。
canal运行原理
复制过程分成三步:
Master主库将改变记录,写到二进制日志(binary log)中
Slave从库向mysql master发送dump协议,将master主库的binary log events拷贝到它的中继日志(relay log);
Slave从库读取并重做中继日志中的事件,将改变的数据同步到自己的数据库。
canal的工作原理很简单,就是把自己伪装成slave,假装从master复制数据。
Spring Boot集成Canal
首先,在
pom.xml
文件中添加Canal和RocketMQ的依赖:<!-- Canal依赖 --> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.5</version> </dependency> <!-- RocketMQ依赖 --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.2</version> </dependency>
在
application.properties
或application.yml
中进行Canal和RocketMQ的配置。示例配置如下:# Canal配置 canal.server.host=127.0.0.1 canal.server.port=11111 canal.instance.destination=my_instance canal.instance.username=username canal.instance.password=password # RocketMQ配置 rocketmq.name-server=127.0.0.1:9876 rocketmq.consumer.group=my_consumer_group rocketmq.consumer.topic=my_topic
在这个示例配置中,我们指定了Canal服务器的主机和端口,以及Canal实例的目标、用户名和密码。同时,我们也指定了RocketMQ的Name Server地址、Consumer的组名和Topic名称。
创建一个Canal的监听器类,用于监听Canal收到的数据库变更事件,并将其发送到RocketMQ。示例代码如下:
import com.alibaba.otter.canal.protocol.CanalEntry; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class CanalListener { private final RocketMQTemplate rocketMQTemplate; @Autowired public CanalListener(RocketMQTemplate rocketMQTemplate) { this.rocketMQTemplate = rocketMQTemplate; } public void handleMessage(CanalEntry.RowChange rowChange) { // 处理数据库变更事件,将其发送到RocketMQ // 示例代码省略... } }
在这个示例中,我们创建了一个名为
CanalListener
的监听器类,通过RocketMQTemplate
将数据库变更事件发送到RocketMQ。创建一个Canal的客户端类,用于连接到Canal服务器,并注册我们定义的监听器。示例代码如下:
import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.Message; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class CanalClient { private final CanalListener canalListener; @Autowired public CanalClient(CanalListener canalListener) { this.canalListener = canalListener; } public void start() { CanalConnector connector = CanalConnectors.newSingleConnector( "127.0.0.1", 11111, "username", "password"); connector.connect(); connector.subscribe("my_instance.*"); while (true) { Message message = connector.getWithoutAck(100); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId != -1 && size > 0) { // 处理Canal收到的数据变更事件 message.getEntries().forEach(entry -> { if (entry.getEntryType() == CanalEntry.EntryType.ROWDATA) { try { CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); canalListener.handleMessage(rowChange); } catch (Exception e) { e.printStackTrace(); } } }); connector.ack(batchId); } } } }
在这个示例中,我们创建了一个名为
CanalClient
的客户端类,通过Canal连接器连接到Canal服务器,并注册了我们定义的监听器。在一个循环中,我们从Canal服务器获取数据变更事件,并调用监听器的handleMessage
方法处理这些事件。创建一个RocketMQ的消息消费者类,用于消费从Canal接收到的数据变更事件。示例代码如下:
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; @Component @RocketMQMessageListener(topic = "my_topic", consumerGroup = "my_consumer_group") public class RocketMQConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { // 处理从RocketMQ接收到的消息 // 示例代码省略... } }
在这个示例中,我们创建了一个名为
RocketMQConsumer
的消息消费者类,通过@RocketMQMessageListener
注解指定了消费的Topic和Consumer Group,并实现了RocketMQListener
接口来处理从RocketMQ接收到的消息。在Spring Boot的启动类中,创建一个启动方法来启动Canal客户端。示例代码如下:
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } @Bean public CanalClient canalClient(CanalListener canalListener) { CanalClient canalClient = new CanalClient(canalListener); canalClient.start(); return canalClient; } }
在这个示例中,我们创建了一个名为
canalClient
的Bean,启动Canal客户端。
通过以上步骤,您已经成功配置了Canal将数据同步到MQ,并使用Spring Boot进行消费。Canal通过监听数据库变更事件并发送到RocketMQ,Spring Boot应用通过RocketMQ的消费者接收并处理这些事件。
- 感谢你赐予我前进的力量