notes

使用 DataX 进行数据同步

DataX 能实现异构数据库之间数据同步的功能。不同类型数据库之间的数据同步。我最近在调研的是一个内外网数据同步的功能。内外网之间的网络是不通的,我需要定时的把外网的数据库同步到内网来。

这就需要用到 datax,我内外网要同步的数据库都是 mysql。实现的原理大致是用 datax 导出 mysql 到 csv 文件,把 csv 传到内网,在通过 csv 导入到内网的 mysql。

DataX 的安装

datax 的使用非常简单,只是需要依赖 python 2.6+jdk1.8

官方还说需要 maven,但那只是用来编译 datax 源码的。如果下载的是编译好的 tar 包,就不需要 maven。

环境准备好后,下载官方编译好的 tar 包,并解压。解压后有这么几个文件夹。

bin  conf  job  lib  log  log_perf  plugin  script  tmp

运行 bin/datax.py 即可运行同步任务,因为是 py 文件,所以需要 python,而且里面的语法是 2.6+ 版本的。

同步任务是以 json 文件的形式放在 job 目录下的,job 目录下初始自带一个 job.json 样例文件。我们执行看看效果:

./bin/datax.py ./job/job.json

datax 会执行这个任务,中间会输出一些 log:

DataX (DATAX-OPENSOURCE-3.0), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.


2021-11-21 19:42:31.164 [main] INFO  VMInfo - VMInfo# operatingSystem class => sun.management.OperatingSystemImpl
2021-11-21 19:42:31.175 [main] INFO  Engine - the machine info  => 

	osInfo:	AdoptOpenJDK 1.8 25.265-b01
	jvmInfo:	Linux amd64 4.14.129-bbrplus
	cpu num:	2

	totalPhysicalMemory:	-0.00G
	freePhysicalMemory:	-0.00G
	maxFileDescriptorCount:	-1
	currentOpenFileDescriptorCount:	-1

	GC Names	[PS MarkSweep, PS Scavenge]

	MEMORY_NAME                    | allocation_size                | init_size                      
	PS Eden Space                  | 256.00MB                       | 256.00MB                       
	Code Cache                     | 240.00MB                       | 2.44MB                         
	Compressed Class Space         | 1,024.00MB                     | 0.00MB                         
	PS Survivor Space              | 42.50MB                        | 42.50MB                        
	PS Old Gen                     | 683.00MB                       | 683.00MB                       
	Metaspace                      | -0.00MB                        | 0.00MB                         


2021-11-21 19:42:31.198 [main] INFO  Engine - 
{
	"content":[
		{
			"reader":{
				"name":"streamreader",
				"parameter":{
					"column":[
						{
							"type":"string",
							"value":"DataX"
						},
						{
							"type":"long",
							"value":19890604
						},
						{
							"type":"date",
							"value":"1989-06-04 00:00:00"
						},
						{
							"type":"bool",
							"value":true
						},
						{
							"type":"bytes",
							"value":"test"
						}
					],
					"sliceRecordCount":100000
				}
			},
			"writer":{
				"name":"streamwriter",
				"parameter":{
					"encoding":"UTF-8",
					"print":false
				}
			}
		}
	],
	"setting":{
		"errorLimit":{
			"percentage":0.02,
			"record":0
		},
		"speed":{
			"byte":10485760
		}
	}
}

2021-11-21 19:42:31.223 [main] WARN  Engine - prioriy set to 0, because NumberFormatException, the value is: null
2021-11-21 19:42:31.225 [main] INFO  PerfTrace - PerfTrace traceId=job_-1, isEnable=false, priority=0
2021-11-21 19:42:31.226 [main] INFO  JobContainer - DataX jobContainer starts job.
2021-11-21 19:42:31.228 [main] INFO  JobContainer - Set jobId = 0
2021-11-21 19:42:31.244 [job-0] INFO  JobContainer - jobContainer starts to do prepare ...
2021-11-21 19:42:31.245 [job-0] INFO  JobContainer - DataX Reader.Job [streamreader] do prepare work .
2021-11-21 19:42:31.246 [job-0] INFO  JobContainer - DataX Writer.Job [streamwriter] do prepare work .
2021-11-21 19:42:31.246 [job-0] INFO  JobContainer - jobContainer starts to do split ...
2021-11-21 19:42:31.247 [job-0] INFO  JobContainer - Job set Max-Byte-Speed to 10485760 bytes.
2021-11-21 19:42:31.248 [job-0] INFO  JobContainer - DataX Reader.Job [streamreader] splits to [1] tasks.
2021-11-21 19:42:31.249 [job-0] INFO  JobContainer - DataX Writer.Job [streamwriter] splits to [1] tasks.
2021-11-21 19:42:31.273 [job-0] INFO  JobContainer - jobContainer starts to do schedule ...
2021-11-21 19:42:31.283 [job-0] INFO  JobContainer - Scheduler starts [1] taskGroups.
2021-11-21 19:42:31.285 [job-0] INFO  JobContainer - Running by standalone Mode.
2021-11-21 19:42:31.299 [taskGroup-0] INFO  TaskGroupContainer - taskGroupId=[0] start [1] channels for [1] tasks.
2021-11-21 19:42:31.303 [taskGroup-0] INFO  Channel - Channel set byte_speed_limit to -1, No bps activated.
2021-11-21 19:42:31.303 [taskGroup-0] INFO  Channel - Channel set record_speed_limit to -1, No tps activated.
2021-11-21 19:42:31.316 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] attemptCount[1] is started
2021-11-21 19:42:31.416 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] taskId[0] is successed, used[103]ms
2021-11-21 19:42:31.417 [taskGroup-0] INFO  TaskGroupContainer - taskGroup[0] completed it's tasks.
2021-11-21 19:42:41.316 [job-0] INFO  StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.043s |  All Task WaitReaderTime 0.054s | Percentage 100.00%
2021-11-21 19:42:41.316 [job-0] INFO  AbstractScheduler - Scheduler accomplished all tasks.
2021-11-21 19:42:41.316 [job-0] INFO  JobContainer - DataX Writer.Job [streamwriter] do post work.
2021-11-21 19:42:41.317 [job-0] INFO  JobContainer - DataX Reader.Job [streamreader] do post work.
2021-11-21 19:42:41.318 [job-0] INFO  JobContainer - DataX jobId [0] completed successfully.
2021-11-21 19:42:41.320 [job-0] INFO  HookInvoker - No hook invoked, because base dir not exists or is a file: /opt/datax/hook
2021-11-21 19:42:41.321 [job-0] INFO  JobContainer - 
	 [total cpu info] => 
		averageCpu                     | maxDeltaCpu                    | minDeltaCpu                    
		-1.00%                         | -1.00%                         | -1.00%
                        

	 [total gc info] => 
		 NAME                 | totalGCCount       | maxDeltaGCCount    | minDeltaGCCount    | totalGCTime        | maxDeltaGCTime     | minDeltaGCTime     
		 PS MarkSweep         | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s             
		 PS Scavenge          | 0                  | 0                  | 0                  | 0.000s             | 0.000s             | 0.000s             

2021-11-21 19:42:41.321 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-11-21 19:42:41.321 [job-0] INFO  StandAloneJobContainerCommunicator - Total 100000 records, 2600000 bytes | Speed 253.91KB/s, 10000 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.043s |  All Task WaitReaderTime 0.054s | Percentage 100.00%
2021-11-21 19:42:41.328 [job-0] INFO  JobContainer - 
任务启动时刻                    : 2021-11-21 19:42:31
任务结束时刻                    : 2021-11-21 19:42:41
任务总计耗时                    :                 10s
任务平均流量                    :          253.91KB/s
记录写入速度                    :          10000rec/s
读出记录总数                    :              100000
读写失败总数                    :                   0

DataX 使用

datax 的运行没问题了,但是我如何用 datax 去实现 mysql 导出到 csv 这个功能呢。

这我们就要去看官方文档了,文档写清楚了每个支持的数据库的 json 写法。比如我目前要想实现 mysql -> csv。就要去看 mysqlreadertxtfilewriter 的文档。然后在内网进行导入就需要看 txtfilereadermysqlwriter 的文档。官网页面上有显示支持的数据源。

以导出为例,最后写好的 job 任务大概是这样的:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mysqlreader", 
          "parameter": {
            "column": ["*"], 
            "connection": [
              {
                "jdbcUrl": ["jdbc:mysql://192.168.0.2:3306/test"], 
                "table": ["user"]
              }
            ], 
            "password": "root", 
            "username": "root" 
          }
        }, 
        "writer": {
          "name": "txtfilewriter", 
          "parameter": {
            "path": "/home",
            "fileName": "result",
            "writeMode": "truncate",
            "fileFormat": "csv",
            "dateFormat": "yyyy-MM-dd HH:mm:ss" 
          }
        }
      }
    ], 
    "setting": {
      "speed": {
        "channel": "1"
      }
    }
  }
}

然后我们把它保存成一个 mysql2csv.json 文件放在 job 目录,然后执行:

./bin/datax.py ./job/mysql2csv.json

就会开始执行我们定义的任务。reader 和 writer 里面具体每个字段是什么用,文档里面都有的,可以自己详细去看。

DataX 的使用还是很简单的,我主要在前期找 python2.7 和 jdk1.8 环境的镜像花了一点时间。

DataX-web 的安装

datax-web 项目提供了 UI 界面去定义 datax 任务。我这里使用的是 WeiYe-Jing/datax-web

安装的话其实按照官方的安装步骤来就行了。依赖的环境和 datax 一样,也是 python 2.6+ 和 jdk1.8 版本。还需要一个额外的 mysql 数据库,用来记录 web 页面产生的一些数据。比如 datax-web 项目额外提供了增量导出导入的功能,是 web 项目本身记录了上一次最后的导出 id 或者导出时间信息。

注意:mysql 必须是 5.6、5.7 版本的 mysql,因为需要 mysql 初始化后导入官方的 datax-web.sql 脚本,而我在 mysql:8.0.18 版本上试了导入会出错,应该是 sql 语句不兼容的问题,这需要注意。

使用官方提供的 datax-web.sql 脚本初始化 mysql 数据库。其他按照官方的文档来,最后执行 ./bin/start-all.sh 脚本就行了。

start-all.sh 脚本会启动两个服务,datax-admindatax-executor,admin 就是管理后台,executor 是具体执行数据同步任务的,这和 CI 的流程有点像。

start-all.sh 启动的服务都是后台启动的。

服务启动后,在浏览器中输入 http://ip:port/index.html 就能看到页面了。

注意:url 后面的 index.html 必须带上。初始账号是 admin / 123456

DataX-web 的使用

登进去主页之后,我们可以看到左边导航栏有 项目管理,任务管理,数据源管理等等。

接下来按照我说的流程一个个的创建。

1) 首先 项目管理 -> 添加,创建一个项目。

2) 数据源管理 -> 添加 创建数据源。

3) 任务管理 -> DataX 任务模版 -> 添加 先创建模板。

4) 任务管理 -> 任务构建 创建任务。

大致是分为这四步,任务创建好之后可以在任务管理里面看到,这里的 状态 那一栏初始是红色,是关闭的。点一下启用,就会变成绿色,这时设置的 cron 才会生效。

然后编辑任务里面的 辅助参数,和数据库的表结构配合可以实现增量更新。可以选择 主键自增时间自增 等方式。比如选择了主键自增,刚开始我们可以设置一个 0,每执行过一次任务之后,该 id 就会被更新为上一次的任务触发时最大的 id,任务失败不更新。时间自增 也是同理,会自动更新为上一次触发任务的时间。

然后我们点击 执行一次,主动触发任务,然后点击 查询日志,可以查看当前任务的日志和历史任务日志。

这里有很多页面需要手动执行刷新,数据才会出来。

datax-web 的基本使用差不多是这样。

hqqsk8/datax-web:2.1.2

由于前期安装和环境配置花了很多时间,所以我做了一个 datax-web 镜像推到 dockerhub 上了。这个镜像内部自带了 datax 和 datax-web 项目,直接 run 起来就可以跑了。由于需要 mysql,所以我写了个 docker-compose.yml,地址在xxx。直接拉下来 docker-compose up -d 就可以跑,然后访问浏览器 localhost:9527 端口就行了。

FAQ

datax-web 项目的数据源没有我想要的怎么办?

data-web UI 界面上的数据源其实很少,但是我们在创建任务后,会生成一个 json 文件,其实就是 datax 需要的任务文件,我们可以直接改那个 json 配置达到目的。虽然 datax-web 不支持,但是 datax 是支持的。我试过也能执行成功。

同理,我们如果有其他的需求但是 web 项目不支持的,我们可以直接修改 json 配置。

具体是在 任务管理 -> 操作 -> 编辑 里面修改。

mysql 类型的数据源连不上,权限验证没通过错误

这是因为官方的 datax 只支持 5.x 版本的 mysql 链接(好像是),我之前的是 mysql 8.0.18 版本的,一直连不上。需要自己下载 mysql-connector-java-8.0.18.jar 包放到 datax/plugin/{reader|writer}/{mysqlreader|mysqlwriter}/libs/ 目录就行了。