Compaction in Hive


This article covers how to use compaction effectively to counter the small file problem in HDFS.

Small File Problem

HDFS is not well suited to working with small files. In HDFS, a file is considered small if it is significantly smaller than the HDFS default block size (i.e., 128 MB).

To make HDFS fast, all file names and block addresses are stored in the NameNode's memory. This means that any modification to the file system, or any lookup of a file's location, can be served without disk I/O.

But this design decision to keep all metadata in memory has its own tradeoffs. One of them is that every file, directory, and block in HDFS consumes roughly 150 bytes of the NameNode's memory. So if there are millions of files, the amount of RAM the NameNode must reserve becomes large.
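To put rough numbers on it (an illustrative back-of-the-envelope estimate, not a figure from any particular cluster): 10 million files, each stored as a single block, amount to about 20 million namespace objects, and 20,000,000 × 150 bytes ≈ 3 GB of NameNode heap, before directories are even counted.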

Moreover, HDFS is not geared up to access small files efficiently. It is designed to work with a small number of large files rather than a large number of small files. Reading many small files typically causes lots of disk seeks, which degrades performance.

Compaction to the rescue

Compaction can be used to counter the small file problem by consolidating small files into larger ones. This article walks you through small file problems in Hive and how compaction can be applied to both transactional and non-transactional Hive tables to overcome them.

In Hive, small files are normally created when any of the following scenarios happen:

  • The number of files in a partition increases as frequent updates are made to the Hive table.
  • When the number of reducers used is on the higher side, chances are high that many small files (i.e., smaller than the default HDFS block size) get created.
  • There is a growing demand to query streaming data in near real time (within 5 to 10 minutes), so streaming data needs to be ingested into a Hive table at short intervals; this eventually results in lots of small files, and it should be addressed in the design.

For demonstration, let’s use the following Hive query to create an ‘orders’ table and then apply compaction to it.


CREATE TABLE IF NOT EXISTS orders (
  id bigint,
  customer_id string,
  customer_name string,
  product_id int,
  product_name string,
  product_price decimal(12,2),
  quantity int,
  src_update_ts timestamp,
  src_file string,
  ingestion_ts timestamp)
PARTITIONED BY (order_date date)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION '/user/dks/datalake/orders';

The Hive table is partitioned by date and stored as JSON. Since the table is partitioned by date, 5 years of data with an average of 20 files per partition gives 5 × 365 × 20 = 36,500 files. Such a huge number of files can lead to performance bottlenecks.
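To see how such file counts build up, consider a hypothetical micro-batch ingestion statement like the one below (the values and the partition date are purely illustrative); every such statement writes at least one new file into the target partition:

-- Illustrative micro-batch insert; each run adds at least one new file
-- under the order_date='2021-01-01' partition.
INSERT INTO TABLE orders PARTITION (order_date='2021-01-01')
VALUES (1, 'C100', 'John Doe', 10, 'Laptop', 999.99, 1,
        '2021-01-01 10:00:00', 'orders_batch_001.json', '2021-01-01 10:05:00');

Run every few minutes, inserts like this steadily inflate the per-partition file count, which is exactly what the compaction job below cleans up.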

So, let’s periodically run a compaction job in the background to combine files whenever a particular partition accumulates more than a certain number of files; in our case, the threshold is 5.

Approach to enable compaction on Non-Transactional tables

  • Find the list of all partitions that hold more than 5 files; this can be done using the Hive virtual column input__file__name.
  • Set the reducer size to control the approximate output file size.
  • Execute an insert overwrite on the partitions that exceeded the file threshold count, in our case 5.

The Hive configurations to use:

set hive.support.quoted.identifiers=none;
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.reducers.bytes.per.reducer=268435456; --256MB reducer size.

Use the following Hive query to perform the compaction.

with partition_list as (
  select order_date, count(distinct input__file__name) as cnt
  from orders
  group by order_date
  having cnt > 5
)
insert overwrite table orders partition (order_date)
select * from orders
where order_date in (select order_date from partition_list);
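As a quick sanity check (a sketch reusing the same input__file__name virtual column), the per-partition file counts can be recomputed after the overwrite; compacted partitions should now be back under the threshold:

-- Recompute distinct files per partition; compacted partitions should
-- now report counts at or below the threshold of 5.
select order_date, count(distinct input__file__name) as file_cnt
from orders
group by order_date
order by file_cnt desc;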

Approach to enable compaction on Transactional tables

I have already covered transactional tables in one of my past articles, so please go through it here before proceeding further.

Given the need to apply frequent updates on an ACID-enabled table, Hive can generate a large number of small delta files. Unlike a regular Hive table, an ACID table handles compaction automatically; all it needs is a few table properties to enable auto compaction.

"compactor.mapreduce.map.memory.mb": specifies the memory for the compaction MapReduce map tasks.

"compactorthreshold.hive.compactor.delta.num.threshold": number of delta directories beyond which a minor compaction is triggered.

"compactorthreshold.hive.compactor.delta.pct.threshold": ratio of delta file size to base file size beyond which a major compaction is triggered.

CREATE TABLE IF NOT EXISTS orders_acid (
  id bigint,
  customer_id string,
  customer_name string,
  product_id int,
  product_name string,
  product_price decimal(12,2),
  quantity int,
  src_update_ts timestamp,
  src_file string,
  ingestion_ts timestamp)
PARTITIONED BY (order_date date)
CLUSTERED BY (id) INTO 10 BUCKETS
STORED AS ORC
LOCATION '/user/dks/datalake/orders_acid'
TBLPROPERTIES ("transactional"="true",
  "compactor.mapreduce.map.memory.mb"="3072",                    -- compaction map task memory
  "compactorthreshold.hive.compactor.delta.num.threshold"="20",  -- minor compaction once there are more than 20 delta directories
  "compactorthreshold.hive.compactor.delta.pct.threshold"="0.5"  -- major compaction once delta size exceeds 50% of base size
);

Conclusion

Thus, compaction can be utilized effectively to build a robust data pipeline that keeps small files from piling up.
