1   package eu.fbk.knowledgestore.data;
2   
3   import java.io.Closeable;
4   import java.io.IOException;
5   import java.lang.reflect.Array;
6   import java.lang.reflect.Method;
7   import java.util.Arrays;
8   import java.util.Collection;
9   import java.util.Collections;
10  import java.util.Comparator;
11  import java.util.Enumeration;
12  import java.util.Iterator;
13  import java.util.List;
14  import java.util.Map;
15  import java.util.NoSuchElementException;
16  import java.util.Set;
17  import java.util.SortedSet;
18  import java.util.concurrent.ArrayBlockingQueue;
19  import java.util.concurrent.BlockingQueue;
20  import java.util.concurrent.Callable;
21  import java.util.concurrent.Future;
22  import java.util.concurrent.ScheduledFuture;
23  import java.util.concurrent.TimeUnit;
24  import java.util.concurrent.atomic.AtomicBoolean;
25  import java.util.concurrent.atomic.AtomicLong;
26  import java.util.concurrent.atomic.AtomicReference;
27  
28  import javax.annotation.Nullable;
29  
30  import com.google.common.base.Function;
31  import com.google.common.base.Functions;
32  import com.google.common.base.Preconditions;
33  import com.google.common.base.Predicate;
34  import com.google.common.base.Throwables;
35  import com.google.common.collect.ArrayListMultimap;
36  import com.google.common.collect.ImmutableCollection;
37  import com.google.common.collect.ImmutableList;
38  import com.google.common.collect.ImmutableListMultimap;
39  import com.google.common.collect.ImmutableMap;
40  import com.google.common.collect.ImmutableSet;
41  import com.google.common.collect.ImmutableSortedSet;
42  import com.google.common.collect.Iterables;
43  import com.google.common.collect.Iterators;
44  import com.google.common.collect.ListMultimap;
45  import com.google.common.collect.Lists;
46  import com.google.common.collect.Maps;
47  import com.google.common.collect.Multimap;
48  import com.google.common.collect.Ordering;
49  import com.google.common.collect.Sets;
50  import com.google.common.collect.UnmodifiableIterator;
51  import com.google.common.util.concurrent.Futures;
52  
53  import org.openrdf.model.URI;
54  import org.openrdf.query.BindingSet;
55  import org.slf4j.Logger;
56  import org.slf4j.LoggerFactory;
57  
58  import info.aduna.iteration.CloseableIteration;
59  import info.aduna.iteration.Iteration;
60  
61  import eu.fbk.knowledgestore.internal.Util;
62  
63  
64  
65  
66  
67  
68  
69  
70  
71  
72  
73  
74  
75  
76  
77  
78  
79  
80  
81  
82  
83  
84  
85  
86  
87  
88  
89  
90  
91  
92  
93  
94  
95  
96  
97  
98  
99  
100 
101 
102 
103 
104 
105 
106 
107 
108 
109 
110 
111 
112 
113 
114 
115 
116 
117 
118 
119 
120 
121 
122 
123 
124 
125 
126 
127 
128 
129 
130 
131 
132 
133 
134 
135 
136 
137 
138 
139 
140 
141 
142 
143 
144 
145 
146 
147 
148 
149 
150 
151 
152 
153 
154 public abstract class Stream<T> implements Iterable<T>, Closeable {
155 
    /** Logger of this class; used to report failures of close actions. */
    private static final Logger LOGGER = LoggerFactory.getLogger(Stream.class);

    /**
     * Sentinel object. It marks the end of the sequence in the queue of
     * {@code ToHandlerIterator} and the "more than one element" condition in
     * {@link #getUnique()}.
     */
    private static final Object EOF = new Object();

    /**
     * Mutable state of the stream (close objects, timeout, properties, status flags). All
     * accesses in this class are performed while synchronizing on this object.
     */
    final State state;
161 
162     
163 
164 
    /**
     * Creates a new {@code Stream} operating on a newly allocated {@code State} object.
     */
    protected Stream() {
        this(new State());
    }
168 
    /**
     * Creates a new {@code Stream} operating on the supplied {@code State} object. The stream
     * adds itself to the close objects of that state, so that {@link #closeAction(Object)}
     * invokes its {@link #doClose()} method when the state is closed.
     * NOTE(review): presumably this lets a derived stream share the state of its source -
     * confirm against the subclasses using this constructor.
     *
     * @param state
     *            the state object to use, not null
     */
    Stream(final State state) {
        this.state = state;
        state.closeObjects.add(this);
    }
173 
174     
175 
176 
177 
178 
179 
180 
181 
182 
183 
184 
185     public static <T> Stream<T> create(@SuppressWarnings("unchecked") final T... elements) {
186         if (elements.length == 0) {
187             return new EmptyStream<T>();
188         } else if (elements.length == 1) {
189             return new SingletonStream<T>(elements[0]);
190         } else {
191             return new IteratorStream<T>(Iterators.forArray(elements));
192         }
193     }
194 
195     
196 
197 
198 
199 
200 
201 
202 
203 
204 
205 
206 
207     @SuppressWarnings("unchecked")
208     public static <T> Stream<T> create(final Iterable<? extends T> iterable) {
209         if (iterable instanceof Stream) {
210             return (Stream<T>) iterable;
211         } else if (iterable instanceof ImmutableCollection<?>
212                 && ((ImmutableCollection<? extends T>) iterable).isEmpty()) {
213             return new EmptyStream<T>();
214         } else {
215             return new IterableStream<T>(iterable);
216         }
217     }
218 
219     
220 
221 
222 
223 
224 
225 
226 
227 
228     public static <T> Stream<T> create(final Iterator<? extends T> iterator) {
229         if (iterator.hasNext()) {
230             return new IteratorStream<T>(iterator);
231         } else {
232             return new EmptyStream<T>();
233         }
234     }
235 
236     
237 
238 
239 
240 
241 
242 
243 
244 
    /**
     * Creates a new {@code Stream} for the supplied Sesame {@code Iteration}, by wrapping it in
     * an {@code IterationStream}.
     *
     * @param iteration
     *            the iteration to wrap
     * @param <T>
     *            the type of elements
     * @return a stream over the elements of the iteration
     */
    public static <T> Stream<T> create(final Iteration<? extends T, ?> iteration) {
        return new IterationStream<T>(iteration);
    }
248 
249     
250 
251 
252 
253 
254 
255 
256 
257 
258     public static <T> Stream<T> create(final Enumeration<? extends T> enumeration) {
259         if (enumeration.hasMoreElements()) {
260             return new IteratorStream<T>(Iterators.forEnumeration(enumeration));
261         } else {
262             return new EmptyStream<T>();
263         }
264     }
265 
266     
267 
268 
269 
270 
271 
272 
273 
274 
275 
    /**
     * Returns a {@code Stream} concatenating the supplied {@code Iterable}s. The outer iterable
     * is first turned into a stream via {@link #create(Iterable)} and then wrapped in a
     * {@code ConcatStream}.
     *
     * @param iterables
     *            an iterable with the iterables to concatenate
     * @param <T>
     *            the type of elements
     * @return the resulting concatenated stream
     */
    public static <T> Stream<T> concat(final Iterable<? extends Iterable<? extends T>> iterables) {
        return new ConcatStream<Iterable<? extends T>, T>(create(iterables));
    }
279 
280     
281 
282 
283 
284 
285 
286 
287 
288 
289 
    /**
     * Returns a {@code Stream} concatenating the supplied {@code Iterable}s. The array of
     * iterables is first turned into a stream via {@code create()} and then wrapped in a
     * {@code ConcatStream}.
     *
     * @param iterables
     *            the iterables to concatenate
     * @param <T>
     *            the type of elements
     * @return the resulting concatenated stream
     */
    @SafeVarargs
    public static <T> Stream<T> concat(final Iterable<? extends T>... iterables) {
        return new ConcatStream<Iterable<? extends T>, T>(create(iterables));
    }
294 
295     
296 
297 
298 
299 
300 
301 
302 
303 
304 
305 
306 
307 
308     public final Stream<T> filter(final Predicate<? super T> predicate, final int parallelism) {
309         synchronized (this.state) {
310             checkState();
311             return new FilterStream<T>(this, parallelism, predicate);
312         }
313     }
314 
315     
316 
317 
318 
319 
320 
321 
322 
323 
324 
325 
326 
327 
328 
329 
330 
331 
332 
333 
334     public final <R> Stream<R> transform(final Function<? super T, ? extends R> function,
335             final int parallelism) {
336         synchronized (this.state) {
337             checkState();
338             return new TransformElementStream<T, R>(this, parallelism, function);
339         }
340     }
341 
342     
343 
344 
345 
346 
347 
348 
349 
350 
351 
352 
353 
354 
355 
356 
357 
358     public final <R> Stream<R> transform(
359             @Nullable final Function<Iterator<T>, Iterator<R>> iteratorFunction,
360             @Nullable final Function<Handler<R>, Handler<T>> handlerFunction) {
361         synchronized (this.state) {
362             checkState();
363             return new TransformSequenceStream<T, R>(this, iteratorFunction, handlerFunction);
364         }
365     }
366 
367     
368 
369 
370 
371 
372 
373 
374 
375 
376 
377 
378 
379 
380 
381 
382 
383 
384 
385 
386 
387 
388 
389 
390 
391 
392     public final <R> Stream<R> transform(final Class<R> type, final boolean lenient,
393             final Object... path) {
394         synchronized (this.state) {
395             checkState();
396             return concat(new TransformPathStream<T, R>(this, type, lenient, path));
397         }
398     }
399 
400     
401 
402 
403 
404 
405 
406 
407 
408     public final Stream<T> distinct() {
409         synchronized (this.state) {
410             checkState();
411             return this instanceof DistinctStream<?> ? this : new DistinctStream<T>(this);
412         }
413     }
414 
415     
416 
417 
418 
419 
420 
421 
422 
423 
424 
425 
426     public final Stream<T> slice(final long offset, final long limit) {
427         synchronized (this.state) {
428             checkState();
429             return new SliceStream<T>(this, offset, limit);
430         }
431     }
432 
433     
434 
435 
436 
437 
438 
439 
440 
441     public final Stream<List<T>> chunk(final int chunkSize) {
442         synchronized (this.state) {
443             checkState();
444             return new ChunkStream<T>(this, chunkSize);
445         }
446     }
447 
448     
449 
450 
451 
452 
453 
454 
455 
456 
457 
458 
459 
460 
461 
462     public final Stream<T> track(@Nullable final AtomicLong counter,
463             @Nullable final AtomicBoolean eof) {
464         synchronized (this.state) {
465             checkState();
466             return new TrackStream<T>(this, counter, eof);
467         }
468     }
469 
470     
471 
472 
473 
474 
475 
476 
477     public final long count() {
478         final AtomicLong result = new AtomicLong();
479         toHandler(new Handler<T>() {
480 
481             private long count;
482 
483             @Override
484             public void handle(final T element) {
485                 if (element != null) {
486                     ++this.count;
487                 } else {
488                     result.set(this.count);
489                 }
490 
491             }
492 
493         });
494         return result.get();
495     }
496 
497     
498 
499 
500     @Override
501     public final Iterator<T> iterator() {
502         synchronized (this.state) {
503             checkState();
504             this.state.available = false;
505             final Iterator<T> iterator;
506             try {
507                 iterator = new CheckedIterator<T>(doIterator(), this);
508             } catch (final Throwable ex) {
509                 throw Throwables.propagate(ex);
510             }
511             this.state.activeIterator = iterator;
512             return iterator;
513         }
514     }
515 
516     
517 
518 
519 
520 
521 
522 
523 
    /**
     * Forwards the elements of this stream to the supplied handler, using the calling thread,
     * and closes the stream when done. The stream is marked as no longer available, so it can
     * be consumed only once. Forwarding is delegated to {@link #doToHandler(Handler)}, whose
     * implementations in this class signal the end of the sequence by passing {@code null} to
     * the handler.
     *
     * @param handler
     *            the handler to forward elements to, not null
     * @throws IllegalStateException
     *             if this stream has been closed or is already being consumed, or if the
     *             stream is found closed when forwarding ends
     */
    public final void toHandler(final Handler<? super T> handler) {
        Preconditions.checkNotNull(handler);
        synchronized (this.state) {
            checkState();
            this.state.available = false;
            // Recorded so that close() can interrupt this thread while it forwards elements
            this.state.toHandlerThread = Thread.currentThread();
        }
        try {
            try {
                doToHandler(handler);
            } catch (final Throwable ex) {
                // propagate() always throws: either ex itself or a RuntimeException wrapping it
                Throwables.propagate(ex);
            }
        } finally {
            synchronized (this.state) {
                if (this.state.closed) {
                    // Stream closed while forwarding: checkState() throws, replacing any
                    // exception raised above and skipping the statements below
                    checkState(); 
                }
                // Reset first, so that close() below does not interrupt the current thread
                this.state.toHandlerThread = null; 
                // Clear the interruption status of the current thread
                Thread.interrupted(); 
                // Release the resources associated to the stream
                close(); 
            }
        }
    }
548 
549     
550 
551 
552 
553 
554 
555 
556 
557     public final T[] toArray(final Class<T> elementClass) {
558         return Iterables.toArray(toCollection(Lists.<T>newArrayListWithCapacity(256)),
559                 elementClass);
560     }
561 
562     
563 
564 
565 
566 
567 
568     public final List<T> toList() {
569         return ImmutableList.copyOf(toCollection(Lists.<T>newArrayListWithCapacity(256)));
570     }
571 
572     
573 
574 
575 
576 
577 
578 
579     public final Set<T> toSet() {
580         return ImmutableSet.copyOf(toCollection(Lists.<T>newArrayListWithCapacity(256)));
581     }
582 
583     
584 
585 
586 
587 
588 
589 
590 
591 
592 
593     public final List<T> toSortedList(final Comparator<? super T> comparator) {
594         return Ordering.from(comparator).immutableSortedCopy(
595                 toCollection(Lists.<T>newArrayListWithCapacity(256)));
596     }
597 
598     
599 
600 
601 
602 
603 
604 
605 
606 
607 
608 
609 
610 
611 
612 
613 
614 
615 
616 
617 
618     public final List<T> toSortedList(final Class<? extends Comparable<?>> type,
619             final boolean lenient, final Object... path) {
620         return Ordering.from(new PathComparator(type, lenient, path)).immutableSortedCopy(
621                 toCollection(Lists.<T>newArrayListWithCapacity(256)));
622     }
623 
624     
625 
626 
627 
628 
629 
630 
631 
632 
633 
634     public final SortedSet<T> toSortedSet(final Comparator<? super T> comparator) {
635         return ImmutableSortedSet.copyOf(comparator,
636                 toCollection(Lists.<T>newArrayListWithCapacity(256)));
637     }
638 
639     
640 
641 
642 
643 
644 
645 
646 
647 
648 
649     public final <C extends Collection<? super T>> C toCollection(final C collection) {
650         Preconditions.checkNotNull(collection);
651         toHandler(new Handler<T>() {
652 
653             @Override
654             public void handle(final T element) {
655                 if (element != null) {
656                     collection.add(element);
657                 }
658             }
659         });
660         return collection;
661     }
662 
663     
664 
665 
666 
667 
668 
669 
670 
671 
672 
673 
674 
675 
676 
677 
678 
679 
680 
681 
682     public final <K, V> Map<K, V> toMap(final Function<? super T, ? extends K> keyFunction,
683             final Function<? super T, ? extends V> valueFunction) {
684         return ImmutableMap
685                 .copyOf(toMap(keyFunction, valueFunction, Maps.<K, V>newLinkedHashMap()));
686     }
687 
688     
689 
690 
691 
692 
693 
694 
695 
696 
697 
698 
699 
700 
701 
702 
703 
704 
705 
706 
707 
708 
709 
710 
711     public final <K, V, M extends Map<K, V>> M toMap(
712             final Function<? super T, ? extends K> keyFunction,
713             final Function<? super T, ? extends V> valueFunction, final M map) {
714         Preconditions.checkNotNull(keyFunction);
715         Preconditions.checkNotNull(valueFunction);
716         Preconditions.checkNotNull(map);
717         toHandler(new Handler<T>() {
718 
719             @Override
720             public void handle(final T element) {
721                 if (element != null) {
722                     final K key = keyFunction.apply(element);
723                     final V value = valueFunction.apply(element);
724                     if (key != null && value != null) {
725                         map.put(key, value);
726                     }
727                 }
728             }
729 
730         });
731         return map;
732     }
733 
734     
735 
736 
737 
738 
739 
740 
741 
742 
743 
744 
745 
746 
747 
748 
749 
750 
751     public final <K, V> ListMultimap<K, V> toMultimap(
752             final Function<? super T, ? extends K> keyFunction,
753             final Function<? super T, ? extends V> valueFunction) {
754         return ImmutableListMultimap.copyOf(toMultimap(keyFunction, valueFunction,
755                 ArrayListMultimap.<K, V>create()));
756     }
757 
758     
759 
760 
761 
762 
763 
764 
765 
766 
767 
768 
769 
770 
771 
772 
773 
774 
775 
776 
777 
778 
779     public final <K, V, M extends Multimap<K, V>> M toMultimap(
780             final Function<? super T, ? extends K> keyFunction,
781             final Function<? super T, ? extends V> valueFunction, final M multimap) {
782         Preconditions.checkNotNull(keyFunction);
783         Preconditions.checkNotNull(valueFunction);
784         Preconditions.checkNotNull(multimap);
785         toHandler(new Handler<T>() {
786 
787             @Override
788             public void handle(final T element) {
789                 if (element != null) {
790                     final K key = keyFunction.apply(element);
791                     final V value = valueFunction.apply(element);
792                     if (key != null && value != null) {
793                         multimap.put(key, value);
794                     }
795                 }
796             }
797 
798         });
799         return multimap;
800     }
801 
802     
803 
804 
805 
806 
807 
808 
809 
810 
811     public final T getUnique(final T defaultValue) {
812         try {
813             final T result = getUnique();
814             if (result != null) {
815                 return result;
816             }
817         } catch (final Throwable ex) {
818             
819         }
820         return defaultValue;
821     }
822 
823     
824 
825 
826 
827 
828 
829 
830 
831 
832 
833     @SuppressWarnings("unchecked")
834     @Nullable
835     public final T getUnique() throws IllegalStateException {
836         final AtomicReference<Object> holder = new AtomicReference<Object>();
837         toHandler(new Handler<T>() {
838 
839             @Override
840             public void handle(final T element) {
841                 if (element != null) {
842                     if (holder.get() == null) {
843                         holder.set(element);
844                     } else {
845                         holder.set(EOF);
846                         Thread.currentThread().interrupt(); 
847                     }
848                 }
849             }
850 
851         });
852         final Object result = holder.get();
853         if (result != EOF) {
854             return (T) result;
855         }
856         throw new IllegalStateException("Stream " + this + " returned more than one element");
857     }
858 
859     
860 
861 
862 
863 
864 
865 
866 
867 
868 
869 
870 
871 
872 
873     public final <V> V getProperty(final String name, final Class<V> type) {
874         Preconditions.checkNotNull(name);
875         try {
876             Object value = null;
877             synchronized (this.state) {
878                 if (this.state.properties != null) {
879                     value = this.state.properties.get(name);
880                 }
881             }
882             return Data.convert(value, type);
883         } catch (final Throwable ex) {
884             throw Throwables.propagate(ex);
885         }
886     }
887 
888     
889 
890 
891 
892 
893 
894 
895 
896 
897 
898 
899     public final Stream<T> setProperty(final String name, @Nullable final Object value) {
900         Preconditions.checkNotNull(name);
901         synchronized (this.state) {
902             if (this.state.properties != null) {
903                 this.state.properties.put(name, value);
904             } else if (value != null) {
905                 this.state.properties = Maps.newHashMap();
906                 this.state.properties.put(name, value);
907             }
908         }
909         return this;
910     }
911 
912     
913 
914 
915 
916 
917 
918 
919     public final Long getTimeout() {
920         synchronized (this.state) {
921             return this.state.timeoutFuture == null ? null : this.state.timeoutFuture
922                     .getDelay(TimeUnit.MILLISECONDS) + System.currentTimeMillis();
923         }
924     }
925 
926     
927 
928 
929 
930 
931 
932 
933 
934 
935 
936     public final Stream<T> setTimeout(@Nullable final Long timestamp) {
937         Preconditions.checkArgument(timestamp == null || timestamp > System.currentTimeMillis());
938         synchronized (this.state) {
939             if (this.state.closed) {
940                 return this; 
941             }
942             if (this.state.timeoutFuture != null) {
943                 if (!this.state.timeoutFuture.cancel(false)) {
944                     return this; 
945                 }
946             }
947             if (timestamp != null) {
948                 this.state.timeoutFuture = Data.getExecutor().schedule(new Runnable() {
949 
950                     @Override
951                     public void run() {
952                         close();
953                     }
954 
955                 }, Math.max(0, timestamp - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
956             }
957             return this;
958         }
959     }
960 
961     
962 
963 
964 
965 
966 
    /**
     * Checks whether this stream is available. The flag is initially true and is cleared by
     * {@link #iterator()}, {@link #toHandler(Handler)} and {@link #close()}.
     *
     * @return true if the stream is available
     */
    public final boolean isAvailable() {
        synchronized (this.state) {
            return this.state.available;
        }
    }
972 
973     
974 
975 
976 
977 
978 
    /**
     * Checks whether this stream has been closed, i.e., whether {@link #close()} completed.
     *
     * @return true if the stream has been closed
     */
    public final boolean isClosed() {
        synchronized (this.state) {
            return this.state.closed;
        }
    }
984 
985     
986 
987 
988 
989 
990 
991 
992 
993 
994 
995 
996 
997 
998     public final Stream<T> onClose(final Object... objects) {
999         synchronized (this.state) {
1000             for (final Object object : objects) {
1001                 if (!(object instanceof Closeable) && !(object instanceof Runnable)
1002                         && !(object instanceof Callable)) {
1003                     throw new IllegalArgumentException("Illegal object: " + object);
1004                 } else if (this.state.closed) {
1005                     closeAction(object);
1006                 } else {
1007                     boolean alreadyContained = false;
1008                     for (final Object o : this.state.closeObjects) {
1009                         if (o == object) {
1010                             alreadyContained = true;
1011                             break;
1012                         }
1013                     }
1014                     if (!alreadyContained) {
1015                         this.state.closeObjects.add(object);
1016                     }
1017                 }
1018             }
1019         }
1020         return this;
1021     }
1022 
1023     
1024 
1025 
1026 
1027 
1028 
1029 
1030     @Override
1031     public final void close() {
1032         synchronized (this.state) {
1033             if (this.state.closed) {
1034                 return;
1035             }
1036             if (this.state.activeIterator instanceof Closeable) {
1037                 Util.closeQuietly(this.state.activeIterator);
1038             }
1039             if (this.state.toHandlerThread != null) {
1040                 this.state.toHandlerThread.interrupt();
1041             }
1042             for (final Object object : this.state.closeObjects) {
1043                 closeAction(object);
1044             }
1045             this.state.activeIterator = null;
1046             this.state.toHandlerThread = null;
1047             this.state.available = false;
1048             this.state.closed = true;
1049         }
1050     }
1051 
1052     
1053 
1054 
1055 
1056 
1057 
1058 
    /**
     * Returns a string representation of this stream, as built by
     * {@link #toStringHelper(StringBuilder)}.
     *
     * @return a string describing this stream
     */
    @Override
    public final String toString() {
        final StringBuilder builder = new StringBuilder();
        toStringHelper(builder);
        return builder.toString();
    }
1065 
1066     void toStringHelper(final StringBuilder builder) {
1067         String name = getClass().getSimpleName();
1068         if (name == null) {
1069             final Method method = getClass().getEnclosingMethod();
1070             if (method != null) {
1071                 name = method.getDeclaringClass().getSimpleName() + "." + method.getName()
1072                         + "-Stream";
1073             } else {
1074                 name = "anon-Stream";
1075             }
1076         }
1077         builder.append(name);
1078         final String args = doToString();
1079         if (args != null) {
1080             builder.append("<").append(args).append(">");
1081         }
1082     }
1083 
1084     final void checkState() {
1085         synchronized (this.state) {
1086             if (this.state.closed) {
1087                 throw new IllegalStateException("Stream already closed: " + this);
1088             } else if (!this.state.available) {
1089                 throw new IllegalStateException("Stream already being iterated: " + this);
1090             }
1091         }
1092     }
1093 
1094     final void closeAction(final Object object) {
1095         try {
1096             if (object instanceof Stream<?>) {
1097                 ((Stream<?>) object).doClose();
1098             } else if (object instanceof Closeable) {
1099                 ((Closeable) object).close();
1100             } else if (object instanceof Runnable) {
1101                 ((Runnable) object).run();
1102             } else if (object instanceof Callable<?>) {
1103                 ((Callable<?>) object).call();
1104             }
1105         } catch (final Throwable ex) {
1106             LOGGER.error("Error performing close action on " + object, ex);
1107         }
1108     }
1109 
1110     
1111 
1112 
1113 
1114 
1115 
1116 
1117 
1118 
1119 
    /**
     * Implementation method responsible for producing an iterator over the elements of the
     * stream. This default implementation returns a {@code ToHandlerIterator}, which runs
     * {@link #doToHandler(Handler)} as a task of the executor returned by
     * {@code Data.getExecutor()} and returns the elements that task produces. As the default
     * {@code doToHandler()} is in turn based on this method, a subclass is expected to
     * override at least one of the two.
     *
     * @return an iterator over the elements of the stream
     * @throws Throwable
     *             in case of failure
     */
    protected Iterator<T> doIterator() throws Throwable {
        final ToHandlerIterator<T> iterator = new ToHandlerIterator<T>(this);
        iterator.submit();
        return iterator;
    }
1125 
1126     
1127 
1128 
1129 
1130 
1131 
1132 
1133 
1134 
1135 
1136 
1137 
1138 
1139 
1140     protected void doToHandler(final Handler<? super T> handler) throws Throwable {
1141         final Iterator<T> iterator = doIterator();
1142         while (iterator.hasNext()) {
1143             if (Thread.interrupted()) {
1144                 return;
1145             }
1146             handler.handle(iterator.next());
1147         }
1148         handler.handle(null);
1149     }
1150 
1151     
1152 
1153 
1154 
1155 
1156 
1157 
1158 
    /**
     * Implementation method returning an optional string that
     * {@link #toStringHelper(StringBuilder)} emits, enclosed in angle brackets, after the
     * name of the stream. This default implementation returns {@code null}, in which case
     * nothing is emitted.
     *
     * @return the additional string to include in the stream representation, possibly null
     */
    @Nullable
    protected String doToString() {
        return null;
    }
1163 
1164     
1165 
1166 
1167 
1168 
1169 
1170 
    /**
     * Implementation method invoked by {@link #closeAction(Object)} when the stream is
     * closed. This default implementation does nothing; subclasses override it to release
     * the resources they hold.
     *
     * @throws Throwable
     *             in case of failure (logged by the caller, not propagated)
     */
    protected void doClose() throws Throwable {
    }
1173 
    /**
     * Closes the stream when the object is finalized, as a safety net for streams that were
     * not explicitly closed, and then delegates to the superclass finalizer.
     *
     * @throws Throwable
     *             in case of failure
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            close();
        } finally {
            super.finalize();
        }
    }
1182 
    /**
     * Holder of the mutable state of a stream. {@code Stream} methods access these fields
     * while synchronizing on the {@code State} instance.
     */
    private static final class State {

        /** Objects processed by {@code closeAction()} when the stream is closed. */
        @Nullable
        List<Object> closeObjects = Lists.newArrayList();

        /** The scheduled task closing the stream at timeout, set by {@code setTimeout()}. */
        @Nullable
        ScheduledFuture<?> timeoutFuture;

        /** The properties of the stream, allocated lazily by {@code setProperty()}. */
        @Nullable
        Map<String, Object> properties;

        /** The iterator recorded by {@code iterator()}, reset to null by {@code close()}. */
        @Nullable
        Iterator<?> activeIterator;

        /** The thread currently executing {@code toHandler()}, if any. */
        @Nullable
        Thread toHandlerThread;

        /** True until the stream starts being consumed or is closed. */
        boolean available = true;

        /** True once {@code close()} has completed. */
        boolean closed = false;

    }
1205 
1206     private abstract static class AbstractIterator<T> extends UnmodifiableIterator<T> implements
1207             Closeable {
1208 
1209         @Nullable
1210         private T next;
1211 
1212         @Override
1213         public final boolean hasNext() {
1214             if (this.next == null) {
1215                 this.next = advance();
1216             }
1217             return this.next != null;
1218         }
1219 
1220         @Override
1221         public final T next() {
1222             if (this.next == null) {
1223                 final T result = advance();
1224                 if (result != null) {
1225                     return result;
1226                 }
1227                 throw new NoSuchElementException();
1228             } else {
1229                 final T result = this.next;
1230                 this.next = null;
1231                 return result;
1232             }
1233         }
1234 
1235         @Override
1236         public void close() throws IOException {
1237         }
1238 
1239         protected abstract T advance();
1240 
1241     }
1242 
1243     private static final class CheckedIterator<T> extends UnmodifiableIterator<T> {
1244 
1245         private final Iterator<T> iterator;
1246 
1247         private final Stream<T> stream;
1248 
1249         private final State state;
1250 
1251         private boolean exhausted;
1252 
1253         CheckedIterator(final Iterator<T> iterator, final Stream<T> stream) {
1254             this.iterator = iterator;
1255             this.stream = stream;
1256             this.state = stream.state;
1257             this.exhausted = false;
1258         }
1259 
1260         @Override
1261         public boolean hasNext() {
1262             boolean result = false;
1263             if (!this.exhausted) {
1264                 checkState();
1265                 try {
1266                     result = this.iterator.hasNext();
1267                 } finally {
1268                     if (!result) {
1269                         this.stream.close();
1270                         this.exhausted = true;
1271                     }
1272                 }
1273             }
1274             return result;
1275         }
1276 
1277         @Override
1278         public T next() {
1279             checkState();
1280             try {
1281                 return this.iterator.next();
1282             } catch (final Throwable ex) {
1283                 this.stream.close();
1284                 throw Throwables.propagate(ex);
1285             }
1286         }
1287 
1288         private void checkState() {
1289             boolean closed;
1290             synchronized (this.state) {
1291                 closed = this.state.closed;
1292             }
1293             Preconditions.checkState(!closed, "Stream has been closed");
1294         }
1295 
1296     }
1297 
1298     private static final class ToHandlerIterator<T> extends AbstractIterator<T> implements
1299             Handler<T>, Runnable {
1300 
1301         private final Stream<T> stream;
1302 
1303         private final BlockingQueue<Object> queue;
1304 
1305         private Future<?> future;
1306 
1307         ToHandlerIterator(final Stream<T> stream) {
1308             this.stream = stream;
1309             this.queue = new ArrayBlockingQueue<Object>(1024);
1310             this.future = null;
1311         }
1312 
1313         public void submit() {
1314             this.future = Data.getExecutor().submit(this);
1315         }
1316 
1317         @Override
1318         public void run() {
1319             try {
1320                 this.stream.doToHandler(this);
1321             } catch (final Throwable ex) {
1322                 putUninterruptibly(ex);
1323                 putUninterruptibly(EOF);
1324             }
1325         }
1326 
1327         @Override
1328         public void handle(final T element) {
1329             try {
1330                 this.queue.put(element == null ? EOF : element);
1331             } catch (final InterruptedException ex) {
1332                 putUninterruptibly(ex);
1333                 putUninterruptibly(EOF);
1334                 Thread.currentThread().interrupt(); 
1335             }
1336         }
1337 
1338         @SuppressWarnings("unchecked")
1339         @Override
1340         protected T advance() {
1341             try {
1342                 final Object element = this.queue.take();
1343                 if (element == EOF) {
1344                     return null;
1345                 } else if (element instanceof Throwable) {
1346                     throw Throwables.propagate((Throwable) element);
1347                 } else {
1348                     return (T) element;
1349                 }
1350             } catch (final InterruptedException ex) {
1351                 Thread.currentThread().interrupt(); 
1352                 throw new RuntimeException("Interrupted while waiting for next element", ex);
1353             }
1354         }
1355 
1356         @Override
1357         public void close() {
1358             if (this.future != null) {
1359                 this.future.cancel(true);
1360             }
1361         }
1362 
1363         private void putUninterruptibly(final Object element) {
1364             while (true) {
1365                 try {
1366                     this.queue.put(element);
1367                     return;
1368                 } catch (final InterruptedException ex) {
1369                     
1370                 }
1371             }
1372         }
1373 
1374     }
1375 
1376     
1377 
    /**
     * A stream with no elements.
     *
     * @param <T>
     *            the type of elements
     */
    private static final class EmptyStream<T> extends Stream<T> {

        /** Returns an empty iterator. */
        @Override
        protected Iterator<T> doIterator() {
            return Collections.emptyIterator();
        }

        /** Only signals the end of the sequence, by passing null to the handler. */
        @Override
        protected void doToHandler(final Handler<? super T> handler) throws Throwable {
            handler.handle(null);
        }

    }
1391 
1392     private static final class SingletonStream<T> extends Stream<T> {
1393 
1394         private T element;
1395 
1396         SingletonStream(final T element) {
1397             this.element = Preconditions.checkNotNull(element);
1398         }
1399 
1400         @Override
1401         protected Iterator<T> doIterator() {
1402             return Iterators.singletonIterator(this.element);
1403         }
1404 
1405         @Override
1406         protected void doToHandler(final Handler<? super T> handler) throws Throwable {
1407             handler.handle(this.element);
1408             handler.handle(null);
1409         }
1410 
1411         @Override
1412         protected void doClose() throws Throwable {
1413             this.element = null;
1414         }
1415     }
1416 
1417     private static final class IterableStream<T> extends Stream<T> {
1418 
1419         private Iterable<? extends T> iterable;
1420 
1421         IterableStream(final Iterable<? extends T> iterable) {
1422             this.iterable = Preconditions.checkNotNull(iterable);
1423         }
1424 
1425         @SuppressWarnings("unchecked")
1426         @Override
1427         protected Iterator<T> doIterator() throws Throwable {
1428             return ((Iterable<T>) this.iterable).iterator();
1429         }
1430 
1431         @Override
1432         protected void doToHandler(final Handler<? super T> handler) throws Throwable {
1433             for (final T element : this.iterable) {
1434                 handler.handle(element);
1435             }
1436             handler.handle(null);
1437         }
1438 
1439         @Override
1440         protected void doClose() throws Throwable {
1441             if (this.iterable instanceof Closeable) {
1442                 ((Closeable) this.iterable).close();
1443             }
1444             this.iterable = null;
1445         }
1446 
1447     }
1448 
1449     private static final class IteratorStream<T> extends Stream<T> {
1450 
1451         private Iterator<? extends T> iterator;
1452 
1453         IteratorStream(final Iterator<? extends T> iterator) {
1454             this.iterator = Preconditions.checkNotNull(iterator);
1455         }
1456 
1457         @SuppressWarnings("unchecked")
1458         @Override
1459         protected Iterator<T> doIterator() {
1460             return (Iterator<T>) this.iterator;
1461         }
1462 
1463         @Override
1464         protected void doToHandler(final Handler<? super T> handler) throws Throwable {
1465             while (this.iterator.hasNext()) {
1466                 if (Thread.interrupted()) {
1467                     return;
1468                 }
1469                 final T element = this.iterator.next();
1470                 handler.handle(element);
1471             }
1472             handler.handle(null);
1473         }
1474 
1475         @Override
1476         protected void doClose() throws Throwable {
1477             if (this.iterator instanceof Closeable) {
1478                 ((Closeable) this.iterator).close();
1479             }
1480             this.iterator = null;
1481         }
1482 
1483     }
1484 
1485     private static final class IterationStream<T> extends Stream<T> {
1486 
1487         private Iteration<? extends T, ?> iteration;
1488 
1489         IterationStream(final Iteration<? extends T, ?> iteration) {
1490             this.iteration = Preconditions.checkNotNull(iteration);
1491         }
1492 
1493         @Override
1494         protected Iterator<T> doIterator() {
1495             return new AbstractIterator<T>() {
1496 
1497                 @Override
1498                 protected T advance() {
1499                     try {
1500                         if (IterationStream.this.iteration.hasNext()) {
1501                             return IterationStream.this.iteration.next();
1502                         } else {
1503                             return null;
1504                         }
1505                     } catch (final Throwable ex) {
1506                         throw Throwables.propagate(ex);
1507                     }
1508                 }
1509 
1510             };
1511         }
1512 
1513         @Override
1514         protected void doToHandler(final Handler<? super T> handler) throws Throwable {
1515             while (this.iteration.hasNext()) {
1516                 if (Thread.interrupted()) {
1517                     return;
1518                 }
1519                 final T element = this.iteration.next();
1520                 handler.handle(element);
1521             }
1522             handler.handle(null);
1523         }
1524 
1525         @Override
1526         protected void doClose() throws Throwable {
1527             if (this.iteration instanceof CloseableIteration<?, ?>) {
1528                 ((CloseableIteration<? extends T, ?>) this.iteration).close();
1529             }
1530             this.iteration = null;
1531         }
1532 
1533     }
1534 
1535     private abstract static class DelegatingStream<I, O> extends Stream<O> {
1536 
1537         final Stream<I> delegate;
1538 
1539         DelegatingStream(final Stream<I> delegate) {
1540             super(delegate.state);
1541             this.delegate = delegate;
1542         }
1543 
1544         @Override
1545         void toStringHelper(final StringBuilder builder) {
1546             super.toStringHelper(builder);
1547             builder.append(" (");
1548             this.delegate.toStringHelper(builder);
1549             builder.append(")");
1550         }
1551 
1552     }
1553 
    /**
     * Stream that concatenates the elements of the {@code Iterable}s produced by a delegate
     * stream. Each iterable is wrapped in a nested stream via {@code create} and its elements
     * are emitted in order before moving to the next iterable.
     *
     * @param <I>
     *            the type of iterable produced by the delegate
     * @param <O>
     *            the type of element emitted by this stream
     */
    private static final class ConcatStream<I extends Iterable<? extends O>, O> extends
            DelegatingStream<I, O> {

        ConcatStream(final Stream<I> delegate) {
            super(delegate);
        }

        /**
         * Returns an iterator that walks the delegate's iterables one after the other. The
         * returned iterator is also passed to {@code onClose}.
         */
        @Override
        protected Iterator<O> doIterator() throws Throwable {
            final Iterator<I> streamIterator = this.delegate.doIterator();
            final Iterator<O> elementIterator = new AbstractIterator<O>() {

                // nested stream currently being consumed, null before the first one is opened
                private Stream<? extends O> stream;

                // iterator over the current nested stream
                private Iterator<? extends O> iterator;

                @Override
                protected O advance() {
                    // move forward until a nested iterator with a pending element is found
                    while (this.iterator == null || !this.iterator.hasNext()) {
                        if (this.stream != null) {
                            // the current nested stream is exhausted: close it first
                            this.stream.close();
                        }
                        if (!streamIterator.hasNext()) {
                            return null; // no more iterables to concatenate
                        }
                        this.stream = create(streamIterator.next());
                        this.iterator = this.stream.iterator();
                    }
                    return this.iterator.next();
                }

                // closes the nested stream last opened, if any
                @Override
                public void close() {
                    if (this.stream != null) {
                        this.stream.close();
                    }
                }

            };
            onClose(elementIterator);
            return elementIterator;
        }

        /**
         * Forwards to the handler the elements of every iterable emitted by the delegate,
         * followed by a single terminating {@code null} once the delegate signals its end.
         */
        @Override
        protected void doToHandler(final Handler<? super O> handler) throws Throwable {
            this.delegate.doToHandler(new Handler<I>() {

                @Override
                public void handle(final I iterable) throws Throwable {
                    if (iterable == null) {
                        // end of the delegate stream: propagate the termination
                        handler.handle(null);
                    } else {
                        // set when the nested stream delivers its own terminating null
                        final AtomicBoolean eof = new AtomicBoolean(false);
                        create(iterable).toHandler(new Handler<O>() {

                            @Override
                            public void handle(final O element) throws Throwable {
                                if (element != null) {
                                    handler.handle(element);
                                } else {
                                    // swallow the nested termination, just record it
                                    eof.set(true);
                                }
                            }

                        });
                        // nested stream ended without its terminating null: set the interrupt
                        // flag (presumably to make the delegate stop too; the source streams in
                        // this file poll Thread.interrupted())
                        if (!eof.get()) { 
                            Thread.currentThread().interrupt();
                        }
                    }
                }

            });
        }

    }
1629 
    /**
     * Base class of streams that transform the elements of a delegate one at a time through
     * {@link #process(Object)}. The value returned by {@code process} is interpreted as
     * follows: {@code EOF} ends the output, {@code null} drops the input element, any other
     * object is emitted as an output element. With {@code parallelism <= 1} elements are
     * processed in the calling thread; otherwise up to {@code parallelism} elements are
     * processed concurrently as tasks submitted to {@code Data.getExecutor()}, with results
     * consumed in submission order.
     *
     * @param <I>
     *            the element type of the delegate stream
     * @param <O>
     *            the element type of this stream
     */
    private abstract static class ProcessingStream<I, O> extends DelegatingStream<I, O> {

        // maximum number of pending process() tasks; values <= 1 select sequential processing
        final int parallelism;

        ProcessingStream(final Stream<I> delegate, final int parallelism) {
            super(delegate);
            this.parallelism = parallelism;
        }

        /** Dispatches to the sequential or parallel iterator based on {@code parallelism}. */
        @Override
        protected final Iterator<O> doIterator() throws Throwable {
            if (this.parallelism <= 1) {
                return doIteratorSequential();
            } else {
                return doIteratorParallel();
            }
        }

        // Iterator that processes delegate elements one at a time in the calling thread.
        private Iterator<O> doIteratorSequential() throws Throwable {
            final Iterator<I> iterator = this.delegate.doIterator();
            return new AbstractIterator<O>() {

                @SuppressWarnings("unchecked")
                @Override
                protected O advance() {
                    // keep consuming input until an output element or the end is reached
                    while (iterator.hasNext()) {
                        final I element = iterator.next();
                        final Object transformed = process(element);
                        if (transformed == EOF) {
                            return null; // processing requested early termination
                        } else if (transformed != null) {
                            return (O) transformed;
                        }
                        // transformed == null: element dropped, try the next one
                    }
                    return null;
                }

            };
        }

        // Iterator that keeps up to 'parallelism' process() tasks pending and returns their
        // results in submission order. The iterator is also passed to onClose().
        private Iterator<O> doIteratorParallel() throws Throwable {
            final Iterator<I> iterator = this.delegate.doIterator();
            final List<Future<Object>> queue = Lists.newLinkedList();
            final Iterator<O> result = new AbstractIterator<O>() {

                @SuppressWarnings("unchecked")
                @Override
                protected O advance() {
                    while (true) {
                        // refill the queue of pending tasks up to the parallelism degree
                        while (queue.size() < ProcessingStream.this.parallelism
                                && iterator.hasNext()) {
                            offer(queue, iterator.next());
                        }
                        if (queue.isEmpty()) {
                            return null; // input exhausted and nothing pending
                        }
                        // wait for the oldest pending task
                        final Object output = take(queue);
                        if (output == EOF) {
                            return null;
                        } else if (output != null) {
                            return (O) output;
                        }
                    }
                }

                // cancels the tasks still pending in the queue
                @Override
                public void close() {
                    for (final Future<Object> future : queue) {
                        try {
                            future.cancel(true);
                        } catch (final Exception ex) {
                            // best effort: failures while cancelling are ignored
                        }
                    }
                }

            };
            onClose(result);
            return result;
        }

        /** Dispatches to the sequential or parallel push pipeline based on {@code parallelism}. */
        @Override
        protected final void doToHandler(final Handler<? super O> handler) throws Throwable {
            if (this.parallelism <= 1) {
                doToHandlerSequential(handler);
            } else {
                doToHandlerParallel(handler);
            }
        }

        // Pushes delegate elements through process() in the calling thread and forwards the
        // outputs to the supplied handler.
        private void doToHandlerSequential(final Handler<? super O> handler) throws Throwable {
            this.delegate.doToHandler(new Handler<I>() {

                // set once the terminating null has been sent; later inputs are ignored
                private boolean done = false;

                @SuppressWarnings("unchecked")
                @Override
                public void handle(final I element) throws Throwable {
                    if (!this.done) {
                        if (element == null) {
                            // end of input: propagate the termination
                            handler.handle(null);
                            this.done = true;
                        } else {
                            final Object transformed = process(element);
                            if (transformed == EOF) {
                                // early termination: notify the handler, then set the interrupt
                                // flag (presumably to make the delegate stop producing; source
                                // streams in this file poll Thread.interrupted())
                                handler.handle(null);
                                Thread.currentThread().interrupt();
                                this.done = true;
                            } else if (transformed != null) {
                                handler.handle((O) transformed);
                            }
                        }
                    }
                }

            });
        }

        // Pushes delegate elements through process() tasks, keeping up to 'parallelism' of them
        // pending, and forwards the outputs to the handler in submission order. Tasks still
        // pending when the method exits are cancelled.
        private void doToHandlerParallel(final Handler<? super O> handler) throws Throwable {
            final List<Future<Object>> queue = Lists.newLinkedList();
            try {
                this.delegate.doToHandler(new Handler<I>() {

                    // set once the terminating null has been sent; later inputs are ignored
                    private boolean done = false;

                    @SuppressWarnings("unchecked")
                    @Override
                    public void handle(final I element) throws Throwable {
                        if (!this.done) {
                            if (element == null) {
                                // end of input: drain the pending tasks, stopping at EOF
                                while (!this.done && !queue.isEmpty()) {
                                    final Object output = take(queue);
                                    if (output == EOF) {
                                        break;
                                    } else if (output != null) {
                                        handler.handle((O) output);
                                    }
                                }
                                handler.handle(null);
                                this.done = true;
                            } else {
                                // queue full: consume the oldest result to make room
                                if (queue.size() == ProcessingStream.this.parallelism) {
                                    final Object output = take(queue);
                                    if (output == EOF) {
                                        // early termination, as in the sequential case
                                        handler.handle(null);
                                        Thread.currentThread().interrupt();
                                        this.done = true;
                                    } else if (output != null) {
                                        handler.handle((O) output);
                                    }
                                }
                                if (!this.done) {
                                    offer(queue, element);
                                }
                            }
                        }
                    }

                });
            } finally {
                // cancel whatever is still pending (e.g. after EOF or a failure)
                for (final Future<Object> future : queue) {
                    try {
                        future.cancel(true);
                    } catch (final Exception ex) {
                        // best effort: failures while cancelling are ignored
                    }
                }
            }
        }

        // Removes the oldest future from the queue and returns its result via Futures.get
        // (presumably blocking until the task completes, with failures surfacing as
        // RuntimeException - confirm against the Futures helper in use).
        private Object take(final List<Future<Object>> queue) {
            return Futures.get(queue.remove(0), RuntimeException.class);
        }

        // Submits a task processing the element to Data.getExecutor() and appends its future
        // to the queue.
        private void offer(final List<Future<Object>> queue, final I element) {
            queue.add(Data.getExecutor().submit(new Callable<Object>() {

                @Override
                public Object call() {
                    return process(element);
                }

            }));
        }

        /**
         * Processes a single input element.
         *
         * @param element
         *            the input element
         * @return the output element; {@code null} to drop the input element; {@code EOF} to
         *         end the output stream
         */
        protected abstract Object process(I element);

    }
1818 
1819     private static final class FilterStream<T> extends ProcessingStream<T, T> {
1820 
1821         private final Predicate<? super T> predicate;
1822 
1823         FilterStream(final Stream<T> delegate, final int parallelism,
1824                 final Predicate<? super T> predicate) {
1825             super(delegate, parallelism);
1826             this.predicate = Preconditions.checkNotNull(predicate);
1827         }
1828 
1829         @Override
1830         protected Object process(final T element) {
1831             return this.predicate.apply(element) ? element : null;
1832         }
1833 
1834         @Override
1835         protected String doToString() {
1836             return this.predicate + ", " + this.parallelism;
1837         }
1838 
1839     }
1840 
1841     private static final class TransformElementStream<I, O> extends ProcessingStream<I, O> {
1842 
1843         private final Function<? super I, ? extends O> function;
1844 
1845         TransformElementStream(final Stream<I> delegate, final int parallelism,
1846                 final Function<? super I, ? extends O> function) {
1847             super(delegate, parallelism);
1848             this.function = Preconditions.checkNotNull(function);
1849         }
1850 
1851         @Override
1852         protected Object process(final I element) {
1853             return this.function.apply(element);
1854         }
1855 
1856         @Override
1857         protected String doToString() {
1858             return this.function + ", " + this.parallelism;
1859         }
1860 
1861     }
1862 
1863     private static final class TransformSequenceStream<I, O> extends DelegatingStream<I, O> {
1864 
1865         private final Function<Iterator<I>, Iterator<O>> iteratorFunction;
1866 
1867         private final Function<Handler<O>, Handler<I>> handlerFunction;
1868 
1869         TransformSequenceStream(final Stream<I> delegate,
1870                 @Nullable final Function<Iterator<I>, Iterator<O>> iteratorFunction,
1871                 @Nullable final Function<Handler<O>, Handler<I>> handlerFunction) {
1872             super(delegate);
1873             Preconditions.checkArgument(iteratorFunction != null || handlerFunction != null,
1874                     "At least one function must be supplied");
1875             this.iteratorFunction = iteratorFunction;
1876             this.handlerFunction = handlerFunction;
1877         }
1878 
1879         @Override
1880         protected Iterator<O> doIterator() throws Throwable {
1881             if (this.iteratorFunction != null) {
1882                 return this.iteratorFunction.apply(this.delegate.doIterator());
1883             } else {
1884                 return super.doIterator(); 
1885             }
1886         }
1887 
1888         @SuppressWarnings("unchecked")
1889         @Override
1890         protected void doToHandler(final Handler<? super O> handler) throws Throwable {
1891             if (this.handlerFunction != null) {
1892                 this.delegate.doToHandler(this.handlerFunction.apply((Handler<O>) handler));
1893             } else {
1894                 super.doToHandler(handler); 
1895             }
1896         }
1897 
1898         @Override
1899         protected String doToString() {
1900             return this.iteratorFunction == null ? this.handlerFunction.toString()
1901                     : this.handlerFunction == null ? this.iteratorFunction.toString()
1902                             : this.iteratorFunction.toString() + ", "
1903                                     + this.handlerFunction.toString();
1904         }
1905 
1906     }
1907 
1908     private static final class TransformPathStream<I, O> extends ProcessingStream<I, List<O>> {
1909 
1910         private final Class<O> type;
1911 
1912         private final boolean lenient;
1913 
1914         private final Object[] path;
1915 
1916         TransformPathStream(final Stream<I> delegate, final Class<O> type, final boolean lenient,
1917                 final Object[] path) {
1918             super(delegate, 0);
1919             this.type = Preconditions.checkNotNull(type);
1920             this.lenient = lenient;
1921             this.path = Preconditions.checkNotNull(path);
1922         }
1923 
1924         @Override
1925         protected Object process(final I element) {
1926             final List<O> elements = Lists.newArrayList();
1927             path(element, 0, elements);
1928             return elements;
1929         }
1930 
1931         @SuppressWarnings({ "rawtypes", "unchecked" })
1932         private void path(final Object object, final int index, final List<O> result) {
1933             if (object == null) {
1934                 return;
1935             } else if (object instanceof Iterable && !(object instanceof BindingSet)) {
1936                 for (final Object element : (Iterable) object) {
1937                     path(element, index, result);
1938                 }
1939             } else if (object instanceof Iterator) {
1940                 final Iterator<?> iterator = (Iterator<?>) object;
1941                 while (iterator.hasNext()) {
1942                     path(iterator.next(), index, result);
1943                 }
1944             } else if (object.getClass().isArray()) {
1945                 final int length = Array.getLength(object);
1946                 for (int i = 0; i < length; ++i) {
1947                     path(Array.get(object, i), index, result);
1948                 }
1949             } else if (index == this.path.length) {
1950                 final O element = this.lenient ? Data.convert(object, this.type, null) : 
1951                         Data.convert(object, this.type);
1952                 if (element != null) {
1953                     result.add(element);
1954                 }
1955             } else {
1956                 final Object key = this.path[index];
1957                 if (object instanceof Record) {
1958                     if (key instanceof URI) {
1959                         path(((Record) object).get((URI) key), index + 1, result);
1960                     }
1961                 } else if (object instanceof BindingSet) {
1962                     if (key instanceof String) {
1963                         path(((BindingSet) object).getValue((String) key), index + 1, result);
1964                     }
1965                 } else if (object instanceof Map) {
1966                     path(((Map<Object, Object>) object).get(key), index + 1, result);
1967                 } else if (object instanceof Multimap) {
1968                     path(((Multimap<Object, Object>) object).get(this.path), index + 1, result);
1969                 }
1970             }
1971         }
1972 
1973     }
1974 
1975     private static final class DistinctStream<T> extends ProcessingStream<T, T> {
1976 
1977         private final Set<T> seen;
1978 
1979         DistinctStream(final Stream<T> delegate) {
1980             super(delegate, 0); 
1981             this.seen = Sets.newHashSet();
1982         }
1983 
1984         @Override
1985         protected Object process(final T element) {
1986             return this.seen.add(element) ? element : null; 
1987         }
1988 
1989     }
1990 
1991     private static final class SliceStream<T> extends ProcessingStream<T, T> {
1992 
1993         private final long startIndex;
1994 
1995         private final long endIndex;
1996 
1997         private long index;
1998 
1999         SliceStream(final Stream<T> delegate, final long offset, final long limit) {
2000             super(delegate, 0); 
2001             Preconditions.checkArgument(offset >= 0, "Negative offset: {}", limit);
2002             Preconditions.checkArgument(limit >= 0, "Negative limit: {}", limit);
2003             this.startIndex = offset;
2004             this.endIndex = offset + limit;
2005             this.index = 0;
2006         }
2007 
2008         @Override
2009         protected Object process(final T element) {
2010             Object result = null;
2011             if (this.index >= this.endIndex) {
2012                 result = EOF;
2013             } else if (this.index >= this.startIndex) {
2014                 result = element;
2015             }
2016             ++this.index;
2017             return result;
2018         }
2019 
2020     }
2021 
2022     private static final class ChunkStream<T> extends DelegatingStream<T, List<T>> {
2023 
2024         private final int chunkSize;
2025 
2026         ChunkStream(final Stream<T> delegate, final int chunkSize) {
2027             super(delegate);
2028             Preconditions.checkArgument(chunkSize > 0, "Invalid chunk size: %d", chunkSize);
2029             this.chunkSize = chunkSize;
2030         }
2031 
2032         @Override
2033         protected Iterator<List<T>> doIterator() throws Throwable {
2034             final Iterator<T> iterator = this.delegate.doIterator();
2035             return new AbstractIterator<List<T>>() {
2036 
2037                 private final Object[] chunk = new Object[ChunkStream.this.chunkSize];
2038 
2039                 @SuppressWarnings("unchecked")
2040                 @Override
2041                 protected List<T> advance() {
2042                     int index = 0;
2043                     for (; index < ChunkStream.this.chunkSize && iterator.hasNext(); ++index) {
2044                         this.chunk[index] = iterator.next();
2045                     }
2046                     if (index == 0) {
2047                         return null;
2048                     } else if (index == ChunkStream.this.chunkSize) {
2049                         return (List<T>) ImmutableList.copyOf(this.chunk);
2050                     } else {
2051                         return (List<T>) ImmutableList.copyOf(Arrays.asList(this.chunk).subList(0,
2052                                 index));
2053                     }
2054                 }
2055 
2056             };
2057         }
2058 
2059         @Override
2060         protected void doToHandler(final Handler<? super List<T>> handler) throws Throwable {
2061             this.delegate.doToHandler(new Handler<T>() {
2062 
2063                 private final List<T> chunk = Lists.newArrayList();
2064 
2065                 @Override
2066                 public void handle(final T element) throws Throwable {
2067                     if (element == null) {
2068                         if (!this.chunk.isEmpty()) {
2069                             handler.handle(ImmutableList.copyOf(this.chunk));
2070                         }
2071                         handler.handle(null);
2072                     } else {
2073                         this.chunk.add(element);
2074                         if (this.chunk.size() == ChunkStream.this.chunkSize) {
2075                             handler.handle(ImmutableList.copyOf(this.chunk));
2076                             this.chunk.clear();
2077                         }
2078                     }
2079                 }
2080 
2081             });
2082         }
2083 
2084         @Override
2085         protected String doToString() {
2086             return Integer.toString(this.chunkSize);
2087         }
2088 
2089     }
2090 
2091     private static final class TrackStream<T> extends DelegatingStream<T, T> {
2092 
2093         private final AtomicLong counter;
2094 
2095         private final AtomicBoolean eof;
2096 
2097         public TrackStream(final Stream<T> delegate, @Nullable final AtomicLong counter,
2098                 @Nullable final AtomicBoolean eof) {
2099             super(delegate);
2100             this.counter = counter != null ? counter : new AtomicLong();
2101             this.eof = eof != null ? eof : new AtomicBoolean();
2102             this.counter.set(0L);
2103             this.eof.set(false);
2104         }
2105 
2106         @Override
2107         protected Iterator<T> doIterator() throws Throwable {
2108             final Iterator<T> iterator = this.delegate.doIterator();
2109             return new UnmodifiableIterator<T>() {
2110 
2111                 private long count = 0L;
2112 
2113                 @Override
2114                 public boolean hasNext() {
2115                     final boolean result = iterator.hasNext();
2116                     TrackStream.this.eof.set(result);
2117                     return result;
2118                 }
2119 
2120                 @Override
2121                 public T next() {
2122                     final T next = iterator.next();
2123                     TrackStream.this.counter.set(++this.count);
2124                     return next;
2125                 }
2126 
2127             };
2128         }
2129 
2130         @Override
2131         protected void doToHandler(final Handler<? super T> handler) throws Throwable {
2132             this.delegate.doToHandler(new Handler<T>() {
2133 
2134                 private long count = 0L;
2135 
2136                 @Override
2137                 public void handle(final T element) throws Throwable {
2138                     if (element != null) {
2139                         TrackStream.this.counter.set(++this.count);
2140                         handler.handle(element);
2141                     } else {
2142                         TrackStream.this.eof.set(true);
2143                         handler.handle(null);
2144                     }
2145                 }
2146 
2147             });
2148         }
2149 
2150     }
2151 
2152     private static final class PathComparator implements Comparator<Object> {
2153 
2154         private final Class<? extends Comparable<?>> type;
2155 
2156         private final Object[] path;
2157 
2158         private final boolean lenient;
2159 
2160         PathComparator(final Class<? extends Comparable<?>> type, final boolean lenient,
2161                 final Object... path) {
2162             this.type = Preconditions.checkNotNull(type);
2163             this.lenient = lenient;
2164             this.path = path.clone();
2165         }
2166 
2167         @SuppressWarnings({ "rawtypes", "unchecked" })
2168         @Override
2169         public int compare(final Object first, final Object second) {
2170             final Comparable firstKey = path(first, 0);
2171             final Comparable secondKey = path(second, 0);
2172             if (firstKey == null) {
2173                 return secondKey == null ? 0 : 1;
2174             } else {
2175                 return secondKey == null ? -1 : firstKey.compareTo(secondKey);
2176             }
2177         }
2178 
2179         @SuppressWarnings({ "unchecked" })
2180         private Comparable<?> path(final Object object, final int index) {
2181             if (object == null) {
2182                 return null;
2183             } else if (object instanceof Iterable && !(object instanceof BindingSet)) {
2184                 final Iterator<?> iterator = ((Iterable<?>) object).iterator();
2185                 return iterator.hasNext() ? path(iterator.next(), index) : null;
2186             } else if (object instanceof Iterator) {
2187                 final Iterator<?> iterator = (Iterator<?>) object;
2188                 return iterator.hasNext() ? path(iterator.next(), index) : null;
2189             } else if (object.getClass().isArray()) {
2190                 final int length = Array.getLength(object);
2191                 return length > 0 ? path(Array.get(object, 0), index) : null;
2192             } else if (index == this.path.length) {
2193                 return this.lenient ? Data.convert(object, this.type, null) : 
2194                         Data.convert(object, this.type);
2195             } else {
2196                 final Object key = this.path[index];
2197                 if (object instanceof Record) {
2198                     return key instanceof URI ? path(((Record) object).get((URI) key), index + 1)
2199                             : null;
2200                 } else if (object instanceof BindingSet) {
2201                     return key instanceof String ? path(
2202                             ((BindingSet) object).getValue((String) key), index + 1) : null;
2203                 } else if (object instanceof Map) {
2204                     return path(((Map<Object, Object>) object).get(key), index + 1);
2205                 } else if (object instanceof Multimap) {
2206                     return path(((Multimap<Object, Object>) object).get(this.path), index + 1);
2207                 }
2208                 return null;
2209             }
2210         }
2211 
2212     }
2213 
2214 }