Java教程

Spring Assistant框架搭建消息队列写入Kafka消费Windows单机测试

本文主要是介绍Spring Assistant框架搭建消息队列写入Kafka消费Windows单机测试,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

前提:已安装Java8、Maven

一、在Idea中创建Spring Assistant项目,选中web、Apache Kafka、lombok

 

 

 

二、导入Pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.company</groupId>
    <artifactId>gmall-logger</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>gmall-logger</name>
    <description>gmall-logger</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

三、创建class文件

package com.company.gmalllogger.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
//@Controller
@RestController // =@Controller+@ResponseBody
public class LoggerController {

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @RequestMapping("test1")
//    @ResponseBody //返回普通的java对象
    public String test1(){
        System.out.println("success");
        return "success";
//        return "index.html"; //静态页面展示
    }

    //方法2:带参查询
    @RequestMapping("test2")
    public String test2(@RequestParam("name") String nn,
                        @RequestParam("age") int age){
        System.out.println(nn + ":" + age  );
        return "success";
    }

    //方法3:带参查询且给定age默认值
    @RequestMapping("test3")
    public String test3(@RequestParam("name") String nn,
                        @RequestParam(value = "age",defaultValue = "20") int age){
        System.out.println(nn + ":" + age  );
        return "success";
    }


    //对接Kafka
    @RequestMapping("applog")
    public String gerLog(@RequestParam("param") String logStr){
        System.out.println(logStr);
        //将行为数据保存至日志文件并打印到控制台
        log.info(logStr);

        //将数据写入Kafka,主题是ods_base_log
        kafkaTemplate.send("ods_base_log",logStr);
        return "success";

    }
}

当你想要测试静态页面时,在resource.static目录下创建index.html文件并写入

<!DCOTYPE html>
<html>
<h1>公司名称</h1>
<h2>大数据</h2>>
<h3>静态页面的展示</h3>>
</html>

然后在浏览器中输入:localhost:8080/test1即可查看

四、配置application.properties文件

# 应用名称
spring.application.name=gmall-logger

# 应用服务 WEB 访问端口
server.port=8081

#============== kafka ===================
# 指定kafka 代理地址,可以多个
spring.kafka.bootstrap-servers=hadoop201:9092

# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

五、配置logback.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property name="LOG_HOME" value="d:/opt/module/logs" /> //此路径是日志写入到本地的路径,可自行更改
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_HOME}/app.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <!-- 将某一个包下日志单独打印日志 -->
    <logger name="com.company.gmalllogger.controller.LoggerController" //注意这里需要根据自身情况来写入
            level="INFO" additivity="false">
        <appender-ref ref="rollingFile" />
        <appender-ref ref="console" />
    </logger>

    <root level="error" additivity="false">
        <appender-ref ref="console" />
    </root>
</configuration>

logger name处的写入方式:

 

 

 六、在虚拟机hadoop201中配置

前提:

环境搭建:

java8、zookeeper集群、kafka集群

上传生产数据的jar包和application.yml文件到/opt/module/gmall-logger/rt_log/目录下

 

 

1.运行zookeeper集群

/bin/zksh.sh start

2.运行kafka集群

bin/kafka-server-start.sh --zookeeper hadoop201:2181 config/server.properties &

3.查看kafka主题

bin/kafka-topics.sh --zookeeper hadoop102:2181 --list

4.创建topic

bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--create --replication-factor 3 --partitions 1 --topic ods_base_log

选项说明:

--topic 定义topic名

--replication-factor  定义副本数

--partitions  定义分区数

5.删除topic

bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--delete --topic ods_base_log

需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。

6.发送消息

bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic ods_base_log
>hello world
>kafka kafka

7.消费消息

bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --from-beginning --topic ods_base_log

--from-beginning:会把ods_base_log主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。

8.查看某个topic详情

bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--describe --topic ods_base_log

七、修改application.yml文件

 

ip地址需改成自己IPV4的真实地址

八、

  • 运行Windows上的Idea程序LoggerApplication
  • 运行rt_applog下的jar包
  • 启动kafka消费者进行测试
    bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ods_base_log

 九、结果

1.jar包运行造数据:

 

2.kafka消费者消费数据

 

3.Idea控制台打印数据

 

 完毕。

 

这篇关于Spring Assistant框架搭建消息队列写入Kafka消费Windows单机测试的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!