1 package eu.fbk.knowledgestore.datastore.hbase.utils;
2
3 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HADOOP_FS_DEFAULT_NAME;
4 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASE_TRAN_LAYER;
5 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASE_ZOOKEEPER_CLIENT_PORT;
6 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASE_ZOOKEEPER_QUORUM;
7 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.OMID_TRAN_LAYER_OPT;
8 import static java.lang.Integer.MAX_VALUE;
9
10 import java.io.IOException;
11 import java.nio.ByteBuffer;
12 import java.util.List;
13 import java.util.Properties;
14
15 import org.apache.hadoop.hbase.HBaseConfiguration;
16 import org.apache.hadoop.hbase.HColumnDescriptor;
17 import org.apache.hadoop.hbase.HTableDescriptor;
18 import org.apache.hadoop.hbase.MasterNotRunningException;
19 import org.apache.hadoop.hbase.ZooKeeperConnectionException;
20 import org.apache.hadoop.hbase.client.Delete;
21 import org.apache.hadoop.hbase.client.HBaseAdmin;
22 import org.apache.hadoop.hbase.client.Put;
23 import org.apache.hadoop.hbase.client.ResultScanner;
24 import org.apache.hadoop.hbase.client.Scan;
25 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
26 import org.apache.hadoop.hbase.filter.FilterList;
27 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
28 import org.apache.hadoop.hbase.util.Bytes;
29 import org.openrdf.model.URI;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 import eu.fbk.knowledgestore.data.Record;
34 import eu.fbk.knowledgestore.data.XPath;
35 import eu.fbk.knowledgestore.runtime.DataCorruptedException;
36
37
38
39
40 public abstract class AbstractHBaseUtils {
41
42
43 public static Logger logger = LoggerFactory.getLogger(AbstractHBaseUtils.class);
44
45 private org.apache.hadoop.conf.Configuration hbcfg;
46
47 private AvroSerializer serializer;
48
49 private String hbaseTableNamePrefix;
50
51 private boolean serverFilterFlag;
52
53
54
55
56
57 public AbstractHBaseUtils(Properties properties) {
58 createConfiguration(properties);
59 }
60
61 public static AbstractHBaseUtils factoryHBaseUtils(Properties properties) {
62 AbstractHBaseUtils hbaseUtils = null;
63 if (properties.getProperty(HBASE_TRAN_LAYER, OMID_TRAN_LAYER_OPT).equalsIgnoreCase(
64 OMID_TRAN_LAYER_OPT)) {
65 logger.info("Using OMID HbaseUtils");
66 hbaseUtils = new OmidHBaseUtils(properties);
67 } else {
68 logger.info("Using Native HbaseUtils");
69 hbaseUtils = new HBaseUtils(properties);
70 }
71 return hbaseUtils;
72 }
73
74
75
76
77 public abstract void begin();
78
79
80
81
82
83
84 public abstract void commit() throws DataCorruptedException, IOException;
85
86
87
88
89
90
91 public abstract void rollback() throws DataCorruptedException, IOException;
92
93
94
95
96
97
98 public abstract Object getTable(String tableName);
99
100
101
102
103
104
105
106
107
108 public abstract void processPut(Record record, String tabName,
109 String famName, String quaName);
110
111
112
113
114
115
116
117
118
119 public abstract void processDelete(URI id, String tabName,
120 String famName, String quaName);
121
122
123
124
125
126
127
128 public abstract Record get(String tableName, URI id)
129 throws IOException;
130
131
132
133
134
135
136 public abstract ResultScanner getScanner(String tableName, Scan scan);
137
138
139
140
141
142
143
144
145
146 public abstract List<Record> get(String tableName, List<URI> ids)
147 throws IOException;
148
149
150
151
152
153
154 public abstract List<Object> checkForErrors(Object[] objs);
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170 public abstract long count(String tableName, String familyName, XPath condition)
171 throws IOException;
172
173
174
175
176
177
178 public void createConfiguration(final Properties properties) {
179
180 setHbcfg(HBaseConfiguration.create());
181
182 getHbcfg().set(HBASE_ZOOKEEPER_QUORUM,
183 properties.getProperty(HBASE_ZOOKEEPER_QUORUM, "hlt-services4"));
184
185 getHbcfg().set(HBASE_ZOOKEEPER_CLIENT_PORT,
186 properties.getProperty(HBASE_ZOOKEEPER_CLIENT_PORT, "2181"));
187
188 getHbcfg().set(HADOOP_FS_DEFAULT_NAME,
189 properties.getProperty(HADOOP_FS_DEFAULT_NAME, "hdfs://hlt-services4:9000"));
190
191
192 }
193
194
195
196
197
198
199
200
201
202
203 public FilterList getFilter(XPath condition, boolean passAll,
204 String []famNames, String []qualNames, String []params) {
205 FilterList list = new FilterList((passAll)?FilterList.Operator.MUST_PASS_ALL:
206 FilterList.Operator.MUST_PASS_ONE);
207 for (int iCont = 0; iCont < famNames.length; iCont ++) {
208 SingleColumnValueFilter filterTmp = new SingleColumnValueFilter(
209 Bytes.toBytes(famNames[iCont]),
210 Bytes.toBytes(qualNames[iCont]),
211 CompareOp.EQUAL,
212 Bytes.toBytes(params[iCont])
213 );
214 list.addFilter(filterTmp);
215 }
216 return list;
217 }
218
219
220
221
222
223
224
225
226
227
228
229 public Scan getResultScan(String tableName, String famName,
230 ByteBuffer startKey, ByteBuffer endKey) throws IOException {
231 logger.debug("AbstractHBaseUtils Begin of getResultScan(" + tableName + ", " + famName + ")");
232 Scan scan = new Scan();
233 scan.addFamily(Bytes.toBytes(famName));
234
235 if (startKey != null)
236 scan.setStartRow(Bytes.toBytes(startKey));
237 if (endKey != null)
238 scan.setStopRow(Bytes.toBytes(endKey));
239 return scan;
240 }
241
242
243
244
245
246
247
248
249
250 public Scan getScan(String tableName,
251 String famName) throws IOException {
252 return getResultScan(tableName, famName, null, null);
253 }
254
255
256
257
258
259
260 public Put createPut(Record record, String tableName,
261 String famName, String quaName) throws IOException {
262 Object tTable = getTable(tableName);
263 Put put = null;
264 if (tTable != null) {
265
266 AvroSerializer serializer = getSerializer();
267 final byte[] bytes = serializer.toBytes(record);
268
269 put = new Put(Bytes.toBytes(record.getID().toString()));
270
271 put.add(Bytes.toBytes(famName), Bytes.toBytes(quaName), bytes);
272 }
273 return put;
274 }
275
276
277
278
279
280
281
282
283
284 public Delete createDelete(URI id, String tableName) throws IOException {
285 Delete del = null;
286 Object tTable = getTable(tableName);
287 if (tTable != null) {
288 del = new Delete(Bytes.toBytes(id.toString()));
289 }
290 return del;
291 }
292
293
294
295
296 public static Logger getLogger() {
297 return logger;
298 }
299
300
301
302
303 public void setLogger(Logger pLogger) {
304 logger = pLogger;
305 }
306
307
308
309
310
311
312
313 public void checkAndCreateTable(String tabName, String colFamName) throws IOException {
314 HBaseAdmin hba;
315 try {
316 hba = new HBaseAdmin(this.getHbcfg());
317 if (hba.tableExists(tabName) == false) {
318 logger.debug("creating table " + tabName);
319 final HTableDescriptor tableDescriptor = new HTableDescriptor(tabName);
320 final HColumnDescriptor columnDescriptor = new HColumnDescriptor(colFamName);
321 columnDescriptor.setMaxVersions(MAX_VALUE);
322 tableDescriptor.addFamily(columnDescriptor);
323 hba.createTable(tableDescriptor);
324 } else {
325 logger.debug("already existent table " + tabName);
326 }
327 hba.close();
328 } catch (MasterNotRunningException e) {
329 throw new IOException(e);
330 } catch (ZooKeeperConnectionException e) {
331 throw new IOException(e);
332 } catch (IOException e) {
333 throw new IOException(e);
334 }
335 }
336
337
338
339 public org.apache.hadoop.conf.Configuration getHbcfg() {
340 return hbcfg;
341 }
342
343
344
345 public void setHbcfg(org.apache.hadoop.conf.Configuration hbcfg) {
346 this.hbcfg = hbcfg;
347 }
348
349 public void initServerFilterFlag(boolean serverFilterFlag) {
350 this.serverFilterFlag = serverFilterFlag;
351 }
352
353
354
355
356 public boolean getServerFilterFlag() {
357 return serverFilterFlag;
358 }
359
360 public void initSerializer(AvroSerializer serializer) {
361 this.serializer = serializer;
362 }
363
364
365
366
367 public AvroSerializer getSerializer() {
368 return serializer;
369 }
370
371 public void initHbaseTableNamePrefix(String hbaseTableNamePrefix) {
372 this.hbaseTableNamePrefix = hbaseTableNamePrefix;
373 }
374
375
376
377
378 public String getHbaseTableNamePrefix() {
379 return hbaseTableNamePrefix;
380 }
381
382 }