1 package eu.fbk.knowledgestore.datastore;
2
3 import com.google.common.collect.Iterables;
4 import com.zaxxer.hikari.HikariConfig;
5 import com.zaxxer.hikari.HikariDataSource;
6 import eu.fbk.knowledgestore.data.Record;
7 import eu.fbk.knowledgestore.data.Stream;
8 import eu.fbk.knowledgestore.data.XPath;
9 import eu.fbk.knowledgestore.runtime.DataCorruptedException;
10 import eu.fbk.knowledgestore.vocabulary.KS;
11 import org.openrdf.model.URI;
12 import org.slf4j.Logger;
13 import org.slf4j.LoggerFactory;
14
15 import javax.annotation.Nullable;
16 import java.io.*;
17 import java.sql.*;
18 import java.util.*;
19
20
21
22
23
24
25
26
27
28 public class MySQLDataStore implements DataStore {
29
30
31
32
33
34 static Logger logger = LoggerFactory.getLogger(MySQLDataStore.class);
35 public HikariDataSource dataSource;
36 final HikariConfig config = new HikariConfig();
37
38 public MySQLDataStore(String host, String username, String password, String databaseName) {
39 config.setMinimumIdle(2);
40 config.setMaximumPoolSize(10);
41 config.setConnectionTimeout(30000);
42 config.setIdleTimeout(600000);
43 config.setMaxLifetime(1800000);
44 config.setLeakDetectionThreshold(600000);
45
46 config.setDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlDataSource");
47 config.addDataSourceProperty("serverName", host);
48 config.addDataSourceProperty("port", "3306");
49 config.addDataSourceProperty("databaseName", databaseName);
50 config.addDataSourceProperty("user", username);
51 config.addDataSourceProperty("password", password);
52
53 config.addDataSourceProperty("cachePrepStmts", true);
54 config.addDataSourceProperty("prepStmtCacheSize", 250);
55 config.addDataSourceProperty("prepStmtCacheSqlLimit", 2048);
56 config.addDataSourceProperty("useServerPrepStmts", true);
57
58
59
60
61
62
63
64
65
66
67 }
68
69 public class MySQLTransaction implements DataTransaction {
70
71 private Connection con;
72 boolean readOnly;
73
74 private final static String insertQuery = "INSERT INTO $tableName (`key`, `value`) VALUES (MD5(?), ?) ON DUPLICATE KEY UPDATE `value` = VALUES(`value`)";
75 private final static String selectQuery = "SELECT `value` FROM $tableName WHERE `key` = MD5(?)";
76 private final static String deleteQuery = "DELETE FROM $tableName WHERE `key` = MD5(?)";
77 private final static String countQuery = "SELECT COUNT(*) FROM $tableName";
78 private final static String selectAllQuery = "SELECT `value` FROM $tableName";
79
80 HashMap<URI, PreparedStatement> insertBatchStatements = new HashMap<>();
81
82 public MySQLTransaction(boolean readOnly) throws SQLException {
83 this.readOnly = readOnly;
84 this.con = dataSource.getConnection();
85 this.con.setAutoCommit(false);
86
87 insertBatchStatements.put(KS.MENTION, con.prepareStatement(insertQuery.replace("$tableName", "mentions")));
88 insertBatchStatements.put(KS.RESOURCE, con.prepareStatement(insertQuery.replace("$tableName", "resources")));
89
90
91 }
92
93 private void connect(String dbUser, String dbPass) throws SQLException {
94
95 }
96
97 private String getTableName(URI type) throws IOException {
98 if (type.equals(KS.MENTION)) {
99 return "mentions";
100 }
101 else if (type.equals(KS.RESOURCE)) {
102 return "resources";
103 }
104 throw new IOException(String.format("Unknown URI: %s", type));
105 }
106
107 private byte[] serializeRecord(Record record) throws IOException {
108 ObjectOutput out = null;
109 byte[] returnBytes;
110
111 try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
112 out = new ObjectOutputStream(bos);
113 out.writeObject(record);
114 returnBytes = bos.toByteArray();
115 } finally {
116 if (out != null) {
117 out.close();
118 }
119 }
120
121 return returnBytes;
122 }
123
124 private Record unserializeRecord(byte[] bytes) throws IOException {
125 ObjectInput in = null;
126 Record returnRecord;
127
128 try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes)) {
129 in = new ObjectInputStream(bis);
130 try {
131 returnRecord = (Record) in.readObject();
132 } catch (ClassNotFoundException e) {
133 throw new IOException(e);
134 }
135 } finally {
136 if (in != null) {
137 in.close();
138 }
139 }
140
141 return returnRecord;
142 }
143
144 @Override
145 public Stream<Record> lookup(URI type, Set<? extends URI> ids, @Nullable Set<? extends URI> properties) throws IOException, IllegalArgumentException, IllegalStateException {
146 String tableName = getTableName(type);
147 List<Record> returns = new ArrayList<>();
148
149 for (URI id : ids) {
150 String uri;
151 try {
152 uri = id.toString();
153 } catch (NullPointerException e) {
154 throw new IOException(e);
155 }
156
157 logger.debug(String.format("Selecting %s", uri));
158 String query = selectQuery.replace("$tableName", tableName);
159 try {
160 PreparedStatement stmt = con.prepareStatement(query);
161 stmt.setString(1, uri);
162
163 ResultSet set = stmt.executeQuery();
164
165 while (set.next()) {
166 Record r = unserializeRecord(set.getBytes("value"));
167 if (properties != null) {
168 r.retain(Iterables.toArray(properties, URI.class));
169 }
170 returns.add(r);
171 }
172 } catch (SQLException e) {
173 throw new IOException(e);
174 }
175
176 }
177
178 return Stream.create(returns);
179 }
180
181 @Override
182 public Stream<Record> retrieve(URI type, @Nullable XPath condition, @Nullable Set<? extends URI> properties) throws IOException, IllegalArgumentException, IllegalStateException {
183 String tableName = getTableName(type);
184 List<Record> returns = new ArrayList<>();
185
186 logger.debug("Retrieving all lines");
187 String query = selectAllQuery.replace("$tableName", tableName);
188
189 try {
190 Statement statement = con.createStatement();
191 ResultSet resultSet = statement.executeQuery(query);
192
193 while (resultSet.next()) {
194 Record r = unserializeRecord(resultSet.getBytes("value"));
195 if (condition != null && !condition.evalBoolean(r)) {
196 continue;
197 }
198
199 if (properties != null) {
200 r.retain(Iterables.toArray(properties, URI.class));
201 }
202 returns.add(r);
203 }
204 } catch (SQLException e) {
205 throw new IOException(e);
206 }
207
208 return Stream.create(returns);
209 }
210
211 @Override
212 public long count(URI type, @Nullable XPath condition) throws IOException, IllegalArgumentException, IllegalStateException {
213 String tableName = getTableName(type);
214 logger.debug("Counting rows");
215 String query = countQuery.replace("$tableName", tableName);
216
217 try {
218 Statement statement = con.createStatement();
219 ResultSet resultSet = statement.executeQuery(query);
220
221 if (resultSet.next()) {
222 return resultSet.getLong(1);
223 }
224 } catch (SQLException e) {
225 throw new IOException(e);
226 }
227
228 throw new IOException();
229 }
230
231 @Override
232 public Stream<Record> match(Map<URI, XPath> conditions, Map<URI, Set<URI>> ids, Map<URI, Set<URI>> properties) throws IOException, IllegalStateException {
233 return null;
234 }
235
236 @Override
237 public void store(URI type, Record record) throws IOException, IllegalStateException {
238
239
240
241 String uri;
242 try {
243 uri = record.getID().toString();
244 } catch (NullPointerException e) {
245 throw new IOException(e);
246 }
247
248 logger.debug(String.format("Inserting %s", uri));
249 try {
250
251
252
253
254 insertBatchStatements.get(type).setString(1, uri);
255 insertBatchStatements.get(type).setBytes(2, serializeRecord(record));
256 insertBatchStatements.get(type).addBatch();
257 } catch (SQLException e) {
258 throw new IOException(e);
259 }
260 }
261
262 @Override
263 public void delete(URI type, URI id) throws IOException, IllegalStateException {
264 String tableName = getTableName(type);
265
266 String uri;
267 try {
268 uri = id.toString();
269 } catch (NullPointerException e) {
270 throw new IOException(e);
271 }
272
273 logger.debug(String.format("Deleting %s", uri));
274 String query = deleteQuery.replace("$tableName", tableName);
275 try {
276 PreparedStatement stmt = con.prepareStatement(query);
277 stmt.setString(1, uri);
278 stmt.executeUpdate();
279 } catch (SQLException e) {
280 throw new IOException(e);
281 }
282 }
283
284 @Override
285 public void end(boolean commit) throws DataCorruptedException, IOException, IllegalStateException {
286 try {
287 if (commit) {
288 for (URI type : insertBatchStatements.keySet()) {
289 insertBatchStatements.get(type).executeBatch();
290 }
291 con.commit();
292 }
293 else {
294 con.rollback();
295 }
296 con.close();
297 } catch (Exception e) {
298 throw new IOException(e);
299 }
300 }
301 }
302
303 @Override
304 public DataTransaction begin(boolean readOnly) throws DataCorruptedException, IOException, IllegalStateException {
305
306 MySQLTransaction ret = null;
307 try {
308 ret = new MySQLTransaction(readOnly);
309 } catch (Exception e) {
310 throw new IOException(e);
311 }
312
313 return ret;
314 }
315
316 @Override
317 public void init() throws IOException, IllegalStateException {
318 dataSource = new HikariDataSource(config);
319
320 }
321
322 @Override
323 public void close() {
324 dataSource.close();
325
326 }
327 }