package eu.fbk.knowledgestore.datastore.hbase.utils;

import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HADOOP_FS_DEFAULT_NAME;
import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASE_TRAN_LAYER;
import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASE_ZOOKEEPER_CLIENT_PORT;
import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASE_ZOOKEEPER_QUORUM;
import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.OMID_TRAN_LAYER_OPT;
import static java.lang.Integer.MAX_VALUE;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Properties;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.openrdf.model.URI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.fbk.knowledgestore.data.Record;
import eu.fbk.knowledgestore.data.XPath;
import eu.fbk.knowledgestore.runtime.DataCorruptedException;

/**
 * Base class collecting the HBase utilities shared by the transactional (OMID) and native
 * implementations used by the HBase datastore.
 */
public abstract class AbstractHBaseUtils {

    /** Logger object used by this class. */
    public static Logger logger = LoggerFactory.getLogger(AbstractHBaseUtils.class);

    /** The HBase configuration used to connect to the cluster. */
    private org.apache.hadoop.conf.Configuration hbcfg;

    /** The Avro serializer used to encode and decode records. */
    private AvroSerializer serializer;

    /** The prefix prepended to all HBase table names. */
    private String hbaseTableNamePrefix;

    /** Flag telling whether filters are evaluated server-side. */
    private boolean serverFilterFlag;

    /**
     * Constructor.
     *
     * @param properties the configuration properties used to build the HBase configuration
     */
    public AbstractHBaseUtils(Properties properties) {
        createConfiguration(properties);
    }

    /**
     * Creates the concrete {@code AbstractHBaseUtils} instance selected by the
     * {@code HBASE_TRAN_LAYER} property: the OMID transactional layer (the default) or the
     * native HBase client.
     *
     * @param properties the configuration properties
     * @return the created instance
     */
    public static AbstractHBaseUtils factoryHBaseUtils(Properties properties) {
        AbstractHBaseUtils hbaseUtils = null;
        if (properties.getProperty(HBASE_TRAN_LAYER, OMID_TRAN_LAYER_OPT).equalsIgnoreCase(
                OMID_TRAN_LAYER_OPT)) {
            logger.info("Using OMID HbaseUtils");
            hbaseUtils = new OmidHBaseUtils(properties);
        } else {
            logger.info("Using Native HbaseUtils");
            hbaseUtils = new HBaseUtils(properties);
        }
        return hbaseUtils;
    }
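
    // Usage sketch: a client obtains a concrete instance through the factory method above and
    // then drives the transactional lifecycle. The transaction-layer value passed here is the
    // OMID_TRAN_LAYER_OPT constant itself, so no concrete configuration string is assumed.
    //
    //     Properties props = new Properties();
    //     props.setProperty(HBASE_TRAN_LAYER, OMID_TRAN_LAYER_OPT);
    //     AbstractHBaseUtils utils = AbstractHBaseUtils.factoryHBaseUtils(props);
    //     utils.begin();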

    /**
     * Begins a transaction, if the underlying implementation is transactional.
     */
    public abstract void begin();

    /**
     * Commits the pending modifications.
     *
     * @throws DataCorruptedException in case a rollback is not possible and data ends up
     *             corrupted
     * @throws IOException in case the commit cannot be performed
     */
    public abstract void commit() throws DataCorruptedException, IOException;

    /**
     * Rolls back the pending modifications.
     *
     * @throws DataCorruptedException in case the rollback fails and data ends up corrupted
     * @throws IOException in case the rollback cannot be performed
     */
    public abstract void rollback() throws DataCorruptedException, IOException;

    /**
     * Gets a handle of the table with the given name.
     *
     * @param tableName the name of the table
     * @return the table handle, whose concrete type depends on the implementation
     */
    public abstract Object getTable(String tableName);

    /**
     * Processes the put of the given record on the given table, column family and qualifier.
     *
     * @param record the record to be stored
     * @param tabName the name of the table
     * @param famName the name of the column family
     * @param quaName the name of the qualifier
     */
    public abstract void processPut(Record record, String tabName,
            String famName, String quaName);

    /**
     * Processes the deletion of the record identified by {@code id} on the given table, column
     * family and qualifier.
     *
     * @param id the identifier of the record to be deleted
     * @param tabName the name of the table
     * @param famName the name of the column family
     * @param quaName the name of the qualifier
     */
    public abstract void processDelete(URI id, String tabName,
            String famName, String quaName);

    /**
     * Gets the record identified by {@code id} from the given table.
     *
     * @param tableName the name of the table
     * @param id the identifier of the record
     * @return the record retrieved, if any
     * @throws IOException in case the lookup fails
     */
    public abstract Record get(String tableName, URI id)
            throws IOException;

    /**
     * Gets a scanner over the given table for the given scan.
     *
     * @param tableName the name of the table
     * @param scan the scan to be performed
     * @return the result scanner
     */
    public abstract ResultScanner getScanner(String tableName, Scan scan);

    /**
     * Gets the records identified by {@code ids} from the given table.
     *
     * @param tableName the name of the table
     * @param ids the identifiers of the records to retrieve
     * @return the records retrieved
     * @throws IOException in case the lookup fails
     */
    public abstract List<Record> get(String tableName, List<URI> ids)
            throws IOException;

    /**
     * Checks the results of a batch of operations for errors.
     *
     * @param objs the per-operation results returned by the batch call
     * @return the errors detected, if any
     */
    public abstract List<Object> checkForErrors(Object[] objs);

    /**
     * Counts the records of the given table and column family matching the optional condition.
     *
     * @param tableName the name of the table
     * @param familyName the name of the column family
     * @param condition an optional XPath condition the counted records have to match
     * @return the number of matching records
     * @throws IOException in case the count fails
     */
    public abstract long count(String tableName, String familyName, XPath condition)
            throws IOException;

    /**
     * Creates the HBase configuration, reading the ZooKeeper quorum and client port and the
     * Hadoop file system name from the given properties and falling back to built-in defaults.
     *
     * @param properties the configuration properties
     */
    public void createConfiguration(final Properties properties) {

        setHbcfg(HBaseConfiguration.create());

        getHbcfg().set(HBASE_ZOOKEEPER_QUORUM,
                properties.getProperty(HBASE_ZOOKEEPER_QUORUM, "hlt-services4"));

        getHbcfg().set(HBASE_ZOOKEEPER_CLIENT_PORT,
                properties.getProperty(HBASE_ZOOKEEPER_CLIENT_PORT, "2181"));

        getHbcfg().set(HADOOP_FS_DEFAULT_NAME,
                properties.getProperty(HADOOP_FS_DEFAULT_NAME, "hdfs://hlt-services4:9000"));
    }
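
    // Configuration sketch: the keys looked up above are the HBaseConstants property names, so a
    // caller can override the hard-coded defaults simply by setting them in the Properties passed
    // to the constructor. Host and port values below are illustrative placeholders.
    //
    //     Properties props = new Properties();
    //     props.setProperty(HBASE_ZOOKEEPER_QUORUM, "zk-host.example.org");
    //     props.setProperty(HBASE_ZOOKEEPER_CLIENT_PORT, "2181");
    //     props.setProperty(HADOOP_FS_DEFAULT_NAME, "hdfs://namenode.example.org:9000");
    //     AbstractHBaseUtils utils = AbstractHBaseUtils.factoryHBaseUtils(props);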

    /**
     * Builds a server-side filter list comparing column values for equality against the given
     * parameters.
     *
     * @param condition the XPath condition the filter is derived from (not inspected here)
     * @param passAll if true, all filters must pass (AND); otherwise one passing filter is
     *            enough (OR)
     * @param famNames the column family names, parallel to {@code qualNames} and {@code params}
     * @param qualNames the qualifier names
     * @param params the values the columns are compared against
     * @return the filter list to be applied to a scan
     */
    public FilterList getFilter(XPath condition, boolean passAll,
            String[] famNames, String[] qualNames, String[] params) {
        FilterList list = new FilterList(passAll ? FilterList.Operator.MUST_PASS_ALL
                : FilterList.Operator.MUST_PASS_ONE);
        for (int iCont = 0; iCont < famNames.length; iCont++) {
            SingleColumnValueFilter filterTmp = new SingleColumnValueFilter(
                    Bytes.toBytes(famNames[iCont]),
                    Bytes.toBytes(qualNames[iCont]),
                    CompareOp.EQUAL,
                    Bytes.toBytes(params[iCont]));
            list.addFilter(filterTmp);
        }
        return list;
    }
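
    // Filter sketch: the three arrays are parallel, one entry per column to test. The family,
    // qualifier and value strings below are illustrative placeholders, not project defaults.
    //
    //     FilterList filters = getFilter(null, true,
    //             new String[] { "f" },
    //             new String[] { "q" },
    //             new String[] { "someValue" });
    //     Scan scan = getScan("someTable", "f");
    //     scan.setFilter(filters);
    //     ResultScanner scanner = getScanner("someTable", scan);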

    /**
     * Creates a scan over the given table and column family, optionally bounded by a start row
     * key (inclusive) and a stop row key (exclusive).
     *
     * @param tableName the name of the table (used for logging only)
     * @param famName the name of the column family to scan
     * @param startKey the start row key, or null for an unbounded start
     * @param endKey the stop row key, or null for an unbounded end
     * @return the configured scan
     * @throws IOException in case the scan cannot be created
     */
    public Scan getResultScan(String tableName, String famName,
            ByteBuffer startKey, ByteBuffer endKey) throws IOException {
        logger.debug("AbstractHBaseUtils Begin of getResultScan(" + tableName + ", " + famName + ")");
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes(famName));

        if (startKey != null) {
            scan.setStartRow(Bytes.toBytes(startKey));
        }
        if (endKey != null) {
            scan.setStopRow(Bytes.toBytes(endKey));
        }
        return scan;
    }

    /**
     * Creates an unbounded scan over the given table and column family.
     *
     * @param tableName the name of the table
     * @param famName the name of the column family to scan
     * @return the configured scan
     * @throws IOException in case the scan cannot be created
     */
    public Scan getScan(String tableName,
            String famName) throws IOException {
        return getResultScan(tableName, famName, null, null);
    }

    /**
     * Creates a Put for the given record, serializing it with Avro and storing the resulting
     * bytes under the given column family and qualifier; the record identifier is used as the
     * row key.
     *
     * @param record the record to be stored
     * @param tableName the name of the table
     * @param famName the name of the column family
     * @param quaName the name of the qualifier
     * @return the Put to be performed, or null if the table could not be resolved
     * @throws IOException in case the record cannot be serialized
     */
    public Put createPut(Record record, String tableName,
            String famName, String quaName) throws IOException {
        Object tTable = getTable(tableName);
        Put put = null;
        if (tTable != null) {
            // Serialize the record into an Avro-encoded byte array.
            AvroSerializer serializer = getSerializer();
            final byte[] bytes = serializer.toBytes(record);
            // The row key is the string form of the record identifier.
            put = new Put(Bytes.toBytes(record.getID().toString()));
            // Store the serialized record under the given family and qualifier.
            put.add(Bytes.toBytes(famName), Bytes.toBytes(quaName), bytes);
        }
        return put;
    }
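
    // Put sketch: concrete subclasses typically build the Put here and hand it to the table
    // handle obtained via getTable(). Table, family and qualifier names below are illustrative
    // placeholders, and the Record instance is assumed to be available.
    //
    //     Put put = createPut(record, "resources", "f", "avro");
    //     if (put != null) {
    //         // e.g. ((HTable) getTable("resources")).put(put);  // native-client case
    //     }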

    /**
     * Creates a Delete for the record identified by {@code id}, using the identifier as the
     * row key.
     *
     * @param id the identifier of the record to be deleted
     * @param tableName the name of the table
     * @return the Delete to be performed, or null if the table could not be resolved
     * @throws IOException in case the Delete cannot be created
     */
    public Delete createDelete(URI id, String tableName) throws IOException {
        Delete del = null;
        Object tTable = getTable(tableName);
        if (tTable != null) {
            del = new Delete(Bytes.toBytes(id.toString()));
        }
        return del;
    }

    /**
     * @return the logger
     */
    public static Logger getLogger() {
        return logger;
    }

    /**
     * @param pLogger the logger to set
     */
    public void setLogger(Logger pLogger) {
        logger = pLogger;
    }

    /**
     * Checks whether the given table exists and, if it does not, creates it with a single
     * column family keeping all cell versions.
     *
     * @param tabName the name of the table
     * @param colFamName the name of the column family to create
     * @throws IOException in case the check or the creation fails
     */
    public void checkAndCreateTable(String tabName, String colFamName) throws IOException {
        try {
            final HBaseAdmin hba = new HBaseAdmin(this.getHbcfg());
            try {
                if (!hba.tableExists(tabName)) {
                    logger.debug("creating table " + tabName);
                    final HTableDescriptor tableDescriptor = new HTableDescriptor(tabName);
                    final HColumnDescriptor columnDescriptor = new HColumnDescriptor(colFamName);
                    // Keep all versions of a cell.
                    columnDescriptor.setMaxVersions(MAX_VALUE);
                    tableDescriptor.addFamily(columnDescriptor);
                    hba.createTable(tableDescriptor);
                } else {
                    logger.debug("already existent table " + tabName);
                }
            } finally {
                hba.close();
            }
        } catch (MasterNotRunningException e) {
            throw new IOException(e);
        } catch (ZooKeeperConnectionException e) {
            throw new IOException(e);
        }
    }
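
    // Bootstrap sketch: an implementation would typically ensure its tables exist before first
    // use, prepending the configured prefix to the logical table name. Names here are
    // illustrative placeholders.
    //
    //     String table = getHbaseTableNamePrefix() + "resources";
    //     checkAndCreateTable(table, "f");
    //     ResultScanner rs = getScanner(table, getScan(table, "f"));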

    /**
     * @return the HBase configuration
     */
    public org.apache.hadoop.conf.Configuration getHbcfg() {
        return hbcfg;
    }

    /**
     * @param hbcfg the HBase configuration to set
     */
    public void setHbcfg(org.apache.hadoop.conf.Configuration hbcfg) {
        this.hbcfg = hbcfg;
    }

    /**
     * @param serverFilterFlag tells whether filters are evaluated server-side
     */
    public void initServerFilterFlag(boolean serverFilterFlag) {
        this.serverFilterFlag = serverFilterFlag;
    }

    /**
     * @return true if filters are evaluated server-side
     */
    public boolean getServerFilterFlag() {
        return serverFilterFlag;
    }

    /**
     * @param serializer the Avro serializer to use
     */
    public void initSerializer(AvroSerializer serializer) {
        this.serializer = serializer;
    }

    /**
     * @return the Avro serializer
     */
    public AvroSerializer getSerializer() {
        return serializer;
    }

    /**
     * @param hbaseTableNamePrefix the prefix prepended to all table names
     */
    public void initHbaseTableNamePrefix(String hbaseTableNamePrefix) {
        this.hbaseTableNamePrefix = hbaseTableNamePrefix;
    }

    /**
     * @return the prefix prepended to all table names
     */
    public String getHbaseTableNamePrefix() {
        return hbaseTableNamePrefix;
    }

}