1 package eu.fbk.knowledgestore.filestore;
2
3 import java.io.IOException;
4 import java.io.InputStream;
5 import java.io.OutputStream;
6 import java.util.List;
7 import java.util.Set;
8 import java.util.concurrent.atomic.AtomicInteger;
9
10 import com.google.common.base.Preconditions;
11 import com.google.common.base.Throwables;
12 import com.google.common.collect.Lists;
13 import com.google.common.collect.Sets;
14
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
17
18 import eu.fbk.knowledgestore.data.Stream;
19 import eu.fbk.knowledgestore.runtime.Component;
20 import eu.fbk.knowledgestore.runtime.Synchronizer;
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39 public final class SynchronizedFileStore extends ForwardingFileStore {
40
41 private static final Logger LOGGER = LoggerFactory.getLogger(SynchronizedFileStore.class);
42
43 private static final int NUM_LOCKS = 255;
44
45 private static final int NEW = 0;
46
47 private static final int INITIALIZED = 1;
48
49 private static final int CLOSED = 2;
50
51 private final FileStore delegate;
52
53 private final Synchronizer synchronizer;
54
55 private final AtomicInteger state;
56
57 private final Object[] fileLocks;
58
59 private final Set<Stream<String>> pendingListStreams;
60
61
62
63
64
65
66
67
68
69
70 public SynchronizedFileStore(final FileStore delegate, final String synchronizerSpec) {
71 this.delegate = Preconditions.checkNotNull(delegate);
72 this.synchronizer = Synchronizer.create(synchronizerSpec);
73 this.state = new AtomicInteger(NEW);
74 this.fileLocks = new Object[NUM_LOCKS];
75 for (int i = 0; i < NUM_LOCKS; ++i) {
76 this.fileLocks[i] = new Object();
77 }
78 this.pendingListStreams = Sets.newHashSet();
79 }
80
81 @Override
82 protected FileStore delegate() {
83 return this.delegate;
84 }
85
86 private void checkState(final int expected) {
87 final int state = this.state.get();
88 if (state != expected) {
89 throw new IllegalStateException("FileStore "
90 + (state == NEW ? "not initialized"
91 : state == INITIALIZED ? "already initialized" : "already closed"));
92 }
93 }
94
95 private Object lockFor(final String fileName) {
96 return this.fileLocks[fileName.hashCode() % 0x7FFFFFFF % NUM_LOCKS];
97 }
98
99 @Override
100 public void init() throws IOException {
101 checkState(NEW);
102 super.init();
103 this.state.set(INITIALIZED);
104 }
105
106 @Override
107 public InputStream read(final String fileName) throws FileMissingException, IOException {
108 checkState(INITIALIZED);
109 Preconditions.checkNotNull(fileName);
110 this.synchronizer.beginTransaction(true);
111 try {
112 checkState(INITIALIZED);
113 synchronized (lockFor(fileName)) {
114 return super.read(fileName);
115 }
116 } finally {
117 this.synchronizer.endTransaction(true);
118 }
119 }
120
121 @Override
122 public OutputStream write(final String fileName) throws FileExistsException, IOException {
123 checkState(INITIALIZED);
124 Preconditions.checkNotNull(fileName);
125 this.synchronizer.beginTransaction(false);
126 try {
127 checkState(INITIALIZED);
128 synchronized (lockFor(fileName)) {
129 return super.write(fileName);
130 }
131 } finally {
132 this.synchronizer.endTransaction(false);
133 }
134 }
135
136 @Override
137 public void delete(final String fileName) throws FileMissingException, IOException {
138 checkState(INITIALIZED);
139 Preconditions.checkNotNull(fileName);
140 this.synchronizer.beginTransaction(false);
141 try {
142 checkState(INITIALIZED);
143 synchronized (lockFor(fileName)) {
144 super.delete(fileName);
145 }
146 } finally {
147 this.synchronizer.endTransaction(false);
148 }
149 }
150
151 @Override
152 public Stream<String> list() throws IOException {
153
154 checkState(INITIALIZED);
155
156 this.synchronizer.beginTransaction(true);
157 try {
158 checkState(INITIALIZED);
159 final Stream<String> stream = super.list();
160 synchronized (this.pendingListStreams) {
161 this.pendingListStreams.add(stream);
162 }
163 stream.onClose(new Runnable() {
164
165 @Override
166 public void run() {
167 SynchronizedFileStore.this.synchronizer.endTransaction(true);
168 synchronized (SynchronizedFileStore.this.pendingListStreams) {
169 SynchronizedFileStore.this.pendingListStreams.remove(this);
170 }
171 }
172
173 });
174 return stream;
175
176 } catch (final Throwable ex) {
177 this.synchronizer.endTransaction(true);
178 Throwables.propagateIfPossible(ex, IOException.class);
179 throw Throwables.propagate(ex);
180 }
181 }
182
183 @Override
184 public void close() {
185 if (!this.state.compareAndSet(INITIALIZED, CLOSED)
186 && !this.state.compareAndSet(NEW, CLOSED)) {
187 return;
188 }
189 List<Stream<String>> streamsToEnd;
190 synchronized (this.pendingListStreams) {
191 streamsToEnd = Lists.newArrayList(this.pendingListStreams);
192 }
193 try {
194 for (final Stream<String> stream : streamsToEnd) {
195 try {
196 LOGGER.warn("Forcing closure of stream due to FileStore closure");
197 stream.close();
198 } catch (final Throwable ex) {
199 LOGGER.error("Exception caught while closing stream: " + ex.getMessage(), ex);
200 }
201 }
202 } finally {
203 super.close();
204 }
205 }
206
207 }