海码充电站的技术专栏 java Coder

队列- -kafka

2018-12-27
watermelon


队列kafka

前言

队列-kafka专题

1、概述及特性

消息系统通常都会由生产者,消费者,Broker三大部分组成,生产者会将消息写入到Broker,消费者会从Broker中读取出消息,不同的MQ实现的Broker实现会有所不同,不过Broker的本质都是要负责将消息落地到服务端的存储系统中。

Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。

特性
  • 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic可以分多个partition, consumer group 对partition进行consume操作。
  • 可扩展性:kafka集群支持热扩展
  • 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
  • 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
  • 高并发:支持数千个客户端同时读写
使用场景
  • 日志收集:一个公司可以用Kafka可以收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
  • 消息系统:解耦和生产者和消费者、缓存消息等。
  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 流式处理:比如spark streaming和storm
  • 事件源
需要Zookeeper来协调控制:
  • 1:管理broker与consumer(消费者)的动态加入与离开。(Producer(生产者)不需要管理,随便一台计算机都可以作为Producer向Kakfa Broker发消息)
  • 2:触发负载均衡,当broker或consumer(消费者)加入或离开时会触发负载均衡算法,使得一个consumer group内的多个consumer的消费负载平衡。(因为一个comsumer消费一个或多个partition,一个partition只能被一个consumer消费)
  • 3:维护消费关系及每个partition的消费信息。

    2、生产者

    生产流程:

  • 1:【发送到服务端】:客户端连接对象将消息包装到请求中发送到服务端
  • 2:【服务端存储】:服务端的入口也有一个连接对象负责接收请求,并将消息以文件的形式存储起来
  • 3:【响应返回客户端】:服务端返回响应结果给生产者客户端

3、消费者

消费者底层采用的是一个阻塞队列,只要生产者一生产处数据,相应消费者就会将数据消费。

消费流程:

  • 1:【请求服务端】:客户端连接对象将消费信息也包装到请求中发送给服务端
  • 2:【服务端取消息】:服务端从文件存储系统中取出消息
  • 3:【响应返回客户端】:服务端返回响应结果给消费者客户端
  • 4:【还原消息并处理】:客户端将响应结果还原成消息并开始处理消息

4、常用模式

点对点模式
  • 1:每个消息只用一个消费者
  • 2:发送者和接受者没有时间依赖
  • 3:接受者确认消息接受和处理成功
订阅模式
  • 每个消息可以有多个订阅者;(kafka中生产者指定多个groupid)

5、flush刷盘机制(定时刷盘)

熟悉Linux操作系统原理的都知道,当我们把数据写入到文件系统之后,数据其实在操作系统的page cache里面,并没有刷到磁盘上去。如果此时操作系统挂了,其实数据就丢了。
一方面,应用程序可以调用fsync这个系统调用来强制刷盘;另一方面,操作系统有后台线程,定期刷盘。
如果应用程序每写入1次数据,都调用一次fsync,那性能损耗就很大,所以一般都会在性能和可靠性之间进行权衡。因为对应一个应用来说,虽然应用挂了,只要操作系统不挂,数据就不会丢。
另外, kafka是多副本的,当你配置了同步复制之后。多个副本的数据都在page cache里面,出现多个副本同时挂掉的概率比1个副本挂掉,概率就小很多了。
对于kafka来说,也提供了相关的配置参数,来让你在性能与可靠性之间权衡:

log.flush.interval.messages  //多少条消息,刷盘1次
log.flush.interval.ms  //割多长时间,刷盘1次
log.flush.scheduler.interval.ms //周期性的刷盘,缺省3000,即3s。

6、想消费同一组数据 、消费已经被消费过的数据

consumer是底层采用的是一个阻塞队列,只要一有producer生产数据,那consumer就会将数据消费。
当然这里会产生一个很严重的问题,如果你重启一消费者程序,那你连一条数据都抓不到,但是log文件中明明可以看到所有数据都好好的存在。换句话说,一旦你消费过这些数据,那你就无法再次用同一个groupid消费同一组数据了。
原因:消费者消费了数据并不从队列中移除,只是记录了offset偏移量。
同一个consumergroup的所有consumer合起来消费一个topic,并且他们每次消费的时候都会保存一个offset参数在zookeeper的root上。如果此时某个consumer挂了或者新增一个consumer进程,将会触发kafka的负载均衡,暂时性的重启所有consumer,重新分配哪个consumer去消费哪个partition,然后再继续通过保存在zookeeper上的offset参数继续读取数据。注意:offset保存的是consumer 组消费的消息偏移。

  • 1:消费同一组数据怎么做
    方案1:采用不同的group(类似订阅模式)
    方案2:通过一些配置,就可以将线上产生的数据同步到镜像中去,然后再由特定的集群区处理大批量的数据

  • 2:消费已消费过的
    Conosumer.properties配置文件中有两个重要参数
    auto.commit.enable:如果为true,则consumer的消费偏移offset会被记录到zookeeper。下次consumer启动时会从此位置继续消费。
    auto.offset.reset 该参数只接受两个常量largest和Smallest,分别表示将当前offset指到日志文件的最开始位置和最近的位置。
    如果进一步想控制时间,则需要调用SimpleConsumer,自己去设置相关参数。比较重要的参数是 kafka.api.OffsetRequest.EarliestTime()和kafka.api.OffsetRequest.LatestTime()分别表示从日志(数据)的开始位置读取和只读取最新日志。
    如何使用SimpleConsumer
    首先,你必须知道读哪个topic的哪个partition
    然后,找到负责该partition的broker leader,从而找到存有该partition副本的那个broker
    再者,自己去写request并fetch数据
    最终,还要注意需要识别和处理brokerleader的改变

拓展:


上一篇 队列- -RabbitMQ

Comments

Content