0%

[笔记] Procella: Unifying serving and analytical data at YouTube

原文:Procella: Unifying serving and analytical data at YouTube

TL;DR

Procella是一种可以适应不同查询需求的分析引擎,它的特点:

  • 松耦合的schema要求,只要数据schema与table schema兼容,不需要完全一致,且文件本身的索引结构也可以惰性生成。
  • 使用了新的列存格式Artus,内带索引结构,同时支持高性能的点查和扫描,且使用高性能的、不解压就可以操作的压缩算法。
  • 使用Lambda架构,实时节点与compaction节点分离,前者支持高性能读写,后者可以在后台做复杂的优化。
  • 多种优化,包括自适应的基于运行期抽样的query优化。
  • 广泛使用多种cache。
  • 完整的SQL支持。

Introduction

Procella可以同时满足在/离线的读写请求,涵盖以下YouTube的需求:

  • 报表(reporting)与仪表盘(dashboard),需要支持准实时响应大数据量的复杂分析,以及读到足够新鲜的数据。
  • 内嵌统计(embeded statistics),如video的喜爱或观看数,需要支持实时响应模式简单但参数变化非常多的读写请求,请求量级可以达到每秒百万次。
  • 监控(monitoring),类似于dashboard,还需要有降精度、数据淘汰、近似函数、以及时序函数。
  • 临时分析(ad-hoc analysis),qps不大,能忍受分钟级延时,但查询复杂(且难以预测),数据量可能非常大。

之前YouTube使用了不同系统分别满足以上需求,但遇到了下面这些问题:

  • 数据需要用不同的ETL流程导入不同的系统,导致了资源浪费、数据不一致或质量差、加载时间长、开发和维护代价高、业务响应时间慢等问题。
  • 不同系统有不同的API,需要用不同的工具(尤其有些系统不支持完整的SQL),学习成本高。
  • 部分系统在YouTube的数据量级下有性能或伸缩性上的问题。

Procella实现了以下特性来解决上述问题:

  • 支持几乎完整的SQL标准,包括复杂的多阶段join、分析函数和set操作。
  • 通过计算(borg)与存储(colossus)分离实现了高伸缩性。
  • 高性能,同时提供高吞吐与低延时。
  • 通过lambda架构提供足够的数据新鲜度。

Architecture

Procella基于了多种Google的infrastructure,这些系统的特性也影响了Procella的设计:

  • 存储用Colossus,文件写完后不可变,远程访问延时高。
  • 计算用Borg,运行大量小task比少量大task更有助于提高总的利用率(调度容易);failover概率大,外加没有本地存储,更有理由拆小task了;单个task的性能难以预测。

Data

Procella中数据按表组织,每张表保存为多个文件(或称为tablet、partition),主要使用一种叫Artus的列存格式,也支持Capacitor等格式。同一份数据可以被多个Procella实例服务。

与很多现代分析引擎类似,Procella也没有使用传统的B树来构建索引,而是使用了更轻量的结构,如zone map、bitmap、bloom filter、partition、sorted keys。这些结构部分是由registration server在文件注册时从文件头获取的,部分是由data server在评估query时惰性生成。

schema、table到file的映射、统计信息、zone map等数据主要存储在metadata store(BigTable和Spanner)中。

DDL命令(CREATE、ALTER、DROP等)会发给registration server,再存储到metadata store中。用户可以指定各种选项,包括实时还是离线导入。对于实时导入的表,用户还可以指定过期淘汰规则、降精度和compact的方式。

用户可以自己离线生成好数据,再用DDL命令把文件导入到表中。这个过程中RgS(就是registration server)会从文件头解析出table到file的映射和索引结构。如果文件头没有索引结构的话,Procella也能接受。

RgS还负责检查文件的schema与table的schema是否兼容。如果不兼容的话还可能裁剪或compact schema。

实时写入会发给ingestion server(IgS),如果数据schema与table schema不一致的话,IgS会做转换,再写进WAL中。同时IgS还会把数据发给对应的data server(根据data partition)。数据会先进data server的内存buffer中,再定期持久化下去(但只为了容错,不与全量数据合并)。IgS也可以同时把数据发给多个data server从而实现冗余服务。

WAL会被背景compact成data file。

两种路径并存可能导致最近几秒内数据是“脏”的,但随着compaction的进度会达到最终一致。如果为了一致性的考虑,query可以跳过buffer中的数据,但这样数据可见性就变差了。

compaction server会定期compact和重分区掉WAL文件,并转换为PAX风格的列存文件。过程中compaction server也可以执行用户定义的SQL逻辑来做过滤、聚合、淘汰等操作。每轮compaction之后compaction server会更新metadata server。被替换掉的文件随后会被清理掉。

Query Lifecycle

client的query会发给Root Server(RS)。RS负责解析、重写、生成逻辑plan、优化,最后得到执行plan。这个过程中它会用到Metadata Server(MDS)的数据去掉不需要的文件。RS会构建一棵执行树以满足满足复杂query的时序(如shuffle)、数据依赖(如broadcast join)、多变的join策略等需要,其中节点是query block(多个算子),边是数据流(聚合、远端执行、错峰执行等)。还可以向执行树中插入自定义的算子来做多种优化。RS完成整个过程后会把执行树和相关的统计信息、错误等发回给client。

Data Server(DS)会从RS或其它DS处收到它要处理的query部分。执行时数据可以来自本地内存、Colossus文件、RDMA、或其它DS。Procella会激进地下推计算,将filter、projection、aggregation(包括TOP、UNIQUE、COUNT DISTINCT、QUANTILE等近似聚合)、join等都尽量推给最靠近相应数据的DS,允许DS使用各种编码原生的函数做计算以得到最优的性能。

Optimizations

Caching

Procella使用了多种cache来缓解计算存储分离带来的延时上升:

  • DS会缓存Colossus的文件的handle。
  • DS使用了单独的LRU cache来缓存每个文件的头。
  • DS使用单独的cache缓存列存数据。Artus的内存与磁盘使用相同格式,减小了cache的开销。另外DS还会缓存一些派生信息,如复杂算子的结果,或是bloom filter。
  • MDS会使用本地的LRU cache缓存各种元数据。
  • Procella实现了affinity调度,将对相同数据和元数据的操作尽量调度到相同的DS或MDS上,极大提升了cache命中率。且这种affinity是松耦合的,如果请求跑到了其它server上,只是命中不了cache,不会有任何错误。

最终效果是在Procella的报表实例中,尽管内存只能装下2%的数据,但文件handle的命中率达到了99%,数据命中率达到了90%。

Data format

Procella一开始使用Capacitor作为列存格式,但这种格式不适合于点查和小范围的scan,因此Procella又开发了Artus格式,同时支持高性能的点查和scan。它的特性是:

  • 使用自定义编码而不是像LZW一样的通用压缩算法,可以不解压数据就直接seek。
  • 进行多轮自适应编码,先扫描一轮数据得到比较轻量的信息(如distinct值的数量、min/max、有序性等),再选择适合的编码,如字典编码、RLE、delta等,从而得到2倍于ZSTD等通用算法的压缩率,同时仍然保留了不解压直接操作数据的能力。每种编码都有函数来预估它处理特定数据的压缩率和速度,Procella会根据用户的偏好来做选择。
  • 选择能对有序列做二分查找的编码,同时还能做O(1)的基于行号的seek。对于像RLE这样的变长编码,Procella维护了一个skip block,每B行记录一个值。
  • 使用了一种不同于ColumnIO(参见Parquet)的方式来表示嵌套和可重复的数据类型,即不记录RL和DL,而是将optional和repeated的中间字段也按列持久化下来,它们的值是元素数量(optional为0或1,repeated是非负值),parent不存在的字段则不会被记录下来。这样重建对象时就可以先读到parent的元素数量,再直接去子字段中读相应数量的记录即可。另一个好处是这样可以支持对子字段的O(1)的seek。
  • 直接暴露字典索引、RLE等编码信息给evaluation engine。
  • 在文件头和列头中记录丰富的元数据,如数据schema、排序、min/max、编码信息、bloom filter等。
  • 支持倒排索引,目前主要是优化数组的IN操作。

以下是Capacitor与Artus的比较。

Evaluation Engine

许多现代的分析系统会在查询时使用LLVM将执行plan编译为native code来提高性能。但Procella同时要服务在线和离线请求,对于在线请求编译开销太大。Procella因此用其它方式实现了名为Superluminal的evaluation engine:

  • 重度使用C++的模板元编程从而在编译期生成代码。
  • 以block为单位处理数据以利用向量化计算和可感知cache的算法的优势。
  • 直接操作底层数据编码(而不是丧失了编码信息只能通过中间层来操作),并尽量保持这一特性。
  • 以一种全列存的方式处理结构化数据,不持久化中间结果。
  • 动态合并filter,并沿着执行plan一直下推到扫数据的节点。

以下是Superluminal和开源的Supersonic运行TPC-H的对比:

Partitioning & Indexing

Procella支持多级的分区和聚类。通常fact table是按date分区,再按多个维度聚类的。维表通常会按维度key分区排序。这种特点利于Procella快速裁剪掉不需要读的文件和做co-partition的join而不需要shuffle数据。

MDS的内存中的元数据也使用了多种压缩算法,用有限的内存装下海量的元数据。MDS的裁剪对Procella的性能影响巨大,因此保证MDS的大多数操作只走内存是非常重要的。

DS在处理query时会使用bloom filter、min/max、倒排索引等文件元数据来最小化磁盘访问,这些元数据会异步缓存在LRU cache中。

Distributed operations

Procella有多种join策略,既可以显式使用hint,也可以由优化器来选择:

  • broadcast,当join一边的表特别小时使用。
  • co-partitioned,当join key分别是两边的partition key时使用。
  • shuffle,当两边都很大,没办法根据join key分区的时候,数据会按join key发给若干个中间server来shuffle。
  • pipelined,当右边是个复杂的query,但很可能结果集很小时,先执行右边,再把结果发送给左边做类似broadcast join的操作。
  • remote lookup。很多时候,维表(构建端)很大,但按join key分区,但fact table(探测端)却不是。此时探测端DS会向构建端DS发送RPC,获取这次join需要的key和value。为了减少RPC的开销,探测端要用上所有可能的filter,批量发送key,最大程度减少RPC的数量。构建端也会下推projection和filter以保证只传输必要的数据。得益于Artus的高性能点查,Procella可以高效地执行lookup join。

Procella还应用了以下方法来缓解长尾请求对服务的影响:

  • RS会在query执行期间维护DS延时的分位数,如果遇到了明显慢于中位数的DS,RS会选另一台DS发送一个backup请求。
  • RS会控制发给DS的请求,以避免太多请求将DS压垮。
  • RS会区分发往DS的请求的优先级,小query优先级更高。相应地,DS对高低优先级分别有一个线程池。

Procella会为非常大型的聚合增加一层预聚合。

Query Optimization

Procella中有一种virtual table,类似于materialized view。在query时Procella会自适应使用virtual table来实现最优化的查询:

  • 选择virtual table时不光使用size来判断,还会通过在table schema上匹配filter来看数据组织方式(分区、聚类等)是否合适。
  • 可以从多个virtual table中读数据,再使用UNION ALL拼起来。
  • 可以从不同virtual table中读不同时间段的数据,再使用UNION ALL拼起来。
  • 可以自动识别star模式的join,并为没有在fact table中反规范化的维表插入一个join。

Procella的优化器同时使用了静态和自适应优化技术。在编译期使用基于规则的优化器,如filter下推、subplan去相关、常量折叠等。在执行期会对这次query用到的真实数据抽样得到统计信息,再基于统计信息来选择或调优算子。

传统的收集统计信息的方式不仅更复杂,而且它有个问题是越靠近叶子节点统计越准确。相比之下Procella中随着数据流过query plan而更新统计信息的方式更简单,保证了整个plan有着相同的准确度。

具体实现是在query plan中增加一种“collection sites”,基于这种信息来决定如何如何转换未执行的部分。目前Procella会在shuffle时收集信息,因为shuffle是一个天然的物化点。这些信息随后会被用于决定如何切分数据(包括了切分函数和reducer数量)。

Procella在做聚合的时候会根据预估的行数决定需要多少个预聚合节点。在join的时候Procella会为每个partition统计它的key count、min/max、可能还有bloom filter(如果key count不是特别大),这些统计信息通常就不抽样了,而是从全部数据中得到。

基于这些信息Procella能做出以下join的优化:

  • broadcast join,会用到RDMA。
  • 如果filter端可以构建出一个不太大的、假阳性率在10%左右的bloom filter,则可以用这个bloom filter来裁剪探测端。
  • 如果join的一端已经按join key分区了,则可以只shuffle另一端匹配这些区间(min/max)的数据。
  • 如果没有其它优化可做了,则可以根据两边的表大小来自动选取分区数量再shuffle。

在处理ORDER BY时,Procella会先用一个query来估计要排序的行数,确定分成多少个shard,再用一个query来估计分位数,来确定如何划分数据。

以上自适应优化对大型query效果很好,但其开销对于小query来说太大了。此时用户可以传入hint而不开启自适应优化。

另外目前自适应优化还不能应用到join顺序的选择上,这里可以继续使用传统的基于统计信息的优化。

Data Ingestion

Procella提供了离线数据生成工具,将用户的数据用MapReduce转换为最适合Procella处理的形式。

Serving Embeded Statistics

Procella需要支持特别高频的嵌入页面的计数器query,如SELECT SUM(views) FROM Table WHERE video_id = N,这种query扫过的数据量并不会太大,但要能以非常低的延时服务非常高的QPS,且这些值的更新频率也很高。Procella会以“stats serving”模式来运行这些实例,会启用以下优化:

  • RgS会在新数据注册后立即通知DS加载数据,而不是惰性加载。
  • MDS直接与RS编译在一起,省掉了RPC的开销。
  • 所有元数据都会被预加载到内存中并保持更新。
  • 激进地缓存query plan。
  • RS会把相同key的请求合并到一起同时发给两个DS,从而避免延时长尾。
  • 监控RS和DS的错误率和延时,一旦有问题就替换。
  • 高开销的优化会被关掉。

Performance

(略)

Google Technologies

  • Dremel主要是stateless,没有这么广泛使用cache;Dremel全球使用一个实例,而Procella为不同用户使用不同实例(从而开启不同优化);Dremel存储使用Capacitor,没有索引,而Artus有丰富的索引。
  • Mesa场景与Procella不太一样,本身不支持SQL。
  • F1 Query主打的是query federation,查询与存储引擎分离,主要靠挖掘不同引擎的特点来适应不同的场景。而Procella则是使用同一套存储引擎来服务不同场景,查询与存储耦合在一起。
  • PowerDrill主要针对大数据量的简单查询来优化,如日志等。
  • Spanner主打的是ACID的OLTP需求。

External Technologies

(只列一下产品)

  • Ad-hoc analysis:
    • Presto以及对应的AWS Athena
    • Spark SQL
    • Snowflake
    • Redshift
  • Real time reporting:
    • Druid
    • Pinot
    • ElasticSearch
    • Amplitude
    • Mixpanel
    • Apache Kylin
  • Monitoring:
    • Stackdriver
    • Cloudwatch
    • Gorilla和Beringei
    • InfluxDB
    • OpenTSDB
  • Serving Statistics:
    • HBase