1 package eu.fbk.knowledgestore.tool;
2
3 import java.io.BufferedReader;
4 import java.io.File;
5 import java.io.IOException;
6 import java.io.InputStream;
7 import java.io.Writer;
8 import java.nio.file.Path;
9 import java.nio.file.Paths;
10 import java.util.Arrays;
11 import java.util.List;
12 import java.util.Map;
13 import java.util.Properties;
14 import java.util.Random;
15 import java.util.Set;
16 import java.util.concurrent.ExecutorService;
17 import java.util.concurrent.Executors;
18 import java.util.concurrent.TimeUnit;
19 import java.util.concurrent.atomic.AtomicInteger;
20 import java.util.concurrent.atomic.AtomicLong;
21 import java.util.concurrent.atomic.AtomicReference;
22 import java.util.regex.Matcher;
23 import java.util.regex.Pattern;
24
25 import javax.annotation.Nullable;
26
27 import com.google.common.base.Charsets;
28 import com.google.common.base.Joiner;
29 import com.google.common.base.Preconditions;
30 import com.google.common.base.Splitter;
31 import com.google.common.base.Strings;
32 import com.google.common.collect.ImmutableList;
33 import com.google.common.collect.ImmutableSet;
34 import com.google.common.collect.Iterables;
35 import com.google.common.collect.Lists;
36 import com.google.common.collect.Maps;
37 import com.google.common.collect.Ordering;
38 import com.google.common.collect.Sets;
39 import com.google.common.primitives.Ints;
40 import com.google.common.primitives.Longs;
41 import com.google.common.util.concurrent.ThreadFactoryBuilder;
42
43 import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
44 import org.openrdf.model.URI;
45 import org.openrdf.model.Value;
46 import org.openrdf.model.ValueFactory;
47 import org.openrdf.model.impl.ValueFactoryImpl;
48 import org.openrdf.query.Binding;
49 import org.openrdf.query.BindingSet;
50 import org.openrdf.query.impl.MapBindingSet;
51 import org.slf4j.Logger;
52 import org.slf4j.LoggerFactory;
53 import org.slf4j.MDC;
54
55 import eu.fbk.knowledgestore.Operation.Sparql;
56 import eu.fbk.knowledgestore.Session;
57 import eu.fbk.knowledgestore.client.Client;
58 import eu.fbk.knowledgestore.data.Data;
59 import eu.fbk.knowledgestore.data.Record;
60 import eu.fbk.knowledgestore.data.Representation;
61 import eu.fbk.knowledgestore.data.Stream;
62 import eu.fbk.knowledgestore.internal.CommandLine;
63 import eu.fbk.knowledgestore.internal.Logging;
64 import eu.fbk.knowledgestore.vocabulary.KS;
65 import eu.fbk.rdfpro.util.IO;
66 import eu.fbk.rdfpro.util.Namespaces;
67 import eu.fbk.rdfpro.util.Statements;
68 import eu.fbk.rdfpro.util.Tracker;
69
70 public final class TestDriver {
71
72 private static final Logger LOGGER = LoggerFactory.getLogger(TestDriver.class);
73
74 private static final ValueFactory FACTORY = ValueFactoryImpl.getInstance();
75
76 private final String url;
77
78 private final String username;
79
80 private final String password;
81
82 private final int warmupMixes;
83
84 private final int testMixes;
85
86 private final long warmupTime;
87
88 private final long testTime;
89
90 private final int clients;
91
92 private final long timeout;
93
94 private final Query[] queries;
95
96 private final List<String> outputVariables;
97
98 private List<String> inputVariables;
99
100 private final byte[][] inputData;
101
102 private final File outputFile;
103
104 private final long seed;
105
106 public static void main(final String... args) {
107 try {
108
109
110 final CommandLine cmd = CommandLine
111 .parser()
112 .withName("ks-test-driver")
113 .withHeader(
114 "Perform a query scalability test against a KnowledgeStore. "
115 + "Test parameters and queries are supplied in a .properties file. "
116 + "Test data (produced with query-test-generator) "
117 + "is supplied in a .tsv file.")
118 .withOption("c", "config", "the configuration file", "FILE",
119 CommandLine.Type.FILE_EXISTING, true, false, true)
120 .withFooter(
121 "Test configuration may be overridden by supplying additional "
122 + "property=value\narguments on the command line.")
123 .withLogger(LoggerFactory.getLogger("eu.fbk.nwrtools")).parse(args);
124
125 final File configFile = cmd.getOptionValue("c", File.class);
126
127 final Properties config = new Properties();
128 try (InputStream configStream = IO.read(configFile.getAbsolutePath())) {
129 config.load(configStream);
130 }
131
132 for (final String arg : cmd.getArgs(String.class)) {
133 final int index = arg.indexOf('=');
134 if (index > 0) {
135 final String name = arg.substring(0, index);
136 final String value = arg.substring(index + 1);
137 config.setProperty(name, value);
138 }
139 }
140
141 new TestDriver(config, configFile.getParentFile()).run();
142
143 } catch (final Throwable ex) {
144 CommandLine.fail(ex);
145 }
146 }
147
148 public TestDriver(final Properties properties, @Nullable final File baseDir)
149 throws IOException {
150
151
152 final Path base = (baseDir != null ? baseDir : new File(System.getProperty("user.dir")))
153 .toPath();
154
155
156 this.seed = TestUtil.read(properties, "test.seed", Long.class, 0L);
157
158
159 final String dataArg = TestUtil.read(properties, "test.data", String.class);
160 final String outputArg = TestUtil.read(properties, "test.out", String.class);
161 final File dataFile = base.resolve(Paths.get(dataArg)).toFile();
162 this.outputFile = outputArg == null ? null : base.resolve(Paths.get(outputArg)).toFile();
163
164
165 this.url = TestUtil.read(properties, "test.url", String.class);
166 this.username = TestUtil.read(properties, "test.username", String.class, null);
167 this.password = TestUtil.read(properties, "test.password", String.class, null);
168 LOGGER.info("SUT: {}{}", this.url,
169 this.username == null && this.password == null ? " (anonymous access)"
170 : " (authenticated access)");
171
172
173 this.warmupMixes = TestUtil.read(properties, "test.warmupmixes", Integer.class, 0);
174 this.testMixes = TestUtil.read(properties, "test.testmixes", Integer.class, 1);
175 this.warmupTime = TestUtil.read(properties, "test.warmuptime", Long.class, 3600L) * 1000;
176 this.testTime = TestUtil.read(properties, "test.testtime", Long.class, 3600L) * 1000;
177 this.clients = TestUtil.read(properties, "test.clients", Integer.class, 1);
178 LOGGER.info("{} mix(es), {} s warmup; {} mix(es), {} s test; {} client(s)",
179 this.warmupMixes, this.warmupTime / 1000, this.testMixes, this.testTime / 1000,
180 this.clients);
181
182
183 this.timeout = TestUtil.read(properties, "test.timeout", Long.class, -1L);
184
185
186 Preconditions.checkArgument(dataFile.exists(), "File " + dataFile + " does not exist");
187 final List<byte[]> data = Lists.newArrayList();
188 try (BufferedReader reader = new BufferedReader(IO.utf8Reader(IO.buffer(IO.read(dataFile
189 .getAbsolutePath()))))) {
190 String line = reader.readLine();
191 final String[] inputVariables = line.split("\t");
192 for (int i = 0; i < inputVariables.length; ++i) {
193 inputVariables[i] = inputVariables[i].substring(1);
194 }
195 this.inputVariables = ImmutableList.copyOf(inputVariables);
196 LOGGER.info("Input schema: ({})", Joiner.on(", ").join(this.inputVariables));
197 final Tracker tracker = new Tracker(LOGGER, null,
198 "Parsed " + dataFile + ": %d tuples (%d tuple/s avg)",
199 "Parsed %d tuples (%d tuple/s, %d tuple/s avg)");
200 tracker.start();
201 while ((line = reader.readLine()) != null) {
202 data.add(line.getBytes(Charsets.UTF_8));
203 tracker.increment();
204 }
205 tracker.end();
206 }
207 this.inputData = data.toArray(new byte[data.size()][]);
208
209
210 final Properties defaultQueryProperties = new Properties();
211 if (this.timeout >= 0) {
212 defaultQueryProperties.setProperty("timeout", Long.toString(this.timeout));
213 }
214 final List<Query> allQueries = Query.create(properties, defaultQueryProperties);
215 final List<Query> enabledQueries = Lists.newArrayList();
216 final Set<String> enabledNames = Sets.newLinkedHashSet(Arrays.asList(TestUtil.read(
217 properties, "test.queries", String.class).split("\\s*[,]\\s*")));
218 for (final String name : enabledNames) {
219 boolean added = false;
220 for (final Query query : allQueries) {
221 if (query.getName().equals(name)) {
222 enabledQueries.add(query);
223 added = true;
224 break;
225 }
226 }
227 Preconditions.checkArgument(added, "Unknown query " + name);
228 }
229 this.queries = enabledQueries.toArray(new Query[enabledQueries.size()]);
230 LOGGER.info("{} queries enabled ({} defined): {}", enabledQueries.size(),
231 allQueries.size(), Joiner.on(", ").join(enabledQueries));
232
233
234 final List<String> outputVariables = Lists.newArrayList("mix.client", "mix.index",
235 "mix.input", "mix.start", "mix.time");
236 for (final String variable : this.inputVariables) {
237 outputVariables.add("input." + variable);
238 }
239 for (final Query query : this.queries) {
240 for (final String inputVariable : query.getInputVariables()) {
241 if (!this.inputVariables.contains(inputVariable)) {
242 throw new IllegalArgumentException("Query " + query
243 + " refers to unknown input variable " + inputVariable);
244 }
245 }
246 for (final String outputVariable : Ordering.natural().immutableSortedCopy(
247 query.getOutputVariables())) {
248 outputVariables.add(query.getName() + "." + outputVariable);
249 }
250 }
251 this.outputVariables = ImmutableList.copyOf(outputVariables);
252 LOGGER.info("Output schema: {} attributes", this.outputVariables.size());
253 }
254
255 public void run() throws Throwable {
256
257
258 final Writer writer = this.outputFile == null ? null : IO.utf8Writer(IO.buffer(IO
259 .write(this.outputFile.getAbsolutePath())));
260
261 try {
262
263 final Random random = new Random(this.seed);
264
265
266 final long ts = System.currentTimeMillis();
267 final List<String> queryNames = Lists.newArrayList();
268 for (final Query query : this.queries) {
269 queryNames.add(query.getName());
270 }
271 final Statistics stats = new Statistics(queryNames);
272
273
274 LOGGER.info("Test started");
275
276
277 if (this.warmupMixes > 0) {
278 runClients(this.warmupMixes, this.warmupTime, random, "Warmup", null, null);
279 }
280
281
282 if (this.clients > 0 && this.testMixes > 0) {
283 if (writer != null) {
284 for (int i = 0; i < this.outputVariables.size(); ++i) {
285 writer.write(i == 0 ? "?" : "\t?");
286 writer.write(this.outputVariables.get(i));
287 }
288 writer.write("\n");
289 }
290 runClients(this.testMixes, this.testTime, random, "Measurement", writer, stats);
291 }
292
293
294 LOGGER.info("Test completed in {} ms\n\n{}\n", System.currentTimeMillis() - ts, stats);
295
296 } finally {
297
298 IO.closeQuietly(writer);
299 }
300 }
301
302 private void runClients(final int maxMixes, final long maxTime, final Random random,
303 final String phaseName, @Nullable final Writer writer, @Nullable final Statistics stats)
304 throws Throwable {
305
306
307 LOGGER.info("{} started ({} clients, {} mix(es), {} queries/mix)", phaseName,
308 this.clients, maxMixes, this.queries.length);
309
310
311 final Tracker tracker = new Tracker(LOGGER, null,
312 "Completed %d query mixes (%d mixes/s avg)",
313 "Completed %d query mixes (%d mixes/s, %d mixes/s avg)");
314 tracker.start();
315
316
317 final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>();
318 final Thread mainThread = Thread.currentThread();
319 final ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder()
320 .setDaemon(true).setNameFormat("client-%02d").build());
321 final AtomicLong startTimestamp = new AtomicLong(Long.MAX_VALUE);
322 final AtomicLong endTimestamp = new AtomicLong(Long.MIN_VALUE);
323 final long[] clientExecutionTimes = new long[this.clients];
324 final int[] clientMixes = new int[this.clients];
325 try {
326 final AtomicInteger globalMixCounter = new AtomicInteger(maxMixes);
327 for (int i = 0; i < this.clients; ++i) {
328 final int clientId = i;
329 executor.submit(new Runnable() {
330
331 @Override
332 public void run() {
333 final String oldContext = MDC.get(Logging.MDC_CONTEXT);
334 try {
335 MDC.put(Logging.MDC_CONTEXT, String.format("client%d", clientId));
336 final AtomicInteger localMixCounter = new AtomicInteger(0);
337 final long startTs = System.currentTimeMillis();
338 synchronized (startTimestamp) {
339 if (startTs < startTimestamp.get()) {
340 startTimestamp.set(startTs);
341 }
342 }
343 final long endTs = runClient(clientId, globalMixCounter,
344 localMixCounter, maxTime, random, tracker, startTs, writer,
345 stats);
346 clientExecutionTimes[clientId] = endTs - startTs;
347 clientMixes[clientId] = localMixCounter.get();
348 synchronized (endTimestamp) {
349 if (endTs > endTimestamp.get()) {
350 endTimestamp.set(endTs);
351 }
352 }
353 } catch (final Throwable ex) {
354 LOGGER.error("[{}] Client failed", clientId, ex);
355 exceptionHolder.compareAndSet(null, ex);
356 mainThread.interrupt();
357 } finally {
358 MDC.put(Logging.MDC_CONTEXT, oldContext);
359 }
360 }
361
362 });
363 }
364 executor.shutdown();
365 executor.awaitTermination(Integer.MAX_VALUE, TimeUnit.MILLISECONDS);
366
367 } catch (final InterruptedException ex) {
368 final Throwable ex2 = exceptionHolder.get();
369 if (ex2 != null) {
370 ex.addSuppressed(ex2);
371 }
372 throw ex;
373
374 } finally {
375 executor.shutdownNow();
376 tracker.end();
377 }
378
379
380 if (exceptionHolder.get() != null) {
381 throw exceptionHolder.get();
382 }
383
384
385 final long elapsed = endTimestamp.get() - startTimestamp.get();
386 if (stats != null) {
387 stats.reportElapsedTest(elapsed);
388 }
389
390
391 LOGGER.info("{} completed in {} ms (client time: {}-{} ms; client mixes: {}-{})",
392 phaseName, elapsed, Longs.min(clientExecutionTimes),
393 Longs.max(clientExecutionTimes), Ints.min(clientMixes), Ints.max(clientMixes));
394 }
395
396 private long runClient(final int clientId, final AtomicInteger globalMixCounter,
397 final AtomicInteger localMixCounter, final long maxTime, final Random random,
398 @Nullable final Tracker tracker, final long startTimestamp,
399 @Nullable final Writer writer, @Nullable final Statistics stats) throws IOException {
400
401
402 LOGGER.debug("Client started");
403
404
405 long timestamp = startTimestamp;
406 try (Client client = Client.builder(this.url).compressionEnabled(true)
407 .validateServer(false).build()) {
408 try (final Session session = client.newSession(this.username, this.password)) {
409
410
411 LOGGER.debug("Client ready");
412
413
414 final String clientContext = MDC.get(Logging.MDC_CONTEXT);
415 try {
416 while (globalMixCounter.getAndDecrement() > 0
417 && timestamp < startTimestamp + maxTime) {
418
419 final int mixIndex = localMixCounter.incrementAndGet();
420 final String mixContext = String.format("%s.mix%d", clientContext,
421 mixIndex);
422 MDC.put(Logging.MDC_CONTEXT, mixContext);
423
424
425 final int index;
426 synchronized (random) {
427 index = random.nextInt(this.inputData.length);
428 }
429 final String inputLine = new String(this.inputData[index], Charsets.UTF_8);
430 final BindingSet input = TestUtil.decode(this.inputVariables, inputLine);
431
432
433 final ValueFactory vf = Statements.VALUE_FACTORY;
434 final MapBindingSet output = new MapBindingSet();
435 final long mixStartTimestamp = timestamp;
436 output.addBinding("mix.client", vf.createLiteral(clientId));
437 output.addBinding("mix.index", vf.createLiteral(mixIndex));
438 output.addBinding("mix.input", vf.createLiteral(index));
439 output.addBinding("mix.start", vf.createLiteral(mixStartTimestamp));
440
441
442 LOGGER.debug("Started for input #{}", index);
443
444
445 for (final Query query : this.queries) {
446 try {
447 MDC.put(Logging.MDC_CONTEXT,
448 String.format("%s.%s", mixContext, query.getName()));
449 final MapBindingSet queryOutput = new MapBindingSet();
450 timestamp = query.evaluate(session, timestamp, input, queryOutput,
451 stats);
452 for (final Binding binding : queryOutput) {
453 output.addBinding(query.getName() + "." + binding.getName(),
454 binding.getValue());
455 }
456 } finally {
457 MDC.put(Logging.MDC_CONTEXT, mixContext);
458 }
459 }
460 final long elapsed = timestamp - mixStartTimestamp;
461
462
463 LOGGER.debug("Completed in {} ms", elapsed);
464 if (tracker != null) {
465 tracker.increment();
466 }
467
468
469 output.addBinding("mix.time", vf.createLiteral(elapsed));
470 if (stats != null) {
471 stats.reportQueryMixCompletion(elapsed);
472 }
473
474
475 if (writer != null) {
476 LOGGER.trace("Emitting:\n{}",
477 TestUtil.format(this.outputVariables, output, "\n"));
478 final String outputLine = TestUtil
479 .encode(this.outputVariables, output);
480 synchronized (writer) {
481 writer.write(outputLine);
482 writer.write("\n");
483 }
484 }
485 }
486 } finally {
487 MDC.put(Logging.MDC_CONTEXT, clientContext);
488 }
489 }
490 }
491
492
493 LOGGER.debug("Client terminated ({} query mixes)", localMixCounter.get());
494
495
496 return timestamp;
497 }
498
499 private static abstract class Query {
500
501 private static final Logger LOGGER = LoggerFactory.getLogger(Query.class);
502
503 private final String name;
504
505 private final Long timeout;
506
507 private final Set<String> inputVariables;
508
509 private final Set<String> outputVariables;
510
511 Query(final String name, final Properties properties,
512 final Iterable<String> inputVariables, final Iterable<String> outputVariables) {
513
514 final String timeout = properties.getProperty("timeout");
515
516 this.name = name;
517 this.timeout = timeout != null ? Long.parseLong(timeout) : null;
518 this.inputVariables = ImmutableSet.copyOf(inputVariables);
519 this.outputVariables = ImmutableSet.copyOf(Iterables.concat(
520 ImmutableSet.of("start", "time", "error"), outputVariables));
521 }
522
523 public static List<Query> create(final Properties properties,
524 final Properties defaultQueryProperties) {
525
526 final Map<String, Properties> map = Maps.newLinkedHashMap();
527 for (final Object key : properties.keySet()) {
528 final String keyString = key.toString();
529 final int index = keyString.indexOf(".");
530 if (index > 0) {
531 final String queryName = keyString.substring(0, index);
532 final String propertyName = keyString.substring(index + 1);
533 final String propertyValue = properties.getProperty(keyString);
534 Properties queryProperties = map.get(queryName);
535 if (queryProperties == null) {
536 queryProperties = new Properties();
537 queryProperties.putAll(defaultQueryProperties);
538 map.put(queryName, queryProperties);
539 }
540 queryProperties.setProperty(propertyName, propertyValue);
541 }
542 }
543
544 final List<Query> queries = Lists.newArrayList();
545 for (final Map.Entry<String, Properties> entry : map.entrySet()) {
546 final String queryName = entry.getKey();
547 final Properties queryProperties = entry.getValue();
548 final String queryType = queryProperties.getProperty("type");
549 if ("download".equalsIgnoreCase(queryType)) {
550 queries.add(new DownloadQuery(queryName, queryProperties));
551 } else if ("retrieve".equalsIgnoreCase(queryType)) {
552 queries.add(new RetrieveQuery(queryName, queryProperties));
553 } else if ("lookup".equalsIgnoreCase(queryType)) {
554 queries.add(new LookupQuery(queryName, queryProperties));
555 } else if ("lookupall".equalsIgnoreCase(queryType)) {
556 queries.add(new LookupAllQuery(queryName, queryProperties));
557 } else if ("count".equalsIgnoreCase(queryType)) {
558 queries.add(new CountQuery(queryName, queryProperties));
559 } else if ("sparql".equalsIgnoreCase(queryType)) {
560 queries.add(new SparqlQuery(queryName, queryProperties));
561 }
562 }
563 return queries;
564 }
565
566 public String getName() {
567 return this.name;
568 }
569
570 public Long getTimeout() {
571 return this.timeout;
572 }
573
574 public Set<String> getInputVariables() {
575 return this.inputVariables;
576 }
577
578 public Set<String> getOutputVariables() {
579 return this.outputVariables;
580 }
581
582 public long evaluate(final Session session, final long startTimestamp,
583 final BindingSet input, final MapBindingSet output,
584 @Nullable final Statistics stats) {
585
586 final ValueFactory vf = ValueFactoryImpl.getInstance();
587
588 if (LOGGER.isDebugEnabled()) {
589 final StringBuilder builder = new StringBuilder();
590 builder.append("Started: ");
591 builder.append(TestUtil.format(this.inputVariables, input, " "));
592 LOGGER.debug(builder.toString());
593 }
594
595 String error = "";
596 try {
597 doEvaluate(session, input, output);
598 } catch (final Throwable ex) {
599 error = ex.getClass().getSimpleName() + " - "
600 + Strings.nullToEmpty(ex.getMessage());
601 LOGGER.warn("Got exception", ex);
602 }
603
604 long size = -1;
605 try {
606 final Value value = output.getValue("size");
607 if (value != null) {
608 size = Long.parseLong(value.stringValue());
609 }
610 } catch (final Throwable ex) {
611
612 }
613
614 final long endTimestamp = System.currentTimeMillis();
615 final long elapsed = endTimestamp - startTimestamp;
616
617 if (stats != null) {
618 stats.reportQueryCompletion(this.name, !"".equals(error), elapsed, size);
619 }
620
621 output.addBinding("time", vf.createLiteral(elapsed));
622
623 if (LOGGER.isDebugEnabled()) {
624 final StringBuilder builder = new StringBuilder();
625 builder.append("".equals(error) ? "Success" : "Failure");
626 builder.append(": ");
627 builder.append(TestUtil.format(this.inputVariables, input, " "));
628 builder.append(" -> ");
629 builder.append(TestUtil.format(this.outputVariables, output, " "));
630 LOGGER.debug(builder.toString());
631 }
632
633 output.addBinding("start", vf.createLiteral(startTimestamp));
634 output.addBinding("error", vf.createLiteral(error));
635
636 return endTimestamp;
637 }
638
639 abstract void doEvaluate(Session session, BindingSet input, MapBindingSet output)
640 throws Throwable;
641
642 @Override
643 public String toString() {
644 return this.name;
645 }
646
647 private static class DownloadQuery extends Query {
648
649 private final Template id;
650
651 private final boolean caching;
652
653 DownloadQuery(final String name, final Properties properties) {
654 this(name, properties, new Template(properties.getProperty("id")));
655 }
656
657 private DownloadQuery(final String name, final Properties properties, final Template id) {
658 super(name, properties, id.getVariables(), ImmutableList.of("size"));
659 this.id = id;
660 this.caching = "false".equalsIgnoreCase(properties.getProperty("caching"));
661 }
662
663 @Override
664 void doEvaluate(final Session session, final BindingSet input,
665 final MapBindingSet output) throws Throwable {
666
667 final URI id = (URI) Statements.parseValue(this.id.instantiate(input),
668 Namespaces.DEFAULT);
669
670 long size = 0L;
671 try (final Representation representation = session.download(id)
672 .caching(this.caching).timeout(getTimeout()).exec()) {
673 if (representation != null) {
674 size = representation.writeToByteArray().length;
675 } else {
676 LOGGER.warn("No results for DOWNLOAD request, id " + id);
677 }
678 } catch (final Throwable ex) {
679 throw new RuntimeException("Failed DOWNLOAD, id " + TestUtil.format(id)
680 + ", caching " + this.caching, ex);
681 } finally {
682 output.addBinding("size", FACTORY.createLiteral(size));
683 }
684 }
685 }
686
687 private static class RetrieveQuery extends Query {
688
689 private final URI layer;
690
691 @Nullable
692 private final Template condition;
693
694 @Nullable
695 private final Long offset;
696
697 @Nullable
698 private final Long limit;
699
700 @Nullable
701 private final List<URI> properties;
702
703 RetrieveQuery(final String name, final Properties properties) {
704 this(name, properties, Template.forString(properties.getProperty("condition")));
705 }
706
707 private RetrieveQuery(final String name, final Properties properties,
708 @Nullable final Template condition) {
709
710 super(name, properties, condition.getVariables(), ImmutableList.of("size"));
711
712 final String offset = properties.getProperty("offset");
713 final String limit = properties.getProperty("limit");
714
715 List<URI> props = null;
716 if (properties.containsKey("properties")) {
717 props = Lists.newArrayList();
718 for (final String token : Splitter.onPattern("[ ,;]").omitEmptyStrings()
719 .trimResults().split(properties.getProperty("properties"))) {
720 props.add((URI) Statements.parseValue(token));
721 }
722 }
723
724 this.layer = (URI) Statements.parseValue(properties.getProperty("layer"),
725 Namespaces.DEFAULT);
726 this.condition = condition;
727 this.offset = offset == null ? null : Long.parseLong(offset);
728 this.limit = limit == null ? null : Long.parseLong(limit);
729 this.properties = props;
730 }
731
732 @Override
733 void doEvaluate(final Session session, final BindingSet input,
734 final MapBindingSet output) throws Throwable {
735
736 final String condition = Strings.nullToEmpty(this.condition.instantiate(input));
737
738 long numTriples = 0L;
739 try {
740
741 final Stream<Record> stream = session.retrieve(this.layer)
742 .condition(condition).offset(this.offset).limit(this.limit)
743 .properties(this.properties).exec();
744 numTriples = Record.encode(stream, ImmutableList.of(this.layer)).count();
745 if (numTriples == 0) {
746 LOGGER.warn("No results for RETRIEVE request, layer "
747 + TestUtil.format(this.layer) + ", condition '" + condition
748 + "', offset " + this.offset + ", limit " + this.limit);
749 }
750
751 } catch (final Throwable ex) {
752 throw new RuntimeException("Failed RETRIEVE " + TestUtil.format(this.layer)
753 + ", condition " + condition + ", offset " + this.offset + ", limit"
754 + this.limit + ", properties " + this.properties, ex);
755 } finally {
756 output.addBinding("size", FACTORY.createLiteral(numTriples));
757 }
758 }
759 }
760
761 private static class LookupQuery extends Query {
762
763 private final URI layer;
764
765 @Nullable
766 private final Template id;
767
768 @Nullable
769 private final List<URI> properties;
770
771 LookupQuery(final String name, final Properties properties) {
772 this(name, properties, Template.forString(properties.getProperty("id")));
773 }
774
775 private LookupQuery(final String name, final Properties properties,
776 @Nullable final Template id) {
777
778 super(name, properties, id.getVariables(), ImmutableList.of("size"));
779
780 List<URI> props = null;
781 if (properties.containsKey("properties")) {
782 props = Lists.newArrayList();
783 for (final String token : Splitter.onPattern("[ ,;]").omitEmptyStrings()
784 .trimResults().split(properties.getProperty("properties"))) {
785 props.add((URI) Statements.parseValue(token));
786 }
787 }
788
789 this.layer = (URI) Statements.parseValue(properties.getProperty("layer"),
790 Namespaces.DEFAULT);
791 this.id = id;
792 this.properties = props;
793 }
794
795 @Override
796 void doEvaluate(final Session session, final BindingSet input,
797 final MapBindingSet output) throws Throwable {
798
799 final URI id = (URI) Statements.parseValue(this.id.instantiate(input),
800 Namespaces.DEFAULT);
801
802 long numTriples = 0L;
803 try {
804 final Stream<Record> stream = session.retrieve(this.layer).ids(id)
805 .properties(this.properties).exec();
806 numTriples = Record.encode(stream, ImmutableList.of(this.layer)).count();
807 if (numTriples == 0) {
808 LOGGER.warn("No results for LOOKUP request, layer "
809 + TestUtil.format(this.layer) + ", id " + id);
810 }
811
812 } catch (final Throwable ex) {
813 throw new RuntimeException("Failed LOOKUP " + TestUtil.format(this.layer)
814 + ", id " + TestUtil.format(id) + ", properties " + this.properties,
815 ex);
816 } finally {
817 output.addBinding("size", FACTORY.createLiteral(numTriples));
818 }
819 }
820
821 }
822
823 private static class LookupAllQuery extends Query {
824
825 @Nullable
826 private final Template id;
827
828 LookupAllQuery(final String name, final Properties properties) {
829 this(name, properties, Template.forString(properties.getProperty("id")));
830 }
831
832 private LookupAllQuery(final String name, final Properties properties,
833 @Nullable final Template id) {
834 super(name, properties, id.getVariables(), ImmutableList.of("size"));
835 this.id = id;
836 }
837
838 @Override
839 void doEvaluate(final Session session, final BindingSet input,
840 final MapBindingSet output) throws Throwable {
841
842 final URI id = (URI) Statements.parseValue(this.id.instantiate(input),
843 Namespaces.DEFAULT);
844
845 long numTriples = 0L;
846 try {
847 numTriples += Record.encode(session.retrieve(KS.RESOURCE).ids(id).exec(),
848 ImmutableList.of(KS.RESOURCE)).count();
849 numTriples += Record.encode(
850 session.retrieve(KS.MENTION).condition("ks:mentionOf = $$", id)
851 .limit(100000L).exec(), ImmutableList.of(KS.MENTION)).count();
852 if (numTriples == 0) {
853 LOGGER.warn("No results for LOOKUP ALL request, id " + id);
854 }
855 } catch (final Throwable ex) {
856 throw new RuntimeException("Failed LOOKUP ALL, id " + TestUtil.format(id), ex);
857 } finally {
858 output.addBinding("size", FACTORY.createLiteral(numTriples));
859 }
860 }
861 }
862
863 private static class CountQuery extends Query {
864
865 private final URI layer;
866
867 @Nullable
868 private final Template condition;
869
870 CountQuery(final String name, final Properties properties) {
871 this(name, properties, Template.forString(properties.getProperty("condition")));
872 }
873
874 private CountQuery(final String name, final Properties properties,
875 @Nullable final Template condition) {
876
877 super(name, properties, condition.getVariables(), ImmutableList.of("size"));
878
879 this.layer = (URI) Statements.parseValue(properties.getProperty("layer"));
880 this.condition = condition;
881 }
882
883 @Override
884 void doEvaluate(final Session session, final BindingSet input,
885 final MapBindingSet output) throws Throwable {
886
887 final String condition = Strings.nullToEmpty(this.condition.instantiate(input));
888 long numResults = 0L;
889 try {
890 numResults = session.count(this.layer).condition(condition).exec();
891 if (numResults == 0) {
892 LOGGER.warn("No results for COUNT request, layer "
893 + TestUtil.format(this.layer) + ", condition '" + condition + "'");
894 }
895 } catch (final Throwable ex) {
896 throw new RuntimeException("Count " + TestUtil.format(this.layer) + " where "
897 + condition + " failed", ex);
898 } finally {
899 output.addBinding("size", FACTORY.createLiteral(numResults));
900 }
901 }
902
903 }
904
905 private static final class SparqlQuery extends Query {
906
907 private final Template query;
908
909 private final String form;
910
911 SparqlQuery(final String name, final Properties properties) {
912 this(name, properties, Template.forString(properties.getProperty("query")));
913 }
914
915 private SparqlQuery(final String name, final Properties properties,
916 final Template query) {
917 super(name, properties, query.getVariables(), ImmutableList.of("size"));
918 this.query = query;
919 this.form = detectQueryForm(query.getText());
920 }
921
922 private static String detectQueryForm(final String query) {
923
924 final int length = query.length();
925
926 int start = 0;
927 while (start < length) {
928 final char ch = query.charAt(start);
929 if (ch == '#') {
930 while (start < length && query.charAt(start) != '\n') {
931 ++start;
932 }
933 } else if (ch == 'p' || ch == 'b' || ch == 'P' || ch == 'B') {
934 while (start < length && query.charAt(start) != '>') {
935 ++start;
936 }
937 } else if (!Character.isWhitespace(ch)) {
938 break;
939 }
940 ++start;
941 }
942
943 for (int i = start; i < query.length(); ++i) {
944 final char ch = query.charAt(i);
945 if (Character.isWhitespace(ch)) {
946 final String form = query.substring(start, i).toLowerCase();
947 if (!"select".equals(form) && !"construct".equals(form)
948 && !"describe".equals(form) && !"ask".equals(form)) {
949 throw new IllegalArgumentException("Unknown query form: " + form);
950 }
951 return form;
952 }
953 }
954
955 throw new IllegalArgumentException("Cannot detect query form");
956 }
957
958 @Override
959 void doEvaluate(final Session session, final BindingSet input,
960 final MapBindingSet output) throws Throwable {
961
962 long numResults = 0;
963 final String queryString = this.query.instantiate(input);
964 final Sparql operation = session.sparql(queryString).timeout(getTimeout());
965
966 try {
967 switch (this.form) {
968 case "select":
969 numResults = operation.execTuples().count();
970 break;
971 case "construct":
972 case "describe":
973 numResults = operation.execTriples().count();
974 break;
975 case "ask":
976 operation.execBoolean();
977 numResults = 1;
978 break;
979 default:
980 throw new Error();
981 }
982 if (numResults == 0) {
983 LOGGER.warn("No results for SPARQL request, query is\n" + queryString);
984 }
985 output.addBinding("size", FACTORY.createLiteral(numResults));
986 } catch (final Throwable ex) {
987 throw new RuntimeException("Failed SPARQL, form " + this.form.toUpperCase()
988 + ", query:\n" + queryString, ex);
989 } finally {
990 }
991 }
992
993 }
994
995 private static final class Template {
996
997 private static final Pattern PATTERN = Pattern.compile("\\$\\{([^\\}]+)\\}");
998
999 private static Template EMPTY = new Template("");
1000
1001 private final String text;
1002
1003 private final String[] placeholderVariables;
1004
1005 private final Set<String> variables;
1006
1007 private Template(final String string) {
1008 Preconditions.checkNotNull(string);
1009 final List<String> variables = Lists.newArrayList();
1010 final StringBuilder builder = new StringBuilder();
1011 final Matcher matcher = PATTERN.matcher(string);
1012 int offset = 0;
1013 while (matcher.find()) {
1014 builder.append(string.substring(offset, matcher.start()).replace("%", "%%"));
1015 builder.append("%s");
1016 variables.add(matcher.group(1));
1017 offset = matcher.end();
1018 }
1019 builder.append(string.substring(offset).replace("%", "%%"));
1020 this.text = builder.toString();
1021 this.placeholderVariables = variables.toArray(new String[variables.size()]);
1022 this.variables = ImmutableSet.copyOf(variables);
1023 }
1024
1025 static Template forString(@Nullable final String string) {
1026 return string == null ? EMPTY : new Template(string);
1027 }
1028
1029 String getText() {
1030 return this.text;
1031 }
1032
1033 Set<String> getVariables() {
1034 return this.variables;
1035 }
1036
1037 String instantiate(final BindingSet bindings) {
1038 final Object[] placeholderValues = new String[this.placeholderVariables.length];
1039 for (int i = 0; i < placeholderValues.length; ++i) {
1040 final Value value = bindings.getValue(this.placeholderVariables[i]);
1041 placeholderValues[i] = Data.toString(value, null);
1042 }
1043 return String.format(this.text, placeholderValues);
1044 }
1045
1046 }
1047
1048 }
1049
1050 private static final class Statistics {
1051
1052 private static final String EMPTY = String.format("%-8s", "");
1053
1054 private final DescriptiveStatistics queryMixTime;
1055
1056 private final Map<String, QueryInfo> queryInfos;
1057
1058 private final QueryInfo globalInfo;
1059
1060 private long elapsedTime;
1061
1062 public Statistics(final Iterable<String> queryNames) {
1063 this.queryMixTime = new DescriptiveStatistics();
1064 this.queryInfos = Maps.newLinkedHashMap();
1065 this.globalInfo = new QueryInfo();
1066 this.elapsedTime = 0L;
1067 for (final String queryName : queryNames) {
1068 this.queryInfos.put(queryName, new QueryInfo());
1069 }
1070 }
1071
1072 public synchronized void reportQueryCompletion(final String queryName,
1073 final boolean failure, final long time, final long size) {
1074 final QueryInfo info = this.queryInfos.get(queryName);
1075 info.time.addValue(time);
1076 this.globalInfo.time.addValue(time);
1077 if (size >= 0) {
1078 info.size.addValue(size);
1079 this.globalInfo.size.addValue(size);
1080 }
1081 if (failure) {
1082 ++info.numFailures;
1083 ++this.globalInfo.numFailures;
1084 }
1085 }
1086
1087 public synchronized void reportQueryMixCompletion(final long time) {
1088 this.queryMixTime.addValue(time);
1089 }
1090
1091 public synchronized void reportElapsedTest(final long elapsedTime) {
1092 this.elapsedTime = elapsedTime;
1093 }
1094
1095 @Override
1096 public synchronized String toString() {
1097
1098
1099 long testTotalTime = 0;
1100 for (final QueryInfo info : this.queryInfos.values()) {
1101 testTotalTime += (long) info.time.getSum();
1102 }
1103
1104
1105 final StringBuilder builder = new StringBuilder();
1106 emitHeader(builder);
1107 emitSeparator(builder);
1108 for (final Map.Entry<String, QueryInfo> entry : this.queryInfos.entrySet()) {
1109 emitStats(builder, testTotalTime, entry.getKey(), entry.getValue());
1110 }
1111 emitSeparator(builder);
1112 emitStats(builder, testTotalTime, "query (avg)", this.globalInfo);
1113 emitSeparator(builder);
1114 emitStats(builder, testTotalTime, "query mix", (int) this.queryMixTime.getN(), -1,
1115 null, this.queryMixTime);
1116 return builder.toString();
1117 }
1118
1119 private void emitHeader(final StringBuilder builder) {
1120
1121 builder.append(String.format("%-12s%-16s%-64s%-64s%-24s%-16s\n", "", " Executions",
1122 " Result size [solutions, triples or bytes]", " Execution time [ms]",
1123 " Total time [ms]", " Rate"));
1124
1125 builder.append(Strings.repeat(" ", 12));
1126 for (final String field : new String[] { "Total", "Error", "Min", "Q1", "Q2", "Q3",
1127 "Max", "Geom", "Mean", "Std", "Min", "Q1", "Q2", "Q3", "Max", "Geom", "Mean",
1128 "Std", "Sum", "Clock", "Share", "/Sec", "/Hour" }) {
1129 builder.append(String.format("%8s", field));
1130 }
1131 builder.append("\n");
1132 }
1133
1134 private void emitSeparator(final StringBuilder builder) {
1135 builder.append(Strings.repeat("-", 8 * 23 + 12)).append("\n");
1136 }
1137
1138 private void emitStats(final StringBuilder builder, final long testTotalTime,
1139 final String label, final QueryInfo info) {
1140 emitStats(builder, testTotalTime, label, (int) info.time.getN(), info.numFailures,
1141 info.size, info.time);
1142 }
1143
1144 private void emitStats(final StringBuilder builder, final long testTotalTime,
1145 final String label, final int numSuccesses, final int numFailures,
1146 @Nullable final DescriptiveStatistics size, final DescriptiveStatistics time) {
1147
1148 builder.append(String.format("%-12s", label));
1149
1150 builder.append(numSuccesses >= 0 ? String.format("%8d", numSuccesses) : EMPTY);
1151 builder.append(numFailures >= 0 ? String.format("%8d", numFailures) : EMPTY);
1152
1153 if (size != null) {
1154 builder.append(String.format("%8d%8d%8d%8d%8d%8.0f%8.0f%8.0f",
1155 (long) size.getMin(), (long) size.getPercentile(25),
1156 (long) size.getPercentile(50), (long) size.getPercentile(75),
1157 (long) size.getMax(), size.getGeometricMean(), size.getMean(),
1158 size.getStandardDeviation()));
1159 } else {
1160 builder.append(Strings.repeat(EMPTY, 8));
1161 }
1162
1163 if (time != null) {
1164 final long queryTotalTime = (long) time.getSum();
1165 final double share = (double) queryTotalTime / testTotalTime;
1166 final long elapsed = (long) (this.elapsedTime * share);
1167 final double rate = 1000.0 * time.getN() / elapsed;
1168
1169 builder.append(String.format("%8d%8d%8d%8d%8d%8.0f%8.0f%8.0f",
1170 (long) time.getMin(), (long) time.getPercentile(25),
1171 (long) time.getPercentile(50), (long) time.getPercentile(75),
1172 (long) time.getMax(), time.getGeometricMean(), time.getMean(),
1173 time.getStandardDeviation()));
1174
1175 builder.append(String.format("%8d%8d%8.2f", queryTotalTime, elapsed, share));
1176 builder.append(String.format("%8.2f%8.0f", rate, rate * 3600));
1177
1178 } else {
1179 builder.append(Strings.repeat(EMPTY, 13));
1180 }
1181
1182 builder.append("\n");
1183 }
1184
1185 private static class QueryInfo {
1186
1187 public final DescriptiveStatistics time = new DescriptiveStatistics();
1188
1189 public final DescriptiveStatistics size = new DescriptiveStatistics();
1190
1191 public int numFailures;
1192
1193 }
1194
1195 }
1196
1197 }