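Published on November 29, 2023
Calling DataX from Java and returning the job execution details
(Corrections are welcome if anything here is wrong or missing.)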
DATAX
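DataX is an offline data synchronization tool/platform widely used within Alibaba Group. It provides efficient data synchronization between heterogeneous data sources, including MySQL, Oracle, SqlServer, Postgre, HDFS, Hive, ADS, HBase, TableStore (OTS), MaxCompute (ODPS), and DRDS.
For a detailed introduction to DataX, please refer to the project's own documentation.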
public static void entry(final String[] args) throws Throwable {
    Options options = new Options();
    options.addOption("job", true, "Job config.");
    options.addOption("jobid", true, "Job unique id.");
    options.addOption("mode", true, "Job runtime mode.");

    BasicParser parser = new BasicParser();
    CommandLine cl = parser.parse(options, args);

    String jobPath = cl.getOptionValue("job");
    // If the user does not explicitly specify a jobid, it is given the default value -1
    String jobIdString = cl.getOptionValue("jobid");
    RUNTIME_MODE = cl.getOptionValue("mode");

    Configuration configuration = ConfigParser.parse(jobPath);

    long jobId;
    if (!"-1".equalsIgnoreCase(jobIdString)) {
        jobId = Long.parseLong(jobIdString);
    } else {
        // only for dsc & ds & datax 3 update
        String dscJobUrlPatternString = "/instance/(\\d{1,})/";
        String dsJobUrlPatternString = "/inner/job/(\\d{1,})/config";
        String dsTaskGroupUrlPatternString = "/inner/job/(\\d{1,})/taskGroup/";
        List<String> patternStringList = Arrays.asList(dscJobUrlPatternString,
                dsJobUrlPatternString, dsTaskGroupUrlPatternString);
        jobId = parseJobIdFromUrl(patternStringList, jobPath);
    }
    // ...
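The fallback branch above derives the job id from the job URL when no explicit jobid is passed. The following is a minimal standalone sketch of that lookup, not the DataX source: the class name `JobIdUrlSketch` and the example URL are made up for illustration, and the digit class is written as `\\d` so that the capture group matches numbers.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Standalone sketch; the class name is hypothetical and this is not the DataX implementation.
class JobIdUrlSketch {

    // Try each pattern in order and return the first captured id, or -1 if none matches.
    static long parseJobIdFromUrl(List<String> patternStringList, String url) {
        for (String patternString : patternStringList) {
            Matcher matcher = Pattern.compile(patternString).matcher(url);
            if (matcher.find()) {
                return Long.parseLong(matcher.group(1));
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<String> patterns = Arrays.asList(
                "/instance/(\\d{1,})/",
                "/inner/job/(\\d{1,})/config",
                "/inner/job/(\\d{1,})/taskGroup/");

        // Hypothetical URL, used only for illustration: the second pattern captures 42.
        System.out.println(parseJobIdFromUrl(patterns, "http://example.invalid/inner/job/42/config"));

        // A local file path matches none of the patterns, so the id stays -1.
        System.out.println(parseJobIdFromUrl(patterns, "/tmp/job.json"));
    }
}
```

When the job is a local JSON file, as it usually is when DataX is called from Java, none of the patterns match and the job id remains -1.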
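
    // ...
        this.preCheck();
    } else {
        userConf = configuration.clone();
        LOG.debug("jobContainer starts to do preHandle ...");
        this.preHandle();
        LOG.debug("jobContainer starts to do init ...");
        this.init();
        LOG.info("jobContainer starts to do prepare ...");
        this.prepare();
        LOG.info("jobContainer starts to do split ...");
        this.totalStage = this.split();
        LOG.info("jobContainer starts to do schedule ...");
        this.schedule();
        LOG.debug("jobContainer starts to do post ...");
        this.post();
        LOG.debug("jobContainer starts to do postHandle ...");
        // ...
    VMInfo vmInfo = VMInfo.getVmInfo();
    if (vmInfo != null) {
        vmInfo.getDelta(false);
        LOG.info(vmInfo.totalString());
    }
    LOG.info(PerfTrace.getInstance().summarizeNoException());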
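
    // ...
        String.valueOf(CommunicationTool.getTotalReadRecords(communication)),
        "读写失败总数", // total number of read/write failures
        String.valueOf(CommunicationTool.getTotalErrorRecords(communication))
    ));
    LOG.info("task-total-info:" + dateFormat.format(startTimeStamp) + "|" +
            dateFormat.format(endTimeStamp) + "|" +
            String.valueOf(totalCosts) + "|" +
            // ...
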
Rewrite the logStatistics method so that it returns these statistics as an entity object instead of only logging them.
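The entity class itself is not shown in this post, so the following is only a rough sketch of what such a return value might look like. The class name `JobRunSummary`, its fields, and the `toPipeString` helper are all assumptions, chosen to mirror the values that appear in the log fragments (start time, end time, total cost, records read, failed records).

```java
import java.text.SimpleDateFormat;
import java.util.Date;

// Hypothetical result entity; every name here is an assumption, not the author's class.
class JobRunSummary {
    final long startTimeStamp;    // job start, epoch milliseconds
    final long endTimeStamp;      // job end, epoch milliseconds
    final long totalCosts;        // elapsed time in whole seconds
    final long totalReadRecords;  // number of records read
    final long totalErrorRecords; // number of records that failed to read or write

    JobRunSummary(long startTimeStamp, long endTimeStamp,
                  long totalReadRecords, long totalErrorRecords) {
        this.startTimeStamp = startTimeStamp;
        this.endTimeStamp = endTimeStamp;
        this.totalCosts = (endTimeStamp - startTimeStamp) / 1000;
        this.totalReadRecords = totalReadRecords;
        this.totalErrorRecords = totalErrorRecords;
    }

    // Pipe-separated form, in the spirit of the "task-total-info" log line,
    // limited to the fields this sketch knows about.
    String toPipeString() {
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return dateFormat.format(new Date(startTimeStamp)) + "|"
                + dateFormat.format(new Date(endTimeStamp)) + "|"
                + totalCosts + "|"
                + totalReadRecords + "|"
                + totalErrorRecords;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // Sample values only; a rewritten logStatistics would fill these from the job's statistics.
        JobRunSummary summary = new JobRunSummary(start, start + 5000, 100, 2);
        System.out.println(summary.toPipeString());
    }
}
```

Returning an object like this lets the calling Java code inspect the failure count directly instead of parsing the log output.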
In CoreConstant, change datax_home to the local DataX installation path.
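As a possible alternative to editing the constant, the sketch below assumes that `CoreConstant.DATAX_HOME` is initialized from the `datax.home` system property, as in the open-source DataX code; if your copy differs, this will not apply. The class name `DataXLauncherSketch` and all paths are placeholders. The property would have to be set before any DataX class is loaded, and the actual `Engine.entry` call is left as a comment because it needs DataX on the classpath.

```java
import java.io.File;

// Sketch only: the class name is hypothetical and the paths are placeholders.
class DataXLauncherSketch {

    // Build the three options that Engine.entry parses: -mode, -jobid and -job.
    static String[] buildArgs(String jobPath) {
        return new String[] {"-mode", "standalone", "-jobid", "-1", "-job", jobPath};
    }

    public static void main(String[] args) throws Throwable {
        // Placeholder path; point this at the local DataX directory.
        String dataxHome = "/path/to/datax";

        // Assumption: DataX reads its home directory from this system property.
        System.setProperty("datax.home", dataxHome);

        String jobPath = dataxHome + File.separator + "job" + File.separator + "job.json";
        String[] engineArgs = buildArgs(jobPath);
        System.out.println(String.join(" ", engineArgs));

        // With DataX on the classpath, the job would then be started with:
        // com.alibaba.datax.core.Engine.entry(engineArgs);
    }
}
```

Passing `-jobid -1` together with `-mode standalone` follows the default described in the entry method, where a missing job id falls back to -1.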
The execution result is as follows:

