1.借助Canal实现MySQL数据库间链接canal链接mysql
2.Canal-adapter1.1.4集成Elasticsearch7.8.0排坑指南及在本地环境运行canal-adapter项目
3.mysql+canal+adapter+es实现数据同步
借助Canal实现MySQL数据库间链接canal链接mysql
借助Canal实现MySQL数据库间链接
在现代化的源译应用开发中,不同的码编系统或应用之间可能需要共享同一个数据库,这时候就需要实现数据库间的源译链接。本文将介绍如何借助Canal实现MySQL数据库间链接。码编Canal是源译阿里巴巴开源的一款数据库增量订阅和消费组件,支持MySQL、码编tcpudpsocket源码Oracle等数据库,源译它可以将数据库更新的码编数据通过可靠的方式同步到其他数据存储、NoSQL等系统中。源译
一、码编Canal介绍
Canal是源译阿里巴巴开源的一款数据库增量订阅和消费组件,是码编基于MySQL数据库增量日志构建的,从而实现了与数据源(如MySQL)解耦,源译达到了异构神异的码编目的。Canal主要包括三个模块: Canal.Admin、源译Canal.Server和Canal.Client。
Canal.Admin: Canal控制台管理界面,用于管理Canal的启停和监控
Canal.Server: Canal的工作服务端,负责从数据源(如MySQL)订阅增量日志,并把日志传输给客户端
Canal.Client: Canal的客户端,用于订阅和消费Canal.Server传输的数据
二、Canal的使用场景
Canal主要应用于以下场景:
1、数据实时同步
提供不同数据存储的数据实时同步,如 MySQL 到 Elasticsearch 的橡木箱子源码同步,实时更新数据,保持数据一致
2、数据订阅
对于需要全量数据同步的场景,结合 snapshot 快照机制,可以实现数据全量订阅
3、实时数据分析
对数据实时抓取,进行数据分析计算
4、缓存更新
将数据更新到Cache(如Redis)中,提升系统性能
三、Canal的具体实现
将A库的数据同步到B库中,具体实现如下:
1、安装Canal
Canal的安装需要先下载源码,然后进行编译打包,具体步骤可以参考Canal官网: /alibaba/canal
2、配置Canal
Canal的配置文件位于config文件夹下,通过修改canal.properties实现配置。
(1)配置MySQL的主从关系
# mysql主从地址信息
canal.instance.master.address=.0.0.1:
canal.instance.dbUsername=canal_test
canal.instance.dbPassword=canal_test
# 配置binlog信息,也可以从当前解析到的binlog中获取,
# 优先从binlog position 获取,找不到才到 GTID_GET中获取, gtid模式推荐打开,
# 当前的timestamp可以通过show master status或 show binary logs获取
canal.instance.connectionCharset = UTF-8
canal.instance.gtidon=on
canal.instance.position =
(2)配置Canal连接内容
# 配置instance连接信息
canal.instance.filter.regex=canal_test.tb_goods
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
(3)配置数据输出方式
# 配置数据输出方式
canal.mq.topic=test
# 指定数据传输格式
canal.mq.flatMessage = false
(4)配置kafka通常参数和账号信息
# canal.mq.producerGroup 客户端group组名,同一个topic下的不同group组互不影响
# canal.mq.servers 指定mq服务器的地址
# canal.mq.topics 指定MQ topic主题名称
# authAccount ,配置到应用已指定的账号
canal.mq.properties.bootstrap.servers=...:
canal.mq.producer.bootstrap-servers=...:
canal.mq.producer.topic=myTest can
(5)启动Canal
执行bin目录下的startup脚本,即可启动Canal。担保式交易源码
3、配置Canal客户端
在B库中新建表,同步A库的数据到该表中。
(1)在B库中创建表
mysql> create database canal_test2;
mysql> use canal_test2;
mysql> create table tb_goods(
-> id int() not null auto_increment primary key,
-> name varchar() not null,
-> price int() not null
-> )engine=innodb default charset=utf8;
(2)在Canal服务端中新增instance,即配置同步关系
mysql> create database canal_client;
mysql> use canal_client;
mysql> create table canal_client.tb_goods(
-> id int() not null,
-> name varchar() not null,
-> price int() not null
-> )engine=innodb default charset=utf8;
mysql> GRANT ALL PRIVILEGES ON canal_client.* TO canal@’%’ IDENTIFIED BY ‘canal_test’ WITH GRANT OPTION;
(3)在Canal客户端中启动Canal
通过Canal客户端,将A库的数据同步到B库中的表中,具体代码实现如下:
public class SimpleCanalClientExample {
public static void mn(String[] args) {
// 从控制台读取参数
String host = args[0];
int port = Integer.valueOf(args[1]);
String destination = args[2];
String username = args[3];
String password = args[4];
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(host,
port), destination, username, password);
int batchSize = ;
try {
connector.connect();
connector.subscribe(“canal_test.tb_goods”);
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
Thread.sleep();
} else {
System.out.printf(“batchId: %s, size: %s \n”, batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
connector.disconnect();
}
}
private static void printEntry(List entries) {
for (CanalEntry.Entry entry : entries) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
|| entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND
|| entry.getEntryType() == CanalEntry.EntryType.HEARTBEAT) {
continue;
}
RowChange rowChange = null;
try {
rowChange = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
continue;
}
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
if (rowChange.getEventType() == CanalEntry.EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (rowChange.getEventType() == CanalEntry.EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
printColumn(rowData.getBeforeColumnsList());
System.out.println(“=======”);
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(List columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + “\t” + column.getValue() + “\t” + column.getUpdated());
}
}
}
通过以上代码,我们就可以将A库的tb_goods表中的数据实时同步到B库中的canal_client库的tb_goods表中。
四、总结
Canal是一款非常优秀的数据库增量订阅和消费组件,它可以很好地解决数据库间链接的问题,实现不同数据存储之间的数据同步。我们可以通过Canal的控制台管理界面,或者通过Canal的客户端代码实现数据库之间的数据同步。当然,Canal也有其缺点,就是在高并发场景下,可能会受到性能的限制,这需要我们在具体的应用场景中进行实际评估。
Canal-adapter1.1.4集成Elasticsearch7.8.0排坑指南及在本地环境运行canal-adapter项目
在集成canal的过程中,我遇到了众多问题,尽管网上有诸多解答,但质量不尽如人意。bash脚本源码于是,我下载源码进行本地编译,逐一排查,总结出以下要点:
以下是常见问题:
1、如何使canal-adapter1.1.4支持ES7系列?
2、常见错误信息
3、canal-adapter1.1.4支持的具体版本号范围
问题一:让canal-adapter支持ES7系列
首先,下载canal对应版本的源码到本地,使用编码工具打开。由于canal1.1.4最高支持的版本是6.4.3,在canal-adapter的elasticsearch模块中,引用的ES版本号为6.4.3,因此需要将ES的依赖版本号升起来。
修改完毕后,重新编译项目,会发现有几处代码编译报错。因为不同版本的ES的代码语法有所不同,只需要稍作改动即可。
代码编译通过后,修改canal-adapter下的launcher模块中的application.yml文件,修改后的示例如下:
修改完配置文件后,接下来配置数据库与ES索引的对应关系。位于elasticsearch模块下的勾魂夺魄指标源码资源文件目录下的es文件夹下,默认有3个文件。为了方便演示,先删除了两个文件。
然后在ES中创建相应的mapping结构,用于将数据库数据同步到ES中。
完成上述步骤后,即可启动canal-adapter本地项目。
问题二:关于常见的报错信息
canal-adapter在使用过程中,通常会遇到很多报错。以下逐一为大家解答:
采坑点之一:在本地运行前一定先在maven的root模块下安装,安装完毕后再运行CanalAdapterApplication启动类。
如果没有先安装直接运行,会出现报错,提示找不到OuterAdapter类的实现类。
通过报错信息可以发现,当前提示是ESAdapter这个类找不到。根据抛出异常代码所在行通过源码打断点进一步排查,发现找不到target目录下的plugin目录下面的jar包。
有两种方式可以解决这个问题,第一种是在canal-adapter项目的launcher模块下的main方法下面新建文件夹canal-adapter/plugin,将编译后的es的jar包放进去,然后修改源码中关于本地文件加载的路径。
另外一种方法就是,运行前还是先使用maven的install安装一下。
采坑点之二:报错信息Config dir not found
在本地调试过程中,发现有报错Config dir not found。通过报错行打断点进一步排查,发现是项目启动完毕后在执行数据初始化阶段没有找到配置文件所导致的异常。
这个问题也比较好解决,我们可以在canal-adapter的launcher模块的配置文件中新建一个叫es的文件夹,把elasticsearch模块下的es文件夹拷贝过来,即可解决这个问题。
采坑点之三:报错Elasticsearch exception [type=index_not_found_exception, reason=no such index [XXXX]]
这个问题,大家可以检查一下ES里面对应的索引名称是否存在,索引的mapping结构是否已经创建;当然,可能还有其他情况下导致出现这个问题,暂时没有遇到。
采坑点之四:报错Not found the mapping info of index: XXX
这个问题从报错信息来看,总感觉像是ES中索引的Mapping结构没有创建好。我用多种方式进行mapping结构的创建,可一直报错。
根据报错堆栈信息,通过打断点的方式进一步排查,我们会看到在ESConnection类的行有这样一些被注释了的代码。
这也正是canal-adapter1.1.4为什么不支持ES7以上的版本了。我们只需要将这些被注释的代码打开即可解决这个问题。
通过上述代码的改造,我们可以对改完后的内容进行测试,全量同步数据和增量同步数据。
canal-adapter为我们提供了全量同步数据的接口,我们在canal-adapter的launcher模块的com.alibaba.otter.canal.adapter.launcher.rest目录下可以看到有一个类叫做CommonRest,其里面提供全量同步数据的方法和条件同步数据的方法。
直接使用postman发送如下请求即可完成数据的全量同步,效果如下,同时,如果数据库当前表的数据发生变更,canal-adapter也能及时监听到并同步到ES中。
关于canal-adapter配置文件的,大家可以参考一下官网文档:github.com/alibaba/cana...
另外还有一个网上经常提到的name: es6和es7,通过观察源码,在adapter1.1.4版本中,直接使用es即可。
如上,canal-adapter1.1.4在本地运行起来了,并且全量同步数据和增量同步数据都已触发并生效。
通过kibana也可以查询到对应的数据了。
最后,这个项目在本地编译后在target目录下会生成一个canal-adapter的文件夹,这个文件夹可以拷贝出来直接运行。
在windos和linux都可以运行。我这边编译后,在本地直接运行bat文件,程序正常并且可以正常全量同步数据和增量同步数据。
不过遇到很奇怪的一个问题,将编译后的文件放在linux系统运行,则会不同的刷错误日志如下。
暂时还未解决当前问题。不过我这边在目前的实际应用场景中,使用不到adapter,因为它的使用场景比较有效,对数据有较高的要求。
这个问题在github上提了issues。
地址:canal-adapter在本地环境可正常运行,编译后在服务器上运行出错;· Issue # · alibaba/canal
mysql+canal+adapter+es实现数据同步
一、版本
MySQL+canal+adapter+es数据同步方案
二、MySQL开启binlog
配置MySQL服务器,启用binlog日志功能,确保数据变化得以记录。
三、MySQL配置文件
调整MySQL配置文件,确保binlog日志开启状态。
四、MySQL授权canal连接
为canal连接MySQL服务器的账号分配权限,使其能作为MySQL的从服务器。
五、下载canal及adapter
下载并解压canal和adapter相关组件,检查目录结构。
六、canal配置文件编辑
在canal配置文件中,仅需调整关键配置项以满足同步需求。
七、启动canal
运行canal启动脚本,观察日志输出。若遇到报错,检查startup.sh脚本的-Xss参数设置,必要时调整参数。
八、adapter配置与启动
编辑adapter配置文件,确保与canal及es的映射关系正确。启动adapter后,检查日志,若未见输出,尝试调整-Xss虚拟机参数。
九、解决adapter日志问题
通过调整-Xss参数至k,解决了adapter启动无日志问题。若依然存在问题,则考虑调整源码。
十、canal源码修改
下载canal源码,修改相关配置项,如pom.xml文件,完成重新打包与编译,替换adapter插件中的jar文件。
十一、测试与验证
在MySQL中执行数据插入操作,验证adapter日志及ES数据同步情况。针对关联表场景,进行新索引构建及数据插入,确保数据完整同步。
十二、结论
通过以上步骤,实现了MySQL数据通过canal和adapter同步至ES的目标,确保了数据的一致性与实时性。针对关联表的同步,需关注ES索引的创建与数据映射关系的正确性。