1 package eu.fbk.knowledgestore.datastore;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.io.OutputStream;
6 import java.util.List;
7 import java.util.Map;
8 import java.util.Set;
9
10 import javax.annotation.Nullable;
11
12 import com.google.common.base.Function;
13 import com.google.common.base.MoreObjects;
14 import com.google.common.base.Preconditions;
15 import com.google.common.collect.ImmutableSet;
16 import com.google.common.collect.Iterables;
17 import com.google.common.collect.Lists;
18 import com.google.common.collect.Maps;
19
20 import org.apache.hadoop.fs.FileSystem;
21 import org.apache.hadoop.fs.Path;
22 import org.openrdf.model.URI;
23 import org.openrdf.rio.RDFFormat;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 import eu.fbk.knowledgestore.data.Data;
28 import eu.fbk.knowledgestore.data.Record;
29 import eu.fbk.knowledgestore.data.Stream;
30 import eu.fbk.knowledgestore.data.XPath;
31 import eu.fbk.knowledgestore.internal.Util;
32 import eu.fbk.knowledgestore.internal.rdf.RDFUtil;
33 import eu.fbk.knowledgestore.runtime.Files;
34 import eu.fbk.knowledgestore.vocabulary.KS;
35
36
37
38
39
40
41
42
43
44
45
46
47 public class MemoryDataStore implements DataStore {
48
49 private static final Logger LOGGER = LoggerFactory.getLogger(MemoryDataStore.class);
50
51 private static final String PATH_DEFAULT = "datastore.ttl";
52
53 private Map<URI, Map<URI, Record>> tables;
54
55 private int revision;
56
57 private boolean initialized;
58
59 private boolean closed;
60
61 private final FileSystem fileSystem;
62
63 private final Path filePath;
64
65
66
67
68
69
70
71
72
73
74
75 public MemoryDataStore(final FileSystem fileSystem, @Nullable final String path) {
76 this.fileSystem = Preconditions.checkNotNull(fileSystem);
77 this.filePath = new Path(MoreObjects.firstNonNull(path, MemoryDataStore.PATH_DEFAULT))
78 .makeQualified(this.fileSystem);
79 this.tables = Maps.newHashMap();
80 this.revision = 1;
81 this.initialized = false;
82 this.closed = false;
83 for (final URI supportedType : DataStore.SUPPORTED_TYPES) {
84 this.tables.put(supportedType, Maps.<URI, Record>newLinkedHashMap());
85 }
86 MemoryDataStore.LOGGER.info("{} configured, path={}", this.getClass().getSimpleName(),
87 this.filePath);
88 }
89
90 @Override
91 public synchronized void init() throws IOException, IllegalStateException {
92 Preconditions.checkState(!this.initialized && !this.closed);
93 this.initialized = true;
94
95 InputStream stream = null;
96 try {
97 if (this.fileSystem.exists(this.filePath)) {
98 stream = Files.readWithBackup(this.fileSystem, this.filePath);
99 final RDFFormat format = RDFFormat.forFileName(this.filePath.getName());
100 final List<Record> records = Record.decode(
101 RDFUtil.readRDF(stream, format, null, null, false),
102 ImmutableSet.of(KS.RESOURCE, KS.MENTION, KS.ENTITY, KS.CONTEXT), false)
103 .toList();
104 for (final Record record : records) {
105 final URI id = Preconditions.checkNotNull(record.getID());
106 final URI type = Preconditions.checkNotNull(record.getSystemType());
107 MemoryDataStore.this.tables.get(type).put(id, record);
108 }
109 MemoryDataStore.LOGGER.info("{} initialized, {} records loaded", this.getClass()
110 .getSimpleName(), records.size());
111 } else {
112 MemoryDataStore.LOGGER.info("{} initialized, no record loaded", this.getClass()
113 .getSimpleName());
114 }
115 } finally {
116 Util.closeQuietly(stream);
117 }
118 }
119
120 @Override
121 public synchronized DataTransaction begin(final boolean readOnly) throws IOException,
122 IllegalStateException {
123 Preconditions.checkState(this.initialized && !this.closed);
124 return new MemoryDataTransaction(readOnly);
125 }
126
127 @Override
128 public synchronized void close() {
129
130 if (this.closed) {
131 return;
132 }
133 this.closed = true;
134
135 }
136
137 @Override
138 public String toString() {
139 return this.getClass().getSimpleName();
140 }
141
142 private synchronized void update(final Map<URI, Map<URI, Record>> tables, final int revision)
143 throws IOException {
144 if (this.revision != revision) {
145 throw new IOException("Commit failed due to concurrent modifications " + this.revision
146 + ", " + revision);
147 }
148
149 OutputStream stream = null;
150 try {
151 stream = Files.writeWithBackup(this.fileSystem, this.filePath);
152 final List<Record> records = Lists.newArrayList();
153 for (final URI type : tables.keySet()) {
154 records.addAll(tables.get(type).values());
155 }
156 final RDFFormat format = RDFFormat.forFileName(this.filePath.getName());
157 RDFUtil.writeRDF(stream, format, Data.getNamespaceMap(), null,
158 Record.encode(Stream.create(records), ImmutableSet.<URI>of()));
159 ++this.revision;
160 this.tables = tables;
161 MemoryDataStore.LOGGER.info("MemoryDataStore updated, {} records persisted",
162 records.size());
163
164 } catch (final Throwable ex) {
165 MemoryDataStore.LOGGER.error("MemoryDataStore update failed", ex);
166
167 } finally {
168 Util.closeQuietly(stream);
169 }
170 }
171
172 private class MemoryDataTransaction implements DataTransaction {
173
174 private final Map<URI, Map<URI, Record>> tables;
175
176 private final int revision;
177
178 private final boolean readOnly;
179
180 private boolean ended;
181
182 MemoryDataTransaction(final boolean readOnly) {
183
184 Map<URI, Map<URI, Record>> tables = MemoryDataStore.this.tables;
185 if (!readOnly) {
186 tables = Maps.newHashMap();
187 for (final Map.Entry<URI, Map<URI, Record>> entry : MemoryDataStore.this.tables
188 .entrySet()) {
189 tables.put(entry.getKey(), Maps.newLinkedHashMap(entry.getValue()));
190 }
191 }
192
193 this.tables = tables;
194 this.revision = MemoryDataStore.this.revision;
195 this.readOnly = readOnly;
196 this.ended = false;
197 }
198
199 private Map<URI, Record> getTable(final URI type) {
200 final Map<URI, Record> table = this.tables.get(type);
201 if (table != null) {
202 return table;
203 }
204 throw new IllegalArgumentException("Unsupported type " + type);
205 }
206
207 private Stream<Record> select(final Map<URI, Record> table,
208 final Stream<? extends URI> stream) {
209 return stream.transform(new Function<URI, Record>() {
210
211 @Override
212 public Record apply(final URI id) {
213 return table.get(id);
214 }
215
216 }, 0);
217 }
218
219 private Stream<Record> filter(final Stream<Record> stream, @Nullable final XPath xpath) {
220 if (xpath == null) {
221 return stream;
222 }
223 return stream.filter(xpath.asPredicate(), 0);
224 }
225
226 private Stream<Record> project(final Stream<Record> stream,
227 @Nullable final Iterable<? extends URI> properties) {
228 final URI[] array = properties == null ? null : Iterables.toArray(properties,
229 URI.class);
230 return stream.transform(new Function<Record, Record>() {
231
232 @Override
233 public final Record apply(final Record input) {
234 final Record result = Record.create(input, true);
235 if (array != null) {
236 result.retain(array);
237 }
238 return result;
239 }
240
241 }, 0);
242 }
243
244 @Override
245 public synchronized Stream<Record> lookup(final URI type, final Set<? extends URI> ids,
246 @Nullable final Set<? extends URI> properties) throws IOException,
247 IllegalArgumentException, IllegalStateException {
248 Preconditions.checkState(!this.ended);
249 final Map<URI, Record> table = this.getTable(type);
250 return this.project(this.select(table, Stream.create(ids)), properties);
251 }
252
253 @Override
254 public synchronized Stream<Record> retrieve(final URI type,
255 @Nullable final XPath condition, @Nullable final Set<? extends URI> properties)
256 throws IOException, IllegalArgumentException, IllegalStateException {
257 Preconditions.checkState(!this.ended);
258 final Map<URI, Record> table = this.getTable(type);
259 return this.project(this.filter(Stream.create(table.values()), condition), properties);
260 }
261
262 @Override
263 public synchronized long count(final URI type, @Nullable final XPath condition)
264 throws IOException, IllegalArgumentException, IllegalStateException {
265 Preconditions.checkState(!this.ended);
266 final Map<URI, Record> table = this.getTable(type);
267 return this.filter(Stream.create(table.values()), condition).count();
268 }
269
270 @Override
271 public Stream<Record> match(final Map<URI, XPath> conditions,
272 final Map<URI, Set<URI>> ids, final Map<URI, Set<URI>> properties)
273 throws IOException, IllegalStateException {
274 throw new UnsupportedOperationException();
275 }
276
277 @Override
278 public void store(final URI type, final Record record) throws IOException,
279 IllegalStateException {
280 Preconditions.checkState(!this.ended);
281 Preconditions.checkState(!this.readOnly);
282 Preconditions.checkArgument(record.getID() != null);
283 final Map<URI, Record> table = this.getTable(type);
284 table.put(record.getID(), Record.create(record, true));
285 }
286
287 @Override
288 public void delete(final URI type, final URI id) throws IOException, IllegalStateException {
289 Preconditions.checkState(!this.ended);
290 Preconditions.checkState(!this.readOnly);
291 Preconditions.checkArgument(id != null);
292 final Map<URI, Record> table = this.getTable(type);
293 table.remove(id);
294 }
295
296 @Override
297 public synchronized void end(final boolean commit) throws IOException,
298 IllegalStateException {
299 if (!this.ended) {
300 this.ended = true;
301 if (commit && !this.readOnly) {
302 MemoryDataStore.this.update(this.tables, this.revision);
303 }
304 }
305 }
306
307 @Override
308 public String toString() {
309 return this.getClass().getSimpleName();
310 }
311
312 }
313
314 }