1.前言 |
|
中文文档地址:https://kafka.apachecn.org/ |
|
1.1定义 |
|
Apache Kafka® 是 一个分布式流处理平台 |
|
1.2基本原理 |
|
|
|
1.3名词解释 |
名词 |
Broker |
Topic |
Partition |
Producer |
Consumer |
ConsumerGroup |
|
2.安装Java环境 |
|
# 下载jdk1.8 |
https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html#license-lightbox |
|
# 解压 |
tar -zxvf jdk-8u281-linux-x64.tar.gz |
|
# 配置环境变量 |
vim /etc/profile |
|
# 增加以下配置 |
JAVA_HOME=/usr/local/java/jdk1.8.0_281 |
CLASSPATH=$JAVA_HOME/lib/ |
PATH=$PATH:$JAVA_HOME/bin |
export PATH JAVA_HOME CLASSPATH |
|
# 重载 |
source /etc/profile |
|
|
|
3.安装Kafka |
|
# 下载源码 |
https://www.apache.org/dyn/closer.cgi?path=/kafka/2.7.0/kafka_2.12-2.7.0.tgz |
|
# 解压 |
tar -zxvf kafka_2.12-2.7.0.tgz |
|
#启动 |
# 需先启动zookeeper |
# -daemon 可启动后台守护模式 |
# 如果你已经启动了zookeeper 就不用启动下面这 (kafka默认提供有zookeeper) |
bin/zookeeper-server-start.sh config/zookeeper.properties |
|
# 启动Kafka服务端 |
bin/kafka-server-start.sh config/server.properties |
|
# 启动kafka客户端测试 |
# 创建一个话题,test话题2个分区 |
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test |
Created topic "test". |
|
# 显示所有话题 |
bin/kafka-topics.sh --list --zookeeper localhost:2181 |
test |
|
# 显示话题信息 |
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test |
Topic:test PartitionCount:2 ReplicationFactor:1 Configs: |
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0 |
Topic: test Partition: 1 Leader: 0 Replicas: 0 Isr: 0 |
|
|
# 启动一个生产者(输入消息) |
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test |
[等待输入自己的内容 出现>输入即可] |
>i am a new msg ! |
>i am a good msg ? |
|
# 启动一个消费者(等待消息) |
# 注意这里的--from-beginning,每次都会从头开始读取,你可以尝试去掉和不去掉看下效果 |
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning |
[等待消息] |
i am a new msg ! |
i am a good msg ? |
|
4.安装RdKafka扩展 |
|
# 下载librdkafka |
git clone https://codechina.csdn.net/mirrors/edenhill/librdkafka.git |
|
# 编译安装 |
cd librdkafka/ |
./configure |
make && make install |
|
# 下载RdKafka扩展 |
https://pecl.php.net/get/rdkafka-5.0.0.tgz |
tar -zxvf rdkafka-5.0.0.tgz |
cd rdkafka-5.0.0.tgz |
|
# 编译安装 |
/usr/local/php/bin/phpize |
./configure --with-config=/usr/local/php/bin/php-config |
make && make install |
|
# php.ini 追加扩展,重启php-fpm |
extension=rdkafka.so |
systemctl restart php-fpm |
|
# 验证 |
php -m |
|
5.使用Kafka |
|
5.1生产(Producer) |
|
$config = new \RdKafka\Conf(); |
# 设置broker |
$config->set('metadata.broker.list', $this->brokerList); |
$producer = new \RdKafka\Producer($config); |
# 设置topic |
$topic = $producer->newTopic($topic); |
$topic->produce(RD_KAFKA_PARTITION_UA, 0, json_encode($message)); |
$producer->poll(0); |
$result = $producer->flush(10000); |
if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) { |
return false; |
} |
return true; |
|
5.2消费(Consumer) |
|
$conf = new \RdKafka\Conf(); |
$conf->set('group.id', $this->groupName); |
$conf->set('metadata.broker.list', $this->brokerList); |
$conf->set('auto.offset.reset', 'earliest'); |
$consumer = new \RdKafka\KafkaConsumer($conf); |
$consumer->subscribe([$this->topicName]); |
while (true) { |
$message = $topic->consume(0, 120*10000); |
if ($message->err != RD_KAFKA_RESP_ERR_NO_ERROR) { |
print("err: " . $message->err); |
print("errstr: " . $message->errstr()); |
} else { |
var_dump($message->payload); |
} |
} |
|
5.3项目中使用 |
|
# rocket-customer |
application/command/KafkaConsumerByCall.php |
application/command/KafkaConsumerByCustomer.php |
application/command/KafkaConsumerByMemberCall.php |
application/command/KafkaConsumerByProfile.php |