I previously wrote an article analyzing the algorithm for implementing matrix multiplication with MapReduce: http://www.linuxidc.com/Linux/2014-09/106646.htm
To give readers a more concrete view of how the program executes, this article provides the implementation code for reference.
Development environment:
- java version "1.7.0_40"
- Eclipse Kepler
- Windows 7 x64
- Ubuntu 12.04 LTS
- Hadoop 2.2.0
- VMware 9.0.0 build-812388
Input data:
Matrix A is stored at: hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixA/matrixa
Matrix A contents:
3 4 6
4 0 8
Matrix B is stored at: hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixB/matrixb
Matrix B contents:
2 3
3 0
4 1
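For reference, the expected result is C = A × B, a 2×2 matrix. Working one cell by hand: c[0][0] = 3×2 + 4×3 + 6×4 = 42. Since the reducer writes each cell as a key/value pair (cell index, then value, tab-separated by the default TextOutputFormat), the matrixC output should read:
0,0 42
0,1 15
1,0 40
1,1 20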
Implementation code:
There are three classes in total:
- the driver class MMDriver
- the Mapper class MMMapper
- the Reducer class MMReducer
Feel free to merge them into a single class if that suits your habits.
MMDriver.java
package dataguru.matrixmultiply;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MMDriver {

    public static void main(String[] args) throws Exception {

        // set configuration
        Configuration conf = new Configuration();

        // create job
        Job job = Job.getInstance(conf, "MatrixMultiply");
        job.setJarByClass(dataguru.matrixmultiply.MMDriver.class);

        // specify Mapper & Reducer
        job.setMapperClass(dataguru.matrixmultiply.MMMapper.class);
        job.setReducerClass(dataguru.matrixmultiply.MMReducer.class);

        // specify output types of mapper and reducer
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // specify input and output DIRECTORIES
        Path inPathA = new Path("hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixA");
        Path inPathB = new Path("hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixB");
        Path outPath = new Path("hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixC");
        FileInputFormat.addInputPath(job, inPathA);
        FileInputFormat.addInputPath(job, inPathB);
        FileOutputFormat.setOutputPath(job, outPath);

        // delete the output directory if it already exists;
        // do NOT close the FileSystem here -- it is a cached, shared
        // instance, and closing it would break the job submission below
        try {
            FileSystem hdfs = outPath.getFileSystem(conf);
            if (hdfs.exists(outPath))
                hdfs.delete(outPath, true); // recursive delete
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }

        // run the job
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
MMMapper.java
package dataguru.matrixmultiply;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class MMMapper extends Mapper<Object, Text, Text, Text> {

    private String tag;   // which matrix the current split comes from
    private int crow = 2; // number of rows of matrix A
    private int ccol = 2; // number of columns of matrix B
    // NOTE: these static counters assume each matrix file is read by a
    // single map task, line by line in order
    private static int arow = 0; // current row of A
    private static int brow = 0; // current row of B

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        // use the parent directory name of the input split as the tag
        // ("matrixA" or "matrixB")
        FileSplit fs = (FileSplit) context.getInputSplit();
        tag = fs.getPath().getParent().getName();
    }

    /**
     * The input data consist of two matrix files.
     */
    public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException {
        StringTokenizer str = new StringTokenizer(value.toString());
        if ("matrixA".equals(tag)) { // left matrix; output key: x,y
            int col = 0;
            while (str.hasMoreTokens()) {
                String item = str.nextToken(); // element at (arow, col)
                // a[arow][col] contributes to every cell in row arow of C
                for (int i = 0; i < ccol; i++) {
                    Text outkey = new Text(arow + "," + i);
                    Text outvalue = new Text("a," + col + "," + item);
                    context.write(outkey, outvalue);
                    System.out.println(outkey + " | " + outvalue);
                }
                col++;
            }
            arow++;
        } else if ("matrixB".equals(tag)) {
            int col = 0;
            while (str.hasMoreTokens()) {
                String item = str.nextToken(); // element at (brow, col)
                // b[brow][col] contributes to every cell in column col of C
                for (int i = 0; i < crow; i++) {
                    Text outkey = new Text(i + "," + col);
                    Text outvalue = new Text("b," + brow + "," + item);
                    context.write(outkey, outvalue);
                    System.out.println(outkey + " | " + outvalue);
                }
                col++;
            }
            brow++;
        }
    }
}
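To see what the mapper emits, trace the first element of each matrix with the sample input above (derived directly from the code):
a[0][0] = 3 is needed by every cell in row 0 of C, so the mapper emits ("0,0", "a,0,3") and ("0,1", "a,0,3");
b[0][0] = 2 is needed by every cell in column 0 of C, so the mapper emits ("0,0", "b,0,2") and ("1,0", "b,0,2").
Each value encodes the source matrix and the position along the shared dimension, which is exactly what the reducer needs to pair up factors.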
MMReducer.java
package dataguru.matrixmultiply;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.StringTokenizer;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class MMReducer extends Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {

        Map<String, String> matrixa = new HashMap<String, String>();
        Map<String, String> matrixb = new HashMap<String, String>();

        // each value looks like "a,0,4" or "b,0,2": source matrix,
        // index along the shared dimension, element value
        for (Text val : values) {
            StringTokenizer str = new StringTokenizer(val.toString(), ",");
            String sourceMatrix = str.nextToken();
            if ("a".equals(sourceMatrix)) {
                matrixa.put(str.nextToken(), str.nextToken()); // (0,4)
            }
            if ("b".equals(sourceMatrix)) {
                matrixb.put(str.nextToken(), str.nextToken()); // (0,2)
            }
        }

        // dot product of a row of A with a column of B gives this cell of C
        int result = 0;
        Iterator<String> iter = matrixa.keySet().iterator();
        while (iter.hasNext()) {
            String mapkey = iter.next();
            result += Integer.parseInt(matrixa.get(mapkey))
                    * Integer.parseInt(matrixb.get(mapkey));
        }

        context.write(key, new Text(String.valueOf(result)));
    }
}
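Tracing one reduce call with the sample data: for key "0,0" the grouped values are a,0,3 a,1,4 a,2,6 (row 0 of A) and b,0,2 b,1,3 b,2,4 (column 0 of B). The two HashMaps pair entries by the shared index, giving 3×2 + 4×3 + 6×4 = 42, which is written out as cell (0,0) of C.
To run the job, package the three classes into a jar (the name matrixmultiply.jar below is just an example) and submit it, then inspect the result:
hadoop jar matrixmultiply.jar dataguru.matrixmultiply.MMDriver
hadoop fs -cat hdfs://singlehadoop:8020/wordspace/dataguru/hadoopdev/week09/matrixmultiply/matrixC/part-r-00000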