.

kafka介绍

参考:https://zh.wikipedia.org/wiki/Kafka

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”,这使它作为企业级基础设施来处理流式数据非常有价值。此外,Kafka可以通过Kafka Connect连接到外部系统(用于数据输入/输出),并提供了Kafka Streams——一个Java流式处理库 (计算机)。
kafka框架kafka存储的消息来自任意多被成为“生产者”(producer)的进程。数据从而可以被分配到不同的“分区”(partition),不同的(topic)下。在一个分区内,这些消息被索引并连同时间戳存储在一起。其他被成为“消费者”(consumer)的进程可以从分区查询消息。kafka运行在一个由一台或多台服务器组成的集群上,并且分区可以跨集群结点分布。
kafka高效的处理实时流式数据,可以实现与storm、hbase和spark的集成。作为集群部署在多台服务器上,kafka处理它所有的发布和订阅消息系统使用了四个API。它能够传递大规模流式消息,自带容错功能,已经取代了一些传统消息系统,如JMS,AMQP等。
生产者API:支持应用程序发布Record流。
消费者API:支持应用程序订阅Topic和处理Record流。
Stream API:将输入流转换为输出流,并产生结果。
Connector API:执行可重用得生产者和消费者API,可将Topic链接到现有应用程序。
Topic 用来对消息进行分类,每个进入到kafka的消息都会被放到一个Topic下
Broker 用来实现数据存储的主机服务器
Partition 每个Topic中的消息会被分为若干个Partition,以提高消息的处理效率
Producer 消息的生产者
Consumer 消息的消费者
Consumer Group 消息的消费群组

Kafka特性

参考:https://blog.csdn.net/z769184640/article/details/51585419
通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
支持通过kafka服务器和消费机集群来分区消息。
支持Hadoop并行数据加载。
Kafka的目的是提供一个发布订阅解决方案,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。

kafka与zooker的关系整理

参考:http://blog.sina.com.cn/s/blog_69cf1be90102xcr8.html

配置管理服务kafka使用zookeeper来存储一些meta信息,并使用了zookeeper watch机制来发现meta信息的变更并作出相应的动作

broker node registry :当kafka的broker启动后,会向zookeeper注册自己的节点信息(临时znode),当broker 与zookeeper断开,此znode会被删除broker

topic registry:当一个broker启动后,会向
zookeeper注册自己持有的topic和partition

consumer 和consumer group:一个group中的多个consumer可以交错的消费一个topic的所有partitions;简而言之,保证此topic的所有partitions都能被此group所消费,且消费时为了性能考虑,让partition相对均衡的分散到每个consumer上.状态同步服务consumer保存 消费消息的offset 在zookeeper

partition的leader (host:port)注册在zookeeper中, producer作为 zookeeper的client,注册了watch用来监听partition leader的变更事件zookeeper 支持kafka的 partition 的 leader 和follower的协同与选举, 保证partition 中只要leader/follower中只要有一个正常, 服务就不会中断

汇总:

Producer端使用zookeeper用来”发现”broker列表,以及和Topic下每个partition leader建立socket连接并发送消息

Broker端使用zookeeper用来注册broker信息,已经监测partitionleader存活性.

Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader建立socket连接,并获取消息

zookeeper集群搭建

参考:https://www.ilanni.com/?p=11393

主机名 IP 端口
k8s1 192.168.26.135 2181
k8s2 192.168.26.136 2181
k8s3 192.168.26.137 2181

安装jdk和zookeeper

1
2
3
4
5
6
7
8
9
10
$ yum install java-1.8.0-openjdk -y
$ mkdir -p /system/zookeeper/{logs,data}
$ cd /system/zookeeper
$ wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.13/zookeeper-3.4.13.tar.gz
$ tar zxvf zookeeper-3.4.13.tar.gz
$ cd zookeeper-3.4.13
$ cp conf/zoo_sample.cfg conf/zoo.cfg
$ vim /etc/profile
export PATH=$PATH:/root/zookeeper-3.4.13/bin
$ source /etc/profile

注意:在搭建zookeeper集群时,一定要停止已经启动的zookeeper。

1
2
3
$ zkServer.sh stop
ZooKeeper JMX enabled by defaultUsing config: /root/zookeeper-3.4.13/bin/../conf/zoo.cfg
Stopping zookeeper … STOPPED

修改zookeeper的配置文件zoo.cfg

1
2
3
4
5
6
7
8
9
10
vim /usr/local/zookeeper/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
clientPort=2181
dataLogDir=/system/zookeeper/logs
dataDir=/system/zookeeper/data
server.1= 192.168.26.135:2888:3888
server.2= 192.168.26.136:2888:3888
server.3= 192.168.26.137:2888:3888
1
2
3
4
5
6
7
配置文件参数说明:
tickTime这个时间是作为zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔,也就是说每个tickTime时间就会发送一个心跳。
initLimit这个配置项是用来配置zookeeper接受客户端(这里所说的客户端不是用户连接zookeeper服务器的客户端,而是zookeeper服务器集群中连接到leader的follower 服务器)初始化连接时最长能忍受多少个心跳时间间隔数。当已经超过10个心跳的时间(也就是tickTime)长度后 zookeeper 服务器还没有收到客户端的返回信息,那么表明这个客户端连接失败。总的时间长度就是 10*2000=20秒。
syncLimit这个配置项标识leader与follower之间发送消息,请求和应答时间长度,最长不能超过多少个tickTime的时间长度,总的时间长度就是5*2000=10秒。
dataDir顾名思义就是zookeeper保存数据的目录,默认情况下zookeeper将写数据的日志文件也保存在这个目录里;
clientPort这个端口就是客户端连接Zookeeper服务器的端口,Zookeeper会监听这个端口接受客户端的访问请求;
server.A=B:C:D中的A是一个数字,表示这个是第几号服务器,B是这个服务器的IP地址,C第一个端口用来集群成员的信息交换,表示这个服务器与集群中的leader服务器交换信息的端口,D是在leader挂掉时专门用来进行选举leader所用的端口。

创建ServerID标识

除了修改zoo.cfg配置文件外,zookeeper集群模式下还要配置一个myid文件,这个文件需要放在dataDir目录下。这个文件里面有一个数据就是A的值(该A就是zoo.cfg文件中server.A=B:C:D中的A),在zoo.cfg文件中配置的dataDir路径中创建myid文件。

在192.168.26.135服务器上创建myid文件,并设置为1,同时与zoo.cfg文件里面的server.1对应,如下:

1
echo "1" > /system/zookeeper/data/myid

在192.168.26.136服务器上创建myid文件,并设置为2,同时与zoo.cfg文件里面的server.2对应,如下:

1
echo "2" > /system/zookeeper/data/myid

在192.168.26.137服务器上创建myid文件,并设置为3,同时与zoo.cfg文件里面的server.3对应,如下:

1
echo "3" > /system/zookeeper/data/myid

zkServer.sh start注意:在启动第一台zookeeper的时候可能会报错,等三台zookeeper全部启动完成之后就不会报错了。
zookeeper启动完毕后,我们来查看各个服务器上zookeeper的状态。如下:

1
2
3
4
5
6
7
8
9
[root@k8s1 ~]# zkServer.sh status
ZooKeeper JMX enabled by defaultUsing config: /system/zookeeper/zookeeper-3.4.13/bin/../conf/zoo.cfgMode: follower

[root@k8s2 ~]# zkServer.sh status
ZooKeeper JMX enabled by defaultUsing config: /system/zookeeper/zookeeper-3.4.13/bin/../conf/zoo.cfgMode: follower

[root@k8s3 ~]# zkServer.sh status
ZooKeeper JMX enabled by defaultUsing config: /system/zookeeper/zookeeper-3.4.13/bin/../conf/zoo.cfgMode:
leader

zookeeper添加删除节点,需要先改zoo.cfg配置文件,然后再一个一个节点zkServer.sh restart

kafka集群搭建

参考:https://kafka.apache.org/documentation/#gettingStarted

https://blog.csdn.net/z769184640/article/details/51585419

1
2
3
4
5
6
$ mkdir -p /system/kafka/kafka-logs
$ cd /system/kafka
$ wget http://mirrors.shu.edu.cn/apache/kafka/2.0.1/kafka_2.12-2.0.1.tgz
$ tar zxvf kafka_2.12-2.0.1.tgz
$ cd kafka_2.12-2.0.1
$ vim config/server.properties

server.properties配置说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 唯一标识一个broker.
broker.id=1
#绑定服务监听的地址和端口,要填写hostname -i 出来的地址,否则可能会绑定到127.0.0.1,producer可能会发不出消息
listeners=PLAINTEXT://192.168.26.135:9092
#broker对producers和consumers服务的地址和端口,如果没有配置,使用listeners的配置,本文没有配置该项
#advertised.listeners=PLAINTEXT://your.host.name:9092
# 处理网络请求的线程数
num.network.threads=3
# 处理磁盘I/O的线程数
num.io.threads=8
# socket server的发送buffer大小 (SO_SNDBUF)
socket.send.buffer.bytes=102400
# socket server的接收buffer大小 (SO_RCVBUF)
socket.receive.buffer.bytes=102400
#一个请求的最大size,用来保护防止oom
socket.request.max.bytes=104857600

#存放日志和消息的目录,可以是用逗号分开的目录,同样不推荐使用/tmp
log.dirs=/system/kafka/kafka-logs
#每个topic默认partitions的数量,数量较大表示消费者可以有更大的并行度。
num.partitions=2
# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1
#日志的过期时间,超过后被删除,单位小时
log.retention.hours=168
#一个日志文件最大大小,超过会新建一个文件
log.segment.bytes=1073741824
#根据过期策略检查过期文件的时间间隔,单位毫秒
log.retention.check.interval.ms=300000

#Zookeeper的连接配置,用逗号隔开,也可以用192.168.26.135:2181/kakfa这样的方式指定kafka数据在zk中的根目录
zookeeper.connect=192.168.26.135:2181,192.168.26.136:2181,192.168.26.137:2181
# 连接zk的超时时间
zookeeper.connection.timeout.ms=6000

主要配置文件为server.properties,对于producer和consumer分别有producer.properties和consumer.properties,但是一般不需要单独配置,可以从server.properties中读取。

启动各节点,分发此配置文件,修改broker.id和listeners地址,建立相应的目录。

1
2
3
[root@k8s1 ~]# ./bin/kafka-server-start.sh -daemon config/server.properties
[root@k8s2 ~]# ./bin/kafka-server-start.sh -daemon config/server.properties
[root@k8s3 ~]# ./bin/kafka-server-start.sh -daemon config/server.properties

-daemon放在后台运行。

验证是否成功

创建一个topic名为test

1
2
3
[root@k8s1 ~]# cd /system/kafka/kafka_2.12-2.0.1
[root@k8s1 ~]# ./bin/kafka-topics.sh --create --zookeeper 172.23.8.144:2181 --replication-factor 3 --partitions 1 --topic test
Created topic "test"

producer发送消息,ctrl+c终止

1
2
3
4
[root@k8s2 ~]# cd /system/kafka/kafka_2.12-2.0.1
[root@k8s2 ~]# ./bin/kafka-console-producer.sh --broker-list 192.168.26.137:9092 --topic test
1111
2222

消费者接收消息

1
2
3
4
[root@k8s3 ~]# cd /system/kafka/kafka_2.12-2.0.1
[root@k8s3 ~]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.26.135:9092 --topic test --from-beginning
1111
2222

继续发送消息则在消费者终端会一直出现新产生的消息。至此,kafka集群搭建成功