Hive SQL 查询优化大总结
执行引擎选择
Hive支持多种执行引擎,分别是 MapReduce、Tez、Spark。可以通过hive-site.xml文件中的 hive.execution.engine 属性配置。
- 会对磁盘进行多次的读写操作,如果任务数量很多更占用资源。
- Tez将Map和Reduce两个操作进一步拆分,即Map被拆分成Input、Processor、Sort、Merge、Output, Reduce被拆分成Input、Shuffle、Sort、Merge、Processor、Output等依赖DAG,中间结果在内存中,只写一次磁盘。
- Spark 将Map和Reduce生成DAG,划分Stage,将中间结果保存内存,只写一次磁盘。
# 选择执行引擎,前提是要提前设置好这三种引擎配置
set hive.execution.engine=mr;
set hive.execution.engine=tez;
set hive.execution.engine=spark;
# 设置 yarn 队列
set mapreduce.job.queuename=default;
建表优化
使用分区表
- 通常情况下按照dt、hour分区
选择合适的文件存储格式 和 压缩格式
- ORC File 数据按行分块,每块按照列存储,会基于列创建索引,数据可以压缩存储,压缩快,支持三种ZLIB、SNAPPY、NONE(不压缩),默认采用的是 ZLIB 压缩。
- Parquet File 列式存储,对于大型查询高效,一般使用Snappy、Gzip压缩,默认为 Snappy。
CREATE EXTERNAL TABLE test.student (
id INT,
name STRING,
age INT
)
PARTITIONED BY (dt STRING, hour STRING)
STORED AS ORC tblproperties ("orc.compress"="SNAPPY");
压缩优化
选择好的压缩方式,对MapReduce任务的执行有很大的帮助
- 对Hive输出结果和中间都进行压缩
-- 默认值是 false
set hive.exec.compress.output=true
-- 默认值是 false
set hive.exec.compress.intermediate=true
- Mapper 输出结果压缩
-- 默认 false
set mapreduce.map.output.compress=true
-- 默认 org.apache.hadoop.io.compress.DefaultCodec
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec
- MapReduce输出压缩
-- 默认值是 false
set mapreduce.output.fileoutputformat.compress=true
-- 默认值是 Record
set mapreduce.output.fileoutputformat.compress.type=BLOCK
-- 默认值是 org.apache.hadoop.io.compress.DefaultCodec
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec
行列裁剪
- 列:在查询时只读取需要的列(select field from .....)
- 行:在查询时提前过滤掉不需要的行(select field from table where ......)
分区裁剪
- 加载只需要的分区(where dt=?)
合并小文件
-- 合并文件的大小,默认256M
set hive.merge.size.per.task=256000000;
-- 当输出文件的平均大小小于该值时,启动一个独立的MapReduce任务进行文件merge,默认值为 16000000
hive.merge.smallfiles.avgsize=16000000;
- 数据源文件合并(减少mapper数)
-- 默认 CombineHiveInputFormat
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
- MapTask 结束时合并文件
-- 默认 true
SET hive.merge.mapfiles=true;
- ReduceTask 结束时合并文件
# mapreduce 任务结束时合并小文件,默认 false
SET hive.merge.mapredfiles=true;
拆分大文件
- 拆分大文件(增加mapper数)
-- 大文件拆分大小,默认和HDFS的block块大小相同 256M
set mapred.max.split.size=256000000
CBO优化(略)
全称 Cost based Optimizer 成本优化器,Hive的CBO通过查询成本(由analyze收集的统计信息)会生成有效率的执行计划,最终会减少执行的时间和资源的利用,并选择合适的 Join 算法。下面是主要的参数。
set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.bitvector=true;
Fetch抓取(略)
简单的查询模式时不必要使用MapReduce来计算 SELECT field FROM employees WHERE filter
-- 设置简单查询的拉取方式,默认 more
-- none 不使用拉取的方式,使用 MapReduce 方式。
-- minimal 只有这三种场景下才启用 select * 、在分区字段上where过滤、有limit。
-- more 在select、where筛选、limit时,都会启用直接抓取方式。
set hive.fetch.task.conversion=more
本地执行(略)
在 Hive 查询处理的数据量比较小的时候可以开启本地执行模式,待测试。
--打开 hive 自动判断是否启动本地模式的开关
set hive.exec.mode.local.auto=true;
-- local 模式的 map 任务数最大值,超过这个最大值后不开启 local 模式
set hive.exec.mode.local.auto.input.files.max=4;
-- local 模式的 map 输入文件最大大小,超过这个最大值后不开启 local 模式
set hive.exec.mode.local.auto.inputbytes.max=134217728;
JVM重用(略)
默认情况下,MapReduce 中一个 MapTask 或者 ReduceTask 就会启动一个JVM 进程,一个 Task 执行完毕后,JVM 进程就会退出,因此可以考虑重用JVM进程,如下参数设置重用的进程数。缺点:当前的Job会一直占用这5个资源,其他的Job无法使用。
set mapred.job.reuse.jvm.num.tasks=5;
并行执行
一个 Hive SQL 语句可能会转为多个 MapReduce Job,每一个 job 就是一个 stage,这些 Job 顺序执行,这个在 cli 的运行日志中也可以看到。但是有时候这些任务之间并不是是相互依赖的,如果集群资源允许的话,可以让多个并不相互依赖 stage 并发执行。
-- 可以开启并发执行,默认为false
set hive.exec.parallel=true;
--同一个sql允许最大并行度,默认为8
set hive.exec.parallel.thread.number=16;
Mapper端配置
-- 设置 MapTask 的数据,待测试验证。
set mapred.map.tasks=2;
-- 设置MapReduce使用内存,可适当调整该参数
-- 或者修改配置文件的默认值(etc/hadoop/mapred-site.xml)
set mapreduce.map.memory.mb=10240;
-- map端的Combiner,将map端的数据提前reduce,然后再等待 reduce 来拉取。默认为ture
set hive.map.aggr=true;
-- 采样聚合的条数
set hive.groupby.mapaggr.checkinterval=100000;
-- 采样聚合后的比例阈值
set hive.map.aggr.hash.min.reduction=0.5
-- 预先取100000(10万)条数据聚合,如果聚合后的条数/100000>0.5,则不会聚合处理
-- 即在map端的conbiner处理效果不好,没有必要在map聚合了。
Reducer端配置
Reducer的配置根据实际情况修改
-- 设置MapReduce使用内存,可适当调整该参数
-- 或者修改配置文件的默认值(etc/hadoop/mapred-site.xml)
set mapreduce.reduce.memory.mb=10240;
-- 单个 reducer 处理的数据大小
set hive.exec.reducers.bytes.per.reducer=<number>
-- 最大的 reducer 个数
set hive.exec.reducers.max=<number>
-- reducer 固定个数
set mapreduce.job.reduces=<number>
CountDistinct优化
count distinct 使用 count group by来代替。hive计算count distinct依赖于内存(hash类型的数据结构),是把map端所有的输入放到一个(或多个)reduce任务,在内存里进行去重,然后进行计数。group by 可以依赖 set hive.groupby.skewindata=true 参数来解决数据倾斜问题。
SELECT a, COUNT(DISTINCT b) FROM t GROUP BY a;
可以优化为如下SQL
SELECT a, count(1)
FROM (
SELECT a, b FROM t GROUP BY a, b
) t1
GROUP BY a;
大小表JOIN优化
- 只要其中一个表满足 hive.mapjoin.smalltable.filesize 要求,默认25M字节,会自动选择小表作为Hash表,采用MapJoin的方式。
-- 开启 MapJoin 的自动转换
set hive.auto.convert.join=true;
-- 小表判断的依据,默认25M
set hive.mapjoin.smalltable.filesize=25000000;
JOIN数据倾斜
-- 开启join倾斜优化,默认false
set hive.optimize.skewjoin=true;
-- 在join时会将计数超过阈值的倾斜key对应的行临时写进文件,
-- 然后再启动另一个job做 mapJoin 生成结果,默认 100000
set hive.skewjoin.key=100000;
-- 控制第二个 job 的 mapper 数量,默认10000
set hive.skewjoin.mapjoin.map.tasks=10000
GROUPBY数据倾斜
-- group by时自动解决数据倾斜
set hive.groupby.skewindata=true;
开启数据倾斜自动优化后,此时生成的查询计划会有两个MRJob。
- 第一个MRJob 中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的GroupBy Key有可能被分发到不同的Reduce中,从而达到负载均衡的目的;
- 第二个MRJob再根据预处理的数据结果按照GroupBy Key分布到Reduce中,这个过程可以保证相同的GroupBy Key被分布到同一个Reduce中,最后完成最终的聚合操作。