1 package eu.fbk.knowledgestore.populator.naf;
2
3 import java.io.BufferedReader;
4 import java.io.BufferedWriter;
5 import java.io.File;
6 import java.io.FileInputStream;
7 import java.io.FileOutputStream;
8 import java.io.IOException;
9 import java.io.InputStreamReader;
10 import java.io.OutputStream;
11 import java.io.OutputStreamWriter;
12 import java.io.Reader;
13 import java.io.Writer;
14 import java.lang.reflect.InvocationTargetException;
15 import java.lang.reflect.Method;
16 import java.util.Hashtable;
17 import java.util.LinkedList;
18 import java.util.zip.ZipEntry;
19 import java.util.zip.ZipInputStream;
20
21 import javax.xml.bind.JAXBException;
22
23 import org.apache.commons.compress.archivers.tar.*;
24 import org.apache.commons.compress.compressors.gzip.*;
25 import org.apache.commons.compress.utils.IOUtils;
26 import org.slf4j.Logger;
27
28 public class NAFRunner {
29
30 void generate(){
31 try {
32 if (nafPopulator.FInFile) {
33
34
35
36 FileInputStream in = new FileInputStream(nafPopulator.INpath);
37 Reader reader = new InputStreamReader(in, "utf8");
38 BufferedReader br = new BufferedReader(reader);
39 String line = "";
40 LinkedList<File> fileslist = new LinkedList<File>();
41 while ((line = br.readLine()) != null) {
42
43 if(fileslist.size() >= nafPopulator.batchSize){
44 RunSystemOnList(fileslist, nafPopulator.disabledItems, nafPopulator.recursion);
45 fileslist.clear();
46 }
47
48 File e=new File(line);
49 if(e.exists())
50 fileslist.addLast(e);
51 else {
52 System.err.println("Path not exist!" + e.getPath());
53
54 }
55
56 }
57 if(fileslist.size()>0){
58 RunSystemOnList(fileslist, nafPopulator.disabledItems, nafPopulator.recursion);
59 fileslist.clear();
60 }
61 in.close();
62 } else if (nafPopulator.ZInFile) {
63
64
65
66 String ZIP_OUTPUT_DIR = "/tmp/nafPopulatorZipOutDir";
67 byte[] buffer = new byte[1024];
68 LinkedList<File> fileslist = new LinkedList<File>();
69 boolean multipleFileFlag = (nafPopulator.batchSize > 1);
70
71
72 File zipDir = new File(ZIP_OUTPUT_DIR);
73 if (!zipDir.exists()) {
74 zipDir.mkdir();
75 }
76
77
78 ZipInputStream zis = new ZipInputStream(new FileInputStream(nafPopulator.INpath));
79
80
81 ZipEntry ze = zis.getNextEntry();
82 while (ze != null) {
83
84
85
86 if (ze.isDirectory()) {
87
88
89
90
91
92
93 } else {
94
95 String zeName = ze.getName();
96
97
98 File tmpFile = new File(ZIP_OUTPUT_DIR + File.separator + zeName);
99 String basename = tmpFile.getName();
100
101 File extractedFile = new File(ZIP_OUTPUT_DIR + File.separator + basename);
102 String extractedPath = extractedFile.getAbsolutePath();
103
104
105
106
107
108 FileOutputStream fos = new FileOutputStream(extractedFile);
109 int len;
110 while ((len = zis.read(buffer)) > 0) {
111 fos.write(buffer, 0, len);
112 }
113 fos.close();
114
115
116 if (multipleFileFlag) {
117
118
119 if (fileslist.size() >= nafPopulator.batchSize) {
120 RunSystemOnList(fileslist, nafPopulator.disabledItems, nafPopulator.recursion);
121 fileslist.clear();
122 }
123
124
125 fileslist.addLast(extractedFile);
126 } else {
127
128
129 analyzePathAndRunSystem(extractedPath, nafPopulator.disabledItems, nafPopulator.recursion);
130 }
131 }
132
133 zis.closeEntry();
134 ze = zis.getNextEntry();
135 }
136
137
138
139 if (multipleFileFlag && (fileslist.size() > 0)) {
140 RunSystemOnList(fileslist, nafPopulator.disabledItems, nafPopulator.recursion);
141 fileslist.clear();
142 }
143
144
145 zis.closeEntry();
146 zis.close();
147
148
149 } else if (nafPopulator.TInFile) {
150
151
152
153 String TAR_OUTPUT_DIR = "/tmp/nafPopulatorTarOutDir";
154 byte[] buffer = new byte[1024];
155 LinkedList<File> fileslist = new LinkedList<File>();
156 boolean multipleFileFlag = (nafPopulator.batchSize > 1);
157
158
159 File tgzDir = new File(TAR_OUTPUT_DIR);
160 if (!tgzDir.exists()) {
161 tgzDir.mkdir();
162 }
163
164
165 TarArchiveInputStream is = new TarArchiveInputStream(new GzipCompressorInputStream(new FileInputStream(nafPopulator.INpath)));
166
167
168 TarArchiveEntry te = (TarArchiveEntry)is.getNextEntry();
169 while (te != null) {
170
171
172
173 if (te.isDirectory()) {
174
175
176
177
178
179
180 } else {
181
182 String teName = te.getName();
183
184
185 File tmpFile = new File(TAR_OUTPUT_DIR + File.separator + teName);
186 String basename = tmpFile.getName();
187
188 File extractedFile = new File(TAR_OUTPUT_DIR + File.separator + basename);
189 String extractedPath = extractedFile.getAbsolutePath();
190
191
192
193
194
195 OutputStream outputFileStream = new FileOutputStream(extractedFile);
196 IOUtils.copy(is, outputFileStream);
197 outputFileStream.close();
198
199
200 if (multipleFileFlag) {
201
202
203 if (fileslist.size() >= nafPopulator.batchSize) {
204 RunSystemOnList(fileslist, nafPopulator.disabledItems, nafPopulator.recursion);
205 fileslist.clear();
206 }
207
208
209 fileslist.addLast(extractedFile);
210 } else {
211
212
213 analyzePathAndRunSystem(extractedPath, nafPopulator.disabledItems, nafPopulator.recursion);
214 }
215 }
216
217 te = (TarArchiveEntry)is.getNextEntry();
218 }
219
220
221
222 if (multipleFileFlag && (fileslist.size() > 0)) {
223 RunSystemOnList(fileslist, nafPopulator.disabledItems, nafPopulator.recursion);
224 fileslist.clear();
225 }
226
227
228 is.close();
229
230 } else {
231
232
233
234 analyzePathAndRunSystem(nafPopulator.INpath, nafPopulator.disabledItems, nafPopulator.recursion);
235 }
236 } catch(Exception e) {
237 e.printStackTrace();
238 nafPopulator.logger.error(nafPopulator.INpath + " Processing phase: file discarded!\n");
239 }
240 nafPopulator.JobFinished=true;
241 }
242
243 private void RunSystemOnList(LinkedList<File> fileslist, String disabledItems, boolean rec)
244 throws JAXBException, IOException, InstantiationException, IllegalAccessException,
245 NoSuchMethodException, SecurityException, ClassNotFoundException, InterruptedException {
246 Hashtable<String, KSPresentation> mentions = new Hashtable<String, KSPresentation>();
247 boolean submittedFlag = false;
248 for(File filePath:fileslist){
249 if (filePath.exists() && filePath.isDirectory()) {
250
251 File[] listOfFiles = filePath.listFiles();
252
253 for (int i = 0; i < listOfFiles.length; i++) {
254 if (listOfFiles[i].exists() && listOfFiles[i].isFile()) {
255
256
257 runClass(listOfFiles[i].getPath(), disabledItems,mentions);
258 } else if (listOfFiles[i].exists() && listOfFiles[i].isDirectory()) {
259 analyzePathAndRunSystem(listOfFiles[i].getPath(), disabledItems, rec);
260 }
261 nafPopulator.out.flush();
262
263 submittedFlag = checkAddOrSubmit(mentions);
264
265 }
266
267 if ((nafPopulator.batchSize == -1) && (! submittedFlag)) {
268 addAndFreeMemory(mentions);
269 }
270 } else if (filePath.exists() && filePath.isFile()) {
271
272
273 runClass(filePath.getPath(), disabledItems,mentions);
274 submittedFlag = checkAddOrSubmit(mentions);
275 }
276 }
277 if(! submittedFlag){
278 addAndFreeMemory(mentions);
279 }
280
281 nafPopulator.out.flush();
282 if (nafPopulator.printToFile && (nafPopulator.mentionFile != null)) {
283 nafPopulator.mentionFile.flush();
284 }
285 }
286
287
288
289
290 boolean checkAddOrSubmit(Hashtable<String, KSPresentation> mentions) throws InterruptedException{
291 if (((mentions.size() % nafPopulator.batchSize) == 0) && (nafPopulator.batchSize != -1)) {
292 addAndFreeMemory(mentions);
293 return true;
294 } else {
295 return false;
296 }
297 }
298
299 void addAndFreeMemory(Hashtable<String, KSPresentation> mentions) throws InterruptedException{
300 Producer.queue.put(mentions);
301
302 mentions = new Hashtable<String, KSPresentation>();
303 System.gc();
304 Runtime.getRuntime().gc();
305 }
306
307 private void analyzePathAndRunSystem(String path, String disabledItems, boolean rec)
308 throws JAXBException, IOException, InstantiationException, IllegalAccessException,
309 NoSuchMethodException, SecurityException, ClassNotFoundException, InterruptedException {
310 File filePath = new File(path);
311 if (filePath.exists()) {
312
313 if (filePath.exists() && filePath.isDirectory()) {
314
315 File[] listOfFiles = filePath.listFiles();
316 Hashtable<String, KSPresentation> mentions = new Hashtable<String, KSPresentation>();
317 for (int i = 0; i < listOfFiles.length; i++) {
318 if (listOfFiles[i].exists() && listOfFiles[i].isFile()) {
319
320
321 runClass(listOfFiles[i].getPath(), disabledItems,mentions);
322 } else if (listOfFiles[i].exists() && listOfFiles[i].isDirectory()) {
323 analyzePathAndRunSystem(listOfFiles[i].getPath(), disabledItems, rec);
324 }
325
326 if (nafPopulator.batchSize != -1&&mentions.size() % nafPopulator.batchSize==0 ) {
327
328
329
330
331
332
333
334 Producer.queue.put(mentions);
335
336 mentions = new Hashtable<String, KSPresentation>();
337 System.gc();
338 Runtime.getRuntime().gc();
339 }
340
341 }
342 if(mentions.size()>0){
343 Producer.queue.put(mentions);
344
345 mentions = new Hashtable<String, KSPresentation>();
346 System.gc();
347 Runtime.getRuntime().gc();
348 }
349
350 if (nafPopulator.batchSize == -1) {
351
352
353
354
355
356
357 Producer.queue.put(mentions);
358
359 mentions = new Hashtable<String, KSPresentation>();
360 System.gc();
361 Runtime.getRuntime().gc();
362 }
363 } else if (filePath.exists() && filePath.isFile()) {
364
365
366 Hashtable<String, KSPresentation> mentions = new Hashtable<String, KSPresentation>();
367 runClass(filePath.getPath(), disabledItems,mentions);
368
369
370
371
372
373 Producer.queue.put(mentions);
374
375 mentions = new Hashtable<String, KSPresentation>();
376 System.gc();
377 Runtime.getRuntime().gc();
378 }
379 if (nafPopulator.printToFile &&
380 (nafPopulator.mentionFile != null)) {
381 nafPopulator.mentionFile.flush();
382 }
383 } else {
384 System.err.println("Path not exist!" + filePath.getPath());
385
386 }
387 }
388
389
390
391 public void runClass(String path, String disabledItems, Hashtable<String, KSPresentation> mentions) throws InstantiationException,
392 IllegalAccessException, NoSuchMethodException, SecurityException,
393 ClassNotFoundException, IOException {
394
395 System.out.println(path);
396 String className = "eu.fbk.knowledgestore.populator.naf.processNAF";
397 Class clazz = Class.forName(className);
398 Class[] parameters = new Class[] { String.class, Writer.class, String.class, boolean.class };
399 Method method = clazz.getMethod("init", parameters);
400 Object obj = clazz.newInstance();
401 try {
402 KSPresentation as = (KSPresentation) method.invoke(obj, path, nafPopulator.out, disabledItems,
403 nafPopulator.store_partial_info);
404 if (as != null) {
405 mentions.put(path, as);
406 } else {
407 nafPopulator.logger.error(path + " null is returned from processNAF procedure! - Processing phase: file discarded!\n");
408 }
409 } catch (IllegalAccessException e) {
410 e.printStackTrace();
411 nafPopulator.logger.error(path + " Processing phase: file discarded!\n");
412 } catch (IllegalArgumentException e) {
413 e.printStackTrace();
414 nafPopulator.logger.error(path + " Processing phase: file discarded!\n");
415 } catch (InvocationTargetException e) {
416 e.printStackTrace();
417 nafPopulator.logger.error(path + " Processing phase: file discarded!\n");
418 }
419
420 }
421
422 }