1   package eu.fbk.knowledgestore.internal;
2   
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

import javax.annotation.Nullable;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.ByteStreams;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
31  
// WARNING: on Windows, if the Java application is killed without running its shutdown hooks,
// external processes launched from this class may not be terminated.
34  
35  public final class Compression {
36  
37      private static final Logger LOGGER = LoggerFactory.getLogger(Compression.class);
38  
39      private static final String FACTORY_CLASS = "org.apache.commons.compress.compressors."
40              + "CompressorStreamFactory";
41  
42      private static final String READ_METHOD = "createCompressorInputStream";
43  
44      private static final String WRITE_METHOD = "createCompressorOutputStream";
45  
46      private static final Map<String, String> CMD_MAP = Maps.newHashMap();
47  
48      public static final Compression NONE = new Compression("NONE", ImmutableList.<String>of(),
49              ImmutableList.<String>of(), null, null, null, null);
50  
51      public static final Compression GZIP = new Compression("GZIP", ImmutableList.of(
52              "application/gzip", "application/x-gzip"), ImmutableList.of("gz"), "%s -dc %s",
53              "%s -dc", "sh -c '%s -9c > %s'", "%s -9c");
54  
55      public static final Compression BZIP2 = new Compression("BZIP2", ImmutableList.of(
56              "application/bzip2", "application/x-bzip2"), ImmutableList.of("bz2"), "%s -dkc %s",
57              "%s -dc", "sh -c '%s -9c > %s'", "%s -9c");
58  
59      public static final Compression XZ = new Compression("XZ",
60              ImmutableList.of("application/x-xz"), ImmutableList.of("xz"), "%s -dkc %s", "%s -dc",
61              "sh -c '%s -9c > %s'", "%s -9c");
62  
63      private static Set<Compression> register = ImmutableSet.of(NONE, GZIP, BZIP2, XZ);
64  
65      private final String name;
66  
67      private final List<String> mimeTypes;
68  
69      private final List<String> fileExtensions;
70  
71      @Nullable
72      private final String readFileCmd;
73  
74      @Nullable
75      private final String readPipeCmd;
76  
77      @Nullable
78      private final String writeFileCmd;
79  
80      @Nullable
81      private final String writePipeCmd;
82  
83      public Compression(final String name, final Iterable<? extends String> mimeTypes,
84              final Iterable<? extends String> fileExtensions, @Nullable final String readFileCmd,
85              @Nullable final String readPipeCmd, @Nullable final String writeFileCmd,
86              @Nullable final String writePipeCmd) {
87          this.name = Preconditions.checkNotNull(name);
88          this.mimeTypes = ImmutableList.copyOf(mimeTypes);
89          this.fileExtensions = ImmutableList.copyOf(fileExtensions);
90          this.readFileCmd = readFileCmd;
91          this.readPipeCmd = readPipeCmd;
92          this.writeFileCmd = writeFileCmd;
93          this.writePipeCmd = writePipeCmd;
94      }
95  
96      /**
97       * Returns the name of this compression format.
98       * 
99       * @return a human readable name
100      */
101     public String getName() {
102         return this.name;
103     }
104 
105     /**
106      * Returns all the MIME types associated to this compression format, ranked in preference
107      * order.
108      * 
109      * @return an immutable list of MIME formats, not empty
110      */
111     public List<String> getMIMETypes() {
112         return this.mimeTypes;
113     }
114 
115     /**
116      * Returns all the file extensions associated to this compression format, ranked in preference
117      * order.
118      * 
119      * @return an immutable list of file extension, not empty
120      */
121     public List<String> getFileExtensions() {
122         return this.fileExtensions;
123     }
124 
125     public InputStream read(@Nullable final Executor executor, final File file) throws IOException {
126 
127         Preconditions.checkNotNull(file);
128         if (!file.exists()) {
129             throw new IllegalArgumentException("File " + file + " does not exist");
130         }
131 
132         if (this == NONE) {
133             return new BufferedInputStream(new FileInputStream(file));
134         }
135 
136         if (this.readFileCmd != null && executor != null) {
137             final String command = String.format(this.readFileCmd,
138                     quote(lookupProgram(this.name)), quote(file.getAbsolutePath()));
139             try {
140                 final Process process = Runtime.getRuntime().exec(tokenize(command));
141                 logStream(executor, this.name, process.getErrorStream(), true);
142                 return wrap(process.getInputStream(), process);
143             } catch (final Throwable ex) {
144                 invalidateProgram(this.name);
145                 LOGGER.debug("Failed to run: ", command);
146             }
147         }
148 
149         InputStream stream = null;
150         try {
151             final Class<?> clazz = Class.forName(FACTORY_CLASS);
152             final Method method = clazz.getMethod(READ_METHOD, String.class, InputStream.class);
153             final Object factory = clazz.newInstance();
154             stream = new FileInputStream(file);
155             return (InputStream) method.invoke(factory, this.name, stream);
156         } catch (final Throwable ex) {
157             Util.closeQuietly(stream);
158             if (ex instanceof IOException) {
159                 throw (IOException) ex;
160             }
161         }
162 
163         throw new IllegalArgumentException("Cannot decompress " + this + " file " + file);
164     }
165 
166     public InputStream read(@Nullable final Executor executor, final InputStream stream)
167             throws IOException {
168 
169         Preconditions.checkNotNull(stream);
170 
171         if (this == NONE) {
172             return stream;
173         }
174 
175         if (this.readPipeCmd != null && executor != null) {
176             final String command = String
177                     .format(this.readPipeCmd, quote(lookupProgram(this.name)));
178             try {
179                 final Process process = Runtime.getRuntime().exec(tokenize(command));
180                 copyStream(executor, stream, process.getOutputStream());
181                 logStream(executor, this.name, process.getErrorStream(), true);
182                 return wrap(process.getInputStream(), process);
183             } catch (final Throwable ex) {
184                 invalidateProgram(this.name);
185                 LOGGER.debug("Failed to run: ", command);
186             }
187         }
188 
189         try {
190             final Class<?> clazz = Class.forName(FACTORY_CLASS);
191             final Method method = clazz.getMethod(READ_METHOD, String.class, InputStream.class);
192             final Object factory = clazz.newInstance();
193             return (InputStream) method.invoke(factory, this.name, stream);
194         } catch (final Throwable ex) {
195             if (ex instanceof IOException) {
196                 throw (IOException) ex;
197             }
198         }
199 
200         throw new IllegalArgumentException("Cannot decompress " + this + " stream");
201     }
202 
203     public OutputStream write(@Nullable final Executor executor, final File file)
204             throws IOException {
205 
206         Preconditions.checkNotNull(file);
207 
208         if (this == NONE) {
209             return new BufferedOutputStream(new FileOutputStream(file));
210         }
211 
212         if (this.writeFileCmd != null && executor != null) {
213             final String command = String.format(this.writeFileCmd,
214                     quote(lookupProgram(this.name)), quote(file.getAbsolutePath()));
215             try {
216                 final Process process = Runtime.getRuntime().exec(tokenize(command));
217                 logStream(executor, this.name, process.getInputStream(), false);
218                 logStream(executor, this.name, process.getErrorStream(), true);
219                 return wrap(process.getOutputStream(), process);
220             } catch (final Throwable ex) {
221                 invalidateProgram(this.name);
222                 LOGGER.debug("Failed to run: ", command);
223             }
224         }
225 
226         OutputStream stream = null;
227         try {
228             final Class<?> clazz = Class.forName(FACTORY_CLASS);
229             final Method method = clazz.getMethod(WRITE_METHOD, String.class, OutputStream.class);
230             final Object factory = clazz.newInstance();
231             stream = new FileOutputStream(file);
232             return (OutputStream) method.invoke(factory, this.name, stream);
233         } catch (final Throwable ex) {
234             Util.closeQuietly(stream);
235             if (ex instanceof IOException) {
236                 throw (IOException) ex;
237             }
238         }
239 
240         throw new IllegalArgumentException("Cannot compress " + this + " file " + file);
241     }
242 
243     public OutputStream write(@Nullable final Executor executor, final OutputStream stream)
244             throws IOException {
245 
246         Preconditions.checkNotNull(stream);
247 
248         if (this == NONE) {
249             return stream;
250         }
251 
252         if (this.writePipeCmd != null && executor != null) {
253             final String command = String.format(this.writePipeCmd,
254                     quote(lookupProgram(this.name)));
255             try {
256                 final Process process = Runtime.getRuntime().exec(tokenize(command));
257                 copyStream(executor, process.getInputStream(), stream);
258                 logStream(executor, this.name, process.getErrorStream(), true);
259                 return wrap(process.getOutputStream(), process);
260             } catch (final Throwable ex) {
261                 invalidateProgram(this.name);
262                 LOGGER.debug("Failed to run: ", command);
263             }
264         }
265 
266         try {
267             final Class<?> clazz = Class.forName(FACTORY_CLASS);
268             final Method method = clazz.getMethod(WRITE_METHOD, String.class, OutputStream.class);
269             final Object factory = clazz.newInstance();
270             return (OutputStream) method.invoke(factory, this.name, stream);
271         } catch (final Throwable ex) {
272             if (ex instanceof IOException) {
273                 throw (IOException) ex;
274             }
275         }
276 
277         throw new IllegalArgumentException("Cannot compress " + this + " stream");
278     }
279 
280     @Override
281     public boolean equals(final Object object) {
282         if (object == this) {
283             return true;
284         }
285         if (!(object instanceof Compression)) {
286             return false;
287         }
288         final Compression other = (Compression) object;
289         return this.name.equals(other.name);
290     }
291 
292     @Override
293     public int hashCode() {
294         return this.name.hashCode();
295     }
296 
297     @Override
298     public String toString() {
299         final StringBuilder builder = new StringBuilder(64);
300         builder.append(this.name);
301         builder.append(" (mimeTypes=");
302         Joiner.on(", ").appendTo(builder, this.mimeTypes);
303         builder.append("; ext=");
304         Joiner.on(", ").appendTo(builder, this.fileExtensions);
305         builder.append(")");
306         return builder.toString();
307     }
308 
309     /**
310      * Returns the registered {@code Compression} format matching the extension in the file name
311      * supplied, or the {@code fallback} specified if none matches.
312      * 
313      * @param fileName
314      *            the file name, not null
315      * @param fallback
316      *            the {@code Compression} to return if none matches
317      * @return the matching {@code Compression}, or the {@code fallback} one if none of the
318      *         registered {@code Compression}s matches
319      */
320     @Nullable
321     public static Compression forFileName(final String fileName,
322             @Nullable final Compression fallback) {
323         final int index = fileName.lastIndexOf('.');
324         final String extension = (index < 0 ? fileName : fileName.substring(index + 1))
325                 .toLowerCase();
326         for (final Compression compression : register) {
327             final List<String> extensions = compression.fileExtensions;
328             if (!extensions.isEmpty() && extensions.get(0).equals(extension)) {
329                 return compression;
330             }
331         }
332         for (final Compression compression : register) {
333             if (compression.fileExtensions.contains(extension)) {
334                 return compression;
335             }
336         }
337         return fallback;
338     }
339 
340     /**
341      * Returns the registered {@code Compression} format matching the MIME type specified, or the
342      * {@code fallback} specified if none matches.
343      * 
344      * @param mimeType
345      *            the mime type, not null
346      * @param fallback
347      *            the {@code Compression} to return if none matches
348      * @return the matching {@code Compression}, or the {@code fallback} one if none of the
349      *         registered {@code Compression}s matches
350      */
351     @Nullable
352     public static Compression forMIMEType(final String mimeType,
353             @Nullable final Compression fallback) {
354         final String actualMimeType = mimeType.toLowerCase();
355         for (final Compression compression : register) {
356             final List<String> mimeTypes = compression.mimeTypes;
357             if (!mimeTypes.isEmpty() && mimeTypes.get(0).equals(actualMimeType)) {
358                 return compression;
359             }
360         }
361         for (final Compression compression : register) {
362             if (compression.mimeTypes.contains(actualMimeType)) {
363                 return compression;
364             }
365         }
366         return fallback;
367     }
368 
369     /**
370      * Returns the {@code Compression} with the name specified. Matching is case-insensitive.
371      * 
372      * @param name
373      *            the {@code Compression} name.
374      * @return the {@code Compression} for the name specified, or null if none matches
375      */
376     @Nullable
377     public static Compression valueOf(final String name) {
378         final String actualName = name.trim().toUpperCase();
379         for (final Compression compression : register) {
380             if (compression.name.equals(actualName)) {
381                 return compression;
382             }
383         }
384         Preconditions.checkNotNull(name);
385         return null;
386     }
387 
388     public static Set<Compression> values() {
389         return register;
390     }
391 
392     public synchronized static void register(final Compression compression) {
393         Preconditions.checkNotNull(compression);
394         final List<Compression> newRegister = Lists.newArrayList(register);
395         newRegister.add(compression);
396         register = ImmutableSet.copyOf(newRegister);
397     }
398 
399     private static String lookupProgram(final String name) {
400         synchronized (CMD_MAP) {
401             String cmd = CMD_MAP.get(name);
402             if (cmd == null) {
403                 cmd = System.getenv(name.toUpperCase() + "_CMD");
404                 if (cmd != null) {
405                     LOGGER.info("Using '{}' for '{}'", cmd, name);
406                 } else {
407                     cmd = name;
408                 }
409                 CMD_MAP.put(name, cmd);
410             }
411             return cmd;
412         }
413     }
414 
415     private static void invalidateProgram(final String name) {
416         synchronized (CMD_MAP) {
417             CMD_MAP.put(name, null);
418         }
419     }
420 
421     private static String[] tokenize(final String command) {
422         final List<String> tokens = Lists.newArrayList();
423         final int length = command.length();
424         boolean escape = false;
425         char quote = 0;
426         int start = -1;
427         for (int i = 0; i < length; ++i) {
428             final char ch = command.charAt(i);
429             if (escape) {
430                 escape = false;
431             } else if (ch == '\\') {
432                 escape = true;
433             } else if (quote != 0) {
434                 if (ch == quote) {
435                     tokens.add(command.substring(start, i));
436                     start = -1;
437                     quote = 0;
438                 }
439             } else if (start == -1) {
440                 if (ch == '\'' || ch == '"') {
441                     start = i + 1;
442                     quote = ch;
443                 } else if (!Character.isWhitespace(ch)) {
444                     start = i;
445                 }
446             } else if (Character.isWhitespace(ch)) {
447                 tokens.add(command.substring(start, i));
448                 start = -1;
449             }
450         }
451         if (quote == 0 && start >= 0) {
452             tokens.add(command.substring(start));
453         }
454         return tokens.toArray(new String[tokens.size()]);
455     }
456 
457     private static String quote(final String string) {
458         return '"' + string + '"';
459     }
460 
461     private static void copyStream(final Executor executor, final InputStream in,
462             final OutputStream out) {
463         executor.execute(new Runnable() {
464 
465             @Override
466             public void run() {
467                 try {
468                     ByteStreams.copy(in, out);
469                 } catch (final IOException ex) {
470                     LOGGER.error("Stream copy failed", ex);
471                 }
472             }
473 
474         });
475     }
476 
477     private static void logStream(final Executor executor, final String name,
478             final InputStream stream, final boolean error) {
479         executor.execute(new Runnable() {
480 
481             @Override
482             public void run() {
483                 try {
484                     final BufferedReader in = new BufferedReader(new InputStreamReader(stream));
485                     String line;
486                     while ((line = in.readLine()) != null) {
487                         final String message = "[" + name + "] " + line;
488                         if (error) {
489                             LOGGER.error(message);
490                         } else {
491                             LOGGER.debug(message);
492                         }
493                     }
494                 } catch (final Throwable ex) {
495                     // ignore
496                 }
497             }
498 
499         });
500     }
501 
502     private static InputStream wrap(final InputStream stream, final Process process) {
503         final Thread destroyHook = new DestroyHook(process);
504         Runtime.getRuntime().addShutdownHook(destroyHook);
505         return new BufferedInputStream(stream) {
506 
507             @Override
508             public void close() throws IOException {
509                 try {
510                     super.close();
511                 } finally {
512                     process.destroy();
513                     Runtime.getRuntime().removeShutdownHook(destroyHook);
514                 }
515             }
516 
517         };
518     }
519 
520     private static OutputStream wrap(final OutputStream stream, final Process process) {
521         final Thread destroyHook = new DestroyHook(process);
522         Runtime.getRuntime().addShutdownHook(destroyHook);
523         return new BufferedOutputStream(stream) {
524 
525             @Override
526             public void close() throws IOException {
527                 try {
528                     super.close();
529                 } finally {
530                     process.destroy();
531                     Runtime.getRuntime().removeShutdownHook(destroyHook);
532                 }
533             }
534 
535         };
536     }
537 
538     private static class DestroyHook extends Thread {
539 
540         private final Process process;
541 
542         DestroyHook(final Process process) {
543             this.process = Preconditions.checkNotNull(process);
544         }
545 
546         @Override
547         public void run() {
548             this.process.destroy();
549         }
550 
551     }
552 
553 }