1 package eu.fbk.knowledgestore.datastore.hbase;
2
3 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_CON_FAM_NAME;
4 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_CON_TAB_NAME;
5 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_ENT_FAM_NAME;
6 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_ENT_TAB_NAME;
7 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_MEN_FAM_NAME;
8 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_MEN_TAB_NAME;
9 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_RES_FAM_NAME;
10 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_RES_TAB_NAME;
11 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_USR_FAM_NAME;
12 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_USR_TAB_NAME;
13 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASEDATASTORE_SERVERFILTERFLAG;
14 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASEDATASTORE_TABLEPREFIX_DEFAULT;
15 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.HBASEDATASTORE_TABLEPREFIX_PROP;
16 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.URIDICT_RELATIVEPATH_DEFAULT;
17 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.URIDICT_RELATIVEPATH_PROP;
18
19 import java.io.IOException;
20 import java.util.Properties;
21
22 import org.apache.hadoop.fs.FileSystem;
23 import org.apache.hadoop.fs.Path;
24 import org.openrdf.model.URI;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 import eu.fbk.knowledgestore.data.Dictionary;
29 import eu.fbk.knowledgestore.datastore.DataStore;
30 import eu.fbk.knowledgestore.datastore.DataTransaction;
31 import eu.fbk.knowledgestore.datastore.hbase.utils.AbstractHBaseUtils;
32 import eu.fbk.knowledgestore.datastore.hbase.utils.AvroSerializer;
33
34
35
36
37
38 public class HBaseDataStore implements DataStore {
39
40
41 public org.apache.hadoop.conf.Configuration hbaseCfg;
42
43
44 private static Logger logger = LoggerFactory.getLogger(HBaseDataStore.class);
45
46
47 private boolean serverFilterFlag;
48
49
50 private AvroSerializer serializer;
51
52
53 private AbstractHBaseUtils hbaseUtils;
54
55
56 private final FileSystem fileSystem;
57
58
59 private final String hbaseTableNamePrefix;
60
61
62
63
64
65
66
67
68
69 public HBaseDataStore(final FileSystem fileSystem, final Properties properties) {
70
71 this.hbaseUtils = AbstractHBaseUtils.factoryHBaseUtils(properties);
72 this.hbaseCfg = this.hbaseUtils.getHbcfg();
73
74
75 this.serverFilterFlag = Boolean.parseBoolean(properties.getProperty(HBASEDATASTORE_SERVERFILTERFLAG,
76 "true"));
77
78
79 this.hbaseTableNamePrefix = properties.getProperty(HBASEDATASTORE_TABLEPREFIX_PROP,
80 HBASEDATASTORE_TABLEPREFIX_DEFAULT);
81
82
83 this.fileSystem = fileSystem;
84
85 try {
86
87 checkAndCreateTable(hbaseTableNamePrefix + DEFAULT_RES_TAB_NAME, DEFAULT_RES_FAM_NAME);
88 checkAndCreateTable(hbaseTableNamePrefix + DEFAULT_MEN_TAB_NAME, DEFAULT_MEN_FAM_NAME);
89 checkAndCreateTable(hbaseTableNamePrefix + DEFAULT_ENT_TAB_NAME, DEFAULT_ENT_FAM_NAME);
90 checkAndCreateTable(hbaseTableNamePrefix + DEFAULT_CON_TAB_NAME, DEFAULT_CON_FAM_NAME);
91 checkAndCreateTable(hbaseTableNamePrefix + DEFAULT_USR_TAB_NAME, DEFAULT_USR_FAM_NAME);
92
93
94 final String dictRelPathString = properties.getProperty(URIDICT_RELATIVEPATH_PROP,
95 URIDICT_RELATIVEPATH_DEFAULT);
96 final Path dictFullPath = new Path(dictRelPathString).makeQualified(fileSystem);
97 this.serializer = new AvroSerializer(Dictionary.createHadoopDictionary(URI.class,
98 dictFullPath.toString()));
99
100 } catch (final IOException e) {
101 logger.error("Error while creating a new HBaseStore.");
102 logger.error(e.getMessage());
103 }
104 }
105
106 @Override
107 public void init() throws IOException {
108 }
109
110 @Override
111 public DataTransaction begin(final boolean readOnly) throws IOException
112 {
113 logger.debug("Starting transaction in HBaseStore.");
114 hbaseUtils.initSerializer(this.serializer);
115 hbaseUtils.initServerFilterFlag(this.serverFilterFlag);
116 hbaseUtils.initHbaseTableNamePrefix(this.hbaseTableNamePrefix);
117 return new HBaseDataTransaction(hbaseUtils);
118 }
119
120 @Override
121 public void close()
122 {
123 logger.debug("Finishing transaction in HBaseStore.");
124 }
125
126
127
128
129
130
131
132 private void checkAndCreateTable(final String tabName, final String colFamName)
133 throws IOException
134 {
135 hbaseUtils.checkAndCreateTable(tabName, colFamName);
136 }
137
138
139
140
141 public static Logger getLogger()
142 {
143 return logger;
144 }
145
146
147
148
149 public static void setLogger(final Logger logger)
150 {
151 HBaseDataStore.logger = logger;
152 }
153
154
155
156
157 public boolean getServerFilterFlag()
158 {
159 return serverFilterFlag;
160 }
161
162
163
164
165 public AvroSerializer getSerializer()
166 {
167 return this.serializer;
168 }
169
170
171
172
173 public AbstractHBaseUtils getHbaseUtils()
174 {
175 return this.hbaseUtils;
176 }
177
178
179
180
181 public String getHbaseTableNamePrefix()
182 {
183 return this.hbaseTableNamePrefix;
184 }
185
186 }