1   package eu.fbk.knowledgestore.datastore;
2   
3   import java.io.IOException;
4   import java.util.List;
5   import java.util.Map;
6   import java.util.Set;
7   
8   import javax.annotation.Nullable;
9   
10  import com.google.common.base.Function;
11  import com.google.common.base.Preconditions;
12  import com.google.common.base.Throwables;
13  import com.google.common.collect.ImmutableList;
14  import com.google.common.collect.ImmutableSet;
15  import com.google.common.collect.Sets;
16  
17  import org.openrdf.model.Resource;
18  import org.openrdf.model.Statement;
19  import org.openrdf.model.URI;
20  import org.openrdf.model.Value;
21  import org.openrdf.model.ValueFactory;
22  import org.openrdf.query.BindingSet;
23  import org.slf4j.Logger;
24  import org.slf4j.LoggerFactory;
25  
26  import eu.fbk.knowledgestore.data.Data;
27  import eu.fbk.knowledgestore.data.Handler;
28  import eu.fbk.knowledgestore.data.Record;
29  import eu.fbk.knowledgestore.data.Stream;
30  import eu.fbk.knowledgestore.data.XPath;
31  import eu.fbk.knowledgestore.runtime.DataCorruptedException;
32  import eu.fbk.knowledgestore.triplestore.SelectQuery;
33  import eu.fbk.knowledgestore.triplestore.TripleStore;
34  import eu.fbk.knowledgestore.triplestore.TripleTransaction;
35  import eu.fbk.knowledgestore.vocabulary.KS;
36  
37  public final class TripleDataStore implements DataStore {
38  
39      private static final Logger LOGGER = LoggerFactory.getLogger(TripleDataStore.class);
40  
41      private final TripleStore tripleStore;
42  
43      private boolean initialized;
44  
45      private boolean closed;
46  
47      public TripleDataStore(final TripleStore tripleStore) {
48          this.tripleStore = Preconditions.checkNotNull(tripleStore);
49          this.initialized = false;
50          this.closed = false;
51          LOGGER.info("{} configured, triplestore={}", this, this.tripleStore);
52      }
53  
54      @Override
55      public synchronized void init() throws IOException, IllegalStateException {
56          Preconditions.checkState(!this.initialized && !this.closed);
57          this.initialized = true;
58          LOGGER.info("{} initialized", this);
59      }
60  
61      @Override
62      public synchronized DataTransaction begin(final boolean readOnly)
63              throws DataCorruptedException, IOException, IllegalStateException {
64          Preconditions.checkState(this.initialized && !this.closed);
65          return new TripleDataTransaction(this.tripleStore.begin(readOnly));
66      }
67  
68      @Override
69      public synchronized void close() {
70          if (this.closed) {
71              return;
72          }
73          this.closed = true;
74      }
75  
76      @Override
77      public String toString() {
78          return this.getClass().getSimpleName();
79      }
80  
81      private static final class TripleDataTransaction implements DataTransaction {
82  
83          private final TripleTransaction transaction;
84  
85          TripleDataTransaction(final TripleTransaction transaction) {
86              this.transaction = transaction;
87          }
88  
89          private Stream<Record> query(final String spoPattern, final URI type,
90                  @Nullable final Set<? extends URI> properties, @Nullable final XPath condition)
91                  throws IOException {
92  
93              // Compose the query
94              final StringBuilder builder = new StringBuilder();
95              if (KS.RESOURCE.equals(type)) {
96                  builder.append("SELECT ?s ?p ?o ?p1 ?o1 ?p2 ?o2 {\n" //
97                          + "  ?s ?p ?o .\n" //
98                          + "  OPTIONAL {\n    ?o ?p1 ?o1\n" //
99                          + "    FILTER (?p = <http://dkm.fbk.eu/ontologies/knowledgestore#storedAs>)\n" //
100                         + "    OPTIONAL {\n" //
101                         + "      ?o1 ?p2 ?o2\n" //
102                         + "      FILTER (?p1 = <http://www.semanticdesktop.org/ontologies/2007/03/22/nfo#hasHash>)\n" //
103                         + "    }\n" //
104                         + "  }\n ");
105             } else {
106                 builder.append("SELECT ?s ?p ?o {\n" //
107                         + "  ?s ?p ?o .\n");
108             }
109             builder.append("  ?s a ").append(Data.toString(type, null)).append(" .\n");
110             builder.append(spoPattern);
111             builder.append("\n}");
112             final String query = builder.toString();
113 
114             // Issue the query
115             final Stream<BindingSet> bindingStream = Stream.create(this.transaction.query(
116                     SelectQuery.from(query), null, null));
117 
118             // Convert from bindings to statements
119             final Stream<Statement> stmtStream;
120             if (KS.RESOURCE.equals(type)) {
121                 stmtStream = bindingStream.transform(null,
122                         new Function<Handler<Statement>, Handler<BindingSet>>() {
123 
124                             @Override
125                             public Handler<BindingSet> apply(final Handler<Statement> handler) {
126                                 return new Handler<BindingSet>() {
127 
128                                     private final Set<Statement> set = Sets.newHashSet();
129 
130                                     private Resource subject = null;
131 
132                                     @Override
133                                     public void handle(final BindingSet bindings) throws Throwable {
134                                         if (bindings == null) {
135                                             handler.handle(null);
136                                             return;
137                                         }
138                                         final Resource s = (Resource) bindings.getValue("s");
139                                         final URI p = (URI) bindings.getValue("p");
140                                         final Value o = bindings.getValue("o");
141                                         final URI p1 = (URI) bindings.getValue("p1");
142                                         final Value o1 = bindings.getValue("o1");
143                                         final URI p2 = (URI) bindings.getValue("p2");
144                                         final Value o2 = bindings.getValue("o2");
145                                         final ValueFactory vf = Data.getValueFactory();
146                                         if (!s.equals(this.subject)) {
147                                             this.set.clear();
148                                             this.subject = s;
149                                         }
150                                         emit(handler, vf.createStatement(s, p, o));
151                                         if (o1 != null) {
152                                             emit(handler, vf.createStatement((URI) o, p1, o1));
153                                             if (o2 != null) {
154                                                 emit(handler, vf.createStatement((URI) o1, p2, o2));
155                                             }
156                                         }
157                                     }
158 
159                                     private void emit(final Handler<Statement> handler,
160                                             final Statement statement) throws Throwable {
161                                         if (this.set.add(statement)) {
162                                             handler.handle(statement);
163                                         }
164                                     }
165 
166                                 };
167 
168                             }
169 
170                         });
171             } else {
172                 stmtStream = bindingStream.transform(new Function<BindingSet, Statement>() {
173 
174                     @Override
175                     public Statement apply(final BindingSet bindings) {
176                         final Resource s = (Resource) bindings.getValue("s");
177                         final URI p = (URI) bindings.getValue("p");
178                         final Value o = bindings.getValue("o");
179                         return Data.getValueFactory().createStatement(s, p, o);
180                     }
181 
182                 }, 1);
183             }
184 
185             // Convert from statements to records
186             Stream<Record> recordStream = Record.decode(stmtStream, ImmutableList.of(type), true);
187 
188             // Apply condition, if specified
189             if (condition != null) {
190                 recordStream = recordStream.filter(condition.asPredicate(), 1);
191             }
192 
193             // Apply projection, if specified
194             if (properties != null && !properties.isEmpty()) {
195                 final URI[] props = properties.toArray(new URI[properties.size()]);
196                 recordStream = recordStream.transform(new Function<Record, Record>() {
197 
198                     @Override
199                     public Record apply(final Record record) {
200                         record.retain(props);
201                         return null;
202                     }
203 
204                 }, 1);
205             }
206             return recordStream;
207         }
208 
209         @Override
210         public Stream<Record> lookup(final URI type, final Set<? extends URI> ids,
211                 final Set<? extends URI> properties) throws IOException, IllegalArgumentException,
212                 IllegalStateException {
213             return Stream.concat(Stream.create(ids).chunk(64)
214                     .transform(new Function<List<? extends URI>, Stream<Record>>() {
215 
216                         @Override
217                         public Stream<Record> apply(final List<? extends URI> input) {
218                             final StringBuilder builder = new StringBuilder();
219                             builder.append("  VALUES ?s {");
220                             for (final URI id : input) {
221                                 builder.append(" <").append(id.toString()).append(">");
222                             }
223                             builder.append(" }");
224                             try {
225                                 return query(builder.toString(), type, properties, null);
226                             } catch (final IOException ex) {
227                                 throw Throwables.propagate(ex);
228                             }
229                         }
230 
231                     }, 1));
232         }
233 
234         @Override
235         public Stream<Record> retrieve(final URI type, final XPath condition,
236                 final Set<? extends URI> properties) throws IOException, IllegalArgumentException,
237                 IllegalStateException {
238             return query("  ?s a <" + type.toString() + "> .", type, properties, condition);
239         }
240 
241         @Override
242         public long count(final URI type, final XPath condition) throws IOException,
243                 IllegalArgumentException, IllegalStateException {
244             return query("  ?s a <" + type.toString() + "> .", type, null, condition).count();
245         }
246 
247         @Override
248         public Stream<Record> match(final Map<URI, XPath> conditions,
249                 final Map<URI, Set<URI>> ids, final Map<URI, Set<URI>> properties)
250                 throws IOException, IllegalStateException {
251             throw new UnsupportedOperationException();
252         }
253 
254         @Override
255         public void store(final URI type, final Record record) throws IOException,
256                 IllegalStateException {
257 
258             // Delete existing data for the record URI
259             delete(type, record.getID());
260 
261             // Add statements
262             final List<Statement> statements = Record.encode(Stream.create(record),
263                     ImmutableList.of(type)).toList();
264             this.transaction.add(statements);
265         }
266 
267         @Override
268         public void delete(final URI type, final URI id) throws IOException, IllegalStateException {
269 
270             // Obtain the statements to delete throuh a lookup
271             final List<Statement> statements = Record.encode(
272                     lookup(type, ImmutableSet.of(id), null), ImmutableList.of(type)).toList();
273 
274             // Perform the deletion
275             this.transaction.remove(statements);
276         }
277 
278         @Override
279         public void end(final boolean commit) throws DataCorruptedException, IOException,
280                 IllegalStateException {
281             this.transaction.end(commit);
282         }
283 
284     }
285 
286 }