1   package eu.fbk.knowledgestore.datastore.hbase.utils;
2   
3   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_CON_TAB_NAME;
4   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_ENT_TAB_NAME;
5   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_MEN_TAB_NAME;
6   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_RES_TAB_NAME;
7   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_USR_TAB_NAME;
8   
9   import java.io.IOException;
10  import java.util.ArrayList;
11  import java.util.Arrays;
12  import java.util.HashMap;
13  import java.util.List;
14  import java.util.Map;
15  import java.util.Properties;
16  
17  import javax.annotation.Nullable;
18  
19  import com.continuuity.tephra.TransactionContext;
20  import com.continuuity.tephra.TransactionFailureException;
21  import com.continuuity.tephra.TransactionSystemClient;
22  import com.continuuity.tephra.hbase.TransactionAwareHTable;
23  import com.continuuity.tephra.inmemory.InMemoryTxSystemClient;
24  import com.continuuity.tephra.inmemory.InMemoryTransactionManager;
25  
26  import org.apache.hadoop.hbase.client.Delete;
27  import org.apache.hadoop.hbase.client.Get;
28  import org.apache.hadoop.hbase.client.HTable;
29  import org.apache.hadoop.hbase.client.Put;
30  import org.apache.hadoop.hbase.client.Result;
31  import org.apache.hadoop.hbase.client.ResultScanner;
32  import org.apache.hadoop.hbase.client.Scan;
33  import org.apache.hadoop.hbase.util.Bytes;
34  import org.openrdf.model.URI;
35  
36  import eu.fbk.knowledgestore.data.Record;
37  import eu.fbk.knowledgestore.data.Stream;
38  import eu.fbk.knowledgestore.data.XPath;
39  import eu.fbk.knowledgestore.datastore.hbase.HBaseScanIterator;
40  import eu.fbk.knowledgestore.runtime.DataCorruptedException;
41  
42  /**
43   * Implements HBase operations using Continuuity Tephra.
44   */
45  public class TephraHBaseUtils extends AbstractHBaseUtils
46  {
47  
48      /** Transaction client managed by Tephra */
49      private TransactionSystemClient txClient;
50  
51      /** Transaction context managed by Tephra */
52      private final TransactionContext txContext;
53  
54      /** The map tableName -> table handle */
55      private static Map<String, TransactionAwareHTable> tableNameHandleMap = new HashMap<String, TransactionAwareHTable>();
56  
57      /**
58       * Constructor.
59       * 
60       * @param properties
61       *            the configuration properties
62       */
63      public TephraHBaseUtils(final Properties properties)
64      {
65          // setting basic configuration inside parent class.
66          super(properties);
67  
68          final String TEPHRA_HOST = "data.tx.bind.address";
69          final String TEPHRA_PORT = "data.tx.bind.port";
70  
71          getHbcfg().set(TEPHRA_HOST, "escher");
72          getHbcfg().setInt(TEPHRA_PORT, 15165);
73  
74          // create and cache all the 5 TransactionAwareHTable tables (one for each HBase table)
75          final List<TransactionAwareHTable> txTabList = new ArrayList<TransactionAwareHTable>(5);
76          try {
77              logger.debug("TEPHRA create the 5 TransactionAwareHTable tables");
78              final List<String> tableNames = Arrays.asList(DEFAULT_RES_TAB_NAME,
79                      DEFAULT_MEN_TAB_NAME, DEFAULT_ENT_TAB_NAME, DEFAULT_CON_TAB_NAME,
80                      DEFAULT_USR_TAB_NAME);
81              HTable hTab;
82              TransactionAwareHTable txTab;
83              for (final String tableName : tableNames) {
84                  hTab = new HTable(this.getHbcfg(), tableName);
85                  txTab = new TransactionAwareHTable(hTab);
86                  tableNameHandleMap.put(tableName, txTab);
87                  txTabList.add(txTab);
88                  logger.debug("TEPHRA Cached a handle of table: " + tableName);
89              }
90          } catch (final IOException e) {
91              logger.error("TEPHRA Error while creating 5 TransactionAwareHTable tables");
92              logger.error(e.getMessage());
93          }
94  
95  
96          // Creating transaction client and context
97  	this.txClient = new InMemoryTxSystemClient(new InMemoryTransactionManager(this.getHbcfg()));
98          this.txContext = new TransactionContext(txClient,
99  						txTabList.toArray(new TransactionAwareHTable[0]));
100     }
101 
102     /**
103      * Commits work done.
104      */
105     @Override
106     public void commit() throws DataCorruptedException, IOException, IllegalStateException
107     {
108         try {
109             this.txContext.finish();
110         } catch (final TransactionFailureException tfe1) {
111             try {
112                 this.txContext.abort();
113             } catch (final TransactionFailureException tfe2) {
114             }
115             throw new IOException("Error trying to commit transaction.", tfe1);
116         }
117     }
118 
119     /**
120      * Rollbacks work done.
121      */
122     @Override
123     public void rollback() throws DataCorruptedException, IOException, IllegalStateException
124     {
125         try {
126             this.txContext.abort();
127         } catch (final Exception e) {
128             throw new DataCorruptedException("Error trying to rollback a Transaction.", e);
129         }
130     }
131 
132     /**
133      * Gets a handle of a specific table.
134      * 
135      * @param tableName
136      *            of the table to be accessed.
137      * @return HTable of the table found.
138      */
139     @Override
140     public Object getTable(final String tableName)
141     {
142         logger.debug("TEPHRA Begin of getTable for " + tableName);
143         final TransactionAwareHTable table = tableNameHandleMap.get(tableName);
144         if (table == null) {
145             // TODO propagate exceptions
146             logger.error("TEPHRA error: unebale to find cached table " + tableName);
147             return table;
148         } else {
149             logger.debug("TEPHRA Found a cached handle for table " + tableName);
150             return table;
151         }
152     }
153 
154     @Override
155     public void processPut(final Record record, final String tabName, final String famName,
156             final String quaName)
157     {
158         logger.debug("TEPHRA Begin processPut(" + record + ", " + tabName + ")");
159         final TransactionAwareHTable txTable = (TransactionAwareHTable) getTable(tabName);
160         Put op = null;
161         try {
162             op = createPut(record, tabName, famName, quaName);
163             txTable.put(op);
164             // TODO rollback if there is an exception
165         } catch (final IllegalArgumentException e) {
166             // TODO propagate exceptions
167             logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
168             logger.error(e.getMessage());
169         } catch (final IOException e) {
170             // TODO propagate exceptions
171             logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
172             logger.error(e.getMessage());
173         }
174     }
175 
176     @Override
177     public void begin()
178     {
179         try {
180             this.txContext.start();
181         } catch (final TransactionFailureException e) {
182             logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
183             logger.error(e.getMessage());
184         }
185     }
186 
187     @Override
188     public void processDelete(final URI id, final String tabName, final String famName,
189             final String quaName)
190     {
191         logger.debug("TEPHRA Begin processDelete(" + id + ", " + tabName + ")");
192         final TransactionAwareHTable txTable = (TransactionAwareHTable) getTable(tabName);
193         Delete op = null;
194         try {
195             op = createDelete(id, tabName);
196             txTable.delete(op);
197         } catch (final IllegalArgumentException e) {
198             logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
199             logger.error(e.getMessage());
200         } catch (final IOException e) {
201             logger.error("Error while attempting to perform operations at HBaseDataTransactions.");
202             logger.error(e.getMessage());
203         }
204     }
205 
206     /**
207      * Gets a Record based on information passed.
208      * 
209      * @param tableName
210      *            to do the get.
211      * @param id
212      *            of the Record needed
213      * @throws IOException
214      */
215     @Override
216     public Record get(final String tableName, final URI id) throws IOException
217     {
218         logger.debug("TEPHRA Begin of get(" + tableName + ", " + id + ")");
219         final TransactionAwareHTable txTable = (TransactionAwareHTable) getTable(tableName);
220         Record resGotten = null;
221         if (txTable != null) {
222             // Resource's Key
223             final Get get = new Get(Bytes.toBytes(id.toString())).setMaxVersions(1);
224             final Result rs = txTable.get(get);
225             logger.debug("Value obtained: " + new String(rs.value()));
226             final AvroSerializer serializer = getSerializer();
227             resGotten = (Record) serializer.fromBytes(rs.value());
228         }
229         return resGotten;
230     }
231 
232     @Override
233     public List<Record> get(final String tableName, final List<URI> ids) throws IOException
234     {
235         logger.debug("TEPHRA Begin of get(" + tableName + ", " + ids + ")");
236         final TransactionAwareHTable txTable = (TransactionAwareHTable) getTable(tableName);
237         final List<Record> resGotten = new ArrayList<Record>();
238         final List<Get> gets = new ArrayList<Get>();
239         final AvroSerializer serializer = getSerializer();
240 
241         for (final URI id : ids) {
242             gets.add(new Get(Bytes.toBytes(id.toString())));
243         }
244         // OMID does support the usage of a list of gets
245         final Result[] results = txTable.get(gets);
246 
247         for (final Result res : results) {
248             final byte[] bytes = res.value();
249             if (bytes != null) {
250                 resGotten.add((Record) serializer.fromBytes(bytes));
251             }
252         }
253         return resGotten;
254     }
255 
256     @Override
257     public List<Object> checkForErrors(final Object[] objs)
258     {
259         return new ArrayList<Object>();
260     }
261 
262     /**
263      * Gets a number of record of tableName matching condition
264      * 
265      * @param tableName
266      *            to scan
267      * @param condition
268      *            to match
269      * @param scan
270      *            to scan
271      * @throws IOException
272      */
273     @Override
274     public long count(final String tableName, final String familyName,
275             @Nullable final XPath condition) throws IOException
276     {
277         logger.debug("TEPHRA Begin count");
278         // VERY INEFFICIENT: to be improved!
279         final Stream<Record> cur = Stream.create(new HBaseScanIterator(this, tableName,
280                 familyName, condition, null, getServerFilterFlag()));
281         try {
282             return cur.count();
283         } finally {
284             cur.close();
285         }
286     }
287 
288     /**
289      * Gets a scanner for a specific table
290      * 
291      * @param tableName
292      *            to get the scanner from
293      * @param scan
294      *            for the specific table
295      * @param conf
296      *            object to get a hold of an HBase table
297      */
298     @Override
299     public ResultScanner getScanner(final String tableName, final Scan scan)
300     {
301         logger.debug("TEPHRA Begin of getScanner(" + tableName + ", " + scan + ")");
302         final TransactionAwareHTable txTable = (TransactionAwareHTable) getTable(tableName);
303         ResultScanner resScanner = null;
304         try {
305             resScanner = txTable.getScanner(scan);
306         } catch (final IOException e) {
307             logger.error("Error while trying to obtain a ResultScanner: " + tableName);
308             logger.error(e.getMessage());
309         }
310         return resScanner;
311     }
312 }