Published on November 29, 2023

Calling DataX from Java and returning the job execution details

(If you spot any errors or omissions, corrections are welcome.)

DATAX

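DataX is an offline data synchronization tool/platform widely used within Alibaba Group. It provides efficient data synchronization between heterogeneous data sources, including MySQL, Oracle, SqlServer, Postgre, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), and DRDS.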

For a detailed introduction to DataX, please refer to its official documentation.

    public static void entry(final String[] args) throws Throwable {
        Options options = new Options();
        options.addOption("job", true, "Job config.");
        options.addOption("jobid", true, "Job unique id.");
        options.addOption("mode", true, "Job runtime mode.");

        BasicParser parser = new BasicParser();
        CommandLine cl = parser.parse(options, args);

        String jobPath = cl.getOptionValue("job");

        // If the user does not explicitly specify a jobid, it is given the default value -1
        String jobIdString = cl.getOptionValue("jobid");
        RUNTIME_MODE = cl.getOptionValue("mode");

        Configuration configuration = ConfigParser.parse(jobPath);

        long jobId;
        if (!"-1".equalsIgnoreCase(jobIdString)) {
            jobId = Long.parseLong(jobIdString);
        } else {
            // only for dsc & ds & datax 3 update
            String dscJobUrlPatternString = "/instance/(\\d{1,})/";
            String dsJobUrlPatternString = "/inner/job/(\\d{1,})/config";
            String dsTaskGroupUrlPatternString = "/inner/job/(\\d{1,})/taskGroup/";
            List<String> patternStringList = Arrays.asList(dscJobUrlPatternString,
                    dsJobUrlPatternString, dsTaskGroupUrlPatternString);
            jobId = parseJobIdFromUrl(patternStringList, jobPath);
        }
        // ...

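    // ... (excerpt from JobContainer.start())
            this.preCheck();
        } else {
            userConf = configuration.clone();
            LOG.debug("jobContainer starts to do preHandle ...");
            this.preHandle();
            LOG.debug("jobContainer starts to do init ...");
            this.init();
            LOG.info("jobContainer starts to do prepare ...");
            this.prepare();
            LOG.info("jobContainer starts to do split ...");
            this.totalStage = this.split();
            LOG.info("jobContainer starts to do schedule ...");
            this.schedule();
            LOG.debug("jobContainer starts to do post ...");
            this.post();
            LOG.debug("jobContainer starts to do postHandle ...");
            // ...
            VMInfo vmInfo = VMInfo.getVmInfo();
            if (vmInfo != null) {
                vmInfo.getDelta(false);
                LOG.info(vmInfo.totalString());
            }
            LOG.info(PerfTrace.getInstance().summarizeNoException());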

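    // ... (excerpt from logStatistics())
            String.valueOf(CommunicationTool.getTotalReadRecords(communication)),
            "读写失败总数",  // label: total read/write failures
            String.valueOf(CommunicationTool.getTotalErrorRecords(communication))
    ));
    LOG.info("task-total-info:" + dateFormat.format(startTimeStamp) + "|" +
            dateFormat.format(endTimeStamp) + "|" +
            String.valueOf(totalCosts) + "|" +
    // ...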
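When no explicit job id is passed, the entry method falls back to extracting one from the job path using the three URL patterns. The sketch below illustrates that idea in isolation. The class JobIdFromUrl and its parse method are hypothetical stand-ins, not DataX's own parseJobIdFromUrl, and the example URL is made up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of DataX.
class JobIdFromUrl {

    // Returns the number captured by the first pattern that matches, or -1 if none does.
    static long parse(List<String> patternStrings, String url) {
        for (String patternString : patternStrings) {
            Matcher matcher = Pattern.compile(patternString).matcher(url);
            if (matcher.find()) {
                return Long.parseLong(matcher.group(1));
            }
        }
        return -1L;
    }

    public static void main(String[] args) {
        List<String> patterns = Arrays.asList(
                "/instance/(\\d{1,})/",
                "/inner/job/(\\d{1,})/config",
                "/inner/job/(\\d{1,})/taskGroup/");
        // Assumed example inputs: a remote job URL and a plain local file path.
        System.out.println(parse(patterns, "http://example.invalid/inner/job/42/config")); // prints 42
        System.out.println(parse(patterns, "/opt/datax/job/job.json"));                    // prints -1
    }
}
```

A local JSON file path matches none of the patterns, so in this sketch the job id stays at -1 when DataX is launched against a local job file.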

Rewrite the logStatistics method so that it returns this entity.
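As a rough illustration of what such a return value might look like, here is a minimal sketch of a result entity built from the same values that logStatistics prints. The class name JobResult, its field names, and the factory method are all assumptions made for this example. A real implementation would take the record counts from the job's Communication object rather than from plain parameters.

```java
import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical result entity; names are illustrative and not taken from DataX.
class JobResult {
    final String startTime;        // job start time, formatted
    final String endTime;          // job end time, formatted
    final long totalCostSeconds;   // total elapsed time in seconds
    final long totalReadRecords;   // total records read
    final long totalErrorRecords;  // total read/write failures

    private JobResult(String startTime, String endTime, long totalCostSeconds,
                      long totalReadRecords, long totalErrorRecords) {
        this.startTime = startTime;
        this.endTime = endTime;
        this.totalCostSeconds = totalCostSeconds;
        this.totalReadRecords = totalReadRecords;
        this.totalErrorRecords = totalErrorRecords;
    }

    // Builds the entity from millisecond timestamps and record counters.
    static JobResult of(long startTimeStamp, long endTimeStamp,
                        long totalReadRecords, long totalErrorRecords) {
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        long totalCosts = (endTimeStamp - startTimeStamp) / 1000;
        return new JobResult(
                dateFormat.format(new Date(startTimeStamp)),
                dateFormat.format(new Date(endTimeStamp)),
                totalCosts, totalReadRecords, totalErrorRecords);
    }

    @Override
    public String toString() {
        return startTime + "|" + endTime + "|" + totalCostSeconds + "|"
                + totalReadRecords + "|" + totalErrorRecords;
    }
}
```

With an entity like this, the caller can inspect the counters directly instead of parsing them back out of the log output.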

In CoreConstant, change datax_home to your local path.
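The sketch below shows one way the call from Java might be prepared. It assumes that CoreConstant reads its home directory from the datax.home system property, so setting that property before the engine classes load could replace editing the constant. That assumption should be checked against your DataX version. The class DataxLauncher, the path /opt/datax, and the mode value standalone are likewise assumptions. The job, jobid, and mode option names come from the entry method above.

```java
import java.util.Arrays;

// Hypothetical launcher helper; not part of DataX.
class DataxLauncher {

    // Builds the argument array expected by the entry method's "job", "jobid" and "mode" options.
    static String[] buildArgs(String jobPath) {
        return new String[] {"-mode", "standalone", "-jobid", "-1", "-job", jobPath};
    }

    public static void main(String[] args) {
        // Assumed local installation path; adjust to your environment.
        System.setProperty("datax.home", "/opt/datax");

        String[] dataxArgs = buildArgs("/opt/datax/job/job.json");
        System.out.println(Arrays.toString(dataxArgs));

        // With the DataX core jar on the classpath, the job would then be started with:
        // Engine.entry(dataxArgs);
    }
}
```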

The execution result is: