数据处理

在数据处理阶段,两大挑战分别是如何保证数据质量和如何降低运维成本

批处理

UNIX的管道

MapReduce的不足

需要的:

  1. 一种技术抽象让多步骤数据处理变得易于维护
  2. 不要复杂的配置,需要能自动进行性能优化
  3. 要能把数据处理的描述语言,与背后的运行引擎解耦合开来
  4. 要统一批处理和流处理的编程模型
  5. 要在架构层面提供异常处理和数据监控的能力
stateDiagram-v2  state 前端/用户端 {    统一批处理/流处理的的描述语法 --> 有向图编译系统  }  有向图编译系统 --> 后端/计算引擎: DAG  state 后端/计算引擎 {    基于有向图的优化系统    计算资源自动分配系统    自动监控和错误追踪  }

分布式批处理需要解决的问题

  1. 如何将输入数据分区
  2. 容错:任务可能随时会失败

模式

Workflow

复制模式

flowchart  数据集 --> 复制器  复制器 --> 工作流1  复制器 --> 工作流2  复制器 --> 工作流3

过滤模式

flowchart  数据集(1,2,3) --> 过滤器  过滤器 --> 工作流(1,2)

分离模式

flowchart  数据集(1,2,3) --> 分离器  分离器 --> 工作流1(1,2)  分离器 --> 工作流2(2,3)

合并模式

flowchart  数据集1(1,2) --> 合并器  数据集2(2,3) --> 合并器  合并器 --> 工作流(1,2,3)

发布订阅

DAG

单任务 DAG

stateDiagram    [*] --> Stage1Task1    [*] --> Stage1Task2    [*] --> Stage1Task3    [*] --> Stage1Task4    Stage1Task1 --> Stage2Task1    Stage1Task1 --> Stage2Task2    Stage1Task2 --> Stage2Task1    Stage1Task2 --> Stage2Task2    Stage1Task3 --> Stage2Task3    Stage1Task3 --> Stage2Task4    Stage1Task4 --> Stage2Task3    Stage1Task4 --> Stage2Task4    Stage2Task1 --> Stage3Task1    Stage2Task2 --> Stage3Task1    Stage2Task3 --> Stage3Task1    Stage2Task4 --> Stage3Task1    state "Stage 1" as Stage1 {        Stage1Task1: Task 1.1        Stage1Task2: Task 1.2        Stage1Task3: Task 1.3        Stage1Task4: Task 1.4    }    state "Stage 2" as Stage2 {        Stage2Task1: Task 2.1        Stage2Task2: Task 2.2        Stage2Task3: Task 2.3        Stage2Task4: Task 2.4    }    state "Stage 3" as Stage3 {        Stage3Task1: Task 3.1    }    Stage1 --> Stage2    Stage2 --> Stage3

单任务 DAG 可以读取零个或多个数据源,并输出零个或多个结果数据源

单任务DAG的操作和转换:

  1. 输入
  2. map only 处理:包括 map、flatMap、forEach 等不在处理系统的节点之间交换任何数据的操作
  3. shuffle:数据根据 key 在多个节点之间重新分发,是最耗费资源的
  4. 重分区:一种轻量的 shuffle,其将数据进行重新调整所属节点
  5. 分布式join:包括广播 join、远程缓存 join 等

DAG 管道

stateDiagram-v2  direction LR  存储节点1 --> 处理节点1  存储节点1 --> 处理节点2  处理节点1 --> 存储节点2  处理节点1 --> 存储节点3  处理节点2 --> 存储节点4

不同于单任务 DAG,管道 DAG,涉及多个任务。管道 DAG 的每个处理节点都会读取持久存储节点的数据并写入持久存储节点

DAG 管道的特点:

  1. 处理节点之间不会互相连接,而是会通过一个存储节点连接。处理节点负责从存储节点获取数据、处理数据,并将数据发送到一个或多个存储节点
  2. 管道总是以存储节点开始和结束,因为处理节点处理完数据后不会保留状态,只有存储节点保留了数据的值
  3. 存储节点可以被多个处理节点使用,以实现产出复杂、昂贵的数据重用及数据一致性保证

架构

Lambda

完整的数据集 = λ (实时数据) * λ (历史数据)

graph TD    NewData[New data]    subgraph Speed Layer        RealtimeView1[Realtime view]        RealtimeView2[Realtime view]        RealtimeView3[Realtime view]    end    subgraph Batch Layer        MasterDataset[Master dataset]    end    subgraph Serving Layer        BatchView1[Batch view]        BatchView2[Batch view]        BatchView3[Batch view]    end    Query["Query: 'How many...?'"]    NewData --> SpeedLayer    NewData --> BatchLayer    SpeedLayer --> RealtimeView1    SpeedLayer --> RealtimeView2    SpeedLayer --> RealtimeView3    BatchLayer --> MasterDataset    MasterDataset --> ServingLayer    ServingLayer --> BatchView1    ServingLayer --> BatchView2    ServingLayer --> BatchView3    RealtimeView1 --> Query    RealtimeView2 --> Query    RealtimeView3 --> Query    BatchView1 --> Query    BatchView2 --> Query    BatchView3 --> Query

离线 + 实时数据视图

这种架构的最大缺点在于需要维护两套代码分别进行批处理与流处理,由此可能会产生的两个层的数据不一致、没有足够的时间进行批处理等问题

批处理层

更新算法:

服务层

服务层的性能优化关键在于通过适当的索引结构设计来提高吞吐量和降低延迟。Lambda架构的优势在于可以根据查询需求定制服务层,使得系统能够在不同的负载和查询模式下保持高效运行

规范化可以保持最小数据冗余,但同时查询慢。反规范化会增加数据冗余,但同时能提升查询性能。ambda架构通过分离主数据集(master dataset)和服务层(serving layer),使批处理层照规范化方式存储,服务层为查询服务量身定制,可以根据需要进行优化,达到最佳性能

服务层的数据库需求:

  1. 批量写入:需要支持批量生成视图。新版本的视图生成后,必须能够完全替换旧版本
  2. 可伸缩:必须能够处理任意大小的视图。这意味着需要分布在多台机器上,类似于分布式文件系统和批处理框架
  3. 随机读取:使用索引直接访问视图的小部分,以确保查询的低延迟
  4. 分布式容错

服务层不需要随机写入功能。这是因为视图仅以批量方式生成,而不是逐个更新的。同时随机写入带来了更多的复杂性,没有随机写入,数据库可以优化读取路径。

速度层

在速度层,使用与批处理类似的全量算法是不可行的,可行的解决方案都依赖于增量算法,思路是在数据进入时更新实时视图,从而重用之前用于生成视图的工作

存储实时视图的数据库最主要的是要满足,这两点一般为 NoSQL 数据库所满足:

速度层的特点:

增量计算的挑战在于分布式系统带来的数据一致性问题,除了可以利用CRDTs(无冲突复制数据类型)来实现增量计算时的最终一致性,Lambda架构提供了天然的保护机制。即使实时视图由于某些边缘情况或合并算法的错误而损坏,批处理层和服务层的视图最终会自动修正这些错误

查询层

负责利用批处理视图和实时视图来响应查询,确定从每个视图中使用哪些数据,并将它们合并以得到正确的结果

查询层的视图必须被设计为可合并的,时间导向的查询自然适合这种结构

Kappa

stateDiagram-v2  state 速度层 {    任务N    任务N+1  }  数据 --> 速度层  state 服务层 {    数据N    数据N+1  }  任务N --> 数据N  任务N+1 --> 数据N+1  应用 --> 数据N+1

kappa 则是通过只支持流处理来避免 Lambda 架构的复杂性,当计算逻辑发生变更时,就可以将 offset 回拨,重新生成数据,当新的数据追赶上了旧数据,应用便可以切换到新数据读取数据

这种架构的缺点在于很难对历史数据进行重新处理。

Spark

Spark 比 MapReduce 快的原因:更为简单的 RDD 编程模型减少了作业调度次数,以及优先使用内存

各组件

  1. SparkContext 启动 DAGScheduler 构造执行的 DAG 图,拆分成计算任务
  2. Driver 向 Cluster Manager 请求计算资源,分配 Worker
  3. Worker 向 Driver 注册并下载代码执行

RDD

分区:同一个 RDD 包含的数据被存储在系统的不同节点中,需要读取时根据ID 和分区的 index 可以唯一确定对应数据块的编号,从而通过底层存储层的接口中提取到数据进行处理

不可变:一个 RDD 都是只读的,只可以对现有的 RDD 进行转换(Transformation)操作,得到新的 RDD 作为中间计算的结果

并行:由于上面两个特性,就可以并行对 RDD 进行操作

结构

SparkContext:所有 Spark 功能的入口,它代表了与 Spark 节点的连接,一个线程只有一个 SparkContext

SparkConf: 一些参数配置信息

Partitions:数据的逻辑结构,每个 Partition 会映射到某个节点内存或硬盘的一个数据块

Partitioner:定义了划分数据分片的分区规则,如按哈希取模或是按区间划分等

Dependencies:每一步产生的 RDD 里都会存储它的依赖关系,即它是通过哪个 RDD 经过哪个转换操作得到的

窄依赖,父 RDD 的分区可以一一对应到子 RDD 的分区

stateDiagram-v2  state RDD1 {    RDD1_分区1    RDD1_分区2    RDD1_分区3  }  state RDD2 {    RDD2_分区1    RDD2_分区2    RDD2_分区3  }  RDD1_分区1 --> RDD2_分区1: map  RDD1_分区2 --> RDD2_分区2: filter  RDD1_分区3 --> RDD2_分区3: union等

宽依赖,父 RDD 的每个分区可以被多个子 RDD 的分区使用

stateDiagram-v2  state RDD1 {    RDD1_分区1    RDD1_分区2    RDD1_分区3  }  state RDD2 {    RDD2_分区1    RDD2_分区2  }  RDD1_分区1 --> RDD2_分区1: join  RDD1_分区2 --> RDD2_分区1: join  RDD1_分区3 --> RDD2_分区1: join  RDD1_分区1 --> RDD2_分区2: groupBy等  RDD1_分区2 --> RDD2_分区2: groupBy等  RDD1_分区3 --> RDD2_分区2: groupBy等

窄依赖允许子 RDD 的每个分区可以被并行处理产生,而宽依赖则必须等父 RDD 的所有分区都被计算好之后才能开始处理

Checkpoint:对于一些计算过程比较耗时的 RDD,可以进行持久化,标记这个 RDD 有被检查点处理过,并且清空它的所有依赖关系,这样在进行崩溃恢复的时候就不用在向前向父 RDD 回溯

Storage Level:记录 RDD 持久化时的存储级别,内存或内存硬盘 或在分区节点上内存、内存硬盘

Iterator:迭代函数,Compute:计算函数 都是用来表示 RDD 怎样通过父 RDD 计算得到的

preferredLocations:本着计算向数据移动原则,会优先将task任务部署在其将要处理的数据所在的节点上

数据操作

大部分操作跟Stream差不多

Spark 的 Shuffle 操作跟 MapReduce 是一样的,其通过生产与消费 Shuffle 中间文件的方式,来完成集群范围内的数据交换

Shuffle 中间文件

Map 阶段产生中间文件:

Reduce 阶段分发数据:

Reduce Task 将拉取到的数据块填充到读缓冲区,然后按照任务的计算逻辑不停地消费、处理缓冲区中的数据记录

stateDiagram-v2  direction LR  MapTask1 --> 数据文件1  MapTask1 --> 索引文件1  MapTask2 --> 数据文件2  MapTask2 --> 索引文件2  数据文件1 --> ReduceTask1  数据文件1 --> ReduceTask2  索引文件1 --> ReduceTask1  索引文件1 --> ReduceTask2  数据文件2 --> ReduceTask1  数据文件2 --> ReduceTask2  索引文件2 --> ReduceTask1  索引文件2 --> ReduceTask2

shuffle 的过程带来的开销不仅有 Map 阶段后的内存与磁盘方面,也有 Reduce 阶段数据分发的网络开销

DAG

在 Spark 开发下,对一个 RDD 上调用算子、封装计算逻辑,这个过程会衍生新的子 RDD,在子 RDD 之上,开发者还会继续调用其他算子,衍生出新的 RDD,如此往复便有了 DAG

而 DAG 的多个节点,会根据 Action 操作,并以 shuffle 为边界,形成多个 Stage。在同一 Stage 内部,所有算子融合为一个函数,Stage 的输出结果由这个函数一次性作用在输入数据集而产生,同一 Stage 内部的计算,都是在内存中进行的,同时由于将多个算子或操作融合为一个函数,所以也不会产生多份内存数据

调度系统

sequenceDiagram  DAGScheduler ->> TaskScheduler: TaskSets  SchedulerBackend ->> TaskScheduler: WorkerOffer  SchedulerBackend ->> SchedulerBackend: 任务调度  TaskScheduler ->> SchedulerBackend: LaunchTask  SchedulerBackend -->> ExecutorBackend: LaunchTask  SchedulerBackend --> ExecutorBackend: LaunchedExecutor/RemoveExecutor/StatusUpdate  ExecutorBackend ->> ExecutorBackend: 任务执行  ExecutorBackend -->> SchedulerBackend: StatusUpdate  SchedulerBackend ->> TaskScheduler: StatusUpdate  TaskScheduler ->> DAGScheduler: StatusUpdate
  1. DAGScheduler 以 Shuffle 为边界,将开发者设计的计算图 DAG 拆分为多个执行阶段 Stages,然后为每个 Stage 创建任务集 TaskSet
  2. SchedulerBackend 通过与 Executors 中的 ExecutorBackend 的交互来实时地获取集群中可用的计算资源,并将这些信息记录到 ExecutorDataMap 数据结构
  3. 与此同时,SchedulerBackend 根据 ExecutorDataMap 中可用资源创建 WorkerOffer,以 WorkerOffer 为粒度提供计算资源
  4. 对于给定 WorkerOffer,TaskScheduler 结合 TaskSet 中任务的本地性倾向,按照 PROCESS_LOCAL、NODE_LOCAL、RACK_LOCAL 和 ANY 的顺序,依次对 TaskSet 中的任务进行遍历,优先调度本地性倾向要求苛刻的 Task,尽量使计算向数据移动
  5. 被选中的 Task 由 TaskScheduler 传递给 SchedulerBackend,再由 SchedulerBackend 分发到 Executors 中的 ExecutorBackend。Executors 接收到 Task 之后,即调用本地线程池来执行分布式任务。

存储系统

存储系统用于存储 RDD 缓存、Shuffle 中间文件、广播变量

stateDiagram-v2  BlockManagerMaster --> BlockManager  BlockManager --> BlockManagerMaster  BlockManager --> MemoryStore  MemoryStore --> BlockManager  BlockManager --> DiskStore  DiskStore --> BlockManager  note right of BlockManagerMaster    在Driver端统一管理、    维护存储系统全局元数据  end note  note right of MemoryStore    管理RDD Cache、    广播变量在内存中的存储  end note  note right of BlockManager    在Executori端搜集本地存储系统元数据,    及时向BlockManagerMasteri汇报,    同时从BlockManagerMaster获取全局信息  end note  note right of DiskStore    管理RDD Cache、Shuffle    中间文件在磁盘中的存储  end note

MemoryStore 同时支持存储对象值和字节数组这两种不同的数据形式,并且统一采用 MemoryEntry 数据抽象对它们进行封装,其内部会使用 LinkedHashMap 存储数据,以实现LRU算法

而 DiskStor 也会记录逻辑数据块 Block 与磁盘文件系统中物理文件的对应关系,每个 Block 都对应一个磁盘文件

内存管理

内存的区域划分:

1.6 版本之后,Spark 推出了统一内存管理模式, Execution Memory 和 Storage Memory 之间可以相互转化

  1. 如果对方的内存空间有空闲,双方就都可以抢占
  2. 对于 RDD 缓存任务抢占的执行内存,当执行任务有内存需要时,RDD 缓存任务必须立即归还抢占的内存,涉及的 RDD 缓存数据要么落盘、要么清除
  3. 对于分布式计算任务抢占的 Storage Memory 内存空间,即便 RDD 缓存任务有收回内存的需要,也要等到任务执行完毕才能释放

广播变量

一种分发机制,它一次性封装目标数据结构,以 Executors 为粒度去做数据分发

当有两个数据集需要 join 时,可以利用广播变量将较小的数据集进行广播,避免 shuffle

SparkSQL

架构

DataSet

DataSet 所描述的数据都被组织到有名字的列中,就像关系型数据库中的表一样

20221218152850

DataFrame

可以被看作是一种特殊的 DataSet,但是它的每一列并不存储类型信息,所以在编译时并不能发现类型错误

从数据表示(Data Representation)的角度来看,是否携带 Schema 是 DataFrame 与 RDD 唯一的区别,RDD 算子多是高阶函数,这些算子允许开发者灵活地实现业务逻辑,表达能力极强

graph TD;    A[SQL Query] --> B[解析和分析];    B --> C[生成逻辑执行计划];    C --> D[逻辑执行计划优化];    D --> E[生成物理执行计划];    E --> F[物理执行计划优化];    F --> G[代码生成];    G --> H[执行查询];

Catalyst 执行过程优化

SparkStreaming

Spark Streaming 用时间片拆分了无限的数据流,然后对每一个数据片用类似于批处理的方法进行处理,输出的数据也是一块一块的,通过提供了一个对于流数据的抽象 DStream 来描述数据流,底层 DStream 也是由很多个序列化的 RDD 构成,按时间片(比如一秒)切分成的每个数据单位都是一个 RDD

20221218153730

主要缺点是实时计算延迟较高,这是由于 Spark Streaming 不支持太小的批处理的时间间隔

StructuredStreaming

输入的数据流按照时间间隔(以一秒为例)划分成数据段。每一秒都会把新输入的数据添加到表中,Spark 也会每秒更新输出结果。输出结果也是表的形式,输出表可以写入硬盘或者 HDFS。

20221218154712

Structured Streaming 提供一个 level 更高的 API,这样的数据抽象可以让开发者用一套统一的方案去处理批处理和流处理

相比 SparkStreaming,StructuredStreaming可以支持更小的时间间隔,2.3 也引入了连续处理模式,同时也有对事件时间的支持

优化

3.0 引入了 Project Tungsten:

  1. 在数据结构方面,Tungsten 自定义了紧凑的二进制格式,计算、内存效率比 Java 对象高
  2. 基于内存页的顺序内存访问加上堆外内存来管理数据结构规避了使用 Java HashMap 额外的内存开销、GC开销,及提升 CPU 缓存的命中率
  3. 用全阶段代码生成(Whol Stage Code Generation)取代火山迭代模型,不仅减少了虚函数调用和降低内存访问频率,由于每一条指令都是明确的,可以顺序加载到 CPU 寄存器,源数据也可以顺序地加载到 CPU 的各级缓存中

AQE(Adaptive Query Execution),可以在 Spark SQL 优化的过程中动态地调整执行计划:

  1. 自动 join 策略调整,根据数据的大小和分布等因素自动选择最佳的连接策略,以提高查询性能
  2. 自动分区合并:当 shuffle 的 reduce 阶段产生了小分区后,这些小分区会被合并成一个较大的分区,减少调度开销
  3. 自动数据倾斜处理:如果 AQE 发现某张表存在倾斜的数据分片,就会自动对它通过把数据重新分区
  4. ...

shuffle 优化:

  1. 减少 shuffle 的数据量
  2. 避免 shuffle,如果无法避免,则尽可能拖到最后再去 shuffle,这样也能减少 shuffle 的数据量

DPP 动态分区裁剪:

分区表都会在文件系统中创建单独的子目录来存储相应的数据分片,如果过滤谓词中包含分区键,那么 Spark SQL 对分区表做扫描的时候,是完全可以跳过(剪掉)不满足谓词条件的分区目录,这就是分区剪裁

动态分区剪裁这个功能主要用在星型模型数仓的数据关联场景中,动态分区裁剪把维度表中的过滤条件,通过关联关系传导到事实表,来减少事实表中数据的扫描量

join 优化:

Spark 的分布式 join 衍生自单机的 Nested Loop Join、Hash Join、Sort Merge Join 等。

在等值 join 下,Spark SQL优先选择BHU,其次是SMJ,再次是SHJ。其中,SHJ执行效率更高,但是内表数据全部放入内存的要求过于苛刻,SMJ可以充分利用磁盘来完成排序与归并关联,因此,Spark SQL默认选择SMJ作为兜底策略,完成等值Join的关联计算

在不等值 join 下,因为不等值Joi只能依赖NLJ算法,因此内表满足广播阈值条件的情况下,在这种关联场景中,Spark SQL会优先选择BNLJ。否则,Spark SQL只能使用笨重的CPJ来兜底去完成关联计算

Join策略策略缩写Join Hints适用场景
Broadcast Hash JoinBHJbroadcast、broadcastjoin内表较小,通常在2GB左右为宜,最大不能超过8GB
Shuffle Merge Sort JoinSMJmerge、mergejoin等值Join下,Spark SQL的兜底策略,不需要开发者特别设置
Shuffle Hash JoinSHJshuffle hash当大表超过小表3倍以上,且小表数据分布比较均匀,SHJ比SMJ效率更佳
Broadcast Nested Loop JoinBNLJ没有提供Join Hints,开发者无能为力,只能靠Spark SQL自行判断
Cartesian Product JoinCPJshuffle_replicate_nl不等值Join下,Spark SQL的兜底策略,不需要开发者特别设置

对于两张大表的直接 join,是需要尽力避免的,一种优化方式是通过分区机制,将每张大表拆成较小的表,将匹配上的两张较小的表进行join,最后再进行合并

Flink

架构

20221219144426

核心模型

最核心的数据结构是 Stream,它代表一个运行在多个分区上的并行流

当一个 Flink 程序被执行的时候,它会被映射为 Streaming Dataflow:

2022121914389

程序天生是并行和分布式的。一个 Stream 可以包含多个分区(Stream Partitions),一个操作符可以被分成多个操作符子任务,每一个子任务是在不同的线程或者不同的机器节点中独立执行的:

20221219144240

Beam

一个适配流处理、批处理的中间层

编程模型

PCollection

特性:

Transform

stateDiagram-v2  数据1 --> Transform  数据2 --> Transform  Transform --> 数据3  Transform --> 数据4

常见的 Transform 接口:

Pipeline

stateDiagram-v2  输入 --> PCollection1: Transform1  PCollection1 --> PCollection2: Transform2  PCollection2 --> PCollection3: Transform3  PCollection3 --> 输出: Transform4

分布式环境下,整个数据流水线会启动 N 个 Workers 来同时处理 PCollection,在具体处理某一个特定 Transform 的时候,数据流水线会将这个 Transform 的输入数据集 PCollection 里面的元素分割成不同的 Bundle,将这些 Bundle 分发给不同的 Worker 来处理

在单个 Transfrom中,如果某一个 Bundle 里面的元素因为任意原因导致处理失败了,则这整个 Bundle 里的元素都必须重新处理

在多步骤的 Transform 上,如果处理的一个 Bundle 元素发生错误了,则这个元素所在的整个 Bundle 以及与这个 Bundle 有关联的所有 Bundle 都必须重新处理

IO

StreamingSQL

/* 窗口:最近10个温度的平均值 */Select bid, avg(t) as T From BoilerStream WINDOW HOPPING (SIZE 10, ADVANCE BY 1);/* join */from TempStream[temp > 30.0]#window.time(1 min) as T  join RegulatorStream[isOn == false]#window.length(1) as R  on T.roomNo == R.roomNoselect T.roomNo, R.deviceID, 'start' as actioninsert into RegulatorActionStream; // Siddhi Streaming SQL/*  某个模式有没有在特定的时间段内发生 */from every( e1=TempStream ) -> e2=TempStream[ e1.roomNo == roomNo and (e1.temp + 5) <= temp ]    within 10 minselect e1.roomNo, e1.temp as initialTemp, e2.temp as finalTempinsert into AlertStream;