1   package eu.fbk.knowledgestore.triplestore;
2   
3   import java.io.IOException;
4   import java.util.Collections;
5   import java.util.Iterator;
6   
7   import javax.annotation.Nullable;
8   
9   import com.google.common.base.Preconditions;
10  import com.google.common.base.Throwables;
11  import com.google.common.collect.Iterators;
12  
13  import org.openrdf.model.Resource;
14  import org.openrdf.model.Statement;
15  import org.openrdf.model.URI;
16  import org.openrdf.model.Value;
17  import org.openrdf.model.impl.ContextStatementImpl;
18  import org.openrdf.query.Binding;
19  import org.openrdf.query.BindingSet;
20  import org.openrdf.query.MalformedQueryException;
21  import org.openrdf.query.QueryEvaluationException;
22  import org.openrdf.query.QueryLanguage;
23  import org.openrdf.query.TupleQuery;
24  import org.openrdf.repository.Repository;
25  import org.openrdf.repository.RepositoryConnection;
26  import org.openrdf.repository.RepositoryException;
27  import org.openrdf.repository.sail.SailRepository;
28  import org.openrdf.sail.Sail;
29  import org.slf4j.Logger;
30  import org.slf4j.LoggerFactory;
31  
32  import info.aduna.iteration.CloseableIteration;
33  import info.aduna.iteration.CloseableIteratorIteration;
34  import info.aduna.iteration.IterationWrapper;
35  
36  import eu.fbk.knowledgestore.data.Handler;
37  import eu.fbk.knowledgestore.internal.Util;
38  import eu.fbk.knowledgestore.runtime.DataCorruptedException;
39  
40  public final class RepositoryTripleStore implements TripleStore {
41  
42      private static final Logger LOGGER = LoggerFactory.getLogger(RepositoryTripleStore.class);
43  
44      private final Repository repository;
45  
46      public RepositoryTripleStore(final Sail sail) {
47          this(new SailRepository(sail));
48      }
49  
50      public RepositoryTripleStore(final Repository repository) {
51          this.repository = Preconditions.checkNotNull(repository);
52          LOGGER.info("RepositoryTripleStore configured, backend={}", repository.getClass()
53                  .getSimpleName());
54      }
55  
56      @Override
57      public void init() throws IOException {
58          try {
59              this.repository.initialize();
60          } catch (final Throwable ex) {
61              throw new IOException("Could not initialize Sesame repository");
62          }
63      }
64  
65      @Override
66      public TripleTransaction begin(final boolean readOnly) throws IOException {
67          return new RepositoryTripleTransaction(readOnly);
68      }
69  
70      @Override
71      public void reset() throws IOException {
72          RepositoryConnection connection = null;
73          try {
74              connection = this.repository.getConnection();
75              connection.clear();
76              connection.clearNamespaces();
77              LOGGER.info("Sesame repository successfully resetted");
78          } catch (final RepositoryException ex) {
79              throw new IOException("Could not reset Sesame repository", ex);
80          } finally {
81              Util.closeQuietly(connection);
82          }
83      }
84  
85      @Override
86      public void close() {
87          try {
88              this.repository.shutDown();
89          } catch (final RepositoryException ex) {
90              LOGGER.error("Failed to shutdown Sesame repository", ex);
91          }
92      }
93  
94      @Override
95      public String toString() {
96          return getClass().getSimpleName();
97      }
98  
99      private class RepositoryTripleTransaction implements TripleTransaction {
100 
101         private final RepositoryConnection connection;
102 
103         private final boolean readOnly;
104 
105         private final long ts;
106 
107         private boolean dirty;
108 
109         RepositoryTripleTransaction(final boolean readOnly) throws IOException {
110 
111             final long ts = System.currentTimeMillis();
112             final RepositoryConnection connection;
113             try {
114                 connection = RepositoryTripleStore.this.repository.getConnection();
115             } catch (final RepositoryException ex) {
116                 throw new IOException("Could not connect to Sesame repository", ex);
117             }
118 
119             this.connection = connection;
120             this.readOnly = readOnly;
121             this.ts = ts;
122             this.dirty = false;
123 
124             try {
125                 connection.begin();
126             } catch (final Throwable ex) {
127                 Util.closeQuietly(connection);
128             }
129 
130             if (LOGGER.isDebugEnabled()) {
131                 LOGGER.debug(this + " started in " + (readOnly ? "read-only" : "read-write")
132                         + " mode, " + (System.currentTimeMillis() - ts) + " ms");
133             }
134         }
135 
136         private void checkWritable() {
137             if (this.readOnly) {
138                 throw new IllegalStateException(
139                         "Write operation not allowed on read-only transaction");
140             }
141         }
142 
143         @Nullable
144         private <T, E extends Exception> CloseableIteration<T, E> logClose(
145                 @Nullable final CloseableIteration<T, E> iteration) {
146             if (iteration == null || !LOGGER.isDebugEnabled()) {
147                 return iteration;
148             }
149             final long ts = System.currentTimeMillis();
150             return new IterationWrapper<T, E>(iteration) {
151 
152                 @Override
153                 protected void handleClose() throws E {
154                     try {
155                         super.handleClose();
156                     } finally {
157                         LOGGER.debug("Repository iteration closed after {} ms",
158                                 System.currentTimeMillis() - ts);
159                     }
160                 }
161 
162             };
163         }
164 
165         @Override
166         public CloseableIteration<? extends Statement, ? extends Exception> get(
167                 final Resource subject, final URI predicate, final Value object,
168                 final Resource context) throws IOException, IllegalStateException {
169 
170             try {
171                 final long ts = System.currentTimeMillis();
172                 final CloseableIteration<? extends Statement, ? extends Exception> result;
173                 if (subject == null || predicate == null || object == null || context == null) {
174                     result = logClose(this.connection.getStatements(subject, predicate, object,
175                             false, context));
176                     LOGGER.debug("getStatements() iteration obtained in {} ms",
177                             System.currentTimeMillis() - ts);
178                 } else {
179                     Iterator<Statement> iterator;
180                     if (this.connection.hasStatement(subject, predicate, object, true, context)) {
181                         iterator = Collections.emptyIterator();
182                     } else {
183                         iterator = Iterators
184                                 .<Statement>singletonIterator(new ContextStatementImpl(subject,
185                                         predicate, object, context));
186                     }
187                     result = new CloseableIteratorIteration<Statement, RuntimeException>(iterator);
188                     LOGGER.debug("hasStatement() evaluated in {} ms", System.currentTimeMillis()
189                             - ts);
190                 }
191                 return result;
192 
193             } catch (final RepositoryException ex) {
194                 throw new IOException("Error while retrieving matching statements", ex);
195             }
196         }
197 
198         @Override
199         public CloseableIteration<BindingSet, QueryEvaluationException> query(
200                 final SelectQuery query, final BindingSet bindings, @Nullable final Long timeout)
201                 throws IOException, UnsupportedOperationException, IllegalStateException {
202 
203             LOGGER.debug("Evaluating query:\n{}", query.getString());
204 
205             final TupleQuery tupleQuery;
206             try {
207                 tupleQuery = this.connection.prepareTupleQuery(QueryLanguage.SPARQL,
208                         query.getString());
209 
210             } catch (final RepositoryException ex) {
211                 throw new IOException("Failed to prepare SPARQL tuple query:\n" + query, ex);
212 
213             } catch (final MalformedQueryException ex) {
214                 // should not happen, as SelectQuery can only be created with valid queries
215                 throw new UnsupportedOperationException(
216                         "SPARQL query rejected as malformed by Sesame repository:\n" + query, ex);
217             }
218 
219             if (bindings != null) {
220                 for (final Binding binding : bindings) {
221                     tupleQuery.setBinding(binding.getName(), binding.getValue());
222                 }
223             }
224 
225             if (timeout != null) {
226                 // Note: we pass the value in ms, although the spec says seconds. However, at
227                 // least for Virtuoso it seems that the value passed is interpreted as a ms value
228                 tupleQuery.setMaxQueryTime(timeout.intValue());
229             }
230 
231             final long ts = System.currentTimeMillis();
232             try {
233                 // execute the query
234                 final CloseableIteration<BindingSet, QueryEvaluationException> result;
235                 result = logClose(tupleQuery.evaluate());
236                 LOGGER.debug("Query result iteration obtained in {} ms",
237                         System.currentTimeMillis() - ts);
238                 return result;
239 
240             } catch (final QueryEvaluationException ex) {
241                 // return all the information available, so to help debugging
242                 final StringBuilder builder = new StringBuilder();
243                 boolean emitQuery = false;
244                 builder.append("Query evaluation failed after ")
245                         .append(System.currentTimeMillis() - ts).append(" ms");
246                 if (ex.getMessage() != null) {
247                     builder.append("\n").append(ex.getMessage());
248                     emitQuery = !ex.getMessage().contains(query.getString());
249                 }
250                 if (emitQuery) {
251                     builder.append("\nFailed query:\n\n").append(query);
252                 }
253                 throw new IOException(builder.toString(), ex);
254             }
255         }
256 
257         @Override
258         public void infer(final Handler<? super Statement> handler) throws IOException,
259                 IllegalStateException {
260 
261             checkWritable();
262 
263             // No inference done at this level (to be implemented in a decorator).
264             if (handler != null) {
265                 try {
266                     handler.handle(null);
267                 } catch (final Throwable ex) {
268                     Throwables.propagateIfPossible(ex, IOException.class);
269                     throw new RuntimeException(ex);
270                 }
271             }
272         }
273 
274         @Override
275         public void add(final Iterable<? extends Statement> statements) throws IOException,
276                 IllegalStateException {
277 
278             Preconditions.checkNotNull(statements);
279             checkWritable();
280 
281             try {
282                 this.dirty = true;
283                 this.connection.add(statements);
284             } catch (final RepositoryException ex) {
285                 throw new DataCorruptedException("Error while adding statements", ex);
286             }
287         }
288 
289         @Override
290         public void remove(final Iterable<? extends Statement> statements) throws IOException,
291                 IllegalStateException {
292 
293             Preconditions.checkNotNull(statements);
294             checkWritable();
295 
296             try {
297                 this.dirty = true;
298                 this.connection.remove(statements);
299             } catch (final RepositoryException ex) {
300                 throw new DataCorruptedException("Error while removing statements", ex);
301             }
302         }
303 
304         @Override
305         public void end(final boolean commit) throws DataCorruptedException, IOException,
306                 IllegalStateException {
307 
308             final long ts = System.currentTimeMillis();
309             boolean committed = false;
310 
311             try {
312                 if (this.dirty) {
313                     if (commit) {
314                         try {
315                             this.connection.commit();
316                             committed = true;
317 
318                         } catch (final Throwable ex) {
319                             try {
320                                 this.connection.rollback();
321                                 LOGGER.debug("{} rolled back after commit failure", this);
322 
323                             } catch (final RepositoryException ex2) {
324                                 throw new DataCorruptedException(
325                                         "Failed to rollback transaction after commit failure", ex);
326                             }
327                             throw new IOException(
328                                     "Failed to commit transaction (rollback forced)", ex);
329                         }
330                     } else {
331                         try {
332                             this.connection.rollback();
333                         } catch (final Throwable ex) {
334                             throw new DataCorruptedException("Failed to rollback transaction", ex);
335                         }
336                     }
337                 }
338             } finally {
339                 try {
340                     this.connection.close();
341                 } catch (final RepositoryException ex) {
342                     LOGGER.error("Failed to close connection", ex);
343                 } finally {
344                     if (LOGGER.isDebugEnabled()) {
345                         final long now = System.currentTimeMillis();
346                         LOGGER.debug("{} {} and closed in {} ms, tx duration {} ms", this,
347                                 committed ? "committed" : "rolled back", now - ts, now - this.ts);
348                     }
349                 }
350             }
351         }
352 
353         @Override
354         public String toString() {
355             return getClass().getSimpleName();
356         }
357 
358     }
359 
360 }