1 package eu.fbk.knowledgestore.datastore;
2
3 import java.io.IOException;
4 import java.lang.ref.WeakReference;
5 import java.util.List;
6 import java.util.Map;
7 import java.util.Set;
8 import java.util.concurrent.atomic.AtomicBoolean;
9 import java.util.concurrent.atomic.AtomicInteger;
10
11 import javax.annotation.Nullable;
12
13 import com.google.common.base.Preconditions;
14 import com.google.common.collect.Lists;
15 import com.google.common.collect.Sets;
16
17 import org.openrdf.model.URI;
18 import org.slf4j.Logger;
19 import org.slf4j.LoggerFactory;
20
21 import eu.fbk.knowledgestore.data.Record;
22 import eu.fbk.knowledgestore.data.Stream;
23 import eu.fbk.knowledgestore.data.XPath;
24 import eu.fbk.knowledgestore.internal.Util;
25 import eu.fbk.knowledgestore.runtime.Component;
26 import eu.fbk.knowledgestore.runtime.Synchronizer;
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51 public final class SynchronizedDataStore extends ForwardingDataStore {
52
53 private static final Logger LOGGER = LoggerFactory.getLogger(SynchronizedDataStore.class);
54
55 private static final int NEW = 0;
56
57 private static final int INITIALIZED = 1;
58
59 private static final int CLOSED = 2;
60
61 private final DataStore delegate;
62
63 private final Synchronizer synchronizer;
64
65 private final Set<DataTransaction> transactions;
66
67 private final AtomicInteger state;
68
69
70
71
72
73
74
75
76
77
78 public SynchronizedDataStore(final DataStore delegate, final String synchronizerSpec) {
79 this(delegate, Synchronizer.create(synchronizerSpec));
80 }
81
82
83
84
85
86
87
88
89
90
91
92 public SynchronizedDataStore(final DataStore delegate, final Synchronizer synchronizer) {
93 this.delegate = Preconditions.checkNotNull(delegate);
94 this.synchronizer = Preconditions.checkNotNull(synchronizer);
95 this.transactions = Sets.newHashSet();
96 this.state = new AtomicInteger(NEW);
97 LOGGER.debug("{} configured, synchronizer=", getClass().getSimpleName(), synchronizer);
98 }
99
100 @Override
101 protected DataStore delegate() {
102 return this.delegate;
103 }
104
105 private void checkState(final int expected) {
106 final int state = this.state.get();
107 if (state != expected) {
108 throw new IllegalStateException("DataStore "
109 + (state == NEW ? "not initialized"
110 : state == INITIALIZED ? "already initialized" : "already closed"));
111 }
112 }
113
114 @Override
115 public synchronized void init() throws IOException {
116 checkState(NEW);
117 super.init();
118 this.state.set(INITIALIZED);
119 }
120
121 @Override
122 public DataTransaction begin(final boolean readOnly) throws IOException, IllegalStateException {
123 checkState(INITIALIZED);
124 this.synchronizer.beginTransaction(readOnly);
125 DataTransaction transaction = null;
126 try {
127 synchronized (this) {
128 checkState(INITIALIZED);
129 transaction = delegate().begin(readOnly);
130 transaction = new SynchronizedDataTransaction(transaction, readOnly);
131 synchronized (this.transactions) {
132 this.transactions.add(transaction);
133 }
134 }
135 } finally {
136 if (transaction == null) {
137 this.synchronizer.endTransaction(readOnly);
138 }
139 }
140 return transaction;
141 }
142
143 @Override
144 public void close() {
145 if (!this.state.compareAndSet(INITIALIZED, CLOSED)
146 && !this.state.compareAndSet(NEW, CLOSED)) {
147 return;
148 }
149 List<DataTransaction> transactionsToEnd;
150 synchronized (this.transactions) {
151 transactionsToEnd = Lists.newArrayList(this.transactions);
152 }
153 try {
154 for (final DataTransaction transaction : transactionsToEnd) {
155 try {
156 LOGGER.warn("Forcing rollback of DataTransaction " + transaction
157 + "due to closure of DataStore");
158 transaction.end(false);
159 } catch (final Throwable ex) {
160 LOGGER.error("Exception caught while ending DataTransaction " + transaction
161 + "(rollback assumed): " + ex.getMessage(), ex);
162 }
163 }
164 } finally {
165 super.close();
166 }
167 }
168
169 private final class SynchronizedDataTransaction extends ForwardingDataTransaction {
170
171 private final DataTransaction delegate;
172
173 private final List<WeakReference<Stream<?>>> streams;
174
175 private final boolean readOnly;
176
177 private final AtomicBoolean ended;
178
179 SynchronizedDataTransaction(final DataTransaction delegate, final boolean readOnly) {
180 this.delegate = Preconditions.checkNotNull(delegate);
181 this.streams = Lists.newArrayList();
182 this.readOnly = readOnly;
183 this.ended = new AtomicBoolean(false);
184 }
185
186 @Override
187 protected DataTransaction delegate() {
188 return this.delegate;
189 }
190
191 private <T extends Stream<?>> T registerStream(@Nullable final T stream) {
192 synchronized (this.streams) {
193 if (stream == null) {
194 return null;
195 } else if (this.ended.get()) {
196 Util.closeQuietly(stream);
197 } else {
198 final int size = this.streams.size();
199 for (int i = size - 1; i >= 0; --i) {
200 if (this.streams.get(i).get() == null) {
201 this.streams.remove(i);
202 }
203 }
204 this.streams.add(new WeakReference<Stream<?>>(stream));
205 }
206 }
207 return stream;
208 }
209
210 private void closeStreams() {
211 synchronized (this.streams) {
212 final int size = this.streams.size();
213 for (int i = size - 1; i >= 0; --i) {
214 Util.closeQuietly(this.streams.remove(i).get());
215 }
216 }
217 }
218
219 private void checkState() {
220 if (this.ended.get()) {
221 throw new IllegalStateException("DataTransaction already ended");
222 }
223 }
224
225 private void checkWritable() {
226 if (this.readOnly) {
227 throw new IllegalStateException("DataTransaction is read-only");
228 }
229 }
230
231 @Override
232 public synchronized Stream<Record> lookup(final URI type, final Set<? extends URI> ids,
233 @Nullable final Set<? extends URI> properties) throws IOException,
234 IllegalArgumentException, IllegalStateException {
235 checkState();
236 return registerStream(super.lookup(type, ids, properties));
237 }
238
239 @Override
240 public synchronized Stream<Record> retrieve(final URI type,
241 @Nullable final XPath condition, @Nullable final Set<? extends URI> properties)
242 throws IOException, IllegalArgumentException, IllegalStateException {
243 checkState();
244 return registerStream(super.retrieve(type, condition, properties));
245 }
246
247 @Override
248 public synchronized long count(final URI type, @Nullable final XPath condition)
249 throws IOException, IllegalArgumentException, IllegalStateException {
250 checkState();
251 return super.count(type, condition);
252 }
253
254 @Override
255 public Stream<Record> match(final Map<URI, XPath> conditions,
256 final Map<URI, Set<URI>> ids, final Map<URI, Set<URI>> properties)
257 throws IOException, IllegalStateException {
258 checkState();
259 return registerStream(super.match(conditions, ids, properties));
260 }
261
262 @Override
263 public void store(final URI type, final Record record) throws IOException,
264 IllegalStateException {
265 checkState();
266 checkWritable();
267 super.store(type, record);
268 }
269
270 @Override
271 public void delete(final URI type, final URI id) throws IOException, IllegalStateException {
272 checkState();
273 checkWritable();
274 super.delete(type, id);
275 }
276
277 @Override
278 public void end(final boolean commit) throws IOException, IllegalStateException {
279 if (!this.ended.compareAndSet(false, true)) {
280 return;
281 }
282 closeStreams();
283 SynchronizedDataStore.this.synchronizer.beginCommit();
284 try {
285 super.end(commit);
286 } finally {
287 SynchronizedDataStore.this.synchronizer.endCommit();
288 SynchronizedDataStore.this.synchronizer.endTransaction(this.readOnly);
289 synchronized (SynchronizedDataStore.this.transactions) {
290 SynchronizedDataStore.this.transactions.remove(this);
291 }
292 }
293 }
294
295 }
296
297 }