public SequenceFileRecordReader(Configuration conf, FileSplit split)
throws IOException {
Path path = split.getPath();
FileSystem fs = path.getFileSystem(conf);
this.in = new SequenceFile.Reader(fs, path, conf);
this.end = split.getStart() + split.getLength();
this.conf = conf;
if (split.getStart() > in.getPosition()) {
in.sync(split.getStart()); // sync to start //注意这里的sync函数,这里根据split的起始位置,找出SequenceFile类型文件的合适的key V对的开始读取位置,要滑过相应的分隔符等位移,最终定位in.postion, 实际上是DFSClient中的postion.
}
this.start = in.getPosition();
more = start < end; //这里不满足位置条件的话,后面的next(k,v)函数自然就不读了
}
换句话说, 这里读取时,如果同步函数执行后,SequenceFileRecordReader的位置不匹配,则该分片就不会读取了.
另外, SequenceFile和MapFile一样,都是不能追加记录的文件!!!!!!!!!!!!
增加的过滤SequenceFile无效分片的函数:
public InputSplit[] filerSplits(ArrayList<InputSplit> splits,
BSPJob job) {
Counters counters = new Counters();
List<InputSplit> fileSplits = new ArrayList<InputSplit>();
StringBuffer sb = new StringBuffer() ;
try {
for (InputSplit split : splits) {
FileSplit sp = (FileSplit) split;
RecordReader<Text, Text> in;
//默认 Reader是SequenceFileRecordReader
in = new TrackedRecordReader<Text, Text>(job.getInputFormat()
.getRecordReader(sp, job), getCounter(counters,
BSPJobClient.PeerCounter.TASK_INPUT_RECORDS),
getCounter(counters,
BSPJobClient.PeerCounter.IO_BYTES_READ));
SequenceFileRecordReader sfreader = (SequenceFileRecordReader) job
.getInputFormat().getRecordReader(sp, job);
if (sfreader.getIfHasRecord())
fileSplits.add(sp);
}
RecordReader<Text, Text> rr;
for (InputSplit is : fileSplits) {
FileSplit fs = (FileSplit) is;
rr = new TrackedRecordReader<Text, Text>(
job.getInputFormat().getRecordReader(fs, job),
getCounter(counters,
BSPJobClient.PeerCounter.TASK_INPUT_RECORDS),
getCounter(counters,
BSPJobClient.PeerCounter.IO_BYTES_READ));
Text k = rr.createKey();
Text v = rr.createValue();
if(rr.next(k, v))
{
String startKey = k.toString() ;
sb.append(startKey) ;
LOG.info(" Key:" + k + " Value:" + v);
}
while (rr.next(k, v)) {
LOG.info(" Key:" + k + " Value:" + v);
}
String lastKey = ((TrackedRecordReader<Text, Text>) rr).getLastKey().toString() ;
sb.append("-").append(lastKey).append(",");
LOG.info(" Next Split");
}
} catch (IOException e) {
e.printStackTrace();
}
jobSubmitClient.setIndex(sb.toString());
return fileSplits.toArray(new InputSplit[fileSplits.size()]);
}
分享到:
相关推荐
抗拔桩轴力分布函数和位移分布函数间的关系模型,许宏发,,为了从荷载传递微分方程导出具有实际意义的非线性解析解,建立一种简单的沿桩长轴力分布函数和位移分布函数之间的关系模型是非常
在使用ansys进行分析后,需要批量提取某些节点的位移数据,可以将这些节点写到txt文件中,然后通过编写循环来提取某时间段内的位移数据。同样原理可以扩展到提取杆件应力等问题,但是循环提取的过程是比较慢的,如果...
在小变形假定下推导了加劲十字形轴压杆考虑初始的扭转位移函数解,讨论了解的形态及其统一表达式,并与有限元进行了对比验证。分析表明,理论解与有限元结果吻合较好,当轴力小于0.4Afy时两者基本没有误差;轴力与构件跨...
石家庄市2019-2020学年人教版高中物理必修一1.2时间和位移同步练习.pdf
2019-2020学年人教版高中物理必修一1.2时间和位移同步练习D卷.pdf
2019-2020学年人教版高中物理必修一1.2时间和位移同步练习C卷.pdf
2019-2020学年人教版高中物理必修一1.2时间和位移同步练习A卷.pdf
电信设备-便捷式大位移同步移动管道支架.zip
青海省2019-2020学年人教版高中物理必修一1.2时间和位移同步练习-.pdf
山西省2019-2020学年人教版高中物理必修一1.2时间和位移同步练习-.pdf
哈尔滨市2019-2020学年人教版高中物理必修一1.2时间和位移同步练习-.pdf
长春市2019-2020学年人教版高中物理必修一1.2时间和位移同步练习-.pdf
沈阳市2019-2020学年人教版高中物理必修一1.2时间和位移同步练习-.pdf
2019-2020学年人教版高中物理必修一1.2时间和位移同步练习(II)卷.pdf
宁夏回族自治区2019-2020学年人教版高中物理必修一1.2时间和位移同步练习-.pdf
基于FPGA的双路同步角位移数据采集系统设计.pdf
黑龙江省人教版物理高一必修1第一章第二节时间和位移同步练习-.pdf
吉林省人教版物理高一必修1第一章第二节时间和位移同步练习-.pdf
位移函数应用,合并表操作及对象应用.sql
程序用于根据变形前后点的坐标计算刚体位移值