1 import org.apache.hadoop.conf.Configuration; 2 3 import java.util.HashMap; 4 import java.util.Properties; 5 6 public class Propss { 7 static Properties producer_Props = new Properties(); 8 static Properties consumer_Props = new Properties(); 9 static HashMap<String, Object> kafka_Consumer = new HashMap<>(); 10 public static Configuration setConf(Configuration conf){ 11 conf.set("hbase.zookeeper.quorum","hadoop106,hadoop107,hadoop108"); 12 conf.set("hbae.zookeeper.property.client","2181"); 13 return conf; 14 } 15 16 static{ 17 kafka_Consumer.put("bootstrap.servers","hadoop106:9092,hadoop107:9092,hadoop108:9092"); 18 kafka_Consumer.put("group.id", "test"); 19 //from beginning 20 kafka_Consumer.put("auto.offset.reset","earliest"); 21 kafka_Consumer.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 22 kafka_Consumer.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 23 24 producer_Props.setProperty("bootstrap.servers","hadoop106:9092,hadoop107:9092,hadoop108:9092"); 25 producer_Props.setProperty("ack","all"); 26 producer_Props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); 27 producer_Props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 28 producer_Props.put("auto.offset.reset","earliest"); 29 30 consumer_Props.setProperty("bootstrap.servers","hadoop106:9092,hadoop107:9092,hadoop108:9092"); 31 consumer_Props.setProperty("group.id","test"); 32 consumer_Props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 33 consumer_Props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 34 consumer_Props.put("auto.offset.reset","earliest"); 35 } 36 }
1 import org.apache.hadoop.conf.Configuration; 2 import org.apache.hadoop.hbase.*; 3 import org.apache.hadoop.hbase.client.*; 4 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; 5 import org.apache.hadoop.hbase.util.Bytes; 6 import java.io.IOException; 7 import java.util.ArrayList; 8 import java.util.Iterator; 9 import java.util.List; 10 import java.util.UUID; 11 12 public class HBase_Util { 13 static Configuration con = HBaseConfiguration.create(); 14 static Configuration conf = Propss.setConf(con); 15 static Connection connection; 16 static HBaseAdmin admin; 17 static Table t; 18 19 static { 20 try { 21 connection = ConnectionFactory.createConnection(conf); 22 admin = (HBaseAdmin)connection.getAdmin(); 23 } catch (IOException e) { 24 e.printStackTrace(); 25 } 26 } 27 28 //建表 29 public static void build_Table(String tableName,List<String> FamilyNames) throws Exception { 30 TableDescriptorBuilder buider = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName)); 31 for (String columnName : FamilyNames) { 32 ColumnFamilyDescriptor info = ColumnFamilyDescriptorBuilder.of(Bytes.toBytes(columnName)); 33 buider.setColumnFamily(info); 34 } 35 TableDescriptor build = buider.build(); 36 admin.createTable(build); 37 System.out.println("____build_done____"); 38 } 39 40 //插入一条数据 41 public static void insert_Row(String tableName,String row,String Family,String qualifier,String value) throws Exception { 42 t = connection.getTable(TableName.valueOf(tableName)); 43 Put put = new Put(Bytes.toBytes(row)); 44 Put put1 = put.addColumn(Bytes.toBytes(Family), Bytes.toBytes(qualifier), Bytes.toBytes(value)); 45 t.put(put1); 46 System.out.println("____insert_Row_done____"); 47 } 48 49 //插入num条数据 50 public static void insert_Batch(String tableName,String row,String Family,String qualifier,String value,Integer num) throws Exception { 51 t = connection.getTable(TableName.valueOf(tableName)); 52 List<Put> list=new ArrayList<>(); 53 for (int i = 0; i < num; i++) { 54 String s = UUID.randomUUID().toString().replaceAll("-", ""); 55 Put puts = new Put(Bytes.toBytes(s)); 56 Put putss = puts.addColumn(Bytes.toBytes(Family), Bytes.toBytes(qualifier), Bytes.toBytes(value+i)); 57 list.add(putss); 58 } 59 t.put(list); 60 System.out.println("____insert_Batch_done____"); 61 } 62 63 //删除表 64 public static void drop_Table(String tableName) throws Exception { 65 if (admin.tableExists(TableName.valueOf(tableName))){ 66 admin.disableTable(TableName.valueOf(tableName)); 67 admin.deleteTable(TableName.valueOf(tableName)); 68 System.out.println("____drop_Table_done____"); 69 }else { 70 System.out.println("____no_such_Table_found____"); 71 } 72 73 } 74 75 //删除一条数据 76 public static void delete_Row(String tableName,String row) throws Exception { 77 t = connection.getTable(TableName.valueOf(tableName)); 78 Delete delete = new Delete(Bytes.toBytes(row)); 79 t.delete(delete); 80 System.out.println("____delete_Row_done____"); 81 } 82 83 //特定列过滤查询 84 public static void scan_Filter(String tableName,String Family,String qualifier,CompareOperator compare,byte[] value) throws Exception { 85 t = connection.getTable(TableName.valueOf(tableName)); 86 SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes(Family), Bytes.toBytes(qualifier), compare, value); 87 Scan scan = new Scan(); 88 Scan scan1 = scan.setFilter(filter); 89 ResultScanner scanner = t.getScanner(scan1); 90 Iterator<Result> iterator = scanner.iterator(); 91 while (iterator.hasNext()){ 92 Cell[] cells = iterator.next().rawCells(); 93 System.out.println("____"+new String(CellUtil.cloneRow(cells[0]))+"____"); 94 for (Cell cell : cells) { 95 System.out.print(new String(CellUtil.cloneRow(cell))); 96 System.out.print(" - "); 97 System.out.print(new String(CellUtil.cloneFamily(cell))); 98 System.out.print(" - "); 99 System.out.print(new String(CellUtil.cloneQualifier(cell))); 100 System.out.print(" - "); 101 System.out.println(new String(CellUtil.cloneValue(cell))); 102 } 103 104 } 105 System.out.println("____scan_Filter_done____"); 106 } 107 //查询一条数据 108 public static void scan_Row(String tableName,String row)throws Exception{ 109 t = connection.getTable(TableName.valueOf(tableName)); 110 Get get = new Get(Bytes.toBytes(row)); 111 Result result = t.get(get); 112 Cell[] cells = result.rawCells(); 113 for (Cell cell : cells) { 114 System.out.print(new String(CellUtil.cloneRow(cell))); 115 System.out.print(" - "); 116 System.out.print(new String(CellUtil.cloneFamily(cell))); 117 System.out.print(" - "); 118 System.out.print(new String(CellUtil.cloneQualifier(cell))); 119 System.out.print(" - "); 120 System.out.println(new String(CellUtil.cloneValue(cell))); 121 } 122 System.out.println("____scan_Row_done____"); 123 } 124 //区间查询数据 125 public static void scan_Rows(String tableName,String row1,String row2)throws Exception{ 126 t = connection.getTable(TableName.valueOf(tableName)); 127 Scan sc=new Scan(Bytes.toBytes(row1),Bytes.toBytes(row2)); 128 ResultScanner scanner = t.getScanner(sc); 129 Iterator<Result> iterator = scanner.iterator(); 130 System.out.println("____前闭后开____"); 131 while (iterator.hasNext()){ 132 Result next = iterator.next(); 133 Cell[] cells = next.rawCells(); 134 System.out.println("____"+new String(CellUtil.cloneRow(cells[0]))+"____"); 135 for (Cell cell : cells) { 136 System.out.print(new String(CellUtil.cloneRow(cell))); 137 System.out.print(" - "); 138 System.out.print(new String(CellUtil.cloneFamily(cell))); 139 System.out.print(" - "); 140 System.out.print(new String(CellUtil.cloneQualifier(cell))); 141 System.out.print(" - "); 142 System.out.println(new String(CellUtil.cloneValue(cell))); 143 } 144 } 145 System.out.println("____scan_Rows_done____"); 146 } 147 //查询一条特定列族数据 148 public static void get_value_by_family(String tableName,String row,String family)throws Exception{ 149 t = connection.getTable(TableName.valueOf(tableName)); 150 Get get = new Get(Bytes.toBytes(row)); 151 get.addFamily(Bytes.toBytes(family)); 152 Result result = t.get(get); 153 Cell[] cells = result.rawCells(); 154 for (Cell cell : cells) { 155 System.out.print(new String(CellUtil.cloneRow(cell))); 156 System.out.print(" - "); 157 System.out.print(new String(CellUtil.cloneFamily(cell))); 158 System.out.print(" - "); 159 System.out.print(new String(CellUtil.cloneQualifier(cell))); 160 System.out.print(" - "); 161 System.out.println(new String(CellUtil.cloneValue(cell))); 162 } 163 System.out.println("____get_value_by_family_done____"); 164 } 165 166 //查询一条特定列数据 167 public static void get_value_by_qualifier(String tableName,String row,String family,String qualifier)throws Exception{ 168 t = connection.getTable(TableName.valueOf(tableName)); 169 Get get = new Get(Bytes.toBytes(row)); 170 get.addColumn(Bytes.toBytes(family),Bytes.toBytes(qualifier)); 171 Result result = t.get(get); 172 Cell[] cells = result.rawCells(); 173 for (Cell cell : cells) { 174 System.out.print(new String(CellUtil.cloneRow(cell))); 175 System.out.print(" - "); 176 System.out.print(new String(CellUtil.cloneFamily(cell))); 177 System.out.print(" - "); 178 System.out.print(new String(CellUtil.cloneQualifier(cell))); 179 System.out.print(" - "); 180 System.out.println(new String(CellUtil.cloneValue(cell))); 181 } 182 System.out.println("____get_value_by_qualifier_done____"); 183 } 184 185 //全查某表 186 public static void scan_All(String tableName) throws Exception { 187 t = connection.getTable(TableName.valueOf(tableName)); 188 Scan sc=new Scan(); 189 ResultScanner scanner = t.getScanner(sc); 190 Iterator<Result> iterator = scanner.iterator(); 191 while (iterator.hasNext()){ 192 Result next = iterator.next(); 193 Cell[] cells = next.rawCells(); 194 System.out.println("____"+new String(CellUtil.cloneRow(cells[0]))+"____"); 195 for (Cell cell : cells) { 196 System.out.print(new String(CellUtil.cloneRow(cell))); 197 System.out.print(" - "); 198 System.out.print(new String(CellUtil.cloneFamily(cell))); 199 System.out.print(" - "); 200 System.out.print(new String(CellUtil.cloneQualifier(cell))); 201 System.out.print(" - "); 202 System.out.println(new String(CellUtil.cloneValue(cell))); 203 } 204 } 205 System.out.println("____scan_All_done____"); 206 } 207 //查看所有表 208 public static void list() throws Exception { 209 TableName[] tableNames = admin.listTableNames(); 210 for (TableName tableName : tableNames) { 211 System.out.println(tableName.toString()); 212 } 213 214 } 215 //查看某表结构 216 public static void desc_Table(String tableName) throws Exception { 217 List<TableDescriptor> tableDescriptors = admin.listTableDescriptors(); 218 Iterator<TableDescriptor> iterator = tableDescriptors.iterator(); 219 while (iterator.hasNext()){ 220 TableDescriptor next = iterator.next(); 221 if (next.getTableName().toString().equals(tableName)){ 222 System.out.println(next); 223 } 224 } 225 System.out.println("____list_done____"); 226 227 } 228 229 230 //关流 231 public static void stop() throws Exception { 232 connection.close(); 233 System.out.println("____connection_close_done____"); 234 } 235 }
1 import org.apache.hadoop.hbase.CompareOperator; 2 import org.apache.hadoop.hbase.util.Bytes; 3 4 import java.util.Arrays; 5 import java.util.List; 6 7 public class Test { 8 public static void main(String[] args) throws Exception { 9 //查询所有表 10 HBase_Util.list(); 11 //创建表 12 List<String> list = Arrays.asList("info","data"); 13 HBase_Util.build_Table("user_info2",list); 14 //删除某表 15 HBase_Util.drop_Table("user_info2"); 16 //查询某表结构 17 HBase_Util.desc_Table("user_info2"); 18 //插入一条数据 19 HBase_Util.insert_Row("user_info2","001","info","name","zhangsan"); 20 HBase_Util.insert_Row("user_info2","001","info","age","18"); 21 HBase_Util.insert_Row("user_info2","001","info","sex","male"); 22 HBase_Util.insert_Row("user_info2","001","data","Math","100"); 23 HBase_Util.insert_Row("user_info2","001","data","Chinese","97"); 24 HBase_Util.insert_Row("user_info2","001","data","English","95"); 25 26 HBase_Util.insert_Row("user_info2","002","info","name","lisi"); 27 HBase_Util.insert_Row("user_info2","002","info","age","19"); 28 HBase_Util.insert_Row("user_info2","002","info","sex","famale"); 29 HBase_Util.insert_Row("user_info2","002","data","Math","70"); 30 HBase_Util.insert_Row("user_info2","002","data","Chinese","100"); 31 HBase_Util.insert_Row("user_info2","002","data","English","99"); 32 33 HBase_Util.insert_Row("user_info2","003","info","name","wangwu"); 34 HBase_Util.insert_Row("user_info2","003","info","age","20"); 35 HBase_Util.insert_Row("user_info2","003","info","sex","male"); 36 HBase_Util.insert_Row("user_info2","003","data","Math","65"); 37 HBase_Util.insert_Row("user_info2","003","data","Chinese","50"); 38 HBase_Util.insert_Row("user_info2","003","data","English","49"); 39 //全查某表 40 HBase_Util.scan_All("user_info2"); 41 //查询一条记录 42 HBase_Util.scan_Row("user_info2","001"); 43 //查询一条记录的 特定列族的 数据 44 HBase_Util.get_value_by_family("user_info2","001","info"); 45 //查询一条记录的 特定列的 数据 46 HBase_Util.get_value_by_qualifier("user_info2","001","data","Math"); 47 //查询区间数据 [前闭后开) 48 HBase_Util.scan_Rows("user_info2","001","003"); 49 //特定列过滤查询 50 HBase_Util.scan_Filter("user_info2","data","Math", CompareOperator.GREATER, Bytes.toBytes(60)); 51 //删除某一条记录 52 HBase_Util.delete_Row("user_info2","003"); 53 54 HBase_Util.stop(); 55 } 56 }