+-
踩坑了,解决了,总结了,现在是你的了。

我们系统最开始选用 kafka,互联网医院系统每天上午下午高峰期,系统的并发量不小,公司规定各部门都要轮流值班,出现线上问题时能及时处理。

医院后台系统属于用户订单的下游业务。

  • 1.用户下单后,订单系统会通过发 Kafka 消息给系统;
  • 系统读取消息后,做业务逻辑处理,持久化订单数据,然后展示到医院;
  • 这样医院知道订单,就可以通过该系统排班取号。
  • 系统自动通知挂号;
  • 如果用户已挂号,修改挂号状态,用户就知道哪些订单已经上了。
  • 系统可以大大提高排班效率。

    这一切的核心是:Kafka。

    接下来,我们一起聊聊使用 Kafka 踩过哪些坑?

    1. 顺序问题 1.1 为什么要保证消息的顺序?

    排班订单系统发消息时将订单详细数据放在消息体,我们后台系统只要订阅 topic,就能获取相关消息数据,然后处理自己的业务即可。

    订单有很多状态,比如下单、支付、完成、撤销等。

    不可能下单的消息都没读取到,就先读取支付或撤销的消息吧。要保证消息的顺序。

    1.2 如何保证消息顺序?

    我们都知道 Kafka 的 topic 是无序的,但一个 topic 包含多个 partition,每个 partition 内部是有序的。

    思路:只要保证生产者写消息时,按照一定的规则写到同一个 partition。不同的消费者读不同的 partition 的消息,就能保证生产和消费者消息的顺序。

    我们刚开始是这么做的,同一个用户编号的消息写到同一个 partition。topic 中创建了 4 个 partition,然后部署了 4 个消费者节点,构成消费者组。

    一个 partition 对应一个消费者节点。

    理论上是能够保证消息顺序的。

    1.3 出现意外

    上线刚开始还是比较正常的,很快就收到投诉,客户端有些排班订单一直看不到。

    那段时间网络经常不稳定,业务接口时不时报超时,业务请求时不时会连不上数据库。

    这对顺序消息的打击,可以说是毁灭性的。

    假设订单系统发了“下单”、“支付”、“完成” 三条消息。

    而“下单”消息由于网络原因,系统处理失败了,而后面的两条消息的数据是无法入库的。

    因为只有“下单”消息的数据才是完整的数据,其他类型的消息只会更新状态。

    加上当时没有做失败重试机制,这个问题被放大了。

    那么这个紧急的问题要如何解决呢?

    1.4 解决过程

    我们开始的想法是:在消费者处理消息时,如果处理失败了,立马重试 3-5 次。

    但如果有些请求 5 次依然不成功怎么办?

    不可能一直重试,同步重试机制在出现异常的情况,会严重影响消息消费者的消费速度,降低它的吞吐量。

    这种同步重试机制,会阻塞其他后台系统用户订单消息的读取。

    如果用异步重试机制,处理失败的消息就得保存到重试表下来。

    但有个新问题:只存一条消息如何保证顺序?

    假如“下单”消息失败了,还没来得及异步重试。此时,“支付”消息被消费了,它肯定是不能被正常消费的。

    存一条消息的确无法保证顺序。

    此时,“支付”消息一直等着,每隔一段时间判断一次,它前面的消息都有没有被消费?

    这么做,会出现两个问题:

  • “支付”消息前面只有“下单”消息,这种情况比较简单。但如果某种类型的消息,前面有 N多种消息,需要判断多少次呀?这种判断跟排班系统的耦合性太强了,相当于要把他们系统的逻辑搬一部分到我们系统;
  • 影响消费者的消费速度
  • 有简单的方案:消费者在处理消息时,先判断该订单号在重试表有没有数据,

    如果有则直接把当前消息保存到重试表;如果没有,则进行业务处理,如果出现异常,把该消息保存到重试表。

    后来我们用 elastic-job 建立了失败重试机制,如果重试了 7 次后还是失败,则将该消息的状态标记为失败,发邮件通知开发人员。

    现在由于网络不稳定,医院顶多偶尔延迟看到订单。

    2. 消息积压

    随着系统的服务越来越多。随之而来的是消息的数量越来越大,导致消费者处理不过来,经常出现消息积压的情况。

    对医院的影响非常直观,客户端上的订单信息可能半个小时后才能看到,这种延迟哪里忍得了。

    要先做系统优化,所以我们开始了消息积压问题解决之旅。(说加服务器节点就能解决问题)

    2.1 消息体过大

    Kafka 号称支持百万级的 TPS,从 producer 发送消息到 broker 需要一次网络 IO,broker 写数据到磁盘需要一次磁盘 IO(写操作),consumer 从 broker 获取消息先经过一次磁盘 IO(读操作),再经过一次网络 IO。

    一次简单的消息从生产到消费过程,需要经过两次网络 IO 和两次磁盘 IO。

    如果消息体过大,会增加 IO 的耗时,进而影响 Kafka 生产和消费的速度。消费者速度太慢,就会出现消息积压情况。

    同时,消息体过大可能会出现磁盘空间不足的情况。

    如何优化消息体过大呢?

    我们重新梳理了一下业务,其实没有必要知道订单的中间状态,只需知道一个最终状态就可以了。

    我们就可以这样设计了:

  • 订单系统发送的消息体只用包含 id 和状态等关键信息;
  • 后台显示系统消费消息后,通过 id 调用订单系统的订单详情查询接口获取数据;
  • 后台显示系统判断数据库中是否有该订单的数据,如果没有则入库,有则更新。
  • 2.2 路由规则不合理

    有天中午又反馈有延迟。我们一查 Kafka 的 topic 竟然又出现了消息积压。

    但这次不是所有 partition 上的消息都有积压,而是只有一个。

    刚开始,我以为是消费那个 partition 消息的节点出了什么问题导致的。但是经过排查,没有发现任何异常。

    后来,我查日志和数据库发现:有几个科室的订单量特别大,刚好这几个医院科室被分到同一个 partition,使得该 partition 的消息量比其他 partition 要多很多。

    这时我们才意识到,发消息时按医院科室编号路由 partition 的规则不合理。可能会导致有些 partition 消息太多消费者处理不过来,而有些 partition 却因为消息太少,消费者出现空闲的情况。

    为了避免出现这种分配不均匀的情况,我们需要对发消息的路由规则做一下调整。

    用订单号做路由相对更均匀,不会出现单个订单发消息次数特别多的情况。除非是遇到某个人一直下订单。

    调整后按订单号路由到不同的 partition,同一个订单号的消息,每次到发到同一个 partition。

    2.3 批量操作引起的连锁反应

    反馈又有延迟。这次问题出现得有点奇怪。

    首先,这次问题出现在晚上,不是高峰期。

    根据以往积累的经验,我直接看了 Kafka 的 topic 的数据,果然上面消息有积压。

    但这次每个 partition 都积压了十几万的消息没有消费,比以往加压的消息数量增加了几百倍。这次消息积压得极不寻常。

    我赶紧查服务监控看看消费者挂了没,还好没挂。又查服务日志没有发现异常。这时碰运气问了问下午发生了什么事情没?

    他们说下午有个促销活动,跑了一个 Job 批量更新过有些医院科室的订单信息。

    原来是他们在 Job 中批量发消息导致的问题。

    知道问题的原因了,积压的这十几万的消息该如何处理呢?

    此时,如果直接调大 partition 数量是不行的,历史消息已经存储到4个固定的 partition,只有新增的消息才会到新的 partition。我们重点需要处理的是已有的 partition。

    直接加服务节点也不行,因为 Kafka 允许同组的多个 partition 被一个 consumer 消费,但不允许一个 partition 被同组的多个 consumer 消费,可能会造成资源浪费。

    只有用多线程处理了。

    为了紧急解决问题,我改成了用线程池处理消息,核心线程和最大线程数都配置成了 50。

    调整之后,消息积压数量不断减少。

    但此时收到了报警邮件,有两个订单系统的节点宕机了。

    不久,同事过来找我说,我们系统调用他们订单查询接口的并发量突增,超过了预计的好几倍,导致有 2 个服务节点挂了。他们把查询功能单独整成了一个服务,部署了 6 个节点,挂了 2 个节点。再不处理,另外 4 个节点也会挂。订单服务可以说是最核心的服务。

    为了解决这个问题,只能先把线程数调小。

    幸好,线程数是可以通过 ZooKeeper 动态调整的。我把核心线程数调成了 8 个,最大线程数改成了 10 个。

    后面,运维把订单服务挂的 2 个节点重启后恢复正常了。以防万一,再多加了 2 个节点。

    为了确保订单服务不会出现问题,就保持目前的消费速度,系统的消息积压问题,1 小时候后也恢复正常了。

    后来,我们开了一次复盘会:

  • 订单系统的批量操作一定提前通知下游系统团队;
  • 下游系统团队多线程调用订单查询接口一定要做压测;
  • 这次给订单查询服务敲响了警钟。它作为公司的核心服务,应对高并发场景做的不够好,需要做优化;
  • 对消息积压情况加监控。
  • 顺便说一下,对于要求严格保证消息顺序的场景,可以将线程池改成多个队列,每个队列用单线程处理。

    2.4 表过大

    为了防止后面再次出现消息积压问题,消费者后面就一直用多线程处理消息。

    但有天中午我们还是收到很多报警邮件,提醒我们 Kafka 的 topic 消息有积压。我们正在查原因,此时又反馈有延迟。

    在外行看来:为什么同一个问题一直解决不了?

    其实,导致消息积压的原因其实有很多种…省略一万字。

    查日志发现消费者消费一条消息的耗时长达 2 秒,以前是 500 毫秒,发生了什么?

    消费者的代码也没有做大的调整,为什么会出现这种情况呢?

    查了一下线上表,单表数据量竟然到了几千万,其他也是一样,现在单表保存的数据太多了。

    我们服务端存着多余的数据,不如把表中多余的数据归档。于是 DBA 帮我们把数据做了归档,只保留最近 7 天的数据。

    3. 主键冲突

    其他的问题。比如报警邮件经常报出数据库异常:Duplicate entry '6' for key 'PRIMARY',说主键冲突。

    这种问题一般是由于有两个以上相同主键的 SQL,同时插入数据,第一个插入成功后,第二个插入的时候会报主键冲突。表的主键是唯一的,不允许重复。

    我检查了代码,发现代码逻辑会先根据主键从表中查询订单是否存在,如果存在则更新状态,不存在才插入数据。

    这种判断在并发量不大时,是有用的。

    但高并发的场景下,两个请求同一时刻都查到订单不存在,一个请求先插入数据,另一个请求再插入数据时就会出现主键冲突的异常。

    最常规的做法是:加锁。

    加数据库悲观锁,太影响性能。加数据库乐观锁,基于版本号判断,一般用于更新操作,像这种插入操作基本上不会用。

    可以加基于 Redis 的分布式锁,锁定订单号。

    但仔细思考了一下:

  • 加分布式锁也可能会影响消费者的消息处理速度;
  • 消费者依赖于 Redis,如果 Redis 出现网络超时,我们的服务就悲剧了。
  • 所以,我也不打算用分布式锁。而是选择使用 MySQL 的 INSERT INTO ...ON DUPLICATE KEY UPDATE 语法。

    它会先尝试把数据插入表,如果主键冲突的话那么更新字段。

    把以前的 insert 语句改造之后,就没再出现过主键冲突问题。

    4. 数据库主从延迟

    不久之后的某天,又收到反馈,说下单后,在客户端上看得到订单,但是不全,有时甚至看不到。

    根据以往的经验先看 Kafka 的 topic 中消息有没有积压,但这次并没有积压。

    再查了服务日志,发现订单系统接口返回的数据有些为空,有些只返回了订单数据,没返回用户数据。

    服务没有发现问题,会不会是数据库出问题了,果然 DBA发现数据库的主库同步数据到从库,由于网络原因偶尔有延迟,有时延迟有 5秒。

    如果我们的业务流程从发消息到消费消息耗时小于 5 秒,调用订单详情查询接口时,可能会查不到数据,或者查到的不是最新的数据。

    这个问题会导致直接我们的数据错误。

    为了解决这个问题,我们也加了重试机制。调用接口查询数据时,如果返回数据为空,或者只返回了订单没有用户信息,则加入重试表。

    5. 重复消费

    Kafka消费消息时支持三种模式:

  • at most once 模式:最多一次。保证每一条消息 commit 成功之后,再进行消费处理。消息可能会丢失,但不会重复;
  • at least once 模式:至少一次。保证每一条消息处理成功之后,再进行 commit。消息不会丢失,但可能会重复;
  • exactly once 模式:精确传递一次。将 offset 作为唯一 id 与消息同时处理,并且保证处理的原子性。消息只会处理一次,不丢失也不会重复。但这种方式很难做到。
  • Kafka 默认的模式是 at least once,但这种模式可能会产生重复消费的问题。所以我们的业务逻辑必须做幂等设计。

    而我们的业务场景保存数据时使用了 INSERT INTO ...ON DUPLICATE KEY UPDATE 语法,不存在时插入,存在时更新,是天然支持幂等性的。

    6. 多环境消费问题

    我们当时线上环境分为:pre(预发布环境)和 prod(生产环境),两个环境共用同一个数据库,并且共用同一个 Kafka 集群。

    需要注意的是,在配置 Kafka 的 topic 的时候,要加前缀用于区分不同环境。pre环境的以 pre_ 开头,比如 pre_order。生产环境以 prod_开头,比如 prod_order,防止消息在不同环境中串了。

    但有次运维在 pre 环境切换节点,配置 topic 的时候,错误地配成了 prod 的 topic。刚好那天我们有新功能上 pre 环境,结果悲剧了:prod 的有些消息被 pre 环境的 consumer 消费了。

    而由于消息体做了调整,导致 pre 环境的 consumer 处理消息一直失败,其结果是生产环境丢了部分消息。

    最后生产环境消费者通过重置offset,重新读取了那一部分消息,解决了问题。

    除了上述问题之外,我还遇到过:

    后记
  • Kafka 的 consumer 使用自动确认机制,导致 CPU 使用率 100%;
  • Kafka 集群中的一个 broker 节点挂了,重启后又一直挂。这两个问题说起来有些复杂,我就不一一列举了。
  • 非常感谢这两年使用消息中间件 Kafka 的经历,

    虽说遇到过挺多问题,踩了很多坑,走了很多弯路,但是实打实的让我积累了很多宝贵的经验,快速成长了。

    其实 Kafka 是一个非常优秀的消息中间件,我所遇到的绝大多数问题都并非 Kafka 自身的问题(除了 CPU 使用率 100% 是它的一个 bug 导致的之外)。


    - EOF -

    推荐阅读   点击标题可跳转

    我用kafka两年踩过的一些非比寻常的坑

    Kafka 为什么那么快?


    看完本文有收获?请转发分享给更多人

    关注「ImportNew」,提升Java技能

    点赞和在看就是最大的支持❤️