Hive cli配置文件 ~/.hiverc
set hive.cli.print.current.db=true; # 在cli里显示当前db名
set hive.cli.print.header=true # 输出表头
set hive.exec.mode.local.auto=true; # 启动本地模式,尽量不启动map reduce
set hive.auto.convert.join=true; # 启动map-side JOIN
set hive.mapjoin.smalltable.filesize=25000000; # map-side JOIN表需要小于25M
Hive cli里可以直接执行dfs命令。比如 dfs -ls /;
Hive数据库所在目录由属性 hive.metastore.warehouse.dir
指定,数据库中的表会以子目录的形式存储。默认位置在 /user/hive/warehouse
,当此时创建数据库financials,则Hive相应的创建目录/user/hive/warehouse/financials.db
。
4. HiveQL数据定义
查看表结构详细信息:
DESC FORMATTED <table_name>;
基本数据类型有:INT
, FLOAT
, DOUBLE
, STRING
, TIMESTAMP
, BINARY
。
Hive新增了集合数据类型:STRUCT
, ARRAY
, MAP
。
ARRAY
类型可以使用lateral view explode展开成类似普通的表来查询。
建表示例:
CREATE TABLE employees (
name STRING,
salary FLOAT,
subordinates ARRAY<STRING>,
deductions MAP<STRING, FLOAT>,
address STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
)
PARTITIONED BY (country STRING, state STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\001'
COLLECTION ITEMS TERMINATED BY '\002'
MAP KEYS TERMINATED BY '\003'
LINES TERMINATED BY '\n'
STORED AS TEXTFILE;
除了存储为TEXTFILE格式,也可以使用 SEQUENCEFILE、RCFILE和ORCFILE,后面几种使用二进制编码和压缩来优化IO性能。
【注】hive使用InputFormat
对象将输入流分割为记录,使用OutputFormat
对象来将记录格式化为输出流;再使用SerDe在读数据时将记录解析成列,在写数据时将列编码成记录。
CREATE TABLE php_svr_log
PARTITIONED BY (ds STRING, hr STRING)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe'
STORED AS
INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat'
TBLPROPERTIES ('avro.schema.url'='http://10.163.104.13/datawh/php_svr_log.avsc');
修改表的schema定义文件:
ALTER TABLE php_svr_log SET TBLPROPERTIES ('avro.schema.url'='hdfs://Tyrael/user/work/schema/php_svr_log.avsc');
使用STRICT模式,可以避免全分区查询: set hive.mapred.mode=strict
通常管理大规模数据会使用外部表:
CREATE EXTERNAL TABLE IF NOT EXISTS log_messages (
hms INT,
severity STRING,
server STRING,
process_id INT,
message STRING
)
PARTITIONED BY (year INT, month INT, day INT)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE
LOCATION "/user/work/log_messages";
对外部表可以单独增加分区及其对应的文件:
ALTER TABLE log_messages ADD IF NOT EXISTS
PARTITION (year=2012, month=1, day=2) LOCATION 'hdfs://data/log_messages/2012/01/02';
删除这个分区:
ALTER TABLE log_messages DROP IF EXISTS
PARTITION (year=2012, month=1, day=2);
或者可以利用MSCK
命令直接修复表分区:
MSCK REPAIR TABLE log_messages;
显示Hive表和分区定义的语句:
SHOW TABLE EXTENDED LIKE table_name PARTITION (dt=20131023);
SHOW CREATE TABLE table_name;
SHOW PARTITIONS table_name;
DESCRIBE EXTENDED table_name;
show table extended like 'tbl_name' partition (dt='20131023');
5. HiveQL数据管理
从本地载入数据(如果不使用OVERWRITE,则只复制到原目录,但不清空原目录)
LOAD DATA LOCAL INPATH '${env:HOME}/california-employees'
OVERWRITE INTO TABLE employees
PARTITION (country='US', state='CA');
通过查询语句插入数据(动态分区方法)
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=1000;
INSERT OVERWRITE TABLE employees
PARTITION (country, state)
SELECT ..., se.cty, se.st # SELECT最后两位原表字段作为分区字段
FROM staged_employees se;
导出数据(转换成需要的格式)
INSERT OVERWRITE LOCAL DIRECTORY '/tmp/employees_out'
SELECT name, salary, address
FROM employees
WHERE se.state='CA';
6. HiveQL查询
集合类型的输出是JSON格式,引用集合元素的方法如下:
SELECT name, subordinates[0] from employees;
SELECT name, deductions["State Taxes"] FROM employees;
SELECT name, address.city FROM employees;
比较特别的一些聚合函数:
percentile(BIGINT int_expr, p) # 返回int_expr在p[0,1]处对应的百分比
histogram_numric(col, NB) # 返回NB数量的直方图仓库数组
可以设置参数提高聚合性能:
set hive.map.aggr=true;
表生成函数:可以把一列数据扩展成多列或多行。
explode(ARRAY array) # 返回0到多行,每行对应array数组里的一个元素
parse_url_tuple(url, param1, ..., paramN) # 从url里提取各部分信息,产生N列
stack(INT n, col1, ..., colM) # 把M列转换成N行,每行有M/N个字段
更多函数用法参考 6.1.4
RLIKE
子句是HiveQL的扩展,可以使用Java正则表达式来查询。
JOIN子句
【注】Hive里的JOIN不支持非等值连接和OR连接。比如 table_a JOIN table_b ON a!=b
这种是非法的。
JOIN优化
- 按数据量的大小JOIN,把最大的表放到最右边。
- map-side JOIN,可以把较小的表直接在mapper端JOIN。
大数据下尽量不要ORDER BY
,全局排序很耗时。
SORT BY
只在每个reducer上局部排序,常搭配DISTRIBUTE BY
使用,后者可以让指定列的数据发送到同一个reducer处理。
如果SORT BY
和DISTRIBUTE BY
的字段相同且使用默认升序排列,则可以简写为CLUSTER BY
。此时会等效于ORDER BY
,输出全局有序的结果,但此时实际执行的SORT BY
将无法并行化。
抽样查询?
SELECT * FROM numbersflat TABLESAMPLE(0.1 PERCENT) s;
设置reducer的数量:
set mapred.reduce.tasks=50
set hive.exec.reducers.max=50
7. HiveQL视图
当在视图上查询时,Hive会先解析视图,然后使用解析结果再来解析整个查询语句;通常查询语句和视图语句会被优化成为一条单一的查询语句。
视图的一种用法是:创建一个视图开放给受权限限制的用户,这样可以方便的提供给他们脱敏数据的查询 :)
13. 函数
DESC FUNCTION EXTENDED concat;
示例,大写字母转换1:
package org.hue.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
public final class MyUpper extends UDF {
public Text evaluate(final Text s) {
if (s == null) { return null; }
return new Text(s.toString().toUpperCase());
}
}
【注】必须实现evaluate()
函数,且其参数和输出必须是Hive可以序列化的数据类型。
然后,在hive里添加这个UDF:
ADD JAR /local/path/to/myupper.jar
CREATE TEMPORARY FUNCTION myupper AS 'org.hue.udf.MyUpper';
派生GenericUDF
GenericUDF需要override三个函数,这里有个例子:
@Description(name = "Long2Ip",
value = "_FUNC_(iplong) - return IP address from long format",
extended = "Example:\n"
+ "> SELECT _FUNC_(16843009) FROM table\n"
+ "> 1.1.1.1\n")
public class Long2IpUDF extends GenericUDF {
// 一般会声明converter作为成员变量,在initialize()里初始化,然后在evaluate()里用来对输入数据做类型转换。 private ObjectInspectorConverters.Converter converter;
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
// 判断输入参数的个数和类型,返回输出的类型。只在每个JVM初始化UDF时调用。 // 1) 判断输入参数个数 if (arguments.length != 1) {
throw new UDFArgumentLengthException("_FUNC_ expects only 1 argument.");
}
// 2) 判断输入参数类型 ObjectInspector argument = arguments[0];
if (argument.getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentTypeException(0,
"A primitive argument was expected but an argument of type " + argument.getTypeName()
+ " was given.");
}
PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = ((PrimitiveObjectInspector) argument)
.getPrimitiveCategory();
if (primitiveCategory != PrimitiveObjectInspector.PrimitiveCategory.LONG) {
throw new UDFArgumentTypeException(0,
"A long argument was expected but an argument of type " + argument.getTypeName()
+ " was given.");
}
// 3) 生成conveter供evaluate()使用 converter = ObjectInspectorConverters.getConverter(argument, PrimitiveObjectInspectorFactory.writableLongObjectInspector);
// 4) 返回输出的数据类型 return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
}
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {
// 逐行处理数据时调用 // 1) 判断输入数据是否合法 assert (arguments.length == 1);
if (arguments[0].get() == null) return null;
// 2) 使用conveter把输入数据转换成相应的Java数据类型 LongWritable iplong = (LongWritable) converter.convert(arguments[0].get());
// 3) 具体的UDF实现 long ip = iplong.get();
Text t = new Text(InetAddrHelper.longToIP(ip));
// 4) 输出 return t;
}
@Override
public String getDisplayString(String[] children) {
// 用于EXPLAIN时候的显示 return children[0];
}
}
14. Streaming
InputFormat, OutputFormat, SerDe
主要使用TRANSFORM
函数,可以把脚本先部署到分布式缓存,然后再调用。
示例,可以实现Java UDF的功能:
ADD FILE /path/to/ctof.sh;
SELECT TRANSFORM(col1) USING ‘ctof.sh’ AS convert FROM a;
实际使用中应该注意数据的分发和排序,利用CLUSTER BY
、DISTRIBUTE BY
或SORT BY
,避免数据集中到单点的map或reduce上。
示例,统计词频:
CREATE TABLE doc (line STRING);
CREATE TABLE word_count (word STRING, count INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY ‘\t’;
FROM (
FROM doc
SELECT TRANSFORM (line) USING ‘/path/to/mapper.py'
AS word, count
CLUSTER BY word) wc
INSERT OVERWRITE TABLE word_count
SELECT TRANSFORM (wc.word, wc.count) USING ‘/path/to/reducer.py'
AS word, count;
15. 自定义Hive文件格式
RCFile
打开方法:hive --service rcfilecat /user/hive/warehouse/columntable/000000_0
SerDe让Hive可以处理各种格式数据:
- ORCFile 对RCFile的优化 Hive 0.11之后内置的SerDe
- RegexSerDe 可以直接通过正则表达式处理标准格式的web日志
- JsonSerDe 可以直接处理JSON格式数据
- AvroSerDe 可以通过avrò schema定义hive表结构(注意schema evolution的情况)
-
https://cwiki.apache.org/confluence/display/Hive/HivePlugins ↩