1 import lombok.AllArgsConstructor; 2 import lombok.Data; 3 import lombok.NoArgsConstructor; 4 5 @Data 6 @NoArgsConstructor 7 @AllArgsConstructor 8 public class Sensor { 9 private String sensor_id; 10 private Long sensor_timeStamp; 11 private Double sensor_Temp; 12 }
1 import lombok.AllArgsConstructor; 2 import lombok.Data; 3 import lombok.NoArgsConstructor; 4 5 @Data 6 @AllArgsConstructor 7 @NoArgsConstructor 8 public class Shop { 9 private String uid; 10 private String type; 11 private String name; 12 private Integer num; 13 private Double price; 14 private Long time; 15 private Double total; 16 }
import java.util.HashMap;
import java.util.Properties;

/**
 * Configuration utility: holds the Kafka client settings shared by the jobs.
 *
 * <ul>
 *   <li>{@link #producer_Props} - settings for a Kafka producer (String key/value).</li>
 *   <li>{@link #consumer_Props} - settings for a Kafka consumer (String key/value),
 *       group {@code test}, starting from the earliest offset.</li>
 *   <li>{@link #kafka_Consumer} - the same consumer settings as a {@code Map}.</li>
 * </ul>
 *
 * <p>All three containers are populated once, in the static initializer.
 */
public class PropsUtil {
    private static final String BOOTSTRAP_SERVERS = "hadoop106:9092,hadoop107:9092,hadoop108:9092";
    private static final String GROUP_ID = "test";
    private static final String OFFSET_RESET = "earliest";
    private static final String STRING_SERIALIZER =
            "org.apache.kafka.common.serialization.StringSerializer";
    private static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    static Properties producer_Props = new Properties();
    static Properties consumer_Props = new Properties();
    static HashMap<String, Object> kafka_Consumer = new HashMap<>();

    static {
        // Consumer settings in Map form.
        kafka_Consumer.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        kafka_Consumer.put("group.id", GROUP_ID);
        // Read from the beginning when the group has no committed offset.
        kafka_Consumer.put("auto.offset.reset", OFFSET_RESET);
        kafka_Consumer.put("key.deserializer", STRING_DESERIALIZER);
        kafka_Consumer.put("value.deserializer", STRING_DESERIALIZER);

        // Producer settings.
        producer_Props.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        // The producer config key is "acks"; the previous key "ack" is not a
        // Kafka setting, so the intended "all" acknowledgement mode was never applied.
        producer_Props.setProperty("acks", "all");
        producer_Props.setProperty("key.serializer", STRING_SERIALIZER);
        producer_Props.setProperty("value.serializer", STRING_SERIALIZER);
        // NOTE(review): "auto.offset.reset" is a consumer setting and has no effect on a
        // producer. Kept only so existing readers of this Properties object see the same key.
        producer_Props.setProperty("auto.offset.reset", OFFSET_RESET);

        // Consumer settings in Properties form.
        consumer_Props.setProperty("bootstrap.servers", BOOTSTRAP_SERVERS);
        consumer_Props.setProperty("group.id", GROUP_ID);
        consumer_Props.setProperty("key.deserializer", STRING_DESERIALIZER);
        consumer_Props.setProperty("value.deserializer", STRING_DESERIALIZER);
        consumer_Props.setProperty("auto.offset.reset", OFFSET_RESET);
    }
}
1 import org.apache.flink.configuration.Configuration; 2 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; 3 import java.sql.Connection; 4 import java.sql.DriverManager; 5 import java.sql.PreparedStatement; 6 7 8 //自定义 mysql sink 工具类 继承 RichSinkFuncation 9 public class MySqlUtil extends RichSinkFunction<Shop> { 10 private Connection conn; 11 private PreparedStatement pre; 12 @Override 13 public void open(Configuration parameters) throws Exception { 14 Class.forName("com.mysql.jdbc.Driver"); 15 conn = DriverManager.getConnection("jdbc:mysql://hadoop106:3306/test3", "root", "root"); 16 conn.setAutoCommit(true); 17 } 18 19 @Override 20 public void invoke(Shop value, Context context) throws Exception { 21 pre = conn.prepareStatement("insert into shop(type,total) values (?,?)"); 22 pre.setString(1,value.getType()); 23 pre.setDouble(2,value.getTotal()); 24 pre.execute(); 25 } 26 27 @Override 28 public void close() throws Exception { 29 pre.close(); 30 conn.close(); 31 } 32 }
1 import org.apache.flink.api.common.serialization.SimpleStringEncoder; 2 import org.apache.flink.core.fs.Path; 3 import org.apache.flink.streaming.api.datastream.DataStreamSource; 4 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 5 import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; 6 import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; 7 8 import java.util.concurrent.TimeUnit; 9 //sink 到 hdfs 10 public class Flink_Sink_HDFS { 11 public static void main(String[] args) throws Exception { 12 //Flink环境 13 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 14 env.setParallelism(1); 15 //套接字 数据源 hadoop106 16 DataStreamSource<String> hadoop106 = env.socketTextStream("hadoop106", 9999); 17 //sink 存储路径 18 String path = "hdfs://hadoop106:8020//test//out"; 19 Path outputPath = new Path(path); 20 21 //存储设置,检查点10秒 22 env.enableCheckpointing(10000); 23 24 //sink 25 final StreamingFileSink<String> sink = StreamingFileSink 26 .forRowFormat(outputPath, new SimpleStringEncoder<String>("UTF-8")) 27 .withRollingPolicy( 28 DefaultRollingPolicy.builder() 29 .withRolloverInterval(TimeUnit.MINUTES.toMillis(15)) 30 .withInactivityInterval(TimeUnit.MINUTES.toMillis(5)) 31 .withMaxPartSize(1024 * 1024 * 1024) 32 .build()) 33 .build(); 34 //存入 35 hadoop106.addSink(sink); 36 env.execute(); 37 38 } 39 }
1 import org.apache.flink.api.common.functions.MapFunction; 2 import org.apache.flink.api.common.serialization.SimpleStringSchema; 3 import org.apache.flink.streaming.api.TimeCharacteristic; 4 import org.apache.flink.streaming.api.datastream.DataStreamSource; 5 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; 6 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 7 import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; 8 import org.apache.flink.streaming.api.windowing.time.Time; 9 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; 10 11 import java.util.Arrays; 12 import java.util.List; 13 14 public class Flink_Sink_Mysql { 15 public static void main(String[] args) throws Exception { 16 //Flink环境 17 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 18 env.setParallelism(1); 19 //时间语义:事件时间 20 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); 21 //kafka topic : day1 22 List<String> topic = Arrays.asList("day1"); 23 //拿到消费者 24 FlinkKafkaConsumer<String> sss = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), PropsUtil.consumer_Props); 25 //数据源sss从头消费 26 sss.setStartFromEarliest(); 27 //数据源获取数据 28 DataStreamSource<String> source = env.addSource(sss); 29 30 //分装成商品shop对象 31 SingleOutputStreamOperator<Shop> flat = source.map(new MapFunction<String, Shop>() { 32 @Override 33 public Shop map(String s) throws Exception { 34 String[] ss = s.split(","); 35 return new Shop(ss[0], ss[1], ss[2], Integer.valueOf(ss[3]), Double.valueOf(ss[4]), Long.valueOf(ss[5]), Integer.valueOf(ss[3]) * Double.valueOf(ss[4])); 36 } 37 }); 38 //水位线3s 39 SingleOutputStreamOperator<Shop> ope = flat.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Shop>(Time.seconds(3)) { 40 @Override 41 public long extractTimestamp(Shop shop) { 42 return shop.getTime(); 43 } 44 }); 45 46 
SingleOutputStreamOperator<Shop> sum = ope.keyBy("type").timeWindow(Time.seconds(60)).sum("total"); 47 sum.print(); 48 //存入Mysql 49 sum.addSink(new MySqlUtil()); 50 51 env.execute(); 52 } 53 }