1   package eu.fbk.knowledgestore.runtime;
2   
3   import java.io.File;
4   import java.io.IOException;
5   import java.net.URI;
6   import java.util.Map;
7   
8   import javax.annotation.Nullable;
9   
10  import org.apache.hadoop.fs.*;
11  import org.slf4j.LoggerFactory;
12  
13  /**
14   * Utility methods for dealing with files using the Hadoop API.
15   */
16  public final class Files {
17  
18      private static FileSystem rawLocalFileSystem = null;
19  
20      public static FileSystem getFileSystem(final String url, final Map<String, String> properties)
21              throws IOException {
22  
23          final URI uri = URI.create(url.replace('\\', '/'));
24  
25          org.apache.hadoop.conf.Configuration conf;
26          conf = new org.apache.hadoop.conf.Configuration(true);
27          conf.set("fs.default.name", uri.toString());
28          for (final Map.Entry<String, String> entry : properties.entrySet()) {
29              final String name = entry.getKey();
30              final String value = entry.getValue();
31              conf.set(name, value);
32          }
33  
34  		FileSystem fs;
35  
36  		if (url.startsWith("file://")) {
37  			fs = getRawLocalFileSystem();
38  		}
39  		else {
40  			fs = FileSystem.get(conf);
41  		}
42  
43          fs.setWorkingDirectory(new Path(uri.getPath()));
44          return fs;
45      }
46  
47      public static FileSystem getRawLocalFileSystem() {
48  
49          synchronized (Files.class) {
50              if (Files.rawLocalFileSystem == null) {
51                  LoggerFactory.getLogger(Files.class).debug(
52                          "You can safely ignore the following reported IOException - it's the way "
53                                  + "Hadoop people report where the Configuration is initialized )");
54                  final org.apache.hadoop.conf.Configuration conf;
55                  conf = new org.apache.hadoop.conf.Configuration(true);
56                  Files.rawLocalFileSystem = new RawLocalFileSystem();
57                  try {
58                      Files.rawLocalFileSystem.initialize(//
59                              new File(System.getProperty("user.dir")).toURI(), //
60                              conf);
61                  } catch (final IOException ex) {
62                      throw new Error("Failed to initialize local raw filesystem (!)", ex);
63                  }
64              }
65              return Files.rawLocalFileSystem;
66          }
67      }
68  
69      @Nullable
70      public static FSDataInputStream readWithBackup(final FileSystem fs, final Path path)
71              throws IOException {
72  
73          // we keep track of filesystem exceptions (but it's unclear when they are thrown)
74          IOException exception = null;
75  
76          try {
77              // 1. try to read the requested file
78              final FSDataInputStream result = fs.open(path);
79              if (result != null) {
80                  return result;
81              }
82          } catch (final IOException ex) {
83              exception = ex;
84          }
85  
86          final Path backupPath = new Path(path.getParent() + "/." + path.getName() + ".backup");
87          try {
88              // 2. on failure, try to read its backup
89              final FSDataInputStream result = fs.open(backupPath);
90              if (result != null) {
91                  return result;
92              }
93          } catch (final IOException ex) {
94              if (exception == null) {
95                  exception = ex;
96              }
97          }
98  
99          // 3. only on failure check whether the two files exist
100         final boolean fileExists = fs.exists(path);
101         final boolean backupExists = fs.exists(backupPath);
102 
103         // 4. if they don't exist it's ok, just report this returning null
104         if (!fileExists && !backupExists) {
105             return null;
106         }
107 
108         // 5. otherwise we throw an exception (possibly the ones got before)
109         if (exception == null) {
110             exception = new IOException("Cannot read " + (fileExists ? path : backupExists)
111                     + " (file reported to exist)");
112         }
113         throw exception;
114     }
115 
116     public static FSDataOutputStream writeWithBackup(final FileSystem fs, final Path path)
117             throws IOException {
118 
119         // compute paths of new and backup files
120         final Path newPath = new Path(path.getParent() + "/." + path.getName() + ".new");
121         final Path backupPath = new Path(path.getParent() + "/." + path.getName() + ".backup");
122 
123         // 1. delete filename.new if it exists
124         Files.delete(fs, newPath);
125 
126         // 2. if filename exists, rename it to filename.backup (deleting old backup)
127         if (fs.exists(path)) {
128             Files.delete(fs, backupPath);
129             Files.rename(fs, path, backupPath);
130         }
131 
132         // 3. create filename.new, returning a stream for writing its content
133         return new FSDataOutputStream(fs.create(newPath), null) {
134 
135             @Override
136             public void close() throws IOException {
137                 super.close();
138 
139                 // 4. rename filename.new to filename
140                 Files.rename(fs, newPath, path);
141             }
142 
143         };
144     }
145 
146     public static void delete(final FileSystem fs, final Path path) throws IOException {
147 
148         IOException exception = null;
149 
150         try {
151             if (fs.delete(path, false)) {
152                 return;
153             }
154         } catch (final IOException ex) {
155             exception = ex;
156         }
157 
158         if (fs.exists(path)) {
159             throw exception != null ? exception : new IOException("Cannot delete " + path);
160         }
161     }
162 
163     public static void rename(final FileSystem fs, final Path from, final Path to)
164             throws IOException {
165 
166         if (from.equals(to)) {
167             return;
168         }
169 
170         final boolean renamed = fs.rename(from, to);
171 
172         if (!renamed) {
173             String message = "Cannot rename " + from + " to " + to;
174             if (fs.exists(to)) {
175                 message += ": destination already exists";
176             } else if (fs.exists(from)) {
177                 message += ": source does not exist";
178             }
179             throw new IOException(message);
180         }
181     }
182 
183     @Nullable
184     public static FileStatus stat(final FileSystem fs, final Path path) throws IOException {
185 
186         try {
187             final FileStatus status = fs.getFileStatus(path);
188             if (status != null) {
189                 return status;
190             }
191 
192         } catch (final IOException ex) {
193             if (fs.exists(path)) {
194                 throw ex;
195             }
196         }
197 
198         return null;
199     }
200 
201     private Files() {
202     }
203 
204 }