关于kafka消费者的命令
consumer group下可以有一个或多个consumer instance,consumer instance可以是一个进程,也可以是一个线程sh kafka-consumer-groups.sh --bootstrap-server "ip:端口,ip:端口,ip:端口" --group 组 --describe
kafka默认端口 kafka默认端口9092修改
kafka默认端口 kafka默认端口9092修改
我在本地idea起了两个进程
那么CON在kafka中有个配置,这个配置用来管理Kafka读取数据的级别。而这个配置默认是能够读取预提交阶段的数据的,所以如果你没改这个配置,那两阶段提交的阶段就是白费了。所以需要改一下这个配置,来更换一下隔离级别:SUMER-ID会有两个,但是CLIENT-ID依旧是一个。
怀疑CLIENT-ID如果ip不同的话,会不同。那这样的话我重新打个jar包放到其他地方执行,是否就会显示不一样呢?
通过idea的
File -->Project Structure -->Artifacts --> Jar --> From module with dependencies.
Build --> Build Artifacts
等一通作生成了j方式一:从当前主题的迁移量位置+1开始取数据ar包放在了其他机器上跑
发现CLIENT-ID依然一样
另外测试过sh kafka-console-consumer.sh --bootstrap-server "xx" --topic xx
消费数据时,再使用sh kafka-consumer-groups.sh 来检查,并不会有关于CONSUMER-ID和CLIENT-ID,即没有识别到消费进程。
4.一文搞定:Flink与Kafka之间的精准一次性
index, log, snapshot, timeindex 文件以当前 Segment 的条消息的 Offset 命名。其中 “.index” 文件存储大量的索引信息,“.log” 文件存储大量的数据,索引文件中的元数据指向对应数据文件中 Message 的物理偏移量。本次的聊法,还是要通过以kafka(source)->Flink,Flink(source)->Kafka来分别展开讨论。
消费者组的概念就是:当有多个应用程序都需要从Kafka获取消息时,让每个app对应一个消费者组,从而使每个应用程序都能获取一个或多个Topic的全部消息;在每个消费者组中,往消费者组中添加消费者来伸缩读取能力和处理能力,消费者组中的每个消费者只处理每个Top在上一篇文章当中,也算是比较详细且通俗的聊了聊Flink是如何通过checkpoint机制来完成数据精准一次性的实现的。并且也在上一章的结尾表示,要在接下来聊一聊Flink与它的铁哥们Kafaka之间,是如何实现数据的精准一次性消费的。ic的一部分的消息,每个消费者对应一个线程。kafka是一个具有数据保存、数据回放能力的消息队列,说白了就是kafka中的每一个数据,都有一个专门的标记作为标识。而在Flink消费kafka传入的数据的时候,source任务就能够将这个偏移量以算子状态的角色进行保存,写入到设定好的检查点中。这样一旦发生故障,Flink中的FlinkKafkaProduce连接器就i能够按照自己保存的偏移量,自己去Kafka中重新拉取数据,也正是通过这种方式,就能够确保Kafka到Flink之间的精准一次性。
在上一篇文章当中,已经表明了,如果想要让输出端能够进行精准一次性消费,就需要使用到幂等性或者是事务。而事务中的两阶段提交是所有方案里面的实现。
这也就表明了,当数据通过Flink发送给sink端Kafka的时候,是经历了两个阶段的处理的。阶段就是Flink向Kafka中插入数据,进入预提交阶段。当JobMar发送的Ckeckpoint保存成功信号过来之后,才会提交事务进行正式的数据发送,也就是让原来不可用的数据可以被使用了。
这个实现过程到目前阶段就很清晰了,它的主体流程无非就是在开启检查点之后,由JobMar向各个阶段的处理逻辑发送有关于检查点的barrier。所有的计算任务接收到之后,就会根据自己当前的状态做一个检查点保存。而当这个barrier来到sink任务的时候,sink就会开启一个事务,然后通过这个事务向外预写数据。直到Jobmar来告诉它这一次的检查点已经保存完成了,sink就会进行第二次提交,数据也就算是成功写出了。
1.必须要保证检查点被打开了,如果检查点没有打开,那么之前说的一切话都是空谈。因为Flink默认检查点是关着的。
3.配置Kafka读取数据的隔离级别
4.事务超时时间
这个配置也很有意思,大家试想一下。如果要进行两阶段提交,就要保证sink端支持事务,Kafka是支持事务的,但是像这个组件对于很多机制都有一个超时时间的概念,也就是说如果时间到了这个界限还没完成工作,那就会默认这个工作失败。Kafka中由这个概念,Flink中同样由这个概念。但是flink默认超时时间是1小时,而Kafka默认是15分钟,这就有可能出现检查点保存东西的时间大于15分钟,如说是16分钟保存完成然后给sink发送检查点保存陈功可以提交事务的信号,但是这个时候Kafka已经认为事务失败,把之前的数据都扔了。那数据不就是丢失了么。所以说Kafka的超时时间要大于Flink的超时时间才好。
kafka消息的管理
2.在FlinkKafakProducer连接器的构造函数中要传入参数,这个参数就是用来保证状态一致性的。就是在构造函数的一个参数输入如下:kafka producer将消息发送给broker后,消息日志会被存储在broker的磁盘上,采用顺序写入的方式。顺序写可以加快磁盘访问速度,并且可以将将多个小型的逻辑写合并成一次大型的物理磁盘写入,数据显示顺序写比随机写入快6000倍以上。另外,作系统使用内存对磁盘进行缓存即pagecache,pagecache完全由作系统管理,这也使得写数据变得即简洁也快速。
配置中可以调整过期时间,超过改时间的消息日志将移除,默认值为7天;也可配置文件大小阈值,达到该阈值后,从最旧消息开始删除。配置项为:
从文件到套接字的常见数据传输路径有4步:
1).作系统从磁盘读取数据到内核空间的 pagecache
2).应用程序读取内核空间的数据到用户空间的缓冲区
3).应用程序将数据(用户空间的缓冲区)写回内核空间到套接字缓冲区(内核空间)
kafka使用 producer ,broker 和 consumer 都共享的标准化的二进制消息格式,这样数据块不用修改就能在他们之间传递。kafka采用Linux 中系统调用sendfile的方式,直接将数据从 pagecache 转移到 socket 网络连接中。这种零拷贝方式使得kafka数据传输更加高效。
以前面文进入kafka容器章中安装的kafka为例: Mac 安装kafka
kafka本地文件存储目录可以在配置文件server.properties中设置,参数及默认值为:
进入该目录,可以看到kafka保存的cosumer offset和topic消息:
其中__consumer_offsets开头的为消费的offset信息,test1开头的即为之前创建的topic “test1”,该topic有三个分区,分区编号从0开始,分别是test1-0、test1-1、test1-2。
进入test1-0,查看包含文件如下:
可以看到kafka消息按partition存储的,每个partition一个目录。partition下消息分段(segment)存储,默认每段1G,通过参数log.segment.bytes可配置。segment包含索引文件index、消息文件log,分别存储消息的索引和内容,以.index和.log结尾,文件命名为当前segment个消息offset。index文件在log每隔一定数据量之间建立索引,可以通过参数index.interval.bytes配置。
通过kafka命令查看00000000000000000000.index内容如下:
00000000000000000000.log内容如下:
其中索引文件中包含两个字段:(offset,ition),分别表示消息offset和该消息在log文件的偏移量。如上图中offset=0的消息对应的ition=0;对应的就是00000000000000000000.log中的条消息:
其中payload为具体的消息内容。
另外里面还有一个以".timeindex"结尾的文件,查看其内容:
该日志文件是kafka0.10.1.1加入的,其中保存的为:(消息时间戳,offset)。时间戳是该segment一个消息对应的时间戳(与log文件中一条记录时间戳一致),kafka也支持根据时间来读取消息。
由上可知消息是按partition来存储的,partition可以配置n个副本followers。多个partition和其follower在broker上是怎么分配的呢?
partition和broker都进行了排序,下标从0开始;
设有k个broker,第i个partition被分配到到 i%k 个broker上;
第i%k个broker即为partition i 的leader,负责这个partition的读写 ;
partition的followers也进行排序,从leader的后续broker开始分配,第i个partition的第j个副本broker为 (j+ i%k)%k。
一个有3个broker的kafka集群,包含3个partition,每个partition副本数为1的topic如下图:
总结:
kafka将消息日志采用顺序写入的方式存放在broker磁盘中;数据传输通过系统调用sendfile零拷贝方式;消息日志分段存放,可配置清除时间或大小阈值;每段包含消息索引、消息内容两个文件,通过索引实现快速查找;按照/topic/partition的目录结构分开存储,且均匀分布到集群各broker上。
参authorizer.class.name=org.apache.ranger.authorization.kafka.authorizer.RangerKafkaAuthorizer考:
kafka-docker上使用+常用指令
其实Flink到Kafak之间也是采用了这种方式,具体的可以看一下ctrl进到FlinkKafkaProduce连接器内部去看一看:生产者向broker发送消息,消费者接收消息,broker是物理概念,部署几个kafka即几个broker,topic是逻辑概念,往topic里发送消息会发送到设置好的几个partion上,每个partion存储作为不同队列存储不同数据,partion有leader和follower备份机制,消息发送时会轮循发送到不同broker的不同partion中,同一消费者只能消费同一分区,通过offset记录消费位置,消费者组可以访问一个topic的不同partion
那看源码吧启动镜像
启动kafka可以带上参数,这样会自动修改kafka里的配置文件(/opt/kafka_版本/conf/server.properties),否则不带参数需要自己进入进行手动修改 带参数版启动可参考
其中172.17.0.3需要改成自己docker的网桥连接地址
查看已启动容器
查看所有容器
启动未启动的容器
创建主题
主题和分区可以理解为:topic是逻辑划分,kafka通过topic进行区分消息,topic的数据会被存储到日志中,如果数据量太大可以引入partion(同时提高读写吞吐量)来分段存储数据。其中replication-factor作用是将任意分区到broker上,broker是物理概念,部署了一个kafka可认为broker数为1,我本机只有一个kafka所以这里replication-factor超过1会报错。 综上几个概念可以理解为:集群中有多个broker,创建主题时可以指明topic有多个partitions(消息拆分到不同分区进行存储,一个partion只能被一个消费者消费--partion内部保证接收数据顺序),可以为分区创建多个副本replication,不同副本在不同的broker中(作为备份使用,这里有leader和flower的区分) 。
查看topic信息
集群部署
生产者:
消费者:
方式二:从当前主题条消息开始消费
生产者将消息发送broker,broker将消息保存到本地日志中,消息的保存时有序的
单播消针对网络抖动导致的生产者重试(发送消息),可以设置消费者加锁解决;息:
当存在一个生产者,一个消费者组的时候,一个消费者组中只有一个消费者会收到消息
多播消息:
查看消费组详细信息:
LOG-END-OFFSET:消息总量(一条消息的偏移量)
LAG :积压了多少条消息
常见问题:
1、如何防止消息丢失
生产者:使用同步消息发送;ack设置为1/all;设置同步分区数>=2
消费者:把自动提交改成手动提交
2、如何防止消息的重复消费
消费者使用多线程异步处理接收数据;创建多个消费者组部署到其他机器上;通过业务架构设计,提升业务层面消费性能。
ps:
缓冲区:kafka默认会创建一个消息缓冲区去存放要发送的消息,大小是32M,每次本地线程会去缓冲区拉16K数据发送到broker,如果不到16K等待10ms也会将数据发送到broker
参考链接:
1、kafka安装教程--
2、kafka配置文件server.properties参数说明
3、创建主题分区数
5、通过docker-come集群部署
6、学习视频
kafka部署遇到的问题
4).作系统将数据从套接字缓冲区(内核空间)到通过网络发送的 NIC 缓冲区kafka部署的问题
kafka.zookeeper.ZooKeeperTimeoutException: Timed out waiting for connection while in state: CONNECTING
这个问题的是由于kafka启动的时候连接zookeeper比较慢, 把超时时间设置大一点就可以了,默认是6000ms
server.properties中增加
consumer group下的topic下的每个分区只能分配给某个group下的一个consumer(当然该分区还可以被分配给其他group)zookeeper.connection.timeout.ms=6000000
2. 发送消息时报一下错误
ja.lang.IllegalStateException: No entry found for connection 0
server.properties中增加
aertised.listeners=PLAINTEXT://172.28.21.243:9092
ip地址为发送消息端连接kafka的ip地址
kafka系列之(3)——Coordinator与offset管理和Consumer Rebalance
3、消息积压consumer group是kafka提供的可扩展且具有容错性的消费者机制。组内可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的所有消费者协调在一起来消费主题(subscribed topics)的所有分区(partition)。
group.id是一个字符串,标识一个consumer group
Coordinator一般指的是运行在broker上的group Coordinator,用于管理Consumer Group中各个成员,每个Kafka都有一个GroupCoordinator实例,管理多个消费者组,主要用于offset位移管理和Consumer Rebalance。
对于每个Consumer Group,Coordinator会存储以下信息:
消费者在消费的过程中需要记录自己消费了多少数据,即消费位置信息。在Kafka中这个位置信息有个专门的术语:位移(offset)。
(1)、很多消息引擎都把这部分信息保存在端(broker端)。这样做的好处当然是实现简单,但会有三个主要的问题:
1. broker从此变成有状态的,会影响伸缩性;
2. 需要引入应答机制(acknowledgement)来确认消费成功。
3. 由于要保存很多consumer的offset信息,必然引入复杂的数据结构,造成资源浪费。
(2)、Kafka默认是定期帮你自动提交位移的(enable.automit = true),你当然可以选择手动提交位移实现自己控制。
(3)、另外kafka会定期把group消费情况保存起来,做成一个offset map,如下图所示:
上图中表明了test-group这个组当前的消费情况。
老版本的位移是提交到zookeeper中的,目录结构是:/consumers/ ,但是zookeeper其实并不适合进行大批量的读写作,尤其是写作。 __consumers_offsets topic配置了compact策略,使得它总是能够保存的位移信息,既控制了该topic总体的日志容量,也能实现保存offset的目的。compact的具体原理请参见: Log Compaction 至于每个group保存到__consumers_offsets的哪个分区,如何查看的读取消息。消费者一个或多个主题,并按照消息生产的顺序读取它们。消费者通过检查消息的偏移量来区分已经读取过的消息。偏移量是另一种元数据,他是一个不断递增的整数值,在创建消息时,Kafka回吧它添加到消息里。在给定的分区里,每个消息的偏移量都是的。消费者把每个分区读取的消息偏移量保存在Zookeeper或者Kafka上,如果消费者关闭或重启,它的读取状态不会丢失。问题请参见这篇文章: Kafka 如何读取offset topic内容 (__consumer_offsets) offset提交消息会根据消费组的key(消费组名称)进行分区. 对于一个给定的消费组,它的所有消息都会发送到的broker(即Coordinator) Coordinator上负责管理offset的组件是 Offset mar 。负责存储,抓取,和维护消费者的offsets. 每个broker都有一个offset mar实例. 有两种具体的实现: ZookeeperOffsetMar: 调用zookeeper来存储和接收offset(老版本的位移管理)。 DefaultOffsetMar: 提供消费者offsets内置的offset管理。 通过在config/server.properties中的offset.storage参数选择。 DefaultOffsetMar 除了将offset作为logs保存到磁盘上,DefaultOffsetMar维护了一张能快速服务于offset抓取请求的 consumer offsets表 。这个表作为缓存,包含的含仅仅是”offsets topic”的partitions中属于leader partition对应的条目(存储的是offset)。 对于DefaultOffsetMar还有两个其他属性: “和””,默认值都是1。这两个属性会用来自动地创建”offsets topic”。 offset mar接口的概要: 什么是rebalance? rebalance本质上是一种协议,规定了一个consumer group下的所有consumer如何达成一致来分配topic的每个分区。比如某个group下有20个consumer,它了一个具有100个分区的topic。正常情况下,Kafka平均会为每个consumer分配5个分区。这个分配的过程就叫rebalance。Kafka新版本consumer默认提供了两种分配策略:range和round-robin。 rebalance的触发条件有三种: 组成员发生变更(新consumer加入组、已有consumer主动离开组或已有consumer崩溃了——这两者的区别后面会谈到) 主题数发生变更——这当然是可能的,如果你使用了正则表达式的方式进行,那么新建匹配正则表达式的topic就会触发rebalanc4、解决docker容器启动不了的问题e 主题的分区数发生变更 refer JNDI是J2EE中一个很重要的标准,通常我们是在J2EE编程中用到,Tomcat中提供了在JSP和Servelt中直接使用JNDI的方法,主要是通过dbcp连接池 重点讲使用: 1.配置context.xml 注意地址是:tomcat安装目录下的conf,该目录下应该有了context.xml文件。貌似以前的版本是在项目目录中的META-INF目录下配置,我刚开始时参考是也是老版本的作方法,谁知道试了一晚都不行。。。如果不是那里配置的话会可能出现这样的错误提示:jax.naming.NameNotFoundException: Name jdbc is not bound in this Context? name="hello" type="jax.sql.DataSource" driverClassN截止到目前为止,基本上把有关于状态维护的一些东西都说完了,有状态后端、有检查点。还通过检查点完成可端到端的数据精准一次性消费。但是想到这我又感觉,如果有学习进度比我一些的,万一没办法很好的理解怎么办。所以在下一篇文章当中我就聊聊Flink中的“状态”到底是个什么东西,都有什么类型,都怎么去用。ame="com.mysql.jdbc.Driver" username="root" maxIdle="4" 现在微服务流行,很多公司起项目都是分布式微服务,但是你想过没有,不是把一个单体拆开了,用域名去相互调就叫微服务。好的微服务架构设计模式里要求每个服务的自治性,这样服务拆分成微服务后才能稳定。 怎么才能让每个服务尽量达到自治性呢?这就需要领域、溯源、CQRS、Saga这些设计模式,不好意思一下子说了很多概念,以后慢慢给大家解释。 这几个模式里边有个关键点—需要通过把听说Rocket MQ很多概念也来自Kafka,学会它其他的消息中间件基本也大不的都会了,今天分享一篇Kafka的基础入门文章给大家领域发布给远程的其他服务,完成数据同步。这就需要消息中间件了,消息中间件这块我了解的也不深,公司里用RocketMQ,不过付费版和开源版别很大。 Kafka 是一个分布式的基于发布/模式的消息队列(Message Queue),主要应用与大数据实时处理领域。其主要设计目标如下: 下面给出 Kafka 一些重要概念,让大家对 Kafka 有个整体的认识和感知 Kafka分区 Kafka和Zookeeper的关系 在了解kafka集群之前, 我们先来了解下kafka的工作流程, Kafka集群会将消息流存储在 Topic 的中,每条记录会由一个Key、一个Value和一个时间戳组成。 Kafka的工作流程 Kafka 中消息是以 Topic 进行分类的,生产者生产消息,消费者消费消息,读取和消费的都是同一个 Topic。但是Topic 是逻辑上的概念, Partition 是物理上的概念,每个 Partition 对应一个 log 文件,该 log 文件中存储的就是 Producer 生产的数据。 Producer 生产的数据会不断顺序追加到该 log 文件末尾,并且每条数据都会记录有自己的 Offset 。而消费者组中的每个消费者,也都会实时记录当前自己消费到了哪个 Offset,方便在崩溃恢复时,可以继续从Kafka存储机制上次的 Offset 位置消费。 此时 Producer 端生产的消息会不断追加到 log 文件末尾,这样文件就会越来越大, 为了防止 log 文件过大导致数据定位效率低下,那么Kafka 采取了分片和索引机制。它将每个 Partition 分为多个 Segment,每个 Segment 对应4个文件:“.index” 索引文件, “.log” 数据文件, “.snapshot” 快照文件, “.timeindex” 时间索引文件。这些文件都位于同一文件夹下面,该文件夹的命名规则为:topic 名称-分区号。例如, heartbeat心跳上报服务 这个 topic 有三个分区,则其对应的文件夹为 heartbeat-0,heartbeat-1,heartbeat-2这样。 下图为index 文件和 log 文件的结构示意图: index 文件和 log 文件的结构示意图 kafka中的 Partition 为了保证数据安全,每个 Partition 可以设置多个副本。此时我们对分区0,1,2分别设置3个副本(注:设置两个副本是比较合适的)。而且每个副本都是有"角色"之分的,它们会选取一个副本作为 Leader 副本,而其他的作为 Follower 副本,我们的 Producer 端在发送数据的时候,只能发送到Leader Partition里面 ,然后Follower Partition会去Leader那自行同步数据, Consumer 消费数据的时候,也只能从 Leader 副本那去消费数据的。 Kafka集群副本 Kafka集群副本 Kafka Controller,其实就是一个 Kafka 集群中一台 Broker,它除了具有普通Broker 的消息发送、消费、同步功能之外,还需承担一些额外的工作。Kafka 使用公平竞选的方式来确定 Controller ,在 ZooKeeper 成功创建临时 /controller 的Broker会成为 Controller ,一般而言,Kafka集群中台启动的 Broker 会成为Controller,并将自身 Broker 编号等信息写入ZooKeeper临时/controller。 Consumer 在消费过程中可能会出现断电宕机等故障,在 Consumer 恢复后,需要从故障前的 Offset 位置继续消费。所以 Consumer 需要实时记录自己消费到了哪个 Offset,以便故障恢复后继续消费。在 Kafka 0.9 版本之前,Consumer 默认将 Offset 保存在 ZooKeeper 中,但是从 0.9 版本开始,Consumer 默认将 Offset 保存在 Kafka 一个内置的 Topic 中,该 Topic 为 __consumer_offsets, 以支持高并发的读写。 上面和大家一起深入探讨了 Kafka 的, 基础知识和集群架构,下一篇会从Kafka 三高(高性能, 高可用, 高并发)方面来详细阐述其巧妙的设计思想。大家期待..... 2、kafka插件集成后,因为kafka启动脚本必须指定参数: nohup bin/kafka-server-start.sh config/server.properties & 这将导致无法加载config目录下ranger插件的配置文password="123456"件 需要在环境变量中配置kafka的cinfig目录:export CLASSPATH=.: JAVA_HOME/lib/dt.jar: JAVA_HOME/lib/tools.jar:$KAFKA_HOME/config 3、在不开启kerberos的条件下,kafka无法实现对具体用户的权限控制,因为任何用户发起produce数据到topic上,kafka都会默认为是ANONYMOUS用户 4、kafka插件集成的配置项: 在kafka的server.properties文件最下面添加如下内容 Kafka为这两种模型提供了单一的消费者抽象模型: 消费者组 (consumer group)。 消费者用一个消费者组名标记自己。 一个发布在Topic上消息被分发给此消费者组中的一个消费者。 如所有的消费者都在一个组中,那么这就变成了队列模型。 如所有的消费者都在不同的组中,那么就完全变成了发布-模型。 一个消费者组中消费者同一个Topic,每个消费者接受Topic的一部分分区的消息,从而实现对消费者的横向扩展,对消息进行分流。 注意:当单个消费者无法跟上数据生成的速度,就可以增加更多的消费者分担负载,每个消费者只处理部分partition的消息,从而实现单个应用程序的横向伸缩。但是不要让消费者的数量多于partition的数量,此时多余的消费者会空闲。此外,Kafka还允许多个应用程序从同一个Topic读取所有的消息,此时只要保证每个应用程序有自己的消费者组即可。 在同一个群组中,无法让一个线程运行多个消费者,也无法让多线线程安全地共享一个消费者。按照规则,一个消费者使用一个线程,如果要在同一个消费者组中运行多个消费者,需要让每个消费者运行在自己的线程中。把消费者的逻辑封装在自己的对象中,然后使用ja的ExecutorServ启动多个线程,使每个消费者运行在自己的线程上,可参考 一个 consumer group 中有多个 consumer,一个 topic 有多个 partition,所以必然会涉及到 partition 的分配问题,即确定哪个 partition 由哪个 consumer 来消费。 关于如何设置partition值需要考虑的因素 Kafka 有两种分配策略,一个是 RoundRobin,一个是 Range,默认为Range,当消费者组内消费者发生变化时,会触发分区分配策略(方法重新分配)。 以上三种现象会使partition的所有权在消费者之间转移,这样的行为叫作再均衡。 再均衡的缺点 : RoundRobin 轮询方式将分区所有作为一个整体进行 Hash 排序,消费者组内分配分区个数别为 1,是按照组来分的,可以解决多个消费者消费数据不均衡的问题。 但是,当消费者组内不同主题时,可能造成消费混乱,如下图所示,Consumer0 主题 A,Consumer1 主题 B。 将 A、B 主题的分区排序后分配给消费者组,TopicB 分区中的数据可能 分配到 Consumer0 中。 Range 方式是按照主题来分的,不会产生轮询方式的消费混乱问题。 但是,如下图所示,Consumer0、Consumer1 同时了主题 A 和 B,可能造成消息分配不对等问题,当消费者组内的主题越多,分区分配可能越不均衡。 由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。 consumer group +topic + partition 确定一个offest Kafka 0.9 版本之前,consumer 默认将 offset 保存在 Zookeeper 中,从 0.9 版本开始, consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中,该 topic 为__consumer_offsets。 你如果特别好奇,实在想看看offset什么的,也可以执行下面作: 修改配置文件 consumer.properties 再启动一个消费者 当消费者崩溃或者有新的消费者加入,那么就会触发再均衡(rebalance),完成再均衡后,每个消费者可能会分配到新的分区,而不是之前处理那个,为了能够继续之前的工作,消费者需要读取每个partition一次提交的偏移量,然后从偏移量指定的地方继续处理。 case1:如果提交的偏移量小于客户端处理的一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。 case2:如果提交的偏移量大于客户端处理的一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。 自动提交的优点是方便,但是可能会重复处理消息 不足:broker在对提交请求作出回应之前,应用程序会一直阻塞,会限制应用程序的吞吐量。 因此,在消费者关闭之前一般会组合使用commitAsync和commitSync提交偏移量。 ConsumerRebalanceListener需要实现的两个方法 下面的例子演示如何在失去partition的所有权之前通过onPartitionRevoked()方法来提交偏移量。 Consumer有个Rebalance的特性,即重新负载均衡,该特性依赖于一个协调器来实现。每当Consumer Group中有Consumer退出或有新的Consumer加入都会触发Rebalance。 之所以要重新负载均衡,是为了将退出的Consumer所负责处理的数据再重新分配到组内的其他Consumer上进行处理。或当有新加入的Consumer时,将组内其他Consumer的负载压力,重新进均匀分配,而不会说新加入一个Consumer就闲在那。 下面就用几张图简单描述一下,各种情况触发Rebalance时,组内成员是如何与协调器进行交互的。 Tips :图中的Coordinator是协调器,而generation则类似于乐观锁中的版本号,每当成员入组成功就会更新,也是起到一个并发控制的作用。 参考: 创建消息。一般情况下一个消息会被发布到一个特定的主题上。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区。通常是通过消息键和分区器来实现,分区器为键生成一个散列值,并将其映射到指定的分区上。这样可以保证包含同一个键的消息会被写到同一个分区上。生产者也可以使用自定义的分区器。 kafka通过partition的概念,保证了partition内消息有序性,缓解了上面的问题。 partition内消息会分发给所有分组 ,每个分组只有一个consumer能消费这条消息。这个语义保证了某个分组消费某个分区的消息,是同步而非并发的。如果一个topic只有一个partition,那么这个topic并发消费有序,否则只是单个partition有序。 消费者是消费者群组的一部分,也就是说,会有一个或多个消费者共同读取一个主题。群组保证每个分CURRENT-OFFSET:被消费的偏移量区只能被一个消费者使用。 设这么个场景:我们从Kafka中读取消息,并且进行检查,产生结果数据。我们可以创建一个消费者实例去做这件事情,但如果生产者 写入消息的速度比消费者读取的速度快 怎么办呢?这样随着时间增长,消息堆积越来越。对于这种场景,我们需要 增加多个消费者来进行水平扩展 。 我们可以 通过增加消费组的消费者来进行水平扩展提升消费能力 。这也是为什么建议创建主题时使用比较多的分区数,这样可以在消费负载高的情况下增加消费者来提升性能。另外,消费者的数量不应该比分区数多,因为多出来的消费者是空闲的,没有任何帮助。 Kafka一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。换句话说,每个应用都可以读到全量的消息。为了使得 每个应用都能读到全量消息,应用需要有不同的消费组 。对于上面的例子,如我们新增了一个新的消费组G2,而这个消费组有两个消费者,那么会是这样的: 总结起来就是:如果应用需要读取全量消息,那么请为该应用设置一个消费组;如果该应用消费能力不足,那么可以考虑在这个消费组里增加消费者 。 可以看到,当新的消费者加入消费组,它会消费一个或多个分区,而这些分区之前是由其他消费者负责的;另外,当消费者离开消费组(比如重启、宕机等)时,它所消费的分区会分配给其他分区。这种现象称为 重平衡(rebalance) 。重平衡是Kafka一个很重要的性质,这个性质保证了高可用和水平扩展。不过也需要注意到,在重平衡期间,所有消费者都不能消费消息,因此会造成整个消费组短暂的不可用。而且,将分区进行重平衡也会导致原来的消费者状态过期,从而导致消费者需要重新更新状态,这段期间也会降低消费性能。后面我们会讨论如何安全的进行重平衡以及如何尽可能避免。 消费者通过定期发送心跳(hearbeat)到一个作为组协调者(group coordinator)的broker来保持在消费组内存活。这个broker不是固定的,每个消费组都可能不同。当消费者拉取消息或者提交时,便会发送心跳。 如果消费者超过一定时间没有发送心跳,那么它的会话(session)就会过期,组协调者会认为该消费者已经宕机, 然后触发重平衡 。可以看到,从消费者宕机到会话过期是有一定时间的,这段时间内该消费者的分区都不能进行消息消费;通常情况下,我们可以进行优雅关闭,这样消费者会发送离开的消息到组协调者,这样组协调者可以立即进行重平衡而不需要等待会话过期。 在0.10.1版本,Kafka对心跳机制进行了修改,将发送心跳与拉取消息进行分离,这样使得发送心跳的频率不受拉取的频率影响。另外更高版本的Kafka支持配置一个消费者多长时间不拉取消息但仍然保持存活,这个配置可以避免活锁(livelock)。活锁,是指应用没有故障但是由于某些原因不能进一步消费。kafkak配置仅允许受信任的JNDI连接如何配置?
再均衡的优点 :微服务想搞好,消息中间件不能少,Kafka基础入门介绍
现在开始配置:ranger-kafka插件
而Kafka选择了不同的方式:每个consumer group管理自己的位移信息,那么只需要简单的一个整数表示位置就够了;同时可以引入checkpoint机制定期持久化,简化了应答机制的实现。kafka——消费者原理解析
可以通过come集群化部署过es,这里通过创建另一个come.yml文件来部署kafka,配置文件参考 docker-come集群部署kafka总结
maxActive="4"