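1. Scenario
With the Hadoop + Spark environment set up, the next step is to submit a simple job to Spark and print its result. The setup process is described at http://www.linuxidc.com/Linux/2017-06/144926.htm
Since I am most familiar with Java, I use a Java WordCount to walk through the whole process: counting how many times each word occurs in a given text.
2. Environment test
Before getting to the example, I first want to test the environment set up earlier.
2.1 Testing the Hadoop environment
First create a file named wordcount.txt with the following content: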
Hello hadoop
hello spark
hello bigdata
yellow banana
red apple
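Then run the following commands:
hadoop fs -mkdir -p /Hadoop/Input (create the directory in HDFS)
hadoop fs -put wordcount.txt /Hadoop/Input (upload wordcount.txt to HDFS)
hadoop fs -ls /Hadoop/Input (list the uploaded file)
hadoop fs -text /Hadoop/Input/wordcount.txt (print the file's contents)
2.2 Testing the Spark environment
Here I use spark-shell to run a simple WordCount test on the wordcount.txt file uploaded to HDFS in the Hadoop test above.
First start spark-shell:
spark-shell
Then enter the following Scala statements directly: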
val file = sc.textFile("hdfs://Master:9000/Hadoop/Input/wordcount.txt")
val rdd = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
rdd.collect()
rdd.foreach(println)
To exit spark-shell, use:
:quit
That completes the environment test.
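To make the expected result of the spark-shell test easier to check, the same three stages (split lines into words, pair each word with 1, sum per word) can be mirrored on a single machine with plain Java streams. This is only a local sketch for comparison, not Spark code; the class name LocalWordCount and the method count are made up for illustration, and the input lines are the contents of wordcount.txt.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class LocalWordCount {
    // Hypothetical helper: counts words with the same steps as the spark-shell pipeline.
    static Map<String, Integer> count(List<String> lines) {
        return lines.stream()
                // flatMap: split every line on a single space
                .flatMap(line -> Arrays.stream(line.split(" ")))
                // map each word to 1 and merge equal words by summing (like reduceByKey(_ + _));
                // a TreeMap is used only so that the printed order is stable
                .collect(Collectors.toMap(word -> word, word -> 1, Integer::sum, TreeMap::new));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
                "Hello hadoop", "hello spark", "hello bigdata", "yellow banana", "red apple");
        count(lines).forEach((word, n) -> System.out.println(word + ":" + n));
    }
}
```

Because the comparison is case-sensitive, "Hello" and "hello" are counted as different words: "hello" gets 2 and every other word, including "Hello", gets 1.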
3. Word count in Java
package com.example.spark;

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

public final class WordCount {
    // Words are separated by a single space.
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setAppName("kevin's first spark app");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // args[0] is the path of the input text file.
        JavaRDD<String> lines = sc.textFile(args[0]).cache();
        // Split each line into words.
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;
            public Iterator<String> call(String s) {
                return Arrays.asList(SPACE.split(s)).iterator();
            }
        });
        // Pair each word with an initial count of 1.
        JavaPairRDD<String, Integer> ones = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        // Sum the counts of identical words.
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });
        // Bring the results back to the driver and print them.
        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<?, ?> tuple : output) {
            System.out.println(tuple._1() + ":" + tuple._2());
        }
        sc.close();
    }
}
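The delimiter passed to Pattern.compile decides what WordCount treats as a "word", so it is worth checking on its own before running the job. The following stand-alone sketch (the class SplitPatternDemo and its split helper are illustrative names, not part of the project) compares a single-space pattern with an empty pattern on Java 8 or later:

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class SplitPatternDemo {
    // Hypothetical helper: splits a line with the given regular expression.
    static List<String> split(String regex, String line) {
        return Arrays.asList(Pattern.compile(regex).split(line));
    }

    public static void main(String[] args) {
        String line = "hello spark";
        // A single-space pattern yields the two words.
        System.out.println(split(" ", line));
        // An empty pattern matches between every pair of characters,
        // so each character (including the space) becomes its own token.
        System.out.println(split("", line));
    }
}
```

With an empty pattern the job would therefore count characters rather than words, which is why the delimiter must be a single space for this example.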
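4. Submitting the job
Package the Java word count above into a jar, spark-example-0.0.1-SNAPSHOT.jar, and upload it to the Master node; I put it in the /opt directory. This article submits the job to Spark in two ways: first with the spark-submit command, then from a Java web application.
4.1 Submitting with the spark-submit command
spark-submit --master spark://114.55.246.88:7077 --class com.example.spark.WordCount /opt/spark-example-0.0.1-SNAPSHOT.jar hdfs://Master:9000/Hadoop/Input/wordcount.txt
4.2 Submitting from a Java web application
I built the Java web application with Spring Boot. The implementation is as follows:
1) Create a new Maven project named spark-submit.
2) The contents of pom.xml. Note that the Spark dependency artifacts must match your Scala version: in spark-core_2.11, the trailing 2.11 is the Scala version you have installed.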
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.4.1.RELEASE</version>
  </parent>
  <artifactId>spark-submit</artifactId>
  <description>spark-submit</description>
  <properties>
    <start-class>com.example.spark.SparkSubmitApplication</start-class>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <java.version>1.8</java.version>
    <commons.version>3.4</commons.version>
    <org.apache.spark-version>2.1.0</org.apache.spark-version>
  </properties>
  <dependencies>
    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>${commons.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.tomcat.embed</groupId>
      <artifactId>tomcat-embed-jasper</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.jayway.jsonpath</groupId>
      <artifactId>json-path</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
      <exclusions>
        <exclusion>
          <artifactId>spring-boot-starter-tomcat</artifactId>
          <groupId>org.springframework.boot</groupId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-jetty</artifactId>
      <exclusions>
        <exclusion>
          <groupId>org.eclipse.jetty.websocket</groupId>
          <artifactId>*</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-jetty</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>jstl</artifactId>
    </dependency>
    <dependency>
      <groupId>org.eclipse.jetty</groupId>
      <artifactId>apache-jsp</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-solr</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>jstl</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>${org.apache.spark-version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${org.apache.spark-version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>${org.apache.spark-version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${org.apache.spark-version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.7.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.11</artifactId>
      <version>1.6.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-graphx_2.11</artifactId>
      <version>${org.apache.spark-version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-assembly-plugin</artifactId>
      <version>3.0.0</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-core</artifactId>
      <version>2.6.5</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.6.5</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.6.5</version>
    </dependency>
  </dependencies>
  <packaging>war</packaging>
  <repositories>
    <repository>
      <id>spring-snapshots</id>
      <name>Spring Snapshots</name>
      <url>https://repo.spring.io/snapshot</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
    <repository>
      <id>spring-milestones</id>
      <name>Spring Milestones</name>
      <url>https://repo.spring.io/milestone</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
    <repository>
      <id>maven2</id>
      <url>http://repo1.maven.org/maven2/</url>
    </repository>
  </repositories>
  <pluginRepositories>
    <pluginRepository>
      <id>spring-snapshots</id>
      <name>Spring Snapshots</name>
      <url>https://repo.spring.io/snapshot</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </pluginRepository>
    <pluginRepository>
      <id>spring-milestones</id>
      <name>Spring Milestones</name>
      <url>https://repo.spring.io/milestone</url>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </pluginRepository>
  </pluginRepositories>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-war-plugin</artifactId>
        <configuration>
          <warSourceDirectory>src/main/webapp</warSourceDirectory>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.mortbay.jetty</groupId>
        <artifactId>jetty-maven-plugin</artifactId>
        <configuration>
          <systemProperties>
            <systemProperty>
              <name>spring.profiles.active</name>
              <value>development</value>
            </systemProperty>
            <systemProperty>
              <name>org.eclipse.jetty.server.Request.maxFormContentSize</name>
              <!-- -1 means no limit -->
              <value>600000</value>
            </systemProperty>
          </systemProperties>
          <useTestClasspath>true</useTestClasspath>
          <webAppConfig>
            <contextPath>/</contextPath>
          </webAppConfig>
          <connectors>
            <connector implementation="org.eclipse.jetty.server.nio.SelectChannelConnector">
              <port>7080</port>
            </connector>
          </connectors>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>
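The rule that every Spark artifact must carry the same Scala suffix can be checked mechanically. The sketch below is a stand-alone illustration only: the class ScalaSuffixCheck and the method scalaVersions are invented names, and the artifact IDs are typed in by hand rather than read from pom.xml.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ScalaSuffixCheck {
    // Matches a trailing Scala binary version such as "_2.11" in "spark-core_2.11".
    private static final Pattern SUFFIX = Pattern.compile("_(\\d+\\.\\d+)$");

    // Hypothetical helper: collects the distinct Scala suffixes found in the artifact IDs.
    static Set<String> scalaVersions(List<String> artifactIds) {
        Set<String> versions = new TreeSet<>();
        for (String id : artifactIds) {
            Matcher m = SUFFIX.matcher(id);
            if (m.find()) {
                versions.add(m.group(1));
            }
        }
        return versions;
    }

    public static void main(String[] args) {
        // Artifact IDs copied by hand from the pom; hadoop-client has no Scala suffix and is ignored.
        List<String> ids = Arrays.asList("spark-core_2.11", "spark-sql_2.11", "spark-hive_2.11",
                "spark-streaming_2.11", "spark-graphx_2.11", "hadoop-client");
        Set<String> versions = scalaVersions(ids);
        System.out.println(versions.size() == 1 ? "consistent: " + versions : "mixed: " + versions);
    }
}
```

If more than one suffix shows up, the dependencies mix Scala versions and should be aligned before building.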
3) SubmitJobToSpark.java
package com.example.spark;

import org.apache.spark.deploy.SparkSubmit;

/**
 * @author kevin
 */
public class SubmitJobToSpark {
    public static void submitJob() {
        // Same arguments as the spark-submit command line in section 4.1, plus --name.
        String[] args = new String[] {
            "--master", "spark://114.55.246.88:7077",
            "--name", "test java submit job to spark",
            "--class", "com.example.spark.WordCount",
            "/opt/spark-example-0.0.1-SNAPSHOT.jar",
            "hdfs://Master:9000/Hadoop/Input/wordcount.txt" };
        SparkSubmit.main(args);
    }
}
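SubmitJobToSpark calls SparkSubmit.main inside the web application's own JVM. One possible variation, shown here only as a sketch and not as the approach this article uses, is to assemble the same arguments as a command line and start spark-submit as a child process. The class SparkSubmitCommand and its build method are hypothetical, and actually launching the process assumes that spark-submit is on the PATH of the user running the web server.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SparkSubmitCommand {
    // Hypothetical helper: options come first, then the application jar,
    // then the application's own arguments.
    static List<String> build(String master, String name, String mainClass, String jar, String... appArgs) {
        List<String> cmd = new ArrayList<>(Arrays.asList(
                "spark-submit", "--master", master, "--name", name, "--class", mainClass, jar));
        cmd.addAll(Arrays.asList(appArgs));
        return cmd;
    }

    public static void main(String[] args) {
        List<String> cmd = build("spark://114.55.246.88:7077", "test java submit job to spark",
                "com.example.spark.WordCount", "/opt/spark-example-0.0.1-SNAPSHOT.jar",
                "hdfs://Master:9000/Hadoop/Input/wordcount.txt");
        // Print the command instead of running it, so the sketch works without a cluster.
        System.out.println(String.join(" ", cmd));
        // To launch it for real (assumption: spark-submit is on the PATH):
        // new ProcessBuilder(cmd).inheritIO().start().waitFor();
    }
}
```

Running the job in a separate process keeps its output and exit status apart from the web application, at the cost of depending on a local Spark installation.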
4) SparkController.java
package com.example.spark.web.controller;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import com.example.spark.SubmitJobToSpark;

// The annotations below were lost from the listing; they are reconstructed from the
// imports and from the request URL /spark/sparkSubmit, so treat the details as assumptions.
@Controller
@RequestMapping("spark")
public class SparkController {
    private Logger logger = LoggerFactory.getLogger(SparkController.class);

    @RequestMapping(value = "sparkSubmit", method = { RequestMethod.GET, RequestMethod.POST })
    @ResponseBody
    public String sparkSubmit(HttpServletRequest request, HttpServletResponse response) {
        logger.info("start submit spark task...");
        SubmitJobToSpark.submitJob();
        return "hello";
    }
}
5) Package the spark-submit project as a war, deploy it to Tomcat on the Master node, and request the following URL:
http://114.55.246.88:9090/spark-submit/spark/sparkSubmit
The computed result appears in the Tomcat log.
Permanent link to this article: http://www.linuxidc.com/Linux/2017-06/144928.htm
