1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package eu.fbk.knowledgestore.elastic;
17
18 import com.google.common.base.Charsets;
19 import com.google.common.io.Files;
20 import com.google.common.io.Resources;
21 import eu.fbk.knowledgestore.data.Data;
22 import eu.fbk.knowledgestore.data.ParseException;
23 import eu.fbk.knowledgestore.datastore.DataStore;
24 import eu.fbk.knowledgestore.datastore.DataTransaction;
25 import eu.fbk.knowledgestore.runtime.DataCorruptedException;
26 import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
27 import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
28 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
29 import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
30 import org.elasticsearch.action.admin.indices.mapping.put.PutMappingResponse;
31 import org.elasticsearch.client.Client;
32 import org.elasticsearch.client.transport.TransportClient;
33 import org.elasticsearch.common.transport.TransportAddress;
34 import org.elasticsearch.common.xcontent.XContentFactory;
35 import org.elasticsearch.common.xcontent.XContentParser;
36 import org.elasticsearch.common.xcontent.XContentType;
37 import org.elasticsearch.indices.IndexAlreadyExistsException;
38 import org.elasticsearch.node.Node;
39 import org.openrdf.model.URI;
40 import org.openrdf.model.impl.URIImpl;
41 import org.slf4j.Logger;
42 import org.slf4j.LoggerFactory;
43
44 import java.io.File;
45 import java.io.IOException;
46 import java.net.URL;
47 import java.util.*;
48
49 import static org.elasticsearch.node.NodeBuilder.nodeBuilder;
50
51 public class DataStoreElastic implements DataStore{
52 private static final Logger LOGGER = LoggerFactory.getLogger(DataStoreElastic.class);
53 private Node node;
54 private Client client;
55 private final ElasticConfigurations configs;
56 private MappingHandler mapper;
57 private URIHandler uriHandler;
58
59 public DataStoreElastic(String path){
60 configs = new ElasticConfigurations(path);
61 mapper = null;
62 node = null;
63 client = null;
64 }
65
66 @Override
67 public void init() throws IOException, IllegalStateException {
68 LOGGER.debug("dataStore init");
69 TransportAddress[] addresses = configs.getAddresses();
70 if(addresses == null || addresses.length == 0){
71 node = nodeBuilder().settings(configs.getNodeSettings()).node();
72
73 this.client = node.client();
74 LOGGER.info("starting a local node");
75 }else{
76 node = null;
77 this.client = new TransportClient(configs.getNodeSettings()).addTransportAddresses(addresses);
78 LOGGER.info("starting transportClient");
79 }
80 try{
81 mapper = createIndex(client);
82 uriHandler.printMapping();
83 LOGGER.debug("init done; node: " + node + " ; client: " + client + " ; mapper: " + mapper);
84 }catch(Exception ex){
85 LOGGER.error("errore nella createIndex: " + ex);
86 }
87 }
88
89 @Override
90 public DataTransaction begin(boolean readOnly) throws DataCorruptedException, IOException, IllegalStateException {
91 LOGGER.debug("dataStore begin");
92 if(configs == null || client == null || mapper == null || uriHandler == null)
93 throw new IllegalStateException("can not start a transaction object with null values, have you called the init?\n" +
94 "configs:" + configs + " ; client: " + client + " ; mapper: " + mapper + " ; uriHandler: " + uriHandler);
95 return new DataTransactionElastic(configs, client, mapper, uriHandler);
96 }
97
98
99
100
101 public void optimize(){
102 client.admin().indices().prepareOptimize(configs.getIndexName()).setMaxNumSegments(1).setFlush(true).execute().actionGet();
103 }
104
105
106 @Override
107 public void close() {
108 if(node != null){
109 LOGGER.debug("close local node");
110 node.close();
111 }else{
112 LOGGER.debug("close transportClient");
113 ((TransportClient)client).close();
114 }
115 node = null;
116 }
117
118 private void waitYellowStatus(){
119 LOGGER.debug("wait yellow status...");
120 client.admin().cluster().prepareHealth().setWaitForYellowStatus().execute().actionGet();
121 LOGGER.debug("done");
122 }
123
124 private boolean isIndexExists(String indexName){
125 this.waitYellowStatus();
126 IndicesExistsRequest request = new IndicesExistsRequest(indexName);
127 IndicesExistsResponse response = client.admin().indices().exists(request).actionGet();
128 return response.isExists();
129 }
130
131
132
133
134
135
136 private synchronized MappingHandler createIndex(Client client) throws IOException{
137 String[] types = new String[2];
138 types[0] = "resource";
139 types[1] = "mention";
140
141 HashSet<URI> setWeakCompr = new HashSet<>();
142
143 String weakCompressionPath = configs.getWeakCompressionPath();
144 if(weakCompressionPath != null)
145 setWeakCompr = readNamespaceSetFromFile(configs.getWeakCompressionPath(), setWeakCompr);
146 HashSet<URI> setStrongCompr = new HashSet<>();
147
148 if(!isIndexExists(configs.getIndexName())){
149
150 String[] jsons = new String[2];
151 jsons[0] = readMappingFromFile(configs.getResourceMapping());
152
153 jsons[1] = readMappingFromFile(configs.getMentionMapping());
154
155
156 XContentParser parser0 = XContentFactory.xContent(XContentType.JSON).createParser(jsons[0]);
157 Map<String, Object> map0 = parser0.map();
158
159 XContentParser parser1 = XContentFactory.xContent(XContentType.JSON).createParser(jsons[1]);
160 Map<String, Object> map1 = parser1.map();
161
162
163 setStrongCompr = extractLeafProperties(map0, setStrongCompr);
164
165 setStrongCompr = extractLeafProperties(map1, setStrongCompr);
166
167 String strongCompressionPath = configs.getStrongCompressionPath();
168 if(strongCompressionPath != null)
169 setStrongCompr = readNamespaceSetFromFile(strongCompressionPath, setStrongCompr);
170 LOGGER.debug("strongSet: " + setStrongCompr);
171
172
173 uriHandler = new URIHandler(setWeakCompr, setStrongCompr, client);
174
175
176 LOGGER.debug("compressing mappings");
177 long startTime = System.currentTimeMillis();
178 map0 = compressMapping(map0);
179 map1 = compressMapping(map1);
180 LOGGER.debug("mapping compressed in: " + (System.currentTimeMillis()-startTime) + " ; result:\n\n");
181 LOGGER.debug("resource mapping map: " + map0);
182 LOGGER.debug("mention mapping map: " + map1 +"\n\n\n");
183 jsons[0] = XContentFactory.jsonBuilder().map(map0).string();
184 jsons[1] = XContentFactory.jsonBuilder().map(map1).string();
185 try{
186 CreateIndexRequestBuilder request = client.admin().indices().prepareCreate(configs.getIndexName());
187 setMappings(request, jsons, types);
188 CreateIndexResponse response = request.execute().actionGet();
189 if(!response.isAcknowledged()){
190 throw new RuntimeException("can not create the index: " + configs.getIndexName());
191 }
192 LOGGER.info("index {} created", configs.getIndexName());
193 }catch(IndexAlreadyExistsException ex){
194 LOGGER.debug("index {} already exists, isIndexExists failed", configs.getIndexName());
195 }
196 }else{
197 uriHandler = uriHandler = new URIHandler(setWeakCompr, setStrongCompr, client);
198 LOGGER.info("index {} already exists", configs.getIndexName());
199 }
200
201 return new MappingHandler(client.admin().indices().prepareGetMappings(configs.getIndexName())
202 .setTypes(types).execute().actionGet());
203 }
204
205 private void setMappings(CreateIndexRequestBuilder req, String[] jsons, String[] types){
206 for(int i=0; i< types.length; i++){
207 LOGGER.debug(types[i] + " mapping:\n" + jsons[i]);
208 }
209 int size = jsons.length;
210 if(size > types.length) size = types.length;
211 for(int i=0; i<size; i++){
212 req.addMapping(types[i], jsons[i]);
213 }
214 }
215
216 private void setMappings(String[] types, String[] jsons){
217 int size = jsons.length;
218 if(types.length < jsons.length)
219 size = types.length;
220
221 for(int i=0; i<size; i++){
222 LOGGER.info("setting {} mapping", types[i]);
223 PutMappingResponse mappingMentionResponse = client.admin().indices().preparePutMapping(configs.getIndexName())
224 .setType(types[i]).setSource(jsons[i]).execute().actionGet();
225 if(!mappingMentionResponse.isAcknowledged())
226 throw new RuntimeException("can not set the mapping for the type " + types[i]);
227 }
228
229 }
230
231 private String readMappingFromFile(String fileName) throws IOException{
232 String json;
233 URL url = DataTransactionElastic.class.getClassLoader().getResource(fileName);
234 if(url != null){
235 try {
236 json = Resources.toString(url, Charsets.UTF_8);
237 }catch (IOException ex){
238 throw new IllegalArgumentException("can not find file: " + fileName + " in the ClassPath", ex);
239 }
240 }else{
241 try {
242 json = Files.toString(new File(fileName), Charsets.UTF_8);
243 } catch (IOException ex) {
244 throw new IllegalArgumentException("can not find the specified file: " + fileName, ex);
245 }
246 }
247
248 return json;
249 }
250
251 private HashSet<URI> extractLeafProperties(Map<String, Object> map, HashSet<URI> set){
252 Set<String> propStrings = map.keySet();
253 for(String propStr : propStrings){
254 try{
255 URI propUri = (URI)Data.parseValue(propStr, null);
256 URI propNamespace = new URIImpl(propUri.getNamespace());
257 set.add(propNamespace);
258 }catch(ParseException ex){
259
260 }
261 Object value = map.get(propStr);
262 if(value instanceof Map){
263 set = extractLeafProperties((Map)value, set);
264 }
265 }
266 return set;
267 }
268
269 private HashSet<URI> readNamespaceSetFromFile(String fileName, HashSet<URI> set){
270 List<String> propStrings;
271 URL url = DataTransactionElastic.class.getClassLoader().getResource(fileName);
272 if(url != null){
273 try {
274 propStrings = Resources.readLines(url, Charsets.UTF_8);
275 }catch (IOException ex){
276 throw new IllegalArgumentException("can not find file: " + fileName + " in the ClassPath", ex);
277 }
278 }else{
279 try {
280 propStrings = Files.readLines(new File(fileName), Charsets.UTF_8);
281 } catch (IOException ex) {
282 throw new IllegalArgumentException("can not find the specified file: " + fileName, ex);
283 }
284 }
285
286 for(String propStr : propStrings){
287 URI propUri = (URI)Data.parseValue(propStr, null);
288 set.add(propUri);
289 }
290 return set;
291 }
292
293 private Map<String, Object> compressMapping(Map<String, Object> map) throws IOException{
294 HashMap<String, Object> destMap = new HashMap<>();
295 return compressMapping(destMap, map);
296 }
297 private Map<String, Object> compressMapping(Map<String, Object> destMap, Map<String, Object> sourceMap) throws IOException{
298 for(String key : sourceMap.keySet()){
299 Object value = sourceMap.get(key);
300 if(value instanceof Map){
301 value = compressMapping((Map)value);
302 }
303 try{
304 URI uriKey = (URI)Data.parseValue(key, null);
305 key = uriHandler.encode(uriKey);
306 destMap.put(key, value);
307 }catch(ParseException ex){
308
309 destMap.put(key, value);
310 }
311 }
312 return destMap;
313 }
314 }
315