1 package eu.fbk.knowledgestore.filestore;
2
3 import java.io.ByteArrayInputStream;
4 import java.io.ByteArrayOutputStream;
5 import java.io.DataInputStream;
6 import java.io.DataOutputStream;
7 import java.io.EOFException;
8 import java.io.IOException;
9 import java.io.InputStream;
10 import java.io.OutputStream;
11 import java.util.List;
12 import java.util.Objects;
13 import java.util.Set;
14 import java.util.concurrent.Future;
15 import java.util.concurrent.TimeUnit;
16 import java.util.concurrent.atomic.AtomicBoolean;
17 import java.util.concurrent.locks.ReadWriteLock;
18 import java.util.concurrent.locks.ReentrantReadWriteLock;
19
20 import javax.annotation.Nullable;
21
22 import com.google.common.base.Charsets;
23 import com.google.common.base.Function;
24 import com.google.common.base.Joiner;
25 import com.google.common.base.Throwables;
26 import com.google.common.collect.Lists;
27 import com.google.common.collect.Ordering;
28 import com.google.common.collect.Sets;
29 import com.google.common.io.ByteStreams;
30
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33
34 import eu.fbk.knowledgestore.data.Data;
35 import eu.fbk.knowledgestore.data.Stream;
36 import eu.fbk.rdfpro.util.Hash;
37 import eu.fbk.rdfpro.util.IO;
38
39 public class MultiFileStore extends ForwardingFileStore {
40
41 private static final Logger LOGGER = LoggerFactory.getLogger(MultiFileStore.class);
42
43 private static final int DEFAULT_BUCKET_SIZE = 10;
44
45 private static final String INDEX_FILENAME = "mfs.idx.bin";
46
47 private static final String BUCKET_PREFIX = "mfs.b";
48
49 private static final String BUCKET_EXT = ".bin";
50
51 private static final long MERGE_PERIOD = 5000L;
52
53 private final FileStore delegate;
54
55 private final int bucketSize;
56
57 private final ReadWriteLock lock;
58
59 private Future<?> mergeFuture;
60
61 private Index index;
62
63 private boolean dirty;
64
65 public MultiFileStore(final FileStore delegate, @Nullable final Integer bucketSize) {
66 this.delegate = Objects.requireNonNull(delegate);
67 this.bucketSize = bucketSize != null ? bucketSize : DEFAULT_BUCKET_SIZE;
68 this.lock = new ReentrantReadWriteLock(true);
69 this.mergeFuture = null;
70 this.index = null;
71 this.dirty = false;
72 LOGGER.info("{} configured, bucket size = {}", getClass().getSimpleName(), this.bucketSize);
73 }
74
75 @Override
76 protected FileStore delegate() {
77 return this.delegate;
78 }
79
80 @Override
81 public void init() throws IOException, IllegalStateException {
82
83
84 super.init();
85
86 try {
87
88 this.index = new Index();
89 try {
90
91 try (final InputStream stream = this.delegate.read(INDEX_FILENAME)) {
92 LOGGER.debug("Loading saved index from {}", INDEX_FILENAME);
93 this.index.read(stream);
94 this.dirty = false;
95 }
96
97 } catch (final FileMissingException ex) {
98
99 LOGGER.debug("Creating index (no file {} found)", INDEX_FILENAME);
100 for (final String filename : this.delegate.list()) {
101 if (!filename.startsWith(BUCKET_PREFIX)) {
102 this.index.initBucketForFilename(filename, -1);
103 } else {
104 final int bucket = Integer.parseInt(filename.substring(
105 BUCKET_PREFIX.length(), filename.length() - BUCKET_EXT.length()));
106 this.index.allocateBucket(bucket);
107 try (InputStream stream = this.delegate.read(filename)) {
108 Entry entry;
109 while ((entry = Entry.read(stream)) != null) {
110 this.index.initBucketForFilename(entry.getFilename(), bucket);
111 }
112 }
113 }
114 }
115 merge();
116 this.dirty = true;
117 }
118
119
120 if (LOGGER.isDebugEnabled()) {
121 final int numFilenames = this.index.countFilenames();
122 final int numStandaloneFilenames = this.index.getStandaloneFilenames().size();
123 final int numBuckets = this.index.countAllocatedBuckets();
124 LOGGER.debug("{} files ({} in {} buckets, {} standalone)", numFilenames,
125 numFilenames - numStandaloneFilenames, numBuckets, numStandaloneFilenames);
126 }
127
128
129 LOGGER.debug("Scheduling merge task every {} ms", MERGE_PERIOD);
130 this.mergeFuture = Data.getExecutor().scheduleWithFixedDelay(() -> {
131 try {
132 merge();
133 } catch (final Throwable ex) {
134 LOGGER.error("Merge failed", ex);
135 }
136 }, MERGE_PERIOD, MERGE_PERIOD, TimeUnit.MILLISECONDS);
137
138 } catch (final Throwable ex) {
139
140 IO.closeQuietly(this.delegate);
141 Throwables.propagateIfPossible(ex, IOException.class);
142 Throwables.propagate(ex);
143 }
144 }
145
146 @Override
147 public InputStream read(final String filename) throws FileMissingException, IOException {
148
149
150 this.lock.readLock().lock();
151
152 try {
153
154 final int bucket = this.index.getBucketForFilename(filename, true);
155
156
157
158
159 if (bucket < 0) {
160 LOGGER.debug("Reading {} from standalone file", filename);
161 try (InputStream in = this.delegate.read(filename)) {
162 final byte[] bytes = ByteStreams.toByteArray(in);
163 return new ByteArrayInputStream(bytes);
164 }
165 } else {
166 final String bucketName = BUCKET_PREFIX + bucket + BUCKET_EXT;
167 LOGGER.debug("Reading {} from bucket file {}", filename, bucketName);
168 try (InputStream in = this.delegate.read(bucketName)) {
169 Entry entry;
170 while ((entry = Entry.read(in)) != null) {
171 if (entry.getFilename().equals(filename)) {
172 return new ByteArrayInputStream(entry.getData());
173 }
174 }
175 }
176 throw new IOException("Cannot find '" + filename + "' in bucket file '"
177 + bucketName + "' - perhaps the file was changed by another application. "
178 + "Consider restarting the system rebuilding the index");
179 }
180
181 } finally {
182
183 this.lock.readLock().unlock();
184 }
185 }
186
187 @Override
188 public OutputStream write(final String filename) throws FileExistsException, IOException {
189
190
191 this.lock.readLock().lock();
192 try {
193 final int bucket = this.index.getBucketForFilename(filename, false);
194 if (bucket != 0) {
195 throw new FileExistsException(filename, null);
196 }
197 } finally {
198 this.lock.readLock().unlock();
199 }
200
201
202 LOGGER.debug("Writing {} to memory buffer", filename);
203
204
205
206 return new ByteArrayOutputStream() {
207
208 private final AtomicBoolean closed = new AtomicBoolean(false);
209
210 @Override
211 public void close() throws IOException {
212
213
214 if (!this.closed.compareAndSet(false, true)) {
215 return;
216 }
217
218
219 super.close();
220
221
222 MultiFileStore.this.lock.writeLock().lock();
223
224 try {
225
226 markDirty();
227
228
229 LOGGER.debug("Writing {} ({} bytes) to standalone file", filename, this.count);
230
231
232 try (OutputStream stream = MultiFileStore.this.delegate.write(filename)) {
233 stream.write(this.buf, 0, this.count);
234 }
235
236
237 MultiFileStore.this.index.initBucketForFilename(filename, -1);
238
239 } catch (final FileExistsException ex) {
240
241 throw new IOException("Write rejected: another file with the same name '"
242 + filename + "' has been written concurrently");
243
244 } finally {
245
246 MultiFileStore.this.lock.writeLock().unlock();
247 }
248 }
249
250 };
251 }
252
253 @Override
254 public void delete(final String filename) throws FileMissingException, IOException {
255
256
257 this.lock.writeLock().lock();
258
259 try {
260
261 final int bucket = this.index.getBucketForFilename(filename, true);
262
263
264 markDirty();
265
266 if (bucket < 0) {
267
268 LOGGER.debug("Deleting {} in standalone file", filename);
269 this.delegate.delete(filename);
270 this.index.updateBucketForFilename(filename, 0);
271
272 } else {
273
274
275 final String bucketName = BUCKET_PREFIX + bucket + BUCKET_EXT;
276 LOGGER.debug("Deleting {} in bucket file {}", filename, bucketName);
277 final List<String> explodedFilenames = Lists.newArrayList();
278 try (InputStream in = this.delegate.read(bucketName)) {
279 Entry entry;
280 while ((entry = Entry.read(in)) != null) {
281 try (OutputStream out = this.delegate.write(entry.getFilename())) {
282 if (!entry.getFilename().equals(filename)) {
283 LOGGER.debug(
284 "Extracting standalone file {} ({} bytes) from bucket file {}",
285 entry.getFilename(), entry.getData().length, bucketName);
286 out.write(entry.getData());
287 explodedFilenames.add(entry.getFilename());
288 }
289 }
290 }
291 LOGGER.debug("Deleting bucket file {}", bucketName);
292 this.delegate.delete(bucketName);
293 } catch (final Throwable ex) {
294 LOGGER.debug("Explosion of bucket {} failed - attempting recovery", bucketName);
295 for (final String explodedFilename : explodedFilenames) {
296 try {
297 LOGGER.debug("Deleting exploded standalone file {}", explodedFilename);
298 this.delegate.delete(explodedFilename);
299 } catch (final Throwable ex2) {
300 LOGGER.error("Recovery error: cannot delete '" + explodedFilename
301 + "' - you should delete it manually", ex2);
302 }
303 }
304 throw new IOException("Cannot explode bucket " + bucketName, ex);
305 }
306
307
308 for (final String explodedFilename : explodedFilenames) {
309 this.index.updateBucketForFilename(explodedFilename, -1);
310 }
311 this.index.updateBucketForFilename(filename, 0);
312 this.index.releaseBucket(bucket);
313 }
314
315 } finally {
316
317 this.lock.writeLock().unlock();
318 }
319 }
320
321 @Override
322 public Stream<String> list() throws IOException {
323
324
325
326
327 final Function<String, Stream<String>> transformer = new Function<String, Stream<String>>() {
328
329 @Override
330 public Stream<String> apply(final String filename) {
331 if (filename.equals(INDEX_FILENAME)) {
332 return Stream.create();
333 } else if (!filename.startsWith(BUCKET_PREFIX)) {
334 return Stream.create(filename);
335 } else {
336 final List<String> filenames = Lists.newArrayList();
337 try (InputStream in = MultiFileStore.this.delegate.read(filename)) {
338 Entry entry;
339 while ((entry = Entry.read(in)) != null) {
340 filenames.add(entry.getFilename());
341 }
342 } catch (final IOException ex) {
343 Throwables.propagate(ex);
344 }
345 return Stream.create(filenames);
346 }
347 }
348
349 };
350
351
352 return Stream.concat(this.delegate.list().transform(transformer, 1));
353 }
354
355 @Override
356 public void close() {
357
358 try {
359
360 LOGGER.debug("Unscheduling merge task");
361 this.mergeFuture.cancel(false);
362
363
364 if (this.dirty) {
365 LOGGER.debug("Saving modified index to {}", INDEX_FILENAME);
366 try (OutputStream stream = this.delegate.write(INDEX_FILENAME)) {
367 this.index.write(stream);
368 }
369 }
370
371 } catch (final Throwable ex) {
372
373 Throwables.propagate(ex);
374
375 } finally {
376
377 this.index = null;
378 this.delegate.close();
379 }
380 }
381
382 private void merge() throws IOException {
383
384
385 this.lock.writeLock().lock();
386
387 try {
388
389 if (this.index.getStandaloneFilenames().size() < this.bucketSize) {
390 return;
391 }
392
393
394 markDirty();
395
396
397 final List<String> sortedFilenames = Ordering.natural().sortedCopy(
398 this.index.getStandaloneFilenames());
399 LOGGER.debug("Merge started - {} files to merge", sortedFilenames.size());
400
401
402 for (int i = 0; i <= sortedFilenames.size() - this.bucketSize; i += this.bucketSize) {
403
404
405 final List<String> filenames = sortedFilenames.subList(i, i + this.bucketSize);
406
407
408 final int bucket = this.index.allocateBucket(-1);
409
410
411 final String bucketFilename = BUCKET_PREFIX + bucket + BUCKET_EXT;
412 LOGGER.debug("Creating bucket file {}", bucketFilename);
413 try (OutputStream out = this.delegate.write(bucketFilename)) {
414 for (final String filename : filenames) {
415 try (InputStream in = this.delegate.read(filename)) {
416 final byte[] data = ByteStreams.toByteArray(in);
417 final Entry entry = new Entry(filename, data);
418 Entry.write(out, entry);
419 }
420 }
421 } catch (final Throwable ex) {
422 try {
423 this.delegate.delete(bucketFilename);
424 } catch (final Throwable ex2) {
425 LOGGER.error("Recovery error: cannot delete '" + bucketFilename
426 + "' - you should delete it manually", ex2);
427 }
428 throw new IOException("Cannot create bucket " + bucketFilename
429 + " with files " + Joiner.on(", ").join(filenames), ex);
430 }
431
432
433 for (final String filename : filenames) {
434 try {
435 this.delegate.delete(filename);
436 } catch (final Throwable ex) {
437 LOGGER.error("Cannot delete standalone file " + bucketFilename
438 + " after creation of bucket " + bucketFilename
439 + " - you should delete it manually", ex);
440 }
441 }
442
443
444 for (final String filename : filenames) {
445 this.index.updateBucketForFilename(filename, bucket);
446 }
447 }
448
449
450 LOGGER.debug("Merge done - {} standalone files remaining", this.index
451 .getStandaloneFilenames().size());
452
453 } finally {
454
455 this.lock.writeLock().unlock();
456 }
457 }
458
459 private void markDirty() throws IOException {
460 if (!this.dirty) {
461 this.dirty = true;
462 LOGGER.info("Index has changed. Deleting index file {}. "
463 + "Will be recreated at close time", INDEX_FILENAME);
464 try {
465 this.delegate.delete(INDEX_FILENAME);
466 } catch (final FileMissingException ex) {
467
468 } catch (final Throwable ex) {
469 throw new IOException("Cannot delete stale index file", ex);
470 }
471 }
472 }
473
474 private static final class Index {
475
476 private static final long DELETED = 0xFFFFFFFFFFFFFFFFL;
477
478 private long[] tableHashes;
479
480 private int[] tableBuckets;
481
482 private int tableSize;
483
484 private int size;
485
486 private final Set<Integer> unusedBuckets;
487
488 private int maxBucket;
489
490 private final Set<String> standaloneFilenames;
491
492 public Index() {
493 this.tableHashes = new long[16 * 2];
494 this.tableBuckets = new int[16];
495 this.tableSize = 0;
496 this.size = 0;
497 this.unusedBuckets = Sets.newHashSet();
498 this.maxBucket = 0;
499 this.standaloneFilenames = Sets.newHashSet();
500 }
501
502 public int countFilenames() {
503 return this.size;
504 }
505
506 public int countAllocatedBuckets() {
507 return this.maxBucket - this.unusedBuckets.size();
508 }
509
510 public int allocateBucket(int bucket) {
511
512
513 if (bucket < 0) {
514 if (!this.unusedBuckets.isEmpty()) {
515 bucket = this.unusedBuckets.iterator().next();
516 } else {
517 bucket = this.maxBucket + 1;
518 }
519 }
520
521
522 if (bucket > this.maxBucket) {
523 for (int i = this.maxBucket + 1; i < bucket; ++i) {
524 this.unusedBuckets.add(i);
525 }
526 this.maxBucket = bucket;
527 } else {
528 this.unusedBuckets.remove(bucket);
529 }
530
531
532 return bucket;
533 }
534
535 public void releaseBucket(final int bucket) {
536
537 if (bucket < this.maxBucket) {
538 this.unusedBuckets.add(bucket);
539 } else {
540 do {
541 --this.maxBucket;
542 } while (this.unusedBuckets.remove(this.maxBucket));
543 }
544 }
545
546 public int getBucketForFilename(final String filename, final boolean mustExist)
547 throws FileMissingException {
548
549
550 final Hash hash = hash(filename);
551 int slot = ((int) hash.getLow() & 0x7FFFFFFF) % this.tableBuckets.length;
552 while (true) {
553 final long lo = this.tableHashes[slot * 2];
554 final long hi = this.tableHashes[slot * 2 + 1];
555 if (lo == 0L && hi == 0L) {
556 if (!mustExist) {
557 return 0;
558 } else {
559 throw new FileMissingException(filename, null);
560 }
561 } else if (lo == hash.getLow() && hi == hash.getHigh()) {
562 break;
563 }
564 slot = (slot + 1) % this.tableBuckets.length;
565 }
566
567
568 return this.tableBuckets[slot];
569 }
570
571 public void initBucketForFilename(final String filename, final int bucket)
572 throws FileExistsException {
573
574
575 final Hash hash = hash(filename);
576 int slot = ((int) hash.getLow() & 0x7FFFFFFF) % this.tableBuckets.length;
577 while (true) {
578 final long lo = this.tableHashes[slot * 2];
579 final long hi = this.tableHashes[slot * 2 + 1];
580 if (lo == 0L && hi == 0L) {
581 break;
582 } else if (lo == hash.getLow() && hi == hash.getHigh()) {
583 throw new FileExistsException(filename, null);
584 }
585 slot = (slot + 1) % this.tableBuckets.length;
586 }
587
588
589 if (bucket < 0) {
590 this.standaloneFilenames.add(filename);
591 }
592
593
594 this.tableHashes[slot * 2] = hash.getLow();
595 this.tableHashes[slot * 2 + 1] = hash.getHigh();
596 this.tableBuckets[slot] = bucket;
597 ++this.tableSize;
598 ++this.size;
599
600
601 if (this.tableSize > this.tableBuckets.length * 2 / 3) {
602 rehash();
603 }
604 }
605
606 public void updateBucketForFilename(final String filename, final int bucket)
607 throws FileMissingException {
608
609
610 final Hash hash = hash(filename);
611 int slot = ((int) hash.getLow() & 0x7FFFFFFF) % this.tableBuckets.length;
612 while (true) {
613 final long lo = this.tableHashes[slot * 2];
614 final long hi = this.tableHashes[slot * 2 + 1];
615 if (lo == 0L && hi == 0L) {
616 throw new FileMissingException(filename, null);
617 } else if (lo == hash.getLow() && hi == hash.getHigh()) {
618 break;
619 }
620 slot = (slot + 1) % this.tableBuckets.length;
621 }
622
623
624 if (this.tableBuckets[slot] < 0 && bucket >= 0) {
625 this.standaloneFilenames.remove(filename);
626 } else if (this.tableBuckets[slot] >= 0 && bucket < 0) {
627 this.standaloneFilenames.add(filename);
628 }
629
630
631 this.tableBuckets[slot] = bucket;
632 if (bucket == 0) {
633 this.tableHashes[slot * 2] = DELETED;
634 this.tableHashes[slot * 2 + 1] = DELETED;
635 --this.size;
636 }
637 }
638
639 public Set<String> getStandaloneFilenames() {
640 return this.standaloneFilenames;
641 }
642
643 public void read(final InputStream stream) throws IOException {
644
645
646 final DataInputStream in = new DataInputStream(stream);
647
648
649 this.maxBucket = in.readInt();
650 final int numUnusedBuckets = in.readInt();
651 for (int i = 0; i < numUnusedBuckets; ++i) {
652 this.unusedBuckets.add(in.readInt());
653 }
654
655
656 final int numStandaloneFilenames = in.readInt();
657 for (int i = 0; i < numStandaloneFilenames; ++i) {
658 final int len = in.readInt();
659 final byte[] filename = new byte[len];
660 in.readFully(filename);
661 this.standaloneFilenames.add(new String(filename, Charsets.UTF_8));
662 }
663
664
665 this.size = in.readInt();
666 this.tableSize = this.size;
667
668
669 int capacity = 16;
670 while (this.size >= capacity * 2 / 3) {
671 capacity *= 2;
672 }
673 this.tableHashes = new long[capacity * 2];
674 this.tableBuckets = new int[capacity];
675
676
677 for (int i = 0; i < this.size; ++i) {
678 final long lo = in.readLong();
679 final long hi = in.readLong();
680 final int bucket = in.readInt();
681 int slot = ((int) lo & 0x7FFFFFFF) % capacity;
682 while (true) {
683 if (this.tableHashes[slot * 2] == 0L && this.tableHashes[slot * 2 + 1] == 0L) {
684 this.tableHashes[slot * 2] = lo;
685 this.tableHashes[slot * 2 + 1] = hi;
686 this.tableBuckets[slot] = bucket;
687 break;
688 } else {
689 slot = (slot + 1) % capacity;
690 }
691 }
692 }
693 }
694
695 public void write(final OutputStream stream) throws IOException {
696
697
698 final DataOutputStream out = new DataOutputStream(stream);
699
700
701 out.writeInt(this.maxBucket);
702 out.writeInt(this.unusedBuckets.size());
703 for (final Integer unusedBucket : this.unusedBuckets) {
704 out.writeInt(unusedBucket);
705 }
706
707
708 out.writeInt(this.standaloneFilenames.size());
709 for (final String filename : this.standaloneFilenames) {
710 final byte[] bytes = filename.getBytes(Charsets.UTF_8);
711 out.writeInt(bytes.length);
712 out.write(bytes);
713 }
714
715
716 out.writeInt(this.size);
717 for (int slot = 0; slot < this.tableBuckets.length; ++slot) {
718 final long lo = this.tableHashes[slot * 2];
719 final long hi = this.tableHashes[slot * 2 + 1];
720 if (lo == 0L && hi == 0L || lo == DELETED && hi == DELETED) {
721 continue;
722 }
723 out.writeLong(lo);
724 out.writeLong(hi);
725 out.writeInt(this.tableBuckets[slot]);
726 }
727
728
729 out.flush();
730 }
731
732 private void rehash() {
733
734
735 final int newCapacity = this.tableBuckets.length * 2;
736 final long[] newTableHashes = new long[newCapacity * 2];
737 final int[] newTableBuckets = new int[newCapacity];
738
739
740 for (int slot = 0; slot < this.tableBuckets.length; ++slot) {
741
742
743 final long lo = this.tableHashes[slot * 2];
744 final long hi = this.tableHashes[slot * 2 + 1];
745 if (lo == 0L && hi == 0L || lo == DELETED && hi == DELETED) {
746 continue;
747 }
748 final int bucket = this.tableBuckets[slot];
749
750
751 int newSlot = ((int) lo & 0x7FFFFFFF) % newCapacity;
752 while (true) {
753 if (newTableHashes[newSlot * 2] == 0L && newTableHashes[newSlot * 2 + 1] == 0L) {
754 newTableHashes[newSlot * 2] = lo;
755 newTableHashes[newSlot * 2 + 1] = hi;
756 newTableBuckets[newSlot] = bucket;
757 break;
758 } else {
759 newSlot = (newSlot + 1) % newCapacity;
760 }
761 }
762 }
763
764
765 this.tableHashes = newTableHashes;
766 this.tableBuckets = newTableBuckets;
767 this.tableSize = this.size;
768
769
770 LOGGER.debug("Rehashed to {} entries", newCapacity);
771 }
772
773 private static Hash hash(final String filename) {
774 final Hash hash = Hash.murmur3(filename);
775 if (hash.getLow() == 0L && hash.getHigh() == 0L
776 || hash.getLow() == DELETED && hash.getHigh() == DELETED) {
777 return Hash.fromLongs(0L, 1L);
778 }
779 return hash;
780 }
781
782 }
783
784 public static final class Entry {
785
786 private final String filename;
787
788 private final byte[] data;
789
790 public Entry(final String filename, final byte[] data) {
791 this.filename = Objects.requireNonNull(filename);
792 this.data = Objects.requireNonNull(data);
793 }
794
795 public String getFilename() {
796 return this.filename;
797 }
798
799 public byte[] getData() {
800 return this.data;
801 }
802
803 @Override
804 public boolean equals(final Object object) {
805 if (object == this) {
806 return true;
807 }
808 if (!(object instanceof Entry)) {
809 return false;
810 }
811 final Entry other = (Entry) object;
812 return this.filename.equals(other.filename);
813 }
814
815 @Override
816 public int hashCode() {
817 return this.filename.hashCode();
818 }
819
820 @Override
821 public String toString() {
822 return this.filename + " (" + this.data + " bytes)";
823 }
824
825 @Nullable
826 public static Entry read(final InputStream stream) throws IOException {
827
828 try {
829
830 final DataInputStream in = new DataInputStream(stream);
831
832
833 final int nameLength = in.readShort();
834 final byte[] name = new byte[nameLength];
835 in.readFully(name);
836
837
838 final int dataLength = in.readInt();
839 final byte[] data = new byte[dataLength];
840 in.readFully(data);
841
842
843 return new Entry(new String(name, Charsets.UTF_8), data);
844
845 } catch (final EOFException ex) {
846
847 return null;
848 }
849 }
850
851 public static void write(final OutputStream stream, final Entry entry) throws IOException {
852
853
854 final byte[] name = entry.getFilename().getBytes(Charsets.UTF_8);
855 final byte[] data = entry.getData();
856
857
858 final DataOutputStream out = new DataOutputStream(stream);
859 out.writeShort(name.length);
860 out.write(name);
861 out.writeInt(data.length);
862 out.write(data);
863 }
864
865 }
866
867 }