1 package eu.fbk.knowledgestore.internal;
2
3 import java.io.Closeable;
4 import java.io.File;
5 import java.io.FilterInputStream;
6 import java.io.FilterOutputStream;
7 import java.io.IOException;
8 import java.io.InputStream;
9 import java.io.OutputStream;
10 import java.lang.Thread.UncaughtExceptionHandler;
11 import java.lang.reflect.Type;
12 import java.net.URL;
13 import java.util.Collection;
14 import java.util.List;
15 import java.util.Map;
16 import java.util.Properties;
17 import java.util.concurrent.Callable;
18 import java.util.concurrent.ExecutionException;
19 import java.util.concurrent.ExecutorService;
20 import java.util.concurrent.Executors;
21 import java.util.concurrent.Future;
22 import java.util.concurrent.ScheduledExecutorService;
23 import java.util.concurrent.ThreadFactory;
24 import java.util.concurrent.TimeUnit;
25 import java.util.concurrent.TimeoutException;
26
27 import javax.annotation.Nullable;
28
29 import com.google.common.base.Charsets;
30 import com.google.common.base.Preconditions;
31 import com.google.common.base.Throwables;
32 import com.google.common.collect.Lists;
33 import com.google.common.io.Resources;
34 import com.google.common.reflect.TypeToken;
35 import com.google.common.util.concurrent.ListenableFuture;
36 import com.google.common.util.concurrent.ListenableScheduledFuture;
37 import com.google.common.util.concurrent.ListeningExecutorService;
38 import com.google.common.util.concurrent.ListeningScheduledExecutorService;
39 import com.google.common.util.concurrent.MoreExecutors;
40 import com.google.common.util.concurrent.ThreadFactoryBuilder;
41
42 import org.slf4j.Logger;
43 import org.slf4j.LoggerFactory;
44 import org.slf4j.MDC;
45
46 public final class Util {
47
48 private static final Logger LOGGER = LoggerFactory.getLogger(Util.class);
49
50 public static URL getURL(final String location) {
51 URL url = null;
52 try {
53 url = Resources.getResource(location.startsWith("/") ? location.substring(1)
54 : location);
55 if (url != null) {
56 return url;
57 }
58 } catch (final Exception ex) {
59
60 }
61 try {
62 final File file = new File(location);
63 if (file.exists() && file.isFile()) {
64 return file.toURI().toURL();
65 }
66 } catch (final Exception ex) {
67
68 }
69 try {
70 return new URL(location);
71 } catch (final Exception ex) {
72
73 throw new IllegalArgumentException("Cannot extract a URL from: " + location);
74 }
75 }
76
77 public static String getResource(final Class<?> referenceClass, final String resourceName) {
78 try {
79 final URL url = referenceClass.getResource(resourceName);
80 return Resources.toString(url, Charsets.UTF_8);
81 } catch (final IOException ex) {
82 throw new Error("Missing resource '" + resourceName + "': " + ex.getMessage(), ex);
83 }
84 }
85
86 public static String getVersion(final String groupId, final String artifactId,
87 final String defaultValue) {
88 final URL url = Util.class.getClassLoader().getResource(
89 "META-INF/maven/" + groupId + "/" + artifactId + "/pom.properties");
90 String version = defaultValue;
91 if (url != null) {
92 try {
93 final InputStream stream = url.openStream();
94 try {
95 final Properties properties = new Properties();
96 properties.load(stream);
97 version = properties.getProperty("version").trim();
98 } finally {
99 stream.close();
100 }
101 } catch (final IOException ex) {
102 version = "unknown";
103 }
104 }
105 return version;
106 }
107
108 @SuppressWarnings({ "unchecked", "rawtypes" })
109 public static String formatType(final Type type) {
110 final TypeToken<?> token = TypeToken.of(type);
111 final Class<?> clazz = token.getRawType();
112 String name = clazz.getSimpleName();
113 if (name.isEmpty()) {
114 Class<?> parent = clazz.getSuperclass();
115 if (parent == null && clazz.getInterfaces().length > 0) {
116 parent = clazz.getInterfaces()[0];
117 }
118 if (parent != null) {
119 name = token.getSupertype((Class) parent).toString();
120 }
121 }
122 return name;
123 }
124
125 @Nullable
126 public static <T> T closeQuietly(@Nullable final T object) {
127 if (object instanceof Closeable) {
128 try {
129 ((Closeable) object).close();
130 } catch (final Throwable ex) {
131 LOGGER.error("Error closing " + object.getClass().getSimpleName(), ex);
132 }
133 }
134 return object;
135 }
136
137 @Nullable
138 public static InputStream interceptClose(@Nullable final InputStream stream,
139 @Nullable final Runnable runnable) {
140 if (stream == null || runnable == null) {
141 return stream;
142 }
143 final Map<String, String> mdc = Logging.getMDC();
144 return new FilterInputStream(stream) {
145
146 private boolean closed;
147
148 @Override
149 public void close() throws IOException {
150 if (this.closed) {
151 return;
152 }
153 final Map<String, String> oldMdc = Logging.getMDC();
154 try {
155 Logging.setMDC(mdc);
156 super.close();
157 runnable.run();
158 } finally {
159 this.closed = true;
160 Logging.setMDC(oldMdc);
161 }
162 }
163
164 };
165 }
166
167 @Nullable
168 public static OutputStream interceptClose(@Nullable final OutputStream stream,
169 @Nullable final Runnable runnable) {
170 if (stream == null || runnable == null) {
171 return stream;
172 }
173 final Map<String, String> mdc = Logging.getMDC();
174 return new FilterOutputStream(stream) {
175
176 private boolean closed;
177
178 @Override
179 public void close() throws IOException {
180 if (this.closed) {
181 return;
182 }
183 final Map<String, String> oldMdc = Logging.getMDC();
184 try {
185 Logging.setMDC(mdc);
186 super.close();
187 runnable.run();
188 } finally {
189 this.closed = true;
190 Logging.setMDC(oldMdc);
191 }
192 }
193
194 };
195 }
196
197 public static ListeningScheduledExecutorService newScheduler(final int numThreads,
198 final String nameFormat, final boolean daemon) {
199 final ThreadFactory factory = new ThreadFactoryBuilder().setDaemon(daemon)
200 .setNameFormat(nameFormat)
201 .setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
202
203 @Override
204 public void uncaughtException(final Thread thread, final Throwable ex) {
205 LOGGER.error("Uncaught exception in thread " + thread.getName(), ex);
206 }
207
208 }).build();
209 return decorate(Executors.newScheduledThreadPool(numThreads, factory));
210 }
211
212 public static ListeningExecutorService decorate(final ExecutorService executor) {
213 Preconditions.checkNotNull(executor);
214 if (executor instanceof MDCExecutorService) {
215 return (MDCExecutorService) executor;
216 } else if (executor instanceof ListeningExecutorService) {
217 return new MDCExecutorService((ListeningExecutorService) executor);
218 } else {
219 return new MDCExecutorService(MoreExecutors.listeningDecorator(executor));
220 }
221 }
222
223 public static ListeningScheduledExecutorService decorate(
224 final ScheduledExecutorService executor) {
225 if (executor instanceof MDCScheduledExecutorService) {
226 return (MDCScheduledExecutorService) executor;
227 } else if (executor instanceof ListeningScheduledExecutorService) {
228 return new MDCScheduledExecutorService((ListeningScheduledExecutorService) executor);
229 } else {
230
231 return new MDCScheduledExecutorService(MoreExecutors.listeningDecorator(executor));
232 }
233 }
234
235 private static class MDCScheduledExecutorService extends MDCExecutorService implements
236 ListeningScheduledExecutorService {
237
238 MDCScheduledExecutorService(final ListeningScheduledExecutorService delegate) {
239 super(Preconditions.checkNotNull(delegate));
240 }
241
242 @Override
243 ListeningScheduledExecutorService delegate() {
244 return (ListeningScheduledExecutorService) super.delegate();
245 }
246
247 @Override
248 public ListenableScheduledFuture<?> schedule(final Runnable command, final long delay,
249 final TimeUnit unit) {
250 return delegate().schedule(wrap(command, MDC.getCopyOfContextMap()), delay, unit);
251 }
252
253 @Override
254 public <V> ListenableScheduledFuture<V> schedule(final Callable<V> callable,
255 final long delay, final TimeUnit unit) {
256 return delegate().schedule(wrap(callable, MDC.getCopyOfContextMap()), delay, unit);
257 }
258
259 @Override
260 public ListenableScheduledFuture<?> scheduleAtFixedRate(final Runnable command,
261 final long initialDelay, final long period, final TimeUnit unit) {
262 return delegate().scheduleAtFixedRate(wrap(command, MDC.getCopyOfContextMap()),
263 initialDelay, period, unit);
264 }
265
266 @Override
267 public ListenableScheduledFuture<?> scheduleWithFixedDelay(final Runnable command,
268 final long initialDelay, final long delay, final TimeUnit unit) {
269 return delegate().scheduleWithFixedDelay(wrap(command, MDC.getCopyOfContextMap()),
270 initialDelay, delay, unit);
271 }
272
273 }
274
275 private static class MDCExecutorService implements ListeningExecutorService {
276
277 private final ListeningExecutorService delegate;
278
279 MDCExecutorService(final ListeningExecutorService delegate) {
280 this.delegate = Preconditions.checkNotNull(delegate);
281 }
282
283 ListeningExecutorService delegate() {
284 return this.delegate;
285 }
286
287 Runnable wrap(final Runnable runnable, final Map<String, String> mdcMap) {
288 return new Runnable() {
289
290 @Override
291 public void run() {
292 Map<String, String> oldMap = null;
293 try {
294 if (mdcMap != null) {
295 oldMap = MDC.getCopyOfContextMap();
296 MDC.setContextMap(mdcMap);
297 }
298 runnable.run();
299 } catch (final Throwable ex) {
300 LOGGER.error("Uncaught exception in thread "
301 + Thread.currentThread().getName() + ": " + ex.getMessage(), ex);
302 throw Throwables.propagate(ex);
303 } finally {
304 if (oldMap != null) {
305 MDC.setContextMap(oldMap);
306 }
307 }
308 }
309 };
310 }
311
312 <T> Callable<T> wrap(final Callable<T> callable, final Map<String, String> mdcMap) {
313 return new Callable<T>() {
314
315 @Override
316 public T call() throws Exception {
317 Map<String, String> oldMap = null;
318 try {
319 if (mdcMap != null) {
320 oldMap = MDC.getCopyOfContextMap();
321 MDC.setContextMap(mdcMap);
322 }
323 return callable.call();
324 } catch (final Throwable ex) {
325 LOGGER.error("Uncaught exception in thread "
326 + Thread.currentThread().getName() + ": " + ex.getMessage(), ex);
327 Throwables.propagateIfPossible(ex, Exception.class);
328 throw new RuntimeException(ex);
329 } finally {
330 if (oldMap != null) {
331 MDC.setContextMap(oldMap);
332 }
333 }
334 }
335 };
336 }
337
338 <T> Collection<Callable<T>> wrap(final Collection<? extends Callable<T>> callables,
339 final Map<String, String> mdcMap) {
340 final List<Callable<T>> result = Lists.newArrayListWithCapacity(callables.size());
341 for (final Callable<T> callable : callables) {
342 result.add(wrap(callable, mdcMap));
343 }
344 return result;
345 }
346
347 @Override
348 public void shutdown() {
349 delegate().shutdown();
350 }
351
352 @Override
353 public List<Runnable> shutdownNow() {
354 return delegate().shutdownNow();
355 }
356
357 @Override
358 public boolean isShutdown() {
359 return delegate().isShutdown();
360 }
361
362 @Override
363 public boolean isTerminated() {
364 return delegate().isTerminated();
365 }
366
367 @Override
368 public boolean awaitTermination(final long timeout, final TimeUnit unit)
369 throws InterruptedException {
370 return delegate().awaitTermination(timeout, unit);
371 }
372
373 @Override
374 public <T> T invokeAny(final Collection<? extends Callable<T>> tasks)
375 throws InterruptedException, ExecutionException {
376 return delegate().invokeAny(wrap(tasks, MDC.getCopyOfContextMap()));
377 }
378
379 @Override
380 public <T> T invokeAny(final Collection<? extends Callable<T>> tasks, final long timeout,
381 final TimeUnit unit) throws InterruptedException, ExecutionException,
382 TimeoutException {
383 return delegate().invokeAny(wrap(tasks, MDC.getCopyOfContextMap()), timeout, unit);
384 }
385
386 @Override
387 public void execute(final Runnable command) {
388 delegate().execute(wrap(command, MDC.getCopyOfContextMap()));
389 }
390
391 @Override
392 public <T> ListenableFuture<T> submit(final Callable<T> task) {
393 return delegate().submit(wrap(task, MDC.getCopyOfContextMap()));
394 }
395
396 @Override
397 public ListenableFuture<?> submit(final Runnable task) {
398 return delegate().submit(wrap(task, MDC.getCopyOfContextMap()));
399 }
400
401 @Override
402 public <T> ListenableFuture<T> submit(final Runnable task, final T result) {
403 return delegate().submit(wrap(task, MDC.getCopyOfContextMap()), result);
404 }
405
406 @Override
407 public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks)
408 throws InterruptedException {
409 return delegate().invokeAll(wrap(tasks, MDC.getCopyOfContextMap()));
410 }
411
412 @Override
413 public <T> List<Future<T>> invokeAll(final Collection<? extends Callable<T>> tasks,
414 final long timeout, final TimeUnit unit) throws InterruptedException {
415 return delegate().invokeAll(wrap(tasks, MDC.getCopyOfContextMap()), timeout, unit);
416 }
417
418 }
419
420 }