Druid初探

Apache Druid 是什么

是一个为在大数据集之上做实时统计分析而设计的开源数据存储。这个系统集合了一个面向列存储的层,一个分布式、shared-nothing的架构,和一个高级的索引结构,来达成在秒级以内对十亿行级别的表进行任意的探索分析。

Apache Druid 架构

一个Druid集群包含不同类型的节点,而每种节点都被设计来做好某组事情。我们相信这样的设计可以隔离关注并简化整个系统的复杂度。不同节点的运转几乎都是独立的并且和其他的节点有着最小化的交互,因此集群内的通信故障对于数据可用性的影响微乎其微。

为了解决复杂的数据分析问题,把不同类型的节点组合在一起,就形成了一个完整的系统。架构图来自官网

apache druid 架构

实时节点

实时节点封装了导入查询事件数据的功能,经由这些节点导入的事件数据可以立刻被查询。实时节点只关心一小段时间内的事件数据,并定期把这段时间内收集的这批不可变事件数据导入到Druid集群里面另外一个专门负责处理不可变的批量数据的节点中去。实时节点通过Zookeeper的协调和Druid集群的其他节点协调工作。实时节点通过Zookeeper来宣布他们的在线状态和他们提供的数据。

实时节点为所有传入的事件数据维持一个内存中的索引缓存。为了避免堆溢出问题,实时节点会定期地、或者在达到设定的最大行限制的时候,把内存中的索引持久化到磁盘去。这个持久化进程会把保存于内存缓存中的数据转换为基于列存储的格式

所有的实时节点都会周期性的启动后台的计划任务搜索本地的持久化索引,后台计划任务将这些持久化的索引合并到一起并生成一块不可变的数据

实时节点是一个数据的消费者,需要有相应的生产商为其提供数据流。通常,为了数据耐久性的目的,会在生产商与实时节点间放置一个类似于Kafka这样的消息总线来进行连接。消息总线作为传入数据的缓冲区。类似于Kafka这样的消息总线会维持一个指示当前消费者(实时节点)从事件数据流中已经读取数据的位置偏移量,实时节点每次持久化内存中的缓存到磁盘的时候,都会更新这个偏移量。多个实时节点可以从数据总线导入同一组数据,为数据创建一个副本。这样当一个节点完全挂掉并且磁盘上的数据也丢失了,副本可以确保不会丢失任何数据。

历史节点

历史节点封装了加载和处理由实时节点创建的不可变数据块(segment)的功能。

类似于实时节点,历史节点在Zookeeper 中通告它们的在线状态和为哪些数据提供服务。

Broker节点

Broker 节点扮演着历史节点和实时节点的查询路由的角色。Broker节点知道发布于Zookeeper 中的关于哪些segment 是可查询的和这些 segment 是保存在哪里的,Broker节点就可以将到来的查询请求路由到正确的历史节点或者是实时节点,Broker节点也会将历史节点和实时节点的局部结果进行合并,然后返回最终的合并后的结果给调用者。

协调节点

Druid的协调节点主要负责数据的管理和在历史节点上的分布。协调节点告诉历史节点加载新数据、卸载过期数据、复制数据和为了负载均衡移动数据。

存储格式

Druid中的数据表(称为数据源)是一个时间序列事件数据的集合,并分割到一组segment中,而每一个segment通常是0.5-1千万行。在形式上,我们定义一个segment为跨越一段时间的数据行的集合。Segment是Druid里面的基本存储单元,复制和分布都是在segment基础之上进行的。

其他依赖

Druid 包含3个外部依赖 :Mysql、Deep storage、Zookeeper

  • Mysql

    Mysql:存储关于Druid中的metadata而不是存储实际数据,包含3张表:druid_config (通常是空的), druid_rules(协作节点使用的一些规则信息,比如哪个segment从哪个node去load)和 druid_segments (存储 每个segment的metadata信息)

  • Deep storage

    Deep storage: 存储segments,Druid目前已经支持本地磁盘、NFS挂载磁盘、HDFS、S3等。Deep Storage的数据有2个来源,一个是批数据摄入, 另一个来自实时节点

  • ZooKeeper

    ZooKeeper: 被Druid用于管理当前cluster的状态,比如记录哪些segments从实时节点移到了历史节点

查询API

可以使用 SQL 和 Curl 的方式

单机版安装

下载需要的安装包

  • Zookeeperwget https://apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
  • Druidwget http://mirror.bit.edu.cn/apache/incubator/druid/0.15.1-incubating/apache-druid-0.15.1-incubating-bin.tar.gz

解压安装包

1
2
tar -zxf  apache-druid-0.15.1-incubating-bin.tar.gz -C /data/druid/druid-0.15.1
tar -zxf zookeeper-3.4.14.tar.gz -C /data/druid/

Zookeeper 配置:

1
2
cd /data/druid/zookeeper-3.4.14 
mv conf/zoo_sample.cfg conf/zoo.cfg

druid 单机启动准备:

1
ln -s /data/druid/zookeeper-3.4.14 /data/druid/druid-0.15.1/zk

事前不需要启动ZK,直接启动Druid集群:

1
2
3
4
5
6
7
[root@calcnode3 druid-0.15.1]# bin/start-micro-quickstart
[Tue Sep 10 15:39:09 2019] Running command[zk], logging to[/data/druid/druid-0.15.1/var/sv/zk.log]: bin/run-zk conf
[Tue Sep 10 15:39:09 2019] Running command[coordinator-overlord], logging to[/data/druid/druid-0.15.1/var/sv/coordinator-overlord.log]: bin/run-druid coordinator-overlord conf/druid/single-server/micro-quickstart
[Tue Sep 10 15:39:09 2019] Running command[broker], logging to[/data/druid/druid-0.15.1/var/sv/broker.log]: bin/run-druid broker conf/druid/single-server/micro-quickstart
[Tue Sep 10 15:39:09 2019] Running command[router], logging to[/data/druid/druid-0.15.1/var/sv/router.log]: bin/run-druid router conf/druid/single-server/micro-quickstart
[Tue Sep 10 15:39:09 2019] Running command[historical], logging to[/data/druid/druid-0.15.1/var/sv/historical.log]: bin/run-druid historical conf/druid/single-server/micro-quickstart
[Tue Sep 10 15:39:09 2019] Running command[middleManager], logging to[/data/druid/druid-0.15.1/var/sv/middleManager.log]: bin/run-druid middleManager conf/druid/single-server/micro-quickstart

如果需要,可以根据启动脚本使用的配置,在配置文件中修改配置信息。

其中 bin/start-micro-quickstart 主要内容是:

1
exec "$WHEREAMI/supervise" -c "$WHEREAMI/../conf/supervise/single-server/micro-quickstart.conf"

可知,启动配置在 conf/supervise/single-server/micro-quickstart.conf,查看一下主要内容:

1
2
3
4
5
6
7
8
9
10
:verify bin/verify-java           // 校验 java
:verify bin/verify-default-ports // 校验 端口
:kill-timeout 10

!p10 zk bin/run-zk conf // 启动 zk
coordinator-overlord bin/run-druid coordinator-overlord conf/druid/single-server/micro-quickstart // 启动 coordinator-overlord
broker bin/run-druid broker conf/druid/single-server/micro-quickstart // 启动 broker
router bin/run-druid router conf/druid/single-server/micro-quickstart // 启动 router
historical bin/run-druid historical conf/druid/single-server/micro-quickstart // 启动 historical
!p90 middleManager bin/run-druid middleManager conf/druid/single-server/micro-quickstart // 启动 middleManager

看样子,单机配置全在 conf/druid/single-server/micro-quickstart 下面:

1
2
3
4
5
6
7
8
[root@calcnode3 micro-quickstart]# ll
总用量 24
drwxr-xr-x. 2 hive games 4096 9月 12 11:13 broker
drwxr-xr-x. 2 hive games 4096 9月 12 11:44 _common
drwxr-xr-x. 2 hive games 4096 9月 12 11:34 coordinator-overlord
drwxr-xr-x. 2 hive games 4096 9月 12 11:35 historical
drwxr-xr-x. 2 hive games 4096 9月 12 11:13 middleManager
drwxr-xr-x. 2 hive games 4096 9月 12 11:13 router

配置项分别对应各自的节点。

此次只修改了 _common/common.runtime.properties 文件。包括启动的服务,主机名,ZK,元数据,深度存储等。

WebUI界面:http://xxx.xxx.xxx.108:8888

简单使用

先看看批量提交的脚本:

1
2
3
4
5
6
7
8
9
10
11
12
13
[root@calcnode3 druid-0.15.1]# cat bin/post-index-task
#!/bin/bash -eu

PWD="$(pwd)"
WHEREAMI="$(dirname "$0")"
WHEREAMI="$(cd "$WHEREAMI" && pwd)"

if [ -x "$(command -v python2)" ]
then
exec python2 "$WHEREAMI/post-index-task-main" "$@"
else
exec "$WHEREAMI/post-index-task-main" "$@"
fi

脚本使用方法,如下。必须指定 一个文件。

1
2
3
4
5
6
7
[root@calcnode3 druid-0.15.1]#  bin/post-index-task
usage: post-index-task-main [-h] [--url url]
[--coordinator-url COORDINATOR_URL] --file FILE
[--submit-timeout SUBMIT_TIMEOUT]
[--complete-timeout COMPLETE_TIMEOUT]
[--load-timeout LOAD_TIMEOUT] [--quiet]
[--user USER] [--password PASSWORD]

一个官方给的文件:摄取本地文件到 Druid。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
[root@calcnode3 druid-0.15.1]# cat quickstart/tutorial/wikipedia-index.json 
{
"type" : "index",
"spec" : {
"dataSchema" : {
"dataSource" : "wikipedia",
"parser" : {
"type" : "string",
"parseSpec" : {
"format" : "json",
"dimensionsSpec" : {
"dimensions" : [
"channel",
"cityName",
"comment",
"countryIsoCode",
"countryName",
"isAnonymous",
"isMinor",
"isNew",
"isRobot",
"isUnpatrolled",
"metroCode",
"namespace",
"page",
"regionIsoCode",
"regionName",
"user",
{ "name": "added", "type": "long" },
{ "name": "deleted", "type": "long" },
{ "name": "delta", "type": "long" }
]
},
"timestampSpec": {
"column": "time",
"format": "iso"
}
}
},
"metricsSpec" : [],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "day",
"queryGranularity" : "none",
"intervals" : ["2015-09-12/2015-09-13"],
"rollup" : false
}
},
"ioConfig" : {
"type" : "index",
"firehose" : {
"type" : "local",
"baseDir" : "quickstart/tutorial/",
"filter" : "wikiticker-2015-09-12-sampled.json.gz"
},
"appendToExisting" : false
},
"tuningConfig" : {
"type" : "index",
"maxRowsPerSegment" : 5000000,
"maxRowsInMemory" : 25000
}
}
}

执行:

1
2
3
4
5
6
7
8
9
10
11
12
[root@calcnode3 druid-0.15.1]# bin/post-index-task --file quickstart/tutorial/wikipedia-index.json --url http://calcnode3:8081
Beginning indexing data for wikipedia
Task started: index_wikipedia_2019-09-12T07:41:11.478Z
Task log: http://calcnode3:8081/druid/indexer/v1/task/index_wikipedia_2019-09-12T07:41:11.478Z/log
Task status: http://calcnode3:8081/druid/indexer/v1/task/index_wikipedia_2019-09-12T07:41:11.478Z/status
Task index_wikipedia_2019-09-12T07:41:11.478Z still running...
Task index_wikipedia_2019-09-12T07:41:11.478Z still running...
Task finished with status: SUCCESS
Completed indexing data for wikipedia. Now loading indexed data onto the cluster...
wikipedia is 0.0% finished loading...
wikipedia is 0.0% finished loading...
wikipedia loading complete! You may now query your data

查询:

1
SELECT channel, SUM(added) as add_sum FROM wikipedia WHERE "__time" BETWEEN TIMESTAMP '2015-09-12 00:00:00' AND TIMESTAMP '2015-09-13 00:00:00' GROUP BY channel ORDER BY SUM(added) DESC LIMIT 5;

集群安装

配置在 cluster 配置中,具体配置项和单机版一样,只不过不同的服务启动在不同的节点上。

关于 Hadoop 集成

有两点,一个是 DeepStorage,一个是 Hadoop Batch Ingestion

DeepStorage:深度存储,即 Druid 本身的 segment 数据 和 indexing-log 的存储位置,配置很简单,在 _common 目录下的 common.runtime.properties 中进行修改。

Hadoop Batch Ingestion:即源数据放在 HDFS 上,Druid 直接摄取 HDFS 上的对应数据。支持文本格式,SequenceFile,ORC,Parquet 等格式。下面是简单示例:使用前将 HDFS几个配置文件放到 _common 下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
## my-hadoop.json
{
"type" : "index_hadoop",
"spec" : {
"dataSchema" : {
"dataSource" : "wikipedia_hadoop",
"parser" : {
"type" : "hadoopyString",
"parseSpec" : {
"format" : "json",
"dimensionsSpec" : {
"dimensions" : [
"channel",
"cityName",
"comment",
"countryIsoCode",
"countryName",
"isAnonymous",
"isMinor",
"isNew",
"isRobot",
"isUnpatrolled",
"metroCode",
"namespace",
"page",
"regionIsoCode",
"regionName",
"user",
{ "name": "added", "type": "long" },
{ "name": "deleted", "type": "long" },
{ "name": "delta", "type": "long" }
]
},
"timestampSpec" : {
"format" : "auto",
"column" : "time"
}
}
},
"metricsSpec" : [],
"granularitySpec" : {
"type" : "uniform",
"segmentGranularity" : "day",
"queryGranularity" : "none",
"intervals" : ["2015-09-12/2015-09-13"],
"rollup" : false
}
},
"ioConfig" : {
"type" : "hadoop",
"inputSpec" : {
"type" : "static",
"paths" : "hdfs://bbkhd/tmp/wikiticker-2015-09-12-sampled.json.gz"
}
},
"tuningConfig" : {
"type" : "hadoop",
"partitionsSpec" : {
"type" : "hashed",
"targetPartitionSize" : 5000000
},
"forceExtendableShardSpecs" : true,
"jobProperties" : {
"mapreduce.job.classloader" : "true"
}
}
},
"hadoopDependencyCoordinates": ["org.apache.hadoop:hadoop-client:2.8.3"]
}

启动摄取任务:

1
bin/post-index-task --file my-hadoop.json --url http://calcnode3:8081

优点

  • 亚秒级响应

  • 实时导入-kafka

  • 支持复杂的聚合

  • 不支持大表 join

  • lookup 功能可以 join维表

  • 预计算-使用 MR 导入数据

缺点

  • 集群复杂

    各个节点的配置太多,难以管理

  • SQL支持不完善

  • 集成 BI 只能 superset 等几个

  • 无法精确计算