1 package eu.fbk.knowledgestore.datastore.hbase.utils;
2
3 import java.io.ByteArrayInputStream;
4 import java.io.ByteArrayOutputStream;
5 import java.io.IOException;
6 import java.io.InputStream;
7 import java.io.OutputStream;
8 import java.util.GregorianCalendar;
9 import java.util.List;
10 import java.util.Set;
11 import java.util.TimeZone;
12
13 import javax.annotation.Nullable;
14 import javax.xml.datatype.DatatypeFactory;
15 import javax.xml.datatype.XMLGregorianCalendar;
16
17 import com.google.common.base.Preconditions;
18 import com.google.common.collect.ImmutableList;
19 import com.google.common.collect.Iterables;
20 import com.google.common.collect.Lists;
21
22 import org.apache.avro.Schema;
23 import org.apache.avro.generic.GenericData;
24 import org.apache.avro.generic.GenericDatumReader;
25 import org.apache.avro.generic.GenericDatumWriter;
26 import org.apache.avro.generic.GenericRecord;
27 import org.apache.avro.io.DatumReader;
28 import org.apache.avro.io.DatumWriter;
29 import org.apache.avro.io.Decoder;
30 import org.apache.avro.io.DecoderFactory;
31 import org.apache.avro.io.Encoder;
32 import org.apache.avro.io.EncoderFactory;
33 import org.openrdf.model.BNode;
34 import org.openrdf.model.Literal;
35 import org.openrdf.model.Resource;
36 import org.openrdf.model.Statement;
37 import org.openrdf.model.URI;
38 import org.openrdf.model.Value;
39 import org.openrdf.model.ValueFactory;
40 import org.openrdf.model.vocabulary.RDF;
41 import org.openrdf.model.vocabulary.XMLSchema;
42
43 import eu.fbk.knowledgestore.data.Data;
44 import eu.fbk.knowledgestore.data.Dictionary;
45 import eu.fbk.knowledgestore.data.Record;
46
47
48
49
50
51
52
53
54 public final class AvroSerializer {
55
56 private final Dictionary<URI> dictionary;
57
58 private final ValueFactory factory;
59
60 private final DatatypeFactory datatypeFactory;
61
62 public AvroSerializer() {
63 this(null);
64 }
65
66 public AvroSerializer(@Nullable final Dictionary<URI> dictionary) {
67 this.dictionary = dictionary;
68 this.factory = Data.getValueFactory();
69 this.datatypeFactory = Data.getDatatypeFactory();
70 }
71
72 public Dictionary<URI> getDictionary() {
73 return this.dictionary;
74 }
75
76 public byte[] compressURI(final URI uri) {
77 Preconditions.checkNotNull(uri);
78 try {
79 final ByteArrayOutputStream stream = new ByteArrayOutputStream();
80 final Encoder encoder = EncoderFactory.get().directBinaryEncoder(stream, null);
81 final DatumWriter<Object> writer = new GenericDatumWriter<Object>(
82 AvroSchemas.COMPRESSED_IDENTIFIER);
83 this.dictionary.keyFor(uri);
84 final Object generic = encodeIdentifier(uri);
85 writer.write(generic, encoder);
86 return stream.toByteArray();
87
88 } catch (final IOException ex) {
89 throw new Error("Unexpected exception (!): " + ex.getMessage(), ex);
90 }
91 }
92
93 public URI expandURI(final byte[] bytes) {
94 Preconditions.checkNotNull(bytes);
95 try {
96 final InputStream stream = new ByteArrayInputStream(bytes);
97 final Decoder decoder = DecoderFactory.get().directBinaryDecoder(stream, null);
98 final DatumReader<Object> reader = new GenericDatumReader<Object>(
99 AvroSchemas.COMPRESSED_IDENTIFIER);
100 final Object generic = reader.read(null, decoder);
101 return (URI) decodeNode(generic);
102
103 } catch (final IOException ex) {
104 throw new Error("Unexpected exception (!): " + ex.getMessage(), ex);
105 }
106 }
107
108 public byte[] toBytes(final Object object) {
109 try {
110 final ByteArrayOutputStream stream = new ByteArrayOutputStream();
111 this.toStream(stream, object);
112 return stream.toByteArray();
113 } catch (final IOException ex) {
114 throw new Error("Unexpected exception (!): " + ex.getMessage(), ex);
115 }
116 }
117
118 public byte[] toBytes(final Record object, @Nullable final Set<URI> propertiesToSerialize) {
119 try {
120 final ByteArrayOutputStream stream = new ByteArrayOutputStream();
121 this.toStream(stream, object, propertiesToSerialize);
122 return stream.toByteArray();
123 } catch (final IOException ex) {
124 throw new Error("Unexpected exception (!): " + ex.getMessage(), ex);
125 }
126 }
127
128 public Object fromBytes(final byte[] bytes) {
129 try {
130 return this.fromStream(new ByteArrayInputStream(bytes));
131 } catch (final IOException ex) {
132 throw new Error("Unexpected exception (!): " + ex.getMessage(), ex);
133 }
134 }
135
136 public Record fromBytes(final byte[] bytes, final @Nullable Set<URI> propertiesToDeserialize) {
137 try {
138 return this.fromStream(new ByteArrayInputStream(bytes), propertiesToDeserialize);
139 } catch (final IOException ex) {
140 throw new Error("Unexpected exception (!): " + ex.getMessage(), ex);
141 }
142 }
143
144 public void toStream(final OutputStream stream, final Object object) throws IOException {
145 final Object generic = encodeNode(object);
146 final Encoder encoder = EncoderFactory.get().directBinaryEncoder(stream, null);
147 final DatumWriter<Object> writer = new GenericDatumWriter<Object>(AvroSchemas.NODE);
148 writer.write(generic, encoder);
149 encoder.flush();
150 }
151
152 public void toStream(final OutputStream stream, final Record object,
153 @Nullable final Set<URI> propertiesToSerialize) throws IOException {
154 final Object generic = encodeRecord(object, propertiesToSerialize);
155 final Encoder encoder = EncoderFactory.get().directBinaryEncoder(stream, null);
156 final DatumWriter<Object> writer = new GenericDatumWriter<Object>(AvroSchemas.NODE);
157 writer.write(generic, encoder);
158 encoder.flush();
159 }
160
161 public Object fromStream(final InputStream stream) throws IOException {
162 final Decoder decoder = DecoderFactory.get().directBinaryDecoder(stream, null);
163 final DatumReader<Object> reader = new GenericDatumReader<Object>(AvroSchemas.NODE);
164 final Object generic = reader.read(null, decoder);
165 return decodeNode(generic);
166 }
167
168 public Record fromStream(final InputStream stream,
169 @Nullable final Set<URI> propertiesToDeserialize) throws IOException {
170 final Decoder decoder = DecoderFactory.get().directBinaryDecoder(stream, null);
171 final DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(
172 AvroSchemas.NODE);
173 final GenericRecord generic = reader.read(null, decoder);
174 return decodeRecord(generic, propertiesToDeserialize);
175 }
176
177 private List<Object> decodeNodes(final Object generic) {
178 if (generic instanceof Iterable<?>) {
179 final Iterable<?> iterable = (Iterable<?>) generic;
180 final int size = Iterables.size(iterable);
181 final List<Object> nodes = Lists.<Object>newArrayListWithCapacity(size);
182 for (final Object element : iterable) {
183 nodes.add(decodeNode(element));
184 }
185 return nodes;
186 }
187 Preconditions.checkNotNull(generic);
188 return ImmutableList.of(decodeNode(generic));
189 }
190
191 private Object decodeNode(final Object generic) {
192 if (generic instanceof GenericRecord) {
193 final GenericRecord record = (GenericRecord) generic;
194 final Schema schema = record.getSchema();
195 if (schema.equals(AvroSchemas.RECORD)) {
196 return decodeRecord(record, null);
197 } else if (schema.equals(AvroSchemas.PLAIN_IDENTIFIER)
198 || schema.equals(AvroSchemas.COMPRESSED_IDENTIFIER)) {
199 return decodeIdentifier(record);
200 } else if (schema.equals(AvroSchemas.STATEMENT)) {
201 return decodeStatement(record);
202 }
203 }
204 return decodeLiteral(generic);
205 }
206
207 @SuppressWarnings("unchecked")
208 private Record decodeRecord(final GenericRecord generic,
209 @Nullable final Set<URI> propertiesToDecode) {
210 final Record record = Record.create();
211 final GenericRecord encodedID = (GenericRecord) generic.get(0);
212 if (encodedID != null) {
213 record.setID((URI) decodeIdentifier(encodedID));
214 }
215 for (final GenericRecord prop : (Iterable<GenericRecord>) generic.get(1)) {
216 final URI property = (URI) decodeIdentifier((GenericRecord) prop.get(0));
217 final List<Object> values = decodeNodes(prop.get(1));
218 if (propertiesToDecode == null || propertiesToDecode.contains(property)) {
219 record.set(property, values);
220 }
221 }
222 return record;
223 }
224
225 private Value decodeValue(final Object generic) {
226 if (generic instanceof GenericRecord) {
227 final GenericRecord record = (GenericRecord) generic;
228 final Schema schema = record.getSchema();
229 if (schema.equals(AvroSchemas.COMPRESSED_IDENTIFIER)
230 || schema.equals(AvroSchemas.PLAIN_IDENTIFIER)) {
231 return decodeIdentifier(record);
232 }
233 }
234 return decodeLiteral(generic);
235 }
236
237 private Resource decodeIdentifier(final GenericRecord record) {
238 final Schema schema = record.getSchema();
239 if (schema.equals(AvroSchemas.COMPRESSED_IDENTIFIER)) {
240 try {
241 return this.dictionary.objectFor((Integer) record.get(0));
242 } catch (final IOException ex) {
243 throw new IllegalStateException("Cannot access dictionary: " + ex.getMessage(), ex);
244 }
245 } else if (schema.equals(AvroSchemas.PLAIN_IDENTIFIER)) {
246 final String string = record.get(0).toString();
247 if (string.startsWith("_:")) {
248 return this.factory.createBNode(string.substring(2));
249 } else {
250 return this.factory.createURI(string);
251 }
252 }
253 throw new IllegalArgumentException("Unsupported encoded identifier: " + record);
254 }
255
256 private Literal decodeLiteral(final Object generic) {
257 if (generic instanceof GenericRecord) {
258 final GenericRecord record = (GenericRecord) generic;
259 final Schema schema = record.getSchema();
260 if (schema.equals(AvroSchemas.STRING_LANG)) {
261 final String label = record.get(0).toString();
262 final Object language = record.get(1);
263 return this.factory.createLiteral(label, language.toString());
264 } else if (schema.equals(AvroSchemas.SHORT)) {
265 return this.factory.createLiteral(((Integer) record.get(0)).shortValue());
266 } else if (schema.equals(AvroSchemas.BYTE)) {
267 return this.factory.createLiteral(((Integer) record.get(0)).byteValue());
268 } else if (schema.equals(AvroSchemas.BIGINTEGER)) {
269 return this.factory.createLiteral(record.get(0).toString(), XMLSchema.INTEGER);
270 } else if (schema.equals(AvroSchemas.BIGDECIMAL)) {
271 return this.factory.createLiteral(record.get(0).toString(), XMLSchema.DECIMAL);
272 } else if (schema.equals(AvroSchemas.CALENDAR)) {
273 final int tz = (Integer) record.get(0);
274 final GregorianCalendar calendar = new GregorianCalendar();
275 calendar.setTimeInMillis((Long) record.get(1));
276 calendar.setTimeZone(TimeZone.getTimeZone(String.format("GMT%s%02d:%02d",
277 tz >= 0 ? "+" : "-", Math.abs(tz) / 60, Math.abs(tz) % 60)));
278 return this.factory.createLiteral(this.datatypeFactory
279 .newXMLGregorianCalendar(calendar));
280 }
281 } else if (generic instanceof CharSequence) {
282 return this.factory.createLiteral(generic.toString());
283 } else if (generic instanceof Boolean) {
284 return this.factory.createLiteral((Boolean) generic);
285 } else if (generic instanceof Long) {
286 return this.factory.createLiteral((Long) generic);
287 } else if (generic instanceof Integer) {
288 return this.factory.createLiteral((Integer) generic);
289 } else if (generic instanceof Double) {
290 return this.factory.createLiteral((Double) generic);
291 } else if (generic instanceof Float) {
292 return this.factory.createLiteral((Float) generic);
293 }
294 Preconditions.checkNotNull(generic);
295 throw new IllegalArgumentException("Unsupported generic data: " + generic);
296 }
297
298 private Statement decodeStatement(final GenericRecord record) {
299 final Resource subj = decodeIdentifier((GenericRecord) record.get(0));
300 final URI pred = (URI) decodeIdentifier((GenericRecord) record.get(1));
301 final Value obj = decodeValue(record.get(2));
302 final Resource ctx = decodeIdentifier((GenericRecord) record.get(3));
303 if (ctx == null) {
304 return this.factory.createStatement(subj, pred, obj);
305 } else {
306 return this.factory.createStatement(subj, pred, obj, ctx);
307 }
308 }
309
310 private Object encodeNodes(final Iterable<? extends Object> nodes) {
311 final int size = Iterables.size(nodes);
312 if (size == 1) {
313 return encodeNode(Iterables.get(nodes, 0));
314 }
315 final List<Object> list = Lists.<Object>newArrayListWithCapacity(size);
316 for (final Object node : nodes) {
317 list.add(encodeNode(node));
318 }
319 return list;
320 }
321
322 private Object encodeNode(final Object node) {
323 if (node instanceof Record) {
324 return encodeRecord((Record) node, null);
325 } else if (node instanceof Literal) {
326 return encodeLiteral((Literal) node);
327 } else if (node instanceof Resource) {
328 return encodeIdentifier((Resource) node);
329 } else if (node instanceof Statement) {
330 return encodeStatement((Statement) node);
331 }
332 Preconditions.checkNotNull(node);
333 throw new IllegalArgumentException("Unsupported node: " + node);
334 }
335
336 private Object encodeRecord(final Record record, @Nullable final Set<URI> propertiesToEncode) {
337 final URI id = record.getID();
338 final Object encodedID = id == null ? null : encodeIdentifier(id);
339 final List<Object> props = Lists.newArrayList();
340 for (final URI property : record.getProperties()) {
341 if (propertiesToEncode == null || propertiesToEncode.contains(property)) {
342 ensureInDictionary(property);
343 final List<? extends Object> nodes = record.get(property);
344 if (property.equals(RDF.TYPE)) {
345 for (final Object value : nodes) {
346 if (value instanceof URI) {
347 ensureInDictionary((URI) value);
348 }
349 }
350 }
351 final GenericData.Record prop = new GenericData.Record(AvroSchemas.PROPERTY);
352 prop.put("propertyURI", encodeIdentifier(property));
353 prop.put("propertyValue", encodeNodes(nodes));
354 props.add(prop);
355 }
356 }
357 return AvroSerializer.newGenericRecord(AvroSchemas.RECORD, encodedID, props);
358 }
359
360 private Object encodeValue(final Value value) {
361 if (value instanceof Literal) {
362 return encodeLiteral((Literal) value);
363 } else if (value instanceof Resource) {
364 return encodeIdentifier((Resource) value);
365 } else {
366 throw new IllegalArgumentException("Unsupported value: " + value);
367 }
368 }
369
370 private Object encodeIdentifier(final Resource identifier) {
371 if (identifier instanceof URI) {
372 try {
373 final Integer key = this.dictionary.keyFor((URI) identifier, false);
374 if (key != null) {
375 return AvroSerializer.newGenericRecord(AvroSchemas.COMPRESSED_IDENTIFIER, key);
376 }
377 } catch (final IOException ex) {
378 throw new IllegalStateException("Cannot access dictionary: " + ex.getMessage(), ex);
379 }
380 }
381 final String id = identifier instanceof BNode ? "_:" + ((BNode) identifier).getID()
382 : identifier.stringValue();
383 return AvroSerializer.newGenericRecord(AvroSchemas.PLAIN_IDENTIFIER, id);
384 }
385
386 private Object encodeLiteral(final Literal literal) {
387 final URI datatype = literal.getDatatype();
388 if (datatype == null || datatype.equals(XMLSchema.STRING)) {
389 final String language = literal.getLanguage();
390 if (language == null) {
391 return literal.getLabel();
392 } else {
393 return AvroSerializer.newGenericRecord(AvroSchemas.STRING_LANG,
394 literal.getLabel(), language);
395 }
396 } else if (datatype.equals(XMLSchema.BOOLEAN)) {
397 return literal.booleanValue();
398 } else if (datatype.equals(XMLSchema.LONG)) {
399 return literal.longValue();
400 } else if (datatype.equals(XMLSchema.INT)) {
401 return literal.intValue();
402 } else if (datatype.equals(XMLSchema.DOUBLE)) {
403 return literal.doubleValue();
404 } else if (datatype.equals(XMLSchema.FLOAT)) {
405 return literal.floatValue();
406 } else if (datatype.equals(XMLSchema.SHORT)) {
407 return AvroSerializer.newGenericRecord(AvroSchemas.SHORT, literal.intValue());
408 } else if (datatype.equals(XMLSchema.BYTE)) {
409 return AvroSerializer.newGenericRecord(AvroSchemas.BYTE, literal.intValue());
410 } else if (datatype.equals(XMLSchema.INTEGER)) {
411 return AvroSerializer.newGenericRecord(AvroSchemas.BIGINTEGER, literal.stringValue());
412 } else if (datatype.equals(XMLSchema.DECIMAL)) {
413 return AvroSerializer.newGenericRecord(AvroSchemas.BIGDECIMAL, literal.stringValue());
414 } else if (datatype.equals(XMLSchema.DATETIME)) {
415 final XMLGregorianCalendar calendar = literal.calendarValue();
416 return AvroSerializer.newGenericRecord(AvroSchemas.CALENDAR, calendar.getTimezone(),
417 calendar.toGregorianCalendar().getTimeInMillis());
418 }
419 throw new IllegalArgumentException("Unsupported literal: " + literal);
420 }
421
422 private Object encodeStatement(final Statement statement) {
423 return AvroSerializer.newGenericRecord(AvroSchemas.STATEMENT,
424 encodeIdentifier(statement.getSubject()),
425 encodeIdentifier(statement.getPredicate()),
426 encodeValue(statement.getObject()),
427 encodeIdentifier(statement.getContext()));
428 }
429
430 private URI ensureInDictionary(final URI uri) {
431 try {
432 this.dictionary.keyFor(uri);
433 return uri;
434 } catch (final IOException ex) {
435 throw new IllegalStateException("Cannot access dictionary: " + ex.getMessage(), ex);
436 }
437 }
438
439 private static GenericData.Record newGenericRecord(final Schema schema,
440 final Object... fieldValues) {
441
442 final GenericData.Record record = new GenericData.Record(schema);
443 for (int i = 0; i < fieldValues.length; ++i) {
444 record.put(i, fieldValues[i]);
445 }
446 return record;
447 }
448
449 }