1 package eu.fbk.knowledgestore.internal.rdf;
2
3 import java.io.File;
4 import java.io.IOException;
5 import java.io.InputStream;
6 import java.io.OutputStream;
7 import java.lang.reflect.Field;
8 import java.util.Collections;
9 import java.util.Comparator;
10 import java.util.List;
11 import java.util.Map;
12 import java.util.Set;
13 import java.util.concurrent.ArrayBlockingQueue;
14 import java.util.concurrent.BlockingQueue;
15 import java.util.concurrent.CountDownLatch;
16 import java.util.concurrent.Future;
17 import java.util.concurrent.atomic.AtomicBoolean;
18 import java.util.concurrent.atomic.AtomicLong;
19 import java.util.concurrent.atomic.AtomicReference;
20
21 import javax.annotation.Nullable;
22
23 import com.google.common.base.Function;
24 import com.google.common.base.MoreObjects;
25 import com.google.common.base.Preconditions;
26 import com.google.common.base.Strings;
27 import com.google.common.base.Throwables;
28 import com.google.common.collect.ImmutableList;
29 import com.google.common.collect.Lists;
30 import com.google.common.collect.Maps;
31 import com.google.common.collect.Ordering;
32
33 import org.openrdf.model.BNode;
34 import org.openrdf.model.Literal;
35 import org.openrdf.model.Namespace;
36 import org.openrdf.model.Resource;
37 import org.openrdf.model.Statement;
38 import org.openrdf.model.URI;
39 import org.openrdf.model.Value;
40 import org.openrdf.model.impl.NamespaceImpl;
41 import org.openrdf.query.BindingSet;
42 import org.openrdf.query.QueryEvaluationException;
43 import org.openrdf.query.QueryResultHandlerException;
44 import org.openrdf.query.TupleQueryResultHandlerBase;
45 import org.openrdf.query.TupleQueryResultHandlerException;
46 import org.openrdf.query.resultio.BasicQueryWriterSettings;
47 import org.openrdf.query.resultio.BooleanQueryResultFormat;
48 import org.openrdf.query.resultio.BooleanQueryResultParser;
49 import org.openrdf.query.resultio.BooleanQueryResultWriter;
50 import org.openrdf.query.resultio.QueryResultIO;
51 import org.openrdf.query.resultio.TupleQueryResultFormat;
52 import org.openrdf.query.resultio.TupleQueryResultParser;
53 import org.openrdf.query.resultio.TupleQueryResultWriter;
54 import org.openrdf.rio.ParserConfig;
55 import org.openrdf.rio.RDFFormat;
56 import org.openrdf.rio.RDFHandler;
57 import org.openrdf.rio.RDFHandlerException;
58 import org.openrdf.rio.RDFParseException;
59 import org.openrdf.rio.RDFParser;
60 import org.openrdf.rio.RDFWriter;
61 import org.openrdf.rio.Rio;
62 import org.openrdf.rio.RioSetting;
63 import org.openrdf.rio.WriterConfig;
64 import org.openrdf.rio.helpers.BasicParserSettings;
65 import org.openrdf.rio.helpers.BasicWriterSettings;
66 import org.openrdf.rio.helpers.JSONLDMode;
67 import org.openrdf.rio.helpers.JSONLDSettings;
68 import org.openrdf.rio.helpers.NTriplesParserSettings;
69 import org.openrdf.rio.helpers.RDFHandlerBase;
70 import org.openrdf.rio.helpers.RDFJSONParserSettings;
71 import org.openrdf.rio.helpers.RDFParserBase;
72 import org.openrdf.rio.helpers.TriXParserSettings;
73 import org.openrdf.rio.helpers.XMLParserSettings;
74 import org.openrdf.rio.helpers.XMLWriterSettings;
75 import org.slf4j.Logger;
76
77 import info.aduna.iteration.CloseableIteration;
78 import info.aduna.iteration.Iteration;
79
80 import eu.fbk.knowledgestore.data.Data;
81 import eu.fbk.knowledgestore.data.Handler;
82 import eu.fbk.knowledgestore.data.ParseException;
83 import eu.fbk.knowledgestore.data.Stream;
84 import eu.fbk.knowledgestore.internal.Compression;
85 import eu.fbk.knowledgestore.internal.Logging;
86 import eu.fbk.knowledgestore.internal.Util;
87 import eu.fbk.rdfpro.jsonld.JSONLD;
88 import eu.fbk.rdfpro.tql.TQL;
89
90
91
92 public final class RDFUtil {
93
/**
 * Name of the stream property under which the list of variable names of a tuple (binding set)
 * stream is stored.
 */
public static final String PROPERTY_VARIABLES = "variables";

/**
 * Set to true once adding the {@code JSONLD.ROOT_TYPES} writer setting has failed, so that
 * {@code writeRDF} stops attempting it. NOTE(review): not volatile; under concurrent calls
 * other threads may not see the update immediately (they would simply retry and fail again).
 */
private static boolean jsonldDisabled = false;
97
98 public static void toHtml(final Value value, @Nullable final Map<String, String> prefixes,
99 final Appendable sink) throws IOException {
100 if (value instanceof Literal) {
101 final Literal literal = (Literal) value;
102 sink.append("<span");
103 if (literal.getLanguage() != null) {
104 sink.append(" title=\"@").append(literal.getLanguage()).append("\"");
105 } else if (literal.getDatatype() != null) {
106 sink.append(" title=\"<").append(literal.getDatatype().stringValue())
107 .append(">\"");
108 }
109 sink.append(">").append(value.stringValue()).append("</span>");
110 } else if (value instanceof BNode) {
111 sink.append("_:").append(((BNode) value).getID());
112 } else if (value instanceof URI) {
113 final URI uri = (URI) value;
114 sink.append("<a href=\"").append(uri.stringValue()).append("\">");
115 String prefix = null;
116 if (prefixes != null) {
117 prefix = prefixes.get(uri.getNamespace());
118 }
119 if (prefix == null) {
120 prefix = Data.namespaceToPrefix(uri.getNamespace(), Data.getNamespaceMap());
121 }
122 if (prefix != null) {
123 sink.append(prefix).append(':').append(uri.getLocalName());
124 } else {
125 final int index = uri.stringValue().lastIndexOf('/');
126 if (index >= 0) {
127 sink.append("<..").append(uri.stringValue().substring(index))
128 .append(">");
129 } else {
130 sink.append("<").append(uri.stringValue()).append(">");
131 }
132 }
133 sink.append("</a>");
134 }
135 }
136
137 public static Stream<Statement> toStatementStream(
138 final Iteration<? extends BindingSet, ?> iteration) {
139
140 Preconditions.checkNotNull(iteration);
141
142 return Stream.create(iteration).transform(new Function<BindingSet, Statement>() {
143
144 @Override
145 @Nullable
146 public Statement apply(final BindingSet bindings) {
147 final Value subject = bindings.getValue("subject");
148 final Value predicate = bindings.getValue("predicate");
149 final Value object = bindings.getValue("object");
150 final Value context = bindings.getValue("context");
151 if (subject instanceof Resource && predicate instanceof URI && object != null) {
152 final Resource subj = (Resource) subject;
153 final URI pred = (URI) predicate;
154 if (context == null) {
155 return Data.getValueFactory().createStatement(subj, pred, object);
156 } else if (context instanceof Resource) {
157 final Resource ctx = (Resource) context;
158 return Data.getValueFactory().createStatement(subj, pred, object, ctx);
159 }
160 }
161 return null;
162 }
163
164 }, 0);
165 }
166
167 public static Stream<BindingSet> toBindingsStream(
168 final CloseableIteration<BindingSet, QueryEvaluationException> iteration,
169 final Iterable<? extends String> variables) {
170
171 Preconditions.checkNotNull(iteration);
172
173 final List<String> variableList = ImmutableList.copyOf(variables);
174 final CompactBindingSet.Builder builder = CompactBindingSet.builder(variableList);
175
176 return Stream.create(iteration).transform(new Function<BindingSet, BindingSet>() {
177
178 @Override
179 @Nullable
180 public BindingSet apply(final BindingSet bindings) {
181 final int variableCount = variableList.size();
182 for (int i = 0; i < variableCount; ++i) {
183 final String variable = variableList.get(i);
184 builder.set(variable, bindings.getValue(variable));
185 }
186 return builder.build();
187 }
188
189 }, 0).setProperty(PROPERTY_VARIABLES, variableList);
190 }
191
192 public static int detectSparqlProlog(final String string) {
193 final int length = string.length();
194 int index = 0;
195 while (index < length) {
196 final char ch = string.charAt(index);
197 if (ch == '#') {
198 while (index < length && string.charAt(index) != '\n') {
199 ++index;
200 }
201 } else if (ch == 'p' || ch == 'b' || ch == 'P' || ch == 'B') {
202 while (index < length && string.charAt(index) != '>') {
203 ++index;
204 }
205 } else if (!Character.isWhitespace(ch)) {
206 return index;
207 }
208 ++index;
209 }
210 throw new ParseException(string, "Cannot detect SPARQL prolog");
211 }
212
213 public static String detectSparqlForm(final String string) {
214 final int start = detectSparqlProlog(string);
215 for (int i = start; i < string.length(); ++i) {
216 final char ch = string.charAt(i);
217 if (Character.isWhitespace(ch)) {
218 final String form = string.substring(start, i).toLowerCase();
219 if (!form.equals("select") && !form.equals("construct")
220 && !form.equals("describe") && !form.equals("ask")) {
221 throw new ParseException(string, "Invalid query form: " + form);
222 }
223 return form;
224 }
225 }
226 throw new ParseException(string, "Cannot detect query form");
227 }
228
229 public static long writeSparqlTuples(final TupleQueryResultFormat format,
230 final OutputStream out, final Stream<? extends BindingSet> stream) {
231
232 final TupleQueryResultWriter writer = RDFUtil.newSparqlTupleWriter(format, out);
233
234 try {
235 final AtomicLong result = new AtomicLong();
236 stream.toHandler(new Handler<BindingSet>() {
237
238 private boolean started = false;
239
240 private long count = 0L;
241
242 @Override
243 public void handle(final BindingSet bindings) throws QueryResultHandlerException {
244 if (!this.started) {
245 @SuppressWarnings("unchecked")
246 final List<String> variables = (List<String>) stream.getProperty(
247 PROPERTY_VARIABLES, Object.class);
248 writer.startDocument();
249 writer.startHeader();
250 writer.startQueryResult(variables);
251 this.started = true;
252 }
253 if (bindings != null) {
254 writer.handleSolution(bindings);
255 ++this.count;
256 } else if (this.started) {
257 writer.endQueryResult();
258 result.set(this.count);
259 }
260 }
261
262 });
263 return result.get();
264
265 } catch (final Exception ex) {
266 throw Throwables.propagate(ex);
267
268 } finally {
269 Util.closeQuietly(stream);
270 }
271 }
272
/**
 * Returns a lazily-evaluated stream of the tuple results parsed from {@code in}.
 * <p>
 * The parser is created immediately, so failures creating it surface at call time. The input
 * is only read when the returned stream is consumed. While parsing, the logging MDC captured
 * at call time is installed; the previous MDC is restored afterwards. The variable names
 * reported by the parser are published as the {@link #PROPERTY_VARIABLES} stream property.
 * Every solution is copied into a {@code CompactBindingSet}, and a final {@code null} element
 * is emitted when the parser signals the end of the result. This method does not close
 * {@code in}.
 *
 * @param format the SPARQL tuple result format to parse
 * @param in the input stream holding the serialized results
 * @return a stream over the parsed binding sets
 */
public static Stream<BindingSet> readSparqlTuples(final TupleQueryResultFormat format,
        final InputStream in) {

    // Create the parser eagerly (it uses the compact value factory, see newSparqlTupleParser).
    final TupleQueryResultParser parser = newSparqlTupleParser(format);

    // Capture the caller's MDC so that logging performed while parsing keeps its context.
    final Map<String, String> mdc = Logging.getMDC();
    return new Stream<BindingSet>() {

        @Override
        protected void doToHandler(final Handler<? super BindingSet> handler) throws Throwable {
            final Map<String, String> oldMdc = Logging.getMDC();
            try {
                Logging.setMDC(mdc);
                parser.setQueryResultHandler(new TupleQueryResultHandlerBase() {

                    // Reused to build a compact copy of each solution.
                    private CompactBindingSet.Builder builder;

                    @Override
                    public void startQueryResult(final List<String> vars)
                            throws TupleQueryResultHandlerException {
                        // Publish the variable names on the enclosing stream.
                        final List<String> variables = ImmutableList.copyOf(vars);
                        setProperty(PROPERTY_VARIABLES, variables);
                        this.builder = CompactBindingSet.builder(variables);
                    }

                    @Override
                    public void handleSolution(final BindingSet bindings)
                            throws TupleQueryResultHandlerException {
                        // null solutions are skipped; null is reserved as the end marker.
                        if (bindings != null) {
                            emit(bindings);
                        }
                    }

                    @Override
                    public void endQueryResult() throws TupleQueryResultHandlerException {
                        // Signal the end of the result to the downstream handler.
                        emit(null);
                    }

                    // Forwards an element (compacted unless it is the null end marker). Any
                    // exception from the downstream handler is rethrown unchanged when it is a
                    // TupleQueryResultHandlerException or unchecked; other checked exceptions
                    // are wrapped in a TupleQueryResultHandlerException.
                    private void emit(final BindingSet bindings)
                            throws TupleQueryResultHandlerException {
                        try {
                            BindingSet compactBindings = bindings;
                            if (bindings != null) {
                                this.builder.setAll(bindings);
                                compactBindings = this.builder.build();
                            }
                            handler.handle(compactBindings);
                        } catch (final Throwable ex) {
                            Throwables.propagateIfPossible(ex,
                                    TupleQueryResultHandlerException.class);
                            throw new TupleQueryResultHandlerException(ex);
                        }
                    }

                });
                parser.parseQueryResult(in);
            } finally {
                Logging.setMDC(oldMdc);
            }
        }

    };
}
338
339 public static void writeSparqlBoolean(final BooleanQueryResultFormat format,
340 final OutputStream out, final boolean value) {
341
342 final BooleanQueryResultWriter writer = newSparqlBooleanWriter(format, out);
343
344 try {
345 writer.startDocument();
346 writer.startHeader();
347 writer.handleBoolean(value);
348
349 } catch (final Exception ex) {
350 Throwables.propagate(ex);
351 }
352 }
353
354 public static boolean readSparqlBoolean(final BooleanQueryResultFormat format,
355 final InputStream in) {
356
357 final BooleanQueryResultParser parser = newSparqlBooleanReader(format);
358
359 try {
360 final AtomicBoolean resultHolder = new AtomicBoolean();
361 parser.setQueryResultHandler(new TupleQueryResultHandlerBase() {
362
363 @Override
364 public void handleBoolean(final boolean result) throws QueryResultHandlerException {
365 resultHolder.set(result);
366 }
367
368 });
369 parser.parseQueryResult(in);
370 return resultHolder.get();
371
372 } catch (final Exception ex) {
373 throw Throwables.propagate(ex);
374 }
375 }
376
377 public static long writeRDF(final OutputStream out, final RDFFormat format,
378 @Nullable final Map<String, String> namespaces,
379 @Nullable final Map<? extends RioSetting<?>, ? extends Object> settings,
380 final Stream<? extends Statement> stream) {
381
382 final Map<RioSetting<?>, Object> actualSettings = Maps.newHashMap();
383 if (settings != null) {
384 actualSettings.putAll(settings);
385 }
386 final Object types = stream.getProperty("types", Object.class);
387 if (types instanceof Set && !jsonldDisabled) {
388 try {
389 actualSettings.put(JSONLD.ROOT_TYPES, types);
390 } catch (final Throwable ex) {
391 jsonldDisabled = true;
392 }
393 }
394
395 try {
396 final RDFHandler handler = writeRDF(out, format, namespaces, actualSettings);
397 final AtomicLong result = new AtomicLong();
398 stream.toHandler(new Handler<Statement>() {
399
400 private boolean started = false;
401
402 private long count = 0L;
403
404 @Override
405 public void handle(final Statement statement) throws RDFHandlerException {
406 if (!this.started) {
407 handler.startRDF();
408 this.started = true;
409 }
410 if (statement != null) {
411 handler.handleStatement(statement);
412 ++this.count;
413 } else if (this.started) {
414 handler.endRDF();
415 result.set(this.count);
416 }
417 }
418
419 });
420 return result.get();
421
422 } catch (final Exception ex) {
423 throw Throwables.propagate(ex);
424
425 } finally {
426 Util.closeQuietly(stream);
427 }
428 }
429
430 @SuppressWarnings({ "unchecked", "rawtypes" })
431 public static RDFHandler writeRDF(final OutputStream out, final RDFFormat format,
432 @Nullable final Map<String, String> namespaces,
433 @Nullable final Map<? extends RioSetting<?>, ? extends Object> settings)
434 throws IOException, RDFHandlerException {
435
436 final RDFWriter writer = Rio.createWriter(format, out);
437
438 final WriterConfig config = writer.getWriterConfig();
439 config.set(BasicWriterSettings.PRETTY_PRINT, true);
440 config.set(BasicWriterSettings.RDF_LANGSTRING_TO_LANG_LITERAL, true);
441 config.set(BasicWriterSettings.XSD_STRING_TO_PLAIN_LITERAL, true);
442
443 if (format.equals(RDFFormat.RDFXML)) {
444 config.set(XMLWriterSettings.INCLUDE_XML_PI, true);
445 config.set(XMLWriterSettings.INCLUDE_ROOT_RDF_TAG, true);
446 }
447
448 if (settings != null) {
449 for (final Map.Entry entry : settings.entrySet()) {
450 config.set((RioSetting) entry.getKey(), entry.getValue());
451 }
452 }
453
454 return namespaces == null ? writer : newNamespaceHandler(writer, namespaces, null);
455 }
456
/**
 * Returns a lazily-evaluated stream of the statements parsed from {@code in}.
 * <p>
 * Nothing is read until the returned stream is consumed. Parsing is then delegated to
 * {@link #readRDF(InputStream, RDFFormat, Map, String, boolean, RDFHandler)} with the same
 * arguments, while the logging MDC captured at call time is installed; the previous MDC is
 * restored afterwards. Each parsed statement is forwarded to the stream's handler, and a
 * final {@code null} element is emitted when the parser signals the end of the document.
 * This method does not close {@code in}.
 *
 * @param in the input stream to parse
 * @param format the RDF format of the input
 * @param namespaces optional namespace map passed on to the parser
 * @param base optional base URI (null is treated as the empty string by the delegate)
 * @param preserveBNodes whether blank node identifiers from the input are preserved
 * @return a stream over the parsed statements
 */
public static Stream<Statement> readRDF(final InputStream in, final RDFFormat format,
        @Nullable final Map<String, String> namespaces, @Nullable final String base,
        final boolean preserveBNodes) {

    // Capture the caller's MDC so that logging performed while parsing keeps its context.
    final Map<String, String> mdc = Logging.getMDC();
    return new Stream<Statement>() {

        @Override
        protected void doToHandler(final Handler<? super Statement> handler) throws Throwable {
            final Map<String, String> oldMdc = Logging.getMDC();
            try {
                Logging.setMDC(mdc);
                final RDFHandler rdfHandler = new RDFHandlerBase() {

                    @Override
                    public void handleStatement(final Statement statement)
                            throws RDFHandlerException {
                        emit(statement);
                    }

                    @Override
                    public void endRDF() throws RDFHandlerException {
                        // Signal the end of the document to the downstream handler.
                        emit(null);
                    }

                    // Forwards an element to the downstream handler. RDFHandlerException and
                    // unchecked exceptions are rethrown unchanged; other checked exceptions are
                    // wrapped in a RuntimeException. NOTE(review): readSparqlTuples wraps in its
                    // handler exception type instead; consider aligning the two.
                    private void emit(final Statement statement) throws RDFHandlerException {
                        try {
                            handler.handle(statement);
                        } catch (final Throwable ex) {
                            Throwables.propagateIfPossible(ex, RDFHandlerException.class);
                            throw new RuntimeException(ex);
                        }
                    }

                };
                readRDF(in, format, namespaces, base, preserveBNodes, rdfHandler);
            } finally {
                Logging.setMDC(oldMdc);
            }
        }

    };
}
500
501 public static void readRDF(final InputStream in, @Nullable final RDFFormat format,
502 @Nullable final Map<String, String> namespaces, @Nullable final String base,
503 final boolean preserveBNodes, final RDFHandler handler) throws IOException,
504 RDFParseException, RDFHandlerException {
505
506 final RDFParser parser = Rio.createParser(format);
507 parser.setValueFactory(Data.getValueFactory());
508
509 final ParserConfig config = parser.getParserConfig();
510 config.set(BasicParserSettings.FAIL_ON_UNKNOWN_DATATYPES, true);
511 config.set(BasicParserSettings.FAIL_ON_UNKNOWN_LANGUAGES, true);
512 config.set(BasicParserSettings.VERIFY_DATATYPE_VALUES, false);
513 config.set(BasicParserSettings.VERIFY_LANGUAGE_TAGS, true);
514 config.set(BasicParserSettings.VERIFY_RELATIVE_URIS, true);
515 config.set(BasicParserSettings.NORMALIZE_DATATYPE_VALUES, true);
516 config.set(BasicParserSettings.NORMALIZE_LANGUAGE_TAGS, true);
517 config.set(BasicParserSettings.PRESERVE_BNODE_IDS, preserveBNodes);
518
519 if (format.equals(RDFFormat.NTRIPLES)) {
520 config.set(NTriplesParserSettings.FAIL_ON_NTRIPLES_INVALID_LINES, true);
521
522 } else if (format.equals(RDFFormat.JSONLD)) {
523
524 config.set(JSONLDSettings.COMPACT_ARRAYS, true);
525 config.set(JSONLDSettings.OPTIMIZE, true);
526 config.set(JSONLDSettings.USE_NATIVE_TYPES, false);
527 config.set(JSONLDSettings.USE_RDF_TYPE, false);
528 config.set(JSONLDSettings.JSONLD_MODE, JSONLDMode.COMPACT);
529
530 } else if (format.equals(RDFFormat.RDFJSON)) {
531 config.set(RDFJSONParserSettings.FAIL_ON_MULTIPLE_OBJECT_DATATYPES, true);
532 config.set(RDFJSONParserSettings.FAIL_ON_MULTIPLE_OBJECT_LANGUAGES, true);
533 config.set(RDFJSONParserSettings.FAIL_ON_MULTIPLE_OBJECT_TYPES, true);
534 config.set(RDFJSONParserSettings.FAIL_ON_MULTIPLE_OBJECT_VALUES, true);
535 config.set(RDFJSONParserSettings.FAIL_ON_UNKNOWN_PROPERTY, true);
536 config.set(RDFJSONParserSettings.SUPPORT_GRAPHS_EXTENSION, true);
537
538 } else if (format.equals(RDFFormat.TRIX)) {
539 config.set(TriXParserSettings.FAIL_ON_TRIX_INVALID_STATEMENT, true);
540 config.set(TriXParserSettings.FAIL_ON_TRIX_MISSING_DATATYPE, false);
541
542 } else if (format.equals(RDFFormat.RDFXML)) {
543 config.set(XMLParserSettings.FAIL_ON_DUPLICATE_RDF_ID, true);
544 config.set(XMLParserSettings.FAIL_ON_INVALID_NCNAME, true);
545 config.set(XMLParserSettings.FAIL_ON_INVALID_QNAME, true);
546 config.set(XMLParserSettings.FAIL_ON_MISMATCHED_TAGS, true);
547 config.set(XMLParserSettings.FAIL_ON_NON_STANDARD_ATTRIBUTES, false);
548 config.set(XMLParserSettings.FAIL_ON_SAX_NON_FATAL_ERRORS, false);
549 }
550
551 if (namespaces != null && parser instanceof RDFParserBase) {
552 try {
553 final Field field = RDFParserBase.class.getDeclaredField("namespaceTable");
554 field.setAccessible(true);
555 field.set(parser, Data.newNamespaceMap(Data.newNamespaceMap(), namespaces));
556 } catch (final Throwable ex) {
557
558 ex.printStackTrace();
559 }
560 }
561
562 parser.setRDFHandler(handler);
563 parser.parse(in, Strings.nullToEmpty(base));
564 }
565
566 public static void readRDF(final Map<File, ? extends RDFHandler> sources,
567 @Nullable final RDFFormat format, @Nullable final Map<String, String> namespaces,
568 @Nullable final String base, final boolean preserveBNodes,
569 @Nullable final Compression compression, final int parallelism) throws IOException,
570 RDFParseException, RDFHandlerException {
571
572
573 final Map<File, RDFHandler> actualSources = Maps.newHashMap(sources);
574 final List<File> sortedFiles = Lists.newArrayList(sources.keySet());
575 Collections.sort(sortedFiles, new Comparator<File>() {
576
577 @Override
578 public int compare(final File first, final File second) {
579 if (first == null) {
580 return second == null ? 0 : -1;
581 } else {
582 return second == null ? 1 : (int) (second.length() - first.length());
583 }
584 }
585
586 });
587
588
589 final int actualParallelism = Math.max(1, Math.min(parallelism, sortedFiles.size()));
590
591
592 if (actualParallelism == 1) {
593 for (final File file : sortedFiles) {
594 final RDFHandler handler = actualSources.get(file);
595 readRDFHelper(file, format, namespaces, base, preserveBNodes, compression, handler);
596 }
597 return;
598 }
599
600
601 final AtomicReference<Throwable> exceptionHolder = new AtomicReference<Throwable>(null);
602 final CountDownLatch latch = new CountDownLatch(actualParallelism);
603
604
605 for (int i = 0; i < actualParallelism; ++i) {
606 Data.getExecutor().execute(new Runnable() {
607
608 @Override
609 public void run() {
610 try {
611 while (exceptionHolder.get() == null) {
612 final File file;
613 final RDFHandler handler;
614 synchronized (sortedFiles) {
615 if (sortedFiles.isEmpty() || exceptionHolder.get() != null) {
616 break;
617 }
618 file = sortedFiles.remove(0);
619 handler = actualSources.get(file);
620 }
621 readRDFHelper(file, format, namespaces, base, preserveBNodes,
622 compression, handler);
623 }
624 } catch (final Throwable ex) {
625 exceptionHolder.set(ex);
626 } finally {
627 latch.countDown();
628 }
629 }
630
631 });
632 }
633
634 try {
635 latch.await();
636 } catch (final InterruptedException ex) {
637
638 Thread.currentThread().interrupt();
639 }
640
641
642 final Throwable ex = exceptionHolder.get();
643 if (ex != null) {
644 Throwables.propagateIfPossible(ex, IOException.class);
645 Throwables.propagateIfPossible(ex, RDFHandlerException.class);
646 Throwables.propagateIfPossible(ex, RDFParseException.class);
647 throw new RuntimeException(ex);
648 }
649 }
650
651 private static void readRDFHelper(@Nullable final File file, @Nullable final RDFFormat format,
652 @Nullable final Map<String, String> namespaces, @Nullable final String base,
653 final boolean preserveBNodes, @Nullable final Compression compression,
654 final RDFHandler handler) throws IOException, RDFParseException, RDFHandlerException {
655
656
657 RDFFormat actualFormat = format;
658 if (actualFormat == null) {
659 if (file == null) {
660 throw new IllegalArgumentException("Cannot detect RDF format of STDIN");
661 }
662 actualFormat = RDFFormat.forFileName(file.getName());
663 }
664
665
666 Compression actualCompression = compression;
667 if (actualCompression == null) {
668 actualCompression = file == null ? Compression.NONE : Compression.forFileName(
669 file.getName(), Compression.NONE);
670 }
671
672
673 InputStream stream = null;
674 try {
675 stream = file == null ? System.in : actualCompression.read(Data.getExecutor(), file);
676 readRDF(stream, actualFormat, namespaces, base, preserveBNodes, handler);
677
678 } catch (final Throwable ex) {
679 final String message = "Parsing of " + (file == null ? "STDIN" : file)
680 + " using format " + actualFormat + " and compression " + actualCompression
681 + " failed: " + ex.getMessage();
682 if (ex instanceof IOException) {
683 throw new IOException(message, ex);
684 } else if (ex instanceof RDFParseException) {
685 throw new RDFParseException(message, ex);
686 } else if (ex instanceof RDFHandlerException) {
687 throw new RDFHandlerException(message, ex);
688 }
689 throw new RuntimeException(message, ex);
690 } finally {
691 if (stream != System.in) {
692 Util.closeQuietly(stream);
693 }
694 }
695 }
696
697 public static TupleQueryResultWriter newSparqlTupleWriter(final TupleQueryResultFormat format,
698 final OutputStream stream) {
699
700 final TupleQueryResultWriter writer = QueryResultIO.createWriter(format, stream);
701
702 final WriterConfig config = writer.getWriterConfig();
703 if (format.equals(TupleQueryResultFormat.JSON)) {
704 config.set(BasicWriterSettings.PRETTY_PRINT, true);
705 config.set(BasicWriterSettings.XSD_STRING_TO_PLAIN_LITERAL, true);
706 config.set(BasicWriterSettings.RDF_LANGSTRING_TO_LANG_LITERAL, true);
707
708 } else if (format.equals(TupleQueryResultFormat.SPARQL)) {
709 config.set(BasicWriterSettings.PRETTY_PRINT, true);
710 config.set(BasicWriterSettings.XSD_STRING_TO_PLAIN_LITERAL, true);
711 config.set(BasicWriterSettings.RDF_LANGSTRING_TO_LANG_LITERAL, true);
712 config.set(BasicQueryWriterSettings.ADD_SESAME_QNAME, false);
713 }
714
715 return writer;
716 }
717
718 public static TupleQueryResultParser newSparqlTupleParser(final TupleQueryResultFormat format) {
719
720 final TupleQueryResultParser parser = QueryResultIO.createParser(format);
721 parser.setValueFactory(CompactValueFactory.getInstance());
722
723 return parser;
724 }
725
726 public static BooleanQueryResultWriter newSparqlBooleanWriter(
727 final BooleanQueryResultFormat format, final OutputStream stream) {
728
729 final BooleanQueryResultWriter writer = QueryResultIO.createWriter(format, stream);
730
731 final WriterConfig config = writer.getWriterConfig();
732 if (format.equals(BooleanQueryResultFormat.JSON)) {
733 config.set(BasicWriterSettings.PRETTY_PRINT, true);
734 config.set(BasicWriterSettings.XSD_STRING_TO_PLAIN_LITERAL, true);
735 config.set(BasicWriterSettings.RDF_LANGSTRING_TO_LANG_LITERAL, true);
736
737 } else if (format.equals(TupleQueryResultFormat.SPARQL)) {
738 config.set(BasicWriterSettings.PRETTY_PRINT, true);
739 config.set(BasicWriterSettings.XSD_STRING_TO_PLAIN_LITERAL, true);
740 config.set(BasicWriterSettings.RDF_LANGSTRING_TO_LANG_LITERAL, true);
741 config.set(BasicQueryWriterSettings.ADD_SESAME_QNAME, false);
742 }
743
744 return writer;
745 }
746
747 public static BooleanQueryResultParser newSparqlBooleanReader(
748 final BooleanQueryResultFormat format) {
749
750 final BooleanQueryResultParser parser = QueryResultIO.createParser(format);
751 parser.setValueFactory(Data.getValueFactory());
752
753 return parser;
754 }
755
756 public static RDFHandler newMergingHandler(final RDFHandler handler) {
757 return new MergingHandler(handler);
758 }
759
760 public static RDFHandler newDecouplingHandler(final RDFHandler handler,
761 @Nullable final Integer queueSize) {
762 return new DecouplingHandler(handler, queueSize);
763 }
764
765 public static RDFHandler newNamespaceHandler(final RDFHandler handler,
766 final Map<String, String> namespaces, @Nullable final Integer bufferSize) {
767 return new NamespaceHandler(handler, namespaces, bufferSize);
768 }
769
770 public static RDFHandler newLoggingHandler(final RDFHandler handler, final Logger logger,
771 @Nullable final String startMessage, @Nullable final String progressMessage,
772 @Nullable final String endMessage) {
773
774 Preconditions.checkNotNull(handler);
775 Preconditions.checkNotNull(logger);
776 if (startMessage == null && progressMessage == null && endMessage == null) {
777 return handler;
778 } else {
779 return new LoggingHandler(handler, logger, startMessage, progressMessage, endMessage);
780 }
781 }
782
783 private static final class MergingHandler implements RDFHandler {
784
785 private final RDFHandler handler;
786
787 private int depth;
788
789 MergingHandler(final RDFHandler handler) {
790 this.handler = Preconditions.checkNotNull(handler);
791 this.depth = 0;
792 }
793
794 @Override
795 public synchronized void startRDF() throws RDFHandlerException {
796 if (this.depth == 0) {
797 this.handler.startRDF();
798 }
799 ++this.depth;
800 }
801
802 @Override
803 public synchronized void handleComment(final String comment) throws RDFHandlerException {
804 this.handler.handleComment(comment);
805 }
806
807 @Override
808 public synchronized void handleNamespace(final String prefix, final String uri)
809 throws RDFHandlerException {
810 this.handler.handleNamespace(prefix, uri);
811 }
812
813 @Override
814 public synchronized void handleStatement(final Statement statement)
815 throws RDFHandlerException {
816 this.handler.handleStatement(statement);
817 }
818
819 @Override
820 public synchronized void endRDF() throws RDFHandlerException {
821 --this.depth;
822 if (this.depth == 0) {
823 this.handler.endRDF();
824 }
825 }
826
827 }
828
829 private static final class DecouplingHandler implements RDFHandler {
830
831 private static final int DEFAULT_QUEUE_SIZE = 1024;
832
833 private static final Object EOF = new Object();
834
835 private final RDFHandler handler;
836
837 private final int queueSize;
838
839 private BlockingQueue<Object> queue;
840
841 private AtomicReference<Throwable> exception;
842
843 private Future<?> future;
844
845 private int depth;
846
847 DecouplingHandler(final RDFHandler handler, @Nullable final Integer queueSize) {
848 this.handler = Preconditions.checkNotNull(handler);
849 this.queueSize = MoreObjects.firstNonNull(queueSize, DEFAULT_QUEUE_SIZE);
850 this.queue = null;
851 this.exception = null;
852 this.future = null;
853 this.depth = 0;
854 }
855
856 @Override
857 public synchronized void startRDF() throws RDFHandlerException {
858
859
860 if (this.depth++ > 0) {
861 return;
862 }
863
864
865 this.queue = new ArrayBlockingQueue<Object>(this.queueSize);
866 this.exception = new AtomicReference<Throwable>(null);
867
868
869
870 this.future = Data.getExecutor().submit(new Runnable() {
871
872 @Override
873 public void run() {
874 Object object;
875 try {
876 DecouplingHandler.this.handler.startRDF();
877 while ((object = DecouplingHandler.this.queue.take()) != EOF) {
878 if (object instanceof Statement) {
879 DecouplingHandler.this.handler.handleStatement((Statement) object);
880 } else if (object instanceof Namespace) {
881 final Namespace ns = (Namespace) object;
882 DecouplingHandler.this.handler.handleNamespace(ns.getPrefix(),
883 ns.getName());
884 } else if (object instanceof String) {
885 DecouplingHandler.this.handler.handleComment((String) object);
886 }
887 }
888 DecouplingHandler.this.handler.endRDF();
889 } catch (final Throwable ex) {
890 DecouplingHandler.this.exception.set(ex);
891 }
892 }
893
894 });
895 }
896
897 @Override
898 public void handleComment(final String comment) throws RDFHandlerException {
899
900
901 put(comment);
902 propagateOnFailure();
903 }
904
905 @Override
906 public void handleNamespace(final String prefix, final String uri)
907 throws RDFHandlerException {
908
909
910 put(new NamespaceImpl(prefix, uri));
911 propagateOnFailure();
912 }
913
914 @Override
915 public void handleStatement(final Statement statement) throws RDFHandlerException {
916
917
918 put(statement);
919 propagateOnFailure();
920 }
921
922 @Override
923 public synchronized void endRDF() throws RDFHandlerException {
924
925
926 if (--this.depth > 0) {
927 return;
928 }
929
930
931 put(EOF);
932
933
934 try {
935 this.future.get();
936 } catch (final Throwable ex) {
937 Throwables.propagateIfPossible(ex, RDFHandlerException.class);
938 Throwables.propagate(ex);
939 }
940
941
942 propagateOnFailure();
943 }
944
945 private void put(final Object object) throws RDFHandlerException {
946 try {
947 this.queue.put(object);
948 } catch (final InterruptedException ex) {
949 throw new RDFHandlerException(ex);
950 }
951 }
952
953 private void propagateOnFailure() throws RDFHandlerException {
954 final Throwable ex = this.exception.get();
955 if (ex != null) {
956 Throwables.propagateIfPossible(ex, RDFHandlerException.class);
957 Throwables.propagate(ex);
958 }
959 }
960
961 }
962
963 private static final class NamespaceHandler implements RDFHandler {
964
/** Number of statements buffered by default before buffered output is flushed. */
private static final int DEFAULT_BUFFER_SIZE = 1024;

/** The handler receiving namespace declarations and statements. */
private final RDFHandler handler;

/** Namespace map consulted (via Data.namespaceToPrefix) to choose missing prefixes. */
private final Map<String, String> namespaces;

/** Maximum number of statements held in the buffer before it is flushed. */
private final int bufferSize;

/** Statements buffered while collecting the namespaces they use. */
private List<Statement> buffer;

/** Whether statements are still being buffered (true between startRDF() and a flush). */
private boolean buffering;

/** Namespace URI to prefix (null when no prefix was declared) for namespaces seen so far. */
private Map<String, String> bindings;

NamespaceHandler(final RDFHandler handler, final Map<String, String> namespaces,
        @Nullable final Integer bufferSize) {
    this.handler = Preconditions.checkNotNull(handler);
    this.namespaces = Preconditions.checkNotNull(namespaces);
    this.bufferSize = bufferSize != null ? bufferSize : DEFAULT_BUFFER_SIZE;
    this.bindings = null;
    this.buffering = false;
    this.buffer = null;
}
988
989 @Override
990 public void startRDF() throws RDFHandlerException {
991 this.bindings = Maps.newHashMap();
992 this.buffer = Lists.newArrayListWithCapacity(this.bufferSize);
993 this.buffering = true;
994 this.handler.startRDF();
995 }
996
997 @Override
998 public void handleComment(final String comment) throws RDFHandlerException {
999 flush();
1000 this.handler.handleComment(comment);
1001 }
1002
1003 @Override
1004 public void handleNamespace(final String prefix, final String uri)
1005 throws RDFHandlerException {
1006 if (this.buffering) {
1007 this.bindings.put(uri, prefix);
1008 }
1009 }
1010
1011 @Override
1012 public void handleStatement(final Statement statement) throws RDFHandlerException {
1013 if (this.buffering) {
1014 extractNamespace(statement.getSubject());
1015 extractNamespace(statement.getPredicate());
1016 extractNamespace(statement.getObject());
1017 extractNamespace(statement.getContext());
1018 this.buffer.add(statement);
1019 if (this.buffer.size() == this.bufferSize) {
1020 flush();
1021 }
1022 } else {
1023 this.handler.handleStatement(statement);
1024 }
1025 }
1026
1027 @Override
1028 public void endRDF() throws RDFHandlerException {
1029 flush();
1030 this.handler.endRDF();
1031 }
1032
1033 private void extractNamespace(final Value value) {
1034 if (value instanceof URI) {
1035 final String ns = ((URI) value).getNamespace();
1036 this.bindings.put(ns, this.bindings.get(ns));
1037 } else if (value instanceof Literal) {
1038 extractNamespace(((Literal) value).getDatatype());
1039 }
1040 }
1041
1042 private void flush() throws RDFHandlerException {
1043 if (!this.buffering) {
1044 return;
1045 }
1046 for (final String namespace : Ordering.natural().sortedCopy(this.bindings.keySet())) {
1047 String prefix = this.bindings.get(namespace);
1048 if (prefix == null) {
1049 prefix = Data.namespaceToPrefix(namespace, this.namespaces);
1050 }
1051 if (prefix != null) {
1052 this.handler.handleNamespace(prefix, namespace);
1053 }
1054 }
1055 for (final Statement statement : this.buffer) {
1056 this.handler.handleStatement(statement);
1057 }
1058 this.bindings = null;
1059 this.buffer = null;
1060 this.buffering = false;
1061 }
1062
1063 }
1064
1065 private static final class LoggingHandler implements RDFHandler {
1066
1067 private final RDFHandler handler;
1068
1069 @Nullable
1070 private final Logger logger;
1071
1072 @Nullable
1073 private final String startMessage;
1074
1075 @Nullable
1076 private final String progressMessage;
1077
1078 @Nullable
1079 private final String endMessage;
1080
1081 private long totalTs;
1082
1083 private long totalCounter = 0;
1084
1085 private long lastTs;
1086
1087 private long lastCounter = 0;
1088
1089 LoggingHandler(final RDFHandler handler, final Logger logger,
1090 @Nullable final String startMessage, @Nullable final String progressMessage,
1091 @Nullable final String endMessage) {
1092 this.handler = Preconditions.checkNotNull(handler);
1093 this.logger = logger;
1094 this.startMessage = startMessage;
1095 this.progressMessage = progressMessage;
1096 this.endMessage = endMessage;
1097 }
1098
1099 @Override
1100 public void startRDF() throws RDFHandlerException {
1101 this.handler.startRDF();
1102 this.totalTs = System.currentTimeMillis();
1103 this.lastTs = this.totalTs;
1104 if (this.startMessage != null) {
1105 this.logger.info(this.startMessage);
1106 }
1107 }
1108
1109 @Override
1110 public void handleComment(final String comment) throws RDFHandlerException {
1111 this.handler.handleComment(comment);
1112 }
1113
1114 @Override
1115 public void handleNamespace(final String prefix, final String uri)
1116 throws RDFHandlerException {
1117 this.handler.handleNamespace(prefix, uri);
1118 }
1119
1120 @Override
1121 public void handleStatement(final Statement statement) throws RDFHandlerException {
1122 this.handler.handleStatement(statement);
1123 ++this.totalCounter;
1124 if (this.progressMessage != null && this.totalCounter % 1000 == 0) {
1125 final long ts = System.currentTimeMillis();
1126 if (ts - this.lastTs >= 1000) {
1127 final long throughput = (this.totalCounter - this.lastCounter) * 1000
1128 / (ts - this.lastTs);
1129 final long avgThroughput = this.totalCounter * 1000 / (ts - this.totalTs);
1130 this.lastTs = ts;
1131 this.lastCounter = this.totalCounter;
1132 this.logger.info(String.format(this.progressMessage, this.totalCounter,
1133 throughput, avgThroughput));
1134 }
1135 }
1136 }
1137
1138 @Override
1139 public void endRDF() throws RDFHandlerException {
1140 if (this.endMessage != null) {
1141 final long ts = System.currentTimeMillis();
1142 final long avgThroughput = this.totalCounter * 1000 / (ts - this.totalTs + 1);
1143 this.logger.info(String.format(this.endMessage, this.totalCounter, avgThroughput));
1144 }
1145 this.handler.endRDF();
1146 }
1147
1148 }
1149
private RDFUtil() {
    // Utility class: prevents instantiation.
}

static {
    // This must be a *static* initializer. An instance initializer would run only when the
    // private constructor above is invoked, which presumably never happens for this
    // utility class, so the registration below would silently never take place.
    TQL.register();
    // NOTE(review): this lifts the JDK XML entity-expansion limit for the whole JVM. That
    // disables a defence against entity-expansion ("billion laughs") attacks in every XML
    // parser of the process; acceptable only if all parsed XML comes from trusted sources.
    System.setProperty("entityExpansionLimit", "" + Integer.MAX_VALUE);
}
1157
1158 }