1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package eu.fbk.knowledgestore.elastic;
17
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
36
37
38
39
40
41 public class ElasticConfigurations {
42
43 private static final Logger LOGGER = LoggerFactory.getLogger(ElasticConfigurations.class);
44
45
46
47 private final Settings nodeSettings;
48
49 private final String resourceMapping;
50 private final String mentionMapping;
51
52 private final String indexName;
53
54
55 private final TransportAddress[] addresses;
56
57 private final TimeValue timeout;
58
59
60 private final TimeValue bulkTime;
61 private final ByteSizeValue bulkSize;
62 private final TimeValue flushInterval;
63 private final int concurrentRequests;
64
65
66 private final String weakCompressionPath;
67 private final String strongCompressionPath;
68
69 ElasticConfigurations(String path){
70
71 if(path != null){
72 URL url = DataTransactionElastic.class.getClassLoader().getResource(path);
73 if(url != null){
74 try{
75 LOGGER.debug("loading configuration from classPath");
76 nodeSettings = ImmutableSettings.settingsBuilder().loadFromClasspath(path).build();
77 }catch(SettingsException ex){
78 throw new IllegalArgumentException("failed to load settings from classpath", ex);
79 }
80 }else{
81 try{
82 LOGGER.debug("loading configuration from source");
83 InputStream input = null;
84 try {
85 input = new FileInputStream(path);
86 } catch (FileNotFoundException ex) {
87 throw new IllegalArgumentException("file" + path + "not found", ex);
88 }
89 nodeSettings = ImmutableSettings.settingsBuilder().loadFromStream(path, input).build();
90 }catch(SettingsException ex){
91 throw new IllegalArgumentException("failed to load settings from source", ex);
92 }
93 }
94 LOGGER.debug("nodeSettings: " + nodeSettings.names());
95 mentionMapping = nodeSettings.get("index.mapping.mention");
96 resourceMapping = nodeSettings.get("index.mapping.resource");
97 LOGGER.debug("mention mapping url: " + mentionMapping + " ; resource mapping url: " + resourceMapping);
98 indexName = nodeSettings.get("index.name");
99 if(indexName == null){
100 throw new IllegalArgumentException("no index name found in the configuration file: " + path);
101 }
102 LOGGER.debug("index name: " + indexName);
103 bulkTime = new TimeValue(nodeSettings.getAsLong("bulk.timeout", 60000L), TimeUnit.MILLISECONDS);
104 bulkSize = new ByteSizeValue(getNodeSettings().getAsLong("bulk.size", 1024L));
105 concurrentRequests = nodeSettings.getAsInt("bulk.concurrent_request", 0);
106 flushInterval = new TimeValue(getNodeSettings().getAsLong("bulk.interval", 60000L), TimeUnit.MILLISECONDS);
107
108 timeout = new TimeValue(getNodeSettings().getAsLong("scroll.timeout", 600000L), TimeUnit.MILLISECONDS);
109
110 addresses = parseAddresses(nodeSettings.getAsArray("transport.client.initial_nodes"));
111
112 weakCompressionPath = nodeSettings.get("uri_handler.weakcompression_path");
113 strongCompressionPath = nodeSettings.get("uri_handler.strongcompression_path");
114
115 }else{
116 throw new IllegalArgumentException("can not load the settings from path: " + path);
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133 }
134
135 LOGGER.debug("elasticsearch configuration loaded");
136 }
137
138 private TransportAddress[] parseAddresses(String[] addresses){
139 LOGGER.debug("addresses string: " + Arrays.toString(addresses));
140
141 if(addresses == null || addresses.length == 0) return null;
142
143 TransportAddress[] res = new InetSocketTransportAddress[addresses.length];
144 for(int i=0; i< addresses.length; i++){
145 String[] splitted = addresses[i].split(":", 2);
146 InetAddress address = null;
147 int port = -1;
148 try {
149 address = InetAddress.getByName(splitted[0]);
150 port = Integer.parseInt(splitted[1]);
151 LOGGER.debug("adress: " + address + " : " + port);
152 res[i] = new InetSocketTransportAddress(address, port);
153 } catch (UnknownHostException | NumberFormatException ex) {
154 LOGGER.error("can not find the host with IP: " + splitted[0] + " and port: " + port);
155 }
156 }
157 return res;
158 }
159
160
161
162
163
164 public Settings getNodeSettings() {
165 return nodeSettings;
166 }
167
168
169
170
171 public String getResourceMapping() {
172 return resourceMapping;
173 }
174
175
176
177
178 public String getMentionMapping() {
179 return mentionMapping;
180 }
181
182
183
184
185 public String getIndexName() {
186 return indexName;
187 }
188
189
190
191
192 public TransportAddress[] getAddresses() {
193 return addresses;
194 }
195
196
197
198
199 public TimeValue getTimeout() {
200 return timeout;
201 }
202
203
204
205
206 public TimeValue getBulkTime() {
207 return bulkTime;
208 }
209
210
211
212
213
214 public ByteSizeValue getBulkSize() {
215 return bulkSize;
216 }
217
218
219
220
221 public TimeValue getFlushInterval() {
222 return flushInterval;
223 }
224
225
226
227
228 public int getConcurrentRequests() {
229 return concurrentRequests;
230 }
231
232
233
234
235 public String getWeakCompressionPath(){
236 return weakCompressionPath;
237 }
238
239
240
241
242 public String getStrongCompressionPath(){
243 return strongCompressionPath;
244 }
245 }