1 package eu.fbk.knowledgestore.datastore.hbase;
2
3 import java.io.IOException;
4 import java.util.Collections;
5 import java.util.Iterator;
6 import java.util.List;
7 import java.util.Set;
8
9 import javax.annotation.Nullable;
10
11 import com.google.common.base.Preconditions;
12 import com.google.common.base.Throwables;
13 import com.google.common.collect.AbstractIterator;
14 import com.google.common.collect.ImmutableList;
15 import com.google.common.collect.Iterables;
16 import com.google.common.collect.Lists;
17
18 import org.openrdf.model.URI;
19
20 import eu.fbk.knowledgestore.data.Record;
21 import eu.fbk.knowledgestore.datastore.hbase.utils.AbstractHBaseUtils;
22
23
24
25
26
27 public class HBaseIterator extends AbstractIterator<Record> {
28
29
30 private static final int BATCH_SIZE = 100;
31
32
33 private final AbstractHBaseUtils hbaseUtils;
34
35
36 private final String tableName;
37
38
39 private final URI[] properties;
40
41
42 private final Iterator<URI> idIterator;
43
44
45 private Iterator<Record> recordIterator;
46
47
48
49
50
51
52
53
54
55
56
57
58
59 @SuppressWarnings("unchecked")
60 public HBaseIterator(final AbstractHBaseUtils hbaseUtils, final String tableName,
61 final Set<? extends URI> ids, @Nullable final Set<? extends URI> properties) {
62
63 Preconditions.checkNotNull(hbaseUtils);
64 Preconditions.checkNotNull(tableName);
65 Preconditions.checkNotNull(ids);
66
67 this.hbaseUtils = hbaseUtils;
68 this.tableName = tableName;
69 this.properties = properties == null ? null : Iterables.toArray(properties, URI.class);
70 this.idIterator = (Iterator<URI>) ImmutableList.copyOf(ids).iterator();
71 this.recordIterator = Collections.emptyIterator();
72 }
73
74 @Override
75 protected Record computeNext() {
76 while (true) {
77
78 if (this.recordIterator.hasNext()) {
79 return this.recordIterator.next();
80 }
81
82
83 final List<URI> ids = Lists.newArrayListWithCapacity(BATCH_SIZE);
84 while (this.idIterator.hasNext() && ids.size() < BATCH_SIZE) {
85 ids.add(this.idIterator.next());
86 }
87
88
89 if (ids.isEmpty()) {
90 return endOfData();
91 }
92
93
94 final List<Record> records;
95 try {
96 records = this.hbaseUtils.get(this.tableName, ids);
97 } catch (final IOException ex) {
98 throw Throwables.propagate(ex);
99 }
100
101
102 if (this.properties != null) {
103 for (int i = 0; i < records.size(); ++i) {
104 records.set(i, records.get(i).retain(this.properties));
105 }
106 }
107
108
109 this.recordIterator = records.iterator();
110 }
111 }
112
113 }