阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Hadoop中SequenceFile的使用

191次阅读
没有评论

共计 12532 个字符,预计需要花费 32 分钟才能阅读完成。

1. 对于某些应用而言,需要特殊的数据结构来存储自己的数据。对于基于 MapReduce 的数据处理,将每个二进制数据的大对象融入自己的文件中并不能实现很高的可扩展性,针对上述情况,Hadoop 开发了一组更高层次的容器 SequenceFile。

2. 考虑日志文件,其中每一条日志记录是一行文本。如果想记录二进制类型,纯文本是不合适的。这种情况下,Hadoop 的 SequenceFile 类非常合适,因为上述提供了二进制键 / 值对的永久存储的数据结构。当作为日志文件的存储格式时,可以自己选择键,比如由 LongWritable 类型表示的时间戳,以及值可以是 Writable 类型,用于表示日志记录的数量。SequenceFile 同样为可以作为小文件的容器。而 HDFS 和 MapReduce 是针对大文件进行优化的,所以通过 SequenceFile 类型将小文件包装起来,可以获得更高效率的存储和处理。

3. SequenceFile 类内部有两个比较主要的内部类分别是 SequenceFile.Reader 和 SequenceFile.Writer

Hadoop 小文件操作之 SequenceFile  http://www.linuxidc.com/Linux/2012-03/56218.htm

SequenceFile.Reader

通过 createWriter()静态方法可以创建 SequenceFile 对象,并返 SequenceFile.Writer 实例。该静态方法有多个重载版本,但都需要指定待写入的数据流(FSDataOutputStream 或 FileSystem 对象和 Path 对象),Configuration 对象,以及键和值的类型。另外可选参数包括压缩类型以及相应的 codec,Progressable 回调函数用于通知写入的进度,以及在 SequenceFile 头文件中存储的 Metadata 实例。存储在 SequenceFile 中的键和值对并不一定是 Writable 类型。任意可以通过 Serialization 类实现序列化和反序列化的类型均可被使用。一旦拥有 SequenceFile.Writer 实例,就可以通过 append()方法在文件末尾附件键 / 值对。

SequenceFile.Writer

创建 SequenceFile.Writer 可以通过调用本身的构造函数 SequenceFile.Reader(FileSystem fs, Path file, Configuration conf) 来构造实例对象,从头到尾读取顺序文件的过程是创建 SequenceFile.Reader 实例后反复调用 next()方法迭代读取记录的过程。读取的是哪条记录与你使用的序列化框架相关。如果使用的是 Writable 类型,那么通过键和值作为参数的 Next()方法可以将数据流中的下一条键值对读入变量中:

public boolean next(Writable key,Writable val),如果键值对成功读取,则返回 true,如果已读到文件末尾,则返回 false。

————————————– 分割线 ————————————–

Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

Hadoop 中 HDFS 和 MapReduce 节点基本简介 http://www.linuxidc.com/Linux/2013-09/89653.htm

《Hadoop 实战》中文版 + 英文文字版 + 源码【PDF】http://www.linuxidc.com/Linux/2012-10/71901.htm

Hadoop: The Definitive Guide【PDF 版】http://www.linuxidc.com/Linux/2012-01/51182.htm

————————————– 分割线 ————————————–

具体示例代码如下所示:

import java.io.IOException;
import java.net.URI;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class sequence {
    /**
    * @param args
    */
    public static  FileSystem fs;
    public static final String Output_path=”/home/hadoop/test/A.txt”;
    public static Random random=new Random();
    private static final String[] DATA={
          “One,two,buckle my shoe”,
          “Three,four,shut the door”,
          “Five,six,pick up sticks”,
          “Seven,eight,lay them straight”,
          “Nine,ten,a big fat hen”
        };
    public static Configuration conf=new Configuration();
    public static void write(String pathStr) throws IOException{
        Path path=new Path(pathStr);
        FileSystem fs=FileSystem.get(URI.create(pathStr), conf);
        SequenceFile.Writer writer=SequenceFile.createWriter(fs, conf, path, Text.class, IntWritable.class);
        Text key=new Text();
        IntWritable value=new IntWritable();
        for(int i=0;i<DATA.length;i++){
            key.set(DATA[i]);
            value.set(random.nextInt(10));
            System.out.println(key);
            System.out.println(value);
         
            System.out.println(writer.getLength());
            writer.append(key, value);
         
        }
        writer.close();
    }
    public static void read(String pathStr) throws IOException{
        FileSystem fs=FileSystem.get(URI.create(pathStr), conf);
        SequenceFile.Reader reader=new SequenceFile.Reader(fs, new Path(pathStr), conf);
        Text key=new Text();
        IntWritable value=new IntWritable();
        while(reader.next(key, value)){
            System.out.println(key);
            System.out.println(value);
        }
    }
 
    public static void main(String[] args) throws IOException {
        // TODO Auto-generated method stub
        write(Output_path);
        read(Output_path);
    } 
}

更多详情见请继续阅读下一页的精彩内容 :http://www.linuxidc.com/Linux/2014-10/107989p2.htm

如果需要在 mapreduce 中进行 SequenceFile 的读取和写入,则需要到 SequcenFileInputFormat 和 SequenceFileOutputFormat,示例代码如下所示:

1)输出格式为 SequenceFileOutputFormat

public class SequenceFileOutputFormatDemo extends Configured implements Tool {
    public static class SequenceFileOutputFormatDemoMapper extends
            Mapper<LongWritable, Text, LongWritable, Text> {
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            context.write(key, value);
        }
    }

    public static void main(String[] args) throws Exception {
        int nRet = ToolRunner.run(new Configuration(),
                new SequenceFileOutputFormatDemo(), args);
        System.out.println(nRet);
    }
    @Override
    public int run(String[] args) throws Exception {
        // TODO Auto-generated method stub
        Configuration conf = getConf();
        Job job = new Job(conf, “sequence file output demo “);
        job.setJarByClass(SequenceFileOutputFormatDemo.class);
        FileInputFormat.addInputPaths(job, args[0]);
        HdfsUtil.deleteDir(args[1]);
        job.setMapperClass(SequenceFileOutputFormatDemoMapper.class);
        // 因为没有 reducer,所以 map 的输出为 job 的最后输出,所以需要把 outputkeyclass
        // outputvalueclass 设置为与 map 的输出一致
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(Text.class);
        // 如果不希望有 reducer,设置为 0
        job.setNumReduceTasks(0);
        // 设置输出类
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        // 设置 sequecnfile 的格式,对于 sequencefile 的输出格式,有多种组合方式,
        // 从下面的模式中选择一种,并将其余的注释掉
        // 组合方式 1:不压缩模式
        SequenceFileOutputFormat.setOutputCompressionType(job,
                CompressionType.NONE);

        // 组合方式 2:record 压缩模式,并指定采用的压缩方式:默认、gzip 压缩等
//        SequenceFileOutputFormat.setOutputCompressionType(job,
//                CompressionType.RECORD);
//        SequenceFileOutputFormat.setOutputCompressorClass(job,
//                DefaultCodec.class);

        // 组合方式 3:block 压缩模式,并指定采用的压缩方式:默认、gzip 压缩等
//        SequenceFileOutputFormat.setOutputCompressionType(job,
//                CompressionType.BLOCK);
//        SequenceFileOutputFormat.setOutputCompressorClass(job,
//                DefaultCodec.class);
        SequenceFileOutputFormat.setOutputPath(job, new Path(args[1]));
        int result = job.waitForCompletion(true) ? 0 : 1;
        return result;
    }
}

 

 

2)输入格式为 SequcenFileInputFormat

public class SequenceFileInputFormatDemo extends Configured implements Tool {
    public static class SequenceFileInputFormatDemoMapper extends
            Mapper<LongWritable, Text, Text, NullWritable> {

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            System.out.println(“key:  ” + key.toString() + ”  ;  value: “
                    + value.toString());
        }

    }

    public static void main(String[] args) throws Exception {

        int nRet = ToolRunner.run(new Configuration(),
                new SequenceFileInputFormatDemo(), args);
        System.out.println(nRet);
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = new Job(conf, “sequence file input demo”);
        job.setJarByClass(SequenceFileInputFormatDemo.class);
        FileInputFormat.addInputPaths(job, args[0]);
        HdfsUtil.deleteDir(args[1]);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(SequenceFileInputFormatDemoMapper.class);
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        int result = job.waitForCompletion(true) ? 0 : 1;
        return result;
    }
}

 

或者读取的时候也可以如下面的方式进行读取,但是此时输出格式就为普通 FileOutputFormat 了,输入格式也为普通 FileInputFormat 了。示例代码如下面所示:

public class MapReduceReadFile {

private static SequenceFile.Reader reader = null;
private static Configuration conf = new Configuration();

public static class ReadFileMapper extends
Mapper<LongWritable, Text, LongWritable, Text> {

/* (non-Javadoc)
* @see org.apache.Hadoop.mapreduce.Mapper#map(KEYIN, VALUEIN, org.apache.hadoop.mapreduce.Mapper.Context)
*/
@Override
public void map(LongWritable key, Text value, Context context) {
key = (LongWritable) ReflectionUtils.newInstance(
reader.getKeyClass(), conf);
value = (Text) ReflectionUtils.newInstance(
reader.getValueClass(), conf);
try {
while (reader.next(key, value)) {
System.out.printf(“%s\t%s\n”, key, value);
context.write(key, value);
}
} catch (IOException e1) {
e1.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* @param args
* @throws IOException
* @throws InterruptedException
* @throws ClassNotFoundException
*/
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {

Job job = new Job(conf,”read seq file”);
job.setJarByClass(MapReduceReadFile.class);
job.setMapperClass(ReadFileMapper.class);
job.setMapOutputValueClass(Text.class);
Path path = new Path(“logfile2”);
FileSystem fs = FileSystem.get(conf);
reader = new SequenceFile.Reader(fs, path, conf);
FileInputFormat.addInputPath(job, path);
FileOutputFormat.setOutputPath(job, new Path(“result”));
System.exit(job.waitForCompletion(true)?0:1);
}
}

更多 Hadoop 相关信息见 Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

1. 对于某些应用而言,需要特殊的数据结构来存储自己的数据。对于基于 MapReduce 的数据处理,将每个二进制数据的大对象融入自己的文件中并不能实现很高的可扩展性,针对上述情况,Hadoop 开发了一组更高层次的容器 SequenceFile。

2. 考虑日志文件,其中每一条日志记录是一行文本。如果想记录二进制类型,纯文本是不合适的。这种情况下,Hadoop 的 SequenceFile 类非常合适,因为上述提供了二进制键 / 值对的永久存储的数据结构。当作为日志文件的存储格式时,可以自己选择键,比如由 LongWritable 类型表示的时间戳,以及值可以是 Writable 类型,用于表示日志记录的数量。SequenceFile 同样为可以作为小文件的容器。而 HDFS 和 MapReduce 是针对大文件进行优化的,所以通过 SequenceFile 类型将小文件包装起来,可以获得更高效率的存储和处理。

3. SequenceFile 类内部有两个比较主要的内部类分别是 SequenceFile.Reader 和 SequenceFile.Writer

Hadoop 小文件操作之 SequenceFile  http://www.linuxidc.com/Linux/2012-03/56218.htm

SequenceFile.Reader

通过 createWriter()静态方法可以创建 SequenceFile 对象,并返 SequenceFile.Writer 实例。该静态方法有多个重载版本,但都需要指定待写入的数据流(FSDataOutputStream 或 FileSystem 对象和 Path 对象),Configuration 对象,以及键和值的类型。另外可选参数包括压缩类型以及相应的 codec,Progressable 回调函数用于通知写入的进度,以及在 SequenceFile 头文件中存储的 Metadata 实例。存储在 SequenceFile 中的键和值对并不一定是 Writable 类型。任意可以通过 Serialization 类实现序列化和反序列化的类型均可被使用。一旦拥有 SequenceFile.Writer 实例,就可以通过 append()方法在文件末尾附件键 / 值对。

SequenceFile.Writer

创建 SequenceFile.Writer 可以通过调用本身的构造函数 SequenceFile.Reader(FileSystem fs, Path file, Configuration conf) 来构造实例对象,从头到尾读取顺序文件的过程是创建 SequenceFile.Reader 实例后反复调用 next()方法迭代读取记录的过程。读取的是哪条记录与你使用的序列化框架相关。如果使用的是 Writable 类型,那么通过键和值作为参数的 Next()方法可以将数据流中的下一条键值对读入变量中:

public boolean next(Writable key,Writable val),如果键值对成功读取,则返回 true,如果已读到文件末尾,则返回 false。

————————————– 分割线 ————————————–

Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

Hadoop 中 HDFS 和 MapReduce 节点基本简介 http://www.linuxidc.com/Linux/2013-09/89653.htm

《Hadoop 实战》中文版 + 英文文字版 + 源码【PDF】http://www.linuxidc.com/Linux/2012-10/71901.htm

Hadoop: The Definitive Guide【PDF 版】http://www.linuxidc.com/Linux/2012-01/51182.htm

————————————– 分割线 ————————————–

具体示例代码如下所示:

import java.io.IOException;
import java.net.URI;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class sequence {
    /**
    * @param args
    */
    public static  FileSystem fs;
    public static final String Output_path=”/home/hadoop/test/A.txt”;
    public static Random random=new Random();
    private static final String[] DATA={
          “One,two,buckle my shoe”,
          “Three,four,shut the door”,
          “Five,six,pick up sticks”,
          “Seven,eight,lay them straight”,
          “Nine,ten,a big fat hen”
        };
    public static Configuration conf=new Configuration();
    public static void write(String pathStr) throws IOException{
        Path path=new Path(pathStr);
        FileSystem fs=FileSystem.get(URI.create(pathStr), conf);
        SequenceFile.Writer writer=SequenceFile.createWriter(fs, conf, path, Text.class, IntWritable.class);
        Text key=new Text();
        IntWritable value=new IntWritable();
        for(int i=0;i<DATA.length;i++){
            key.set(DATA[i]);
            value.set(random.nextInt(10));
            System.out.println(key);
            System.out.println(value);
         
            System.out.println(writer.getLength());
            writer.append(key, value);
         
        }
        writer.close();
    }
    public static void read(String pathStr) throws IOException{
        FileSystem fs=FileSystem.get(URI.create(pathStr), conf);
        SequenceFile.Reader reader=new SequenceFile.Reader(fs, new Path(pathStr), conf);
        Text key=new Text();
        IntWritable value=new IntWritable();
        while(reader.next(key, value)){
            System.out.println(key);
            System.out.println(value);
        }
    }
 
    public static void main(String[] args) throws IOException {
        // TODO Auto-generated method stub
        write(Output_path);
        read(Output_path);
    } 
}

更多详情见请继续阅读下一页的精彩内容 :http://www.linuxidc.com/Linux/2014-10/107989p2.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计12532字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中