cloudeagle_bupt

The file-offset sync function in SequenceFileRecordReader

 
public SequenceFileRecordReader(Configuration conf, FileSplit split)
    throws IOException {
  Path path = split.getPath();
  FileSystem fs = path.getFileSystem(conf);
  this.in = new SequenceFile.Reader(fs, path, conf);
  this.end = split.getStart() + split.getLength();
  this.conf = conf;

  if (split.getStart() > in.getPosition()) {
    // Sync to the start. Note the sync() call: starting from the split's
    // start offset, it finds a suitable position from which key/value pairs
    // of the SequenceFile can be read, skipping over sync markers and similar
    // bytes, and leaves the reader there. The resulting in.getPosition() is,
    // in effect, the position held by the DFSClient.
    in.sync(split.getStart());
  }

  this.start = in.getPosition();
  // If start >= end here, later calls to next(k, v) simply read nothing.
  more = start < end;
}


In other words, if the reader's position after sync() does not satisfy start < end, that split is not read at all.
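The effect of the sync step and the start < end check can be illustrated with a small toy model. This is only a sketch under simplifying assumptions, not Hadoop or Hama code: sync points are given as a sorted array of offsets, sync() is modeled as "move to the first sync point at or after the split start, or to the end of the file if none is left", and the names SplitSyncDemo, alignedStart, hasRecords and nonEmptySplitStarts are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of how a split start is aligned to a sync point (not Hadoop code). */
class SplitSyncDemo {

    /**
     * Position at which a reader for a split beginning at splitStart would
     * start reading. syncPoints must be sorted in ascending order.
     */
    static long alignedStart(long splitStart, long headerEnd,
                             long[] syncPoints, long fileLength) {
        // Mirrors "if (split.getStart() > in.getPosition())": a freshly opened
        // reader sits just past the header, so such splits need no sync.
        if (splitStart <= headerEnd) {
            return headerEnd;
        }
        for (long p : syncPoints) {
            if (p >= splitStart) {
                return p; // first sync point at or after the split start
            }
        }
        return fileLength; // assumption: no sync point left means end of file
    }

    /** Mirrors "more = start < end" from the constructor. */
    static boolean hasRecords(long splitStart, long splitLength, long headerEnd,
                              long[] syncPoints, long fileLength) {
        long end = splitStart + splitLength;
        return alignedStart(splitStart, headerEnd, syncPoints, fileLength) < end;
    }

    /** Start offsets of the fixed-size splits that would actually be read. */
    static List<Long> nonEmptySplitStarts(long splitLength, long headerEnd,
                                          long[] syncPoints, long fileLength) {
        List<Long> starts = new ArrayList<>();
        for (long s = 0; s < fileLength; s += splitLength) {
            long len = Math.min(splitLength, fileLength - s);
            if (hasRecords(s, len, headerEnd, syncPoints, fileLength)) {
                starts.add(s);
            }
        }
        return starts;
    }

    public static void main(String[] args) {
        long[] syncPoints = {2000L, 4000L};
        System.out.println(nonEmptySplitStarts(1000L, 100L, syncPoints, 5000L));
    }
}
```

With a 5000-byte file, a header ending at offset 100, sync points at 2000 and 4000, and 1000-byte splits, the splits starting at 1000 and 3000 align exactly onto their own end offset, so start < end fails and they yield no records. Only the splits starting at 0, 2000 and 4000 are read.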

Also note that SequenceFile, like MapFile, is a file type that cannot have records appended to it (at least in the Hadoop version used here)!



A function added to filter out invalid (empty) SequenceFile splits:

public InputSplit[] filerSplits(ArrayList<InputSplit> splits,
    BSPJob job) {
  Counters counters = new Counters();
  List<InputSplit> fileSplits = new ArrayList<InputSplit>();
  StringBuilder sb = new StringBuilder();
  try {
    // Pass 1: keep only the splits whose reader reports at least one record.
    for (InputSplit split : splits) {
      FileSplit sp = (FileSplit) split;

      // The default reader is SequenceFileRecordReader.
      SequenceFileRecordReader sfreader = (SequenceFileRecordReader) job
          .getInputFormat().getRecordReader(sp, job);
      try {
        // getIfHasRecord() is a custom accessor; presumably it exposes the
        // "more" flag computed in the constructor.
        if (sfreader.getIfHasRecord())
          fileSplits.add(sp);
      } finally {
        sfreader.close(); // release the underlying stream
      }
    }

    // Pass 2: read each remaining split and record its "firstKey-lastKey" range.
    for (InputSplit is : fileSplits) {
      FileSplit fs = (FileSplit) is;
      TrackedRecordReader<Text, Text> rr = new TrackedRecordReader<Text, Text>(
          job.getInputFormat().getRecordReader(fs, job),
          getCounter(counters, BSPJobClient.PeerCounter.TASK_INPUT_RECORDS),
          getCounter(counters, BSPJobClient.PeerCounter.IO_BYTES_READ));
      try {
        Text k = rr.createKey();
        Text v = rr.createValue();
        if (rr.next(k, v)) {
          sb.append(k.toString()); // first key of this split
          LOG.info(" Key:" + k + " Value:" + v);
        }
        while (rr.next(k, v)) {
          LOG.info(" Key:" + k + " Value:" + v);
        }

        String lastKey = rr.getLastKey().toString();
        sb.append("-").append(lastKey).append(",");
        LOG.info(" Next Split");
      } finally {
        rr.close();
      }
    }
  } catch (IOException e) {
    LOG.error("Failed to read input splits", e);
  }
  jobSubmitClient.setIndex(sb.toString());
  return fileSplits.toArray(new InputSplit[fileSplits.size()]);
}
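
The index string that this function hands to setIndex() can be pictured with a stand-alone sketch. It is only an illustration under stated assumptions: each split is represented as a plain list of keys instead of a RecordReader, and SplitIndexDemo and buildIndex are hypothetical names that are not part of Hama.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

/** Toy version of the "firstKey-lastKey," index built over non-empty splits. */
class SplitIndexDemo {

    /** Each inner list stands in for the keys one split's reader would return. */
    static String buildIndex(List<List<String>> splits) {
        StringBuilder sb = new StringBuilder();
        for (List<String> keys : splits) {
            Iterator<String> it = keys.iterator();
            if (!it.hasNext()) {
                continue; // empty split: filtered out, contributes nothing
            }
            String first = it.next();
            String last = first;
            while (it.hasNext()) {
                last = it.next(); // remember the most recently read key
            }
            sb.append(first).append('-').append(last).append(',');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        List<List<String>> splits = Arrays.asList(
            Arrays.asList("a", "b", "c"),
            Collections.<String>emptyList(),
            Arrays.asList("d"));
        System.out.println(buildIndex(splits));
    }
}
```

For the three sample splits in main, the empty one is skipped and the result is a-c,d-d, with a trailing comma, the same shape as the string assembled in the function above.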
