1 package eu.fbk.knowledgestore.data;
2
3 import java.io.Serializable;
4 import java.util.Arrays;
5 import java.util.Collection;
6 import java.util.Collections;
7 import java.util.Iterator;
8 import java.util.List;
9 import java.util.Map;
10 import java.util.Set;
11 import java.util.UUID;
12
13 import javax.annotation.Nullable;
14
15 import com.google.common.base.Charsets;
16 import com.google.common.base.Function;
17 import com.google.common.base.Objects;
18 import com.google.common.base.Preconditions;
19 import com.google.common.base.Strings;
20 import com.google.common.base.Throwables;
21 import com.google.common.collect.ImmutableList;
22 import com.google.common.collect.ImmutableSet;
23 import com.google.common.collect.Iterables;
24 import com.google.common.collect.Lists;
25 import com.google.common.collect.Maps;
26 import com.google.common.collect.Ordering;
27 import com.google.common.collect.Sets;
28 import com.google.common.hash.Hasher;
29 import com.google.common.hash.Hashing;
30
31 import org.openrdf.model.BNode;
32 import org.openrdf.model.Literal;
33 import org.openrdf.model.Resource;
34 import org.openrdf.model.Statement;
35 import org.openrdf.model.URI;
36 import org.openrdf.model.Value;
37 import org.openrdf.model.vocabulary.RDF;
38 import org.openrdf.rio.RDFHandlerException;
39
40 import eu.fbk.knowledgestore.vocabulary.KS;
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123 public final class Record implements Serializable, Comparable<Record> {
124
125 private static final long serialVersionUID = 1L;
126
127 private static final int LENGTH_INCREMENT = 8;
128
129 private static final int OFFSET_OF_ID = 0;
130
131 private static final int OFFSET_OF_SHARED = 1;
132
133 private static final int OFFSET_OF_PROPERTIES = 2;
134
135 private static final ThreadLocal<Integer> INDENT_LEVEL = new ThreadLocal<Integer>();
136
137 private static final String INDENT_STRING = " ";
138
139 private Object[] state;
140
141 private Record(final URI id) {
142 this.state = new Object[OFFSET_OF_PROPERTIES + LENGTH_INCREMENT];
143 this.state[OFFSET_OF_ID] = id;
144 this.state[OFFSET_OF_SHARED] = Boolean.FALSE;
145 }
146
147 private Record(final Record record, final boolean deepClone) {
148 synchronized (record) {
149 Object[] state = record.state;
150 if (deepClone) {
151 state = cloneRecursively(state);
152 }
153 if (state != record.state) {
154 state[OFFSET_OF_SHARED] = Boolean.FALSE;
155 } else if (state[OFFSET_OF_SHARED] == Boolean.FALSE) {
156 state[OFFSET_OF_SHARED] = Boolean.TRUE;
157 }
158 this.state = state;
159 }
160 }
161
162 private static Object[] cloneRecursively(final Object[] array) {
163 Object[] result = array;
164 for (int i = 0; i < array.length; ++i) {
165 final Object element = array[i];
166 Object newElement = element;
167 if (element instanceof Record) {
168 newElement = new Record((Record) element, true);
169 } else if (element instanceof Object[]) {
170 newElement = cloneRecursively((Object[]) element);
171 }
172 if (newElement != element) {
173 if (result == array) {
174 result = array.clone();
175 }
176 result[i] = newElement;
177 }
178 }
179 return result;
180 }
181
182 private static Object encode(final Object object) {
183
184 return object;
185 }
186
187 private static <T> T decode(final Object object, final Class<T> clazz) {
188 return Data.convert(object, clazz);
189 }
190
191 @Nullable
192 private URI doGetID() {
193 return (URI) this.state[OFFSET_OF_ID];
194 }
195
196 private void doSetID(@Nullable final URI id) {
197 if (!Objects.equal(id, this.state[OFFSET_OF_ID])) {
198 if ((Boolean) this.state[OFFSET_OF_SHARED]) {
199 this.state = this.state.clone();
200 }
201 this.state[OFFSET_OF_ID] = id;
202 }
203 }
204
205 private List<URI> doGetProperties() {
206 final int capacity = this.state.length / 2;
207 final List<URI> properties = Lists.newArrayListWithCapacity(capacity);
208 for (int i = OFFSET_OF_PROPERTIES; i < this.state.length; i += 2) {
209 final URI property = (URI) this.state[i];
210 if (property != null) {
211 properties.add(property);
212 }
213 }
214 return properties;
215 }
216
217 private int doCount(final URI property) {
218 final int length = this.state.length;
219 for (int i = OFFSET_OF_PROPERTIES; i < length; i += 2) {
220 if (property.equals(this.state[i])) {
221 final Object object = this.state[i + 1];
222 if (object instanceof Object[]) {
223 return ((Object[]) object).length;
224 } else {
225 return 1;
226 }
227 }
228 }
229 return 0;
230 }
231
232 @Nullable
233 private <T> Object doGet(final URI property, final Class<T> clazz) {
234 final int length = this.state.length;
235 for (int i = OFFSET_OF_PROPERTIES; i < length; i += 2) {
236 if (property.equals(this.state[i])) {
237 final Object object = this.state[i + 1];
238 if (object instanceof Object[]) {
239 final Object[] array = (Object[]) object;
240 final List<T> list = Lists.newArrayListWithCapacity(array.length);
241 for (final Object element : array) {
242 list.add(decode(element, clazz));
243 }
244 return list;
245 } else {
246 return decode(object, clazz);
247 }
248 }
249 }
250 return null;
251 }
252
253 private void doSet(final URI property, final Collection<Object> nodes) {
254 if ((Boolean) this.state[OFFSET_OF_SHARED]) {
255 this.state = this.state.clone();
256 this.state[OFFSET_OF_SHARED] = Boolean.FALSE;
257 }
258 final int length = this.state.length;
259 if (nodes.isEmpty()) {
260 for (int i = OFFSET_OF_PROPERTIES; i < length; i += 2) {
261 if (property.equals(this.state[i])) {
262 this.state[i] = null;
263 this.state[i + 1] = null;
264 return;
265 }
266 }
267 return;
268 }
269 final Object value;
270 final int size = nodes.size();
271 if (size == 1) {
272 value = encode(Iterables.get(nodes, 0));
273 } else {
274 final Object[] array = new Object[size];
275 int index = 0;
276 for (final Object node : nodes) {
277 array[index++] = encode(node);
278 }
279 value = array;
280 }
281 int nullIndex = -1;
282 for (int i = OFFSET_OF_PROPERTIES; i < length; i += 2) {
283 if (this.state[i] == null) {
284 if (nullIndex < 0) {
285 nullIndex = i;
286 }
287 } else if (property.equals(this.state[i])) {
288 this.state[i + 1] = value;
289 return;
290 }
291 }
292 if (nullIndex >= 0) {
293 this.state[nullIndex] = property;
294 this.state[nullIndex + 1] = value;
295 } else {
296 final Object[] oldState = this.state;
297 this.state = new Object[length + LENGTH_INCREMENT];
298 System.arraycopy(oldState, 0, this.state, 0, length);
299 this.state[length] = property;
300 this.state[length + 1] = value;
301 }
302 }
303
304
305
306
307
308
309 public static Record create() {
310 return new Record(null);
311 }
312
313
314
315
316
317
318
319
320
321
322
323 public static Record create(final URI id, final URI... types) {
324 final Record record = new Record(id);
325 if (types.length > 0) {
326 record.set(RDF.TYPE, types);
327 }
328 return record;
329 }
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344 public static Record create(final Record record, final boolean deepClone) {
345 return new Record(record, deepClone);
346 }
347
348
349
350
351
352
353 @Nullable
354 public synchronized URI getID() {
355 return doGetID();
356 }
357
358
359
360
361
362
363
364
365 public synchronized Record setID(@Nullable final URI id) {
366 doSetID(id);
367 return this;
368 }
369
370
371
372
373
374
375
376
377
378 @Nullable
379 public synchronized URI getSystemType() throws IllegalArgumentException {
380 URI result = null;
381 for (final URI type : get(RDF.TYPE, URI.class)) {
382 if (type.getNamespace().equals(KS.NAMESPACE)) {
383 Preconditions.checkArgument(result == null, "Multiple system types: " + result
384 + ", " + type);
385 result = type;
386 }
387 }
388 return result;
389 }
390
391
392
393
394
395
396
397 public synchronized List<URI> getProperties() {
398 return doGetProperties();
399 }
400
401
402
403
404
405
406
407
408 public synchronized boolean isNull(final URI property) {
409 return doCount(property) == 0;
410 }
411
412
413
414
415
416
417
418
419 public synchronized boolean isUnique(final URI property) {
420 return doCount(property) <= 1;
421 }
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438 public boolean isTrue(final URI property) throws IllegalStateException,
439 IllegalArgumentException {
440 final Boolean value = getUnique(property, Boolean.class);
441 return value != null && value.booleanValue();
442 }
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459 public boolean isFalse(final URI property) throws IllegalStateException,
460 IllegalArgumentException {
461 final Boolean value = getUnique(property, Boolean.class);
462 return value != null && !value.booleanValue();
463 }
464
465
466
467
468
469
470
471
472
473 public synchronized int count(final URI property) {
474 return doCount(property);
475 }
476
477
478
479
480
481
482
483
484
485
486
487
488
489 @Nullable
490 public Object getUnique(final URI property) throws IllegalStateException {
491 return getUnique(property, Object.class);
492 }
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514 @SuppressWarnings("unchecked")
515 @Nullable
516 public <T> T getUnique(final URI property, final Class<T> valueClass)
517 throws IllegalStateException, IllegalArgumentException {
518 final Object result;
519 synchronized (this) {
520 result = doGet(property, valueClass);
521 }
522 if (result == null) {
523 return null;
524 } else if (result instanceof List<?>) {
525 final List<T> list = (List<T>) result;
526 final StringBuilder builder = new StringBuilder("Expected one value for property ")
527 .append(property).append(", found ").append(list.size()).append(" values: ");
528 for (int i = 0; i < Math.min(3, list.size()); ++i) {
529 builder.append(i > 0 ? ", " : "").append(list.get(i));
530 }
531 builder.append(list.size() > 3 ? ", ..." : "");
532 throw new IllegalStateException(builder.toString());
533 } else {
534 return (T) result;
535 }
536 }
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554 @Nullable
555 public <T> T getUnique(final URI property, final Class<T> valueClass,
556 @Nullable final T defaultValue) {
557 try {
558 final T value = getUnique(property, valueClass);
559 return value == null ? defaultValue : value;
560 } catch (final IllegalStateException ex) {
561 return defaultValue;
562 } catch (final IllegalArgumentException ex) {
563 return defaultValue;
564 }
565 }
566
567
568
569
570
571
572
573
574
575 public List<Object> get(final URI property) {
576 return get(property, Object.class);
577 }
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596 @SuppressWarnings("unchecked")
597 public <T> List<T> get(final URI property, final Class<T> valueClass)
598 throws IllegalArgumentException {
599 final Object result;
600 synchronized (this) {
601 result = doGet(property, valueClass);
602 }
603 if (result == null) {
604 return ImmutableList.of();
605 } else if (result instanceof List<?>) {
606 return (List<T>) result;
607 } else {
608 return ImmutableList.of((T) result);
609 }
610 }
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628 public <T> List<T> get(final URI property, final Class<T> valueClass,
629 final List<T> defaultValue) {
630 try {
631 final List<T> values = get(property, valueClass);
632 return values.isEmpty() ? defaultValue : values;
633 } catch (final IllegalArgumentException ex) {
634 return defaultValue;
635 }
636 }
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655 public Record set(final URI property, @Nullable final Object first, final Object... other)
656 throws IllegalArgumentException {
657 Preconditions.checkNotNull(property);
658 final Set<Object> values = Sets.<Object>newHashSet();
659 Data.normalize(first, values);
660 Data.normalize(other, values);
661 synchronized (this) {
662 doSet(property, values);
663 }
664 return this;
665 }
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684 public Record add(final URI property, @Nullable final Object first, final Object... other)
685 throws IllegalArgumentException {
686 Preconditions.checkNotNull(property);
687 final List<Object> added = Lists.newArrayList();
688 Data.normalize(first, added);
689 Data.normalize(other, added);
690 if (!Iterables.isEmpty(added)) {
691 synchronized (this) {
692 final Set<Object> values = Sets.newHashSet(get(property));
693 final boolean changed = values.addAll(added);
694 if (changed) {
695 doSet(property, values);
696 }
697 }
698 }
699 return this;
700 }
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719 public Record remove(final URI property, @Nullable final Object first, final Object... other)
720 throws IllegalArgumentException {
721 Preconditions.checkNotNull(property);
722 final List<Object> removed = Lists.newArrayList();
723 Data.normalize(first, removed);
724 Data.normalize(other, removed);
725 if (!removed.isEmpty()) {
726 synchronized (this) {
727 final Set<Object> values = Sets.newHashSet(get(property));
728 final boolean changed = values.removeAll(removed);
729 if (changed) {
730 doSet(property, values);
731 }
732 }
733 }
734 return this;
735 }
736
737
738
739
740
741
742
743
744
745
746 public synchronized Record retain(final URI... properties) {
747 for (final URI property : doGetProperties()) {
748 boolean retain = false;
749 for (int i = 0; i < properties.length; ++i) {
750 if (property.equals(properties[i])) {
751 retain = true;
752 break;
753 }
754 }
755 if (!retain) {
756 doSet(property, ImmutableSet.<Object>of());
757 }
758 }
759 return this;
760 }
761
762
763
764
765
766
767
768
769
770
771 public synchronized Record clear(final URI... properties) {
772 final List<URI> propertiesToClear;
773 if (properties == null || properties.length == 0) {
774 propertiesToClear = doGetProperties();
775 } else {
776 propertiesToClear = Arrays.asList(properties);
777 }
778 for (final URI property : propertiesToClear) {
779 doSet(property, ImmutableSet.<Object>of());
780 }
781 return this;
782 }
783
784
785
786
787 @Override
788 public int compareTo(final Record other) {
789 final URI thisID = getID();
790 final URI otherID = other.getID();
791 if (thisID == null) {
792 return otherID == null ? 0 : -1;
793 } else {
794 return otherID == null ? 1 : thisID.stringValue().compareTo(otherID.stringValue());
795 }
796 }
797
798
799
800
801 @Override
802 public boolean equals(final Object object) {
803 if (object == this) {
804 return true;
805 }
806 if (!(object instanceof Record)) {
807 return false;
808 }
809 final Record other = (Record) object;
810 return Objects.equal(getID(), other.getID());
811 }
812
813
814
815
816 @Override
817 public int hashCode() {
818 return Objects.hashCode(getID());
819 }
820
821
822
823
824
825
826
827
828
829
830
831
832 public synchronized String hash(final URI... properties) {
833 final List<URI> propertiesToHash;
834 if (properties == null || properties.length == 0) {
835 propertiesToHash = doGetProperties();
836 } else {
837 propertiesToHash = Arrays.asList(properties);
838 }
839 final Hasher hasher = Hashing.md5().newHasher();
840 for (final URI property : propertiesToHash) {
841 final Object object = doGet(property, Object.class);
842 @SuppressWarnings("unchecked")
843 final Iterable<Object> nodes = object instanceof List<?> ? (List<Object>) object
844 : ImmutableList.of(object);
845 for (final Object node : ((Ordering<Object>) Data.getTotalComparator())
846 .sortedCopy(nodes)) {
847
848 hasher.putString(Data.toString(node, null, true), Charsets.UTF_16LE);
849 }
850 hasher.putByte((byte) 0);
851 }
852 final StringBuilder builder = new StringBuilder(16);
853 final byte[] bytes = hasher.hash().asBytes();
854 int max = 52;
855 for (int i = 0; i < bytes.length; ++i) {
856 final int n = (bytes[i] & 0x7F) % max;
857 if (n < 26) {
858 builder.append((char) (65 + n));
859 } else if (n < 52) {
860 builder.append((char) (71 + n));
861 } else {
862 builder.append((char) (n - 4));
863 }
864 max = 62;
865 }
866 return builder.toString();
867 }
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882 public synchronized String toString(@Nullable final Map<String, String> namespaces,
883 final boolean includeProperties) {
884 final URI id = getID();
885 final String base = "Record " + (id == null ? "<no id>" : Data.toString(id, namespaces));
886 if (!includeProperties) {
887 return base;
888 }
889 final Integer oldIndent = INDENT_LEVEL.get();
890 try {
891 final int indent = oldIndent == null ? 1 : oldIndent + 1;
892 INDENT_LEVEL.set(indent + 1);
893 final StringBuilder builder = new StringBuilder(base).append(" {");
894 String propertySeparator = "\n";
895 final Ordering<Object> ordering = Ordering.from(Data.getTotalComparator());
896 for (final URI property : ordering.sortedCopy(doGetProperties())) {
897 builder.append(propertySeparator).append(Strings.repeat(INDENT_STRING, indent));
898 builder.append(Data.toString(property, namespaces));
899 builder.append(" = ");
900 final List<Object> values = ordering.sortedCopy(get(property));
901 String valueSeparator = values.size() == 1 ? "" : "\n"
902 + Strings.repeat(INDENT_STRING, indent + 1);
903 for (final Object value : values) {
904 builder.append(valueSeparator).append(Data.toString(value, namespaces, true));
905 valueSeparator = ",\n" + Strings.repeat(INDENT_STRING, indent + 1);
906 }
907 propertySeparator = ";\n";
908 }
909 builder.append(" }");
910 return builder.toString();
911 } finally {
912 INDENT_LEVEL.set(oldIndent);
913 }
914 }
915
916
917
918
919 @Override
920 public String toString() {
921 return toString(null, false);
922 }
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939 @SuppressWarnings("unchecked")
940 public static Stream<Statement> encode(final Stream<? extends Record> stream,
941 @Nullable final Iterable<? extends URI> types) {
942 Preconditions.checkNotNull(stream);
943 if (types != null) {
944 stream.setProperty("types", types);
945 }
946 final Stream<Record> records = (Stream<Record>) stream;
947 return records.transform(null, new Function<Handler<Statement>, Handler<Record>>() {
948
949 @Override
950 public Handler<Record> apply(final Handler<Statement> handler) {
951 final Iterable<? extends URI> types = stream.getProperty("types", Iterable.class);
952 return new Encoder(handler, types);
953 }
954
955 });
956 }
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980 public static Stream<Record> decode(final Stream<Statement> stream,
981 @Nullable final Iterable<? extends URI> types, @Nullable final Boolean chunked) {
982 Preconditions.checkNotNull(stream);
983 if (types != null) {
984 stream.setProperty("types", types);
985 }
986 if (chunked != null) {
987 stream.setProperty("chunked", chunked);
988 }
989 return stream.transform(null, new Function<Handler<Record>, Handler<Statement>>() {
990
991 @SuppressWarnings("unchecked")
992 @Override
993 public Handler<Statement> apply(final Handler<Record> handler) {
994 final Iterable<? extends URI> types = stream.getProperty("types", Iterable.class);
995 final Boolean chunked = stream.getProperty("chunked", Boolean.class);
996 return new Decoder(handler, types, chunked);
997 }
998
999 });
1000 }
1001
1002 private static class Encoder implements Handler<Record> {
1003
1004 private final Handler<? super Statement> handler;
1005
1006 private final Set<URI> types;
1007
1008 Encoder(final Handler<? super Statement> handler, final Iterable<? extends URI> types) {
1009 this.handler = Preconditions.checkNotNull(handler);
1010 this.types = ImmutableSet.copyOf(types);
1011 }
1012
1013 @Override
1014 public void handle(final Record record) throws Throwable {
1015 if (record != null) {
1016 emit(record, getID(record), true);
1017 } else {
1018 this.handler.handle(null);
1019 }
1020 }
1021
1022 private void emit(final Record record, final URI subject, final boolean addType)
1023 throws Throwable {
1024
1025 if (addType) {
1026 for (final URI type : this.types) {
1027 emit(subject, RDF.TYPE, type);
1028 }
1029 }
1030
1031 final List<URI> properties = record.getProperties();
1032 final List<Record> subRecords = Lists.newArrayList();
1033
1034 for (final URI property : properties) {
1035 final List<Object> values = record.get(property);
1036 for (final Object value : values) {
1037 if (value instanceof Value) {
1038 final Value v = (Value) value;
1039 if (!addType || !property.equals(RDF.TYPE) || !this.types.contains(v)) {
1040 emit(subject, property, v);
1041 }
1042 } else if (value instanceof Record) {
1043 final Record rv = (Record) value;
1044 emit(subject, property, getID(rv));
1045 subRecords.add(rv);
1046 } else if (value instanceof Statement) {
1047 final Statement s = (Statement) value;
1048 final URI id = hash(s);
1049 emit(subject, property, id);
1050 emit(id, RDF.SUBJECT, s.getSubject());
1051 emit(id, RDF.PREDICATE, s.getPredicate());
1052 emit(id, RDF.OBJECT, s.getObject());
1053 } else {
1054 throw new Error("Unexpected type for value: " + value);
1055 }
1056 }
1057 }
1058
1059 for (final Record subRecord : subRecords) {
1060 emit(subRecord, getID(subRecord), false);
1061 }
1062 }
1063
1064 private void emit(final Resource s, final URI p, final Value o) throws Throwable {
1065 this.handler.handle(Data.getValueFactory().createStatement(s, p, o));
1066 }
1067
1068 private URI hash(final Statement statement) {
1069 return Data.getValueFactory().createURI("triples:" + Data.hash(statement.toString()));
1070 }
1071
1072 private URI getID(final Record record) {
1073 final URI id = record.getID();
1074 if (id == null) {
1075 return Data.getValueFactory().createURI("bnode:" + record.hash());
1076 }
1077 return id;
1078 }
1079
1080 }
1081
1082 private static class Decoder implements Handler<Statement> {
1083
1084 private final Handler<? super Record> handler;
1085
1086 private final Set<URI> types;
1087
1088 private final boolean chunked;
1089
1090 private final UUID uuid;
1091
1092 private final Map<URI, Node> nodes;
1093
1094 private final List<Node> roots;
1095
1096 private Node current;
1097
1098 Decoder(final Handler<? super Record> handler, final Iterable<? extends URI> types,
1099 final boolean chunked) {
1100 this.handler = Preconditions.checkNotNull(handler);
1101 this.types = ImmutableSet.copyOf(types);
1102 this.chunked = chunked;
1103 this.uuid = UUID.randomUUID();
1104 this.nodes = this.chunked ? Maps.<URI, Node>newLinkedHashMap() : Maps
1105 .<URI, Node>newHashMap();
1106 this.roots = Lists.newArrayList();
1107 this.current = null;
1108 }
1109
1110 @Override
1111 public void handle(final Statement statement) throws RDFHandlerException {
1112
1113 if (statement == null) {
1114 flush(true);
1115 return;
1116 }
1117
1118 final Statement s = skolemize(statement);
1119
1120 final URI subj = (URI) s.getSubject();
1121 final URI pred = s.getPredicate();
1122 final Value obj = s.getObject();
1123
1124 if (this.current == null || !this.current.id().equals(subj)) {
1125 this.current = this.nodes.get(subj);
1126 if (this.current == null) {
1127 this.current = new Node(subj);
1128 this.nodes.put(subj, this.current);
1129 }
1130 }
1131
1132 this.current.add(s);
1133
1134 if (pred.equals(RDF.TYPE) && this.types.contains(obj)) {
1135 this.current.mark();
1136 if (this.chunked && !this.roots.isEmpty()) {
1137 flush(false);
1138 final URI threshold = this.roots.get(this.roots.size() - 1).id();
1139 final Iterator<URI> iterator = this.nodes.keySet().iterator();
1140 while (true) {
1141 final URI id = iterator.next();
1142 iterator.remove();
1143 if (id.equals(threshold)) {
1144 break;
1145 }
1146 }
1147 this.roots.clear();
1148 }
1149 this.roots.add(this.current);
1150 }
1151 }
1152
1153 private Statement skolemize(final Statement statement) {
1154 boolean skolemized = false;
1155 Resource subj = statement.getSubject();
1156 if (subj instanceof BNode) {
1157 subj = skolemize((BNode) subj);
1158 skolemized = true;
1159 }
1160 Value obj = statement.getObject();
1161 if (obj instanceof BNode) {
1162 obj = skolemize((BNode) obj);
1163 skolemized = true;
1164 }
1165 if (skolemized) {
1166 final URI pred = statement.getPredicate();
1167 return Data.getValueFactory().createStatement(subj, pred, obj);
1168 }
1169 return statement;
1170 }
1171
1172 private URI skolemize(final BNode bnode) {
1173 final String hash = Data.hash(this.uuid.getLeastSignificantBits(),
1174 this.uuid.getMostSignificantBits(), bnode.getID());
1175 return Data.getValueFactory().createURI("bnode:" + hash);
1176 }
1177
1178 private void flush(final boolean complete) throws RDFHandlerException {
1179 try {
1180 final List<Node> queue = Lists.newLinkedList();
1181 for (final Node root : this.roots) {
1182 final Record record = (Record) root.visit(root, queue);
1183 while (!queue.isEmpty()) {
1184 final Node node = queue.remove(0);
1185 node.complete(root, this.nodes, queue);
1186 }
1187 this.handler.handle(record);
1188 if (Thread.interrupted()) {
1189 throw new RDFHandlerException("Interrupted");
1190 }
1191 }
1192 if (complete) {
1193 this.handler.handle(null);
1194 }
1195 } catch (final Throwable ex) {
1196 Throwables.propagateIfPossible(ex, RDFHandlerException.class);
1197 throw new RDFHandlerException(ex);
1198 }
1199 }
1200
1201 private static class Node {
1202
1203 private final URI id;
1204
1205 private final List<Statement> statements;
1206
1207 private Object value;
1208
1209 private Node root;
1210
1211 private boolean reified;
1212
1213 private boolean result;
1214
1215 Node(final URI id) {
1216 this.id = id;
1217 this.statements = Lists.newArrayList();
1218 this.result = false;
1219 }
1220
1221 URI id() {
1222 return this.id;
1223 }
1224
1225 void mark() {
1226 this.result = true;
1227 }
1228
1229 void add(final Statement statement) {
1230 this.statements.add(statement);
1231 final URI pred = statement.getPredicate();
1232 this.reified = this.reified || pred.equals(RDF.SUBJECT)
1233 || pred.equals(RDF.PREDICATE) || pred.equals(RDF.OBJECT);
1234 }
1235
1236 Object visit(final Node root, final List<Node> queue) {
1237 if (this.root != root) {
1238 this.root = root;
1239 if (this.reified) {
1240 this.value = unreify();
1241 } else {
1242 this.value = Record.create((URI) this.statements.get(0).getSubject());
1243 queue.add(this);
1244 }
1245 return !this.result || this == root ? this.value : this.statements.get(0)
1246 .getSubject();
1247 } else if (this.value instanceof Statement) {
1248 return this.value;
1249 }
1250 return this.statements.get(0).getSubject();
1251 }
1252
1253 void complete(final Node root, final Map<URI, Node> nodes, final List<Node> queue) {
1254
1255 final Record record = (Record) this.value;
1256
1257 URI property = null;
1258 final List<Object> values = Lists.newArrayList();
1259
1260 Collections.sort(this.statements, Data.getTotalComparator());
1261 for (final Statement statement : this.statements) {
1262 if (!statement.getPredicate().equals(property)) {
1263 if (property != null) {
1264 record.set(property, values);
1265 }
1266 property = statement.getPredicate();
1267 values.clear();
1268 }
1269 Object value = statement.getObject();
1270 if (value instanceof URI) {
1271 final Node n = nodes.get(value);
1272 if (n != null) {
1273 value = n.visit(root, queue);
1274 }
1275 }
1276 values.add(value);
1277 }
1278 record.set(property, values);
1279 }
1280
1281 private Statement unreify() {
1282 Resource subj = null;
1283 URI pred = null;
1284 Value obj = null;
1285 for (final Statement statement : this.statements) {
1286 final URI property = statement.getPredicate();
1287 if (property.equals(RDF.SUBJECT)) {
1288 subj = (Resource) statement.getObject();
1289 } else if (property.equals(RDF.PREDICATE)) {
1290 pred = (URI) statement.getObject();
1291 } else if (property.equals(RDF.OBJECT)) {
1292 obj = statement.getObject();
1293 }
1294 }
1295 return Data.getValueFactory().createStatement(subj, pred, obj);
1296 }
1297
1298 }
1299
1300 }
1301
1302 }