1 package eu.fbk.knowledgestore.internal;
2
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;

import javax.annotation.Nullable;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.ByteStreams;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
31
32
33
34
35 public final class Compression {
36
37 private static final Logger LOGGER = LoggerFactory.getLogger(Compression.class);
38
39 private static final String FACTORY_CLASS = "org.apache.commons.compress.compressors."
40 + "CompressorStreamFactory";
41
42 private static final String READ_METHOD = "createCompressorInputStream";
43
44 private static final String WRITE_METHOD = "createCompressorOutputStream";
45
46 private static final Map<String, String> CMD_MAP = Maps.newHashMap();
47
48 public static final Compression NONE = new Compression("NONE", ImmutableList.<String>of(),
49 ImmutableList.<String>of(), null, null, null, null);
50
51 public static final Compression GZIP = new Compression("GZIP", ImmutableList.of(
52 "application/gzip", "application/x-gzip"), ImmutableList.of("gz"), "%s -dc %s",
53 "%s -dc", "sh -c '%s -9c > %s'", "%s -9c");
54
55 public static final Compression BZIP2 = new Compression("BZIP2", ImmutableList.of(
56 "application/bzip2", "application/x-bzip2"), ImmutableList.of("bz2"), "%s -dkc %s",
57 "%s -dc", "sh -c '%s -9c > %s'", "%s -9c");
58
59 public static final Compression XZ = new Compression("XZ",
60 ImmutableList.of("application/x-xz"), ImmutableList.of("xz"), "%s -dkc %s", "%s -dc",
61 "sh -c '%s -9c > %s'", "%s -9c");
62
63 private static Set<Compression> register = ImmutableSet.of(NONE, GZIP, BZIP2, XZ);
64
65 private final String name;
66
67 private final List<String> mimeTypes;
68
69 private final List<String> fileExtensions;
70
71 @Nullable
72 private final String readFileCmd;
73
74 @Nullable
75 private final String readPipeCmd;
76
77 @Nullable
78 private final String writeFileCmd;
79
80 @Nullable
81 private final String writePipeCmd;
82
/**
 * Creates a compression descriptor. MIME types and file extensions are copied into immutable
 * lists; the four command templates are stored as supplied and may be null.
 *
 * @param name the compression name, not null
 * @param mimeTypes the MIME types associated with the compression
 * @param fileExtensions the file extensions associated with the compression
 * @param readFileCmd template of the command decompressing a file, formatted with the
 *        quoted program and the quoted file path; null if unavailable
 * @param readPipeCmd template of the command decompressing its standard input, formatted
 *        with the quoted program; null if unavailable
 * @param writeFileCmd template of the command compressing its standard input to a file,
 *        formatted with the quoted program and the quoted file path; null if unavailable
 * @param writePipeCmd template of the command compressing its standard input to its standard
 *        output, formatted with the quoted program; null if unavailable
 */
public Compression(final String name, final Iterable<? extends String> mimeTypes,
        final Iterable<? extends String> fileExtensions, @Nullable final String readFileCmd,
        @Nullable final String readPipeCmd, @Nullable final String writeFileCmd,
        @Nullable final String writePipeCmd) {
    Preconditions.checkNotNull(name);
    this.name = name;
    this.mimeTypes = ImmutableList.copyOf(mimeTypes);
    this.fileExtensions = ImmutableList.copyOf(fileExtensions);
    this.readFileCmd = readFileCmd;
    this.readPipeCmd = readPipeCmd;
    this.writeFileCmd = writeFileCmd;
    this.writePipeCmd = writePipeCmd;
}
95
96
97
98
99
100
101 public String getName() {
102 return this.name;
103 }
104
105
106
107
108
109
110
111 public List<String> getMIMETypes() {
112 return this.mimeTypes;
113 }
114
115
116
117
118
119
120
121 public List<String> getFileExtensions() {
122 return this.fileExtensions;
123 }
124
125 public InputStream read(@Nullable final Executor executor, final File file) throws IOException {
126
127 Preconditions.checkNotNull(file);
128 if (!file.exists()) {
129 throw new IllegalArgumentException("File " + file + " does not exist");
130 }
131
132 if (this == NONE) {
133 return new BufferedInputStream(new FileInputStream(file));
134 }
135
136 if (this.readFileCmd != null && executor != null) {
137 final String command = String.format(this.readFileCmd,
138 quote(lookupProgram(this.name)), quote(file.getAbsolutePath()));
139 try {
140 final Process process = Runtime.getRuntime().exec(tokenize(command));
141 logStream(executor, this.name, process.getErrorStream(), true);
142 return wrap(process.getInputStream(), process);
143 } catch (final Throwable ex) {
144 invalidateProgram(this.name);
145 LOGGER.debug("Failed to run: ", command);
146 }
147 }
148
149 InputStream stream = null;
150 try {
151 final Class<?> clazz = Class.forName(FACTORY_CLASS);
152 final Method method = clazz.getMethod(READ_METHOD, String.class, InputStream.class);
153 final Object factory = clazz.newInstance();
154 stream = new FileInputStream(file);
155 return (InputStream) method.invoke(factory, this.name, stream);
156 } catch (final Throwable ex) {
157 Util.closeQuietly(stream);
158 if (ex instanceof IOException) {
159 throw (IOException) ex;
160 }
161 }
162
163 throw new IllegalArgumentException("Cannot decompress " + this + " file " + file);
164 }
165
166 public InputStream read(@Nullable final Executor executor, final InputStream stream)
167 throws IOException {
168
169 Preconditions.checkNotNull(stream);
170
171 if (this == NONE) {
172 return stream;
173 }
174
175 if (this.readPipeCmd != null && executor != null) {
176 final String command = String
177 .format(this.readPipeCmd, quote(lookupProgram(this.name)));
178 try {
179 final Process process = Runtime.getRuntime().exec(tokenize(command));
180 copyStream(executor, stream, process.getOutputStream());
181 logStream(executor, this.name, process.getErrorStream(), true);
182 return wrap(process.getInputStream(), process);
183 } catch (final Throwable ex) {
184 invalidateProgram(this.name);
185 LOGGER.debug("Failed to run: ", command);
186 }
187 }
188
189 try {
190 final Class<?> clazz = Class.forName(FACTORY_CLASS);
191 final Method method = clazz.getMethod(READ_METHOD, String.class, InputStream.class);
192 final Object factory = clazz.newInstance();
193 return (InputStream) method.invoke(factory, this.name, stream);
194 } catch (final Throwable ex) {
195 if (ex instanceof IOException) {
196 throw (IOException) ex;
197 }
198 }
199
200 throw new IllegalArgumentException("Cannot decompress " + this + " stream");
201 }
202
203 public OutputStream write(@Nullable final Executor executor, final File file)
204 throws IOException {
205
206 Preconditions.checkNotNull(file);
207
208 if (this == NONE) {
209 return new BufferedOutputStream(new FileOutputStream(file));
210 }
211
212 if (this.writeFileCmd != null && executor != null) {
213 final String command = String.format(this.writeFileCmd,
214 quote(lookupProgram(this.name)), quote(file.getAbsolutePath()));
215 try {
216 final Process process = Runtime.getRuntime().exec(tokenize(command));
217 logStream(executor, this.name, process.getInputStream(), false);
218 logStream(executor, this.name, process.getErrorStream(), true);
219 return wrap(process.getOutputStream(), process);
220 } catch (final Throwable ex) {
221 invalidateProgram(this.name);
222 LOGGER.debug("Failed to run: ", command);
223 }
224 }
225
226 OutputStream stream = null;
227 try {
228 final Class<?> clazz = Class.forName(FACTORY_CLASS);
229 final Method method = clazz.getMethod(WRITE_METHOD, String.class, OutputStream.class);
230 final Object factory = clazz.newInstance();
231 stream = new FileOutputStream(file);
232 return (OutputStream) method.invoke(factory, this.name, stream);
233 } catch (final Throwable ex) {
234 Util.closeQuietly(stream);
235 if (ex instanceof IOException) {
236 throw (IOException) ex;
237 }
238 }
239
240 throw new IllegalArgumentException("Cannot compress " + this + " file " + file);
241 }
242
243 public OutputStream write(@Nullable final Executor executor, final OutputStream stream)
244 throws IOException {
245
246 Preconditions.checkNotNull(stream);
247
248 if (this == NONE) {
249 return stream;
250 }
251
252 if (this.writePipeCmd != null && executor != null) {
253 final String command = String.format(this.writePipeCmd,
254 quote(lookupProgram(this.name)));
255 try {
256 final Process process = Runtime.getRuntime().exec(tokenize(command));
257 copyStream(executor, process.getInputStream(), stream);
258 logStream(executor, this.name, process.getErrorStream(), true);
259 return wrap(process.getOutputStream(), process);
260 } catch (final Throwable ex) {
261 invalidateProgram(this.name);
262 LOGGER.debug("Failed to run: ", command);
263 }
264 }
265
266 try {
267 final Class<?> clazz = Class.forName(FACTORY_CLASS);
268 final Method method = clazz.getMethod(WRITE_METHOD, String.class, OutputStream.class);
269 final Object factory = clazz.newInstance();
270 return (OutputStream) method.invoke(factory, this.name, stream);
271 } catch (final Throwable ex) {
272 if (ex instanceof IOException) {
273 throw (IOException) ex;
274 }
275 }
276
277 throw new IllegalArgumentException("Cannot compress " + this + " stream");
278 }
279
280 @Override
281 public boolean equals(final Object object) {
282 if (object == this) {
283 return true;
284 }
285 if (!(object instanceof Compression)) {
286 return false;
287 }
288 final Compression other = (Compression) object;
289 return this.name.equals(other.name);
290 }
291
292 @Override
293 public int hashCode() {
294 return this.name.hashCode();
295 }
296
297 @Override
298 public String toString() {
299 final StringBuilder builder = new StringBuilder(64);
300 builder.append(this.name);
301 builder.append(" (mimeTypes=");
302 Joiner.on(", ").appendTo(builder, this.mimeTypes);
303 builder.append("; ext=");
304 Joiner.on(", ").appendTo(builder, this.fileExtensions);
305 builder.append(")");
306 return builder.toString();
307 }
308
309
310
311
312
313
314
315
316
317
318
319
320 @Nullable
321 public static Compression forFileName(final String fileName,
322 @Nullable final Compression fallback) {
323 final int index = fileName.lastIndexOf('.');
324 final String extension = (index < 0 ? fileName : fileName.substring(index + 1))
325 .toLowerCase();
326 for (final Compression compression : register) {
327 final List<String> extensions = compression.fileExtensions;
328 if (!extensions.isEmpty() && extensions.get(0).equals(extension)) {
329 return compression;
330 }
331 }
332 for (final Compression compression : register) {
333 if (compression.fileExtensions.contains(extension)) {
334 return compression;
335 }
336 }
337 return fallback;
338 }
339
340
341
342
343
344
345
346
347
348
349
350
351 @Nullable
352 public static Compression forMIMEType(final String mimeType,
353 @Nullable final Compression fallback) {
354 final String actualMimeType = mimeType.toLowerCase();
355 for (final Compression compression : register) {
356 final List<String> mimeTypes = compression.mimeTypes;
357 if (!mimeTypes.isEmpty() && mimeTypes.get(0).equals(actualMimeType)) {
358 return compression;
359 }
360 }
361 for (final Compression compression : register) {
362 if (compression.mimeTypes.contains(actualMimeType)) {
363 return compression;
364 }
365 }
366 return fallback;
367 }
368
369
370
371
372
373
374
375
376 @Nullable
377 public static Compression valueOf(final String name) {
378 final String actualName = name.trim().toUpperCase();
379 for (final Compression compression : register) {
380 if (compression.name.equals(actualName)) {
381 return compression;
382 }
383 }
384 Preconditions.checkNotNull(name);
385 return null;
386 }
387
388 public static Set<Compression> values() {
389 return register;
390 }
391
392 public synchronized static void register(final Compression compression) {
393 Preconditions.checkNotNull(compression);
394 final List<Compression> newRegister = Lists.newArrayList(register);
395 newRegister.add(compression);
396 register = ImmutableSet.copyOf(newRegister);
397 }
398
399 private static String lookupProgram(final String name) {
400 synchronized (CMD_MAP) {
401 String cmd = CMD_MAP.get(name);
402 if (cmd == null) {
403 cmd = System.getenv(name.toUpperCase() + "_CMD");
404 if (cmd != null) {
405 LOGGER.info("Using '{}' for '{}'", cmd, name);
406 } else {
407 cmd = name;
408 }
409 CMD_MAP.put(name, cmd);
410 }
411 return cmd;
412 }
413 }
414
415 private static void invalidateProgram(final String name) {
416 synchronized (CMD_MAP) {
417 CMD_MAP.put(name, null);
418 }
419 }
420
421 private static String[] tokenize(final String command) {
422 final List<String> tokens = Lists.newArrayList();
423 final int length = command.length();
424 boolean escape = false;
425 char quote = 0;
426 int start = -1;
427 for (int i = 0; i < length; ++i) {
428 final char ch = command.charAt(i);
429 if (escape) {
430 escape = false;
431 } else if (ch == '\\') {
432 escape = true;
433 } else if (quote != 0) {
434 if (ch == quote) {
435 tokens.add(command.substring(start, i));
436 start = -1;
437 quote = 0;
438 }
439 } else if (start == -1) {
440 if (ch == '\'' || ch == '"') {
441 start = i + 1;
442 quote = ch;
443 } else if (!Character.isWhitespace(ch)) {
444 start = i;
445 }
446 } else if (Character.isWhitespace(ch)) {
447 tokens.add(command.substring(start, i));
448 start = -1;
449 }
450 }
451 if (quote == 0 && start >= 0) {
452 tokens.add(command.substring(start));
453 }
454 return tokens.toArray(new String[tokens.size()]);
455 }
456
457 private static String quote(final String string) {
458 return '"' + string + '"';
459 }
460
461 private static void copyStream(final Executor executor, final InputStream in,
462 final OutputStream out) {
463 executor.execute(new Runnable() {
464
465 @Override
466 public void run() {
467 try {
468 ByteStreams.copy(in, out);
469 } catch (final IOException ex) {
470 LOGGER.error("Stream copy failed", ex);
471 }
472 }
473
474 });
475 }
476
477 private static void logStream(final Executor executor, final String name,
478 final InputStream stream, final boolean error) {
479 executor.execute(new Runnable() {
480
481 @Override
482 public void run() {
483 try {
484 final BufferedReader in = new BufferedReader(new InputStreamReader(stream));
485 String line;
486 while ((line = in.readLine()) != null) {
487 final String message = "[" + name + "] " + line;
488 if (error) {
489 LOGGER.error(message);
490 } else {
491 LOGGER.debug(message);
492 }
493 }
494 } catch (final Throwable ex) {
495
496 }
497 }
498
499 });
500 }
501
502 private static InputStream wrap(final InputStream stream, final Process process) {
503 final Thread destroyHook = new DestroyHook(process);
504 Runtime.getRuntime().addShutdownHook(destroyHook);
505 return new BufferedInputStream(stream) {
506
507 @Override
508 public void close() throws IOException {
509 try {
510 super.close();
511 } finally {
512 process.destroy();
513 Runtime.getRuntime().removeShutdownHook(destroyHook);
514 }
515 }
516
517 };
518 }
519
520 private static OutputStream wrap(final OutputStream stream, final Process process) {
521 final Thread destroyHook = new DestroyHook(process);
522 Runtime.getRuntime().addShutdownHook(destroyHook);
523 return new BufferedOutputStream(stream) {
524
525 @Override
526 public void close() throws IOException {
527 try {
528 super.close();
529 } finally {
530 process.destroy();
531 Runtime.getRuntime().removeShutdownHook(destroyHook);
532 }
533 }
534
535 };
536 }
537
538 private static class DestroyHook extends Thread {
539
540 private final Process process;
541
542 DestroyHook(final Process process) {
543 this.process = Preconditions.checkNotNull(process);
544 }
545
546 @Override
547 public void run() {
548 this.process.destroy();
549 }
550
551 }
552
553 }