Bucketing Tables

This document outlines ODAS support for bucketed (clustered) tables.

Introduction

Bucketing, or clustering, is a feature by which a large dataset is divided into buckets by one or more bucketing columns. Each bucket of data is stored in a single file (for Hive bucketing) or in multiple similarly named files (for Spark bucketing). The file name is derived from a hash of the bucketing column(s) value. ODAS currently supports Hive bucketing, where there is a single file per bucket. See the Hive documentation for more on Hive bucketing.

Users can also choose the number of buckets to create as part of the bucketed table definition. When the data is generated via a Hive client, each row automatically lands in one of the bucketed files based on the hash of the bucketing column(s). More details below.

ODAS supports bucketed tables starting from the 1.6 release.

Getting Started

Create bucketed tables

ODAS supports SQL syntax for bucketed tables. This syntax is very similar to Hive's CLUSTERED BY syntax. For example, below is a sample bucketed table for San Francisco Bay Area population stats by city. Note that this data is representative.

CREATE EXTERNAL TABLE IF NOT EXISTS buckets_test.sfbayarea_popl_stats_bucketed (
  name            STRING,
  type            STRING,
  county          STRING,
  population      BIGINT,
  landareasqmi    DECIMAL(5,2),
  landareasqkm    DECIMAL(5,2)
)
CLUSTERED BY (name) INTO 10 BUCKETS
STORED AS PARQUET
LOCATION 's3://buckets_test/sfbaystats_bucketed/';
  • Here, CLUSTERED BY is the keyword used to identify the bucketing column(s). The keyword is followed by a list of bucketing columns in parentheses. For example, here the bucketing column is name, so the SQL syntax has CLUSTERED BY (name). Multiple columns can be specified as bucketing columns, in which case, when Hive is used to insert/update data in this dataset, the bucketed files are named based on the hash of the bucketing columns.
  • Users can also choose the number of buckets into which the data is bucketed/grouped. This is done by specifying an INTO <N> BUCKETS clause along with CLUSTERED BY (<bucketing columns>). For example, with N=10 as in the example above, the syntax is INTO 10 BUCKETS.
  • Together, the full bucketing syntax is CLUSTERED BY (name) INTO 10 BUCKETS. This denotes that the table is bucketed by the name column into 10 buckets. A data-loading sketch follows below.
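
For illustration, here is a minimal Hive sketch for populating the table above. It assumes a hypothetical unbucketed staging table buckets_test.sfbayarea_popl_stats_raw with the same schema; with bucketed writes enforced, Hive hashes name and routes each row into one of the 10 bucket files.

-- Needed explicitly on older Hive; bucketed writes are always enforced in Hive 2.x+.
SET hive.enforce.bucketing = true;

-- buckets_test.sfbayarea_popl_stats_raw is a hypothetical unbucketed staging table.
INSERT OVERWRITE TABLE buckets_test.sfbayarea_popl_stats_bucketed
SELECT name, type, county, population, landareasqmi, landareasqkm
FROM buckets_test.sfbayarea_popl_stats_raw;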

Efficient Bucketed Joins

A primary reason to use bucketed tables is to support joins between two large datasets.

For example, consider a scenario with two large tables, Table A and Table B, each 1TB in size. Without bucketing, Table B would be broadcast so its rows can be matched against each row of Table A, which can cause OOM errors or require infrastructure with a huge amount of memory to support such a broadcast. With bucketing, this problem goes away: the join only needs to pair up the corresponding buckets, and Table B does not have to be broadcast/streamed in full for every split of Table A.

ODAS 1.6 brings significant improvements for bucketed table joins (restrictions apply; see below). Table A in the example above is scanned in multiple splits, one per bucket. For example, if there are 100 buckets, Table A is scanned by 100 splits (each split spread across one task per worker). To join with Table B, instead of reading all of Table B for each split, only the corresponding bucket of Table B is streamed and joined for a given split of Table A. For example, if a split is processing bucket 20 of Table A, only bucket 20 of Table B is scanned/streamed to that node to perform the join. This not only makes the join evaluation memory-efficient, it also speeds up the query because far fewer join row combinations are evaluated.
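
As a sketch of the scenario above, assuming hypothetical tables table_a and table_b, both CLUSTERED BY (id) INTO 100 BUCKETS, a plain equi-join on the bucketing column is eligible for this bucket-by-bucket evaluation:

-- table_a and table_b are hypothetical tables, both CLUSTERED BY (id) INTO 100 BUCKETS.
-- Each of the 100 splits pairs bucket i of table_a with bucket i of table_b,
-- so a split streams at most 1/100th of table_b instead of the whole table.
SELECT a.id, b.*
FROM table_a a
JOIN table_b b
  ON a.id = b.id;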

Important restrictions for bucketed table joins

To support bucketed table joins as explained above, the datasets need to:

  • Have matching bucketing columns on both tables (the same number of columns; the names may differ).
  • Have matching data types for the bucketing columns.
  • Be generated with the same client application, so that both use the same bucketing scheme. For example, a bucketed table generated by Hive cannot be used with Spark-generated bucketed tables, because Spark uses a different bucketing mechanism than Hive.
  • Have the same order of bucketing columns in both tables.

For example, let's consider the two tables below and discuss the bucketing restrictions:

CREATE EXTERNAL TABLE IF NOT EXISTS buckets_test.sfbayarea_popl_stats_bucketed (
  name            STRING,
  type            STRING,
  county          STRING,
  population      BIGINT,
  landareasqmi    DECIMAL(5,2),
  landareasqkm    DECIMAL(5,2)
)
CLUSTERED BY (name) INTO 10 BUCKETS
STORED AS PARQUET
LOCATION 's3://buckets_test/sfbaystats_bucketed/';
CREATE EXTERNAL TABLE IF NOT EXISTS buckets_test.sfbayarea_housing_stats_bucketed (
  city_name           STRING,
  type                STRING,
  county              STRING,
  number_of_houses    BIGINT
)
CLUSTERED BY (city_name) INTO 10 BUCKETS
STORED AS PARQUET
LOCATION 's3://buckets_test/sfbay_housing_stats_bucketed/';

For ODAS bucketed join support to work:

  • The data for both tables should be generated via the same client, for example Hive.
  • If using Hive, each table's dataset would consist of 10 files, one per bucket.
  • Both tables have the city's name as the bucketing column. Note that the column names differ: sfbayarea_popl_stats_bucketed has name as its bucketing column and sfbayarea_housing_stats_bucketed has city_name, but both hold similar data. This is a very typical scenario, where a primary key in one table appears as a foreign key with a different name in another table. This scenario is valid and supported by ODAS as long as the data types match, as the sketch below shows.
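
As a hedged sketch, a join between the two tables on their bucketing columns would be eligible for ODAS's bucketed join:

-- Both tables are bucketed INTO 10 BUCKETS on a STRING column (name / city_name),
-- so each of the 10 splits only needs the matching bucket from each side.
SELECT p.name, p.county, p.population, h.number_of_houses
FROM buckets_test.sfbayarea_popl_stats_bucketed p
JOIN buckets_test.sfbayarea_housing_stats_bucketed h
  ON p.name = h.city_name;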

Some invalid scenarios for bucketed joins:

  • If sfbayarea_popl_stats_bucketed has name as its bucketing column and sfbayarea_housing_stats_bucketed has number_of_houses as its bucketing column, no bucketed join will kick in, because the columns have different data types (STRING vs. BIGINT).
  • Similarly, if sfbayarea_popl_stats_bucketed has name as its bucketing column and sfbayarea_housing_stats_bucketed has (city_name, type) as its bucketing columns, the join will not be a bucketed join, because the two tables have a different number of bucketing columns.
  • Also, if sfbayarea_popl_stats_bucketed has (name, type) and sfbayarea_housing_stats_bucketed has (number_of_houses, city_name) as bucketing columns, the join will not be a bucketed join, because the data types and column ordering do not match.

Note

Since ODAS does not restrict the column names used for bucketing (it only enforces the data types), it is up to the end user to pick the right bucketing columns; otherwise the join will produce incorrect results. For example, joining on bucketing column name from sfbayarea_popl_stats_bucketed and bucketing column county from sfbayarea_housing_stats_bucketed will still be executed as a bucketed join, because the data types and the number of bucketing columns match, but the result is semantically incorrect. Users therefore need to be careful not to pair such mismatched bucketing columns.

FAQs

  • Currently, Hive bucketing is well supported in ODAS. With a Hive client generating the data, there is exactly one file per bucket, and ODAS can join between the corresponding files of two tables.
  • Spark bucketing can also work, provided the datasets for both tables are generated with the same Spark client so that both tables end up with the same bucketing scheme and number of files. Spark generates multiple files per bucket, and the file names are not an exact hash of the bucketing columns.
  • The auto crawler detects bucketed datasets as regular tables. To convert such a table to a bucketed table, users have to recreate it with the appropriate bucketing keyword (CLUSTERED BY) along with the bucketing information (INTO N BUCKETS), as sketched below.
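
A minimal sketch of that recreation, assuming the files at the location were originally written by Hive with 10 buckets on name:

-- Drop only the crawler's table definition; EXTERNAL means the files stay intact.
DROP TABLE IF EXISTS buckets_test.sfbayarea_popl_stats_bucketed;

-- Recreate over the same location, this time with the bucketing metadata.
CREATE EXTERNAL TABLE buckets_test.sfbayarea_popl_stats_bucketed (
  name            STRING,
  type            STRING,
  county          STRING,
  population      BIGINT,
  landareasqmi    DECIMAL(5,2),
  landareasqkm    DECIMAL(5,2)
)
CLUSTERED BY (name) INTO 10 BUCKETS
STORED AS PARQUET
LOCATION 's3://buckets_test/sfbaystats_bucketed/';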