1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package eu.fbk.knowledgestore.elastic;
17
18 import com.google.common.collect.BiMap;
19 import com.google.common.collect.HashBiMap;
20 import com.google.common.collect.Maps;
21 import org.elasticsearch.action.ShardOperationFailedException;
22 import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
23 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
24 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
25 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
26 import org.elasticsearch.action.admin.indices.flush.FlushResponse;
27 import org.elasticsearch.action.index.IndexRequest;
28 import org.elasticsearch.action.index.IndexResponse;
29 import org.elasticsearch.action.search.SearchResponse;
30 import org.elasticsearch.client.Client;
31 import org.elasticsearch.common.settings.ImmutableSettings;
32 import org.elasticsearch.common.unit.TimeValue;
33 import org.elasticsearch.common.xcontent.XContentBuilder;
34 import org.elasticsearch.index.query.FilterBuilders;
35 import org.elasticsearch.index.query.QueryBuilders;
36 import org.elasticsearch.search.SearchHit;
37 import org.openrdf.model.URI;
38 import org.openrdf.model.impl.URIImpl;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41
42 import java.io.IOException;
43 import java.util.HashSet;
44 import java.util.Map;
45 import java.util.Set;
46 import java.util.concurrent.atomic.AtomicInteger;
47
48 import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
49
50
51
52
53
54 public class URIHandler {
55 private static final Logger LOGGER = LoggerFactory.getLogger(URIHandler.class);
56 private static final String indexName = "urimappingindex";
57 private static final String extendedURIPropName = "ex";
58 private static final TimeValue scrollTimeOut = TimeValue.timeValueMinutes(1);
59 private final HashSet<URI> strongCompress;
60 private static final char URI_ESCAPE = (char)1;
61 private final String[] types;
62 private final Client client;
63 private final AtomicInteger counter;
64 private final BiMap<String, URI> mapper;
65
66 public URIHandler(Set<URI> weakCompression, HashSet<URI> strongCompression, Client client) throws IOException{
67
68 this.strongCompress = strongCompression;
69 this.client = client;
70 counter = new AtomicInteger(0);
71 types = new String[2];
72 types[0] = "s";
73 types[1] = "w";
74 BiMap<String, URI> map = HashBiMap.create();
75 mapper = Maps.synchronizedBiMap(map);
76 initMap(weakCompression);
77 client.admin().indices().prepareOptimize(indexName).setMaxNumSegments(1).setFlush(true).execute().actionGet();
78 }
79
80
81
82
83
84
85
86
87
88
89 private void initMap(Set<URI> nameSpaces) throws IOException{
90 if(isIndexExists()){
91 LOGGER.debug("index: " + indexName + " already exists");
92 loadMappingFromIndex();
93 }else{
94 createIndex();
95 LOGGER.debug("index: " + indexName + " created");
96 }
97 addWeakCompressions(nameSpaces);
98 }
99
100
101
102
103 private void loadMappingFromIndex(){
104 SearchResponse response = client.prepareSearch(indexName).setTypes(types)
105 .setQuery(QueryBuilders.constantScoreQuery(FilterBuilders.matchAllFilter()))
106 .setScroll(scrollTimeOut).execute().actionGet();
107 while(true){
108 for(SearchHit hit : response.getHits().getHits()){
109 if(hit.isSourceEmpty())
110 throw new UnsupportedOperationException("deserialization of projected object not supported, URIHandler");
111
112 String compressedURI = hit.getId();
113 Map<String, Object> document = hit.getSource();
114 URI extendedURI = new URIImpl((String)document.get(extendedURIPropName));
115 int uriInteger = Integer.parseInt(compressedURI);
116 if(uriInteger > counter.get()) counter.set(uriInteger);
117 mapper.put(compressedURI, extendedURI);
118 }
119 response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeOut).execute().actionGet();
120 if (response.getHits().getHits().length == 0){
121 break;
122 }
123 }
124 counter.incrementAndGet();
125
126 LOGGER.debug("compression: " + mapper.toString());
127 }
128
129
130
131
132
133 private void createIndex() throws IOException{
134 CreateIndexRequestBuilder createRequest = client.admin().indices().prepareCreate(indexName)
135 .setSettings(ImmutableSettings.settingsBuilder().put("index.refresh_interval", -1).put().build());
136
137 for(String typeName : types){
138 XContentBuilder mapping = jsonBuilder().startObject()
139 .field(typeName).startObject()
140 .field("dynamic", "strict")
141 .field("properties").startObject()
142 .field("_timestamp").startObject().field("enabled", false).endObject()
143 .field("_all").startObject().field("enabled", false).endObject()
144 .field("norms").startObject().field("enabled", false).endObject()
145
146 .field(extendedURIPropName).startObject()
147 .field("type", "string")
148 .field("index", "no").endObject()
149
150 .endObject()
151 .endObject()
152 .endObject();
153
154 createRequest.addMapping(typeName, mapping);
155 LOGGER.trace("uriHandler index: " + indexName + "." + typeName + " mapping: " + mapping.string());
156 }
157 CreateIndexResponse createResponse = createRequest.execute().actionGet();
158 if(!createResponse.isAcknowledged()){
159 throw new RuntimeException("can not create the index: " + indexName);
160 }
161 }
162
163
164
165
166
167
168 private void addWeakCompressions(Set<URI> nameSpaces) throws IOException{
169 for(URI uri : nameSpaces){
170 if(!mapper.containsValue(uri)){
171 store(uri, types[1]);
172 }
173 }
174 }
175
176
177
178
179
180
181
182
183
184
185
186
187
188 private String store(URI extendedUri, String type) throws IOException{
189 String extended = extendedUri.toString();
190 XContentBuilder source = jsonBuilder().startObject().field(extendedURIPropName, extended).endObject();
191 String id;
192 synchronized(this){
193 id = Integer.toString(counter.get());
194
195 IndexRequest indexReq = new IndexRequest(indexName, type, id).source(source);
196 this.waitYellowStatus();
197 IndexResponse indexResponse = client.index(indexReq).actionGet();
198
199 if(!indexResponse.isCreated())
200 throw new IllegalStateException("failed to add the weakCompression of " + source.string() + " in: " + indexName + "." + type);
201 this.waitYellowStatus();
202 FlushResponse flushResponse = client.admin().indices().prepareFlush(indexName).execute().actionGet();
203
204 if(flushResponse.getFailedShards() != 0){
205 String errorMessage = "flush of the idex: " + indexName + " failed:\n";
206 for(ShardOperationFailedException failure : flushResponse.getShardFailures()){
207 errorMessage += "\t shardId: " + failure.shardId() + " ; reason: " + failure.reason() + "\n";
208 }
209 throw new IllegalStateException(errorMessage);
210 }
211
212 mapper.put(Integer.toString(counter.get()), extendedUri);
213 counter.incrementAndGet();
214 }
215 return id;
216 }
217
218
219
220
221
222
223
224 private synchronized String strongCompression(URI extended) throws IOException{
225
226 String compressedNotEscaped = mapper.inverse().get(extended);
227 if(compressedNotEscaped != null)
228 return String.valueOf(URI_ESCAPE) + compressedNotEscaped;
229
230 URI namespace = new URIImpl(extended.getNamespace());
231 if(!strongCompress.contains(namespace))
232 return null;
233
234
235
236
237
238 compressedNotEscaped = store(extended, types[0]);
239 LOGGER.debug(extended.toString() + " --S--> " + compressedNotEscaped);
240
241 return String.valueOf(URI_ESCAPE) + compressedNotEscaped;
242 }
243
244
245
246
247
248
249
250 public String encode(URI uri) throws IOException{
251
252
253 String compressed = strongCompression(uri);
254 if(compressed != null){
255
256 return compressed;
257 }
258
259
260
261 URI nameSpace = new URIImpl(uri.getNamespace());
262
263 String encodedNotEscape = mapper.inverse().get(nameSpace);
264
265 if(encodedNotEscape != null){
266 compressed = String.valueOf(URI_ESCAPE).concat(encodedNotEscape.concat(String.valueOf(URI_ESCAPE))
267 .concat(uri.getLocalName()));
268
269 return compressed;
270 }
271
272 compressed = String.valueOf(URI_ESCAPE).concat(uri.toString());
273
274 return compressed;
275 }
276
277
278
279
280
281
282 public URI decode(String compressed){
283 LOGGER.trace("decompression of: " + compressed);
284
285 if(compressed.length() < 2 || !(compressed.charAt(0) == URI_ESCAPE)){
286 throw new IllegalArgumentException(compressed + " is not a valid URI for decoding. Expected <$escape><number> or <$escape><number><$escape><String> or <$escape><URI>");
287 }
288 compressed = compressed.substring(1);
289
290
291 if(Character.isDigit(compressed.charAt(0))){
292 int pos = compressed.indexOf(Character.toString(URI_ESCAPE), 1);
293 if(pos == -1){
294 URI res = mapper.get(compressed);
295 LOGGER.trace("decompressed as: " + res);
296 return res;
297 }
298
299 String number = compressed.substring(0, pos);
300 String nameSpaceStr = mapper.get(number).toString();
301 String localNameStr = compressed.substring(pos+1);
302 URI res = new URIImpl(nameSpaceStr + localNameStr);
303 LOGGER.trace("decompressed as: " + res);
304 return res;
305 }
306
307 URI res = new URIImpl(compressed);
308 LOGGER.trace("decompressed as: " + res);
309 return res;
310 }
311
312 public boolean isEncodedUri(String str){
313 return str!= null && str.length()>1 && str.charAt(0) == URI_ESCAPE;
314 }
315
316 private void waitYellowStatus(){
317 client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet();
318 }
319 private boolean isIndexExists(){
320 this.waitYellowStatus();
321 IndicesExistsRequest request = new IndicesExistsRequest(indexName);
322 IndicesExistsResponse response = client.admin().indices().exists(request).actionGet();
323 return response.isExists();
324 }
325
326 public void printMapping(){
327 LOGGER.debug("uri mapping: " + mapper.toString());
328 }
329 }