阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

基于Apache Doris的湖仓分析

72次阅读
没有评论

共计 4097 个字符,预计需要花费 11 分钟才能阅读完成。

导读 Doris 是一种 MPP 架构的分析型数据库,主要面向多维分析、数据报表、用户画像分析等场景。自带分析引擎和存储引擎,支持向量化执行引擎,不依赖其他组件,兼容 MySQL 协议。
Doris 简介

Apache Doris 具备以下几个特点:

  • 良好的架构设计,支持高并发低延时的查询服务,支持高吞吐量的交互式分析。多 FE 均可对外提供服务,并发增加时,线性扩充 FE 和 BE 即可支持高并发的查询请求。
  • 支持批量数据 load 和流式数据 load,支持数据更新。支持 Update/Delete 语法,unique/aggregate 数据模型,支持动态更新数据,实时更新聚合指标。
  • 提供了高可用,容错处理,高扩展的企业级特性。FE Leader 错误异常,FE Follower 秒级切换为新 Leader 继续对外提供服务。
  • 支持聚合表和物化视图。多种数据模型,支持 aggregate, replace 等多种数据模型,支持创建 rollup 表,支持创建物化视图。rollup 表和物化视图支持动态更新,无需用户手动处理。
  • MySQL 协议兼容,支持直接使用 MySQL 客户端连接,非常易用的数据应用对接。
  • Doris 由 Frontend(以下简称 FE)和 Backend(以下简称 BE)组成,其中 FE 负责接受用户请求、编译、优化、分发执行计划、元数据管理、BE 节点的管理等功能,BE 负责执行由 FE 下发的执行计划,存储和管理用户数据。

    基于 Apache Doris 的湖仓分析

    数据湖格式 Hudi 简介

    Hudi 是下一代流式数据湖平台,为数据湖提供了表格式管理的能力,提供事务,ACID,MVCC,数据更新删除,增量数据读取等功能。支持 Spark, Flink, Presto, Trino 等多种计算引擎。

    基于 Apache Doris 的湖仓分析

    Hudi 根据数据更新时行为不同分为两种表类型:

    基于 Apache Doris 的湖仓分析

    针对 Hudi 的两种表格式,存在 3 种不同的查询类型:

    基于 Apache Doris 的湖仓分析

    1. Lambda 架构

    Lambda 将数据处理流分为在线分析和离线分析两条不同的处理路径,两条路径互相独立,互不影响。

    离线分析处理 T + 1 数据,使用 Hive/Spark 处理大数据量,不可变数据,数据一般存储在 HDFS 等系统上。如果遇到数据更新,需要 overwrite 整张表或整个分区,成本比较高。

    在线分析处理实时数据,使用 Flink/Spark Streaming 处理流式数据,分析处理秒级或分钟级流式数据,数据保存在 Kafka 或定期(分钟级)保存到 HDFS 中。​

    该套方案存在以下缺点:

  • 同一套指标可能需要开发两份代码来进行在线分析和离线分析,维护复杂。
  • 数据应用查询指标时可能需要同时查询离线数据和在线数据,开发复杂。
  • 同时部署批处理和流式计算两套引擎,运维复杂。
  • 数据更新需要 overwrite 整张表或分区,成本高。
  • 2. Kappa 架构

    随着在线分析业务越来越多,Lambda 架构的弊端就越来越明显,增加一个指标需要在线离线分别开发,维护困难,离线指标可能和在线指标对不齐,部署复杂,组件繁多。于是 Kappa 架构应运而生。

    Kappa 架构使用一套架构处理在线数据和离线数据,使用同一套引擎同时处理在线和离线数据,数据存储在消息队列上。​

    Kappa 架构也有一定的局限:

  • 流式计算引擎批处理能力较弱,处理大数据量性能较弱。
  • 数据存储使用消息队列,消息队列对数据存储有有效性限制,历史数据无法回溯。
  • 数据时序可能乱序,可能对部分在时序要求方面比较严格的应用造成数据错误。
  • 数据应用需要从消息队列中取数,需要开发适配接口,开发复杂。
  • 3. 基于数据湖的实时数仓

    针对 Lambda 架构和 Kappa 架构的缺陷,业界基于数据湖开发了 Iceberg, Hudi, DeltaLake 这些数据湖技术,使得数仓支持 ACID, Update/Delete,数据 Time Travel, Schema Evolution 等特性,使得数仓的时效性从小时级提升到分钟级,数据更新也支持部分更新,大大提高了数据更新的性能。兼具流式计算的实时性和批计算的吞吐量,支持的是近实时的场景。

    以上方案中其中基于数据湖的应用最广,但数据湖模式无法支撑更高的秒级实时性,也无法直接对外提供数据服务,需要搭建其他的数据服务组件,系统较为复杂。基于此背景下,部分业务开始使用 Doris 来承接,业务数据分析师需要对 Doris 与 Hudi 中的数据进行联邦分析,此外在 Doris 对外提供数据服务时既要能查询 Doris 中数据,也要能加速查询离线业务中的数据湖数据,因此我们开发了 Doris 访问数据湖 Hudi 中数据的特性。

    Doris 分析 Hudi 数据的设计原理

    基于以上背景,我们设计了 Apache Doris 中查询数据湖格式 Hudi 数据,因 Hudi 生态为 java 语言,而 Apache Doris 的执行节点 BE 为 C ++ 环境,C++ 无法直接调用 Hudi java SDK,针对这一点,我们有三种解决方案。

    实现 Hudi C++ client,在 BE 中直接调用 Hudi C++ client 去读写 Hudi 表​

    该方案需要完整实现一套 Hudi C++ client,开发周期较长,后期 Hudi 行为变更需要同步修改 Hudi C++ client,维护较为困难。

    BE 通过 thrift 协议发送读写请求至 Broker,由 Broker 调用 Hudi java client 读取 Hudi 表

    该方案需要在 Broker 中增加读写 Hudi 数据的功能,目前 Broker 定位仅为 fs 的操作接口,引入 Hudi 打破了 Broker 的定位。第二,数据需要在 BE 和 Broker 之间传输,性能较低。

    在 BE 中使用 JNI 创建 JVM,加载 Hudi java client 去读写 Hudi 表

    该方案需要在 BE 进程中维护 JVM,有 JVM 调用 Hudi java client 对 Hudi 进行读写。读写逻辑使用 Hudi 社区 java 实现,可以维护与社区同步;同时数据在同一个进程中进行处理,性能较高。但需要在 BE 维护一个 JVM,管理较为复杂。

    使用 BE arrow parquet c++ api 读取 hudi parquet base file,hudi 表中的 delta file 暂不处理

    该方案可以由 BE 直接读取 hudi 表的 parquet 文件,性能最高。但当前不支持 base file 和 delta file 的合并读取,因此仅支持 COW 表 Snapshot Queries 和 MOR 表的 Read Optimized Queries,不支持 Incremental Queries。

    综上,我们选择方案四,第一期实现了 COW 表 Snapshot Queries 和 MOR 表的 Read Optimized Queries,后面联合 Hudi 社区开发 base file 和 delta file 合并读取的 C ++ 接口。

    Doris 分析 Hudi 数据的技术实现

    Doris 中查询分析 Hudi 外表使用步骤非常简单。

    创建 Hudi 外表

    建表时指定 engine 为 Hudi,同时指定 Hudi 外表的相关信息,如 hive metastore uri,在 hive metastore 中的 database 和 table 名字等。

    建表仅仅在 Doris 的元数据中增加一张表,无任何数据移动。

    建表时支持指定全部或部分 hudi schema,也支持不指定 schema 创建 hudi 外表。指定 schema 时必须与 hiveMetaStore 中 hudi 表的列名,类型一致。

    Example:

    Plaintext
       CREATE TABLE example_db.t_hudi 
        ENGINE=HUDI
        PROPERTIES (
        "hudi.database" = "hudi_db",
        "hudi.table" = "hudi_table",
        "hudi.hive.metastore.uris"  =  "thrift://127.0.0.1:9083"
        );
        
        
        CREATE TABLE example_db.t_hudi (
        column1 int,
        column2 string)
        ENGINE=HUDI
        PROPERTIES (
        "hudi.database" = "hudi_db",
        "hudi.table" = "hudi_table",
        "hudi.hive.metastore.uris"  =  "thrift://127.0.0.1:9083"
        );
    查询 Hudi 外表

    查询 Hudi 数据表时,FE 在 analazy 阶段会查询元数据获取到 Hudi 外表的的 hive metastore 地址,从 Hive metastore 中获取 hudi 表的 schema 信息与文件路径。

  • 获取 hudi 表的数据地址。
  • FE 规划 fragment 增加 HudiScanNode。HudiScanNode 中获取 Hudi table 对应的 data file 文件列表。
  • 根据 Hudi table 获取的 data file 列表生成 scanRange。
  • 下发 HudiScan 任务至 BE 节点。
  • BE 节点根据 HudiScanNode 指定的 Hudi 外表文件路径调用 native parquet reader 进行数据读取。
  • 基于 Apache Doris 的湖仓分析

    后期规划

    目前 Apche Doris 查询 Hudi 表已合入社区,当前已支持 COW 表的 Snapshot Query,支持 MOR 表的 Read Optimized Query。对 MOR 表的 Snapshot Query 暂时还未支持,流式场景中的 Incremental Query 也没有支持。

    后续还有几项工作需要处理,我们和社区也在积极合作进行中:

  • MOR 表的 Snapshot Query。MOR 表实时读需要合并读取 Data file 与对应的 Delta file,BE 需要支持 Delta file AVRO 格式的读取,需要增加 avro 的 native 读取方式。
  • COW/MOR 表的 Incremental Query。支持实时业务中的增量读取。
  • BE 读取 Hudi base file 和 delta file 的 native 接口。目前 BE 读取 Hudi 数据时,仅能读取 data file,使用的是 parquet 的 C ++ SDK。后期我们和联合 Hudi 社区提供 Huid base file 和 delta file 的 C ++/Rust 等语言的读取接口,在 Doris BE 中直接使用 native 接口来查询 Hudi 数据。
  • 今天的分享就到这里,谢谢大家。

    阿里云 2 核 2G 服务器 3M 带宽 61 元 1 年,有高配

    腾讯云新客低至 82 元 / 年,老客户 99 元 / 年

    代金券:在阿里云专用满减优惠券

    正文完
    星哥玩云-微信公众号
    post-qrcode
     0
    星锅
    版权声明:本站原创文章,由 星锅 于2024-07-25发表,共计4097字。
    转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
    【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
    阿里云-最新活动爆款每日限量供应
    评论(没有评论)
    验证码
    【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中