1   package eu.fbk.knowledgestore.triplestore.virtuoso;
2   
import java.io.IOException;
import java.sql.CallableStatement;
import java.sql.SQLException;
5   
6   import javax.annotation.Nullable;
7   import javax.sql.ConnectionPoolDataSource;
8   
9   import com.google.common.base.MoreObjects;
10  import com.google.common.base.Preconditions;
11  
12  import org.apache.hadoop.fs.FileSystem;
13  import org.apache.hadoop.fs.Path;
14  import org.openrdf.repository.RepositoryException;
15  import org.slf4j.Logger;
16  import org.slf4j.LoggerFactory;
17  
18  import virtuoso.jdbc4.VirtuosoConnectionPoolDataSource;
19  import virtuoso.sesame2.driver.VirtuosoRepository;
20  import virtuoso.sesame2.driver.VirtuosoRepositoryConnection;
21  
22  import eu.fbk.knowledgestore.runtime.DataCorruptedException;
23  import eu.fbk.knowledgestore.triplestore.SynchronizedTripleStore;
24  import eu.fbk.knowledgestore.triplestore.TripleStore;
25  import eu.fbk.knowledgestore.triplestore.TripleTransaction;
26  
27  
28  
29  
30  
31  
32  
33  
34  
35  
36  
37  
38  
39  
40  
41  
42  
43  
44  public final class VirtuosoTripleStore implements TripleStore {
45  
46      
47      
48      
49      
50  
51      private static final String DEFAULT_HOST = "localhost";
52  
53      private static final int DEFAULT_PORT = 1111;
54  
55      private static final String DEFAULT_USERNAME = "dba";
56  
57      private static final String DEFAULT_PASSWORD = "dba";
58  
59      private static final boolean DEFAULT_POOLING = false;
60  
61      private static final int DEFAULT_BATCH_SIZE = 5000;
62  
63      private static final int DEFAULT_FETCH_SIZE = 200;
64  
65      private static final String DEFAULT_MARKER_FILENAME = "virtuoso.bulk.transaction";
66  
67      private static final Logger LOGGER = LoggerFactory.getLogger(VirtuosoTripleStore.class);
68  
69      private final VirtuosoRepository virtuoso;
70  
71      private final FileSystem fileSystem;
72  
73      private final Path markerPath;
74  
75      
76  
77  
78  
79  
80  
81  
82  
83  
84  
85  
86  
87  
88  
89  
90      public VirtuosoTripleStore(final FileSystem fileSystem, @Nullable final String host,
91              @Nullable final Integer port, @Nullable final String username,
92              @Nullable final String password) {
93          this(fileSystem, host, port, username, password, null, null, null, null);
94      }
95  
96      
97  
98  
99  
100 
101 
102 
103 
104 
105 
106 
107 
108 
109 
110 
111 
112 
113 
114 
115 
116 
117 
118 
119 
120 
121 
122 
123     public VirtuosoTripleStore(final FileSystem fileSystem, @Nullable final String host,
124             @Nullable final Integer port, @Nullable final String username,
125             @Nullable final String password, @Nullable final Boolean pooling,
126             @Nullable final Integer batchSize, @Nullable final Integer fetchSize,
127             @Nullable final String markerFilename) {
128 
129         
130         final String actualMarkerFilename = MoreObjects.firstNonNull(markerFilename,
131                 DEFAULT_MARKER_FILENAME);
132         final String actualHost = MoreObjects.firstNonNull(host, DEFAULT_HOST);
133         final int actualPort = MoreObjects.firstNonNull(port, DEFAULT_PORT);
134         final String actualUsername = MoreObjects.firstNonNull(username, DEFAULT_USERNAME);
135         final String actualPassword = MoreObjects.firstNonNull(password, DEFAULT_PASSWORD);
136         final boolean actualPooling = MoreObjects.firstNonNull(pooling, DEFAULT_POOLING);
137         final int actualBatchSize = MoreObjects.firstNonNull(batchSize, DEFAULT_BATCH_SIZE);
138         final int actualFetchSize = MoreObjects.firstNonNull(fetchSize, DEFAULT_FETCH_SIZE);
139 
140         
141         Preconditions.checkArgument(actualPort > 0 && actualPort < 65536);
142         Preconditions.checkArgument(actualBatchSize > 0);
143         Preconditions.checkArgument(actualFetchSize > 0);
144 
145         
146         if (actualPooling) {
147             
148             
149             
150             final VirtuosoConnectionPoolDataSource source = new VirtuosoConnectionPoolDataSource();
151             source.setServerName(actualHost);
152             source.setPortNumber(actualPort);
153             source.setUser(actualUsername);
154             source.setPassword(actualPassword);
155             this.virtuoso = new VirtuosoRepository((ConnectionPoolDataSource) source,
156                     "sesame:nil", true);
157         } else {
158             final String url = String.format("jdbc:virtuoso://%s:%d", actualHost, actualPort);
159             this.virtuoso = new VirtuosoRepository(url, actualUsername, actualPassword,
160                     "sesame:nil", true);
161         }
162 
163         
164         this.virtuoso.setBatchSize(actualBatchSize);
165         this.virtuoso.setFetchSize(actualFetchSize);
166 
167         
168         this.fileSystem = Preconditions.checkNotNull(fileSystem);
169         this.markerPath = new Path(actualMarkerFilename).makeQualified(fileSystem);
170 
171         
172         LOGGER.info("VirtuosoTripleStore URL: {}", actualHost + ":" + actualPort);
173         LOGGER.info("VirtuosoTripleStore marker: {}", this.markerPath);
174     }
175 
176     @Override
177     public void init() throws IOException {
178         try {
179             this.virtuoso.initialize(); 
180         } catch (final RepositoryException ex) {
181             throw new IOException("Failed to initialize Virtuoso driver", ex);
182         }
183     }
184 
185     @Override
186     public TripleTransaction begin(final boolean readOnly) throws DataCorruptedException,
187             IOException {
188         
189         if (existsTransactionMarker()) {
190             throw new DataCorruptedException("The triple store performed a bulk operation "
191                     + "that didn't complete successfully.");
192         }
193         return new VirtuosoTripleTransaction(this, readOnly);
194     }
195 
196     @Override
197     public void reset() throws IOException {
198         VirtuosoRepositoryConnection connection = null;
199         try {
200             connection = (VirtuosoRepositoryConnection) this.virtuoso.getConnection();
201             connection.getQuadStoreConnection().prepareCall("RDF_GLOBAL_RESET ()").execute();
202         } catch (final RepositoryException ex) {
203             throw new IOException("Could not connect to Virtuoso server", ex);
204         } catch (final SQLException e) {
205             throw new IOException("Something went wrong while invoking stored procedure.", e);
206         } finally {
207             if (connection != null) {
208                 try {
209                     connection.close();
210                 } catch (final RepositoryException re) {
211                     throw new IOException("Error while closing connection.", re);
212                 }
213             }
214         }
215 
216         final boolean removedTransactionMarker = removeTransactionMarker();
217         LOGGER.info("Database reset. Transaction marker removed: " + removedTransactionMarker);
218     }
219 
220     @Override
221     public void close() {
222         
223         try {
224             this.virtuoso.shutDown(); 
225         } catch (final RepositoryException ex) {
226             LOGGER.error("Failed to shutdown Virtuoso driver", ex);
227         }
228     }
229 
230     @Override
231     public String toString() {
232         return getClass().getSimpleName();
233     }
234 
235     VirtuosoRepository getVirtuoso() {
236         return this.virtuoso;
237     }
238 
239     
240 
241 
242 
243 
244     boolean existsTransactionMarker() throws IOException {
245         
246         
247         
248         
249         
250         return false; 
251     }
252 
253     
254 
255 
256 
257 
258     boolean addTransactionMarker() throws IOException {
259         
260         
261         
262         
263         
264         return false; 
265     }
266 
267     
268 
269 
270 
271 
272     boolean removeTransactionMarker() throws IOException {
273         
274         
275         
276         
277         
278         return false; 
279     }
280 
281 }