`
cloudeagle_bupt
  • 浏览: 537704 次
文章分类
社区版块
存档分类
最新评论

自定义记录格式输出HDFS文件

 
阅读更多
package examples;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;

/**
 * Writes (IntWritable key, IntWritable value) records to an HDFS file as
 * space-separated text lines, padding with spaces near an HDFS block boundary
 * so that no single record ever straddles two blocks.
 *
 * <p>Not thread-safe across instances; {@link #append} and {@link #getLength}
 * are synchronized on this instance.
 */
public class RecorderWriter  implements java.io.Closeable{

    /** Max serialized size of one record: 4-byte key + ' ' + 4-byte value + '\n'. */
    private static final int RECORD_SIZE = 10;

    FSDataOutputStream out;
    Long pos ;       // NOTE(review): never assigned or read here; kept only for source compatibility
    Long blockSize ; // HDFS block size used for boundary padding (from "dfs.block.size")

	/**
	 * Opens {@code name} for writing on {@code fs} and caches the configured
	 * block size (default 2 MiB).
	 *
	 * @param fs   file system to create the output file on
	 * @param name path of the file to create (overwritten if it exists)
	 * @param conf configuration supplying "dfs.block.size"
	 */
	public RecorderWriter(FileSystem fs, Path name,  Configuration conf)
	{
		try {
			this.out = fs.create(name);
			blockSize = conf.getLong("dfs.block.size", 2097152L);
		} catch (IOException e) {
			// NOTE(review): swallowing the exception leaves 'out' null, so a later
			// append()/getLength() fails with NullPointerException instead of a clear
			// error. Kept (no checked exception added) to preserve the constructor
			// signature; callers should verify the file was actually created.
			e.printStackTrace();
		}
	}

	/** Flushes and closes the underlying stream; safe to call more than once. */
	@Override
	public void close() throws IOException {
	      if (out != null) {
	    	  out.flush();
	          out.close();
	          out = null;  // makes repeated close() a no-op
	      }
	}

	/**
	 * Appends one record as the text line {@code "<key> <value>\n"} (each
	 * IntWritable writes its 4-byte binary form via {@code write(out)}).
	 * If fewer than {@link #RECORD_SIZE} bytes remain in the current HDFS
	 * block, the gap is first filled with spaces so the record starts at the
	 * next block.
	 *
	 * @param key must be an {@link IntWritable} (cast unchecked)
	 * @param val must be an {@link IntWritable} (cast unchecked)
	 * @throws IOException if the underlying stream fails
	 */
	public synchronized void append(Writable key, Writable val)
    	      throws IOException {
    	      // getPos() is non-negative, so the original Math.abs was redundant.
    	      long used = out.getPos() % blockSize;
    	      long spaceLeft = blockSize - used;

    	      // Pad to the block boundary so a record never straddles two blocks.
    	      if (spaceLeft < RECORD_SIZE && used != 0) {
    	    	  for (long i = 0; i < spaceLeft; i++) {
    	    		  out.writeBytes(" ");
    	    	  }
    	      }

    	      IntWritable k = (IntWritable)key ;
    	      IntWritable v = (IntWritable)val ;
    	      k.write(out);
    	      out.writeBytes(" ") ;
    	      v.write(out);
    	      out.writeBytes("\n") ;
    	    }

    /**
     * @return current byte position (= file length so far) of the output stream
     * @throws IOException if the underlying stream fails
     */
    public synchronized long getLength() throws IOException {
        return out.getPos();
      }
}



package examples;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.bsp.LineRecordReader;
import org.apache.hama.bsp.RecordReader;

/**
 * Converts a plain-text HDFS file of {@code "<int> <int>"} lines into the
 * block-aligned binary record format produced by {@link RecorderWriter}.
 */
public class FileFormatConvert {

	// Shared config; static fields are initialized at class load, before main() runs.
	static Configuration conf = new Configuration();

	/**
	 * Reads every line of {@code inputFile}, parses it as two space-separated
	 * integers, and appends each pair to {@code outputFile} via
	 * {@link RecorderWriter}.
	 *
	 * <p>IO failures are printed, not rethrown; malformed lines (non-integer
	 * or fewer than two tokens) propagate NumberFormatException /
	 * ArrayIndexOutOfBoundsException unchanged, as in the original.
	 *
	 * @param inputFile  text file to read ("key value" per line)
	 * @param outputFile destination file in RecorderWriter format
	 */
	 public static void write2NewFormatFile(Path inputFile, Path outputFile) {
		    RecorderWriter writer = null;
		    FSDataInputStream dis = null;
	        conf.set("fs.default.name", "hdfs://localhost:9000/");
	        conf.setLong("dfs.block.size", 2097152L);
	        try {
	           	// No need to down-cast to DistributedFileSystem: only the
	           	// FileSystem interface is used, and the cast would throw
	           	// ClassCastException for any non-HDFS default file system.
	           	FileSystem hdfs = FileSystem.get(conf);
	           	writer = new RecorderWriter(hdfs, outputFile, conf);

		        dis = hdfs.open(inputFile);
		        RecordReader<LongWritable, Text> reader =
		        		new LineRecordReader(dis, 0, hdfs.getFileStatus(inputFile).getLen(), conf);
		        LongWritable key = reader.createKey();
		        Text value = reader.createValue();
		        while (reader.next(key, value)) {
		        	String[] keyValue = value.toString().split(" ");
		        	IntWritable k = new IntWritable(Integer.parseInt(keyValue[0]));
		        	IntWritable v = new IntWritable(Integer.parseInt(keyValue[1]));
		        	writer.append(k, v);
		        }
	        } catch (IOException e) {
	            e.printStackTrace();
	        } finally {
	            IOUtils.closeStream(dis);    // fix: input stream was never closed before
	            IOUtils.closeStream(writer);
	        }
	    }

	public static void main(String[] args) {
		Path inputFile = new Path("hdfs://localhost:9000/test/test.txt");
		Path outputFile = new Path("hdfs://localhost:9000/liuqiang2/test.seq");
		write2NewFormatFile(inputFile, outputFile);
  	}
}

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics