1   package eu.fbk.knowledgestore.datastore.hbase.utils;
2   
3   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_CON_TAB_NAME;
4   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_ENT_TAB_NAME;
5   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_MEN_TAB_NAME;
6   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_RES_TAB_NAME;
7   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_USR_TAB_NAME;
8   
9   import java.io.IOException;
10  import java.util.ArrayList;
11  import java.util.Arrays;
12  import java.util.HashMap;
13  import java.util.List;
14  import java.util.Map;
15  import java.util.Properties;
16  
17  import javax.annotation.Nullable;
18  
19  import com.continuuity.tephra.TransactionContext;
20  import com.continuuity.tephra.TransactionFailureException;
21  import com.continuuity.tephra.TransactionSystemClient;
22  import com.continuuity.tephra.hbase.TransactionAwareHTable;
23  import com.continuuity.tephra.inmemory.InMemoryTxSystemClient;
24  import com.continuuity.tephra.inmemory.InMemoryTransactionManager;
25  
26  import org.apache.hadoop.hbase.client.Delete;
27  import org.apache.hadoop.hbase.client.Get;
28  import org.apache.hadoop.hbase.client.HTable;
29  import org.apache.hadoop.hbase.client.Put;
30  import org.apache.hadoop.hbase.client.Result;
31  import org.apache.hadoop.hbase.client.ResultScanner;
32  import org.apache.hadoop.hbase.client.Scan;
33  import org.apache.hadoop.hbase.util.Bytes;
34  import org.openrdf.model.URI;
35  
36  import eu.fbk.knowledgestore.data.Record;
37  import eu.fbk.knowledgestore.data.Stream;
38  import eu.fbk.knowledgestore.data.XPath;
39  import eu.fbk.knowledgestore.datastore.hbase.HBaseScanIterator;
40  import eu.fbk.knowledgestore.runtime.DataCorruptedException;
41  
42  
43  
44  
45  public class TephraHBaseUtils extends AbstractHBaseUtils
46  {
47  
48      
49      private TransactionSystemClient txClient;
50  
51      
52      private final TransactionContext txContext;
53  
54      
55      private static Map<String, TransactionAwareHTable> tableNameHandleMap = new HashMap<String, TransactionAwareHTable>();
56  
57      
58  
59  
60  
61  
62  
63      public TephraHBaseUtils(final Properties properties)
64      {
65          
66          super(properties);
67  
68          final String TEPHRA_HOST = "data.tx.bind.address";
69          final String TEPHRA_PORT = "data.tx.bind.port";
70  
71          getHbcfg().set(TEPHRA_HOST, "escher");
72          getHbcfg().setInt(TEPHRA_PORT, 15165);
73  
74          
75          final List<TransactionAwareHTable> txTabList = new ArrayList<TransactionAwareHTable>(5);
76          try {
77              logger.debug("TEPHRA create the 5 TransactionAwareHTable tables");
78              final List<String> tableNames = Arrays.asList(DEFAULT_RES_TAB_NAME,
79                      DEFAULT_MEN_TAB_NAME, DEFAULT_ENT_TAB_NAME, DEFAULT_CON_TAB_NAME,
80                      DEFAULT_USR_TAB_NAME);
81              HTable hTab;
82              TransactionAwareHTable txTab;
83              for (final String tableName : tableNames) {
84                  hTab = new HTable(this.getHbcfg(), tableName);
85                  txTab = new TransactionAwareHTable(hTab);
86                  tableNameHandleMap.put(tableName, txTab);
87                  txTabList.add(txTab);
88                  logger.debug("TEPHRA Cached a handle of table: " + tableName);
89              }
90          } catch (final IOException e) {
91              logger.error("TEPHRA Error while creating 5 TransactionAwareHTable tables");
92              logger.error(e.getMessage());
93          }
94  
95  
96          
97  	this.txClient = new InMemoryTxSystemClient(new InMemoryTransactionManager(this.getHbcfg()));
98          this.txContext = new TransactionContext(txClient,
99  						txTabList.toArray(new TransactionAwareHTable[0]));
100     }
101 
102     
103 
104 
105     @Override
106     public void commit() throws DataCorruptedException, IOException, IllegalStateException
107     {
108         try {
109             this.txContext.finish();
110         } catch (final TransactionFailureException tfe1) {
111             try {
112                 this.txContext.abort();
113             } catch (final TransactionFailureException tfe2) {
114             }
115             throw new IOException("Error trying to commit transaction.", tfe1);
116         }
117     }
118 
119     
120 
121 
122     @Override
123     public void rollback() throws DataCorruptedException, IOException, IllegalStateException
124     {
125         try {
126             this.txContext.abort();
127         } catch (final Exception e) {
128             throw new DataCorruptedException("Error trying to rollback a Transaction.", e);
129         }
130     }
131 
132     
133 
134 
135 
136 
137 
138 
139     @Override
140     public Object getTable(final String tableName)
141     {
142         logger.debug("TEPHRA Begin of getTable for " + tableName);
143         final TransactionAwareHTable table = tableNameHandleMap.get(tableName);
144         if (table == null) {
145             
146             logger.error("TEPHRA error: unebale to find cached table " + tableName);
147             return table;
148         } else {
149             logger.debug("TEPHRA Found a cached handle for table " + tableName);
150             return table;
151         }
152     }
153 
154     @Override
155     public void processPut(final Record record, final String tabName, final String famName,
156             final String quaName)
157     {
158         logger.debug("TEPHRA Begin processPut(" + record + ", " + tabName + ")");
159         final TransactionAwareHTable txTable = (TransactionAwareHTable) getTable(tabName);
160         Put op = null;
161         try {
162             op = createPut(record, tabName, famName, quaName);
163             txTable.put(op);
164             
165         } catch (final IllegalArgumentException e) {
166             
167             logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
168             logger.error(e.getMessage());
169         } catch (final IOException e) {
170             
171             logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
172             logger.error(e.getMessage());
173         }
174     }
175 
176     @Override
177     public void begin()
178     {
179         try {
180             this.txContext.start();
181         } catch (final TransactionFailureException e) {
182             logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
183             logger.error(e.getMessage());
184         }
185     }
186 
187     @Override
188     public void processDelete(final URI id, final String tabName, final String famName,
189             final String quaName)
190     {
191         logger.debug("TEPHRA Begin processDelete(" + id + ", " + tabName + ")");
192         final TransactionAwareHTable txTable = (TransactionAwareHTable) getTable(tabName);
193         Delete op = null;
194         try {
195             op = createDelete(id, tabName);
196             txTable.delete(op);
197         } catch (final IllegalArgumentException e) {
198             logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
199             logger.error(e.getMessage());
200         } catch (final IOException e) {
201             logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
202             logger.error(e.getMessage());
203         }
204     }
205 
206     
207 
208 
209 
210 
211 
212 
213 
214 
215     @Override
216     public Record get(final String tableName, final URI id) throws IOException
217     {
218         logger.debug("TEPHRA Begin of get(" + tableName + ", " + id + ")");
219         final TransactionAwareHTable txTable = (TransactionAwareHTable) getTable(tableName);
220         Record resGotten = null;
221         if (txTable != null) {
222             
223             final Get get = new Get(Bytes.toBytes(id.toString())).setMaxVersions(1);
224             final Result rs = txTable.get(get);
225             logger.debug("Value obtained: " + new String(rs.value()));
226             final AvroSerializer serializer = getSerializer();
227             resGotten = (Record) serializer.fromBytes(rs.value());
228         }
229         return resGotten;
230     }
231 
232     @Override
233     public List<Record> get(final String tableName, final List<URI> ids) throws IOException
234     {
235         logger.debug("TEPHRA Begin of get(" + tableName + ", " + ids + ")");
236         final TransactionAwareHTable txTable = (TransactionAwareHTable) getTable(tableName);
237         final List<Record> resGotten = new ArrayList<Record>();
238         final List<Get> gets = new ArrayList<Get>();
239         final AvroSerializer serializer = getSerializer();
240 
241         for (final URI id : ids) {
242             gets.add(new Get(Bytes.toBytes(id.toString())));
243         }
244         
245         final Result[] results = txTable.get(gets);
246 
247         for (final Result res : results) {
248             final byte[] bytes = res.value();
249             if (bytes != null) {
250                 resGotten.add((Record) serializer.fromBytes(bytes));
251             }
252         }
253         return resGotten;
254     }
255 
256     @Override
257     public List<Object> checkForErrors(final Object[] objs)
258     {
259         return new ArrayList<Object>();
260     }
261 
262     
263 
264 
265 
266 
267 
268 
269 
270 
271 
272 
273     @Override
274     public long count(final String tableName, final String familyName,
275             @Nullable final XPath condition) throws IOException
276     {
277         logger.debug("TEPHRA Begin count");
278         
279         final Stream<Record> cur = Stream.create(new HBaseScanIterator(this, tableName,
280                 familyName, condition, null, getServerFilterFlag()));
281         try {
282             return cur.count();
283         } finally {
284             cur.close();
285         }
286     }
287 
288     
289 
290 
291 
292 
293 
294 
295 
296 
297 
298     @Override
299     public ResultScanner getScanner(final String tableName, final Scan scan)
300     {
301         logger.debug("TEPHRA Begin of getScanner(" + tableName + ", " + scan + ")");
302         final TransactionAwareHTable txTable = (TransactionAwareHTable) getTable(tableName);
303         ResultScanner resScanner = null;
304         try {
305             resScanner = txTable.getScanner(scan);
306         } catch (final IOException e) {
307             logger.error("Error while trying to obtain a ResultScanner: " + tableName);
308             logger.error(e.getMessage());
309         }
310         return resScanner;
311     }
312 }