1 package eu.fbk.knowledgestore;
2
3 import com.google.common.collect.Lists;
4 import eu.fbk.knowledgestore.Operation.*;
5 import eu.fbk.knowledgestore.Outcome.Status;
6 import eu.fbk.knowledgestore.data.*;
7 import eu.fbk.knowledgestore.vocabulary.NFO;
8 import eu.fbk.knowledgestore.vocabulary.NIE;
9 import org.openrdf.model.Statement;
10 import org.openrdf.model.URI;
11 import org.openrdf.query.BindingSet;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14 import org.slf4j.MDC;
15
16 import javax.annotation.Nullable;
17 import java.io.Closeable;
18 import java.util.*;
19 import java.util.concurrent.ScheduledFuture;
20 import java.util.concurrent.TimeUnit;
21 import java.util.concurrent.atomic.AtomicBoolean;
22 import java.util.concurrent.atomic.AtomicReference;
23
24 public abstract class AbstractSession implements Session {
25
26 private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSession.class);
27
28 private static final String MDC_ATTRIBUTE = "context";
29
30 private static final long GRACE_PERIOD = 5000;
31
32 private static final long CLOSE_WAIT_TIME = 1000;
33
34 private static long invocationCounter = 0;
35
36
37
38 private final Map<String, String> namespaces;
39
40 @Nullable
41 private final String username;
42
43 @Nullable
44 private final String password;
45
46 private final long creationTime;
47
48 private final WeakHashMap<Closeable, ?> pendingCloseables;
49
50 private final AtomicBoolean closed;
51
52
53
54 private long timestamp;
55
56 @Nullable
57 private String operation;
58
59 @Nullable
60 private URI recordType;
61
62 @Nullable
63 private Class<?> streamType;
64
65 @Nullable
66 private URI objectID;
67
68 @Nullable
69 private URI invocationID;
70
71 @Nullable
72 private String oldMDCAttribute;
73
74 private long numCreated;
75
76 private long numDeleted;
77
78 private long numModified;
79
80 private long numUnmodified;
81
82 private final List<Outcome> failedOutcomes;
83
84 @Nullable
85 private ScheduledFuture<?> interruptFuture;
86
87 @Nullable
88 private Thread interruptThread;
89
/**
 * Creates a new, open session.
 *
 * @param namespaces the namespace map of the session; when null, the map returned by
 *        {@code Data.newNamespaceMap()} is used instead
 * @param username the username associated to the session, possibly null
 * @param password the password associated to the session, possibly null
 */
protected AbstractSession(@Nullable final Map<String, String> namespaces,
        @Nullable final String username, @Nullable final String password) {
    Map<String, String> sessionNamespaces = namespaces;
    if (sessionNamespaces == null) {
        sessionNamespaces = Data.newNamespaceMap();
    }
    this.namespaces = sessionNamespaces;
    this.username = username;
    this.password = password;
    this.creationTime = System.currentTimeMillis();
    this.closed = new AtomicBoolean(false);
    this.pendingCloseables = new WeakHashMap<Closeable, Object>();
    this.failedOutcomes = Lists.newArrayList();
}
100
101 @Nullable
102 private void start(final String operation, @Nullable final URI recordType,
103 @Nullable final Class<?> streamType, @Nullable final URI objectID, @Nullable final Long timeout) {
104
105 this.timestamp = System.currentTimeMillis();
106 this.operation = operation;
107 this.recordType = recordType;
108 this.streamType = streamType;
109 this.objectID = objectID;
110 this.numCreated = 0L;
111 this.numDeleted = 0L;
112 this.numModified = 0L;
113 this.numUnmodified = 0L;
114 this.failedOutcomes.clear();
115 this.oldMDCAttribute = MDC.get(MDC_ATTRIBUTE);
116
117 URI invocationID = null;
118 if (this.oldMDCAttribute != null) {
119 try {
120 invocationID = Data.getValueFactory().createURI(this.oldMDCAttribute);
121 } catch (final Throwable ex) {
122
123 }
124 }
125 if (invocationID == null) {
126 final long ts = System.currentTimeMillis();
127 final long counter;
128 synchronized (AbstractSession.class) {
129 ++AbstractSession.invocationCounter;
130 if (AbstractSession.invocationCounter < ts) {
131 AbstractSession.invocationCounter = ts;
132 }
133 counter = AbstractSession.invocationCounter;
134 }
135 invocationID = Data.getValueFactory().createURI("req:" + Long.toString(counter, 32));
136 }
137 setInvocationID(invocationID);
138
139 if (timeout != null) {
140 final Thread thread = Thread.currentThread();
141 this.interruptFuture = Data.getExecutor().schedule(new Runnable() {
142
143 @Override
144 public void run() {
145 thread.interrupt();
146 }
147
148 }, timeout + GRACE_PERIOD, TimeUnit.MILLISECONDS);
149 }
150
151 synchronized (this.closed) {
152 this.interruptThread = Thread.currentThread();
153 }
154 }
155
156 private void end() throws OperationException {
157 try {
158 if (this.interruptFuture != null && !this.interruptFuture.isDone()) {
159 if (!this.interruptFuture.cancel(false)) {
160 try {
161 this.interruptFuture.get();
162 } catch (final Throwable ex) {
163
164 }
165 }
166 }
167 if (!this.failedOutcomes.isEmpty()) {
168 throw fail(null);
169 }
170 } finally {
171 MDC.put(MDC_ATTRIBUTE, this.oldMDCAttribute);
172 this.interruptFuture = null;
173 this.oldMDCAttribute = null;
174 synchronized (this.closed) {
175 this.interruptThread = null;
176 Thread.interrupted();
177 this.closed.notify();
178 }
179 }
180 }
181
182 private <T extends Closeable> T filter(@Nullable final T closeable) {
183 if (closeable != null) {
184 this.pendingCloseables.put(closeable, null);
185 }
186 return closeable;
187 }
188
189 @Nullable
190 private <E> Stream<E> filter(@Nullable final Stream<E> stream) {
191 if (stream == null) {
192 return null;
193 }
194 Stream<E> result = stream;
195 if (this.interruptFuture != null) {
196 result = stream.setTimeout(System.currentTimeMillis()
197 + this.interruptFuture.getDelay(TimeUnit.MILLISECONDS));
198 }
199 this.pendingCloseables.put(result, null);
200
201 return result;
202 }
203
204 private Handler<Outcome> filter(@Nullable final Handler<? super Outcome> handler) {
205 return new Handler<Outcome>() {
206
207 private boolean log = true;
208
209 @Override
210 public void handle(final Outcome outcome) throws Throwable {
211 if (outcome != null) {
212 final Status status = outcome.getStatus();
213 if (status.isError()) {
214 AbstractSession.this.failedOutcomes.add(outcome);
215 } else if (status == Status.OK_CREATED) {
216 ++AbstractSession.this.numCreated;
217 } else if (status == Status.OK_DELETED) {
218 ++AbstractSession.this.numDeleted;
219 } else if (status == Status.OK_MODIFIED) {
220 ++AbstractSession.this.numModified;
221 } else {
222 ++AbstractSession.this.numUnmodified;
223 }
224 }
225 if (handler != null) {
226 try {
227 handler.handle(outcome);
228 } catch (final Throwable ex) {
229 if (this.log) {
230 this.log = false;
231 LOGGER.error("Handler failure: iteration being interrupted, "
232 + "no additional exception from handler will be logged", ex);
233 }
234 Thread.currentThread().interrupt();
235 }
236 }
237 }
238
239 };
240 }
241
242 private OperationException fail(@Nullable final Throwable ex) {
243 if (!this.failedOutcomes.isEmpty()) {
244 final int size = this.failedOutcomes.size();
245 final List<Throwable> causes = Lists.newArrayListWithCapacity(size);
246 for (final Outcome failedOutcome : this.failedOutcomes) {
247 causes.add(new OperationException(failedOutcome));
248 }
249 this.failedOutcomes.clear();
250 if (ex != null) {
251 causes.add(ex);
252 }
253 return new OperationException(outcome(Status.ERROR_BULK, null), causes);
254 } else if (ex instanceof OperationException) {
255 return (OperationException) ex;
256 } else if (ex != null) {
257 final AtomicReference<String> message = new AtomicReference<String>(ex.getMessage());
258 Status status = Status.ERROR_UNEXPECTED;
259 try {
260 status = doFail(ex, message);
261 } catch (final Throwable ex2) {
262 LOGGER.warn("Could not map " + ex.getClass().getSimpleName()
263 + " to status / message", ex);
264 }
265 return new OperationException(outcome(status, message.get()), ex);
266 } else {
267 return new OperationException(outcome(Status.ERROR_UNEXPECTED, null));
268 }
269 }
270
271 private Outcome outcome(final Status status, @Nullable final String message) {
272 final long elapsed = System.currentTimeMillis() - this.timestamp;
273 final StringBuilder builder = new StringBuilder();
274 final long numObjects = this.numCreated + this.numDeleted + this.numModified
275 + this.numUnmodified;
276 if (numObjects > 0 || status == Status.OK_BULK || status == Status.ERROR_BULK) {
277 builder.append(numObjects).append(' ')
278 .append(this.recordType.getLocalName().toLowerCase()).append("(s) involved");
279 if (this.numCreated > 0) {
280 builder.append(", ").append(this.numCreated).append(" created");
281 }
282 if (this.numModified > 0) {
283 builder.append(", ").append(this.numModified).append(" modified");
284 }
285 if (this.numUnmodified > 0) {
286 builder.append(", ").append(this.numUnmodified).append(" unmodified");
287 }
288 if (this.numDeleted > 0) {
289 builder.append(", ").append(this.numDeleted).append(" deleted");
290 }
291 if (!this.failedOutcomes.isEmpty()) {
292 builder.append(", ").append(this.failedOutcomes.size()).append(" failed");
293 }
294 if (numObjects == 0) {
295 if (message != null) {
296 builder.append(", ");
297 }
298 }
299 }
300 if (message != null) {
301 builder.append(message);
302 }
303 if (builder.length() > 0) {
304 builder.append(", ");
305 }
306 builder.append(elapsed).append(" ms");
307 return Outcome.create(status, this.invocationID, this.objectID, builder.toString());
308 }
309
310 private void logRequest(final Object... args) {
311 if (LOGGER.isInfoEnabled()) {
312 final StringBuilder builder = new StringBuilder();
313 builder.append(this.operation.toUpperCase());
314 if (this.recordType != null) {
315 builder.append(' ').append(this.recordType.getLocalName().toUpperCase());
316 }
317 String separator = " ";
318 for (int i = 0; i < args.length; i += 2) {
319 final Object name = args[i];
320 final Object value = args[i + 1];
321 if (value != null) {
322 builder.append(separator);
323 if (name != null) {
324 builder.append(name).append('=');
325 }
326 if (value instanceof Collection<?> && ((Collection<?>) value).size() > 10) {
327 builder.append("[...").append(((Collection<?>) value).size())
328 .append(" elements...]");
329 } else {
330 builder.append(value);
331 }
332 separator = ", ";
333 }
334 }
335 LOGGER.info(builder.toString());
336 }
337 }
338
339 private <T> T logResponse(final T result) {
340 if (LOGGER.isInfoEnabled()) {
341 final long elapsed = System.currentTimeMillis() - this.timestamp;
342 if (result instanceof Outcome) {
343 final Outcome outcome = (Outcome) result;
344 final String message = outcome.getMessage();
345 LOGGER.info("Result: {}{}, {} ms", outcome.getStatus(), message == null ? ""
346 : ", " + message, elapsed);
347 } else if (result instanceof Long) {
348 LOGGER.info("Result: {}, {} ms", result, elapsed);
349 } else if (result instanceof Representation) {
350 final Record meta = ((Representation) result).getMetadata();
351 final String file = meta.getUnique(NFO.FILE_NAME, String.class, "unnamed file");
352 final String type = meta.getUnique(NIE.MIME_TYPE, String.class, "unknown type");
353 final long size = meta.getUnique(NFO.FILE_SIZE, Long.class, -1L);
354 LOGGER.info("Result: {}, {}, {}, {} ms", file, type, size >= 0 ? size + " bytes"
355 : "unknown size", elapsed);
356 } else if (result instanceof Stream) {
357 LOGGER.info("Result: {} stream, {} ms",
358 this.streamType == BindingSet.class ? "tuple" : this.streamType
359 .getSimpleName().toLowerCase(), elapsed);
360 } else if (result == null) {
361 LOGGER.info("Result: null, {} ms", elapsed);
362 }
363 }
364 return result;
365 }
366
367 @Override
368 @Nullable
369 public final String getUsername() throws IllegalStateException {
370 checkNotClosed();
371 return this.username;
372 }
373
374 @Override
375 public final String getPassword() throws IllegalStateException {
376 checkNotClosed();
377 return this.password;
378 }
379
380 @Override
381 public final Map<String, String> getNamespaces() throws IllegalStateException {
382 checkNotClosed();
383 return this.namespaces;
384 }
385
386 @Override
387 public final Download download(final URI resourceID) throws IllegalStateException {
388 checkNotClosed();
389 return new Download(this.namespaces, resourceID) {
390
391 @Override
392 @Nullable
393 protected Representation doExec(@Nullable final Long timeout, final URI id,
394 @Nullable final Set<String> mimeTypes, final boolean caching)
395 throws OperationException {
396
397 synchronized (AbstractSession.this) {
398 checkNotClosed();
399 start("DOWNLOAD", null, null, id, timeout);
400 try {
401 logRequest(null, id, "accept", mimeTypes,
402 null, caching ? null : "no caching", "timeout", timeout);
403 return logResponse(filter(doDownload(timeout, id, mimeTypes, caching)));
404 } catch (final Throwable ex) {
405 throw fail(ex);
406 } finally {
407 end();
408 }
409 }
410
411 }
412
413 };
414 }
415
416 @Override
417 public final Upload upload(final URI resourceID) throws IllegalStateException {
418 checkNotClosed();
419 return new Upload(this.namespaces, resourceID) {
420
421 @Override
422 protected Outcome doExec(@Nullable final Long timeout, final URI id,
423 @Nullable final Representation representation) throws OperationException {
424 synchronized (AbstractSession.this) {
425 checkNotClosed();
426 start("UPLOAD", null, null, id, timeout);
427 try {
428 logRequest(null, id, "content", representation != null, "timeout", timeout);
429 return logResponse(doUpload(timeout, resourceID, representation));
430 } catch (final Throwable ex) {
431 throw fail(ex);
432 } finally {
433 end();
434 }
435 }
436 }
437
438 };
439 }
440
441 @Override
442 public final Count count(final URI type) throws IllegalStateException {
443 checkNotClosed();
444 return new Count(this.namespaces, type) {
445
446 @Override
447 protected long doExec(@Nullable final Long timeout, final URI type,
448 @Nullable final XPath condition, @Nullable final Set<URI> ids)
449 throws OperationException {
450 synchronized (AbstractSession.this) {
451 checkNotClosed();
452 start("COUNT", type, null, null, timeout);
453 try {
454 logRequest(null, condition, "ids", ids, "timeout", timeout);
455 return logResponse(doCount(timeout, type, condition, ids));
456 } catch (final Throwable ex) {
457 throw fail(ex);
458 } finally {
459 end();
460 }
461 }
462 }
463
464 };
465 }
466
467 @Override
468 public final Retrieve retrieve(final URI type) throws IllegalStateException {
469 checkNotClosed();
470 return new Retrieve(this.namespaces, type) {
471
472 @Override
473 protected Stream<Record> doExec(@Nullable final Long timeout, final URI type,
474 @Nullable final XPath condition, @Nullable final Set<URI> ids,
475 @Nullable final Set<URI> properties, @Nullable final Long offset,
476 @Nullable final Long limit) throws OperationException {
477 synchronized (AbstractSession.this) {
478 checkNotClosed();
479 start("RETRIEVE", type, Record.class, null, timeout);
480 try {
481 logRequest(null, condition, "ids", ids, "props", properties,
482 "offset", offset, "limit", limit, "timeout", timeout);
483 return logResponse(doRetrieve(timeout, type, condition, ids, properties,
484 offset, limit));
485 } catch (final Throwable ex) {
486 throw fail(ex);
487 } finally {
488 end();
489 }
490 }
491 }
492
493 };
494 }
495
496 @Override
497 public final Create create(final URI type) throws IllegalStateException {
498 checkNotClosed();
499 return new Create(this.namespaces, type) {
500
501 @Override
502 protected Outcome doExec(@Nullable final Long timeout, final URI type,
503 @Nullable final Stream<? extends Record> records,
504 final Handler<? super Outcome> handler) throws OperationException {
505 synchronized (AbstractSession.this) {
506 checkNotClosed();
507 start("CREATE", type, null, null, timeout);
508 try {
509 logRequest("timeout", timeout);
510 doCreate(timeout, type, records, filter(handler));
511 return logResponse(outcome(Status.OK_BULK, null));
512 } catch (final Throwable ex) {
513 throw fail(ex);
514 } finally {
515 end();
516 }
517 }
518 }
519
520 };
521 }
522
523 @Override
524 public final Merge merge(final URI type) throws IllegalStateException {
525 checkNotClosed();
526 return new Merge(this.namespaces, type) {
527
528 @Override
529 protected Outcome doExec(@Nullable final Long timeout, final URI type,
530 @Nullable final Stream<? extends Record> records,
531 @Nullable final Criteria criteria, final Handler<? super Outcome> handler)
532 throws OperationException {
533 synchronized (AbstractSession.this) {
534 checkNotClosed();
535 start("MERGE", type, null, null, timeout);
536 try {
537 logRequest(null, criteria, "timeout", timeout);
538 doMerge(timeout, type, records, criteria, filter(handler));
539 return logResponse(outcome(Status.OK_BULK, null));
540 } catch (final Throwable ex) {
541 throw fail(ex);
542 } finally {
543 end();
544 }
545 }
546 }
547
548 };
549 }
550
551 @Override
552 public final Update update(final URI type) throws IllegalStateException {
553 checkNotClosed();
554 return new Update(this.namespaces, type) {
555
556 @Override
557 protected Outcome doExec(@Nullable final Long timeout, final URI type,
558 @Nullable final XPath condition, @Nullable final Set<URI> ids,
559 @Nullable final Record record, @Nullable final Criteria criteria,
560 final Handler<? super Outcome> handler) throws OperationException {
561 synchronized (AbstractSession.this) {
562 checkNotClosed();
563 start("UPDATE", type, null, null, timeout);
564 try {
565 logRequest(null, criteria, null, condition, "ids", ids, "timeout", timeout);
566 doUpdate(timeout, type, condition, ids, record, criteria, filter(handler));
567 return logResponse(outcome(Status.OK_BULK, null));
568 } catch (final Throwable ex) {
569 throw fail(ex);
570 } finally {
571 end();
572 }
573 }
574 }
575
576 };
577 }
578
579 @Override
580 public final Delete delete(final URI type) throws IllegalStateException {
581 checkNotClosed();
582 return new Delete(this.namespaces, type) {
583
584 @Override
585 protected Outcome doExec(@Nullable final Long timeout, final URI type,
586 @Nullable final XPath condition, @Nullable final Set<URI> ids,
587 final Handler<? super Outcome> handler) throws OperationException {
588 synchronized (AbstractSession.this) {
589 checkNotClosed();
590 start("DELETE", type, null, null, timeout);
591 try {
592 logRequest(null, condition, "ids", ids, "timeout", timeout);
593 doDelete(timeout, type, condition, ids, filter(handler));
594 return logResponse(outcome(Status.OK_BULK, null));
595 } catch (final Throwable ex) {
596 throw fail(ex);
597 } finally {
598 end();
599 }
600 }
601 }
602
603 };
604 }
605
606 @Override
607 public final Match match() throws IllegalStateException {
608 checkNotClosed();
609 return new Match(this.namespaces) {
610
611 @Override
612 protected Stream<Record> doExec(@Nullable final Long timeout,
613 final Map<URI, XPath> conditions, final Map<URI, Set<URI>> ids,
614 final Map<URI, Set<URI>> properties) throws OperationException {
615 synchronized (AbstractSession.this) {
616 checkNotClosed();
617 start("MATCH", null, Record.class, null, timeout);
618 try {
619 logRequest(null, conditions, "ids", ids, "props",
620 properties, "timeout", timeout);
621 return logResponse(filter(doMatch(timeout, conditions, ids, properties)));
622 } catch (final Throwable ex) {
623 throw fail(ex);
624 } finally {
625 end();
626 }
627 }
628 }
629
630 };
631 }
632
633 @Override
634 public final Sparql sparql(final String expression, final Object... arguments)
635 throws IllegalStateException {
636 checkNotClosed();
637 return new Sparql(this.namespaces, expression, arguments) {
638
639 @Override
640 protected <T> Stream<T> doExec(@Nullable final Long timeout, final Class<T> type,
641 final String expression, @Nullable final Set<URI> defaultGraphs,
642 @Nullable final Set<URI> namedGraphs) throws OperationException {
643 synchronized (AbstractSession.this) {
644 checkNotClosed();
645 start("SPARQL", null, type, null, timeout);
646 try {
647 logRequest("from", defaultGraphs, "from-named", namedGraphs,
648 "timeout", timeout, null, expression);
649 return logResponse(filter(doSparql(timeout, type, expression,
650 defaultGraphs, namedGraphs)));
651 } catch (final Throwable ex) {
652 throw fail(ex);
653 } finally {
654 end();
655 }
656 }
657 }
658
659 };
660 }
661
662 @Override
663 public final SparqlUpdate sparqlupdate()
664 throws IllegalStateException {
665 checkNotClosed();
666 return new SparqlUpdate(this.namespaces) {
667 @Override
668 protected Outcome doExec(@Nullable Long timeout, @Nullable Stream<? extends Statement> statements) throws OperationException {
669 synchronized (AbstractSession.this) {
670 checkNotClosed();
671 start("SPARQLUPDATE", null, null, null, timeout);
672 try {
673 logRequest("timeout", timeout);
674 return logResponse(doSparqlUpdate(timeout, statements));
675 } catch (final Throwable ex) {
676 throw fail(ex);
677 } finally {
678 end();
679 }
680 }
681 }
682 };
683 }
684
685 @Override
686 public final SparqlDelete sparqldelete()
687 throws IllegalStateException {
688 checkNotClosed();
689 return new SparqlDelete(this.namespaces) {
690 @Override
691 protected Outcome doExec(@Nullable Long timeout, @Nullable Stream<? extends Statement> statements) throws OperationException {
692 synchronized (AbstractSession.this) {
693 checkNotClosed();
694 start("SPARQLDELETE", null, null, null, timeout);
695 try {
696 logRequest("timeout", timeout);
697 return logResponse(doSparqlDelete(timeout, statements));
698 } catch (final Throwable ex) {
699 throw fail(ex);
700 } finally {
701 end();
702 }
703 }
704 }
705 };
706 }
707
708 @Override
709 public final boolean isClosed() {
710 return this.closed.get();
711 }
712
713 @Override
714 public final void close() {
715 if (this.closed.compareAndSet(false, true)) {
716 synchronized (this.closed) {
717 if (this.interruptThread != null) {
718 this.interruptThread.interrupt();
719 try {
720 this.closed.wait(CLOSE_WAIT_TIME);
721
722 } catch (final InterruptedException ex) {
723
724 }
725 }
726 }
727 for (final Closeable closeable : this.pendingCloseables.keySet()) {
728 try {
729 closeable.close();
730 } catch (final Throwable ex) {
731 LOGGER.error("Error closing " + closeable.getClass().getSimpleName()
732 + " after session has been closed", ex);
733 }
734 }
735 doClose();
736 }
737 }
738
739 @Override
740 public String toString() {
741 return getClass().getSimpleName() + "[" + (isClosed() ? "closed" : "open") + ", "
742 + this.username + ", " + new Date(this.creationTime).toString() + ")";
743 }
744
745 protected final void checkNotClosed() {
746 if (this.closed.get()) {
747 throw new IllegalStateException("Session has been closed");
748 }
749 }
750
751 protected final URI getInvocationID() {
752 return this.invocationID;
753 }
754
755 protected final void setInvocationID(final URI invocationID) {
756 this.invocationID = invocationID;
757 MDC.put(MDC_ATTRIBUTE, invocationID == null ? null : invocationID.stringValue());
758 }
759
760 protected Status doFail(final Throwable ex, final AtomicReference<String> message)
761 throws Throwable {
762 return Status.ERROR_UNEXPECTED;
763 }
764
765 protected abstract Representation doDownload(@Nullable Long timeout, final URI id,
766 @Nullable final Set<String> mimeTypes, final boolean useCaches) throws Throwable;
767
768 protected abstract Outcome doUpload(@Nullable Long timeout, final URI resourceID,
769 @Nullable final Representation representation) throws Throwable;
770
771 protected abstract long doCount(@Nullable Long timeout, final URI type,
772 @Nullable final XPath condition, @Nullable final Set<URI> ids) throws Throwable;
773
774 protected abstract Stream<Record> doRetrieve(@Nullable Long timeout, final URI type,
775 @Nullable final XPath condition, @Nullable final Set<URI> ids,
776 @Nullable final Set<URI> properties, @Nullable Long offset, @Nullable Long limit)
777 throws Throwable;
778
779 protected abstract void doCreate(@Nullable Long timeout, final URI type,
780 @Nullable final Stream<? extends Record> records,
781 final Handler<? super Outcome> handler) throws Throwable;
782
783 protected abstract void doMerge(@Nullable Long timeout, final URI type,
784 @Nullable final Stream<? extends Record> records, @Nullable final Criteria criteria,
785 final Handler<? super Outcome> handler) throws Throwable;
786
787 protected abstract void doUpdate(@Nullable Long timeout, final URI type,
788 @Nullable final XPath condition, @Nullable final Set<URI> ids,
789 @Nullable final Record record, @Nullable final Criteria criteria,
790 final Handler<? super Outcome> handler) throws Throwable;
791
792 protected abstract void doDelete(@Nullable Long timeout, final URI type,
793 @Nullable final XPath condition, @Nullable final Set<URI> ids,
794 final Handler<? super Outcome> handler) throws Throwable;
795
796 protected abstract Stream<Record> doMatch(@Nullable Long timeout,
797 final Map<URI, XPath> conditions, final Map<URI, Set<URI>> ids,
798 final Map<URI, Set<URI>> properties) throws Throwable;
799
800 protected abstract <T> Stream<T> doSparql(@Nullable Long timeout, final Class<T> type,
801 final String expression, @Nullable final Set<URI> defaultGraphs,
802 @Nullable final Set<URI> namedGraphs) throws Throwable;
803
804 protected abstract Outcome doSparqlUpdate(@Nullable Long timeout,
805 @Nullable final Stream<? extends Statement> statements) throws Throwable;
806
807 protected abstract Outcome doSparqlDelete(@Nullable Long timeout,
808 @Nullable final Stream<? extends Statement> statements) throws Throwable;
809
810 protected void doClose() {
811 }
812
813 }