First published on: 运维有术
Today's topic: how do you manually deploy a KRaft-mode Kafka cluster with SASL authentication enabled on a Kubernetes cluster, without using Helm or an Operator?
This article is a hands-on, end-to-end guide that walks you through the key tasks step by step.
By the end, you will have the skills needed to deploy a KRaft-mode Kafka cluster on Kubernetes.
Server configuration for this walkthrough (the architecture is a 1:1 replica of a small production environment; the specs differ slightly):
| Hostname | IP | CPU | Memory (GB) | System Disk (GB) | Data Disk (GB) | Purpose |
|---|---|---|---|---|---|---|
| ksp-control-1 | 192.168.9.121 | 4 | 8 | 40 | 100 | KubeSphere/k8s-control-plane |
| ksp-control-2 | 192.168.9.122 | 4 | 8 | 40 | 100 | KubeSphere/k8s-control-plane |
| ksp-control-3 | 192.168.9.123 | 4 | 8 | 40 | 100 | KubeSphere/k8s-control-plane |
| ksp-worker-1 | 192.168.9.124 | 8 | 16 | 40 | 100 | k8s-worker/CI |
| ksp-worker-2 | 192.168.9.125 | 8 | 16 | 40 | 100 | k8s-worker |
| ksp-worker-3 | 192.168.9.126 | 8 | 16 | 40 | 100 | k8s-worker |
| ksp-storage | 192.168.9.127 | 2 | 4 | 40 | 100 | NFS Storage |
| Total (7 hosts) | — | 38 | 76 | 280 | 700 | — |
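The totals row above can be double-checked with a one-liner that feeds the per-host CPU/memory/disk columns into awk:

```shell
# Recompute the totals row from the per-host columns
# (CPU, memory, system disk, data disk).
printf '%s\n' \
  '4 8 40 100' '4 8 40 100' '4 8 40 100' \
  '8 16 40 100' '8 16 40 100' '8 16 40 100' \
  '2 4 40 100' |
awk '{ hosts = NR; c += $1; m += $2; s += $3; d += $4 }
     END { printf "%d hosts, %d vCPU, %d GB RAM, %d GB system disk, %d GB data disk\n",
           hosts, c, m, s, d }'
```

This prints `7 hosts, 38 vCPU, 76 GB RAM, 280 GB system disk, 700 GB data disk`, matching the table.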
Software versions used in this environment:
The mainstream options for deploying Kafka on a K8s cluster today are the following:
In a previous walkthrough we deployed a Kafka cluster with Helm; see the article "KubeSphere 部署 Kafka 集群实战指南" for details. In this article we deploy the Kafka cluster by hand-writing the resource manifests.
The resource manifests are planned as follows:
This environment uses NFS as the persistent storage for the K8s cluster. For a new cluster, see "探索 Kubernetes 持久化存储之 NFS 终极实战指南" for how to deploy NFS storage.
All Kafka cluster resources are deployed in the namespace `opsxlab`.
Plaintext passwords must be base64-encoded (note: base64 is encoding, not encryption): `echo -n "PleaseChangeMe" | base64 -w0`. Generate different passwords for production.
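A quick local round trip confirms the value that goes into the Secret, and shows why base64 offers no secrecy (GNU coreutils `base64` assumed, for the `-w0` flag):

```shell
# Encode the plaintext exactly as the Secret expects (-w0 disables line wrapping).
encoded=$(echo -n "PleaseChangeMe" | base64 -w0)
echo "$encoded"                  # UGxlYXNlQ2hhbmdlTWU=
# Anyone who can read the Secret can decode it just as easily.
echo "$encoded" | base64 -d      # PleaseChangeMe
```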
Use the `vi` editor to create the manifest file `kafka-sasl-passwords-secret.yaml` with the following content:
```yaml
kind: Secret
apiVersion: v1
metadata:
  name: kafka-sasl-passwords
  labels:
    app.kubernetes.io/instance: app-kafka
    app.kubernetes.io/name: kafka
data:
  client-passwords: UGxlYXNlQ2hhbmdlTWU=
  controller-password: UGxlYXNlQ2hhbmdlTWU=
  inter-broker-password: UGxlYXNlQ2hhbmdlTWU=
type: Opaque
```
Use the following command to create a temporary Pod that generates a UUID and is automatically deleted afterwards.
```shell
$ kubectl run app-kafka-client --rm -i --image registry.opsxlab.cn:8443/bitnami/kafka:3.6.2 -n opsxlab -- /opt/bitnami/kafka/bin/kafka-storage.sh random-uuid
RpOTPIfMRTiPpmCYJHF9KQ
```
Base64-encode the generated plaintext UUID: `echo -n "RpOTPIfMRTiPpmCYJHF9KQ" | base64 -w0`.
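Before pasting the encoded value into the Secret, a round-trip check like this helps catch copy/paste mistakes (GNU coreutils assumed):

```shell
uuid="RpOTPIfMRTiPpmCYJHF9KQ"
encoded=$(echo -n "$uuid" | base64 -w0)
echo "$encoded"                          # UnBPVFBJZk1SVGlQcG1DWUpIRjlLUQ==
# Decode and compare with the original value.
[ "$(echo "$encoded" | base64 -d)" = "$uuid" ] && echo "round-trip OK"
```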
Use the `vi` editor to create the manifest file `kafka-kraft-cluster-id.yaml` with the following content:
```yaml
kind: Secret
apiVersion: v1
metadata:
  name: kafka-kraft-cluster-id
  labels:
    app.kubernetes.io/instance: app-kafka
    app.kubernetes.io/name: kafka
data:
  kraft-cluster-id: UnBPVFBJZk1SVGlQcG1DWUpIRjlLUQ==
type: Opaque
```
Run the following commands to create the resources.
```shell
kubectl apply -f kafka-sasl-passwords-secret.yaml -n opsxlab
kubectl apply -f kafka-kraft-cluster-id.yaml -n opsxlab
```
Run the following command to check the result.
```shell
$ kubectl get secret -n opsxlab
NAME                     TYPE     DATA   AGE
kafka-kraft-cluster-id   Opaque   1      5s
kafka-sasl-passwords     Opaque   3      6s
```
Service planning notes:
Use the `vi` editor to create the manifest file `kafka-controller-headless.yaml` with the following content:
```yaml
kind: Service
apiVersion: v1
metadata:
  name: kafka-controller-hs
  labels:
    app.kubernetes.io/instance: app-kafka
    app.kubernetes.io/name: kafka
spec:
  ports:
    - name: tcp-internal
      protocol: TCP
      port: 9092
      targetPort: internal
    - name: tcp-controller
      protocol: TCP
      port: 9093
      targetPort: controller
  selector:
    app.kubernetes.io/instance: app-kafka
  clusterIP: None
  type: ClusterIP
```
Use the `vi` editor to create the manifest file `kafka-controller-0-external.yaml` with the following content:
```yaml
kind: Service
apiVersion: v1
metadata:
  name: kafka-controller-0-external
  labels:
    app.kubernetes.io/instance: app-kafka
    app.kubernetes.io/name: kafka
spec:
  ports:
    - name: tcp-external
      protocol: TCP
      port: 9094
      targetPort: 9094
      nodePort: 31211
  selector:
    app.kubernetes.io/instance: app-kafka
    app.kubernetes.io/name: kafka-controller-0
  type: NodePort
```
Use the `vi` editor to create the manifest file `kafka-controller-1-external.yaml` with the following content:
```yaml
kind: Service
apiVersion: v1
metadata:
  name: kafka-controller-1-external
  labels:
    app.kubernetes.io/instance: app-kafka
    app.kubernetes.io/name: kafka
spec:
  ports:
    - name: tcp-external
      protocol: TCP
      port: 9094
      targetPort: 9094
      nodePort: 31212
  selector:
    app.kubernetes.io/instance: app-kafka
    app.kubernetes.io/name: kafka-controller-1
  type: NodePort
```
Use the `vi` editor to create the manifest file `kafka-controller-2-external.yaml` with the following content:
```yaml
kind: Service
apiVersion: v1
metadata:
  name: kafka-controller-2-external
  labels:
    app.kubernetes.io/instance: app-kafka
    app.kubernetes.io/name: kafka
spec:
  ports:
    - name: tcp-external
      protocol: TCP
      port: 9094
      targetPort: 9094
      nodePort: 31213
  selector:
    app.kubernetes.io/instance: app-kafka
    app.kubernetes.io/name: kafka-controller-2
  type: NodePort
```
Run the following commands to create the resources.
```shell
kubectl apply -f kafka-controller-headless.yaml -n opsxlab
kubectl apply -f kafka-controller-0-external.yaml -n opsxlab
kubectl apply -f kafka-controller-1-external.yaml -n opsxlab
kubectl apply -f kafka-controller-2-external.yaml -n opsxlab
```
Run the following command to check the result.
```shell
$ kubectl get svc -n opsxlab
NAME                          TYPE        CLUSTER-IP     EXTERNAL-IP   PORT(S)             AGE
kafka-controller-0-external   NodePort    10.233.1.92    <none>        9094:31211/TCP      8s
kafka-controller-1-external   NodePort    10.233.18.62   <none>        9094:31212/TCP      8s
kafka-controller-2-external   NodePort    10.233.38.37   <none>        9094:31213/TCP      8s
kafka-controller-hs           ClusterIP   None           <none>        9092/TCP,9093/TCP   8s
```
The Kafka cluster is deployed with StatefulSets. The three Kafka nodes use largely identical configuration; the parameters that must differ per node are:
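As a sketch (not part of the deployment itself), those per-node differences can all be derived from the node index, which is handy if you later generate the three manifests from one template instead of editing three files by hand:

```shell
# The only values that differ between the three StatefulSets: the resource
# name/labels, KAFKA_CFG_NODE_ID, the advertised INTERNAL hostname, and the
# external NodePort (31211 + node index, matching the Services created above).
for i in 0 1 2; do
  echo "kafka-controller-${i}: KAFKA_CFG_NODE_ID=${i} EXTERNAL=192.168.9.121:$((31211 + i))"
done
```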
Use the `vi` editor to create the manifest file `kafka-controller-0-sts.yaml` with the following content:
```yaml
kind: StatefulSet
apiVersion: apps/v1
metadata:
  name: kafka-controller-0
  labels:
    app.kubernetes.io/instance: app-kafka
    app.kubernetes.io/name: kafka-controller-0
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/instance: app-kafka
      app.kubernetes.io/name: kafka-controller-0
  template:
    metadata:
      labels:
        app.kubernetes.io/instance: app-kafka
        app.kubernetes.io/name: kafka-controller-0
    spec:
      containers:
        - name: kafka
          image: 'registry.opsxlab.cn:8443/bitnami/kafka:3.6.2'
          ports:
            - name: internal  # must match the headless Service targetPort
              containerPort: 9092
              protocol: TCP
            - name: controller
              containerPort: 9093
              protocol: TCP
            - name: external
              containerPort: 9094
              protocol: TCP
          env:
            - name: BITNAMI_DEBUG
              value: 'false'
            - name: HOST_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.hostIP
            - name: KAFKA_HEAP_OPTS
              value: '-Xmx2048m -Xms1024m'
            - name: KAFKA_KRAFT_CLUSTER_ID
              valueFrom:
                secretKeyRef:
                  name: kafka-kraft-cluster-id
                  key: kraft-cluster-id
            - name: KAFKA_CLIENT_USERS
              value: user1
            - name: KAFKA_CLIENT_PASSWORDS
              valueFrom:
                secretKeyRef:
                  name: kafka-sasl-passwords
                  key: client-passwords
            - name: KAFKA_INTER_BROKER_USER
              value: inter_broker_user
            - name: KAFKA_INTER_BROKER_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: kafka-sasl-passwords
                  key: inter-broker-password
            - name: KAFKA_CONTROLLER_USER
              value: controller_user
            - name: KAFKA_CONTROLLER_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: kafka-sasl-passwords
                  key: controller-password
            - name: KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL
              value: PLAIN
            - name: KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL
              value: PLAIN
            - name: KAFKA_CFG_NODE_ID
              value: '0'
            - name: KAFKA_CFG_PROCESS_ROLES
              value: 'controller,broker'
            - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
              value: >-
                0@kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093,1@kafka-controller-1-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093,2@kafka-controller-2-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093
            - name: KAFKA_CFG_LISTENERS
              value: 'INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9094'
            - name: KAFKA_CFG_ADVERTISED_LISTENERS
              value: >-
                INTERNAL://kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092,EXTERNAL://192.168.9.121:31211
            - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
              value: >-
                INTERNAL:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
            - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
              value: CONTROLLER
            - name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME
              value: INTERNAL
            - name: KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: '3'
            - name: KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
              value: '3'
            - name: KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR
              value: '2'
          resources:
            limits:
              cpu: '1'
              memory: 2Gi
            requests:
              cpu: 50m
              memory: 512Mi
          volumeMounts:
            - name: data
              mountPath: /bitnami/kafka
          livenessProbe:
            exec:
              command:
                - pgrep
                - '-f'
                - kafka
            initialDelaySeconds: 10
            timeoutSeconds: 5
            periodSeconds: 10
            successThreshold: 1
            failureThreshold: 3
          readinessProbe:
            tcpSocket:
              port: controller
            initialDelaySeconds: 5
            timeoutSeconds: 5
            periodSeconds: 10
            successThreshold: 1
            failureThreshold: 6
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          imagePullPolicy: IfNotPresent
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      dnsPolicy: ClusterFirst
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 1
              podAffinityTerm:
                labelSelector:
                  matchLabels:
                    # match any Pod of this Kafka cluster; the per-node
                    # app.kubernetes.io/name labels differ, so only the
                    # shared instance label is usable here
                    app.kubernetes.io/instance: app-kafka
                topologyKey: kubernetes.io/hostname
  volumeClaimTemplates:
    - kind: PersistentVolumeClaim
      apiVersion: v1
      metadata:
        name: data
      spec:
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: 10Gi
        storageClassName: nfs-sc
        volumeMode: Filesystem
  serviceName: kafka-controller-hs
```
Use the `vi` editor to create the manifest file `kafka-controller-1-sts.yaml` with the following content:
```yaml
kind: StatefulSet
apiVersion: apps/v1
metadata:
  name: kafka-controller-1
  labels:
    app.kubernetes.io/instance: app-kafka
    app.kubernetes.io/name: kafka-controller-1
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/instance: app-kafka
      app.kubernetes.io/name: kafka-controller-1
  template:
    metadata:
      labels:
        app.kubernetes.io/instance: app-kafka
        app.kubernetes.io/name: kafka-controller-1
    spec:
      containers:
        - name: kafka
          image: 'registry.opsxlab.cn:8443/bitnami/kafka:3.6.2'
          ports:
            - name: internal  # must match the headless Service targetPort
              containerPort: 9092
              protocol: TCP
            - name: controller
              containerPort: 9093
              protocol: TCP
            - name: external
              containerPort: 9094
              protocol: TCP
          env:
            - name: BITNAMI_DEBUG
              value: 'false'
            - name: HOST_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.hostIP
            - name: KAFKA_HEAP_OPTS
              value: '-Xmx2048m -Xms1024m'
            - name: KAFKA_KRAFT_CLUSTER_ID
              valueFrom:
                secretKeyRef:
                  name: kafka-kraft-cluster-id
                  key: kraft-cluster-id
            - name: KAFKA_CLIENT_USERS
              value: user1
            - name: KAFKA_CLIENT_PASSWORDS
              valueFrom:
                secretKeyRef:
                  name: kafka-sasl-passwords
                  key: client-passwords
            - name: KAFKA_INTER_BROKER_USER
              value: inter_broker_user
            - name: KAFKA_INTER_BROKER_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: kafka-sasl-passwords
                  key: inter-broker-password
            - name: KAFKA_CONTROLLER_USER
              value: controller_user
            - name: KAFKA_CONTROLLER_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: kafka-sasl-passwords
                  key: controller-password
            - name: KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL
              value: PLAIN
            - name: KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL
              value: PLAIN
            - name: KAFKA_CFG_NODE_ID
              value: '1'
            - name: KAFKA_CFG_PROCESS_ROLES
              value: 'controller,broker'
            - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
              value: >-
                0@kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093,1@kafka-controller-1-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093,2@kafka-controller-2-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093
            - name: KAFKA_CFG_LISTENERS
              value: 'INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9094'
            - name: KAFKA_CFG_ADVERTISED_LISTENERS
              value: >-
                INTERNAL://kafka-controller-1-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092,EXTERNAL://192.168.9.121:31212
            - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
              value: >-
                INTERNAL:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
            - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
              value: CONTROLLER
            - name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME
              value: INTERNAL
            - name: KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: '3'
            - name: KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
              value: '3'
            - name: KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR
              value: '2'
          resources:
            limits:
              cpu: '1'
              memory: 2Gi
            requests:
              cpu: 50m
              memory: 512Mi
          volumeMounts:
            - name: data
              mountPath: /bitnami/kafka
          livenessProbe:
            exec:
              command:
                - pgrep
                - '-f'
                - kafka
            initialDelaySeconds: 10
            timeoutSeconds: 5
            periodSeconds: 10
            successThreshold: 1
            failureThreshold: 3
          readinessProbe:
            tcpSocket:
              port: controller
            initialDelaySeconds: 5
            timeoutSeconds: 5
            periodSeconds: 10
            successThreshold: 1
            failureThreshold: 6
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          imagePullPolicy: IfNotPresent
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      dnsPolicy: ClusterFirst
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 1
              podAffinityTerm:
                labelSelector:
                  matchLabels:
                    # match any Pod of this Kafka cluster; the per-node
                    # app.kubernetes.io/name labels differ, so only the
                    # shared instance label is usable here
                    app.kubernetes.io/instance: app-kafka
                topologyKey: kubernetes.io/hostname
  volumeClaimTemplates:
    - kind: PersistentVolumeClaim
      apiVersion: v1
      metadata:
        name: data
      spec:
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: 10Gi
        storageClassName: nfs-sc
        volumeMode: Filesystem
  serviceName: kafka-controller-hs
```
Use the `vi` editor to create the manifest file `kafka-controller-2-sts.yaml` with the following content:
```yaml
kind: StatefulSet
apiVersion: apps/v1
metadata:
  name: kafka-controller-2
  labels:
    app.kubernetes.io/instance: app-kafka
    app.kubernetes.io/name: kafka-controller-2
spec:
  replicas: 1
  selector:
    matchLabels:
      app.kubernetes.io/instance: app-kafka
      app.kubernetes.io/name: kafka-controller-2
  template:
    metadata:
      labels:
        app.kubernetes.io/instance: app-kafka
        app.kubernetes.io/name: kafka-controller-2
    spec:
      containers:
        - name: kafka
          image: 'registry.opsxlab.cn:8443/bitnami/kafka:3.6.2'
          ports:
            - name: internal  # must match the headless Service targetPort
              containerPort: 9092
              protocol: TCP
            - name: controller
              containerPort: 9093
              protocol: TCP
            - name: external
              containerPort: 9094
              protocol: TCP
          env:
            - name: BITNAMI_DEBUG
              value: 'false'
            - name: HOST_IP
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: status.hostIP
            - name: KAFKA_HEAP_OPTS
              value: '-Xmx2048m -Xms1024m'
            - name: KAFKA_KRAFT_CLUSTER_ID
              valueFrom:
                secretKeyRef:
                  name: kafka-kraft-cluster-id
                  key: kraft-cluster-id
            - name: KAFKA_CLIENT_USERS
              value: user1
            - name: KAFKA_CLIENT_PASSWORDS
              valueFrom:
                secretKeyRef:
                  name: kafka-sasl-passwords
                  key: client-passwords
            - name: KAFKA_INTER_BROKER_USER
              value: inter_broker_user
            - name: KAFKA_INTER_BROKER_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: kafka-sasl-passwords
                  key: inter-broker-password
            - name: KAFKA_CONTROLLER_USER
              value: controller_user
            - name: KAFKA_CONTROLLER_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: kafka-sasl-passwords
                  key: controller-password
            - name: KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL
              value: PLAIN
            - name: KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL
              value: PLAIN
            - name: KAFKA_CFG_NODE_ID
              value: '2'
            - name: KAFKA_CFG_PROCESS_ROLES
              value: 'controller,broker'
            - name: KAFKA_CFG_CONTROLLER_QUORUM_VOTERS
              value: >-
                0@kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093,1@kafka-controller-1-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093,2@kafka-controller-2-0.kafka-controller-hs.opsxlab.svc.cluster.local:9093
            - name: KAFKA_CFG_LISTENERS
              value: 'INTERNAL://:9092,CONTROLLER://:9093,EXTERNAL://:9094'
            - name: KAFKA_CFG_ADVERTISED_LISTENERS
              value: >-
                INTERNAL://kafka-controller-2-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092,EXTERNAL://192.168.9.121:31213
            - name: KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP
              value: >-
                INTERNAL:SASL_PLAINTEXT,CONTROLLER:SASL_PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
            - name: KAFKA_CFG_CONTROLLER_LISTENER_NAMES
              value: CONTROLLER
            - name: KAFKA_CFG_INTER_BROKER_LISTENER_NAME
              value: INTERNAL
            - name: KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: '3'
            - name: KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
              value: '3'
            - name: KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR
              value: '2'
          resources:
            limits:
              cpu: '1'
              memory: 2Gi
            requests:
              cpu: 50m
              memory: 512Mi
          volumeMounts:
            - name: data
              mountPath: /bitnami/kafka
          livenessProbe:
            exec:
              command:
                - pgrep
                - '-f'
                - kafka
            initialDelaySeconds: 10
            timeoutSeconds: 5
            periodSeconds: 10
            successThreshold: 1
            failureThreshold: 3
          readinessProbe:
            tcpSocket:
              port: controller
            initialDelaySeconds: 5
            timeoutSeconds: 5
            periodSeconds: 10
            successThreshold: 1
            failureThreshold: 6
          terminationMessagePath: /dev/termination-log
          terminationMessagePolicy: File
          imagePullPolicy: IfNotPresent
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      dnsPolicy: ClusterFirst
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 1
              podAffinityTerm:
                labelSelector:
                  matchLabels:
                    # match any Pod of this Kafka cluster; the per-node
                    # app.kubernetes.io/name labels differ, so only the
                    # shared instance label is usable here
                    app.kubernetes.io/instance: app-kafka
                topologyKey: kubernetes.io/hostname
  volumeClaimTemplates:
    - kind: PersistentVolumeClaim
      apiVersion: v1
      metadata:
        name: data
      spec:
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: 10Gi
        storageClassName: nfs-sc
        volumeMode: Filesystem
  serviceName: kafka-controller-hs
```
Run the following commands to create the resources.
```shell
kubectl apply -f kafka-controller-0-sts.yaml -n opsxlab
kubectl apply -f kafka-controller-1-sts.yaml -n opsxlab
kubectl apply -f kafka-controller-2-sts.yaml -n opsxlab
```
Run the following command to check the result (the first creation can take a while).
```shell
$ kubectl get sts,pod -n opsxlab
NAME                                  READY   AGE
statefulset.apps/kafka-controller-0   1/1     25s
statefulset.apps/kafka-controller-1   1/1     25s
statefulset.apps/kafka-controller-2   1/1     24s

NAME                       READY   STATUS    RESTARTS   AGE
pod/kafka-controller-0-0   1/1     Running   0          24s
pod/kafka-controller-1-0   1/1     Running   0          24s
pod/kafka-controller-2-0   1/1     Running   0          23s
```
Verify the availability of the Kafka service both inside and outside the K8s cluster.
Create a long-running client Pod:

```shell
kubectl run opsxlab-kafka-client --restart='Never' --image registry.opsxlab.cn:8443/bitnami/kafka:3.6.2 --namespace opsxlab --command -- sleep infinity
```
```shell
cat << EOF > /tmp/client.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user1" password="PleaseChangeMe";
EOF
```
Copy the authentication file into the client Pod:

```shell
kubectl cp --namespace opsxlab /tmp/client.properties opsxlab-kafka-client:/tmp/client.properties
```
Open a shell in the client Pod:

```shell
kubectl exec --tty -i opsxlab-kafka-client --namespace opsxlab -- bash
```
Create a test topic with 3 partitions and a replication factor of 3:

```shell
kafka-topics.sh \
  --bootstrap-server kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092 \
  --create --topic test-topic --partitions 3 --replication-factor 3 \
  --command-config /tmp/client.properties
```
```shell
$ kafka-topics.sh --bootstrap-server kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092 --topic test-topic --describe --command-config /tmp/client.properties
Topic: test-topic  TopicId: yNWQQ6yKSBeLmvVUFf2IVw  PartitionCount: 3  ReplicationFactor: 3  Configs:
  Topic: test-topic  Partition: 0  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
  Topic: test-topic  Partition: 1  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
  Topic: test-topic  Partition: 2  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
```
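A hedged helper for reading that output: with replication factor 3, each partition's `Isr` list should contain 3 broker ids. The awk sketch below counts fully in-sync partitions from a `--describe` dump (the sample output above is inlined here for illustration):

```shell
describe='Topic: test-topic Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: test-topic Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: test-topic Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1'
# Keep only the per-partition lines, then count those whose Isr has 3 members.
echo "$describe" | grep 'Partition:' |
awk '{ split($0, a, "Isr: "); if (split(a[2], isr, ",") == 3) ok++ }
     END { print ok " of " NR " partitions fully in sync" }'
```

For the sample data this prints `3 of 3 partitions fully in sync`; fewer than 3 ISR members on any partition would be a sign of an unhealthy broker.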
```shell
kafka-console-producer.sh \
  --broker-list kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092 \
  --topic test-topic --producer.config /tmp/client.properties
```
Open another terminal and run the following command.
```shell
kafka-console-consumer.sh \
  --bootstrap-server kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092 \
  --topic test-topic \
  --from-beginning --consumer.config /tmp/client.properties
```
Type some test data on the producer side and check that the consumer side receives it correctly.
Producer side:
```shell
I have no name!@opsxlab-kafka-client:/$ kafka-console-producer.sh \
  --broker-list kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092 \
  --topic test-topic --producer.config /tmp/client.properties
>cluster kafka test 1
>cluster kafka test 2
>cluster kafka test 3
```
Consumer side:
```shell
I have no name!@opsxlab-kafka-client:/$ kafka-console-consumer.sh \
  --bootstrap-server kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092 \
  --topic test-topic \
  --from-beginning --consumer.config /tmp/client.properties
cluster kafka test 1
cluster kafka test 2
cluster kafka test 3
```
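With three partitions, Kafka only guarantees ordering within a partition, so if you save the producer and consumer logs to compare them, compare them as sets (sort both sides). A minimal bash sketch using the sample messages above (the inlined message lists are illustrative):

```shell
produced='cluster kafka test 1
cluster kafka test 2
cluster kafka test 3'
consumed='cluster kafka test 1
cluster kafka test 2
cluster kafka test 3'
# Compare as sets: per-partition ordering is preserved, global ordering is not.
if diff <(printf '%s\n' "$produced" | sort) <(printf '%s\n' "$consumed" | sort) >/dev/null; then
  echo "all produced messages were consumed"
fi
```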
For a more rigorous test of Kafka's availability from outside the K8s cluster, I used a machine outside the cluster and installed a JDK and Kafka on it. The JDK was installed via yum (openjdk); for Kafka I used the official 3.9.0 binary package, the latest version at the time.
For the actual test you could also use a Docker image, or create another Pod inside the K8s cluster and connect to a node's host IP and NodePort.
```shell
# Install a JDK
yum install java-1.8.0-openjdk

# Download Kafka
cd /srv
wget https://downloads.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz

# Extract
tar xvf kafka_2.13-3.9.0.tgz
cd /srv/kafka_2.13-3.9.0/bin
```
This article uses a single control-plane node's IP for the Kafka NodePorts. In real use, prefer multiple worker nodes, with each Pod advertised on its own worker node IP.
The broker address used in the tests below is `192.168.9.121:31211`.
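For resilience, external clients should list more than one broker in `bootstrap.servers` rather than relying on a single entry point. The snippet below assembles the full list from the three NodePorts defined earlier (assuming, as in this article, that all of them are reachable via the ksp-control-1 IP):

```shell
node_ip=192.168.9.121
# printf reuses the format for each host:port pair, then the trailing comma is stripped.
bootstrap=$(printf '%s:%s,' "$node_ip" 31211 "$node_ip" 31212 "$node_ip" 31213)
echo "${bootstrap%,}"
```

This yields `192.168.9.121:31211,192.168.9.121:31212,192.168.9.121:31213`, which can be passed to any Kafka client as the bootstrap server list.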
```shell
cat << EOF > /tmp/client.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="user1" password="PleaseChangeMe";
EOF
```
As with the in-cluster verification, open two terminals and run the producer and consumer scripts. Run the commands below to verify (details omitted; results only).
External producer side:
```shell
$ ./kafka-console-producer.sh --broker-list 192.168.9.121:31211 --topic test-topic --producer.config /tmp/client.properties
>external kafka test 10
>external kafka test 20
>external kafka test 30
```
External consumer side:
```shell
$ ./kafka-console-consumer.sh --bootstrap-server 192.168.9.121:31211 --topic test-topic --from-beginning --consumer.config /tmp/client.properties
external kafka test 10
external kafka test 20
external kafka test 30
cluster kafka test 1
cluster kafka test 2
cluster kafka test 3
```
Note: the consumer outside the K8s cluster can read all the data, including the messages produced during the in-cluster test.
In-cluster consumer side: the consumer inside the cluster can likewise read the data produced by the external producer.
```shell
I have no name!@opsxlab-kafka-client:/$ kafka-console-consumer.sh \
  --bootstrap-server kafka-controller-0-0.kafka-controller-hs.opsxlab.svc.cluster.local:9092 \
  --topic test-topic \
  --from-beginning --consumer.config /tmp/client.properties
cluster kafka test 1
cluster kafka test 2
cluster kafka test 3
external kafka test 10
external kafka test 20
external kafka test 30
```
Delete the test topic:

```shell
./kafka-topics.sh --bootstrap-server 192.168.9.121:31211 --delete --topic test-topic --command-config /tmp/client.properties
```
List the remaining topics:

```shell
./kafka-topics.sh --bootstrap-server 192.168.9.121:31211 --list --command-config /tmp/client.properties
```
That's all for today's sharing.
Disclaimer:
This article was published via OpenWrite, a multi-platform blog publishing tool.