1 package eu.fbk.knowledgestore.client;
2
3 import com.google.common.base.Charsets;
4 import com.google.common.base.MoreObjects;
5 import com.google.common.base.Preconditions;
6 import com.google.common.base.Strings;
7 import com.google.common.collect.ImmutableSet;
8 import com.google.common.collect.Maps;
9 import com.google.common.escape.Escaper;
10 import com.google.common.io.BaseEncoding;
11 import com.google.common.net.HttpHeaders;
12 import com.google.common.net.UrlEscapers;
13 import eu.fbk.knowledgestore.AbstractKnowledgeStore;
14 import eu.fbk.knowledgestore.AbstractSession;
15 import eu.fbk.knowledgestore.Outcome;
16 import eu.fbk.knowledgestore.Outcome.Status;
17 import eu.fbk.knowledgestore.Session;
18 import eu.fbk.knowledgestore.data.*;
19 import eu.fbk.knowledgestore.internal.Util;
20 import eu.fbk.knowledgestore.internal.jaxrs.Protocol;
21 import eu.fbk.knowledgestore.internal.jaxrs.Serializer;
22 import eu.fbk.knowledgestore.internal.rdf.RDFUtil;
23 import eu.fbk.knowledgestore.vocabulary.NIE;
24 import org.apache.http.client.config.RequestConfig;
25 import org.apache.http.config.RegistryBuilder;
26 import org.apache.http.conn.HttpClientConnectionManager;
27 import org.apache.http.conn.socket.ConnectionSocketFactory;
28 import org.apache.http.conn.socket.PlainConnectionSocketFactory;
29 import org.apache.http.conn.ssl.DefaultHostnameVerifier;
30 import org.apache.http.conn.ssl.NoopHostnameVerifier;
31 import org.apache.http.conn.ssl.SSLConnectionSocketFactory;
32 import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
33 import org.glassfish.jersey.apache.connector.ApacheClientProperties;
34 import org.glassfish.jersey.apache.connector.ApacheConnectorProvider;
35 import org.glassfish.jersey.client.ClientConfig;
36 import org.glassfish.jersey.client.ClientProperties;
37 import org.glassfish.jersey.client.RequestEntityProcessing;
38 import org.glassfish.jersey.message.GZipEncoder;
39 import org.openrdf.model.Statement;
40 import org.openrdf.model.URI;
41 import org.openrdf.model.Value;
42 import org.openrdf.query.BindingSet;
43 import org.openrdf.rio.RDFFormat;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46
47 import javax.annotation.Nullable;
48 import javax.net.ssl.HostnameVerifier;
49 import javax.net.ssl.SSLContext;
50 import javax.net.ssl.TrustManager;
51 import javax.net.ssl.X509TrustManager;
52 import javax.ws.rs.HttpMethod;
53 import javax.ws.rs.WebApplicationException;
54 import javax.ws.rs.client.ClientBuilder;
55 import javax.ws.rs.client.Entity;
56 import javax.ws.rs.client.Invocation;
57 import javax.ws.rs.client.ResponseProcessingException;
58 import javax.ws.rs.core.*;
59 import java.io.InputStream;
60 import java.lang.reflect.Type;
61 import java.security.cert.X509Certificate;
62 import java.text.SimpleDateFormat;
63 import java.util.Date;
64 import java.util.Map;
65 import java.util.Set;
66 import java.util.concurrent.atomic.AtomicReference;
67
68
69
70 public final class Client extends AbstractKnowledgeStore {
71
72 private static final Logger LOGGER = LoggerFactory.getLogger(Client.class);
73
74 private static final String USER_AGENT = String.format(
75 "KnowledgeStore/%s Apache-HttpClient/%s",
76 Util.getVersion("eu.fbk.knowledgestore", "ks-core", "devel"),
77 Util.getVersion("org.apache.httpcomponents", "httpclient", "unknown"));
78
79 private static final SimpleDateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
80
81 private static final String MIME_TYPE_RDF = "application/x-tql";
82
83 private static final String MIME_TYPE_TUPLE = "text/tab-separated-values";
84
85 private static final String MIME_TYPE_BOOLEAN = "text/boolean";
86
87 private static final int DEFAULT_MAX_CONNECTIONS = 2;
88
89 private static final boolean DEFAULT_VALIDATE_SERVER = true;
90
91 private static final int DEFAULT_CONNECTION_TIMEOUT = 1000;
92 private static final int DEFAULT_SOCKET_TIMEOUT = 10000;
93
94 private static final boolean DEFAULT_COMPRESSION_ENABLED = LoggerFactory.getLogger(
95 "org.apache.http.wire").isDebugEnabled();;
96
97 private final String serverURL;
98
99 private final boolean compressionEnabled;
100
101 private final HttpClientConnectionManager connectionManager;
102
103 private final javax.ws.rs.client.Client client;
104
105 private final Map<String, String> targets;
106
107 private Client(final Builder builder) {
108
109 String url = Preconditions.checkNotNull(builder.serverURL);
110 if (url.endsWith("/")) {
111 url = url.substring(0, url.length() - 1);
112 }
113
114 final int timeout;
115 timeout = MoreObjects.firstNonNull(builder.connectionTimeout, DEFAULT_CONNECTION_TIMEOUT);
116 Preconditions.checkArgument(timeout >= 0, "Invalid connection timeout %d", timeout);
117
118 final int socketTimeout;
119 socketTimeout = MoreObjects.firstNonNull(builder.socketTimeout, DEFAULT_SOCKET_TIMEOUT);
120 Preconditions.checkArgument(socketTimeout >= 0, "Invalid connection timeout %d", socketTimeout);
121
122 this.serverURL = url;
123 this.compressionEnabled = MoreObjects.firstNonNull(builder.compressionEnabled,
124 DEFAULT_COMPRESSION_ENABLED);
125 this.connectionManager = createConnectionManager(
126 MoreObjects.firstNonNull(builder.maxConnections, DEFAULT_MAX_CONNECTIONS),
127 MoreObjects.firstNonNull(builder.validateServer, DEFAULT_VALIDATE_SERVER));
128 this.client = createJaxrsClient(this.connectionManager, timeout, socketTimeout, builder.proxy);
129 this.targets = Maps.newConcurrentMap();
130 }
131
132 public synchronized String getServerURL() {
133 checkNotClosed();
134 return this.serverURL;
135 }
136
137 @Override
138 protected Session doNewSession(@Nullable final String username, @Nullable final String password) {
139 return new SessionImpl(username, password);
140 }
141
142 @Override
143 protected void doClose() {
144 try {
145 this.client.close();
146 } finally {
147 this.connectionManager.shutdown();
148 }
149 }
150
151 private static PoolingHttpClientConnectionManager createConnectionManager(
152 final int maxConnections, final boolean validateServer) {
153
154
155 final SSLContext sslContext;
156 HostnameVerifier hostVerifier;
157 try {
158 if (validateServer) {
159 sslContext = SSLContext.getDefault();
160 hostVerifier = new DefaultHostnameVerifier();
161 } else {
162 sslContext = SSLContext.getInstance(Protocol.HTTPS_PROTOCOLS[0]);
163 sslContext.init(null, new TrustManager[] { new X509TrustManager() {
164
165 @Override
166 public void checkClientTrusted(final X509Certificate[] chain,
167 final String authType) {
168 }
169
170 @Override
171 public void checkServerTrusted(final X509Certificate[] chain,
172 final String authType) {
173 }
174
175 @Override
176 public X509Certificate[] getAcceptedIssuers() {
177 return null;
178 }
179
180 } }, null);
181 hostVerifier = NoopHostnameVerifier.INSTANCE;
182 }
183 } catch (final Throwable ex) {
184 throw new RuntimeException("SSL configuration failed", ex);
185 }
186
187
188 final ConnectionSocketFactory httpConnectionFactory = PlainConnectionSocketFactory
189 .getSocketFactory();
190
191
192 final ConnectionSocketFactory httpsConnectionFactory = new SSLConnectionSocketFactory(
193 sslContext, Protocol.HTTPS_PROTOCOLS, Protocol.HTTPS_CIPHER_SUITES, hostVerifier);
194
195
196 final PoolingHttpClientConnectionManager manager = new PoolingHttpClientConnectionManager(
197 RegistryBuilder.<ConnectionSocketFactory>create()
198 .register("http", httpConnectionFactory)
199 .register("https", httpsConnectionFactory).build());
200
201
202 manager.setMaxTotal(maxConnections);
203 manager.setDefaultMaxPerRoute(maxConnections);
204 manager.setValidateAfterInactivity(1000);
205 return manager;
206 }
207
208 private static javax.ws.rs.client.Client createJaxrsClient(
209 final HttpClientConnectionManager connectionManager, final int connectionTimeout,
210 final int socketTimeout, @Nullable final ProxyConfig proxy) {
211
212
213 final RequestConfig requestConfig = RequestConfig.custom()
214 .setExpectContinueEnabled(false)
215 .setRedirectsEnabled(false)
216 .setConnectionRequestTimeout(connectionTimeout)
217 .setConnectTimeout(connectionTimeout)
218 .setSocketTimeout(socketTimeout)
219 .build();
220
221
222 final ClientConfig config = new ClientConfig();
223 config.connectorProvider(new ApacheConnectorProvider());
224 config.property(ApacheClientProperties.CONNECTION_MANAGER, connectionManager);
225 config.property(ApacheClientProperties.REQUEST_CONFIG, requestConfig);
226 config.property(ApacheClientProperties.DISABLE_COOKIES, true);
227 config.property(ClientProperties.REQUEST_ENTITY_PROCESSING,
228 RequestEntityProcessing.CHUNKED);
229 if (proxy != null) {
230 config.property(ClientProperties.PROXY_URI, proxy.getURL());
231 config.property(ClientProperties.PROXY_USERNAME, proxy.getUsername());
232 config.property(ClientProperties.PROXY_PASSWORD, proxy.getPassword());
233 }
234
235
236 config.register(Serializer.class);
237 config.register(GZipEncoder.class);
238
239
240 return ClientBuilder.newClient(config);
241 }
242
243 private final class SessionImpl extends AbstractSession {
244
245 private final String authorization;
246
247 SessionImpl(@Nullable final String username, @Nullable final String password) {
248 super(Data.newNamespaceMap(Data.newNamespaceMap(), Data.getNamespaceMap()), username,
249 password);
250 final String actualUsername = MoreObjects.firstNonNull(username, "");
251 final String actualPassword = MoreObjects.firstNonNull(password, "");
252 final String authorizationString = actualUsername + ":" + actualPassword;
253 final byte[] authorizationBytes = authorizationString.getBytes(Charsets.ISO_8859_1);
254 this.authorization = "Basic " + BaseEncoding.base64().encode(authorizationBytes);
255 }
256
257 @Override
258 protected Status doFail(final Throwable ex, final AtomicReference<String> message)
259 throws Throwable {
260
261 if (ex instanceof WebApplicationException) {
262 final Response response = ((WebApplicationException) ex).getResponse();
263 try {
264 final RDFFormat format = RDFFormat.forMIMEType(response.getMediaType()
265 .toString());
266 final Outcome outcome = Outcome.decode(
267 RDFUtil.readRDF((InputStream) response.getEntity(), format, null,
268 null, false), false).getUnique();
269 message.set(outcome.getMessage());
270 return outcome.getStatus();
271 } catch (final Throwable ex2) {
272 LOGGER.error("Unable to decode error body", ex2);
273 return Status.valueOf(response.getStatus());
274 } finally {
275 response.close();
276 }
277
278 } else if (ex instanceof ResponseProcessingException) {
279 final Response response = ((ResponseProcessingException) ex).getResponse();
280 try {
281 final StringBuilder builder = new StringBuilder(
282 "Client side error (server response: ");
283 builder.append(response.getStatus());
284 if (response.hasEntity()) {
285 final String etag = response.getHeaderString(HttpHeaders.ETAG);
286 builder.append(", ").append(etag != null ? etag : response.getMediaType());
287 final Date lastModified = response.getLastModified();
288 if (lastModified != null) {
289 synchronized (DATE_FORMAT) {
290 builder.append(", ").append(DATE_FORMAT.format(lastModified));
291 }
292 }
293 }
294 message.set(builder.toString());
295 return Status.valueOf(response.getStatus());
296 } finally {
297 response.close();
298 }
299
300 } else {
301 return super.doFail(ex, message);
302 }
303 }
304
305 @Override
306 @Nullable
307 protected Representation doDownload(@Nullable final Long timeout, final URI id,
308 @Nullable final Set<String> mimeTypes, final boolean useCaches) throws Throwable {
309
310 final String query = query(Protocol.PARAMETER_ID, id);
311
312 final Map<String, Object> headers = Maps.newHashMap();
313 if (mimeTypes != null) {
314 headers.put(HttpHeaders.ACCEPT, mimeTypes);
315 }
316 if (!useCaches) {
317 final CacheControl cacheControl = new CacheControl();
318 cacheControl.setNoStore(true);
319 headers.put(HttpHeaders.CACHE_CONTROL, cacheControl);
320 }
321
322 try {
323 return invoke(HttpMethod.GET, Protocol.PATH_REPRESENTATIONS, query, headers, null,
324 new GenericType<Representation>(Representation.class), timeout);
325
326 } catch (final WebApplicationException ex) {
327 if (ex.getResponse().getStatus() == 404) {
328 ex.getResponse().close();
329 return null;
330 }
331 throw ex;
332 }
333 }
334
335 @Override
336 protected Outcome doUpload(@Nullable final Long timeout, final URI id,
337 final Representation representation) throws Exception {
338
339 final String path = Protocol.PATH_REPRESENTATIONS;
340 final String query = query(Protocol.PARAMETER_ID, id);
341 final Entity<?> entity = representation == null ? null : entity(representation);
342 return invoke(HttpMethod.PUT, path, query, null, entity, Protocol.STREAM_OF_OUTCOMES,
343 timeout).getUnique();
344 }
345
346 @Override
347 protected long doCount(@Nullable final Long timeout, final URI type,
348 @Nullable final XPath condition, @Nullable final Set<URI> ids) throws Throwable {
349
350 final String path = Protocol.pathFor(type) + "/" + Protocol.SUBPATH_COUNT;
351 final String query = query(Protocol.PARAMETER_CONDITION, condition,
352 Protocol.PARAMETER_ID, ids);
353 final Statement result = invoke(HttpMethod.GET, path, query, null, null,
354 Protocol.STREAM_OF_STATEMENTS, timeout).getUnique();
355 return Data.convert(result.getObject(), Long.class);
356 }
357
358 @Override
359 protected Stream<Record> doRetrieve(@Nullable final Long timeout, final URI type,
360 @Nullable final XPath condition, @Nullable final Set<URI> ids,
361 @Nullable final Set<URI> properties, @Nullable final Long offset,
362 @Nullable final Long limit) throws Throwable {
363
364 final String path = Protocol.pathFor(type);
365 final String query = query(
366 Protocol.PARAMETER_CONDITION, condition,
367 Protocol.PARAMETER_ID, ids,
368 Protocol.PARAMETER_PROPERTY, properties,
369 Protocol.PARAMETER_OFFSET, offset,
370 Protocol.PARAMETER_LIMIT, limit);
371 final Stream<Record> result = invoke(HttpMethod.GET, path, query, null, null,
372 Protocol.STREAM_OF_RECORDS, timeout);
373 result.setProperty("types", ImmutableSet.of(type));
374 return result;
375 }
376
377 @Override
378 protected void doCreate(@Nullable final Long timeout, final URI type,
379 final Stream<? extends Record> records, final Handler<? super Outcome> handler)
380 throws Exception {
381
382 final String path = Protocol.pathFor(type) + "/" + Protocol.SUBPATH_CREATE;
383 final Entity<?> entity = entity(records, type);
384 final Stream<Outcome> result = invoke(HttpMethod.POST, path, null, null, entity,
385 Protocol.STREAM_OF_OUTCOMES, timeout);
386 result.toHandler(handler);
387 }
388
389 @Override
390 protected void doMerge(@Nullable final Long timeout, final URI type,
391 final Stream<? extends Record> records, final Criteria criteria,
392 final Handler<? super Outcome> handler) throws Exception {
393
394 final String path = Protocol.pathFor(type) + "/" + Protocol.SUBPATH_MERGE;
395 final String query = query(Protocol.PARAMETER_CRITERIA, criteria);
396 final Entity<?> entity = entity(records, type);
397 final Stream<Outcome> result = invoke(HttpMethod.POST, path, query, null, entity,
398 Protocol.STREAM_OF_OUTCOMES, timeout);
399 result.toHandler(handler);
400 }
401
402 @Override
403 protected void doUpdate(@Nullable final Long timeout, final URI type,
404 final XPath condition, final Set<URI> ids, final Record record,
405 final Criteria criteria, final Handler<? super Outcome> handler) throws Exception {
406
407 final String path = Protocol.pathFor(type) + "/" + Protocol.SUBPATH_UPDATE;
408 final String query = query(
409 Protocol.PARAMETER_CRITERIA, criteria,
410 Protocol.PARAMETER_CONDITION, condition,
411 Protocol.PARAMETER_ID, ids);
412 final Entity<?> entity = entity(Stream.create(record), type);
413 final Stream<Outcome> result = invoke(HttpMethod.POST, path, query, null, entity,
414 Protocol.STREAM_OF_OUTCOMES, timeout);
415 result.toHandler(handler);
416 }
417
418 @Override
419 protected void doDelete(@Nullable final Long timeout, final URI type,
420 final XPath condition, final Set<URI> ids, final Handler<? super Outcome> handler)
421 throws Exception {
422
423 final String path = Protocol.pathFor(type) + "/" + Protocol.SUBPATH_DELETE;
424 final String query = query(
425 Protocol.PARAMETER_CONDITION, condition,
426 Protocol.PARAMETER_ID, ids);
427 final Stream<Outcome> result = invoke(HttpMethod.POST, path, query, null, null,
428 Protocol.STREAM_OF_OUTCOMES, timeout);
429 result.toHandler(handler);
430 }
431
432 @Override
433 protected Stream<Record> doMatch(@Nullable final Long timeout,
434 final Map<URI, XPath> conditions, final Map<URI, Set<URI>> ids,
435 final Map<URI, Set<URI>> properties) throws Exception {
436
437 throw new UnsupportedOperationException();
438 }
439
440 @SuppressWarnings("unchecked")
441 @Override
442 protected <T> Stream<T> doSparql(@Nullable final Long timeout, final Class<T> type,
443 final String expression, final Set<URI> defaultGraphs, final Set<URI> namedGraphs)
444 throws Exception {
445
446 final String path = Protocol.PATH_SPARQL;
447 final String query = query(
448 Protocol.PARAMETER_QUERY, expression,
449 Protocol.PARAMETER_DEFAULT_GRAPH, defaultGraphs,
450 Protocol.PARAMETER_NAMED_GRAPH, namedGraphs);
451 GenericType<?> responseType;
452 if (type == Statement.class) {
453 responseType = Protocol.STREAM_OF_STATEMENTS;
454 } else if (type == BindingSet.class) {
455 responseType = Protocol.STREAM_OF_TUPLES;
456 } else if (type == Boolean.class) {
457 responseType = Protocol.STREAM_OF_BOOLEANS;
458 } else {
459 throw new Error("Unexpected result type: " + type);
460 }
461 return (Stream<T>) invoke(HttpMethod.GET, path, query, null, null, responseType,
462 timeout);
463 }
464
465 @Override
466 protected Outcome doSparqlUpdate(@Nullable Long timeout, @Nullable Stream<? extends Statement> statements) throws Throwable {
467 final String path = Protocol.PATH_UPDATE;
468 final GenericEntity<Stream<Statement>> entity = new GenericEntity<Stream<Statement>>((Stream<Statement>) statements, Protocol.STREAM_OF_STATEMENTS.getType());
469 Entity<?> entityEntity = Entity.entity(entity, new Variant(MediaType.valueOf(MIME_TYPE_RDF), (String) null, Client.this.compressionEnabled ? "gzip" : "identity"));
470 return invoke(HttpMethod.POST, path, null, null, entityEntity, Protocol.STREAM_OF_OUTCOMES, timeout).getUnique();
471 }
472
473 @Override
474 protected Outcome doSparqlDelete(@Nullable Long timeout, @Nullable Stream<? extends Statement> statements) throws Throwable {
475 final String path = Protocol.PATH_DELETE;
476 final GenericEntity<Stream<Statement>> entity = new GenericEntity<Stream<Statement>>((Stream<Statement>) statements, Protocol.STREAM_OF_STATEMENTS.getType());
477 Entity<?> entityEntity = Entity.entity(entity, new Variant(MediaType.valueOf(MIME_TYPE_RDF), (String) null, Client.this.compressionEnabled ? "gzip" : "identity"));
478 return invoke(HttpMethod.POST, path, null, null, entityEntity, Protocol.STREAM_OF_OUTCOMES, timeout).getUnique();
479 }
480
481 private String query(final Object... queryNameValues) {
482 final StringBuilder builder = new StringBuilder();
483 final Escaper escaper = UrlEscapers.urlFormParameterEscaper();
484 String separator = "?";
485 for (int i = 0; i < queryNameValues.length; i += 2) {
486 final Object name = queryNameValues[i].toString();
487 final Object value = queryNameValues[i + 1];
488 if (value == null) {
489 continue;
490 }
491 final Iterable<?> iterable = value instanceof Iterable<?> ? (Iterable<?>) value
492 : ImmutableSet.of(value);
493 for (final Object element : iterable) {
494 if (element == null) {
495 continue;
496 }
497 String encoded;
498 if (element instanceof Value && !name.equals(Protocol.PARAMETER_DEFAULT_GRAPH)
499 && !name.equals(Protocol.PARAMETER_NAMED_GRAPH)) {
500 encoded = Data.toString(element, Data.getNamespaceMap());
501 } else {
502 encoded = element.toString();
503 }
504 builder.append(separator).append(name).append("=");
505 builder.append(escaper.escape(encoded));
506 separator = "&";
507 }
508 }
509 return builder.toString();
510 }
511
512 private Entity<Representation> entity(final Representation representation) {
513 final String mimeType = representation.getMetadata().getUnique(NIE.MIME_TYPE,
514 String.class, MediaType.APPLICATION_OCTET_STREAM);
515 final Variant variant = new Variant(MediaType.valueOf(mimeType), (String) null,
516 Client.this.compressionEnabled ? "gzip" : "identity");
517 return Entity.entity(representation, variant);
518 }
519
520 @SuppressWarnings("unchecked")
521 private Entity<GenericEntity<Stream<Record>>> entity(
522 final Stream<? extends Record> records, final URI type) {
523
524 records.setProperty("types", ImmutableSet.of(type));
525 final GenericEntity<Stream<Record>> entity = new GenericEntity<Stream<Record>>(
526 (Stream<Record>) records, Protocol.STREAM_OF_RECORDS.getType());
527 return Entity.entity(entity, new Variant(MediaType.valueOf(MIME_TYPE_RDF),
528 (String) null, Client.this.compressionEnabled ? "gzip" : "identity"));
529 }
530
531 private <T> T invoke(final String method, final String path, @Nullable final String query,
532 @Nullable final Map<String, Object> headers,
533 @Nullable final Entity<?> requestEntity, final GenericType<T> responseType,
534 @Nullable final Long timeout) {
535
536
537 final String action = method + ":" + path;
538 final String target = Client.this.targets.get(action);
539 final String uri = target != null ? target : Client.this.serverURL + "/" + path;
540
541
542 String actualQuery = query;
543 Entity<?> actualRequestEntity = requestEntity;
544 if (target == null) {
545 actualQuery = Strings.isNullOrEmpty(query) ? "?probe=true" : query + "&probe=true";
546 if (requestEntity != null) {
547 final Variant variant = requestEntity.getVariant();
548 actualRequestEntity = Entity.entity(new byte[0],
549 new Variant(variant.getMediaType(), (String) null, "identity"));
550 }
551 }
552
553
554 if (timeout != null) {
555 final long timeoutInSeconds = Math.max(1, timeout / 1000);
556 actualQuery = Strings.isNullOrEmpty(actualQuery) ? "?timeout=" + timeoutInSeconds
557 : actualQuery + "&timeout=" + timeoutInSeconds;
558 }
559
560
561 String acceptType = MediaType.WILDCARD;
562 if (responseType.equals(Protocol.STREAM_OF_RECORDS)
563 || responseType.equals(Protocol.STREAM_OF_OUTCOMES)
564 || responseType.equals(Protocol.STREAM_OF_STATEMENTS)) {
565 acceptType = MIME_TYPE_RDF;
566 } else if (responseType.equals(Protocol.STREAM_OF_TUPLES)) {
567 acceptType = MIME_TYPE_TUPLE;
568 } else if (responseType.equals(Protocol.STREAM_OF_BOOLEANS)) {
569 acceptType = MIME_TYPE_BOOLEAN;
570 }
571
572
573 final Invocation.Builder invoker = Client.this.client.target(
574 actualQuery == null ? uri : uri + actualQuery).request(acceptType);
575
576
577 if (headers != null) {
578 for (final Map.Entry<String, Object> entry : headers.entrySet()) {
579 invoker.header(entry.getKey(), entry.getValue());
580 }
581 }
582
583
584 invoker.header(HttpHeaders.USER_AGENT, USER_AGENT);
585 invoker.header(Protocol.HEADER_INVOCATION, getInvocationID().stringValue());
586
587
588 invoker.header(HttpHeaders.ACCEPT_ENCODING,
589 Client.this.compressionEnabled ? "gzip, deflate, identity" : "identity");
590
591
592 if (uri.startsWith("https")) {
593 invoker.header(HttpHeaders.AUTHORIZATION, this.authorization);
594 }
595
596
597 if (LOGGER.isDebugEnabled()) {
598 final StringBuilder builder = new StringBuilder("Http: ");
599 builder.append(method).append(' ')
600 .append(actualQuery == null ? uri : uri + actualQuery);
601 if (actualRequestEntity != null) {
602 Type type = actualRequestEntity.getEntity().getClass();
603 if (type.equals(GenericEntity.class)) {
604 type = ((GenericEntity<?>) actualRequestEntity.getEntity()).getType();
605 }
606 builder.append(' ').append(Util.formatType(type));
607 builder.append(' ').append(actualRequestEntity.getMediaType());
608 }
609 if (getUsername() != null) {
610 builder.append(' ').append(getUsername());
611 }
612 LOGGER.debug(builder.toString());
613 }
614
615
616 final long timestamp = System.currentTimeMillis();
617 final Response response = actualRequestEntity == null ? invoker.method(method) :
618 invoker.method(method, actualRequestEntity);
619 final long elapsed = System.currentTimeMillis() - timestamp;
620
621
622 if (LOGGER.isDebugEnabled()) {
623 final StringBuilder builder = new StringBuilder("Http: ");
624 builder.append(response.getStatus());
625 if (response.hasEntity()) {
626 final String etag = response.getHeaderString(HttpHeaders.ETAG);
627 builder.append(", ").append(etag != null ? etag : response.getMediaType());
628 final Date lastModified = response.getLastModified();
629 if (lastModified != null) {
630 synchronized (DATE_FORMAT) {
631 builder.append(", ").append(DATE_FORMAT.format(lastModified));
632 }
633 }
634 }
635 builder.append(", ").append(elapsed).append(" ms");
636 LOGGER.debug(builder.toString());
637 }
638
639
640 final int status = response.getStatus();
641 if (status == 302 || status == 307 || status == 308) {
642 response.close();
643 String newURI = response.getHeaderString(HttpHeaders.LOCATION);
644 final int index = newURI.indexOf('?');
645 newURI = index < 0 ? newURI : newURI.substring(0, index);
646 Client.this.targets.put(action, newURI);
647 LOGGER.debug("Http: stored redirection: {} -> {}", path, newURI);
648 return invoke(method, path, query, headers, requestEntity, responseType, timeout);
649 }
650
651
652 Client.this.targets.put(action, uri);
653 if (status / 100 == 2) {
654 if (Representation.class.isAssignableFrom(responseType.getRawType())) {
655 response.bufferEntity();
656 }
657 final T result = response.readEntity(responseType);
658 if (result instanceof Stream<?>) {
659 ((Stream<?>) result).onClose(new Runnable() {
660
661 @Override
662 public void run() {
663 response.close();
664 }
665
666 });
667 }
668 return result;
669 } else {
670 Util.closeQuietly(response);
671 throw new WebApplicationException(response);
672 }
673 }
674 }
675
676 public static Builder builder(final String serverURL) {
677 return new Builder(serverURL);
678 }
679
680 public static class Builder {
681
682 String serverURL;
683
684 @Nullable
685 Integer maxConnections;
686
687 @Nullable
688 Integer connectionTimeout;
689
690 @Nullable
691 Integer socketTimeout;
692
693 @Nullable
694 Boolean compressionEnabled;
695
696 @Nullable
697 Boolean validateServer;
698
699 @Nullable
700 ProxyConfig proxy;
701
702 Builder(final String serverURL) {
703 this.serverURL = Preconditions.checkNotNull(serverURL);
704 }
705
706 public Builder maxConnections(@Nullable final Integer maxConnections) {
707 this.maxConnections = maxConnections;
708 return this;
709 }
710
711 public Builder connectionTimeout(@Nullable final Integer connectionTimeout) {
712 this.connectionTimeout = connectionTimeout;
713 return this;
714 }
715
716 public Builder socketTimeout(@Nullable final Integer socketTimeout) {
717 this.socketTimeout = socketTimeout;
718 return this;
719 }
720
721 public Builder compressionEnabled(@Nullable final Boolean compressionEnabled) {
722 this.compressionEnabled = compressionEnabled;
723 return this;
724 }
725
726 public Builder validateServer(@Nullable final Boolean validateServer) {
727 this.validateServer = validateServer;
728 return this;
729 }
730
731 public Builder proxy(@Nullable final ProxyConfig proxy) {
732 this.proxy = proxy;
733 return this;
734 }
735
736 public Client build() {
737 return new Client(this);
738 }
739
740 }
741
742 }