1 package eu.fbk.knowledgestore.triplestore.virtuoso;
2
3 import com.google.common.base.MoreObjects;
4 import com.google.common.base.Preconditions;
5 import com.google.common.base.Throwables;
6 import com.google.common.collect.Lists;
7 import eu.fbk.knowledgestore.data.Data;
8 import eu.fbk.knowledgestore.data.Handler;
9 import eu.fbk.knowledgestore.internal.Util;
10 import eu.fbk.knowledgestore.runtime.DataCorruptedException;
11 import eu.fbk.knowledgestore.triplestore.SelectQuery;
12 import eu.fbk.knowledgestore.triplestore.TripleStore;
13 import eu.fbk.knowledgestore.triplestore.TripleTransaction;
14 import info.aduna.iteration.CloseableIteration;
15 import info.aduna.iteration.ConvertingIteration;
16 import org.openrdf.model.*;
17 import org.openrdf.model.Statement;
18 import org.openrdf.model.vocabulary.SESAME;
19 import org.openrdf.model.vocabulary.XMLSchema;
20 import org.openrdf.query.BindingSet;
21 import org.openrdf.query.Dataset;
22 import org.openrdf.query.QueryEvaluationException;
23 import org.openrdf.query.impl.ListBindingSet;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26 import virtuoso.jdbc4.ConnectionWrapper;
27 import virtuoso.jdbc4.VirtuosoConnection;
28 import virtuoso.jdbc4.VirtuosoConnectionPoolDataSource;
29 import virtuoso.jdbc4.VirtuosoPooledConnection;
30 import virtuoso.sql.ExtendedString;
31 import virtuoso.sql.RdfBox;
32
33 import javax.annotation.Nullable;
34 import java.io.Closeable;
35 import java.io.IOException;
36 import java.lang.reflect.Field;
37 import java.sql.*;
38 import java.util.Collections;
39 import java.util.List;
40 import java.util.NoSuchElementException;
41 import java.util.Set;
42 import java.util.concurrent.Future;
43 import java.util.concurrent.TimeUnit;
44 import java.util.concurrent.atomic.AtomicLong;
45
46 public final class VirtuosoJdbcTripleStore implements TripleStore {
47
48
49
50
51
52
53 private static final Logger LOGGER = LoggerFactory.getLogger(VirtuosoJdbcTripleStore.class);
54
55 private static final String DEFAULT_HOST = "localhost";
56
57 private static final int DEFAULT_PORT = 1111;
58
59 private static final String DEFAULT_USERNAME = "dba";
60
61 private static final String DEFAULT_PASSWORD = "dba";
62
63 private static final int DEFAULT_FETCH_SIZE = 200;
64
65 private static final long GRACE_PERIOD = 5000;
66
67 private final VirtuosoConnectionPoolDataSource source;
68
69 private final int fetchSize;
70
71 private final AtomicLong transactionCounter;
72
73
74
75
76
77
78
79
80
81
82
83
84
85
/**
 * Creates a triple store that talks to a Virtuoso server through its JDBC driver. Every
 * parameter is optional; a null value selects the corresponding default.
 *
 * @param host
 *            the server name, or null to use {@code DEFAULT_HOST}
 * @param port
 *            the server port, or null to use {@code DEFAULT_PORT}
 * @param username
 *            the user name, or null to use {@code DEFAULT_USERNAME}
 * @param password
 *            the password, or null to use {@code DEFAULT_PASSWORD}
 * @param fetchSize
 *            the JDBC fetch size, strictly positive, or null to use
 *            {@code DEFAULT_FETCH_SIZE}
 * @param charset
 *            the charset passed to the data source, or null to use {@code "UTF-8"}
 * @throws IllegalArgumentException
 *             if the effective fetch size is not strictly positive
 */
public VirtuosoJdbcTripleStore(@Nullable final String host, @Nullable final Integer port,
        @Nullable final String username, @Nullable final String password,
        @Nullable final Integer fetchSize, @Nullable final String charset) {

    // Configure the pooled data source, falling back to defaults for missing parameters
    this.source = new VirtuosoConnectionPoolDataSource();
    this.source.setServerName(MoreObjects.firstNonNull(host, DEFAULT_HOST));
    this.source.setPortNumber(MoreObjects.firstNonNull(port, DEFAULT_PORT));
    this.source.setUser(MoreObjects.firstNonNull(username, DEFAULT_USERNAME));
    this.source.setPassword(MoreObjects.firstNonNull(password, DEFAULT_PASSWORD));
    this.source.setCharset(charset != null ? charset : "UTF-8");

    // Resolve and validate the remaining state
    this.fetchSize = MoreObjects.firstNonNull(fetchSize, DEFAULT_FETCH_SIZE);
    this.transactionCounter = new AtomicLong(0L);
    Preconditions.checkArgument(this.fetchSize > 0);

    // Report the effective fetch size (the field): the argument is null whenever the
    // default applies, which previously produced "fetchSize=null" in the log
    LOGGER.info("VirtuosoTripleStore configured, URL={}, fetchSize={}",
            this.source.getServerName() + ":" + this.source.getPortNumber(), this.fetchSize);
}
108
109 @Override
110 public void init() throws IOException {
111
112 }
113
114 @Override
115 public TripleTransaction begin(final boolean readOnly) throws DataCorruptedException,
116 IOException {
117 return new VirtuosoTransaction(readOnly);
118 }
119
120 @Override
121 public void reset() throws IOException {
122 Connection connection = null;
123 try {
124 connection = this.source.getConnection();
125 connection.setReadOnly(false);
126 connection.setAutoCommit(true);
127 connection.prepareCall("RDF_GLOBAL_RESET ()").execute();
128 } catch (final SQLException ex) {
129 throw new IOException(ex);
130 } finally {
131 Util.closeQuietly(connection);
132 }
133 }
134
135 @Override
136 public void close() {
137
138 try {
139 this.source.close();
140 } catch (final SQLException ex) {
141 LOGGER.error("Failed to shutdown Virtuoso driver", ex);
142 }
143 }
144
145 @Override
146 public String toString() {
147 return getClass().getSimpleName();
148 }
149
150 private static Value castValue(final Object value) throws IllegalArgumentException {
151
152 final ValueFactory vf = Data.getValueFactory();
153
154 if (value == null) {
155 return null;
156 }
157 else if (value instanceof ExtendedString) {
158 final ExtendedString es = (ExtendedString) value;
159 String string = es.toString();
160 try {
161 if (es.getIriType() == ExtendedString.IRI && (es.getStrType() & 0x01) == 0x01) {
162 if (string.startsWith("_:")) {
163 string = string.substring(2);
164 return vf.createBNode(string);
165 }
166 else if (string.indexOf(':') < 0) {
167 return vf.createURI(":" + string);
168 }
169 else {
170 return vf.createURI(string);
171 }
172 }
173 else if (es.getIriType() == ExtendedString.BNODE) {
174 return vf.createBNode(string);
175 }
176 else {
177 return vf.createLiteral(string);
178 }
179 } catch (final Throwable ex) {
180 throw new IllegalArgumentException("Invalid value from Virtuoso: \"" + string
181 + "\", STRTYPE = " + es.getIriType(), ex);
182 }
183 }
184 else if (value instanceof RdfBox) {
185 final RdfBox rb = (RdfBox) value;
186 if (rb.getLang() != null) {
187 return vf.createLiteral(rb.toString(), rb.getLang());
188 }
189 else if (rb.getType() != null) {
190 return vf.createLiteral(rb.toString(), vf.createURI(rb.getType()));
191 }
192 else {
193 return vf.createLiteral(rb.toString());
194 }
195 }
196 else if (value instanceof Blob) {
197 return vf.createLiteral(value.toString(), XMLSchema.HEXBINARY);
198 }
199 else if (value instanceof Date) {
200 return Data.convert(new java.util.Date(((Date) value).getTime()), Value.class);
201 }
202 else if (value instanceof Timestamp) {
203 return Data.convert(new Date(((Timestamp) value).getTime()), Value.class);
204 }
205 else if (value instanceof Time) {
206 return vf.createLiteral(value.toString(), XMLSchema.TIME);
207 }
208 else {
209 try {
210 return Data.convert(value, Value.class);
211 } catch (final Throwable ex) {
212 throw new IllegalArgumentException("Could not parse value: " + value, ex);
213 }
214 }
215 }
216
217 private static String sqlForQuery(final String query, @Nullable final Dataset dataset,
218 @Nullable final BindingSet bindings) {
219
220
221 final StringBuilder builder = new StringBuilder("sparql\n ");
222
223
224 if (dataset != null) {
225 final Set<URI> empty = Collections.emptySet();
226 for (final URI uri : MoreObjects.firstNonNull(dataset.getDefaultGraphs(), empty)) {
227 builder.append(" define input:default-graph-uri <" + uri + "> \n");
228 }
229 for (final URI uri : MoreObjects.firstNonNull(dataset.getNamedGraphs(), empty)) {
230 builder.append(" define input:named-graph-uri <" + uri + "> \n");
231 }
232 }
233
234
235 if (bindings != null && bindings.size() > 0) {
236 int i = 0;
237 final int length = query.length();
238 while (i < query.length()) {
239 final char ch = query.charAt(i++);
240 if (ch == '\\' && i < length) {
241 builder.append(ch).append(query.charAt(i++));
242 }
243 else if (ch == '"' || ch == '\'') {
244 builder.append(ch);
245 while (i < length) {
246 final char c = query.charAt(i++);
247 builder.append(c);
248 if (c == ch) {
249 break;
250 }
251 }
252 }
253 else if ((ch == '?' || ch == '$') && i < length
254 && isVarFirstChar(query.charAt(i))) {
255 int j = i + 1;
256 while (j < length && isVarMiddleChar(query.charAt(j))) {
257 ++j;
258 }
259 final String name = query.substring(i, j);
260 final Value value = bindings.getValue(name);
261 if (value != null) {
262 builder.append(Data.toString(value, null));
263 }
264 else {
265 builder.append(ch).append(name);
266 }
267 i = j;
268 }
269 else {
270 builder.append(ch);
271 }
272 }
273 }
274 else {
275 builder.append(query);
276 }
277
278
279 return builder.toString();
280 }
281
282 private static boolean isVarFirstChar(final char c) {
283
284 return '0' <= c && c <= '9' || 'A' <= c && c <= 'Z' || 'a' <= c && c <= 'z' || c == '_'
285 || 0x00C0 <= c && c <= 0x00D6 || 0x00D8 <= c && c <= 0x00F6 || 0x00F8 <= c
286 && c <= 0x02FF || 0x0370 <= c && c <= 0x037D || 0x037F <= c && c <= 0x1FFF
287 || 0x200C <= c && c <= 0x200D || 0x2070 <= c && c <= 0x218F || 0x2C00 <= c
288 && c <= 0x2FEF || 0x3001 <= c && c <= 0xD7FF || 0xF900 <= c && c <= 0xFDCF
289 || 0xFDF0 <= c && c <= 0xFFFD;
290 }
291
292 private static boolean isVarMiddleChar(final char c) {
293
294 return isVarFirstChar(c) || c == 0x00B7 || 0x0300 <= c && c <= 0x036F || 0x203F <= c
295 && c <= 0x2040;
296 }
297
298 private static boolean isPartialResultException(final Throwable ex) {
299
300
301 return ex.getMessage() != null && ex.getMessage().contains("Returning incomplete results");
302 }
303
304 private static void killConnection(final Object connection) throws Throwable {
305 if (connection instanceof ConnectionWrapper) {
306 final Field field = ConnectionWrapper.class.getDeclaredField("pconn");
307 field.setAccessible(true);
308 killConnection(field.get(connection));
309 }
310 else if (connection instanceof VirtuosoPooledConnection) {
311 killConnection(((VirtuosoPooledConnection) connection).getVirtuosoConnection());
312 }
313 else if (connection instanceof VirtuosoConnection) {
314 final Field field = VirtuosoConnection.class.getDeclaredField("socket");
315 field.setAccessible(true);
316 final Closeable socket = (Closeable) field.get(connection);
317 socket.close();
318 }
319 else {
320 throw new Exception("Don't know how to kill connection "
321 + connection.getClass().getName());
322 }
323 }
324
325 private final class VirtuosoTransaction implements TripleTransaction {
326
327 private final Connection connection;
328
329 private final boolean readOnly;
330
331 private final String id;
332
333 VirtuosoTransaction(final boolean readOnly) throws IOException {
334
335
336 this.readOnly = readOnly;
337 this.id = "Virtuoso TX"
338 + VirtuosoJdbcTripleStore.this.transactionCounter.incrementAndGet();
339
340
341 Connection connection = null;
342 try {
343 connection = VirtuosoJdbcTripleStore.this.source.getConnection();
344 connection.setTransactionIsolation(Connection.TRANSACTION_READ_UNCOMMITTED);
345 connection.setReadOnly(readOnly);
346 connection.setAutoCommit(true);
347 } catch (final SQLException ex) {
348 Util.closeQuietly(connection);
349 throw new IOException("Could not connect to Virtuoso", ex);
350 }
351 this.connection = connection;
352 }
353
354 private void checkWritable() {
355 if (this.readOnly) {
356 throw new IllegalStateException(
357 "Write operation not allowed on read-only transaction");
358 }
359 }
360
361 @Override
362 public CloseableIteration<? extends Statement, ? extends Exception> get(
363 final Resource subject, final URI predicate, final Value object,
364 final Resource context) throws IOException, IllegalStateException {
365
366
367
368 final StringBuilder builder = new StringBuilder();
369 builder.append("SELECT * WHERE { GRAPH ");
370 builder.append(context == null ? "?c" : Data.toString(context, null));
371 builder.append(" { ");
372 builder.append(subject == null ? "?s" : Data.toString(subject, null));
373 builder.append(' ');
374 builder.append(predicate == null ? "?p" : Data.toString(predicate, null));
375 builder.append(' ');
376 builder.append(object == null ? "?o" : Data.toString(object, null));
377 builder.append(" } }");
378 final String query = builder.toString();
379
380
381 return new ConvertingIteration<BindingSet, Statement, Exception>(
382 new VirtuosoQueryIteration(query, null, null, null)) {
383
384 @Override
385 protected Statement convert(final BindingSet tuple) throws Exception {
386 final Resource s = subject != null ? subject : (Resource) tuple.getValue("s");
387 final URI p = predicate != null ? predicate : (URI) tuple.getValue("p");
388 final Value o = object != null ? object : tuple.getValue("o");
389 final Resource c = context != null ? context : (Resource) tuple.getValue("c");
390 return Data.getValueFactory().createStatement(s, p, o, c);
391 }
392
393 };
394 }
395
396 @Override
397 public CloseableIteration<BindingSet, QueryEvaluationException> query(
398 final SelectQuery query, final BindingSet bindings, final Long timeout)
399 throws IOException, UnsupportedOperationException, IllegalStateException {
400
401
402 return new VirtuosoQueryIteration(query.getString(), query.getDataset(), bindings,
403 timeout);
404 }
405
406 @Override
407 public void infer(final Handler<? super Statement> handler) throws IOException,
408 IllegalStateException {
409
410 checkWritable();
411
412
413 if (handler != null) {
414 try {
415 handler.handle(null);
416 } catch (final Throwable ex) {
417 Throwables.propagateIfPossible(ex, IOException.class);
418 throw new RuntimeException(ex);
419 }
420 }
421 }
422
423 @Override
424 public void add(final Iterable<? extends Statement> statements) throws IOException,
425 IllegalStateException {
426 LOGGER.debug("UPDATING");
427 update(true, statements);
428 }
429
430 @Override
431 public void remove(final Iterable<? extends Statement> statements) throws IOException,
432 IllegalStateException {
433 LOGGER.debug("REMOVING");
434 update(false, statements);
435 }
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488 private void update(final boolean insert, final Iterable<? extends Statement> statements)
489 throws IOException, IllegalStateException {
490
491
492
493
494
495
496
497
498 Preconditions.checkNotNull(statements);
499 checkWritable();
500
501 try {
502 String command = "DB.DBA.rdf_insert_triple_c (?,?,?,?,?,?)";
503 if (!insert) {
504 command = "DB.DBA.rdf_delete_triple_c (?,?,?,?,?,?)";
505 }
506
507 PreparedStatement insertStmt;
508 insertStmt = this.connection.prepareStatement(command);
509
510 for (Statement stmt : statements) {
511 insertStmt.setString(1, stmt.getSubject().toString());
512 insertStmt.setString(2, stmt.getPredicate().toString());
513 if (stmt.getObject() instanceof Resource) {
514 insertStmt.setString(3, stmt.getObject().toString());
515 insertStmt.setNull(4, 12);
516 insertStmt.setInt(5, 0);
517 }
518 else if (stmt.getObject() instanceof Literal) {
519 Literal lit = (Literal) stmt.getObject();
520 insertStmt.setString(3, lit.getLabel());
521 if (lit.getLanguage() != null) {
522 insertStmt.setString(4, lit.getLanguage());
523 insertStmt.setInt(5, 2);
524 }
525 else if (lit.getDatatype() != null) {
526 insertStmt.setString(4, lit.getDatatype().toString());
527 insertStmt.setInt(5, 3);
528 }
529 else {
530 insertStmt.setNull(4, 12);
531 insertStmt.setInt(5, 1);
532 }
533 }
534
535 Resource context = stmt.getContext();
536 if (context == null) {
537 context = SESAME.NIL;
538 }
539 insertStmt.setString(6, context.toString());
540 insertStmt.addBatch();
541 }
542
543 LOGGER.info("Starting Virtuoso ingestion");
544 insertStmt.executeBatch();
545 LOGGER.info("Finishing Virtuoso ingestion (burp!)");
546
547 insertStmt.clearBatch();
548 insertStmt.close();
549
550 LOGGER.info("Starting full text index update");
551 try {
552 VirtuosoTransaction.this.connection.prepareCall("DB.DBA.VT_INC_INDEX_DB_DBA_RDF_OBJ ()").execute();
553 } catch (SQLException e) {
554 e.printStackTrace();
555 }
556 LOGGER.info("Ending full text index update");
557
558 } catch (final SQLException ex) {
559 throw new IOException(ex);
560 }
561 }
562
563 @Override
564 public void end(final boolean commit) throws DataCorruptedException, IOException,
565 IllegalStateException {
566
567 try {
568
569 final Future<?> future = Data.getExecutor().schedule(new Runnable() {
570
571 @Override
572 public void run() {
573 try {
574 killConnection(VirtuosoTransaction.this.connection);
575 LOGGER.warn("{} - killed Virtuoso JDBC connection", this);
576 } catch (final Throwable ex) {
577 LOGGER.debug(this + " - failed to kill Virtuoso JDBC connection "
578 + "(connection class is "
579 + VirtuosoTransaction.this.connection.getClass() + ")", ex);
580 }
581 }
582
583 }, 1000, TimeUnit.MILLISECONDS);
584
585
586 if (!this.readOnly) {
587 if (commit) {
588 this.connection.commit();
589 }
590 else {
591 this.connection.rollback();
592 }
593 }
594
595
596 this.connection.close();
597 future.cancel(false);
598
599 } catch (final SQLException ex) {
600 LOGGER.error(this + " - failed to close connection", ex);
601 }
602 }
603
604 @Override
605 public String toString() {
606 return this.id;
607 }
608
609 private final class VirtuosoQueryIteration implements
610 CloseableIteration<BindingSet, QueryEvaluationException> {
611
612 private final List<String> variables;
613
614 private java.sql.Statement statement;
615
616 private ResultSet cursor;
617
618 private BindingSet tuple;
619
/**
 * Starts the evaluation of a query on the connection of the enclosing transaction.
 * The {@code result_timeout} setting is set on the connection first (0 when no timeout
 * is given; a failure here is logged and ignored), then the query is executed and the
 * column names of its result set are recorded as variable names. On any failure the
 * statement and result set opened so far are released before the exception is thrown.
 *
 * @param query
 *            the SPARQL query string
 * @param dataset
 *            the optional dataset
 * @param bindings
 *            the optional bindings to inject in the query
 * @param timeout
 *            the optional timeout in milliseconds
 * @throws IOException
 *             if the result set cannot be obtained, for any reason
 */
public VirtuosoQueryIteration(final String query, @Nullable final Dataset dataset,
        @Nullable final BindingSet bindings, @Nullable final Long timeout)
        throws IOException {

    // Initialize state that cannot fail outside the try block
    this.variables = Lists.newArrayList();
    this.tuple = null;

    try {
        final String sql = sqlForQuery(query, dataset, bindings);

        // Set the result timeout on the connection, releasing the helper statement
        // (it was previously left open)
        final int msTimeout = timeout == null ? 0 : timeout.intValue();
        CallableStatement timeoutCall = null;
        try {
            timeoutCall = VirtuosoTransaction.this.connection.prepareCall(
                    "set result_timeout = " + msTimeout);
            timeoutCall.execute();
        } catch (final Throwable ex) {
            LOGGER.warn(VirtuosoTransaction.this
                    + " - failed to set result_timeout = " + msTimeout
                    + " on Virtuoso JDBC connection (proceeding anyway)", ex);
        } finally {
            if (timeoutCall != null) {
                try {
                    timeoutCall.close();
                } catch (final Throwable ex) {
                    LOGGER.debug(VirtuosoTransaction.this
                            + " - failed to close result_timeout statement", ex);
                }
            }
        }

        // Create and configure the statement used for the query
        this.statement = VirtuosoTransaction.this.connection.createStatement();
        this.statement.setFetchDirection(ResultSet.FETCH_FORWARD);
        this.statement.setFetchSize(VirtuosoJdbcTripleStore.this.fetchSize);
        if (timeout != null) {
            // JDBC timeout is in seconds; a grace period is added to the timeout
            this.statement.setQueryTimeout((int) ((timeout + GRACE_PERIOD) / 1000));
        }

        // Execute the query and read the variable names from the result metadata
        this.cursor = this.statement.executeQuery(sql);
        final ResultSetMetaData metadata = this.cursor.getMetaData();
        for (int i = 1; i <= metadata.getColumnCount(); ++i) {
            this.variables.add(metadata.getColumnName(i));
        }

    } catch (final Throwable ex) {
        if (isPartialResultException(ex)) {
            LOGGER.debug(
                    "{} -no results / partial results returned due to expired timeout",
                    VirtuosoTransaction.this);
        }

        // Release what was opened, directly rather than through close(): the cursor
        // may still be null here, and resources must be freed on every failure
        if (this.cursor != null) {
            try {
                this.cursor.close();
            } catch (final Throwable ex2) {
                LOGGER.debug(VirtuosoTransaction.this
                        + " - failed to close result set after query failure", ex2);
            }
        }
        if (this.statement != null) {
            try {
                this.statement.close();
            } catch (final Throwable ex2) {
                LOGGER.debug(VirtuosoTransaction.this
                        + " - failed to close statement after query failure", ex2);
            }
        }
        this.cursor = null;
        this.statement = null;

        throw new IOException("Could not obtain query result set", ex);
    }
}
674
675 @Override
676 public boolean hasNext() throws QueryEvaluationException {
677 if (this.tuple == null) {
678 this.tuple = advance();
679 }
680 return this.tuple != null;
681 }
682
683 @Override
684 public BindingSet next() throws QueryEvaluationException {
685 if (this.tuple == null) {
686 this.tuple = advance();
687 }
688 if (this.tuple == null) {
689 throw new NoSuchElementException();
690 }
691 final BindingSet result = this.tuple;
692 this.tuple = null;
693 return result;
694 }
695
696 @Override
697 public void remove() throws QueryEvaluationException {
698 throw new UnsupportedOperationException();
699 }
700
701 @Override
702 public void close() throws QueryEvaluationException {
703 if (this.statement != null) {
704
705 final Future<?> future = Data.getExecutor().schedule(new Runnable() {
706
707 @Override
708 public void run() {
709 try {
710 end(false);
711 LOGGER.warn(VirtuosoTransaction.this
712 + " - forced closure of Virtuoso transaction "
713 + "after unsuccessfull attempt at closing Virtuoso iteration");
714 } catch (final Throwable ex) {
715 LOGGER.debug(VirtuosoTransaction.this
716 + " - failed to close Virtuoso transaction after "
717 + "unsuccessfull attempt at closing Virtuoso iteration",
718 ex);
719 }
720 }
721
722 }, 1000, TimeUnit.MILLISECONDS);
723
724 try {
725
726
727
728
729
730 this.cursor.close();
731 this.statement.close();
732 future.cancel(false);
733
734 } catch (final SQLException e) {
735 throw new QueryEvaluationException(e);
736
737 } finally {
738 this.statement = null;
739 this.cursor = null;
740 }
741 }
742 }
743
744 @Override
745 protected void finalize() throws Throwable {
746 Util.closeQuietly(this);
747 }
748
749 private BindingSet advance() throws QueryEvaluationException {
750 try {
751 final BindingSet result = null;
752 if (this.cursor != null) {
753 if (this.cursor.next()) {
754 final int size = this.variables.size();
755 final Value[] values = new Value[size];
756 for (int i = 0; i < size; ++i) {
757 values[i] = castValue(this.cursor.getObject(this.variables.get(i)));
758 }
759 return new ListBindingSet(this.variables, values);
760 }
761 else {
762 close();
763 }
764 }
765 return result;
766 } catch (final Exception ex) {
767 if (isPartialResultException(ex)) {
768
769 LOGGER.debug("{} - partial results returned due to expired timeout",
770 VirtuosoTransaction.this);
771 return null;
772 }
773 throw new QueryEvaluationException("Could not retrieve next query result", ex);
774 }
775 }
776
777 }
778
779 }
780
781 }