DataX 能实现异构数据库之间数据同步的功能。不同类型数据库之间的数据同步。我最近在调研的是一个内外网数据同步的功能。内外网之间的网络是不通的,我需要定时的把外网的数据库同步到内网来。
这就需要用到 datax,我内外网要同步的数据库都是 mysql。实现的原理大致是用 datax 导出 mysql 到 csv 文件,把 csv 传到内网,在通过 csv 导入到内网的 mysql。
datax 的使用非常简单,只是需要依赖 python 2.6+
和 jdk1.8
。
官方还说需要 maven,但那只是用来编译 datax 源码的。如果下载的是编译好的 tar 包,就不需要 maven。
环境准备好后,下载官方编译好的 tar 包,并解压。解压后有这么几个文件夹。
bin conf job lib log log_perf plugin script tmp
运行 bin/datax.py
即可运行同步任务,因为是 py 文件,所以需要 python,而且里面的语法是 2.6+ 版本的。
同步任务是以 json 文件的形式放在 job 目录下的,job 目录下初始自带一个 job.json 样例文件。我们执行看看效果:
./bin/datax.py ./job/job.json
datax 会执行这个任务,中间会输出一些 log:
DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.
2021-11-21 19:42:31.164 [main] INFO VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2021-11-21 19:42:31.175 [main] INFO Engine - the machine info =>
osInfo: AdoptOpenJDK 1.8 25.265-b01
jvmInfo: Linux amd64 4.14.129-bbrplus
cpu num: 2
totalPhysicalMemory: -0.00G
freePhysicalMemory: -0.00G
maxFileDescriptorCount: -1
currentOpenFileDescriptorCount: -1
GC Names [PS MarkSweep, PS Scavenge]
MEMORY_NAME | allocation_size | init_size
PS Eden Space | 256.00MB | 256.00MB
Code Cache | 240.00MB | 2.44MB
Compressed Class Space | 1,024.00MB | 0.00MB
PS Survivor Space | 42.50MB | 42.50MB
PS Old Gen | 683.00MB | 683.00MB
Metaspace | -0.00MB | 0.00MB
2021-11-21 19:42:31.198 [main] INFO Engine -
{
"content":[
{
"reader":{
"name":"streamreader",
"parameter":{
"column":[
{
"type":"string",
"value":"DataX"
},
{
"type":"long",
"value":19890604
},
{
"type":"date",
"value":"1989-06-04 00:00:00"
},
{
"type":"bool",
"value":true
},
{
"type":"bytes",
"value":"test"
}
],
"sliceRecordCount":100000
}
},
"writer":{
"name":"streamwriter",
"parameter":{
"encoding":"UTF-8",
"print":false
}
}
}
],
"setting":{
"errorLimit":{
"percentage":0.02,
"record":0
},
"speed":{
"byte":10485760
}
}
}
2021-11-21 19:42:31.223 [main] WARN Engine - prioriy set to 0, because NumberFormatException, the value is: null
2021-11-21 19:42:31.225 [main] INFO PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-11-21 19:42:31.226 [main] INFO JobContainer - DataX jobContainer starts job.
2021-11-21 19:42:31.228 [main] INFO JobContainer - Set jobId = 0
2021-11-21 19:42:31.244 [job-0] INFO JobContainer - jobContainer starts to do prepare ...
2021-11-21 19:42:31.245 [job-0] INFO JobContainer - DataX Reader.Job [streamreader] do prepare work .
2021-11-21 19:42:31.246 [job-0] INFO JobContainer - DataX Writer.Job [streamwriter] do prepare work .
2021-11-21 19:42:31.246 [job-0] INFO JobContainer - jobContainer starts to do split ...
2021-11-21 19:42:31.247 [job-0] INFO JobContainer - Job set Max-Byte-Speed to 10485760 bytes.
2021-11-21 19:42:31.248 [job-0] INFO JobContainer - DataX Reader.Job [streamreader] splits to [1] tasks.
2021-11-21 19:42:31.249 [job-0] INFO JobContainer - DataX Writer.Job [streamwriter] splits to [1] tasks.
2021-11-21 19:42:31.273 [job-0] INFO JobContainer - jobContainer starts to do schedule ...
2021-11-21 19:42:31.283 [job-0] INFO JobContainer - Scheduler starts [1] taskGroups.
2021-11-21 19:42:31.285 [job-0] INFO JobContainer - Running by standalone Mode.
2021-11-21 19:42:31.299 [taskGroup-0] INFO TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2021-11-21 19:42:31.303 [taskGroup-0] INFO Channel - Channel set byte_speed_limit to -1, No bps activated.
2021-11-21 19:42:31.303 [taskGroup-0] INFO Channel - Channel set record_speed_limit to -1, No tps activated.
2021-11-21 19:42:31.316 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2021-11-21 19:42:31.416 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[103]ms
2021-11-21 19:42:31.417 [taskGroup-0] INFO TaskGroupContainer - taskGroup[0] completed it's tasks.
2021-11-21 19:42:41.316 [job-0] INFO StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.043s | All Task WaitReaderTime 0.054s | Percentage 100.00%
2021-11-21 19:42:41.316 [job-0] INFO AbstractScheduler - Scheduler accomplished all tasks.
2021-11-21 19:42:41.316 [job-0] INFO JobContainer - DataX Writer.Job [streamwriter] do post work.
2021-11-21 19:42:41.317 [job-0] INFO JobContainer - DataX Reader.Job [streamreader] do post work.
2021-11-21 19:42:41.318 [job-0] INFO JobContainer - DataX jobId [0] completed successfully.
2021-11-21 19:42:41.320 [job-0] INFO HookInvoker - No hook invoked, because base dir not exists or is a file: /opt/datax/hook
2021-11-21 19:42:41.321 [job-0] INFO JobContainer -
[total cpu info] =>
averageCpu | maxDeltaCpu | minDeltaCpu
-1.00% | -1.00% | -1.00%
[total gc info] =>
NAME | totalGCCount | maxDeltaGCCount | minDeltaGCCount | totalGCTime | maxDeltaGCTime | minDeltaGCTime
PS MarkSweep | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s
PS Scavenge | 0 | 0 | 0 | 0.000s | 0.000s | 0.000s
2021-11-21 19:42:41.321 [job-0] INFO JobContainer - PerfTrace not enable!
2021-11-21 19:42:41.321 [job-0] INFO StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.043s | All Task WaitReaderTime 0.054s | Percentage 100.00%
2021-11-21 19:42:41.328 [job-0] INFO JobContainer -
任务启动时刻 : 2021-11-21 19:42:31
任务结束时刻 : 2021-11-21 19:42:41
任务总计耗时 : 10s
任务平均流量 : 253.91KB/s
记录写入速度 : 10000rec/s
读出记录总数 : 100000
读写失败总数 : 0
datax 的运行没问题了,但是我如何用 datax 去实现 mysql 导出到 csv 这个功能呢。
这我们就要去看官方文档了,文档写清楚了每个支持的数据库的 json 写法。比如我目前要想实现 mysql -> csv。就要去看 mysqlreader
和 txtfilewriter
的文档。然后在内网进行导入就需要看 txtfilereader
和 mysqlwriter
的文档。官网页面上有显示支持的数据源。
以导出为例,最后写好的 job 任务大概是这样的:
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["*"],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://192.168.0.2:3306/test"],
"table": ["user"]
}
],
"password": "root",
"username": "root"
}
},
"writer": {
"name": "txtfilewriter",
"parameter": {
"path": "/home",
"fileName": "result",
"writeMode": "truncate",
"fileFormat": "csv",
"dateFormat": "yyyy-MM-dd HH:mm:ss"
}
}
}
],
"setting": {
"speed": {
"channel": "1"
}
}
}
}
然后我们把它保存成一个 mysql2csv.json
文件放在 job 目录,然后执行:
./bin/datax.py ./job/mysql2csv.json
就会开始执行我们定义的任务。reader 和 writer 里面具体每个字段是什么用,文档里面都有的,可以自己详细去看。
DataX 的使用还是很简单的,我主要在前期找 python2.7 和 jdk1.8 环境的镜像花了一点时间。
datax-web 项目提供了 UI 界面去定义 datax 任务。我这里使用的是 WeiYe-Jing/datax-web
安装的话其实按照官方的安装步骤来就行了。依赖的环境和 datax 一样,也是 python 2.6+ 和 jdk1.8 版本。还需要一个额外的 mysql 数据库,用来记录 web 页面产生的一些数据。比如 datax-web 项目额外提供了增量导出导入的功能,是 web 项目本身记录了上一次最后的导出 id 或者导出时间信息。
注意:mysql 必须是 5.6、5.7 版本的 mysql,因为需要 mysql 初始化后导入官方的
datax-web.sql
脚本,而我在 mysql:8.0.18 版本上试了导入会出错,应该是 sql 语句不兼容的问题,这需要注意。
使用官方提供的 datax-web.sql
脚本初始化 mysql 数据库。其他按照官方的文档来,最后执行 ./bin/start-all.sh
脚本就行了。
start-all.sh 脚本会启动两个服务,datax-admin
和 datax-executor
,admin 就是管理后台,executor 是具体执行数据同步任务的,这和 CI 的流程有点像。
start-all.sh 启动的服务都是后台启动的。
服务启动后,在浏览器中输入 http://ip:port/index.html
就能看到页面了。
注意:url 后面的 index.html 必须带上。初始账号是 admin / 123456
登进去主页之后,我们可以看到左边导航栏有 项目管理,任务管理,数据源管理等等。
接下来按照我说的流程一个个的创建。
1) 首先 项目管理 -> 添加
,创建一个项目。
2) 数据源管理 -> 添加
创建数据源。
3) 任务管理 -> DataX 任务模版 -> 添加
先创建模板。
4) 任务管理 -> 任务构建
创建任务。
大致是分为这四步,任务创建好之后可以在任务管理里面看到,这里的 状态
那一栏初始是红色,是关闭的。点一下启用,就会变成绿色,这时设置的 cron 才会生效。
然后编辑任务里面的 辅助参数
,和数据库的表结构配合可以实现增量更新。可以选择 主键自增
,时间自增
等方式。比如选择了主键自增,刚开始我们可以设置一个 0,每执行过一次任务之后,该 id 就会被更新为上一次的任务触发时最大的 id,任务失败不更新。时间自增
也是同理,会自动更新为上一次触发任务的时间。
然后我们点击 执行一次
,主动触发任务,然后点击 查询日志
,可以查看当前任务的日志和历史任务日志。
这里有很多页面需要手动执行刷新,数据才会出来。
datax-web 的基本使用差不多是这样。
由于前期安装和环境配置花了很多时间,所以我做了一个 datax-web 镜像推到 dockerhub 上了。这个镜像内部自带了 datax 和 datax-web 项目,直接 run 起来就可以跑了。由于需要 mysql,所以我写了个 docker-compose.yml,地址在xxx。直接拉下来 docker-compose up -d
就可以跑,然后访问浏览器 localhost:9527
端口就行了。
data-web UI 界面上的数据源其实很少,但是我们在创建任务后,会生成一个 json 文件,其实就是 datax 需要的任务文件,我们可以直接改那个 json 配置达到目的。虽然 datax-web 不支持,但是 datax 是支持的。我试过也能执行成功。
同理,我们如果有其他的需求但是 web 项目不支持的,我们可以直接修改 json 配置。
具体是在 任务管理 -> 操作 -> 编辑
里面修改。
这是因为官方的 datax 只支持 5.x 版本的 mysql 链接(好像是),我之前的是 mysql 8.0.18 版本的,一直连不上。需要自己下载 mysql-connector-java-8.0.18.jar
包放到 datax/plugin/{reader|writer}/{mysqlreader|mysqlwriter}/libs/
目录就行了。