Kafka –分布式订阅消息(转载)

引言

Kafka是LinkedIn开源出来的一款消息服务器,用Scala

语言实现;这货的性能是百万级的QPS(估计是挂载了多块磁盘),我随便写个测试程序就是十万级。

1、Kafka基本概念

​Kafka是分布式发布-订阅消息系统。它最初由LinkedIn公司开发,之后成为Apache项目的一部分。Kafka是一个分布式的,可划分的,冗余备份的持久性的日志服务。它主要用于处理活跃的流式数据。

在大数据系统中,常常会碰到一个问题,整个大数据是由各个子系统组成,数据需要在各个子系统中高性能,低延迟的不停流转。传统的企业消息系统并不是非常适合大规模的数据处理。为了已在同时搞定在线应用(消息)和离线应用(数据文件,日志)Kafka就出现了。

Kafka可以起到两个作用:

  1. 1、降低系统组网复杂度。
  • 2、降低编程复杂度,各个子系统不在是相互协商接口,各个子系统类似插口插在插座上,Kafka承担高速数据总线的作用。

Kafka主要特点:

  1. 1、同时为发布和订阅提供高吞吐量。据了解,Kafka每秒可以生产约25万消息(50 MB),每秒处理55万消息(110 MB)。
  • 2、可进行持久化操作。将消息持久化到磁盘,因此可用于批量消费,例如ETL,以及实时应用程序。通过将数据持久化到硬盘以及replication防止数据丢失。
  • 3、分布式系统,易于向外扩展。所有的producer、broker和consumer都会有多个,均为分布式的。无需停机即可扩展机器。
  • 4、消息被处理的状态是在consumer端维护,而不是由server端维护。当失败时能自动平衡。
  • 5、支持online和offline的场景。
kafka流程图

​Kafka的整体架构非常简单,是显式分布式架构,producer、broker(kafka)和consumer都可以有多个。Producer,consumer实现Kafka注册的接口,数据从producer发送到broker,broker承担一个中间缓存和分发的作用。broker分发注册到系统中的consumer。broker的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。客户端和服务器端的通信,是基于简单,高性能,且与编程语言无关的TCP协议。几个基本概念:

  1. Topic:特指Kafka处理的消息源(feeds of messages)的不同分类。
  2. 在Kafka中消息是按照Topic进行分类的;每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)。
  3. ​Partition:Topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。      ​​
  4.       Parition是物理存储上的概念,创建Topic时可指定Parition数量。每个Parition对应一个存储文件夹,文件夹下存储该Parition所持有的消息数据和索引文件。Topic进行分区划分的主要目的是出于性能方面的考虑,Kafka尽量的使所有分区均匀的分布到集群所有的节点上而不是集中在某些节点上,另外主从关系也尽量均衡,这样每个节点都会担任一定比例的分区的Leader。每个Parition是一个有序的队列,每条消息在Parition中拥有一个offset。
  • Message:消息,是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。
  • Producers:消息和数据生产者,向Kafka的一个topic发布消息的过程叫做producers。
  • Consumers:消息和数据消费者,订阅topics并处理其发布的消息的过程叫做consumers。
  • Broker:缓存代理,Kafka集群中的一台或多台服务器统称为broker。

​消息推送的流程

消息推送
  • Producer根据指定的partition方法(round-robin、hash等),将消息发布到指定topic的partition里面
  • kafka集群接收到Producer发过来的消息后,将其持久化到硬盘,并保留消息指定时长(可配置),而不关注消息是否被消费。
  • Consumer从kafka集群pull数据,并控制获取消息的offset

Kafka的设计:

1、吞吐量

高吞吐是kafka需要实现的核心目标之一,为此kafka做了以下一些设计:

  1. 数据磁盘持久化:消息不在内存中cache,直接写入到磁盘,充分利用磁盘的顺序读写性能
  • zero-copy:减少IO操作步骤
  • 数据批量发送
  • 数据压缩
  • Topic划分为多个partition,提高parallelism

负载均衡

  1. producer根据用户指定的算法,将消息发送到指定的partition
  • 存在多个partiiton,每个partition有自己的replica,每个replica分布在不同的Broker节点上
  • 多个partition需要选取出lead partition,lead partition负责读写,并由zookeeper负责fail over
  • 通过zookeeper管理broker与consumer的动态加入与离开

拉取系统

由于kafka broker会持久化数据,broker没有内存压力,因此,consumer非常适合采取pull的方式消费数据,具有以下几点好处:

  1. 简化kafka设计
  • consumer根据消费能力自主控制消息拉取速度
  • consumer根据自身情况自主选择消费模式,例如批量,重复消费,从尾端开始消费等

可扩展性

当需要增加broker结点时,新增的broker会向zookeeper注册,而producer及consumer会根据注册在zookeeper上的watcher感知这些变化,并及时作出调整。

Kayka的应用场景:

1.消息队列

比起大多数的消息系统来说,Kafka有更好的吞吐量,内置的分区,冗余及容错性,这让Kafka成为了一个很好的大规模消息处理应用的解决方案。消息系统一般吞吐量相对较低,但是需要更小的端到端延时,并尝尝依赖于Kafka提供的强大的持久性保障。在这个领域,Kafka足以媲美传统消息系统,如ActiveMR

或RabbitMQ。

2.行为跟踪

Kafka的另一个应用场景是跟踪用户浏览页面、搜索及其他行为,以发布-订阅的模式实时记录到对应的topic里。那么这些结果被订阅者拿到后,就可以做进一步的实时处理,或实时监控,或放到hadoop/离线数据仓库里处理。

3.元信息监控

作为操作记录的监控模块来使用,即汇集记录一些操作信息,可以理解为运维性质的数据监控吧。

4.日志收集

日志收集方面,其实开源产品有很多,包括Scribe、Apache Flume。很多人使用Kafka代替日志聚合(log aggregation)。日志聚合一般来说是从服务器上收集日志文件,然后放到一个集中的位置(文件服务器或HDFS)进行处理。然而Kafka忽略掉文件的细节,将其更清晰地抽象成一个个日志或事件的消息流。这就让Kafka处理过程延迟更低,更容易支持多数据源和分布式数据处理。比起以日志为中心的系统比如Scribe或者Flume来说,Kafka提供同样高效的性能和因为复制导致的更高的耐用性保证,以及更低的端到端延迟。

5.流处理

这个场景可能比较多,也很好理解。保存收集流数据,以提供之后对接的Storm或其他流式计算框架进行处理。很多用户会将那些从原始topic来的数据进行阶段性处理,汇总,扩充或者以其他的方式转换到新的topic下再继续后面的处理。例如一个文章推荐的处理流程,可能是先从RSS数据源中抓取文章的内容,然后将其丢入一个叫做“文章”的topic中;后续操作可能是需要对这个内容进行清理,比如回复正常数据或者删除重复数据,最后再将内容匹配的结果返还给用户。这就在一个独立的topic之外,产生了一系列的实时数据处理的流程。Strom

和Samza是非常著名的实现这种类型数据转换的框架。

6.事件源

事件源是一种应用程序设计的方式,该方式的状态转移被记录为按时间顺序排序的记录序列。Kafka可以存储大量的日志数据,这使得它成为一个对这种方式的应用来说绝佳的后台。比如动态汇总(News feed)。

7.持久性日志(commit log)

Kafka可以为一种外部的持久性日志的分布式系统提供服务。这种日志可以在节点间备份数据,并为故障节点数据回复提供一种重新同步的机制。Kafka中日志压缩功能为这种用法提供了条件。在这种用法中,Kafka类似于Apache BookKeeper项目。

Kayka的设计要点:

1、直接使用linux 文件系统的cache,来高效缓存数据。

2、采用linux Zero-Copy提高发送性能。传统的数据发送需要发送4次上下文切换,采用sendfile系统调用之后,数据直接在内核态交换,系统上下文切换减少为2次。根据测试结果,可以提高60%的数据发送性能。Zero-Copy详细的技术细节可以参考:https://www.ibm.com/developerworks/linux/library/j-zerocopy/

3、数据在磁盘上存取代价为O(1)。kafka以topic来进行消息管理,每个topic包含多个part(ition),每个part对应一个逻辑log,有多个segment组成。每个segment中存储多条消息(见上图),消息id由其逻辑位置决定,即从消息id(offset)可直接定位到消息的存储位置,避免id到位置的额外映射。每个part在内存中对应一个index,记录每个segment中的第一条消息偏移。发布者发到某个topic的消息会被均匀的分布到多个part上(随机或根据用户指定的回调函数进行分布),broker收到发布消息往对应part的最后一个segment上添加该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息订阅者才能订阅到,segment达到一定的大小后将不会再往该segment写数据,broker会创建新的segment。

4、显式分布式,即所有的producer、broker和consumer都会有多个,均为分布式的。Producer和broker之间没有负载均衡机制。broker和consumer之间利用zookeeper进行负载均衡。所有broker和consumer都会在zookeeper中进行注册,且zookeeper会保存他们的一些元数据信息。如果某个broker和consumer发生了变化,所有其他的broker和consumer都会得到通知。

每个Consumer是完全独立,如果多个Consume想轮流消费同一个Topic的同一个Parition就做不到;后来Kafka发明了一个Consumer-group的概念,每个Consumer客户端被创建时,会向Zookeeper注册自己的信息;一个group中的多个Consumer可以交错的消费一个Topic的所有Paritions;简而言之,保证此Topic的所有Paritions都能被此group所消费,且消费时为了性能考虑,让Parition相对均衡的分散到每个Consumer上,Consume-group之间是完全独立。

很多传统的Message Queue都会在消息被消费完后将消息删除,一方面避免重复消费,另一方面可以保证Queue的长度比较短,提高效率。而如上文所述,Kafka并不删除已消费的消息,为了实现传统Message Queue消息只被消费一次的语义,Kafka保证每条消息在同一个Consumer Group里只会被某一个Consumer消费。与传统Message Queue不同的是,Kafka还允许不同Consumer Group同时消费同一条消息,这一特性可以为消息的多元化处理提供支持。​

发布到Kafka的消息在一个Parition中是顺序存储的,发布者可以通过随机、哈希、轮训等方式发布到多个分区中,消费者通过指定offset进行消费;所以Kafka当中消息的顺序性更多的取决于使用方如何使用。

Kafka系统中消息支持容灾备份存储,每个Parition有主分区、备用分区的概念,一个Topic中的多个Parition的主分区可能落在不同的物理机器上面,Kafka也是尽量让其分布在不同的机器上以提高系统性能。消息的读写都是通过主分区直接完成,客户端要直连主分区所在的物理机进行读写操作。备用分区就像一个"Consumer"消费主分区的消息并保存在本地日志中进行备份;主分区负责跟踪所有的备用分区的状态,如果备用分区"落后"太多或者失效,主分区将会把它从同步列表中删除;主备分区的管理是通过zookeeper进行的。

发布时的可靠性取决于两点:发送端的确认机制、以及Kafka系统落地的策略

。发送端支持无确认、主分区确认(主分区收到消息后发送确认回执)、以及主备分区确认(备用分区消息同步后主分区才发送确认回执)三种机制;Kafka系统落地的策略有两种刷盘方式:通过配置消息数、以及配置刷盘时间间隔。

消费时的可靠性取决于消费者的读取逻辑,Kafka是不保存消息的任何状态的。At most once、At least once 、Exactly once 三种模式需要自己按照业务实现,最容易实现就是At least once,两外两种在分布式系统中都不可能做到完全的绝对实现,只能无限靠近,降低错误率。

3、消息存储方式

Parition是以文件的形式存储在文件系统

中,比如创建了一个名为tipocTest的Topic,其有4个Parition,在Kafka的数据目录下面会有四个文件夹,按照Topic-partnum命名。

Parition中的每条Message由offset来表示它在这个Parition中的偏移量,这个offset不是该Message在Parition数据文件中的实际存储位置,而是逻辑上一个值,它唯一确定了Parition中的一条Message。因此,可以认为offset是Parition中Message的id。Parition中的每条Message包含了三个属性: Offset 、DataSize 、Data;Parition的数据文件则包含了若干条上述格式的Message,按offset由小到大排列在一起;Kafka收到新的消息后追加到文件末尾即可,所以消息的发布效率是很高的。

下面我们来思考另一个问题,如果一个Parition只有一个数据文件会怎么样? 新消息是添加在文件末尾,不论文件数据文件有多大,这个操作永远都是O(1)。但是在读取的时候根据offset查找Message是顺序查找的,因此,如果数据文件很大的话,查找的效率就低。那么Kafka

是如何解决查找效率的的问题呢?1) 分段、2) 索引。

4、数据文件的分段与索引

Kafka

解决查询效率的手段之一是将数据文件分段,可以配置每个数据文件的最大值,每段放在一个单独的数据文件里面,数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候,用二分查找

就可以定位到该Message在哪个段中。

数据文件分段使得可以在一个较小的数据文件中查找对应offset的Message了,但是这依然需要顺序扫描才能找到对应offset的Message。为了进一步提高查找的效率,Kafka

为每个分段后的数据文件建立了索引文件

,文件名与数据文件的名字是一样的,只是文件扩展名为.index。索引文件

中包含若干个索引条目,每个条目表示数据文件中一条Message的索引——Offset与position(Message在数据文件中的绝对位置)的对应关系;index文件中并没有为数据文件中的每条Message建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件

占用过多的空间,从而可以将索引文件

保留在内存中。但缺点是没有建立索引的Message也不能一次定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。

每个分段还有一个.timeindex索引文件,这个文件的格式与.index文件格式一样,所记录的东西是消息发布时间与offset的稀疏索引,用于消息定期删除使用。

下图是一个分段索引的例子

这套机制是建立在offset是有序的;索引文件

被映射到内存中,所以查找的速度还是很快的。一句话,Kafka

的Message存储采用了分区(Parition)、分段(segment)和稀疏索引这几个手段来达到高效发布和随机读取。

5、消费端设计

出于性能、容灾方面的考虑,实际需求是有多Consumer消费一个Topic的情况;由于多个Consumer之间是相互独立的,可以采用竞争Parition上岗的方式进行消费,同一个时刻只有一个Consumer在消费一个Parition,多个Consumer之间定期同步offset状态;如果是需要多通道消费,可以竞争不同的Parition对应资源上岗消费。

由于Kafka是按照offset进行读取的,一般的client都封装成:给定一个起始offset后续不停的get就可以顺序读取了,没有消费确认的概念,Kafka也不会维护每个消息、每个Consumer的状态。其实实现一套消费确认机制也不难,这需要我们实现一个proxy层,在proxy层保留一个循环缓冲区,业务端消费确认后方可从缓冲区里面移除,如果一段时间没有确认,下次来取的时候重复下发下去。

总结:为了解决kafka的效率问题 ,它把​对数据使用 分段,索引的方法。索引是范围索引。

message 的offset (偏移值)​就像他的id一样 可以通过他来确定一个message 在partion中的位置

Consumer-group的​中consumer 可以消费一个或者多个message 这要看相对于partion的数量。

还在学习当中。有些东西还不太清楚。希望自己保持初心。

参考文档:http://www.infoq.com/cn/articles/kafka-analysis-part-4​

http://mp.weixin.qq.com/s?__biz=MzI4OTU3ODk3NQ==&mid=2247483898&idx=1&sn=e5abc4cd7ae3650efc920498483d479b&chksm=ec2c4b4adb5bc25c774b900d579ea9cf9ea3bc04d5c0df517fe2c775b85b406dd1a2efd82a39&mpshare=1&scene=23&srcid=0217hWKtP4AK21NuFM5DnAvr#rd

http://blog.jobbole.com/75328/

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注