1   package eu.fbk.knowledgestore.datastore.hbase.utils;
2   
3   import java.io.IOException;
4   import java.util.ArrayList;
5   import java.util.HashMap;
6   import java.util.List;
7   import java.util.Map;
8   import java.util.Properties;
9   
10  import com.google.common.base.Preconditions;
11  
12  import org.apache.hadoop.hbase.HTableDescriptor;
13  import org.apache.hadoop.hbase.client.Delete;
14  import org.apache.hadoop.hbase.client.Get;
15  import org.apache.hadoop.hbase.client.HBaseAdmin;
16  import org.apache.hadoop.hbase.client.HTable;
17  import org.apache.hadoop.hbase.client.Put;
18  import org.apache.hadoop.hbase.client.Result;
19  import org.apache.hadoop.hbase.client.ResultScanner;
20  import org.apache.hadoop.hbase.client.Scan;
21  import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
22  import org.apache.hadoop.hbase.util.Bytes;
23  import org.openrdf.model.URI;
24  
25  import eu.fbk.knowledgestore.data.Record;
26  import eu.fbk.knowledgestore.data.XPath;
27  
28  public class HBaseUtils extends AbstractHBaseUtils {
29  
30      
31      private static Map<String, HTable> tableNameHandleMap = new HashMap<String, HTable>();
32  
33      public HBaseUtils(final Properties properties) {
34          super(properties);
35      }
36  
37      
38  
39  
40  
41  
42      @Override
43      public HTable getTable(String tableName) {
44  	logger.debug("NATIVE Begin of getTable for " + tableName);
45          HTable table = tableNameHandleMap.get(tableName);
46          if (table != null) {
47              logger.debug("NATIVE Found a cached handle for table " + tableName);
48              return table;
49          }
50          try {
51              logger.debug("NATIVE Looking for a handle of table: " + tableName);
52              HBaseAdmin admin = new HBaseAdmin(this.getHbcfg());
53              HTableDescriptor[] resources = admin.listTables(tableName);
54  	    Preconditions.checkElementIndex(0, resources.length, "no table " + tableName + " found");
55              admin.close();
56              table = new HTable(this.getHbcfg(), tableName);
57          } catch (IOException e) {
58              logger.error("NATIVE Error while trying to obtain table: " + tableName);
59              logger.error(e.getMessage());
60          };
61          tableNameHandleMap.put(tableName, table);
62          logger.debug("NATIVE Cached a handle of table: " + tableName);
63          return table;
64      }
65  
66      
67  
68  
69      @Override
70      public void commit() {
71      }
72  
73      
74  
75  
76      @Override
77      public void rollback() {
78      }
79  
80      
81  
82  
83  
84  
85  
86      @Override
87      public ResultScanner getScanner(String tableName, Scan scan) {
88  	logger.debug("NATIVE Begin of getScanner(" + tableName + ", " + scan + ")");
89          HTable tab = (HTable) getTable(tableName);
90          ResultScanner resScanner = null;
91          try {
92              resScanner = tab.getScanner(scan);
93          } catch (IOException e) {
94              logger.error("Error while trying to obtain a ResultScanner: " + tableName);
95              logger.error(e.getMessage());
96          }
97          return resScanner;
98      }
99  
100     
101 
102 
103     @Override
104     public void processPut(Record record, String tabName,
105             String famName, String quaName) {
106 	logger.debug("NATIVE Begin processPut(" + record + ", " + tabName + ")");
107         HTable hTable = getTable(tabName);
108         try {
109             Put op = createPut(record, tabName, famName, quaName);
110             hTable.put(op);
111         } catch (IOException e) {
112             logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
113             logger.error(e.getMessage());
114         }
115     }
116 
117     
118 
119 
120     @Override
121     public void processDelete(URI id, String tabName,
122             String famName, String quaName) {
123 	logger.debug("NATIVE Begin processDelete(" + id + ", " + tabName + ")");
124         HTable hTable = getTable(tabName);
125         try {
126             Delete op = createDelete(id, tabName);
127             hTable.delete(op);
128         } catch (IOException e) {
129             logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
130             logger.error(e.getMessage());
131         }
132     }
133 
134     
135 
136 
137 
138 
139 
140     @Override
141     public Record get(String tableName, URI id) throws IOException {
142 	logger.debug("NATIVE Begin of get(" + tableName + ", " + id + ")");
143         HTable selTable = getTable(tableName);
144         Record resGotten = null;
145         if (selTable != null) {
146            
147            Get get = new Get(Bytes.toBytes(id.toString()));
148            Result rs = selTable.get(get);
149            logger.debug("Value obtained: " + new String(rs.value()));
150            final AvroSerializer serializer = getSerializer();
151            resGotten = (Record) serializer.fromBytes(rs.value());
152         }
153         return resGotten;
154     }
155  
156     @Override
157     public List<Record> get(String tableName,
158             List<URI> ids) throws IOException {
159 	logger.debug("NATIVE Begin of get(" + tableName + ", " + ids + ")");
160         HTable selTable = getTable(tableName);
161         List<Record> resGotten = new ArrayList<Record> ();
162         List<Get> gets = new ArrayList<Get> ();
163         AvroSerializer serializer = getSerializer();
164 
165         for (URI id : ids) {
166             gets.add(new Get(Bytes.toBytes(id.toString())));
167         }
168         Result[] results = selTable.get(gets);
169 
170         
171         for (Result res : results) {
172             final byte[] bytes = res.value();
173             if (bytes != null) {
174                 resGotten.add((Record) serializer.fromBytes(bytes));
175             }
176         }
177         return resGotten;
178     }
179  
180     
181 
182 
183 
184 
185     @Override
186     public Put createPut(Record record, String tableName,
187             String famName, String quaName) throws IOException {
188         HTable hTable = getTable(tableName);
189         Put put = null;
190         if (hTable != null) {
191             
192             AvroSerializer serializer = getSerializer();
193             final byte[] bytes = serializer.toBytes(record);
194             
195             put = new Put(Bytes.toBytes(record.getID().toString()));
196             
197             put.add(Bytes.toBytes(famName), Bytes.toBytes(quaName), bytes);
198         }
199         return put;
200     }
201 
202     
203 
204 
205 
206 
207 
208 
209 
210     @Override
211     public Delete createDelete(URI id, String tableName) throws IOException {
212         Delete del = null;
213         HTable hTable = getTable(tableName);
214         if (hTable != null) {
215             del = new Delete(Bytes.toBytes(id.toString()));
216         }
217         return del;
218     }
219 
220     
221 
222 
223 
224 
225     @Override
226     public List<Object> checkForErrors(Object[] objs) {
227         List<Object> errors = new ArrayList<Object>();
228         if (objs != null) {
229             for (int cont = 0; cont < objs.length; cont ++) {
230                 if (objs[cont] == null) {
231                     logger.debug("A operation could not be performed.");
232                     errors.add(objs[cont]);
233                 }
234             }
235         }
236         return errors;
237     }
238 
239     
240 
241 
242 
243 
244 
245 
246     @Override
247     public long count(String tableName, String familyName, XPath condition) throws IOException {
248 	logger.debug("NATIVE Begin count");
249 	
250 	org.apache.hadoop.conf.Configuration customConf = new org.apache.hadoop.conf.Configuration(super.getHbcfg());
251 	
252         customConf.setLong("hbase.rpc.timeout", 600000);
253         
254         customConf.setLong("hbase.client.scanner.caching", 1000);
255 
256 	
257 
258 
259 
260 
261 
262         AggregationClient agClient = new AggregationClient(customConf);
263 	long rowCount = 0;
264 	byte[] tName = Bytes.toBytes(tableName);
265 	try {
266 	    Scan scan = getScan(tableName, familyName);
267 	    rowCount = agClient.rowCount(tName, null, scan);
268 	} catch (Throwable e) {
269             throw new IOException(e.toString());
270 	}
271         return rowCount;
272     }
273 
274     @Override
275     public void begin() {
276     }
277 }