1   package eu.fbk.knowledgestore;
2   
3   import com.google.common.collect.Lists;
4   import eu.fbk.knowledgestore.Operation.*;
5   import eu.fbk.knowledgestore.Outcome.Status;
6   import eu.fbk.knowledgestore.data.*;
7   import eu.fbk.knowledgestore.vocabulary.NFO;
8   import eu.fbk.knowledgestore.vocabulary.NIE;
9   import org.openrdf.model.Statement;
10  import org.openrdf.model.URI;
11  import org.openrdf.query.BindingSet;
12  import org.slf4j.Logger;
13  import org.slf4j.LoggerFactory;
14  import org.slf4j.MDC;
15  
16  import javax.annotation.Nullable;
17  import java.io.Closeable;
18  import java.util.*;
19  import java.util.concurrent.ScheduledFuture;
20  import java.util.concurrent.TimeUnit;
21  import java.util.concurrent.atomic.AtomicBoolean;
22  import java.util.concurrent.atomic.AtomicReference;
23  
24  public abstract class AbstractSession implements Session {
25  
26      private static final Logger LOGGER = LoggerFactory.getLogger(AbstractSession.class);
27  
28      private static final String MDC_ATTRIBUTE = "context";
29  
30      private static final long GRACE_PERIOD = 5000; // 5 sec in addition to the timeout set
31  
32      private static final long CLOSE_WAIT_TIME = 1000;
33  
34      private static long invocationCounter = 0;
35  
36      // Session-scoped variables
37  
38      private final Map<String, String> namespaces;
39  
40      @Nullable
41      private final String username;
42  
43      @Nullable
44      private final String password;
45  
46      private final long creationTime;
47  
48      private final WeakHashMap<Closeable, ?> pendingCloseables;
49  
50      private final AtomicBoolean closed;
51  
52      // Request-scoped variables
53  
54      private long timestamp;
55  
56      @Nullable
57      private String operation;
58  
59      @Nullable
60      private URI recordType;
61  
62      @Nullable
63      private Class<?> streamType;
64  
65      @Nullable
66      private URI objectID;
67  
68      @Nullable
69      private URI invocationID;
70  
71      @Nullable
72      private String oldMDCAttribute;
73  
74      private long numCreated;
75  
76      private long numDeleted;
77  
78      private long numModified;
79  
80      private long numUnmodified;
81  
82      private final List<Outcome> failedOutcomes;
83  
84      @Nullable
85      private ScheduledFuture<?> interruptFuture;
86  
87      @Nullable
88      private Thread interruptThread;
89  
90      protected AbstractSession(@Nullable final Map<String, String> namespaces,
91              @Nullable final String username, @Nullable final String password) {
92          this.namespaces = namespaces != null ? namespaces : Data.newNamespaceMap();
93          this.username = username;
94          this.creationTime = System.currentTimeMillis();
95          this.password = password;
96          this.pendingCloseables = new WeakHashMap<Closeable, Object>();
97          this.closed = new AtomicBoolean(false);
98          this.failedOutcomes = Lists.newArrayList();
99      }
100 
101     @Nullable
102     private void start(final String operation, @Nullable final URI recordType,
103             @Nullable final Class<?> streamType, @Nullable final URI objectID, @Nullable final Long timeout) {
104 
105         this.timestamp = System.currentTimeMillis();
106         this.operation = operation;
107         this.recordType = recordType;
108         this.streamType = streamType;
109         this.objectID = objectID;
110         this.numCreated = 0L;
111         this.numDeleted = 0L;
112         this.numModified = 0L;
113         this.numUnmodified = 0L;
114         this.failedOutcomes.clear();
115         this.oldMDCAttribute = MDC.get(MDC_ATTRIBUTE);
116 
117         URI invocationID = null;
118         if (this.oldMDCAttribute != null) {
119             try {
120                 invocationID = Data.getValueFactory().createURI(this.oldMDCAttribute);
121             } catch (final Throwable ex) {
122                 // not valid
123             }
124         }
125         if (invocationID == null) {
126             final long ts = System.currentTimeMillis();
127             final long counter;
128             synchronized (AbstractSession.class) {
129                 ++AbstractSession.invocationCounter;
130                 if (AbstractSession.invocationCounter < ts) {
131                     AbstractSession.invocationCounter = ts;
132                 }
133                 counter = AbstractSession.invocationCounter;
134             }
135             invocationID = Data.getValueFactory().createURI("req:" + Long.toString(counter, 32));
136         }
137         setInvocationID(invocationID);
138 
139         if (timeout != null) {
140             final Thread thread = Thread.currentThread();
141             this.interruptFuture = Data.getExecutor().schedule(new Runnable() {
142 
143                 @Override
144                 public void run() {
145                     thread.interrupt();
146                 }
147 
148             }, timeout + GRACE_PERIOD, TimeUnit.MILLISECONDS);
149         }
150 
151         synchronized (this.closed) {
152             this.interruptThread = Thread.currentThread();
153         }
154     }
155 
156     private void end() throws OperationException {
157         try {
158             if (this.interruptFuture != null && !this.interruptFuture.isDone()) {
159                 if (!this.interruptFuture.cancel(false)) {
160                     try {
161                         this.interruptFuture.get(); // being interrupted; await interruption
162                     } catch (final Throwable ex) {
163                         // ignore
164                     }
165                 }
166             }
167             if (!this.failedOutcomes.isEmpty()) {
168                 throw fail(null); // thrown only if operation completed with some failed outcomes
169             }
170         } finally {
171             MDC.put(MDC_ATTRIBUTE, this.oldMDCAttribute);
172             this.interruptFuture = null;
173             this.oldMDCAttribute = null;
174             synchronized (this.closed) {
175                 this.interruptThread = null;
176                 Thread.interrupted(); // cancel interrupted status
177                 this.closed.notify(); // notify thread blocked on close(), if any
178             }
179         }
180     }
181 
182     private <T extends Closeable> T filter(@Nullable final T closeable) {
183         if (closeable != null) {
184             this.pendingCloseables.put(closeable, null);
185         }
186         return closeable;
187     }
188 
189     @Nullable
190     private <E> Stream<E> filter(@Nullable final Stream<E> stream) {
191         if (stream == null) {
192             return null;
193         }
194         Stream<E> result = stream;
195         if (this.interruptFuture != null) {
196             result = stream.setTimeout(System.currentTimeMillis()
197                     + this.interruptFuture.getDelay(TimeUnit.MILLISECONDS));
198         }
199         this.pendingCloseables.put(result, null);
200 
201         return result;
202     }
203 
204     private Handler<Outcome> filter(@Nullable final Handler<? super Outcome> handler) {
205         return new Handler<Outcome>() {
206 
207             private boolean log = true;
208 
209             @Override
210             public void handle(final Outcome outcome) throws Throwable {
211                 if (outcome != null) {
212                     final Status status = outcome.getStatus();
213                     if (status.isError()) {
214                         AbstractSession.this.failedOutcomes.add(outcome);
215                     } else if (status == Status.OK_CREATED) {
216                         ++AbstractSession.this.numCreated;
217                     } else if (status == Status.OK_DELETED) {
218                         ++AbstractSession.this.numDeleted;
219                     } else if (status == Status.OK_MODIFIED) {
220                         ++AbstractSession.this.numModified;
221                     } else {
222                         ++AbstractSession.this.numUnmodified;
223                     }
224                 }
225                 if (handler != null) {
226                     try {
227                         handler.handle(outcome);
228                     } catch (final Throwable ex) {
229                         if (this.log) {
230                             this.log = false;
231                             LOGGER.error("Handler failure: iteration being interrupted, "
232                                     + "no additional exception from handler will be logged", ex);
233                         }
234                         Thread.currentThread().interrupt();
235                     }
236                 }
237             }
238 
239         };
240     }
241 
242     private OperationException fail(@Nullable final Throwable ex) {
243         if (!this.failedOutcomes.isEmpty()) {
244             final int size = this.failedOutcomes.size();
245             final List<Throwable> causes = Lists.newArrayListWithCapacity(size);
246             for (final Outcome failedOutcome : this.failedOutcomes) {
247                 causes.add(new OperationException(failedOutcome));
248             }
249             this.failedOutcomes.clear();
250             if (ex != null) {
251                 causes.add(ex);
252             }
253             return new OperationException(outcome(Status.ERROR_BULK, null), causes);
254         } else if (ex instanceof OperationException) {
255             return (OperationException) ex;
256         } else if (ex != null) {
257             final AtomicReference<String> message = new AtomicReference<String>(ex.getMessage());
258             Status status = Status.ERROR_UNEXPECTED;
259             try {
260                 status = doFail(ex, message);
261             } catch (final Throwable ex2) {
262                 LOGGER.warn("Could not map " + ex.getClass().getSimpleName()
263                         + " to status / message", ex);
264             }
265             return new OperationException(outcome(status, message.get()), ex);
266         } else {
267             return new OperationException(outcome(Status.ERROR_UNEXPECTED, null));
268         }
269     }
270 
271     private Outcome outcome(final Status status, @Nullable final String message) {
272         final long elapsed = System.currentTimeMillis() - this.timestamp;
273         final StringBuilder builder = new StringBuilder();
274         final long numObjects = this.numCreated + this.numDeleted + this.numModified
275                 + this.numUnmodified;
276         if (numObjects > 0 || status == Status.OK_BULK || status == Status.ERROR_BULK) {
277             builder.append(numObjects).append(' ')
278                     .append(this.recordType.getLocalName().toLowerCase()).append("(s) involved");
279             if (this.numCreated > 0) {
280                 builder.append(", ").append(this.numCreated).append(" created");
281             }
282             if (this.numModified > 0) {
283                 builder.append(", ").append(this.numModified).append(" modified");
284             }
285             if (this.numUnmodified > 0) {
286                 builder.append(", ").append(this.numUnmodified).append(" unmodified");
287             }
288             if (this.numDeleted > 0) {
289                 builder.append(", ").append(this.numDeleted).append(" deleted");
290             }
291             if (!this.failedOutcomes.isEmpty()) {
292                 builder.append(", ").append(this.failedOutcomes.size()).append(" failed");
293             }
294             if (numObjects == 0) {
295                 if (message != null) {
296                     builder.append(", ");
297                 }
298             }
299         }
300         if (message != null) {
301             builder.append(message);
302         }
303         if (builder.length() > 0) {
304             builder.append(", ");
305         }
306         builder.append(elapsed).append(" ms");
307         return Outcome.create(status, this.invocationID, this.objectID, builder.toString());
308     }
309 
310     private void logRequest(final Object... args) {
311         if (LOGGER.isInfoEnabled()) {
312             final StringBuilder builder = new StringBuilder();
313             builder.append(this.operation.toUpperCase());
314             if (this.recordType != null) {
315                 builder.append(' ').append(this.recordType.getLocalName().toUpperCase());
316             }
317             String separator = " ";
318             for (int i = 0; i < args.length; i += 2) {
319                 final Object name = args[i];
320                 final Object value = args[i + 1];
321                 if (value != null) {
322                     builder.append(separator);
323                     if (name != null) {
324                         builder.append(name).append('=');
325                     }
326                     if (value instanceof Collection<?> && ((Collection<?>) value).size() > 10) {
327                         builder.append("[...").append(((Collection<?>) value).size())
328                                 .append(" elements...]");
329                     } else {
330                         builder.append(value);
331                     }
332                     separator = ", ";
333                 }
334             }
335             LOGGER.info(builder.toString());
336         }
337     }
338 
339     private <T> T logResponse(final T result) {
340         if (LOGGER.isInfoEnabled()) {
341             final long elapsed = System.currentTimeMillis() - this.timestamp;
342             if (result instanceof Outcome) {
343                 final Outcome outcome = (Outcome) result;
344                 final String message = outcome.getMessage();
345                 LOGGER.info("Result: {}{}, {} ms", outcome.getStatus(), message == null ? ""
346                         : ", " + message, elapsed);
347             } else if (result instanceof Long) {
348                 LOGGER.info("Result: {}, {} ms", result, elapsed);
349             } else if (result instanceof Representation) {
350                 final Record meta = ((Representation) result).getMetadata();
351                 final String file = meta.getUnique(NFO.FILE_NAME, String.class, "unnamed file");
352                 final String type = meta.getUnique(NIE.MIME_TYPE, String.class, "unknown type");
353                 final long size = meta.getUnique(NFO.FILE_SIZE, Long.class, -1L);
354                 LOGGER.info("Result: {}, {}, {}, {} ms", file, type, size >= 0 ? size + " bytes"
355                         : "unknown size", elapsed);
356             } else if (result instanceof Stream) {
357                 LOGGER.info("Result: {} stream, {} ms",
358                         this.streamType == BindingSet.class ? "tuple" : this.streamType
359                                 .getSimpleName().toLowerCase(), elapsed);
360             } else if (result == null) {
361                 LOGGER.info("Result: null, {} ms", elapsed);
362             }
363         }
364         return result;
365     }
366 
367     @Override
368     @Nullable
369     public final String getUsername() throws IllegalStateException {
370         checkNotClosed();
371         return this.username;
372     }
373 
374     @Override
375     public final String getPassword() throws IllegalStateException {
376         checkNotClosed();
377         return this.password;
378     }
379 
380     @Override
381     public final Map<String, String> getNamespaces() throws IllegalStateException {
382         checkNotClosed();
383         return this.namespaces;
384     }
385 
386     @Override
387     public final Download download(final URI resourceID) throws IllegalStateException {
388         checkNotClosed();
389         return new Download(this.namespaces, resourceID) {
390 
391             @Override
392             @Nullable
393             protected Representation doExec(@Nullable final Long timeout, final URI id,
394                     @Nullable final Set<String> mimeTypes, final boolean caching)
395                     throws OperationException {
396 
397                 synchronized (AbstractSession.this) {
398                     checkNotClosed();
399                     start("DOWNLOAD", null, null, id, timeout);
400                     try {
401                         logRequest(null, id, "accept", mimeTypes, //
402                                 null, caching ? null : "no caching", "timeout", timeout);
403                         return logResponse(filter(doDownload(timeout, id, mimeTypes, caching)));
404                     } catch (final Throwable ex) {
405                         throw fail(ex);
406                     } finally {
407                         end();
408                     }
409                 }
410 
411             }
412 
413         };
414     }
415 
416     @Override
417     public final Upload upload(final URI resourceID) throws IllegalStateException {
418         checkNotClosed();
419         return new Upload(this.namespaces, resourceID) {
420 
421             @Override
422             protected Outcome doExec(@Nullable final Long timeout, final URI id,
423                     @Nullable final Representation representation) throws OperationException {
424                 synchronized (AbstractSession.this) {
425                     checkNotClosed();
426                     start("UPLOAD", null, null, id, timeout);
427                     try {
428                         logRequest(null, id, "content", representation != null, "timeout", timeout);
429                         return logResponse(doUpload(timeout, resourceID, representation));
430                     } catch (final Throwable ex) {
431                         throw fail(ex);
432                     } finally {
433                         end();
434                     }
435                 }
436             }
437 
438         };
439     }
440 
441     @Override
442     public final Count count(final URI type) throws IllegalStateException {
443         checkNotClosed();
444         return new Count(this.namespaces, type) {
445 
446             @Override
447             protected long doExec(@Nullable final Long timeout, final URI type,
448                     @Nullable final XPath condition, @Nullable final Set<URI> ids)
449                     throws OperationException {
450                 synchronized (AbstractSession.this) {
451                     checkNotClosed();
452                     start("COUNT", type, null, null, timeout);
453                     try {
454                         logRequest(null, condition, "ids", ids, "timeout", timeout);
455                         return logResponse(doCount(timeout, type, condition, ids));
456                     } catch (final Throwable ex) {
457                         throw fail(ex);
458                     } finally {
459                         end();
460                     }
461                 }
462             }
463 
464         };
465     }
466 
467     @Override
468     public final Retrieve retrieve(final URI type) throws IllegalStateException {
469         checkNotClosed();
470         return new Retrieve(this.namespaces, type) {
471 
472             @Override
473             protected Stream<Record> doExec(@Nullable final Long timeout, final URI type,
474                     @Nullable final XPath condition, @Nullable final Set<URI> ids,
475                     @Nullable final Set<URI> properties, @Nullable final Long offset,
476                     @Nullable final Long limit) throws OperationException {
477                 synchronized (AbstractSession.this) {
478                     checkNotClosed();
479                     start("RETRIEVE", type, Record.class, null, timeout);
480                     try {
481                         logRequest(null, condition, "ids", ids, "props", properties, //
482                                 "offset", offset, "limit", limit, "timeout", timeout);
483                         return logResponse(doRetrieve(timeout, type, condition, ids, properties,
484                                 offset, limit));
485                     } catch (final Throwable ex) {
486                         throw fail(ex);
487                     } finally {
488                         end();
489                     }
490                 }
491             }
492 
493         };
494     }
495 
496     @Override
497     public final Create create(final URI type) throws IllegalStateException {
498         checkNotClosed();
499         return new Create(this.namespaces, type) {
500 
501             @Override
502             protected Outcome doExec(@Nullable final Long timeout, final URI type,
503                     @Nullable final Stream<? extends Record> records,
504                     final Handler<? super Outcome> handler) throws OperationException {
505                 synchronized (AbstractSession.this) {
506                     checkNotClosed();
507                     start("CREATE", type, null, null, timeout);
508                     try {
509                         logRequest("timeout", timeout);
510                         doCreate(timeout, type, records, filter(handler));
511                         return logResponse(outcome(Status.OK_BULK, null));
512                     } catch (final Throwable ex) {
513                         throw fail(ex);
514                     } finally {
515                         end();
516                     }
517                 }
518             }
519 
520         };
521     }
522 
523     @Override
524     public final Merge merge(final URI type) throws IllegalStateException {
525         checkNotClosed();
526         return new Merge(this.namespaces, type) {
527 
528             @Override
529             protected Outcome doExec(@Nullable final Long timeout, final URI type,
530                     @Nullable final Stream<? extends Record> records,
531                     @Nullable final Criteria criteria, final Handler<? super Outcome> handler)
532                     throws OperationException {
533                 synchronized (AbstractSession.this) {
534                     checkNotClosed();
535                     start("MERGE", type, null, null, timeout);
536                     try {
537                         logRequest(null, criteria, "timeout", timeout);
538                         doMerge(timeout, type, records, criteria, filter(handler));
539                         return logResponse(outcome(Status.OK_BULK, null));
540                     } catch (final Throwable ex) {
541                         throw fail(ex);
542                     } finally {
543                         end();
544                     }
545                 }
546             }
547 
548         };
549     }
550 
551     @Override
552     public final Update update(final URI type) throws IllegalStateException {
553         checkNotClosed();
554         return new Update(this.namespaces, type) {
555 
556             @Override
557             protected Outcome doExec(@Nullable final Long timeout, final URI type,
558                     @Nullable final XPath condition, @Nullable final Set<URI> ids,
559                     @Nullable final Record record, @Nullable final Criteria criteria,
560                     final Handler<? super Outcome> handler) throws OperationException {
561                 synchronized (AbstractSession.this) {
562                     checkNotClosed();
563                     start("UPDATE", type, null, null, timeout);
564                     try {
565                         logRequest(null, criteria, null, condition, "ids", ids, "timeout", timeout);
566                         doUpdate(timeout, type, condition, ids, record, criteria, filter(handler));
567                         return logResponse(outcome(Status.OK_BULK, null));
568                     } catch (final Throwable ex) {
569                         throw fail(ex);
570                     } finally {
571                         end();
572                     }
573                 }
574             }
575 
576         };
577     }
578 
579     @Override
580     public final Delete delete(final URI type) throws IllegalStateException {
581         checkNotClosed();
582         return new Delete(this.namespaces, type) {
583 
584             @Override
585             protected Outcome doExec(@Nullable final Long timeout, final URI type,
586                     @Nullable final XPath condition, @Nullable final Set<URI> ids,
587                     final Handler<? super Outcome> handler) throws OperationException {
588                 synchronized (AbstractSession.this) {
589                     checkNotClosed();
590                     start("DELETE", type, null, null, timeout);
591                     try {
592                         logRequest(null, condition, "ids", ids, "timeout", timeout);
593                         doDelete(timeout, type, condition, ids, filter(handler));
594                         return logResponse(outcome(Status.OK_BULK, null));
595                     } catch (final Throwable ex) {
596                         throw fail(ex);
597                     } finally {
598                         end();
599                     }
600                 }
601             }
602 
603         };
604     }
605 
606     @Override
607     public final Match match() throws IllegalStateException {
608         checkNotClosed();
609         return new Match(this.namespaces) {
610 
611             @Override
612             protected Stream<Record> doExec(@Nullable final Long timeout,
613                     final Map<URI, XPath> conditions, final Map<URI, Set<URI>> ids,
614                     final Map<URI, Set<URI>> properties) throws OperationException {
615                 synchronized (AbstractSession.this) {
616                     checkNotClosed();
617                     start("MATCH", null, Record.class, null, timeout);
618                     try {
619                         logRequest(null, conditions, "ids", ids, "props", //
620                                 properties, "timeout", timeout);
621                         return logResponse(filter(doMatch(timeout, conditions, ids, properties)));
622                     } catch (final Throwable ex) {
623                         throw fail(ex);
624                     } finally {
625                         end();
626                     }
627                 }
628             }
629 
630         };
631     }
632 
633     @Override
634     public final Sparql sparql(final String expression, final Object... arguments)
635             throws IllegalStateException {
636         checkNotClosed();
637         return new Sparql(this.namespaces, expression, arguments) {
638 
639             @Override
640             protected <T> Stream<T> doExec(@Nullable final Long timeout, final Class<T> type,
641                     final String expression, @Nullable final Set<URI> defaultGraphs,
642                     @Nullable final Set<URI> namedGraphs) throws OperationException {
643                 synchronized (AbstractSession.this) {
644                     checkNotClosed();
645                     start("SPARQL", null, type, null, timeout);
646                     try {
647                         logRequest("from", defaultGraphs, "from-named", namedGraphs, //
648                                 "timeout", timeout, null, expression);
649                         return logResponse(filter(doSparql(timeout, type, expression,
650                                 defaultGraphs, namedGraphs)));
651                     } catch (final Throwable ex) {
652                         throw fail(ex);
653                     } finally {
654                         end();
655                     }
656                 }
657             }
658 
659         };
660     }
661 
662 	@Override
663 	public final SparqlUpdate sparqlupdate()
664 			throws IllegalStateException {
665 		checkNotClosed();
666 		return new SparqlUpdate(this.namespaces) {
667 			@Override
668 			protected Outcome doExec(@Nullable Long timeout, @Nullable Stream<? extends Statement> statements) throws OperationException {
669 				synchronized (AbstractSession.this) {
670 					checkNotClosed();
671 					start("SPARQLUPDATE", null, null, null, timeout);
672 					try {
673 						logRequest("timeout", timeout);
674 						return logResponse(doSparqlUpdate(timeout, statements));
675 					} catch (final Throwable ex) {
676 						throw fail(ex);
677 					} finally {
678 						end();
679 					}
680 				}
681 			}
682 		};
683 	}
684 
685     @Override
686     public final SparqlDelete sparqldelete()
687             throws IllegalStateException {
688         checkNotClosed();
689         return new SparqlDelete(this.namespaces) {
690             @Override
691             protected Outcome doExec(@Nullable Long timeout, @Nullable Stream<? extends Statement> statements) throws OperationException {
692                 synchronized (AbstractSession.this) {
693                     checkNotClosed();
694                     start("SPARQLDELETE", null, null, null, timeout);
695                     try {
696                         logRequest("timeout", timeout);
697                         return logResponse(doSparqlDelete(timeout, statements));
698                     } catch (final Throwable ex) {
699                         throw fail(ex);
700                     } finally {
701                         end();
702                     }
703                 }
704             }
705         };
706     }
707 
708     @Override
709     public final boolean isClosed() {
710         return this.closed.get();
711     }
712 
713     @Override
714     public final void close() {
715         if (this.closed.compareAndSet(false, true)) {
716             synchronized (this.closed) {
717                 if (this.interruptThread != null) {
718                     this.interruptThread.interrupt(); // attempt interrupting current operation
719                     try {
720                         this.closed.wait(CLOSE_WAIT_TIME);
721 
722                     } catch (final InterruptedException ex) {
723                         // ignore and proceed with closing resources
724                     }
725                 }
726             }
727             for (final Closeable closeable : this.pendingCloseables.keySet()) {
728                 try {
729                     closeable.close();
730                 } catch (final Throwable ex) {
731                     LOGGER.error("Error closing " + closeable.getClass().getSimpleName()
732                             + " after session has been closed", ex);
733                 }
734             }
735             doClose(); // custom close actions
736         }
737     }
738 
739     @Override
740     public String toString() {
741         return getClass().getSimpleName() + "[" + (isClosed() ? "closed" : "open") + ", "
742                 + this.username + ", " + new Date(this.creationTime).toString() + ")";
743     }
744 
745     protected final void checkNotClosed() {
746         if (this.closed.get()) {
747             throw new IllegalStateException("Session has been closed");
748         }
749     }
750 
751     protected final URI getInvocationID() {
752         return this.invocationID;
753     }
754 
755     protected final void setInvocationID(final URI invocationID) {
756         this.invocationID = invocationID;
757         MDC.put(MDC_ATTRIBUTE, invocationID == null ? null : invocationID.stringValue());
758     }
759 
760     protected Status doFail(final Throwable ex, final AtomicReference<String> message)
761             throws Throwable {
762         return Status.ERROR_UNEXPECTED;
763     }
764 
765     protected abstract Representation doDownload(@Nullable Long timeout, final URI id,
766             @Nullable final Set<String> mimeTypes, final boolean useCaches) throws Throwable;
767 
768     protected abstract Outcome doUpload(@Nullable Long timeout, final URI resourceID,
769             @Nullable final Representation representation) throws Throwable;
770 
771     protected abstract long doCount(@Nullable Long timeout, final URI type,
772             @Nullable final XPath condition, @Nullable final Set<URI> ids) throws Throwable;
773 
774     protected abstract Stream<Record> doRetrieve(@Nullable Long timeout, final URI type,
775             @Nullable final XPath condition, @Nullable final Set<URI> ids,
776             @Nullable final Set<URI> properties, @Nullable Long offset, @Nullable Long limit)
777             throws Throwable;
778 
779     protected abstract void doCreate(@Nullable Long timeout, final URI type,
780             @Nullable final Stream<? extends Record> records,
781             final Handler<? super Outcome> handler) throws Throwable;
782 
783     protected abstract void doMerge(@Nullable Long timeout, final URI type,
784             @Nullable final Stream<? extends Record> records, @Nullable final Criteria criteria,
785             final Handler<? super Outcome> handler) throws Throwable;
786 
787     protected abstract void doUpdate(@Nullable Long timeout, final URI type,
788             @Nullable final XPath condition, @Nullable final Set<URI> ids,
789             @Nullable final Record record, @Nullable final Criteria criteria,
790             final Handler<? super Outcome> handler) throws Throwable;
791 
792     protected abstract void doDelete(@Nullable Long timeout, final URI type,
793             @Nullable final XPath condition, @Nullable final Set<URI> ids,
794             final Handler<? super Outcome> handler) throws Throwable;
795 
796     protected abstract Stream<Record> doMatch(@Nullable Long timeout,
797             final Map<URI, XPath> conditions, final Map<URI, Set<URI>> ids,
798             final Map<URI, Set<URI>> properties) throws Throwable;
799 
800     protected abstract <T> Stream<T> doSparql(@Nullable Long timeout, final Class<T> type,
801             final String expression, @Nullable final Set<URI> defaultGraphs,
802             @Nullable final Set<URI> namedGraphs) throws Throwable;
803 
804     protected abstract Outcome doSparqlUpdate(@Nullable Long timeout,
805 													@Nullable final Stream<? extends Statement> statements) throws Throwable;
806 
807     protected abstract Outcome doSparqlDelete(@Nullable Long timeout,
808                                               @Nullable final Stream<? extends Statement> statements) throws Throwable;
809 
810     protected void doClose() {
811     }
812 
813 }