1   package eu.fbk.knowledgestore.datastore.hbase;
2   
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;

import javax.annotation.Nullable;

import com.google.common.base.Preconditions;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.Iterables;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.openrdf.model.URI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import eu.fbk.knowledgestore.data.Record;
import eu.fbk.knowledgestore.data.XPath;
import eu.fbk.knowledgestore.datastore.hbase.utils.AbstractHBaseUtils;
import eu.fbk.knowledgestore.datastore.hbase.utils.AvroSerializer;
import eu.fbk.knowledgestore.datastore.hbase.utils.HBaseFilter;
25  
26  
27  
28  
29  
30  public class HBaseScanIterator extends AbstractIterator<Record> implements Closeable {
31  
32      
33      private static final Logger LOGGER = LoggerFactory.getLogger(HBaseScanIterator.class);
34  
35      
36      @Nullable
37  	private final XPath condition;
38  
39      
40      @Nullable
41  	private final URI[] properties;
42  
43      
44      @Nullable
45  	private final ResultScanner scanner;
46  
47      
48      private final Iterator<Result> hbaseIterator;
49  
50      
51      private final AvroSerializer serializer;
52  
53      
54  
55  
56  
57  
58  
59  
60  
61  
62  
63  
64  
65  
66  
67  
68  
69  
70  
71      public HBaseScanIterator(final AbstractHBaseUtils hbaseUtils, final String tableName,
72  			     final String familyName, @Nullable final XPath condition,
73  			     @Nullable final Iterable<? extends URI> properties, final boolean localFiltering)
74  	throws IOException {
75  
76          
77          Preconditions.checkNotNull(hbaseUtils);
78          Preconditions.checkNotNull(tableName);
79          Preconditions.checkNotNull(familyName);
80  
81          
82          final Scan scan = hbaseUtils.getScan(tableName, familyName);
83          if (condition != null && !localFiltering) {
84              scan.setFilter(new HBaseFilter(condition, hbaseUtils.getSerializer()));
85          }
86  
87          
88          final ResultScanner scanner = hbaseUtils.getScanner(tableName, scan);
89  
90          
91          this.condition = localFiltering ? condition : null; 
92          this.properties = properties == null ? null : Iterables.toArray(properties, URI.class);
93          this.serializer = hbaseUtils.getSerializer();
94          this.scanner = scanner;
95          this.hbaseIterator = this.scanner.iterator();
96      }
97  
98      @Override
99  	protected Record computeNext() {
100 
101 	try {
102 	    
103 	    while (this.hbaseIterator.hasNext()) {
104 
105 		
106 		final byte[] bytes = this.hbaseIterator.next().value();
107 
108 		
109 		Record record;
110 		try {
111 		    record = (Record) this.serializer.fromBytes(bytes);
112 		} catch (final Throwable ex) {
113 		    LOGGER.error("discarded record with avroBytes \"" + bytes 
114 				 + ", " + ex.toString());
115 		    continue;
116 		}
117 
118 		
119 		if (this.condition != null) {
120 		    final boolean matches = this.condition.evalBoolean(record);
121 		    if (!matches) {
122 			continue;
123 		    }
124 		}
125 
126 		
127 		if (this.properties != null) {
128 		    record = record.retain(this.properties);
129 		}
130 
131 		
132 		return record;
133 	    }
134 
135 	    
136 	    return endOfData();
137 
138 	} catch (Exception e) {
139 	    LOGGER.warn("ignored Exception |" + e.toString() + "| and returned");
140 	    return null;
141 	}
142     }
143 
144     
145 
146 
147     @Override
148     public void close() {
149         if (this.scanner != null) {
150             LOGGER.debug("Closing HBaseScanIterator");
151             this.scanner.close();
152         }
153     }
154 
155 }