1   package eu.fbk.knowledgestore.datastore.hbase;
2   
3   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_CON_FAM_NAME;
4   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_CON_TAB_NAME;
5   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_ENT_FAM_NAME;
6   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_ENT_TAB_NAME;
7   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_MEN_FAM_NAME;
8   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_MEN_TAB_NAME;
9   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_RES_FAM_NAME;
10  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_RES_TAB_NAME;
11  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_USR_FAM_NAME;
12  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_USR_TAB_NAME;
13  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASEDATASTORE_SERVERFILTERFLAG;
14  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASEDATASTORE_TABLEPREFIX_DEFAULT;
15  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASEDATASTORE_TABLEPREFIX_PROP;
16  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.URIDICT_RELATIVEPATH_DEFAULT;
17  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.URIDICT_RELATIVEPATH_PROP;
18  
19  import java.io.IOException;
20  import java.util.Properties;
21  
22  import org.apache.hadoop.fs.FileSystem;
23  import org.apache.hadoop.fs.Path;
24  import org.openrdf.model.URI;
25  import org.slf4j.Logger;
26  import org.slf4j.LoggerFactory;
27  
28  import eu.fbk.knowledgestore.data.Dictionary;
29  import eu.fbk.knowledgestore.datastore.DataStore;
30  import eu.fbk.knowledgestore.datastore.DataTransaction;
31  import eu.fbk.knowledgestore.datastore.hbase.utils.AbstractHBaseUtils;
32  import eu.fbk.knowledgestore.datastore.hbase.utils.AvroSerializer;
33  
34  /**
35   * HBaseDataStore used to read and write data into HBase specific tables e.g. Resources, Mentions,
36   * Entities.
37   */
38  public class HBaseDataStore implements DataStore {
39  
40      /** Hadoop Configuration object. */
41      public org.apache.hadoop.conf.Configuration hbaseCfg;
42  
43      /** Logger object used inside HdfsFileStore. */
44      private static Logger logger = LoggerFactory.getLogger(HBaseDataStore.class);
45  
46      /** flag to set where to perform filtering (default is on server-side) */
47      private boolean serverFilterFlag;
48          
49      /** Serializer used to transform record to byte arrays and back. */
50      private AvroSerializer serializer;
51  
52      /** hbase utilities hiding the modality (OMID or NATIVE) */
53      private AbstractHBaseUtils hbaseUtils;
54  
55      /** the FileSystem utilized to store additional data (e.g. the URI dictionary) */
56      private final FileSystem fileSystem;
57  
58      /** the prefix of table names */
59      private final String hbaseTableNamePrefix;
60  
61      /**
62       * Constructor.
63       * 
64       * @param fileSystem
65       *            the file system where to store the dictionary
66       * @param properties
67       *            the configuration properties
68       */
69      public HBaseDataStore(final FileSystem fileSystem, final Properties properties) {
70  
71          this.hbaseUtils = AbstractHBaseUtils.factoryHBaseUtils(properties);
72          this.hbaseCfg = this.hbaseUtils.getHbcfg();
73  
74          // set serverFilterFlag
75          this.serverFilterFlag = Boolean.parseBoolean(properties.getProperty(HBASEDATASTORE_SERVERFILTERFLAG, 
76  									    "true"));
77  
78          // set the prefix of table names
79          this.hbaseTableNamePrefix = properties.getProperty(HBASEDATASTORE_TABLEPREFIX_PROP,
80  							   HBASEDATASTORE_TABLEPREFIX_DEFAULT);
81  	
82          // set fileSystem
83          this.fileSystem = fileSystem;
84  
85          try {
86              // check if the htables exist: create them if necessary
87              checkAndCreateTable(hbaseTableNamePrefix + DEFAULT_RES_TAB_NAME, DEFAULT_RES_FAM_NAME);
88              checkAndCreateTable(hbaseTableNamePrefix + DEFAULT_MEN_TAB_NAME, DEFAULT_MEN_FAM_NAME);
89              checkAndCreateTable(hbaseTableNamePrefix + DEFAULT_ENT_TAB_NAME, DEFAULT_ENT_FAM_NAME);
90              checkAndCreateTable(hbaseTableNamePrefix + DEFAULT_CON_TAB_NAME, DEFAULT_CON_FAM_NAME);
91              checkAndCreateTable(hbaseTableNamePrefix + DEFAULT_USR_TAB_NAME, DEFAULT_USR_FAM_NAME);
92  
93              // set Avro serializer
94              final String dictRelPathString = properties.getProperty(URIDICT_RELATIVEPATH_PROP,
95                      URIDICT_RELATIVEPATH_DEFAULT);
96              final Path dictFullPath = new Path(dictRelPathString).makeQualified(fileSystem);
97              this.serializer = new AvroSerializer(Dictionary.createHadoopDictionary(URI.class,
98                      dictFullPath.toString()));
99  
100         } catch (final IOException e) {
101             logger.error("Error while creating a new HBaseStore.");
102             logger.error(e.getMessage());
103         }
104     }
105 
106     @Override
107     public void init() throws IOException {
108     }
109 
110     @Override
111     public DataTransaction begin(final boolean readOnly) throws IOException
112     {
113         logger.debug("Starting transaction in HBaseStore.");
114         hbaseUtils.initSerializer(this.serializer);
115         hbaseUtils.initServerFilterFlag(this.serverFilterFlag);
116         hbaseUtils.initHbaseTableNamePrefix(this.hbaseTableNamePrefix);
117         return new HBaseDataTransaction(hbaseUtils);
118     }
119 
120     @Override
121     public void close()
122     {
123         logger.debug("Finishing transaction in HBaseStore.");
124     }
125 
126     /**
127      * Verifies the existence of tables.
128      * @param tableName to be verified
129      * @param columnFamilyName of the table
130      * @throws IOException
131      */
132     private void checkAndCreateTable(final String tabName, final String colFamName)
133             throws IOException
134     {
135         hbaseUtils.checkAndCreateTable(tabName, colFamName);
136     }
137 
138     /**
139      * @return the logger
140      */
141     public static Logger getLogger()
142     {
143         return logger;
144     }
145 
146     /**
147      * @param logger to be set
148      */
149     public static void setLogger(final Logger logger)
150     {
151         HBaseDataStore.logger = logger;
152     }
153 
154     /**
155      * @return the value of serverFilterFlag
156      */
157     public boolean getServerFilterFlag()
158     {
159         return serverFilterFlag;
160     }
161 
162     /**
163      * @return the serializer
164      */
165     public AvroSerializer getSerializer()
166     {
167         return this.serializer;
168     }
169 
170     /**
171      * @return the hbaseUtils
172      */
173     public AbstractHBaseUtils getHbaseUtils()
174     {
175         return this.hbaseUtils;
176     }
177 
178     /**
179      * @return the value of hbaseTableNamePrefix;
180      */
181     public String getHbaseTableNamePrefix()
182     {
183         return this.hbaseTableNamePrefix;
184     }
185 
186 }