1 package eu.fbk.knowledgestore.populator.naf;
2
import eu.fbk.knowledgestore.Session;
import eu.fbk.knowledgestore.data.Data;
import eu.fbk.knowledgestore.data.Record;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Hashtable;
import java.util.Map.Entry;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
13
14 public class Consumer implements Runnable {
15
16 private BlockingQueue<Hashtable<String, KSPresentation>> queue;
17
18 public Consumer(BlockingQueue<Hashtable<String, KSPresentation>> q) {
19 this.queue = q;
20 }
21 static boolean called = false;
22 static int cc=1;
23 @Override
24 public void run() {
25 Session session = null;
26 try {
27 System.out.println("Start Consumer:"+cc);
28 int cN= cc;
29 cc++;
30 if(nafPopulator.store!=null&&!nafPopulator.store.isClosed()){
31 session = nafPopulator.store.newSession(nafPopulator.USERNAME, nafPopulator.PASSWORD);
32 }
33
34 while (!nafPopulator.JobFinished||queue.size()>0) {
35
36
37
38 Hashtable<String, KSPresentation> obl = queue.poll();
39 if(obl!=null){
40 System.out.println("Consumer:"+cc+" is serving{"+obl.keySet()+"}");
41 if (!nafPopulator.printToFile) {
42 submitCollectedData(obl,session);
43
44 } else {
45 appendCollectedDataToFile(obl);
46
47 }
48 }
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 }
66
67
68
69
70
71
72
73 } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | NoSuchMethodException | SecurityException | IOException e) {
74 e.printStackTrace();
75 }
76 finally {
77 if (session != null) {
78 session.close();
79 }
80 }
81 }
82
83
84
85
86 private static void submitCollectedData(Hashtable<String, KSPresentation> mentions, Session session) throws ClassNotFoundException,
87 InstantiationException, IllegalAccessException, NoSuchMethodException,
88 SecurityException, IOException {
89 String className = "eu.fbk.knowledgestore.populator.naf.submitKS";
90 Class clazz = Class.forName(className);
91 Class[] parameters = new Class[] { Hashtable.class, boolean.class, Session.class };
92 Method method = clazz.getMethod("init", parameters);
93 Object obj = clazz.newInstance();
94 try {
95
96 nafPopulator.checkSession();
97 int status = (Integer) method.invoke(obj, mentions, nafPopulator.store_partial_info, session);
98 if (status == 1) {
99 }
100 if (status == 0) {
101
102 Hashtable<String, KSPresentation> mentmp = new Hashtable<String, KSPresentation>();
103 for (Entry<String, KSPresentation> rc : mentions.entrySet()) {
104 mentmp.put(rc.getKey(), rc.getValue());
105
106 int status2 = (Integer) method
107 .invoke(obj, mentmp, nafPopulator.store_partial_info, nafPopulator.session);
108 if (status2 == 1) {
109
110 mentmp.clear();
111 }
112 if (status2 == 0) {
113 nafPopulator.logger.error("Error storing this file to KS: "
114 + rc.getValue().getNaf_file_path());
115 }
116
117 }
118 }
119 mentions.clear();
120 } catch (IllegalAccessException e) {
121 e.printStackTrace();
122 String error = " Involved file(s):";
123 for (KSPresentation vl : mentions.values()) {
124 error += vl.getNaf_file_path() + ",";
125 }
126 error += ((e.getMessage() != null) ? e.getMessage() : "")
127 + "\nStoring to KS phase: Populating mentions interrupted!";
128 nafPopulator.logger.error(error);
129 } catch (IllegalArgumentException e) {
130 e.printStackTrace();
131 String error = " Involved file(s):";
132 for (KSPresentation vl : mentions.values()) {
133 error += vl.getNaf_file_path() + ",";
134 }
135 error += ((e.getMessage() != null) ? e.getMessage() : "")
136 + "\nStoring to KS phase: Populating mentions interrupted!";
137 nafPopulator.logger.error(error);
138 } catch (InvocationTargetException e) {
139 e.printStackTrace();
140 String error = " Involved file(s):";
141 for (KSPresentation vl : mentions.values()) {
142 error += vl.getNaf_file_path() + ",";
143 }
144 error += ((e.getMessage() != null) ? e.getMessage() : "")
145 + "\nStoring to KS phase: Populating mentions interrupted!";
146 nafPopulator.logger.error(error);
147 } finally {
148 mentions.clear();
149 }
150 }
151
152 private static void appendCollectedDataToFile(Hashtable<String, KSPresentation> mentions) throws IOException {
153
154 for (Entry<String, KSPresentation> mn : mentions.entrySet()) {
155 String naf_file_path = mn.getValue().getNaf_file_path();
156 String stats = mn.getValue().getStats().getStats();
157 if (nafPopulator.out != null) {
158 nafPopulator.out.append("NAF: " + naf_file_path);
159 nafPopulator.out.append(stats);
160 nafPopulator.out.append("\n");
161 nafPopulator.out.flush();
162 }
163 nafPopulator.updatestats(mn.getValue().getStats());
164 nafPopulator.mentionFile.append(mn.getValue().getNewsResource()
165 .toString(Data.getNamespaceMap(), true)
166 + "\n");
167 nafPopulator.mentionFile.append(mn.getValue().getNaf().toString(Data.getNamespaceMap(), true)
168 + "\n");
169 for (Record mnMen : mn.getValue().getMentions().values()) {
170 nafPopulator.mentionFile.append(mnMen.toString(Data.getNamespaceMap(), true) + "\n");
171 }
172
173 }
174 if (nafPopulator.out != null) {
175 nafPopulator.out.flush();
176 }
177 nafPopulator.mentionFile.flush();
178 mentions.clear();
179
180 }
181 }