本篇文章给大家谈谈hudi,以及对应的知识点,希望对各位有所帮助,不要忘了收藏本站喔。
本文目录一览:
本篇为Hudi概念和特性相关介绍。依据于官网和相关博客资料,融入了个人理解。内容可能会有疏漏,欢迎大家指正和补充。
Apache Hudi是一个Data Lakes的开源方案,Hudi是Hadoop Updates and Incrementals的简写,它是由Uber开发并开源的Data Lakes解决方案。Hudi具有如下基本特性/能力:
Hudi内部维护了时间线,支持按照数据的到达时间顺序来获取数据。Hudi确保时间线上的动作是原子性的。
数据使用列存储形式存放,每次提交都会产生新版本的列存储文件。将原有数据文件copy一份,合并入变更的数据后保存一份新的数据。因此写入放大很高,读取放大基本为0。适用于读取多写入少的场景。写入存在延迟,Flink每次checkpoint或者Spark每次微批处理才会commit新数据。读取延迟很低。可以通过 hoodie.cleaner.commits.retained 配置项确定需要保存的最近commit个数,防止存储空间占用无限放大。可以提供较好的并发控制,因为读取数据的时候,无法读取到正在写入但尚未commit或者是写入失败的数据。
数据使用行存储(avro)和列存储(parquet)共同存放。其中新变更的数据使用行存储,历史数据采用列存储。每当满足一定条件的时候(经过n个commit或者经过特定时间)Hudi开始compact操作,将行存储的数据和列存储的合并,生成新的列存储文件。适用于写多读少的场景。写入延迟很小,读取的时候需要合并新文件avro和parquet,存在一定延迟。需要使用定期的online compaction或者是手工执行的offline compaction将avro格式和parquet格式文件合并。
通过配置项 table.type 指定表类型。
两种表类型的特性对比:
Snapshot Queries : Queries see the latest snapshot of the table as of a given commit or compaction action. In case of merge on read table, it exposes near-real time data(few mins) by merging the base and delta files of the latest file slice on-the-fly. For copy on write table, it provides a drop-in replacement for existing parquet tables, while providing upsert/delete and other write side features.
Incremental Queries : Queries only see new data written to the table, since a given commit/compaction. This effectively provides change streams to enable incremental data pipelines.
Read Optimized Queries : Queries see the latest snapshot of table as of a given commit/compaction action. Exposes only the base/columnar files in latest file slices and guarantees the same columnar query performance compared to a non-hudi columnar table.
总结:
通过 hoodie.datasource.query.type 参数控制查询类型。配置项对应为:
Hudi通过HoodieKey(recordKey和poartition path)和file id的对应关系来加速upsert操作。这正是Hudi的索引机制。这种对应关系在初始记录写入之后不会在改变。
对于COW表插入数据的场景,索引可以快速的过滤掉不涉及数据修改的file。对于MOR表插入数据的场景,索引能够很快定位到需要合并的文件。不需要像Hive ACID一样,合并所有的base file。
Hudi支持下面4种Index选项:
所有具有GLOBAL和非GLOBAL两种(HBase本来就是global的)。其中:
使用场景:
hoodie.index.type 用来修改Index选项。
注意:使用GLOBAL_BLOOM的时候需要留意 hoodie.bloom.index.update.partition.path 配置。源码给出的解释如下:
设置为true的话,如果record更新操作修改了partition,则会在新partition插入这条数据,然后在旧partition删除这条数据。
如果设置为false,会在就parititon更新这条数据。
hoodie.simple.index.update.partition.path 对于GLOBAL_SIMPLE也同理。
用于提高读写性能,避免使用list file操作。
0.10.1以后metadata table默认启用( hoodie.metadata.enable )。
为了确保metadata table保持最新,针对同一张Hudi表的写操作需要根据不同的场景增加相应配置。
单个writer同步表服务(清理,聚簇,压缩),只需配置 hoodie.metadata.enable=true ,重启writer。
单个writer异步表服务(同一进程内),需要配置乐观并发访问控制:
多个writer(不同进程)异步表服务,需要配置乐观并发访问控制:
外部分布式锁提供方有: ZookeeperBasedLockProvider , HiveMetastoreBasedLockProvider 和 DynamoDBBasedLockProvider 。
总结:
通过 write.operation 配置项指定。
The following is an inside look on the Hudi write path and the sequence of events that occur during a write.
试验功能,Spark 3.1.x和3.2.x支持Schema变更。
Primary key由RecordKey和Partition path组成。
RecordKey由 hoodie.datasource.write.recordkey.field 决定。Partition path由 hoodie.datasource.write.partitionpath.field 决定。
Hudi支持MVCC和乐观并发访问两种方式。MVCC方式所有的table service都使用同一个writer来保证没有冲突,避免竟态条件。新版本的Hudi增加了乐观并发访问控制(OCC)。支持文件级别的乐观锁。需要依赖外部组件实现乐观锁,例如Zookeeper,Hive metastore等。
启用并发控制:
使用Zookeeper分布式锁:
使用HiveMetastore分布式锁:
禁用并发写入:
hudi数据分布式需要备份。分布式存储拥有多副本,备份能保证一定的数据可恢复性。Hudi是HadoopUpdatesandIncrementals的简写,它是由Uber开发并开源的DataLakes解决方案。Hudi用于管理的数据库层上构建具有增量数据管道的流式数据湖,同时针对湖引擎和常规批处理进行了优化。
没有hudi这个单词,只有hive是正确单词,所以不存在区别。
重点词汇:hive
英[haɪv]
释义:
n.忙碌的场所,繁忙的地方;蜂房,蜂箱;蜂群;蜂窝状物体。
v.(蜜蜂)入蜂箱。
[复数:hives;第三人称单数:hives;现在分词:hiving;过去式:hived;过去分词:hived]
短语:
Hive Rise蜂巢行动。
例句:
用作名词(n)
A hive cannot exist without a queen.
蜂房不可无蜂王。
There were so many bees in the hive that I felt great fear.
蜂房里有那么多蜜蜂我感到很害怕。
方法如下:
1. 项目背景
传统数仓的组织架构是针对离线数据的OLAP(联机事务分析)需求设计的,常用的导入数据方式为采用sqoop或spark定时作业逐批将业务库数据导入数仓。随着数据分析对实时性要求的不断提高,按小时、甚至分钟级的数据同步越来越普遍。由此展开了基于spark/flink流处理机制的(准)实时同步系统的开发。
然而实时同步数仓从一开始就面临如下几个挑战:
小文件问题。不论是spark的microbatch模式,还是flink的逐条处理模式,每次写入HDFS时都是几M甚至几十KB的文件。长时间下来产生的大量小文件,会对HDFS namenode产生巨大的压力。
对update操作的支持。HDFS系统本身不支持数据的修改,无法实现同步过程中对记录进行修改。
事务性。不论是追加数据还是修改数据,如何保证事务性。即数据只在流处理程序commit操作时一次性写入HDFS,当程序rollback时,已写入或部分写入的数据能随之删除。
Hudi是针对以上问题的解决方案之一。以下是对Hudi的简单介绍,主要内容翻译自官网。
2. Hudi简介
2.1 时间线(Timeline)
Hudi内部按照操作时刻(instant)对表的所有操作维护了一条时间线,由此可以提供表在某一时刻的视图,还能够高效的提取出延后到达的数据。每一个时刻包含:
时刻行为:对表操作的类型,包含:
commit:提交,将批次的数据原子性的写入表;
clean: 清除,后台作业,不断清除不需要的旧得版本的数据;
delta_commit:delta 提交是将批次记录原子性的写入MergeOnRead表中,数据写入的目的地是delta日志文件;
compacttion:压缩,后台作业,将不同结构的数据,例如记录更新操作的行式存储的日志文件合并到列式存储的文件中。压缩本身是一个特殊的commit操作;
rollback:回滚,一些不成功时,删除所有部分写入的文件;
savepoint:保存点,标志某些文件组为“保存的“,这样cleaner就不会删除这些文件;
时刻时间:操作开始的时间戳;
状态:时刻的当前状态,包含:
requested 某个操作被安排执行,但尚未初始化
inflight 某个操作正在执行
completed 某一个操作在时间线上已经完成
Hudi保证按照时间线执行的操作按照时刻时间具有原子性及时间线一致性。
2.2 文件管理
Hudi表存在在DFS系统的 base path(用户写入Hudi时自定义) 目录下,在该目录下被分成不同的分区。每一个分区以 partition path 作为唯一的标识,组织形式与Hive相同。
每一个分区内,文件通过唯一的 FileId 文件id 划分到 FileGroup 文件组。每一个FileGroup包含多个 FileSlice 文件切片,每一个切片包含一个由commit或compaction操作形成的base file 基础文件(parquet文件),以及包含对基础文件进行inserts/update操作的log files 日志文件(log文件)。Hudi采用了MVCC设计,compaction操作会将日志文件和对应的基础文件合并成新的文件切片,clean操作则删除无效的或老版本的文件。
2.3 索引
Hudi通过映射Hoodie键(记录键+ 分区路径)到文件id,提供了高效的upsert操作。当第一个版本的记录写入文件时,这个记录键值和文件的映射关系就不会发生任何改变。换言之,映射的文件组始终包含一组记录的所有版本。
2.4 表类型查询
Hudi表类型定义了数据是如何被索引、分布到DFS系统,以及以上基本属性和时间线事件如何施加在这个组织上。查询类型定义了底层数据如何暴露给查询。
| 表类型 | 支持的查询类型 | | :-------------------- | :----------------------------- | | Copy On Write写时复制 | 快照查询 + 增量查询 | | Merge On Read读时合并 | 快照查询 + 增量查询 + 读取优化 |
2.4.1 表类型
Copy On Write:仅采用列式存储文件(parquet)存储文件。更新数据时,在写入的同时同步合并文件,仅仅修改文件的版次并重写。
Merge On Read:采用列式存储文件(parquet)+行式存储文件(avro)存储数据。更新数据时,新数据被写入delta文件并随后以异步或同步的方式合并成新版本的列式存储文件。
| 取舍 | CopyOnWrite | MergeOnRead | | :----------------------------------- | :---------------------- | :-------------------- | | 数据延迟 | 高 | 低 | | Update cost (I/O)更新操作开销(I/O) | 高(重写整个parquet) | 低(追加到delta记录) | | Parquet文件大小 | 小(高更新(I/O)开销) | 大(低更新开销) | | 写入频率 | 高 | 低(取决于合并策略) |
2.4.2 查询类型
快照查询:查询会看到以后的提交操作和合并操作的最新的表快照。对于merge on read表,会将最新的基础文件和delta文件进行合并,从而会看到近实时的数据(几分钟的延迟)。对于copy on write表,当存在更新/删除操作时或其他写操作时,会直接代替已有的parquet表。
增量查询:查询只会看到给定提交/合并操作之后新写入的数据。由此有效的提供了变更流,从而实现了增量数据管道。
读优化查询:查询会看到给定提交/合并操作之后表的最新快照。只会查看到最新的文件切片中的基础/列式存储文件,并且保证和非hudi列式存储表相同的查询效率。
| 取舍 | 快照 | 读取优化 | | :------- | :------------------------------------------------------ | :------------------------------------ | | 数据延迟 | 低 | 高 | | 查询延迟 | 高(合并基础/列式存储文件 + 行式存储delta / 日志 文件) | 低(原有的基础/列式存储文件查询性能) |
3. Spark结构化流写入Hudi
以下是整合spark结构化流+hudi的示意代码,由于Hudi OutputFormat目前只支持在spark rdd对象中调用,因此写入HDFS操作采用了spark structured streaming的forEachBatch算子。具体说明见注释。
4. 测试结果
受限于测试条件,这次测试没有考虑update操作,而仅仅是测试hudi对追加新数据的性能。
数据程序一共运行5天,期间未发生报错导致程序退出。
kafka每天读取数据约1500万条,被消费的topic共有9个分区。
几点说明如下
1 是否有数据丢失及重复
由于每条记录的分区+偏移量具有唯一性,通过检查同一分区下是否有偏移量重复及不连续的情况,可以断定数据不存丢失及重复消费的情况。
2 最小可支持的单日写入数据条数
数据写入效率,对于cow及mor表,不存在更新操作时,写入速率接近。这本次测试中,spark每秒处理约170条记录。单日可处理1500万条记录。
3 cow和mor表文件大小对比
每十分钟读取两种表同一分区小文件大小,单位M。结果如下图,mor表文件大小增加较大,占用磁盘资源较多。不存在更新操作时,尽可能使用cow表。
一个热爱生活又放荡不羁的程序猿
本文主要讲解如下内容:
一、数据湖的优点
二、目前有哪些开源数据湖组件
三、三大数据湖组件对比
数据湖相比传统数仓而言,最明显的便是优秀的T+0能力,这个解决了Hadoop时代数据分析的顽疾。传统的数据处理流程从数据入库到数据处理通常需要一个较长的环节、涉及许多复杂的逻辑来保证数据的一致性,由于架构的复杂性使得整个流水线具有明显的延迟。
目前开源的数据湖有江湖人称“数据湖三剑客”的 Hudi、Delta Lake和Iceberg
Iceberg官网定义:Iceberg是一个通用的表格式(数据组织格式),提供高性能的读写和元数据管理功能。
Iceberg 的 ACID 能力可以简化整个流水线的设计,传统 Hive/Spark 在修正数据时需要将数据读取出来,修改后再写入,有极大的修正成本。
[玫瑰]ACID能力,无缝贴合流批一体数据存储
随着flink等技术的不断发展,流批一体生态不断完善,但在流批一体数据存储方面一直是个空白,直到Iceberg等数据湖技术的出现,这片空白被慢慢填补。
Iceberg 提供 ACID 事务能力,上游数据写入即可见,不影响当前数据处理任务,这大大简化了 ETL;
Iceberg 提供了 upsert、merge into 能力,可以极大地缩小数据入库延迟;
[玫瑰]统一数据存储,无缝衔接计算引擎和数据存储
Iceberg提供了基于流式的增量计算模型和基于批处理的全量表计算模型。批处理和流任务可以使用相同的存储模型,数据不再孤立;
Iceberg 支持隐藏分区和分区进化,方便业务进行数据分区策略更新。
Iceberg屏蔽了底层数据存储格式的差异,提供对于Parquet,ORC和Avro格式的支持。将上层引擎的能力传导到下层的存储格式。
[玫瑰]开放架构设计,开发维护成本相对可控
Iceberg 的架构和实现并未绑定于某一特定引擎,它实现了通用的数据组织格式,利用此格式可以方便地与不同引擎对接,目前 Iceberg 支持的计算引擎有 Spark、Flink、Presto 以及 Hive。
相比于 Hudi、Delta Lake,Iceberg 的架构实现更为优雅,同时对于数据格式、类型系统有完备的定义和可进化的设计;
面向对象存储的优化。Iceberg 在数据组织方式上充分考虑了对象存储的特性,避免耗时的 listing 和 rename 操作,使其在基于对象存储的数据湖架构适配上更有优势。
[玫瑰]增量数据读取,实时计算的一把利剑
Iceberg 支持通过流式方式读取增量数据,支持 Structed Streaming 以及 Flink table Source。
Apache Hudi是一种数据湖的存储格式,在Hadoop文件系统之上提供了更新数据和删除数据的能力以及消费变化数据的能力。
Hudi支持如下两种表类型:
使用Parquet格式存储数据。Copy On Write表的更新操作需要通过重写实现。
使用列式文件格式(Parquet)和行式文件格式(Avro)混合的方式来存储数据。Merge On Read使用列式格式存放Base数据,同时使用行式格式存放增量数据。最新写入的增量数据存放至行式文件中,根据可配置的策略执行COMPACTION操作合并增量数据至列式文件中。
应用场景
Hudi支持插入、更新和删除数据。可以实时消费消息队列(Kafka)和日志服务SLS等日志数据至Hudi中,同时也支持实时同步数据库Binlog产生的变更数据。
Hudi优化了数据写入过程中产生的小文件。因此,相比其他传统的文件格式,Hudi对HDFS文件系统更加的友好。
Hudi支持多种数据分析引擎,包括Hive、Spark、Presto和Impala。Hudi作为一种文件格式,不需要依赖额外的服务进程,在使用上也更加的轻量化。
Hudi支持Incremental Query查询类型,可以通过Spark Streaming查询给定COMMIT后发生变更的数据。Hudi提供了一种消费HDFS变化数据的能力,可以用来优化现有的系统架构。
Delta Lake是Spark计算框架和存储系统之间带有Schema信息数据的存储中间层。它给Spark带来了三个最主要的功能:
第一,Delta Lake使得Spark能支持数据更新和删除功能;
第二,Delta Lake使得Spark能支持事务;
第三,支持数据版本管理,运行用户查询 历史 数据快照。
核心特性
Delta lake
由于Apache Spark在商业化上取得巨 成功,所以由其背后商业公司Databricks推出的Delta lake也显得格外亮眼。在没有delta数据湖之前,Databricks的客户 般会采 经典的lambda架构来构建他们的流批处理场景。
Hudi
Apache Hudi是由Uber的 程师为满 其内部数据分析的需求 设计的数据湖项 ,它提供的fast upsert/delete以及compaction等功能可以说是精准命中 民群众的痛点,加上项 各成员积极地社区建设,包括技术细节分享、国内社区推 等等,也在逐步地吸引潜在 户的 光。
Iceberg
Netflix的数据湖原先是借助Hive来构建,但发现Hive在设计上的诸多缺陷之后,开始转为 研Iceberg,并最终演化成Apache下 个 度抽象通 的开源数据湖 案。
三者均为Data Lake的数据存储中间层,其数据管理的功能均是基于 系列的meta 件。Meta 件的 类似于数据库的catalog,起到schema管理、事务管理和数据管理的功能。与数据库不同的是,这些meta 件是与数据 件 起存放在存储引擎中的, 户可以直接看到。这个做法直接继承了 数据分析中数据对 户可见的传统,但是 形中也增加了数据被不 破坏的风险。 旦删了meta 录,表就被破坏了,恢复难度很 。
Meta包含有表的schema信息。因此系统可以 掌握schema的变动,提供schema演化的 持。Meta 件也有transaction log的功能(需要 件系统有原 性和 致性的 持)。所有对表的变更都会 成 份新的meta 件,于是系统就有了ACID和多版本的 持,同时可以提供访问 历史 的功能。在这些 ,三者是相同的。
Hudi 的设计 标正如其名,Hadoop Upserts Deletes and Incrementals(原为 Hadoop Upserts anD Incrementals),强调了其主要 持Upserts、Deletes 和 Incremental 数据处理,其主要提供的写 具是 Spark HudiDataSource API 和 提供的 HoodieDeltaStreamer,均 持三种数据写 式:UPSERT,INSERT 和 BULK_INSERT。其对 Delete 的 持也是通过写 时指定 定的选项 持的,并不 持纯粹的 delete 接 。
在查询 ,Hudi 持 Hive、Spark、Presto。
在性能 ,Hudi 设计了 HoodieKey , 个类似于主键的东西。对于查询性能, 般需求是根据查询谓词 成过滤条件下推 datasource。Hudi 这 没怎么做 作,其性能完全基于引擎 带的谓词下推和 partition prune 功能。
Hudi 的另 特 是 持 Copy On Write 和 Merge On Read。前者在写 时做数据的 merge,写 性能略差,但是读性能更 些。后者读的时候做 merge,读性能差,但是写 数据会 较及时,因 后者可以提供近实时的数据分析能 。最后,Hudi 提供了 个名为run_sync_tool 的脚本同步数据的 schema 到 Hive 表。Hudi 还提供了 个命令 具 于管理 Hudi 表。
Iceberg 没有类似的 HoodieKey 设计,其不强调主键。没有主键,做 update/delete/merge 等操作就要通过 Join 来实现, Join 需要有 个类似 SQL 的执 引擎。
Iceberg 在查询性能 做了 量的 作。值得 提的是它的 hidden partition 功能。Hidden partition 意思是说,对于 户输 的数据, 户可以选取其中某些列做适当的变换(Transform)形成 个新的列作为 partition 列。这个 partition 列仅仅为了将数据进 分区,并不直接体现在表的 schema中。
Delta 的定位是流批 体的, 持 update/delete/merge,spark 的所有数据写 式,包括基于dataframe 的批式、流式,以及 SQL 的 Insert、Insert Overwrite 等都是 持的。
不强调主键,因此其 update/delete/merge 的实现均是基于 spark 的 join 功能。在数据写 ,Delta 与 Spark 是强绑定的,这 点 Hudi 是不同的:Hudi 的数据写 不绑定 Spark。
在查询 ,Delta 前 持 Spark 与 Presto,但是,Spark 是不可或缺的,因为 delta log 的处理需要 到 Spark。这意味着如果要 Presto 查询 Delta,查询时还要跑 个 Spark 作业。更为难受的是,Presto 查询是基于 SymlinkTextInputFormat 。在查询之前,要运 Spark 作业 成这么个 Symlink 件。如果表数据是实时更新的,意味着每次在查询之前先要跑 个 SparkSQL,再跑 Presto。为此,EMR 在这 做了改进可以不必事先启动 个 Spark 任务。
在查询性能 ,开源的 Delta 乎没有任何优化。
Delta 在数据 merge 性能不如 Hudi,在查询 性能不如 Iceberg,是不是意味着 Delta 是处了呢?其实不然。Delta 的 优点就是与 Spark 的整合能 ,尤其是其流批 体的设计,配合 multi-hop 的 data pipeline,可以 持分析、Machine learning、CDC 等多种场景。使 灵活、场景 持完善是它相 Hudi 和 Iceberg 的最 优点。另外,Delta 号称是 Lambda 架构、Kappa 架构的改进版, 需关 流批, 需关 架构。这 点上 Hudi 和 Iceberg 是 所不及的。
三个引擎的初衷场景并不完全相同,Hudi 为了 incremental 的 upserts,Iceberg 定位于 性能的分析与可靠的数据管理,Delta 定位于流批 体的数据处理。这种场景的不同也造成了三者在设计上的差别。尤其是 Hudi,其设计与另外两个相 差别更为明显。
Delta、Hudi、Iceberg三个开源项 中,Delta和Hudi跟Spark的代码深度绑定,尤其是写 路径。这两个项 设计之初,都基本上把Spark作为他们的默认计算引擎了。 Apache Iceberg的 向 常坚定,宗旨就是要做 个通 化设计的Table Format。
Iceberg完美的解耦了计算引擎和底下的存储系统,便于多样化计算引擎和 件格式,很好的完成了数据湖架构中的Table Format这 层的实现,因此也更容易成为Table Format层的开源事实标准。另 ,Apache Iceberg也在朝着流批 体的数据存储层发展,manifest和snapshot的设计,有效地隔离不同transaction的变更, 常 便批处理和增量计算。并且,Apache Flink已经是 个流批 体的计算引擎, 者都可以完美匹配,合 打造流批 体的数据湖架构。
Apache Iceberg这个项 背后的社区资源 常丰富。在国外,Netflix、Apple、Linkedin、Adobe等公司都有PB级别的 产数据运 在Apache Iceberg上;在国内,腾讯这样的巨头也有 常庞 的数据跑在Apache Iceberg之上,最 的业务每天有 T的增量数据写 。
hudi的介绍就聊到这里吧,感谢你花时间阅读本站内容,更多关于、hudi的信息别忘了在本站进行查找喔。
版权声明:本文内容由互联网用户自发贡献,本站不拥有所有权,不承担相关法律责任。如果发现本站有涉嫌抄袭的内容,欢迎发送邮件至举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。
标签: #hudi
相关文章