Apache Atlas Hook for Apache Sqoop

1. Sqoop 模型

Sqoop模型包含以下类型(types):

  • Entity types:

    • sqoop_process
      • super-types: Process
      • attributes: qualifiedName, name, description, owner, inputs, outputs, operation, commandlineOpts, startTime, endTime, userName
    • sqoop_dbdatastore
      • super-types: DataSet
      • attributes: qualifiedName, name, description, owner, dbStoreType, storeUse, storeUri, source
  • Enum types:

    • sqoop_operation_type
      • values: IMPORT, EXPORT, EVAL
    • sqoop_dbstore_usage
      • values: TABLE, QUERY, PROCEDURE, OTHER

使用唯一属性qualifiedName在Atlas中创建和删除Sqoop实体,其值应格式如下所述。

sqoop_process.qualifiedName:     sqoop <operation> --connect <url> {[--table <tableName>] || [--database <databaseName>]} [--query <storeQuery>]
sqoop_dbdatastore.qualifiedName: <storeType> --url <storeUri> {[--table <tableName>] || [--database <databaseName>]} [--query <storeQuery>]  --hive-<operation> --hive-database <databaseName> [--hive-table <tableName>] --hive-cluster <clusterName>

2. Sqoop Hook

Sqoop添加了一个SqoopJobDataPublisher,它在完成导入Job后将数据发布到Atlas。如今,SqoopHook仅支持hiveImport,用来给上述的模型在Atlas中添加实体。 添加以下属性以在Sqoop中启用Atlas hook:

  • 通过添加以下内容在<sqoop-conf>/sqoop-site.xml中设置Atlas hook: ` <property> <name>sqoop.job.data.publish.class</name> <value>org.apache.atlas.sqoop.hook.SqoopHook</value> </property>
  • 解压apache-atlas-${project.version}-sqoop-hook.tar.gz
  • cd apache-atlas-sqoop-hook-${project.version}
  • 将文件夹apache-atlas-sqoop-hook-${project.version}/hook/sqoop的全部内容复制到<atlas package>/hook/sqoop
  • <atlas-conf>/atlas-application.properties复制到sqoop conf目录``<sqoop-conf>/
  • 在sqoop lib中链接<atlas package>/hook/sqoop/*.jar

atlas-application.properties中的以下属性控制线程池和通知详细信息:

atlas.hook.sqoop.synchronous=false # whether to run the hook synchronously. false recommended to avoid delays in Sqoop operation completion. Default: false
atlas.hook.sqoop.numRetries=3      # number of retries for notification failure. Default: 3
atlas.hook.sqoop.queueSize=10000   # queue size for the threadpool. Default: 10000

atlas.cluster.name=primary # clusterName to use in qualifiedName of entities. Default: primary

atlas.kafka.zookeeper.connect=                    # Zookeeper connect URL for Kafka. Example: localhost:2181
atlas.kafka.zookeeper.connection.timeout.ms=30000 # Zookeeper connection timeout. Default: 30000
atlas.kafka.zookeeper.session.timeout.ms=60000    # Zookeeper session timeout. Default: 60000
atlas.kafka.zookeeper.sync.time.ms=20             # Zookeeper sync time. Default: 20

可以通过在配置名称前加上“atlas.kafka”前缀来指定Kafka通知生成器的其他配置。有关Kafka制作人支持的配置列表,请参阅Kafka Producer Configs

3. 注意

目前只有sqoop hook捕获以下sqoop操作

  • hiveImport

results matching ""

    No results matching ""