Sqoop Engine usage documentation
The Sqoop engine mainly depends on the basic Hadoop environment: any node that is to run the Sqoop engine must also have the Hadoop client environment deployed.
Before executing Sqoop tasks through Linkis, it is strongly recommended to run a test task with native Sqoop on the node first, to verify that the node environment is working.
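A minimal sanity check with native Sqoop might look like the following; the JDBC URL, username, and password are placeholders for illustration and should point to a database reachable from the node:
# Check that the Sqoop client is on the PATH and picks up the Hadoop environment
sqoop version
# Check connectivity to a source database (placeholder connection values)
sqoop list-databases --connect jdbc:mysql://127.0.0.1:3306 --username test --password test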
Table 1-1 Environment configuration list
Linkis Parameter Name | Description | Remark |
---|---|---|
wds.linkis.hadoop.site.xml | Location of the Hadoop configuration files that Sqoop loads | Required. Reference example: "/etc/hadoop/conf/core-site.xml;/etc/hadoop/conf/hdfs-site.xml;/etc/hadoop/conf/yarn-site.xml;/etc/hadoop/conf/mapred-site.xml" |
sqoop.fetch.status.interval | Interval at which the Sqoop execution status is polled | Not required, defaults to 5s |
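These are ordinary key=value Linkis properties. As an illustration only (the configuration file they belong to depends on your deployment), the settings from the table would look like:
wds.linkis.hadoop.site.xml=/etc/hadoop/conf/core-site.xml;/etc/hadoop/conf/hdfs-site.xml;/etc/hadoop/conf/yarn-site.xml;/etc/hadoop/conf/mapred-site.xml
sqoop.fetch.status.interval=5s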
Note: The Linkis project must be fully compiled before the Sqoop engine is compiled.
To install, take the compiled engine package, located in
and then deploy it to, and restart the engineplugin service:
cd ${LINKIS_HOME}/sbin
sh linkis-daemon.sh restart cg-engineplugin
More details about the engineplugin can be found in the following article:
https://linkis.apache.org/zh-CN/docs/1.1.1/deployment/engine-conn-plugin-installation
OnceEngineConn is used by calling LinkisManager's createEngineConn interface through LinkisManagerClient and sending the code to the created Sqoop engine, which then starts executing it. This method can be called by other systems, such as Exchangis. Using the client is also very simple: first create a new Maven project, or add the following dependency to your project.
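Judging from the client packages imported in the test case below, the dependency is the Linkis computation client; the version is a placeholder and should match your deployed Linkis version:
<dependency>
    <groupId>org.apache.linkis</groupId>
    <artifactId>linkis-computation-client</artifactId>
    <version>${linkis.version}</version>
</dependency>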
Test Case:
package com.webank.wedatasphere.exchangis.job.server.log.client
import java.util.concurrent.TimeUnit
import java.util
import org.apache.linkis.computation.client.LinkisJobBuilder
import org.apache.linkis.computation.client.once.simple.{SimpleOnceJob, SimpleOnceJobBuilder, SubmittableSimpleOnceJob}
import org.apache.linkis.computation.client.operator.impl.{EngineConnLogOperator, EngineConnMetricsOperator, EngineConnProgressOperator}
import org.apache.linkis.computation.client.utils.LabelKeyUtils
import org.apache.linkis.common.utils.Utils
import scala.collection.JavaConverters._
object SqoopOnceJobTest extends App {
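// Point the client at the Linkis gateway address (replace with your own deployment's address)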
LinkisJobBuilder.setDefaultServerUrl("http://127.0.0.1:9001")
val logPath = "C:\\Users\\resources\\log4j.properties"
System.setProperty("log4j.configurationFile", logPath)
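// Startup parameters passed to the Sqoop EngineConn when it is launched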
val startUpMap = new util.HashMap[String, Any]
startUpMap.put("wds.linkis.engineconn.java.driver.memory", "1g")
val builder = SimpleOnceJob.builder().setCreateService("Linkis-Client")
.addLabel(LabelKeyUtils.ENGINE_TYPE_LABEL_KEY, "sqoop-1.4.6")
.addLabel(LabelKeyUtils.USER_CREATOR_LABEL_KEY, "Client")
.setStartupParams(startUpMap)
.setMaxSubmitTime(30000)
.addExecuteUser("freeuser")
val onceJob = importJob(builder)
onceJob.submit()
println(onceJob.getId)
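// Operators are used to pull logs, progress, and metrics from the running EngineConn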
val logOperator = onceJob.getOperator(EngineConnLogOperator.OPERATOR_NAME).asInstanceOf[EngineConnLogOperator]
println(onceJob.getECMServiceInstance)
logOperator.setFromLine(0)
logOperator.setECMServiceInstance(onceJob.getECMServiceInstance)
logOperator.setEngineConnType("sqoop")
logOperator.setIgnoreKeywords("[main],[SpringContextShutdownHook]")
var progressOperator = onceJob.getOperator(EngineConnProgressOperator.OPERATOR_NAME).asInstanceOf[EngineConnProgressOperator]
var metricOperator = onceJob.getOperator(EngineConnMetricsOperator.OPERATOR_NAME).asInstanceOf[EngineConnMetricsOperator]
var end = false
var rowBefore = 1
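// Poll logs until the job completes and no more log lines are returned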
while (!end || rowBefore > 0){
if(onceJob.isCompleted) {
end = true
metricOperator = null
}
logOperator.setPageSize(100)
Utils.tryQuietly{
val logs = logOperator.apply()
logs.logs.asScala.foreach( log => {
println(log)
})
rowBefore = logs.logs.size
}
Thread.sleep(3000)
Option(metricOperator).foreach( operator => {
if (!onceJob.isCompleted){
println(s"Metric Monitor: ${operator.apply()}")
println(s"Progress: ${progressOperator.apply()}")
}
})
}
onceJob.waitForCompleted()
println(onceJob.getStatus)
System.exit(0)
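// Build an import job: the result of the MySQL query is written into an HCatalog (Hive) partition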
def importJob(jobBuilder: SimpleOnceJobBuilder): SubmittableSimpleOnceJob = {
jobBuilder
.addJobContent("sqoop.env.mapreduce.job.queuename", "queue_10")
.addJobContent("sqoop.mode", "import")
.addJobContent("sqoop.args.connect", "jdbc:mysql://127.0.0.1:3306/exchangis")
.addJobContent("sqoop.args.username", "free")
.addJobContent("sqoop.args.password", "testpwd")
.addJobContent("sqoop.args.query", "select id as order_number, sno as time from" +
" exchangis where sno =1 and $CONDITIONS")
.addJobContent("sqoop.args.hcatalog.database", "freedb")
.addJobContent("sqoop.args.hcatalog.table", "zy_test")
.addJobContent("sqoop.args.hcatalog.partition.keys", "month")
.addJobContent("sqoop.args.hcatalog.partition.values", "3")
.addJobContent("sqoop.args.num.mappers", "1")
.build()
}
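// Build an export job; it uses the same job-content keys as the import job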
def exportJob(jobBuilder: SimpleOnceJobBuilder): SubmittableSimpleOnceJob = {
jobBuilder
.addJobContent("sqoop.env.mapreduce.job.queuename", "queue1")
.addJobContent("sqoop.mode", "import")
.addJobContent("sqoop.args.connect", "jdbc:mysql://127.0.0.1:3306/exchangis")
.addJobContent("sqoop.args.query", "select id as order, sno as great_time from" +
" exchangis_table where sno =1 and $CONDITIONS")
.addJobContent("sqoop.args.hcatalog.database", "hadoop")
.addJobContent("sqoop.args.hcatalog.table", "partition_33")
.addJobContent("sqoop.args.hcatalog.partition.keys", "month")
.addJobContent("sqoop.args.hcatalog.partition.values", "4")
.addJobContent("sqoop.args.num.mappers", "1")