1   package eu.fbk.knowledgestore.data;
2   
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigInteger;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.Set;
import java.util.SimpleTimeZone;
import java.util.TimeZone;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;

import javax.annotation.Nullable;
import javax.xml.datatype.DatatypeConstants;
import javax.xml.datatype.XMLGregorianCalendar;

import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;

import org.openrdf.model.BNode;
import org.openrdf.model.Literal;
import org.openrdf.model.Resource;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.model.Value;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.vocabulary.XMLSchema;
38  
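// NOTE: supports only serialization and deserialization of null, Iterable (deserialized as a
// List), Record, URI, BNode, Literal and Statement objects. Records are always serialized with
// all their properties; a property with exactly one value is written as that value, otherwise
// its values are written as a list.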
42  
43  public final class Serializer {
44  
45      private static final Set<String> KB_PREFIXES = ImmutableSet.of("dbpedia", "yago", "gn",
46              "geonames", "lgdo", "lgv");
47  
48      private static final String LANG_NS = "lang:";
49  
50      private static final int TYPE_NULL = 0x00;
51  
52      private static final int TYPE_LIST = 0x10;
53  
54      private static final int TYPE_RECORD = 0x20;
55  
56      private static final int TYPE_LIT_STRING = 0x40;
57  
58      private static final int TYPE_LIT_STRING_LANG = 0x80;
59  
60      private static final int TYPE_LIT_TRUE = 0x01;
61  
62      private static final int TYPE_LIT_FALSE = 0x02;
63  
64      private static final int TYPE_LIT_LONG = 0x03;
65  
66      private static final int TYPE_LIT_INT = 0x04;
67  
68      private static final int TYPE_LIT_SHORT = 0x05;
69  
70      private static final int TYPE_LIT_BYTE = 0x06;
71  
72      private static final int TYPE_LIT_DOUBLE = 0x07;
73  
74      private static final int TYPE_LIT_FLOAT = 0x08;
75  
76      private static final int TYPE_LIT_BIG_INTEGER = 0x09;
77  
78      private static final int TYPE_LIT_BIG_DECIMAL = 0x0A;
79  
80      private static final int TYPE_LIT_DATETIME = 0x0B;
81  
82      private static final int TYPE_BNODE = 0x30;
83  
84      private static final int TYPE_URI_PLAIN = 0xC0;
85  
86      private static final int TYPE_URI_COMPRESSED = 0x0C;
87  
88      private static final int TYPE_STATEMENT = 0x0D;
89  
90      // Number serialization
91  
92      // bits len hi mask layout
93      // 07 01 0x00 0x7F 0 7
94      // 14 02 0x80 0x3F 10 6 8
95      // 21 03 0xC0 0x1F 110 5 8 8
96      // 28 04 0xE0 0x0F 1110 4 8 8 8
97      // 35 05 0xF0 0x07 11110 3 8 8 8 8
98      // 42 06 0xF8 0x03 111110 2 8 8 8 8 8
99      // 49 07 0xFC 0x01 1111110 1 8 8 8 8 8 8
100     // 56 08 0xFE 0x00 11111110 8 8 8 8 8 8 8
101     // 64 09 0xFF 0x00 11111111 8 8 8 8 8 8 8 8
102 
103     private final boolean compress;
104 
105     @Nullable
106     private final Dictionary<URI> dictionary;
107 
108     private final ValueFactory factory;
109 
110     public Serializer() {
111         this(false, null, null);
112     }
113 
114     public Serializer(final boolean compress, @Nullable final Dictionary<URI> dictionary,
115             @Nullable final ValueFactory factory) {
116         this.compress = compress;
117         this.dictionary = dictionary;
118         this.factory = MoreObjects.firstNonNull(factory, Data.getValueFactory());
119     }
120 
121     public byte[] toBytes(final Object object) {
122         try {
123             final ByteArrayOutputStream stream = new ByteArrayOutputStream();
124             toStream(stream, object);
125             return stream.toByteArray();
126         } catch (final IOException ex) {
127             throw new Error("Unexpected exception (!): " + ex.getMessage(), ex);
128         }
129     }
130 
131     public Object fromBytes(final byte[] bytes) {
132         try {
133             return fromStream(new ByteArrayInputStream(bytes));
134         } catch (final IOException ex) {
135             throw new Error("Unexpected exception (!): " + ex.getMessage(), ex);
136         }
137     }
138 
139     public void toStream(final OutputStream stream, final Object object) throws IOException {
140         if (this.compress) {
141             final Deflater deflater = new Deflater(Deflater.BEST_COMPRESSION, true);
142             final DeflaterOutputStream compressStream = new DeflaterOutputStream(stream, deflater);
143             writeObject(compressStream, object);
144             compressStream.finish();
145         } else {
146             writeObject(stream, object);
147         }
148     }
149 
150     public Object fromStream(final InputStream stream) throws IOException {
151         if (this.compress) {
152             final Inflater inflater = new Inflater(true);
153             final InflaterInputStream compressStream = new InflaterInputStream(stream, inflater);
154             return readObject(compressStream);
155         } else {
156             return readObject(stream);
157         }
158     }
159 
160     private void writeObject(final OutputStream stream, final Object object) throws IOException {
161 
162         if (object == null) {
163             writeHeader(stream, TYPE_NULL, 0);
164 
165         } else if (object instanceof Iterable<?>) {
166             final Iterable<?> iterable = (Iterable<?>) object;
167             final int size = Iterables.size(iterable);
168             writeHeader(stream, TYPE_LIST, size);
169             for (final Object element : iterable) {
170                 writeObject(stream, element);
171             }
172 
173         } else if (object instanceof Record) {
174             final Record record = (Record) object;
175             writeHeader(stream, TYPE_RECORD, record.getProperties().size());
176             writeObject(stream, record.getID());
177             for (final URI property : record.getProperties()) {
178                 writeCompressedURI(stream, property);
179                 final List<? extends Object> nodes = record.get(property);
180                 writeObject(stream, nodes.size() == 1 ? nodes.get(0) : nodes);
181             }
182 
183         } else if (object instanceof Literal) {
184             final Literal literal = (Literal) object;
185             final URI datatype = literal.getDatatype();
186             if (datatype == null || datatype.equals(XMLSchema.STRING)) {
187                 final String language = literal.getLanguage();
188                 final byte[] label = encodeString(literal.getLabel());
189                 if (language == null) {
190                     writeHeader(stream, TYPE_LIT_STRING, label.length);
191                 } else {
192                     writeHeader(stream, TYPE_LIT_STRING_LANG, label.length);
193                     final URI langURI = this.factory.createURI("lang:" + language);
194                     writeCompressedURI(stream, langURI);
195                 }
196                 stream.write(label);
197             } else if (datatype.equals(XMLSchema.BOOLEAN)) {
198                 writeHeader(stream, literal.booleanValue() ? TYPE_LIT_TRUE : TYPE_LIT_FALSE, 0);
199             } else if (datatype.equals(XMLSchema.LONG)) {
200                 writeHeader(stream, TYPE_LIT_LONG, 0);
201                 writeNumber(stream, literal.longValue());
202             } else if (datatype.equals(XMLSchema.INT)) {
203                 writeHeader(stream, TYPE_LIT_INT, 0);
204                 writeNumber(stream, literal.longValue());
205             } else if (datatype.equals(XMLSchema.DOUBLE)) {
206                 writeHeader(stream, TYPE_LIT_DOUBLE, 0);
207                 stream.write(Longs.toByteArray(Double.doubleToLongBits(literal.doubleValue())));
208             } else if (datatype.equals(XMLSchema.FLOAT)) {
209                 writeHeader(stream, TYPE_LIT_FLOAT, 0);
210                 stream.write(Ints.toByteArray(Float.floatToIntBits(literal.floatValue())));
211             } else if (datatype.equals(XMLSchema.SHORT)) {
212                 writeHeader(stream, TYPE_LIT_SHORT, 0);
213                 writeNumber(stream, literal.longValue());
214             } else if (datatype.equals(XMLSchema.BYTE)) {
215                 writeHeader(stream, TYPE_LIT_BYTE, 0);
216                 writeNumber(stream, literal.longValue());
217             } else if (datatype.equals(XMLSchema.INTEGER)) {
218                 writeHeader(stream, TYPE_LIT_BIG_INTEGER, 0);
219                 final byte[] bytes = literal.integerValue().toByteArray();
220                 writeNumber(stream, bytes.length);
221                 stream.write(bytes);
222             } else if (datatype.equals(XMLSchema.DECIMAL)) {
223                 writeHeader(stream, TYPE_LIT_BIG_DECIMAL, 0);
224                 final byte[] bytes = encodeString(literal.decimalValue().toString());
225                 writeNumber(stream, bytes.length);
226                 stream.write(bytes);
227             } else if (datatype.equals(XMLSchema.DATETIME)) {
228                 writeHeader(stream, TYPE_LIT_DATETIME, 0);
229                 final XMLGregorianCalendar calendar = literal.calendarValue();
230                 writeNumber(stream, calendar.getTimezone());
231                 writeNumber(stream, calendar.toGregorianCalendar().getTimeInMillis());
232             } else {
233                 throw new UnsupportedOperationException("Don't know how to serialize: " + literal);
234             }
235 
236         } else if (object instanceof BNode) {
237             final byte[] id = encodeString(((BNode) object).getID());
238             writeHeader(stream, TYPE_BNODE, id.length);
239             stream.write(id);
240 
241         } else if (object instanceof URI) {
242             final URI uri = (URI) object;
243             if (isVocabTerm(uri)) {
244                 writeHeader(stream, TYPE_URI_COMPRESSED, 0);
245                 writeCompressedURI(stream, uri);
246             } else {
247                 final byte[] string = encodeString(uri.stringValue());
248                 writeHeader(stream, TYPE_URI_PLAIN, string.length);
249                 stream.write(string);
250             }
251 
252         } else if (object instanceof Statement) {
253             final Statement statement = (Statement) object;
254             writeHeader(stream, TYPE_STATEMENT, 0);
255             writeObject(stream, statement.getSubject());
256             writeObject(stream, statement.getPredicate());
257             writeObject(stream, statement.getObject());
258             writeObject(stream, statement.getContext());
259 
260         } else {
261             throw new UnsupportedOperationException("Don't know how to serialize "
262                     + object.getClass());
263         }
264     }
265 
266     private void writeHeader(final OutputStream stream, final int type, final int number)
267             throws IOException {
268         if ((type & 0xC0) != 0 && number <= 62) {
269             stream.write(type | number + 1);
270         } else if ((type & 0x30) != 0 && number <= 14) {
271             stream.write(type | number + 1);
272         } else if ((type & 0xF0) != 0) {
273             stream.write(type);
274             writeNumber(stream, number);
275         } else {
276             stream.write(type);
277         }
278     }
279 
280     private void writeCompressedURI(final OutputStream stream, final URI uri) throws IOException {
281         if (this.dictionary != null) {
282             final int key = this.dictionary.keyFor(uri, true);
283             writeNumber(stream, key);
284         } else {
285             final String ns = uri.getNamespace();
286             if (LANG_NS.equals(ns)) {
287                 final byte[] utf8 = encodeString(uri.getLocalName());
288                 writeNumber(stream, utf8.length << 2 | 1);
289                 stream.write(utf8);
290             } else {
291                 final String prefix = Data.namespaceToPrefix(uri.getNamespace(),
292                         Data.getNamespaceMap());
293                 if (prefix != null) {
294                     final byte[] utf8 = encodeString(prefix + ":" + uri.getLocalName());
295                     writeNumber(stream, utf8.length << 2 | 3);
296                     stream.write(utf8);
297                 } else {
298                     final byte[] utf8 = encodeString(uri.stringValue());
299                     writeNumber(stream, utf8.length << 1);
300                     stream.write(utf8);
301                 }
302             }
303         }
304     }
305 
306     private void writeNumber(final OutputStream stream, final long num) throws IOException {
307         if (num < 0L || num > 0xFFFFFFFFFFFFFFL /* 56 bit */) {
308             writeNumberHelper(stream, 9, 0xFF, num);
309         } else if (num <= 0x7FL /* 7 bit */) {
310             writeNumberHelper(stream, 1, 0x00, num);
311         } else if (num <= 0x3FFFL /* 14 bit */) {
312             writeNumberHelper(stream, 2, 0x80, num);
313         } else if (num <= 0x1FFFFFL /* 21 bit */) {
314             writeNumberHelper(stream, 3, 0xC0, num);
315         } else if (num <= 0xFFFFFFFL /* 28 bit */) {
316             writeNumberHelper(stream, 4, 0xE0, num);
317         } else if (num <= 0x7FFFFFFFFL /* 35 bit */) {
318             writeNumberHelper(stream, 5, 0xF0, num);
319         } else if (num <= 0x3FFFFFFFFFFL /* 42 bit */) {
320             writeNumberHelper(stream, 6, 0xF8, num);
321         } else if (num <= 0x1FFFFFFFFFFFFL /* 49 bit */) {
322             writeNumberHelper(stream, 7, 0xFC, num);
323         } else {
324             writeNumberHelper(stream, 8, 0xFE, num);
325         }
326     }
327 
328     private void writeNumberHelper(final OutputStream stream, final int len, final int mask,
329             final long num) throws IOException {
330         stream.write(mask | (int) (num >>> (len - 1) * 8));
331         for (int i = len - 2; i >= 0; --i) {
332             stream.write((int) (num >>> i * 8 & 0xFF));
333         }
334     }
335 
336     private Object readObject(final InputStream stream) throws IOException {
337 
338         // Read header: type and optional number used later for parsing
339         int type = stream.read();
340         if (type < 0) {
341             throw new EOFException();
342         }
343         int num = 0;
344         if ((type & 0xC0) != 0) {
345             final int n = type & 0x3F;
346             num = n > 0 ? n - 1 : (int) readNumber(stream);
347             type = type & 0xC0;
348         } else if ((type & 0x30) != 0) {
349             final int n = type & 0x0F;
350             num = n > 0 ? n - 1 : (int) readNumber(stream);
351             type = type & 0x30;
352         }
353 
354         // Read the remainder based on parsed type
355         switch (type) {
356         case TYPE_NULL:
357             return null;
358 
359         case TYPE_LIST:
360             final List<Object> list = Lists.newArrayListWithCapacity(num);
361             for (int i = 0; i < num; ++i) {
362                 list.add(readObject(stream));
363             }
364             return list;
365 
366         case TYPE_RECORD:
367             final Record record = Record.create();
368             record.setID((URI) readObject(stream));
369             for (int i = 0; i < num; ++i) {
370                 final URI property = readCompressedURI(stream);
371                 final Object value = readObject(stream);
372                 record.set(property, value);
373             }
374             return record;
375 
376         case TYPE_BNODE:
377             final String bnodeID = decodeString(readBytes(stream, num));
378             return this.factory.createBNode(bnodeID);
379 
380         case TYPE_URI_COMPRESSED:
381             return readCompressedURI(stream);
382 
383         case TYPE_URI_PLAIN:
384             final String uriString = decodeString(readBytes(stream, num));
385             return this.factory.createURI(uriString);
386 
387         case TYPE_LIT_STRING:
388             final String plainLabel = decodeString(readBytes(stream, num));
389             return this.factory.createLiteral(plainLabel);
390 
391         case TYPE_LIT_STRING_LANG:
392             final String lang = readCompressedURI(stream).getLocalName();
393             final String label = decodeString(readBytes(stream, num));
394             return this.factory.createLiteral(label, lang);
395 
396         case TYPE_LIT_TRUE:
397             return this.factory.createLiteral(true);
398 
399         case TYPE_LIT_FALSE:
400             return this.factory.createLiteral(false);
401 
402         case TYPE_LIT_LONG:
403             final long longVal = readNumber(stream);
404             return this.factory.createLiteral(longVal);
405 
406         case TYPE_LIT_INT:
407             final int intVal = (int) readNumber(stream);
408             return this.factory.createLiteral(intVal);
409 
410         case TYPE_LIT_SHORT:
411             final short shortVal = (short) readNumber(stream);
412             return this.factory.createLiteral(shortVal);
413 
414         case TYPE_LIT_BYTE:
415             final byte byteVal = (byte) readNumber(stream);
416             return this.factory.createLiteral(byteVal);
417 
418         case TYPE_LIT_DOUBLE:
419             final byte[] doubleBytes = readBytes(stream, 8);
420             final double doubleVal = Double.longBitsToDouble(Longs.fromByteArray(doubleBytes));
421             return this.factory.createLiteral(doubleVal);
422 
423         case TYPE_LIT_FLOAT:
424             final byte[] floatBytes = readBytes(stream, 4);
425             final float floatVal = Float.intBitsToFloat(Ints.fromByteArray(floatBytes));
426             return this.factory.createLiteral(floatVal);
427 
428         case TYPE_LIT_BIG_INTEGER:
429             final int bigintLen = (int) readNumber(stream);
430             final String bigintVal = new BigInteger(readBytes(stream, bigintLen)).toString();
431             return this.factory.createLiteral(bigintVal, XMLSchema.INTEGER);
432 
433         case TYPE_LIT_BIG_DECIMAL:
434             final int bigdecLen = (int) readNumber(stream);
435             final String bigdecVal = decodeString(readBytes(stream, bigdecLen));
436             return this.factory.createLiteral(bigdecVal, XMLSchema.DECIMAL);
437 
438         case TYPE_LIT_DATETIME:
439             final int tz = (int) readNumber(stream);
440             final long millis = readNumber(stream);
441             final GregorianCalendar calendar = new GregorianCalendar();
442             calendar.setTimeInMillis(millis);
443             calendar.setTimeZone(TimeZone.getTimeZone(String.format("GMT%s%02d:%02d",
444                     tz >= 0 ? "+" : "-", Math.abs(tz) / 60, Math.abs(tz) % 60)));
445             return this.factory.createLiteral(Data.getDatatypeFactory().newXMLGregorianCalendar(
446                     calendar));
447 
448         case TYPE_STATEMENT:
449             final Resource subj = (Resource) readObject(stream);
450             final URI pred = (URI) readObject(stream);
451             final Value obj = (Value) readObject(stream);
452             final Resource ctx = (Resource) readObject(stream);
453             return ctx == null ? this.factory.createStatement(subj, pred, obj) : this.factory
454                     .createStatement(subj, pred, obj, ctx);
455 
456         default:
457             throw new UnsupportedOperationException("Don't know how to deserialize type " + type);
458         }
459     }
460 
461     private byte[] readBytes(final InputStream stream, final int length) throws IOException {
462         final byte[] bytes = new byte[length];
463         ByteStreams.readFully(stream, bytes);
464         return bytes;
465     }
466 
467     private URI readCompressedURI(final InputStream stream) throws IOException {
468         if (this.dictionary != null) {
469             final int key = (int) readNumber(stream);
470             return this.dictionary.objectFor(key);
471         } else {
472             final int header = (int) readNumber(stream);
473             if ((header & 0x1) == 0) {
474                 final String string = decodeString(readBytes(stream, header >> 1));
475                 return this.factory.createURI(string);
476             } else {
477                 final String string = decodeString(readBytes(stream, header >> 2));
478                 return (header & 0x3) == 1 ? this.factory.createURI(LANG_NS, string) //
479                         : (URI) Data.parseValue(string, Data.getNamespaceMap());
480             }
481         }
482     }
483 
484     private long readNumber(final InputStream stream) throws IOException {
485         final int b = stream.read();
486         if (b < 0) {
487             throw new EOFException();
488         }
489         if (b <= 0x00 + 0x7F) {
490             return readNumberHelper(stream, 1, b & 0x7F);
491         } else if (b <= 0x80 + 0x3F) {
492             return readNumberHelper(stream, 2, b & 0x3F);
493         } else if (b <= 0xC0 + 0x1F) {
494             return readNumberHelper(stream, 3, b & 0x1F);
495         } else if (b <= 0xE0 + 0x0F) {
496             return readNumberHelper(stream, 4, b & 0x0F);
497         } else if (b <= 0xF0 + 0x07) {
498             return readNumberHelper(stream, 5, b & 0x07);
499         } else if (b <= 0xF8 + 0x03) {
500             return readNumberHelper(stream, 6, b & 0x03);
501         } else if (b <= 0xFC + 0x01) {
502             return readNumberHelper(stream, 7, b & 0x01);
503         } else if (b <= 0xFE + 0x01) {
504             return readNumberHelper(stream, 8, b & 0x00);
505         } else {
506             return readNumberHelper(stream, 9, b & 0x00);
507         }
508     }
509 
510     private long readNumberHelper(final InputStream stream, final int len, final int start)
511             throws IOException {
512         long num = start;
513         for (int i = 1; i < len; ++i) {
514             final int c = stream.read();
515             if (c < 0) {
516                 throw new EOFException();
517             }
518             num = num << 8 | c & 0xFF;
519         }
520         return num;
521     }
522 
523     private byte[] encodeString(final String string) {
524         // return string.getBytes(Charsets.UTF_8);
525         return Smaz.compress(string);
526     }
527 
528     private String decodeString(final byte[] bytes) {
529         // return new String(bytes, Charsets.UTF_8);
530         return Smaz.decompress(bytes);
531     }
532 
533     private static boolean isVocabTerm(final URI uri) {
534         final String prefix = Data.namespaceToPrefix(uri.getNamespace(), Data.getNamespaceMap());
535         return prefix != null && !KB_PREFIXES.contains(prefix);
536     }
537 
538 }