Hive Design Patterns
This article is a continuation of my previous article, which you can read here. Like the previous one, this article walks you through the three types of incremental ingestion that can be performed on Hive tables, but this time the tables are ACID enabled.
For demonstration purposes, we will use the same example from the previous article. Let’s start by creating a stage table for the same ‘orders’ data.
Step 1: Create Hive Stage Table
CREATE EXTERNAL TABLE IF NOT EXISTS orders_stg (id string,
customer_id string,
customer_name string,
product_id string,
product_name string,
product_price string,
quantity string,
order_date string,
src_update_ts string)
ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe'
LOCATION '/user/dks/datalake/orders_stg';
Here /user/dks/datalake/orders_stg is the directory that holds the source data.
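Since the stage table uses the JsonSerDe with all columns typed as string, each line of each file in that directory should be a single JSON object whose keys match the column names. A hypothetical record (all values are purely illustrative) might look like:

```json
{"id": "1001", "customer_id": "C-42", "customer_name": "John Doe", "product_id": "P-7", "product_name": "Widget", "product_price": "19.99", "quantity": "3", "order_date": "2020-01-15", "src_update_ts": "2020-01-15 10:30:00"}
```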
Step 2: ACID enabled Tables in Hive
To maintain the consistency of a database before and after transactions, certain properties must hold. ACID stands for four traits of database transactions:
Atomicity: an operation either succeeds completely or fails; it does not leave partial data behind.
Consistency: once an application performs an operation, the results of that operation are visible to it in every subsequent operation.
Isolation: an incomplete operation by one user/client does not cause unexpected side effects for other users/clients.
Durability: once an operation completes, its effects are preserved even in the face of machine or system failure.
These qualities have long been expected from database systems as part of their transaction functionality.
ACID transaction table functionality is available from Hive version 0.13.
The principal motivation behind introducing ACID tables in Hive is to address the following situations:
- Streaming data ingestion into Hive tables with frequent inserts and updates, while giving readers a consistent view of the data.
- Countering the proliferation of small files.
- Slowly changing dimension (SCD) updates to Hive tables.
- Bulk updates using SQL MERGE.
Please note that ACID tables still do not support explicit BEGIN, COMMIT, or ROLLBACK. Only the ORC file format is supported, and tables must be bucketed in order to use this feature.
In an ACID-enabled table, data is stored as a set of base files. New records, updates, and deletes are stored as delta files, and each transaction creates a new set of delta files. When reading from the table, the reader merges the base and delta records, applying any updates and deletes as it reads.
Having many delta/small files can lead to performance issues. This is addressed by the compaction process, which combines the delta files; compaction is explained later in this article.
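To make this concrete, here is a schematic of what one partition directory of an ACID table can look like after a few transactions (the transaction ids and partition value are illustrative; in recent Hive versions deletes are written to separate delete_delta directories):

```
/user/dks/datalake/orders_acid/order_date=2020-01-15/
    base_0000010/                       -- produced by a major compaction
        bucket_00000
    delta_0000011_0000011_0000/         -- one insert/update transaction
        bucket_00000
    delete_delta_0000012_0000012_0000/  -- one delete transaction
        bucket_00000
```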
Defining ACID-enabled Tables
To define an ACID-enabled table in Hive, use the following configuration parameters:
SET hive.enforce.bucketing=true;
SET hive.exec.dynamic.partition.mode=nonstrict;
SET hive.support.concurrency=true;
SET hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
and set the “transactional”=”true” table property in the Hive DDL, which enables the ACID features on the Hive table.
CREATE TABLE IF NOT EXISTS orders (id bigint,
customer_id string,
customer_name string,
product_id int,
product_name string,
product_price decimal(12,2),
quantity int,
src_update_ts timestamp,
src_file string,
ingestion_ts timestamp)
PARTITIONED BY (order_date date)
CLUSTERED BY (id) INTO 10 BUCKETS STORED AS ORC
LOCATION '/user/dks/datalake/orders_acid'
TBLPROPERTIES ("transactional"="true");
Step 3: Ingestion
The following query performs the insert, update, or delete, based on the requirements.
WITH new_data_stg AS (SELECT * FROM orders_stg)
MERGE INTO orders AS T
USING new_data_stg AS S
ON T.id = cast(S.id AS bigint) AND T.order_date = cast(S.order_date AS date)
WHEN MATCHED AND (T.quantity != cast(S.quantity AS int) AND S.quantity IS NOT NULL)
  THEN UPDATE SET quantity = cast(S.quantity AS int),
                  src_update_ts = cast(S.src_update_ts AS timestamp)
WHEN MATCHED AND S.quantity IS NULL THEN DELETE
WHEN NOT MATCHED THEN INSERT VALUES
  (cast(S.id AS bigint), S.customer_id, S.customer_name, cast(S.product_id AS int),
   S.product_name, cast(S.product_price AS decimal(12,2)), cast(S.quantity AS int),
   cast(S.src_update_ts AS timestamp),
   NULL,                 -- src_file: not available in the stage table
   current_timestamp(),  -- ingestion_ts
   cast(S.order_date AS date));
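As a quick sanity check after the merge (illustrative, assuming the tables defined above), row counts per partition can be compared between the stage and target tables:

```sql
SELECT cast(order_date AS date) AS order_date, count(*) AS cnt
FROM orders_stg
GROUP BY cast(order_date AS date);

SELECT order_date, count(*) AS cnt
FROM orders
GROUP BY order_date;
```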
As mentioned above, the disadvantage of this approach is that each update, insert, or delete on a partition or table makes Hive create delta files, resulting in a huge number of small delta files that can hinder read performance. To counter this, enable the auto-compaction options on the table so that Hive can perform major and minor compactions in the background to combine the small/delta files.
The revised DDL to enable compaction on the Hive table:
CREATE TABLE IF NOT EXISTS orders (id bigint,
customer_id string,
customer_name string,
product_id int,
product_name string,
product_price decimal(12,2),
quantity int,
src_update_ts timestamp,
src_file string,
ingestion_ts timestamp)
PARTITIONED BY (order_date date)
CLUSTERED BY (id) INTO 10 BUCKETS STORED AS ORC
LOCATION '/user/dks/datalake/orders_acid'
TBLPROPERTIES ("transactional"="true",
"compactor.mapreduce.map.memory.mb"="3072", -- specify compaction map job properties
"compactorthreshold.hive.compactor.delta.num.threshold"="20", -- trigger minor compaction if there are more than 20 delta directories
"compactorthreshold.hive.compactor.delta.pct.threshold"="0.5" -- trigger major compaction if the ratio of the size of the delta files to the size of the base files is greater than 50%
);
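Compaction can also be triggered and monitored manually; the partition value below is illustrative:

```sql
-- queue a minor compaction (merges delta files into fewer, larger deltas)
ALTER TABLE orders PARTITION (order_date='2020-01-15') COMPACT 'minor';

-- queue a major compaction (rewrites the deltas into a new base)
ALTER TABLE orders PARTITION (order_date='2020-01-15') COMPACT 'major';

-- check the status of queued and running compactions
SHOW COMPACTIONS;
```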
Conclusion
So far we have covered several methods for performing incremental ingestion, on both plain and ACID-enabled Hive tables. As discussed, choose the ingestion type that suits your criteria and requirements.
Please feel free to contact me at my twitter account if you have any questions.