共计 3227 个字符,预计需要花费 9 分钟才能阅读完成。
环境:Spark-1.5.0 HBase-1.0.0。
场景:HBase 中按天分表存数据,要求将任意时间段的数据合并成一个 RDD 以做后续计算。
尝试 1: 寻找一次读取多个表的 API,找到最接近的是一个叫 MultiTableInputFormat 的东西,它在 MapReduce 中使用良好,
但没有找到用于 RDD 读 HBase 的方法。
尝试 2: 每个表生成一个 RDD,再用 union 合并,代码逻辑如下:
var totalRDD = xxx // 读取第一张表
for {// 循环读表并合并到 totalRDD
val sRDD = xxx
totalRDD.union(sRDD)
}
代码放到集群上执行,totalRDD 并不是正确的 union 结果,用 var 还真是不行。
尝试 3: 思路类似 2,但使用 SparkContext.union 来一次合并多个 RDD,代码逻辑如下:
var rddSet: xxx = Set() // 创建 RDD 列表
dateSet.foreach(date => { // 将所有表的 RDD 放入列表中
val sRDD = xxx
rddSet += sRDD
}
val totalRDD = sc.union(rddSet.toSeq) // 合并列表中的所有 RDD
完整代码如下:
import Java.text.SimpleDateFormat
import org.apache.Hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import scala.collection.mutable.Set
/**
* 时间处理类
*/
object Htime {
/**
* 根据起止日期获取日期列表
* 例如起止时间为 20160118,20160120, 那么日期列表为(20160118,20160119,20160120)
*
* @param sDate 开始日期
* @param eDate 结束日期
* @return 日期列表
*/
def getDateSet(sDate:String, eDate:String): Set[String] = {
// 定义要生成的日期列表
var dateSet: Set[String] = Set()
// 定义日期格式
val sdf = new SimpleDateFormat(“yyyyMMdd”)
// 按照上边定义的日期格式将起止时间转化成毫秒数
val sDate_ms = sdf.parse(sDate).getTime
val eDate_ms = sdf.parse(eDate).getTime
// 计算一天的毫秒数用于后续迭代
val day_ms = 24*60*60*1000
// 循环生成日期列表
var tm = sDate_ms
while (tm <= eDate_ms) {
val dateStr = sdf.format(tm)
dateSet += dateStr
tm = tm + day_ms
}
// 日期列表作为返回
dateSet
}
}
/**
* 从 HBase 中读取行为数据计算人群分类
*/
object Classify {
/**
* @param args 命令行参数, 第一个参数为行为数据开始日期, 第二个为结束日期, 例如 20160118
*/
def main(args: Array[String]) {
// 命令行参数个数必须为 2
if (args.length != 2) {
System.err.println(“ 参数个数错误 ”)
System.err.println(“Usage: Classify < 开始日期 > < 结束日期 >”)
System.exit(1)
}
// 获取命令行参数中的行为数据起止日期
val startDate = args(0)
val endDate = args(1)
// 根据起止日志获取日期列表
// 例如起止时间为 20160118,20160120, 那么日期列表为(20160118,20160119,20160120)
val dateSet = Htime.getDateSet(startDate, endDate)
// Spark 上下文
val sparkConf = new SparkConf().setAppName(“Classify”)
val sc = new SparkContext(sparkConf)
// 初始化 HBase 配置
val conf = HBaseConfiguration.create()
// 按照日期列表读出多个 RDD 存在一个 Set 中, 再用 SparkContext.union()合并成一个 RDD
var rddSet: Set[RDD[(ImmutableBytesWritable, Result)] ] = Set()
dateSet.foreach(date => {
conf.set(TableInputFormat.INPUT_TABLE, “behaviour_test_” + date) // 设置表名
val bRdd: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
rddSet += bRdd
})
val behavRdd = sc.union(rddSet.toSeq)
behavRdd.collect().foreach(println)
}
}
更多 Spark 相关教程见以下内容:
CentOS 7.0 下安装并配置 Spark http://www.linuxidc.com/Linux/2015-08/122284.htm
Spark1.0.0 部署指南 http://www.linuxidc.com/Linux/2014-07/104304.htm
CentOS 6.2(64 位)下安装 Spark0.8.0 详细记录 http://www.linuxidc.com/Linux/2014-06/102583.htm
Spark 简介及其在 Ubuntu 下的安装使用 http://www.linuxidc.com/Linux/2013-08/88606.htm
安装 Spark 集群(在 CentOS 上) http://www.linuxidc.com/Linux/2013-08/88599.htm
Hadoop vs Spark 性能对比 http://www.linuxidc.com/Linux/2013-08/88597.htm
Spark 安装与学习 http://www.linuxidc.com/Linux/2013-08/88596.htm
Spark 并行计算模型 http://www.linuxidc.com/Linux/2012-12/76490.htm
Spark 的详细介绍:请点这里
Spark 的下载地址:请点这里
本文永久更新链接地址:http://www.linuxidc.com/Linux/2016-01/127901.htm