1 package eu.fbk.knowledgestore.triplestore;
2
3 import java.io.IOException;
4 import java.util.Collections;
5 import java.util.Iterator;
6
7 import javax.annotation.Nullable;
8
9 import com.google.common.base.Preconditions;
10 import com.google.common.base.Throwables;
11 import com.google.common.collect.Iterators;
12
13 import org.openrdf.model.Resource;
14 import org.openrdf.model.Statement;
15 import org.openrdf.model.URI;
16 import org.openrdf.model.Value;
17 import org.openrdf.model.impl.ContextStatementImpl;
18 import org.openrdf.query.Binding;
19 import org.openrdf.query.BindingSet;
20 import org.openrdf.query.MalformedQueryException;
21 import org.openrdf.query.QueryEvaluationException;
22 import org.openrdf.query.QueryLanguage;
23 import org.openrdf.query.TupleQuery;
24 import org.openrdf.repository.Repository;
25 import org.openrdf.repository.RepositoryConnection;
26 import org.openrdf.repository.RepositoryException;
27 import org.openrdf.repository.sail.SailRepository;
28 import org.openrdf.sail.Sail;
29 import org.slf4j.Logger;
30 import org.slf4j.LoggerFactory;
31
32 import info.aduna.iteration.CloseableIteration;
33 import info.aduna.iteration.CloseableIteratorIteration;
34 import info.aduna.iteration.IterationWrapper;
35
36 import eu.fbk.knowledgestore.data.Handler;
37 import eu.fbk.knowledgestore.internal.Util;
38 import eu.fbk.knowledgestore.runtime.DataCorruptedException;
39
40 public final class RepositoryTripleStore implements TripleStore {
41
42 private static final Logger LOGGER = LoggerFactory.getLogger(RepositoryTripleStore.class);
43
44 private final Repository repository;
45
46 public RepositoryTripleStore(final Sail sail) {
47 this(new SailRepository(sail));
48 }
49
50 public RepositoryTripleStore(final Repository repository) {
51 this.repository = Preconditions.checkNotNull(repository);
52 LOGGER.info("RepositoryTripleStore configured, backend={}", repository.getClass()
53 .getSimpleName());
54 }
55
56 @Override
57 public void init() throws IOException {
58 try {
59 this.repository.initialize();
60 } catch (final Throwable ex) {
61 throw new IOException("Could not initialize Sesame repository");
62 }
63 }
64
65 @Override
66 public TripleTransaction begin(final boolean readOnly) throws IOException {
67 return new RepositoryTripleTransaction(readOnly);
68 }
69
70 @Override
71 public void reset() throws IOException {
72 RepositoryConnection connection = null;
73 try {
74 connection = this.repository.getConnection();
75 connection.clear();
76 connection.clearNamespaces();
77 LOGGER.info("Sesame repository successfully resetted");
78 } catch (final RepositoryException ex) {
79 throw new IOException("Could not reset Sesame repository", ex);
80 } finally {
81 Util.closeQuietly(connection);
82 }
83 }
84
85 @Override
86 public void close() {
87 try {
88 this.repository.shutDown();
89 } catch (final RepositoryException ex) {
90 LOGGER.error("Failed to shutdown Sesame repository", ex);
91 }
92 }
93
94 @Override
95 public String toString() {
96 return getClass().getSimpleName();
97 }
98
99 private class RepositoryTripleTransaction implements TripleTransaction {
100
101 private final RepositoryConnection connection;
102
103 private final boolean readOnly;
104
105 private final long ts;
106
107 private boolean dirty;
108
109 RepositoryTripleTransaction(final boolean readOnly) throws IOException {
110
111 final long ts = System.currentTimeMillis();
112 final RepositoryConnection connection;
113 try {
114 connection = RepositoryTripleStore.this.repository.getConnection();
115 } catch (final RepositoryException ex) {
116 throw new IOException("Could not connect to Sesame repository", ex);
117 }
118
119 this.connection = connection;
120 this.readOnly = readOnly;
121 this.ts = ts;
122 this.dirty = false;
123
124 try {
125 connection.begin();
126 } catch (final Throwable ex) {
127 Util.closeQuietly(connection);
128 }
129
130 if (LOGGER.isDebugEnabled()) {
131 LOGGER.debug(this + " started in " + (readOnly ? "read-only" : "read-write")
132 + " mode, " + (System.currentTimeMillis() - ts) + " ms");
133 }
134 }
135
136 private void checkWritable() {
137 if (this.readOnly) {
138 throw new IllegalStateException(
139 "Write operation not allowed on read-only transaction");
140 }
141 }
142
143 @Nullable
144 private <T, E extends Exception> CloseableIteration<T, E> logClose(
145 @Nullable final CloseableIteration<T, E> iteration) {
146 if (iteration == null || !LOGGER.isDebugEnabled()) {
147 return iteration;
148 }
149 final long ts = System.currentTimeMillis();
150 return new IterationWrapper<T, E>(iteration) {
151
152 @Override
153 protected void handleClose() throws E {
154 try {
155 super.handleClose();
156 } finally {
157 LOGGER.debug("Repository iteration closed after {} ms",
158 System.currentTimeMillis() - ts);
159 }
160 }
161
162 };
163 }
164
165 @Override
166 public CloseableIteration<? extends Statement, ? extends Exception> get(
167 final Resource subject, final URI predicate, final Value object,
168 final Resource context) throws IOException, IllegalStateException {
169
170 try {
171 final long ts = System.currentTimeMillis();
172 final CloseableIteration<? extends Statement, ? extends Exception> result;
173 if (subject == null || predicate == null || object == null || context == null) {
174 result = logClose(this.connection.getStatements(subject, predicate, object,
175 false, context));
176 LOGGER.debug("getStatements() iteration obtained in {} ms",
177 System.currentTimeMillis() - ts);
178 } else {
179 Iterator<Statement> iterator;
180 if (this.connection.hasStatement(subject, predicate, object, true, context)) {
181 iterator = Collections.emptyIterator();
182 } else {
183 iterator = Iterators
184 .<Statement>singletonIterator(new ContextStatementImpl(subject,
185 predicate, object, context));
186 }
187 result = new CloseableIteratorIteration<Statement, RuntimeException>(iterator);
188 LOGGER.debug("hasStatement() evaluated in {} ms", System.currentTimeMillis()
189 - ts);
190 }
191 return result;
192
193 } catch (final RepositoryException ex) {
194 throw new IOException("Error while retrieving matching statements", ex);
195 }
196 }
197
198 @Override
199 public CloseableIteration<BindingSet, QueryEvaluationException> query(
200 final SelectQuery query, final BindingSet bindings, @Nullable final Long timeout)
201 throws IOException, UnsupportedOperationException, IllegalStateException {
202
203 LOGGER.debug("Evaluating query:\n{}", query.getString());
204
205 final TupleQuery tupleQuery;
206 try {
207 tupleQuery = this.connection.prepareTupleQuery(QueryLanguage.SPARQL,
208 query.getString());
209
210 } catch (final RepositoryException ex) {
211 throw new IOException("Failed to prepare SPARQL tuple query:\n" + query, ex);
212
213 } catch (final MalformedQueryException ex) {
214
215 throw new UnsupportedOperationException(
216 "SPARQL query rejected as malformed by Sesame repository:\n" + query, ex);
217 }
218
219 if (bindings != null) {
220 for (final Binding binding : bindings) {
221 tupleQuery.setBinding(binding.getName(), binding.getValue());
222 }
223 }
224
225 if (timeout != null) {
226
227
228 tupleQuery.setMaxQueryTime(timeout.intValue());
229 }
230
231 final long ts = System.currentTimeMillis();
232 try {
233
234 final CloseableIteration<BindingSet, QueryEvaluationException> result;
235 result = logClose(tupleQuery.evaluate());
236 LOGGER.debug("Query result iteration obtained in {} ms",
237 System.currentTimeMillis() - ts);
238 return result;
239
240 } catch (final QueryEvaluationException ex) {
241
242 final StringBuilder builder = new StringBuilder();
243 boolean emitQuery = false;
244 builder.append("Query evaluation failed after ")
245 .append(System.currentTimeMillis() - ts).append(" ms");
246 if (ex.getMessage() != null) {
247 builder.append("\n").append(ex.getMessage());
248 emitQuery = !ex.getMessage().contains(query.getString());
249 }
250 if (emitQuery) {
251 builder.append("\nFailed query:\n\n").append(query);
252 }
253 throw new IOException(builder.toString(), ex);
254 }
255 }
256
257 @Override
258 public void infer(final Handler<? super Statement> handler) throws IOException,
259 IllegalStateException {
260
261 checkWritable();
262
263
264 if (handler != null) {
265 try {
266 handler.handle(null);
267 } catch (final Throwable ex) {
268 Throwables.propagateIfPossible(ex, IOException.class);
269 throw new RuntimeException(ex);
270 }
271 }
272 }
273
274 @Override
275 public void add(final Iterable<? extends Statement> statements) throws IOException,
276 IllegalStateException {
277
278 Preconditions.checkNotNull(statements);
279 checkWritable();
280
281 try {
282 this.dirty = true;
283 this.connection.add(statements);
284 } catch (final RepositoryException ex) {
285 throw new DataCorruptedException("Error while adding statements", ex);
286 }
287 }
288
289 @Override
290 public void remove(final Iterable<? extends Statement> statements) throws IOException,
291 IllegalStateException {
292
293 Preconditions.checkNotNull(statements);
294 checkWritable();
295
296 try {
297 this.dirty = true;
298 this.connection.remove(statements);
299 } catch (final RepositoryException ex) {
300 throw new DataCorruptedException("Error while removing statements", ex);
301 }
302 }
303
304 @Override
305 public void end(final boolean commit) throws DataCorruptedException, IOException,
306 IllegalStateException {
307
308 final long ts = System.currentTimeMillis();
309 boolean committed = false;
310
311 try {
312 if (this.dirty) {
313 if (commit) {
314 try {
315 this.connection.commit();
316 committed = true;
317
318 } catch (final Throwable ex) {
319 try {
320 this.connection.rollback();
321 LOGGER.debug("{} rolled back after commit failure", this);
322
323 } catch (final RepositoryException ex2) {
324 throw new DataCorruptedException(
325 "Failed to rollback transaction after commit failure", ex);
326 }
327 throw new IOException(
328 "Failed to commit transaction (rollback forced)", ex);
329 }
330 } else {
331 try {
332 this.connection.rollback();
333 } catch (final Throwable ex) {
334 throw new DataCorruptedException("Failed to rollback transaction", ex);
335 }
336 }
337 }
338 } finally {
339 try {
340 this.connection.close();
341 } catch (final RepositoryException ex) {
342 LOGGER.error("Failed to close connection", ex);
343 } finally {
344 if (LOGGER.isDebugEnabled()) {
345 final long now = System.currentTimeMillis();
346 LOGGER.debug("{} {} and closed in {} ms, tx duration {} ms", this,
347 committed ? "committed" : "rolled back", now - ts, now - this.ts);
348 }
349 }
350 }
351 }
352
353 @Override
354 public String toString() {
355 return getClass().getSimpleName();
356 }
357
358 }
359
360 }