1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package eu.fbk.knowledgestore.elastic;
17
18 import eu.fbk.knowledgestore.data.Handler;
19 import eu.fbk.knowledgestore.data.Record;
20 import eu.fbk.knowledgestore.data.Stream;
21 import org.elasticsearch.action.search.SearchResponse;
22 import org.elasticsearch.client.Client;
23 import org.elasticsearch.common.unit.TimeValue;
24 import org.elasticsearch.search.SearchHit;
25 import org.openrdf.model.URI;
26
27
28
29
30
31 public class SearchResponseStream extends Stream<Record>{
32 SearchResponse response;
33 Client client;
34 TimeValue timeout;
35 MappingHandler mapper;
36 URIHandler uriHandler;
37 URI[] properties;
38
39
40
41
42
43
44
45 SearchResponseStream(SearchResponse res, Client client, TimeValue timeout, MappingHandler mapper, URIHandler handler){
46 response = res;
47 this.client = client;
48 this.timeout = timeout;
49 this.mapper = mapper;
50 this.uriHandler = handler;
51 }
52
53 @Override
54 protected void doToHandler(final Handler<? super Record> handler) throws Throwable {
55 while(true){
56 for(SearchHit hit : response.getHits().getHits()){
57 Record rec;
58 rec = Utility.deserialize(hit, mapper, uriHandler);
59 if(Thread.interrupted()){
60 handler.handle(null);
61 return;
62 }
63 handler.handle(rec);
64 }
65 response = client.prepareSearchScroll(response.getScrollId()).setScroll(timeout).execute().actionGet();
66
67 if (response.getHits().getHits().length == 0){
68 handler.handle(null);
69 break;
70 }
71 }
72 }
73 }