1 package eu.fbk.knowledgestore.triplestore;
2
3 import java.io.IOException;
4 import java.lang.ref.WeakReference;
5 import java.util.List;
6 import java.util.concurrent.atomic.AtomicBoolean;
7 import java.util.concurrent.atomic.AtomicInteger;
8
9 import javax.annotation.Nullable;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.collect.Lists;
13
14 import org.openrdf.model.Resource;
15 import org.openrdf.model.Statement;
16 import org.openrdf.model.URI;
17 import org.openrdf.model.Value;
18 import org.openrdf.query.BindingSet;
19 import org.openrdf.query.QueryEvaluationException;
20 import org.slf4j.Logger;
21 import org.slf4j.LoggerFactory;
22
23 import info.aduna.iteration.CloseableIteration;
24
25 import eu.fbk.knowledgestore.data.Handler;
26 import eu.fbk.knowledgestore.internal.Util;
27 import eu.fbk.knowledgestore.runtime.Component;
28 import eu.fbk.knowledgestore.runtime.Synchronizer;
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55 public class SynchronizedTripleStore extends ForwardingTripleStore {
56
57 private static final Logger LOGGER = LoggerFactory.getLogger(SynchronizedTripleStore.class);
58
59 private static final int NEW = 0;
60
61 private static final int INITIALIZED = 1;
62
63 private static final int CLOSED = 2;
64
65 private final TripleStore delegate;
66
67 private final Synchronizer synchronizer;
68
69 private final List<TripleTransaction> transactions;
70
71 private final AtomicInteger state;
72
73
74
75
76
77
78
79
80
81
82 public SynchronizedTripleStore(final TripleStore delegate, final String synchronizerSpec) {
83 this(delegate, Synchronizer.create(synchronizerSpec));
84 }
85
86
87
88
89
90
91
92
93
94
95
96 public SynchronizedTripleStore(final TripleStore delegate, final Synchronizer synchronizer) {
97 this.delegate = Preconditions.checkNotNull(delegate);
98 this.synchronizer = Preconditions.checkNotNull(synchronizer);
99 this.transactions = Lists.newArrayList();
100 this.state = new AtomicInteger(NEW);
101 LOGGER.debug("{} configured, synchronizer={}", getClass().getSimpleName(), synchronizer);
102 }
103
104 @Override
105 protected TripleStore delegate() {
106 return this.delegate;
107 }
108
109 private void checkState(final int expected) {
110 final int state = this.state.get();
111 if (state != expected) {
112 throw new IllegalStateException("TripleStore "
113 + (state == NEW ? "not initialized"
114 : state == INITIALIZED ? "already initialized" : "already closed"));
115 }
116 }
117
118 @Override
119 public synchronized void init() throws IOException {
120 checkState(NEW);
121 super.init();
122 this.state.set(INITIALIZED);
123 }
124
125 @Override
126 public TripleTransaction begin(final boolean readOnly) throws IOException {
127 checkState(INITIALIZED);
128 this.synchronizer.beginTransaction(readOnly);
129 TripleTransaction transaction = null;
130 try {
131 synchronized (this) {
132 checkState(INITIALIZED);
133 transaction = delegate().begin(readOnly);
134 if (Thread.interrupted()) {
135 transaction.end(false);
136 throw new IllegalStateException("Interrupted");
137 }
138 transaction = new SynchronizedTripleTransaction(transaction, readOnly);
139 synchronized (this.transactions) {
140 this.transactions.add(transaction);
141 }
142 }
143 } finally {
144 if (transaction == null) {
145 this.synchronizer.endTransaction(readOnly);
146 }
147 }
148 return transaction;
149 }
150
151 @Override
152 public void reset() throws IOException {
153 checkState(INITIALIZED);
154 this.synchronizer.beginExclusive();
155 try {
156 synchronized (this) {
157 checkState(INITIALIZED);
158 delegate().reset();
159 }
160 } finally {
161 this.synchronizer.endExclusive();
162 }
163 }
164
165 @Override
166 public void close() {
167 if (!this.state.compareAndSet(INITIALIZED, CLOSED)
168 && !this.state.compareAndSet(NEW, CLOSED)) {
169 return;
170 }
171 List<TripleTransaction> transactionsToEnd;
172 synchronized (this.transactions) {
173 transactionsToEnd = Lists.newArrayList(this.transactions);
174 }
175 try {
176 for (final TripleTransaction transaction : transactionsToEnd) {
177 try {
178 LOGGER.warn("Forcing rollback of tx " + transaction
179 + " due to closure of TripleStore");
180 transaction.end(false);
181 } catch (final Throwable ex) {
182 LOGGER.error("Exception caught while ending tx " + transaction
183 + " (rollback assumed): " + ex.getMessage(), ex);
184 }
185 }
186 } finally {
187 super.close();
188 }
189 }
190
191 private final class SynchronizedTripleTransaction extends ForwardingTripleTransaction {
192
193 private final TripleTransaction delegate;
194
195 private final List<WeakReference<CloseableIteration<?, ?>>> iterations;
196
197 private final boolean readOnly;
198
199 private final AtomicBoolean ended;
200
201 SynchronizedTripleTransaction(final TripleTransaction delegate, final boolean readOnly) {
202 this.delegate = delegate;
203 this.iterations = Lists.newArrayList();
204 this.readOnly = readOnly;
205 this.ended = new AtomicBoolean(false);
206 }
207
208 @Override
209 protected TripleTransaction delegate() {
210 return this.delegate;
211 }
212
213 private <T extends CloseableIteration<?, ?>> T registerIteration(
214 @Nullable final T iteration) {
215 synchronized (this.iterations) {
216 if (iteration == null) {
217 return null;
218 } else if (this.ended.get() || Thread.interrupted()) {
219 Util.closeQuietly(iteration);
220 throw new IllegalStateException("Closed / interrupted");
221 } else {
222 final int size = this.iterations.size();
223 for (int i = size - 1; i >= 0; --i) {
224 if (this.iterations.get(i).get() == null) {
225 this.iterations.remove(i);
226 }
227 }
228 this.iterations.add(new WeakReference<CloseableIteration<?, ?>>(iteration));
229 }
230 }
231 return iteration;
232 }
233
234 private void closeIterations() {
235 synchronized (this.iterations) {
236 final int size = this.iterations.size();
237 for (int i = size - 1; i >= 0; --i) {
238 Util.closeQuietly(this.iterations.remove(i).get());
239 }
240 }
241 }
242
243 private void checkState() {
244 if (this.ended.get()) {
245 throw new IllegalStateException("DataTransaction already ended");
246 }
247 if (Thread.interrupted()) {
248 throw new IllegalStateException("Interrupted");
249 }
250 }
251
252 @Override
253 public synchronized CloseableIteration<? extends Statement, ? extends Exception> get(
254 @Nullable final Resource subject, @Nullable final URI predicate,
255 @Nullable final Value object, @Nullable final Resource context)
256 throws IOException, IllegalStateException {
257 checkState();
258 return registerIteration(super.get(subject, predicate, object, context));
259 }
260
261 @Override
262 public synchronized CloseableIteration<BindingSet, QueryEvaluationException> query(
263 final SelectQuery query, @Nullable final BindingSet bindings,
264 @Nullable final Long timeout) throws IOException, UnsupportedOperationException {
265 checkState();
266 return registerIteration(super.query(query, bindings, timeout));
267 }
268
269 @Override
270 public synchronized void infer(@Nullable final Handler<? super Statement> handler)
271 throws IOException, IllegalStateException {
272 checkState();
273 super.infer(handler);
274 }
275
276 @Override
277 public synchronized void add(final Iterable<? extends Statement> stream)
278 throws IOException, IllegalStateException {
279 checkState();
280 super.add(stream);
281 }
282
283 @Override
284 public synchronized void remove(final Iterable<? extends Statement> stream)
285 throws IOException, IllegalStateException {
286 checkState();
287 super.remove(stream);
288 }
289
290 @Override
291 public void end(final boolean commit) throws IOException {
292 if (!this.ended.compareAndSet(false, true)) {
293 return;
294 }
295 closeIterations();
296 SynchronizedTripleStore.this.synchronizer.beginCommit();
297 try {
298 super.end(commit);
299 } finally {
300 SynchronizedTripleStore.this.synchronizer.endCommit();
301 SynchronizedTripleStore.this.synchronizer.endTransaction(this.readOnly);
302 synchronized (SynchronizedTripleStore.this.transactions) {
303 SynchronizedTripleStore.this.transactions.remove(this);
304 }
305 }
306 }
307
308 }
309
310 }