共计 6013 个字符,预计需要花费 16 分钟才能阅读完成。
上一篇文章我们已经搭建好了 Datax-Web 后台,这篇文章我们具体讲一下如何通过 Datax-Web 来配置,同步 MySQL 数据库。
目标
MySql 数据库全量同步
1. 执行器配置
1、” 调度中心 OnLine:” 右侧显示在线的 ” 调度中心 ” 列表, 任务执行结束后, 将会以 failover 的模式进行回调调度中心通知执行结果, 避免回调的单点风险;
2、“执行器列表”中显示在线的执行器列表, 可通过 ”OnLine 机器 ” 查看对应执行器的集群机器;
1、AppName:(与 datax-executor 中 application.yml 的 datax.job.executor.appname 保持一致)
每个执行器集群的唯一标示 AppName, 执行器会周期性以 AppName 为对象进行自动注册。可通过该配置自动发现注册成功的执行器, 供任务调度时使用;
2、名称: 执行器的名称, 因为 AppName 限制字母数字等组成, 可读性不强, 名称为了提高执行器的可读性;
3、排序: 执行器的排序, 系统中需要执行器的地方, 如任务新增, 将会按照该排序读取可用的执行器列表;
4、注册方式:调度中心获取执行器地址的方式;
自动注册:执行器自动进行执行器注册,调度中心通过底层注册表可以动态发现执行器机器地址;
手动录入:人工手动录入执行器的地址信息,多地址逗号分隔,供调度中心使用;
5、机器地址:” 注册方式 ” 为 ” 手动录入 ” 时有效,支持人工维护执行器的地址信息;
2. 创建数据源
数据源管理—> 添加
如图填写 MySQL 的账号信息,点击测试连接,无误之后确认。
第四步使用
3. 创建任务模版
第四步使用
4. 构建 JSON 脚本
1. 任务批量构建
步骤一,步骤二,选择第二步中创建的数据源,JSON 构建目前支持的数据源有 hive,mysql,oracle,postgresql,sqlserver,hbase,mongodb,clickhouse 其它数据源的 JSON 构建正在开发中, 暂时需要手动编写。
任务管理—> 任务批量构建—> 选择数据库源
2. 字段映射
3. 批量创建任务
手动执行一次
4. 启动任务
查看日志
报错
2023-03-28 16:41:14 [JobThread.run-130] <br>----------- datax-web job execute start -----------<br>----------- Param:
2023-03-28 16:41:14 [BuildCommand.buildDataXParam-100] ------------------Command parameters:
2023-03-28 16:41:14 [ExecutorJobHandler.execute-57] ------------------DataX process id: 29802
2023-03-28 16:41:14 [AnalysisStatistics.analysisStatisticsLog-53] File "/data/datax/bin/datax.py", line 114
2023-03-28 16:41:14 [AnalysisStatistics.analysisStatisticsLog-53] print readerRef
2023-03-28 16:41:14 [AnalysisStatistics.analysisStatisticsLog-53] ^
2023-03-28 16:41:14 [AnalysisStatistics.analysisStatisticsLog-53] SyntaxError: Missing parentheses in call to 'print'. Did you mean print(readerRef)?
2023-03-28 16:41:14 [JobThread.run-165] <br>----------- datax-web job execute end(finish) -----------<br>----------- ReturnT:ReturnT [code=500, msg=command exit value(1) is failed, content=null]
2023-03-28 16:41:14 [ProcessCallbackThread.callbackLog-186] <br>----------- datax-web job callback finish.
2023-03-28 16:41:14 [TriggerCallbackThread.callbackLog-186] <br>----------- datax-web job callback finish.
经过查询是本机装了多版本的 python
[root@node3 bin]# whereis python
python: /usr/bin/python /usr/bin/python2.7 /usr/bin/python3.6 /usr/bin/python3.6m /usr/lib/python2.7 /usr/lib/python3.6 /usr/lib64/python2.7 /usr/lib64/python3.6 /etc/python /usr/include/python2.7 /usr/include/python3.6m /root/anaconda3/bin/python /root/anaconda3/bin/python3.9 /root/anaconda3/bin/python3.9-config /usr/share/man/man1/python.1.gz
[root@node3 bin]# python -V
Python 3.9.13
[root@node3 bin]# /usr/bin/python -V
Python 2.7.5
经过修复使 Python 改为 2.7 再执行任务
[root@node3 ~]# python -V
Python 2.7.5
还有一种修复方式是
Python (2.x) (支持 Python3 需要修改替换 datax/bin 下面的三个 python 文件,替换文件在 doc/datax-web/datax-python3 下) 必选,主要用于调度执行底层 DataX 的启动脚本,默认的方式是以 Java 子进程方式执行 DataX,用户可以选择以 Python 方式来做自定义的改造
5. 查看任务
查看日志:
再用 Navicat 查看目标库中数据是否一致。
DataX-Web 增量配置说明
一、根据日期进行增量数据抽取
1. 页面任务配置
打开菜单任务管理页面,选择添加任务
按下图中 5 个步骤进行配置
- 1. 任务类型选 DataX 任务
- 2. 辅助参数选择时间自增
- 3. 增量开始时间选择,即 sql 中查询时间的开始时间,用户使用此选项方便第一次的全量同步。第一次同步完成后,该时间被更新为上一次的任务触发时间,任务失败不更新。
- 4. 增量时间字段,-DlastTime=’%s’-DcurrentTime=’%s’先来解析下这段字符串
1.- D 是 DataX 参数的标识符,必配
2.- D 后面的 lastTime 和 currentTime 是 DataX json 中 where 条件的时间字段标识符,必须和 json 中的变量名称保持一致
3.='%s' 是项目用来去替换时间的占位符,比配并且格式要完全一致
4. 注意 -DlastTime='%s' 和 -DcurrentTime='%s' 中间有一个空格,空格必须保留并且是一个空格
- 5. 时间格式,可以选择自己数据库中时间的格式,也可以通过 json 中配置 sql 时间转换函数来处理
注意,注意,注意: 配置一定要仔细看文档(后面我们也会对这块配置进行优化,避免大家犯错)
2.JSON 配置
datax.json
{
"job": {
"setting": {
"speed": {"channel": 16}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"splitPk": "id",
"username": "root",
"password": "root",
"column": ["*"],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://localhost:3306/test?characterEncoding=utf8"],
"querySql": ["select * from test_list where operationDate >= FROM_UNIXTIME(${lastTime}) and operationDate < FROM_UNIXTIME(${currentTime})"
]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "root",
"password": "123456",
"column": ["*"],
"batchSize": "4096",
"connection": [
{
"jdbcUrl": "jdbc:mysql://localhost:3307/test?characterEncoding=utf8",
"table": ["test_list"]
}
]
}
}
}
]
}
}
querySql 解析
select * from test_list where operationDate >= ${lastTime} and operationDate < ${currentTime}
- 1. 此处的关键点在 {lastTime},{currentTime},${} 是 DataX 动态参数的固定格式,lastTime,currentTime 就是我们页面配置中 -DlastTime=’%s’-DcurrentTime=’%s’中的 lastTime,currentTime,注意字段一定要一致。
- 2. 如果任务配置页面,时间类型选择为时间戳但是数据库时间格式不是时间戳,例如是:2019-11-26 11:40:57 此时可以用 FROM_UNIXTIME(${lastTime})进行转换。
select * from test_list where operationDate >= FROM_UNIXTIME(${lastTime}) and operationDate < FROM_UNIXTIME(${currentTime})
二、根据自增 Id 进行增量数据抽取
1. 页面任务配置
打开菜单任务管理页面,选择添加任务
按下图中 4 个步骤进行配置
- 1. 任务类型选 DataX 任务
- 2. 辅助参数选择主键自增
- 3. 增量主键开始 ID 选择,即 sql 中查询 ID 的开始 ID,用户使用此选项方便第一次的全量同步。第一次同步完成后,该 ID 被更新为上一次的任务触发时最大的 ID,任务失败不更新。
- 4. 增量时间字段,-DstartId=’%s’-DendId=’%s’先来解析下这段字符串
1.- D 是 DataX 参数的标识符,必配
2.- D 后面的 startId 和 endId 是 DataX json 中 where 条件的 id 字段标识符,必须和 json 中的变量名称保持一致,endId 是任务在每次执行时获取当前表 maxId,也是下一次任务的 startId
3.='%s' 是项目用来去替换时间的占位符,比配并且格式要完全一致
4. 注意 -DstartId='%s' 和 -DendId='%s' 中间有一个空格,空格必须保留并且是一个空格
5.reader 数据源,选择任务同步的读数据源
6. 配置 reader 数据源中需要同步数据的表名及该表的主键
注意,注意,注意: 一定要仔细看文档(后续会对这块配置进行优化,避免大家犯错)
2.JSON 配置
datax.json
{
"job": {
"setting": {
"speed": {
"channel": 3,
"byte": 1048576
},
"errorLimit": {
"record": 0,
"percentage": 0.02
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "yRjwDFuoPKlqya9h9H2Amg==",
"password": "yRjwDFuoPKlqya9h9H2Amg==",
"splitPk": "","connection": [
{
"querySql": ["select * from job_log where id>= ${startId} and id< ${endId}"
],
"jdbcUrl": ["jdbc:mysql://localhost:3306/datax_web"]
}
]
}
},
"writer": {
"name": "mysqlwriter",
"parameter": {
"username": "mCFD+p1IMsa0rHicbQohcA==",
"password": "PhYxJmA/nuBJD1OxKTRzZH8sxuRddOv83hdqDOVR+i0=",
"column": [
"`id`",
"`job_group`",
"`job_id`",
"`job_desc`",
"`executor_address`",
"`executor_handler`",
"`executor_param`",
"`executor_sharding_param`",
"`executor_fail_retry_count`",
"`trigger_time`",
"`trigger_code`",
"`trigger_msg`",
"`handle_time`",
"`handle_code`",
"`handle_msg`",
"`alarm_status`",
"`process_id`",
"`max_id`"
],
"connection": [
{
"table": ["job_log"],
"jdbcUrl": "jdbc:mysql://47.98.125.243:3306/datax_web"
}
]
}
}
}
]
}
}
querySql 解析
select * from job_log where id>= ${startId} and id< ${endId}
- 1. 此处的关键点在 {startId},{endId},${} 是 DataX 动态参数的固定格式,startId,endId 就是我们页面配置中 -DstartId=’%s’-DendId=’%s’中的 startId,endId,注意字段一定要一致。
三、JVM 启动参数配置
此选择为非必选,可以配置 DataX 启动时 JVM 的参数,具体配置不做详解。
JVM 启动参数拼接结果为:-j "-Xms2G -Xmx2G"
参考
https://github.com/WeiYe-Jing/datax-web
https://github.com/WeiYe-Jing/datax-web/blob/master/doc/datax-web/increment-desc.md