Skip to content

Latest commit

 

History

History
66 lines (53 loc) · 1.81 KB

README.md

File metadata and controls

66 lines (53 loc) · 1.81 KB

datax-on-yarn

datax-on-yarn可以让datax在yarn master上运行

提交方式

shell

  • datax_home_hdfs datax在hdfs的安装包
  • datax_job 配置json
  • yarn master_memory内存为yarn master与datax job内存之和
/usr/bin/yarn jar /mnt/dss/211/datax-on-yarn-1.0.0.jar com.on.yarn.Client \
  -jar_path /mnt/dss/211/datax-on-yarn-1.0.0.jar \
  -appname datax-job \
  -master_memory 1024 \
  -p dt=20200324,pt=20200324 \
  -queue default \
  -proxy_user  hanmin.du \
  -datax_job /mnt/dss/datax/job/t2.json \
  -datax_home_hdfs /tmp/linkis/hadoop/datax.tar.gz

sdk api(scala)

  • JobLogger类型重写com.on.yarn.base.YarnManipulator日志输出接口
  • dataxJob中传入运行参数
  • 引入以下依赖
<dependency>
    <groupId>com.on.yarn</groupId>
    <artifactId>datax-client</artifactId>
    <version>1.0.0</version>
</dependency>
val jobLogger = new JobLogger(job)
var client: Client = null
try {
  val cmd = dataxJob.toStrinArray
  jobLogger.info("------------------运行参数: " + ArrayUtil.toString(cmd))
  client = new Client(jobLogger)
  if (!client.init(cmd)) throw new RuntimeException("参数初始化异常: " + dataxJob)
  val applicationId: ApplicationId = client.run
  appId = applicationId.toString
  jobLogger.info("------------------DataX yarn id: " + applicationId.toString)
  val result = client.monitorApplication(applicationId)
  if (result) jobLogger.info("Application completed successfully")
  else throw new RuntimeException("任务运行异常,详见日志,AppID: " + applicationId)
} catch {
  case e: Exception => {
    jobLogger.info(ExceptionUtil.stacktraceToString(e))
  }
} finally {
  if (null != client) client.stop()
}

运行示例

image