Prerequisites: Java 8 and Maven are installed.
I. In IDEA, create a Spring Assistant project, selecting the Web, Apache Kafka, and Lombok dependencies
II. Import the pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.company</groupId>
    <artifactId>gmall-logger</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>gmall-logger</name>
    <description>gmall-logger</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>
III. Create the controller class
package com.company.gmalllogger.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
//@Controller
@RestController // = @Controller + @ResponseBody
public class LoggerController {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("test1")
    // @ResponseBody // returns a plain Java object instead of a view name
    public String test1() {
        System.out.println("success");
        return "success";
        // return "index.html"; // renders the static page (requires @Controller)
    }

    // Variant 2: query with request parameters
    @RequestMapping("test2")
    public String test2(@RequestParam("name") String nn,
                        @RequestParam("age") int age) {
        System.out.println(nn + ":" + age);
        return "success";
    }

    // Variant 3: query with request parameters, with a default value for age
    @RequestMapping("test3")
    public String test3(@RequestParam("name") String nn,
                        @RequestParam(value = "age", defaultValue = "20") int age) {
        System.out.println(nn + ":" + age);
        return "success";
    }

    // Bridge to Kafka
    @RequestMapping("applog")
    public String getLog(@RequestParam("param") String logStr) {
        System.out.println(logStr);
        // Save the event data to the log file and echo it to the console
        log.info(logStr);
        // Write the data to Kafka, topic ods_base_log
        kafkaTemplate.send("ods_base_log", logStr);
        return "success";
    }
}
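Spring Initializr also generates the application entry class alongside the controller. For completeness, it looks roughly like this (a minimal sketch; GmallLoggerApplication is the name Initializr derives from the artifact, so adjust it to whatever was generated for you):

package com.company.gmalllogger;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Standard Spring Boot entry point; component scanning starts from this
// package, so the controller package above must sit underneath it.
@SpringBootApplication
public class GmallLoggerApplication {
    public static void main(String[] args) {
        SpringApplication.run(GmallLoggerApplication.class, args);
    }
}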
When you want to test a static page, create an index.html file under src/main/resources/static and fill it in:
<!DOCTYPE html>
<html>
<body>
<h1>Company Name</h1>
<h2>Big Data</h2>
<h3>Static page demo</h3>
</body>
</html>
Then enter localhost:8080/test1 in the browser to verify the endpoint (it returns the plain string "success"). The static page itself is served at localhost:8080/index.html; rendering it through the controller would require switching @RestController back to @Controller and returning "index.html". Note that once server.port is set to 8081 in Section IV, use port 8081 instead.
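If you would rather exercise the endpoints from code than from the browser, a plain Java 8 client works too (a sketch; the host, port, and parameter values are examples, and EndpointSmokeTest is a hypothetical helper class, not part of the project):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class EndpointSmokeTest {
    public static void main(String[] args) throws Exception {
        // Hits the test3 endpoint; age is omitted, so the controller falls back to 20
        URL url = new URL("http://localhost:8080/test3?name=zhangsan");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(conn.getInputStream(), "UTF-8"))) {
            System.out.println(in.readLine()); // expect: success
        }
    }
}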
IV. Configure the application.properties file
# Application name
spring.application.name=gmall-logger

# Web service port
server.port=8081

#============== kafka ===================
# Kafka broker address(es); multiple brokers may be listed, comma-separated
spring.kafka.bootstrap-servers=hadoop201:9092

# Serializers for the message key and the message value
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
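Nothing more is needed on the producer side: spring-kafka's auto-configuration reads these properties and wires up the KafkaTemplate injected in LoggerController. For reference, here is a plain-Kafka sketch of roughly the producer this configures under the hood (PlainProducerSketch is a hypothetical illustration, not part of the project):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class PlainProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Mirrors spring.kafka.bootstrap-servers
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop201:9092");
        // Mirrors the key/value serializer properties
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Same call the controller makes through KafkaTemplate
            producer.send(new ProducerRecord<>("ods_base_log", "hello from plain producer"));
        }
    }
}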
V. Configure the logback.xml file
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <!-- Local directory the log files are written to; change as needed -->
    <property name="LOG_HOME" value="d:/opt/module/logs" />

    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_HOME}/app.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <!-- Route the log output of one specific class to its own appenders.
         Adjust the name below to your own fully qualified class name. -->
    <logger name="com.company.gmalllogger.controller.LoggerController"
            level="INFO" additivity="false">
        <appender-ref ref="rollingFile" />
        <appender-ref ref="console" />
    </logger>

    <root level="error" additivity="false">
        <appender-ref ref="console" />
    </root>
</configuration>
How to fill in the logger name: it is the fully qualified class name, i.e. the package declaration of LoggerController plus the class name (here com.company.gmalllogger.controller.LoggerController).
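That name matches because Lombok's @Slf4j generates an SLF4J logger keyed by the enclosing class, roughly equivalent to writing this by hand:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggerController {
    // What @Slf4j generates: the logger name is the fully qualified class name,
    // com.company.gmalllogger.controller.LoggerController, which is exactly
    // what the <logger name="..."> element in logback.xml must match.
    private static final Logger log =
            LoggerFactory.getLogger(LoggerController.class);
}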
VI. Setup on the hadoop201 virtual machine
Prerequisites:
Environment: Java 8, a ZooKeeper cluster, and a Kafka cluster are already set up.
Upload the data-generating jar and its application.yml file to the /opt/module/gmall-logger/rt_log/ directory.
1. Start the ZooKeeper cluster
/bin/zksh.sh start

(zksh.sh here is a custom cluster-control script; if you don't have one, run zkServer.sh start on each ZooKeeper node instead.)
2. Start the Kafka cluster
bin/kafka-server-start.sh config/server.properties &

(kafka-server-start.sh has no --zookeeper option; the ZooKeeper address is set through zookeeper.connect inside config/server.properties. Run this on every broker, or use the -daemon flag instead of &.)
3. List the Kafka topics
bin/kafka-topics.sh --zookeeper hadoop201:2181 --list
4. Create the topic
bin/kafka-topics.sh --zookeeper hadoop201:2181 \
--create --replication-factor 3 --partitions 1 --topic ods_base_log
Option descriptions:
--topic defines the topic name
--replication-factor defines the number of replicas
--partitions defines the number of partitions
5. Delete the topic
bin/kafka-topics.sh --zookeeper hadoop201:2181 \
--delete --topic ods_base_log
This requires delete.topic.enable=true in server.properties; otherwise the topic is merely marked for deletion (alternatively, restart the brokers).
6. Produce messages
bin/kafka-console-producer.sh \
--broker-list hadoop201:9092 --topic ods_base_log
>hello world
>kafka kafka
7. Consume messages
bin/kafka-console-consumer.sh \
--zookeeper hadoop201:2181 --from-beginning --topic ods_base_log
--from-beginning reads out all the data already stored in the ods_base_log topic. Decide whether to add this flag based on your business scenario.
8. Describe a topic
bin/kafka-topics.sh --zookeeper hadoop201:2181 \
--describe --topic ods_base_log
VII. Modify the application.yml file
The IP address in it must be changed to your machine's real IPv4 address (i.e. the host where the gmall-logger service is reachable, so the data generator can post to it).
VIII. Consume the topic to verify the pipeline
bin/kafka-console-consumer.sh --bootstrap-server hadoop201:9092 --topic ods_base_log
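If you prefer watching the topic from code, a minimal Java consumer does the same job as the console tool (a sketch; OdsBaseLogWatcher and the group id are made-up names, and it assumes the kafka-clients library is on the classpath):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class OdsBaseLogWatcher {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop201:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "ods-base-log-watcher"); // arbitrary group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("ods_base_log"));
            while (true) {
                // Poll for new records and print each value, like the console consumer
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }
}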
IX. Results
1. Run the jar to generate data.
2. The Kafka consumer consumes the data.
3. The IDEA console prints the data.
Done.