1   package eu.fbk.knowledgestore.client;
2   
3   import java.util.Date;
4   
5   import com.google.common.collect.AbstractIterator;
6   
7   import org.openrdf.model.ValueFactory;
8   import org.openrdf.model.vocabulary.DCTERMS;
9   import org.slf4j.Logger;
10  import org.slf4j.LoggerFactory;
11  
12  import eu.fbk.knowledgestore.KnowledgeStore;
13  import eu.fbk.knowledgestore.OperationException;
14  import eu.fbk.knowledgestore.Outcome;
15  import eu.fbk.knowledgestore.Session;
16  import eu.fbk.knowledgestore.data.Data;
17  import eu.fbk.knowledgestore.data.Handler;
18  import eu.fbk.knowledgestore.data.Record;
19  import eu.fbk.knowledgestore.data.Stream;
20  import eu.fbk.knowledgestore.vocabulary.KS;
21  
22  public class ClientStressTest {
23  
24      private static final Logger LOGGER = LoggerFactory.getLogger(ClientStressTest.class);
25  
26      private static final String SERVER_URL = "http://localhost:8080/";
27  
28      private static final String USERNAME = "ks";
29  
30      private static final String PASSWORD = "kspass";
31  
32      private static final int NUM_RECORDS = 100;
33  
34      public static void main(final String... args) throws Throwable {
35  
36          // Initialize a KnowledgeStore client
37          final KnowledgeStore store = Client.builder(SERVER_URL).maxConnections(2)
38                  .validateServer(false).build();
39  
40          try {
41              // Acquire a session for a given username/password pair
42              final Session session = store.newSession(USERNAME, PASSWORD);
43  
44              // Clear all resources stored in the KS
45              session.delete(KS.RESOURCE).exec();
46              session.delete(KS.MENTION).exec();
47  
48              // Upload records
49              storeSingle(session);
50  
51              // Clear all resources stored in the KS
52              session.delete(KS.RESOURCE).exec();
53  
54              // Close the session
55              session.close();
56  
57          } finally {
58              // Ensure to close the KS (will also close pending sessions)
59              store.close();
60          }
61      }
62  
63      static void storeSingle(final Session session) throws Throwable {
64          final Stream<Record> records = createRecordStream();
65          for (final Record record : records) {
66              session.merge(KS.RESOURCE).criteria("overwrite *").records(record).exec();
67          }
68      }
69  
70      static void storeBatch(final Session session) throws OperationException {
71          session.merge(KS.RESOURCE).criteria("overwrite *").records(createRecordStream())
72                  .exec(new Handler<Outcome>() {
73  
74                      private boolean started = false;
75  
76                      @Override
77                      public void handle(final Outcome outcome) throws Throwable {
78                          if (outcome == null) {
79                              LOGGER.info("Done receiving outcomes");
80  
81                          } else if (!this.started) {
82                              LOGGER.info("Started receiving outcomes");
83                              this.started = true;
84                          }
85                      }
86  
87                  });
88      }
89  
90      static Stream<Record> createRecordStream() {
91          return Stream.create(new AbstractIterator<Record>() {
92  
93              private int index = 0;
94  
95              @Override
96              protected Record computeNext() {
97                  ++this.index;
98                  if (this.index > NUM_RECORDS) {
99                      return endOfData();
100                 }
101                 if (this.index % 100 == 0) {
102                     LOGGER.info("{} records generated", this.index);
103                 }
104                 final ValueFactory factory = Data.getValueFactory();
105                 final Record record = Record.create(factory.createURI("ex:resource" + this.index),
106                         KS.RESOURCE);
107                 record.set(DCTERMS.TITLE, "Resource " + this.index);
108                 record.set(DCTERMS.CREATOR, "John Smith");
109                 record.set(DCTERMS.CREATED, new Date());
110                 // record.set(DCTERMS.ABSTRACT, Strings.repeat("... bla ...\n", 1000));
111                 return record;
112             }
113 
114         });
115     }
116 
117 }