1 package eu.fbk.knowledgestore.triplestore.virtuoso;
2
3 import java.io.IOException;
4 import java.sql.SQLException;
5
6 import javax.annotation.Nullable;
7 import javax.sql.ConnectionPoolDataSource;
8
9 import com.google.common.base.MoreObjects;
10 import com.google.common.base.Preconditions;
11
12 import org.apache.hadoop.fs.FileSystem;
13 import org.apache.hadoop.fs.Path;
14 import org.openrdf.repository.RepositoryException;
15 import org.slf4j.Logger;
16 import org.slf4j.LoggerFactory;
17
18 import virtuoso.jdbc4.VirtuosoConnectionPoolDataSource;
19 import virtuoso.sesame2.driver.VirtuosoRepository;
20 import virtuoso.sesame2.driver.VirtuosoRepositoryConnection;
21
22 import eu.fbk.knowledgestore.runtime.DataCorruptedException;
23 import eu.fbk.knowledgestore.triplestore.SynchronizedTripleStore;
24 import eu.fbk.knowledgestore.triplestore.TripleStore;
25 import eu.fbk.knowledgestore.triplestore.TripleTransaction;
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 public final class VirtuosoTripleStore implements TripleStore {
45
46
47
48
49
50
51 private static final String DEFAULT_HOST = "localhost";
52
53 private static final int DEFAULT_PORT = 1111;
54
55 private static final String DEFAULT_USERNAME = "dba";
56
57 private static final String DEFAULT_PASSWORD = "dba";
58
59 private static final boolean DEFAULT_POOLING = false;
60
61 private static final int DEFAULT_BATCH_SIZE = 5000;
62
63 private static final int DEFAULT_FETCH_SIZE = 200;
64
65 private static final String DEFAULT_MARKER_FILENAME = "virtuoso.bulk.transaction";
66
67 private static final Logger LOGGER = LoggerFactory.getLogger(VirtuosoTripleStore.class);
68
69 private final VirtuosoRepository virtuoso;
70
71 private final FileSystem fileSystem;
72
73 private final Path markerPath;
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90 public VirtuosoTripleStore(final FileSystem fileSystem, @Nullable final String host,
91 @Nullable final Integer port, @Nullable final String username,
92 @Nullable final String password) {
93 this(fileSystem, host, port, username, password, null, null, null, null);
94 }
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123 public VirtuosoTripleStore(final FileSystem fileSystem, @Nullable final String host,
124 @Nullable final Integer port, @Nullable final String username,
125 @Nullable final String password, @Nullable final Boolean pooling,
126 @Nullable final Integer batchSize, @Nullable final Integer fetchSize,
127 @Nullable final String markerFilename) {
128
129
130 final String actualMarkerFilename = MoreObjects.firstNonNull(markerFilename,
131 DEFAULT_MARKER_FILENAME);
132 final String actualHost = MoreObjects.firstNonNull(host, DEFAULT_HOST);
133 final int actualPort = MoreObjects.firstNonNull(port, DEFAULT_PORT);
134 final String actualUsername = MoreObjects.firstNonNull(username, DEFAULT_USERNAME);
135 final String actualPassword = MoreObjects.firstNonNull(password, DEFAULT_PASSWORD);
136 final boolean actualPooling = MoreObjects.firstNonNull(pooling, DEFAULT_POOLING);
137 final int actualBatchSize = MoreObjects.firstNonNull(batchSize, DEFAULT_BATCH_SIZE);
138 final int actualFetchSize = MoreObjects.firstNonNull(fetchSize, DEFAULT_FETCH_SIZE);
139
140
141 Preconditions.checkArgument(actualPort > 0 && actualPort < 65536);
142 Preconditions.checkArgument(actualBatchSize > 0);
143 Preconditions.checkArgument(actualFetchSize > 0);
144
145
146 if (actualPooling) {
147
148
149
150 final VirtuosoConnectionPoolDataSource source = new VirtuosoConnectionPoolDataSource();
151 source.setServerName(actualHost);
152 source.setPortNumber(actualPort);
153 source.setUser(actualUsername);
154 source.setPassword(actualPassword);
155 this.virtuoso = new VirtuosoRepository((ConnectionPoolDataSource) source,
156 "sesame:nil", true);
157 } else {
158 final String url = String.format("jdbc:virtuoso://%s:%d", actualHost, actualPort);
159 this.virtuoso = new VirtuosoRepository(url, actualUsername, actualPassword,
160 "sesame:nil", true);
161 }
162
163
164 this.virtuoso.setBatchSize(actualBatchSize);
165 this.virtuoso.setFetchSize(actualFetchSize);
166
167
168 this.fileSystem = Preconditions.checkNotNull(fileSystem);
169 this.markerPath = new Path(actualMarkerFilename).makeQualified(fileSystem);
170
171
172 LOGGER.info("VirtuosoTripleStore URL: {}", actualHost + ":" + actualPort);
173 LOGGER.info("VirtuosoTripleStore marker: {}", this.markerPath);
174 }
175
176 @Override
177 public void init() throws IOException {
178 try {
179 this.virtuoso.initialize();
180 } catch (final RepositoryException ex) {
181 throw new IOException("Failed to initialize Virtuoso driver", ex);
182 }
183 }
184
185 @Override
186 public TripleTransaction begin(final boolean readOnly) throws DataCorruptedException,
187 IOException {
188
189 if (existsTransactionMarker()) {
190 throw new DataCorruptedException("The triple store performed a bulk operation "
191 + "that didn't complete successfully.");
192 }
193 return new VirtuosoTripleTransaction(this, readOnly);
194 }
195
196 @Override
197 public void reset() throws IOException {
198 VirtuosoRepositoryConnection connection = null;
199 try {
200 connection = (VirtuosoRepositoryConnection) this.virtuoso.getConnection();
201 connection.getQuadStoreConnection().prepareCall("RDF_GLOBAL_RESET ()").execute();
202 } catch (final RepositoryException ex) {
203 throw new IOException("Could not connect to Virtuoso server", ex);
204 } catch (final SQLException e) {
205 throw new IOException("Something went wrong while invoking stored procedure.", e);
206 } finally {
207 if (connection != null) {
208 try {
209 connection.close();
210 } catch (final RepositoryException re) {
211 throw new IOException("Error while closing connection.", re);
212 }
213 }
214 }
215
216 final boolean removedTransactionMarker = removeTransactionMarker();
217 LOGGER.info("Database reset. Transaction marker removed: " + removedTransactionMarker);
218 }
219
220 @Override
221 public void close() {
222
223 try {
224 this.virtuoso.shutDown();
225 } catch (final RepositoryException ex) {
226 LOGGER.error("Failed to shutdown Virtuoso driver", ex);
227 }
228 }
229
230 @Override
231 public String toString() {
232 return getClass().getSimpleName();
233 }
234
235 VirtuosoRepository getVirtuoso() {
236 return this.virtuoso;
237 }
238
239
240
241
242
243
244 boolean existsTransactionMarker() throws IOException {
245
246
247
248
249
250 return false;
251 }
252
253
254
255
256
257
258 boolean addTransactionMarker() throws IOException {
259
260
261
262
263
264 return false;
265 }
266
267
268
269
270
271
272 boolean removeTransactionMarker() throws IOException {
273
274
275
276
277
278 return false;
279 }
280
281 }