package examples;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
public class RecorderWriter implements java.io.Closeable{
FSDataOutputStream out;
Long pos ;
Long blockSize ;
public RecorderWriter(FileSystem fs, Path name, Configuration conf)
{
try {
this.out = fs.create(name);
blockSize = conf.getLong("dfs.block.size", (long)2097152) ;
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void close() throws IOException {
if (out != null) {
out.flush();
out.close();
out = null;
}
}
public synchronized void append(Writable key, Writable val)
throws IOException {
long space = (long)Math.abs(out.getPos()%blockSize) ;
long spaceleft = blockSize - space ;
if(spaceleft<10&&space!=0){
int l = (int)spaceleft ;
for(int i =0 ;i<l;i++)
out.writeBytes(" ") ;
}
IntWritable k = (IntWritable)key ;
IntWritable v = (IntWritable)val ;
k.write(out);
out.writeBytes(" ") ;
v.write(out);
out.writeBytes("\n") ;
}
public synchronized long getLength() throws IOException {
return out.getPos();
}
}
package examples;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.bsp.LineRecordReader;
import org.apache.hama.bsp.RecordReader;
public class FileFormatConvert {
public static void write2NewFormatFile(Path inputFile, Path outputFile) {
//读取输入文件的所有记录,依次转化为SequenceFile格式输出
FileSystem hdfs;
RecordReader<LongWritable, Text> reader ;
RecorderWriter writer = null ;
conf.set("fs.default.name", "hdfs://localhost:9000/");
conf.setLong("dfs.block.size", (long)2097152) ;
try {
hdfs = (DistributedFileSystem) FileSystem.get(conf);
// writer = SequenceFile.createWriter(hdfs, conf, outputFile,
// Text.class, Text.class,CompressionType.NONE);
writer = new RecorderWriter(hdfs, outputFile,conf) ;
FSDataInputStream dis = hdfs.open(inputFile);
reader = new LineRecordReader(dis,0, hdfs.getFileStatus(inputFile).getLen() , conf) ;
LongWritable key = reader.createKey();
Text value = reader.createValue();
while(reader.next(key, value))
{
String[] keyValue = value.toString().split(" ") ;
IntWritable k = new IntWritable(Integer.parseInt(keyValue[0])) ;
IntWritable v = new IntWritable(Integer.parseInt(keyValue[1])) ;
writer.append(k,v);
}
} catch (IOException e) {
e.printStackTrace();
} finally {
IOUtils.closeStream(writer);
}
}
public static void main(String[] args) {
Path inputFile = new Path("hdfs://localhost:9000/test/test.txt") ;
Path outputFile = new Path("hdfs://localhost:9000/liuqiang2/test.seq") ;
write2NewFormatFile(inputFile, outputFile) ;
}
static Configuration conf = new Configuration();
}
分享到:
相关推荐
hdfs文件的查看 hdfs fs -cat /文件名
Spark自定义RDD从HDFS读取数据,实现和sc.textFile相同功能,代码测试通过,可以根据需求避免数据源数据倾斜
hdfs文件的下载
hdfs 文件的上传,hdfs fs -put /文件名
本文档是关于hadoop中HDFS的文件读写操作的一份ppt,适用于学习hadoop新手.
HDFS文件系统,大数据资源文件
Java管理hdfs文件和文件夹的工具类,最近版本hadoop2.4。
本文档时Hadoop云计算平台下运用Hadoop API对HDFS进行相关的操作,详细的记录了程序在eclipse中的应用,代码内容和运行结果,是学习Hadoop非常有用的资料。
文档详细的讲述了Hadoop中HDFS文件操作命令和HDFS编程
hdfs文件传输调优,hdfs文件传输调优 hdfs文件传输调优
storm-hdfs, 用于与HDFS文件系统交互的风暴组件 风暴 HDFS用于与HDFS文件系统交互的风暴组件用法以下示例将在每 1,000个元组同步后将管道("|") -delimited文件写入HDFS路径 hdfs://localhost:54310/foo.,使它的对...
python解析hdfs文件内容生成本地文件、及相关插件包安装实现方式
Hadoop hdfs文件操作,单词统计MR代码 demo,topN求解编程代码 demo
若输出结果为0,则说明文件或目录存在,若为1,则说明文件或目录不存在。 二、Java代码实现 import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs....
主要介绍了Python连接HDFS实现文件上传下载及Pandas转换文本文件到CSV操作,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
java对大数据HDFS文件操作jar包和maven2个项目,有jar包,有依赖,2个项目呦
java 读写 hdfs文件系统例子(包括权限设置);java 读写 hdfs文件系统例子(包括权限设置)java 读写 hdfs文件系统例子(包括权限设置)
3、HDFS的使用(读写、上传、下载、遍历、查找文件、整个目录拷贝、只拷贝文件、列出文件夹下文件、删除文件及目录、获取文件及文件夹属性等)-java 网址:...
hadoop及eclipse操作HDFS需要文件、、
上传文件到HDFS的java代码实现。已经测试了,可以直接运行。