1.情景展示
传统的消息中间件,如果我们想要应用到自己的系统当中,就必须在框架里面进行集成。
也就是说,必须将连接消息中间件的代码(如:生产者和消费者)嵌入到我们的web系统中,这就是所谓的硬编码。
这种硬编码的方式,侵入性强,开发成本相对较高,它要求开发人员不仅要关注业务,还要兼顾技术实现。
这也与我们开发降低系统耦合性(解耦)的原则相悖。
那能不能使消息的生产和消费脱离系统而存在呢?
2.具体分析
我们知道:kafka是可以当作消息中间件来使用的。
而kafka connect就能很好的满足我们的需求。
我们可以根据kafka connect自定义开发组件来实现数据的同步。
3.kafka connect
Kafka Connect是一个高伸缩性、高可靠性的数据集成工具,用于在Apache Kafka与其他系统间进行数据搬运以及执行ETL操作。
Kafka Connect主要由source connector和sink connector组成。事实上,几乎大部分的ETL框架都是由这两大类逻辑组件组成的,如Apache Flume、Kettle等。
source connector负责把输入数据从外部系统中导入到Kafka中,而sink connector则负责把输出数据导出到其他外部系统。
接口文档:https://kafka.apache.org/35/generated/connect_rest.yaml
启动
运行前提:
运行zookeeper和kafka。
启动kafka connect。
kafka connect的启动方式有两种:单独部署和集群部署。
启动文件说明:
windows版
单独部署:KAFKA_HOME/bin/windows/connect-standalone.bat
集群部署:KAFKA_HOME/bin/windows/connect-distributed.bat
linux版
单独部署:KAFKA_HOME/bin/connect-standalone.sh
集群部署:KAFKA_HOME/bin/connect-distributed.sh
配置文件说明:
单独部署配置文件:KAFKA_HOME/config/connect-standalone.properties
集群部署配置文件:KAFKA_HOME/config/connect-distributed.properties
以windows版的单独部署为例。
kafka connect配置文件说明
找到KAFKA_HOME/config目录下的connect-standalone.properties和connect-distributed.properties文件。
connect-standalone.properties:当只有一个kafka服务器时,我们需要使用这个配置文件。
connect-distributed.properties:当存在多个kafka服务器时(集群部署),我们需要使用这个配置文件。
以connect-standalone.properties为例:
说明:如果是connect-distributed.properties,这里可以配置多个broker地址,中间使用逗号隔开。
bootstrap.servers属性:指定broker服务器的地址,默认值为:localhost:9092。
offset.storage.file.filename属性:设置偏移量文件connect.offsets的存放地址,默认路径为:/tmp。
linux:connect.offsets存储在:/tmp目录下,windows:connect.offsets存在KAFKA_HOME所在磁盘的/tmp目录下。
当目录不存在时,会自动被创建。
offset.flush.interval.ms属性:偏移量刷新时间间隔,单位:毫秒,默认值为:10000毫秒(10秒)。
plugin.path属性:设置connect插件的存放路径,多个路径之间使用逗号隔开。
在启动kafka connect时,会自动从上面配置的路径,读取connect插件。
一般情况下,我们不在这里配置,而是将插件直接放在:KAFKA_HOME/plugins目录下。
plugin.path=/kfk/kafka_2.13-3.6.1/plugins
这样一来,当你使用connect-standalone.bat并且指定配置文件connect-standalone.properties时,kafka connect会自动读取此路径下的插件(Source Connector和Sink Connector)。
验证插件是否生效
http://localhost:8083/connector-plugins
listeners属性:设置的是kafka connect服务器的访问端口号,默认值是8083。
多个地址之间使用逗号隔开。
如何更改它的端口号呢?
listeners=HTTP://:8093
在此配置文件当中增加如上代码, 我们就把kafka connect的端口号改成8093啦。
启动kafka connect
前提:已经启动好了kafka server。
以只运行一个broker进行举例说明
windows操作步骤
broker的启动命令是:connect-standalone.bat。
文件所在路径:KAFKA_HOME/bin/windows
所需配置文件:connect-standalone.properties。
文件所在路径:KAFKA_HOME/config
同样打开黑窗口,运行以下指令:
connect-standalone.bat ../../config/connect-standalone.properties
当出现如上字样时,就说明:kafka connect服务启动成功了。
访问地址:
http://localhost:8083/
3.6.1是kafka服务器的版本号。
linux操作步骤
broker的启动命令是:connect-standalone.sh。
文件所在路径:KAFKA_HOME/bin
所需配置文件:connect-standalone.properties。
文件所在路径:KAFKA_HOME/config
切换到KAFKA_HOME/bin/windows目录下,输入cmd。
并按回车键打开黑窗口。
运行以下命令
connect-standalone.bat ../../config/connect-standalone.properties
当出现如上字样时,说明kafka connect服务启动成功了。
运行connect-standalone.bat