跳到正文

kafka学习

Posted by lili Blog on August 7, 2021 · 读取中...

    kafka学习

    说在前面:

    最近新接触的一个项目中,接触到了ddmq的相关工作,经了解ddmq的本质就是通过kakfa来改造了。其实在19年在dp实习的时候,就已经接触到了kafka这个东西,不过当时丝毫不懂,仅仅知道是消息队列,可以用来做存储。然后根据测试文档的命令,会写数据和查数据,当数据库来用。= = 恰逢这次对kafka再次有所接触。在这里做一个自己学习整理的笔记。

    对一个东西的认知可以从三个方面来初步了解。是什么?为什么?怎么用?

    先引入一个消息系统的概念

    在大数据中,使用了大量的数据。 关于数据,我们有两个主要挑战。第一个挑战是如何收集大量的数据,第二个挑战是分析收集的数据。 为了克服这些挑战,您必须需要一个消息系统。

    什么是消息系统?

    消息系统负责将数据从一个应用程序传输到另一个应用程序,因此应用程序可以专注于数据,但不担心如何共享它。 分布式消息传递基于可靠消息队列的概念。 消息在客户端应用程序和消息传递系统之间异步排队。 有两种类型的消息模式可用 - 一种是点对点,另一种是发布 - 订阅(pub-sub)消息系统。 大多数消息模式遵循 pub-sub

    点对点消息系统

    在点对点系统中,消息被保留在队列中。 一个或多个消费者可以消耗队列中的消息,但是特定消息只能由最多一个消费者消费。 一旦消费者读取队列中的消息,它就从该队列中消失。 该系统的典型示例是订单处理系统,其中每个订单将由一个订单处理器处理,但多个订单处理器也可以同时工作。但是对于一个特定的订单,只有其中一个订单处理器可以拿到该订单进行处理。 下图描述了结构。

    image-20210703211734940

    发布 - 订阅消息系统

    在发布 - 订阅系统中,消息被保留在主题中。 与点对点系统不同,消费者可以订阅一个或多个主题并使用该主题中的所有消息。 在发布 - 订阅系统中,消息生产者称为发布者,消息使用者称为订阅者。 一个现实生活的例子是Dish电视,它发布不同的渠道,如运动,电影,音乐等,任何人都可以订阅自己的频道集,并获得他们订阅的频道时可用。

    image-20210703211741692

    什么是Kafka?

    Apache Kafka是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端点传递到另一个端点。 Kafka适合离线和在线消息消费。 Kafka消息保留在磁盘上,并在群集内复制以防止数据丢失。 Kafka构建在ZooKeeper同步服务之上。 它与Apache Storm和Spark非常好地集成,用于实时流式数据分析。

    好处

    以下是Kafka的几个好处 -

    • 可靠性 - Kafka是分布式,分区,复制和容错的。
    • 可扩展性 - Kafka消息传递系统轻松缩放,无需停机。
    • 耐用性 - Kafka使用分布式提交日志,这意味着消息会尽可能快地保留在磁盘上,因此它是持久的。
    • 性能 - Kafka对于发布和订阅消息都具有高吞吐量。 即使存储了许多TB的消息,它也保持稳定的性能。

    Kafka非常快,并保证零停机和零数据丢失。

    用例

    Kafka可以在许多用例中使用。 其中一些列出如下 -

    • 指标 - Kafka通常用于操作监控数据。 这涉及聚合来自分布式应用程序的统计信息,以产生操作数据的集中馈送。
    • 日志聚合解决方案 - Kafka可用于跨组织从多个服务收集日志,并使它们以标准格式提供给多个服务器。
    • 流处理 - 流行的框架(如Storm和Spark Streaming)从主题中读取数据,对其进行处理,并将处理后的数据写入新主题,供用户和应用程序使用。 Kafka的强耐久性在流处理的上下文中也非常有用。

    way

    子:

    从2个面试题说起,

    第1个问题,如果一台机器上有10w个定时任务,如何做到高效触发?

    具体场景是:

    有一个APP实时消息通道系统,对每个用户会维护一个APP到服务器的TCP连接,用来实时收发消息,对这个TCP连接,有这样一个需求:“如果连续30s没有请求包(例如登录,消息,keepalive包),服务端就要将这个用户的状态置为离线”。

    其中,单机TCP同时在线量约在10w级别,keepalive请求包较分散大概30s一次,吞吐量约在3000qps。

    怎么做?

    常用方案使用time定时任务,每秒扫描一次所有连接的集合Map<uid, last_packet_time>,把连接时间(每次有新的请求更新对应连接的连接时间)比当前时间的差值大30s的连接找出来处理。

    另一种方案,使用环形队列法:

    image-20210703223312958

    三个重要的数据结构:

    1)30s超时,就创建一个index从0到30的环形队列(本质是个数组)

    2)环上每一个slot是一个Set,任务集合

    3)同时还有一个Map<uid, index>,记录uid落在环上的哪个slot里

    这样当有某用户uid有请求包到达时:

    1)从Map结构中,查找出这个uid存储在哪一个slot里

    2)从这个slot的Set结构中,删除这个uid

    3)将uid重新加入到新的slot中,具体是哪一个slot呢 => Current Index指针所指向的上一个slot,因为这个slot,会被timer在30s之后扫描到

    4)更新Map,这个uid对应slot的index值

    哪些元素会被超时掉呢?

    Current Index每秒种移动一个slot,这个slot对应的Set中所有uid都应该被集体超时!如果最近30s有请求包来到,一定被放到Current Index的前一个slot了,Current Index所在的slot对应Set中所有元素,都是最近30s没有请求包来到的。

    所以,当没有超时时,Current Index扫到的每一个slot的Set中应该都没有元素。