1 package eu.fbk.knowledgestore.server;
2
3 import com.google.common.base.*;
4 import com.google.common.collect.AbstractIterator;
5 import com.google.common.collect.*;
6 import com.google.common.hash.Hashing;
7 import com.google.common.hash.HashingOutputStream;
8 import com.google.common.html.HtmlEscapers;
9 import com.google.common.io.ByteStreams;
10 import com.google.common.io.CountingOutputStream;
11 import com.google.common.io.FileBackedOutputStream;
12 import com.google.common.net.MediaType;
13 import com.google.common.net.UrlEscapers;
14 import eu.fbk.knowledgestore.*;
15 import eu.fbk.knowledgestore.Outcome.Status;
16 import eu.fbk.knowledgestore.data.*;
17 import eu.fbk.knowledgestore.datastore.DataStore;
18 import eu.fbk.knowledgestore.datastore.DataTransaction;
19 import eu.fbk.knowledgestore.filestore.FileStore;
20 import eu.fbk.knowledgestore.internal.rdf.RDFUtil;
21 import eu.fbk.knowledgestore.triplestore.TripleStore;
22 import eu.fbk.knowledgestore.triplestore.TripleTransaction;
23 import eu.fbk.knowledgestore.vocabulary.KS;
24 import eu.fbk.knowledgestore.vocabulary.NFO;
25 import eu.fbk.knowledgestore.vocabulary.NIE;
26 import info.aduna.iteration.CloseableIteration;
27 import org.openrdf.model.Statement;
28 import org.openrdf.model.URI;
29 import org.openrdf.model.ValueFactory;
30 import org.openrdf.model.vocabulary.RDF;
31 import org.openrdf.query.BindingSet;
32 import org.openrdf.query.Dataset;
33 import org.openrdf.query.QueryEvaluationException;
34 import org.openrdf.query.algebra.TupleExpr;
35 import org.openrdf.query.impl.DatasetImpl;
36 import org.openrdf.query.parser.ParsedQuery;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 import javax.annotation.Nullable;
41 import java.io.*;
42 import java.util.Date;
43 import java.util.List;
44 import java.util.Map;
45 import java.util.Set;
46 import java.util.concurrent.atomic.AtomicLong;
47
48
49
50 public final class Server extends AbstractKnowledgeStore {
51
52 private static final Logger LOGGER = LoggerFactory.getLogger(Server.class);
53
54 private static final int DEFAULT_CHUNK_SIZE = 1024;
55
56 private static final int DEFAULT_BUFFER_SIZE = 16 * 1024 * 1024;
57
58 private static long fileVersionCounter = 0L;
59
60 private final FileStore fileStore;
61
62 private final DataStore dataStore;
63
64 private final TripleStore tripleStore;
65
66 private final int chunkSize;
67
68 private final int bufferSize;
69
70 private Server(final Builder builder) {
71
72 boolean success = false;
73
74 this.fileStore = Preconditions.checkNotNull(builder.fileStore);
75 this.dataStore = Preconditions.checkNotNull(builder.dataStore);
76 this.tripleStore = Preconditions.checkNotNull(builder.tripleStore);
77
78 try {
79 this.chunkSize = MoreObjects.firstNonNull(builder.chunkSize, DEFAULT_CHUNK_SIZE);
80 this.bufferSize = MoreObjects.firstNonNull(builder.bufferSize, DEFAULT_BUFFER_SIZE);
81 Preconditions.checkArgument(this.chunkSize > 0);
82 Preconditions.checkArgument(this.bufferSize > 0);
83
84
85 try {
86 this.fileStore.init();
87 this.dataStore.init();
88 this.tripleStore.init();
89 } catch (final Exception ex) {
90 throw new Error(ex);
91 }
92
93 success = true;
94
95 } finally {
96 if (!success) {
97 closeQuietly(this.fileStore);
98 closeQuietly(this.dataStore);
99 closeQuietly(this.tripleStore);
100 }
101 }
102 }
103
104 @Override
105 protected Session doNewSession(@Nullable final String username, @Nullable final String password) {
106 return new SessionImpl(username, password);
107 }
108
109 @Override
110 protected void doClose() {
111 closeQuietly(this.fileStore);
112 closeQuietly(this.dataStore);
113 closeQuietly(this.tripleStore);
114 }
115
116 private static void closeQuietly(@Nullable final Closeable closeable) {
117 if (closeable != null) {
118 try {
119 closeable.close();
120 } catch (final Throwable ex) {
121 LOGGER.error(
122 "Error closing " + closeable.getClass().getSimpleName() + ": "
123 + ex.getMessage(), ex);
124 }
125 }
126 }
127
128 private final class SessionImpl extends AbstractSession {
129
/**
 * Creates a session with its own namespace map layered over the one returned by
 * Data.getNamespaceMap(), for the supplied (optional) credentials.
 */
SessionImpl(@Nullable final String username, @Nullable final String password) {
    super(Data.newNamespaceMap(Data.newNamespaceMap(), Data.getNamespaceMap()),
            username, password);
}
134
135 private void check(final boolean condition, final Status status,
136 @Nullable final URI objectID, @Nullable final String message, final Object... args)
137 throws OperationException {
138 if (!condition) {
139 throw newException(status, objectID,
140 message == null ? null : String.format(message, args));
141 }
142 }
143
144 private Outcome newOutcome(@Nullable final Status status, @Nullable final URI objectID,
145 @Nullable final String message, final Object... args) {
146 return Outcome.create(status == null ? Status.ERROR_UNEXPECTED : status,
147 getInvocationID(), objectID,
148 message == null ? null : String.format(message, args));
149 }
150
151 private OperationException newException(@Nullable final Status status,
152 @Nullable final URI objectID, @Nullable final String message,
153 final Throwable... causes) {
154 return new OperationException(newOutcome(status, objectID, message), causes);
155 }
156
157 private <T> Stream<T> attach(final DataTransaction transaction, final Stream<T> stream) {
158 return stream.onClose(new Closeable() {
159
160 @Override
161 public void close() throws IOException {
162 transaction.end(true);
163 }
164
165 });
166 }
167
168 private <T> Stream<T> attach(final TripleTransaction transaction, final Stream<T> stream) {
169 return stream.onClose(new Closeable() {
170
171 @Override
172 public void close() throws IOException {
173 transaction.end(true);
174 }
175
176 });
177 }
178
179 @Override
180 protected Representation doDownload(@Nullable final Long timeout, final URI resourceID,
181 @Nullable final Set<String> mimeTypes, final boolean useCaches) throws Throwable {
182
183
184
185
186 final DataTransaction transaction = Server.this.dataStore.begin(true);
187
188 try {
189
190 final Record resource = transaction.lookup(KS.RESOURCE,
191 ImmutableSet.of(resourceID), ImmutableSet.of(KS.STORED_AS)).getUnique();
192
193
194 if (resource == null) {
195 return null;
196 }
197
198
199 final Record metadata = resource.getUnique(KS.STORED_AS, Record.class);
200 if (metadata == null) {
201 return null;
202 }
203
204
205 final String fileName = metadata.getUnique(NFO.FILE_NAME, String.class);
206 check(fileName != null, null, resourceID, "No filename stored for resource (!)");
207
208
209 String transformToType = null;
210 final String fileTypeString = metadata.getUnique(NIE.MIME_TYPE, String.class);
211 if (mimeTypes != null) {
212 check(fileTypeString != null, Status.ERROR_NOT_ACCEPTABLE, resourceID,
213 "No MIME type stored for file %s", fileName);
214 boolean compatible = false;
215 final MediaType fileType = MediaType.parse(fileTypeString);
216 for (final String type : mimeTypes) {
217 try {
218 final boolean matches = fileType.is(MediaType.parse(type)
219 .withoutParameters());
220 final boolean transform = !matches && !compatible
221 && canTransform(fileTypeString, type);
222 compatible = compatible || matches || transform;
223 transformToType = transform ? type : null;
224 } catch (final IllegalArgumentException ex) {
225
226 }
227 }
228 check(compatible, Status.ERROR_NOT_ACCEPTABLE, resourceID,
229 "Incompatible MIME type %s for file %s", fileType, fileName);
230 }
231
232
233 InputStream stream = Server.this.fileStore.read(fileName);
234 check(stream != null, null, resourceID, "File %s missing for resource %s (!)",
235 fileName);
236
237 if (transformToType != null) {
238
239 final String ext = Iterables.getFirst(
240 Data.mimeTypeToExtensions(transformToType), "bin");
241 final String name = MoreObjects.firstNonNull(
242 metadata.getUnique(NFO.FILE_NAME, String.class, null), "download")
243 + "." + ext;
244 stream = transform(fileTypeString, transformToType, stream);
245 final Representation representation = Representation.create(stream);
246 final Record meta = representation.getMetadata();
247 meta.setID(metadata.getID());
248 meta.set(NIE.MIME_TYPE, transformToType);
249 meta.set(NFO.FILE_NAME, name);
250 meta.set(NFO.FILE_LAST_MODIFIED, metadata.getUnique(NFO.FILE_LAST_MODIFIED));
251 return representation;
252
253 } else {
254
255 final Representation representation = Representation.create(stream);
256 representation.getMetadata().setID(metadata.getID());
257 for (final URI property : metadata.getProperties()) {
258 representation.getMetadata().set(property, metadata.get(property));
259 }
260 return representation;
261 }
262
263 } finally {
264
265 transaction.end(true);
266 }
267 }
268
269 private boolean canTransform(final String fromType, final String toType) {
270 final String type = toType.trim().toLowerCase();
271 return type.equals("text/html") || type.equals("text/plain");
272 }
273
274 private InputStream transform(final String fromType, final String toType,
275 final InputStream fromStream) throws IOException {
276 final String type = toType.trim().toLowerCase();
277 if (type.equals("text/html")) {
278
279 final byte[] data = ByteStreams.toByteArray(fromStream);
280 final String string = new String(data, Charsets.UTF_8);
281 final ByteArrayOutputStream out = new ByteArrayOutputStream();
282 final OutputStreamWriter writer = new OutputStreamWriter(out, Charsets.UTF_8);
283 writer.append("<html>\n");
284 writer.append("<head>\n");
285 writer.append("<meta http-equiv=\"Content-type\" "
286 + "content=\"text/html;charset=UTF-8\"/>\n");
287 writer.append("</head>\n");
288 writer.append("<body>\n");
289 writer.append("<pre>");
290 writer.append(HtmlEscapers.htmlEscaper().escape(string));
291 writer.append("</pre>\n");
292 writer.append("</body>\n");
293 writer.append("</html>\n");
294 writer.close();
295 return new ByteArrayInputStream(out.toByteArray());
296 } else if (type.equals("text/plain")) {
297 return fromStream;
298 } else {
299 throw new UnsupportedOperationException();
300 }
301 }
302
303 @Override
304 protected Outcome doUpload(@Nullable final Long timeout, final URI resourceID,
305 @Nullable final Representation representation) throws Throwable {
306
307
308 String fileName = null;
309 Status status;
310
311
312 final DataTransaction transaction = Server.this.dataStore.begin(false);
313
314 try {
315
316 final Record resource = transaction.lookup(KS.RESOURCE,
317 ImmutableSet.of(resourceID), null).getUnique();
318 if (resource == null) {
319 throw newException(Status.ERROR_DEPENDENCY_NOT_FOUND, resourceID,
320 "Specified resource does not exist");
321 }
322
323
324 final Record oldMetadata = resource.getUnique(KS.STORED_AS, Record.class);
325
326
327 if (representation == null) {
328
329 status = oldMetadata == null ? Status.OK_UNMODIFIED : Status.OK_DELETED;
330 resource.set(KS.STORED_AS, null);
331
332 } else {
333
334 status = oldMetadata == null ? Status.OK_CREATED : Status.OK_MODIFIED;
335 final Record metadata = representation.getMetadata();
336 metadata.setID(Data.getValueFactory().createURI(resourceID + "_file"));
337 fileName = metadata.getUnique(NFO.FILE_NAME, String.class);
338 String fileType = metadata.getUnique(NIE.MIME_TYPE, String.class);
339 if (fileType != null) {
340 try {
341 MediaType.parse(fileType);
342 } catch (final IllegalArgumentException ex) {
343 fileType = null;
344 metadata.set(NIE.MIME_TYPE, null);
345 }
346 }
347 fileName = generateFileName(resourceID, fileName, fileType);
348 fileType = fileType != null ? fileType : Data.extensionToMimeType(fileName);
349 metadata.set(NFO.FILE_NAME, fileName);
350 metadata.set(NIE.MIME_TYPE, fileType);
351
352
353 final OutputStream stream = Server.this.fileStore.write(fileName);
354 try {
355
356 final CountingOutputStream cos = new CountingOutputStream(stream);
357 final HashingOutputStream hos = new HashingOutputStream(Hashing.md5(), cos);
358 representation.writeTo(hos);
359 hos.close();
360
361
362 final Record hash = Record.create();
363 hash.set(NFO.HASH_ALGORITHM, "MD5");
364 hash.set(NFO.HASH_VALUE, hos.hash().toString());
365 metadata.set(NFO.HAS_HASH, hash);
366 metadata.set(NFO.FILE_SIZE, cos.getCount());
367 if (metadata.isNull(NFO.FILE_LAST_MODIFIED)) {
368 metadata.set(NFO.FILE_LAST_MODIFIED, new Date());
369 }
370 } finally {
371 stream.close();
372 }
373
374
375 resource.set(KS.STORED_AS, Record.create(metadata, true));
376 }
377
378
379 if (status != Status.OK_UNMODIFIED) {
380 transaction.store(KS.RESOURCE, resource);
381 }
382
383
384 if (oldMetadata != null) {
385 deleteFileQuietly(oldMetadata.getUnique(NFO.FILE_NAME, String.class));
386 }
387
388
389 transaction.end(true);
390
391
392 return newOutcome(status, resourceID, null);
393
394 } catch (final Throwable ex) {
395
396 deleteFileQuietly(fileName);
397 transaction.end(false);
398 throw ex;
399 }
400 }
401
402 private String generateFileName(final URI resourceID,
403 @Nullable final String suppliedFileName, @Nullable final String suppliedFileType) {
404
405
406 String fileName = "file";
407 String fileExt = "bin";
408
409
410 if (suppliedFileName != null) {
411 final String name = UrlEscapers.urlPathSegmentEscaper().escape(suppliedFileName);
412 final int index = name.lastIndexOf('.');
413 if (index > 0 && index < name.length() - 1) {
414 fileName = name.substring(0, index);
415 fileExt = name.substring(index + 1);
416 }
417 }
418
419
420 if (suppliedFileType != null) {
421 final List<String> mimeExtensions = Data.mimeTypeToExtensions(suppliedFileType);
422 if (!mimeExtensions.isEmpty()) {
423 fileExt = mimeExtensions.get(0);
424 }
425 }
426
427
428 final String uri = resourceID.stringValue();
429 int start = 0;
430 int end = uri.length();
431 for (int index = 0; index < uri.length(); ++index) {
432 final char ch = uri.charAt(index);
433 if (ch == '/' || ch == ':') {
434 start = index + 1;
435 } else if (ch == '.') {
436 end = index;
437 } else if (ch == '#' || ch == '?') {
438 end = Math.min(end, index);
439 break;
440 }
441 }
442 if (start < end) {
443 fileName = uri.substring(start, end);
444 }
445
446
447 long fileVersion;
448 final long ts = System.currentTimeMillis();
449 synchronized (Server.class) {
450 ++Server.fileVersionCounter;
451 if (Server.fileVersionCounter < ts) {
452 Server.fileVersionCounter = ts;
453 }
454 fileVersion = Server.fileVersionCounter;
455 }
456
457
458 return fileName + "." + Long.toString(fileVersion, 32) + "." + fileExt;
459 }
460
461 private void deleteFileQuietly(@Nullable final String fileName) {
462 if (fileName != null) {
463 try {
464 Server.this.fileStore.delete(fileName);
465 } catch (final Throwable ex) {
466 LOGGER.error("Failed to delete file " + fileName
467 + " (will be garbage collected)", ex);
468 }
469 }
470 }
471
472 @Override
473 protected long doCount(@Nullable final Long timeout, final URI type,
474 @Nullable final XPath condition, @Nullable final Set<URI> ids) throws Throwable {
475
476
477 final Set<URI> actualIDs = ids != null ? ids : retrieveToLookup(type, condition);
478
479
480
481 if (actualIDs != null) {
482 return doRetrieve(timeout, type, condition, actualIDs, condition == null ? null : condition.getProperties(),
483 null, null).count();
484 }
485
486
487 final DataTransaction tx = Server.this.dataStore.begin(true);
488 try {
489 return tx.count(type, condition);
490 } finally {
491 tx.end(true);
492 }
493 }
494
495 @Override
496 protected Stream<Record> doRetrieve(@Nullable final Long timeout, final URI type,
497 @Nullable final XPath condition, @Nullable final Set<URI> ids,
498 @Nullable final Set<URI> properties, @Nullable final Long offset,
499 @Nullable final Long limit) throws Throwable {
500
501
502 final Set<URI> actualIDs = ids != null ? ids : retrieveToLookup(type, condition);
503
504
505 final DataTransaction tx = Server.this.dataStore.begin(true);
506
507 Stream<Record> stream;
508 if (actualIDs == null) {
509
510 stream = tx.retrieve(type, condition, properties);
511
512 } else {
513
514 Set<URI> props = properties;
515 if (props != null && condition != null
516 && !props.containsAll(condition.getProperties())) {
517 props = Sets.union(properties, condition.getProperties());
518 }
519 stream = tx.lookup(type, actualIDs, props);
520 if (condition != null) {
521 stream = stream.filter(condition.asPredicate(), 0);
522 }
523 if (props != properties) {
524 final URI[] array = properties.toArray(new URI[properties.size()]);
525 stream = stream.transform(new Function<Record, Record>() {
526
527 @Override
528 public Record apply(final Record record) {
529 record.retain(array);
530 return record;
531 }
532
533 }, 0);
534 }
535 }
536
537
538 if (offset != null || limit != null) {
539 stream = stream.slice(MoreObjects.firstNonNull(offset, 0L),
540 MoreObjects.firstNonNull(limit, Long.MAX_VALUE));
541 }
542
543
544 return attach(tx, stream);
545 }
546
547 @SuppressWarnings({ "unchecked", "rawtypes" })
548 private Set<URI> retrieveToLookup(final URI type, @Nullable final XPath condition)
549 throws IOException {
550
551 if (condition == null) {
552 return null;
553 }
554
555 final Map<URI, Set<Object>> restrictions = Maps.newHashMap();
556 condition.decompose(restrictions);
557
558 DataTransaction tx = null;
559 Set<URI> ids = null;
560 try {
561 if (KS.RESOURCE.equals(type) && restrictions.containsKey(KS.HAS_MENTION)) {
562 ids = Sets.newHashSet();
563 tx = Server.this.dataStore.begin(true);
564 tx.lookup(KS.MENTION, (Set) restrictions.get(KS.HAS_MENTION),
565 ImmutableSet.of(KS.MENTION_OF))
566 .transform(URI.class, true, KS.MENTION_OF).toCollection(ids);
567
568 } else if (KS.MENTION.equals(type) && restrictions.containsKey(KS.MENTION_OF)) {
569 ids = Sets.newHashSet();
570 tx = Server.this.dataStore.begin(true);
571 tx.lookup(KS.RESOURCE, (Set) restrictions.get(KS.MENTION_OF),
572 ImmutableSet.of(KS.HAS_MENTION))
573 .transform(URI.class, true, KS.HAS_MENTION).toCollection(ids);
574 }
575 } finally {
576 if (tx != null) {
577 tx.end(false);
578 }
579 }
580
581 return ids;
582 }
583
584 @Override
585 protected void doCreate(@Nullable final Long timeout, final URI type,
586 @Nullable final Stream<? extends Record> records,
587 final Handler<? super Outcome> handler) throws Throwable {
588
589 modify(new RecordUpdater() {
590
591 @Override
592 public Record computeNewRecord(final URI id, @Nullable final Record oldRecord,
593 @Nullable final Record suppliedRecord) throws Throwable {
594 assert suppliedRecord != null;
595 check(oldRecord == null, Status.ERROR_OBJECT_ALREADY_EXISTS, id, null);
596 return suppliedRecord;
597 }
598
599 }, type, null, records, handler);
600 }
601
602 @Override
603 protected void doMerge(@Nullable final Long timeout, final URI type,
604 @Nullable final Stream<? extends Record> records,
605 @Nullable final Criteria criteria, final Handler<? super Outcome> handler)
606 throws Throwable {
607
608 modify(new RecordUpdater() {
609
610 @Override
611 public Record computeNewRecord(final URI id, @Nullable final Record oldRecord,
612 @Nullable final Record suppliedRecord) throws Throwable {
613 assert suppliedRecord != null;
614 if (criteria == null) {
615 return oldRecord;
616 } else {
617 final Record record = oldRecord == null ? Record.create(id, type)
618 : Record.create(oldRecord, true);
619 criteria.merge(record, suppliedRecord);
620 return record;
621 }
622 }
623
624 }, type, null, records, handler);
625 }
626
627 @Override
628 protected void doUpdate(@Nullable final Long timeout, final URI type,
629 @Nullable final XPath condition, @Nullable final Set<URI> ids,
630 @Nullable final Record record, @Nullable final Criteria criteria,
631 final Handler<? super Outcome> handler) throws Throwable {
632
633 modify(new RecordUpdater() {
634
635 @Override
636 public Record computeNewRecord(final URI id, @Nullable final Record oldRecord,
637 @Nullable final Record suppliedRecord) throws Throwable {
638 assert oldRecord != null;
639 assert suppliedRecord == null;
640 final Record newRecord = Record.create(oldRecord, true);
641 criteria.merge(newRecord, record);
642 return newRecord;
643 }
644
645 }, type, condition, ids == null ? null : Stream.create(ids), handler);
646 }
647
648 @Override
649 protected void doDelete(@Nullable final Long timeout, final URI type,
650 @Nullable final XPath condition, @Nullable final Set<URI> ids,
651 final Handler<? super Outcome> handler) throws Throwable {
652
653 modify(new RecordUpdater() {
654
655 @Override
656 public Record computeNewRecord(final URI id, @Nullable final Record oldRecord,
657 @Nullable final Record suppliedRecord) throws Throwable {
658 assert oldRecord != null;
659 assert suppliedRecord == null;
660 return null;
661 }
662
663 }, type, condition, ids == null ? null : Stream.create(ids), handler);
664 }
665
666 private void modify(final RecordUpdater updater, final URI type,
667 @Nullable final XPath condition, @Nullable final Stream<?> recordOrIDStream,
668 final Handler<? super Outcome> handler) throws Throwable {
669
670
671 final Stream<?> stream = recordOrIDStream != null ? recordOrIDStream :
672 retrieveIDs(type, condition);
673
674 try {
675
676 stream.chunk(Server.this.chunkSize).toHandler(new Handler<List<?>>() {
677
678 private final AtomicLong index = new AtomicLong(0L);
679
680 @Override
681 public void handle(@Nullable final List<?> chunk) throws Throwable {
682 final long startIndex = this.index.get();
683 if (chunk != null) {
684
685 final boolean success = modifyChunk(updater, type, condition, chunk,
686 handler, this.index, false);
687
688
689 if (!success) {
690 this.index.set(startIndex);
691 for (int i = 0; !Thread.interrupted() && i < chunk.size(); ++i) {
692 final List<?> newChunk = ImmutableList.of(chunk.get(i));
693 modifyChunk(updater, type, condition, newChunk, handler,
694 this.index, true);
695 }
696 }
697 }
698 }
699
700 });
701
702
703 handler.handle(null);
704
705 } finally {
706
707 closeQuietly(stream);
708 }
709 }
710
711 private boolean modifyChunk(final RecordUpdater updater, final URI type,
712 @Nullable final XPath condition, final List<?> suppliedRecordsOrIDs,
713 final Handler<? super Outcome> handler, final AtomicLong index,
714 final boolean reportFailure) throws Throwable {
715
716
717 final long startIndex = index.get();
718 final ValueFactory factory = Data.getValueFactory();
719 final int size = suppliedRecordsOrIDs.size();
720 final List<Outcome> outcomes = Lists.newArrayListWithCapacity(size);
721 final List<URI> ids = Lists.newArrayListWithCapacity(size);
722 final List<Record> suppliedRecords = Lists.newArrayListWithExpectedSize(size);
723 for (final Object input : suppliedRecordsOrIDs) {
724 if (input instanceof URI) {
725 ids.add((URI) input);
726 suppliedRecords.add(null);
727 } else {
728 final Record record = (Record) input;
729 ids.add(record.getID());
730 suppliedRecords.add(record);
731 }
732 }
733
734
735 final DataTransaction tx = Server.this.dataStore.begin(false);
736
737 try {
738
739 final Stream<Record> stream = tx.lookup(type, ImmutableSet.copyOf(ids), null);
740 final Map<URI, Record> oldRecords = stream.toMap(new Function<Record, URI>() {
741
742 @Override
743 public URI apply(final Record record) {
744 return record.getID();
745 }
746
747 }, Functions.<Record>identity());
748
749
750
751 for (int i = 0; !Thread.interrupted() && i < size; ++i) {
752 final URI id = ids.get(i);
753 final Record oldRecord = oldRecords.get(id);
754 final Record suppliedRecord = suppliedRecords.get(i);
755 if (id == null) {
756 assert suppliedRecord != null;
757 outcomes.add(newOutcome(Status.ERROR_INVALID_INPUT, null,
758 "Missing ID for record:\n" + suppliedRecord
759 .toString(Data.getNamespaceMap(), true)));
760
761 } else if (suppliedRecord != null || oldRecord != null
762 && (condition == null || condition.evalBoolean(oldRecord))) {
763 final URI oldInvocationID = getInvocationID();
764 setInvocationID(factory.createURI(oldInvocationID + "#"
765 + index.incrementAndGet()));
766 try {
767 outcomes.add(modifyRecord(updater, tx, id, oldRecord, suppliedRecord));
768 } catch (final OperationException ex) {
769 outcomes.add(ex.getOutcome());
770 } finally {
771 setInvocationID(oldInvocationID);
772 }
773 }
774 }
775
776
777 tx.end(true);
778
779
780 for (final Outcome outcome : outcomes) {
781 handler.handle(outcome);
782 }
783 return true;
784
785 } catch (final Throwable ex) {
786
787 LOGGER.error("Data processing error", ex);
788
789
790 if (reportFailure) {
791 for (int i = 0; i < ids.size(); ++i) {
792 index.set(startIndex);
793 handler.handle(Outcome.create(
794 Status.ERROR_UNEXPECTED,
795 factory.createURI(getInvocationID() + "#"
796 + index.incrementAndGet()), ids.get(i), ex.getMessage()));
797 }
798 }
799
800
801 tx.end(false);
802 return false;
803 }
804 }
805
806 private Outcome modifyRecord(final RecordUpdater updater,
807 final DataTransaction transaction, final URI recordID,
808 @Nullable final Record oldRecord, @Nullable final Record suppliedRecord)
809 throws Throwable {
810
811
812 final Set<Record> recordsToStore = Sets.newHashSet();
813 final Set<Record> recordsToDelete = Sets.newHashSet();
814
815
816 if (suppliedRecord != null) {
817 preprocess(suppliedRecord);
818 }
819
820
821 final Record newRecord = updater.computeNewRecord(recordID, oldRecord, suppliedRecord);
822 if (newRecord != null) {
823 expand(newRecord);
824 }
825
826
827 Status status = Status.OK_UNMODIFIED;
828 if (newRecord == null) {
829 if (oldRecord != null) {
830 recordsToDelete.add(oldRecord);
831 status = Status.OK_DELETED;
832 }
833 } else {
834 if (oldRecord == null) {
835 recordsToStore.add(newRecord);
836 status = Status.OK_CREATED;
837 } else if (!oldRecord.hash().equals(newRecord.hash())) {
838 recordsToStore.add(newRecord);
839 status = Status.OK_MODIFIED;
840 }
841 }
842 if (status == Status.OK_UNMODIFIED) {
843 return newOutcome(status, recordID, null);
844 }
845
846
847 final Map<URI, Record> nilMap = ImmutableMap.of();
848 final Map<URI, Record> oldMap = oldRecord == null ? nilMap : extractRelated(oldRecord);
849 final Map<URI, Record> newMap = newRecord == null ? nilMap : extractRelated(newRecord);
850
851
852 for (final URI id : Sets.union(oldMap.keySet(), newMap.keySet())) {
853
854
855 final Record oldRel = oldMap.get(id);
856 final Record newRel = newMap.get(id);
857 final URI type = MoreObjects.firstNonNull(oldRel, newRel).getSystemType();
858 if (oldRel != null && newRel != null) {
859 for (final URI property : oldRel.getProperties()) {
860 final List<URI> newValues = newRel.get(property, URI.class);
861 if (!newValues.isEmpty()) {
862 final List<URI> oldValues = oldRel.get(property, URI.class);
863 oldRel.remove(property, newValues);
864 newRel.remove(property, oldValues);
865 }
866 }
867 }
868 final List<URI> nilList = ImmutableList.of();
869 final List<URI> oldProperties = oldRel == null ? nilList : oldRel.getProperties();
870 final List<URI> newProperties = newRel == null ? nilList : newRel.getProperties();
871
872
873
874 if (!oldProperties.isEmpty() || !newProperties.isEmpty()) {
875 Record related = transaction.lookup(type, ImmutableSet.of(id), null)
876 .getUnique();
877 if (related == null) {
878 related = Record.create(id, type);
879 }
880 recordsToStore.add(related);
881 for (final URI property : oldProperties) {
882 assert oldRel != null;
883 if (!property.equals(RDF.TYPE)) {
884 related.remove(property, oldRel.get(property));
885 }
886 }
887 for (final URI property : newProperties) {
888 assert newRel != null;
889 if (!property.equals(RDF.TYPE)) {
890 related.add(property, newRel.get(property));
891 }
892 }
893 expand(related);
894 }
895 }
896
897
898 for (final Record record : recordsToStore) {
899 transaction.store(record.getSystemType(), record);
900 }
901 for (final Record record : recordsToDelete) {
902 transaction.delete(record.getSystemType(), record.getID());
903 }
904
905
906 return newOutcome(status, recordID, null);
907 }
908
909 private Stream<URI> retrieveIDs(final URI type, @Nullable final XPath condition)
910 throws Throwable {
911
912
913 final FileBackedOutputStream buffer = new FileBackedOutputStream(
914 Server.this.bufferSize);
915
916 try {
917
918 final Writer writer = new OutputStreamWriter(buffer, Charsets.UTF_8);
919 final DataTransaction tx = Server.this.dataStore.begin(true);
920 Stream<Record> cursor = null;
921 try {
922 cursor = tx.retrieve(type, condition, ImmutableSet.<URI>of());
923 cursor.toHandler(new Handler<Record>() {
924
925 @Override
926 public void handle(final Record record) throws Throwable {
927 if (record != null) {
928 writer.write(record.getID().stringValue());
929 writer.write("\n");
930 }
931 }
932
933 });
934 } finally {
935 closeQuietly(cursor);
936 tx.end(true);
937 writer.flush();
938 }
939
940
941 final BufferedReader reader = buffer.asByteSource().asCharSource(Charsets.UTF_8)
942 .openBufferedStream();
943 return Stream.create(new AbstractIterator<URI>() {
944
945 @Override
946 protected URI computeNext() {
947 try {
948 final String line = reader.readLine();
949 return line == null ? endOfData() : Data.getValueFactory().createURI(
950 line);
951 } catch (final Throwable ex) {
952 throw Throwables.propagate(ex);
953 }
954 }
955
956 }).onClose(buffer);
957
958 } catch (final Throwable ex) {
959
960 buffer.close();
961 throw ex;
962 }
963 }
964
965 @Override
966 protected Stream<Record> doMatch(@Nullable final Long timeout,
967 final Map<URI, XPath> conditions, final Map<URI, Set<URI>> ids,
968 final Map<URI, Set<URI>> properties) throws Throwable {
969
970 throw new UnsupportedOperationException();
971 }
972
973 @SuppressWarnings("unchecked")
974 @Override
975 protected Outcome doSparqlUpdate(@Nullable Long timeout, @Nullable Stream<? extends Statement> statements) throws Throwable {
976
977 LOGGER.debug("Server.UPDATING");
978 final TripleTransaction tx = Server.this.tripleStore.begin(false);
979 try {
980 tx.add(statements);
981 Outcome outcome = newOutcome(Status.OK_BULK, null, null);
982 tx.end(true);
983 return outcome;
984 } catch (final Throwable ex) {
985 ex.printStackTrace();
986 tx.end(false);
987 throw ex;
988 }
989 finally {
990 closeQuietly(statements);
991 }
992 }
993
994 @SuppressWarnings("unchecked")
995 @Override
996 protected Outcome doSparqlDelete(@Nullable Long timeout, @Nullable Stream<? extends Statement> statements) throws Throwable {
997
998 LOGGER.debug("Server.REMOVING");
999 final TripleTransaction tx = Server.this.tripleStore.begin(false);
1000 try {
1001 tx.remove(statements);
1002 Outcome outcome = newOutcome(Status.OK_BULK, null, null);
1003 tx.end(true);
1004 return outcome;
1005 } catch (final Throwable ex) {
1006 tx.end(false);
1007 throw ex;
1008 }
1009 finally {
1010 closeQuietly(statements);
1011 }
1012 }
1013
1014 @SuppressWarnings("unchecked")
1015 @Override
1016 protected <T> Stream<T> doSparql(@Nullable final Long timeout, final Class<T> type,
1017 final String expression, @Nullable final Set<URI> defaultGraphs,
1018 @Nullable final Set<URI> namedGraphs) throws Throwable {
1019
1020
1021 final ParsedQuery parsedQuery;
1022 try {
1023 parsedQuery = SparqlHelper.parse(expression, null);
1024 } catch (final Throwable ex) {
1025 throw newException(Status.ERROR_INVALID_INPUT, null, ex.getMessage(), ex);
1026 }
1027
1028
1029 Dataset dataset = parsedQuery.getDataset();
1030 if (defaultGraphs != null || namedGraphs != null) {
1031 final DatasetImpl ds = new DatasetImpl();
1032 final Set<URI> emptyGraphs = ImmutableSet.of();
1033 for (final URI graph : MoreObjects.firstNonNull(defaultGraphs, emptyGraphs)) {
1034 ds.addDefaultGraph(graph);
1035 }
1036 for (final URI graph : MoreObjects.firstNonNull(namedGraphs, emptyGraphs)) {
1037 ds.addNamedGraph(graph);
1038 }
1039 dataset = ds;
1040 }
1041
1042
1043 final TripleTransaction tx = Server.this.tripleStore.begin(true);
1044 try {
1045
1046 final TupleExpr expr = parsedQuery.getTupleExpr();
1047 final CloseableIteration<BindingSet, QueryEvaluationException> iteration;
1048 iteration = SparqlHelper.evaluate(tx, expr, dataset, null, timeout);
1049
1050
1051 if (type == BindingSet.class) {
1052 return attach(tx, (Stream<T>) RDFUtil.toBindingsStream(iteration, parsedQuery
1053 .getTupleExpr().getBindingNames()));
1054 } else if (type == Statement.class) {
1055 return (Stream<T>) attach(tx, RDFUtil.toStatementStream(iteration));
1056 } else if (type == Boolean.class) {
1057 try {
1058 return (Stream<T>) attach(tx, Stream.create(iteration.hasNext()));
1059 } finally {
1060 iteration.close();
1061 }
1062 } else {
1063 throw new Error("Unexpected result type: " + type);
1064 }
1065
1066 } catch (final Throwable ex) {
1067 tx.end(true);
1068 throw ex;
1069 }
1070 }
1071
1072 @Override
1073 protected void doClose() {
1074 evictClosedSessions();
1075
1076 }
1077
1078 }
1079
1080 private void preprocess(final Record record) throws Throwable {
1081
1082
1083 if (KS.RESOURCE.equals(record.getSystemType())) {
1084 record.set(KS.STORED_AS, null);
1085 }
1086
1087
1088 }
1089
1090 private void expand(final Record record) throws Throwable {
1091
1092
1093 }
1094
1095 private Map<URI, Record> extractRelated(final Record record) throws Throwable {
1096
1097
1098
1099 final URI id = record.getID();
1100 final URI type = record.getSystemType();
1101
1102 final Map<URI, Record> map = Maps.newHashMap();
1103 if (type.equals(KS.RESOURCE)) {
1104 for (final URI mentionID : record.get(KS.HAS_MENTION, URI.class)) {
1105 map.put(mentionID, Record.create(mentionID, KS.MENTION).add(KS.MENTION_OF, id));
1106 }
1107
1108 } else if (type.equals(KS.MENTION)) {
1109 final URI resourceID = record.getUnique(KS.MENTION_OF, URI.class);
1110 if (resourceID != null) {
1111 map.put(resourceID, Record.create(resourceID, KS.RESOURCE).add(KS.HAS_MENTION, id));
1112 }
1113
1114 } else {
1115
1116 throw new Error("Unexpected type: " + type);
1117 }
1118
1119 return map;
1120 }
1121
1122 private interface RecordUpdater {
1123
1124 @Nullable
1125 Record computeNewRecord(final URI id, @Nullable final Record oldRecord,
1126 @Nullable final Record suppliedRecord) throws Throwable;
1127
1128 }
1129
1130 public static Builder builder(final FileStore fileStore, final DataStore dataStore,
1131 final TripleStore tripleStore) {
1132 return new Builder(fileStore, dataStore, tripleStore);
1133 }
1134
1135 public static class Builder {
1136
1137 private final FileStore fileStore;
1138
1139 private final DataStore dataStore;
1140
1141 private final TripleStore tripleStore;
1142
1143 @Nullable
1144 private Integer chunkSize;
1145
1146 @Nullable
1147 private Integer bufferSize;
1148
1149 Builder(final FileStore fileStore, final DataStore dataStore, final TripleStore tripleStore) {
1150 this.fileStore = Preconditions.checkNotNull(fileStore);
1151 this.dataStore = Preconditions.checkNotNull(dataStore);
1152 this.tripleStore = Preconditions.checkNotNull(tripleStore);
1153 }
1154
1155 public Builder chunkSize(@Nullable final Integer chunkSize) {
1156 this.chunkSize = chunkSize;
1157 return this;
1158 }
1159
1160 public Builder bufferSize(@Nullable final Integer bufferSize) {
1161 this.bufferSize = bufferSize;
1162 return this;
1163 }
1164
1165 public Server build() {
1166 return new Server(this);
1167 }
1168
1169 }
1170
1171 }