1 package eu.fbk.knowledgestore.tool;
2
3 import java.io.File;
4 import java.io.IOException;
5 import java.io.OutputStream;
6 import java.util.List;
7
8 import javax.annotation.Nullable;
9
10 import com.google.common.base.Strings;
11 import com.google.common.collect.ImmutableSet;
12 import com.google.common.collect.Lists;
13 import com.google.common.io.CountingOutputStream;
14
15 import org.openrdf.model.Statement;
16 import org.openrdf.model.URI;
17 import org.openrdf.model.impl.URIImpl;
18 import org.openrdf.model.vocabulary.OWL;
19 import org.openrdf.model.vocabulary.RDF;
20 import org.openrdf.model.vocabulary.RDFS;
21 import org.openrdf.rio.RDFFormat;
22 import org.openrdf.rio.RDFHandlerException;
23 import org.openrdf.rio.RDFWriter;
24 import org.openrdf.rio.Rio;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27
28 import jersey.repackaged.com.google.common.base.Throwables;
29
30 import eu.fbk.knowledgestore.KnowledgeStore;
31 import eu.fbk.knowledgestore.Operation.Retrieve;
32 import eu.fbk.knowledgestore.Session;
33 import eu.fbk.knowledgestore.client.Client;
34 import eu.fbk.knowledgestore.data.Dictionary;
35 import eu.fbk.knowledgestore.data.Handler;
36 import eu.fbk.knowledgestore.data.Record;
37 import eu.fbk.knowledgestore.data.Serializer;
38 import eu.fbk.knowledgestore.data.Stream;
39 import eu.fbk.knowledgestore.internal.CommandLine;
40 import eu.fbk.knowledgestore.vocabulary.KS;
41 import eu.fbk.knowledgestore.vocabulary.NFO;
42 import eu.fbk.knowledgestore.vocabulary.NIE;
43 import eu.fbk.knowledgestore.vocabulary.NIF;
44 import eu.fbk.knowledgestore.vocabulary.NWR;
45 import eu.fbk.knowledgestore.vocabulary.SEM;
46 import eu.fbk.knowledgestore.vocabulary.TIME;
47 import eu.fbk.rdfpro.util.IO;
48 import eu.fbk.rdfpro.util.Statements;
49
50 public class Dumper {
51
52 private static Logger LOGGER = LoggerFactory.getLogger(Dumper.class);
53
54 public static void main(final String[] args) throws Throwable {
55 try {
56 final CommandLine cmd = CommandLine
57 .parser()
58 .withName("ks-dumper")
59 .withHeader("Downloads the contents of the resource "
60 + "or mention layer as a single RDF file")
61 .withOption("s", "server", "the URL of the KS instance", "URL",
62 CommandLine.Type.STRING, true, false, true)
63 .withOption("u", "username", "the KS username (if required)", "USER",
64 CommandLine.Type.STRING, true, false, false)
65 .withOption("p", "password", "the KS password (if required)", "PWD",
66 CommandLine.Type.STRING, true, false, false)
67 .withOption("r", "resources", "dump resources data (default: false)")
68 .withOption("m", "mentions", "dump mentions data (default: false)")
69 .withOption("i", "identifier", "the URI identifier of the single record "
70 + "to download (default unspecified = download all records)", "URI",
71 CommandLine.Type.STRING, true, false, false)
72 .withOption("b", "binary", "use binary format (two files produced)")
73 .withOption("o", "output", "the output file", "FILE", CommandLine.Type.FILE,
74 true, false, true)
75 .withFooter(
76 "The RDF format and compression type is automatically detected based on the\n"
77 + "file extension (e.g., 'tql.gz' = gzipped TQL file)")
78 .withLogger(LoggerFactory.getLogger("eu.fbk.knowledgestore")).parse(args);
79
80 final String serverURL = cmd.getOptionValue("s", String.class);
81 final String username = Strings.emptyToNull(cmd.getOptionValue("u", String.class));
82 final String password = Strings.emptyToNull(cmd.getOptionValue("p", String.class));
83 final boolean dumpResources = cmd.hasOption("r");
84 final boolean dumpMentions = cmd.hasOption("m");
85 final String id = Strings.emptyToNull(cmd.getOptionValue("i", String.class, null));
86 final boolean binary = cmd.hasOption("b");
87 final File outputFile = cmd.getOptionValue("o", File.class);
88
89 final KnowledgeStore ks = Client.builder(serverURL).compressionEnabled(true)
90 .maxConnections(2).validateServer(false).build();
91 try {
92 final Session session;
93 if (username != null && password != null) {
94 session = ks.newSession(username, password);
95 } else {
96 session = ks.newSession();
97 }
98 final Stream<Record> records = download(session, dumpResources, dumpMentions, id);
99 if (binary) {
100 writeBinary(records, outputFile);
101 } else {
102 writeRDF(records, outputFile);
103 }
104 session.close();
105 } finally {
106 ks.close();
107 }
108
109 } catch (final Throwable ex) {
110 CommandLine.fail(ex);
111 }
112 }
113
114 private static Stream<Record> download(final Session session, final boolean dumpResources,
115 final boolean dumpMentions, @Nullable final String id) throws Throwable {
116
117 final List<URI> types = Lists.newArrayList();
118 if (dumpResources) {
119 types.add(KS.RESOURCE);
120 }
121 if (dumpMentions) {
122 types.add(KS.MENTION);
123 }
124
125 return Stream.concat(Stream.create(types)
126 .transform(
127 (final URI type) -> {
128 LOGGER.info("Downloading {} data", type.getLocalName().toLowerCase());
129 try {
130 final Retrieve retrieve = session.retrieve(type)
131 .limit((long) Integer.MAX_VALUE)
132 .timeout(7 * 24 * 60 * 60 * 1000L);
133 if (id != null) {
134 retrieve.ids(new URIImpl(id));
135 }
136 return retrieve.exec();
137
138
139 } catch (final Throwable ex) {
140 throw Throwables.propagate(ex);
141 }
142 }, 1));
143 }
144
145 private static void writeRDF(final Stream<Record> records, final File file)
146 throws RDFHandlerException, IOException {
147
148 final RDFFormat format = Rio.getParserFormatForFileName(file.getAbsolutePath());
149 final OutputStream stream = IO.write(file.getAbsolutePath());
150
151 try {
152 final RDFWriter writer = Rio.createWriter(format, stream);
153 writer.startRDF();
154
155 writer.handleNamespace("rdf", RDF.NAMESPACE);
156 writer.handleNamespace("rdfs", RDFS.NAMESPACE);
157 writer.handleNamespace("owl", OWL.NAMESPACE);
158 writer.handleNamespace("ks", KS.NAMESPACE);
159 writer.handleNamespace("nwr", NWR.NAMESPACE);
160 writer.handleNamespace("nif", NIF.NAMESPACE);
161 writer.handleNamespace("nfo", NFO.NAMESPACE);
162 writer.handleNamespace("nie", NIE.NAMESPACE);
163 writer.handleNamespace("sem", SEM.NAMESPACE);
164 writer.handleNamespace("time", TIME.NAMESPACE);
165
166 records.toHandler(new Handler<Record>() {
167
168 private int records = 0;
169
170 private long triples = 0;
171
172 @Override
173 public void handle(final Record record) throws Throwable {
174 if (record == null || this.records > 0 && this.records % 1000 == 0) {
175 LOGGER.info(this.records + " records, " + this.triples
176 + " triples processed");
177 }
178 if (record != null) {
179 final List<Statement> statements = Record.encode(Stream.create(record),
180 ImmutableSet.of()).toList();
181 writer.handleComment(record.getID().toString());
182 for (final Statement statement : statements) {
183 writer.handleStatement(statement);
184 }
185 ++this.records;
186 this.triples += statements.size();
187 }
188 }
189
190 });
191
192 writer.endRDF();
193
194 } finally {
195 stream.close();
196 }
197 }
198
199 private static void writeBinary(final Stream<Record> records, final File file)
200 throws IOException {
201
202 final String base = file.getAbsolutePath();
203 final Dictionary<URI> dictionary = Dictionary.createLocalDictionary(URI.class, new File(
204 base + ".dict"));
205 final Serializer serializer = new Serializer(false, dictionary, Statements.VALUE_FACTORY);
206 final CountingOutputStream stream = new CountingOutputStream(IO.write(base + ".gz"));
207 try {
208 records.toHandler(new Handler<Record>() {
209
210 private int records = 0;
211
212 @Override
213 public void handle(final Record record) throws Throwable {
214 if (record == null || this.records > 0 && this.records % 1000 == 0) {
215 LOGGER.info("{} records, {} bytes processed ({} bytes/record)",
216 this.records, stream.getCount(), stream.getCount() / this.records);
217 }
218 if (record != null) {
219 serializer.toStream(stream, record);
220 ++this.records;
221 }
222 }
223
224 });
225
226 } finally {
227 IO.closeQuietly(records);
228 IO.closeQuietly(stream);
229 }
230 }
231
232 }