KAFKA

KAFKA

介绍

定义

Apache Kafka® 是一个事件流平台。这意味着什么? Kafka 结合了三个关键功能,因此您可以 使用 一个久经考验的解决方案来实现端到端的事件流用例:

要发布(写)publish (write) 和订阅(读) subscribe to (read) 流事件,包括来自其他系统的数据的持续导入/导出的。 根据需要,持久而可靠地 存储store 事件流。 在事件发生时或追溯性地处理事件流。 To process streams of events as they occur or retrospectively.

Kafka 是一个分布式系统,由通过高性能TCP 网络协议进行通信的服务器和客户端组成。它可以部署在内部和云环境中的裸机硬件、虚拟机和容器上。

服务器:Kafka 作为一个或多个服务器的集群运行,这些服务器可以跨越多个数据中心或云区域。其中一些服务器形成存储层,称为代理。其他服务器运行 Kafka Connect以将数据作为事件流持续导入和导出,以将 Kafka 与您现有的系统(例如关系数据库以及其他 Kafka 集群)集成。为了让您实现关键任务用例,Kafka 集群具有高度可扩展性和容错性:如果其中任何一个服务器出现故障,其他服务器将接管它们的工作,以确保连续运行而不会丢失任何数据。

客户端:它们允许您编写分布式应用程序和微服务,即使在网络问题或机器故障的情况下,它们也可以并行、大规模和容错方式读取、写入和处理事件流。Kafka 附带了一些这样的客户端,这些客户端由 Kafka 社区提供的数十个客户端进行了扩充 :客户端可用于 Java 和 Scala,包括更高级别的 Kafka Streams库,用于 Go、Python、C/C++ 和许多其他编程语言以及 REST API。

主要概念和术语

一个 事件event 记录,在世界上或者在您的企业“出事”的事实。它在文档中也称为记录或消息。当您向 Kafka 读取或写入数据时,您以事件的形式执行此操作。从概念上讲,事件具有键、值、时间戳和可选的元数据标头。这是一个示例事件:

Event key: "Alice" Event value: "Made a payment of $200 to Bob" Event timestamp: "Jun. 25, 2020 at 2:06 p.m."

生产者Producers 是那些向 Kafka 发布(写入)事件的客户端应用程序,而 消费者consumers 是订阅(读取和处理)这些事件的那些客户端应用程序。在 Kafka 中,生产者和消费者是完全解耦且彼此不可知的,这是实现 Kafka 众所周知的高可扩展性的关键设计元素。例如,生产者永远不需要等待消费者。Kafka 提供了各种保证,例如能够一次处理事件。

事件被组织并持久地存储在 主题topics 中。非常简单,主题类似于文件系统中的文件夹,事件就是该文件夹中的文件。一个示例主题名称可以是“支付”。Kafka 中的主题总是多生产者和多订阅者:一个主题可以有零个、一个或多个向其写入事件的生产者,以及零个、一个或多个订阅这些事件的消费者。可以根据需要随时读取主题中的事件——与传统消息传递系统不同,事件在消费后不会被删除。相反,您可以通过每个主题的配置设置来定义 Kafka 应该保留您的事件多长时间,之后旧事件将被丢弃。Kafka 的性能在数据大小方面实际上是恒定的,因此长时间存储数据是完全没问题的。

主题是 分区partitioned 的,这意味着一个主题分布在位于不同 Kafka 代理的多个“桶”上。数据的这种分布式放置对于可伸缩性非常重要,因为它允许客户端应用程序同时从/向多个代理读取和写入数据。当一个新事件发布到一个主题时,它实际上被附加到该主题的分区之一。具有相同事件键(例如,客户或车辆 ID)的事件被写入同一分区,并且 Kafka保证给定主题分区的任何消费者将始终以与写入事件完全相同的顺序读取该分区的事件。

为了使您的数据具有容错性和高可用性,每个主题都可以 复制replicated ,甚至可以跨地理区域或数据中心,以便始终有多个代理拥有数据副本,以防万一出现问题,您想对经纪人进行维护等。常见的生产设置是复制因子为 3,即,您的数据将始终存在三个副本。此复制在主题分区级别执行。

学习笔记

Kafka 分布式、基于发布/订阅模式的消息队列。消息的发布者 不会将消息直接发给特定的订阅者,而是将消息分为不同的类别,订阅者只接收刚兴趣的消息 应用场景:缓存/消峰、解耦、异步通信

两种模式:

基础架构

kafka的leader选举

kafka 2.8 之前需要借助 zoopker 协调集群 >=2.8 kraft <= 0.8.2 每个Partition的多个Replica同时竞争Leader

Kafka Broker Controller选举

我们知道Zookeeper集群中也有选举机制,是通过Paxos算法,通过不同节点向其他节点发送信息来投票选举出leader,但是Kafka的Controller的选举就没有这么复杂了。

1、Kafka的Broker在启动后,会在zookeeper中注册 /brokers/ids/[0,1,2],每个Broker节点都有一个Controller模块 2、Kafka集群的Controller Leader选举是通过在zookeeper上创建/controller临时节点来实现Controller Leader选举,并在该节点中写入当前broker的信息 {"version":1,"brokerid":1,"timestamp":"1512018424988"} 利用Zookeeper的强一致性特性,一个节点只能被一个客户端创建成功,创建成功的broker即为Controller Leader,即先到先得原则, 此leader也就是集群中的Controller Leader,负责集群中所有大小事务:管理集群broker的上下线,所有topic分区副本的分配和Leader选举等。 当Controller Leader和zookeeper失去连接时,临时节点会删除,而其他broker会监听该节点的变化,当节点删除时,其他broker会收到事件通知,重新发起Controller Leader选举。 3、Controller Leader会监听 zk /brokers/ids/[0,1,2] 节点的变化

Kafka Partition Leader选举

首先Kafka会将接收到的消息分区(partition),每个主题(topic)的消息有不同的分区。这样一方面消息的存储就不会受到单一服务器存储空间大小的限制,另一方面消息的处理也可以在多个服务器上并行。 其次为了保证高可用,每个分区都会有一定数量的副本(replica)。这样如果有部分服务器不可用,副本所在的服务器就会接替上来,保证应用的持续性。 但是,为了保证较高的处理效率,消息的读写都是在固定的一个副本上完成。这个副本就是所谓的Leader,而其他副本则是Follower。而Follower则会定期地到Leader上同步数据。

Leader选举 如果某个分区所在的服务器出了问题,不可用,kafka会从该分区的其他的副本中选择一个作为新的Leader。之后所有的读写就会转移到这个新的Leader上。现在的问题是应当选择哪个作为新的Leader。显然,只有那些跟Leader保持同步的Follower才应该被选作新的Leader。 Kafka会在Zookeeper上针对每个分区维护一个称为ISR(in-sync replica,已同步的副本)的集合,该集合中是一些分区的副本。只有当这些副本都跟Leader中的副本同步了之后,kafka才会认为消息已提交,并反馈给消息的生产者。如果这个集合有增减,kafka会更新zookeeper上的记录。 如果某个分区的Leader不可用,Kafka就会从ISR集合中选择一个靠前的副本作为新的Leader。 显然通过ISR,kafka需要的冗余度较低,可以容忍的失败数比较高。假设某个topic有f+1个副本,kafka可以容忍f个服务器不可用。

4、controller决定选举。选举规则:在isr中存活为前提,按照AR(所有副本)中排在前面优先。(正常情况下,ar的顺序与isr的顺序一致,有节点掉线又上线后,ar的顺序有集群维护,isr则加到最后面)。例如:ar[1,0,2],isr[1,0,2],那么leader就会按照1,0,2的顺序轮询。 5、Controller将节点信息上传到ZK备份 6、其他controller从zk同步相关信息。 7、假设broker1中leader挂了,Controller监测到节点变化,向zk请求isr,选举新的leader(isr存活,ar排在前面的优先),然后controller更新zk中存储的leader和isr信息。

为什么不用少数服从多数的方法 少数服从多数是一种比较常见的一致性算法和Leader选举法。 它的含义是只有超过半数的副本同步了,系统才会认为数据已同步; 选择Leader时也是从超过半数的同步的副本中选择。 这种算法需要较高的冗余度。 譬如只允许一台机器失败,需要有三个副本;而如果只容忍两台机器失败,则需要五个副本。 而kafka的ISR集合方法,分别只需要两个和三个副本。 如果所有的ISR副本都失败了怎么办? 此时有两种方法可选,一种是等待ISR集合中的副本复活,一种是选择任何一个立即可用的副本,而这个副本不一定是在ISR集合中。这两种方法各有利弊,实际生产中按需选择。 如果要等待ISR副本复活,虽然可以保证一致性,但可能需要很长时间。而如果选择立即可用的副本,则很可能该副本并不一致。

版本

在1.x之前的版本,基本遵循4位版本号,例如:0.8.2.2、0.9.0.1、0.10.0.0...

在1.x之后,kafka 全面启用了遵循 Major.Minor.Patch 的三位版本规则,其中Major表示大版本,通常是一些重大改变,因此彼此之间功能可能会不兼容;Minor表示小版本,通常是一些新功能的增加;最后Patch表示修订版,主要为修复一些重点Bug而发布的版本。例如:Kafka 2.1.1,大版本就是2,小版本是1,Patch版本为1,是为修复Bug发布的第1个版本。

kafka_2.13-2.8.1.tgz 中的2.13表示的scala的版本,因为Kafka服务器端代码完全由Scala语音编写。”-“后面的2.8.1表示的kafka的版本信息,遵循上面的命令规则。

安装

注意:您的本地环境必须安装 Java 8+。 下载文档

单节点

基础命令

Kafka 从 2.2 版本开始将 kafka-topic.sh 脚本中的 −−zookeeper 参数标注为 "过时",推荐使用 −−bootstrap-server 参数。

kafka-configs

集群级别

broker级别

topic级别 待验证

动态调整参数:

集群

新版本的kafka自带有zookeeper

配置zookeeper

配置kafka

配置项解释

在kafka数据存储中,分区由一个或者多个segment组成。 log.segment.bytes: 每个segment的大小,达到这个大小会产生新的segment, 默认是1G log.segment.ms: 配置每隔n ms产生一个新的segment,默认是168h,也就是7天 log.retention.ms: segment的最后写入record的时间-当前时间 > retention.ms 的segment会被删除,默认是168h, 7天 ps:这句话很重要,多读几遍去理解它。之前博主就是误以为类似segment不会存活到我所配置的时间,其实是只要没有新segment产生+不停地往该segment写数据就不会删除该段。 如果满足删除条件的话,segment也不会立即删除,只是会打上delete标签。 log.retention.check.interval.ms 每隔多久检查一次是否有可以删除的log,默认是300s,5分钟,删除上面说的打上delete标签的segment

配置JVM

配置zookeeper服务

配置kafka服务

启动 zookeeper

启动 kafka

kafka-eagle

安装

配置

启动

web登录

nginx 转发efak

需启用此模块: --with-http_sub_module

kafka_python

Python Client

获取 Topic

生产者Producer

消费者Consumer

基于磁盘为什么这么快

原创 Wyman 大数据技术架构 2019-05-23 18:04

 

Kafka是大数据领域无处不在的消息中间件,目前广泛使用在企业内部的实时数据管道,并帮助企业构建自己的流计算应用程序。Kafka虽然是基于磁盘做的数据存储,但却具有高性能、高吞吐、低延时的特点,其吞吐量动辄几万、几十上百万,这其中的原由值得我们一探究竟。本文属于Kafka知识扫盲系列,让我们一起掌握Kafka各种精巧的设计。

 

顺序读写

众所周知Kafka是将消息记录持久化到本地磁盘中的,一般人会认为磁盘读写性能差,可能会对Kafka性能如何保证提出质疑。实际上不管是内存还是磁盘,快或慢关键在于寻址的方式,磁盘分为顺序读写与随机读写,内存也一样分为顺序读写与随机读写。基于磁盘的随机读写确实很慢,但磁盘的顺序读写性能却很高,一般而言要高出磁盘随机读写三个数量级,一些情况下磁盘顺序读写性能甚至要高于内存随机读写,这里给出著名学术期刊 ACM Queue 上的一张性能对比图:

图片

磁盘的顺序读写是磁盘使用模式中最有规律的,并且操作系统也对这种模式做了大量优化,Kafka就是使用了磁盘顺序读写来提升的性能。Kafka的message是不断追加到本地磁盘文件末尾的,而不是随机的写入,这使得Kafka写入吞吐量得到了显著提升。

 

Page Cache

为了优化读写性能,Kafka利用了操作系统本身的Page Cache,就是利用操作系统自身的内存而不是JVM空间内存。这样做的好处有:

相比于使用JVM或in-memory cache等数据结构,利用操作系统的Page Cache更加简单可靠。首先,操作系统层面的缓存利用率会更高,因为存储的都是紧凑的字节结构而不是独立的对象。其次,操作系统本身也对于Page Cache做了大量优化,提供了write-behind、read-ahead以及flush等多种机制。再者,即使服务进程重启,系统缓存依然不会消失,避免了in-process cache重建缓存的过程。

通过操作系统的Page Cache,Kafka的读写操作基本上是基于内存的,读写速度得到了极大的提升。

 

零拷贝

这里主要讲的是Kafka利用linux操作系统的 "零拷贝(zero-copy)" 机制在消费端做的优化。首先来了解下数据从文件发送到socket网络连接中的常规传输路径:

这个过程包含4次copy操作和2次系统上下文切换,性能其实非常低效。linux操作系统 "零拷贝" 机制使用了sendfile方法,允许操作系统将数据从Page Cache 直接发送到网络,只需要最后一步的copy操作将数据复制到 NIC 缓冲区,这样避免重新复制数据。示意图如下:

图片

通过这种 "零拷贝" 的机制,Page Cache 结合 sendfile 方法,Kafka消费端的性能也大幅提升。这也是为什么有时候消费端在不断消费数据时,我们并没有看到磁盘io比较高,此刻正是操作系统缓存在提供数据。

 

分区分段

Kafka的message是按topic分类存储的,topic中的数据又是按照一个一个的partition即分区存储到不同broker节点。每个partition对应了操作系统上的一个文件夹,partition实际上又是按照segment分段存储的。这也非常符合分布式系统分区分桶的设计思想。

通过这种分区分段的设计,Kafka的message消息实际上是分布式存储在一个一个小的segment中的,每次文件操作也是直接操作的segment。为了进一步的查询优化,Kafka又默认为分段后的数据文件建立了索引文件,就是文件系统上的.index文件。这种分区分段+索引的设计,不仅提升了数据读取的效率,同时也提高了数据操作的并行度。

 

总 结

总结起来,Kafka采用顺序读写、Page Cache、零拷贝以及分区分段等这些设计,再加上在索引方面做的优化,另外Kafka数据读写也是批量的而不是单条的,使得Kafka具有了高性能、高吞吐、低延时的特点。这样,Kafka提供大容量的磁盘存储也变成了一种优点。由于本人才粗学浅,表述有误的地方欢迎指教。

 

如何保证数据可靠性和一致性

2019-06-11 21:30:57 12675

学过大数据的同学应该都知道 Kafka,它是分布式消息订阅系统,有非常好的横向扩展性,可实时存储海量数据,是流数据处理中间件的事实标准。本文将介绍 Kafka 是如何保证数据可靠性和一致性的。

数据可靠性

Kafka 作为一个商业级消息中间件,消息可靠性的重要性可想而知。本文从 Producter 往 Broker 发送消息、Topic 分区副本以及 Leader 选举几个角度介绍数据的可靠性。

Topic 分区副本

在 Kafka 0.8.0 之前,Kafka 是没有副本的概念的,那时候人们只会用 Kafka 存储一些不重要的数据,因为没有副本,数据很可能会丢失。但是随着业务的发展,支持副本的功能越来越强烈,所以为了保证数据的可靠性,Kafka 从 0.8.0 版本开始引入了分区副本(详情请参见 KAFKA-50)。也就是说每个分区可以人为的配置几个副本(比如创建主题的时候指定 replication-factor,也可以在 Broker 级别进行配置 default.replication.factor),一般会设置为3。

Kafka 可以保证单个分区里的事件是有序的,分区可以在线(可用),也可以离线(不可用)。在众多的分区副本里面有一个副本是 Leader,其余的副本是 follower,所有的读写操作都是经过 Leader 进行的,同时 follower 会定期地去 leader 上的复制数据。当 Leader 挂了的时候,其中一个 follower 会重新成为新的 Leader。通过分区副本,引入了数据冗余,同时也提供了 Kafka 的数据可靠性。

Kafka 的分区多副本架构是 Kafka 可靠性保证的核心,把消息写入多个副本可以使 Kafka 在发生崩溃时仍能保证消息的持久性。

Producer 往 Broker 发送消息

如果我们要往 Kafka 对应的主题发送消息,我们需要通过 Producer 完成。前面我们讲过 Kafka 主题对应了多个分区,每个分区下面又对应了多个副本;为了让用户设置数据可靠性, Kafka 在 Producer 里面提供了消息确认机制。也就是说我们可以通过配置来决定消息发送到对应分区的几个副本才算消息发送成功。可以在定义 Producer 时通过 acks 参数指定(在 0.8.2.X 版本之前是通过 request.required.acks 参数设置的,详见 KAFKA-3043)。这个参数支持以下三种值:

根据实际的应用场景,我们设置不同的 acks,以此保证数据的可靠性。

另外,Producer 发送消息还可以选择同步(默认,通过 producer.type=sync 配置) 或者异步(producer.type=async)模式。如果设置成异步,虽然会极大的提高消息发送的性能,但是这样会增加丢失数据的风险。如果需要确保消息的可靠性,必须将 producer.type 设置为 sync。

Leader 选举

在介绍 Leader 选举之前,让我们先来了解一下 ISR(in-sync replicas)列表。每个分区的 leader 会维护一个 ISR 列表,ISR 列表里面就是 follower 副本的 Borker 编号,只有跟得上 Leader 的 follower 副本才能加入到 ISR 里面,这个是通过 replica.lag.time.max.ms 参数配置的,具体可以参见 《一文了解 Kafka 的副本复制机制》。只有 ISR 里的成员才有被选为 leader 的可能。

所以当 Leader 挂掉了,而且 unclean.leader.election.enable=false 的情况下,Kafka 会从 ISR 列表中选择第一个 follower 作为新的 Leader,因为这个分区拥有最新的已经 committed 的消息。通过这个可以保证已经 committed 的消息的数据可靠性。

综上所述,为了保证数据的可靠性,我们最少需要配置一下几个参数:

数据一致性

这里介绍的数据一致性主要是说不论是老的 Leader 还是新选举的 Leader,Consumer 都能读到一样的数据。那么 Kafka 是如何实现的呢?

Kafka 是如何保证数据可靠性和一致性

假设分区的副本为3,其中副本0是 Leader,副本1和副本2是 follower,并且在 ISR 列表里面。虽然副本0已经写入了 Message4,但是 Consumer 只能读取到 Message2。因为所有的 ISR 都同步了 Message2,只有 High Water Mark 以上的消息才支持 Consumer 读取,而 High Water Mark 取决于 ISR 列表里面偏移量最小的分区,对应于上图的副本2,这个很类似于木桶原理。

这样做的原因是还没有被足够多副本复制的消息被认为是“不安全”的,如果 Leader 发生崩溃,另一个副本成为新 Leader,那么这些消息很可能丢失了。如果我们允许消费者读取这些消息,可能就会破坏一致性。试想,一个消费者从当前 Leader(副本0) 读取并处理了 Message4,这个时候 Leader 挂掉了,选举了副本1为新的 Leader,这时候另一个消费者再去从新的 Leader 读取消息,发现这个消息其实并不存在,这就导致了数据不一致性问题。

当然,引入了 High Water Mark 机制,会导致 Broker 间的消息复制因为某些原因变慢,那么消息到达消费者的时间也会随之变长(因为我们会先等待消息复制完毕)。延迟时间可以通过参数 replica.lag.time.max.ms 参数配置,它指定了副本在复制消息时可被允许的最大延迟时间。