1
2 package eu.fbk.knowledgestore.elastic;
3
4 import com.google.common.collect.Iterables;
5 import com.google.common.collect.Range;
6 import eu.fbk.knowledgestore.data.Record;
7 import eu.fbk.knowledgestore.data.Stream;
8 import eu.fbk.knowledgestore.data.XPath;
9 import eu.fbk.knowledgestore.datastore.DataTransaction;
10 import eu.fbk.knowledgestore.runtime.DataCorruptedException;
11 import eu.fbk.knowledgestore.vocabulary.KS;
12 import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
13 import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
14 import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
15 import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
16 import org.elasticsearch.action.bulk.BulkItemResponse;
17 import org.elasticsearch.action.bulk.BulkProcessor;
18 import org.elasticsearch.action.bulk.BulkRequest;
19 import org.elasticsearch.action.bulk.BulkResponse;
20 import org.elasticsearch.action.count.CountRequestBuilder;
21 import org.elasticsearch.action.count.CountResponse;
22 import org.elasticsearch.action.delete.DeleteRequest;
23 import org.elasticsearch.action.get.*;
24 import org.elasticsearch.action.get.MultiGetRequest.Item;
25 import org.elasticsearch.action.get.MultiGetResponse.Failure;
26 import org.elasticsearch.action.index.IndexRequest;
27 import org.elasticsearch.action.search.SearchRequestBuilder;
28 import org.elasticsearch.action.search.SearchResponse;
29 import org.elasticsearch.action.update.UpdateRequest;
30 import org.elasticsearch.client.Client;
31 import org.elasticsearch.common.unit.TimeValue;
32 import org.elasticsearch.common.xcontent.XContentBuilder;
33 import org.elasticsearch.index.query.FilterBuilder;
34 import org.elasticsearch.index.query.FilterBuilders;
35 import org.elasticsearch.index.query.QueryBuilder;
36 import org.elasticsearch.index.query.QueryBuilders;
37 import org.elasticsearch.indices.IndexMissingException;
38 import org.openrdf.model.URI;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 import javax.annotation.Nullable;
43 import java.io.IOException;
44 import java.util.*;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicReference;
47
48
49
50
51
52 public final class DataTransactionElastic implements DataTransaction{
/** Logger shared by all instances of this class. */
private static final Logger LOGGER = LoggerFactory.getLogger(DataTransactionElastic.class);

/** Client through which every request of this transaction is issued. */
private final Client client;

/** Settings read by this transaction (index name, bulk size, bulk time, timeout, ...). */
private final ElasticConfigurations configs;

/** Handed to the (de)serialization and filter helpers; also queried for the value class of a key. */
private final MappingHandler mapper;

/** Encodes URIs into the strings used as document ids and as filter keys. */
private final URIHandler uriHandler;

/** Failure reported by the bulk listener, if any; inspected by {@code checkNotFailed()}. */
private final AtomicReference<RuntimeException> bulkException = new AtomicReference<>();

/** Bulk processor collecting pending writes; {@code null} until {@code initBulk()} creates one. */
private BulkProcessor bulk;

/**
 * Creates a transaction that has no pending bulk and no recorded failure.
 *
 * @param configs    the settings to read index name, bulk parameters and timeout from
 * @param client     the client used to talk to the cluster
 * @param mapper     the mapping handler passed to the helper routines
 * @param uriHandler the handler used to encode URIs
 */
public DataTransactionElastic(ElasticConfigurations configs, Client client, MappingHandler mapper, URIHandler uriHandler) {
    this.client = client;
    this.configs = configs;
    this.uriHandler = uriHandler;
    this.mapper = mapper;
}
76
77
78
79
80
81
82 private String getTypeAsString(URI type){
83 if(type.equals(KS.MENTION)) return "mention";
84 if(type.equals(KS.RESOURCE)) return "resource";
85 throw new IllegalArgumentException("unknow type: " + type.toString());
86 }
87
88
89
90
91
92
93
94 private void initBulk(){
95 LOGGER.trace("starting a new bulk");
96 BulkProcessor.Builder builder = BulkProcessor.builder(client, new BulkProcessor.Listener(){
97 @Override
98 public void beforeBulk(long l, BulkRequest br) {
99 client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet();
100 LOGGER.trace("starting bulk operation with {} request", br.numberOfActions());
101 }
102
103 @Override
104 public void afterBulk(long l, BulkRequest br, BulkResponse br1) {
105 LOGGER.trace("finished bulk operation with {} request", br.numberOfActions());
106 if(br1.hasFailures()){
107 String log = "errors in bulk execution:";
108 for(BulkItemResponse item : br1){
109 if(item.isFailed())
110 log += "\n\t id: " + item.getId() + " ; message: " + item.getFailureMessage();
111 }
112 LOGGER.error(log);
113
114 }
115 }
116
117 @Override
118 public void afterBulk(long l, BulkRequest br, Throwable thrwbl){
119 LOGGER.trace("finished bulk operation with {} request and errors", br.numberOfActions());
120 bulkException.set(new RuntimeException("Caught exception in bulk: " + br + ", failure: " + thrwbl, thrwbl));
121 }
122 });
123
124 if(configs.getBulkSize() != null) builder.setBulkSize(configs.getBulkSize());
125
126 if(configs.getConcurrentRequests() != -1) builder.setConcurrentRequests(configs.getConcurrentRequests());
127
128 bulk = builder.build();
129 }
130
131 private void checkNotFailed(){
132 RuntimeException exception = bulkException.get();
133 if(exception != null){
134 if(!(exception instanceof IllegalStateException))
135 bulkException.set(new IllegalStateException("previous bulk operation failed"));
136 throw exception;
137 }
138 }
139
140
141
142
143
144
145
146 private void flushBulk(TimeValue time) throws IllegalStateException{
147 if(bulk != null){
148 LOGGER.debug("flushing bulk");
149 try{
150 bulk.flush();
151 if(!bulk.awaitClose(time.getMillis(), TimeUnit.MILLISECONDS))
152 throw new RuntimeException("bulk request did not completed in " + time.getMillis() + TimeUnit.MILLISECONDS);
153
154 bulk = null;
155
156 RefreshResponse fr = client.admin().indices().refresh(new RefreshRequest(configs.getIndexName())).actionGet();
157 LOGGER.debug(String.format("Flush: %s failed, %s successful, %s total",fr.getFailedShards(),fr.getSuccessfulShards(),fr.getTotalShards()));
158 }catch(InterruptedException ex){
159 throw new IllegalStateException("bulk execution interrupted", ex);
160 }
161 }
162 checkNotFailed();
163 }
164
165
166
167
168 private void waitYellowStatus(){
169 client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet();
170 }
171
172
173
174
175
176
177
178
179
180
181
182 @Override
183 public Stream<Record> lookup(URI type, Set<? extends URI> ids, @Nullable Set<? extends URI> properties) throws IOException, IllegalArgumentException, IllegalStateException {
184 LOGGER.debug("lookup, type: " + type + " ; ids: " + ids + " ; properties: " + properties);
185 checkNotFailed();
186 if(ids.isEmpty()){
187 return Stream.create();
188 }
189
190 this.flushBulk(configs.getBulkTime());
191
192 MultiGetRequestBuilder request = client.prepareMultiGet();
193
194 String typeName = getTypeAsString(type);
195
196 for(URI id : ids){
197 Item item = new MultiGetRequest.Item(configs.getIndexName(), typeName, uriHandler.encode(id));
198 request.add(item);
199 }
200 LOGGER.debug("multiGetRequest: "+ request);
201 this.waitYellowStatus();
202
203
204 MultiGetResponse response = request.execute().actionGet();
205 ArrayList<Record> records = new ArrayList<>();
206
207 for(MultiGetItemResponse item : response) {
208 if(item.isFailed()){
209 Failure failure = item.getFailure();
210 LOGGER.error("failed MultiGet: id: " + failure.getId() + " message: " + failure.getMessage());
211 }else{
212 GetResponse tmp = item.getResponse();
213 Record recToAdd = Utility.deserialize(tmp, mapper, uriHandler);
214 if(recToAdd != null){
215 if(properties != null)
216 recToAdd = recToAdd.retain(Iterables.toArray(properties, URI.class));
217 records.add(recToAdd);
218 }
219 }
220 }
221 return Stream.create(records);
222 }
223
224
225
226
227
228
229
230 private FilterBuilder buildFilter(XPath condition, boolean partialAccept) throws IOException{
231 LOGGER.debug("buildFilter condition: " + condition + " ; partialAccept: " + partialAccept);
232 Map<URI, Set<Object>> map = new HashMap<>();
233 XPath remain = condition.decompose(map);
234
235 if((!partialAccept && remain != null) || map.isEmpty()){
236 LOGGER.debug("entrySet number: " + map.size());
237 return null;
238 }
239 List<FilterBuilder> filters = new ArrayList<>();
240
241 for(Map.Entry el : map.entrySet()){
242 LOGGER.debug("analyze key: " + el.getKey());
243 String elKey = null;
244 if(el.getKey() instanceof URI)
245 elKey = uriHandler.encode((URI)el.getKey());
246 else{
247
248 throw new IllegalArgumentException("found a key in the xPath map that is not a URI");
249 }
250
251 if(mapper.getValueClass(elKey).equals(byte[].class)){
252 LOGGER.debug("byte[], ignore");
253 if(!partialAccept) return null;
254
255 }else{
256 LOGGER.debug("analizing key: " + elKey);
257 ArrayList<FilterBuilder> tmpFilters = new ArrayList<>();
258 LOGGER.debug("number of orFilters: " + ((Set<Object>)el.getValue()).size());
259 for(Object item : (Set<Object>)el.getValue()){
260 FilterBuilder tmpFilter = null;
261 LOGGER.debug("analizing value: " + item.toString() + " ; class: " + item.getClass());
262 if(item instanceof Range){
263 tmpFilter = Utility.buildRangeFilter(elKey, (Range)item, mapper, uriHandler);
264 }else{
265 LOGGER.debug("term filter: " + item.getClass());
266 tmpFilter = Utility.buildTermFilter(elKey, item, mapper, uriHandler);
267 }
268 if(tmpFilter != null){
269 tmpFilters.add(tmpFilter);
270 }else{
271 LOGGER.debug("creation of the filter on {} failed", condition);
272 if(!partialAccept) return null;
273 }
274 }
275 filters.add(FilterBuilders.orFilter(Iterables.toArray(tmpFilters, FilterBuilder.class)));
276 }
277 }
278 FilterBuilder mainFilter = null;
279 if(!filters.isEmpty())
280 mainFilter = FilterBuilders.andFilter(Iterables.toArray(filters, FilterBuilder.class));
281
282 return mainFilter;
283 }
284
285
286
287
288
289
290
291
292
293
294
295 @Override
296 public Stream<Record> retrieve(URI type, @Nullable XPath condition, @Nullable Set<? extends URI> properties) throws IOException, IllegalArgumentException, IllegalStateException {
297 checkNotFailed();
298 LOGGER.debug("retrieve type: " + type + " ; condition: " + condition + " ; properties: " + properties);
299
300 this.flushBulk(configs.getBulkTime());
301
302 String typeName = getTypeAsString(type);
303
304 SearchRequestBuilder responseBuilder;
305 QueryBuilder mainQuery;
306
307 if(condition == null){
308 LOGGER.debug("match all query");
309 mainQuery = QueryBuilders.constantScoreQuery(FilterBuilders.matchAllFilter());
310
311 }else{
312 LOGGER.debug("try to build a filter of condition: " + condition);
313 FilterBuilder filter = buildFilter(condition, true);
314 if(filter == null){
315 LOGGER.debug("prefilter failed");
316 filter = FilterBuilders.matchAllFilter();
317 }
318 mainQuery = QueryBuilders.constantScoreQuery(filter);
319 }
320
321 LOGGER.debug("Query: " + mainQuery);
322 responseBuilder = client.prepareSearch(configs.getIndexName()).setTypes(typeName)
323 .setQuery(mainQuery).setScroll(configs.getTimeout());
324
325 this.waitYellowStatus();
326
327 SearchResponse response = responseBuilder.execute().actionGet();
328 Stream<Record> res = new SearchResponseStream(response, client, configs.getTimeout(), mapper, uriHandler);
329
330 if(condition != null){
331 LOGGER.debug("post-filtering: " + condition);
332 res = res.filter(condition.asPredicate(), 0);
333 }
334
335 if(properties != null){
336 final URI[] propURIs = Iterables.toArray(properties, URI.class);
337 res = res.transform((Record r) -> {return r.retain(propURIs);}, 0);
338 }
339 return res;
340 }
341
342
343
344
345
346
347
348
349
350
351 @Override
352 public long count(URI type, @Nullable XPath condition) throws IOException, IllegalArgumentException, IllegalStateException {
353 checkNotFailed();
354 LOGGER.debug("count type: " + type + " ; condition: " + condition);
355
356
357 this.flushBulk(configs.getBulkTime());
358
359
360 String typeName = getTypeAsString(type);
361
362 CountRequestBuilder responseBuilder = null;
363
364 if(condition == null){
365 responseBuilder = client.prepareCount(configs.getIndexName()).setTypes(typeName)
366 .setQuery(QueryBuilders.matchAllQuery());
367
368 }else{
369 FilterBuilder mainFilter = buildFilter(condition, false);
370 if(mainFilter != null){
371 QueryBuilder mainQuery = QueryBuilders.constantScoreQuery(mainFilter);
372 responseBuilder = client.prepareCount(configs.getIndexName()).setTypes(typeName)
373 .setQuery(mainQuery);
374 }else{
375 this.waitYellowStatus();
376 LOGGER.debug("can not prefilter all, have to do a matchall and postfilter");
377 Stream<Record> stream = this.retrieve(type, condition, new HashSet<>());
378 return stream.count();
379 }
380 }
381 this.waitYellowStatus();
382 CountResponse response = responseBuilder.execute().actionGet();
383 return response.getCount();
384 }
385
386 @Override
387 public Stream<Record> match(Map<URI, XPath> conditions, Map<URI, Set<URI>> ids, Map<URI, Set<URI>> properties) throws IOException, IllegalStateException {
388 throw new UnsupportedOperationException("Not supported yet.");
389 }
390
391
392
393
394
395
396
397
398 @Override
399 public void store(URI type, Record record) throws IOException, IllegalStateException {
400 checkNotFailed();
401 if(bulk == null)
402 this.initBulk();
403
404 String typeName = getTypeAsString(type);
405
406 XContentBuilder source = Utility.serialize(record, uriHandler);
407 LOGGER.trace("storing\n\trecord: " + record.toString(null, true) + "\n\t-> serialized: " + source.string());
408 IndexRequest indexRequest = new IndexRequest(configs.getIndexName(), typeName, uriHandler.encode(record.getID())).source(source);
409 UpdateRequest updateRequest = new UpdateRequest(configs.getIndexName(), typeName, uriHandler.encode(record.getID())).doc(source).upsert(indexRequest);
410 bulk.add(updateRequest);
411 }
412
413
414
415
416
417
418
419
420 @Override
421 public void delete(URI type, URI id) throws IOException, IllegalStateException {
422 checkNotFailed();
423 LOGGER.trace("delete document type: " + type + " ; id: " + id);
424
425 String typeName = getTypeAsString(type);
426 String indexName = configs.getIndexName();
427 String idStr = uriHandler.encode(id);
428 DeleteRequest deleteRequest = new DeleteRequest(indexName, typeName, idStr);
429
430 if(bulk == null)
431 this.initBulk();
432 bulk.add(deleteRequest);
433 }
434
435
436
437
438
439
440
441
442 @Override
443 public void end(boolean commit) throws DataCorruptedException, IOException, IllegalStateException {
444 checkNotFailed();
445 LOGGER.debug("end");
446 if(commit){
447 LOGGER.debug("flushing pending operation");
448 this.flushBulk(configs.getBulkTime());
449 LOGGER.debug("done");
450 }
451 }
452
453
454
455
456
457 public void optimizeIndex(int segNumber){
458 client.admin().indices().prepareOptimize(configs.getIndexName()).setMaxNumSegments(segNumber).setFlush(true).execute().actionGet();
459 }
460
461 public void deleteAllIndexContent(){
462 this.waitYellowStatus();
463 if(!client.admin().indices().prepareDelete("*").execute().actionGet().isAcknowledged())
464 throw new RuntimeException("can not delete the index ("+configs.getIndexName()+") content ");
465 }
466 public void deleteAll(){
467 this.waitYellowStatus();
468 DeleteIndexResponse response = null;
469 try{
470 response = client.admin().indices().delete(new DeleteIndexRequest(configs.getIndexName())).actionGet();
471 }catch(IndexMissingException ex){
472 return;
473 }
474 if(!response.isAcknowledged())
475 throw new RuntimeException("can not delete the index: " + configs.getIndexName());
476 }
477 }