1 package eu.fbk.knowledgestore.filestore;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.io.OutputStream;
6
7 import javax.annotation.Nullable;
8
9 import com.google.common.base.MoreObjects;
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Throwables;
12 import com.google.common.collect.AbstractIterator;
13
14 import org.apache.hadoop.fs.FileStatus;
15 import org.apache.hadoop.fs.FileSystem;
16 import org.apache.hadoop.fs.Path;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19
20 import eu.fbk.knowledgestore.data.Data;
21 import eu.fbk.knowledgestore.data.Stream;
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38 public class HadoopFileStore implements FileStore {
39
40 private static final Logger LOGGER = LoggerFactory.getLogger(HadoopFileStore.class);
41
42 private static final String DEFAULT_PATH = "files";
43
44 private final FileSystem fileSystem;
45
46 private final Path rootPath;
47
48
49
50
51
52
53
54
55
56
57
58 public HadoopFileStore(final FileSystem fileSystem, @Nullable final String path) {
59 this.fileSystem = Preconditions.checkNotNull(fileSystem);
60 this.rootPath = new Path(MoreObjects.firstNonNull(path, DEFAULT_PATH))
61 .makeQualified(this.fileSystem);
62 LOGGER.info("{} configured, path={}", getClass().getSimpleName(), this.rootPath);
63 }
64
65 @Override
66 public void init() throws IOException {
67 if (!this.fileSystem.exists(this.rootPath)) {
68 this.fileSystem.mkdirs(this.rootPath);
69 }
70 }
71
72 @Override
73 public InputStream read(final String fileName) throws FileMissingException, IOException {
74 final Path path = getFullPath(fileName);
75 try {
76 final InputStream stream = this.fileSystem.open(path);
77 if (LOGGER.isDebugEnabled()) {
78 LOGGER.debug("Reading file " + getRelativePath(path));
79 }
80 return stream;
81 } catch (final IOException ex) {
82 if (!this.fileSystem.exists(path)) {
83 throw new FileMissingException(fileName, "Cannot read non-existing file");
84 }
85 throw ex;
86 }
87 }
88
89 @Override
90 public OutputStream write(final String fileName) throws FileExistsException, IOException {
91 final Path path = getFullPath(fileName);
92 try {
93 final OutputStream stream = this.fileSystem.create(path, false);
94 if (LOGGER.isDebugEnabled()) {
95 LOGGER.debug("Creating file " + getRelativePath(path));
96 }
97 return stream;
98 } catch (final IOException ex) {
99 if (this.fileSystem.exists(path)) {
100 throw new FileExistsException(fileName, "Cannot overwrite file");
101 }
102 throw ex;
103 }
104 }
105
106 @Override
107 public void delete(final String fileName) throws FileMissingException, IOException {
108 final Path path = getFullPath(fileName);
109 boolean deleted = false;
110 try {
111 deleted = this.fileSystem.delete(path, false);
112 if (deleted) {
113 final Path parent = path.getParent();
114 if (this.fileSystem.listStatus(parent).length == 0) {
115 this.fileSystem.delete(parent, false);
116 }
117 }
118
119 } finally {
120 if (!deleted && !this.fileSystem.exists(path)) {
121 throw new FileMissingException(fileName, "Cannot delete non-existing file.");
122 }
123 }
124 if (LOGGER.isDebugEnabled()) {
125 LOGGER.debug("Deleted file " + getRelativePath(path));
126 }
127 }
128
129 @Override
130 public Stream<String> list() throws IOException {
131 return Stream.create(new HadoopIterator());
132 }
133
134 @Override
135 public void close() {
136
137 }
138
139 @Override
140 public String toString() {
141 return getClass().getSimpleName();
142 }
143
144 private Path getFullPath(final String fileName) {
145 final String typeDirectory = MoreObjects.firstNonNull(Data.extensionToMimeType(fileName),
146 "application/octet-stream").replace('/', '_');
147 final String bucketDirectory = Data.hash(fileName).substring(0, 2);
148 return new Path(this.rootPath, typeDirectory + "/" + bucketDirectory + "/" + fileName);
149 }
150
151 private String getRelativePath(final Path path) {
152 return path.toString().substring(this.rootPath.toString().length());
153 }
154
155 private class HadoopIterator extends AbstractIterator<String> {
156
157 private final FileStatus[] typeDirectories;
158
159 private FileStatus[] bucketDirectories;
160
161 private FileStatus[] files;
162
163 private int typeIndex;
164
165 private int bucketIndex;
166
167 private int fileIndex;
168
169 HadoopIterator() throws IOException {
170 this.typeDirectories = HadoopFileStore.this.fileSystem
171 .listStatus(HadoopFileStore.this.rootPath);
172 this.bucketDirectories = new FileStatus[] {};
173 this.files = new FileStatus[] {};
174 }
175
176 @Override
177 protected String computeNext() {
178 try {
179 while (true) {
180 if (this.fileIndex < this.files.length) {
181 final FileStatus file = this.files[this.fileIndex++];
182 if (!file.isDir()) {
183 return file.getPath().getName();
184 }
185 } else if (this.bucketIndex < this.bucketDirectories.length) {
186 final FileStatus bucketDirectory;
187 bucketDirectory = this.bucketDirectories[this.bucketIndex++];
188 if (bucketDirectory.isDir()) {
189 this.files = HadoopFileStore.this.fileSystem
190 .listStatus(bucketDirectory.getPath());
191 this.fileIndex = 0;
192 }
193 } else if (this.typeIndex < this.typeDirectories.length) {
194 final FileStatus typeDirectory;
195 typeDirectory = this.typeDirectories[this.typeIndex++];
196 if (typeDirectory.isDir()) {
197 this.bucketDirectories = HadoopFileStore.this.fileSystem
198 .listStatus(typeDirectory.getPath());
199 this.bucketIndex = 0;
200 }
201 } else {
202 return endOfData();
203 }
204 }
205 } catch (final Throwable ex) {
206 throw Throwables.propagate(ex);
207 }
208 }
209
210 }
211
212 }