atlas-Hook

Atlas Hook

Atlas 提供了关于 Hive、HBase、Kafka、Sqoop、Storm 和 Falcon 等组件的 Hook。

在安装包中已经含有 hook 和 hook-bin,不需要像官网所说,需要解压相关hook包,然后拷贝到安装目录。

Hive Hook

分批量导入 和 hook 实时跟踪

批量导入 hive 元数据

import_hive.sh 脚本在 安装包的 bin 目录下。所以还是需要将编译后的 hook 包进行解压,拷贝此脚本。

直接执行会报错,少依赖包。如下三个,下载后放到 $ATLAS_HOME/hook/hive/atlas-hive-plugin-impl/ 下。

wget https://repo1.maven.org/maven2/com/fasterxml/jackson/module/jackson-module-jaxb-annotations/2.9.8/jackson-module-jaxb-annotations-2.9.8.jar

wget https://repo1.maven.org/maven2/com/fasterxml/jackson/jaxrs/jackson-jaxrs-base/2.9.8/jackson-jaxrs-base-2.9.8.jar

wget https://repo1.maven.org/maven2/com/fasterxml/jackson/jaxrs/jackson-jaxrs-json-provider/2.9.8/jackson-jaxrs-json-provider-2.9.8.jar

之后再执行,执行日志位于 $ATLAS_HOME/logs/import_hive.log

hook 跟踪

atlas 提供了对应的 hive atlas hook 来进行跟踪 hive cli 里面的操作来进行元数据的更新。配置 hook 步骤如下:

  1. 在 hive-env.sh 里面增加如下内容:即添加 hive hook 的目录。

    1
    export HIVE_AUX_JARS_PATH=/data/atlas/apache-atlas-2.0.0/hook/hive
  2. 在 hive-site.xml 内增加配置:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    <property>
    <name>atlas.cluster.name</name>
    <value>primary</value>
    </property>

    <property>
    <name>atlas.hook.hive.maxThreads</name>
    <value>1</value>
    </property>

    <property>
    <name>atlas.hook.hive.minThreads</name>
    <value>1</value>
    </property>

    <property>
    <name>atlas.rest.address</name>
    <value>http://tmaster:21000</value>
    </property>

    # 必要配置,上面四项可不配。
    <property>
    <name>hive.exec.post.hooks</name>
    <value>org.apache.atlas.hive.hook.HiveHook</value>
    </property>
  3. 修改 atlas 配置文件 atlas-application.properties。如果不修改value,那么可以不增加这些配置

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    # hive hook 相关
    atlas.hook.hive.synchronous=false # whether to run the hook synchronously. false recommended to avoid delays in Hive query completion. Default: false
    atlas.hook.hive.numRetries=3 # number of retries for notification failure. Default: 3
    atlas.hook.hive.queueSize=10000 # queue size for the threadpool. Default: 10000
    atlas.cluster.name=primary # clusterName to use in qualifiedName of entities. Default: primary

    # kafka 相关,所有的kafka相关配置加上 atlas.kafka. 前缀即可生效。
    atlas.kafka.zookeeper.connect= # Zookeeper connect URL for Kafka. Example: localhost:2181
    atlas.kafka.zookeeper.connection.timeout.ms=30000 # Zookeeper connection timeout. Default: 30000
    atlas.kafka.zookeeper.session.timeout.ms=60000 # Zookeeper session timeout. Default: 60000
    atlas.kafka.zookeeper.sync.time.ms=20 # Zookeeper sync time. Default: 20
  4. 将 atlas-application.properties 拷贝到 hive conf 目录下,或者直接做个软链接

  5. 其他hive节点配置,需要 atlas-application.propertiesexport HIVE_AUX_JARS_PATH=/data/atlas/apache-atlas-2.0.0/hook/hive

Kafka Hook

将编译后的安装包中的 apache-atlas-2.0.0-kafka-hook 中的 jar 包 目录拷贝到 atlas 安装包的 $ATLAS_HOME/hook/kafka/ 下。

然后 将 atlas 配置文件 打包到 $ATLAS_HOME//hook/kafka/atlas-kafka-plugin-impl/kafka-bridge-2.0.0.jar

1
2
在 atlas-kafka-plugin-impl 目录下打包
zip -u kafka-bridge-2.0.0.jar atlas-application.properties

将准备好的 hook 和 hook-bin分发到 Kafka 节点

hook-bin 下执行 import-kafka.sh

注意点:

  1. kafka 版本会有影响,有个 zkUtils 的包,有个方法一直找不到。换个版本就好了,目测是低版本kafka 集群少了未知的包,
  2. atlas 只会读取与 atlas 集成的 kafka 的 topic 。需要在导入kafka-bridge 包同级目录下增加 atlas-application.properties ,atlas.cluster.name 和 atlas.kafka.zookeeper.connect 两项分别确定 集群名和 目标kafka地址。导入时修改的这两个配置不会影响 atlas 集群中的参数。

HBase Hook

提前下载缺失的包:https://mvnrepository.com/artifact/org.apache.htrace/htrace-core4/4.2.0-incubating

HBase-site.xml 增加配置

1
2
3
4
<property>
<name>hbase.coprocessor.master.classes</name>
<value>org.apache.atlas.hbase.hook.HBaseAtlasCoprocessor</value>
</property>

atlas 配置文件 atlas-application.properties 拷贝到HBase 所有节点的 config 目录。

批量导入时,需要增加上面下载的 jar 包 到hook 下的目录下。

把所有 atlas 下 HBase hook 相关的 jar 包 软链接 到 HBase 下的 lib 目录下。

HBase 版本问题还未解决。

针对于 要求不同 hive 不同 HBase 导入时所对应的 qualifiedName 中 default.testhive@testHive 的 testHive 不一样。需要将不同hive 、HBase 集群 中的 atlas-application.properties 中的 atlas.cluster.name。hive不需要重启。HBase需要重启。切记,同一个集群的,要保持配置一样。

Spark-Atlas-Connector

git下载源码https://github.com/hortonworks-spark/spark-atlas-connector/archive/master.zip

编译

1
2
mvn clean
mvn package -DskipTests

创建 atlas 中 spark 相关模块:

拷贝 patch/1100-spark_model.json$ATLAS_HOME/models/1000-Hadoop/ 下,然后重启 atlas 服务使之生效。

将 配置 文件 atlas-application.properties 拷贝到 spark conf 下,同时 spark 需要能连接上hive hadoop等。

启动 spark SQL 或者 spark shell 时,带上连接器的 jar 包。

1
bin/spark-sql --jars spark-atlas-connector-assembly-0.1.0-SNAPSHOT.jar --conf spark.extraListeners=com.hortonworks.spark.atlas.SparkAtlasEventTracker --conf spark.sql.queryExecutionListeners=com.hortonworks.spark.atlas.SparkAtlasEventTracker --conf spark.sql.streaming.streamingQueryListeners=com.hortonworks.spark.atlas.SparkAtlasStreamingQueryEventTracker

此连接器,只能实现 insert 相关操作,并且生成的 atlas entities 是 spark process,而不是 hive process。所以几乎对追踪 hive 元数据没作用。

所有就有接下来的转换思路,不用spark-sql,而是转为 使用 hive on spark。

Hive on Spark

hive on spark 与 spark-sql 操作过程正好相反。spark-sql 是在spark 中连接hive元数据,然后进行计算。而 hive on spark 是,hive 底层执行引擎由 MR 修改为 Spark。

版本依赖:

Hive Version Spark Version
master 2.3.0
3.0.x 2.3.0
2.3.x 2.0.0
2.2.x 1.6.0
2.1.x 1.6.0
2.0.x 1.5.0
1.2.x 1.3.1
1.1.x 1.2.0

spark-sql 监听器实现

原理:通过listener监听到spark-sql中的每一个sql,然后将sql放入到hive session中进行解析,然后引用atlas的hive hook包 进行 sql 关系的注入,drop 操作除外。

(无法实现drop操作是因为,在spark-sql中,执行drop 后,hive metastore 中已经不存在需要drop的数据,当 hive session进行解析时会发现不存在,无法进行解析。)

代码打包:(不同的hive环境需要替换掉jar包中的hive-site.xml)

  • atlas-bridges-1.0-SNAPSHOT.jar
  • atlas-bridges-1.0-SNAPSHOT-jar-with-dependencies.jar

使用方法:将jar包放到 $SPARK_HOME/jars 目录下。

  • 对于 atlas-bridges-1.0-SNAPSHOT.jar ,还需要在jars目录下创建 atlas-hive-hook 的相关包 的软链接

  • 对于atlas-bridges-1.0-SNAPSHOT-jar-with-dependencies.jar ,只需要放入其包。

    然后在启动spark-sql时,加上监听器参数

    1
    2
    3
    bin/spark-sql --jars atlas-bridges-1.0-SNAPSHOT.jar  --conf spark.extraListeners=com.eebbk.atlas.bridges.sparksql.AtlasMetadataListener

    bin/spark-sql --conf spark.extraListeners=com.eebbk.atlas.bridges.sparksql.AtlasMetadataListener

listener:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.eebbk.atlas.bridges.sparksql;

import org.apache.log4j.Logger;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;


public class AtlasMetadataListener extends SparkListener {

private static final Logger log = Logger.getLogger(AtlasMetadataListener.class);

private String command;
private SparkSqlLineage sparkSqlLineage = new SparkSqlLineage();
private Thread thread = new Thread(sparkSqlLineage);


@Override
public void onJobEnd(SparkListenerJobEnd jobEnd) {
if (thread.getState().compareTo(Thread.State.NEW)==0){
thread.start();
}
if (!command.toLowerCase().startsWith("drop")){
log.info("send command to sparkSqlLineage: "+command);
sparkSqlLineage.addCommand(command);
} else {
log.info("drop operator is not support in spark-sql for apache atlas!");
}
}

@Override
public void onJobStart(SparkListenerJobStart jobStart) {
Object jobDesc = jobStart.properties().get("spark.job.description");
command = jobDesc.toString();
}
}

sqlLineage:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package com.eebbk.atlas.bridges.sparksql;

import org.apache.atlas.hive.hook.HiveHook;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.hooks.Hook;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.hooks.HookUtils;
import org.apache.hadoop.hive.ql.lockmgr.LockException;
import org.apache.hadoop.hive.ql.parse.*;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class SparkSqlLineage implements Runnable{

private static final Logger log = Logger.getLogger(SparkSqlLineage.class);

private final LinkedBlockingQueue<String> sqlQueue = new LinkedBlockingQueue<>(100);

private HiveConf hiveConf = null;
private ParseDriver pd = null;
private Context context = null;
private SessionState ss = null;
private String queryStr = null;
private String command = null;

public void addCommand(String command){
try {
sqlQueue.put(command);
} catch (InterruptedException e) {
log.error("put command to queue fail. ",e);
}
}

@Override
public void run() {

while (true){
try {
command = sqlQueue.take();
} catch (InterruptedException e) {
log.error("get sql command fail." + e );
}

if (command==null||command.equals("end")){
log.info("process sql thread end !");
break;
}

if (ss==null){
initSessionState();
}

String queryId = QueryPlan.makeQueryId();

command = new VariableSubstitution().substitute(hiveConf, command);

if (context == null){
initContext();
}
context.setCmd(command);

// get tree
ASTNode tree = getTree(command);

dealSessionState();

try {
queryStr = HookUtils.redactLogString(hiveConf, command);
hiveConf.set("mapreduce.workflow.name", queryStr);
} catch (Exception e) {
log.error("get queryStr fail." + e);
}

// get sem
BaseSemanticAnalyzer sem = null;
try {
sem = SemanticAnalyzerFactory.get(hiveConf, tree);
sem.analyze(tree, context);
log.info("Semantic Analysis Completed");
sem.validate();
} catch (SemanticException e) {
log.error("init sem fail."+ e );
}

// get query plan
if (sem == null){
break;
}
QueryPlan plan = new QueryPlan(queryStr, sem, System.currentTimeMillis(), queryId, ss.getCommandType());

// HookContext
HookContext hookContext = null;
try {
hookContext = new HookContext(plan, hiveConf, context.getPathToCS(), ss.getUserName(), ss.getUserIpAddress(), queryId);
hookContext.setHookType(HookContext.HookType.POST_EXEC_HOOK);
} catch (Exception e) {
log.error("get hookContext fail." + e);
}

List<Hook> hookList;
try {
hookList = HookUtils.getHooks(hiveConf, HiveConf.ConfVars.POSTEXECHOOKS, Hook.class);
for (Hook hook:hookList){
if (hook instanceof HiveHook){
((HiveHook) hook).run(hookContext);
}
}
} catch (Exception e) {
log.error("run hook fail."+ e);
}
}
}

private void initSessionState(){
SessionState sessionState = new SessionState(hiveConf == null ? initHiveConf() : hiveConf);
SessionState.start(sessionState);
this.ss = SessionState.get();
}

private HiveConf initHiveConf(){
this.hiveConf = new HiveConf();
return hiveConf;
}

private void initContext() {
try {
this.context = new Context(hiveConf);
context.setTryCount(Integer.MAX_VALUE);
context.setHDFSCleanup(true);
} catch (IOException e) {
log.error("init context fail." + e );
}
}

private void initParseDriver(){
this.pd = new ParseDriver();
}

private ASTNode getTree(String command){
ASTNode tree = null;
if (pd ==null){
initParseDriver();
}
try {
tree = pd.parse(command, context);
tree = ParseUtils.findRootNonNullToken(tree);
log.info("parse command to tree success.");
} catch (ParseException e) {
log.error("parse command to tree fail. " + e);
}
return tree;
}

private void dealSessionState(){
try {
ss.initTxnMgr(hiveConf);
ValidTxnList txns = ss.getTxnMgr().getValidTxns();
String txnStr = txns.toString();
hiveConf.set(ValidTxnList.VALID_TXNS_KEY, txnStr);
} catch (LockException e) {
log.error("deal SessionState fail." + e );
}
}
}