Sqoop Engine usage documentation

    The Sqoop engine mainly depends on the Hadoop basic environment. If the node needs to deploy the Sqoop engine, the Hadoop client environment needs to be deployed.

    It is strongly recommended that you use the native Sqoop to execute the test task on the node before executing the Sqoop task to check whether the node environment is normal.

    Table 1-1 Environment configuration list

    Linkis Parameter Name | Parameter Content | Remark
    wds.linkis.hadoop.site.xml | Sets the location of the Hadoop parameter files that Sqoop loads | Required. Reference example: "/etc/hadoop/conf/core-site.xml;/etc/hadoop/conf/hdfs-site.xml;/etc/hadoop/conf/yarn-site.xml;/etc/hadoop/conf/mapred-site.xml"
    sqoop.fetch.status.interval | Sets the interval for polling the Sqoop execution status | Not required; the default value is 5s

    Note: Before compiling the sqoop engine, the linkis project needs to be fully compiled

    To install, take the compiled engine package from the build output directory and deploy it to the Linkis engine plugin directory, then restart the engine plugin service:

    1. cd ${LINKIS_HOME}/sbin
    2. sh linkis-daemon.sh restart cg-engineplugin

    More engineplugin details can be found in the following article.
    https://linkis.apache.org/zh-CN/docs/1.1.1/deployment/engine-conn-plugin-installation

    OnceEngineConn is used by calling LinkisManager's createEngineConn interface through LinkisManagerClient and sending the code to the created Sqoop engine, which then starts executing. This method can be called by other systems, such as Exchangis. Using the Client is also very simple: first create a new Maven project, or introduce the following dependencies into your project.

    Test Case:

    1. package com.webank.wedatasphere.exchangis.job.server.log.client
    2. import java.util.concurrent.TimeUnit
    3. import java.util
    4. import org.apache.linkis.computation.client.LinkisJobBuilder
    5. import org.apache.linkis.computation.client.once.simple.{SimpleOnceJob, SimpleOnceJobBuilder, SubmittableSimpleOnceJob}
    6. import org.apache.linkis.computation.client.operator.impl.{EngineConnLogOperator, EngineConnMetricsOperator, EngineConnProgressOperator}
    7. import org.apache.linkis.computation.client.utils.LabelKeyUtils
    8. import scala.collection.JavaConverters._
    9. object SqoopOnceJobTest extends App {
    10. LinkisJobBuilder.setDefaultServerUrl("http://127.0.0.1:9001")
    11. val logPath = "C:\\Users\\resources\\log4j.properties"
    12. System.setProperty("log4j.configurationFile", logPath)
    13. val startUpMap = new util.HashMap[String, Any]
    14. startUpMap.put("wds.linkis.engineconn.java.driver.memory", "1g")
    15. val builder = SimpleOnceJob.builder().setCreateService("Linkis-Client")
    16. .addLabel(LabelKeyUtils.ENGINE_TYPE_LABEL_KEY, "sqoop-1.4.6")
    17. .addLabel(LabelKeyUtils.USER_CREATOR_LABEL_KEY, "Client")
    18. .setStartupParams(startUpMap)
    19. .setMaxSubmitTime(30000)
    20. .addExecuteUser("freeuser")
    21. val onceJob = importJob(builder)
    22. onceJob.submit()
    23. println(onceJob.getId)
    24. val logOperator = onceJob.getOperator(EngineConnLogOperator.OPERATOR_NAME).asInstanceOf[EngineConnLogOperator]
    25. println(onceJob.getECMServiceInstance)
    26. logOperator.setFromLine(0)
    27. logOperator.setECMServiceInstance(onceJob.getECMServiceInstance)
    28. logOperator.setEngineConnType("sqoop")
    29. logOperator.setIgnoreKeywords("[main],[SpringContextShutdownHook]")
    30. var progressOperator = onceJob.getOperator(EngineConnProgressOperator.OPERATOR_NAME).asInstanceOf[EngineConnProgressOperator]
    31. var metricOperator = onceJob.getOperator(EngineConnMetricsOperator.OPERATOR_NAME).asInstanceOf[EngineConnMetricsOperator]
    32. var end = false
    33. var rowBefore = 1
    34. while (!end || rowBefore > 0){
    35. if(onceJob.isCompleted) {
    36. end = true
    37. metricOperator = null
    38. }
    39. logOperator.setPageSize(100)
    40. Utils.tryQuietly{
    41. val logs = logOperator.apply()
    42. logs.logs.asScala.foreach( log => {
    43. println(log)
    44. })
    45. rowBefore = logs.logs.size
    46. }
    47. Thread.sleep(3000)
    48. Option(metricOperator).foreach( operator => {
    49. if (!onceJob.isCompleted){
    50. println(s"Metric Monitor: ${operator.apply()}")
    51. println(s"Progress: ${progressOperator.apply()}")
    52. }
    53. })
    54. }
    55. onceJob.waitForCompleted()
    56. println(onceJob.getStatus)
    57. System.exit(0)
    58. def importJob(jobBuilder: SimpleOnceJobBuilder): SubmittableSimpleOnceJob = {
    59. jobBuilder
    60. .addJobContent("sqoop.env.mapreduce.job.queuename", "queue_10")
    61. .addJobContent("sqoop.mode", "import")
    62. .addJobContent("sqoop.args.connect", "jdbc:mysql://127.0.0.1:3306/exchangis")
    63. .addJobContent("sqoop.args.username", "free")
    64. .addJobContent("sqoop.args.password", "testpwd")
    65. .addJobContent("sqoop.args.query", "select id as order_number, sno as time from" +
    66. " exchangis where sno =1 and $CONDITIONS")
    67. .addJobContent("sqoop.args.hcatalog.database", "freedb")
    68. .addJobContent("sqoop.args.hcatalog.table", "zy_test")
    69. .addJobContent("sqoop.args.hcatalog.partition.keys", "month")
    70. .addJobContent("sqoop.args.hcatalog.partition.values", "3")
    71. .addJobContent("sqoop.args.num.mappers", "1")
    72. .build()
    73. }
    74. def exportJob(jobBuilder: SimpleOnceJobBuilder): SubmittableSimpleOnceJob = {
    75. jobBuilder
    76. .addJobContent("sqoop.env.mapreduce.job.queuename", "queue1")
    77. .addJobContent("sqoop.mode", "export")
    78. .addJobContent("sqoop.args.connect", "jdbc:mysql://127.0.0.1:3306/exchangis")
    79. .addJobContent("sqoop.args.query", "select id as order, sno as great_time from" +
    80. " exchangis_table where sno =1 and $CONDITIONS")
    81. .addJobContent("sqoop.args.hcatalog.database", "hadoop")
    82. .addJobContent("sqoop.args.hcatalog.table", "partition_33")
    83. .addJobContent("sqoop.args.hcatalog.partition.keys", "month")
    84. .addJobContent("sqoop.args.hcatalog.partition.values", "4")
    85. .addJobContent("sqoop.args.num.mappers", "1")