kafka

Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于大数据实时处理领域

特点

基础概念

消息和批次

模式

主题和分区

屏幕截图 2020-08-12 152257

生产者和消费者

屏幕截图 2020-08-12 152638

broker 和集群

屏幕截图 2020-08-12 152955

对于消息 kafka会保留一段时间或者达到一定大小的字节数 旧的消息会被删除

多集群

屏幕截图 2020-08-12 153137

使用场景

架构

屏幕截图 2020-08-03 133557

分区与副本机制

ISR:中的副本都是与 leader 同步的副本

为了描述一个副本是否与 leader 副本同步,replica.lag.time.max.ms 用来描述这个最大延迟,如果 follower 副本与 leader 副本的复制延迟超过这个时间,则认为不同步

Leader epoch:可以用来确定最新的分区副本,由两部分数据组成。一个是Epoch,一个单调增加的版本号。每当副本领导权发生变更时,都会增加该版本号

zk的作用

主要为 Kafka 提供元数据的管理的功能

应用场景

搭建

配置

broker 配置

主题配置

命令操作

./kafka-topics.sh --list --zookeeper 172.17.0.1:2181
/opt/kafka/bin/kafka-topics.sh --create --zookeeper 172.17.0.1:2181 --replication-factor 1 --partitions 2 --topic my_log
./kafka-console-producer.sh --topic first --broker-list 172.17.0.1:9092
./kafka-console-consumer.sh --topic first --bootstrap-server 172.17.0.1:9092

工作流程

屏幕截图 2020-08-05 153846

Kafka 中消息是以 topic 进行分类的,生产者生产消息,消费者消费消息,都是面向 topic的

每个 partition 对应于一个 log 文件,该 log 文件中存储的就是 producer 生产的数据

消费者组中的每个消费者,都会实时记录自己消费到了哪个 offset,以便出错恢复时,从上次的位置继续消费

日志

消息日志文件(.log)、位移索引文件(.index)、时间戳索引文件(.timeindex)、已中止(Aborted)事务的索引文件(.txnindex)

stateDiagram-v2  topic --> partition0  topic --> partition1  topic --> partition2  partition1 --> log  log --> segment1  log --> segment2  log --> segment3  segment2 --> .log  segment2 --> .index  segment2 --> .timeindex  segment2 --> .txindex

一个非空的日志段 segment 在超过一段时候后,即使还没有写满,也会强制滚动(roll,也就是新建)日志段

日志段写入

Producer 生产的数据会被不断追加到 log 文件的末端,在对该文件进行读写时,Kafka 会充分利用 PageCache 来加速读写,每条数据都有自己的 offset

Kafka 在写入消息时,会根据这批写入的最大 offset 、时间戳等来判断要不要追加索引

日志段读取

index与log文件的作用

恢复

在启动 broker 时,kafka 会遍历所有日志段。为了从磁盘读取索引数据,对于某一个 segement,恢复操作会从 log 文件重建索引,清除掉之前的索引文件。并删除掉日志文件跟索引文件末尾无效的数据

高水位管理

Kafka 使用高水位(HW, Hight WaterMark)来标识分区下的哪些消息是可以被消费者消费以及进行副本间的同步

public final class LogOffsetMetadata {    ...    public final long messageOffset; // 消息位移值    public final long segmentBaseOffset; // 位移值在日志段的上的位置    public final int relativePositionInSegment; // 位移值所在日志段的物理磁盘位置    ...}

日志段管理

public class LogSegments {  /* the segments of the log with key being LogSegment base offset and value being a LogSegment */  private final ConcurrentNavigableMap<Long, LogSegment> segments = new ConcurrentSkipListMap<>();}

在写入数据时,Kafka 就是是对最后一个日志段执行的写入操作

segments.activeSegment.append(lastOffset, largestTimestamp, shallowOffsetOfMaxTimestamp, records)

在读取数据时,则是根据起始偏移量、读取多少数据,不断地日志段中读取数据

while (fetchDataInfo == null && segmentOpt.isPresent) {  ...  fetchDataInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)  if (fetchDataInfo != null) {    ...  } else segmentOpt = segments.higherSegment(baseOffset)}

索引文件

public abstract class AbstractIndex implements Closeable {    ...    private final long baseOffset; // 对应日志段对象的起始位移值,如 00000000000000000123.index 123就是起始位移值    private final int maxIndexSize; // 控制索引文件的最大长度    private final boolean writable;    private volatile File file;    // Length of the index file    private volatile long length;    private volatile MappedByteBuffer mmap; // 内存映射磁盘读写    /**     * The maximum number of entries this index can hold     */    private volatile int maxEntries;    /** The number of entries in this index */    private volatile int entries;    ...}

Kafka 对 offset 的查找是基于二分查找实现的:

首先通过index文件查找offset所在的大概范围,然后再在这个范围内进行顺序查找,为了使用更少的内存空间,Kafka 采用的是稀疏不连续的索引

其会根据第一条消息的偏移量以及所需读取的大小或者最大能读取的大小,去读取数据

Kafka 利用 mmap,将更大的磁盘文件映射到了一个虚拟内存空间,也就是最近读写的数据更有可能在内存中,对于什么读写的冷数据如果进行访问,会触发缺页中断,所以 Kafka 的二分查找会优先查找热区,即最近操作的那部分数据,找到的话就不用去查冷区的数据,以此提升性能

没有优化:冷区数据触发缺页中断

优化之后:由于大部分查询集中在索引项尾部,所以把后半部分设置为热区,永远保存在缓存中,如果查询目标偏移量在热区索引项范围,直接查热区,避免页中断

主题删除

主题删除过程中,首先会通过复制状态机机制,向 Controller 发送通知,调整主题的所有副本状态,然后移除 zk、controller 关于该主题的所有元数据,最后执行物理磁盘文件的删除操作

副本状态机

副本的 7 种状态:

stateDiagram-v2  NewReplica --> OfflineReplica: broker下线  NewReplica --> OnlineReplica: 初始化之后  OnlineReplica --> OfflineReplica: broker下线/broker重新上线  OfflineReplica --> OnlineReplica: broker下线/broker重新上线  OfflineReplica --> OfflineReplica  OnlineReplica --> OnlineReplica: leader 副本变更  OfflineReplica --> ReplicaDeletionStarted: 删除副本对象  ReplicaDeletionStarted --> ReplicaDeletionSuccessful: 删除副本成功  ReplicaDeletionStarted --> ReplicaDeletionIneligible: 删除副本失败  ReplicaDeletionIneligible --> OnlineReplica  ReplicaDeletionIneligible --> OfflineReplica: 重试副本删除  ReplicaDeletionSuccessful --> NonExistentReplica: 副本对象被移出副本状态机  NonExistentReplica --> NewReplica: 副本对象新创建

当 Controller 接受到状态变更请求时,首先就是判断操作是否有效,无效需要记录一条失败日志,有效则执行对应的操作、变更相关的元数据

分区状态机

stateDiagram-v2  NewPartition --> OnlinePartition: broker启动或新分区初始化  OnlinePartition --> OnlinePartition: 分区选举leader  OnlinePartition --> OfflinePartition: broker下线或主题被删除  OfflinePartition --> OnlinePartition: 分区选举leader  OfflinePartition --> NonExistentPartition: 主题被成功删除  NonExistentPartition --> NewPartition: 新分区创建

Leader 选举策略:当由于某种原因,Leader 下线了,需要根据不同情况来选举 Leader

// 离线分区Leader选举策略final case class OfflinePartitionLeaderElectionStrategy(allowUnclean: Boolean) extends PartitionLeaderElectionStrategy// 分区副本重分配Leader选举策略  final case object ReassignPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy// 分区Preferred副本Leader选举策略final case object PreferredReplicaPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy// Broker Controlled关闭时Leader选举策略final case object ControlledShutdownPartitionLeaderElectionStrategy extends PartitionLeaderElectionStrategy

这几个策略几乎都是选择当前副本有序集合中的、首个处于 ISR 集合中的存活副本作为新的 Leader

当要变更分区状态,就由 Controller 发送相关消息给 broker 们,再由 broker 来执行对每个分区的元数据变更

消费者组管理

stateDiagram-v2  Empty --> PreparingRebalance: 首个成员加入组  PreparingRebalance --> Empty  PreparingRebalance --> CompletingRebalance: 所有成员加入组  CompletingRebalance --> PreparingRebalance: 新成员加入组或已有成员退出组  CompletingRebalance --> Stable: leader成员指定好方案  CompletingRebalance --> Dead  Dead --> Dead  Stable --> Dead  Empty --> Dead  PreparingRebalance --> Dead  Stable --> PreparingRebalance: 新成员加入组或已有成员退出组

集群成员关系

broker通过创建临时节点把自己的 ID 注册到 Zookeeper

// 集群元数据class ControllerContext {  val stats = new ControllerStats // Controller统计信息类   var offlinePartitionCount = 0   // 离线分区计数器  val shuttingDownBrokerIds = mutable.Set.empty[Int]  // 关闭中Broker的Id列表  private val liveBrokers = mutable.Set.empty[Broker] // 当前运行中Broker对象列表  private val liveBrokerEpochs = mutable.Map.empty[Int, Long]   // 运行中Broker Epoch列表  var epoch: Int = KafkaController.InitialControllerEpoch   // Controller当前Epoch值  var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion  // Controller对应ZooKeeper节点的Epoch值  val allTopics = mutable.Set.empty[String]  // 集群主题列表  val partitionAssignments = mutable.Map.empty[String, mutable.Map[Int, ReplicaAssignment]]  // 主题分区的副本列表  val partitionLeadershipInfo = mutable.Map.empty[TopicPartition, LeaderIsrAndControllerEpoch]  // 主题分区的Leader/ISR副本信息  val partitionsBeingReassigned = mutable.Set.empty[TopicPartition]  // 正处于副本重分配过程的主题分区列表  val partitionStates = mutable.Map.empty[TopicPartition, PartitionState] // 主题分区状态列表   val replicaStates = mutable.Map.empty[PartitionAndReplica, ReplicaState]  // 主题分区的副本状态列表  val replicasOnOfflineDirs = mutable.Map.empty[Int, Set[TopicPartition]]  // 不可用磁盘路径上的副本列表  val topicsToBeDeleted = mutable.Set.empty[String]  // 待删除主题列表  val topicsWithDeletionStarted = mutable.Set.empty[String]  // 已开启删除的主题列表  val topicsIneligibleForDeletion = mutable.Set.empty[String]  // 暂时无法执行删除的主题列表  ......}

Controller 是用来管理整个集群的,它会向其他 broker 发送三类请求:

  1. LeaderAndIsrRequest:告诉 Broker 相关主题各个分区的 Leader 副本位于哪台 Broker 上、ISR 中的副本都在哪些 Broker
  2. StopReplicaRequest:告知指定 Broker 停止它上面的副本对象,这个请求主要的使用场景是分区副本迁移和删除主题
  3. UpdateMetadataRequest:更新 Broker 上的元数据缓存

复制

副本管理

follower 会启动一个线程,不断执行以下操作:

除了副本同步直接操作分区对象,生产者向 Leader 副本写入消息、消费者组写入组信息、事务管理器写入事务信息(包括事务标记、事务元数据等)这三种操作会通过 ReplaceManager 向副本写入数据

而消费者的读取请求,也会通过 ReplaceManager 来确定读取范围,再从底层的日志读取消息构建结果并返回

副本管理还会根据接收到的请求,决定是否将当前副本提升为 leader 副本,同时,还会有一个线程定时检测当前副本与 leader 副本的滞后时间,当滞后时间超过 replica.lag.time.max.ms,将该副本移除出 ISR

请求处理

stateDiagram-v2  clients --> processor线程  processor线程 --> clients  state broker {    state processor线程 {      processor1      processor2      processor3    }    processor线程 --> 请求队列    请求队列 --> IO线程    state IO线程 {      IO线程1      IO线程2      IO线程3    }    state 响应队列 {      processor1响应队列      processor2响应队列      processor3响应队列    }    IO线程 --> 响应队列    响应队列 --> processor线程  }

生产请求:

在消息被写入分区的首领之后,broker 开始检查 acks 配置参数——如果 acks 被设为 0 或 1 ,那么 broker 立即返回响应;如果 acks 被设为 all ,那么请求会被保存在一个叫作炼狱的缓冲区里,直到首领发现所有跟随者副本都复制了消息,响应才会被返回给客户端

获取请求:

broker 将按照客户端指定的数量上限从分区里读取消息,再把消息返回给客户端。Kafka 使用零复制技术向客户端发送消息(直接从文件系统缓存复制到网卡),如果应用程序是从文件读出数据后再通过网络发送出去的场景,并且这个过程中不需要对这些数据进行处理,这种场景可以使用零拷贝

sequenceDiagram  消费者 ->> broker: 获取请求  alt 积累足够多的消息    生产者 ->> broker: 消息    生产者 ->> broker: 消息    生产者 ->> broker: 消息  end  broker ->> 消费者: 消息

所有同步副本复制了这些消息,才允许消费者读取它们

屏幕截图 2020-08-21 144435

监控指标

Kakfa 在 RequestChannel 内保存了一些关于请求的指标:

物理存储

文件管理:

分区分成若干个片段 当前正在写入数据的片段叫作活跃片段

可靠数据传递

kafka 的保证:

副本的同步保证:

broker

复制系数:

主题级别 replication.factor broker级别 default.replication.factor

如果复制系数为 N,那么在 N-1 个 broker 失效的情况下,仍然能够从主题读取数据或向主题写入数据,同时 它们也会占用N倍的磁盘空间、

不完全首领选举:

如果把 unclean.leader.election.enable 设为 true ,就是允许不同步的副本成为首领 就要承担丢失数据和出现数据不一致的风险

最少同步副本:

min.insync.replicas 如果要确保已提交的数据被写入不止一个副本,就需要把最少同步副本数量设置为大一点

生产者

发送确认:

acks:0 能够通过网络把消息发送出去,那么就认为消息已成功写入

1 :意味着首领在收到消息并把它写入到分区数据文件(不一定同步到磁盘上)时会返回确认或错误响应

all: 首领在返回确认或错误响应之前,会等待所有同步副本都收到消息

重试参数:

对于一些错误 可以通过重试来解决 如: LEADER_NOT_AVAILABLE

消费者

显示提交偏移量:

数据管道

需要考虑的问题:

Connect

启动 connect:

./bin/connect-distributed.sh ./config/connect-distributed.properties

文件数据源:

POST localhost:8083/connectors{"name":"load-kafka-config", "config":{"connector.class":"FileStreamSource","file":"config/server.properties","topic":"kafka-config-topic"}}

传递文件数据源到主题上

深入

集群镜像

使用场景:

多集群架构

跨数据中心通信:

中心架构:

stateDiagram-v2  direction LR  北京Kafka集群(部分数据) --> 中心指标Kafka集群(整体数据)  上海Kafka集群(部分数据) --> 中心指标Kafka集群(整体数据)  广州Kafka集群(部分数据) --> 中心指标Kafka集群(整体数据)  厦门Kafka集群(部分数据) --> 中心指标Kafka集群(整体数据)

主从架构:

stateDiagram-v2  订单业务Kafka集群 --> 报表统计Kafka集群

双活架构:

stateDiagram-v2  北京Kafka集群 --> 广州Kafka集群  广州Kafka集群 --> 北京Kafka集群

主备架构:

stateDiagram-v2  direction LR  主Kafka集群 --> 备Kafka集群  用户 --> 主Kafka集群: 正常情况  用户 --> 备Kafka集群: 主集群挂掉

MirrorMaker

stateDiagram-v2  state 源Kafka集群 {    主题A    主题B    主题C  }  主题A --> 消费者1  主题B --> 消费者2  主题C --> 消费者3  state MirrorMaker {    消费者1 --> 生产者    消费者2 --> 生产者    消费者3 --> 生产者  }  生产者 --> 主题A0  生产者 --> 主题B0  生产者 --> 主题C0  state 目标Kafka集群 {    主题A0    主题B0    主题C0  }

如果有可能,尽量让 MirrorMaker 运行在目标数据中心里

监控

所有度量指标都可以通过 Java Management Extensions(JMX)接口来访问

broker

非同步分区数量:

关键指标:

其他监控:

集群问题:

主机问题:

客户端

指标:

优化

操作系统层优化

JVM调优

Broker端调优

应用层调优

性能指标调优

流式处理

数据流:无边界数据集的抽象表示数据流是有序的, 不可变的, 可重播的流式处理是持续地从一个无边界的数据集读取数据,然后对它们进行处理并生成结果

概念

时间:

状态:

时间窗口:

屏幕截图 2020-08-23 112304

设计模式

单事件处理:

屏幕截图 2020-08-23 112459

本地状态事件处理:

屏幕截图 2020-08-23 112551

多阶段处理:

屏幕截图 2020-08-23 112748

外部数据源填充:

屏幕截图 2020-08-23 112929

连接流:

屏幕截图 2020-08-23 113209

对乱序事件重排序

重新处理:

使用新处理程序从头读取数据流生成结果流

Kafka Streams 架构

拓扑结构:

stateDiagram-v2  direction LR  state 处理器 {    计数器 --> 本地状态    本地状态 --> 计数器  }  输入行 --> 拆分成单词  拆分成单词 --> 单词列表  单词列表 --> 按照单词分组  按照单词分组 --> 重分区主题  重分区主题 --> 计数器  计数器 --> 输出单词数量

对拓扑结构伸缩:

stateDiagram-v2  state 主题 {    主题分区1    主题分区2  }  state 任务1 {    A --> B    B --> C  }  state 任务2 {    D --> E    E --> F  }  state 重分区主题 {    重分区主题分区1    重分区主题分区2  }  state 任务3 {    G --> H    H --> I  }  state 任务4 {    J --> K    K --> L  }  主题分区1 --> 任务1  主题分区2 --> 任务2  任务1 --> 重分区主题分区1  任务1 --> 重分区主题分区2  任务2 --> 重分区主题分区1  任务2 --> 重分区主题分区2  重分区主题分区1 --> 任务3  重分区主题分区2 --> 任务4