1 package eu.fbk.knowledgestore.datastore.hbase.utils;
2
3 import java.io.IOException;
4 import java.util.ArrayList;
5 import java.util.HashMap;
6 import java.util.List;
7 import java.util.Map;
8 import java.util.Properties;
9
10 import com.google.common.base.Preconditions;
11
12 import org.apache.hadoop.hbase.HTableDescriptor;
13 import org.apache.hadoop.hbase.client.Delete;
14 import org.apache.hadoop.hbase.client.Get;
15 import org.apache.hadoop.hbase.client.HBaseAdmin;
16 import org.apache.hadoop.hbase.client.HTable;
17 import org.apache.hadoop.hbase.client.Put;
18 import org.apache.hadoop.hbase.client.Result;
19 import org.apache.hadoop.hbase.client.ResultScanner;
20 import org.apache.hadoop.hbase.client.Scan;
21 import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
22 import org.apache.hadoop.hbase.util.Bytes;
23 import org.openrdf.model.URI;
24
25 import eu.fbk.knowledgestore.data.Record;
26 import eu.fbk.knowledgestore.data.XPath;
27
28 public class HBaseUtils extends AbstractHBaseUtils {
29
30
31 private static Map<String, HTable> tableNameHandleMap = new HashMap<String, HTable>();
32
33 public HBaseUtils(final Properties properties) {
34 super(properties);
35 }
36
37
38
39
40
41
42 @Override
43 public HTable getTable(String tableName) {
44 logger.debug("NATIVE Begin of getTable for " + tableName);
45 HTable table = tableNameHandleMap.get(tableName);
46 if (table != null) {
47 logger.debug("NATIVE Found a cached handle for table " + tableName);
48 return table;
49 }
50 try {
51 logger.debug("NATIVE Looking for a handle of table: " + tableName);
52 HBaseAdmin admin = new HBaseAdmin(this.getHbcfg());
53 HTableDescriptor[] resources = admin.listTables(tableName);
54 Preconditions.checkElementIndex(0, resources.length, "no table " + tableName + " found");
55 admin.close();
56 table = new HTable(this.getHbcfg(), tableName);
57 } catch (IOException e) {
58 logger.error("NATIVE Error while trying to obtain table: " + tableName);
59 logger.error(e.getMessage());
60 };
61 tableNameHandleMap.put(tableName, table);
62 logger.debug("NATIVE Cached a handle of table: " + tableName);
63 return table;
64 }
65
66
67
68
69 @Override
70 public void commit() {
71 }
72
73
74
75
76 @Override
77 public void rollback() {
78 }
79
80
81
82
83
84
85
86 @Override
87 public ResultScanner getScanner(String tableName, Scan scan) {
88 logger.debug("NATIVE Begin of getScanner(" + tableName + ", " + scan + ")");
89 HTable tab = (HTable) getTable(tableName);
90 ResultScanner resScanner = null;
91 try {
92 resScanner = tab.getScanner(scan);
93 } catch (IOException e) {
94 logger.error("Error while trying to obtain a ResultScanner: " + tableName);
95 logger.error(e.getMessage());
96 }
97 return resScanner;
98 }
99
100
101
102
103 @Override
104 public void processPut(Record record, String tabName,
105 String famName, String quaName) {
106 logger.debug("NATIVE Begin processPut(" + record + ", " + tabName + ")");
107 HTable hTable = getTable(tabName);
108 try {
109 Put op = createPut(record, tabName, famName, quaName);
110 hTable.put(op);
111 } catch (IOException e) {
112 logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
113 logger.error(e.getMessage());
114 }
115 }
116
117
118
119
120 @Override
121 public void processDelete(URI id, String tabName,
122 String famName, String quaName) {
123 logger.debug("NATIVE Begin processDelete(" + id + ", " + tabName + ")");
124 HTable hTable = getTable(tabName);
125 try {
126 Delete op = createDelete(id, tabName);
127 hTable.delete(op);
128 } catch (IOException e) {
129 logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
130 logger.error(e.getMessage());
131 }
132 }
133
134
135
136
137
138
139
140 @Override
141 public Record get(String tableName, URI id) throws IOException {
142 logger.debug("NATIVE Begin of get(" + tableName + ", " + id + ")");
143 HTable selTable = getTable(tableName);
144 Record resGotten = null;
145 if (selTable != null) {
146
147 Get get = new Get(Bytes.toBytes(id.toString()));
148 Result rs = selTable.get(get);
149 logger.debug("Value obtained: " + new String(rs.value()));
150 final AvroSerializer serializer = getSerializer();
151 resGotten = (Record) serializer.fromBytes(rs.value());
152 }
153 return resGotten;
154 }
155
156 @Override
157 public List<Record> get(String tableName,
158 List<URI> ids) throws IOException {
159 logger.debug("NATIVE Begin of get(" + tableName + ", " + ids + ")");
160 HTable selTable = getTable(tableName);
161 List<Record> resGotten = new ArrayList<Record> ();
162 List<Get> gets = new ArrayList<Get> ();
163 AvroSerializer serializer = getSerializer();
164
165 for (URI id : ids) {
166 gets.add(new Get(Bytes.toBytes(id.toString())));
167 }
168 Result[] results = selTable.get(gets);
169
170
171 for (Result res : results) {
172 final byte[] bytes = res.value();
173 if (bytes != null) {
174 resGotten.add((Record) serializer.fromBytes(bytes));
175 }
176 }
177 return resGotten;
178 }
179
180
181
182
183
184
185 @Override
186 public Put createPut(Record record, String tableName,
187 String famName, String quaName) throws IOException {
188 HTable hTable = getTable(tableName);
189 Put put = null;
190 if (hTable != null) {
191
192 AvroSerializer serializer = getSerializer();
193 final byte[] bytes = serializer.toBytes(record);
194
195 put = new Put(Bytes.toBytes(record.getID().toString()));
196
197 put.add(Bytes.toBytes(famName), Bytes.toBytes(quaName), bytes);
198 }
199 return put;
200 }
201
202
203
204
205
206
207
208
209
210 @Override
211 public Delete createDelete(URI id, String tableName) throws IOException {
212 Delete del = null;
213 HTable hTable = getTable(tableName);
214 if (hTable != null) {
215 del = new Delete(Bytes.toBytes(id.toString()));
216 }
217 return del;
218 }
219
220
221
222
223
224
225 @Override
226 public List<Object> checkForErrors(Object[] objs) {
227 List<Object> errors = new ArrayList<Object>();
228 if (objs != null) {
229 for (int cont = 0; cont < objs.length; cont ++) {
230 if (objs[cont] == null) {
231 logger.debug("A operation could not be performed.");
232 errors.add(objs[cont]);
233 }
234 }
235 }
236 return errors;
237 }
238
239
240
241
242
243
244
245
246 @Override
247 public long count(String tableName, String familyName, XPath condition) throws IOException {
248 logger.debug("NATIVE Begin count");
249
250 org.apache.hadoop.conf.Configuration customConf = new org.apache.hadoop.conf.Configuration(super.getHbcfg());
251
252 customConf.setLong("hbase.rpc.timeout", 600000);
253
254 customConf.setLong("hbase.client.scanner.caching", 1000);
255
256
257
258
259
260
261
262 AggregationClient agClient = new AggregationClient(customConf);
263 long rowCount = 0;
264 byte[] tName = Bytes.toBytes(tableName);
265 try {
266 Scan scan = getScan(tableName, familyName);
267 rowCount = agClient.rowCount(tName, null, scan);
268 } catch (Throwable e) {
269 throw new IOException(e.toString());
270 }
271 return rowCount;
272 }
273
274 @Override
275 public void begin() {
276 }
277 }