1 package eu.fbk.knowledgestore.data;
2
3 import java.io.Closeable;
4 import java.io.IOException;
5 import java.lang.reflect.Array;
6 import java.lang.reflect.Method;
7 import java.util.Arrays;
8 import java.util.Collection;
9 import java.util.Collections;
10 import java.util.Comparator;
11 import java.util.Enumeration;
12 import java.util.Iterator;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.NoSuchElementException;
16 import java.util.Set;
17 import java.util.SortedSet;
18 import java.util.concurrent.ArrayBlockingQueue;
19 import java.util.concurrent.BlockingQueue;
20 import java.util.concurrent.Callable;
21 import java.util.concurrent.Future;
22 import java.util.concurrent.ScheduledFuture;
23 import java.util.concurrent.TimeUnit;
24 import java.util.concurrent.atomic.AtomicBoolean;
25 import java.util.concurrent.atomic.AtomicLong;
26 import java.util.concurrent.atomic.AtomicReference;
27
28 import javax.annotation.Nullable;
29
30 import com.google.common.base.Function;
31 import com.google.common.base.Functions;
32 import com.google.common.base.Preconditions;
33 import com.google.common.base.Predicate;
34 import com.google.common.base.Throwables;
35 import com.google.common.collect.ArrayListMultimap;
36 import com.google.common.collect.ImmutableCollection;
37 import com.google.common.collect.ImmutableList;
38 import com.google.common.collect.ImmutableListMultimap;
39 import com.google.common.collect.ImmutableMap;
40 import com.google.common.collect.ImmutableSet;
41 import com.google.common.collect.ImmutableSortedSet;
42 import com.google.common.collect.Iterables;
43 import com.google.common.collect.Iterators;
44 import com.google.common.collect.ListMultimap;
45 import com.google.common.collect.Lists;
46 import com.google.common.collect.Maps;
47 import com.google.common.collect.Multimap;
48 import com.google.common.collect.Ordering;
49 import com.google.common.collect.Sets;
50 import com.google.common.collect.UnmodifiableIterator;
51 import com.google.common.util.concurrent.Futures;
52
53 import org.openrdf.model.URI;
54 import org.openrdf.query.BindingSet;
55 import org.slf4j.Logger;
56 import org.slf4j.LoggerFactory;
57
58 import info.aduna.iteration.CloseableIteration;
59 import info.aduna.iteration.Iteration;
60
61 import eu.fbk.knowledgestore.internal.Util;
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154 public abstract class Stream<T> implements Iterable<T>, Closeable {
155
156 private static final Logger LOGGER = LoggerFactory.getLogger(Stream.class);
157
158 private static final Object EOF = new Object();
159
160 final State state;
161
162
163
164
165 protected Stream() {
166 this(new State());
167 }
168
169 Stream(final State state) {
170 this.state = state;
171 state.closeObjects.add(this);
172 }
173
174
175
176
177
178
179
180
181
182
183
184
185 public static <T> Stream<T> create(@SuppressWarnings("unchecked") final T... elements) {
186 if (elements.length == 0) {
187 return new EmptyStream<T>();
188 } else if (elements.length == 1) {
189 return new SingletonStream<T>(elements[0]);
190 } else {
191 return new IteratorStream<T>(Iterators.forArray(elements));
192 }
193 }
194
195
196
197
198
199
200
201
202
203
204
205
206
207 @SuppressWarnings("unchecked")
208 public static <T> Stream<T> create(final Iterable<? extends T> iterable) {
209 if (iterable instanceof Stream) {
210 return (Stream<T>) iterable;
211 } else if (iterable instanceof ImmutableCollection<?>
212 && ((ImmutableCollection<? extends T>) iterable).isEmpty()) {
213 return new EmptyStream<T>();
214 } else {
215 return new IterableStream<T>(iterable);
216 }
217 }
218
219
220
221
222
223
224
225
226
227
228 public static <T> Stream<T> create(final Iterator<? extends T> iterator) {
229 if (iterator.hasNext()) {
230 return new IteratorStream<T>(iterator);
231 } else {
232 return new EmptyStream<T>();
233 }
234 }
235
236
237
238
239
240
241
242
243
244
245 public static <T> Stream<T> create(final Iteration<? extends T, ?> iteration) {
246 return new IterationStream<T>(iteration);
247 }
248
249
250
251
252
253
254
255
256
257
258 public static <T> Stream<T> create(final Enumeration<? extends T> enumeration) {
259 if (enumeration.hasMoreElements()) {
260 return new IteratorStream<T>(Iterators.forEnumeration(enumeration));
261 } else {
262 return new EmptyStream<T>();
263 }
264 }
265
266
267
268
269
270
271
272
273
274
275
276 public static <T> Stream<T> concat(final Iterable<? extends Iterable<? extends T>> iterables) {
277 return new ConcatStream<Iterable<? extends T>, T>(create(iterables));
278 }
279
280
281
282
283
284
285
286
287
288
289
290 @SafeVarargs
291 public static <T> Stream<T> concat(final Iterable<? extends T>... iterables) {
292 return new ConcatStream<Iterable<? extends T>, T>(create(iterables));
293 }
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308 public final Stream<T> filter(final Predicate<? super T> predicate, final int parallelism) {
309 synchronized (this.state) {
310 checkState();
311 return new FilterStream<T>(this, parallelism, predicate);
312 }
313 }
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334 public final <R> Stream<R> transform(final Function<? super T, ? extends R> function,
335 final int parallelism) {
336 synchronized (this.state) {
337 checkState();
338 return new TransformElementStream<T, R>(this, parallelism, function);
339 }
340 }
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358 public final <R> Stream<R> transform(
359 @Nullable final Function<Iterator<T>, Iterator<R>> iteratorFunction,
360 @Nullable final Function<Handler<R>, Handler<T>> handlerFunction) {
361 synchronized (this.state) {
362 checkState();
363 return new TransformSequenceStream<T, R>(this, iteratorFunction, handlerFunction);
364 }
365 }
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392 public final <R> Stream<R> transform(final Class<R> type, final boolean lenient,
393 final Object... path) {
394 synchronized (this.state) {
395 checkState();
396 return concat(new TransformPathStream<T, R>(this, type, lenient, path));
397 }
398 }
399
400
401
402
403
404
405
406
407
408 public final Stream<T> distinct() {
409 synchronized (this.state) {
410 checkState();
411 return this instanceof DistinctStream<?> ? this : new DistinctStream<T>(this);
412 }
413 }
414
415
416
417
418
419
420
421
422
423
424
425
426 public final Stream<T> slice(final long offset, final long limit) {
427 synchronized (this.state) {
428 checkState();
429 return new SliceStream<T>(this, offset, limit);
430 }
431 }
432
433
434
435
436
437
438
439
440
441 public final Stream<List<T>> chunk(final int chunkSize) {
442 synchronized (this.state) {
443 checkState();
444 return new ChunkStream<T>(this, chunkSize);
445 }
446 }
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462 public final Stream<T> track(@Nullable final AtomicLong counter,
463 @Nullable final AtomicBoolean eof) {
464 synchronized (this.state) {
465 checkState();
466 return new TrackStream<T>(this, counter, eof);
467 }
468 }
469
470
471
472
473
474
475
476
477 public final long count() {
478 final AtomicLong result = new AtomicLong();
479 toHandler(new Handler<T>() {
480
481 private long count;
482
483 @Override
484 public void handle(final T element) {
485 if (element != null) {
486 ++this.count;
487 } else {
488 result.set(this.count);
489 }
490
491 }
492
493 });
494 return result.get();
495 }
496
497
498
499
500 @Override
501 public final Iterator<T> iterator() {
502 synchronized (this.state) {
503 checkState();
504 this.state.available = false;
505 final Iterator<T> iterator;
506 try {
507 iterator = new CheckedIterator<T>(doIterator(), this);
508 } catch (final Throwable ex) {
509 throw Throwables.propagate(ex);
510 }
511 this.state.activeIterator = iterator;
512 return iterator;
513 }
514 }
515
516
517
518
519
520
521
522
523
524 public final void toHandler(final Handler<? super T> handler) {
525 Preconditions.checkNotNull(handler);
526 synchronized (this.state) {
527 checkState();
528 this.state.available = false;
529 this.state.toHandlerThread = Thread.currentThread();
530 }
531 try {
532 try {
533 doToHandler(handler);
534 } catch (final Throwable ex) {
535 Throwables.propagate(ex);
536 }
537 } finally {
538 synchronized (this.state) {
539 if (this.state.closed) {
540 checkState();
541 }
542 this.state.toHandlerThread = null;
543 Thread.interrupted();
544 close();
545 }
546 }
547 }
548
549
550
551
552
553
554
555
556
557 public final T[] toArray(final Class<T> elementClass) {
558 return Iterables.toArray(toCollection(Lists.<T>newArrayListWithCapacity(256)),
559 elementClass);
560 }
561
562
563
564
565
566
567
568 public final List<T> toList() {
569 return ImmutableList.copyOf(toCollection(Lists.<T>newArrayListWithCapacity(256)));
570 }
571
572
573
574
575
576
577
578
579 public final Set<T> toSet() {
580 return ImmutableSet.copyOf(toCollection(Lists.<T>newArrayListWithCapacity(256)));
581 }
582
583
584
585
586
587
588
589
590
591
592
593 public final List<T> toSortedList(final Comparator<? super T> comparator) {
594 return Ordering.from(comparator).immutableSortedCopy(
595 toCollection(Lists.<T>newArrayListWithCapacity(256)));
596 }
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618 public final List<T> toSortedList(final Class<? extends Comparable<?>> type,
619 final boolean lenient, final Object... path) {
620 return Ordering.from(new PathComparator(type, lenient, path)).immutableSortedCopy(
621 toCollection(Lists.<T>newArrayListWithCapacity(256)));
622 }
623
624
625
626
627
628
629
630
631
632
633
634 public final SortedSet<T> toSortedSet(final Comparator<? super T> comparator) {
635 return ImmutableSortedSet.copyOf(comparator,
636 toCollection(Lists.<T>newArrayListWithCapacity(256)));
637 }
638
639
640
641
642
643
644
645
646
647
648
649 public final <C extends Collection<? super T>> C toCollection(final C collection) {
650 Preconditions.checkNotNull(collection);
651 toHandler(new Handler<T>() {
652
653 @Override
654 public void handle(final T element) {
655 if (element != null) {
656 collection.add(element);
657 }
658 }
659 });
660 return collection;
661 }
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682 public final <K, V> Map<K, V> toMap(final Function<? super T, ? extends K> keyFunction,
683 final Function<? super T, ? extends V> valueFunction) {
684 return ImmutableMap
685 .copyOf(toMap(keyFunction, valueFunction, Maps.<K, V>newLinkedHashMap()));
686 }
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711 public final <K, V, M extends Map<K, V>> M toMap(
712 final Function<? super T, ? extends K> keyFunction,
713 final Function<? super T, ? extends V> valueFunction, final M map) {
714 Preconditions.checkNotNull(keyFunction);
715 Preconditions.checkNotNull(valueFunction);
716 Preconditions.checkNotNull(map);
717 toHandler(new Handler<T>() {
718
719 @Override
720 public void handle(final T element) {
721 if (element != null) {
722 final K key = keyFunction.apply(element);
723 final V value = valueFunction.apply(element);
724 if (key != null && value != null) {
725 map.put(key, value);
726 }
727 }
728 }
729
730 });
731 return map;
732 }
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751 public final <K, V> ListMultimap<K, V> toMultimap(
752 final Function<? super T, ? extends K> keyFunction,
753 final Function<? super T, ? extends V> valueFunction) {
754 return ImmutableListMultimap.copyOf(toMultimap(keyFunction, valueFunction,
755 ArrayListMultimap.<K, V>create()));
756 }
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779 public final <K, V, M extends Multimap<K, V>> M toMultimap(
780 final Function<? super T, ? extends K> keyFunction,
781 final Function<? super T, ? extends V> valueFunction, final M multimap) {
782 Preconditions.checkNotNull(keyFunction);
783 Preconditions.checkNotNull(valueFunction);
784 Preconditions.checkNotNull(multimap);
785 toHandler(new Handler<T>() {
786
787 @Override
788 public void handle(final T element) {
789 if (element != null) {
790 final K key = keyFunction.apply(element);
791 final V value = valueFunction.apply(element);
792 if (key != null && value != null) {
793 multimap.put(key, value);
794 }
795 }
796 }
797
798 });
799 return multimap;
800 }
801
802
803
804
805
806
807
808
809
810
811 public final T getUnique(final T defaultValue) {
812 try {
813 final T result = getUnique();
814 if (result != null) {
815 return result;
816 }
817 } catch (final Throwable ex) {
818
819 }
820 return defaultValue;
821 }
822
823
824
825
826
827
828
829
830
831
832
833 @SuppressWarnings("unchecked")
834 @Nullable
835 public final T getUnique() throws IllegalStateException {
836 final AtomicReference<Object> holder = new AtomicReference<Object>();
837 toHandler(new Handler<T>() {
838
839 @Override
840 public void handle(final T element) {
841 if (element != null) {
842 if (holder.get() == null) {
843 holder.set(element);
844 } else {
845 holder.set(EOF);
846 Thread.currentThread().interrupt();
847 }
848 }
849 }
850
851 });
852 final Object result = holder.get();
853 if (result != EOF) {
854 return (T) result;
855 }
856 throw new IllegalStateException("Stream " + this + " returned more than one element");
857 }
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873 public final <V> V getProperty(final String name, final Class<V> type) {
874 Preconditions.checkNotNull(name);
875 try {
876 Object value = null;
877 synchronized (this.state) {
878 if (this.state.properties != null) {
879 value = this.state.properties.get(name);
880 }
881 }
882 return Data.convert(value, type);
883 } catch (final Throwable ex) {
884 throw Throwables.propagate(ex);
885 }
886 }
887
888
889
890
891
892
893
894
895
896
897
898
899 public final Stream<T> setProperty(final String name, @Nullable final Object value) {
900 Preconditions.checkNotNull(name);
901 synchronized (this.state) {
902 if (this.state.properties != null) {
903 this.state.properties.put(name, value);
904 } else if (value != null) {
905 this.state.properties = Maps.newHashMap();
906 this.state.properties.put(name, value);
907 }
908 }
909 return this;
910 }
911
912
913
914
915
916
917
918
919 public final Long getTimeout() {
920 synchronized (this.state) {
921 return this.state.timeoutFuture == null ? null : this.state.timeoutFuture
922 .getDelay(TimeUnit.MILLISECONDS) + System.currentTimeMillis();
923 }
924 }
925
926
927
928
929
930
931
932
933
934
935
936 public final Stream<T> setTimeout(@Nullable final Long timestamp) {
937 Preconditions.checkArgument(timestamp == null || timestamp > System.currentTimeMillis());
938 synchronized (this.state) {
939 if (this.state.closed) {
940 return this;
941 }
942 if (this.state.timeoutFuture != null) {
943 if (!this.state.timeoutFuture.cancel(false)) {
944 return this;
945 }
946 }
947 if (timestamp != null) {
948 this.state.timeoutFuture = Data.getExecutor().schedule(new Runnable() {
949
950 @Override
951 public void run() {
952 close();
953 }
954
955 }, Math.max(0, timestamp - System.currentTimeMillis()), TimeUnit.MILLISECONDS);
956 }
957 return this;
958 }
959 }
960
961
962
963
964
965
966
967 public final boolean isAvailable() {
968 synchronized (this.state) {
969 return this.state.available;
970 }
971 }
972
973
974
975
976
977
978
979 public final boolean isClosed() {
980 synchronized (this.state) {
981 return this.state.closed;
982 }
983 }
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998 public final Stream<T> onClose(final Object... objects) {
999 synchronized (this.state) {
1000 for (final Object object : objects) {
1001 if (!(object instanceof Closeable) && !(object instanceof Runnable)
1002 && !(object instanceof Callable)) {
1003 throw new IllegalArgumentException("Illegal object: " + object);
1004 } else if (this.state.closed) {
1005 closeAction(object);
1006 } else {
1007 boolean alreadyContained = false;
1008 for (final Object o : this.state.closeObjects) {
1009 if (o == object) {
1010 alreadyContained = true;
1011 break;
1012 }
1013 }
1014 if (!alreadyContained) {
1015 this.state.closeObjects.add(object);
1016 }
1017 }
1018 }
1019 }
1020 return this;
1021 }
1022
1023
1024
1025
1026
1027
1028
1029
1030 @Override
1031 public final void close() {
1032 synchronized (this.state) {
1033 if (this.state.closed) {
1034 return;
1035 }
1036 if (this.state.activeIterator instanceof Closeable) {
1037 Util.closeQuietly(this.state.activeIterator);
1038 }
1039 if (this.state.toHandlerThread != null) {
1040 this.state.toHandlerThread.interrupt();
1041 }
1042 for (final Object object : this.state.closeObjects) {
1043 closeAction(object);
1044 }
1045 this.state.activeIterator = null;
1046 this.state.toHandlerThread = null;
1047 this.state.available = false;
1048 this.state.closed = true;
1049 }
1050 }
1051
1052
1053
1054
1055
1056
1057
1058
1059 @Override
1060 public final String toString() {
1061 final StringBuilder builder = new StringBuilder();
1062 toStringHelper(builder);
1063 return builder.toString();
1064 }
1065
1066 void toStringHelper(final StringBuilder builder) {
1067 String name = getClass().getSimpleName();
1068 if (name == null) {
1069 final Method method = getClass().getEnclosingMethod();
1070 if (method != null) {
1071 name = method.getDeclaringClass().getSimpleName() + "." + method.getName()
1072 + "-Stream";
1073 } else {
1074 name = "anon-Stream";
1075 }
1076 }
1077 builder.append(name);
1078 final String args = doToString();
1079 if (args != null) {
1080 builder.append("<").append(args).append(">");
1081 }
1082 }
1083
1084 final void checkState() {
1085 synchronized (this.state) {
1086 if (this.state.closed) {
1087 throw new IllegalStateException("Stream already closed: " + this);
1088 } else if (!this.state.available) {
1089 throw new IllegalStateException("Stream already being iterated: " + this);
1090 }
1091 }
1092 }
1093
1094 final void closeAction(final Object object) {
1095 try {
1096 if (object instanceof Stream<?>) {
1097 ((Stream<?>) object).doClose();
1098 } else if (object instanceof Closeable) {
1099 ((Closeable) object).close();
1100 } else if (object instanceof Runnable) {
1101 ((Runnable) object).run();
1102 } else if (object instanceof Callable<?>) {
1103 ((Callable<?>) object).call();
1104 }
1105 } catch (final Throwable ex) {
1106 LOGGER.error("Error performing close action on " + object, ex);
1107 }
1108 }
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120 protected Iterator<T> doIterator() throws Throwable {
1121 final ToHandlerIterator<T> iterator = new ToHandlerIterator<T>(this);
1122 iterator.submit();
1123 return iterator;
1124 }
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140 protected void doToHandler(final Handler<? super T> handler) throws Throwable {
1141 final Iterator<T> iterator = doIterator();
1142 while (iterator.hasNext()) {
1143 if (Thread.interrupted()) {
1144 return;
1145 }
1146 handler.handle(iterator.next());
1147 }
1148 handler.handle(null);
1149 }
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159 @Nullable
1160 protected String doToString() {
1161 return null;
1162 }
1163
1164
1165
1166
1167
1168
1169
1170
1171 protected void doClose() throws Throwable {
1172 }
1173
1174 @Override
1175 protected void finalize() throws Throwable {
1176 try {
1177 close();
1178 } finally {
1179 super.finalize();
1180 }
1181 }
1182
1183 private static final class State {
1184
1185 @Nullable
1186 List<Object> closeObjects = Lists.newArrayList();
1187
1188 @Nullable
1189 ScheduledFuture<?> timeoutFuture;
1190
1191 @Nullable
1192 Map<String, Object> properties;
1193
1194 @Nullable
1195 Iterator<?> activeIterator;
1196
1197 @Nullable
1198 Thread toHandlerThread;
1199
1200 boolean available = true;
1201
1202 boolean closed = false;
1203
1204 }
1205
1206 private abstract static class AbstractIterator<T> extends UnmodifiableIterator<T> implements
1207 Closeable {
1208
1209 @Nullable
1210 private T next;
1211
1212 @Override
1213 public final boolean hasNext() {
1214 if (this.next == null) {
1215 this.next = advance();
1216 }
1217 return this.next != null;
1218 }
1219
1220 @Override
1221 public final T next() {
1222 if (this.next == null) {
1223 final T result = advance();
1224 if (result != null) {
1225 return result;
1226 }
1227 throw new NoSuchElementException();
1228 } else {
1229 final T result = this.next;
1230 this.next = null;
1231 return result;
1232 }
1233 }
1234
1235 @Override
1236 public void close() throws IOException {
1237 }
1238
1239 protected abstract T advance();
1240
1241 }
1242
1243 private static final class CheckedIterator<T> extends UnmodifiableIterator<T> {
1244
1245 private final Iterator<T> iterator;
1246
1247 private final Stream<T> stream;
1248
1249 private final State state;
1250
1251 private boolean exhausted;
1252
1253 CheckedIterator(final Iterator<T> iterator, final Stream<T> stream) {
1254 this.iterator = iterator;
1255 this.stream = stream;
1256 this.state = stream.state;
1257 this.exhausted = false;
1258 }
1259
1260 @Override
1261 public boolean hasNext() {
1262 boolean result = false;
1263 if (!this.exhausted) {
1264 checkState();
1265 try {
1266 result = this.iterator.hasNext();
1267 } finally {
1268 if (!result) {
1269 this.stream.close();
1270 this.exhausted = true;
1271 }
1272 }
1273 }
1274 return result;
1275 }
1276
1277 @Override
1278 public T next() {
1279 checkState();
1280 try {
1281 return this.iterator.next();
1282 } catch (final Throwable ex) {
1283 this.stream.close();
1284 throw Throwables.propagate(ex);
1285 }
1286 }
1287
1288 private void checkState() {
1289 boolean closed;
1290 synchronized (this.state) {
1291 closed = this.state.closed;
1292 }
1293 Preconditions.checkState(!closed, "Stream has been closed");
1294 }
1295
1296 }
1297
1298 private static final class ToHandlerIterator<T> extends AbstractIterator<T> implements
1299 Handler<T>, Runnable {
1300
1301 private final Stream<T> stream;
1302
1303 private final BlockingQueue<Object> queue;
1304
1305 private Future<?> future;
1306
1307 ToHandlerIterator(final Stream<T> stream) {
1308 this.stream = stream;
1309 this.queue = new ArrayBlockingQueue<Object>(1024);
1310 this.future = null;
1311 }
1312
1313 public void submit() {
1314 this.future = Data.getExecutor().submit(this);
1315 }
1316
1317 @Override
1318 public void run() {
1319 try {
1320 this.stream.doToHandler(this);
1321 } catch (final Throwable ex) {
1322 putUninterruptibly(ex);
1323 putUninterruptibly(EOF);
1324 }
1325 }
1326
1327 @Override
1328 public void handle(final T element) {
1329 try {
1330 this.queue.put(element == null ? EOF : element);
1331 } catch (final InterruptedException ex) {
1332 putUninterruptibly(ex);
1333 putUninterruptibly(EOF);
1334 Thread.currentThread().interrupt();
1335 }
1336 }
1337
1338 @SuppressWarnings("unchecked")
1339 @Override
1340 protected T advance() {
1341 try {
1342 final Object element = this.queue.take();
1343 if (element == EOF) {
1344 return null;
1345 } else if (element instanceof Throwable) {
1346 throw Throwables.propagate((Throwable) element);
1347 } else {
1348 return (T) element;
1349 }
1350 } catch (final InterruptedException ex) {
1351 Thread.currentThread().interrupt();
1352 throw new RuntimeException("Interrupted while waiting for next element", ex);
1353 }
1354 }
1355
1356 @Override
1357 public void close() {
1358 if (this.future != null) {
1359 this.future.cancel(true);
1360 }
1361 }
1362
1363 private void putUninterruptibly(final Object element) {
1364 while (true) {
1365 try {
1366 this.queue.put(element);
1367 return;
1368 } catch (final InterruptedException ex) {
1369
1370 }
1371 }
1372 }
1373
1374 }
1375
1376
1377
1378 private static final class EmptyStream<T> extends Stream<T> {
1379
1380 @Override
1381 protected Iterator<T> doIterator() {
1382 return Collections.emptyIterator();
1383 }
1384
1385 @Override
1386 protected void doToHandler(final Handler<? super T> handler) throws Throwable {
1387 handler.handle(null);
1388 }
1389
1390 }
1391
1392 private static final class SingletonStream<T> extends Stream<T> {
1393
1394 private T element;
1395
1396 SingletonStream(final T element) {
1397 this.element = Preconditions.checkNotNull(element);
1398 }
1399
1400 @Override
1401 protected Iterator<T> doIterator() {
1402 return Iterators.singletonIterator(this.element);
1403 }
1404
1405 @Override
1406 protected void doToHandler(final Handler<? super T> handler) throws Throwable {
1407 handler.handle(this.element);
1408 handler.handle(null);
1409 }
1410
1411 @Override
1412 protected void doClose() throws Throwable {
1413 this.element = null;
1414 }
1415 }
1416
1417 private static final class IterableStream<T> extends Stream<T> {
1418
1419 private Iterable<? extends T> iterable;
1420
1421 IterableStream(final Iterable<? extends T> iterable) {
1422 this.iterable = Preconditions.checkNotNull(iterable);
1423 }
1424
1425 @SuppressWarnings("unchecked")
1426 @Override
1427 protected Iterator<T> doIterator() throws Throwable {
1428 return ((Iterable<T>) this.iterable).iterator();
1429 }
1430
1431 @Override
1432 protected void doToHandler(final Handler<? super T> handler) throws Throwable {
1433 for (final T element : this.iterable) {
1434 handler.handle(element);
1435 }
1436 handler.handle(null);
1437 }
1438
1439 @Override
1440 protected void doClose() throws Throwable {
1441 if (this.iterable instanceof Closeable) {
1442 ((Closeable) this.iterable).close();
1443 }
1444 this.iterable = null;
1445 }
1446
1447 }
1448
1449 private static final class IteratorStream<T> extends Stream<T> {
1450
1451 private Iterator<? extends T> iterator;
1452
1453 IteratorStream(final Iterator<? extends T> iterator) {
1454 this.iterator = Preconditions.checkNotNull(iterator);
1455 }
1456
1457 @SuppressWarnings("unchecked")
1458 @Override
1459 protected Iterator<T> doIterator() {
1460 return (Iterator<T>) this.iterator;
1461 }
1462
1463 @Override
1464 protected void doToHandler(final Handler<? super T> handler) throws Throwable {
1465 while (this.iterator.hasNext()) {
1466 if (Thread.interrupted()) {
1467 return;
1468 }
1469 final T element = this.iterator.next();
1470 handler.handle(element);
1471 }
1472 handler.handle(null);
1473 }
1474
1475 @Override
1476 protected void doClose() throws Throwable {
1477 if (this.iterator instanceof Closeable) {
1478 ((Closeable) this.iterator).close();
1479 }
1480 this.iterator = null;
1481 }
1482
1483 }
1484
1485 private static final class IterationStream<T> extends Stream<T> {
1486
1487 private Iteration<? extends T, ?> iteration;
1488
1489 IterationStream(final Iteration<? extends T, ?> iteration) {
1490 this.iteration = Preconditions.checkNotNull(iteration);
1491 }
1492
1493 @Override
1494 protected Iterator<T> doIterator() {
1495 return new AbstractIterator<T>() {
1496
1497 @Override
1498 protected T advance() {
1499 try {
1500 if (IterationStream.this.iteration.hasNext()) {
1501 return IterationStream.this.iteration.next();
1502 } else {
1503 return null;
1504 }
1505 } catch (final Throwable ex) {
1506 throw Throwables.propagate(ex);
1507 }
1508 }
1509
1510 };
1511 }
1512
1513 @Override
1514 protected void doToHandler(final Handler<? super T> handler) throws Throwable {
1515 while (this.iteration.hasNext()) {
1516 if (Thread.interrupted()) {
1517 return;
1518 }
1519 final T element = this.iteration.next();
1520 handler.handle(element);
1521 }
1522 handler.handle(null);
1523 }
1524
1525 @Override
1526 protected void doClose() throws Throwable {
1527 if (this.iteration instanceof CloseableIteration<?, ?>) {
1528 ((CloseableIteration<? extends T, ?>) this.iteration).close();
1529 }
1530 this.iteration = null;
1531 }
1532
1533 }
1534
1535 private abstract static class DelegatingStream<I, O> extends Stream<O> {
1536
1537 final Stream<I> delegate;
1538
1539 DelegatingStream(final Stream<I> delegate) {
1540 super(delegate.state);
1541 this.delegate = delegate;
1542 }
1543
1544 @Override
1545 void toStringHelper(final StringBuilder builder) {
1546 super.toStringHelper(builder);
1547 builder.append(" (");
1548 this.delegate.toStringHelper(builder);
1549 builder.append(")");
1550 }
1551
1552 }
1553
1554 private static final class ConcatStream<I extends Iterable<? extends O>, O> extends
1555 DelegatingStream<I, O> {
1556
1557 ConcatStream(final Stream<I> delegate) {
1558 super(delegate);
1559 }
1560
1561 @Override
1562 protected Iterator<O> doIterator() throws Throwable {
1563 final Iterator<I> streamIterator = this.delegate.doIterator();
1564 final Iterator<O> elementIterator = new AbstractIterator<O>() {
1565
1566 private Stream<? extends O> stream;
1567
1568 private Iterator<? extends O> iterator;
1569
1570 @Override
1571 protected O advance() {
1572 while (this.iterator == null || !this.iterator.hasNext()) {
1573 if (this.stream != null) {
1574 this.stream.close();
1575 }
1576 if (!streamIterator.hasNext()) {
1577 return null;
1578 }
1579 this.stream = create(streamIterator.next());
1580 this.iterator = this.stream.iterator();
1581 }
1582 return this.iterator.next();
1583 }
1584
1585 @Override
1586 public void close() {
1587 if (this.stream != null) {
1588 this.stream.close();
1589 }
1590 }
1591
1592 };
1593 onClose(elementIterator);
1594 return elementIterator;
1595 }
1596
1597 @Override
1598 protected void doToHandler(final Handler<? super O> handler) throws Throwable {
1599 this.delegate.doToHandler(new Handler<I>() {
1600
1601 @Override
1602 public void handle(final I iterable) throws Throwable {
1603 if (iterable == null) {
1604 handler.handle(null);
1605 } else {
1606 final AtomicBoolean eof = new AtomicBoolean(false);
1607 create(iterable).toHandler(new Handler<O>() {
1608
1609 @Override
1610 public void handle(final O element) throws Throwable {
1611 if (element != null) {
1612 handler.handle(element);
1613 } else {
1614 eof.set(true);
1615 }
1616 }
1617
1618 });
1619 if (!eof.get()) {
1620 Thread.currentThread().interrupt();
1621 }
1622 }
1623 }
1624
1625 });
1626 }
1627
1628 }
1629
1630 private abstract static class ProcessingStream<I, O> extends DelegatingStream<I, O> {
1631
1632 final int parallelism;
1633
1634 ProcessingStream(final Stream<I> delegate, final int parallelism) {
1635 super(delegate);
1636 this.parallelism = parallelism;
1637 }
1638
1639 @Override
1640 protected final Iterator<O> doIterator() throws Throwable {
1641 if (this.parallelism <= 1) {
1642 return doIteratorSequential();
1643 } else {
1644 return doIteratorParallel();
1645 }
1646 }
1647
1648 private Iterator<O> doIteratorSequential() throws Throwable {
1649 final Iterator<I> iterator = this.delegate.doIterator();
1650 return new AbstractIterator<O>() {
1651
1652 @SuppressWarnings("unchecked")
1653 @Override
1654 protected O advance() {
1655 while (iterator.hasNext()) {
1656 final I element = iterator.next();
1657 final Object transformed = process(element);
1658 if (transformed == EOF) {
1659 return null;
1660 } else if (transformed != null) {
1661 return (O) transformed;
1662 }
1663 }
1664 return null;
1665 }
1666
1667 };
1668 }
1669
1670 private Iterator<O> doIteratorParallel() throws Throwable {
1671 final Iterator<I> iterator = this.delegate.doIterator();
1672 final List<Future<Object>> queue = Lists.newLinkedList();
1673 final Iterator<O> result = new AbstractIterator<O>() {
1674
1675 @SuppressWarnings("unchecked")
1676 @Override
1677 protected O advance() {
1678 while (true) {
1679 while (queue.size() < ProcessingStream.this.parallelism
1680 && iterator.hasNext()) {
1681 offer(queue, iterator.next());
1682 }
1683 if (queue.isEmpty()) {
1684 return null;
1685 }
1686 final Object output = take(queue);
1687 if (output == EOF) {
1688 return null;
1689 } else if (output != null) {
1690 return (O) output;
1691 }
1692 }
1693 }
1694
1695 @Override
1696 public void close() {
1697 for (final Future<Object> future : queue) {
1698 try {
1699 future.cancel(true);
1700 } catch (final Exception ex) {
1701
1702 }
1703 }
1704 }
1705
1706 };
1707 onClose(result);
1708 return result;
1709 }
1710
1711 @Override
1712 protected final void doToHandler(final Handler<? super O> handler) throws Throwable {
1713 if (this.parallelism <= 1) {
1714 doToHandlerSequential(handler);
1715 } else {
1716 doToHandlerParallel(handler);
1717 }
1718 }
1719
1720 private void doToHandlerSequential(final Handler<? super O> handler) throws Throwable {
1721 this.delegate.doToHandler(new Handler<I>() {
1722
1723 private boolean done = false;
1724
1725 @SuppressWarnings("unchecked")
1726 @Override
1727 public void handle(final I element) throws Throwable {
1728 if (!this.done) {
1729 if (element == null) {
1730 handler.handle(null);
1731 this.done = true;
1732 } else {
1733 final Object transformed = process(element);
1734 if (transformed == EOF) {
1735 handler.handle(null);
1736 Thread.currentThread().interrupt();
1737 this.done = true;
1738 } else if (transformed != null) {
1739 handler.handle((O) transformed);
1740 }
1741 }
1742 }
1743 }
1744
1745 });
1746 }
1747
1748 private void doToHandlerParallel(final Handler<? super O> handler) throws Throwable {
1749 final List<Future<Object>> queue = Lists.newLinkedList();
1750 try {
1751 this.delegate.doToHandler(new Handler<I>() {
1752
1753 private boolean done = false;
1754
1755 @SuppressWarnings("unchecked")
1756 @Override
1757 public void handle(final I element) throws Throwable {
1758 if (!this.done) {
1759 if (element == null) {
1760 while (!this.done && !queue.isEmpty()) {
1761 final Object output = take(queue);
1762 if (output == EOF) {
1763 break;
1764 } else if (output != null) {
1765 handler.handle((O) output);
1766 }
1767 }
1768 handler.handle(null);
1769 this.done = true;
1770 } else {
1771 if (queue.size() == ProcessingStream.this.parallelism) {
1772 final Object output = take(queue);
1773 if (output == EOF) {
1774 handler.handle(null);
1775 Thread.currentThread().interrupt();
1776 this.done = true;
1777 } else if (output != null) {
1778 handler.handle((O) output);
1779 }
1780 }
1781 if (!this.done) {
1782 offer(queue, element);
1783 }
1784 }
1785 }
1786 }
1787
1788 });
1789 } finally {
1790 for (final Future<Object> future : queue) {
1791 try {
1792 future.cancel(true);
1793 } catch (final Exception ex) {
1794
1795 }
1796 }
1797 }
1798 }
1799
1800 private Object take(final List<Future<Object>> queue) {
1801 return Futures.get(queue.remove(0), RuntimeException.class);
1802 }
1803
1804 private void offer(final List<Future<Object>> queue, final I element) {
1805 queue.add(Data.getExecutor().submit(new Callable<Object>() {
1806
1807 @Override
1808 public Object call() {
1809 return process(element);
1810 }
1811
1812 }));
1813 }
1814
1815 protected abstract Object process(I element);
1816
1817 }
1818
1819 private static final class FilterStream<T> extends ProcessingStream<T, T> {
1820
1821 private final Predicate<? super T> predicate;
1822
1823 FilterStream(final Stream<T> delegate, final int parallelism,
1824 final Predicate<? super T> predicate) {
1825 super(delegate, parallelism);
1826 this.predicate = Preconditions.checkNotNull(predicate);
1827 }
1828
1829 @Override
1830 protected Object process(final T element) {
1831 return this.predicate.apply(element) ? element : null;
1832 }
1833
1834 @Override
1835 protected String doToString() {
1836 return this.predicate + ", " + this.parallelism;
1837 }
1838
1839 }
1840
1841 private static final class TransformElementStream<I, O> extends ProcessingStream<I, O> {
1842
1843 private final Function<? super I, ? extends O> function;
1844
1845 TransformElementStream(final Stream<I> delegate, final int parallelism,
1846 final Function<? super I, ? extends O> function) {
1847 super(delegate, parallelism);
1848 this.function = Preconditions.checkNotNull(function);
1849 }
1850
1851 @Override
1852 protected Object process(final I element) {
1853 return this.function.apply(element);
1854 }
1855
1856 @Override
1857 protected String doToString() {
1858 return this.function + ", " + this.parallelism;
1859 }
1860
1861 }
1862
1863 private static final class TransformSequenceStream<I, O> extends DelegatingStream<I, O> {
1864
1865 private final Function<Iterator<I>, Iterator<O>> iteratorFunction;
1866
1867 private final Function<Handler<O>, Handler<I>> handlerFunction;
1868
1869 TransformSequenceStream(final Stream<I> delegate,
1870 @Nullable final Function<Iterator<I>, Iterator<O>> iteratorFunction,
1871 @Nullable final Function<Handler<O>, Handler<I>> handlerFunction) {
1872 super(delegate);
1873 Preconditions.checkArgument(iteratorFunction != null || handlerFunction != null,
1874 "At least one function must be supplied");
1875 this.iteratorFunction = iteratorFunction;
1876 this.handlerFunction = handlerFunction;
1877 }
1878
1879 @Override
1880 protected Iterator<O> doIterator() throws Throwable {
1881 if (this.iteratorFunction != null) {
1882 return this.iteratorFunction.apply(this.delegate.doIterator());
1883 } else {
1884 return super.doIterator();
1885 }
1886 }
1887
1888 @SuppressWarnings("unchecked")
1889 @Override
1890 protected void doToHandler(final Handler<? super O> handler) throws Throwable {
1891 if (this.handlerFunction != null) {
1892 this.delegate.doToHandler(this.handlerFunction.apply((Handler<O>) handler));
1893 } else {
1894 super.doToHandler(handler);
1895 }
1896 }
1897
1898 @Override
1899 protected String doToString() {
1900 return this.iteratorFunction == null ? this.handlerFunction.toString()
1901 : this.handlerFunction == null ? this.iteratorFunction.toString()
1902 : this.iteratorFunction.toString() + ", "
1903 + this.handlerFunction.toString();
1904 }
1905
1906 }
1907
1908 private static final class TransformPathStream<I, O> extends ProcessingStream<I, List<O>> {
1909
1910 private final Class<O> type;
1911
1912 private final boolean lenient;
1913
1914 private final Object[] path;
1915
1916 TransformPathStream(final Stream<I> delegate, final Class<O> type, final boolean lenient,
1917 final Object[] path) {
1918 super(delegate, 0);
1919 this.type = Preconditions.checkNotNull(type);
1920 this.lenient = lenient;
1921 this.path = Preconditions.checkNotNull(path);
1922 }
1923
1924 @Override
1925 protected Object process(final I element) {
1926 final List<O> elements = Lists.newArrayList();
1927 path(element, 0, elements);
1928 return elements;
1929 }
1930
1931 @SuppressWarnings({ "rawtypes", "unchecked" })
1932 private void path(final Object object, final int index, final List<O> result) {
1933 if (object == null) {
1934 return;
1935 } else if (object instanceof Iterable && !(object instanceof BindingSet)) {
1936 for (final Object element : (Iterable) object) {
1937 path(element, index, result);
1938 }
1939 } else if (object instanceof Iterator) {
1940 final Iterator<?> iterator = (Iterator<?>) object;
1941 while (iterator.hasNext()) {
1942 path(iterator.next(), index, result);
1943 }
1944 } else if (object.getClass().isArray()) {
1945 final int length = Array.getLength(object);
1946 for (int i = 0; i < length; ++i) {
1947 path(Array.get(object, i), index, result);
1948 }
1949 } else if (index == this.path.length) {
1950 final O element = this.lenient ? Data.convert(object, this.type, null) :
1951 Data.convert(object, this.type);
1952 if (element != null) {
1953 result.add(element);
1954 }
1955 } else {
1956 final Object key = this.path[index];
1957 if (object instanceof Record) {
1958 if (key instanceof URI) {
1959 path(((Record) object).get((URI) key), index + 1, result);
1960 }
1961 } else if (object instanceof BindingSet) {
1962 if (key instanceof String) {
1963 path(((BindingSet) object).getValue((String) key), index + 1, result);
1964 }
1965 } else if (object instanceof Map) {
1966 path(((Map<Object, Object>) object).get(key), index + 1, result);
1967 } else if (object instanceof Multimap) {
1968 path(((Multimap<Object, Object>) object).get(this.path), index + 1, result);
1969 }
1970 }
1971 }
1972
1973 }
1974
1975 private static final class DistinctStream<T> extends ProcessingStream<T, T> {
1976
1977 private final Set<T> seen;
1978
1979 DistinctStream(final Stream<T> delegate) {
1980 super(delegate, 0);
1981 this.seen = Sets.newHashSet();
1982 }
1983
1984 @Override
1985 protected Object process(final T element) {
1986 return this.seen.add(element) ? element : null;
1987 }
1988
1989 }
1990
1991 private static final class SliceStream<T> extends ProcessingStream<T, T> {
1992
1993 private final long startIndex;
1994
1995 private final long endIndex;
1996
1997 private long index;
1998
1999 SliceStream(final Stream<T> delegate, final long offset, final long limit) {
2000 super(delegate, 0);
2001 Preconditions.checkArgument(offset >= 0, "Negative offset: {}", limit);
2002 Preconditions.checkArgument(limit >= 0, "Negative limit: {}", limit);
2003 this.startIndex = offset;
2004 this.endIndex = offset + limit;
2005 this.index = 0;
2006 }
2007
2008 @Override
2009 protected Object process(final T element) {
2010 Object result = null;
2011 if (this.index >= this.endIndex) {
2012 result = EOF;
2013 } else if (this.index >= this.startIndex) {
2014 result = element;
2015 }
2016 ++this.index;
2017 return result;
2018 }
2019
2020 }
2021
2022 private static final class ChunkStream<T> extends DelegatingStream<T, List<T>> {
2023
2024 private final int chunkSize;
2025
2026 ChunkStream(final Stream<T> delegate, final int chunkSize) {
2027 super(delegate);
2028 Preconditions.checkArgument(chunkSize > 0, "Invalid chunk size: %d", chunkSize);
2029 this.chunkSize = chunkSize;
2030 }
2031
2032 @Override
2033 protected Iterator<List<T>> doIterator() throws Throwable {
2034 final Iterator<T> iterator = this.delegate.doIterator();
2035 return new AbstractIterator<List<T>>() {
2036
2037 private final Object[] chunk = new Object[ChunkStream.this.chunkSize];
2038
2039 @SuppressWarnings("unchecked")
2040 @Override
2041 protected List<T> advance() {
2042 int index = 0;
2043 for (; index < ChunkStream.this.chunkSize && iterator.hasNext(); ++index) {
2044 this.chunk[index] = iterator.next();
2045 }
2046 if (index == 0) {
2047 return null;
2048 } else if (index == ChunkStream.this.chunkSize) {
2049 return (List<T>) ImmutableList.copyOf(this.chunk);
2050 } else {
2051 return (List<T>) ImmutableList.copyOf(Arrays.asList(this.chunk).subList(0,
2052 index));
2053 }
2054 }
2055
2056 };
2057 }
2058
2059 @Override
2060 protected void doToHandler(final Handler<? super List<T>> handler) throws Throwable {
2061 this.delegate.doToHandler(new Handler<T>() {
2062
2063 private final List<T> chunk = Lists.newArrayList();
2064
2065 @Override
2066 public void handle(final T element) throws Throwable {
2067 if (element == null) {
2068 if (!this.chunk.isEmpty()) {
2069 handler.handle(ImmutableList.copyOf(this.chunk));
2070 }
2071 handler.handle(null);
2072 } else {
2073 this.chunk.add(element);
2074 if (this.chunk.size() == ChunkStream.this.chunkSize) {
2075 handler.handle(ImmutableList.copyOf(this.chunk));
2076 this.chunk.clear();
2077 }
2078 }
2079 }
2080
2081 });
2082 }
2083
2084 @Override
2085 protected String doToString() {
2086 return Integer.toString(this.chunkSize);
2087 }
2088
2089 }
2090
2091 private static final class TrackStream<T> extends DelegatingStream<T, T> {
2092
2093 private final AtomicLong counter;
2094
2095 private final AtomicBoolean eof;
2096
2097 public TrackStream(final Stream<T> delegate, @Nullable final AtomicLong counter,
2098 @Nullable final AtomicBoolean eof) {
2099 super(delegate);
2100 this.counter = counter != null ? counter : new AtomicLong();
2101 this.eof = eof != null ? eof : new AtomicBoolean();
2102 this.counter.set(0L);
2103 this.eof.set(false);
2104 }
2105
2106 @Override
2107 protected Iterator<T> doIterator() throws Throwable {
2108 final Iterator<T> iterator = this.delegate.doIterator();
2109 return new UnmodifiableIterator<T>() {
2110
2111 private long count = 0L;
2112
2113 @Override
2114 public boolean hasNext() {
2115 final boolean result = iterator.hasNext();
2116 TrackStream.this.eof.set(result);
2117 return result;
2118 }
2119
2120 @Override
2121 public T next() {
2122 final T next = iterator.next();
2123 TrackStream.this.counter.set(++this.count);
2124 return next;
2125 }
2126
2127 };
2128 }
2129
2130 @Override
2131 protected void doToHandler(final Handler<? super T> handler) throws Throwable {
2132 this.delegate.doToHandler(new Handler<T>() {
2133
2134 private long count = 0L;
2135
2136 @Override
2137 public void handle(final T element) throws Throwable {
2138 if (element != null) {
2139 TrackStream.this.counter.set(++this.count);
2140 handler.handle(element);
2141 } else {
2142 TrackStream.this.eof.set(true);
2143 handler.handle(null);
2144 }
2145 }
2146
2147 });
2148 }
2149
2150 }
2151
2152 private static final class PathComparator implements Comparator<Object> {
2153
2154 private final Class<? extends Comparable<?>> type;
2155
2156 private final Object[] path;
2157
2158 private final boolean lenient;
2159
2160 PathComparator(final Class<? extends Comparable<?>> type, final boolean lenient,
2161 final Object... path) {
2162 this.type = Preconditions.checkNotNull(type);
2163 this.lenient = lenient;
2164 this.path = path.clone();
2165 }
2166
2167 @SuppressWarnings({ "rawtypes", "unchecked" })
2168 @Override
2169 public int compare(final Object first, final Object second) {
2170 final Comparable firstKey = path(first, 0);
2171 final Comparable secondKey = path(second, 0);
2172 if (firstKey == null) {
2173 return secondKey == null ? 0 : 1;
2174 } else {
2175 return secondKey == null ? -1 : firstKey.compareTo(secondKey);
2176 }
2177 }
2178
2179 @SuppressWarnings({ "unchecked" })
2180 private Comparable<?> path(final Object object, final int index) {
2181 if (object == null) {
2182 return null;
2183 } else if (object instanceof Iterable && !(object instanceof BindingSet)) {
2184 final Iterator<?> iterator = ((Iterable<?>) object).iterator();
2185 return iterator.hasNext() ? path(iterator.next(), index) : null;
2186 } else if (object instanceof Iterator) {
2187 final Iterator<?> iterator = (Iterator<?>) object;
2188 return iterator.hasNext() ? path(iterator.next(), index) : null;
2189 } else if (object.getClass().isArray()) {
2190 final int length = Array.getLength(object);
2191 return length > 0 ? path(Array.get(object, 0), index) : null;
2192 } else if (index == this.path.length) {
2193 return this.lenient ? Data.convert(object, this.type, null) :
2194 Data.convert(object, this.type);
2195 } else {
2196 final Object key = this.path[index];
2197 if (object instanceof Record) {
2198 return key instanceof URI ? path(((Record) object).get((URI) key), index + 1)
2199 : null;
2200 } else if (object instanceof BindingSet) {
2201 return key instanceof String ? path(
2202 ((BindingSet) object).getValue((String) key), index + 1) : null;
2203 } else if (object instanceof Map) {
2204 return path(((Map<Object, Object>) object).get(key), index + 1);
2205 } else if (object instanceof Multimap) {
2206 return path(((Multimap<Object, Object>) object).get(this.path), index + 1);
2207 }
2208 return null;
2209 }
2210 }
2211
2212 }
2213
2214 }