1 package eu.fbk.knowledgestore.datastore.hbase.utils;
2
3 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_CON_TAB_NAME;
4 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_ENT_TAB_NAME;
5 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_MEN_TAB_NAME;
6 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_RES_TAB_NAME;
7 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_USR_TAB_NAME;
8
9 import java.io.IOException;
10 import java.util.ArrayList;
11 import java.util.Arrays;
12 import java.util.HashMap;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.Properties;
16
17 import javax.annotation.Nullable;
18
19 import com.continuuity.tephra.TransactionContext;
20 import com.continuuity.tephra.TransactionFailureException;
21 import com.continuuity.tephra.TransactionSystemClient;
22 import com.continuuity.tephra.hbase.TransactionAwareHTable;
23 import com.continuuity.tephra.inmemory.InMemoryTxSystemClient;
24 import com.continuuity.tephra.inmemory.InMemoryTransactionManager;
25
26 import org.apache.hadoop.hbase.client.Delete;
27 import org.apache.hadoop.hbase.client.Get;
28 import org.apache.hadoop.hbase.client.HTable;
29 import org.apache.hadoop.hbase.client.Put;
30 import org.apache.hadoop.hbase.client.Result;
31 import org.apache.hadoop.hbase.client.ResultScanner;
32 import org.apache.hadoop.hbase.client.Scan;
33 import org.apache.hadoop.hbase.util.Bytes;
34 import org.openrdf.model.URI;
35
36 import eu.fbk.knowledgestore.data.Record;
37 import eu.fbk.knowledgestore.data.Stream;
38 import eu.fbk.knowledgestore.data.XPath;
39 import eu.fbk.knowledgestore.datastore.hbase.HBaseScanIterator;
40 import eu.fbk.knowledgestore.runtime.DataCorruptedException;
41
42
43
44
45 public class TephraHBaseUtils extends AbstractHBaseUtils
46 {
47
48
49 private TransactionSystemClient txClient;
50
51
52 private final TransactionContext txContext;
53
54
55 private static Map<String, TransactionAwareHTable> tableNameHandleMap = new HashMap<String, TransactionAwareHTable>();
56
57
58
59
60
61
62
63 public TephraHBaseUtils(final Properties properties)
64 {
65
66 super(properties);
67
68 final String TEPHRA_HOST = "data.tx.bind.address";
69 final String TEPHRA_PORT = "data.tx.bind.port";
70
71 getHbcfg().set(TEPHRA_HOST, "escher");
72 getHbcfg().setInt(TEPHRA_PORT, 15165);
73
74
75 final List<TransactionAwareHTable> txTabList = new ArrayList<TransactionAwareHTable>(5);
76 try {
77 logger.debug("TEPHRA create the 5 TransactionAwareHTable tables");
78 final List<String> tableNames = Arrays.asList(DEFAULT_RES_TAB_NAME,
79 DEFAULT_MEN_TAB_NAME, DEFAULT_ENT_TAB_NAME, DEFAULT_CON_TAB_NAME,
80 DEFAULT_USR_TAB_NAME);
81 HTable hTab;
82 TransactionAwareHTable txTab;
83 for (final String tableName : tableNames) {
84 hTab = new HTable(this.getHbcfg(), tableName);
85 txTab = new TransactionAwareHTable(hTab);
86 tableNameHandleMap.put(tableName, txTab);
87 txTabList.add(txTab);
88 logger.debug("TEPHRA Cached a handle of table: " + tableName);
89 }
90 } catch (final IOException e) {
91 logger.error("TEPHRA Error while creating 5 TransactionAwareHTable tables");
92 logger.error(e.getMessage());
93 }
94
95
96
97 this.txClient = new InMemoryTxSystemClient(new InMemoryTransactionManager(this.getHbcfg()));
98 this.txContext = new TransactionContext(txClient,
99 txTabList.toArray(new TransactionAwareHTable[0]));
100 }
101
102
103
104
105 @Override
106 public void commit() throws DataCorruptedException, IOException, IllegalStateException
107 {
108 try {
109 this.txContext.finish();
110 } catch (final TransactionFailureException tfe1) {
111 try {
112 this.txContext.abort();
113 } catch (final TransactionFailureException tfe2) {
114 }
115 throw new IOException("Error trying to commit transaction.", tfe1);
116 }
117 }
118
119
120
121
122 @Override
123 public void rollback() throws DataCorruptedException, IOException, IllegalStateException
124 {
125 try {
126 this.txContext.abort();
127 } catch (final Exception e) {
128 throw new DataCorruptedException("Error trying to rollback a Transaction.", e);
129 }
130 }
131
132
133
134
135
136
137
138
139 @Override
140 public Object getTable(final String tableName)
141 {
142 logger.debug("TEPHRA Begin of getTable for " + tableName);
143 final TransactionAwareHTable table = tableNameHandleMap.get(tableName);
144 if (table == null) {
145
146 logger.error("TEPHRA error: unebale to find cached table " + tableName);
147 return table;
148 } else {
149 logger.debug("TEPHRA Found a cached handle for table " + tableName);
150 return table;
151 }
152 }
153
154 @Override
155 public void processPut(final Record record, final String tabName, final String famName,
156 final String quaName)
157 {
158 logger.debug("TEPHRA Begin processPut(" + record + ", " + tabName + ")");
159 final TransactionAwareHTable txTable = (TransactionAwareHTable) getTable(tabName);
160 Put op = null;
161 try {
162 op = createPut(record, tabName, famName, quaName);
163 txTable.put(op);
164
165 } catch (final IllegalArgumentException e) {
166
167 logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
168 logger.error(e.getMessage());
169 } catch (final IOException e) {
170
171 logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
172 logger.error(e.getMessage());
173 }
174 }
175
176 @Override
177 public void begin()
178 {
179 try {
180 this.txContext.start();
181 } catch (final TransactionFailureException e) {
182 logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
183 logger.error(e.getMessage());
184 }
185 }
186
187 @Override
188 public void processDelete(final URI id, final String tabName, final String famName,
189 final String quaName)
190 {
191 logger.debug("TEPHRA Begin processDelete(" + id + ", " + tabName + ")");
192 final TransactionAwareHTable txTable = (TransactionAwareHTable) getTable(tabName);
193 Delete op = null;
194 try {
195 op = createDelete(id, tabName);
196 txTable.delete(op);
197 } catch (final IllegalArgumentException e) {
198 logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
199 logger.error(e.getMessage());
200 } catch (final IOException e) {
201 logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
202 logger.error(e.getMessage());
203 }
204 }
205
206
207
208
209
210
211
212
213
214
215 @Override
216 public Record get(final String tableName, final URI id) throws IOException
217 {
218 logger.debug("TEPHRA Begin of get(" + tableName + ", " + id + ")");
219 final TransactionAwareHTable txTable = (TransactionAwareHTable) getTable(tableName);
220 Record resGotten = null;
221 if (txTable != null) {
222
223 final Get get = new Get(Bytes.toBytes(id.toString())).setMaxVersions(1);
224 final Result rs = txTable.get(get);
225 logger.debug("Value obtained: " + new String(rs.value()));
226 final AvroSerializer serializer = getSerializer();
227 resGotten = (Record) serializer.fromBytes(rs.value());
228 }
229 return resGotten;
230 }
231
232 @Override
233 public List<Record> get(final String tableName, final List<URI> ids) throws IOException
234 {
235 logger.debug("TEPHRA Begin of get(" + tableName + ", " + ids + ")");
236 final TransactionAwareHTable txTable = (TransactionAwareHTable) getTable(tableName);
237 final List<Record> resGotten = new ArrayList<Record>();
238 final List<Get> gets = new ArrayList<Get>();
239 final AvroSerializer serializer = getSerializer();
240
241 for (final URI id : ids) {
242 gets.add(new Get(Bytes.toBytes(id.toString())));
243 }
244
245 final Result[] results = txTable.get(gets);
246
247 for (final Result res : results) {
248 final byte[] bytes = res.value();
249 if (bytes != null) {
250 resGotten.add((Record) serializer.fromBytes(bytes));
251 }
252 }
253 return resGotten;
254 }
255
256 @Override
257 public List<Object> checkForErrors(final Object[] objs)
258 {
259 return new ArrayList<Object>();
260 }
261
262
263
264
265
266
267
268
269
270
271
272
273 @Override
274 public long count(final String tableName, final String familyName,
275 @Nullable final XPath condition) throws IOException
276 {
277 logger.debug("TEPHRA Begin count");
278
279 final Stream<Record> cur = Stream.create(new HBaseScanIterator(this, tableName,
280 familyName, condition, null, getServerFilterFlag()));
281 try {
282 return cur.count();
283 } finally {
284 cur.close();
285 }
286 }
287
288
289
290
291
292
293
294
295
296
297
298 @Override
299 public ResultScanner getScanner(final String tableName, final Scan scan)
300 {
301 logger.debug("TEPHRA Begin of getScanner(" + tableName + ", " + scan + ")");
302 final TransactionAwareHTable txTable = (TransactionAwareHTable) getTable(tableName);
303 ResultScanner resScanner = null;
304 try {
305 resScanner = txTable.getScanner(scan);
306 } catch (final IOException e) {
307 logger.error("Error while trying to obtain a ResultScanner: " + tableName);
308 logger.error(e.getMessage());
309 }
310 return resScanner;
311 }
312 }