1   package eu.fbk.knowledgestore.datastore.hbase;
2   
3   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_CON_FAM_NAME;
4   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_CON_TAB_NAME;
5   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_ENT_FAM_NAME;
6   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_ENT_TAB_NAME;
7   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_MEN_FAM_NAME;
8   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_MEN_TAB_NAME;
9   import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_RES_FAM_NAME;
10  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_RES_TAB_NAME;
11  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_USR_FAM_NAME;
12  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_USR_TAB_NAME;
13  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASEDATASTORE_SERVERFILTERFLAG;
14  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASEDATASTORE_TABLEPREFIX_DEFAULT;
15  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASEDATASTORE_TABLEPREFIX_PROP;
16  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.URIDICT_RELATIVEPATH_DEFAULT;
17  import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.URIDICT_RELATIVEPATH_PROP;
18  
19  import java.io.IOException;
20  import java.util.Properties;
21  
22  import org.apache.hadoop.fs.FileSystem;
23  import org.apache.hadoop.fs.Path;
24  import org.openrdf.model.URI;
25  import org.slf4j.Logger;
26  import org.slf4j.LoggerFactory;
27  
28  import eu.fbk.knowledgestore.data.Dictionary;
29  import eu.fbk.knowledgestore.datastore.DataStore;
30  import eu.fbk.knowledgestore.datastore.DataTransaction;
31  import eu.fbk.knowledgestore.datastore.hbase.utils.AbstractHBaseUtils;
32  import eu.fbk.knowledgestore.datastore.hbase.utils.AvroSerializer;
33  
34  
35  
36  
37  
38  public class HBaseDataStore implements DataStore {
39  
40      
41      public org.apache.hadoop.conf.Configuration hbaseCfg;
42  
43      
44      private static Logger logger = LoggerFactory.getLogger(HBaseDataStore.class);
45  
46      
47      private boolean serverFilterFlag;
48          
49      
50      private AvroSerializer serializer;
51  
52      
53      private AbstractHBaseUtils hbaseUtils;
54  
55      
56      private final FileSystem fileSystem;
57  
58      
59      private final String hbaseTableNamePrefix;
60  
61      
62  
63  
64  
65  
66  
67  
68  
69      public HBaseDataStore(final FileSystem fileSystem, final Properties properties) {
70  
71          this.hbaseUtils = AbstractHBaseUtils.factoryHBaseUtils(properties);
72          this.hbaseCfg = this.hbaseUtils.getHbcfg();
73  
74          
75          this.serverFilterFlag = Boolean.parseBoolean(properties.getProperty(HBASEDATASTORE_SERVERFILTERFLAG, 
76  									    "true"));
77  
78          
79          this.hbaseTableNamePrefix = properties.getProperty(HBASEDATASTORE_TABLEPREFIX_PROP,
80  							   HBASEDATASTORE_TABLEPREFIX_DEFAULT);
81  	
82          
83          this.fileSystem = fileSystem;
84  
85          try {
86              
87              checkAndCreateTable(hbaseTableNamePrefix + DEFAULT_RES_TAB_NAME, DEFAULT_RES_FAM_NAME);
88              checkAndCreateTable(hbaseTableNamePrefix + DEFAULT_MEN_TAB_NAME, DEFAULT_MEN_FAM_NAME);
89              checkAndCreateTable(hbaseTableNamePrefix + DEFAULT_ENT_TAB_NAME, DEFAULT_ENT_FAM_NAME);
90              checkAndCreateTable(hbaseTableNamePrefix + DEFAULT_CON_TAB_NAME, DEFAULT_CON_FAM_NAME);
91              checkAndCreateTable(hbaseTableNamePrefix + DEFAULT_USR_TAB_NAME, DEFAULT_USR_FAM_NAME);
92  
93              
94              final String dictRelPathString = properties.getProperty(URIDICT_RELATIVEPATH_PROP,
95                      URIDICT_RELATIVEPATH_DEFAULT);
96              final Path dictFullPath = new Path(dictRelPathString).makeQualified(fileSystem);
97              this.serializer = new AvroSerializer(Dictionary.createHadoopDictionary(URI.class,
98                      dictFullPath.toString()));
99  
100         } catch (final IOException e) {
101             logger.error("Error while creating a new HBaseStore.");
102             logger.error(e.getMessage());
103         }
104     }
105 
106     @Override
107     public void init() throws IOException {
108     }
109 
110     @Override
111     public DataTransaction begin(final boolean readOnly) throws IOException
112     {
113         logger.debug("Starting transaction in HBaseStore.");
114         hbaseUtils.initSerializer(this.serializer);
115         hbaseUtils.initServerFilterFlag(this.serverFilterFlag);
116         hbaseUtils.initHbaseTableNamePrefix(this.hbaseTableNamePrefix);
117         return new HBaseDataTransaction(hbaseUtils);
118     }
119 
120     @Override
121     public void close()
122     {
123         logger.debug("Finishing transaction in HBaseStore.");
124     }
125 
126     
127 
128 
129 
130 
131 
132     private void checkAndCreateTable(final String tabName, final String colFamName)
133             throws IOException
134     {
135         hbaseUtils.checkAndCreateTable(tabName, colFamName);
136     }
137 
138     
139 
140 
141     public static Logger getLogger()
142     {
143         return logger;
144     }
145 
146     
147 
148 
149     public static void setLogger(final Logger logger)
150     {
151         HBaseDataStore.logger = logger;
152     }
153 
154     
155 
156 
157     public boolean getServerFilterFlag()
158     {
159         return serverFilterFlag;
160     }
161 
162     
163 
164 
165     public AvroSerializer getSerializer()
166     {
167         return this.serializer;
168     }
169 
170     
171 
172 
173     public AbstractHBaseUtils getHbaseUtils()
174     {
175         return this.hbaseUtils;
176     }
177 
178     
179 
180 
181     public String getHbaseTableNamePrefix()
182     {
183         return this.hbaseTableNamePrefix;
184     }
185 
186 }