Hbase基础API

全是代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
package com.Hbase

import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter

object HbaseDDL {

def main(args: Array[String]): Unit = {

// 确认表是否存在

def ensureHbaseTableExist(tableName:TableName) = {
// 配置 Hbase
val hbaseconf = HBaseConfiguration.create()
//val zkConn = "bigdata:2181, bigdata:2182, bigdata:2183"
//hbaseconf.set("hbase.zookeeper.quorum", zkConn)
val HbaseConn = ConnectionFactory.createConnection(hbaseconf)
val adminHbase = HbaseConn.getAdmin()

val ifExist = adminHbase.tableExists(tableName)

ifExist
}

// 确认表是否存在 测试
// val result = ensureHbaseTableExist("user_info1")
// if (result) {
// println("表存在")
// } else {
// println("表不存在")
// }


/**
* Hbase建表 两个参数
* @param tableName 形式为 ns:tb 或者 tb API 创建 namespace 机会不多,一般通过 hbase shell 创建
* @param columnFamilys cf1,cf2,cf3
*/
def createHbaseTable(tableName:String, columnFamilys:String) = {

// 配置 Hbase
val hbaseconf = HBaseConfiguration.create()
//val zkConn = "bigdata:2181, bigdata:2182, bigdata:2183"
//hbaseconf.set("hbase.zookeeper.quorum", zkConn)
val HbaseConn = ConnectionFactory.createConnection(hbaseconf)
val adminHbase = HbaseConn.getAdmin()

// 列簇逗号分隔
val CFS = columnFamilys.split(",")

// 表名,判断是否带了namespace,带了则判断是否存在 namespace, 不存在则创建
val nameSpace = tableName.split(":")(0)


if (nameSpace != tableName) {
adminHbase.createNamespace(NamespaceDescriptor.create(tableName.split(":")(0)).build())
println("NameSpace 创建成功!")
}
// 判断表是否存在,不存在新建,存在则提示
if (!ensureHbaseTableExist(TableName.valueOf(tableName))) {

// 实例化 HTableDescriptor
val htable = new HTableDescriptor(TableName.valueOf(tableName))

// 循环添加所有列簇
for ( columnFamily <- CFS) {

// 实例化 HColumnDescriptor
val htableColumnFamily1 = new HColumnDescriptor((columnFamily))
// 调用 HColumnDescriptor 设置列簇属性
htableColumnFamily1.setMaxVersions(3)
// 表增加列族
htable.addFamily(new HColumnDescriptor(columnFamily))
}
// 创建表
adminHbase.createTable(htable)
println("表创建成功")
} else {
println("表已存在")
}

adminHbase.close()

}

// 测试建表
// createHbaseTable("scTable3", "info,base")


/**
* 列出所有表
*/
def listAllHbaseTable() ={
// 配置 Hbase
val hbaseconf = HBaseConfiguration.create()
//val zkConn = "bigdata:2181, bigdata:2182, bigdata:2183"
//hbaseconf.set("hbase.zookeeper.quorum", zkConn)
val HbaseConn = ConnectionFactory.createConnection(hbaseconf)
val adminHbase = HbaseConn.getAdmin()

val listTables = adminHbase.listTableNames()

for(table <- listTables){
println(table)
}
adminHbase.close()
}

//listAllHbaseTable()

/**
* 删除一张表,输入表名
* 判断是否存在,是否失效,否则不能删除
*
* @param tableName
*/
def deleteHbaseTable(tableName: String) ={

val hbaseconf = HBaseConfiguration.create()
val HbaseConn = ConnectionFactory.createConnection(hbaseconf)
val adminHbase = HbaseConn.getAdmin()

val tbName = TableName.valueOf(tableName)

if(ensureHbaseTableExist(tbName)){

// 若表不失效,则使失效
if(!adminHbase.isTableDisabled(tbName)){
adminHbase.disableTable(tbName)
}

adminHbase.deleteTable(tbName)
println("删除成功")
} else {
println("表不存在")
}

adminHbase.close()
}

//deleteHbaseTable("scTable3")


/**
*
* 删除表的某个列族 ,得到 HTableDescriptor , 调用该类的 removeFamily 方法
* @param tableName 表名 ---> ns:tb or tb [String]
* @param columnFamily 列族名 ---> String
*/
def deleteHbaseColumnFamily(tableName:String, columnFamily:String) ={

val hbaseconf = HBaseConfiguration.create()
val HbaseConn = ConnectionFactory.createConnection(hbaseconf)
val adminHbase = HbaseConn.getAdmin()

val tbName = TableName.valueOf(tableName)
// disable table
adminHbase.disableTable(tbName)

// get HTableDescriptor

val htd = adminHbase.getTableDescriptor(tbName)

// delete family
htd.removeFamily(columnFamily.getBytes())

// apply htd to table
adminHbase.modifyTable(tbName, htd)

// enable table
adminHbase.enableTable(tbName)

println("删除成功")

adminHbase.close()

}

// deleteHbaseColumnFamily("scTable3", "base")


/**
* 给表增加列族 先得到表的 HTableDescriptor, 然后使用 HColumnDescriptor 初始化 新增列,并设置属性
* 调用 HTableDescriptor 的 addFamily 方法,将初始化好的 HCD 添加到 HTableDescriptor ,然后使用admin 的 modifyTable 方法将修改应用
* @param tableName
* @param columnFamily
*/
def addHbaseColumnFamily(tableName:String, columnFamily:String) ={

val hbaseconf = HBaseConfiguration.create()
val HbaseConn = ConnectionFactory.createConnection(hbaseconf)
val adminHbase = HbaseConn.getAdmin()

val tbName = TableName.valueOf(tableName)
// disable table
adminHbase.disableTable(tbName)

// get HTableDescriptor
val htd = adminHbase.getTableDescriptor(tbName)

//
val hcd = new HColumnDescriptor(columnFamily)
hcd.setMaxVersions(3)

// add family
htd.addFamily(hcd)

// apply htd to table
adminHbase.modifyTable(tbName, htd)

// enable table
adminHbase.enableTable(tbName)

println("添加成功")

adminHbase.close()

}

//addHbaseColumnFamily("scTable3", "base")


/**
* 修改列簇功能 get 到 HTableDescriptor ,再 get 到 Family ,设置 Family ,admin modifyTable 应用
* @param tableName
* @param columnFamily
*/
def modifyHbaseTableColumnFamily(tableName:String, columnFamily:String) ={
val hbaseconf = HBaseConfiguration.create()
val HbaseConn = ConnectionFactory.createConnection(hbaseconf)
val adminHbase = HbaseConn.getAdmin()

val tbName = TableName.valueOf(tableName)

adminHbase.disableTable(tbName)

val htd = adminHbase.getTableDescriptor(tbName)

val modifyCol = htd.getFamily(columnFamily.getBytes())
modifyCol.setMaxVersions(3)

adminHbase.modifyTable(tbName,htd)

adminHbase.enableTable(tbName)

adminHbase.close()

println("修改成功!")
}

//modifyHbaseTableColumnFamily("scTable3", "info")

/**
* 插入数据,五个参数
* @param tableName
* @param columnFamily
* @param column
* @param rowkey
* @param value
*/
def putDataHbaseTable(tableName:String, columnFamily:String, column:String,
rowkey:String, value:String) ={
val hbaseconf = HBaseConfiguration.create()

// table
val hTable = new HTable(hbaseconf, tableName)
// row key
val putData = new Put(rowkey.getBytes())
// put value
putData.add(columnFamily.getBytes(), column.getBytes(), value.getBytes())

/**
* 插入方式
* ASYNC_WAL : 当数据变动时,异步写WAL日志
* SYNC_WAL : 当数据变动时,同步写WAL日志
* FSYNC_WAL : 当数据变动时,同步写WAL日志,并且,强制将数据写入磁盘
* SKIP_WAL : 不写WAL日志
* USE_DEFAULT : 使用HBase全局默认的WAL写入级别,即 SYNC_WAL
*/
putData.setDurability(Durability.SYNC_WAL)

// put data to table
hTable.put(putData)

println("插入数据成功!")
// close table
hTable.close()
}

//putDataHbaseTable("scTable3", "info", "name", "rk0002", "lalala")

def deleteDataHbaseTable(tableName: String, rowkey:String, columnFamily:String,
column:String = null)={

val hbaseConf = HBaseConfiguration.create()

val hTable = new HTable(hbaseConf, tableName)

// 初始化 Delete ,表名后可接时间戳
val deletaData = new Delete(rowkey.getBytes())

/**
* 1).删除指定列的 最新版本 的数据:Delete addColumn (byte[] family, byte[] qualifier)
* 2).删除指定列的 指定版本 的数据:Delete addColumn (byte[] family, byte[] qualifier, long timestamp )
* 3).删除指定列的 所有版本 的数据:Delete addColumns (byte[] family, byte[] qualifier)
* 4).删除指定列的,时间戳 小于等于 给定时间戳的 所有版本 的数据:Delete addColumns (byte[] family, byte[] qualifier, long timestamp )
* 5).删除指定列族的所有列的 所有版本 数据:Delete addFamily (byte[] family) 默认使用当前时间的时间戳,时间戳大于当前时间的数据删除不掉
* 6).删除指定列族的所有列中 时间戳 小于等于 指定时间戳 的所有数据:Delete addFamily (byte[] family, long timestamp)
* 7).删除指定列族中 所有列的时间戳 等于 指定时间戳的版本数据:Delete addFamilyVersion (byte[] family, long timestamp)
*/
deletaData.addColumn(columnFamily.getBytes(),column.getBytes())
//deletaData.addColumns(columnFamily.getBytes(),column.getBytes())
//deletaData.addFamily(columnFamily.getBytes())
hTable.delete(deletaData)

println("删除成功")
hTable.close()

}

// deleteDataHbaseTable("scTable3", "rk0002", "info")

def getDataHbaseTable(tableName:String, rowkey:String, columnFamily:String, column:String = null)={
val hbaseCOnf = HBaseConfiguration.create()

val hTable = new HTable(hbaseCOnf, tableName)

val getData = new Get(rowkey.getBytes())

/**
* 1). Get addFamily(byte[] family) 指定希望获取的列族
* 2). Get addColumn(byte[] family, byte[] qualifier) 指定希望获取的列
* 3). Get setTimeRange(long minStamp, long maxStamp) 设置获取数据的 时间戳范围
* 4). Get setTimeStamp(long timestamp) 设置获取数据的时间戳
* 5). Get setMaxVersions(int maxVersions) 设定获取数据的版本数
* 6). Get setMaxVersions() 设定获取数据的所有版本
* 7). Get setFilter(Filter filter) 为Get对象添加过滤器
* 8). void setCacheBlocks(boolean cacheBlocks) 设置该Get获取的数据是否缓存在内存中
*/
//getData.addFamily(columnFamily.getBytes())

//getData.addColumn(columnFamily.getBytes(), column.getBytes())

//getData.setTimeStamp("1535680422860".toLong)

getData.setMaxVersions()
val results = hTable.get(getData)

for (result <- results.rawCells()){

println( new String(CellUtil.cloneRow(result)) + "\t" +
new String(CellUtil.cloneFamily(result)) + "\t" +
new String(CellUtil.cloneQualifier(result)) + "\t" +
new String(CellUtil.cloneValue(result)) + "\t" +
result.getTimestamp)

}
hTable.close()

}

//getDataHbaseTable("scTable", "rk0002", "info", "age")

def scanDataHbaseTable(tableName:String, startRow:String, stopRow:String,
columnFamily:String, column:String)={
val hBaseConf = HBaseConfiguration.create()
val hTable = new HTable(hBaseConf, tableName)

/**
* 1). 创建扫描所有行的Scan:Scan()
*   2). 创建Scan,从指定行开始扫描:Scan(byte[] startRow)
*   注意:如果指定行不存在,从下一个最近的行开始
*   3). 创建Scan,指定起止行:Scan(byte[] startRow, byte[] stopRow)
*   注意: startRow <= 结果集 < stopRow
*   4). 创建Scan,指定起始行和过滤器:Scan(byte[] startRow, Filter filter)
*/
val scanData = new Scan()

val filter1 = new SingleColumnValueFilter(columnFamily.getBytes(), column.getBytes(), CompareOp.GREATER_OR_EQUAL, "60".getBytes() )

/**
* Scan setStartRow (byte[] startRow) 设置Scan的开始行,默认 结果集 包含该行。如果希望结果集不包含该行,可以在行键末尾加上0。
* Scan setStopRow (byte[] stopRow) 设置Scan的结束行,默认 结果集 不包含该行。如果希望结果集包含该行,可以在行键末尾加上0。
* Scan setBatch(int batch) 指定最多返回的Cell数目.用于防止一行中有过多的数据,导致OutofMemory错误
* Scan setTimeRange (long minStamp, long maxStamp) 扫描指定 时间范围 的数据
* Scan setTimeStamp (long timestamp) 扫描 指定时间 的数据
* Scan addColumn (byte[] family, byte[] qualifier) 指定扫描的列
* Scan addFamily (byte[] family) 指定扫描的列族
* Scan setFilter (Filter filter) 为Scan设置过滤器,详见HBase API Filter过滤器
* Scan setReversed (boolean reversed) 设置Scan的扫描顺序,默认是正向扫描(false),可以设置为逆向扫描(true)。注意:该方法0.98版本以后才可用!!
* Scan setMaxVersions () 获取所有版本的数据
* Scan setMaxVersions (int maxVersions) 设置获取的最大版本数! 不调用上下两个setMaxVersions() 方法,只返回最新版本数据
* void setCaching (int caching) 设定缓存在内存中的行数,缓存得越多,以后查询结果越快,同时也消耗更多内存
* void setRaw (boolean raw) 激活或者禁用raw模式。如果raw模式被激活,Scan将返回 所有已经被打上删除标记但尚未被真正删除 的数据。该功能仅用于激活了 KEEP_DELETED_ROWS的列族,即列族开启了 hcd.setKeepDeletedCells(true)
* Scan激活raw模式后,只能浏览所有的列,而不能指定任意的列,否则会报错
*/
scanData.setFilter(filter1)

val resultsScan:ResultScanner = hTable.getScanner(scanData)


while (resultsScan.iterator().hasNext){
val results = resultsScan.iterator().next()
for (result:Cell <- results.rawCells()) {

println(new String(CellUtil.cloneRow(result)) + "\t" +
new String(CellUtil.cloneFamily(result)) + "\t" +
new String(CellUtil.cloneQualifier(result)) + "\t" +
new String(CellUtil.cloneValue(result)) + "\t" +
result.getTimestamp)
}

}

/**
* for 循环无法直接遍历 ResultScanner 暂无办法
*/
// for(results:Result <- resultsScan){
//
// for (result:Cell <- results.rawCells()) {
//
// println(new String(CellUtil.cloneRow(result)) + "\t" +
// new String(CellUtil.cloneFamily(result)) + "\t" +
// new String(CellUtil.cloneQualifier(result)) + "\t" +
// new String(CellUtil.cloneValue(result)) + "\t" +
// result.getTimestamp)
// }
//
// }
hTable.close()
}

//scanDataHbaseTable("scTable", "rk0001", "rk0002", "info", "age")

}
}