《Hive编程指南》读书笔记

Hive cli配置文件 ~/.hiverc

set hive.cli.print.current.db=true;             # 在cli里显示当前db名
set hive.cli.print.header=true                  # 输出表头
set hive.exec.mode.local.auto=true;             # 启动本地模式,尽量不启动map reduce
set hive.auto.convert.join=true;                # 启动map-side JOIN
set hive.mapjoin.smalltable.filesize=25000000;  # map-side JOIN表需要小于25M

Hive cli里可以直接执行dfs命令。比如 dfs -ls /;

Hive数据库所在目录由属性 hive.metastore.warehouse.dir 指定,数据库中的表会以子目录的形式存储。默认位置在 /user/hive/warehouse,当此时创建数据库financials,则Hive相应的创建目录/user/hive/warehouse/financials.db

4. HiveQL数据定义

查看表结构详细信息:

DESC FORMATTED <table_name>;

基本数据类型有:INT, FLOAT, DOUBLE, STRING, TIMESTAMP, BINARY

Hive新增了集合数据类型:STRUCT, ARRAY, MAP

ARRAY类型可以使用lateral view explode展开成类似普通的表来查询。

建表示例:

CREATE TABLE employees (
  name   STRING,
  salary FLOAT,
  subordinates ARRAY<STRING>,
  deductions   MAP<STRING, FLOAT>,
  address      STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
)
PARTITIONED BY (country STRING, state STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
COLLECTION ITEMS TERMINATED BY '\002'
MAP KEYS TERMINATED BY '\003'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

除了存储为TEXTFILE格式,也可以使用 SEQUENCEFILE、RCFILE和ORCFILE,后面几种使用二进制编码和压缩来优化IO性能。

【注】hive使用InputFormat对象将输入流分割为记录,使用OutputFormat对象来将记录格式化为输出流;再使用SerDe在读数据时将记录解析成列,在写数据时将列编码成记录。

CREATE TABLE php_svr_log
PARTITIONED BY (ds STRING, hr STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
TBLPROPERTIES ('avro.schema.url'='http://10.163.104.13/datawh/php_svr_log.avsc');

修改表的schema定义文件:

ALTER TABLE php_svr_log SET TBLPROPERTIES ('avro.schema.url'='hdfs://Tyrael/user/work/schema/php_svr_log.avsc');

使用STRICT模式,可以避免全分区查询: set hive.mapred.mode=strict

通常管理大规模数据会使用外部表:

CREATE EXTERNAL TABLE IF NOT EXISTS log_messages (
  hms  INT,
  severity STRING,
  server   STRING,
  process_id INT,
  message    STRING
)
PARTITIONED BY (year INT, month INT, day INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION "/user/work/log_messages";

对外部表可以单独增加分区及其对应的文件:

ALTER TABLE log_messages ADD IF NOT EXISTS
PARTITION (year=2012, month=1, day=2) LOCATION 'hdfs://data/log_messages/2012/01/02';

删除这个分区:

ALTER TABLE log_messages DROP IF EXISTS
PARTITION (year=2012, month=1, day=2);

或者可以利用MSCK命令直接修复表分区:

MSCK REPAIR TABLE log_messages;

显示Hive表和分区定义的语句:

SHOW TABLE EXTENDED LIKE table_name PARTITION (dt=20131023);
SHOW CREATE TABLE table_name;
SHOW PARTITIONS table_name;
DESCRIBE EXTENDED table_name;
show table extended like 'tbl_name' partition (dt='20131023');

5. HiveQL数据管理

从本地载入数据(如果不使用OVERWRITE,则只复制到原目录,但不清空原目录)

LOAD DATA LOCAL INPATH '${env:HOME}/california-employees'
OVERWRITE INTO TABLE employees
PARTITION (country='US', state='CA');

通过查询语句插入数据(动态分区方法)

set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=1000;

INSERT OVERWRITE TABLE employees
PARTITION (country, state)
SELECT ..., se.cty, se.st                  # SELECT最后两位原表字段作为分区字段
FROM staged_employees se;

导出数据(转换成需要的格式)

INSERT OVERWRITE LOCAL DIRECTORY '/tmp/employees_out'
SELECT name, salary, address
FROM employees
WHERE se.state='CA';

6. HiveQL查询

集合类型的输出是JSON格式,引用集合元素的方法如下:

SELECT name, subordinates[0] from employees;
SELECT name, deductions["State Taxes"] FROM employees;
SELECT name, address.city FROM employees;

比较特别的一些聚合函数:

percentile(BIGINT int_expr, p)      # 返回int_expr在p[0,1]处对应的百分比
histogram_numric(col, NB)    # 返回NB数量的直方图仓库数组

可以设置参数提高聚合性能:

set hive.map.aggr=true;

表生成函数:可以把一列数据扩展成多列或多行。

explode(ARRAY array)         # 返回0到多行,每行对应array数组里的一个元素
parse_url_tuple(url, param1, ..., paramN)   # 从url里提取各部分信息,产生N列
stack(INT n, col1, ..., colM)       # 把M列转换成N行,每行有M/N个字段

更多函数用法参考 6.1.4

RLIKE子句是HiveQL的扩展,可以使用Java正则表达式来查询。

JOIN子句

【注】Hive里的JOIN不支持非等值连接和OR连接。比如 table_a JOIN table_b ON a!=b 这种是非法的。

JOIN优化

  • 按数据量的大小JOIN,把最大的表放到最右边。
  • map-side JOIN,可以把较小的表直接在mapper端JOIN。

大数据下尽量不要ORDER BY,全局排序很耗时。

SORT BY只在每个reducer上局部排序,常搭配DISTRIBUTE BY使用,后者可以让指定列的数据发送到同一个reducer处理。

如果SORT BYDISTRIBUTE BY的字段相同且使用默认升序排列,则可以简写为CLUSTER BY。此时会等效于ORDER BY,输出全局有序的结果,但此时实际执行的SORT BY将无法并行化。

抽样查询?

SELECT * FROM numbersflat TABLESAMPLE(0.1 PERCENT) s;

设置reducer的数量:

set mapred.reduce.tasks=50
set hive.exec.reducers.max=50

7. HiveQL视图

当在视图上查询时,Hive会先解析视图,然后使用解析结果再来解析整个查询语句;通常查询语句和视图语句会被优化成为一条单一的查询语句。

视图的一种用法是:创建一个视图开放给受权限限制的用户,这样可以方便的提供给他们脱敏数据的查询 :)

13. 函数

DESC FUNCTION EXTENDED concat;
  • UDF 用户自定义函数
  • UDAF 用户自定义聚合函数
  • UDTF 用户自定义表生成函数

示例,大写字母转换1

package org.hue.udf;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public final class MyUpper extends UDF {
    public Text evaluate(final Text s) {
        if (s == null) { return null; }
        return new Text(s.toString().toUpperCase());
    }
}

【注】必须实现evaluate()函数,且其参数和输出必须是Hive可以序列化的数据类型。

然后,在hive里添加这个UDF:

ADD JAR /local/path/to/myupper.jar
CREATE TEMPORARY FUNCTION myupper AS 'org.hue.udf.MyUpper';

派生GenericUDF

GenericUDF需要override三个函数,这里有个例子

@Description(name = "Long2Ip",
             value = "_FUNC_(iplong) - return IP address from long format",
             extended = "Example:\n"
             + "> SELECT _FUNC_(16843009) FROM table\n"
             + "> 1.1.1.1\n")
public class Long2IpUDF extends GenericUDF {
  // 一般会声明converter作为成员变量,在initialize()里初始化,然后在evaluate()里用来对输入数据做类型转换。  private ObjectInspectorConverters.Converter converter;
  @Override
  public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
    // 判断输入参数的个数和类型,返回输出的类型。只在每个JVM初始化UDF时调用。    // 1) 判断输入参数个数    if (arguments.length != 1) {
      throw new UDFArgumentLengthException("_FUNC_ expects only 1 argument.");
    }
    // 2) 判断输入参数类型    ObjectInspector argument = arguments[0];
    if (argument.getCategory() != ObjectInspector.Category.PRIMITIVE) {
      throw new UDFArgumentTypeException(0,
              "A primitive argument was expected but an argument of type " + argument.getTypeName()
              + " was given.");
    }
    PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = ((PrimitiveObjectInspector) argument)
        .getPrimitiveCategory();

    if (primitiveCategory != PrimitiveObjectInspector.PrimitiveCategory.LONG) {
      throw new UDFArgumentTypeException(0,
              "A long argument was expected but an argument of type " + argument.getTypeName()
              + " was given.");
    }
    // 3) 生成conveter供evaluate()使用    converter = ObjectInspectorConverters.getConverter(argument, PrimitiveObjectInspectorFactory.writableLongObjectInspector);
    // 4) 返回输出的数据类型    return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
  }
  @Override
  public Object evaluate(DeferredObject[] arguments) throws HiveException {
    // 逐行处理数据时调用    // 1) 判断输入数据是否合法    assert (arguments.length == 1);
    if (arguments[0].get() == null) return null;
    // 2) 使用conveter把输入数据转换成相应的Java数据类型    LongWritable iplong = (LongWritable) converter.convert(arguments[0].get());
    // 3) 具体的UDF实现    long ip = iplong.get();
    Text t = new Text(InetAddrHelper.longToIP(ip));
    // 4) 输出    return t;
  }
  @Override
  public String getDisplayString(String[] children) {
    // 用于EXPLAIN时候的显示    return children[0];
  }
}

14. Streaming

InputFormat, OutputFormat, SerDe

主要使用TRANSFORM函数,可以把脚本先部署到分布式缓存,然后再调用。

示例,可以实现Java UDF的功能:

ADD FILE /path/to/ctof.sh;
SELECT TRANSFORM(col1) USING ‘ctof.sh’ AS convert FROM a;

实际使用中应该注意数据的分发和排序,利用CLUSTER BYDISTRIBUTE BYSORT BY,避免数据集中到单点的map或reduce上。

示例,统计词频:

CREATE TABLE doc (line STRING);
CREATE TABLE word_count (word STRING, count INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘\t’;
FROM (
    FROM doc
    SELECT TRANSFORM (line) USING ‘/path/to/mapper.py'
    AS word, count
    CLUSTER BY word) wc
INSERT OVERWRITE TABLE word_count
SELECT TRANSFORM (wc.word, wc.count) USING ‘/path/to/reducer.py'
AS word, count;

15. 自定义Hive文件格式

RCFile

打开方法:hive --service rcfilecat /user/hive/warehouse/columntable/000000_0

SerDe让Hive可以处理各种格式数据:

  • ORCFile 对RCFile的优化 Hive 0.11之后内置的SerDe
  • RegexSerDe 可以直接通过正则表达式处理标准格式的web日志
  • JsonSerDe 可以直接处理JSON格式数据
  • AvroSerDe 可以通过avrò schema定义hive表结构(注意schema evolution的情况)
  1. https://cwiki.apache.org/confluence/display/Hive/HivePlugins