Sqoop Engine

    The Sqoop engine depends mainly on the Hadoop basic environment. If a node needs to run the Sqoop engine, the Hadoop client environment must be deployed on that node.

    It is strongly recommended that, before running Sqoop tasks through Linkis, you first execute a test task with native Sqoop on that node to verify that the node's environment is working properly.
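
    For example, a minimal smoke test with native Sqoop might look like the following (the JDBC URL, username and password are placeholders; adjust them to your environment):

    # Check that the sqoop client itself works
    sqoop version
    # Check connectivity from this node to the source database
    sqoop list-databases \
        --connect jdbc:mysql://127.0.0.1:3306 \
        --username test_user \
        --password 'test_pwd'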

    Table 1-1 Environment configuration list

    | Linkis system parameter | Function | Remarks |
    | --- | --- | --- |
    | wds.linkis.hadoop.site.xml | Sets the location of the Hadoop configuration files loaded by Sqoop | Usually does not need to be configured separately; default value is "core-site.xml;hdfs-site.xml;yarn-site.xml;mapred-site.xml" |
    | sqoop.fetch.status.interval | Sets the interval for polling the Sqoop execution status | Usually does not need to be configured separately; default value is 5s |
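
    If you do need to override these defaults, they are ordinary Linkis configuration properties. The snippet below is only a sketch; the target file (linkis.properties) and the 10s interval are assumptions to be adapted to your deployment:

    # Assumed file: ${LINKIS_HOME}/conf/linkis.properties
    wds.linkis.hadoop.site.xml=core-site.xml;hdfs-site.xml;yarn-site.xml;mapred-site.xml
    sqoop.fetch.status.interval=10s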

    Linkis 1.1.2 and above support the mainstream Sqoop versions 1.4.6 and 1.4.7; higher versions may require modifying part of the code and recompiling.

    2.2 Sqoop engineConn deployment and loading

    To compile the Sqoop engine separately, build it from its engine plugin module:

    ${linkis_code_dir}/linkis-engineconn-plugins/engineconn-plugins/sqoop/
    mvn clean install

    To install the engine, take the compiled engine package, located at

    ${linkis_code_dir}/linkis-engineconn-plugins/engineconn-plugins/sqoop/target/sqoop-engineconn.zip

    then upload and deploy it to the Linkis server,

    and restart the linkis-engineplugin service:

    cd ${LINKIS_HOME}/sbin
    sh linkis-daemon.sh restart cg-engineplugin

    For a more detailed introduction to engineplugin, refer to the engineplugin installation documentation.

    3.1 Submitting tasks via Linkis-cli

    Exporting an HDFS file to MySQL:

    sh linkis-cli-sqoop export \
        -D mapreduce.job.queuename=ide \
        --connect jdbc:mysql://10.10.10.10:9600/testdb \
        --username password@123 \
        --password password@123 \
        --table test_sqoop_01_copy \
        --columns user_id,user_code,user_name,email,status \
        --export-dir /user/hive/warehouse/hadoop/test_linkis_sqoop_2 \
        --update-mode allowinsert --verbose ;

    Importing MySQL data into a Hive database:
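
    A sketch of such an import, assuming the same pass-through of native Sqoop options as in the export example above (the connection details, table and database names are placeholders):

    sh linkis-cli-sqoop import \
        -D mapreduce.job.queuename=ide \
        --connect jdbc:mysql://10.10.10.10:9600/testdb \
        --username test_user \
        --password 'test_pwd' \
        --table test_sqoop_01 \
        --hcatalog-database hadoop \
        --hcatalog-table test_linkis_sqoop_import \
        --num-mappers 1 --verbose ;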

    3.2 Submitting tasks via OnceEngineConn

    OnceEngineConn is used by calling LinkisManager's createEngineConn interface through LinkisManagerClient and sending the code to the created Sqoop engine, which then starts executing. This approach can be invoked by other systems, such as Exchangis. Using the Client is also simple: first create a new Maven project, or add the following dependency to your project:

    <dependency>
        <groupId>org.apache.linkis</groupId>
        <artifactId>linkis-computation-client</artifactId>
        <version>${linkis.version}</version>
    </dependency>

    Test case:

    import java.util
    import java.util.concurrent.TimeUnit

    import org.apache.linkis.common.utils.Utils
    import org.apache.linkis.computation.client.LinkisJobBuilder
    import org.apache.linkis.computation.client.once.simple.{SimpleOnceJob, SimpleOnceJobBuilder, SubmittableSimpleOnceJob}
    import org.apache.linkis.computation.client.operator.impl.{EngineConnLogOperator, EngineConnMetricsOperator, EngineConnProgressOperator}
    import org.apache.linkis.computation.client.utils.LabelKeyUtils

    import scala.collection.JavaConverters._

    object SqoopOnceJobTest extends App {
      // Entry point of the Linkis gateway
      LinkisJobBuilder.setDefaultServerUrl("http://127.0.0.1:9001")
      val logPath = "C:\\Users\\resources\\log4j.properties"
      System.setProperty("log4j.configurationFile", logPath)
      val startUpMap = new util.HashMap[String, Any]
      // Build a once job labelled for the Sqoop engine
      val builder = SimpleOnceJob.builder().setCreateService("Linkis-Client")
        .addLabel(LabelKeyUtils.ENGINE_TYPE_LABEL_KEY, "sqoop-1.4.6")
        .addLabel(LabelKeyUtils.USER_CREATOR_LABEL_KEY, "Client")
        .addLabel(LabelKeyUtils.ENGINE_CONN_MODE_LABEL_KEY, "once")
        .setStartupParams(startUpMap)
        .setMaxSubmitTime(30000)
        .addExecuteUser("freeuser")
      val onceJob = importJob(builder)
      val time = System.currentTimeMillis()
      onceJob.submit()
      println(onceJob.getId)
      // Operators used to pull logs, progress and metrics from the running engine
      val logOperator = onceJob.getOperator(EngineConnLogOperator.OPERATOR_NAME).asInstanceOf[EngineConnLogOperator]
      println(onceJob.getECMServiceInstance)
      logOperator.setFromLine(0)
      logOperator.setECMServiceInstance(onceJob.getECMServiceInstance)
      logOperator.setEngineConnType("sqoop")
      logOperator.setIgnoreKeywords("[main],[SpringContextShutdownHook]")
      var progressOperator = onceJob.getOperator(EngineConnProgressOperator.OPERATOR_NAME).asInstanceOf[EngineConnProgressOperator]
      var metricOperator = onceJob.getOperator(EngineConnMetricsOperator.OPERATOR_NAME).asInstanceOf[EngineConnMetricsOperator]
      var end = false
      var rowBefore = 1
      while (!end || rowBefore > 0) {
        if (onceJob.isCompleted) {
          end = true
          metricOperator = null
        }
        logOperator.setPageSize(100)
        Utils.tryQuietly {
          val logs = logOperator.apply()
          logs.logs.asScala.foreach(log => println(log))
          // Remember how many lines were returned so the remaining logs are drained after completion
          rowBefore = logs.logs.size
        }
        Thread.sleep(3000)
        Option(metricOperator).foreach(operator => {
          if (!onceJob.isCompleted) {
            println(s"Metrics: ${operator.apply()}")
            println(s"Progress: ${progressOperator.apply()}")
          }
        })
      }
      onceJob.isCompleted
      onceJob.waitForCompleted()
      println(onceJob.getStatus)
      println(TimeUnit.SECONDS.convert(System.currentTimeMillis() - time, TimeUnit.MILLISECONDS) + "s")
      System.exit(0)

      def importJob(jobBuilder: SimpleOnceJobBuilder): SubmittableSimpleOnceJob = {
        jobBuilder
          .addJobContent("sqoop.env.mapreduce.job.queuename", "queue_10")
          .addJobContent("sqoop.mode", "import")
          .addJobContent("sqoop.args.connect", "jdbc:mysql://127.0.0.1:3306/exchangis")
          .addJobContent("sqoop.args.username", "free")
          .addJobContent("sqoop.args.password", "testpwd")
          .addJobContent("sqoop.args.query", "select id as order_number, sno as time from" +
            " exchangis where sno =1 and $CONDITIONS")
          .addJobContent("sqoop.args.hcatalog.database", "freedb")
          .addJobContent("sqoop.args.hcatalog.table", "zy_test")
          .addJobContent("sqoop.args.hcatalog.partition.keys", "month")
          .addJobContent("sqoop.args.hcatalog.partition.values", "3")
          .addJobContent("sqoop.args.num.mappers", "1")
          .build()
      }

      def exportJob(jobBuilder: SimpleOnceJobBuilder): SubmittableSimpleOnceJob = {
        jobBuilder
          .addJobContent("sqoop.env.mapreduce.job.queuename", "queue1")
          .addJobContent("sqoop.mode", "import")
          .addJobContent("sqoop.args.connect", "jdbc:mysql://127.0.0.1:3306/exchangis")
          .addJobContent("sqoop.args.query", "select id as order, sno as great_time from" +
            " exchangis_table where sno =1 and $CONDITIONS")
          .addJobContent("sqoop.args.hcatalog.database", "hadoop")
          .addJobContent("sqoop.args.hcatalog.table", "partition_33")
          .addJobContent("sqoop.args.hcatalog.partition.keys", "month")
          .addJobContent("sqoop.args.hcatalog.partition.values", "4")
          .addJobContent("sqoop.args.num.mappers", "1")
          .build()
      }
    }

    4.1 Common arguments

    | Parameter | key | Description |
    | --- | --- | --- |
    |  | sqoop.mode | import/export/… |
    | -Dmapreduce.job.queuename | sqoop.env.mapreduce.job.queuename |  |
    | --connect <jdbc-uri> | sqoop.args.connect | Specify JDBC connect string |
    | --connection-manager <class-name> | sqoop.args.connection.manager | Specify connection manager class name |
    | --connection-param-file <properties-file> | sqoop.args.connection.param.file | Specify connection parameters file |
    | --driver <class-name> | sqoop.args.driver | Manually specify JDBC driver class to use |
    | --hadoop-home <hdir> | sqoop.args.hadoop.home | Override $HADOOP_MAPRED_HOME_ARG |
    | --hadoop-mapred-home <dir> | sqoop.args.hadoop.mapred.home | Override $HADOOP_MAPRED_HOME_ARG |
    | --help | sqoop.args.help | Print usage instructions |
    | -P |  | Read password from console |
    | --password <password> | sqoop.args.password | Set authentication password |
    | --password-alias <password-alias> | sqoop.args.password.alias | Credential provider password alias |
    | --password-file <password-file> | sqoop.args.password.file | Set authentication password file path |
    | --relaxed-isolation | sqoop.args.relaxed.isolation | Use read-uncommitted isolation for imports |
    | --skip-dist-cache | sqoop.args.skip.dist.cache | Skip copying jars to distributed cache |
    | --username <username> | sqoop.args.username | Set authentication username |
    | --verbose | sqoop.args.verbose | Print more information while working |

    4.2 Export control arguments

    | Parameter | key | Description |
    | --- | --- | --- |
    | --batch | sqoop.args.batch | Indicates underlying statements to be executed in batch mode |
    | --call <arg> | sqoop.args.call | Populate the table using this stored procedure (one call per row) |
    | --clear-staging-table | sqoop.args.clear.staging.table | Indicates that any data in staging table can be deleted |
    | --columns <col,col,col…> | sqoop.args.columns | Columns to export to table |
    | --direct | sqoop.args.direct | Use direct export fast path |
    | --export-dir <dir> | sqoop.args.export.dir | HDFS source path for the export |
    | -m,--num-mappers <n> | sqoop.args.num.mappers | Use 'n' map tasks to export in parallel |
    | --mapreduce-job-name <name> | sqoop.args.mapreduce.job.name | Set name for generated mapreduce job |
    | --staging-table <table-name> | sqoop.args.staging.table | Intermediate staging table |
    | --table <table-name> | sqoop.args.table | Table to populate |
    | --update-key <key> | sqoop.args.update.key | Update records by specified key column |
    | --update-mode <mode> | sqoop.args.update.mode | Specifies how updates are performed when new rows are found with non-matching keys in database |
    | --validate | sqoop.args.validate | Validate the copy using the configured validator |
    | --validation-failurehandler <validation-failurehandler> | sqoop.args.validation.failurehandler | Validate the copy using the configured validator |
    | --validation-threshold <validation-threshold> | sqoop.args.validation.threshold | Fully qualified class name for ValidationThreshold |
    | --validator <validator> | sqoop.args.validator | Fully qualified class name for the Validator |

    4.3 Import control arguments

    4.4 Incremental import arguments

    | Parameter | key | Description |
    | --- | --- | --- |
    | --check-column <column> | sqoop.args.check.column | Source column to check for incremental change |
    | --incremental <import-type> | sqoop.args.incremental | Define an incremental import of type 'append' or 'lastmodified' |
    | --last-value <value> | sqoop.args.last.value | Last imported value in the incremental check column |
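
    As an illustration, an incremental append import submitted through Linkis-cli could combine these options as follows (again assuming the pass-through of native Sqoop options shown in section 3.1; all names and values are placeholders):

    sh linkis-cli-sqoop import \
        -D mapreduce.job.queuename=ide \
        --connect jdbc:mysql://10.10.10.10:9600/testdb \
        --username test_user \
        --password 'test_pwd' \
        --table test_sqoop_01 \
        --target-dir /user/hive/warehouse/hadoop/test_sqoop_incr \
        --incremental append --check-column id --last-value 0 \
        --num-mappers 1 ;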

    4.5 Output line formatting arguments

    | Parameter | key | Description |
    | --- | --- | --- |
    | --enclosed-by <char> | sqoop.args.enclosed.by | Sets a required field enclosing character |
    | --escaped-by <char> | sqoop.args.escaped.by | Sets the escape character |
    | --fields-terminated-by <char> | sqoop.args.fields.terminated.by | Sets the field separator character |
    | --lines-terminated-by <char> | sqoop.args.lines.terminated.by | Sets the end-of-line character |
    | --mysql-delimiters | sqoop.args.mysql.delimiters | Uses MySQL's default delimiter set: fields: , lines: \n escaped-by: \ optionally-enclosed-by: ' |
    | --optionally-enclosed-by <char> | sqoop.args.optionally.enclosed.by | Sets a field enclosing character |

    4.6 Input parsing arguments

    | Parameter | key | Description |
    | --- | --- | --- |
    | --input-enclosed-by <char> | sqoop.args.input.enclosed.by | Sets a required field encloser |
    | --input-escaped-by <char> | sqoop.args.input.escaped.by | Sets the input escape character |
    | --input-fields-terminated-by <char> | sqoop.args.input.fields.terminated.by | Sets the input field separator |
    | --input-lines-terminated-by <char> | sqoop.args.input.lines.terminated.by | Sets the input end-of-line char |
    | --input-optionally-enclosed-by <char> | sqoop.args.input.optionally.enclosed.by | Sets a field enclosing character |

    4.8 HBase arguments

    | Parameter | key | Description |
    | --- | --- | --- |
    | --column-family <family> | sqoop.args.column.family | Sets the target column family for the import |
    | --hbase-bulkload | sqoop.args.hbase.bulkload | Enables HBase bulk loading |
    | --hbase-create-table | sqoop.args.hbase.create.table | If specified, create missing HBase tables |
    | --hbase-row-key <col> | sqoop.args.hbase.row.key | Specifies which input column to use as the row key |
    | --hbase-table <table> | sqoop.args.hbase.table | Import to <table> in HBase |

    4.9 HCatalog arguments

    | Parameter | key | Description |
    | --- | --- | --- |
    | --hcatalog-database <arg> | sqoop.args.hcatalog.database | HCatalog database name |
    | --hcatalog-home <hdir> | sqoop.args.hcatalog.home | Override $HCAT_HOME |
    | --hcatalog-partition-keys <partition-key> | sqoop.args.hcatalog.partition.keys | Sets the partition keys to use when importing to hive |
    | --hcatalog-partition-values <partition-value> | sqoop.args.hcatalog.partition.values | Sets the partition values to use when importing to hive |
    | --hcatalog-table <arg> | sqoop.args.hcatalog.table | HCatalog table name |
    | --hive-home <dir> | sqoop.args.hive.home | Override $HIVE_HOME |
    | --hive-partition-key <partition-key> | sqoop.args.hive.partition.key | Sets the partition key to use when importing to hive |
    | --hive-partition-value <partition-value> | sqoop.args.hive.partition.value | Sets the partition value to use when importing to hive |
    | --map-column-hive <arg> | sqoop.args.map.column.hive | Override mapping for specific column to hive types |

    HCatalog import specific options:

    | Parameter | key | Description |
    | --- | --- | --- |
    | --create-hcatalog-table | sqoop.args.create.hcatalog.table | Create HCatalog before import |
    | --hcatalog-storage-stanza <arg> | sqoop.args.hcatalog.storage.stanza | HCatalog storage stanza for table creation |

    4.10 Accumulo arguments

    | Parameter | key | Description |
    | --- | --- | --- |
    | --accumulo-batch-size <size> | sqoop.args.accumulo.batch.size | Batch size in bytes |
    | --accumulo-column-family <family> | sqoop.args.accumulo.column.family | Sets the target column family for the import |
    | --accumulo-create-table | sqoop.args.accumulo.create.table | If specified, create missing Accumulo tables |
    | --accumulo-instance <instance> | sqoop.args.accumulo.instance | Accumulo instance name |
    | --accumulo-max-latency <latency> | sqoop.args.accumulo.max.latency | Max write latency in milliseconds |
    | --accumulo-password <password> | sqoop.args.accumulo.password | Accumulo password |
    | --accumulo-row-key <col> | sqoop.args.accumulo.row.key | Specifies which input column to use as the row key |
    | --accumulo-table <table> | sqoop.args.accumulo.table | Import to <table> in Accumulo |
    | --accumulo-user <user> | sqoop.args.accumulo.user | Accumulo user name |
    | --accumulo-visibility <vis> | sqoop.args.accumulo.visibility | Visibility token to be applied to all rows imported |
    | --accumulo-zookeepers <zookeepers> | sqoop.args.accumulo.zookeepers | Comma-separated list of zookeepers (host:port) |

    4.11 Code generation arguments

    | Parameter | key | Description |
    | --- | --- | --- |
    | -conf <configuration file> | sqoop.args.conf | Specify an application configuration file |
    | -D <property=value> | sqoop.args.D | Use value for given property |
    | -fs <local\|namenode:port> | sqoop.args.fs |  |
    | -jt <local\|resourcemanager:port> | sqoop.args.jt |  |
    | -files <comma separated list of files> | sqoop.args.files | Specify comma separated files to be copied to the map reduce cluster |
    | -libjars <comma separated list of jars> | sqoop.args.libjars | Specify comma separated jar files to include in the classpath |
    | -archives <comma separated list of archives> | sqoop.args.archives | Specify comma separated archives to be unarchived on the compute machines |