1 package eu.fbk.knowledgestore.datastore.hbase;
2
3 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_USR_FAM_NAME;
4 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_USR_QUA_NAME;
5 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_USR_TAB_NAME;
6 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_CON_FAM_NAME;
7 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_CON_QUA_NAME;
8 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_CON_TAB_NAME;
9 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_ENT_FAM_NAME;
10 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_ENT_QUA_NAME;
11 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_ENT_TAB_NAME;
12 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_MEN_FAM_NAME;
13 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_MEN_QUA_NAME;
14 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_MEN_TAB_NAME;
15 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_RES_FAM_NAME;
16 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_RES_QUA_NAME;
17 import static eu.fbk.knowledgestore.datastore.hbase.utils.HBaseConstants.DEFAULT_RES_TAB_NAME;
18
19 import java.io.IOException;
20 import java.util.Map;
21 import java.util.Set;
22
23 import javax.annotation.Nullable;
24
25 import org.openrdf.model.URI;
26 import org.openrdf.model.vocabulary.RDF;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29
30 import eu.fbk.knowledgestore.data.Data;
31 import eu.fbk.knowledgestore.data.Record;
32 import eu.fbk.knowledgestore.data.Stream;
33 import eu.fbk.knowledgestore.data.XPath;
34 import eu.fbk.knowledgestore.datastore.DataStore;
35 import eu.fbk.knowledgestore.datastore.DataTransaction;
36 import eu.fbk.knowledgestore.datastore.hbase.utils.AbstractHBaseUtils;
37 import eu.fbk.knowledgestore.runtime.DataCorruptedException;
38 import eu.fbk.knowledgestore.vocabulary.KS;
39
40
41
42
43
44
45 public class HBaseDataTransaction implements DataTransaction {
46
47
48 private static Logger logger = LoggerFactory.getLogger(HBaseDataTransaction.class);
49
50
51 private AbstractHBaseUtils hbaseUtils;
52
53
54
55
56
57 HBaseDataTransaction (AbstractHBaseUtils pHbaseUtils) {
58 this.setHbaseUtils(pHbaseUtils);
59 this.getHbaseUtils().begin();
60 }
61
62 @Nullable
63 static URI getRecordType(Record record) {
64 for (URI type : record.get(RDF.TYPE, URI.class)) {
65 if (DataStore.SUPPORTED_TYPES.contains(type)) {
66 return type;
67 }
68 }
69 return null;
70 }
71
72 @Override
73 public Stream<Record> lookup(final URI type, final Set<? extends URI> ids,
74 final @Nullable Set<? extends URI> properties) throws DataCorruptedException,
75 IOException, IllegalArgumentException, IllegalStateException {
76
77
78
79 HBaseIterator iterator = null;
80 if (KS.RESOURCE.equals(type)) {
81 iterator = new HBaseIterator(hbaseUtils, hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_RES_TAB_NAME,
82 ids, properties);
83 } else if (KS.MENTION.equals(type)) {
84 iterator = new HBaseIterator(hbaseUtils, hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_MEN_TAB_NAME,
85 ids, properties);
86 } else if (KS.ENTITY.equals(type)) {
87 iterator = new HBaseIterator(hbaseUtils, hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_ENT_TAB_NAME,
88 ids, properties);
89 } else if (KS.CONTEXT.equals(type)) {
90 iterator = new HBaseIterator(hbaseUtils, hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_CON_TAB_NAME,
91 ids, properties);
92 } else if (KS.USER.equals(type)) {
93 iterator = new HBaseIterator(hbaseUtils, hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_USR_TAB_NAME,
94 ids, properties);
95 } else {
96 throw new IllegalArgumentException("Unsupported record type "
97 + Data.toString(type, Data.getNamespaceMap()));
98 }
99 return Stream.create(iterator);
100 }
101
102 @Override
103 public Stream<Record> retrieve(final URI type, @Nullable final XPath condition,
104 @Nullable final Set<? extends URI> properties) throws DataCorruptedException,
105 IOException, IllegalArgumentException, IllegalStateException {
106
107 String tableName;
108 String familyName;
109
110 if (KS.RESOURCE.equals(type)) {
111 tableName = hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_RES_TAB_NAME;
112 familyName = DEFAULT_RES_FAM_NAME;
113 } else if (KS.MENTION.equals(type)) {
114 tableName = hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_MEN_TAB_NAME;
115 familyName = DEFAULT_MEN_FAM_NAME;
116 } else if (KS.ENTITY.equals(type)) {
117 tableName = hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_ENT_TAB_NAME;
118 familyName = DEFAULT_ENT_FAM_NAME;
119 } else if (KS.CONTEXT.equals(type)) {
120 tableName = hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_CON_TAB_NAME;
121 familyName = DEFAULT_CON_FAM_NAME;
122 } else if (KS.USER.equals(type)) {
123 tableName = hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_USR_TAB_NAME;
124 familyName = DEFAULT_USR_FAM_NAME;
125 } else {
126 throw new IllegalArgumentException("Unsupported record type "
127 + Data.toString(type, Data.getNamespaceMap()));
128 }
129
130 return Stream.create(new HBaseScanIterator(hbaseUtils, tableName, familyName,
131 condition, properties, hbaseUtils.getServerFilterFlag()));
132 }
133
134 @Override
135 public long count(URI type, XPath condition)
136 throws DataCorruptedException, IOException,
137 IllegalArgumentException, IllegalStateException {
138
139 String tableName = null;
140 String familyName = null;
141
142 if (KS.RESOURCE.equals(type)) {
143 tableName = hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_RES_TAB_NAME;
144 familyName = DEFAULT_RES_FAM_NAME;
145 } else if (KS.MENTION.equals(type)) {
146 tableName = hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_MEN_TAB_NAME;
147 familyName = DEFAULT_MEN_FAM_NAME;
148 } else if (KS.ENTITY.equals(type)) {
149 tableName = hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_ENT_TAB_NAME;
150 familyName = DEFAULT_ENT_FAM_NAME;
151 } else if (KS.CONTEXT.equals(type)) {
152 tableName = hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_CON_TAB_NAME;
153 familyName = DEFAULT_CON_FAM_NAME;
154 } else if (KS.USER.equals(type)) {
155 tableName = hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_USR_TAB_NAME;
156 familyName = DEFAULT_USR_FAM_NAME;
157 } else {
158 throw new IllegalArgumentException("Unsupported record type "
159 + Data.toString(type, Data.getNamespaceMap()));
160 }
161
162 return getHbaseUtils().count(tableName, familyName, condition);
163 }
164
165 @Override
166 public Stream<Record> match(final Map<URI, XPath> conditions, final Map<URI, Set<URI>> ids,
167 final Map<URI, Set<URI>> properties) throws IOException, IllegalStateException {
168 throw new UnsupportedOperationException();
169 }
170
171
172
173
174 @Override
175 public void store(final URI type, final Record record) {
176 if (KS.RESOURCE.equals(type)) {
177 getHbaseUtils().processPut(record,
178 hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_RES_TAB_NAME,
179 DEFAULT_RES_FAM_NAME,
180 DEFAULT_RES_QUA_NAME);
181 } else if (KS.MENTION.equals(type)) {
182 getHbaseUtils().processPut(record,
183 hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_MEN_TAB_NAME,
184 DEFAULT_MEN_FAM_NAME,
185 DEFAULT_MEN_QUA_NAME);
186 } else if (KS.ENTITY.equals(type)) {
187 getHbaseUtils().processPut(record,
188 hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_ENT_TAB_NAME,
189 DEFAULT_ENT_FAM_NAME,
190 DEFAULT_ENT_QUA_NAME);
191 } else if (KS.CONTEXT.equals(type)) {
192 getHbaseUtils().processPut(record,
193 hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_CON_TAB_NAME,
194 DEFAULT_CON_FAM_NAME,
195 DEFAULT_CON_QUA_NAME);
196 } else if (KS.USER.equals(type)) {
197 getHbaseUtils().processPut(record,
198 hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_USR_TAB_NAME,
199 DEFAULT_USR_FAM_NAME,
200 DEFAULT_USR_QUA_NAME);
201 } else {
202 throw new IllegalArgumentException("Unsupported record:\n"
203 + record.toString(Data.getNamespaceMap(), true));
204 }
205 }
206
207
208
209
210 @Override
211 public void delete(final URI type, final URI id) {
212 if (KS.RESOURCE.equals(type)) {
213 getHbaseUtils().processDelete(id,
214 hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_RES_TAB_NAME,
215 DEFAULT_RES_FAM_NAME,
216 DEFAULT_RES_QUA_NAME);
217 } else if (KS.MENTION.equals(type)) {
218 getHbaseUtils().processDelete(id,
219 hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_MEN_TAB_NAME,
220 DEFAULT_MEN_FAM_NAME,
221 DEFAULT_MEN_QUA_NAME);
222 } else if (KS.ENTITY.equals(type)) {
223 getHbaseUtils().processDelete(id,
224 hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_ENT_TAB_NAME,
225 DEFAULT_ENT_FAM_NAME,
226 DEFAULT_ENT_QUA_NAME);
227 } else if (KS.CONTEXT.equals(type)) {
228 getHbaseUtils().processDelete(id,
229 hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_CON_TAB_NAME,
230 DEFAULT_CON_FAM_NAME,
231 DEFAULT_CON_QUA_NAME);
232 } else if (KS.USER.equals(type)) {
233 getHbaseUtils().processDelete(id,
234 hbaseUtils.getHbaseTableNamePrefix() + DEFAULT_USR_TAB_NAME,
235 DEFAULT_USR_FAM_NAME,
236 DEFAULT_USR_QUA_NAME);
237 } else {
238 throw new IllegalArgumentException("Unsupported record type:\n" + type);
239 }
240 }
241
242 @Override
243 public void end(boolean commit) throws DataCorruptedException, IOException,
244 IllegalStateException {
245 if (commit)
246 getHbaseUtils().commit();
247 else
248 getHbaseUtils().rollback();
249 }
250
251
252
253
254 public static Logger getLogger() {
255 return logger;
256 }
257
258
259
260
261 public static void setLogger(Logger logger) {
262 HBaseDataTransaction.logger = logger;
263 }
264
265
266
267
268 public AbstractHBaseUtils getHbaseUtils() {
269 return hbaseUtils;
270 }
271
272
273
274
275 public void setHbaseUtils(AbstractHBaseUtils hbaseUtils) {
276 this.hbaseUtils = hbaseUtils;
277 }
278 }