1 package eu.fbk.knowledgestore.datastore.hbase.utils;
2
3 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASE_REGION_MEMSTORE_FLUSH_SIZE;
4 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASE_REGION_NRESERVATION_BLOCKS;
5 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.OMID_REGION_MEMSTORE_FLUSH_SIZE_OPT;
6 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.OMID_REGION_NRESERVATION_BLOCKS_OPT;
7 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.OMID_TSO_DEFAULT_HOST_OPT;
8 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.OMID_TSO_DEFAULT_PORT_OPT;
9 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.OMID_TSO_HOST;
10 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.OMID_TSO_PORT;
11
12 import java.io.IOException;
13 import java.util.ArrayList;
14 import java.util.HashMap;
15 import java.util.List;
16 import java.util.Map;
17 import java.util.Properties;
18
19 import javax.annotation.Nullable;
20
21 import com.yahoo.omid.transaction.RollbackException;
22 import com.yahoo.omid.transaction.TTable;
23 import com.yahoo.omid.transaction.Transaction;
24 import com.yahoo.omid.transaction.TransactionException;
25 import com.yahoo.omid.transaction.TransactionManager;
26
27 import org.apache.hadoop.hbase.client.Delete;
28 import org.apache.hadoop.hbase.client.Get;
29 import org.apache.hadoop.hbase.client.Put;
30 import org.apache.hadoop.hbase.client.Result;
31 import org.apache.hadoop.hbase.client.ResultScanner;
32 import org.apache.hadoop.hbase.client.Scan;
33 import org.apache.hadoop.hbase.util.Bytes;
34 import org.openrdf.model.URI;
35
36 import eu.fbk.knowledgestore.data.Record;
37 import eu.fbk.knowledgestore.data.Stream;
38 import eu.fbk.knowledgestore.data.XPath;
39 import eu.fbk.knowledgestore.datastore.hbase.HBaseScanIterator;
40 import eu.fbk.knowledgestore.runtime.DataCorruptedException;
41
42
43
44
45 public class OmidHBaseUtils extends AbstractHBaseUtils {
46
47
48 private static TransactionManager tranManager;
49
50
51 private Transaction t1;
52
53
54 private static Map<String, TTable> tableNameHandleMap = new HashMap<String, TTable>();
55
56
57
58
59
60
61
62 public OmidHBaseUtils(final Properties properties) {
63
64 super(properties);
65
66 getHbcfg().setInt(
67 HBASE_REGION_MEMSTORE_FLUSH_SIZE,
68 Integer.parseInt(properties.getProperty(HBASE_REGION_MEMSTORE_FLUSH_SIZE, ""
69 + OMID_REGION_MEMSTORE_FLUSH_SIZE_OPT)));
70
71 getHbcfg().setInt(
72 HBASE_REGION_NRESERVATION_BLOCKS,
73 Integer.parseInt(properties.getProperty(HBASE_REGION_NRESERVATION_BLOCKS, ""
74 + OMID_REGION_NRESERVATION_BLOCKS_OPT)));
75
76 getHbcfg().set(OMID_TSO_HOST,
77 properties.getProperty(OMID_TSO_HOST, OMID_TSO_DEFAULT_HOST_OPT));
78
79 getHbcfg().setInt(
80 OMID_TSO_PORT,
81 Integer.parseInt(properties.getProperty(OMID_TSO_PORT, ""
82 + OMID_TSO_DEFAULT_PORT_OPT)));
83
84
85 try {
86 tranManager = new TransactionManager(this.getHbcfg());
87 } catch (IOException e) {
88 logger.error("Error trying to create a TransactionManager of OMID.");
89 logger.error(e.getMessage());
90 }
91 }
92
93
94
95
96 @Override
97 public void commit() throws DataCorruptedException, IOException, IllegalStateException{
98 try {
99 tranManager.commit(t1);
100 } catch (RollbackException e) {
101 rollback();
102 throw new IOException("Error trying to commit transaction.", e);
103 } catch (TransactionException e) {
104 rollback();
105 throw new IOException("Error trying to commit transaction.", e);
106 }
107 }
108
109
110
111
112 @Override
113 public void rollback() throws DataCorruptedException, IOException, IllegalStateException{
114 try {
115 tranManager.rollback(t1);
116 } catch (Exception e) {
117 throw new DataCorruptedException("Error trying to rollback a Transaction.", e);
118 }
119 }
120
121
122
123
124
125
126 @Override
127 public Object getTable(String tableName) {
128 logger.debug("OMID Begin of getTable for " + tableName);
129 TTable table = tableNameHandleMap.get(tableName);
130 if (table != null) {
131 logger.debug("OMIDE Found a cached handle for table " + tableName);
132 return table;
133 }
134 try {
135 table = new TTable(this.getHbcfg(), tableName);
136 } catch (IOException e) {
137 logger.error("OMID Error trying to get a TransactionTable of OMID.");
138 logger.error(e.getMessage());
139 }
140 tableNameHandleMap.put(tableName, table);
141 logger.debug("OMID Cached a handle of table: " + tableName);
142 return table;
143 }
144
145 @Override
146 public void processPut(Record record, String tabName,
147 String famName, String quaName) {
148 logger.debug("OMID Begin processPut(" + record + ", " + tabName + ")");
149 TTable tTable = (TTable) getTable(tabName);
150 Put op = null;
151 try {
152 op = createPut(record, tabName, famName, quaName);
153 tTable.put(t1, op);
154
155 } catch (IllegalArgumentException e) {
156
157 logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
158 logger.error(e.getMessage());
159 } catch (IOException e) {
160
161 logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
162 logger.error(e.getMessage());
163 }
164 }
165
166 @Override
167 public void begin() {
168 try {
169 t1 = tranManager.begin();
170 } catch (TransactionException e) {
171 logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
172 logger.error(e.getMessage());
173 }
174 }
175
176 @Override
177 public void processDelete(URI id, String tabName,
178 String famName, String quaName) {
179 logger.debug("OMID Begin processDelete(" + id + ", " + tabName + ")");
180 TTable tTable = (TTable) getTable(tabName);
181 Delete op = null;
182 try {
183 op = createDelete(id, tabName);
184 tTable.delete(t1, op);
185 } catch (IllegalArgumentException e) {
186 logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
187 logger.error(e.getMessage());
188 } catch (IOException e) {
189 logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
190 logger.error(e.getMessage());
191 }
192 }
193
194
195
196
197
198
199
200 @Override
201 public Record get(String tableName, URI id) throws IOException {
202 logger.debug("OMID Begin of get(" + tableName + ", " + id + ")");
203 TTable tTable = (TTable) getTable(tableName);
204 Record resGotten = null;
205 if (tTable != null) {
206
207 Get get = new Get(Bytes.toBytes(id.toString())).setMaxVersions(1);
208 Result rs = tTable.get(t1, get);
209 logger.debug("Value obtained: " + new String(rs.value()));
210 final AvroSerializer serializer = getSerializer();
211 resGotten = (Record) serializer.fromBytes(rs.value());
212 }
213 return resGotten;
214 }
215
216 @Override
217 public List<Record> get(String tableName,
218 List<URI> ids) throws IOException {
219 logger.debug("OMID Begin of get(" + tableName + ", " + ids + ")");
220 TTable tTable = (TTable)getTable(tableName);
221 List<Record> resGotten = new ArrayList<Record> ();
222 List<Get> gets = new ArrayList<Get> ();
223 AvroSerializer serializer = getSerializer();
224
225 for (URI id : ids) {
226 gets.add(new Get(Bytes.toBytes(id.toString())));
227 }
228
229 Result[] results = tTable.get(t1, gets);
230
231 for (Result res : results) {
232 final byte[] bytes = res.value();
233 if (bytes != null) {
234 resGotten.add((Record) serializer.fromBytes(bytes));
235 }
236 }
237 return resGotten;
238 }
239
240 @Override
241 public List<Object> checkForErrors(Object[] objs) {
242 return new ArrayList<Object>();
243 }
244
245
246
247
248
249
250
251
252
253
254
255
256 @Override
257 public long count(final String tableName, final String familyName,
258 @Nullable final XPath condition) throws IOException {
259 logger.debug("OMID Begin count");
260
261 final Stream<Record> cur = Stream.create(new HBaseScanIterator(this, tableName,
262 familyName, condition, null, getServerFilterFlag()));
263 try {
264 return cur.count();
265 } finally {
266 cur.close();
267 }
268 }
269
270
271
272
273
274
275
276 @Override
277 public ResultScanner getScanner(String tableName, Scan scan) {
278 logger.debug("OMID Begin of getScanner(" + tableName + ", " + scan + ")");
279 TTable tTable = (TTable)getTable(tableName);
280 ResultScanner resScanner = null;
281 try {
282 resScanner = tTable.getScanner(t1, scan);
283 } catch (IOException e) {
284 logger.error("Error while trying to obtain a ResultScanner: " + tableName);
285 logger.error(e.getMessage());
286 }
287 return resScanner;
288 }
289 }