1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  package eu.fbk.knowledgestore.elastic;
17  
18  import eu.fbk.knowledgestore.data.Handler;
19  import eu.fbk.knowledgestore.data.Record;
20  import eu.fbk.knowledgestore.data.Stream;
21  import org.elasticsearch.action.search.SearchResponse;
22  import org.elasticsearch.client.Client;
23  import org.elasticsearch.common.unit.TimeValue;
24  import org.elasticsearch.search.SearchHit;
25  import org.openrdf.model.URI;
26  
27  
28  
29  
30  
31  public class SearchResponseStream extends Stream<Record>{
32      SearchResponse response;
33      Client client;
34      TimeValue timeout;
35      MappingHandler mapper;
36      URIHandler uriHandler;
37      URI[] properties;
38      
39      
40  
41  
42  
43  
44  
45      SearchResponseStream(SearchResponse res, Client client, TimeValue timeout,  MappingHandler mapper, URIHandler handler){
46          response = res;
47          this.client = client;
48          this.timeout = timeout;
49          this.mapper = mapper;
50          this.uriHandler = handler;
51      }
52      
53      @Override
54      protected void doToHandler(final Handler<? super Record> handler) throws Throwable {
55          while(true){            
56              for(SearchHit hit : response.getHits().getHits()){
57                  Record rec;
58                  rec = Utility.deserialize(hit, mapper, uriHandler);
59                  if(Thread.interrupted()){
60                      handler.handle(null);
61                      return;
62                  }
63                  handler.handle(rec);
64              }
65              response = client.prepareSearchScroll(response.getScrollId()).setScroll(timeout).execute().actionGet();
66              
67              if (response.getHits().getHits().length == 0){
68                  handler.handle(null);
69                  break;
70              }
71          }
72      }
73  }