1   package eu.fbk.knowledgestore.datastore;
2   
3   import java.io.IOException;
4   import java.util.Iterator;
5   import java.util.List;
6   import java.util.Map;
7   import java.util.Set;
8   import java.util.concurrent.atomic.AtomicLong;
9   import java.util.concurrent.locks.ReadWriteLock;
10  import java.util.concurrent.locks.ReentrantReadWriteLock;
11  
12  import javax.annotation.Nullable;
13  
14  import com.google.common.base.MoreObjects;
15  import com.google.common.base.Preconditions;
16  import com.google.common.cache.Cache;
17  import com.google.common.cache.CacheBuilder;
18  import com.google.common.collect.ImmutableList;
19  import com.google.common.collect.Lists;
20  import com.google.common.collect.Maps;
21  import com.google.common.collect.Sets;
22  
23  import org.openrdf.model.URI;
24  import org.slf4j.Logger;
25  import org.slf4j.LoggerFactory;
26  
27  import eu.fbk.knowledgestore.data.Record;
28  import eu.fbk.knowledgestore.data.Stream;
29  import eu.fbk.knowledgestore.data.XPath;
30  import eu.fbk.knowledgestore.vocabulary.KS;
31  
32  
33  
34  
35  
36  
37  
38  
39  
40  
41  
42  
43  
44  
45  
46  
47  
48  
49  
50  
51  
52  
53  
54  
55  
56  
57  
58  
59  
60  
61  
62  
63  
64  
65  
66  
67  
68  
69  
70  
71  
72  
73  
74  public class CachingDataStore extends ForwardingDataStore {
75  
76      private static final Logger LOGGER = LoggerFactory.getLogger(CachingDataStore.class);
77  
78      private static final int DEFAULT_MAX_SIZE = 1024;
79  
80      private static final int DEFAULT_MAX_CHANGES = 1024;
81  
82      private static final int DEFAULT_MAX_BUFFERED_CHANGES = 1024;
83  
84      private static final Record NULL = Record.create();
85  
86      private final DataStore delegate;
87  
88      private final int maxChanges;
89  
90      private final int maxBufferedChanges;
91  
92      private final ReadWriteLock globalLock;
93  
94      private final Map<URI, Cache<URI, Record>> globalCaches;
95  
96      private long globalRevision;
97  
98      
99  
100     private final AtomicLong globalHitCount;
101 
102     private final AtomicLong localHitCount;
103 
104     private final AtomicLong fetchCount;
105 
106     private final AtomicLong changeCount;
107 
108     private final AtomicLong flushCount;
109 
110     
111 
112 
113 
114 
115 
116 
117 
118 
119 
120 
121 
122 
123 
124 
125 
126 
127     public CachingDataStore(final DataStore delegate, @Nullable final Integer maxSize,
128             @Nullable final Integer maxChanges, @Nullable final Integer maxBufferedChanges) {
129 
130         final int actualMaxSize = MoreObjects.firstNonNull(maxSize, DEFAULT_MAX_SIZE);
131         final int actualMaxChanges = MoreObjects.firstNonNull(maxChanges, DEFAULT_MAX_CHANGES);
132         final int actualMaxBufferedChanges = MoreObjects.firstNonNull(maxBufferedChanges,
133                 DEFAULT_MAX_BUFFERED_CHANGES);
134 
135         Preconditions.checkArgument(actualMaxSize > 0);
136         Preconditions.checkArgument(actualMaxChanges > 0);
137 
138         this.delegate = Preconditions.checkNotNull(delegate);
139         this.maxChanges = actualMaxChanges;
140         this.maxBufferedChanges = actualMaxBufferedChanges;
141         this.globalLock = new ReentrantReadWriteLock(true);
142         this.globalCaches = Maps.newHashMap();
143         this.globalRevision = 0L;
144         this.globalHitCount = new AtomicLong(0);
145         this.localHitCount = new AtomicLong(0);
146         this.fetchCount = new AtomicLong(0);
147         this.changeCount = new AtomicLong(0);
148         this.flushCount = new AtomicLong(0);
149 
150         for (final URI type : DataStore.SUPPORTED_TYPES) {
151             
152             
153             
154             this.globalCaches.put(type,
155                     CacheBuilder.newBuilder().softValues().maximumSize(actualMaxSize)
156                             .<URI, Record>build());
157         }
158 
159         CachingDataStore.LOGGER.info("{} configured", this.getClass().getSimpleName());
160     }
161 
162     @Override
163     protected DataStore delegate() {
164         return this.delegate;
165     }
166 
167     @Override
168     public DataTransaction begin(final boolean readOnly) throws IOException, IllegalStateException {
169 
170         
171         CachingDataStore.this.globalLock.readLock().lock();
172         try {
173             final long revision = CachingDataStore.this.globalRevision;
174             final DataTransaction tx = delegate().begin(readOnly);
175             return new CachingDataTransaction(tx, readOnly, revision);
176         } finally {
177             CachingDataStore.this.globalLock.readLock().unlock();
178         }
179     }
180 
181     @Override
182     public void close() {
183         try {
184             LOGGER.info("{} - {} local cache hits, {} global cache hits, {} fetches, "
185                     + "{} changes, {} flushes", this.getClass().getSimpleName(),
186                     this.localHitCount, this.globalHitCount, this.fetchCount, this.changeCount,
187                     this.flushCount);
188         } finally {
189             super.close();
190         }
191     }
192 
193     private class CachingDataTransaction extends ForwardingDataTransaction {
194 
195         private final DataTransaction delegate;
196 
197         @Nullable
198         private final Set<URI> dirty; 
199 
200         @Nullable
201         private final Map<URI, Map<URI, Record>> changes; 
202 
203         @Nullable
204         private final Map<URI, Set<URI>> invalidated; 
205 
206         private final Map<URI, Cache<URI, Record>> localCaches;
207 
208         private final long localRevision;
209 
210         CachingDataTransaction(final DataTransaction delegate, final boolean readOnly,
211                 final long revision) {
212 
213             this.delegate = Preconditions.checkNotNull(delegate);
214 
215             if (readOnly) {
216                 this.dirty = null;
217                 this.changes = null;
218                 this.invalidated = null;
219             } else {
220                 this.dirty = Sets.newHashSet();
221                 this.changes = Maps.newHashMap();
222                 this.invalidated = Maps.newHashMap();
223                 for (final URI type : DataStore.SUPPORTED_TYPES) {
224                     this.changes.put(type, Maps.<URI, Record>newHashMap());
225                     this.invalidated.put(type, Sets.<URI>newHashSet());
226                 }
227             }
228 
229             this.localRevision = revision;
230             this.localCaches = Maps.newHashMap();
231             for (final URI type : DataStore.SUPPORTED_TYPES) {
232                 this.localCaches.put(type, CacheBuilder.newBuilder().softValues()
233                         .<URI, Record>build());
234             }
235         }
236 
237         @Override
238         protected DataTransaction delegate() {
239             return this.delegate;
240         }
241 
242         @Override
243         public Stream<Record> lookup(final URI type, final Set<? extends URI> ids,
244                 final Set<? extends URI> properties) throws IOException, IllegalArgumentException,
245                 IllegalStateException {
246 
247             final long globalRevision = CachingDataStore.this.globalRevision;
248             final Cache<URI, Record> globalCache = CachingDataStore.this.globalCaches.get(type);
249             final Cache<URI, Record> localCache = this.localCaches.get(type);
250             final List<Record> result = Lists.newArrayList();
251 
252             
253             
254             
255             
256             final boolean mightUseGlobalCache = this.localRevision == globalRevision
257                     && (this.dirty == null || !this.dirty.contains(type));
258 
259             
260             final Set<URI> missingIDs = Sets.newHashSet();
261             for (final URI id : ids) {
262                 final Record record = localCache.getIfPresent(id);
263                 if (record == null) {
264                     missingIDs.add(id);
265                 } else if (record != CachingDataStore.NULL) {
266                     CachingDataStore.this.localHitCount.incrementAndGet();
267                     result.add(Record.create(record, true)); 
268                 }
269             }
270 
271             
272             
273             if (mightUseGlobalCache) {
274                 CachingDataStore.this.globalLock.readLock().lock();
275                 try {
276                     if (this.localRevision == globalRevision) {
277                         for (final Iterator<URI> i = missingIDs.iterator(); i.hasNext();) {
278                             final URI id = i.next();
279                             final Record record = globalCache.getIfPresent(id);
280                             if (record != null) {
281                                 CachingDataStore.this.globalHitCount.incrementAndGet();
282                                 localCache.put(id, record); 
283                                 result.add(Record.create(record, true)); 
284                                 i.remove(); 
285                             }
286                         }
287 
288                     }
289                 } finally {
290                     CachingDataStore.this.globalLock.readLock().unlock();
291                 }
292             }
293 
294             
295             final List<Record> fetched = missingIDs.isEmpty() ? ImmutableList.<Record>of() : 
296                     delegate().lookup(type, missingIDs, null).toList();
297             CachingDataStore.this.fetchCount.addAndGet(missingIDs.size());
298 
299             
300             for (final Record record : fetched) {
301                 result.add(Record.create(record, true));
302                 localCache.put(record.getID(), record);
303                 missingIDs.remove(record.getID());
304             }
305 
306             
307             for (final URI id : missingIDs) {
308                 localCache.put(id, CachingDataStore.NULL);
309             }
310 
311             
312             
313             if (mightUseGlobalCache) {
314                 CachingDataStore.this.globalLock.readLock().lock();
315                 try {
316                     if (this.localRevision == CachingDataStore.this.globalRevision) {
317                         for (final Record record : fetched) {
318                             globalCache.put(record.getID(), record);
319                         }
320                     }
321                 } finally {
322                     CachingDataStore.this.globalLock.readLock().unlock();
323                 }
324             }
325 
326             
327             if (properties != null && !properties.isEmpty()) {
328                 for (final Record record : result) {
329                     final URI[] projection = properties.toArray(new URI[properties.size()]);
330                     record.retain(projection);
331                 }
332             }
333 
334             
335             return Stream.create(result);
336         }
337 
338         @Override
339         public Stream<Record> retrieve(final URI type, final XPath condition,
340                 final Set<? extends URI> properties) throws IOException, IllegalArgumentException,
341                 IllegalStateException {
342 
343             if (this.changes != null) {
344                 flushChanges(type);
345             }
346 
347             return delegate().retrieve(type, condition, properties);
348         }
349 
350         @Override
351         public long count(final URI type, final XPath condition) throws IOException,
352                 IllegalArgumentException, IllegalStateException {
353 
354             if (this.changes != null) {
355                 flushChanges(type);
356             }
357 
358             return delegate().count(type, condition);
359         }
360 
361         @Override
362         public Stream<Record> match(final Map<URI, XPath> conditions,
363                 final Map<URI, Set<URI>> ids, final Map<URI, Set<URI>> properties)
364                 throws IOException, IllegalStateException {
365 
366             if (this.changes != null) {
367                 flushChanges(KS.RESOURCE);
368                 flushChanges(KS.MENTION);
369                 flushChanges(KS.ENTITY);
370                 flushChanges(KS.AXIOM);
371             }
372 
373             return delegate().match(conditions, ids, properties);
374         }
375 
376         @Override
377         public void store(final URI type, final Record record) throws IOException,
378                 IllegalStateException {
379             Preconditions.checkState(this.changes != null, "Read-only DataTransaction");
380             registerChange(type, record.getID(), record);
381         }
382 
383         @Override
384         public void delete(final URI type, final URI id) throws IOException, IllegalStateException {
385             Preconditions.checkState(this.changes != null, "Read-only DataTransaction");
386             registerChange(type, id, CachingDataStore.NULL);
387         }
388 
389         @Override
390         public void end(final boolean commit) throws IOException, IllegalStateException {
391 
392             
393             if (this.changes == null || !commit) {
394                 this.delegate.end(commit);
395                 return;
396             }
397 
398             
399             for (final URI type : DataStore.SUPPORTED_TYPES) {
400                 flushChanges(type);
401             }
402 
403             
404             
405             
406             CachingDataStore.this.globalLock.writeLock().lock();
407             try {
408                 delegate().end(true);
409                 ++CachingDataStore.this.globalRevision;
410                 for (final URI type : DataStore.SUPPORTED_TYPES) {
411                     synchronizeCaches(
412                             this.invalidated.get(type), 
413                             this.localCaches.get(type), 
414                             CachingDataStore.this.globalCaches.get(type));
415                 }
416             } finally {
417                 CachingDataStore.this.globalLock.writeLock().unlock();
418             }
419         }
420 
421         private void synchronizeCaches(@Nullable final Set<URI> invalidatedIDs,
422                 final Cache<URI, Record> localCache, final Cache<URI, Record> globalCache) {
423 
424             if (invalidatedIDs == null) {
425                 globalCache.invalidateAll();
426                 return;
427             }
428 
429             globalCache.invalidateAll(invalidatedIDs);
430 
431             for (final Map.Entry<URI, Record> entry : localCache.asMap().entrySet()) {
432                 final URI id = entry.getKey();
433                 final Record record = entry.getValue();
434                 if (record != CachingDataStore.NULL) {
435                     globalCache.put(id, record);
436                 }
437             }
438         }
439 
440         private void registerChange(final URI type, final URI id, final Record record)
441                 throws IOException {
442 
443             assert this.changes != null && this.invalidated != null; 
444 
445             CachingDataStore.this.changeCount.incrementAndGet();
446 
447             this.localCaches.get(type).put(id, record);
448 
449             final Map<URI, Record> changeMap = this.changes.get(type);
450             changeMap.put(id, record);
451             if (changeMap.size() > CachingDataStore.this.maxBufferedChanges) {
452                 flushChanges(type);
453             }
454 
455             final Set<URI> invalidatedIDs = this.invalidated.get(type);
456             if (invalidatedIDs != null) {
457                 invalidatedIDs.add(id);
458                 if (invalidatedIDs.size() > CachingDataStore.this.maxChanges) {
459                     this.invalidated.put(type, null);
460                 }
461             }
462         }
463 
464         private void flushChanges(final URI type) throws IOException {
465 
466             assert this.changes != null && this.invalidated != null; 
467 
468             final Map<URI, Record> map = this.changes.get(type);
469             if (map.isEmpty()) {
470                 return;
471             }
472 
473             this.dirty.add(type);
474 
475             CachingDataStore.this.flushCount.addAndGet(map.size());
476 
477             for (final Map.Entry<URI, Record> entry : map.entrySet()) {
478                 final URI id = entry.getKey();
479                 final Record record = entry.getValue();
480                 if (record == CachingDataStore.NULL) {
481                     delegate().delete(type, id);
482                 } else {
483                     delegate().store(type, record);
484                 }
485             }
486             map.clear();
487         }
488 
489     }
490 
491 }