共计 12971 个字符,预计需要花费 33 分钟才能阅读完成。
阿里巴巴高效的离线数据同步工具 DataX
前言
我们公司有个项目的数据量高达五千万,但是因为报表那块数据不太准确,业务库和报表库又是跨库操作,所以并不能使用 SQL 来进行同步。当时的打算是通过 mysqldump 或者存储的方式来进行同步,但是尝试后发现这些方案都不切实际:
mysqldump 方案
不仅备份需要时间,同步也需要时间,而且在备份的过程,可能还会有数据产出(也就是说同步等于没同步)存储方式:这个效率太慢了,要是数据量少还好,我们使用这个方式的时候,三个小时才同步两千条数据…
实际生成环境需求
1. 将某里云的 MySQL5.7 数据库 A(RDS 数据库)全量同步到某讯云 MySQL5.7 数据库 A
2. 后台实时增量备份
某里云现有产品:DTS(收费产品)
开源地址
https://github.com/alibaba/DataX
DataX 是什么
DataX 是阿里云 DataWorks 数据集成的开源版本。
DataX 是阿里云 DataWorks 数据集成 的开源版本,主要就是用于实现数据间的离线同步。DataX 致力于实现包括关系型数据库(MySQL、Oracle 等)、HDFS、Hive、ODPS、HBase、FTP 等 各种异构数据源(即不同的数据库)间稳定高效的数据同步功能。
为了 解决异构数据源同步问题,DataX 将复杂的网状同步链路变成了星型数据链路,DataX 作为中间传输载体负责连接各种数据源;
当需要接入一个新的数据源时,只需要将此数据源对接到 DataX,便能跟已有的数据源作为无缝数据同步。
设计理念
为了解决异构数据源同步问题,DataX 将复杂的网状的同步链路变成了星型数据链路,DataX 作为中间传输载体负责连接各种数据源。当需要接入一个新的数据源的时候,只需要将此数据源对接到 DataX,便能跟已有的数据源做到无缝数据同步。
当前使用现状
DataX 在阿里巴巴集团内被广泛使用,承担了所有大数据的离线同步业务,并已持续稳定运行了 6 年之久。目前每天完成同步 8w 多道作业,每日传输数据量超过 300TB。
此前已经开源 DataX1.0 版本,此次介绍为阿里巴巴开源全新版本 DataX3.0,有了更多更强大的功能和更好的使用体验。
datax 基本组件介绍
DataX 本身作为离线数据同步框架,采用 Framework + plugin 架构构建。将数据源读取和写入抽象成为 Reader/Writer 插件,纳入到整个同步框架中。
Reader:Reader 为数据采集模块,负责采集数据源的数据,将数据发送给 Framework。
Writer:Writer 为数据写入模块,负责不断向 Framework 取数据,并将数据写入到目的端。
Framework:Framework 用于连接 reader 和 writer,作为两者的数据传输通道,并处理缓冲,流控,并发,数据转换等核心技术问题。
datax 主流数据库支持情况
DataX 目前已经有了比较全面的插件体系,主流的 RDBMS 数据库、NOSQL、大数据计算系统都已经接入,目前支持数据如下图,
详情请点击:DataX 数据源参考指南
类型 | 数据源 | Reader(读) | Writer(写) | 文档 |
---|---|---|---|---|
RDBMS 关系型数据库 | MySQL | √ | √ | 读 、 写 |
Oracle | √ | √ | 读 、 写 | |
OceanBase | √ | √ | 读 、 写 | |
SQLServer | √ | √ | 读 、 写 | |
PostgreSQL | √ | √ | 读 、 写 | |
DRDS | √ | √ | 读 、 写 | |
Kingbase | √ | √ | 读 、 写 | |
通用 RDBMS(支持所有关系型数据库) | √ | √ | 读 、 写 | |
阿里云数仓数据存储 | ODPS | √ | √ | 读 、 写 |
ADB | √ | 写 | ||
ADS | √ | 写 | ||
OSS | √ | √ | 读 、 写 | |
OCS | √ | 写 | ||
Hologres | √ | 写 | ||
AnalyticDB For PostgreSQL | √ | 写 | ||
阿里云中间件 | datahub | √ | √ | 读、写 |
SLS | √ | √ | 读、写 | |
阿里云图数据库 | GDB | √ | √ | 读 、 写 |
NoSQL 数据存储 | OTS | √ | √ | 读 、 写 |
Hbase0.94 | √ | √ | 读 、 写 | |
Hbase1.1 | √ | √ | 读 、 写 | |
Phoenix4.x | √ | √ | 读 、 写 | |
Phoenix5.x | √ | √ | 读 、 写 | |
MongoDB | √ | √ | 读 、 写 | |
Cassandra | √ | √ | 读 、 写 | |
数仓数据存储 | StarRocks | √ | √ | 读、写 |
ApacheDoris | √ | 写 | ||
ClickHouse | √ | 写 | ||
Databend | √ | 写 | ||
Hive | √ | √ | 读 、 写 | |
kudu | √ | 写 | ||
无结构化数据存储 | TxtFile | √ | √ | 读 、 写 |
FTP | √ | √ | 读 、 写 | |
HDFS | √ | √ | 读 、 写 | |
Elasticsearch | √ | 写 | ||
时间序列数据库 | OpenTSDB | √ | 读 | |
TSDB | √ | √ | 读 、 写 | |
TDengine | √ | √ | 读 、 写 |
系统要求
测试系统:Centos7
内网 IP:192.168.1.3【node03,安装 Datax】
内网 IP:192.168.1.4【node04,安装 MySQL5.7】
需求:通过 Datax 工具将阿里云的 RDS(mysql5.7)的数据库同步到本地 node04 服务器中。
jdk1.8+
python 运行环境(推荐 python2.6.x)
Datax 安装
1. 下载安装并解压
node03 服务器执行以下命令进行解压
# wget http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz
备用下载:http://js.funet8.com/centos_software/datax.tar.gz
# mkdir /data/
# tar -zxf datax.tar.gz -C /data/
文件信息:
文件名称:datax.tar.gz
文件大小:853734462 字节
MD5:8E93697ADDBD26BEBC157613089A1173
SHA1:B0735462809F664D721D992DF5FD4813C0DB360C
CRC32:FA2708CC
2. 验证 datax 是否安装成功
进入 datax 的安装目录的 bin 路径下,然后执行以下命令验证 datax 是否安装成功
node03 执行以下命令进入 datax 的 bin 目录
# cd /data/datax/bin
# python datax.py -r streamreader -w streamwriter
出现以下结果,证明安装成功
[root@node3 bin]# python datax.py -r streamreader -w streamwriter
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
Please refer to the streamreader document:
https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md
Please refer to the streamwriter document:
https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md
Please save the following configuration as a json file and use
python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {"column": [],
"sliceRecordCount": ""
}
},
"writer": {
"name": "streamwriter",
"parameter": {"encoding": "","print": true}
}
}
],
"setting": {
"speed": {"channel": ""}
}
}
}
需要删除文件
# rm -rf /data/datax/plugin/*/._* # 需要删除隐藏文件 (重要)
当未删除时,可能会输出:[datax/plugin/reader/._drdsreader/plugin.json] 不存在. 请检查您的配置文件.
Datax 实战
一、使用 datax 实现 stream2stream 数据读取
使用 datax 实现读取字符串,然后打印到控制台当中来
1)第一步:查看帮助文档
# cd /data/datax
# python bin/datax.py -w streamwriter -r streamreader
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
Please refer to the streamreader document:
https://github.com/alibaba/DataX/blob/master/streamreader/doc/streamreader.md
Please refer to the streamwriter document:
https://github.com/alibaba/DataX/blob/master/streamwriter/doc/streamwriter.md
Please save the following configuration as a json file and use
python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json
to run the job.
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {"column": [],
"sliceRecordCount": ""
}
},
"writer": {
"name": "streamwriter",
"parameter": {"encoding": "","print": true}
}
}
],
"setting": {
"speed": {"channel": ""}
}
}
}
2)第二步:开发 datax 配置文件
node03 服务器开发 stream2stream 的配置文件
# cd /data/datax/job
# vim stream2stream.json
{
"job": {
"setting": {
"speed": {"byte":10485760},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column" : [
{
"value": "DataX",
"type": "string"
},
{
"value": 19890604,
"type": "long"
},
{
"value": "1989-06-04 00:00:00",
"type": "date"
},
{
"value": true,
"type": "bool"
},
{
"value": "test",
"type": "bytes"
}
],
"sliceRecordCount": 10
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": true,
"encoding": "UTF-8"
}
}
}
]
}
}
3)第三步:启动 datax 实现数据的打印
执行以下命令启动 datax
# cd /data/datax
# python /data/datax/bin/datax.py /data/datax/job/stream2stream.json
2023-02-27 10:20:04.678 [main] WARN ConfigParser - 插件 [streamreader,streamwriter] 加载失败,1s 后重试... Exception:Code:[Common-00], Describe:[您提供的配置文件存在错误信息,请检查您的作业配置 .] - 配置信息错误,您提供的配置文件 [/home/data/datax/plugin/reader/._drdsreader/plugin.json] 不存在. 请检查您的配置文件.
2023-02-27 10:20:05.686 [main] ERROR Engine -
经 DataX 智能分析, 该任务最可能的错误原因是:
com.alibaba.datax.common.exception.DataXException: Code:[Common-00], Describe:[您提供的配置文件存在错误信息,请检查您的作业配置 .] - 配置信息错误,您提供的配置文件 [/home/data/datax/plugin/reader/._drdsreader/plugin.json] 不存在. 请检查您的配置文件.
at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:26)
at com.alibaba.datax.common.util.Configuration.from(Configuration.java:95)
at com.alibaba.datax.core.util.ConfigParser.parseOnePluginConfig(ConfigParser.java:153)
at com.alibaba.datax.core.util.ConfigParser.parsePluginConfig(ConfigParser.java:125)
at com.alibaba.datax.core.util.ConfigParser.parse(ConfigParser.java:63)
at com.alibaba.datax.core.Engine.entry(Engine.java:137)
at com.alibaba.datax.core.Engine.main(Engine.java:204)
解决报错
部署 datax 到本地后首次执行任务报错
分别进入到 reader 和 writer 目录, 删除掉这类型文件
cd /data/datax/plugin/reader
rm -rf ./._*
cd /data/datax/plugin/writer
rm -rf ./._*
再次执行
# python /data/datax/bin/datax.py /data/datax/job/stream2stream.json
案例二:使用 datax 实现 mysql2stream
使用 datax 实现将 mysql 一张表的指定字段的数据抽取出来,并打印出来
1)第一步:创建 mysql 数据库以及向 mysql 当中插入数据
执行以下命令创建 mysql 表数据
在 192.168.1.6 数据库中操作
# mysql -u root -h 192.168.1.6 -P 3306 -p'123456'
mysql> CREATE DATABASE `userdb`;
mysql> USE `userdb`;
mysql> DROP TABLE IF EXISTS `emp`;
mysql> CREATE TABLE `emp` (`id` int(11) DEFAULT NULL,
`name` varchar(100) DEFAULT NULL,
`deg` varchar(100) DEFAULT NULL,
`salary` int(11) DEFAULT NULL,
`dept` varchar(10) DEFAULT NULL,
`create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`is_delete` bigint(20) DEFAULT '1'
) ENGINE=InnoDB DEFAULT CHARSET=latin1;
mysql> insert into `emp`(`id`,`name`,`deg`,`salary`,`dept`,`create_time`,`update_time`,`is_delete`) values (1201,'gopal','manager',50000,'TP','2018-06-17 18:54:32','2019-01-17 11:19:32',1),(1202,'manishahello','Proof reader',50000,'TPP','2018-06-15 18:54:32','2018-06-17 18:54:32',0),(1203,'khalillskjds','php dev',30000,'AC','2018-06-17 18:54:32','2019-03-14 09:18:27',1),(1204,'prasanth_xxx','php dev',30000,'AC','2018-06-17 18:54:32','2019-04-07 09:09:24',1),(1205,'kranthixxx','admin',20000,'TP','2018-06-17 18:54:32','2018-12-08 11:50:33',0),(1206,'garry','manager',50000,'TPC','2018-12-10 21:41:09','2018-12-10 21:41:09',1),(1207,'oliver','php dev',2000,'AC','2018-12-15 13:49:13','2018-12-15 13:49:13',1),(1208,'hello','phpDev',200,'TP','2018-12-16 09:41:48','2018-12-16 09:41:48',1),(1209,'ABC','HELLO',300,NULL,'2018-12-16 09:42:04','2018-12-16 09:42:24',1),(1210,'HELLO','HELLO',5800,'TP','2019-01-24 09:02:43','2019-01-24 09:02:43',1),(1211,'WORLD','TEST',8800,'AC','2019-01-24 09:03:15','2019-01-24 09:03:15',1),(1212,'sdfs','sdfsdf',8500,'AC','2019-03-13 22:01:38','2019-03-13 22:01:38',1),(1213,NULL,'sdfsdf',9800,'sdfsdf','2019-03-14 09:08:31','2019-03-14 09:08:54',1),(1214,'xxx','sdfsdf',9500,NULL,'2019-03-14 09:13:32','2019-03-14 09:13:44',0),(1215,'sdfsf','sdfsdfsdf',9870,'TP','2019-04-07 09:10:39','2019-04-07 09:11:18',0),(1216,'hello','HELLO',5600,'AC','2019-04-07 09:37:05','2019-04-07 09:37:05',1),(1217,'HELLO2','hello2',7800,'TP','2019-04-07 09:37:40','2019-04-07 09:38:17',1);
查看数据
2)第二步:开发 datax 的配置文件
node03 执行以下命令查看帮助文档
# cd /data/datax
# python bin/datax.py -r mysqlreader -w streamwriter
node03 执行以下命令开发 datax 配置文件,根据实际情况填写用户名和密码等。
# cd /data/datax/job
# vim mysql2stream.json
{
"job": {
"setting": {
"speed": {"channel": 3},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"id",
"name"
],
"connection": [
{
"table": ["emp"],
"jdbcUrl": ["jdbc:mysql://192.168.1.6:3306/userdb"]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding":"GBK",
"print":true
}
}
}
]
}
}
3)第三步:启动 datax 实现数据同步
node03 执行以下命令实现 datax 数据同步
# cd /data/datax
# python /data/datax/bin/datax.py /data/datax/job/mysql2stream.json
报错:2023-02-27 10:49:31.382 [main] WARN ConfigParser - 插件 [mysqlreader,streamwriter] 加载失败,1s 后重试... Exception:Code:[Common-00], Describe:[您提供的配置文件存在错误信息,请检查您的作业配置 .] - 配置信息错误,您提供的配置文件 [/home/data/datax/plugin/reader/._drdsreader/plugin.json] 不存在. 请检查您的配置文件.
2023-02-27 10:49:32.389 [main] ERROR Engine -
经 DataX 智能分析, 该任务最可能的错误原因是:
com.alibaba.datax.common.exception.DataXException: Code:[Common-00], Describe:[您提供的配置文件存在错误信息,请检查您的作业配置 .] - 配置信息错误,您提供的配置文件 [/home/data/datax/plugin/reader/._drdsreader/plugin.json] 不存在. 请检查您的配置文件.
at com.alibaba.datax.common.exception.DataXException.asDataXException(DataXException.java:26)
at com.alibaba.datax.common.util.Configuration.from(Configuration.java:95)
at com.alibaba.datax.core.util.ConfigParser.parseOnePluginConfig(ConfigParser.java:153)
at com.alibaba.datax.core.util.ConfigParser.parsePluginConfig(ConfigParser.java:125)
at com.alibaba.datax.core.util.ConfigParser.parse(ConfigParser.java:63)
at com.alibaba.datax.core.Engine.entry(Engine.java:137)
at com.alibaba.datax.core.Engine.main(Engine.java:204)
部署 datax 到本地后首次执行任务报错
分别进入到 reader 和 writer 目录, 删除掉这类型文件
cd /data/datax/plugin/reader
rm -rf ./._*
cd /data/datax/plugin/writer
rm -rf ./._*
再次执行
# python /data/datax/bin/datax.py /data/datax/job/mysql2stream.json
案例三:使用 datax 实现增量数据同步
使用 datax 实现增量数据同步打印到控制台
1)第一步:开发 datax 的配置文件
node03 执行以下命令开发 datax 配置文件
# cd /data/datax/job
# vim mysql2streamadd.json
{
"job": {
"setting": {
"speed": {"channel": 3},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"id",
"name"
],
"where": "create_time >'${start_time}'and create_time <'${end_time}'","connection": [
{
"table": ["emp"],
"jdbcUrl": ["jdbc:mysql://192.168.1.6:3306/userdb"]
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding":"GBK",
"print":true
}
}
}
]
}
}
2)第二步:启动 datax 实现数据同步
# cd /data/datax
# /data/datax/bin/datax.py /data/datax/job/mysql2streamadd.json -p "-Dstart_time='2018-06-15 00:00:00'-Dend_time='2023-06-15 23:59:59'"
SQL:select id,name from emp where (create_time > '2018-06-15 00:00:00' and create_time < '2023-06-15 23:59:59'
案例四:使用 datax 实现 mysql2mysql
使用 datax 实现将数据从 mysql 当中读取,并且通过 sql 语句实现数据的过滤,并且将数据写入到 mysql 另外一张表当中去
1)第一步:创建 mysql 另外一张表
# mysql -u root -h 192.168.1.6 -P 3306 -p
mysql> USE userdb;
mysql> CREATE TABLE `emp2` (`id` INT(11) DEFAULT NULL,
`name` VARCHAR(100) DEFAULT NULL,
`deg` VARCHAR(100) DEFAULT NULL,
`salary` INT(11) DEFAULT NULL
) ENGINE=INNODB DEFAULT CHARSET=latin1;
2)第二步:开发 datax 的配置文件
查看帮助文档
# cd /data/datax
# python /data/datax/bin/datax.py -r mysqlreader -w mysqlwriter
node03 执行以下命令开发 datax 配置文件
# cd /data/datax/job/
# vim mysql2mysql.json
{
"job": {
"setting": {
"speed": {"channel":1}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"connection": [
{
"querySql": ["select id,name,deg,salary from emp where id < 1208;"],
"jdbcUrl": ["jdbc:mysql://192.168.1.6:3306/userdb"]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"writeMode": "insert",
"username": "root",
"password": "123456",
"column": [
"id",
"name",
"deg",
"salary"
],
"session": ["set session sql_mode='ANSI'"],"preSql": ["delete from emp2"],"connection": [
{
"jdbcUrl": "jdbc:mysql://192.168.1.6:3306/userdb?useUnicode=true&characterEncoding=utf-8",
"table": ["emp2"]
}
]
}
}
}
]
}
}
3)第三步:启动 datax 实现数据同步
node03 执行以下命令实现 datax 数据同步
cd /data/datax
python /data/datax/bin/datax.py /data/datax/job/mysql2mysql.json
完成
2023-02-27 15:57:43.684 [job-0] INFO JobContainer - PerfTrace not enable!
2023-02-27 15:57:43.685 [job-0] INFO StandAloneJobContainerCommunicator - Total 7 records, 177 bytes | Speed 17B/s, 0 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2023-02-27 15:57:43.687 [job-0] INFO JobContainer -
任务启动时刻 : 2023-02-27 15:57:33
任务结束时刻 : 2023-02-27 15:57:43
任务总计耗时 : 10s
任务平均流量 : 17B/s
记录写入速度 : 0rec/s
读出记录总数 : 7
读写失败总数 : 0
4)查看数据
帮助文档
MysqlReader 插件文档:https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md
DataX MysqlWriter https://github.com/alibaba/DataX/blob/master/mysqlwriter/doc/mysqlwriter.md
# python /data/datax/bin/datax.py -r mysqlreader -w mysqlwriter
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader", # 读取端
"parameter": {"column": [], # 需要同步的列 (* 表示所有的列)
"connection": [
{"jdbcUrl": [], # 连接信息
"table": [] # 连接表}
],
"password": "", # 连接用户"username":"", # 连接密码
"where": "" # 描述筛选条件
}
},
"writer": {
"name": "mysqlwriter", # 写入端
"parameter": {"column": [], # 需要同步的列
"connection": [
{"jdbcUrl": "", # 连接信息"table": [] # 连接表
}
],
"password": "", # 连接密码"preSql": [], # 同步前. 要做的事"session": [],"username":"", # 连接用户
"writeMode": "" # 操作类型
}
}
}
],
"setting": {
"speed": {"channel": "" # 指定并发数}
}
}
}