kafka安装与基础应用


kafka概述

Apache Kafka 是一个开源分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序。

Kafka 结合了三个关键功能,因此您可以 使用 一个久经考验的解决方案来实现端到端的事件流用例

  1. 发布(写)和订阅(读)流事件,包括来自其他系统的数据的持续导入/导出的。
  2. 为了存储持久和可靠的事件流,只要你想要的。
  3. 在事件发生时或追溯性地处理事件流。

所有这些功能都是以分布式、高度可扩展、弹性、容错和安全的方式提供的。Kafka 可以部署在裸机硬件、虚拟机和容器上,也可以部署在本地和云端。您可以在自行管理 Kafka 环境和使用各种供应商提供的完全托管服务之间进行选择。

Kafka 是一个分布式系统,由通过高性能TCP 网络协议进行通信的服务器客户端组成。它可以部署在内部和云环境中的裸机硬件、虚拟机和容器上。

服务器:Kafka 作为一个或多个服务器的集群运行,这些服务器可以跨越多个数据中心或云区域。其中一些服务器形成存储层,称为代理。其他服务器运行 Kafka Connect以将数据作为事件流持续导入和导出,从而将 Kafka 与您现有的系统(例如关系数据库以及其他 Kafka 集群)集成。为了让您实现关键任务用例,Kafka 集群具有高度可扩展性和容错性:如果其中任何一个服务器出现故障,其他服务器将接管它们的工作,以确保连续运行而不会丢失任何数据。

客户端:它们允许您编写分布式应用程序和微服务,即使在网络问题或机器故障的情况下,它们也可以并行、大规模和容错方式读取、写入和处理事件流。Kafka 附带了一些这样的客户端,这些客户端由 Kafka 社区提供的数十个客户端进行了扩充 :客户端可用于 Java 和 Scala,包括更高级别的 Kafka Streams库,用于 Go、Python、C/C++ 和许多其他编程语言以及 REST API。

kafka安装

首先安装jdk

#jdk安装
wget https://download.oracle.com/java/17/latest/jdk-17_linux-x64_bin.tar.gz
#解压
tar -xf  jdk-17_linux-x64_bin.tar.gz -C /usr/local
sed -i '$a\export JAVA_HOME=/usr/local/jdk-17\nexport CLASSPATH=.:${JAVA_HOME}/jre/lib/rt.jar:${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar\nexport PATH=$PATH:${JAVA_HOME}/bin' /etc/profile
source /etc/profile
java --version

kafka官网 https://kafka.apache.org/

官网下载地址 [**https://downloads.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz**](https://downloads.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz)

wget  https://downloads.apache.org/kafka/3.0.0/kafka_2.13-3.0.0.tgz
tar -xf kafka_2.13-3.0.0.tgz -C /usr/local
cd /usr/local/ && mv kafka_2.13-3.0.0 kafka
#修改kafka配置文件
grep "^[a-Z]" /usr/local/kafka/config/server.properties 
broker.id=2    #id标识
listeners=PLAINTEXT://:9092   #监听地址
advertised.listeners=PLAINTEXT://:9092   #监听地址
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168   #保存小时默认7天
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.188.17:2181,192.168.188.18:2181,192.168.188.7:2181  #zookeeper集群地址也可写单个
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0

#后台启动配置文件kafka
#可以先输出一波日志 看是否有报错信息 无报错后转至守护进程模式
/usr/local/kafka/bin/kafka-server-start.sh  /usr/local/kafka/config/server.properties 
/usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties 
1.启动kafka
bin/kafka-server-start.sh -daemon config/server.properties

2.关闭kafka
bin/kafka-server-stop.sh

3.查看kafka topic是否支持集群,没反应就是正确
bin/kafka-topics.sh --describe --zookeeper 192.168.188.17:2181,192.168.188.18:2181,192.168.188.7:2181 --topic MyTopic

4.查看当前zookerrper下的kafka集群所有的topic
bin/kafka-topics.sh --list --zookeeper 192.168.188.17:2181,192.168.188.18:2181,192.168.188.7:2181

5.详细查看topic
bin/kafka-topics.sh --describe --zookeeper 192.168.188.17:2181,192.168.188.18:2181,192.168.188.7:2181 --topic topic名字

6.创建一个topic,副本备份数1个,分区数1个
bin/kafka-topics.sh --create --zookeeper 192.168.188.17:2181,192.168.188.18:2181,192.168.188.7:2181 --replication-factor 1 --partitions 1 --topic topic名字

7.通过list命令查看创建的topic
bin/kafka-topics.sh --list --zookeeper 192.168.188.17:2181,192.168.188.18:2181,192.168.188.7:2181

8.删除一个topic
bin/kafka-topics.sh --zookeeper 192.168.188.17:2181,192.168.188.18:2181,192.168.188.7:2181 --delete --topic topic名字

9.改变集群模式为主主,切换主从到主主模式,解决主从模式下,从从选举时间问题
bin/kafka-preferred-replica-election.sh --zookeeper 192.168.188.17:2181,192.168.188.18:2181,192.168.188.7:2181
#发送生产者测试信息
10.生产者
bin/kafka-console-producer.sh --broker-list 192.168.188.7:9092,192.168.188.17:9092,192.168.188.18:9092 --topic test777
#接收消费者测试信息
bin/kafka-console-consumer.sh --bootstrap-server 192.168.188.7:9092,192.168.188.17:9092,192.168.188.18:9092 --topic test777 --from-beginning

文章作者: 一抹忘忧
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 一抹忘忧 !
  目录