kafka

发布时间 2023-04-19 19:05:32作者: Eddie-Lucas

安装kafka

  1. 首先安装jdk,zookeeper,kafka

将压缩包放进linux目录/opt/mySoftware下,并依次执行下面的命令

tar -zxvf jdk-8u361-linux-x64.tar.gz
tar -zxvf zookeeper-3.4.12.tar.gz
tar -zxvf kafka_2.11-2.0.0.tgz

解压后会生成文件夹jdk1.8.0_361``kafka_2.11-2.0.0 zookeeper-3.4.12

  1. 配置环境变量
vim /etc/profile

然后到文件最下方,添加下面的内容

# jdk
export JAVA_HOME=/opt/mySoftware/jdk1.8.0_361
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
# zookeeper
export ZOOKEEPER_HOME=/opt/mySoftware/zookeeper-3.4.12
export PATH=$PATH:$ZOOKEEPER_HOME/bin
# kafka
export KAFKA_HOME=/opt/mySoftware/kafka_2.11-2.0.0
export PATH=$PATH:$KAFKA_HOME/bin

添加完成后保存并退出,然后执行命令使配置生效

source /etc/profile

验证是否成功

java -version

切换目录,创建tmp目录,打开配置目录,修改zookeeper配置文件名字

cd /opt/mySoftware/zookeeper-3.4.12
mkdir tmp
cd conf
cp zoo_sample.cfg zoo.cfg

打开zoo.cfg

vim zoo.cfg

修改/添加以下内容后,保存退出

# 数据目录
dataDir=/opt/mySoftware/zookeeper-3.4.12/tmp/data
# 日志目录
dataLogDir=/opt/mySoftware/zookeeper-3.4.12/tmp/log

创建zookeeper服务器编号

cd /opt/mySoftware/zookeeper-3.4.12/tmp
mkdir data
touch /opt/mySoftware/zookeeper-3.4.12/tmp/data/myid
vim /opt/mySoftware/zookeeper-3.4.12/tmp/data/myid

在myid文件中添加一个数字0

启动zookeeper

zkServer.sh start  启动服务
zkServer.sh status  查看服务状态
zkServer.sh stop  停止服务
zkCli.sh  客户端连接

然后修改kafka的server.properties

cd /opt/mySoftware/kafka_2.11-2.0.0/config
vim server.properties

修改这些内容

# broker的编号,如果集群中有多个broker,则每个broker的编号需要设置的不同
broker.id=0
# broker对外提供的服务入口地址,下面的ip是虚拟机的ip
listeners=PLAINTEXT://192.168.162.124:9092
# 存放消息日志文件的地址
log.dirs=/tmp/kafka-logs
# Kafka所需的ZooKeeper集群地址,为了方便演示,我们假设Kafka和ZooKeeper都安装在本机
zookeeper.connect=localhost:2181/kafka

启动kafka

/opt/mySoftware/kafka_2.11-2.0.0目录下执行下面的命令

# 前台启动
bin/kafka-server-start.sh config/server.properties
# 后台启动
bin/kafka-server-start.sh config/server.properties &
//关闭后台运行
bin/kafka-server-stop.sh

基本概念

生产者:Producer,发送消息的一方,将消息发送给kafka

broker:翻译过来是经纪人,代理人,通常情况下将它看作kafka实例

消费者:Consumer,接收消息的一方,从kafka上拉取消息

主题:Topic,kafka中,消息依据主题进行分类,生产者将消息发送到特定的主题,消费者从特定的主题接收消息

分区一个主题可以分为多个分区,一个分区只能属于一个主题。同一个主题下不同分区的消息是不同的,分区在存储层面可以看作一个可追加的日志,消息在追加到分区时会分配一个偏移量offset,offset是消息在分区中的唯一标识,kafka根据offset来保证消息在分区内的顺序性。offset不跨越分区,也就是说消息在分区内是有序的而不是主题有序的
image.png
分区多副本机制:在分区中,有多个副本,保存的是相同的消息,副本之间是一主多从的关系,leader副本负责处理读写请求,follower副本只负责和leader保持同步,副本处于不同的broker,当一个leader出现故障,会从follower中重新选出leader对外提供服务,这样就保证了在某个broker失效时仍然能保证服务可用
image.png
不仅broker会失效,consumer也会发生故障宕机,消费端也是具有一定的容灾能力的,当consummer拉取消息时,会保存拉取消息的位置(从哪里拉取的消息),当重新恢复后,会根据之前保存的位置重新拉取进行消费

AR,ISR,OSR:包括leader和follower在内的所有副本叫做AR(all),所有和leader保持一定程度同步的副本(包含leader)叫做ISR(in),与leader同步滞后过多的副本组成OSR(out),AR=ISR+OSR,正常情况下AR=ISR,即所有副本的同步程度都比较好,leader负责跟踪和维护这两个集合,如果ISR中的副本滞后过多就移除,如果OSR同步追上则加入到ISR,当leader故障重选leader时,只能在ISR中选

LSO,HW,LEO:HW是高水位,在这个标志之前的消息是可以被消费者拉取的,HW及之后的消息对消费者不可见
image.png
几个例子:
image.png
image.png
image.png
image.png
kafka的复制机制是介于同步复制和异步复制之间的一种均衡的状态,如果是同步复制,只有所有的follower都拷贝了leader的消息副本,这条消息才算发送成功,这种情况,效率很低;如果是单纯的异步复制,则当leader写入了消息就认为发送成功,如果follower还没来得及拷贝,leader就宕机,就会造成数据丢失

fdsf