1   package eu.fbk.knowledgestore.server.http.jaxrs;
2   
3   import java.util.List;
4   
5   import javax.ws.rs.Consumes;
6   import javax.ws.rs.GET;
7   import javax.ws.rs.HttpMethod;
8   import javax.ws.rs.POST;
9   import javax.ws.rs.Path;
10  import javax.ws.rs.Produces;
11  import javax.ws.rs.QueryParam;
12  import javax.ws.rs.core.Response;
13  import javax.ws.rs.core.Response.Status;
14  
15  import com.google.common.base.MoreObjects;
16  import com.google.common.base.Preconditions;
17  import com.google.common.collect.ImmutableList;
18  import com.google.common.collect.ImmutableSet;
19  import com.google.common.collect.Iterables;
20  import com.google.common.collect.Lists;
21  
22  import org.codehaus.enunciate.jaxrs.ResponseCode;
23  import org.codehaus.enunciate.jaxrs.StatusCodes;
24  import org.codehaus.enunciate.jaxrs.TypeHint;
25  import org.openrdf.model.Literal;
26  import org.openrdf.model.Statement;
27  import org.openrdf.model.URI;
28  import org.openrdf.model.ValueFactory;
29  
30  import eu.fbk.knowledgestore.Operation;
31  import eu.fbk.knowledgestore.OperationException;
32  import eu.fbk.knowledgestore.Outcome;
33  import eu.fbk.knowledgestore.data.Criteria;
34  import eu.fbk.knowledgestore.data.Data;
35  import eu.fbk.knowledgestore.data.Record;
36  import eu.fbk.knowledgestore.data.Stream;
37  import eu.fbk.knowledgestore.data.XPath;
38  import eu.fbk.knowledgestore.internal.jaxrs.Protocol;
39  import eu.fbk.knowledgestore.vocabulary.KSR;
40  
41  public abstract class Crud extends Resource {
42  
43      private static final long DEFAULT_RETRIEVE_LIMIT = 1000;
44  
45      private final URI recordType;
46  
47      Crud(final URI recordType) {
48          this.recordType = Preconditions.checkNotNull(recordType);
49      }
50  
51      final URI getRecordType() {
52          return this.recordType;
53      }
54  
55      @GET
56      @Produces(Protocol.MIME_TYPES_RDF)
57      @TypeHint(Stream.class)
58      @StatusCodes({ @ResponseCode(code = 200, condition = "if the request is acceptable and the "
59              + "query is being executed") })
60      public Response retrieve(
61              @QueryParam(Protocol.PARAMETER_CONDITION) final List<XPath> conditions,
62              @QueryParam(Protocol.PARAMETER_ID) final List<String> ids,
63              @QueryParam(Protocol.PARAMETER_PROPERTY) final List<String> properties,
64              @QueryParam(Protocol.PARAMETER_OFFSET) final Long offset,
65              @QueryParam(Protocol.PARAMETER_LIMIT) final Long limit) throws OperationException {
66  
67          // Apply default limit, if not explicitly given
68          final long actualLimit = MoreObjects.firstNonNull(limit, DEFAULT_RETRIEVE_LIMIT);
69  
70          // Prepare the retrieve operation, returning an error if parameters are wrong
71          final Operation.Retrieve operation;
72          try {
73              operation = getSession().retrieve(getRecordType()) //
74                      .timeout(getTimeout()) //
75                      .conditions(emptyToNull(conditions)) //
76                      .ids(emptyToNull(parseURIs(ids))) //
77                      .properties(emptyToNull(parseURIs(properties))) //
78                      .offset(offset) //
79                      .limit(actualLimit);
80          } catch (final IllegalArgumentException ex) {
81              throw new OperationException(newOutcome(Outcome.Status.ERROR_INVALID_INPUT,
82                      ex.getMessage()), ex);
83          }
84  
85          // Validate client preconditions, using default last modified and tag
86          init(false, null, null, null);
87  
88          // Setup the resulting stream (materialized only for GET requests)
89          Stream<Record> entity;
90          if (getMethod().equals(HttpMethod.HEAD)) {
91              entity = Stream.create();
92          } else {
93              entity = operation.exec();
94          }
95          entity.setProperty("types", ImmutableSet.of(getRecordType()));
96  
97          // Stream the results in the HTTP response
98          return newResponseBuilder(Status.OK, closeOnCompletion(entity), Protocol.STREAM_OF_RECORDS)
99                  .build();
100     }
101 
102     @GET
103     @Path(Protocol.SUBPATH_COUNT)
104     @Produces(Protocol.MIME_TYPES_RDF)
105     @TypeHint(Stream.class)
106     @StatusCodes({ @ResponseCode(code = 200, condition = "if the request is acceptable and the "
107             + "requested count is being returned") })
108     public Response count( //
109             @QueryParam(Protocol.PARAMETER_CONDITION) final List<XPath> conditions, //
110             @QueryParam(Protocol.PARAMETER_ID) final List<String> ids) throws OperationException {
111 
112         // Prepare the count operation, returning an error if parameters are wrong
113         final Operation.Count operation;
114         try {
115             operation = getSession() //
116                     .count(getRecordType()) //
117                     .timeout(getTimeout()) //
118                     .conditions(emptyToNull(conditions)) //
119                     .ids(emptyToNull(parseURIs(ids)));
120         } catch (final IllegalArgumentException ex) {
121             throw new OperationException(newOutcome(Outcome.Status.ERROR_INVALID_INPUT,
122                     ex.getMessage()), ex);
123         }
124 
125         // Validate client preconditions, using default last modified and tag
126         init(false, null, null, null);
127 
128         // Setup the resulting stream (materialized only for GET requests)
129         Stream<Statement> entity;
130         if (getMethod().equals(HttpMethod.HEAD)) {
131             entity = Stream.create();
132         } else {
133             final long count = operation.exec();
134             final ValueFactory factory = Data.getValueFactory();
135             final Literal literal = factory.createLiteral(count);
136             final Statement statement = factory.createStatement(getInvocationID(), KSR.RESULT,
137                     literal);
138             entity = Stream.create(statement);
139         }
140 
141         // Return the result in the HTTP response
142         return newResponseBuilder(Status.OK, closeOnCompletion(entity),
143                 Protocol.STREAM_OF_STATEMENTS).build();
144     }
145 
146     @POST
147     @Path(Protocol.SUBPATH_CREATE)
148     @Consumes(Protocol.MIME_TYPES_RDF)
149     @Produces(Protocol.MIME_TYPES_RDF)
150     @TypeHint(Stream.class)
151     public Response create(final Stream<Record> records) throws OperationException {
152 
153         // Schedule closing of input entity
154         closeOnCompletion(records);
155 
156         // Setup the Create operation, returning an error if parameters are wrong
157         final Operation.Create operation;
158         try {
159             operation = getSession() //
160                     .create(getRecordType()) //
161                     .timeout(getTimeout()) //
162                     .records(records);
163         } catch (final IllegalArgumentException ex) {
164             throw new OperationException(newOutcome(Outcome.Status.ERROR_INVALID_INPUT,
165                     ex.getMessage()), ex);
166         }
167 
168         // Validate client preconditions and handle probing
169         init(true, null);
170 
171         // Setup record decoding
172         records.setProperty("types", ImmutableList.of(getRecordType()));
173         closeOnCompletion(records);
174 
175         // Perform the operation
176         final List<Outcome> outcomes = Lists.newArrayList();
177         operation.exec(outcomes);
178 
179         // Setup the resulting stream
180         final Stream<Outcome> entity = Stream.create(outcomes);
181         entity.setProperty("types", ImmutableSet.of(KSR.INVOCATION));
182 
183         // final Stream<Outcome> entity = new Stream<Outcome>() {
184         //
185         // @Override
186         // protected void doToHandler(final Handler<? super Outcome> handler) {
187         // try {
188         // operation.exec(handler);
189         // } catch (final Throwable ex) {
190         // propagateIfNotBulk(ex);
191         // }
192         // }
193         //
194         // };
195 
196         // Stream the results in the HTTP response
197         return newResponseBuilder(Status.OK, entity, Protocol.STREAM_OF_OUTCOMES).build();
198     }
199 
200     @POST
201     @Path(Protocol.SUBPATH_MERGE)
202     @Consumes(Protocol.MIME_TYPES_RDF)
203     @Produces(Protocol.MIME_TYPES_RDF)
204     @TypeHint(Stream.class)
205     public Response merge( //
206             @QueryParam(Protocol.PARAMETER_CRITERIA) final List<Criteria> criterias, //
207             final Stream<Record> records) throws OperationException {
208 
209         // Schedule closing of input entity
210         closeOnCompletion(records);
211 
212         // Setup the merge operation, returning an error if parameters are wrong
213         final Operation.Merge operation;
214         try {
215             operation = getSession() //
216                     .merge(getRecordType()) //
217                     .timeout(getTimeout()) //
218                     .records(records) //
219                     .criteria(emptyToNull(criterias));
220         } catch (final IllegalArgumentException ex) {
221             throw new OperationException(newOutcome(Outcome.Status.ERROR_INVALID_INPUT,
222                     ex.getMessage()), ex);
223         }
224 
225         // Validate client preconditions
226         init(true, null);
227 
228         // Setup record decoding
229         records.setProperty("types", ImmutableList.of(getRecordType()));
230         closeOnCompletion(records);
231 
232         // Perform the operation
233         final List<Outcome> outcomes = Lists.newArrayList();
234         operation.exec(outcomes);
235 
236         // Setup the resulting stream
237         final Stream<Outcome> entity = Stream.create(outcomes);
238         entity.setProperty("types", ImmutableSet.of(KSR.INVOCATION));
239 
240         // final Stream<Outcome> entity = new Stream<Outcome>() {
241         //
242         // @Override
243         // protected void doToHandler(final Handler<? super Outcome> handler) {
244         // try {
245         // operation.exec(handler);
246         // } catch (final Throwable ex) {
247         // propagateIfNotBulk(ex);
248         // }
249         // }
250         //
251         // };
252 
253         // Stream the results in the HTTP response
254         return newResponseBuilder(Status.OK, entity, Protocol.STREAM_OF_OUTCOMES).build();
255     }
256 
257     @POST
258     @Path(Protocol.SUBPATH_UPDATE)
259     @Consumes(Protocol.MIME_TYPES_RDF)
260     @Produces(Protocol.MIME_TYPES_RDF)
261     @TypeHint(Stream.class)
262     public Response update( //
263             @QueryParam(Protocol.PARAMETER_CONDITION) final List<XPath> conditions, //
264             @QueryParam(Protocol.PARAMETER_ID) final List<URI> ids, //
265             @QueryParam(Protocol.PARAMETER_CRITERIA) final List<Criteria> criterias, //
266             final Stream<Record> records) throws OperationException {
267 
268         // Schedule closing of input entity
269         closeOnCompletion(records);
270 
271         // Setup the update operation (apart record), returning an error if parameters are wrong
272         final Operation.Update operation;
273         try {
274             operation = getSession() //
275                     .update(getRecordType()) //
276                     .timeout(getTimeout()) //
277                     .conditions(emptyToNull(conditions)) //
278                     .ids(emptyToNull(ids)) //
279                     .criteria(emptyToNull(criterias));
280         } catch (final IllegalArgumentException ex) {
281             throw new OperationException(newOutcome(Outcome.Status.ERROR_INVALID_INPUT,
282                     ex.getMessage()), ex);
283         }
284 
285         // Validate client preconditions and handle probing
286         init(true, null);
287 
288         // Decode input record
289         records.setProperty("types", ImmutableList.of(getRecordType()));
290         final Record record = records.getUnique();
291 
292         // Perform the operation
293         final List<Outcome> outcomes = Lists.newArrayList();
294         operation.record(record).exec(outcomes);
295 
296         // Setup the resulting stream
297         final Stream<Outcome> entity = Stream.create(outcomes);
298         entity.setProperty("types", ImmutableSet.of(KSR.INVOCATION));
299 
300         // final Stream<Outcome> entity = new Stream<Outcome>() {
301         //
302         // @Override
303         // protected void doToHandler(final Handler<? super Outcome> handler) {
304         // try {
305         // operation.record(record).exec(handler);
306         // } catch (final Throwable ex) {
307         // propagateIfNotBulk(ex);
308         // }
309         // }
310         //
311         // };
312 
313         // Stream the results in the HTTP response
314         return newResponseBuilder(Status.OK, entity, Protocol.STREAM_OF_OUTCOMES).build();
315     }
316 
317     @POST
318     @Path(Protocol.SUBPATH_DELETE)
319     @Produces(Protocol.MIME_TYPES_RDF)
320     @TypeHint(Stream.class)
321     public Response delete( //
322             @QueryParam(Protocol.PARAMETER_CONDITION) final List<XPath> conditions, //
323             @QueryParam(Protocol.PARAMETER_ID) final List<URI> ids) throws OperationException {
324 
325         // Setup the delete operation, returning an error if parameters are wrong
326         final Operation.Delete operation;
327         try {
328             operation = getSession() //
329                     .delete(getRecordType()) //
330                     .timeout(getTimeout()) //
331                     .conditions(emptyToNull(conditions)) //
332                     .ids(emptyToNull(ids));
333         } catch (final IllegalArgumentException ex) {
334             throw new OperationException(newOutcome(Outcome.Status.ERROR_INVALID_INPUT,
335                     ex.getMessage()), ex);
336         }
337 
338         // Validate client preconditions and handle probing
339         init(true, null);
340 
341         // Perform the operation
342         final List<Outcome> outcomes = Lists.newArrayList();
343         operation.exec(outcomes);
344 
345         // Setup the resulting stream
346         final Stream<Outcome> entity = Stream.create(outcomes);
347         entity.setProperty("types", ImmutableSet.of(KSR.INVOCATION));
348 
349         // final Stream<Outcome> entity = new Stream<Outcome>() {
350         //
351         // @Override
352         // protected void doToHandler(final Handler<? super Outcome> handler) {
353         // try {
354         // operation.exec(handler);
355         // } catch (final Throwable ex) {
356         // propagateIfNotBulk(ex);
357         // }
358         // }
359         //
360         // };
361 
362         // Stream the results in the HTTP response
363         return newResponseBuilder(Status.OK, closeOnCompletion(entity),
364                 Protocol.STREAM_OF_OUTCOMES).build();
365     }
366 
367     private <T extends Iterable<?>> T emptyToNull(final T iterable) {
368         return iterable == null || Iterables.isEmpty(iterable) ? null : iterable;
369     }
370 
371     private List<URI> parseURIs(final Iterable<String> strings) {
372         final List<URI> uris = Lists.newArrayList();
373         for (final String string : strings) {
374             final int length = string.length();
375             boolean escape = false;
376             boolean qname = false;
377             int start = -1;
378             for (int i = 0; i < length; ++i) {
379                 final char ch = string.charAt(i);
380                 if (escape) {
381                     escape = false;
382                 } else if (start >= 0) {
383                     if (qname) {
384                         if (ch == ',' || ch == ';' || Character.isWhitespace(ch)) {
385                             uris.add((URI) Data.parseValue(string.substring(start, i),
386                                     Data.getNamespaceMap()));
387                             start = -1;
388                         }
389                     } else {
390                         if (ch == '\\') {
391                             escape = true;
392                         } else if (ch == '>') {
393                             uris.add((URI) Data.parseValue(string.substring(start, i + 1),
394                                     Data.getNamespaceMap()));
395                             start = -1;
396 
397                         }
398                     }
399                 } else if (ch == '<') {
400                     start = i;
401                     qname = false;
402                 } else if (!Character.isWhitespace(ch)) {
403                     start = i;
404                     qname = true;
405                 }
406             }
407             if (start >= 0) {
408                 if (qname) {
409                     uris.add((URI) Data.parseValue(string.substring(start), Data.getNamespaceMap()));
410                 } else {
411                     throw new IllegalArgumentException("Invalid ID(s): " + string);
412                 }
413             }
414         }
415 
416         return uris;
417     }
418 
419 }