1 package eu.fbk.knowledgestore.internal.jaxrs;
2
3 import java.io.ByteArrayInputStream;
4 import java.io.ByteArrayOutputStream;
5 import java.io.FilterInputStream;
6 import java.io.IOException;
7 import java.io.InputStream;
8 import java.io.OutputStream;
9 import java.lang.annotation.Annotation;
10 import java.lang.reflect.Type;
11 import java.util.Map;
12 import java.util.concurrent.atomic.AtomicLong;
13
14 import javax.annotation.Nullable;
15 import javax.ws.rs.Consumes;
16 import javax.ws.rs.Produces;
17 import javax.ws.rs.WebApplicationException;
18 import javax.ws.rs.core.HttpHeaders;
19 import javax.ws.rs.core.MediaType;
20 import javax.ws.rs.core.MultivaluedMap;
21 import javax.ws.rs.ext.MessageBodyReader;
22 import javax.ws.rs.ext.MessageBodyWriter;
23 import javax.ws.rs.ext.Provider;
24
25 import com.google.common.base.Charsets;
26 import com.google.common.base.Splitter;
27 import com.google.common.base.Throwables;
28 import com.google.common.collect.ImmutableSet;
29 import com.google.common.io.CountingInputStream;
30 import com.google.common.io.CountingOutputStream;
31 import com.google.common.reflect.TypeToken;
32
33 import org.openrdf.model.Statement;
34 import org.openrdf.model.URI;
35 import org.openrdf.model.vocabulary.DCTERMS;
36 import org.openrdf.query.BindingSet;
37 import org.openrdf.query.resultio.BooleanQueryResultFormat;
38 import org.openrdf.query.resultio.TupleQueryResultFormat;
39 import org.openrdf.rio.RDFFormat;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42
43 import eu.fbk.knowledgestore.Outcome;
44 import eu.fbk.knowledgestore.data.Data;
45 import eu.fbk.knowledgestore.data.Record;
46 import eu.fbk.knowledgestore.data.Representation;
47 import eu.fbk.knowledgestore.data.Stream;
48 import eu.fbk.knowledgestore.internal.Logging;
49 import eu.fbk.knowledgestore.internal.Util;
50 import eu.fbk.knowledgestore.internal.rdf.RDFUtil;
51 import eu.fbk.knowledgestore.vocabulary.KS;
52 import eu.fbk.knowledgestore.vocabulary.NFO;
53 import eu.fbk.knowledgestore.vocabulary.NIE;
54 import eu.fbk.rdfpro.tql.TQL;
55
56 @Provider
57 @Consumes(MediaType.WILDCARD)
58 @Produces(MediaType.WILDCARD)
59 public class Serializer implements MessageBodyReader<Object>, MessageBodyWriter<Object> {
60
61
62
63 private static final Logger LOGGER = LoggerFactory.getLogger(Serializer.class);
64
65 @Override
66 public boolean isReadable(final Class<?> type, final Type genericType,
67 final Annotation[] annotations, final MediaType mediaType) {
68
69 final boolean result = type.isAssignableFrom(Representation.class)
70 || isAssignable(genericType, Protocol.STREAM_OF_RECORDS.getType())
71 || isAssignable(genericType, Protocol.STREAM_OF_OUTCOMES.getType())
72 || isAssignable(genericType, Protocol.STREAM_OF_STATEMENTS.getType())
73 || isAssignable(genericType, Protocol.STREAM_OF_TUPLES.getType())
74 || isAssignable(genericType, Protocol.STREAM_OF_BOOLEANS.getType());
75
76 if (!result) {
77 LOGGER.debug("Non deserializable stream: {} ({})", genericType, mediaType);
78 }
79
80 return result;
81 }
82
83 @Override
84 public boolean isWriteable(final Class<?> type, final Type genericType,
85 final Annotation[] annotations, final MediaType mediaType) {
86
87 final boolean result = Representation.class.isAssignableFrom(type)
88 || isAssignable(Protocol.STREAM_OF_RECORDS.getType(), genericType)
89 || isAssignable(Protocol.STREAM_OF_OUTCOMES.getType(), genericType)
90 || isAssignable(Protocol.STREAM_OF_STATEMENTS.getType(), genericType)
91 || isAssignable(Protocol.STREAM_OF_TUPLES.getType(), genericType)
92 || isAssignable(Protocol.STREAM_OF_BOOLEANS.getType(), genericType);
93
94 if (!result) {
95 LOGGER.debug("Non serializable stream: {} ({})", genericType, mediaType);
96 }
97
98 return result;
99 }
100
101 @Override
102 public long getSize(final Object object, final Class<?> type, final Type genericType,
103 final Annotation[] annotations, final MediaType mediaType) {
104 throw new UnsupportedOperationException();
105 }
106
107 @SuppressWarnings("resource")
108 @Override
109 public Object readFrom(final Class<Object> type, final Type genericType,
110 final Annotation[] annotations, final MediaType mediaType,
111 final MultivaluedMap<String, String> headers, final InputStream input)
112 throws IOException, WebApplicationException {
113
114 final String mimeType = mediaType.getType() + "/" + mediaType.getSubtype();
115
116 final CountingInputStream in = new CountingInputStream(input);
117 final boolean chunked = "true".equalsIgnoreCase(headers.getFirst(Protocol.HEADER_CHUNKED));
118 final long ts = System.currentTimeMillis();
119
120 try {
121 if (type.isAssignableFrom(Representation.class)) {
122 final InputStream stream = interceptClose(in, ts);
123 final Representation representation = Representation.create(stream);
124 readMetadata(representation.getMetadata(), headers);
125 return representation;
126
127 } else if (isAssignable(genericType, Protocol.STREAM_OF_RECORDS.getType())) {
128 final RDFFormat format = formatFor(mimeType);
129 final AtomicLong numStatements = new AtomicLong();
130 final AtomicLong numRecords = new AtomicLong();
131 Stream<Statement> statements = RDFUtil.readRDF(in, format, null, null, false);
132 statements = statements.track(numStatements, null);
133 Stream<Record> records = Record.decode(statements, null, chunked);
134 records = records.track(numRecords, null);
135 interceptClose(records, in, ts, numRecords, "record(s)", numStatements,
136 "statement(s)");
137 return records;
138
139 } else if (isAssignable(genericType, Protocol.STREAM_OF_OUTCOMES.getType())) {
140 final RDFFormat format = formatFor(mimeType);
141 final AtomicLong numStatements = new AtomicLong();
142 final AtomicLong numOutcomes = new AtomicLong();
143 Stream<Statement> statements = RDFUtil.readRDF(in, format, null, null, false);
144 statements = statements.track(numStatements, null);
145 Stream<Outcome> outcomes = Outcome.decode(statements, chunked);
146 outcomes = outcomes.track(numOutcomes, null);
147 interceptClose(outcomes, in, ts, numOutcomes, "outcome(s)", numStatements,
148 "statement(s)");
149 return outcomes;
150
151 } else if (isAssignable(genericType, Protocol.STREAM_OF_STATEMENTS.getType())) {
152 final RDFFormat format = formatFor(mimeType);
153 final AtomicLong numStatements = new AtomicLong();
154 Stream<Statement> statements = RDFUtil.readRDF(in, format, null, null, false);
155 statements = statements.track(numStatements, null);
156 interceptClose(statements, in, ts, numStatements, "statement(s)");
157 return statements;
158
159 } else if (isAssignable(genericType, Protocol.STREAM_OF_TUPLES.getType())) {
160 final TupleQueryResultFormat format;
161 format = TupleQueryResultFormat.forMIMEType(mimeType);
162 final AtomicLong numTuples = new AtomicLong();
163 Stream<BindingSet> tuples = RDFUtil.readSparqlTuples(format, in);
164 tuples = tuples.track(numTuples, null);
165 interceptClose(tuples, in, ts, numTuples, "tuple(s)");
166 return tuples;
167
168 } else if (isAssignable(genericType, Protocol.STREAM_OF_BOOLEANS.getType())) {
169 final BooleanQueryResultFormat format;
170 format = BooleanQueryResultFormat.forMIMEType(mimeType);
171 final boolean result = RDFUtil.readSparqlBoolean(format, in);
172 final Stream<Boolean> stream = Stream.create(result);
173 interceptClose(stream, in, ts, 1, "boolean");
174 return stream;
175 }
176 } catch (final Throwable ex) {
177 Util.closeQuietly(in);
178 Throwables.propagateIfPossible(ex, IOException.class);
179 throw Throwables.propagate(ex);
180 }
181
182 throw new IllegalArgumentException("Cannot deserialize " + genericType + " from "
183 + mimeType);
184 }
185
186 @SuppressWarnings("unchecked")
187 @Override
188 public void writeTo(final Object object, final Class<?> type, final Type genericType,
189 final Annotation[] annotations, final MediaType mediaType,
190 final MultivaluedMap<String, Object> headers, final OutputStream output)
191 throws IOException, WebApplicationException {
192
193 final String mimeType = mediaType.getType() + "/" + mediaType.getSubtype();
194
195 final Map<String, String> namespaces = Data.getNamespaceMap();
196 final CountingOutputStream out = new CountingOutputStream(output);
197 final long ts = System.currentTimeMillis();
198
199 try {
200 if (Representation.class.isAssignableFrom(type)) {
201 final Representation representation = (Representation) object;
202 writeMetadata(representation.getMetadata(), headers);
203 representation.writeTo(out);
204 logWrite(ts, out);
205
206 } else if (isAssignable(Protocol.STREAM_OF_RECORDS.getType(), genericType)) {
207 headers.putSingle(Protocol.HEADER_CHUNKED, "true");
208 final String mime = setupType(mimeType, Protocol.MIME_TYPES_RDF, headers);
209 final RDFFormat format = formatFor(mime);
210 final AtomicLong recordCounter = new AtomicLong();
211 Stream<? extends Record> records = (Stream<? extends Record>) object;
212 records = records.track(recordCounter, null);
213 final Stream<Statement> stmt = Record.encode(records, null);
214 final long count = RDFUtil.writeRDF(out, format, namespaces, null, stmt);
215 logWrite(ts, out, recordCounter.get(), "record(s)", count, "statement(s)");
216
217 } else if (isAssignable(Protocol.STREAM_OF_OUTCOMES.getType(), genericType)) {
218 headers.putSingle(Protocol.HEADER_CHUNKED, "true");
219 final String mime = setupType(mimeType, Protocol.MIME_TYPES_RDF, headers);
220 final RDFFormat format = formatFor(mime);
221 final AtomicLong outcomeCounter = new AtomicLong();
222 Stream<? extends Outcome> outcomes = (Stream<? extends Outcome>) object;
223 outcomes = outcomes.track(outcomeCounter, null);
224 final Stream<Statement> stmt = Outcome.encode(outcomes);
225 final long count = RDFUtil.writeRDF(out, format, namespaces, null, stmt);
226 logWrite(ts, out, outcomeCounter.get(), "outcome(s)", count, "statement(s)");
227
228 } else if (isAssignable(Protocol.STREAM_OF_STATEMENTS.getType(), genericType)) {
229 final String mime = setupType(mimeType, Protocol.MIME_TYPES_RDF, headers);
230 final RDFFormat format = formatFor(mime);
231 final Stream<? extends Statement> stmt = (Stream<? extends Statement>) object;
232 final long count = RDFUtil.writeRDF(out, format, namespaces, null, stmt);
233 logWrite(ts, out, count, "statement(s)");
234
235 } else if (isAssignable(Protocol.STREAM_OF_TUPLES.getType(), genericType)) {
236 final String mime = setupType(mimeType, Protocol.MIME_TYPES_SPARQL_TUPLE, headers);
237 final TupleQueryResultFormat format = TupleQueryResultFormat.forMIMEType(mime);
238 final Stream<? extends BindingSet> tuples = (Stream<? extends BindingSet>) object;
239 final long count = RDFUtil.writeSparqlTuples(format, out, tuples);
240 logWrite(ts, out, count, "tuple(s)");
241
242 } else if (isAssignable(Protocol.STREAM_OF_BOOLEANS.getType(), genericType)) {
243 final String mime = setupType(mimeType, Protocol.MIME_TYPES_SPARQL_BOOLEAN,
244 headers);
245 final BooleanQueryResultFormat format = BooleanQueryResultFormat.forMIMEType(mime);
246 final boolean bool = ((Stream<? extends Boolean>) object).getUnique();
247 RDFUtil.writeSparqlBoolean(format, out, bool);
248 logWrite(ts, out, 1, "boolean");
249
250 } else {
251 throw new IllegalArgumentException("Cannot serialize " + genericType + " to "
252 + mediaType);
253 }
254
255 } finally {
256 Util.closeQuietly(object);
257 Util.closeQuietly(out);
258 }
259 }
260
261 @Nullable
262 private static void readMetadata(final Record metadata,
263 final MultivaluedMap<String, String> headers) {
264
265
266 final String mime = headers.getFirst(HttpHeaders.CONTENT_TYPE);
267 metadata.set(NIE.MIME_TYPE, mime != null ? mime : MediaType.APPLICATION_OCTET_STREAM);
268
269
270 final String md5 = headers.getFirst("Content-MD5");
271 if (md5 != null) {
272 final Record hash = Record.create();
273 hash.set(NFO.HASH_ALGORITHM, "MD5");
274 hash.set(NFO.HASH_VALUE, md5);
275 metadata.set(NFO.HAS_HASH, hash);
276 }
277
278
279 final String language = headers.getFirst(HttpHeaders.CONTENT_LANGUAGE);
280 try {
281 metadata.set(DCTERMS.LANGUAGE, Data.languageCodeToURI(language));
282 } catch (final Throwable ex) {
283 LOGGER.warn("Invalid {}: {}", HttpHeaders.CONTENT_LANGUAGE, language);
284 }
285
286
287 final String encodedMeta = headers.getFirst(Protocol.HEADER_META);
288 if (encodedMeta != null) {
289 final InputStream in = new ByteArrayInputStream(encodedMeta.getBytes(Charsets.UTF_8));
290 final Stream<Statement> statements = RDFUtil.readRDF(in, RDFFormat.TURTLE,
291 Data.getNamespaceMap(), null, true);
292 final Record record = Record.decode(statements,
293 ImmutableSet.<URI>of(KS.REPRESENTATION), true).getUnique();
294 metadata.setID(record.getID());
295 for (final URI property : record.getProperties()) {
296 metadata.set(property, record.get(property));
297 }
298 }
299 }
300
301 @Nullable
302 private static void writeMetadata(final Record metadata,
303 final MultivaluedMap<String, Object> headers) {
304
305
306 headers.putSingle(HttpHeaders.CONTENT_TYPE, metadata.getUnique(NIE.MIME_TYPE,
307 String.class, MediaType.APPLICATION_OCTET_STREAM));
308
309
310 final Record hash = metadata.getUnique(NFO.HAS_HASH, Record.class, null);
311 final String md5 = hash == null ? null : !"MD5".equals(hash.getUnique(NFO.HASH_ALGORITHM,
312 String.class, null)) ? null
313 : hash.getUnique(NFO.HASH_VALUE, String.class, null);
314 headers.putSingle("Content-MD5", md5);
315
316
317 String language = metadata.getUnique(NIE.LANGUAGE, String.class, null);
318 if (language == null) {
319 final URI languageURI = metadata.getUnique(DCTERMS.LANGUAGE, URI.class, null);
320 try {
321 language = Data.languageURIToCode(languageURI);
322 } catch (final Throwable ex) {
323 LOGGER.warn("Invalid language URI: ", languageURI);
324 }
325 }
326 headers.putSingle(HttpHeaders.CONTENT_LANGUAGE, language);
327
328
329 final ByteArrayOutputStream out = new ByteArrayOutputStream();
330 final Stream<Statement> statements = Record.encode(Stream.create(metadata),
331 ImmutableSet.<URI>of(KS.REPRESENTATION));
332 RDFUtil.writeRDF(out, RDFFormat.TURTLE, Data.getNamespaceMap(), null, statements);
333 final String string = new String(out.toByteArray(), Charsets.UTF_8);
334 final StringBuilder builder = new StringBuilder();
335 String separator = "";
336 for (final String line : Splitter.on('\n').trimResults().omitEmptyStrings().split(string)) {
337 if (!line.toLowerCase().startsWith("@prefix")) {
338 builder.append(separator).append(line);
339 separator = " ";
340 }
341 }
342 headers.putSingle(Protocol.HEADER_META, builder.toString());
343 }
344
345 private static boolean isAssignable(final Type lhs, final Type rhs) {
346 return TypeToken.of(lhs).isAssignableFrom(rhs);
347 }
348
349 private static String setupType(final String jaxrsType, final String supportedTypes,
350 final MultivaluedMap<String, Object> headers) {
351
352 if (jaxrsType != null) {
353 return jaxrsType;
354 }
355
356 final int index = supportedTypes.indexOf(',');
357 final String mediaType = index < 0 ? supportedTypes : supportedTypes.substring(0, index);
358
359 headers.putSingle("Content-Type", mediaType);
360 headers.remove("ETag");
361
362 return mediaType;
363 }
364
365 private static void interceptClose(final Stream<?> stream, final CountingInputStream in,
366 final long startTime, final Object... args) {
367 final Map<String, String> mdc = Logging.getMDC();
368 stream.onClose(new Runnable() {
369
370 @Override
371 public void run() {
372 final Map<String, String> oldMdc = Logging.getMDC();
373 try {
374 Logging.setMDC(mdc);
375 logRead(in, startTime, args);
376
377
378
379 Util.closeQuietly(in);
380 } finally {
381 Logging.setMDC(oldMdc);
382 }
383 }
384
385 });
386 }
387
388 private static InputStream interceptClose(final CountingInputStream stream,
389 final long startTime, final Object... args) {
390 final Map<String, String> mdc = Logging.getMDC();
391 return new FilterInputStream(stream) {
392
393 private boolean closed;
394
395 @Override
396 public void close() throws IOException {
397 if (this.closed) {
398 return;
399 }
400 final Map<String, String> oldMdc = Logging.getMDC();
401 try {
402 Logging.setMDC(mdc);
403 logRead(stream, startTime, args);
404
405
406
407 Util.closeQuietly(this.in);
408 } finally {
409 this.closed = true;
410 Logging.setMDC(oldMdc);
411 super.close();
412 }
413 }
414
415 };
416 }
417
418 private static void logRead(final CountingInputStream in, final long startTime,
419 final Object... args) {
420 if (LOGGER.isDebugEnabled()) {
421 boolean eof = false;
422 try {
423 eof = in.read() == -1;
424 } catch (final Throwable ex) {
425
426 }
427 final long elapsed = System.currentTimeMillis() - startTime;
428 final StringBuilder builder = new StringBuilder();
429 builder.append("Http: read complete, ");
430 for (int i = 0; i < args.length; i += 2) {
431 builder.append(args[i]).append(" ").append(args[i + 1]).append(", ");
432 }
433 builder.append(in.getCount()).append(" byte(s), ");
434 if (eof) {
435 builder.append("EOF, ");
436 }
437 builder.append(elapsed).append(" ms");
438 LOGGER.debug(builder.toString());
439 }
440 }
441
442 private static void logWrite(final long startTime, final CountingOutputStream stream,
443 final Object... args) {
444 if (LOGGER.isDebugEnabled()) {
445 final long elapsed = System.currentTimeMillis() - startTime;
446 final StringBuilder builder = new StringBuilder();
447 builder.append("Http: write complete, ");
448 for (int i = 0; i < args.length; i += 2) {
449 builder.append(args[i]).append(" ").append(args[i + 1]).append(", ");
450 }
451 builder.append(stream.getCount()).append(" byte(s), ");
452 builder.append(elapsed).append(" ms");
453 LOGGER.debug(builder.toString());
454 }
455 }
456
457 private static RDFFormat formatFor(final String mimeType) {
458 final RDFFormat format = RDFFormat.forMIMEType(mimeType);
459 if (format == null) {
460 throw new IllegalArgumentException("No RDF format for MIME type '" + mimeType + "'");
461 }
462 return format;
463 }
464
465 static {
466 RDFFormat.register(TQL.FORMAT);
467 }
468
469 }