1   
2   package eu.fbk.knowledgestore.elastic;
3   
4   import com.google.common.collect.Iterables;
5   import com.google.common.collect.Range;
6   import eu.fbk.knowledgestore.data.Record;
7   import eu.fbk.knowledgestore.data.Stream;
8   import eu.fbk.knowledgestore.data.XPath;
9   import eu.fbk.knowledgestore.datastore.DataTransaction;
10  import eu.fbk.knowledgestore.runtime.DataCorruptedException;
11  import eu.fbk.knowledgestore.vocabulary.KS;
12  import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
13  import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
14  import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
15  import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
16  import org.elasticsearch.action.bulk.BulkItemResponse;
17  import org.elasticsearch.action.bulk.BulkProcessor;
18  import org.elasticsearch.action.bulk.BulkRequest;
19  import org.elasticsearch.action.bulk.BulkResponse;
20  import org.elasticsearch.action.count.CountRequestBuilder;
21  import org.elasticsearch.action.count.CountResponse;
22  import org.elasticsearch.action.delete.DeleteRequest;
23  import org.elasticsearch.action.get.*;
24  import org.elasticsearch.action.get.MultiGetRequest.Item;
25  import org.elasticsearch.action.get.MultiGetResponse.Failure;
26  import org.elasticsearch.action.index.IndexRequest;
27  import org.elasticsearch.action.search.SearchRequestBuilder;
28  import org.elasticsearch.action.search.SearchResponse;
29  import org.elasticsearch.action.update.UpdateRequest;
30  import org.elasticsearch.client.Client;
31  import org.elasticsearch.common.unit.TimeValue;
32  import org.elasticsearch.common.xcontent.XContentBuilder;
33  import org.elasticsearch.index.query.FilterBuilder;
34  import org.elasticsearch.index.query.FilterBuilders;
35  import org.elasticsearch.index.query.QueryBuilder;
36  import org.elasticsearch.index.query.QueryBuilders;
37  import org.elasticsearch.indices.IndexMissingException;
38  import org.openrdf.model.URI;
39  import org.slf4j.Logger;
40  import org.slf4j.LoggerFactory;
41  
42  import javax.annotation.Nullable;
43  import java.io.IOException;
44  import java.util.*;
45  import java.util.concurrent.TimeUnit;
46  import java.util.concurrent.atomic.AtomicReference;
47  
48  
49  
50  
51  
52  public final class DataTransactionElastic implements DataTransaction{
53      private static final Logger LOGGER = LoggerFactory.getLogger(DataTransactionElastic.class);
54      private final Client client; 
55      
56      private BulkProcessor bulk; 
57      
58      private final ElasticConfigurations configs; 
59      private final MappingHandler mapper; 
60      private final URIHandler uriHandler; 
61      private final AtomicReference<RuntimeException> bulkException;
62      
63      
64  
65  
66  
67  
68      public DataTransactionElastic(ElasticConfigurations configs, Client client, MappingHandler mapper, URIHandler uriHandler) {
69          this.configs = configs;
70          this.client = client;
71          this.mapper = mapper;
72          this.uriHandler = uriHandler;
73          bulkException = new AtomicReference<>();
74          bulk = null;
75      }
76      
77      
78  
79  
80  
81  
82      private String getTypeAsString(URI type){
83          if(type.equals(KS.MENTION)) return "mention";
84          if(type.equals(KS.RESOURCE)) return "resource";
85          throw new IllegalArgumentException("unknow type: " + type.toString());
86      }
87      
88      
89      
90  
91  
92  
93  
94      private void initBulk(){
95          LOGGER.trace("starting a new bulk");
96          BulkProcessor.Builder builder = BulkProcessor.builder(client, new BulkProcessor.Listener(){
97              @Override
98              public void beforeBulk(long l, BulkRequest br) {
99                  client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet();
100                 LOGGER.trace("starting bulk operation with {} request", br.numberOfActions());
101             }
102             
103             @Override
104             public void afterBulk(long l, BulkRequest br, BulkResponse br1) {
105                 LOGGER.trace("finished bulk operation with {} request", br.numberOfActions());
106                 if(br1.hasFailures()){
107                     String log = "errors in bulk execution:";
108                     for(BulkItemResponse item : br1){
109                         if(item.isFailed())
110                             log += "\n\t id: " + item.getId() + " ; message: " + item.getFailureMessage();
111                     }
112                     LOGGER.error(log);
113                     
114                 }
115             }
116             
117             @Override
118             public void afterBulk(long l, BulkRequest br, Throwable thrwbl){
119                 LOGGER.trace("finished bulk operation with {} request and errors", br.numberOfActions());
120                 bulkException.set(new RuntimeException("Caught exception in bulk: " + br + ", failure: " + thrwbl, thrwbl));
121             }
122         });
123         
124         if(configs.getBulkSize() != null) builder.setBulkSize(configs.getBulkSize());
125         
126         if(configs.getConcurrentRequests() != -1) builder.setConcurrentRequests(configs.getConcurrentRequests());
127         
128         bulk = builder.build();
129     }
130     
131     private void checkNotFailed(){
132         RuntimeException exception = bulkException.get();
133         if(exception != null){
134             if(!(exception instanceof IllegalStateException))
135                 bulkException.set(new IllegalStateException("previous bulk operation failed"));
136             throw exception;
137         }
138     }
139     
140     
141 
142 
143 
144 
145 
146     private void flushBulk(TimeValue time) throws IllegalStateException{
147         if(bulk != null){
148             LOGGER.debug("flushing bulk");
149             try{
150                 bulk.flush();
151                 if(!bulk.awaitClose(time.getMillis(), TimeUnit.MILLISECONDS))
152                     throw new RuntimeException("bulk request did not completed in " + time.getMillis() + TimeUnit.MILLISECONDS);
153                 
154                 bulk = null; 
155 
156                 RefreshResponse fr = client.admin().indices().refresh(new RefreshRequest(configs.getIndexName())).actionGet();
157                 LOGGER.debug(String.format("Flush: %s failed,  %s successful, %s total",fr.getFailedShards(),fr.getSuccessfulShards(),fr.getTotalShards()));
158             }catch(InterruptedException ex){
159                 throw new IllegalStateException("bulk execution interrupted", ex);
160             }
161         }
162         checkNotFailed();
163     }
164     
165     
166 
167 
168     private void waitYellowStatus(){
169         client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet();
170     }
171     
172     
173 
174 
175 
176 
177 
178 
179 
180 
181 
182     @Override
183     public Stream<Record> lookup(URI type, Set<? extends URI> ids,  @Nullable Set<? extends URI> properties) throws IOException, IllegalArgumentException, IllegalStateException {
184         LOGGER.debug("lookup, type: " + type + " ; ids: " + ids + " ; properties: " + properties);
185         checkNotFailed();
186         if(ids.isEmpty()){
187             return Stream.create();
188         }
189 
190         this.flushBulk(configs.getBulkTime());
191         
192         MultiGetRequestBuilder request = client.prepareMultiGet();
193         
194         String typeName = getTypeAsString(type);
195         
196         for(URI id : ids){ 
197             Item item = new MultiGetRequest.Item(configs.getIndexName(), typeName, uriHandler.encode(id));
198             request.add(item);
199         }
200         LOGGER.debug("multiGetRequest: "+ request);
201         this.waitYellowStatus();
202         
203 
204         MultiGetResponse response = request.execute().actionGet();
205         ArrayList<Record> records = new ArrayList<>();
206         
207         for(MultiGetItemResponse item : response) {
208             if(item.isFailed()){
209                 Failure failure = item.getFailure();
210                 LOGGER.error("failed MultiGet: id: " + failure.getId() + " message: " + failure.getMessage());
211             }else{
212                 GetResponse tmp = item.getResponse();
213                 Record recToAdd =  Utility.deserialize(tmp, mapper, uriHandler);
214                 if(recToAdd != null){
215                     if(properties != null)
216                         recToAdd = recToAdd.retain(Iterables.toArray(properties, URI.class));
217                     records.add(recToAdd);
218                 }
219             }
220         }
221         return Stream.create(records);
222     }
223     
224     
225     
226 
227 
228 
229 
230     private FilterBuilder buildFilter(XPath condition, boolean partialAccept) throws IOException{
231         LOGGER.debug("buildFilter condition: " + condition + " ; partialAccept: " + partialAccept);
232         Map<URI, Set<Object>> map = new HashMap<>();
233         XPath remain = condition.decompose(map); 
234         
235         if((!partialAccept && remain != null) || map.isEmpty()){
236             LOGGER.debug("entrySet number: " + map.size());
237             return null;
238         }
239         List<FilterBuilder> filters = new ArrayList<>();
240         
241         for(Map.Entry el : map.entrySet()){
242             LOGGER.debug("analyze key: " + el.getKey());
243             String elKey = null;
244             if(el.getKey() instanceof URI)
245                 elKey = uriHandler.encode((URI)el.getKey());
246             else{
247                 
248                 throw new IllegalArgumentException("found a key in the xPath map that is not a URI");
249             }
250             
251             if(mapper.getValueClass(elKey).equals(byte[].class)){ 
252                 LOGGER.debug("byte[], ignore");
253                 if(!partialAccept) return null;
254                 
255             }else{
256                 LOGGER.debug("analizing key: " + elKey);
257                 ArrayList<FilterBuilder> tmpFilters = new ArrayList<>();
258                 LOGGER.debug("number of orFilters: " + ((Set<Object>)el.getValue()).size());
259                 for(Object item : (Set<Object>)el.getValue()){
260                     FilterBuilder tmpFilter = null;
261                     LOGGER.debug("analizing value: " + item.toString() + " ; class: " + item.getClass());
262                     if(item instanceof Range){
263                         tmpFilter = Utility.buildRangeFilter(elKey, (Range)item, mapper, uriHandler);
264                     }else{
265                         LOGGER.debug("term filter: " + item.getClass());
266                         tmpFilter = Utility.buildTermFilter(elKey, item, mapper, uriHandler);
267                     }
268                     if(tmpFilter != null){ 
269                         tmpFilters.add(tmpFilter);
270                     }else{
271                         LOGGER.debug("creation of the filter on {} failed", condition);
272                         if(!partialAccept) return null;
273                     }
274                 }
275                 filters.add(FilterBuilders.orFilter(Iterables.toArray(tmpFilters, FilterBuilder.class))); 
276             }
277         }
278         FilterBuilder mainFilter = null;
279         if(!filters.isEmpty())
280             mainFilter = FilterBuilders.andFilter(Iterables.toArray(filters, FilterBuilder.class)); 
281         
282         return mainFilter;
283     }
284     
285     
286 
287 
288 
289 
290 
291 
292 
293 
294 
295     @Override
296     public Stream<Record> retrieve(URI type, @Nullable XPath condition, @Nullable Set<? extends URI> properties) throws IOException, IllegalArgumentException, IllegalStateException {
297         checkNotFailed();
298         LOGGER.debug("retrieve type: " + type + " ; condition: " + condition + " ; properties: " + properties);
299         
300         this.flushBulk(configs.getBulkTime());
301         
302         String typeName = getTypeAsString(type);
303         
304         SearchRequestBuilder responseBuilder;
305         QueryBuilder mainQuery;
306         
307         if(condition == null){ 
308             LOGGER.debug("match all query");
309             mainQuery = QueryBuilders.constantScoreQuery(FilterBuilders.matchAllFilter());
310             
311         }else{ 
312             LOGGER.debug("try to build a filter of condition: " + condition);
313             FilterBuilder filter = buildFilter(condition, true);
314             if(filter == null){ 
315                 LOGGER.debug("prefilter failed");
316                 filter = FilterBuilders.matchAllFilter();
317             }
318             mainQuery = QueryBuilders.constantScoreQuery(filter);
319         }
320         
321         LOGGER.debug("Query: " + mainQuery);
322         responseBuilder = client.prepareSearch(configs.getIndexName()).setTypes(typeName)
323                 .setQuery(mainQuery).setScroll(configs.getTimeout());
324         
325         this.waitYellowStatus();
326         
327         SearchResponse response = responseBuilder.execute().actionGet();
328         Stream<Record> res = new SearchResponseStream(response, client, configs.getTimeout(), mapper, uriHandler);
329         
330         if(condition != null){
331             LOGGER.debug("post-filtering: " + condition);
332             res = res.filter(condition.asPredicate(), 0); 
333         }
334         
335         if(properties != null){ 
336             final URI[] propURIs = Iterables.toArray(properties, URI.class);
337             res = res.transform((Record r) -> {return r.retain(propURIs);}, 0);
338         }        
339         return res;
340     }
341     
342     
343 
344 
345 
346 
347 
348 
349 
350 
351     @Override
352     public long count(URI type, @Nullable XPath condition) throws IOException, IllegalArgumentException, IllegalStateException {
353         checkNotFailed();
354         LOGGER.debug("count type: " + type + " ; condition: " + condition);
355         
356 
357         this.flushBulk(configs.getBulkTime());
358         
359         
360         String typeName = getTypeAsString(type);
361         
362         CountRequestBuilder responseBuilder = null;
363         
364         if(condition == null){ 
365             responseBuilder = client.prepareCount(configs.getIndexName()).setTypes(typeName)
366                     .setQuery(QueryBuilders.matchAllQuery());
367             
368         }else{ 
369             FilterBuilder mainFilter = buildFilter(condition, false);
370             if(mainFilter != null){
371                 QueryBuilder mainQuery = QueryBuilders.constantScoreQuery(mainFilter);
372                 responseBuilder = client.prepareCount(configs.getIndexName()).setTypes(typeName)
373                         .setQuery(mainQuery);
374             }else{ 
375                 this.waitYellowStatus();
376                 LOGGER.debug("can not prefilter all, have to do a matchall and postfilter");
377                 Stream<Record> stream = this.retrieve(type, condition, new HashSet<>());
378                 return stream.count();
379             }
380         }
381         this.waitYellowStatus();
382         CountResponse response = responseBuilder.execute().actionGet();
383         return response.getCount();
384     }
385     
386     @Override
387     public Stream<Record> match(Map<URI, XPath> conditions, Map<URI, Set<URI>> ids, Map<URI, Set<URI>> properties) throws IOException, IllegalStateException {
388         throw new UnsupportedOperationException("Not supported yet.");
389     }
390     
391     
392 
393 
394 
395 
396 
397 
398     @Override
399     public void store(URI type, Record record) throws IOException, IllegalStateException {
400         checkNotFailed();
401         if(bulk == null) 
402             this.initBulk();
403         
404         String typeName = getTypeAsString(type);
405 
406         XContentBuilder source = Utility.serialize(record, uriHandler);
407         LOGGER.trace("storing\n\trecord: " + record.toString(null, true) + "\n\t-> serialized: " + source.string());
408         IndexRequest indexRequest = new IndexRequest(configs.getIndexName(), typeName, uriHandler.encode(record.getID())).source(source);
409         UpdateRequest updateRequest = new UpdateRequest(configs.getIndexName(), typeName,  uriHandler.encode(record.getID())).doc(source).upsert(indexRequest);
410         bulk.add(updateRequest);
411     }
412     
413     
414 
415 
416 
417 
418 
419 
420     @Override
421     public void delete(URI type, URI id) throws IOException, IllegalStateException {
422         checkNotFailed();
423         LOGGER.trace("delete document type: " + type + " ; id: " + id);
424         
425         String typeName = getTypeAsString(type);
426         String indexName = configs.getIndexName();
427         String idStr = uriHandler.encode(id);
428         DeleteRequest deleteRequest = new DeleteRequest(indexName, typeName, idStr);
429         
430         if(bulk == null) 
431             this.initBulk();
432         bulk.add(deleteRequest);
433     }
434     
435     
436 
437 
438 
439 
440 
441 
442     @Override
443     public void end(boolean commit) throws DataCorruptedException, IOException, IllegalStateException {
444         checkNotFailed();
445         LOGGER.debug("end");
446         if(commit){
447             LOGGER.debug("flushing pending operation");
448             this.flushBulk(configs.getBulkTime());
449             LOGGER.debug("done");
450         }
451     }
452     
453     
454 
455 
456 
457     public void optimizeIndex(int segNumber){
458         client.admin().indices().prepareOptimize(configs.getIndexName()).setMaxNumSegments(segNumber).setFlush(true).execute().actionGet();
459     }
460     
461     public void deleteAllIndexContent(){
462         this.waitYellowStatus();
463         if(!client.admin().indices().prepareDelete("*").execute().actionGet().isAcknowledged())
464             throw new RuntimeException("can not delete the index ("+configs.getIndexName()+") content ");
465     }
466     public void deleteAll(){
467         this.waitYellowStatus();
468         DeleteIndexResponse response = null;
469         try{
470             response = client.admin().indices().delete(new DeleteIndexRequest(configs.getIndexName())).actionGet();
471         }catch(IndexMissingException ex){
472             return;
473         }
474         if(!response.isAcknowledged())
475             throw new RuntimeException("can not delete the index: " + configs.getIndexName());
476     }
477 }