首页 > 免root版 > GG修改器免root版32位_gg修改器免root版中文
GG修改器免root版32位_gg修改器免root版中文
  • GG修改器免root版32位_gg修改器免root版中文

  • 大小:14.29MB日期:2024-04-20 00:57:43
  • 语言:简体中文系统:Android
绿色无毒,安全可靠!部分设备误报拦截请通过!

应用详情

大家好,今天小编为大家分享关于GG修改器免root版32位_gg修改器免root版中文的内容,赶快来一起来看看吧。

  • try {

    while (true) {

    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));

    for (ConsumerRecord<String, String> record : records) {

    int updateCount = 1;

    if (map.containsKey(record.value)) {

    updateCount = (int) map.get(record.value + 1);

    }

    map.put(record.value, updateCount);

    }

    }

    }finally {

    consumer.close;

    }

    线程安全性

    在同一个群组中,我们无法让一个线程运行多个消费者,也无法让多个线程安全的共享一个消费者。按照规则,一个消费者使用一个线程,如果一个消费者群组中多个消费者都想要运行的话,那么必须让每个消费者在自己的线程中运行,可以使用 Java 中的 ExecutorService 启动多个消费者进行进行处理。

    消费者配置

    到目前为止,我们学习了如何使用消费者 API,不过只介绍了几个最基本的属性,Kafka 文档列出了所有与消费者相关的配置说明。大部分参数都有合理的默认值,一般不需要修改它们,下面我们就来介绍一下这些参数。

    该属性指定了消费者从服务器获取记录的最小字节数。broker 在收到消费者的数据请求时,如果可用的数据量小于 fetch.min.bytes 指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者。这样可以降低消费者和 broker 的工作负载,因为它们在主题使用频率不是很高的时候就不用来回处理消息。如果没有很多可用数据,但消费者的 CPU 使用率很高,那么就需要把该属性的值设得比默认值大。如果消费者的数量比较多,把该属性的值调大可以降低 broker 的工作负载。

    我们通过上面的 fetch.min.bytes 告诉 Kafka,等到有足够的数据时才会把它返回给消费者。而 fetch.max.wait.ms 则用于指定 broker 的等待时间,默认是 500 毫秒。如果没有足够的数据流入 kafka 的话,消费者获取的最小数据量要求就得不到满足,最终导致 500 毫秒的延迟。如果要降低潜在的延迟,就可以把参数值设置的小一些。如果 fetch.max.wait.ms 被设置为 100 毫秒的延迟,而 fetch.min.bytes 的值设置为 1MB,那么 Kafka 在收到消费者请求后,要么返回 1MB 的数据,要么在 100 ms 后返回所有可用的数据。就看哪个条件首先被满足。

    该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值时 1MB,也就是说,KafkaConsumer.poll 方法从每个分区里返回的记录最多不超过 max.partition.fetch.bytes 指定的字节。如果一个主题有20个分区和5个消费者,那么每个消费者需要至少4 MB的可用内存来接收记录。在为消费者分配内存时,可以给它们多分配一些,因为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。max.partition.fetch.bytes 的值必须比 broker 能够接收的最大消息的字节数(通过 max.message.size 属性配置大),否则消费者可能无法读取这些消息,导致消费者一直挂起重试。在设置该属性时,另外一个考量的因素是消费者处理数据的时间。消费者需要频繁的调用 poll 方法来避免会话过期和发生分区再平衡,如果单次调用poll 返回的数据太多,消费者需要更多的时间进行处理,可能无法及时进行下一个轮询来避免会话过期。如果出现这种情况,可以把 max.partition.fetch.bytes 值改小,或者延长会话过期时间。

    这个属性指定了消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s。如果消费者没有在 session.timeout.ms 指定的时间内发送心跳给群组协调器,就会被认定为死亡,协调器就会触发重平衡。把它的分区分配给消费者群组中的其它消费者,此属性与 heartbeat.interval.ms 紧密相关。heartbeat.interval.ms 指定了 poll 方法向群组协调器发送心跳的频率,session.timeout.ms 则指定了消费者可以多久不发送心跳。所以,这两个属性一般需要同时修改,heartbeat.interval.ms 必须比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 应该是 1s。把 session.timeout.ms 值设置的比默认值小,可以更快地检测和恢复崩愤的节点,不过长时间的轮询或垃圾收集可能导致非预期的重平衡。把该属性的值设置得大一些,可以减少意外的重平衡,不过检测节点崩溃需要更长的时间。

    该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下的该如何处理。它的默认值是 latest,意思指的是,在偏移量无效的情况下,消费者将从最新的记录开始读取数据。另一个值是 earliest,意思指的是在偏移量无效的情况下,消费者将从起始位置处开始读取分区的记录。

    我们稍后将介绍几种不同的提交偏移量的方式。该属性指定了消费者是否自动提交偏移量,默认值是 true,为了尽量避免出现重复数据和数据丢失,可以把它设置为 false,由自己控制何时提交偏移量。如果把它设置为 true,还可以通过 mit.interval.ms 属性来控制提交的频率

    我们知道,分区会分配给群组中的消费者。PartitionAssignor 会根据给定的消费者和主题,决定哪些分区应该被分配给哪个消费者,Kafka 有两个默认的分配策略Range 和 RoundRobin

    该属性可以是任意字符串,broker 用他来标识从客户端发送过来的消息,通常被用在日志、度量指标和配额中

    该属性用于控制单次调用 call 方法能够返回的记录数量,可以帮你控制在轮询中需要处理的数据量。

    socket 在读写数据时用到的 TCP 缓冲区也可以设置大小。如果它们被设置为 -1,就使用操作系统默认值。如果生产者或消费者与 broker 处于不同的数据中心内,可以适当增大这些值,因为跨数据中心的网络一般都有比较高的延迟和比较低的带宽。

    提交和偏移量的概念

    特殊偏移

    我们上面提到,消费者在每次调用poll 方法进行定时轮询的时候,会返回由生产者写入 Kafka 但是还没有被消费者消费的记录,因此我们可以追踪到哪些记录是被群组里的哪个消费者读取的。消费者可以使用 Kafka 来追踪消息在分区中的位置(偏移量)

    消费者会向一个叫做 _consumer_offset 的特殊主题中发送消息,这个主题会保存每次所发送消息中的分区偏移量,这个主题的主要作用就是消费者触发重平衡后记录偏移使用的,消费者每次向这个主题发送消息,正常情况下不触发重平衡,这个主题是不起作用的,当触发重平衡后,消费者停止工作,每个消费者可能会分到对应的分区,这个主题就是让消费者能够继续处理消息所设置的。

    如果提交的偏移量小于客户端最后一次处理的偏移量,那么位于两个偏移量之间的消息就会被重复处理

    如果提交的偏移量大于最后一次消费时的偏移量,那么处于两个偏移量中间的消息将会丢失

    既然_consumer_offset 如此重要,那么它的提交方式是怎样的呢?下面我们就来说一下提交方式

    KafkaConsumer API 提供了多种方式来提交偏移量

    自动提交

    最简单的方式就是让消费者自动提交偏移量。如果 enable.mit 被设置为true,那么每过 5s,消费者会自动把从 poll 方法轮询到的最大偏移量提交上去。提交时间间隔由 mit.interval.ms 控制,默认是 5s。与消费者里的其他东西一样,自动提交也是在轮询中进行的。消费者在每次轮询中会检查是否提交该偏移量了,如果是,那么就会提交从上一次轮询中返回的偏移量。

    提交当前偏移量

    把 mit.offset 设置为 false,可以让应用程序决定何时提交偏移量。使用 commitSync 提交偏移量。这个 API 会提交由 poll 方法返回的最新偏移量,提交成功后马上返回,如果提交失败就抛出异常。

    异步提交

    异步提交 commitAsync 与同步提交 commitSync 最大的区别在于异步提交不会进行重试,同步提交会一致进行重试。

    同步和异步组合提交

    一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大的问题,因为如果提交失败是因为临时问题导致的,那么后续的提交总会有成功的。但是如果在关闭消费者或再均衡前的最后一次提交,就要确保提交成功。

    因此,在消费者关闭之前一般会组合使用mitSync提交偏移量。

    提交特定的偏移量

    消费者API允许调用 commitSync 和 commitAsync 方法时传入希望提交的 partition 和 offset 的 map,即提交特定的偏移量。

    作者简介:cxuan 一个正在路上坚持信仰的技术人

    声明:本文为作者投稿,版权归作者个人所有。

    【End】

    以上就是关于GG修改器免root版32位_gg修改器免root版中文的全部内容,感谢大家的浏览观看,如果你喜欢本站的文章可以CTRL+D收藏哦。

    相关文章

    热门下载

    大家还在搜