1   package eu.fbk.knowledgestore.filestore;
2   
3   import java.io.ByteArrayInputStream;
4   import java.io.ByteArrayOutputStream;
5   import java.io.DataInputStream;
6   import java.io.DataOutputStream;
7   import java.io.EOFException;
8   import java.io.IOException;
9   import java.io.InputStream;
10  import java.io.OutputStream;
11  import java.util.List;
12  import java.util.Objects;
13  import java.util.Set;
14  import java.util.concurrent.Future;
15  import java.util.concurrent.TimeUnit;
16  import java.util.concurrent.atomic.AtomicBoolean;
17  import java.util.concurrent.locks.ReadWriteLock;
18  import java.util.concurrent.locks.ReentrantReadWriteLock;
19  
20  import javax.annotation.Nullable;
21  
22  import com.google.common.base.Charsets;
23  import com.google.common.base.Function;
24  import com.google.common.base.Joiner;
25  import com.google.common.base.Throwables;
26  import com.google.common.collect.Lists;
27  import com.google.common.collect.Ordering;
28  import com.google.common.collect.Sets;
29  import com.google.common.io.ByteStreams;
30  
31  import org.slf4j.Logger;
32  import org.slf4j.LoggerFactory;
33  
34  import eu.fbk.knowledgestore.data.Data;
35  import eu.fbk.knowledgestore.data.Stream;
36  import eu.fbk.rdfpro.util.Hash;
37  import eu.fbk.rdfpro.util.IO;
38  
39  public class MultiFileStore extends ForwardingFileStore {
40  
41      private static final Logger LOGGER = LoggerFactory.getLogger(MultiFileStore.class);
42  
43      private static final int DEFAULT_BUCKET_SIZE = 10;
44  
45      private static final String INDEX_FILENAME = "mfs.idx.bin";
46  
47      private static final String BUCKET_PREFIX = "mfs.b";
48  
49      private static final String BUCKET_EXT = ".bin";
50  
51      private static final long MERGE_PERIOD = 5000L; // ms
52  
53      private final FileStore delegate;
54  
55      private final int bucketSize;
56  
57      private final ReadWriteLock lock;
58  
59      private Future<?> mergeFuture;
60  
61      private Index index;
62  
63      private boolean dirty;
64  
65      public MultiFileStore(final FileStore delegate, @Nullable final Integer bucketSize) {
66          this.delegate = Objects.requireNonNull(delegate);
67          this.bucketSize = bucketSize != null ? bucketSize : DEFAULT_BUCKET_SIZE;
68          this.lock = new ReentrantReadWriteLock(true);
69          this.mergeFuture = null;
70          this.index = null;
71          this.dirty = false;
72          LOGGER.info("{} configured, bucket size = {}", getClass().getSimpleName(), this.bucketSize);
73      }
74  
75      @Override
76      protected FileStore delegate() {
77          return this.delegate;
78      }
79  
80      @Override
81      public void init() throws IOException, IllegalStateException {
82  
83          // Initialize underlying file store
84          super.init();
85  
86          try {
87              // Initialize index
88              this.index = new Index();
89              try {
90                  // If a previously saved index file is available, populate the index using it
91                  try (final InputStream stream = this.delegate.read(INDEX_FILENAME)) {
92                      LOGGER.debug("Loading saved index from {}", INDEX_FILENAME);
93                      this.index.read(stream);
94                      this.dirty = false;
95                  }
96  
97              } catch (final FileMissingException ex) {
98                  // Otherwise, populate the index by scanning files in the underlying file store
99                  LOGGER.debug("Creating index (no file {} found)", INDEX_FILENAME);
100                 for (final String filename : this.delegate.list()) {
101                     if (!filename.startsWith(BUCKET_PREFIX)) {
102                         this.index.initBucketForFilename(filename, -1);
103                     } else {
104                         final int bucket = Integer.parseInt(filename.substring(
105                                 BUCKET_PREFIX.length(), filename.length() - BUCKET_EXT.length()));
106                         this.index.allocateBucket(bucket);
107                         try (InputStream stream = this.delegate.read(filename)) {
108                             Entry entry;
109                             while ((entry = Entry.read(stream)) != null) {
110                                 this.index.initBucketForFilename(entry.getFilename(), bucket);
111                             }
112                         }
113                     }
114                 }
115                 merge(); // merge if necessary
116                 this.dirty = true; // index file will be created at close time
117             }
118 
119             // Log status
120             if (LOGGER.isDebugEnabled()) {
121                 final int numFilenames = this.index.countFilenames();
122                 final int numStandaloneFilenames = this.index.getStandaloneFilenames().size();
123                 final int numBuckets = this.index.countAllocatedBuckets();
124                 LOGGER.debug("{} files ({} in {} buckets, {} standalone)", numFilenames,
125                         numFilenames - numStandaloneFilenames, numBuckets, numStandaloneFilenames);
126             }
127 
128             // Schedule a periodic merge task
129             LOGGER.debug("Scheduling merge task every {} ms", MERGE_PERIOD);
130             this.mergeFuture = Data.getExecutor().scheduleWithFixedDelay(() -> {
131                 try {
132                     merge();
133                 } catch (final Throwable ex) {
134                     LOGGER.error("Merge failed", ex);
135                 }
136             }, MERGE_PERIOD, MERGE_PERIOD, TimeUnit.MILLISECONDS);
137 
138         } catch (final Throwable ex) {
139             // Close underlying file store and propagate
140             IO.closeQuietly(this.delegate);
141             Throwables.propagateIfPossible(ex, IOException.class);
142             Throwables.propagate(ex);
143         }
144     }
145 
146     @Override
147     public InputStream read(final String filename) throws FileMissingException, IOException {
148 
149         // This prevents concurrent write/delete/merge operations to occur
150         this.lock.readLock().lock();
151 
152         try {
153             // Lookup the bucket for the filename in the index; throw error if file not exist
154             final int bucket = this.index.getBucketForFilename(filename, true);
155 
156             // Either read a singleton file (delegating) or read from bucket. In both cases, we
157             // read all the data in advance so that there are no pending read operation to take
158             // into consideration when doing write/delete/merge
159             if (bucket < 0) {
160                 LOGGER.debug("Reading {} from standalone file", filename);
161                 try (InputStream in = this.delegate.read(filename)) {
162                     final byte[] bytes = ByteStreams.toByteArray(in);
163                     return new ByteArrayInputStream(bytes);
164                 }
165             } else {
166                 final String bucketName = BUCKET_PREFIX + bucket + BUCKET_EXT;
167                 LOGGER.debug("Reading {} from bucket file {}", filename, bucketName);
168                 try (InputStream in = this.delegate.read(bucketName)) {
169                     Entry entry;
170                     while ((entry = Entry.read(in)) != null) {
171                         if (entry.getFilename().equals(filename)) {
172                             return new ByteArrayInputStream(entry.getData());
173                         }
174                     }
175                 }
176                 throw new IOException("Cannot find '" + filename + "' in bucket file '"
177                         + bucketName + "' - perhaps the file was changed by another application. "
178                         + "Consider restarting the system rebuilding the index");
179             }
180 
181         } finally {
182             // Always release the lock
183             this.lock.readLock().unlock();
184         }
185     }
186 
187     @Override
188     public OutputStream write(final String filename) throws FileExistsException, IOException {
189 
190         // Check there is no file stored with the same filename
191         this.lock.readLock().lock();
192         try {
193             final int bucket = this.index.getBucketForFilename(filename, false);
194             if (bucket != 0) {
195                 throw new FileExistsException(filename, null);
196             }
197         } finally {
198             this.lock.readLock().unlock();
199         }
200 
201         // Log beginning of operation
202         LOGGER.debug("Writing {} to memory buffer", filename);
203 
204         // Return a ByteArrayOutputStream that collect data in memory. Once writing is done, data
205         // is written to the FileStore and the hash table is modified accordingly
206         return new ByteArrayOutputStream() {
207 
208             private final AtomicBoolean closed = new AtomicBoolean(false);
209 
210             @Override
211             public void close() throws IOException {
212 
213                 // Discard extra invocations of close()
214                 if (!this.closed.compareAndSet(false, true)) {
215                     return;
216                 }
217 
218                 // Delegate
219                 super.close();
220 
221                 // This prevents any other read/write/delete/merge operation to occur
222                 MultiFileStore.this.lock.writeLock().lock();
223 
224                 try {
225                     // Drop index file and schedule its re-creation at close time
226                     markDirty();
227 
228                     // Log beginning of operation
229                     LOGGER.debug("Writing {} ({} bytes) to standalone file", filename, this.count);
230 
231                     // Write a standalone file to the underlying file store
232                     try (OutputStream stream = MultiFileStore.this.delegate.write(filename)) {
233                         stream.write(this.buf, 0, this.count);
234                     }
235 
236                     // Update the index
237                     MultiFileStore.this.index.initBucketForFilename(filename, -1);
238 
239                 } catch (final FileExistsException ex) {
240                     // Propagate with a more useful error message
241                     throw new IOException("Write rejected: another file with the same name '"
242                             + filename + "' has been written concurrently");
243 
244                 } finally {
245                     // Always release the lock
246                     MultiFileStore.this.lock.writeLock().unlock();
247                 }
248             }
249 
250         };
251     }
252 
253     @Override
254     public void delete(final String filename) throws FileMissingException, IOException {
255 
256         // This prevents any other read/write/delete/merge operation to occur
257         this.lock.writeLock().lock();
258 
259         try {
260             // Lookup the bucket for the filename in the index; throw error if file not exist
261             final int bucket = this.index.getBucketForFilename(filename, true);
262 
263             // Drop index files and schedule their re-creation at close time
264             markDirty();
265 
266             if (bucket < 0) {
267                 // If there is no bucket, delete a singleton file delegating to wrapped FileStore
268                 LOGGER.debug("Deleting {} in standalone file", filename);
269                 this.delegate.delete(filename);
270                 this.index.updateBucketForFilename(filename, 0);
271 
272             } else {
273                 // Otherwise, try to replace the bucket file with a set of exploded singleton
274                 // files, collecting their names and trying to undo changes on error
275                 final String bucketName = BUCKET_PREFIX + bucket + BUCKET_EXT;
276                 LOGGER.debug("Deleting {} in bucket file {}", filename, bucketName);
277                 final List<String> explodedFilenames = Lists.newArrayList();
278                 try (InputStream in = this.delegate.read(bucketName)) {
279                     Entry entry;
280                     while ((entry = Entry.read(in)) != null) {
281                         try (OutputStream out = this.delegate.write(entry.getFilename())) {
282                             if (!entry.getFilename().equals(filename)) {
283                                 LOGGER.debug(
284                                         "Extracting standalone file {} ({} bytes) from bucket file {}",
285                                         entry.getFilename(), entry.getData().length, bucketName);
286                                 out.write(entry.getData());
287                                 explodedFilenames.add(entry.getFilename());
288                             }
289                         }
290                     }
291                     LOGGER.debug("Deleting bucket file {}", bucketName);
292                     this.delegate.delete(bucketName);
293                 } catch (final Throwable ex) {
294                     LOGGER.debug("Explosion of bucket {} failed - attempting recovery", bucketName);
295                     for (final String explodedFilename : explodedFilenames) {
296                         try {
297                             LOGGER.debug("Deleting exploded standalone file {}", explodedFilename);
298                             this.delegate.delete(explodedFilename);
299                         } catch (final Throwable ex2) {
300                             LOGGER.error("Recovery error: cannot delete '" + explodedFilename
301                                     + "' - you should delete it manually", ex2);
302                         }
303                     }
304                     throw new IOException("Cannot explode bucket " + bucketName, ex);
305                 }
306 
307                 // Update index
308                 for (final String explodedFilename : explodedFilenames) {
309                     this.index.updateBucketForFilename(explodedFilename, -1);
310                 }
311                 this.index.updateBucketForFilename(filename, 0);
312                 this.index.releaseBucket(bucket);
313             }
314 
315         } finally {
316             // Always release the lock
317             this.lock.writeLock().unlock();
318         }
319     }
320 
321     @Override
322     public Stream<String> list() throws IOException {
323 
324         // Define a function mapping each filename in underlying file store to the filenames it
325         // corresponds to: standalone filenames are mapped to themselves, bucket filenames are
326         // mapped to their content filenames; the index file is ignored
327         final Function<String, Stream<String>> transformer = new Function<String, Stream<String>>() {
328 
329             @Override
330             public Stream<String> apply(final String filename) {
331                 if (filename.equals(INDEX_FILENAME)) {
332                     return Stream.create();
333                 } else if (!filename.startsWith(BUCKET_PREFIX)) {
334                     return Stream.create(filename);
335                 } else {
336                     final List<String> filenames = Lists.newArrayList();
337                     try (InputStream in = MultiFileStore.this.delegate.read(filename)) {
338                         Entry entry;
339                         while ((entry = Entry.read(in)) != null) {
340                             filenames.add(entry.getFilename());
341                         }
342                     } catch (final IOException ex) {
343                         Throwables.propagate(ex);
344                     }
345                     return Stream.create(filenames);
346                 }
347             }
348 
349         };
350 
351         // Return a stream that applies the transformer lazily during the iteration
352         return Stream.concat(this.delegate.list().transform(transformer, 1));
353     }
354 
355     @Override
356     public void close() {
357 
358         try {
359             // Unschedule the periodic merge task
360             LOGGER.debug("Unscheduling merge task");
361             this.mergeFuture.cancel(false);
362 
363             // Save index data if necessary
364             if (this.dirty) {
365                 LOGGER.debug("Saving modified index to {}", INDEX_FILENAME);
366                 try (OutputStream stream = this.delegate.write(INDEX_FILENAME)) {
367                     this.index.write(stream);
368                 }
369             }
370 
371         } catch (final Throwable ex) {
372             // Wrap and propagate
373             Throwables.propagate(ex);
374 
375         } finally {
376             // Release memory and delegate
377             this.index = null;
378             this.delegate.close();
379         }
380     }
381 
382     private void merge() throws IOException {
383 
384         // This prevents any other read/write/delete/merge operation to occur
385         this.lock.writeLock().lock();
386 
387         try {
388             // Abort if there are not enough standalone files to merge
389             if (this.index.getStandaloneFilenames().size() < this.bucketSize) {
390                 return;
391             }
392 
393             // Drop index files and schedule their re-creation at close time
394             markDirty();
395 
396             // Sort standalone filenames (balance mix of raw and annotation files in bucket)
397             final List<String> sortedFilenames = Ordering.natural().sortedCopy(
398                     this.index.getStandaloneFilenames());
399             LOGGER.debug("Merge started - {} files to merge", sortedFilenames.size());
400 
401             // Create a bucket for each chunk of bucketSize files
402             for (int i = 0; i <= sortedFilenames.size() - this.bucketSize; i += this.bucketSize) {
403 
404                 // Obtain the standalone filenames (sorted) to be put in a new bucket
405                 final List<String> filenames = sortedFilenames.subList(i, i + this.bucketSize);
406 
407                 // Obtain a free bucket number
408                 final int bucket = this.index.allocateBucket(-1);
409 
410                 // Create the bucket file. On failure, delete it and propagate
411                 final String bucketFilename = BUCKET_PREFIX + bucket + BUCKET_EXT;
412                 LOGGER.debug("Creating bucket file {}", bucketFilename);
413                 try (OutputStream out = this.delegate.write(bucketFilename)) {
414                     for (final String filename : filenames) {
415                         try (InputStream in = this.delegate.read(filename)) {
416                             final byte[] data = ByteStreams.toByteArray(in);
417                             final Entry entry = new Entry(filename, data);
418                             Entry.write(out, entry);
419                         }
420                     }
421                 } catch (final Throwable ex) {
422                     try {
423                         this.delegate.delete(bucketFilename);
424                     } catch (final Throwable ex2) {
425                         LOGGER.error("Recovery error: cannot delete '" + bucketFilename
426                                 + "' - you should delete it manually", ex2);
427                     }
428                     throw new IOException("Cannot create bucket " + bucketFilename
429                             + " with files " + Joiner.on(", ").join(filenames), ex);
430                 }
431 
432                 // Delete standalone file that have been merged
433                 for (final String filename : filenames) {
434                     try {
435                         this.delegate.delete(filename);
436                     } catch (final Throwable ex) {
437                         LOGGER.error("Cannot delete standalone file " + bucketFilename
438                                 + " after creation of bucket " + bucketFilename
439                                 + " - you should delete it manually", ex);
440                     }
441                 }
442 
443                 // Update hash table and set of standalone files
444                 for (final String filename : filenames) {
445                     this.index.updateBucketForFilename(filename, bucket);
446                 }
447             }
448 
449             // Log status
450             LOGGER.debug("Merge done - {} standalone files remaining", this.index
451                     .getStandaloneFilenames().size());
452 
453         } finally {
454             // Always release the lock
455             this.lock.writeLock().unlock();
456         }
457     }
458 
459     private void markDirty() throws IOException {
460         if (!this.dirty) {
461             this.dirty = true;
462             LOGGER.info("Index has changed. Deleting index file {}. "
463                     + "Will be recreated at close time", INDEX_FILENAME);
464             try {
465                 this.delegate.delete(INDEX_FILENAME);
466             } catch (final FileMissingException ex) {
467                 // Ignore
468             } catch (final Throwable ex) {
469                 throw new IOException("Cannot delete stale index file", ex);
470             }
471         }
472     }
473 
474     private static final class Index {
475 
476         private static final long DELETED = 0xFFFFFFFFFFFFFFFFL;
477 
478         private long[] tableHashes;
479 
480         private int[] tableBuckets;
481 
482         private int tableSize;
483 
484         private int size;
485 
486         private final Set<Integer> unusedBuckets;
487 
488         private int maxBucket;
489 
490         private final Set<String> standaloneFilenames;
491 
492         public Index() {
493             this.tableHashes = new long[16 * 2];
494             this.tableBuckets = new int[16];
495             this.tableSize = 0;
496             this.size = 0;
497             this.unusedBuckets = Sets.newHashSet();
498             this.maxBucket = 0;
499             this.standaloneFilenames = Sets.newHashSet();
500         }
501 
502         public int countFilenames() {
503             return this.size;
504         }
505 
506         public int countAllocatedBuckets() {
507             return this.maxBucket - this.unusedBuckets.size();
508         }
509 
510         public int allocateBucket(int bucket) {
511 
512             // Identify the bucket to allocate in case a valid bucket number was not given
513             if (bucket < 0) {
514                 if (!this.unusedBuckets.isEmpty()) {
515                     bucket = this.unusedBuckets.iterator().next();
516                 } else {
517                     bucket = this.maxBucket + 1;
518                 }
519             }
520 
521             // Update unused bucket information
522             if (bucket > this.maxBucket) {
523                 for (int i = this.maxBucket + 1; i < bucket; ++i) {
524                     this.unusedBuckets.add(i);
525                 }
526                 this.maxBucket = bucket;
527             } else {
528                 this.unusedBuckets.remove(bucket);
529             }
530 
531             // Return the bucket number actually allocated
532             return bucket;
533         }
534 
535         public void releaseBucket(final int bucket) {
536 
537             if (bucket < this.maxBucket) {
538                 this.unusedBuckets.add(bucket);
539             } else {
540                 do {
541                     --this.maxBucket;
542                 } while (this.unusedBuckets.remove(this.maxBucket));
543             }
544         }
545 
546         public int getBucketForFilename(final String filename, final boolean mustExist)
547                 throws FileMissingException {
548 
549             // Identify the table slot for the filename supplied; throw error if not found
550             final Hash hash = hash(filename);
551             int slot = ((int) hash.getLow() & 0x7FFFFFFF) % this.tableBuckets.length;
552             while (true) {
553                 final long lo = this.tableHashes[slot * 2];
554                 final long hi = this.tableHashes[slot * 2 + 1];
555                 if (lo == 0L && hi == 0L) {
556                     if (!mustExist) {
557                         return 0;
558                     } else {
559                         throw new FileMissingException(filename, null);
560                     }
561                 } else if (lo == hash.getLow() && hi == hash.getHigh()) {
562                     break;
563                 }
564                 slot = (slot + 1) % this.tableBuckets.length;
565             }
566 
567             // Return the bucket number stored in the slot
568             return this.tableBuckets[slot];
569         }
570 
571         public void initBucketForFilename(final String filename, final int bucket)
572                 throws FileExistsException {
573 
574             // Assign a table slot to the filename supplied; throw error if already stored
575             final Hash hash = hash(filename);
576             int slot = ((int) hash.getLow() & 0x7FFFFFFF) % this.tableBuckets.length;
577             while (true) {
578                 final long lo = this.tableHashes[slot * 2];
579                 final long hi = this.tableHashes[slot * 2 + 1];
580                 if (lo == 0L && hi == 0L) {
581                     break;
582                 } else if (lo == hash.getLow() && hi == hash.getHigh()) {
583                     throw new FileExistsException(filename, null);
584                 }
585                 slot = (slot + 1) % this.tableBuckets.length;
586             }
587 
588             // Update set of standalone filenames
589             if (bucket < 0) {
590                 this.standaloneFilenames.add(filename);
591             }
592 
593             // Update table and counters
594             this.tableHashes[slot * 2] = hash.getLow();
595             this.tableHashes[slot * 2 + 1] = hash.getHigh();
596             this.tableBuckets[slot] = bucket;
597             ++this.tableSize;
598             ++this.size;
599 
600             // Rehash the map if too big
601             if (this.tableSize > this.tableBuckets.length * 2 / 3) {
602                 rehash();
603             }
604         }
605 
606         public void updateBucketForFilename(final String filename, final int bucket)
607                 throws FileMissingException {
608 
609             // Identify the table slot for the filename supplied; throw error if not found
610             final Hash hash = hash(filename);
611             int slot = ((int) hash.getLow() & 0x7FFFFFFF) % this.tableBuckets.length;
612             while (true) {
613                 final long lo = this.tableHashes[slot * 2];
614                 final long hi = this.tableHashes[slot * 2 + 1];
615                 if (lo == 0L && hi == 0L) {
616                     throw new FileMissingException(filename, null);
617                 } else if (lo == hash.getLow() && hi == hash.getHigh()) {
618                     break;
619                 }
620                 slot = (slot + 1) % this.tableBuckets.length;
621             }
622 
623             // Update set of standalone filenames
624             if (this.tableBuckets[slot] < 0 && bucket >= 0) {
625                 this.standaloneFilenames.remove(filename);
626             } else if (this.tableBuckets[slot] >= 0 && bucket < 0) {
627                 this.standaloneFilenames.add(filename);
628             }
629 
630             // Update table and counters
631             this.tableBuckets[slot] = bucket;
632             if (bucket == 0) {
633                 this.tableHashes[slot * 2] = DELETED;
634                 this.tableHashes[slot * 2 + 1] = DELETED;
635                 --this.size;
636             }
637         }
638 
639         public Set<String> getStandaloneFilenames() {
640             return this.standaloneFilenames;
641         }
642 
643         public void read(final InputStream stream) throws IOException {
644 
645             // Wrap the stream
646             final DataInputStream in = new DataInputStream(stream);
647 
648             // Read information about unused buckets
649             this.maxBucket = in.readInt();
650             final int numUnusedBuckets = in.readInt();
651             for (int i = 0; i < numUnusedBuckets; ++i) {
652                 this.unusedBuckets.add(in.readInt());
653             }
654 
655             // Read standalone filenames
656             final int numStandaloneFilenames = in.readInt();
657             for (int i = 0; i < numStandaloneFilenames; ++i) {
658                 final int len = in.readInt();
659                 final byte[] filename = new byte[len];
660                 in.readFully(filename);
661                 this.standaloneFilenames.add(new String(filename, Charsets.UTF_8));
662             }
663 
664             // Read number of elements in the hash table
665             this.size = in.readInt();
666             this.tableSize = this.size;
667 
668             // Resize hash table
669             int capacity = 16;
670             while (this.size >= capacity * 2 / 3) {
671                 capacity *= 2;
672             }
673             this.tableHashes = new long[capacity * 2];
674             this.tableBuckets = new int[capacity];
675 
676             // Load hash table
677             for (int i = 0; i < this.size; ++i) {
678                 final long lo = in.readLong();
679                 final long hi = in.readLong();
680                 final int bucket = in.readInt();
681                 int slot = ((int) lo & 0x7FFFFFFF) % capacity;
682                 while (true) {
683                     if (this.tableHashes[slot * 2] == 0L && this.tableHashes[slot * 2 + 1] == 0L) {
684                         this.tableHashes[slot * 2] = lo;
685                         this.tableHashes[slot * 2 + 1] = hi;
686                         this.tableBuckets[slot] = bucket;
687                         break;
688                     } else {
689                         slot = (slot + 1) % capacity;
690                     }
691                 }
692             }
693         }
694 
695         public void write(final OutputStream stream) throws IOException {
696 
697             // Wrap the stream
698             final DataOutputStream out = new DataOutputStream(stream);
699 
700             // Write information about unused buckets
701             out.writeInt(this.maxBucket);
702             out.writeInt(this.unusedBuckets.size());
703             for (final Integer unusedBucket : this.unusedBuckets) {
704                 out.writeInt(unusedBucket);
705             }
706 
707             // Write standalone filenames
708             out.writeInt(this.standaloneFilenames.size());
709             for (final String filename : this.standaloneFilenames) {
710                 final byte[] bytes = filename.getBytes(Charsets.UTF_8);
711                 out.writeInt(bytes.length);
712                 out.write(bytes);
713             }
714 
715             // Write hash table
716             out.writeInt(this.size);
717             for (int slot = 0; slot < this.tableBuckets.length; ++slot) {
718                 final long lo = this.tableHashes[slot * 2];
719                 final long hi = this.tableHashes[slot * 2 + 1];
720                 if (lo == 0L && hi == 0L || lo == DELETED && hi == DELETED) {
721                     continue;
722                 }
723                 out.writeLong(lo);
724                 out.writeLong(hi);
725                 out.writeInt(this.tableBuckets[slot]);
726             }
727 
728             // Flush
729             out.flush();
730         }
731 
732         private void rehash() {
733 
734             // Allocate data structures for the new hash table
735             final int newCapacity = this.tableBuckets.length * 2;
736             final long[] newTableHashes = new long[newCapacity * 2];
737             final int[] newTableBuckets = new int[newCapacity];
738 
739             // Populate the new hash table
740             for (int slot = 0; slot < this.tableBuckets.length; ++slot) {
741 
742                 // Retrieve <lo, hi, bucket> triple, skipping null and deleted entries
743                 final long lo = this.tableHashes[slot * 2];
744                 final long hi = this.tableHashes[slot * 2 + 1];
745                 if (lo == 0L && hi == 0L || lo == DELETED && hi == DELETED) {
746                     continue;
747                 }
748                 final int bucket = this.tableBuckets[slot];
749 
750                 // Reindex the triple in the new hash table
751                 int newSlot = ((int) lo & 0x7FFFFFFF) % newCapacity;
752                 while (true) {
753                     if (newTableHashes[newSlot * 2] == 0L && newTableHashes[newSlot * 2 + 1] == 0L) {
754                         newTableHashes[newSlot * 2] = lo;
755                         newTableHashes[newSlot * 2 + 1] = hi;
756                         newTableBuckets[newSlot] = bucket;
757                         break;
758                     } else {
759                         newSlot = (newSlot + 1) % newCapacity;
760                     }
761                 }
762             }
763 
764             // Replace old hash table with new one
765             this.tableHashes = newTableHashes;
766             this.tableBuckets = newTableBuckets;
767             this.tableSize = this.size;
768 
769             // Log operation
770             LOGGER.debug("Rehashed to {} entries", newCapacity);
771         }
772 
773         private static Hash hash(final String filename) {
774             final Hash hash = Hash.murmur3(filename);
775             if (hash.getLow() == 0L && hash.getHigh() == 0L //
776                     || hash.getLow() == DELETED && hash.getHigh() == DELETED) {
777                 return Hash.fromLongs(0L, 1L); // avoid 0,0 and DELETED,DELETED
778             }
779             return hash;
780         }
781 
782     }
783 
784     public static final class Entry {
785 
786         private final String filename;
787 
788         private final byte[] data;
789 
790         public Entry(final String filename, final byte[] data) {
791             this.filename = Objects.requireNonNull(filename);
792             this.data = Objects.requireNonNull(data);
793         }
794 
795         public String getFilename() {
796             return this.filename;
797         }
798 
799         public byte[] getData() {
800             return this.data;
801         }
802 
803         @Override
804         public boolean equals(final Object object) {
805             if (object == this) {
806                 return true;
807             }
808             if (!(object instanceof Entry)) {
809                 return false;
810             }
811             final Entry other = (Entry) object;
812             return this.filename.equals(other.filename);
813         }
814 
815         @Override
816         public int hashCode() {
817             return this.filename.hashCode();
818         }
819 
820         @Override
821         public String toString() {
822             return this.filename + " (" + this.data + " bytes)";
823         }
824 
825         @Nullable
826         public static Entry read(final InputStream stream) throws IOException {
827 
828             try {
829                 // Wrap the stream
830                 final DataInputStream in = new DataInputStream(stream);
831 
832                 // Read the filename
833                 final int nameLength = in.readShort();
834                 final byte[] name = new byte[nameLength];
835                 in.readFully(name);
836 
837                 // Read the file content
838                 final int dataLength = in.readInt();
839                 final byte[] data = new byte[dataLength];
840                 in.readFully(data);
841 
842                 // Return corresponding entry
843                 return new Entry(new String(name, Charsets.UTF_8), data);
844 
845             } catch (final EOFException ex) {
846                 // Ignore EOF and return a null result
847                 return null;
848             }
849         }
850 
851         public static void write(final OutputStream stream, final Entry entry) throws IOException {
852 
853             // Retrieve name and data byte arrays from the entry
854             final byte[] name = entry.getFilename().getBytes(Charsets.UTF_8);
855             final byte[] data = entry.getData();
856 
857             // Write name length, name bytes, data length, data bytes
858             final DataOutputStream out = new DataOutputStream(stream);
859             out.writeShort(name.length);
860             out.write(name);
861             out.writeInt(data.length);
862             out.write(data);
863         }
864 
865     }
866 
867 }