共计 3438 个字符,预计需要花费 9 分钟才能阅读完成。
近日由于工作所需,需要使用到 Pig 来分析线上的搜索日志数据,本人本打算使用 hive 来分析的,但由于种种原因,没有用成,而 Pig(pig0.12-cdh)本人一直没有接触过,所以只能临阵磨枪了,花了两天时间,大致看完了 pig 官网的文档,在看文档期间,也是边实战边学习,这样以来,对 pig 的学习,会更加容易,当然本篇不是介绍如何快速学好一门框架或语言的文章,正如标题所示,本人打算介绍下如何在 Pig 中,使用用户自定义的 UDF 函数,关于学习经验,本人会在后面的文章里介绍。
一旦你学会了 UDF 的使用,就意味着,你可以以更加灵活的方式来使用 Pig,使它扩展一些为我们的业务场景定制的特殊功能,而这些功能,在通用的 pig 里是没有的,举个例子:
你从 HDFS 上读取的数据格式,如果使用默认的 PigStorage()来加载,存储可能只支持有限的数据编码和类型,如果我们定义了一种特殊的编码存储或序列化方式,那么当我们使用默认的 Pig 来加载的时候,就会发现加载不了,这时候我们的 UDF 就派上用场了,我们只需要自定义一个 LoadFunction 和一个 StoreFunction 就可以解决,这种问题。
本篇本人根据官方文档的例子,来实战一下,并在 Hadoop 集群上使用 Pig 测试通过:
我们先来看下定义一个 UDF 扩展类,需要几个步骤:
序号 | 步骤 | 说明 |
1 | 在 eclipse 里新建一个 java 工程,并导入 pig 的核心包 | java 项目 |
2 | 新建一个包,继承特定的接口或类,重写自定义部分 | 核心业务 |
3 | 编写完成后,使用 ant 打包成 jar | 编译时需要 pig 依赖,但不用把 pig 的 jar 包打入 UDF 中 |
4 | 把打包完成后的 jar 上传到 HDFS 上 | pig 运行时候需要加载使用 |
5 | 在 pig 脚本里,注册我们自定义的 udf 的 jar 包 | 注入运行时环境 |
6 | 编写我们的核心业务 pig 脚本运行 | 测试是否运行成功 |
项目工程截图如下:
核心代码如下:
package com.pigudf;
- import java.io.IOException;
- import org.apache.pig.EvalFunc;
- import org.apache.pig.data.Tuple;
- import org.apache.pig.impl.util.WrappedIOException;
- /**
- * 自定义 UDF 类, 对字符串转换大写
- * @author qindongliang
- * */
- public class MyUDF extends EvalFunc<String> {
- @Override
- public String exec(Tuple input) throws IOException {
- // 判断是否为 null 或空,就跳过
- if(input==null||input.size()==0){
- return null;
- }
- try{
- // 获取第一个元素
- String str=(String) input.get(0);
- // 转成大写返回
- return str.toUpperCase();
- }catch(Exception e){
- throw WrappedIOException.wrap(“Caught exception processing input row “,e);
- }
- }
- }
关于打包的 ant 脚本,散仙会在文末上传附件,下面看下造的一些测试数据(注意,文件一定要上传到 HDFS 上,除非你是 local 模式):
- grunt> cat s.txt
- zhang san,12
- Song,34
- long,34
- abC,12
- grunt>
我们在看下,操作文件和 jar 包是放在一起的:
- grunt> ls
- hdfs://dnode1:8020/tmp/udf/pudf.jar<r 3> 1295
- hdfs://dnode1:8020/tmp/udf/s.txt<r 3> 36
- grunt>
最后,我们看下 pig 脚本的定义:
- – 注册自定义的 jar 包
- REGISTER pudf.jar;
- – 加载测试文件的数据,逗号作为分隔符
- a = load ‘s.txt’ using PigStorage(‘,’);
- – 遍历数据,对 name 列转成大写
- b = foreach a generate com.pigudf.MyUDF((chararray)$0);
- – 启动 MapReduce 的 Job 进行数据分析
- dump b
最后,我们看下结果,只要过程不出现异常和任务失败,就证明我们的 udf 使用成功:
- Counters:
- Total records written : 4
- Total bytes written : 64
- Spillable Memory Manager spill count : 0
- Total bags proactively spilled: 0
- Total records proactively spilled: 0
- Job DAG:
- job_1419419533357_0147
- 2014-12-30 18:10:24,394 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher – Success!
- 2014-12-30 18:10:24,395 [main] INFO org.apache.hadoop.conf.Configuration.deprecation – fs.default.name is deprecated. Instead, use fs.defaultFS
- 2014-12-30 18:10:24,396 [main] INFO org.apache.pig.data.SchemaTupleBackend – Key [pig.schematuple] was not set… will not generate code.
- 2014-12-30 18:10:24,405 [main] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat – Total input paths to process : 1
- 2014-12-30 18:10:24,405 [main] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil – Total input paths to process : 1
- (ZHANG SAN,12)
- (SONG,34)
- (LONG,34)
- (ABC,12)
结果没问题,我们的 UDF 加载执行成功,如果我们还想将我们的输出结果直接写入到 HDFS 上,可以在 pig 脚本的末尾,去掉 dump 命令,加入
store e into ‘/tmp/dongliang/result/’; 将结果存储到 HDFS 上,当然我们可以自定义存储函数,将结果写入数据库,Lucene,Hbase 等关系型或一些 NOSQL 数据库里。
Pig 的安装与测试 http://www.linuxidc.com/Linux/2014-07/104039.htm
Pig 安装与配置教程 http://www.linuxidc.com/Linux/2013-04/82785.htm
Pig 安装部署及 MapReduce 模式下测试 http://www.linuxidc.com/Linux/2013-04/82786.htm
Pig 安装及本地模式测试, 体验 http://www.linuxidc.com/Linux/2013-04/82783.htm
Pig 的安装配置与基本使用 http://www.linuxidc.com/Linux/2013-02/79928.htm
Hadoop Pig 进阶语法 http://www.linuxidc.com/Linux/2013-02/79462.htm