1 package eu.fbk.knowledgestore.triplestore;
2
3 import com.google.common.base.Preconditions;
4 import eu.fbk.knowledgestore.data.Data;
5 import eu.fbk.knowledgestore.data.Handler;
6 import eu.fbk.knowledgestore.data.Stream;
7 import info.aduna.iteration.CloseableIteration;
8 import info.aduna.iteration.IterationWrapper;
9 import org.openrdf.model.Resource;
10 import org.openrdf.model.Statement;
11 import org.openrdf.model.URI;
12 import org.openrdf.model.Value;
13 import org.openrdf.query.BindingSet;
14 import org.openrdf.query.QueryEvaluationException;
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
17
18 import javax.annotation.Nullable;
19 import java.io.IOException;
20 import java.util.concurrent.atomic.AtomicBoolean;
21 import java.util.concurrent.atomic.AtomicLong;
22
23
24
25
26
27
28
29
30
31
32
33 public final class LoggingTripleStore extends ForwardingTripleStore {
34
35 private static final Logger LOGGER = LoggerFactory.getLogger(LoggingTripleStore.class);
36
37 private final TripleStore delegate;
38
39
40
41
42
43
44
45 public LoggingTripleStore(final TripleStore delegate) {
46 this.delegate = Preconditions.checkNotNull(delegate);
47 LOGGER.debug("{} configured", getClass().getSimpleName());
48 }
49
50 @Override
51 protected TripleStore delegate() {
52 return this.delegate;
53 }
54
55 @Override
56 public void init() throws IOException {
57 if (LOGGER.isDebugEnabled()) {
58 final long ts = System.currentTimeMillis();
59 super.init();
60 LOGGER.debug("{} - initialized in {} ms", this, System.currentTimeMillis() - ts);
61 } else {
62 super.init();
63 }
64 }
65
66 @Override
67 public TripleTransaction begin(final boolean readOnly) throws IOException {
68 if (LOGGER.isDebugEnabled()) {
69 final long ts = System.currentTimeMillis();
70 final TripleTransaction transaction = new LoggingTripleTransaction(
71 super.begin(readOnly), ts);
72 LOGGER.debug("{} - started in {} mode in {} ms", transaction, readOnly ? "read-only"
73 : "read-write", System.currentTimeMillis() - ts);
74 return transaction;
75 } else {
76 return super.begin(readOnly);
77 }
78 }
79
80 @Override
81 public void reset() throws IOException {
82 if (LOGGER.isDebugEnabled()) {
83 final long ts = System.currentTimeMillis();
84 super.reset();
85 LOGGER.debug("{} - reset done in {} ms", this, System.currentTimeMillis() - ts);
86 } else {
87 super.reset();
88 }
89 }
90
91 @Override
92 public void close() {
93 if (LOGGER.isDebugEnabled()) {
94 final long ts = System.currentTimeMillis();
95 super.close();
96 LOGGER.debug("{} - closed in {} ms", this, System.currentTimeMillis() - ts);
97 } else {
98 super.close();
99 }
100 }
101
102 private static final class LoggingTripleTransaction extends ForwardingTripleTransaction {
103
104 private final TripleTransaction delegate;
105
106 private final long ts;
107
108 LoggingTripleTransaction(final TripleTransaction delegate, final long ts) {
109 this.delegate = Preconditions.checkNotNull(delegate);
110 this.ts = ts;
111 }
112
113 @Override
114 protected TripleTransaction delegate() {
115 return this.delegate;
116 }
117
118 private String format(@Nullable final Value value) {
119 return value == null ? "*" : Data.toString(value, Data.getNamespaceMap());
120 }
121
122 @Nullable
123 private <T, E extends Exception> CloseableIteration<T, E> logClose(
124 @Nullable final CloseableIteration<T, E> iteration, final String name,
125 final long ts) {
126 return iteration == null ? null : new IterationWrapper<T, E>(iteration) {
127
128 private int count = 0;
129
130 private boolean hasNext = true;
131
132 @Override
133 public boolean hasNext() throws E {
134 this.hasNext = super.hasNext();
135 return this.hasNext;
136 }
137
138 @Override
139 public T next() throws E {
140 final T result = super.next();
141 ++this.count;
142 return result;
143 }
144
145 @Override
146 protected void handleClose() throws E {
147 try {
148 super.handleClose();
149 } finally {
150 if (LOGGER.isDebugEnabled()) {
151 LOGGER.debug("{} - {} closed after {} ms, {} results retrieved{}",
152 LoggingTripleTransaction.this, name,
153 System.currentTimeMillis() - ts, this.count, this.hasNext ? ""
154 : " (exhausted)");
155 }
156 }
157 }
158
159 };
160 }
161
162 @Override
163 public CloseableIteration<? extends Statement, ? extends Exception> get(
164 @Nullable final Resource subject, @Nullable final URI predicate,
165 @Nullable final Value object, @Nullable final Resource context)
166 throws IOException, IllegalStateException {
167
168 if (LOGGER.isDebugEnabled()) {
169 final String name = "statement iteration for <" + format(subject) + ", "
170 + format(predicate) + ", " + format(object) + ", " + format(context) + ">";
171 final long ts = System.currentTimeMillis();
172 CloseableIteration<? extends Statement, ? extends Exception> result;
173 result = logClose(super.get(subject, predicate, object, context), name, ts);
174 LOGGER.debug("{} - {} obtained in {} ms", this, name, System.currentTimeMillis()
175 - ts);
176 return result;
177 } else {
178 return super.get(subject, predicate, object, context);
179 }
180 }
181
182 @Override
183 public CloseableIteration<BindingSet, QueryEvaluationException> query(
184 final SelectQuery query, @Nullable final BindingSet bindings, final Long timeout)
185 throws IOException, UnsupportedOperationException {
186
187 if (LOGGER.isDebugEnabled()) {
188 LOGGER.debug("{} - evaluating query ({} bindings, {} timeout):\n{}", this,
189 bindings == null ? 0 : bindings.size(), timeout, query);
190 final String name = "query result iteration";
191 final long ts = System.currentTimeMillis();
192 CloseableIteration<BindingSet, QueryEvaluationException> result;
193 result = logClose(super.query(query, bindings, timeout), name, ts);
194 LOGGER.debug("{} - {} obtained in {} ms", this, name, System.currentTimeMillis()
195 - ts);
196 return result;
197 } else {
198 return super.query(query, bindings, timeout);
199 }
200 }
201
202 @Override
203 public void infer(@Nullable final Handler<? super Statement> handler) throws IOException,
204 IllegalStateException {
205
206 if (LOGGER.isDebugEnabled()) {
207 LOGGER.debug("{} - start materializing inferences");
208 final long ts = System.currentTimeMillis();
209 super.infer(handler);
210 LOGGER.debug("{} - inferences materialized in {} ms", this,
211 System.currentTimeMillis() - ts);
212 } else {
213 super.infer(handler);
214 }
215 }
216
217 @Override
218 public void add(final Iterable<? extends Statement> statements) throws IOException,
219 IllegalStateException {
220
221 if (LOGGER.isDebugEnabled()) {
222 final AtomicLong count = new AtomicLong();
223 final AtomicBoolean eof = new AtomicBoolean();
224 @SuppressWarnings("unchecked")
225 Iterable<Statement> stmts = (Iterable<Statement>) statements;
226 final Stream<Statement> stream = Stream.create(stmts).track(count, eof);
227 final long ts = System.currentTimeMillis();
228 super.add(stream);
229 LOGGER.debug("{} - {} statements added in {} ms{}", this, count,
230 System.currentTimeMillis() - ts, eof.get() ? ", EOF" : "");
231 } else {
232 super.add(statements);
233 }
234
235 }
236
237 @Override
238 public void remove(final Iterable<? extends Statement> statements) throws IOException,
239 IllegalStateException {
240
241 if (LOGGER.isDebugEnabled()) {
242 final AtomicLong count = new AtomicLong();
243 final AtomicBoolean eof = new AtomicBoolean();
244 @SuppressWarnings("unchecked")
245 Iterable<Statement> stmts = (Iterable<Statement>) statements;
246 final Stream<Statement> stream = Stream.create(stmts).track(count, eof);
247 final long ts = System.currentTimeMillis();
248 super.remove(stream);
249 LOGGER.debug("{} - {} statements removed in {} ms{}", this, count,
250 System.currentTimeMillis() - ts, eof.get() ? ", EOF" : "");
251 } else {
252 super.remove(statements);
253 }
254 }
255
256 @Override
257 public void end(final boolean commit) throws IOException {
258
259 if (LOGGER.isDebugEnabled()) {
260 final long ts = System.currentTimeMillis();
261 super.end(commit);
262 final long ts2 = System.currentTimeMillis();
263 LOGGER.debug("{} - {} done in {} ms, tx duration {} ms", this, commit ? "commit"
264 : "rollback", ts2 - ts, ts2 - this.ts);
265 } else {
266 super.end(commit);
267 }
268 }
269
270 }
271
272 }