For Kafka installation and configuration, see: https://blog.csdn.net/weixin_35757704/article/details/120488287
First, create a topic named `mytesttopic`: change into the Kafka directory and run:

```shell
./bin/kafka-topics.sh --create --topic mytesttopic --replication-factor 1 --partitions 1 --bootstrap-server localhost:9092
```
Then start a console producer:

```shell
./bin/kafka-console-producer.sh --topic mytesttopic --broker-list localhost:9092
```
Add the Flink Kafka connector dependency to `pom.xml`:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId><!-- _2.12 is the Scala binary version -->
    <version>1.13.2</version><!-- this is the Flink version -->
</dependency>
```
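The connector alone is usually not enough to compile and run the job locally; the project also needs Flink's streaming API and client artifacts. A sketch of the companion `pom.xml` entries, assuming a plain Maven build against the same Flink 1.13.2 / Scala 2.12 binaries used in this article:

```xml
<!-- Assumed companion dependencies for a local Flink 1.13.2 build (Scala 2.12 binaries) -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.12</artifactId>
    <version>1.13.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.12</artifactId>
    <version>1.13.2</version>
</dependency>
```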
The Flink consumer code:

```java
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaStreamWordCount {
    public static void main(String[] args) throws Exception {
        // set up the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        // Kafka broker address; either hostname:port or ip:port works
        properties.setProperty("bootstrap.servers", "192.168.99.5:9092");
        // the key "group.id" is fixed; the value is a consumer group ID you choose yourself
        properties.setProperty("group.id", "test");

        DeserializationSchema<String> deserializationSchema = new SimpleStringSchema();
        // careful: this must exactly match the Kafka topic the consumer reads from
        String topic = "mytesttopic";

        DataStream<String> text = env.addSource(
                new FlinkKafkaConsumer<String>(topic, deserializationSchema, properties));

        text.print();
        env.execute("Flink-Kafka demo");
    }
}
```
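The `SimpleStringSchema` passed to the consumer above does one simple thing per Kafka record: it decodes the raw value bytes as a UTF-8 string. A minimal stdlib-only sketch of that idea (the class name here is hypothetical, not Flink's actual implementation):

```java
import java.nio.charset.StandardCharsets;

// Sketch of what SimpleStringSchema does for each Kafka record:
// decode the record's value bytes as a UTF-8 string.
// Class name is hypothetical; this is not the Flink class itself.
public class Utf8Decode {
    static String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = "hello kafka".getBytes(StandardCharsets.UTF_8);
        System.out.println(deserialize(raw)); // prints "hello kafka"
    }
}
```

To consume JSON or binary payloads instead, you would swap in a custom `DeserializationSchema<T>` rather than `SimpleStringSchema`.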
On the machine running the Flink job, you need a hosts entry for the Kafka host. For example, my VM's hostname is `ubuntu` and its IP is `192.168.99.5`, so I add this line to the hosts file:

```
192.168.99.5 ubuntu
```

Note that even when the configuration uses ip:port, as in `properties.setProperty("bootstrap.servers", "192.168.99.5:9092");`, the hosts entry is still required!
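Why the hosts entry is needed even with an IP-based `bootstrap.servers`: the client only uses that address for the first contact, then connects to whatever host:port the broker *advertises* in its metadata (here `ubuntu:9092`, as the Flink log shows). If the advertised host is a name rather than an IP literal, the client machine must be able to resolve it. A small sketch of that distinction (hypothetical helper, not part of Flink or Kafka):

```java
// Sketch (hypothetical helper): decide whether a broker's advertised
// host:port address needs name resolution (DNS or an /etc/hosts entry),
// i.e. whether the host part is a name rather than an IPv4 literal.
public class AdvertisedAddressCheck {
    static boolean needsHostsEntry(String advertised) {
        String host = advertised.substring(0, advertised.lastIndexOf(':'));
        return !host.matches("\\d{1,3}(\\.\\d{1,3}){3}");
    }

    public static void main(String[] args) {
        System.out.println(needsHostsEntry("ubuntu:9092"));       // true
        System.out.println(needsHostsEntry("192.168.99.5:9092")); // false
    }
}
```

The broker-side fix, if editing hosts files on every client is impractical, is to set the broker's advertised listener to an address clients can already resolve.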
Send a couple of test messages from the producer console:

```shell
xq@ubuntu:~/Desktop/software/kafka_2.12-2.7.1$ ./bin/kafka-console-producer.sh --topic mytesttopic --broker-list localhost:9092
>hello kafka
>hello flink
>
```
The Flink job's output shows:

```
.........
16:52:44,055 INFO  org.apache.kafka.clients.Metadata [] - [Consumer clientId=consumer-test-26, groupId=test] Cluster ID: HDij23gxR_edwXhIDqE9ng
16:52:44,056 INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator [] - [Consumer clientId=consumer-test-26, groupId=test] Discovered group coordinator ubuntu:9092 (id: 2147483647 rack: null)
16:52:44,075 INFO  org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [] - [Consumer clientId=consumer-test-26, groupId=test] Setting offset for partition mytesttopic-0 to the committed offset FetchPosition{offset=7, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=ubuntu:9092 (id: 0 rack: null), epoch=0}}
16> hello kafka
16> hello flink
```