原创

一文了解什么是Apache Druid

1. 引言

在本教程中,我们将了解如何使用事件数据和 Apache Druid. 我们将介绍事件数据和 Druid 体系结构的基础知识。作为其中的一部分,我们将创建一个简单的数据管道,利用Druid的各种特性,包括各种数据接收模式和查询准备好的数据的不同方法。

2. 基础概念

在深入讨论Apache Druid的操作细节之前,让我们先了解一些基本概念。最感兴趣的是大规模事件数据的实时分析。

因此,我们必须理解事件数据的含义,以及在规模上实时分析它们需要什么。

2.1. 什么是事件数据?

事件数据是指关于在特定时间点发生的更改的一段信息。事件数据在当今的应用程序中几乎无处不在。从经典的应用程序日志到现代的传感器数据,它几乎无处不在。它们通常以大规模产生的机器可读信息为特征。

它们支持一些功能,如预测、自动化、通信和集成等等。此外,它们在事件驱动体系结构中具有重要意义。

2.2. Apache Druid 是什么?

ApacheDruid是一个实时分析数据库,旨在对面向事件的数据进行快速分析。Druid从2011年开始,2012年在GPL许可下开源,2015年迁移到Apache许可。它由Apache基金会管理,来自多个组织的社区捐款。它提供了实时接收、快速查询性能和高可用性。

Druid这个名字指的是它的体系结构可以改变以解决不同类型的数据问题。在商业智能应用程序中,它经常被用来分析大量的实时和历史数据。

3. Druid 架构

Druid是一个用Java编写的面向列的分布式数据源。它能够接收大量的事件数据,并在这些数据之上提供低延迟查询。此外,它还提供了对数据进行任意切片和分块的可能性。

了解Druid架构如何支持这些特性是非常有趣的。在本节中,我们将介绍 Druid 体系结构的一些重要部分。

3.1. 数据存储设计

了解 Druid 如何构建和存储数据是很重要的,这样可以进行分区和分发。默认情况下,在数据块处理期间将数据存储到数据块中

img

Druid将数据存储在我们所知的“datasource”,这在逻辑上类似于关系数据库中的表。Druid 可以并行处理来自多个源的数据。
默认情况下,每个数据源都是基于时间进行分区的,如果配置了其他属性,则会进一步基于其他属性。一个时间范围的数据被称为“块(chunk)”—例如,如果数据是按小时划分的,则是一小时的数据。

每个块(chunk)进一步划分为一个或多个“段(segments)”,这些“段”是由多行数据组成的单个文件。一个数据源可以有从几段到数百万段的任意位置。

3.2. Druid 进程

Druid 有多进程和分布式体系结构。因此,每个进程都可以独立地伸缩,允许我们创建灵活的集群。来了解下Druid 的重要过程:

img

  • Coordinator: 该过程主要负责段的管理和分发,并与历史进程通信,根据配置加载或删除段ns
  • Overlord: 这是负责接受任务、协调任务分配、围绕任务创建锁以及将状态返回给调用者的主流程
  • Broker: 这是一个在分布式集群中执行所有查询的进程;它从Zookeeper收集元数据,并将查询路由到具有正确段的进程
  • Router: 这是一个可选过程,可用于将查询路由到不同的代理进程,从而为查询更重要的数据提供查询隔离
  • Historical: 这些进程存储可查询的数据;它们与Zookeeper保持恒定的连接,并监视它们必须加载和服务的段信息
  • MiddleManager: 这些是执行提交的任务的工作进程;它们将任务转发给运行在不同jvm中的moon,从而提供资源和日志隔离

3.3. 外部依赖项

除了核心进程外,Druid还依赖于几个外部依赖项,以使其集群能够按预期运行

让我们看看Druid集群是如何与核心流程和外部依赖关系一起形成的:

img

Druid 使用 深层存储来存储 被摄取到系统中的任何数据。它们不用于响应查询,而是用作数据备份和在进程之间传输数据。它们可以是从本地文件系统到分布式对象存储(如S3和HDFS)的任何内容。

元数据存储用于保存共享的系统元数据. 如段使用信息和任务信息。但是,它从未用于存储实际数据。它是一个关系数据库,比如Apache Derby、PostgreSQL或MySQL。

Druid使用Apache Zookeeper用于管理当前集群的状态。它促进了 Druid 集群中的许多操作,如coordinator/overlord的领袖选举、段发布协议和段加载/丢弃协议。

4. Druid 设置

Druid被设计成一个可伸缩的、容错的集群。然而,建立一个生产级的Druid集群并非易事。如前所述,有许多进程和外部依赖项需要设置和配置。由于以灵活的方式创建集群是可能的,我们必须注意我们的需求,以便适当地设置各个流程。
另外,Druid仅在类Unix环境中受支持,而在Windows上不受支持。此外,运行Druid进程需要java8或更高版本。有几种单服务器配置可用于在一台机器上设置Druid,以运行教程和示例。但是,对于运行生产工作负载,建议使用多台机器建立一个完整的Druid集群。

在本教程中,我们将借助在Docker Hub发布的官方Docker镜像在一台机器上设置Druid. 这也使我们能够在Windows上运行Druid,正如我们前面所讨论的,它在其他方面是不受支持的。有一个Docker compose file可用,它为每个Druid进程及其外部依赖项创建一个容器。

我们必须将配置值作为环境变量提供给Druid。实现这一点最简单的方法是在与Docker compose文件相同的目录中提供[一个名为“environment”的文件]。

一旦我们有了Docker compose和环境文件,启动Druid就像在同一个目录中运行一个命令一样简单:

docker -compose up

这将调出一个单机 Druid 设置所需的所有容器。作为一个机器,我们必须小心地提供大量的内存。

5. 摄取数据

使用Druid构建数据管道的第一步是将数据加载到Druid中。这个过程在Druid架构中被称为数据摄取或索引。我们必须找到一个合适的数据集来继续本教程。
现在,正如我们目前所收集到的,我们必须收集事件数据,并且具有一些时间特性,以充分利用 Druid 的基础设施。

Druid 的官方指南使用简单而优雅的包含特定日期的维基百科页面编辑的数据. 我们将继续使用它作为我们的教程。

5.1. 数据模型

让我们从检查现有数据的结构开始。我们创建的大多数数据管道对数据异常非常敏感,因此,有必要尽可能地清理数据。
虽然有复杂的方法和工具来执行数据分析,但我们将从目视检查开始。快速分析会发现输入数据中有以JSON格式捕获的事件,单个事件包含典型属性

{
  "time": "2015-09-12T02:10:26.679Z",
  "channel": "#pt.wikipedia",
  "cityName": null,
  "comment": "Houveram problemas na última edição e tive de refazê-las, junto com as atualizações da página.",
  "countryIsoCode": "BR",
  "countryName": "Brazil",
  "isAnonymous": true,
  "isMinor": false,
  "isNew": false,
  "isRobot": false,
  "isUnpatrolled": true,
  "metroCode": null,
  "namespace": "Main",
  "page": "Catarina Muniz",
  "regionIsoCode": null,
  "regionName": null,
  "user": "181.213.37.148",
  "delta": 197,
  "added": 197,
  "deleted": 0
}

虽然有很多属性定义了这个事件,但是在处理Druid的时候,有一些属性是我们特别感兴趣的:

  • Timestamp
  • Dimensions
  • Metrics

Druid 需要一个特定的属性来标识为时间戳列。在大多数情况下,Druid 的数据解析器能够自动检测出最佳候选对象。但我们总是有选择的余地,尤其是当我们的数据中没有合适的属性时。
维度是 Druid 按原样存储的属性。我们可以将它们用于任何目的,如分组、筛选或应用聚合器。我们可以选择摄取规范中的维度,我们将在教程中进一步讨论。
与维度不同,度量是默认以聚合形式存储的属性。我们可以为Druid选择一个聚合函数,在摄取过程中应用于这些属性。加上启用了上卷功能,这些功能可以导致紧凑的数据表示。

5.2. 摄取方法

现在,我们将讨论在Druid中执行数据摄取的各种方法。通常,事件驱动的数据本质上是流式的,这意味着它们会随着时间的推移以不同的速度生成,就像维基百科的编辑一样。
然而,我们可能会在一段时间内对数据进行批处理,因为数据本质上是静态的,就像去年维基百科的所有编辑一样。
我们可能还需要解决各种各样的数据用例,而Druid对其中大多数都有极好的支持。让我们回顾一下在数据管道中使用Druid的两种最常见的方法:

  • Streaming Ingestion
  • Batched Ingestion

在Druid中,最常见的摄取数据的方法是通过Apache流服务,Druid可以直接从Kafka读取数据。Druid 也支持其他平台,比如Kinesis。我们必须在重载过程中启动管理器,它创建和管理Kafka索引任务。我们可以通过重载进程的 httppost 命令以JSON文件的形式提交一个supervisor规范来启动supervisor。

或者,我们可以批量接收数据—例如,从本地或远程文件。它提供了一个基于Hadoop的批处理接收选项,用于以Hadoop文件格式从Hadoop文件系统接收数据。更常见的是,我们可以选择顺序或并行的本机批处理摄取。它是一种更方便、更简单的方法,因为它没有任何外部依赖关系。

5.3.定义任务规范

在本教程中,我们将为我们的输入数据设置一个本机批处理摄取任务。我们可以从Druid控制台配置任务,这为我们提供了一个直观的图形界面。或者,我们可以将任务规范定义为JSON文件,并使用脚本或命令行将其提交给overlord 进程

让我们首先定义一个简单的任务规范,用于将数据摄取到一个名为 wikipedia-index.json 的文件中:

{
  "type" : "index_parallel",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "wikipedia",
      "dimensionsSpec" : {
        "dimensions" : [
          "channel",
          "cityName",
          "comment",
          "countryIsoCode",
          "countryName",
          "isAnonymous",
          "isMinor",
          "isNew",
          "isRobot",
          "isUnpatrolled",
          "metroCode",
          "namespace",
          "page",
          "regionIsoCode",
          "regionName",
          "user",
          { "name": "added", "type": "long" },
          { "name": "deleted", "type": "long" },
          { "name": "delta", "type": "long" }
        ]
      },
      "timestampSpec": {
        "column": "time",
        "format": "iso"
      },
      "metricsSpec" : [],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "day",
        "queryGranularity" : "none",
        "intervals" : ["2015-09-12/2015-09-13"],
        "rollup" : false
      }
    },
    "ioConfig" : {
      "type" : "index_parallel",
      "inputSource" : {
        "type" : "local",
        "baseDir" : "quickstart/tutorial/",
        "filter" : "wikiticker-2015-09-12-sampled.json.gz"
      },
      "inputFormat" : {
        "type": "json"
      },
      "appendToExisting" : false
    },
    "tuningConfig" : {
      "type" : "index_parallel",
      "maxRowsPerSegment" : 5000000,
      "maxRowsInMemory" : 25000
    }
  }
}

让我们根据前面的小节中介绍的基础知识来理解此任务规范:

  • 我们选择了index_parallel任务,它并行地为我们提供本机批处理摄取
  • 我们将在该任务中使用的数据源的名称为“wikipedia”
  • 数据的时间戳来自属性“time”
  • 我们添加了许多数据属性作为维度
  • 在当前任务中,我们没有对数据使用任何指标
  • 应为此任务禁用默认启用的汇总
  • 任务的输入源是名为wikiticker-2015-09-12-sampled.json.gz的本地文件
  • 我们没有使用任何辅助分区,可以在tuningConfig中定义

此任务规范假设我们已下载数据文件 wikiticker-2015-09-12-sampled.json.gz 并保存在Druid运行的本地机器上。当我们将 Druid 作为码头集装箱运行时,这可能会更棘手。幸运的是,Druid在默认情况下会在quickstart/tutorial位置提供这个示例数据。

5.4.提交任务说明

最后,我们可以使用curl这样的工具通过命令行将这个任务规范提交给 overlord 进程:

curl -X 'POST' -H 'Content-Type:application/json' -d @wikipedia-index.json http://localhost:8081/druid/indexer/v1/task

通常,如果提交成功,上面的命令会返回任务的ID。我们可以通过Druid控制台或通过执行查询来验证摄取任务的状态,这将在下一节中介绍。

5.5. 高级摄取概念

当我们需要处理大量的数据时,Druid是最合适的——当然不是我们在本教程中看到的那种数据!现在,为了实现大规模的特性,Druid架构必须提供合适的工具和技巧。

虽然我们在本教程中不会使用它们,但让我们快速讨论上卷和分区。

事件数据的大小很快就会增长到巨大的卷,这会影响我们可以实现的查询性能。在许多情况下,我们可能会总结一段时间内的数据。这就是我们所知道的Druid卷曲。当启用了上卷时,Druid会努力在摄取期间使用相同的维度和时间戳汇总行。上卷虽然可以节省空间,但会导致查询精度的损失,因此必须合理使用。

面对不断增长的数据量,另一种实现更好性能的潜在方法是分配数据,从而增加工作负载。默认情况下,Druid根据时间戳将数据划分为包含一个或多个段的时间块。此外,我们可以决定使用自然维度进行二次分区,以改善数据的局部性。此外,Druid首先根据时间戳对每个片段中的数据进行排序,然后根据我们配置的其他维度进行排序。

6.查询数据

一旦我们成功地执行了数据摄取,它就可以供我们查询了。在Druid中有多种查询数据的方法。在Druid中执行查询的最简单方法是通过Druid控制台。但是,我们也可以通过发送HTTP命令或使用命令行工具来执行查询。

在Druid中构造查询的两种主要方式是原生查询和类SQL查询。我们将用这两种方法构造一些基本查询,并使用curl通过HTTP发送它们。让我们来看看如何对之前在Druid中摄取的数据创建一些简单的查询。

6.1. 原生查询

Druid 中的本机查询使用JSON对象,我们可以将其发送到代理或路由器进行处理。我们可以通过 httppost 命令发送查询,除其他外,也可以这样做。
让我们创建一个名为simple_query_native.json 的文件:

{
  "queryType" : "topN",
  "dataSource" : "wikipedia",
  "intervals" : ["2015-09-12/2015-09-13"],
  "granularity" : "all",
  "dimension" : "page",
  "metric" : "count",
  "threshold" : 10,
  "aggregations" : [
    {
      "type" : "count",
      "name" : "count"
    }
  ]
}

这是一个简单的查询,它获取2019年9月12日至13日期间收到最多页面编辑的前十个页面。
让我们使用curl在HTTP上发布:

curl -X 'POST' -H 'Content-Type:application/json' -d @simple_query_native.json http://localhost:8888/druid/v2?pretty

此响应包含JSON格式的前十页的详细信息:

[ {
  "timestamp" : "2015-09-12T00:46:58.771Z",
  "result" : [ {
    "count" : 33,
    "page" : "Wikipedia:Vandalismusmeldung"
  }, {
    "count" : 28,
    "page" : "User:Cyde/List of candidates for speedy deletion/Subpage"
  }, {
    "count" : 27,
    "page" : "Jeremy Corbyn"
  }, {
    "count" : 21,
    "page" : "Wikipedia:Administrators' noticeboard/Incidents"
  }, {
    "count" : 20,
    "page" : "Flavia Pennetta"
  }, {
    "count" : 18,
    "page" : "Total Drama Presents: The Ridonculous Race"
  }, {
    "count" : 18,
    "page" : "User talk:Dudeperson176123"
  }, {
    "count" : 18,
    "page" : "Wikipédia:Le Bistro/12 septembre 2015"
  }, {
    "count" : 17,
    "page" : "Wikipedia:In the news/Candidates"
  }, {
    "count" : 17,
    "page" : "Wikipedia:Requests for page protection"
  } ]
} ]

6.2. Druid SQL

Druid有一个内置的SQL层,它为我们提供了在熟悉的类似SQL的构造中构造查询的自由。它利用 Apache Calcite来解析和规划查询。但是,Druid Sql在将SQL查询发送到数据处理之前,会将SQL查询转换为查询代理上的原生查询。

让我们看看如何使用druidsql创建与之前相同的查询。和以前一样,我们将创建一个名为 simple_query_sql.json 的JSON文件。

{
  "query":"SELECT page, COUNT(*) AS counts /
    FROM wikipedia WHERE \"__time\" /
    BETWEEN TIMESTAMP '2015-09-12 00:00:00' AND TIMESTAMP '2015-09-13 00:00:00' /
    GROUP BY page ORDER BY Edits DESC LIMIT 10"
}

请注意,为了可读性,查询被分成多行,但它应该出现在一行中。同样,如前所述,我们将通过HTTP发布此查询,但发送到另一个端点:

curl -X 'POST' -H 'Content-Type:application/json' -d @simple_query_sql.json http://localhost:8888/druid/v2/sql

输出应该与我们先前使用本机查询获得的结果非常相似。

6.3. 查询类型

在前一节中,我们看到了一种类型的查询,我们根据一个间隔获取度量“count”的前十个结果。这只是Druid支持的一种查询类型,称为TopN查询。当然,我们可以通过使用过滤器和聚合使这个简单的TopN查询更加有趣。但这不在本教程的范围之内。然而,Druid 中还有其他几个问题可能会引起我们的兴趣。

一些流行的包括Timeseries和GroupBy.

Timeseries查询返回一个JSON对象数组,其中每个对象表示时间序列查询中描述的一个值,例如,最近一个月一个维度的日平均值。

GroupBy查询返回一个JSON对象数组,其中每个对象表示GroupBy查询中描述的分组。例如,我们可以查询过去一个月按另一个维度分组的一个维度的日平均值。
还有其他几种查询类型,包括ScanSearchTimeBoundarySegmentMetadataDatasourceMetadata

6.4. 高级查询概念

Druid提供了一些复杂的方法来创建复杂的查询来创建有趣的数据应用程序。这些方法包括各种方法来分割数据,同时仍然能够提供令人难以置信的查询性能。

虽然对它们的详细讨论超出了本教程的范围,但让我们来讨论一些重要的问题,如连接和查找、多租户和查询缓存

Druid 支持两种连接数据的方法。第一种是连接运算符,第二种是查询时查找。但是,为了获得更好的查询性能,建议避免查询时连接。

多租户是指在同一个Druid基础设施上支持多个租户的特性,同时仍然为他们提供逻辑隔离。在Druid中,可以通过每个租户的独立数据源或租户的数据分区来实现这一点。

最后,查询缓存是数据密集型应用程序性能的关键。Druid 在段和查询结果级别支持查询结果缓存。此外,高速缓存数据可以驻留在存储器或外部持久存储器中。

7. 语言绑定

尽管Druid对创建摄取规范和用JSON定义查询有很好的支持,但是有时候在JSON中定义这些查询可能会很乏味,尤其是当查询变得复杂时。不幸的是,Druid 没有提供任何特定语言的客户端库来帮助我们。但是有很多语言绑定是由社区开发的。一个这样的客户机库也可用于Java。

我们将快速了解如何使用Java中的这个客户机库来构建前面使用的TopN查询。

让我们首先定义Maven中所需的依赖项:

<dependency>
    <groupId>in.zapr.druid</groupId>
    <artifactId>druidry</artifactId>
    <version>2.14</version>
</dependency>

在此之后,我们应该能够使用客户端库并创建TopN查询:

DateTime startTime = new DateTime(2015, 9, 12, 0, 0, 0, DateTimeZone.UTC);
DateTime endTime = new DateTime(2015, 9, 13, 0, 0, 0, DateTimeZone.UTC);
Interval interval = new Interval(startTime, endTime);
Granularity granularity = new SimpleGranularity(PredefinedGranularity.ALL);
DruidDimension dimension = new SimpleDimension("page");
TopNMetric metric = new SimpleMetric("count");
DruidTopNQuery query = DruidTopNQuery.builder()
  .dataSource("wikipedia")
  .dimension(dimension)
  .threshold(10)
  .topNMetric(metric)
  .granularity(granularity)
  .filter(filter)
  .aggregators(Arrays.asList(new LongSumAggregator("count", "count")))
  .intervals(Collections.singletonList(interval)).build();

之后,我们只需生成所需的JSON结构,就可以在HTTP POST调用中使用:

ObjectMapper mapper = new ObjectMapper();
String requiredJson = mapper.writeValueAsString(query);

8. 总结

在本教程中,我们学习了事件数据和apache druid架构的基础知识。

此外,我们在本地机器上使用Docker容器建立了一个主要的Druid集群。然后,我们还完成了在Druid中使用本机批处理任务摄取示例数据集的过程。在这之后,我们看到了在Druid中查询数据的不同方法。最后,我们查看了Java中的客户机库来构建Druid查询。

我们刚刚触及了 Druid 必须提供的一些特性。Druid 可以帮助我们构建数据管道和创建数据应用程序。为了有效地利用Druid的能力,很明显,下一步要学习的是高级摄取和查询特性。

此外,创建一个合适的Druid集群,根据需要扩展各个进程,应该是最大化收益的目标。

正文到此结束