mysql
1 package sink; 2 3 //import com.util.Propss; 4 //import com.bean.Sku; 5 import org.apache.flink.configuration.Configuration; 6 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; 7 import org.apache.kafka.clients.producer.KafkaProducer; 8 import org.apache.kafka.clients.producer.ProducerRecord; 9 10 import java.lang.reflect.Field; 11 import java.sql.*; 12 13 /** 14 * @Description: 15 * @Author: liuyuan 16 * @Times : 2021/9/24 20:09 17 */ 18 19 //自定义SINK RichSinkFuncation 20 public class MySQLSink extends RichSinkFunction<String> { 21 private static Connection conn; 22 private static PreparedStatement pre; 23 24 private static String database; 25 private static String sql; 26 private static Class T; 27 28 public MySQLSink(String database,String sql,Class T){ 29 this.database=database; 30 this.sql=sql; 31 this.T=T; 32 } 33 34 @Override 35 public void open(Configuration parameters) throws Exception { 36 Class.forName("com.mysql.jdbc.Driver"); 37 conn = DriverManager.getConnection("jdbc:mysql://hadoop106:3306/"+database, "root", "root"); 38 conn.setAutoCommit(true); 39 } 40 41 @Override 42 public void invoke(String value, Context context) throws Exception { 43 String[] split = value.split(","); 44 Field[] declaredFields = T.getDeclaredFields(); 45 pre = conn.prepareStatement(sql); 46 for (int i = 0; i < declaredFields.length; i++) { 47 if(declaredFields[i].getType().toString().equals("class java.lang.String")){ 48 pre.setString((i+1),split[i]); 49 } 50 if(declaredFields[i].getType().toString().equals("class java.lang.Integer")){ 51 pre.setInt((i+1),Integer.valueOf(split[i])); 52 } 53 if(declaredFields[i].getType().toString().equals("class java.lang.Double")){ 54 pre.setDouble((i+1),Double.valueOf(split[i])); 55 } 56 if(declaredFields[i].getType().toString().equals("class java.lang.Long")){ 57 pre.setLong((i+1),Long.valueOf(split[i])); 58 } 59 } 60 pre.execute(); 61 } 62 63 @Override 64 public void close() throws Exception { 65 pre.close(); 66 conn.close(); 67 68 } 69 }
hbase
1 package sink; 2 3 import org.apache.flink.configuration.Configuration; 4 import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; 5 import org.apache.flink.streaming.api.functions.sink.SinkFunction; 6 import org.apache.hadoop.hbase.*; 7 import org.apache.hadoop.hbase.client.*; 8 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 9 import org.apache.hadoop.hbase.util.Bytes; 10 11 import java.io.IOException; 12 import java.lang.reflect.Field; 13 import java.util.*; 14 15 public class HBaseSink extends RichSinkFunction<String> { 16 private Connection connection; 17 private Class T; 18 private String tableName; 19 private String[] fieldsName; 20 List<Put> list=new ArrayList<Put>(); 21 22 public static String[] getFiledName(Class T) { 23 Field[] fields =T.getClass().getDeclaredFields(); 24 String[] fieldName = new String[fields.length]; 25 for (int i = 0; i < fieldName.length; i++) { 26 fieldName[i] = fields[i].getName(); 27 28 } 29 return fieldName; 30 } 31 32 public HBaseSink(Class T, String tableName){ 33 this.T=T; 34 this.tableName=tableName; 35 this.fieldsName=getFiledName(T); 36 } 37 38 @Override 39 public void open(Configuration parameters) throws Exception { 40 connection= HBase_Util.getConf(); 41 } 42 43 @Override 44 public void invoke(String value, Context context) throws Exception { 45 String[] s1 = value.split(","); 46 Table table = connection.getTable(TableName.valueOf(tableName)); 47 // String rowkey = UUID.randomUUID().toString().replaceAll("-", ""); 48 Put put = new Put(Bytes.toBytes(s1[0])); 49 for (int i = 0; i < fieldsName.length; i++) { 50 put.addColumn(Bytes.toBytes("info"),Bytes.toBytes(fieldsName[i]),Bytes.toBytes(s1[i])); 51 list.add(put); 52 } 53 table.put(list); 54 } 55 56 @Override 57 public void close() throws Exception { 58 connection.close(); 59 } 60 61 62 public static class HBase_Util { 63 static org.apache.hadoop.conf.Configuration con = HBaseConfiguration.create(); 64 static org.apache.hadoop.conf.Configuration conf = Propss.setConf(con); 65 static Connection connection; 66 static HBaseAdmin admin; 67 static Table t; 68 69 static { 70 try { 71 connection = ConnectionFactory.createConnection(conf); 72 admin = (HBaseAdmin)connection.getAdmin(); 73 } catch (IOException e) { 74 e.printStackTrace(); 75 } 76 } 77 //获取 conn 78 public static Connection getConf(){ 79 //创建HBase的配置对象 80 org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create(); 81 //设置hbase配置属性 82 conf.set("hbase.zookeeper.quorum","hadoop106,hadoop107,hadoop108"); 83 conf.set("hbase.zookeeper.property.clientPort","2181"); 84 Connection connection=null; 85 //通过连接函数,创建连接对象 86 try { 87 connection = ConnectionFactory.createConnection(conf); 88 89 } catch (IOException e) { 90 e.printStackTrace(); 91 } 92 return connection; 93 } 94 95 //建表 96 public static void build_Table(String tableName,List<String> FamilyNames) throws Exception { 97 TableDescriptorBuilder buider = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)); 98 for (String columnName : FamilyNames) { 99 ColumnFamilyDescriptor info = ColumnFamilyDescriptorBuilder.of(Bytes.toBytes(columnName)); 100 buider.setColumnFamily(info); 101 } 102 TableDescriptor build = buider.build(); 103 admin.createTable(build); 104 System.out.println("____build_done____"); 105 } 106 107 //插入一条数据 108 public static void insert_Row(String tableName,String row,String Family,String qualifier,String value) throws Exception { 109 t = connection.getTable(TableName.valueOf(tableName)); 110 Put put = new Put(Bytes.toBytes(row)); 111 Put put1 = put.addColumn(Bytes.toBytes(Family), Bytes.toBytes(qualifier), Bytes.toBytes(value)); 112 t.put(put1); 113 System.out.println("____insert_Row_done____"); 114 } 115 116 //插入num条数据 117 public static void insert_Batch(String tableName,String row,String Family,String qualifier,String value,Integer num) throws Exception { 118 t = connection.getTable(TableName.valueOf(tableName)); 119 List<Put> list=new ArrayList<>(); 120 for (int i = 0; i < num; i++) { 121 String s = UUID.randomUUID().toString().replaceAll("-", ""); 122 Put puts = new Put(Bytes.toBytes(s)); 123 Put putss = puts.addColumn(Bytes.toBytes(Family), Bytes.toBytes(qualifier), Bytes.toBytes(value+i)); 124 list.add(putss); 125 } 126 t.put(list); 127 System.out.println("____insert_Batch_done____"); 128 } 129 130 //删除表 131 public static void drop_Table(String tableName) throws Exception { 132 if (admin.tableExists(TableName.valueOf(tableName))){ 133 admin.disableTable(TableName.valueOf(tableName)); 134 admin.deleteTable(TableName.valueOf(tableName)); 135 System.out.println("____drop_Table_done____"); 136 }else { 137 System.out.println("____no_such_Table_found____"); 138 } 139 140 } 141 142 //删除一条数据 143 public static void delete_Row(String tableName,String row) throws Exception { 144 t = connection.getTable(TableName.valueOf(tableName)); 145 Delete delete = new Delete(Bytes.toBytes(row)); 146 t.delete(delete); 147 System.out.println("____delete_Row_done____"); 148 } 149 150 //特定列过滤查询 151 public static void scan_Filter(String tableName, String Family, String qualifier, CompareOperator compare, byte[] value) throws Exception { 152 t = connection.getTable(TableName.valueOf(tableName)); 153 SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes(Family), Bytes.toBytes(qualifier), compare, value); 154 Scan scan = new Scan(); 155 Scan scan1 = scan.setFilter(filter); 156 ResultScanner scanner = t.getScanner(scan1); 157 Iterator<Result> iterator = scanner.iterator(); 158 while (iterator.hasNext()){ 159 Cell[] cells = iterator.next().rawCells(); 160 System.out.println("____"+new String(CellUtil.cloneRow(cells[0]))+"____"); 161 for (Cell cell : cells) { 162 System.out.print(new String(CellUtil.cloneRow(cell))); 163 System.out.print(" - "); 164 System.out.print(new String(CellUtil.cloneFamily(cell))); 165 System.out.print(" - "); 166 System.out.print(new String(CellUtil.cloneQualifier(cell))); 167 System.out.print(" - "); 168 System.out.println(new String(CellUtil.cloneValue(cell))); 169 } 170 171 } 172 System.out.println("____scan_Filter_done____"); 173 } 174 //查询一条数据 175 public static void scan_Row(String tableName,String row)throws Exception{ 176 t = connection.getTable(TableName.valueOf(tableName)); 177 Get get = new Get(Bytes.toBytes(row)); 178 Result result = t.get(get); 179 Cell[] cells = result.rawCells(); 180 for (Cell cell : cells) { 181 System.out.print(new String(CellUtil.cloneRow(cell))); 182 System.out.print(" - "); 183 System.out.print(new String(CellUtil.cloneFamily(cell))); 184 System.out.print(" - "); 185 System.out.print(new String(CellUtil.cloneQualifier(cell))); 186 System.out.print(" - "); 187 System.out.println(new String(CellUtil.cloneValue(cell))); 188 } 189 System.out.println("____scan_Row_done____"); 190 } 191 //区间查询数据 192 public static void scan_Rows(String tableName,String row1,String row2)throws Exception{ 193 t = connection.getTable(TableName.valueOf(tableName)); 194 Scan sc=new Scan(Bytes.toBytes(row1),Bytes.toBytes(row2)); 195 ResultScanner scanner = t.getScanner(sc); 196 Iterator<Result> iterator = scanner.iterator(); 197 System.out.println("____前闭后开____"); 198 while (iterator.hasNext()){ 199 Result next = iterator.next(); 200 Cell[] cells = next.rawCells(); 201 System.out.println("____"+new String(CellUtil.cloneRow(cells[0]))+"____"); 202 for (Cell cell : cells) { 203 System.out.print(new String(CellUtil.cloneRow(cell))); 204 System.out.print(" - "); 205 System.out.print(new String(CellUtil.cloneFamily(cell))); 206 System.out.print(" - "); 207 System.out.print(new String(CellUtil.cloneQualifier(cell))); 208 System.out.print(" - "); 209 System.out.println(new String(CellUtil.cloneValue(cell))); 210 } 211 } 212 System.out.println("____scan_Rows_done____"); 213 } 214 //查询一条特定列族数据 215 public static void get_value_by_family(String tableName,String row,String family)throws Exception{ 216 t = connection.getTable(TableName.valueOf(tableName)); 217 Get get = new Get(Bytes.toBytes(row)); 218 get.addFamily(Bytes.toBytes(family)); 219 Result result = t.get(get); 220 Cell[] cells = result.rawCells(); 221 for (Cell cell : cells) { 222 System.out.print(new String(CellUtil.cloneRow(cell))); 223 System.out.print(" - "); 224 System.out.print(new String(CellUtil.cloneFamily(cell))); 225 System.out.print(" - "); 226 System.out.print(new String(CellUtil.cloneQualifier(cell))); 227 System.out.print(" - "); 228 System.out.println(new String(CellUtil.cloneValue(cell))); 229 } 230 System.out.println("____get_value_by_family_done____"); 231 } 232 233 //查询一条特定列数据 234 public static void get_value_by_qualifier(String tableName,String row,String family,String qualifier)throws Exception{ 235 t = connection.getTable(TableName.valueOf(tableName)); 236 Get get = new Get(Bytes.toBytes(row)); 237 get.addColumn(Bytes.toBytes(family),Bytes.toBytes(qualifier)); 238 Result result = t.get(get); 239 Cell[] cells = result.rawCells(); 240 for (Cell cell : cells) { 241 System.out.print(new String(CellUtil.cloneRow(cell))); 242 System.out.print(" - "); 243 System.out.print(new String(CellUtil.cloneFamily(cell))); 244 System.out.print(" - "); 245 System.out.print(new String(CellUtil.cloneQualifier(cell))); 246 System.out.print(" - "); 247 System.out.println(new String(CellUtil.cloneValue(cell))); 248 } 249 System.out.println("____get_value_by_qualifier_done____"); 250 } 251 252 //全查某表 253 public static void scan_All(String tableName) throws Exception { 254 t = connection.getTable(TableName.valueOf(tableName)); 255 Scan sc=new Scan(); 256 ResultScanner scanner = t.getScanner(sc); 257 Iterator<Result> iterator = scanner.iterator(); 258 while (iterator.hasNext()){ 259 Result next = iterator.next(); 260 Cell[] cells = next.rawCells(); 261 System.out.println("____"+new String(CellUtil.cloneRow(cells[0]))+"____"); 262 for (Cell cell : cells) { 263 System.out.print(new String(CellUtil.cloneRow(cell))); 264 System.out.print(" - "); 265 System.out.print(new String(CellUtil.cloneFamily(cell))); 266 System.out.print(" - "); 267 System.out.print(new String(CellUtil.cloneQualifier(cell))); 268 System.out.print(" - "); 269 System.out.println(new String(CellUtil.cloneValue(cell))); 270 } 271 } 272 System.out.println("____scan_All_done____"); 273 } 274 //查看所有表 275 public static void list() throws Exception { 276 TableName[] tableNames = admin.listTableNames(); 277 for (TableName tableName : tableNames) { 278 System.out.println(tableName.toString()); 279 } 280 281 } 282 //查看所有表结构 283 public static void desc_Table(String tableName) throws Exception { 284 List<TableDescriptor> tableDescriptors = admin.listTableDescriptors(); 285 Iterator<TableDescriptor> iterator = tableDescriptors.iterator(); 286 while (iterator.hasNext()){ 287 TableDescriptor next = iterator.next(); 288 if (next.getTableName().toString().equals(tableName)){ 289 System.out.println(next); 290 } 291 } 292 System.out.println("____list_done____"); 293 294 } 295 296 297 //关流 298 public static void stop() throws Exception { 299 connection.close(); 300 System.out.println("____connection_close_done____"); 301 } 302 303 304 public static class Propss { 305 public static Properties producer_Props = new Properties(); 306 public static Properties consumer_Props = new Properties(); 307 public static HashMap<String, Object> kafka_Consumer = new HashMap<>(); 308 public static HashMap<String, Object> kafka_Producer = new HashMap<>(); 309 310 public static org.apache.hadoop.conf.Configuration setConf(org.apache.hadoop.conf.Configuration conf){ 311 conf.set("hbase.zookeeper.quorum","hadoop106,hadoop107,hadoop108"); 312 conf.set("hbae.zookeeper.property.client","2181"); 313 return conf; 314 } 315 static { 316 317 kafka_Producer.put("bootstrap.servers", "hadoop106:9092,hadoop107:9092,hadoop108:9092"); 318 //0、1 和 all:0表示只要把消息发送出去就返回成功;1表示只要Leader收到消息就返回成功;all表示所有副本都写入数据成功才算成功 319 kafka_Producer.put("acks", "all"); 320 //重试次数 321 kafka_Producer.put("retries", Integer.MAX_VALUE); 322 //批处理的字节数 323 kafka_Producer.put("batch.size", 16384); 324 //批处理的延迟时间,当批次数据未满之时等待的时间 325 kafka_Producer.put("linger.ms", 1); 326 //用来约束KafkaProducer能够使用的内存缓冲的大小的,默认值32MB 327 kafka_Producer.put("buffer.memory", 33554432); 328 // properties.put("value.serializer", 329 // "org.apache.kafka.common.serialization.ByteArraySerializer"); 330 // properties.put("key.serializer", 331 // "org.apache.kafka.common.serialization.ByteArraySerializer"); 332 kafka_Producer.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 333 kafka_Producer.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 334 335 kafka_Consumer.put("bootstrap.servers", "hadoop106:9092,hadoop107:9092,hadoop108:9092"); 336 kafka_Consumer.put("group.id", "com-test"); 337 //from beginning 338 kafka_Consumer.put("auto.offset.reset", "earliest"); 339 kafka_Consumer.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 340 kafka_Consumer.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 341 342 producer_Props.setProperty("bootstrap.servers", "hadoop106:9092,hadoop107:9092,hadoop108:9092"); 343 producer_Props.setProperty("ack", "all"); 344 producer_Props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 345 producer_Props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 346 producer_Props.put("auto.offset.reset", "earliest"); 347 348 consumer_Props.setProperty("bootstrap.servers", "hadoop106:9092,hadoop107:9092,hadoop108:9092"); 349 consumer_Props.setProperty("group.id", "com-test"); 350 consumer_Props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 351 consumer_Props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 352 consumer_Props.put("auto.offset.reset", "earliest"); 353 } 354 355 356 } 357 358 } 359 360 }