1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  package eu.fbk.knowledgestore.elastic;
17  
18  import org.elasticsearch.common.settings.ImmutableSettings;
19  import org.elasticsearch.common.settings.Settings;
20  import org.elasticsearch.common.settings.SettingsException;
21  import org.elasticsearch.common.transport.InetSocketTransportAddress;
22  import org.elasticsearch.common.transport.TransportAddress;
23  import org.elasticsearch.common.unit.ByteSizeValue;
24  import org.elasticsearch.common.unit.TimeValue;
25  import org.slf4j.Logger;
26  import org.slf4j.LoggerFactory;
27  
28  import java.io.FileInputStream;
29  import java.io.FileNotFoundException;
30  import java.io.InputStream;
31  import java.net.InetAddress;
32  import java.net.URL;
33  import java.net.UnknownHostException;
34  import java.util.Arrays;
35  import java.util.concurrent.TimeUnit;
36  
37  
38  
39  
40  
41  public class ElasticConfigurations {
42      
43      private static final Logger LOGGER = LoggerFactory.getLogger(ElasticConfigurations.class);
44      
45      
46  
47      private final Settings nodeSettings;
48      
49      private final String resourceMapping;
50      private final String mentionMapping;
51  
52      private final String indexName; 
53      
54   
55      private final TransportAddress[] addresses;  
56  
57      private final TimeValue timeout; 
58      
59  
60      private final TimeValue bulkTime; 
61      private final ByteSizeValue bulkSize; 
62      private final TimeValue flushInterval; 
63      private final int concurrentRequests; 
64      
65      
66      private final String weakCompressionPath;
67      private final String strongCompressionPath;
68      
69      ElasticConfigurations(String path){
70          
71          if(path != null){
72              URL url = DataTransactionElastic.class.getClassLoader().getResource(path);
73              if(url != null){
74                  try{
75                      LOGGER.debug("loading configuration from classPath");
76                      nodeSettings = ImmutableSettings.settingsBuilder().loadFromClasspath(path).build();
77                  }catch(SettingsException ex){
78                      throw new IllegalArgumentException("failed to load settings from classpath", ex);
79                  }
80              }else{
81                  try{
82                      LOGGER.debug("loading configuration from source");
83                      InputStream input = null;
84                      try {
85                          input = new FileInputStream(path);
86                      } catch (FileNotFoundException ex) {
87                          throw new IllegalArgumentException("file" + path + "not found", ex);
88                      }
89                      nodeSettings = ImmutableSettings.settingsBuilder().loadFromStream(path, input).build();
90                  }catch(SettingsException ex){
91                      throw new IllegalArgumentException("failed to load settings from source", ex);
92                  }
93              }
94              LOGGER.debug("nodeSettings: " + nodeSettings.names());
95              mentionMapping = nodeSettings.get("index.mapping.mention");
96              resourceMapping = nodeSettings.get("index.mapping.resource");
97              LOGGER.debug("mention mapping url: " + mentionMapping + " ; resource mapping url: " + resourceMapping);
98              indexName = nodeSettings.get("index.name");
99              if(indexName == null){
100                 throw new IllegalArgumentException("no index name found in the configuration file: " + path);
101             }
102             LOGGER.debug("index name: " + indexName);
103             bulkTime = new TimeValue(nodeSettings.getAsLong("bulk.timeout", 60000L), TimeUnit.MILLISECONDS);
104             bulkSize = new ByteSizeValue(getNodeSettings().getAsLong("bulk.size", 1024L));
105             concurrentRequests = nodeSettings.getAsInt("bulk.concurrent_request", 0);
106             flushInterval = new TimeValue(getNodeSettings().getAsLong("bulk.interval", 60000L), TimeUnit.MILLISECONDS);
107             
108             timeout = new TimeValue(getNodeSettings().getAsLong("scroll.timeout", 600000L), TimeUnit.MILLISECONDS);
109             
110             addresses = parseAddresses(nodeSettings.getAsArray("transport.client.initial_nodes"));
111             
112             weakCompressionPath = nodeSettings.get("uri_handler.weakcompression_path");
113             strongCompressionPath = nodeSettings.get("uri_handler.strongcompression_path");
114             
115         }else{
116             throw new IllegalArgumentException("can not load the settings from path: " + path);
117             
118 
119 
120 
121 
122 
123 
124 
125 
126 
127 
128 
129 
130 
131 
132 
133         }
134         
135         LOGGER.debug("elasticsearch configuration loaded");
136     }
137     
138     private TransportAddress[] parseAddresses(String[] addresses){
139         LOGGER.debug("addresses string: " + Arrays.toString(addresses));
140         
141         if(addresses == null || addresses.length == 0) return null;
142         
143         TransportAddress[] res = new InetSocketTransportAddress[addresses.length];
144         for(int i=0; i< addresses.length; i++){
145             String[] splitted = addresses[i].split(":", 2); 
146             InetAddress address = null;
147             int port = -1;
148             try {
149                 address = InetAddress.getByName(splitted[0]);
150                 port = Integer.parseInt(splitted[1]);
151                 LOGGER.debug("adress: " + address + " : " + port);
152                 res[i] = new InetSocketTransportAddress(address, port);
153             } catch (UnknownHostException | NumberFormatException ex) {
154                 LOGGER.error("can not find the host with IP: " + splitted[0] + " and port: " + port);
155             }
156         }
157         return res;
158     }
159     
160 
161     
162 
163 
164     public Settings getNodeSettings() {
165         return nodeSettings;
166     }
167 
168     
169 
170 
171     public String getResourceMapping() {
172         return resourceMapping;
173     }
174 
175     
176 
177 
178     public String getMentionMapping() {
179         return mentionMapping;
180     }
181 
182     
183 
184 
185     public String getIndexName() {
186         return indexName;
187     }
188 
189     
190 
191 
192     public TransportAddress[] getAddresses() {
193         return addresses;
194     }
195 
196     
197 
198 
199     public TimeValue getTimeout() {
200         return timeout;
201     }
202 
203     
204 
205 
206     public TimeValue getBulkTime() {
207         return bulkTime;
208     }
209 
210     
211     
212 
213 
214     public ByteSizeValue getBulkSize() {
215         return bulkSize;
216     }
217 
218     
219 
220 
221     public TimeValue getFlushInterval() {
222         return flushInterval;
223     }
224 
225     
226 
227 
228     public int getConcurrentRequests() {
229         return concurrentRequests;
230     }
231     
232     
233 
234 
235     public String getWeakCompressionPath(){
236         return weakCompressionPath;
237     }
238     
239     
240 
241 
242     public String getStrongCompressionPath(){
243         return strongCompressionPath;
244     }
245 }