共计 15595 个字符,预计需要花费 39 分钟才能阅读完成。
1、前言
HDF 文件是遥感应用中一种常见的数据格式,由于其高度结构化的特点,笔者曾被如何使用 Hadoop 处理 HDF 文件这个问题困扰过相当长的一段时间。于是 Google 各种解决方案,但都没有找到一种理想的处理办法。也曾参考过 HDFGroup 官方发的一篇帖子(网址在这里),里面提供了使用 Hadoop 针对大、中、小 HDF 文件的处理思路。虽然根据他提供的解决办法,按图索骥,肯定能解决如何使用 Hadoop 处理 HDF 文件这个问题,但个人感觉方法偏复杂且需要对 HDF 的数据格式有较深的理解,实现起来不太容易。于是乎,笔者又继续寻找解决方案,终于发现了一种办法,下面将对该方法进行具体说明。
2、MapReduce 主程序
这里主要使用到了 netcdf 的库进行 hdf 数据流的反序列化工作(netcdf 库的下载地址)。与 HDF 官方提供的 Java 库不同,netcdf 仅利用 Java 进行 HDF 文件的读写操作,且这个库支持多种科学数据,包括 HDF4、HDF5 等多种格式。而 HDF 的官方 Java 库中,底层实际仍是用 C 进行 HDF 文件的操作。
CentOS 安装和配置 Hadoop2.2.0 http://www.linuxidc.com/Linux/2014-01/94685.htm
Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm
Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm
Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm
Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm
单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm
搭建 Hadoop 环境(在 Winodws 环境下用虚拟机虚拟两个 Ubuntu 系统进行搭建)http://www.linuxidc.com/Linux/2011-12/48894.htm
下面贴出 MapReduce 的 Mapper 函数代码:
package example;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import ucar.ma2.ArrayShort;
import ucar.nc2.Dimension;
import ucar.nc2.Group;
import ucar.nc2.NetcdfFile;
import ucar.nc2.Variable;
public class ReadMapper extends
Mapper<Text, BytesWritable, Text, BytesWritable> {
public void map(Text key, BytesWritable value, Context context)
throws IOException, InterruptedException {
String fileName = key.toString();
NetcdfFile file = NetcdfFile.openInMemory(“hdf4”, value.get());
Group dataGroup = (file.findGroup(“MOD_Grid_monthly_1km_VI”)).findGroup(“Data_Fields”);
// 读取到 1_km_monthly_red_reflectance 的变量
Variable redVar = dataGroup.findVariable(“1_km_monthly_red_reflectance”);
short[][] data = new short[1200][1200];
if(dataGroup != null){
ArrayShort.D2 dataArray;
// 读取 redVar 中的影像数据
dataArray = (ArrayShort.D2) redVar.read();
List<Dimension> dimList = file.getDimensions();
// 获取影像的 y 方向像元个数
Dimension ydim = dimList.get(0);
// 获取影像的 x 方向像元个数
Dimension xdim = dimList.get(1);
// 遍历整个影像,读取出像元的值
for(int i=0;i<xdim.getLength();i++){
for(int j=0;j<ydim.getLength();j++){
data[i][j] = dataArray.get(i, j);
}
}
}
System.out.print(file.getDetailInfo());
}
}
注意程序中的 NetcdfFile.openInMemory 方法,该静态方法支持从 byte[] 中构造 HDF 文件,从而实现了 HDF 文件的反序列化操作。下面贴出主程序的示例代码:
package example;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import example.WholeFileInputFormat;
public class ReadMain {
public boolean runJob(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
// conf.set(“mapred.job.tracker”, Utils.JOBTRACKER);
String rootPath= “/opt/hadoop-2.3.0/etc/hadoop”;
//String rootPath=”/opt/hadoop-2.3.0/etc/hadoop/”;
conf.addResource(new Path(rootPath+”yarn-site.xml”));
conf.addResource(new Path(rootPath+”core-site.xml”));
conf.addResource(new Path(rootPath+”hdfs-site.xml”));
conf.addResource(new Path(rootPath+”mapred-site.xml”));
Job job = new Job(conf);
job.setJobName(“Job name:” + args[0]);
job.setJarByClass(ReadMain.class);
job.setMapperClass(ReadMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(BytesWritable.class);
job.setInputFormatClass(WholeFileInputFormat.class);
job.setOutputFormatClass(NullOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[1]));
FileOutputFormat.setOutputPath(job, new Path(args[2]));
boolean flag = job.waitForCompletion(true);
return flag;
}
public static void main(String[] args) throws ClassNotFoundException,
IOException, InterruptedException {
String[] inputPaths = new String[] {“normalizeJob”,
“hdfs://192.168.168.101:9000/user/hduser/hdf/MOD13A3.A2005274.h00v10.005.2008079143041.hdf”,
“hdfs://192.168.168.101:9000/user/hduser/test/” };
ReadMain test = new ReadMain();
test.runJob(inputPaths);
}
}
关于 MapReduce 主程序有几点值得说明一下:
1、MapReduce 数据的输入格式为 WholeFileInputFormat.class,即不对数据进行切分。关于该格式,可以参考另外一篇文章:如何通过 Java 程序提交 Yarn 的计算任务(http://www.linuxidc.com/Linux/2014-11/109360.htm),这里不再赘述。
2、本人用的是 Yarn2.3.0 来执行计算任务,如果用老版本的 hadoop,如 1.2.0,则把以上主程序中的 conf.addResource 部分的代码删掉即可。
3、以上 MapReduce 程序中,只用到了 Map 函数,未设置 Reduce 函数。
4、以上程序用到的为 HDF4 格式的数据,按理说,HDF5 格式的数据应该也是支持的。
更多详情见请继续阅读下一页的精彩内容 :http://www.linuxidc.com/Linux/2014-11/109359p2.htm
3、HDF 数据的格式
由于 HDF 数据高度结构化,因此在 netcdf 库的使用中,需要使用类似于 ” 标签 ” 的方式来访问 HDF 中的具体数据。下面贴出 netcdf 中读出来的 HDF 数据的具体格式信息(即使用 file.getDetailInfo() 函数,打印出来的信息):
注意,ReadMapper 函数中出现的类似于“MOD_Grid_monthly_1km_VI”、”Data_Fields” 等信息,即根据以下 HDF 数据的格式信息得到的。
netcdf D:/2005-274/MOD13A3.A2005274.h00v08.005.2008079142757.hdf {
variables:
char StructMetadata.0(32000);
char CoreMetadata.0(40874);
char ArchiveMetadata.0(6530);
group: MOD_Grid_monthly_1km_VI {
variables:
short _HDFEOS_CRS;
:Projection = “GCTP_SNSOID”;
:UpperLeftPointMtrs = -2.0015109354E7, 1111950.519667; // double
:LowerRightMtrs = -1.8903158834333E7, -0.0; // double
:ProjParams = 6371007.181, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0; // double
:SphereCode = “-1”;
group: Data_Fields {
dimensions:
YDim = 1200;
XDim = 1200;
variables:
short 1_km_monthly_NDVI(YDim=1200, XDim=1200);
:long_name = “1 km monthly NDVI”;
:units = “NDVI”;
:valid_range = -2000S, 10000S; // short
:_FillValue = -3000S; // short
:scale_factor = 10000.0; // double
:scale_factor_err = 0.0; // double
:add_offset = 0.0; // double
:add_offset_err = 0.0; // double
:calibrated_nt = 5; // int
short 1_km_monthly_EVI(YDim=1200, XDim=1200);
:long_name = “1 km monthly EVI”;
:units = “EVI”;
:valid_range = -2000S, 10000S; // short
:_FillValue = -3000S; // short
:scale_factor = 10000.0; // double
:scale_factor_err = 0.0; // double
:add_offset = 0.0; // double
:add_offset_err = 0.0; // double
:calibrated_nt = 5; // int
short 1_km_monthly_VI_Quality(YDim=1200, XDim=1200);
:_Unsigned = “true”;
:long_name = “1 km monthly VI Quality”;
:units = “bit field”;
:valid_range = 0S, -2S; // short
:_FillValue = -1S; // short
:Legend = “\n\t Bit Fields Description (Right to Left): \n\t[0-1] : MODLAND_QA [2 bit range]\n\t\t 00: VI produced, good quality \n\t\t 01: VI produced, but check other QA \n\t\t 10: Pixel produced, but most probably cloudy \n\t\t 11: Pixel not produced due to other reasons than clouds \n\t[2-5] : VI usefulness [4 bit range] \n\t\t 0000: Highest quality \n\t\t 0001: Lower quality \n\t\t 0010..1010: Decreasing quality \n\t\t 1100: Lowest quality \n\t\t 1101: Quality so low that it is not useful \n\t\t 1110: L1B data faulty \n\t\t 1111: Not useful for any other reason/not processed \n\t[6-7] : Aerosol quantity [2 bit range] \n\t\t 00: Climatology \n\t\t 01: Low \n\t\t 10: Average \n\t\t 11: High (11) \n\t[8] : Adjacent cloud detected; [1 bit range] \n\t\t 1: Yes \n\t\t 0: No \n\t[9] : Atmosphere BRDF correction performed [1 bit range] \n\t\t 1: Yes \n\t\t 0: No \n\t[10] : Mixed clouds [1 bit range] \n\t\t 1: Yes \n\t\t 0: No \n\t[11-13] : Land/Water Flag [3 bit range] \n\t\t 000: Shallow ocean \n\t\t 001: Land (Nothing else but land) \n\t\t 010: Ocean coastlines and lake shorelines \n\t\t 011: Shallow inland water \n\t\t 100: Ephemeral water \n\t\t 101: Deep inland water \n\t\t 110: Moderate or continental ocean \n\t\t 111: Deep ocean \n\t[14] : Possible snow/ice [1 bit range] \n\t\t 1: Yes \n\t\t 0: No \n\t[15] : Possible shadow [1 bit range] \n\t\t 1: Yes \n\t\t 0: No \n”;
short 1_km_monthly_red_reflectance(YDim=1200, XDim=1200);
:long_name = “1 km monthly red reflectance”;
:units = “reflectance”;
:valid_range = 0S, 10000S; // short
:_FillValue = -1000S; // short
:scale_factor = 10000.0; // double
:scale_factor_err = 0.0; // double
:add_offset = 0.0; // double
:add_offset_err = 0.0; // double
:calibrated_nt = 5; // int
short 1_km_monthly_NIR_reflectance(YDim=1200, XDim=1200);
:long_name = “1 km monthly NIR reflectance”;
:units = “reflectance”;
:valid_range = 0S, 10000S; // short
:_FillValue = -1000S; // short
:scale_factor = 10000.0; // double
:scale_factor_err = 0.0; // double
:add_offset = 0.0; // double
:add_offset_err = 0.0; // double
:calibrated_nt = 5; // int
short 1_km_monthly_blue_reflectance(YDim=1200, XDim=1200);
:long_name = “1 km monthly blue reflectance”;
:units = “reflectance”;
:valid_range = 0S, 10000S; // short
:_FillValue = -1000S; // short
:scale_factor = 10000.0; // double
:scale_factor_err = 0.0; // double
:add_offset = 0.0; // double
:add_offset_err = 0.0; // double
:calibrated_nt = 5; // int
short 1_km_monthly_MIR_reflectance(YDim=1200, XDim=1200);
:long_name = “1 km monthly MIR reflectance”;
:units = “reflectance”;
:valid_range = 0S, 10000S; // short
:_FillValue = -1000S; // short
:Legend = “\n\t The MIR band saved in the VI product is MODIS band 7 \n\t\t Bandwidth : 2105-2155 nm \n\t\t Band center: 2130 nm \n”;
:scale_factor = 10000.0; // double
:scale_factor_err = 0.0; // double
:add_offset = 0.0; // double
:add_offset_err = 0.0; // double
:calibrated_nt = 5; // int
short 1_km_monthly_view_zenith_angle(YDim=1200, XDim=1200);
:long_name = “1 km monthly view zenith angle”;
:units = “degrees”;
:valid_range = -9000S, 9000S; // short
:_FillValue = -10000S; // short
:scale_factor = 100.0; // double
:scale_factor_err = 0.0; // double
:add_offset = 0.0; // double
:add_offset_err = 0.0; // double
:calibrated_nt = 5; // int
short 1_km_monthly_sun_zenith_angle(YDim=1200, XDim=1200);
:long_name = “1 km monthly sun zenith angle”;
:units = “degrees”;
:valid_range = -9000S, 9000S; // short
:_FillValue = -10000S; // short
:scale_factor = 100.0; // double
:scale_factor_err = 0.0; // double
:add_offset = 0.0; // double
:add_offset_err = 0.0; // double
:calibrated_nt = 5; // int
short 1_km_monthly_relative_azimuth_angle(YDim=1200, XDim=1200);
:long_name = “1 km monthly relative azimuth angle”;
:units = “degrees”;
:valid_range = -3600S, 3600S; // short
:_FillValue = -4000S; // short
:scale_factor = 10.0; // double
:scale_factor_err = 0.0; // double
:add_offset = 0.0; // double
:add_offset_err = 0.0; // double
:calibrated_nt = 5; // int
byte 1_km_monthly_pixel_raliability(YDim=1200, XDim=1200);
:long_name = “1 km monthly pixel raliability”;
:units = “rank”;
:valid_range = 0B, 3B; // byte
:_FillValue = -1B; // byte
:Legend = “\n\t Rank Keys: \n\t\t[-1]: Fill/No Data-Not Processed. \n\t\t [0]: Good data – Use with confidence \n\t\t [1]: Marginal data – Useful, but look at other QA information \n\t\t [2]: Snow/Ice – Target covered with snow/ice\n\t\t [3]: Cloudy – Target not visible, covered with cloud \n”;
}
}
// global attributes:
:HDFEOSVersion = “HDFEOS_V2.9”;
:_History = “Direct read of HDF4 file through CDM library; HDF-EOS StructMetadata information was read”;
:HDF4_Version = “4.2.1 (NCSA HDF Version 4.2 Release 1-post3, January 27, 2006)”;
:featureType = “GRID”;
}
更多 Hadoop 相关信息见 Hadoop 专题页面 http://www.linuxidc.com/topicnews.aspx?tid=13
1、前言
HDF 文件是遥感应用中一种常见的数据格式,由于其高度结构化的特点,笔者曾被如何使用 Hadoop 处理 HDF 文件这个问题困扰过相当长的一段时间。于是 Google 各种解决方案,但都没有找到一种理想的处理办法。也曾参考过 HDFGroup 官方发的一篇帖子(网址在这里),里面提供了使用 Hadoop 针对大、中、小 HDF 文件的处理思路。虽然根据他提供的解决办法,按图索骥,肯定能解决如何使用 Hadoop 处理 HDF 文件这个问题,但个人感觉方法偏复杂且需要对 HDF 的数据格式有较深的理解,实现起来不太容易。于是乎,笔者又继续寻找解决方案,终于发现了一种办法,下面将对该方法进行具体说明。
2、MapReduce 主程序
这里主要使用到了 netcdf 的库进行 hdf 数据流的反序列化工作(netcdf 库的下载地址)。与 HDF 官方提供的 Java 库不同,netcdf 仅利用 Java 进行 HDF 文件的读写操作,且这个库支持多种科学数据,包括 HDF4、HDF5 等多种格式。而 HDF 的官方 Java 库中,底层实际仍是用 C 进行 HDF 文件的操作。
CentOS 安装和配置 Hadoop2.2.0 http://www.linuxidc.com/Linux/2014-01/94685.htm
Ubuntu 13.04 上搭建 Hadoop 环境 http://www.linuxidc.com/Linux/2013-06/86106.htm
Ubuntu 12.10 +Hadoop 1.2.1 版本集群配置 http://www.linuxidc.com/Linux/2013-09/90600.htm
Ubuntu 上搭建 Hadoop 环境(单机模式 + 伪分布模式)http://www.linuxidc.com/Linux/2013-01/77681.htm
Ubuntu 下 Hadoop 环境的配置 http://www.linuxidc.com/Linux/2012-11/74539.htm
单机版搭建 Hadoop 环境图文教程详解 http://www.linuxidc.com/Linux/2012-02/53927.htm
搭建 Hadoop 环境(在 Winodws 环境下用虚拟机虚拟两个 Ubuntu 系统进行搭建)http://www.linuxidc.com/Linux/2011-12/48894.htm
下面贴出 MapReduce 的 Mapper 函数代码:
package example;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.net.URI;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import ucar.ma2.ArrayShort;
import ucar.nc2.Dimension;
import ucar.nc2.Group;
import ucar.nc2.NetcdfFile;
import ucar.nc2.Variable;
public class ReadMapper extends
Mapper<Text, BytesWritable, Text, BytesWritable> {
public void map(Text key, BytesWritable value, Context context)
throws IOException, InterruptedException {
String fileName = key.toString();
NetcdfFile file = NetcdfFile.openInMemory(“hdf4”, value.get());
Group dataGroup = (file.findGroup(“MOD_Grid_monthly_1km_VI”)).findGroup(“Data_Fields”);
// 读取到 1_km_monthly_red_reflectance 的变量
Variable redVar = dataGroup.findVariable(“1_km_monthly_red_reflectance”);
short[][] data = new short[1200][1200];
if(dataGroup != null){
ArrayShort.D2 dataArray;
// 读取 redVar 中的影像数据
dataArray = (ArrayShort.D2) redVar.read();
List<Dimension> dimList = file.getDimensions();
// 获取影像的 y 方向像元个数
Dimension ydim = dimList.get(0);
// 获取影像的 x 方向像元个数
Dimension xdim = dimList.get(1);
// 遍历整个影像,读取出像元的值
for(int i=0;i<xdim.getLength();i++){
for(int j=0;j<ydim.getLength();j++){
data[i][j] = dataArray.get(i, j);
}
}
}
System.out.print(file.getDetailInfo());
}
}
注意程序中的 NetcdfFile.openInMemory 方法,该静态方法支持从 byte[] 中构造 HDF 文件,从而实现了 HDF 文件的反序列化操作。下面贴出主程序的示例代码:
package example;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import example.WholeFileInputFormat;
public class ReadMain {
public boolean runJob(String[] args) throws IOException,
ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
// conf.set(“mapred.job.tracker”, Utils.JOBTRACKER);
String rootPath= “/opt/hadoop-2.3.0/etc/hadoop”;
//String rootPath=”/opt/hadoop-2.3.0/etc/hadoop/”;
conf.addResource(new Path(rootPath+”yarn-site.xml”));
conf.addResource(new Path(rootPath+”core-site.xml”));
conf.addResource(new Path(rootPath+”hdfs-site.xml”));
conf.addResource(new Path(rootPath+”mapred-site.xml”));
Job job = new Job(conf);
job.setJobName(“Job name:” + args[0]);
job.setJarByClass(ReadMain.class);
job.setMapperClass(ReadMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(BytesWritable.class);
job.setInputFormatClass(WholeFileInputFormat.class);
job.setOutputFormatClass(NullOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[1]));
FileOutputFormat.setOutputPath(job, new Path(args[2]));
boolean flag = job.waitForCompletion(true);
return flag;
}
public static void main(String[] args) throws ClassNotFoundException,
IOException, InterruptedException {
String[] inputPaths = new String[] {“normalizeJob”,
“hdfs://192.168.168.101:9000/user/hduser/hdf/MOD13A3.A2005274.h00v10.005.2008079143041.hdf”,
“hdfs://192.168.168.101:9000/user/hduser/test/” };
ReadMain test = new ReadMain();
test.runJob(inputPaths);
}
}
关于 MapReduce 主程序有几点值得说明一下:
1、MapReduce 数据的输入格式为 WholeFileInputFormat.class,即不对数据进行切分。关于该格式,可以参考另外一篇文章:如何通过 Java 程序提交 Yarn 的计算任务(http://www.linuxidc.com/Linux/2014-11/109360.htm),这里不再赘述。
2、本人用的是 Yarn2.3.0 来执行计算任务,如果用老版本的 hadoop,如 1.2.0,则把以上主程序中的 conf.addResource 部分的代码删掉即可。
3、以上 MapReduce 程序中,只用到了 Map 函数,未设置 Reduce 函数。
4、以上程序用到的为 HDF4 格式的数据,按理说,HDF5 格式的数据应该也是支持的。
更多详情见请继续阅读下一页的精彩内容 :http://www.linuxidc.com/Linux/2014-11/109359p2.htm