1 package eu.fbk.knowledgestore.datastore;
2
3 import java.io.IOException;
4 import java.util.Iterator;
5 import java.util.List;
6 import java.util.Map;
7 import java.util.Set;
8 import java.util.concurrent.atomic.AtomicLong;
9 import java.util.concurrent.locks.ReadWriteLock;
10 import java.util.concurrent.locks.ReentrantReadWriteLock;
11
12 import javax.annotation.Nullable;
13
14 import com.google.common.base.MoreObjects;
15 import com.google.common.base.Preconditions;
16 import com.google.common.cache.Cache;
17 import com.google.common.cache.CacheBuilder;
18 import com.google.common.collect.ImmutableList;
19 import com.google.common.collect.Lists;
20 import com.google.common.collect.Maps;
21 import com.google.common.collect.Sets;
22
23 import org.openrdf.model.URI;
24 import org.slf4j.Logger;
25 import org.slf4j.LoggerFactory;
26
27 import eu.fbk.knowledgestore.data.Record;
28 import eu.fbk.knowledgestore.data.Stream;
29 import eu.fbk.knowledgestore.data.XPath;
30 import eu.fbk.knowledgestore.vocabulary.KS;
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74 public class CachingDataStore extends ForwardingDataStore {
75
76 private static final Logger LOGGER = LoggerFactory.getLogger(CachingDataStore.class);
77
78 private static final int DEFAULT_MAX_SIZE = 1024;
79
80 private static final int DEFAULT_MAX_CHANGES = 1024;
81
82 private static final int DEFAULT_MAX_BUFFERED_CHANGES = 1024;
83
84 private static final Record NULL = Record.create();
85
86 private final DataStore delegate;
87
88 private final int maxChanges;
89
90 private final int maxBufferedChanges;
91
92 private final ReadWriteLock globalLock;
93
94 private final Map<URI, Cache<URI, Record>> globalCaches;
95
96 private long globalRevision;
97
98
99
100 private final AtomicLong globalHitCount;
101
102 private final AtomicLong localHitCount;
103
104 private final AtomicLong fetchCount;
105
106 private final AtomicLong changeCount;
107
108 private final AtomicLong flushCount;
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127 public CachingDataStore(final DataStore delegate, @Nullable final Integer maxSize,
128 @Nullable final Integer maxChanges, @Nullable final Integer maxBufferedChanges) {
129
130 final int actualMaxSize = MoreObjects.firstNonNull(maxSize, DEFAULT_MAX_SIZE);
131 final int actualMaxChanges = MoreObjects.firstNonNull(maxChanges, DEFAULT_MAX_CHANGES);
132 final int actualMaxBufferedChanges = MoreObjects.firstNonNull(maxBufferedChanges,
133 DEFAULT_MAX_BUFFERED_CHANGES);
134
135 Preconditions.checkArgument(actualMaxSize > 0);
136 Preconditions.checkArgument(actualMaxChanges > 0);
137
138 this.delegate = Preconditions.checkNotNull(delegate);
139 this.maxChanges = actualMaxChanges;
140 this.maxBufferedChanges = actualMaxBufferedChanges;
141 this.globalLock = new ReentrantReadWriteLock(true);
142 this.globalCaches = Maps.newHashMap();
143 this.globalRevision = 0L;
144 this.globalHitCount = new AtomicLong(0);
145 this.localHitCount = new AtomicLong(0);
146 this.fetchCount = new AtomicLong(0);
147 this.changeCount = new AtomicLong(0);
148 this.flushCount = new AtomicLong(0);
149
150 for (final URI type : DataStore.SUPPORTED_TYPES) {
151
152
153
154 this.globalCaches.put(type,
155 CacheBuilder.newBuilder().softValues().maximumSize(actualMaxSize)
156 .<URI, Record>build());
157 }
158
159 CachingDataStore.LOGGER.info("{} configured", this.getClass().getSimpleName());
160 }
161
162 @Override
163 protected DataStore delegate() {
164 return this.delegate;
165 }
166
167 @Override
168 public DataTransaction begin(final boolean readOnly) throws IOException, IllegalStateException {
169
170
171 CachingDataStore.this.globalLock.readLock().lock();
172 try {
173 final long revision = CachingDataStore.this.globalRevision;
174 final DataTransaction tx = delegate().begin(readOnly);
175 return new CachingDataTransaction(tx, readOnly, revision);
176 } finally {
177 CachingDataStore.this.globalLock.readLock().unlock();
178 }
179 }
180
181 @Override
182 public void close() {
183 try {
184 LOGGER.info("{} - {} local cache hits, {} global cache hits, {} fetches, "
185 + "{} changes, {} flushes", this.getClass().getSimpleName(),
186 this.localHitCount, this.globalHitCount, this.fetchCount, this.changeCount,
187 this.flushCount);
188 } finally {
189 super.close();
190 }
191 }
192
193 private class CachingDataTransaction extends ForwardingDataTransaction {
194
195 private final DataTransaction delegate;
196
197 @Nullable
198 private final Set<URI> dirty;
199
200 @Nullable
201 private final Map<URI, Map<URI, Record>> changes;
202
203 @Nullable
204 private final Map<URI, Set<URI>> invalidated;
205
206 private final Map<URI, Cache<URI, Record>> localCaches;
207
208 private final long localRevision;
209
210 CachingDataTransaction(final DataTransaction delegate, final boolean readOnly,
211 final long revision) {
212
213 this.delegate = Preconditions.checkNotNull(delegate);
214
215 if (readOnly) {
216 this.dirty = null;
217 this.changes = null;
218 this.invalidated = null;
219 } else {
220 this.dirty = Sets.newHashSet();
221 this.changes = Maps.newHashMap();
222 this.invalidated = Maps.newHashMap();
223 for (final URI type : DataStore.SUPPORTED_TYPES) {
224 this.changes.put(type, Maps.<URI, Record>newHashMap());
225 this.invalidated.put(type, Sets.<URI>newHashSet());
226 }
227 }
228
229 this.localRevision = revision;
230 this.localCaches = Maps.newHashMap();
231 for (final URI type : DataStore.SUPPORTED_TYPES) {
232 this.localCaches.put(type, CacheBuilder.newBuilder().softValues()
233 .<URI, Record>build());
234 }
235 }
236
237 @Override
238 protected DataTransaction delegate() {
239 return this.delegate;
240 }
241
242 @Override
243 public Stream<Record> lookup(final URI type, final Set<? extends URI> ids,
244 final Set<? extends URI> properties) throws IOException, IllegalArgumentException,
245 IllegalStateException {
246
247 final long globalRevision = CachingDataStore.this.globalRevision;
248 final Cache<URI, Record> globalCache = CachingDataStore.this.globalCaches.get(type);
249 final Cache<URI, Record> localCache = this.localCaches.get(type);
250 final List<Record> result = Lists.newArrayList();
251
252
253
254
255
256 final boolean mightUseGlobalCache = this.localRevision == globalRevision
257 && (this.dirty == null || !this.dirty.contains(type));
258
259
260 final Set<URI> missingIDs = Sets.newHashSet();
261 for (final URI id : ids) {
262 final Record record = localCache.getIfPresent(id);
263 if (record == null) {
264 missingIDs.add(id);
265 } else if (record != CachingDataStore.NULL) {
266 CachingDataStore.this.localHitCount.incrementAndGet();
267 result.add(Record.create(record, true));
268 }
269 }
270
271
272
273 if (mightUseGlobalCache) {
274 CachingDataStore.this.globalLock.readLock().lock();
275 try {
276 if (this.localRevision == globalRevision) {
277 for (final Iterator<URI> i = missingIDs.iterator(); i.hasNext();) {
278 final URI id = i.next();
279 final Record record = globalCache.getIfPresent(id);
280 if (record != null) {
281 CachingDataStore.this.globalHitCount.incrementAndGet();
282 localCache.put(id, record);
283 result.add(Record.create(record, true));
284 i.remove();
285 }
286 }
287
288 }
289 } finally {
290 CachingDataStore.this.globalLock.readLock().unlock();
291 }
292 }
293
294
295 final List<Record> fetched = missingIDs.isEmpty() ? ImmutableList.<Record>of() :
296 delegate().lookup(type, missingIDs, null).toList();
297 CachingDataStore.this.fetchCount.addAndGet(missingIDs.size());
298
299
300 for (final Record record : fetched) {
301 result.add(Record.create(record, true));
302 localCache.put(record.getID(), record);
303 missingIDs.remove(record.getID());
304 }
305
306
307 for (final URI id : missingIDs) {
308 localCache.put(id, CachingDataStore.NULL);
309 }
310
311
312
313 if (mightUseGlobalCache) {
314 CachingDataStore.this.globalLock.readLock().lock();
315 try {
316 if (this.localRevision == CachingDataStore.this.globalRevision) {
317 for (final Record record : fetched) {
318 globalCache.put(record.getID(), record);
319 }
320 }
321 } finally {
322 CachingDataStore.this.globalLock.readLock().unlock();
323 }
324 }
325
326
327 if (properties != null && !properties.isEmpty()) {
328 for (final Record record : result) {
329 final URI[] projection = properties.toArray(new URI[properties.size()]);
330 record.retain(projection);
331 }
332 }
333
334
335 return Stream.create(result);
336 }
337
338 @Override
339 public Stream<Record> retrieve(final URI type, final XPath condition,
340 final Set<? extends URI> properties) throws IOException, IllegalArgumentException,
341 IllegalStateException {
342
343 if (this.changes != null) {
344 flushChanges(type);
345 }
346
347 return delegate().retrieve(type, condition, properties);
348 }
349
350 @Override
351 public long count(final URI type, final XPath condition) throws IOException,
352 IllegalArgumentException, IllegalStateException {
353
354 if (this.changes != null) {
355 flushChanges(type);
356 }
357
358 return delegate().count(type, condition);
359 }
360
361 @Override
362 public Stream<Record> match(final Map<URI, XPath> conditions,
363 final Map<URI, Set<URI>> ids, final Map<URI, Set<URI>> properties)
364 throws IOException, IllegalStateException {
365
366 if (this.changes != null) {
367 flushChanges(KS.RESOURCE);
368 flushChanges(KS.MENTION);
369 flushChanges(KS.ENTITY);
370 flushChanges(KS.AXIOM);
371 }
372
373 return delegate().match(conditions, ids, properties);
374 }
375
376 @Override
377 public void store(final URI type, final Record record) throws IOException,
378 IllegalStateException {
379 Preconditions.checkState(this.changes != null, "Read-only DataTransaction");
380 registerChange(type, record.getID(), record);
381 }
382
383 @Override
384 public void delete(final URI type, final URI id) throws IOException, IllegalStateException {
385 Preconditions.checkState(this.changes != null, "Read-only DataTransaction");
386 registerChange(type, id, CachingDataStore.NULL);
387 }
388
389 @Override
390 public void end(final boolean commit) throws IOException, IllegalStateException {
391
392
393 if (this.changes == null || !commit) {
394 this.delegate.end(commit);
395 return;
396 }
397
398
399 for (final URI type : DataStore.SUPPORTED_TYPES) {
400 flushChanges(type);
401 }
402
403
404
405
406 CachingDataStore.this.globalLock.writeLock().lock();
407 try {
408 delegate().end(true);
409 ++CachingDataStore.this.globalRevision;
410 for (final URI type : DataStore.SUPPORTED_TYPES) {
411 synchronizeCaches(
412 this.invalidated.get(type),
413 this.localCaches.get(type),
414 CachingDataStore.this.globalCaches.get(type));
415 }
416 } finally {
417 CachingDataStore.this.globalLock.writeLock().unlock();
418 }
419 }
420
421 private void synchronizeCaches(@Nullable final Set<URI> invalidatedIDs,
422 final Cache<URI, Record> localCache, final Cache<URI, Record> globalCache) {
423
424 if (invalidatedIDs == null) {
425 globalCache.invalidateAll();
426 return;
427 }
428
429 globalCache.invalidateAll(invalidatedIDs);
430
431 for (final Map.Entry<URI, Record> entry : localCache.asMap().entrySet()) {
432 final URI id = entry.getKey();
433 final Record record = entry.getValue();
434 if (record != CachingDataStore.NULL) {
435 globalCache.put(id, record);
436 }
437 }
438 }
439
440 private void registerChange(final URI type, final URI id, final Record record)
441 throws IOException {
442
443 assert this.changes != null && this.invalidated != null;
444
445 CachingDataStore.this.changeCount.incrementAndGet();
446
447 this.localCaches.get(type).put(id, record);
448
449 final Map<URI, Record> changeMap = this.changes.get(type);
450 changeMap.put(id, record);
451 if (changeMap.size() > CachingDataStore.this.maxBufferedChanges) {
452 flushChanges(type);
453 }
454
455 final Set<URI> invalidatedIDs = this.invalidated.get(type);
456 if (invalidatedIDs != null) {
457 invalidatedIDs.add(id);
458 if (invalidatedIDs.size() > CachingDataStore.this.maxChanges) {
459 this.invalidated.put(type, null);
460 }
461 }
462 }
463
464 private void flushChanges(final URI type) throws IOException {
465
466 assert this.changes != null && this.invalidated != null;
467
468 final Map<URI, Record> map = this.changes.get(type);
469 if (map.isEmpty()) {
470 return;
471 }
472
473 this.dirty.add(type);
474
475 CachingDataStore.this.flushCount.addAndGet(map.size());
476
477 for (final Map.Entry<URI, Record> entry : map.entrySet()) {
478 final URI id = entry.getKey();
479 final Record record = entry.getValue();
480 if (record == CachingDataStore.NULL) {
481 delegate().delete(type, id);
482 } else {
483 delegate().store(type, record);
484 }
485 }
486 map.clear();
487 }
488
489 }
490
491 }