1 package eu.fbk.knowledgestore.data;
2
3 import java.io.File;
4 import java.io.FilterOutputStream;
5 import java.io.IOException;
6 import java.io.InputStream;
7 import java.io.ObjectInputStream;
8 import java.io.ObjectOutputStream;
9 import java.io.OutputStream;
10 import java.io.Serializable;
11 import java.net.URI;
12 import java.util.Collections;
13 import java.util.List;
14 import java.util.Map;
15 import java.util.NoSuchElementException;
16
17 import javax.annotation.Nullable;
18
19 import com.google.common.base.Joiner;
20 import com.google.common.base.Preconditions;
21 import com.google.common.base.Strings;
22 import com.google.common.collect.ImmutableList;
23 import com.google.common.collect.ImmutableMap;
24 import com.google.common.collect.Iterables;
25 import com.google.common.collect.Lists;
26 import com.google.common.collect.Maps;
27
28 import org.apache.hadoop.fs.FileStatus;
29 import org.apache.hadoop.fs.FileSystem;
30 import org.apache.hadoop.fs.Path;
31
32 import eu.fbk.rdfpro.util.IO;
33
34
35
36
37
38
39
40
41
42 public abstract class Dictionary<T extends Serializable> {
43
44 private static final long MAX_CLOCK_SKEW = 60 * 1000;
45
46 private final Class<T> clazz;
47
48 private final String url;
49
50 private volatile List<T> keyToObjectIndex;
51
52 private volatile Map<T, Integer> objectToKeyIndex;
53
54 private long lastAccessed;
55
56 public static <T extends Serializable> Dictionary<T> createLocalDictionary(
57 final Class<T> objectClass, final File file) throws IOException {
58
59
60 Preconditions.checkNotNull(objectClass);
61 Preconditions.checkNotNull(file);
62
63
64 final Dictionary<T> dictionary = new LocalDictionary<T>(objectClass, file.toURI()
65 .toString(), file);
66 dictionary.reload();
67 return dictionary;
68 }
69
70 public static <T extends Serializable> Dictionary<T> createHadoopDictionary(
71 final Class<T> objectClass, final String fileURL) throws IOException {
72
73
74 Preconditions.checkNotNull(objectClass);
75 Preconditions.checkNotNull(fileURL);
76
77
78 final FileSystem fs = FileSystem.get(URI.create(fileURL),
79 new org.apache.hadoop.conf.Configuration(true));
80 final Path path = new Path(URI.create(fileURL).getPath());
81
82
83 String urlBase = fs.getUri().toString();
84 String urlPath = path.toString();
85 if (urlBase.endsWith("/")) {
86 urlBase = urlBase.substring(0, urlBase.length() - 1);
87 }
88 if (!urlPath.startsWith("/")) {
89 urlPath = "/" + urlPath;
90 }
91 final String url = urlBase + urlPath;
92
93
94 final Dictionary<T> dictionary = new HadoopDictionary<T>(objectClass, url, fs, path);
95
96
97 dictionary.reload();
98 return dictionary;
99 }
100
/**
 * Initializes the state common to all dictionary implementations, starting from empty
 * in-memory indexes.
 *
 * @param objectClass
 *            the class of the objects stored in the dictionary, not null
 * @param url
 *            the URL identifying the dictionary file, not null
 */
Dictionary(final Class<T> objectClass, final String url) {
    Preconditions.checkNotNull(objectClass);
    Preconditions.checkNotNull(url);
    this.clazz = objectClass;
    this.url = url;
    this.keyToObjectIndex = Lists.newArrayList();
    this.objectToKeyIndex = Maps.newHashMap();
}
107
108 @Nullable
109 abstract Long lastModified(String suffix) throws IOException;
110
111 abstract InputStream read(String suffix) throws IOException;
112
113 abstract OutputStream write(String suffix) throws IOException;
114
115 abstract void delete(String suffix) throws IOException;
116
117 abstract void rename(String oldSuffix, String newSuffix) throws IOException;
118
119 public Class<T> getObjectClass() {
120 return this.clazz;
121 }
122
123 public String getDictionaryURL() {
124 return this.url;
125 }
126
127 public T objectFor(final int key) throws IOException, NoSuchElementException {
128 return this.objectFor(key, true);
129 }
130
131 @Nullable
132 public T objectFor(final int key, final boolean mustExist) throws IOException,
133 NoSuchElementException {
134
135 Preconditions.checkArgument(key > 0, "Non-positive key %d", key);
136
137
138 List<T> index = this.keyToObjectIndex;
139
140 if (key > index.size()) {
141 this.reload();
142 index = this.keyToObjectIndex;
143 }
144
145 if (key <= index.size()) {
146 return index.get(key - 1);
147 } else if (!mustExist) {
148 return null;
149 }
150
151 throw new NoSuchElementException("No object for key " + key);
152 }
153
154 public List<T> objectsFor(final Iterable<? extends Integer> keys, final boolean mustExist)
155 throws IOException, NoSuchElementException {
156
157
158 List<T> index = this.keyToObjectIndex;
159
160 for (final int key : keys) {
161 Preconditions.checkArgument(key > 0, "Non-positive key %d", key);
162 if (key > index.size()) {
163 this.reload();
164 index = this.keyToObjectIndex;
165 break;
166 }
167 }
168
169 final List<T> result = Lists.newArrayListWithCapacity(Iterables.size(keys));
170 List<Integer> missing = null;
171 for (final int key : keys) {
172 if (key <= index.size()) {
173 result.add(index.get(key - 1));
174 } else if (mustExist) {
175 if (missing == null) {
176 missing = Lists.newArrayList();
177 }
178 missing.add(key);
179 }
180 }
181
182 if (missing != null) {
183 throw new NoSuchElementException("No objects for keys "
184 + Joiner.on(", ").join(missing));
185 }
186
187 return result;
188 }
189
190 public Integer keyFor(final T object) throws IOException {
191 final Integer key = this.keyFor(object, true);
192 assert key != null;
193 return key;
194 }
195
196 @Nullable
197 public Integer keyFor(final T object, final boolean mayGenerate) throws IOException {
198
199 Preconditions.checkNotNull(object);
200
201 Integer key = this.objectToKeyIndex.get(object);
202
203 if (key == null && mayGenerate) {
204 this.update(Collections.singletonList(object));
205 key = this.objectToKeyIndex.get(object);
206 }
207
208 return key;
209 }
210
211 public List<Integer> keysFor(final Iterable<? extends T> objects, final boolean mayGenerate)
212 throws IOException {
213
214 Preconditions.checkNotNull(objects);
215
216
217 Map<T, Integer> index = this.objectToKeyIndex;
218
219 final List<Integer> result = Lists.newArrayListWithCapacity(Iterables.size(objects));
220
221 List<T> missingObjects = null;
222 List<Integer> missingOffsets = null;
223
224 for (final T object : objects) {
225 final Integer key = index.get(object);
226 result.add(key);
227 if (key == null) {
228 Preconditions.checkNotNull(object);
229 if (missingOffsets == null) {
230 missingObjects = Lists.newArrayList();
231 missingOffsets = Lists.newArrayList();
232 }
233 assert missingObjects != null;
234 missingObjects.add(object);
235 missingOffsets.add(result.size());
236 }
237 }
238
239 if (missingObjects != null && mayGenerate) {
240 assert missingOffsets != null;
241 this.update(missingObjects);
242 index = this.objectToKeyIndex;
243 for (int i = 0; i < missingObjects.size(); ++i) {
244 final int offset = missingOffsets.get(i);
245 final T object = missingObjects.get(i);
246 final Integer key = this.objectToKeyIndex.get(object);
247 result.set(offset, key);
248 }
249 }
250
251 return result;
252 }
253
254 public <M extends Map<? super Integer, ? super T>> M toMap(@Nullable final M map)
255 throws IOException {
256
257 @SuppressWarnings("unchecked")
258 final M actualMap = map != null ? map : (M) Maps.newHashMap();
259
260 this.reload();
261
262
263 final List<T> index = this.keyToObjectIndex;
264
265 for (int i = 0; i < index.size(); ++i) {
266 actualMap.put(i, index.get(i));
267 }
268 return actualMap;
269 }
270
271 public <L extends List<? super T>> L toList(@Nullable final L list) throws IOException {
272
273 @SuppressWarnings("unchecked")
274 final L actualList = list != null ? list : (L) Lists.newArrayList();
275
276 this.reload();
277
278 actualList.addAll(this.keyToObjectIndex);
279 return actualList;
280 }
281
282 private synchronized void reload() throws IOException {
283
284
285 final Long lastModified = lastModifiedWithBackup();
286 if (lastModified == null || lastModified < this.lastAccessed - Dictionary.MAX_CLOCK_SKEW) {
287 return;
288 }
289
290
291 final ImmutableList.Builder<T> keyToObjectIndexBuilder = ImmutableList.builder();
292 final ImmutableMap.Builder<T, Integer> objectToKeyIndexBuilder = ImmutableMap.builder();
293
294
295 final ObjectInputStream stream = new ObjectInputStream(readWithBackup());
296 assert stream != null;
297
298 T object = null;
299 try {
300 final int size = stream.readInt();
301 for (int key = 1; key <= size; ++key) {
302 object = this.clazz.cast(stream.readObject());
303 keyToObjectIndexBuilder.add(object);
304 objectToKeyIndexBuilder.put(object, key);
305 }
306
307 } catch (final ClassCastException ex) {
308 assert object != null;
309 throw new IOException("Cannot read from " + this.url + ": found "
310 + object.getClass().getName() + ", expected " + this.clazz.getName());
311
312 } catch (final ClassNotFoundException ex) {
313 throw new IOException("Cannot read from " + this.url + ": either the content is "
314 + "malformed, or it encodes data of another dictionary using classes not "
315 + "available in this JVM");
316 } finally {
317 stream.close();
318 }
319
320
321 this.keyToObjectIndex = keyToObjectIndexBuilder.build();
322 this.objectToKeyIndex = objectToKeyIndexBuilder.build();
323
324
325 this.lastAccessed = System.currentTimeMillis();
326 }
327
328 private synchronized void update(final Iterable<T> newObjects) throws IOException {
329
330
331 this.reload();
332
333
334 List<T> keyToObjectIndex = this.keyToObjectIndex;
335 Map<T, Integer> objectToKeyIndex = this.objectToKeyIndex;
336
337
338 final List<T> missing = Lists.newArrayList();
339 for (final T object : newObjects) {
340 if (!objectToKeyIndex.containsKey(object)) {
341 missing.add(object);
342 }
343 }
344 if (missing.isEmpty()) {
345 return;
346 }
347
348
349 keyToObjectIndex = ImmutableList.copyOf(Iterables.concat(keyToObjectIndex, missing));
350
351
352 final ImmutableMap.Builder<T, Integer> builder = ImmutableMap.builder();
353 builder.putAll(objectToKeyIndex);
354 int key = objectToKeyIndex.size();
355 for (final T object : missing) {
356 builder.put(object, ++key);
357 }
358 objectToKeyIndex = builder.build();
359
360
361 final ObjectOutputStream stream = new ObjectOutputStream(writeWithBackup());
362 try {
363 stream.writeInt(keyToObjectIndex.size());
364 for (final T object : keyToObjectIndex) {
365 stream.writeObject(object);
366 }
367 } finally {
368 stream.close();
369 }
370
371
372 this.lastAccessed = System.currentTimeMillis();
373
374
375 this.keyToObjectIndex = keyToObjectIndex;
376 this.objectToKeyIndex = objectToKeyIndex;
377 }
378
379 private InputStream readWithBackup() throws IOException {
380
381
382 IOException exception = null;
383
384 try {
385
386 final InputStream result = read("");
387 if (result != null) {
388 return result;
389 }
390 } catch (final IOException ex) {
391 exception = ex;
392 }
393
394 try {
395
396 final InputStream result = read(".backup");
397 if (result != null) {
398 return result;
399 }
400 } catch (final IOException ex) {
401 if (exception == null) {
402 exception = ex;
403 }
404 }
405
406
407 final boolean fileExists = lastModified("") != null;
408 final boolean backupExists = lastModified(".backup") != null;
409
410
411 if (!fileExists && !backupExists) {
412 return null;
413 }
414
415
416 if (exception == null) {
417 exception = new IOException("Cannot read "
418 + (fileExists ? this.url : this.url + ".backup") + " (file reported to exist)");
419 }
420 throw exception;
421 }
422
423 private OutputStream writeWithBackup() throws IOException {
424
425
426 delete(".new");
427
428
429 if (lastModified("") != null) {
430 delete(".backup");
431 rename("", ".backup");
432 }
433
434
435 return new FilterOutputStream(write(".new")) {
436
437 @Override
438 public void close() throws IOException {
439 super.close();
440 rename(".new", "");
441 }
442
443 };
444 }
445
446 private Long lastModifiedWithBackup() throws IOException {
447
448 Long lastModified = lastModified("");
449 if (lastModified == null) {
450 lastModified = lastModified(".backup");
451 }
452 return lastModified;
453 }
454
455 private static final class LocalDictionary<T extends Serializable> extends Dictionary<T> {
456
457 private final File file;
458
459 LocalDictionary(final Class<T> objectClass, final String url, final File file) {
460 super(objectClass, url);
461 this.file = file;
462 }
463
464 @Override
465 @Nullable
466 Long lastModified(final String suffix) throws IOException {
467 final long modifiedTime = applySuffix(suffix).lastModified();
468 return modifiedTime > 0 ? modifiedTime : null;
469 }
470
471 @Override
472 InputStream read(final String suffix) throws IOException {
473 return IO.read(applySuffix(suffix).getAbsolutePath());
474 }
475
476 @Override
477 OutputStream write(final String suffix) throws IOException {
478 return IO.write(applySuffix(suffix).getAbsolutePath());
479 }
480
481 @Override
482 void delete(final String suffix) throws IOException {
483 applySuffix(suffix).delete();
484 }
485
486 @Override
487 void rename(final String oldSuffix, final String newSuffix) throws IOException {
488 java.nio.file.Files.move(applySuffix(oldSuffix).toPath(), applySuffix(newSuffix)
489 .toPath());
490 }
491
492 private File applySuffix(final String suffix) {
493 return Strings.isNullOrEmpty(suffix) ? this.file : new File(
494 this.file.getAbsolutePath() + suffix);
495 }
496
497 }
498
499 private static final class HadoopDictionary<T extends Serializable> extends Dictionary<T> {
500
501 private final FileSystem fs;
502
503 private final Path path;
504
505 HadoopDictionary(final Class<T> objectClass, final String url, final FileSystem fs,
506 final Path path) {
507 super(objectClass, url);
508 this.fs = fs;
509 this.path = path;
510 }
511
512 @Override
513 @Nullable
514 Long lastModified(final String suffix) throws IOException {
515 final Path path = applySuffix(suffix);
516 try {
517 final FileStatus status = this.fs.getFileStatus(path);
518 if (status != null) {
519 return status.getModificationTime();
520 }
521
522 } catch (final IOException ex) {
523 if (this.fs.exists(path)) {
524 throw ex;
525 }
526 }
527 return null;
528 }
529
530 @Override
531 InputStream read(final String suffix) throws IOException {
532 return this.fs.open(applySuffix(suffix));
533 }
534
535 @Override
536 OutputStream write(final String suffix) throws IOException {
537 return this.fs.create(applySuffix(suffix));
538 }
539
540 @Override
541 void delete(final String suffix) throws IOException {
542 final Path path = applySuffix(suffix);
543 IOException exception = null;
544 try {
545 if (this.fs.delete(path, false)) {
546 return;
547 }
548 } catch (final IOException ex) {
549 exception = ex;
550 }
551 if (this.fs.exists(path)) {
552 throw exception != null ? exception : new IOException("Cannot delete " + path);
553 }
554 }
555
556 @Override
557 void rename(final String oldSuffix, final String newSuffix) throws IOException {
558 if (oldSuffix.equals(newSuffix)) {
559 return;
560 }
561 final Path from = applySuffix(oldSuffix);
562 final Path to = applySuffix(newSuffix);
563 final boolean renamed = this.fs.rename(from, to);
564 if (!renamed) {
565 String message = "Cannot rename " + from + " to " + to;
566 if (this.fs.exists(to)) {
567 message += ": destination already exists";
568 } else if (this.fs.exists(from)) {
569 message += ": source does not exist";
570 }
571 throw new IOException(message);
572 }
573 }
574
575 private Path applySuffix(final String suffix) {
576 return Strings.isNullOrEmpty(suffix) ? this.path : new Path(this.path.getParent()
577 + "/" + this.path.getName() + suffix);
578 }
579
580 }
581
582 }