1   package eu.fbk.knowledgestore.datastore;
2   
3   import java.io.IOException;
4   import java.util.Iterator;
5   import java.util.List;
6   import java.util.Map;
7   import java.util.Set;
8   import java.util.concurrent.atomic.AtomicLong;
9   import java.util.concurrent.locks.ReadWriteLock;
10  import java.util.concurrent.locks.ReentrantReadWriteLock;
11  
12  import javax.annotation.Nullable;
13  
14  import com.google.common.base.MoreObjects;
15  import com.google.common.base.Preconditions;
16  import com.google.common.cache.Cache;
17  import com.google.common.cache.CacheBuilder;
18  import com.google.common.collect.ImmutableList;
19  import com.google.common.collect.Lists;
20  import com.google.common.collect.Maps;
21  import com.google.common.collect.Sets;
22  
23  import org.openrdf.model.URI;
24  import org.slf4j.Logger;
25  import org.slf4j.LoggerFactory;
26  
27  import eu.fbk.knowledgestore.data.Record;
28  import eu.fbk.knowledgestore.data.Stream;
29  import eu.fbk.knowledgestore.data.XPath;
30  import eu.fbk.knowledgestore.vocabulary.KS;
31  
// TODO: the global cache should store byte[] rather than Record object trees, so that a larger
// cache size can be used; this requires moving the serialization logic (no more Avro-based,
// please!) into ks-core
35  
36  /**
37   * A {@code DataStore} wrapper providing a transactional and a global cache for looked up and
38   * modified records.
39   * <p>
 * This wrapper aims at improving the performance of record lookups
 * ({@link DataTransaction#lookup(URI, Set, Set) lookup} calls) and modifications
 * ({@link DataTransaction#store(URI, Record) store} and
 * {@link DataTransaction#delete(URI, URI) delete} calls) through a two-level caching mechanism.
44   * </p>
45   * <p>
 * A global cache holds records previously looked up by transactions, up to a configurable
 * {@code maxSize} number of records for each record type. This cache provides optimal
 * performance under a read-only load. However, in presence of read-write transactions a record
 * may have multiple versions (one committed and others locally modified in active transactions),
 * therefore it may not be possible for all the transactions to look up / place data in the
 * global cache (this is enforced via a revision number mechanism).
52   * </p>
53   * <p>
 * To overcome the limits of the global cache with read-write transactions, each transaction is
 * also given a local cache that stores records looked up and modified locally by the transaction;
 * this cache is synchronized with the global cache upon a successful commit (either by copying
 * modified records or invalidating them). Note that synchronization is possible only if the
 * complete set of records modified by the transaction is known. When this is impossible because
 * the transaction modified more than {@code maxChanges} records (too many to keep track of in
 * memory), the global cache must be invalidated in order to avoid dirty reads (this degrades
 * performance!). The local cache is not just used for lookups but also to implement a write-back
 * mechanism, where up to {@code maxBufferedChanges} records (per record type) modified by the
 * transaction are kept locally and flushed to the underlying data store only when strictly
 * necessary. More precisely, changes are flushed at commit time and every time operations
 * {@link DataTransaction#retrieve(URI, XPath, Set) retrieve},
 * {@link DataTransaction#count(URI, XPath) count} and
 * {@link DataTransaction#match(Map, Map, Map) match} are called.
68   * </p>
69   * <p>
70   * Some statistics about the number of cache hits (local and global caches), fetches, changes and
71   * flushes are logged at close time.
72   * </p>
73   */
74  public class CachingDataStore extends ForwardingDataStore {
75  
76      private static final Logger LOGGER = LoggerFactory.getLogger(CachingDataStore.class);
77  
78      private static final int DEFAULT_MAX_SIZE = 1024;
79  
80      private static final int DEFAULT_MAX_CHANGES = 1024;
81  
82      private static final int DEFAULT_MAX_BUFFERED_CHANGES = 1024;
83  
84      private static final Record NULL = Record.create();
85  
86      private final DataStore delegate;
87  
88      private final int maxChanges;
89  
90      private final int maxBufferedChanges;
91  
92      private final ReadWriteLock globalLock;
93  
94      private final Map<URI, Cache<URI, Record>> globalCaches;
95  
96      private long globalRevision;
97  
98      // Counters for statistics
99  
100     private final AtomicLong globalHitCount;
101 
102     private final AtomicLong localHitCount;
103 
104     private final AtomicLong fetchCount;
105 
106     private final AtomicLong changeCount;
107 
108     private final AtomicLong flushCount;
109 
110     /**
111      * Creates a new instance for the wrapped {@code DataStore} specified.
112      * 
113      * @param delegate
114      *            the wrapped {@code DataStore}
115      * @param maxSize
116      *            the maximum size of global per-record-type caches ( number of records); if null
117      *            defaults to 1024
118      * @param maxChanges
119      *            the max number (per-type) of records that a transaction can change before
120      *            modification tracking is aborted forcing the invalidation of global caches upon
121      *            commit; if null defaults to 1024
122      * @param maxBufferedChanges
123      *            the max number (per-type) of records changed by a transactions that are buffered
124      *            locally, before being flushed to the underlying {@code DataTransaction}; if null
125      *            defaults to 1024
126      */
127     public CachingDataStore(final DataStore delegate, @Nullable final Integer maxSize,
128             @Nullable final Integer maxChanges, @Nullable final Integer maxBufferedChanges) {
129 
130         final int actualMaxSize = MoreObjects.firstNonNull(maxSize, DEFAULT_MAX_SIZE);
131         final int actualMaxChanges = MoreObjects.firstNonNull(maxChanges, DEFAULT_MAX_CHANGES);
132         final int actualMaxBufferedChanges = MoreObjects.firstNonNull(maxBufferedChanges,
133                 DEFAULT_MAX_BUFFERED_CHANGES);
134 
135         Preconditions.checkArgument(actualMaxSize > 0);
136         Preconditions.checkArgument(actualMaxChanges > 0);
137 
138         this.delegate = Preconditions.checkNotNull(delegate);
139         this.maxChanges = actualMaxChanges;
140         this.maxBufferedChanges = actualMaxBufferedChanges;
141         this.globalLock = new ReentrantReadWriteLock(true);
142         this.globalCaches = Maps.newHashMap();
143         this.globalRevision = 0L;
144         this.globalHitCount = new AtomicLong(0);
145         this.localHitCount = new AtomicLong(0);
146         this.fetchCount = new AtomicLong(0);
147         this.changeCount = new AtomicLong(0);
148         this.flushCount = new AtomicLong(0);
149 
150         for (final URI type : DataStore.SUPPORTED_TYPES) {
151             // Original setting (may cause OutOfMemory if maximimum value is inappropriate
152             // this.globalCaches.put(type, CacheBuilder.newBuilder().maximumSize(actualMaxSize)
153             // .<URI, Record>build());
154             this.globalCaches.put(type,
155                     CacheBuilder.newBuilder().softValues().maximumSize(actualMaxSize)
156                             .<URI, Record>build());
157         }
158 
159         CachingDataStore.LOGGER.info("{} configured", this.getClass().getSimpleName());
160     }
161 
162     @Override
163     protected DataStore delegate() {
164         return this.delegate;
165     }
166 
167     @Override
168     public DataTransaction begin(final boolean readOnly) throws IOException, IllegalStateException {
169 
170         // Need to acquire an exclusive lock to prevent commits in the meanwhile
171         CachingDataStore.this.globalLock.readLock().lock();
172         try {
173             final long revision = CachingDataStore.this.globalRevision;
174             final DataTransaction tx = delegate().begin(readOnly);
175             return new CachingDataTransaction(tx, readOnly, revision);
176         } finally {
177             CachingDataStore.this.globalLock.readLock().unlock();
178         }
179     }
180 
181     @Override
182     public void close() {
183         try {
184             LOGGER.info("{} - {} local cache hits, {} global cache hits, {} fetches, "
185                     + "{} changes, {} flushes", this.getClass().getSimpleName(),
186                     this.localHitCount, this.globalHitCount, this.fetchCount, this.changeCount,
187                     this.flushCount);
188         } finally {
189             super.close();
190         }
191     }
192 
193     private class CachingDataTransaction extends ForwardingDataTransaction {
194 
195         private final DataTransaction delegate;
196 
197         @Nullable
198         private final Set<URI> dirty; // contains types for which a flush has been done
199 
200         @Nullable
201         private final Map<URI, Map<URI, Record>> changes; // null if read-only
202 
203         @Nullable
204         private final Map<URI, Set<URI>> invalidated; // null if read-only
205 
206         private final Map<URI, Cache<URI, Record>> localCaches;
207 
208         private final long localRevision;
209 
210         CachingDataTransaction(final DataTransaction delegate, final boolean readOnly,
211                 final long revision) {
212 
213             this.delegate = Preconditions.checkNotNull(delegate);
214 
215             if (readOnly) {
216                 this.dirty = null;
217                 this.changes = null;
218                 this.invalidated = null;
219             } else {
220                 this.dirty = Sets.newHashSet();
221                 this.changes = Maps.newHashMap();
222                 this.invalidated = Maps.newHashMap();
223                 for (final URI type : DataStore.SUPPORTED_TYPES) {
224                     this.changes.put(type, Maps.<URI, Record>newHashMap());
225                     this.invalidated.put(type, Sets.<URI>newHashSet());
226                 }
227             }
228 
229             this.localRevision = revision;
230             this.localCaches = Maps.newHashMap();
231             for (final URI type : DataStore.SUPPORTED_TYPES) {
232                 this.localCaches.put(type, CacheBuilder.newBuilder().softValues()
233                         .<URI, Record>build());
234             }
235         }
236 
237         @Override
238         protected DataTransaction delegate() {
239             return this.delegate;
240         }
241 
242         @Override
243         public Stream<Record> lookup(final URI type, final Set<? extends URI> ids,
244                 final Set<? extends URI> properties) throws IOException, IllegalArgumentException,
245                 IllegalStateException {
246 
247             final long globalRevision = CachingDataStore.this.globalRevision;
248             final Cache<URI, Record> globalCache = CachingDataStore.this.globalCaches.get(type);
249             final Cache<URI, Record> localCache = this.localCaches.get(type);
250             final List<Record> result = Lists.newArrayList();
251 
252             // Determine if there is a chance to use the global cache.
253             // The global cache cannot be used if we lost track of what we changed in current
254             // transaction, or if it got polluted with changes from other concurrent transaction
255             // (based on revision number)
256             final boolean mightUseGlobalCache = this.localRevision == globalRevision
257                     && (this.dirty == null || !this.dirty.contains(type));
258 
259             // Lookup in local cache
260             final Set<URI> missingIDs = Sets.newHashSet();
261             for (final URI id : ids) {
262                 final Record record = localCache.getIfPresent(id);
263                 if (record == null) {
264                     missingIDs.add(id);
265                 } else if (record != CachingDataStore.NULL) {
266                     CachingDataStore.this.localHitCount.incrementAndGet();
267                     result.add(Record.create(record, true)); // clone to preserve cached one
268                 }
269             }
270 
271             // Lookup in global cache, if possible. Need to check revision number holding a shared
272             // lock on the global cache
273             if (mightUseGlobalCache) {
274                 CachingDataStore.this.globalLock.readLock().lock();
275                 try {
276                     if (this.localRevision == globalRevision) {
277                         for (final Iterator<URI> i = missingIDs.iterator(); i.hasNext();) {
278                             final URI id = i.next();
279                             final Record record = globalCache.getIfPresent(id);
280                             if (record != null) {
281                                 CachingDataStore.this.globalHitCount.incrementAndGet();
282                                 localCache.put(id, record); // propagate to local cache
283                                 result.add(Record.create(record, true)); // clone record
284                                 i.remove(); // ID no more missing
285                             }
286                         }
287 
288                     }
289                 } finally {
290                     CachingDataStore.this.globalLock.readLock().unlock();
291                 }
292             }
293 
294             // Fetch missing records (possibly NOP)
295             final List<Record> fetched = missingIDs.isEmpty() ? ImmutableList.<Record>of() : //
296                     delegate().lookup(type, missingIDs, null).toList();
297             CachingDataStore.this.fetchCount.addAndGet(missingIDs.size());
298 
299             // Add fetched records to result (cloning them) and to local cache; update missing IDs
300             for (final Record record : fetched) {
301                 result.add(Record.create(record, true));
302                 localCache.put(record.getID(), record);
303                 missingIDs.remove(record.getID());
304             }
305 
306             // Non-existing records are also tracked in local cache for efficiency reasons
307             for (final URI id : missingIDs) {
308                 localCache.put(id, CachingDataStore.NULL);
309             }
310 
311             // If possible, fetched data is also put in the global cache. To access it, need to
312             // acquire a shared lock and check again the revision number.
313             if (mightUseGlobalCache) {
314                 CachingDataStore.this.globalLock.readLock().lock();
315                 try {
316                     if (this.localRevision == CachingDataStore.this.globalRevision) {
317                         for (final Record record : fetched) {
318                             globalCache.put(record.getID(), record);
319                         }
320                     }
321                 } finally {
322                     CachingDataStore.this.globalLock.readLock().unlock();
323                 }
324             }
325 
326             // All the data is here. Perform projection, if required
327             if (properties != null && !properties.isEmpty()) {
328                 for (final Record record : result) {
329                     final URI[] projection = properties.toArray(new URI[properties.size()]);
330                     record.retain(projection);
331                 }
332             }
333 
334             // Return a stream over the requested records
335             return Stream.create(result);
336         }
337 
338         @Override
339         public Stream<Record> retrieve(final URI type, final XPath condition,
340                 final Set<? extends URI> properties) throws IOException, IllegalArgumentException,
341                 IllegalStateException {
342 
343             if (this.changes != null) {
344                 flushChanges(type);
345             }
346 
347             return delegate().retrieve(type, condition, properties);
348         }
349 
350         @Override
351         public long count(final URI type, final XPath condition) throws IOException,
352                 IllegalArgumentException, IllegalStateException {
353 
354             if (this.changes != null) {
355                 flushChanges(type);
356             }
357 
358             return delegate().count(type, condition);
359         }
360 
361         @Override
362         public Stream<Record> match(final Map<URI, XPath> conditions,
363                 final Map<URI, Set<URI>> ids, final Map<URI, Set<URI>> properties)
364                 throws IOException, IllegalStateException {
365 
366             if (this.changes != null) {
367                 flushChanges(KS.RESOURCE);
368                 flushChanges(KS.MENTION);
369                 flushChanges(KS.ENTITY);
370                 flushChanges(KS.AXIOM);
371             }
372 
373             return delegate().match(conditions, ids, properties);
374         }
375 
376         @Override
377         public void store(final URI type, final Record record) throws IOException,
378                 IllegalStateException {
379             Preconditions.checkState(this.changes != null, "Read-only DataTransaction");
380             registerChange(type, record.getID(), record);
381         }
382 
383         @Override
384         public void delete(final URI type, final URI id) throws IOException, IllegalStateException {
385             Preconditions.checkState(this.changes != null, "Read-only DataTransaction");
386             registerChange(type, id, CachingDataStore.NULL);
387         }
388 
389         @Override
390         public void end(final boolean commit) throws IOException, IllegalStateException {
391 
392             // Simply delegate if read-only or on rollback
393             if (this.changes == null || !commit) {
394                 this.delegate.end(commit);
395                 return;
396             }
397 
398             // On read/write commit, start by flushing pending changes
399             for (final URI type : DataStore.SUPPORTED_TYPES) {
400                 flushChanges(type);
401             }
402 
403             // Then perform the commit and synchronize the global cache by holding an exclusive
404             // lock, so to properly handle revision numbers. Pre-existing transactions will be
405             // forced to stop using the global cache.
406             CachingDataStore.this.globalLock.writeLock().lock();
407             try {
408                 delegate().end(true);
409                 ++CachingDataStore.this.globalRevision;
410                 for (final URI type : DataStore.SUPPORTED_TYPES) {
411                     synchronizeCaches(//
412                             this.invalidated.get(type), //
413                             this.localCaches.get(type), //
414                             CachingDataStore.this.globalCaches.get(type));
415                 }
416             } finally {
417                 CachingDataStore.this.globalLock.writeLock().unlock();
418             }
419         }
420 
421         private void synchronizeCaches(@Nullable final Set<URI> invalidatedIDs,
422                 final Cache<URI, Record> localCache, final Cache<URI, Record> globalCache) {
423 
424             if (invalidatedIDs == null) {
425                 globalCache.invalidateAll();
426                 return;
427             }
428 
429             globalCache.invalidateAll(invalidatedIDs);
430 
431             for (final Map.Entry<URI, Record> entry : localCache.asMap().entrySet()) {
432                 final URI id = entry.getKey();
433                 final Record record = entry.getValue();
434                 if (record != CachingDataStore.NULL) {
435                     globalCache.put(id, record);
436                 }
437             }
438         }
439 
440         private void registerChange(final URI type, final URI id, final Record record)
441                 throws IOException {
442 
443             assert this.changes != null && this.invalidated != null; // need read/write tx
444 
445             CachingDataStore.this.changeCount.incrementAndGet();
446 
447             this.localCaches.get(type).put(id, record);
448 
449             final Map<URI, Record> changeMap = this.changes.get(type);
450             changeMap.put(id, record);
451             if (changeMap.size() > CachingDataStore.this.maxBufferedChanges) {
452                 flushChanges(type);
453             }
454 
455             final Set<URI> invalidatedIDs = this.invalidated.get(type);
456             if (invalidatedIDs != null) {
457                 invalidatedIDs.add(id);
458                 if (invalidatedIDs.size() > CachingDataStore.this.maxChanges) {
459                     this.invalidated.put(type, null);
460                 }
461             }
462         }
463 
464         private void flushChanges(final URI type) throws IOException {
465 
466             assert this.changes != null && this.invalidated != null; // need read/write tx
467 
468             final Map<URI, Record> map = this.changes.get(type);
469             if (map.isEmpty()) {
470                 return;
471             }
472 
473             this.dirty.add(type);
474 
475             CachingDataStore.this.flushCount.addAndGet(map.size());
476 
477             for (final Map.Entry<URI, Record> entry : map.entrySet()) {
478                 final URI id = entry.getKey();
479                 final Record record = entry.getValue();
480                 if (record == CachingDataStore.NULL) {
481                     delegate().delete(type, id);
482                 } else {
483                     delegate().store(type, record);
484                 }
485             }
486             map.clear();
487         }
488 
489     }
490 
491 }