1 package eu.fbk.knowledgestore.datastore.hbase;
2
3 import java.io.Closeable;
4 import java.io.IOException;
5 import java.util.Iterator;
6
7 import javax.annotation.Nullable;
8
9 import com.google.common.base.Preconditions;
10 import com.google.common.collect.AbstractIterator;
11 import com.google.common.collect.Iterables;
12
13 import org.apache.hadoop.hbase.client.Result;
14 import org.apache.hadoop.hbase.client.ResultScanner;
15 import org.apache.hadoop.hbase.client.Scan;
16 import org.openrdf.model.URI;
17 import org.slf4j.Logger;
18 import org.slf4j.LoggerFactory;
19
20 import eu.fbk.knowledgestore.data.Record;
21 import eu.fbk.knowledgestore.data.XPath;
22 import eu.fbk.knowledgestore.datastore.hbase.utils.AbstractHBaseUtils;
23 import eu.fbk.knowledgestore.datastore.hbase.utils.AvroSerializer;
24 import eu.fbk.knowledgestore.datastore.hbase.utils.HBaseFilter;
25
26
27
28
29
30 public class HBaseScanIterator extends AbstractIterator<Record> implements Closeable {
31
32
33 private static final Logger LOGGER = LoggerFactory.getLogger(HBaseScanIterator.class);
34
35
36 @Nullable
37 private final XPath condition;
38
39
40 @Nullable
41 private final URI[] properties;
42
43
44 @Nullable
45 private final ResultScanner scanner;
46
47
48 private final Iterator<Result> hbaseIterator;
49
50
51 private final AvroSerializer serializer;
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71 public HBaseScanIterator(final AbstractHBaseUtils hbaseUtils, final String tableName,
72 final String familyName, @Nullable final XPath condition,
73 @Nullable final Iterable<? extends URI> properties, final boolean localFiltering)
74 throws IOException {
75
76
77 Preconditions.checkNotNull(hbaseUtils);
78 Preconditions.checkNotNull(tableName);
79 Preconditions.checkNotNull(familyName);
80
81
82 final Scan scan = hbaseUtils.getScan(tableName, familyName);
83 if (condition != null && !localFiltering) {
84 scan.setFilter(new HBaseFilter(condition, hbaseUtils.getSerializer()));
85 }
86
87
88 final ResultScanner scanner = hbaseUtils.getScanner(tableName, scan);
89
90
91 this.condition = localFiltering ? condition : null;
92 this.properties = properties == null ? null : Iterables.toArray(properties, URI.class);
93 this.serializer = hbaseUtils.getSerializer();
94 this.scanner = scanner;
95 this.hbaseIterator = this.scanner.iterator();
96 }
97
98 @Override
99 protected Record computeNext() {
100
101 try {
102
103 while (this.hbaseIterator.hasNext()) {
104
105
106 final byte[] bytes = this.hbaseIterator.next().value();
107
108
109 Record record;
110 try {
111 record = (Record) this.serializer.fromBytes(bytes);
112 } catch (final Throwable ex) {
113 LOGGER.error("discarded record with avroBytes \"" + bytes
114 + ", " + ex.toString());
115 continue;
116 }
117
118
119 if (this.condition != null) {
120 final boolean matches = this.condition.evalBoolean(record);
121 if (!matches) {
122 continue;
123 }
124 }
125
126
127 if (this.properties != null) {
128 record = record.retain(this.properties);
129 }
130
131
132 return record;
133 }
134
135
136 return endOfData();
137
138 } catch (Exception e) {
139 LOGGER.warn("ignored Exception |" + e.toString() + "| and returned");
140 return null;
141 }
142 }
143
144
145
146
147 @Override
148 public void close() {
149 if (this.scanner != null) {
150 LOGGER.debug("Closing HBaseScanIterator");
151 this.scanner.close();
152 }
153 }
154
155 }