1 package eu.fbk.knowledgestore.data;
2
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.math.BigInteger;
import java.util.GregorianCalendar;
import java.util.List;
import java.util.Set;
import java.util.SimpleTimeZone;
import java.util.TimeZone;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;

import javax.annotation.Nullable;
import javax.xml.datatype.DatatypeConstants;
import javax.xml.datatype.XMLGregorianCalendar;

import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;

import org.openrdf.model.BNode;
import org.openrdf.model.Literal;
import org.openrdf.model.Resource;
import org.openrdf.model.Statement;
import org.openrdf.model.URI;
import org.openrdf.model.Value;
import org.openrdf.model.ValueFactory;
import org.openrdf.model.vocabulary.XMLSchema;
38
39
40
41
42
43 public final class Serializer {
44
45 private static final Set<String> KB_PREFIXES = ImmutableSet.of("dbpedia", "yago", "gn",
46 "geonames", "lgdo", "lgv");
47
48 private static final String LANG_NS = "lang:";
49
50 private static final int TYPE_NULL = 0x00;
51
52 private static final int TYPE_LIST = 0x10;
53
54 private static final int TYPE_RECORD = 0x20;
55
56 private static final int TYPE_LIT_STRING = 0x40;
57
58 private static final int TYPE_LIT_STRING_LANG = 0x80;
59
60 private static final int TYPE_LIT_TRUE = 0x01;
61
62 private static final int TYPE_LIT_FALSE = 0x02;
63
64 private static final int TYPE_LIT_LONG = 0x03;
65
66 private static final int TYPE_LIT_INT = 0x04;
67
68 private static final int TYPE_LIT_SHORT = 0x05;
69
70 private static final int TYPE_LIT_BYTE = 0x06;
71
72 private static final int TYPE_LIT_DOUBLE = 0x07;
73
74 private static final int TYPE_LIT_FLOAT = 0x08;
75
76 private static final int TYPE_LIT_BIG_INTEGER = 0x09;
77
78 private static final int TYPE_LIT_BIG_DECIMAL = 0x0A;
79
80 private static final int TYPE_LIT_DATETIME = 0x0B;
81
82 private static final int TYPE_BNODE = 0x30;
83
84 private static final int TYPE_URI_PLAIN = 0xC0;
85
86 private static final int TYPE_URI_COMPRESSED = 0x0C;
87
88 private static final int TYPE_STATEMENT = 0x0D;
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103 private final boolean compress;
104
105 @Nullable
106 private final Dictionary<URI> dictionary;
107
108 private final ValueFactory factory;
109
110 public Serializer() {
111 this(false, null, null);
112 }
113
114 public Serializer(final boolean compress, @Nullable final Dictionary<URI> dictionary,
115 @Nullable final ValueFactory factory) {
116 this.compress = compress;
117 this.dictionary = dictionary;
118 this.factory = MoreObjects.firstNonNull(factory, Data.getValueFactory());
119 }
120
121 public byte[] toBytes(final Object object) {
122 try {
123 final ByteArrayOutputStream stream = new ByteArrayOutputStream();
124 toStream(stream, object);
125 return stream.toByteArray();
126 } catch (final IOException ex) {
127 throw new Error("Unexpected exception (!): " + ex.getMessage(), ex);
128 }
129 }
130
131 public Object fromBytes(final byte[] bytes) {
132 try {
133 return fromStream(new ByteArrayInputStream(bytes));
134 } catch (final IOException ex) {
135 throw new Error("Unexpected exception (!): " + ex.getMessage(), ex);
136 }
137 }
138
139 public void toStream(final OutputStream stream, final Object object) throws IOException {
140 if (this.compress) {
141 final Deflater deflater = new Deflater(Deflater.BEST_COMPRESSION, true);
142 final DeflaterOutputStream compressStream = new DeflaterOutputStream(stream, deflater);
143 writeObject(compressStream, object);
144 compressStream.finish();
145 } else {
146 writeObject(stream, object);
147 }
148 }
149
150 public Object fromStream(final InputStream stream) throws IOException {
151 if (this.compress) {
152 final Inflater inflater = new Inflater(true);
153 final InflaterInputStream compressStream = new InflaterInputStream(stream, inflater);
154 return readObject(compressStream);
155 } else {
156 return readObject(stream);
157 }
158 }
159
160 private void writeObject(final OutputStream stream, final Object object) throws IOException {
161
162 if (object == null) {
163 writeHeader(stream, TYPE_NULL, 0);
164
165 } else if (object instanceof Iterable<?>) {
166 final Iterable<?> iterable = (Iterable<?>) object;
167 final int size = Iterables.size(iterable);
168 writeHeader(stream, TYPE_LIST, size);
169 for (final Object element : iterable) {
170 writeObject(stream, element);
171 }
172
173 } else if (object instanceof Record) {
174 final Record record = (Record) object;
175 writeHeader(stream, TYPE_RECORD, record.getProperties().size());
176 writeObject(stream, record.getID());
177 for (final URI property : record.getProperties()) {
178 writeCompressedURI(stream, property);
179 final List<? extends Object> nodes = record.get(property);
180 writeObject(stream, nodes.size() == 1 ? nodes.get(0) : nodes);
181 }
182
183 } else if (object instanceof Literal) {
184 final Literal literal = (Literal) object;
185 final URI datatype = literal.getDatatype();
186 if (datatype == null || datatype.equals(XMLSchema.STRING)) {
187 final String language = literal.getLanguage();
188 final byte[] label = encodeString(literal.getLabel());
189 if (language == null) {
190 writeHeader(stream, TYPE_LIT_STRING, label.length);
191 } else {
192 writeHeader(stream, TYPE_LIT_STRING_LANG, label.length);
193 final URI langURI = this.factory.createURI("lang:" + language);
194 writeCompressedURI(stream, langURI);
195 }
196 stream.write(label);
197 } else if (datatype.equals(XMLSchema.BOOLEAN)) {
198 writeHeader(stream, literal.booleanValue() ? TYPE_LIT_TRUE : TYPE_LIT_FALSE, 0);
199 } else if (datatype.equals(XMLSchema.LONG)) {
200 writeHeader(stream, TYPE_LIT_LONG, 0);
201 writeNumber(stream, literal.longValue());
202 } else if (datatype.equals(XMLSchema.INT)) {
203 writeHeader(stream, TYPE_LIT_INT, 0);
204 writeNumber(stream, literal.longValue());
205 } else if (datatype.equals(XMLSchema.DOUBLE)) {
206 writeHeader(stream, TYPE_LIT_DOUBLE, 0);
207 stream.write(Longs.toByteArray(Double.doubleToLongBits(literal.doubleValue())));
208 } else if (datatype.equals(XMLSchema.FLOAT)) {
209 writeHeader(stream, TYPE_LIT_FLOAT, 0);
210 stream.write(Ints.toByteArray(Float.floatToIntBits(literal.floatValue())));
211 } else if (datatype.equals(XMLSchema.SHORT)) {
212 writeHeader(stream, TYPE_LIT_SHORT, 0);
213 writeNumber(stream, literal.longValue());
214 } else if (datatype.equals(XMLSchema.BYTE)) {
215 writeHeader(stream, TYPE_LIT_BYTE, 0);
216 writeNumber(stream, literal.longValue());
217 } else if (datatype.equals(XMLSchema.INTEGER)) {
218 writeHeader(stream, TYPE_LIT_BIG_INTEGER, 0);
219 final byte[] bytes = literal.integerValue().toByteArray();
220 writeNumber(stream, bytes.length);
221 stream.write(bytes);
222 } else if (datatype.equals(XMLSchema.DECIMAL)) {
223 writeHeader(stream, TYPE_LIT_BIG_DECIMAL, 0);
224 final byte[] bytes = encodeString(literal.decimalValue().toString());
225 writeNumber(stream, bytes.length);
226 stream.write(bytes);
227 } else if (datatype.equals(XMLSchema.DATETIME)) {
228 writeHeader(stream, TYPE_LIT_DATETIME, 0);
229 final XMLGregorianCalendar calendar = literal.calendarValue();
230 writeNumber(stream, calendar.getTimezone());
231 writeNumber(stream, calendar.toGregorianCalendar().getTimeInMillis());
232 } else {
233 throw new UnsupportedOperationException("Don't know how to serialize: " + literal);
234 }
235
236 } else if (object instanceof BNode) {
237 final byte[] id = encodeString(((BNode) object).getID());
238 writeHeader(stream, TYPE_BNODE, id.length);
239 stream.write(id);
240
241 } else if (object instanceof URI) {
242 final URI uri = (URI) object;
243 if (isVocabTerm(uri)) {
244 writeHeader(stream, TYPE_URI_COMPRESSED, 0);
245 writeCompressedURI(stream, uri);
246 } else {
247 final byte[] string = encodeString(uri.stringValue());
248 writeHeader(stream, TYPE_URI_PLAIN, string.length);
249 stream.write(string);
250 }
251
252 } else if (object instanceof Statement) {
253 final Statement statement = (Statement) object;
254 writeHeader(stream, TYPE_STATEMENT, 0);
255 writeObject(stream, statement.getSubject());
256 writeObject(stream, statement.getPredicate());
257 writeObject(stream, statement.getObject());
258 writeObject(stream, statement.getContext());
259
260 } else {
261 throw new UnsupportedOperationException("Don't know how to serialize "
262 + object.getClass());
263 }
264 }
265
266 private void writeHeader(final OutputStream stream, final int type, final int number)
267 throws IOException {
268 if ((type & 0xC0) != 0 && number <= 62) {
269 stream.write(type | number + 1);
270 } else if ((type & 0x30) != 0 && number <= 14) {
271 stream.write(type | number + 1);
272 } else if ((type & 0xF0) != 0) {
273 stream.write(type);
274 writeNumber(stream, number);
275 } else {
276 stream.write(type);
277 }
278 }
279
280 private void writeCompressedURI(final OutputStream stream, final URI uri) throws IOException {
281 if (this.dictionary != null) {
282 final int key = this.dictionary.keyFor(uri, true);
283 writeNumber(stream, key);
284 } else {
285 final String ns = uri.getNamespace();
286 if (LANG_NS.equals(ns)) {
287 final byte[] utf8 = encodeString(uri.getLocalName());
288 writeNumber(stream, utf8.length << 2 | 1);
289 stream.write(utf8);
290 } else {
291 final String prefix = Data.namespaceToPrefix(uri.getNamespace(),
292 Data.getNamespaceMap());
293 if (prefix != null) {
294 final byte[] utf8 = encodeString(prefix + ":" + uri.getLocalName());
295 writeNumber(stream, utf8.length << 2 | 3);
296 stream.write(utf8);
297 } else {
298 final byte[] utf8 = encodeString(uri.stringValue());
299 writeNumber(stream, utf8.length << 1);
300 stream.write(utf8);
301 }
302 }
303 }
304 }
305
306 private void writeNumber(final OutputStream stream, final long num) throws IOException {
307 if (num < 0L || num > 0xFFFFFFFFFFFFFFL ) {
308 writeNumberHelper(stream, 9, 0xFF, num);
309 } else if (num <= 0x7FL ) {
310 writeNumberHelper(stream, 1, 0x00, num);
311 } else if (num <= 0x3FFFL ) {
312 writeNumberHelper(stream, 2, 0x80, num);
313 } else if (num <= 0x1FFFFFL ) {
314 writeNumberHelper(stream, 3, 0xC0, num);
315 } else if (num <= 0xFFFFFFFL ) {
316 writeNumberHelper(stream, 4, 0xE0, num);
317 } else if (num <= 0x7FFFFFFFFL ) {
318 writeNumberHelper(stream, 5, 0xF0, num);
319 } else if (num <= 0x3FFFFFFFFFFL ) {
320 writeNumberHelper(stream, 6, 0xF8, num);
321 } else if (num <= 0x1FFFFFFFFFFFFL ) {
322 writeNumberHelper(stream, 7, 0xFC, num);
323 } else {
324 writeNumberHelper(stream, 8, 0xFE, num);
325 }
326 }
327
328 private void writeNumberHelper(final OutputStream stream, final int len, final int mask,
329 final long num) throws IOException {
330 stream.write(mask | (int) (num >>> (len - 1) * 8));
331 for (int i = len - 2; i >= 0; --i) {
332 stream.write((int) (num >>> i * 8 & 0xFF));
333 }
334 }
335
336 private Object readObject(final InputStream stream) throws IOException {
337
338
339 int type = stream.read();
340 if (type < 0) {
341 throw new EOFException();
342 }
343 int num = 0;
344 if ((type & 0xC0) != 0) {
345 final int n = type & 0x3F;
346 num = n > 0 ? n - 1 : (int) readNumber(stream);
347 type = type & 0xC0;
348 } else if ((type & 0x30) != 0) {
349 final int n = type & 0x0F;
350 num = n > 0 ? n - 1 : (int) readNumber(stream);
351 type = type & 0x30;
352 }
353
354
355 switch (type) {
356 case TYPE_NULL:
357 return null;
358
359 case TYPE_LIST:
360 final List<Object> list = Lists.newArrayListWithCapacity(num);
361 for (int i = 0; i < num; ++i) {
362 list.add(readObject(stream));
363 }
364 return list;
365
366 case TYPE_RECORD:
367 final Record record = Record.create();
368 record.setID((URI) readObject(stream));
369 for (int i = 0; i < num; ++i) {
370 final URI property = readCompressedURI(stream);
371 final Object value = readObject(stream);
372 record.set(property, value);
373 }
374 return record;
375
376 case TYPE_BNODE:
377 final String bnodeID = decodeString(readBytes(stream, num));
378 return this.factory.createBNode(bnodeID);
379
380 case TYPE_URI_COMPRESSED:
381 return readCompressedURI(stream);
382
383 case TYPE_URI_PLAIN:
384 final String uriString = decodeString(readBytes(stream, num));
385 return this.factory.createURI(uriString);
386
387 case TYPE_LIT_STRING:
388 final String plainLabel = decodeString(readBytes(stream, num));
389 return this.factory.createLiteral(plainLabel);
390
391 case TYPE_LIT_STRING_LANG:
392 final String lang = readCompressedURI(stream).getLocalName();
393 final String label = decodeString(readBytes(stream, num));
394 return this.factory.createLiteral(label, lang);
395
396 case TYPE_LIT_TRUE:
397 return this.factory.createLiteral(true);
398
399 case TYPE_LIT_FALSE:
400 return this.factory.createLiteral(false);
401
402 case TYPE_LIT_LONG:
403 final long longVal = readNumber(stream);
404 return this.factory.createLiteral(longVal);
405
406 case TYPE_LIT_INT:
407 final int intVal = (int) readNumber(stream);
408 return this.factory.createLiteral(intVal);
409
410 case TYPE_LIT_SHORT:
411 final short shortVal = (short) readNumber(stream);
412 return this.factory.createLiteral(shortVal);
413
414 case TYPE_LIT_BYTE:
415 final byte byteVal = (byte) readNumber(stream);
416 return this.factory.createLiteral(byteVal);
417
418 case TYPE_LIT_DOUBLE:
419 final byte[] doubleBytes = readBytes(stream, 8);
420 final double doubleVal = Double.longBitsToDouble(Longs.fromByteArray(doubleBytes));
421 return this.factory.createLiteral(doubleVal);
422
423 case TYPE_LIT_FLOAT:
424 final byte[] floatBytes = readBytes(stream, 4);
425 final float floatVal = Float.intBitsToFloat(Ints.fromByteArray(floatBytes));
426 return this.factory.createLiteral(floatVal);
427
428 case TYPE_LIT_BIG_INTEGER:
429 final int bigintLen = (int) readNumber(stream);
430 final String bigintVal = new BigInteger(readBytes(stream, bigintLen)).toString();
431 return this.factory.createLiteral(bigintVal, XMLSchema.INTEGER);
432
433 case TYPE_LIT_BIG_DECIMAL:
434 final int bigdecLen = (int) readNumber(stream);
435 final String bigdecVal = decodeString(readBytes(stream, bigdecLen));
436 return this.factory.createLiteral(bigdecVal, XMLSchema.DECIMAL);
437
438 case TYPE_LIT_DATETIME:
439 final int tz = (int) readNumber(stream);
440 final long millis = readNumber(stream);
441 final GregorianCalendar calendar = new GregorianCalendar();
442 calendar.setTimeInMillis(millis);
443 calendar.setTimeZone(TimeZone.getTimeZone(String.format("GMT%s%02d:%02d",
444 tz >= 0 ? "+" : "-", Math.abs(tz) / 60, Math.abs(tz) % 60)));
445 return this.factory.createLiteral(Data.getDatatypeFactory().newXMLGregorianCalendar(
446 calendar));
447
448 case TYPE_STATEMENT:
449 final Resource subj = (Resource) readObject(stream);
450 final URI pred = (URI) readObject(stream);
451 final Value obj = (Value) readObject(stream);
452 final Resource ctx = (Resource) readObject(stream);
453 return ctx == null ? this.factory.createStatement(subj, pred, obj) : this.factory
454 .createStatement(subj, pred, obj, ctx);
455
456 default:
457 throw new UnsupportedOperationException("Don't know how to deserialize type " + type);
458 }
459 }
460
461 private byte[] readBytes(final InputStream stream, final int length) throws IOException {
462 final byte[] bytes = new byte[length];
463 ByteStreams.readFully(stream, bytes);
464 return bytes;
465 }
466
467 private URI readCompressedURI(final InputStream stream) throws IOException {
468 if (this.dictionary != null) {
469 final int key = (int) readNumber(stream);
470 return this.dictionary.objectFor(key);
471 } else {
472 final int header = (int) readNumber(stream);
473 if ((header & 0x1) == 0) {
474 final String string = decodeString(readBytes(stream, header >> 1));
475 return this.factory.createURI(string);
476 } else {
477 final String string = decodeString(readBytes(stream, header >> 2));
478 return (header & 0x3) == 1 ? this.factory.createURI(LANG_NS, string)
479 : (URI) Data.parseValue(string, Data.getNamespaceMap());
480 }
481 }
482 }
483
484 private long readNumber(final InputStream stream) throws IOException {
485 final int b = stream.read();
486 if (b < 0) {
487 throw new EOFException();
488 }
489 if (b <= 0x00 + 0x7F) {
490 return readNumberHelper(stream, 1, b & 0x7F);
491 } else if (b <= 0x80 + 0x3F) {
492 return readNumberHelper(stream, 2, b & 0x3F);
493 } else if (b <= 0xC0 + 0x1F) {
494 return readNumberHelper(stream, 3, b & 0x1F);
495 } else if (b <= 0xE0 + 0x0F) {
496 return readNumberHelper(stream, 4, b & 0x0F);
497 } else if (b <= 0xF0 + 0x07) {
498 return readNumberHelper(stream, 5, b & 0x07);
499 } else if (b <= 0xF8 + 0x03) {
500 return readNumberHelper(stream, 6, b & 0x03);
501 } else if (b <= 0xFC + 0x01) {
502 return readNumberHelper(stream, 7, b & 0x01);
503 } else if (b <= 0xFE + 0x01) {
504 return readNumberHelper(stream, 8, b & 0x00);
505 } else {
506 return readNumberHelper(stream, 9, b & 0x00);
507 }
508 }
509
510 private long readNumberHelper(final InputStream stream, final int len, final int start)
511 throws IOException {
512 long num = start;
513 for (int i = 1; i < len; ++i) {
514 final int c = stream.read();
515 if (c < 0) {
516 throw new EOFException();
517 }
518 num = num << 8 | c & 0xFF;
519 }
520 return num;
521 }
522
523 private byte[] encodeString(final String string) {
524
525 return Smaz.compress(string);
526 }
527
528 private String decodeString(final byte[] bytes) {
529
530 return Smaz.decompress(bytes);
531 }
532
533 private static boolean isVocabTerm(final URI uri) {
534 final String prefix = Data.namespaceToPrefix(uri.getNamespace(), Data.getNamespaceMap());
535 return prefix != null && !KB_PREFIXES.contains(prefix);
536 }
537
538 }