此文章是vip文章,如何查看?  

1,点击链接获取密钥 http://nicethemes.cn/product/view29882.html

2,在下方输入文章查看密钥即可立即查看当前vip文章


Hadoop的数据分析引擎:Pig

  • 时间:
  • 浏览:
  • 来源:互联网
一、什么是Pig?安装和配置
1、最早由Yahoo开发,后来给Apache
2、支持语句PigLatin语句,类似SQL
3、翻译器:PigLatin语句 ----> MapReduce
                              Spark(从0.17开始支持)
4、安装和配置
    tar -zxvf pig-0.17.0.tar.gz -C ~/training/
设置环境变量
PIG_HOME=/root/training/pig-0.17.0
export PIG_HOME
PATH=$PIG_HOME/bin:$PATH
export PATH
5、两种模式
(1)本地模式:操作的是Linux的文件
pig -x local
日志: Connecting to hadoop file system at: file:///
(2)集群模式:链接到HDFS上
设置一个环境变量: PIG_CLASSPATH  ----> Hadoop的配置文件所在的目录
PIG_CLASSPATH=$HADOOP_HOME/etc/hadoop
export PIG_CLASSPATH
pig
日志:Connecting to hadoop file system at: hdfs://bigdata111:9000
二、Pig的常用命令:操作HDFS
ls、cd、cat、mkdir、pwd
copyFromLocal(上传)、copyToLocal(下载)
sh: 调用操作系统的命令
register、define -----> 部署pig的自定义函数的jar包
三、Pig的数据模型(重要) ----> 表结构
  Tuple: 行  Pig

             Apache Storm是实时计算引擎


四、使用PigLatin语句分析数据
1、需要启动Yarn的HistoryServer:记录所有执行过的任务
mr-jobhistory-daemon.sh start historyserver
URL:  http://ip:19888/jobhistory
2、常见的PigLatin语句(注意:PigLatin语句跟Spark中算子/方法非常像)
(*)load    加载数据到表(bag)
(*)foreach   相当于循环,对bag中每一个行tuple进行处理
(*)filter    过滤、相当于where
(*)group by  分组
(*)order by  排序
(*)join      链接
(*)generate  提取列
(*)union、intersect  集合运算
(*)输出:dump  屏幕
           store  保存到文件
   
注意:PigLatin有些会触发计算,有些不会
      类似Spark RDD 算子(方法):Transformation方法(算子) -----> 不会触发计算
                              Action方法(算子) ------> 会触发Spark的计算
3、使用PigLatin语句
7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30

(1)创建员工表  emp
     emp = load '/scott/emp.csv';
查看表结构
describe emp;
Schema for emp unknown
(2)创建表,指定schema(结构)
     emp = load '/scott/emp.csv' as(empno,ename,job,mgr,hiredate,sal,comm,deptno);
默认的列的类型:bytearray
默认的分隔符:制表符
 
最终版:
emp = load '/scott/emp.csv' using PigStorage(',') as(empno:int,ename:chararray,job:chararray,mgr:int,hiredate:chararray,sal:int,comm:int,deptno:int);

再创建部门表
10,ACCOUNTING,NEW YORK
dept = load '/scott/dept.csv' using PigStorage(',') as(deptno:int,dname:chararray,loc:chararray);

(3)查询员工信息:员工号 姓名 薪水
     SQL:  select empno,ename,sal from emp;
PL:   emp3 = foreach emp generate empno,ename,sal;    ----> 不会触发计算
       dump emp3;
   
(4)查询员工信息,按照月薪排序
SQL:  select * from emp order by sal;
PL:   emp4 = order emp by sal;
      dump emp4;
  
(5)分组:求每个部门的最高工资: 部门号  部门的最高工资
SQL:  select deptno,max(sal) from emp group by deptno;
PL:   第一步:先分组
emp51 = group emp by deptno;
查看emp51的表结构
emp51: {group: int,
emp: {(empno: int,ename: chararray,job: chararray,mgr: int,hiredate: chararray,sal: int,comm: int,deptno: int)}}

dump emp51;
数据
group         emp
(10,{(7934,MILLER,CLERK,7782,1982/1/23,1300,,10),
     (7839,KING,PRESIDENT,,1981/11/17,5000,,10),
(7782,CLARK,MANAGER,7839,1981/6/9,2450,,10)})
 
(20,{(7876,ADAMS,CLERK,7788,1987/5/23,1100,,20),
     (7788,SCOTT,ANALYST,7566,1987/4/19,3000,,20),
(7369,SMITH,CLERK,7902,1980/12/17,800,,20),
(7566,JONES,MANAGER,7839,1981/4/2,2975,,20),
(7902,FORD,ANALYST,7566,1981/12/3,3000,,20)})
 
(30,{(7844,TURNER,SALESMAN,7698,1981/9/8,1500,0,30),
     (7499,ALLEN,SALESMAN,7698,1981/2/20,1600,300,30),
(7698,BLAKE,MANAGER,7839,1981/5/1,2850,,30),
(7654,MARTIN,SALESMAN,7698,1981/9/28,1250,1400,30),
(7521,WARD,SALESMAN,7698,1981/2/22,1250,500,30),
(7900,JAMES,CLERK,7698,1981/12/3,950,,30)})
  第二步:求每个组(每个部门)工资的最大值
注意:MAX大写
        emp52 = foreach emp51 generate group,MAX(emp.sal)
(6)查询10号部门的员工
      SQL:  select * from emp where deptno=10;
  PL:   emp6 = filter emp by deptno==10;   注意:两个等号
(7)多表查询:部门名称、员工姓名
      SQL:  select d.dname,e.ename from emp e,dept d where e.deptno=d.deptno;
  PL:   emp71 = join dept by deptno,emp by deptno;
        emp72 = foreach emp71 generate dept::dname,emp::ename;
(8)集合运算:
      查询10和20号部门的员工信息
SQL:   select * from emp where deptno=10
        union
select * from emp where deptno=20;

注意:Oracle中,是否任意的集合都可以参与集合运算?
                           参与集合运算的各个集合,必须列数相同、且类型一致
                           select deptno,job,sum(sal) from emp group by deptno,job
                           union 
   select deptno,to_char(null),sum(sal) from emp group by deptno
   union
   select to_number(null),to_char(null),sum(sal) from emp;

PL:  emp10 = filter emp by deptno==10;
     emp20 = filter emp by deptno==20;
emp1020 = union emp10,emp20;
 
(9)执行WordCount
① 加载数据 
mydata = load '/input/data.txt' as (line:chararray);
② 将字符串分割成单词 
words = foreach mydata generate flatten(TOKENIZE(line)) as word;
③ 对单词进行分组 
grpd = group words by word; 
④ 统计每组中单词数量 
cntd = foreach grpd generate group,COUNT(words); 
⑤ 打印结果 
dump cntd;
注意:后面的操作依赖前面的操作,类似Spark的RDD(依赖关系)
五、Pig的自定义函数:3种  ---> 打成jar包
依赖的jar
/root/training/pig-0.14.0/pig-0.14.0-core-h2.jar
/root/training/pig-0.14.0/lib
/root/training/pig-0.14.0/lib/h2
/root/training/hadoop-2.4.1/share/hadoop/common
/root/training/hadoop-2.4.1/share/hadoop/common/lib

1、自定义的运算函数: 根据员工的薪水,判断薪水的级别
2、自定义的过滤函数: 查询薪水大于2000的员工
3、自定义的加载函数(最麻烦)
还需要MR的jar包
mydemo = load '/input/data.txt' using pig.MyLoadFunction();
注册jar包
   register /root/temp/pigfunc.jar
   define 创建别名
   define myloadfunc pig.MyLoadFunction;

本文链接http://element-ui.cn/news/show-577006.aspx