1 package eu.fbk.knowledgestore.populator.rdf;
2
3 import java.io.File;
4 import java.io.IOException;
5 import java.io.InputStreamReader;
6 import java.io.OutputStream;
7 import java.io.PrintStream;
8 import java.io.PrintWriter;
9 import java.util.Collection;
10 import java.util.Comparator;
11 import java.util.List;
12 import java.util.Map;
13
14 import javax.annotation.Nullable;
15
16 import com.google.common.base.Charsets;
17 import com.google.common.base.Function;
18 import com.google.common.base.Throwables;
19 import com.google.common.collect.ImmutableList;
20 import com.google.common.collect.ImmutableSet;
21 import com.google.common.collect.Iterables;
22 import com.google.common.collect.Lists;
23 import com.google.common.collect.Maps;
24 import com.google.common.io.ByteStreams;
25 import com.google.common.io.CharStreams;
26 import com.google.common.io.Files;
27
28 import org.apache.commons.cli.CommandLine;
29 import org.apache.commons.cli.GnuParser;
30 import org.apache.commons.cli.HelpFormatter;
31 import org.apache.commons.cli.Option;
32 import org.apache.commons.cli.OptionBuilder;
33 import org.apache.commons.cli.Options;
34 import org.apache.commons.cli.ParseException;
35 import org.openrdf.model.Statement;
36 import org.openrdf.model.URI;
37 import org.openrdf.rio.RDFFormat;
38 import org.openrdf.rio.RDFHandler;
39 import org.openrdf.rio.RDFHandlerException;
40 import org.openrdf.rio.helpers.RDFHandlerBase;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 import ch.qos.logback.classic.LoggerContext;
45 import ch.qos.logback.classic.joran.JoranConfigurator;
46 import ch.qos.logback.core.joran.spi.JoranException;
47
48 import eu.fbk.knowledgestore.Operation;
49 import eu.fbk.knowledgestore.Outcome;
50 import eu.fbk.knowledgestore.Session;
51 import eu.fbk.knowledgestore.client.Client;
52 import eu.fbk.knowledgestore.data.Criteria;
53 import eu.fbk.knowledgestore.data.Data;
54 import eu.fbk.knowledgestore.data.Handler;
55 import eu.fbk.knowledgestore.data.Record;
56 import eu.fbk.knowledgestore.data.Stream;
57 import eu.fbk.knowledgestore.internal.Compression;
58 import eu.fbk.knowledgestore.internal.Util;
59 import eu.fbk.knowledgestore.internal.rdf.RDFUtil;
60 import eu.fbk.knowledgestore.vocabulary.CKR;
61 import eu.fbk.knowledgestore.vocabulary.KS;
62
63 public final class RDFPopulator {
64
/**
 * Version string of the tool, obtained from {@code Util.getVersion}; it is printed by the
 * {@code -v} option. NOTE(review): the last argument ("devel") is presumably the fallback used
 * when no version metadata can be found - confirm against {@code Util.getVersion}.
 */
private static final String VERSION = Util.getVersion("eu.fbk.knowledgestore",
        "ks-populator-rdf", "devel");

/** Trimmed content of the "header" resource, printed in the help screen after the usage line. */
private static final String HEADER = Util.getResource(RDFPopulator.class, "header").trim();

/** Trimmed content of the "footer" resource, printed at the end of the help screen. */
private static final String FOOTER = Util.getResource(RDFPopulator.class, "footer").trim();

/** Trimmed content of the "disclaimer" resource, printed by both the help and version options. */
private static final String DISCLAIMER = Util.getResource(RDFPopulator.class, "disclaimer")
        .trim();

/** Logger used for per-file parsing summaries and for upload failures. */
private static final Logger MAIN_LOGGER = LoggerFactory.getLogger(RDFPopulator.class);

/** Logger named "status", used for the progress messages emitted while parsing. */
private static final Logger STATUS_LOGGER = LoggerFactory.getLogger("status");
78
79 public static void main(final String... args) {
80 try {
81
82 final CommandLine cmd = parseCommandLine(args);
83
84
85 final String base = cmd.getOptionValue('b');
86 final int parallelism = !cmd.hasOption('p') ? 1 :
87 Integer.parseInt(cmd.getOptionValue('p'));
88 final boolean listStdin = cmd.hasOption('@');
89 final String listFile = cmd.getOptionValue('T');
90 final List<String> sourceFiles = cmd.getArgList();
91 final boolean sourceStdin = !cmd.hasOption('@') && !cmd.hasOption('T')
92 && cmd.getArgs().length == 0;
93 final String sourceFormat = cmd.getOptionValue('s');
94 final String errorFile = cmd.getOptionValue('e');
95 final String target = cmd.getOptionValue('o');
96 final String targetFormat = cmd.getOptionValue('t');
97 final boolean validate = !cmd.hasOption('i');
98 final Criteria criteria = !cmd.hasOption('c') ? Criteria.overwrite() :
99 Criteria.parse(cmd.getOptionValue('c'), Data.getNamespaceMap());
100 final URI globalURI = cmd.hasOption('g') ? (URI) Data.parseValue(
101 cmd.getOptionValue('g'), Data.getNamespaceMap()) : CKR.GLOBAL;
102 final String credentials = cmd.getOptionValue('u');
103
104
105 String username = null;
106 String password = null;
107 if (credentials != null) {
108 final int index = credentials.indexOf(':');
109 username = credentials.substring(0, index < 0 ? credentials.length() : index);
110 password = index < 0 ? null : credentials.substring(index + 1);
111 }
112
113
114 final List<File> sources = select(listStdin, listFile, sourceFiles, sourceStdin);
115 for (final File file : sources) {
116 checkFileParseable(file, sourceFormat);
117 }
118
119
120 final Stream<Record> axioms = decode(sources, globalURI, parallelism, base,
121 sourceFormat);
122
123
124 if (target == null) {
125
126 disableLogging();
127 final OutputStream out = System.out;
128 System.setOut(new PrintStream(ByteStreams.nullOutputStream()));
129 write(axioms, out, targetFormat);
130
131 } else if (!target.startsWith("http://") && !target.startsWith("https://")) {
132
133 write(axioms, new File(target), targetFormat);
134
135 } else {
136
137 Session session = null;
138 final Client client = Client.builder(target).maxConnections(2)
139 .validateServer(validate).build();
140 try {
141 session = client.newSession(username, password);
142 final Stream<Record> rejected = upload(session, criteria, axioms);
143 if (errorFile == null) {
144 write(rejected, System.err, targetFormat);
145 } else {
146 write(rejected, new File(errorFile), targetFormat);
147 }
148 } finally {
149 Util.closeQuietly(session);
150 client.close();
151 }
152 }
153
154
155 System.exit(0);
156
157 } catch (final IllegalArgumentException ex) {
158
159 ex.printStackTrace();
160 System.err.println("INVALID INPUT. " + ex.getMessage());
161 System.exit(-1);
162
163 } catch (final ParseException ex) {
164
165 System.err.println("SYNTAX ERROR. " + ex.getMessage());
166 System.exit(-1);
167
168 } catch (final Throwable ex) {
169
170 System.err.println("EXECUTION FAILED. " + ex.getMessage() + "\n");
171 ex.printStackTrace();
172 System.exit(-2);
173 }
174 }
175
176 private static CommandLine parseCommandLine(final String... args) throws ParseException {
177
178
179 final List<Option> inputOpts = Lists.newArrayList();
180 newOption(inputOpts, '@', "files-from-stdin", 0, false, null,
181 "read names of input files from STDIN");
182 newOption(inputOpts, 'T', "files-from", 1, false, "FILE",
183 "read names of input files from FILE");
184 newOption(inputOpts, 's', "source-format", 1, false, "FMT",
185 "use input RDF format/compression FMT (eg: ttl.gz; default: "
186 + "autodetect based on file name)");
187 newOption(inputOpts, 'b', "base", 1, false, "URI",
188 "base URI for resolving parsed relative URIs");
189 newOption(inputOpts, 'p', "parallel-files", 1, false, "N",
190 "parse at most N files in parallel (default: 1)");
191
192
193 final List<Option> extractOpts = Lists.newArrayList();
194 newOption(extractOpts, 'g', "global-uri", 1, false, "URI",
195 "use URI in place of ckr:global (default: ckr:global)");
196 newOption(extractOpts, 'd', "default", 1, false, "FILE",
197 "augment axioms with default metadata/context in FILE");
198
199
200 final List<Option> outputOpts = Lists.newArrayList();
201 newOption(outputOpts, 'o', "output", 1, false, "FILE|URL",
202 "send axioms to FILE | server URL (default: STDOUT)");
203 newOption(outputOpts, 'e', "error", 1, false, "FILE",
204 "write non-uploaded axioms to FILE (default: STDERR)");
205 newOption(outputOpts, 't', "target-format", 1, false, "FMT",
206 "use output file RDF format/compression FMT (e.g., ttl.gz; "
207 + "default: autodetect based on file name)");
208 newOption(outputOpts, 'u', "user", 1, false, "user[:pwd]",
209 "upload using login user:pwd (default: anonymous)");
210 newOption(outputOpts, 'i', "ignore-certificate", 0, false, null,
211 "don't check server certificate (default: check)");
212 newOption(outputOpts, 'c', "criteria", 1, false, "C",
213 "upload with merge criteria C (default: overwrite *)");
214
215
216
217
218 final List<Option> miscOpts = Lists.newArrayList();
219 newOption(miscOpts, 'h', "help", 0, false, null, "print this help message and exit");
220 newOption(miscOpts, 'v', "version", 0, false, null, "print version information and exit");
221
222
223 final List<Option> allOpts = ImmutableList.copyOf(Iterables.concat(inputOpts, extractOpts,
224 outputOpts, miscOpts));
225
226
227 final CommandLine cmd = new GnuParser().parse(newOptions(allOpts), args);
228
229
230 if (cmd.hasOption('h')) {
231 final HelpFormatter formatter = new HelpFormatter();
232 formatter.setOptionComparator(new Comparator<Option>() {
233
234 @Override
235 public int compare(final Option option1, final Option option2) {
236 return allOpts.indexOf(option1) - allOpts.indexOf(option2);
237 }
238
239 });
240 final PrintWriter out = new PrintWriter(System.out);
241 formatter.printUsage(out, 80, "ksrdf [-o URL|FILE] [OPTIONS] [INPUT_FILE ...]");
242 out.println();
243 formatter.printWrapped(out, 80, HEADER);
244 formatter.printWrapped(out, 80, DISCLAIMER);
245 out.println("\nInput options:");
246 formatter.printOptions(out, 80, newOptions(inputOpts), 2, 2);
247 out.println("\nExtraction options:");
248 formatter.printOptions(out, 80, newOptions(extractOpts), 2, 5);
249 out.println("\nOutput options:");
250 formatter.printOptions(out, 80, newOptions(outputOpts), 2, 2);
251 out.println("\nMiscellaneous options:");
252 formatter.printOptions(out, 80, newOptions(miscOpts), 2, 14);
253 out.println();
254 out.println(FOOTER);
255 out.flush();
256 System.exit(0);
257
258 } else if (cmd.hasOption('v')) {
259 System.out.println(String.format(
260 "ksrdf (FBK KnowledgeStore) %s\njava %s bit (%s) %s\n%s", VERSION,
261 System.getProperty("sun.arch.data.model"), System.getProperty("java.vendor"),
262 System.getProperty("java.version"), DISCLAIMER));
263 System.exit(0);
264 }
265
266
267 return cmd;
268 }
269
270 private static List<File> select(final boolean listStdin, final String listFile,
271 final List<String> sourceFiles, final boolean sourceStdin) throws IOException {
272
273
274 final List<String> inputs = Lists.newArrayList(sourceFiles);
275
276
277 if (listFile != null) {
278 final File file = new File(listFile);
279 checkFileExist(file);
280 for (final String line : Files.readLines(file, Charsets.UTF_8)) {
281 final String trimmedLine = line.trim();
282 if (!"".equals(trimmedLine)) {
283 inputs.add(line);
284 }
285 }
286 }
287
288
289 if (listStdin) {
290 for (final String line : CharStreams.readLines(new InputStreamReader(System.in))) {
291 final String trimmedLine = line.trim();
292 if (!"".equals(trimmedLine)) {
293 inputs.add(line);
294 }
295 }
296 }
297
298
299 final List<File> files = Lists.newArrayListWithCapacity(inputs.size());
300 for (final String input : inputs) {
301 files.add(new File(input));
302 }
303
304
305 if (sourceStdin) {
306 files.add(null);
307 }
308 return files;
309 }
310
311 private static Stream<Record> decode(final List<File> files, final URI globalURI,
312 final int parallelism, @Nullable final String base,
313 @Nullable final String formatString) {
314
315
316 final Compression compression = detectCompression(formatString, null);
317 final RDFFormat format = detectRDFFormat(formatString, null);
318
319
320 return new Stream<Record>() {
321
322 @Override
323 protected void doToHandler(final Handler<? super Record> handler) throws Throwable {
324
325
326 final Decoder decoder = new Decoder(handler, globalURI);
327
328
329 RDFHandler rdfHandler = new RDFHandlerBase() {
330
331 @Override
332 public void handleStatement(final Statement stmt) throws RDFHandlerException {
333 emit(stmt);
334 }
335
336 @Override
337 public void endRDF() throws RDFHandlerException {
338 emit(null);
339 }
340
341 private void emit(@Nullable final Statement stmt) throws RDFHandlerException {
342 try {
343 decoder.handle(stmt);
344 } catch (final Throwable ex) {
345 Throwables.propagateIfPossible(ex, RDFHandlerException.class);
346 Throwables.propagate(ex);
347 }
348 }
349
350 };
351
352
353 rdfHandler = RDFUtil.newLoggingHandler(rdfHandler, STATUS_LOGGER, null,
354 "parsing: %d triples (%d triples/s, %d triples/s avg)", null);
355
356
357 rdfHandler = RDFUtil.newDecouplingHandler(rdfHandler, null);
358
359
360 final Map<File, RDFHandler> map = Maps.newLinkedHashMap();
361 for (final File file : files) {
362 final RDFHandler fileHandler = RDFUtil.newLoggingHandler(rdfHandler,
363 MAIN_LOGGER, null, null,
364 "parsed " + (file == null ? "STDIN" : file.getAbsolutePath())
365 + ": %d triples, (%d triples/s avg)");
366 map.put(file, fileHandler);
367 }
368 rdfHandler.startRDF();
369 RDFUtil.readRDF(map, format, null, base, false, compression, parallelism);
370 rdfHandler.endRDF();
371 STATUS_LOGGER.info("");
372 }
373
374 };
375 }
376
377 private static Stream<Record> upload(final Session session, final Criteria criteria,
378 final Stream<Record> axioms) {
379
380 return axioms.transform(null, new Function<Handler<Record>, Handler<Record>>() {
381
382 @Override
383 public Handler<Record> apply(final Handler<Record> handler) {
384 return new UploadHandler(session, criteria, handler);
385 }
386
387 });
388 }
389
390 private static void write(final Stream<Record> axioms, final OutputStream stream,
391 @Nullable final String formatString) throws IOException {
392
393
394 final Compression compression = detectCompression(formatString, Compression.NONE);
395 final RDFFormat format = detectRDFFormat(formatString, null);
396 if (format == null) {
397 if (formatString == null) {
398 throw new IllegalArgumentException(
399 "Must specify output format (-t) if writing to STDOUT");
400 } else {
401 throw new IllegalArgumentException("Cannot detect RDF format for " + formatString);
402 }
403 }
404
405
406 final OutputStream actualStream = compression.write(Data.getExecutor(), stream);
407
408
409 RDFUtil.writeRDF(actualStream, format, Data.getNamespaceMap(), null,
410 Record.encode(axioms, ImmutableSet.of(KS.AXIOM)));
411 }
412
413 private static void write(final Stream<Record> axioms, final File file,
414 @Nullable final String formatString) throws IOException {
415
416
417 Compression compression = detectCompression(file.getName(), null);
418 if (compression == null) {
419 compression = detectCompression(formatString, Compression.NONE);
420 }
421 RDFFormat format = detectRDFFormat(file.getName(), null);
422 if (format == null) {
423 format = detectRDFFormat(formatString, null);
424 }
425 if (format == null) {
426 throw new IllegalArgumentException("Cannot detect RDF format of " + file);
427 }
428
429
430 final OutputStream actualStream = compression.write(Data.getExecutor(), file);
431
432
433 try {
434 RDFUtil.writeRDF(actualStream, format, Data.getNamespaceMap(), null,
435 Record.encode(axioms, ImmutableSet.of(KS.AXIOM)));
436 } finally {
437 Util.closeQuietly(actualStream);
438 }
439 }
440
441 private static Options newOptions(final Iterable<? extends Option> options) {
442 final Options result = new Options();
443 for (final Object option : options) {
444 result.addOption((Option) option);
445 }
446 return result;
447 }
448
449 private static void newOption(final Collection<? super Option> options,
450 @Nullable final Character shortName, final String longName, final int argCount,
451 final boolean argOpt, @Nullable final String argName, final String description) {
452
453 OptionBuilder.withLongOpt(longName);
454 OptionBuilder.withDescription(description);
455 if (argCount != 0) {
456 OptionBuilder.withArgName(argName);
457 if (argOpt) {
458 if (argCount == 1) {
459 OptionBuilder.hasOptionalArg();
460 } else if (argCount > 1) {
461 OptionBuilder.hasOptionalArgs(argCount);
462 } else {
463 OptionBuilder.hasOptionalArgs();
464 }
465 } else {
466 if (argCount == 1) {
467 OptionBuilder.hasArg();
468 } else if (argCount > 1) {
469 OptionBuilder.hasArgs(argCount);
470 } else {
471 OptionBuilder.hasArgs();
472 }
473 }
474 }
475 options.add(shortName == null ? OptionBuilder.create() : OptionBuilder.create(shortName));
476 }
477
478 private static RDFFormat detectRDFFormat(@Nullable final String string,
479 final RDFFormat fallback) {
480 return string == null ? fallback : RDFFormat.forFileName("dummy." + string.trim(),
481 fallback);
482 }
483
484 private static Compression detectCompression(@Nullable final String string,
485 final Compression fallback) {
486 return string == null ? fallback : Compression.forFileName("dummy." + string.trim(),
487 fallback);
488 }
489
490 private static void checkFileExist(@Nullable final File file) {
491 if (file == null) {
492 return;
493 } else if (!file.exists()) {
494 throw new IllegalArgumentException("File '" + file + "' does not exist");
495 } else if (file.isDirectory()) {
496 throw new IllegalArgumentException("Path '" + file + "' denotes a directory");
497 }
498 }
499
500 private static void checkFileParseable(@Nullable final File file,
501 @Nullable final String formatString) {
502 if (file == null) {
503 if (formatString == null) {
504 throw new IllegalArgumentException("Cannot detect RDF format "
505 + "and compression of STDIN: please specify option -s");
506 }
507 return;
508 }
509 checkFileExist(file);
510 final RDFFormat defaultFormat = detectRDFFormat(formatString, null);
511 final Compression defaultCompression = detectCompression(formatString, null);
512 final RDFFormat format = RDFFormat.forFileName(file.getName());
513 if (format == null && defaultFormat == null) {
514 throw new IllegalArgumentException("Unknown RDF format for file " + file);
515 } else if (format != null && defaultFormat != null && !format.equals(defaultFormat)) {
516 System.err.println("Warning: detected RDF format for file " + file
517 + " doesn't match specified format");
518 }
519 final Compression compression = Compression.forFileName(file.getName(), Compression.NONE);
520 if (defaultCompression != null && !compression.equals(defaultCompression)) {
521 System.err.println("Warning: detected compression format for file " + file
522 + " doesn't match specified format");
523 }
524 }
525
526 private static void disableLogging() {
527 final LoggerContext context = (LoggerContext) LoggerFactory.getILoggerFactory();
528 try {
529 final JoranConfigurator configurator = new JoranConfigurator();
530 configurator.setContext(context);
531 context.reset();
532 configurator.doConfigure(RDFPopulator.class.getResource("logback.disabled.xml"));
533 } catch (final JoranException je) {
534
535 }
536 }
537
538 private static final class UploadHandler implements Handler<Record> {
539
540 private static final int BUFFER_SIZE = 1024;
541
542 private final Session session;
543
544 private final Criteria criteria;
545
546 private final Handler<Record> errorHandler;
547
548 private final Map<URI, Record> buffer;
549
550 UploadHandler(final Session session, final Criteria criteria,
551 final Handler<Record> errorHandler) {
552 this.session = session;
553 this.criteria = criteria;
554 this.errorHandler = errorHandler;
555 this.buffer = Maps.newHashMapWithExpectedSize(BUFFER_SIZE);
556 }
557
558 @Override
559 public void handle(final Record axiom) throws Throwable {
560 if (axiom == null) {
561 flush(true);
562 } else {
563 this.buffer.put(axiom.getID(), axiom);
564 if (this.buffer.size() == BUFFER_SIZE) {
565 flush(false);
566 }
567 }
568 }
569
570 private void flush(final boolean done) throws Throwable {
571 if (!this.buffer.isEmpty()) {
572 try {
573 final Operation.Merge operation = this.session.merge(KS.AXIOM)
574 .criteria(this.criteria).records(this.buffer.values());
575 operation.exec(new Handler<Outcome>() {
576
577 @Override
578 public void handle(final Outcome outcome) throws Throwable {
579 if (outcome.getStatus().isOK()) {
580 UploadHandler.this.buffer.remove(outcome.getObjectID());
581 }
582 }
583
584 });
585 } catch (final Throwable ex) {
586 MAIN_LOGGER.error("Upload failure: " + ex.getMessage(), ex);
587 }
588 for (final Record record : this.buffer.values()) {
589 this.errorHandler.handle(record);
590 }
591 }
592 if (done) {
593 this.errorHandler.handle(null);
594 }
595 }
596
597 }
598
599 }