阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Hadoop处理HDF文件

182次阅读
没有评论

共计 15595 个字符,预计需要花费 39 分钟才能阅读完成。

1、前言

HDF 文件是遥感应用中一种常见的数据格式,由于其高度结构化的特点,笔者曾被如何使用 Hadoop 处理 HDF 文件这个问题困扰过相当长的一段时间。于是 Google 各种解决方案,但都没有找到一种理想的处理办法。也曾参考过 HDFGroup 官方发的一篇帖子(网址在这里),里面提供了使用 Hadoop 针对大、中、小 HDF 文件的处理思路。虽然根据他提供的解决办法,按图索骥,肯定能解决如何使用 Hadoop 处理 HDF 文件这个问题,但个人感觉方法偏复杂且需要对 HDF 的数据格式有较深的理解,实现起来不太容易。于是乎,笔者又继续寻找解决方案,终于发现了一种办法,下面将对该方法进行具体说明。

2、MapReduce 主程序

这里主要使用到了 netcdf 的库进行 hdf 数据流的反序列化工作(netcdf 库的下载地址)。与 HDF 官方提供的 Java 库不同,netcdf 仅利用 Java 进行 HDF 文件的读写操作,且这个库支持多种科学数据,包括 HDF4、HDF5 等多种格式。而 HDF 的官方 Java 库中,底层实际仍是用 C 进行 HDF 文件的操作。

CentOS 安装和配置 Hadoop2.2.0  http://www.linuxidc.com/Linux/2014-01/94685.htm

Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

搭建 Hadoop 环境(在 Winodws 环境下用虚拟机虚拟两个 Ubuntu 系统进行搭建)http://www.linuxidc.com/Linux/2011-12/48894.htm

下面贴出 MapReduce 的 Mapper 函数代码:

package example;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.URI;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import ucar.ma2.ArrayShort;
import ucar.nc2.Dimension;
import ucar.nc2.Group;
import ucar.nc2.NetcdfFile;
import ucar.nc2.Variable;

public class ReadMapper extends
  Mapper<Text, BytesWritable, Text, BytesWritable> {

 public void map(Text key, BytesWritable value, Context context)
   throws IOException, InterruptedException {
  String fileName = key.toString();
  NetcdfFile file = NetcdfFile.openInMemory(“hdf4”, value.get());
  Group dataGroup = (file.findGroup(“MOD_Grid_monthly_1km_VI”)).findGroup(“Data_Fields”);
  // 读取到 1_km_monthly_red_reflectance 的变量
  Variable redVar = dataGroup.findVariable(“1_km_monthly_red_reflectance”);
  short[][] data = new short[1200][1200];
  if(dataGroup != null){
   ArrayShort.D2 dataArray;
   // 读取 redVar 中的影像数据
   dataArray = (ArrayShort.D2) redVar.read();
   List<Dimension> dimList = file.getDimensions();
   // 获取影像的 y 方向像元个数
   Dimension ydim = dimList.get(0);
   // 获取影像的 x 方向像元个数
   Dimension xdim = dimList.get(1);
   // 遍历整个影像,读取出像元的值
   for(int i=0;i<xdim.getLength();i++){
    for(int j=0;j<ydim.getLength();j++){
     data[i][j] = dataArray.get(i, j);     
    }   
   }         
  } 
  System.out.print(file.getDetailInfo());
 }
}

注意程序中的 NetcdfFile.openInMemory 方法,该静态方法支持从 byte[] 中构造 HDF 文件,从而实现了 HDF 文件的反序列化操作。下面贴出主程序的示例代码:

package example;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

import example.WholeFileInputFormat;

public class ReadMain {
 public boolean runJob(String[] args) throws IOException,
   ClassNotFoundException, InterruptedException {
  Configuration conf = new Configuration();
  // conf.set(“mapred.job.tracker”, Utils.JOBTRACKER);
  String rootPath= “/opt/hadoop-2.3.0/etc/hadoop”;
  //String rootPath=”/opt/hadoop-2.3.0/etc/hadoop/”;
  conf.addResource(new Path(rootPath+”yarn-site.xml”));
  conf.addResource(new Path(rootPath+”core-site.xml”));
  conf.addResource(new Path(rootPath+”hdfs-site.xml”));
  conf.addResource(new Path(rootPath+”mapred-site.xml”));
  Job job = new Job(conf);

  job.setJobName(“Job name:” + args[0]);
  job.setJarByClass(ReadMain.class);

  job.setMapperClass(ReadMapper.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(BytesWritable.class);
 
  job.setInputFormatClass(WholeFileInputFormat.class);
  job.setOutputFormatClass(NullOutputFormat.class);
  FileInputFormat.addInputPath(job, new Path(args[1]));
  FileOutputFormat.setOutputPath(job, new Path(args[2]));
  boolean flag = job.waitForCompletion(true);
  return flag;
 }

 public static void main(String[] args) throws ClassNotFoundException,
   IOException, InterruptedException {
  String[] inputPaths = new String[] {“normalizeJob”,
    “hdfs://192.168.168.101:9000/user/hduser/hdf/MOD13A3.A2005274.h00v10.005.2008079143041.hdf”,
    “hdfs://192.168.168.101:9000/user/hduser/test/” };
  ReadMain test = new ReadMain();
  test.runJob(inputPaths);
 }

}

关于 MapReduce 主程序有几点值得说明一下:

1、MapReduce 数据的输入格式为 WholeFileInputFormat.class,即不对数据进行切分。关于该格式,可以参考另外一篇文章:如何通过 Java 程序提交 Yarn 的计算任务(http://www.linuxidc.com/Linux/2014-11/109360.htm),这里不再赘述。

2、本人用的是 Yarn2.3.0 来执行计算任务,如果用老版本的 hadoop,如 1.2.0,则把以上主程序中的 conf.addResource 部分的代码删掉即可。

3、以上 MapReduce 程序中,只用到了 Map 函数,未设置 Reduce 函数。

4、以上程序用到的为 HDF4 格式的数据,按理说,HDF5 格式的数据应该也是支持的。

更多详情见请继续阅读下一页的精彩内容 :http://www.linuxidc.com/Linux/2014-11/109359p2.htm

3、HDF 数据的格式

由于 HDF 数据高度结构化,因此在 netcdf 库的使用中,需要使用类似于 ” 标签 ” 的方式来访问 HDF 中的具体数据。下面贴出 netcdf 中读出来的 HDF 数据的具体格式信息(即使用 file.getDetailInfo() 函数,打印出来的信息):

注意,ReadMapper 函数中出现的类似于“MOD_Grid_monthly_1km_VI”、”Data_Fields” 等信息,即根据以下 HDF 数据的格式信息得到的。

netcdf D:/2005-274/MOD13A3.A2005274.h00v08.005.2008079142757.hdf {
  variables:
    char StructMetadata.0(32000);

    char CoreMetadata.0(40874);

    char ArchiveMetadata.0(6530);

  group: MOD_Grid_monthly_1km_VI {
    variables:
      short _HDFEOS_CRS;
        :Projection = “GCTP_SNSOID”;
        :UpperLeftPointMtrs = -2.0015109354E7, 1111950.519667; // double
        :LowerRightMtrs = -1.8903158834333E7, -0.0; // double
        :ProjParams = 6371007.181, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0; // double
        :SphereCode = “-1”;

    group: Data_Fields {
      dimensions:
        YDim = 1200;
        XDim = 1200;
      variables:
        short 1_km_monthly_NDVI(YDim=1200, XDim=1200);
          :long_name = “1 km monthly NDVI”;
          :units = “NDVI”;
          :valid_range = -2000S, 10000S; // short
          :_FillValue = -3000S; // short
          :scale_factor = 10000.0; // double
          :scale_factor_err = 0.0; // double
          :add_offset = 0.0; // double
          :add_offset_err = 0.0; // double
          :calibrated_nt = 5; // int

        short 1_km_monthly_EVI(YDim=1200, XDim=1200);
          :long_name = “1 km monthly EVI”;
          :units = “EVI”;
          :valid_range = -2000S, 10000S; // short
          :_FillValue = -3000S; // short
          :scale_factor = 10000.0; // double
          :scale_factor_err = 0.0; // double
          :add_offset = 0.0; // double
          :add_offset_err = 0.0; // double
          :calibrated_nt = 5; // int

        short 1_km_monthly_VI_Quality(YDim=1200, XDim=1200);
          :_Unsigned = “true”;
          :long_name = “1 km monthly VI Quality”;
          :units = “bit field”;
          :valid_range = 0S, -2S; // short
          :_FillValue = -1S; // short
          :Legend = “\n\t Bit Fields Description (Right to Left): \n\t[0-1] : MODLAND_QA [2 bit range]\n\t\t 00: VI produced, good quality \n\t\t 01: VI produced, but check other QA \n\t\t 10: Pixel produced, but most probably cloudy \n\t\t 11: Pixel not produced due to other reasons than clouds \n\t[2-5] : VI usefulness [4 bit range]  \n\t\t 0000: Highest quality  \n\t\t 0001: Lower quality  \n\t\t 0010..1010: Decreasing quality  \n\t\t 1100: Lowest quality  \n\t\t 1101: Quality so low that it is not useful \n\t\t 1110: L1B data faulty \n\t\t 1111: Not useful for any other reason/not processed \n\t[6-7] : Aerosol quantity [2 bit range] \n\t\t 00: Climatology \n\t\t 01: Low \n\t\t 10: Average \n\t\t 11: High (11) \n\t[8] : Adjacent cloud detected; [1 bit range] \n\t\t 1: Yes \n\t\t 0: No \n\t[9] : Atmosphere BRDF correction performed [1 bit range] \n\t\t 1: Yes \n\t\t 0: No \n\t[10] : Mixed clouds  [1 bit range] \n\t\t 1: Yes \n\t\t 0: No \n\t[11-13] : Land/Water Flag [3 bit range]  \n\t\t 000: Shallow ocean \n\t\t 001: Land (Nothing else but land) \n\t\t 010: Ocean coastlines and lake shorelines \n\t\t 011: Shallow inland water \n\t\t 100: Ephemeral water \n\t\t 101: Deep inland water \n\t\t 110: Moderate or continental ocean \n\t\t 111: Deep ocean \n\t[14] : Possible snow/ice [1 bit range] \n\t\t 1: Yes \n\t\t 0: No \n\t[15] : Possible shadow [1 bit range] \n\t\t 1: Yes \n\t\t 0: No \n”;

        short 1_km_monthly_red_reflectance(YDim=1200, XDim=1200);
          :long_name = “1 km monthly red reflectance”;
          :units = “reflectance”;
          :valid_range = 0S, 10000S; // short
          :_FillValue = -1000S; // short
          :scale_factor = 10000.0; // double
          :scale_factor_err = 0.0; // double
          :add_offset = 0.0; // double
          :add_offset_err = 0.0; // double
          :calibrated_nt = 5; // int

        short 1_km_monthly_NIR_reflectance(YDim=1200, XDim=1200);
          :long_name = “1 km monthly NIR reflectance”;
          :units = “reflectance”;
          :valid_range = 0S, 10000S; // short
          :_FillValue = -1000S; // short
          :scale_factor = 10000.0; // double
          :scale_factor_err = 0.0; // double
          :add_offset = 0.0; // double
          :add_offset_err = 0.0; // double
          :calibrated_nt = 5; // int

        short 1_km_monthly_blue_reflectance(YDim=1200, XDim=1200);
          :long_name = “1 km monthly blue reflectance”;
          :units = “reflectance”;
          :valid_range = 0S, 10000S; // short
          :_FillValue = -1000S; // short
          :scale_factor = 10000.0; // double
          :scale_factor_err = 0.0; // double
          :add_offset = 0.0; // double
          :add_offset_err = 0.0; // double
          :calibrated_nt = 5; // int

        short 1_km_monthly_MIR_reflectance(YDim=1200, XDim=1200);
          :long_name = “1 km monthly MIR reflectance”;
          :units = “reflectance”;
          :valid_range = 0S, 10000S; // short
          :_FillValue = -1000S; // short
          :Legend = “\n\t The MIR band saved in the VI product is MODIS band 7 \n\t\t Bandwidth : 2105-2155 nm \n\t\t Band center: 2130 nm \n”;
          :scale_factor = 10000.0; // double
          :scale_factor_err = 0.0; // double
          :add_offset = 0.0; // double
          :add_offset_err = 0.0; // double
          :calibrated_nt = 5; // int

        short 1_km_monthly_view_zenith_angle(YDim=1200, XDim=1200);
          :long_name = “1 km monthly view zenith angle”;
          :units = “degrees”;
          :valid_range = -9000S, 9000S; // short
          :_FillValue = -10000S; // short
          :scale_factor = 100.0; // double
          :scale_factor_err = 0.0; // double
          :add_offset = 0.0; // double
          :add_offset_err = 0.0; // double
          :calibrated_nt = 5; // int

        short 1_km_monthly_sun_zenith_angle(YDim=1200, XDim=1200);
          :long_name = “1 km monthly sun zenith angle”;
          :units = “degrees”;
          :valid_range = -9000S, 9000S; // short
          :_FillValue = -10000S; // short
          :scale_factor = 100.0; // double
          :scale_factor_err = 0.0; // double
          :add_offset = 0.0; // double
          :add_offset_err = 0.0; // double
          :calibrated_nt = 5; // int

        short 1_km_monthly_relative_azimuth_angle(YDim=1200, XDim=1200);
          :long_name = “1 km monthly relative azimuth angle”;
          :units = “degrees”;
          :valid_range = -3600S, 3600S; // short
          :_FillValue = -4000S; // short
          :scale_factor = 10.0; // double
          :scale_factor_err = 0.0; // double
          :add_offset = 0.0; // double
          :add_offset_err = 0.0; // double
          :calibrated_nt = 5; // int

        byte 1_km_monthly_pixel_raliability(YDim=1200, XDim=1200);
          :long_name = “1 km monthly pixel raliability”;
          :units = “rank”;
          :valid_range = 0B, 3B; // byte
          :_FillValue = -1B; // byte
          :Legend = “\n\t Rank Keys: \n\t\t[-1]:  Fill/No Data-Not Processed. \n\t\t [0]:  Good data    – Use with confidence \n\t\t [1]:  Marginal data – Useful, but look at other QA information \n\t\t [2]:  Snow/Ice      – Target covered with snow/ice\n\t\t [3]:  Cloudy        – Target not visible, covered with cloud \n”;

    }
  }
  // global attributes:
  :HDFEOSVersion = “HDFEOS_V2.9”;
  :_History = “Direct read of HDF4 file through CDM library; HDF-EOS StructMetadata information was read”;
  :HDF4_Version = “4.2.1 (NCSA HDF Version 4.2 Release 1-post3, January 27, 2006)”;
  :featureType = “GRID”;
}

更多 Hadoop 相关信息见 Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13

1、前言

HDF 文件是遥感应用中一种常见的数据格式,由于其高度结构化的特点,笔者曾被如何使用 Hadoop 处理 HDF 文件这个问题困扰过相当长的一段时间。于是 Google 各种解决方案,但都没有找到一种理想的处理办法。也曾参考过 HDFGroup 官方发的一篇帖子(网址在这里),里面提供了使用 Hadoop 针对大、中、小 HDF 文件的处理思路。虽然根据他提供的解决办法,按图索骥,肯定能解决如何使用 Hadoop 处理 HDF 文件这个问题,但个人感觉方法偏复杂且需要对 HDF 的数据格式有较深的理解,实现起来不太容易。于是乎,笔者又继续寻找解决方案,终于发现了一种办法,下面将对该方法进行具体说明。

2、MapReduce 主程序

这里主要使用到了 netcdf 的库进行 hdf 数据流的反序列化工作(netcdf 库的下载地址)。与 HDF 官方提供的 Java 库不同,netcdf 仅利用 Java 进行 HDF 文件的读写操作,且这个库支持多种科学数据,包括 HDF4、HDF5 等多种格式。而 HDF 的官方 Java 库中,底层实际仍是用 C 进行 HDF 文件的操作。

CentOS 安装和配置 Hadoop2.2.0  http://www.linuxidc.com/Linux/2014-01/94685.htm

Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm

Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm

Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm

Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm

单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm

搭建 Hadoop 环境(在 Winodws 环境下用虚拟机虚拟两个 Ubuntu 系统进行搭建)http://www.linuxidc.com/Linux/2011-12/48894.htm

下面贴出 MapReduce 的 Mapper 函数代码:

package example;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.URI;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import ucar.ma2.ArrayShort;
import ucar.nc2.Dimension;
import ucar.nc2.Group;
import ucar.nc2.NetcdfFile;
import ucar.nc2.Variable;

public class ReadMapper extends
  Mapper<Text, BytesWritable, Text, BytesWritable> {

 public void map(Text key, BytesWritable value, Context context)
   throws IOException, InterruptedException {
  String fileName = key.toString();
  NetcdfFile file = NetcdfFile.openInMemory(“hdf4”, value.get());
  Group dataGroup = (file.findGroup(“MOD_Grid_monthly_1km_VI”)).findGroup(“Data_Fields”);
  // 读取到 1_km_monthly_red_reflectance 的变量
  Variable redVar = dataGroup.findVariable(“1_km_monthly_red_reflectance”);
  short[][] data = new short[1200][1200];
  if(dataGroup != null){
   ArrayShort.D2 dataArray;
   // 读取 redVar 中的影像数据
   dataArray = (ArrayShort.D2) redVar.read();
   List<Dimension> dimList = file.getDimensions();
   // 获取影像的 y 方向像元个数
   Dimension ydim = dimList.get(0);
   // 获取影像的 x 方向像元个数
   Dimension xdim = dimList.get(1);
   // 遍历整个影像,读取出像元的值
   for(int i=0;i<xdim.getLength();i++){
    for(int j=0;j<ydim.getLength();j++){
     data[i][j] = dataArray.get(i, j);     
    }   
   }         
  } 
  System.out.print(file.getDetailInfo());
 }
}

注意程序中的 NetcdfFile.openInMemory 方法,该静态方法支持从 byte[] 中构造 HDF 文件,从而实现了 HDF 文件的反序列化操作。下面贴出主程序的示例代码:

package example;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

import example.WholeFileInputFormat;

public class ReadMain {
 public boolean runJob(String[] args) throws IOException,
   ClassNotFoundException, InterruptedException {
  Configuration conf = new Configuration();
  // conf.set(“mapred.job.tracker”, Utils.JOBTRACKER);
  String rootPath= “/opt/hadoop-2.3.0/etc/hadoop”;
  //String rootPath=”/opt/hadoop-2.3.0/etc/hadoop/”;
  conf.addResource(new Path(rootPath+”yarn-site.xml”));
  conf.addResource(new Path(rootPath+”core-site.xml”));
  conf.addResource(new Path(rootPath+”hdfs-site.xml”));
  conf.addResource(new Path(rootPath+”mapred-site.xml”));
  Job job = new Job(conf);

  job.setJobName(“Job name:” + args[0]);
  job.setJarByClass(ReadMain.class);

  job.setMapperClass(ReadMapper.class);
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(BytesWritable.class);
 
  job.setInputFormatClass(WholeFileInputFormat.class);
  job.setOutputFormatClass(NullOutputFormat.class);
  FileInputFormat.addInputPath(job, new Path(args[1]));
  FileOutputFormat.setOutputPath(job, new Path(args[2]));
  boolean flag = job.waitForCompletion(true);
  return flag;
 }

 public static void main(String[] args) throws ClassNotFoundException,
   IOException, InterruptedException {
  String[] inputPaths = new String[] {“normalizeJob”,
    “hdfs://192.168.168.101:9000/user/hduser/hdf/MOD13A3.A2005274.h00v10.005.2008079143041.hdf”,
    “hdfs://192.168.168.101:9000/user/hduser/test/” };
  ReadMain test = new ReadMain();
  test.runJob(inputPaths);
 }

}

关于 MapReduce 主程序有几点值得说明一下:

1、MapReduce 数据的输入格式为 WholeFileInputFormat.class,即不对数据进行切分。关于该格式,可以参考另外一篇文章:如何通过 Java 程序提交 Yarn 的计算任务(http://www.linuxidc.com/Linux/2014-11/109360.htm),这里不再赘述。

2、本人用的是 Yarn2.3.0 来执行计算任务,如果用老版本的 hadoop,如 1.2.0,则把以上主程序中的 conf.addResource 部分的代码删掉即可。

3、以上 MapReduce 程序中,只用到了 Map 函数,未设置 Reduce 函数。

4、以上程序用到的为 HDF4 格式的数据,按理说,HDF5 格式的数据应该也是支持的。

更多详情见请继续阅读下一页的精彩内容 :http://www.linuxidc.com/Linux/2014-11/109359p2.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-20发表,共计15595字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中