1 package eu.fbk.knowledgestore.triplestore.virtuoso;
2
3 import java.io.Closeable;
4 import java.io.IOException;
5 import java.lang.reflect.Field;
6 import java.sql.Connection;
7 import java.sql.SQLException;
8 import java.util.Collections;
9 import java.util.Iterator;
10 import java.util.concurrent.Future;
11 import java.util.concurrent.TimeUnit;
12
13 import javax.annotation.Nullable;
14
15 import com.google.common.base.Preconditions;
16 import com.google.common.base.Throwables;
17 import com.google.common.collect.Iterators;
18
19 import org.openrdf.model.Resource;
20 import org.openrdf.model.Statement;
21 import org.openrdf.model.URI;
22 import org.openrdf.model.Value;
23 import org.openrdf.model.impl.ContextStatementImpl;
24 import org.openrdf.query.Binding;
25 import org.openrdf.query.BindingSet;
26 import org.openrdf.query.MalformedQueryException;
27 import org.openrdf.query.QueryEvaluationException;
28 import org.openrdf.query.QueryLanguage;
29 import org.openrdf.query.TupleQuery;
30 import org.openrdf.repository.RepositoryException;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 import info.aduna.iteration.CloseableIteration;
35 import info.aduna.iteration.CloseableIteratorIteration;
36 import info.aduna.iteration.EmptyIteration;
37 import info.aduna.iteration.IterationWrapper;
38
39 import virtuoso.sesame2.driver.VirtuosoRepositoryConnection;
40
41 import eu.fbk.knowledgestore.data.Data;
42 import eu.fbk.knowledgestore.data.Handler;
43 import eu.fbk.knowledgestore.runtime.DataCorruptedException;
44 import eu.fbk.knowledgestore.triplestore.SelectQuery;
45 import eu.fbk.knowledgestore.triplestore.TripleTransaction;
46
47 final class VirtuosoTripleTransaction implements TripleTransaction {
48
49 private static final Logger LOGGER = LoggerFactory.getLogger(VirtuosoTripleTransaction.class);
50
51 private final VirtuosoTripleStore store;
52
53 private final VirtuosoRepositoryConnection connection;
54
55 private final boolean readOnly;
56
57 private final long ts;
58
59 VirtuosoTripleTransaction(final VirtuosoTripleStore store, final boolean readOnly)
60 throws IOException {
61
62 assert store != null;
63
64
65 final long ts = System.currentTimeMillis();
66 final VirtuosoRepositoryConnection connection;
67 try {
68 connection = (VirtuosoRepositoryConnection) store.getVirtuoso().getConnection();
69 } catch (final RepositoryException ex) {
70 throw new IOException("Could not connect to Virtuoso", ex);
71 }
72
73 this.store = store;
74 this.connection = connection;
75 this.readOnly = readOnly;
76 this.ts = ts;
77
78 try {
79
80
81
82
83 connection.getQuadStoreConnection().setAutoCommit(true);
84 connection.getQuadStoreConnection().setReadOnly(readOnly);
85
86
87 } catch (final Throwable ex) {
88 try {
89 connection.close();
90 } catch (final RepositoryException ex2) {
91 LOGGER.error("Cannot close connection after begin() failure", ex);
92 }
93 throw new IOException("Cannot setup read-only transaction", ex);
94 }
95
96 if (LOGGER.isDebugEnabled()) {
97 LOGGER.debug(this + " started in " + (readOnly ? "read-only" : "read-write")
98 + " mode, " + (System.currentTimeMillis() - ts) + " ms");
99 }
100 }
101
102 private void checkWritable() {
103 if (this.readOnly) {
104 throw new IllegalStateException("Write operation not allowed on read-only transaction");
105 }
106 }
107
108 @Nullable
109 private <T, E extends Exception> CloseableIteration<T, E> logClose(
110 @Nullable final CloseableIteration<T, E> iteration) {
111 if (iteration == null || !LOGGER.isDebugEnabled()) {
112 return iteration;
113 }
114 final long ts = System.currentTimeMillis();
115 return new IterationWrapper<T, E>(iteration) {
116
117 @Override
118 protected void handleClose() throws E {
119 try {
120 super.handleClose();
121 } finally {
122 LOGGER.debug("Virtuoso iteration closed after {} ms",
123 System.currentTimeMillis() - ts);
124 }
125 }
126
127 };
128 }
129
130 @Override
131 public CloseableIteration<? extends Statement, ? extends Exception> get(
132 @Nullable final Resource subject, @Nullable final URI predicate,
133 @Nullable final Value object, @Nullable final Resource context) throws IOException,
134 IllegalStateException {
135
136 try {
137 final long ts = System.currentTimeMillis();
138 final CloseableIteration<? extends Statement, ? extends Exception> result;
139 if (subject == null || predicate == null || object == null || context == null) {
140 result = logClose(this.connection.getStatements(subject, predicate, object, false,
141 context));
142 LOGGER.debug("Virtuoso getStatements() iteration obtained in {} ms",
143 System.currentTimeMillis() - ts);
144 } else {
145 Iterator<Statement> iterator;
146 if (this.connection.hasStatement(subject, predicate, object, false, context)) {
147 iterator = Collections.emptyIterator();
148 } else {
149 iterator = Iterators.<Statement>singletonIterator(new ContextStatementImpl(
150 subject, predicate, object, context));
151 }
152 result = new CloseableIteratorIteration<Statement, RuntimeException>(iterator);
153 LOGGER.debug("Virtuoso hasStatement() evaluated in {} ms",
154 System.currentTimeMillis() - ts);
155 }
156 return result;
157 } catch (final RepositoryException re) {
158 throw new IOException("Error while checking statement.", re);
159 }
160 }
161
162 @Override
163 public CloseableIteration<BindingSet, QueryEvaluationException> query(final SelectQuery query,
164 @Nullable final BindingSet bindings, @Nullable final Long timeout)
165 throws DataCorruptedException, IOException, UnsupportedOperationException {
166
167 LOGGER.debug("Evaluating query:\n{}", query);
168
169 final TupleQuery tupleQuery;
170 try {
171 tupleQuery = this.connection
172 .prepareTupleQuery(QueryLanguage.SPARQL, query.getString());
173
174 } catch (final RepositoryException ex) {
175 throw new IOException("Failed to prepare SPARQL tuple query:\n" + query, ex);
176
177 } catch (final MalformedQueryException ex) {
178
179 throw new UnsupportedOperationException(
180 "SPARQL query rejected as malformed by Virtuoso:\n" + query, ex);
181 }
182
183 if (bindings != null) {
184 for (final Binding binding : bindings) {
185 tupleQuery.setBinding(binding.getName(), binding.getValue());
186 }
187 }
188
189
190
191
192
193
194 final int msTimeout = timeout == null ? 0 : timeout.intValue();
195 try {
196 this.connection.getQuadStoreConnection()
197 .prepareCall("set result_timeout = " + msTimeout).execute();
198 } catch (final Throwable ex) {
199 LOGGER.warn("Failed to set result_timeout = " + msTimeout
200 + " on Virtuoso JDBC connection", ex);
201 }
202
203 try {
204 final long ts = System.currentTimeMillis();
205 CloseableIteration<BindingSet, QueryEvaluationException> result;
206 result = tupleQuery.evaluate();
207 result = new IterationWrapper<BindingSet, QueryEvaluationException>(result) {
208
209 @Override
210 public boolean hasNext() throws QueryEvaluationException {
211 try {
212 return super.hasNext();
213 } catch (final QueryEvaluationException ex) {
214 if (isPartialResultException(ex)) {
215 return false;
216 }
217 throw ex;
218 }
219 }
220
221 };
222 result = logClose(result);
223 LOGGER.debug("Virtuoso iteration obtained in {} ms", System.currentTimeMillis() - ts);
224 return result;
225 } catch (final QueryEvaluationException ex) {
226 if (isPartialResultException(ex)) {
227 return new EmptyIteration<>();
228 }
229 throw new IOException("Failed to execute query - " + ex.getMessage(), ex);
230 }
231 }
232
233 @Override
234 public void infer(@Nullable final Handler<? super Statement> handler) throws IOException,
235 IllegalStateException {
236
237 checkWritable();
238
239
240 if (handler != null) {
241 try {
242 handler.handle(null);
243 } catch (final Throwable ex) {
244 Throwables.propagateIfPossible(ex, IOException.class);
245 throw new RuntimeException(ex);
246 }
247 }
248 }
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264 public void add(final Statement statement) throws DataCorruptedException, IOException {
265
266 Preconditions.checkNotNull(statement);
267 checkWritable();
268
269 try {
270 this.connection.add(statement);
271 } catch (final RepositoryException ex) {
272 throw new IOException("Failed to add statement: " + statement, ex);
273 }
274 }
275
276 @Override
277 public void add(final Iterable<? extends Statement> stream) throws IOException,
278 IllegalStateException {
279 addBulk(stream, false);
280 }
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295 public void addBulk(final Iterable<? extends Statement> statements, final boolean transaction)
296 throws DataCorruptedException, IOException {
297
298 Preconditions.checkNotNull(statements);
299 checkWritable();
300
301 try {
302 if (!transaction && !this.store.existsTransactionMarker()) {
303 this.store.addTransactionMarker();
304
305 this.connection.getQuadStoreConnection().prepareCall("log_enable(2)").execute();
306 }
307 this.connection.add(statements);
308 this.connection.commit();
309
310 } catch (final SQLException sqle) {
311 throw new IllegalStateException("Invalid internal operation.", sqle);
312 } catch (final RepositoryException e) {
313 throw new DataCorruptedException("Error while adding bulk data.", e);
314 }
315 }
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331 public void remove(final Statement statement) throws DataCorruptedException, IOException {
332
333 Preconditions.checkState(!this.readOnly);
334 checkWritable();
335
336 try {
337 this.connection.remove(statement);
338 } catch (final RepositoryException ex) {
339 throw new IOException("Failed to remove statement: " + statement, ex);
340 }
341 }
342
343 @Override
344 public void remove(final Iterable<? extends Statement> stream) throws IOException,
345 IllegalStateException {
346 removeBulk(stream, false);
347 }
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362 public void removeBulk(final Iterable<? extends Statement> statements,
363 final boolean transaction) throws DataCorruptedException, IOException {
364
365 Preconditions.checkNotNull(statements);
366 checkWritable();
367
368 try {
369 if (!transaction && !this.store.existsTransactionMarker()) {
370 this.store.addTransactionMarker();
371
372 this.connection.getQuadStoreConnection().prepareCall("log_enable(2)").execute();
373 }
374
375 this.connection.remove(statements);
376 this.connection.commit();
377
378 } catch (final SQLException sqle) {
379 throw new IllegalStateException("Invalid internal operation.", sqle);
380 } catch (final RepositoryException e) {
381 throw new DataCorruptedException("Error while adding bulk data.", e);
382 }
383 }
384
385 @Override
386 public void end(final boolean commit) throws IOException {
387
388 final long ts = System.currentTimeMillis();
389 boolean committed = false;
390
391 try {
392 if (!this.readOnly) {
393 if (commit) {
394 try {
395 if (this.store.existsTransactionMarker()) {
396 this.connection.getQuadStoreConnection().prepareCall("log_enable(1)")
397 .execute();
398 this.store.removeTransactionMarker();
399 }
400 this.connection.commit();
401 committed = true;
402
403 } catch (final Throwable ex) {
404 try {
405 if (this.store.existsTransactionMarker()) {
406 throw new DataCorruptedException("Cannot rollback! "
407 + "Modifications performed outside a transaction.");
408 }
409 this.connection.rollback();
410 LOGGER.debug("{} rolled back after commit failure", this);
411
412 } catch (final RepositoryException ex2) {
413 throw new DataCorruptedException(
414 "Failed to rollback transaction after commit failure", ex);
415 }
416 throw new IOException("Failed to commit transaction (rollback forced)", ex);
417 }
418 } else {
419 try {
420 this.connection.rollback();
421 } catch (final Throwable ex) {
422 throw new DataCorruptedException("Failed to rollback transaction", ex);
423 }
424 }
425 }
426 } finally {
427 try {
428 closeVirtuosoRepositoryConnection(this.connection);
429 } catch (final RepositoryException ex) {
430 LOGGER.error("Failed to close connection", ex);
431 } finally {
432 if (LOGGER.isDebugEnabled()) {
433 final long now = System.currentTimeMillis();
434 LOGGER.debug("{} {} and closed in {} ms, tx duration {} ms", this,
435 committed ? "committed" : "rolled back", now - ts, now - this.ts);
436 }
437 }
438 }
439 }
440
441 @Override
442 public String toString() {
443 return getClass().getSimpleName();
444 }
445
446 private static boolean isPartialResultException(final QueryEvaluationException ex) {
447 return ex.getMessage() != null && ex.getMessage().contains("Returning incomplete results");
448 }
449
450 private static void closeVirtuosoRepositoryConnection(
451 final VirtuosoRepositoryConnection connection) throws RepositoryException {
452
453 final Future<?> future = Data.getExecutor().schedule(new Runnable() {
454
455 @Override
456 public void run() {
457 final Connection jdbcConnection = connection.getQuadStoreConnection();
458 try {
459 final Field field = jdbcConnection.getClass().getDeclaredField("socket");
460 field.setAccessible(true);
461 final Closeable socket = (Closeable) field.get(jdbcConnection);
462 socket.close();
463 LOGGER.warn("Closed socket backing virtuoso connection");
464 } catch (final Throwable ex) {
465 LOGGER.debug("Failed to close socket backing virtuoso connection "
466 + "(connection class is " + jdbcConnection.getClass() + ")", ex);
467 }
468 }
469
470 }, 1000, TimeUnit.MILLISECONDS);
471
472 try {
473 connection.close();
474 } finally {
475 future.cancel(false);
476 }
477 }
478
479 }