1 package eu.fbk.knowledgestore.datastore;
2
3 import java.io.IOException;
4 import java.util.List;
5 import java.util.Map;
6 import java.util.Set;
7
8 import javax.annotation.Nullable;
9
10 import com.google.common.base.Function;
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Throwables;
13 import com.google.common.collect.ImmutableList;
14 import com.google.common.collect.ImmutableSet;
15 import com.google.common.collect.Sets;
16
17 import org.openrdf.model.Resource;
18 import org.openrdf.model.Statement;
19 import org.openrdf.model.URI;
20 import org.openrdf.model.Value;
21 import org.openrdf.model.ValueFactory;
22 import org.openrdf.query.BindingSet;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 import eu.fbk.knowledgestore.data.Data;
27 import eu.fbk.knowledgestore.data.Handler;
28 import eu.fbk.knowledgestore.data.Record;
29 import eu.fbk.knowledgestore.data.Stream;
30 import eu.fbk.knowledgestore.data.XPath;
31 import eu.fbk.knowledgestore.runtime.DataCorruptedException;
32 import eu.fbk.knowledgestore.triplestore.SelectQuery;
33 import eu.fbk.knowledgestore.triplestore.TripleStore;
34 import eu.fbk.knowledgestore.triplestore.TripleTransaction;
35 import eu.fbk.knowledgestore.vocabulary.KS;
36
37 public final class TripleDataStore implements DataStore {
38
39 private static final Logger LOGGER = LoggerFactory.getLogger(TripleDataStore.class);
40
41 private final TripleStore tripleStore;
42
43 private boolean initialized;
44
45 private boolean closed;
46
47 public TripleDataStore(final TripleStore tripleStore) {
48 this.tripleStore = Preconditions.checkNotNull(tripleStore);
49 this.initialized = false;
50 this.closed = false;
51 LOGGER.info("{} configured, triplestore={}", this, this.tripleStore);
52 }
53
54 @Override
55 public synchronized void init() throws IOException, IllegalStateException {
56 Preconditions.checkState(!this.initialized && !this.closed);
57 this.initialized = true;
58 LOGGER.info("{} initialized", this);
59 }
60
61 @Override
62 public synchronized DataTransaction begin(final boolean readOnly)
63 throws DataCorruptedException, IOException, IllegalStateException {
64 Preconditions.checkState(this.initialized && !this.closed);
65 return new TripleDataTransaction(this.tripleStore.begin(readOnly));
66 }
67
68 @Override
69 public synchronized void close() {
70 if (this.closed) {
71 return;
72 }
73 this.closed = true;
74 }
75
76 @Override
77 public String toString() {
78 return this.getClass().getSimpleName();
79 }
80
81 private static final class TripleDataTransaction implements DataTransaction {
82
83 private final TripleTransaction transaction;
84
85 TripleDataTransaction(final TripleTransaction transaction) {
86 this.transaction = transaction;
87 }
88
89 private Stream<Record> query(final String spoPattern, final URI type,
90 @Nullable final Set<? extends URI> properties, @Nullable final XPath condition)
91 throws IOException {
92
93
94 final StringBuilder builder = new StringBuilder();
95 if (KS.RESOURCE.equals(type)) {
96 builder.append("SELECT ?s ?p ?o ?p1 ?o1 ?p2 ?o2 {\n"
97 + " ?s ?p ?o .\n"
98 + " OPTIONAL {\n ?o ?p1 ?o1\n"
99 + " FILTER (?p = <http://dkm.fbk.eu/ontologies/knowledgestore#storedAs>)\n" //
100 + " OPTIONAL {\n"
101 + " ?o1 ?p2 ?o2\n"
102 + " FILTER (?p1 = <http://www.semanticdesktop.org/ontologies/2007/03/22/nfo#hasHash>)\n" //
103 + " }\n"
104 + " }\n ");
105 } else {
106 builder.append("SELECT ?s ?p ?o {\n"
107 + " ?s ?p ?o .\n");
108 }
109 builder.append(" ?s a ").append(Data.toString(type, null)).append(" .\n");
110 builder.append(spoPattern);
111 builder.append("\n}");
112 final String query = builder.toString();
113
114
115 final Stream<BindingSet> bindingStream = Stream.create(this.transaction.query(
116 SelectQuery.from(query), null, null));
117
118
119 final Stream<Statement> stmtStream;
120 if (KS.RESOURCE.equals(type)) {
121 stmtStream = bindingStream.transform(null,
122 new Function<Handler<Statement>, Handler<BindingSet>>() {
123
124 @Override
125 public Handler<BindingSet> apply(final Handler<Statement> handler) {
126 return new Handler<BindingSet>() {
127
128 private final Set<Statement> set = Sets.newHashSet();
129
130 private Resource subject = null;
131
132 @Override
133 public void handle(final BindingSet bindings) throws Throwable {
134 if (bindings == null) {
135 handler.handle(null);
136 return;
137 }
138 final Resource s = (Resource) bindings.getValue("s");
139 final URI p = (URI) bindings.getValue("p");
140 final Value o = bindings.getValue("o");
141 final URI p1 = (URI) bindings.getValue("p1");
142 final Value o1 = bindings.getValue("o1");
143 final URI p2 = (URI) bindings.getValue("p2");
144 final Value o2 = bindings.getValue("o2");
145 final ValueFactory vf = Data.getValueFactory();
146 if (!s.equals(this.subject)) {
147 this.set.clear();
148 this.subject = s;
149 }
150 emit(handler, vf.createStatement(s, p, o));
151 if (o1 != null) {
152 emit(handler, vf.createStatement((URI) o, p1, o1));
153 if (o2 != null) {
154 emit(handler, vf.createStatement((URI) o1, p2, o2));
155 }
156 }
157 }
158
159 private void emit(final Handler<Statement> handler,
160 final Statement statement) throws Throwable {
161 if (this.set.add(statement)) {
162 handler.handle(statement);
163 }
164 }
165
166 };
167
168 }
169
170 });
171 } else {
172 stmtStream = bindingStream.transform(new Function<BindingSet, Statement>() {
173
174 @Override
175 public Statement apply(final BindingSet bindings) {
176 final Resource s = (Resource) bindings.getValue("s");
177 final URI p = (URI) bindings.getValue("p");
178 final Value o = bindings.getValue("o");
179 return Data.getValueFactory().createStatement(s, p, o);
180 }
181
182 }, 1);
183 }
184
185
186 Stream<Record> recordStream = Record.decode(stmtStream, ImmutableList.of(type), true);
187
188
189 if (condition != null) {
190 recordStream = recordStream.filter(condition.asPredicate(), 1);
191 }
192
193
194 if (properties != null && !properties.isEmpty()) {
195 final URI[] props = properties.toArray(new URI[properties.size()]);
196 recordStream = recordStream.transform(new Function<Record, Record>() {
197
198 @Override
199 public Record apply(final Record record) {
200 record.retain(props);
201 return null;
202 }
203
204 }, 1);
205 }
206 return recordStream;
207 }
208
209 @Override
210 public Stream<Record> lookup(final URI type, final Set<? extends URI> ids,
211 final Set<? extends URI> properties) throws IOException, IllegalArgumentException,
212 IllegalStateException {
213 return Stream.concat(Stream.create(ids).chunk(64)
214 .transform(new Function<List<? extends URI>, Stream<Record>>() {
215
216 @Override
217 public Stream<Record> apply(final List<? extends URI> input) {
218 final StringBuilder builder = new StringBuilder();
219 builder.append(" VALUES ?s {");
220 for (final URI id : input) {
221 builder.append(" <").append(id.toString()).append(">");
222 }
223 builder.append(" }");
224 try {
225 return query(builder.toString(), type, properties, null);
226 } catch (final IOException ex) {
227 throw Throwables.propagate(ex);
228 }
229 }
230
231 }, 1));
232 }
233
234 @Override
235 public Stream<Record> retrieve(final URI type, final XPath condition,
236 final Set<? extends URI> properties) throws IOException, IllegalArgumentException,
237 IllegalStateException {
238 return query(" ?s a <" + type.toString() + "> .", type, properties, condition);
239 }
240
241 @Override
242 public long count(final URI type, final XPath condition) throws IOException,
243 IllegalArgumentException, IllegalStateException {
244 return query(" ?s a <" + type.toString() + "> .", type, null, condition).count();
245 }
246
247 @Override
248 public Stream<Record> match(final Map<URI, XPath> conditions,
249 final Map<URI, Set<URI>> ids, final Map<URI, Set<URI>> properties)
250 throws IOException, IllegalStateException {
251 throw new UnsupportedOperationException();
252 }
253
254 @Override
255 public void store(final URI type, final Record record) throws IOException,
256 IllegalStateException {
257
258
259 delete(type, record.getID());
260
261
262 final List<Statement> statements = Record.encode(Stream.create(record),
263 ImmutableList.of(type)).toList();
264 this.transaction.add(statements);
265 }
266
267 @Override
268 public void delete(final URI type, final URI id) throws IOException, IllegalStateException {
269
270
271 final List<Statement> statements = Record.encode(
272 lookup(type, ImmutableSet.of(id), null), ImmutableList.of(type)).toList();
273
274
275 this.transaction.remove(statements);
276 }
277
278 @Override
279 public void end(final boolean commit) throws DataCorruptedException, IOException,
280 IllegalStateException {
281 this.transaction.end(commit);
282 }
283
284 }
285
286 }