Hive Bucketing
Bucketing is another way of dividing data sets into more manageable parts. Clustering, also known as bucketing, results in a fixed number of files, because we specify the number of buckets up front. Hive calculates a hash of the bucketing column and uses it to assign each record to a bucket. Physically, each bucket is just a file in the table directory. Bucketing can be used together with partitioning on Hive tables, or on its own without partitioning.
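Conceptually, the bucket for a record is the hash of the bucketing column modulo the number of buckets. A rough illustration using Hive's built-in hash() and pmod() functions (the table name student_details, the id column, and the 4 buckets are hypothetical, and Hive's internal hashing differs slightly in detail):

-- Roughly: bucket_number = hash(bucketing_column) mod number_of_buckets
SELECT id, pmod(hash(id), 4) AS bucket_number
FROM student_details;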
Here we use the hive.enforce.bucketing property, which plays the same role for bucketing that hive.exec.dynamic.partition plays for partitioning. Setting this property enables bucketing automatically while data is loaded into the Hive table.
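Side by side, a minimal sketch of the two switches (the dynamic-partition properties are shown only for comparison):

-- Bucketing: size the reducers and cluster the data automatically on insert
set hive.enforce.bucketing = true;

-- Dynamic partitioning: the analogous switches on the partitioning side
set hive.exec.dynamic.partition = true;
set hive.exec.dynamic.partition.mode = nonstrict;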
Note-
Suppose we specify 256 buckets and the column we bucket on has only 50 distinct values; then 50 buckets will contain data and the remaining 206 buckets will be empty.
Difference between partitioning and bucketing
Partitioning is often used to organize data logically and to distribute load horizontally. For example, suppose we have a large employee table and often run queries with WHERE clauses that restrict the results to a particular country or department. For faster query responses, the Hive table can be declared PARTITIONED BY (country STRING, DEPT STRING). Partitioning changes how Hive lays out the data on disk: it creates subdirectories and stores the data in them according to the partition values, such as
.../employees/country=A/DEPT=B.
If we want only ‘country=A’ data, Hive scans only the contents of the single directory ‘country=A’. This can dramatically improve query performance, but only if the partitioning scheme reflects common filtering. One drawback of having many partitions is the large number of Hadoop files and directories created, which becomes a problem for the NameNode because it must keep all of that metadata in memory.
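A minimal sketch of the employee example above (the column list is illustrative, not taken from the original table):

-- country and DEPT become directory levels rather than columns in the data files
CREATE TABLE employees (
  name   STRING,
  salary FLOAT
)
PARTITIONED BY (country STRING, DEPT STRING);

-- Filtering on the partition column scans only the .../country=A/ subtree
SELECT name, salary FROM employees WHERE country = 'A';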
Bucketing is another way of dividing data sets into more manageable parts. For example, suppose we have a huge table holding students’ information, and we use student_data as the top-level partition and id as the second-level partition; this leads to many small partitions. Instead, if we bucket the student table and use id as the bucketing column, the value of this column is hashed into a user-defined number of buckets. Records with the same id are always stored in the same bucket. The number of buckets can only be specified when the table is created, using
CLUSTERED BY (id) INTO x BUCKETS
The number of buckets is fixed, so it does not fluctuate with the data. Bucketing also enables optimizations such as efficient map-side joins.
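Putting the clause above into a complete statement, a minimal sketch (the table name student_details, its columns, and the bucket count of 4 are illustrative):

CREATE TABLE student_details (
  id    INT,
  name  STRING,
  marks INT
)
CLUSTERED BY (id) INTO 4 BUCKETS;   -- records with the same id always land in the same one of the 4 files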
Let us see one example of bucketing,
hive> show tables;
OK
student
Time taken: 1.328 seconds, Fetched: 1 row(s)

hive> select * from student;
OK
siva    3
sai     6
rahul   9
siva    8
Time taken: 1.393 seconds, Fetched: 5 row(s)

hive> describe student;
OK
name    string
marks   int
Time taken: 0.189 seconds, Fetched: 2 row(s)
Here we have a student table. For bucketing, we first have to set the bucketing property to ‘true’. This is done as follows:
hive> set hive.enforce.bucketing = true;
Setting hive.enforce.bucketing = true makes the number of reduce tasks equal to the number of buckets mentioned in the table definition (which is 2 in our case) and automatically selects the CLUSTERED BY column from the table definition.
Next we need to create the bucketed table, with all the columns of the student table.
hive> create table bucket_student (name string, marks int) clustered by (name) into 2 buckets row format delimited fields terminated by ',';
OK
Time taken: 0.529 seconds

hive> show tables;
OK
bucket_student
student
Time taken: 0.067 seconds, Fetched: 2 row(s)
Then we insert the student data into the bucketed table using a SELECT … FROM clause. We should use INSERT INTO or INSERT OVERWRITE, since a plain LOAD DATA statement would only copy files without distributing the rows into buckets.
hive> insert into table bucket_student select name,marks from student;
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 2
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1513572589371_0001, Tracking URL = http://ubuntu:8088/proxy/application_1513572589371_0001/
Kill Command = /home/hdadmin/hadoop-2.5.0-cdh5.3.2/bin/hadoop job -kill job_1513572589371_0001
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 2
2017-12-17 20:57:30,612 Stage-1 map = 0%, reduce = 0%
2017-12-17 20:57:36,185 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 0.7 sec
2017-12-17 20:57:45,004 Stage-1 map = 100%, reduce = 50%, Cumulative CPU 1.64 sec
2017-12-17 20:57:47,165 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 2.54 sec
MapReduce Total cumulative CPU time: 2 seconds 540 msec
Ended Job = job_1513572589371_0001
Loading data to table default.bucket_student
Table default.bucket_student stats: [numFiles=2, numRows=5, totalSize=37, rawDataSize=32]
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1  Reduce: 2  Cumulative CPU: 2.54 sec  HDFS Read: 249  HDFS Write: 193  SUCCESS
Total MapReduce CPU Time Spent: 2 seconds 540 msec
OK
Time taken: 41.232 seconds
After the job executes, 2 files are created, since we specified 2 buckets in the CLUSTERED BY clause.
hdadmin@ubuntu:~/hive-0.13.1-cdh5.3.2$ hdfs dfs -ls /user/hive/warehouse/bucket_student
Found 2 items
-rwxr-xr-x   1 hdadmin supergroup          8 2017-12-17 20:57 /user/hive/warehouse/bucket_student/000000_0
-rwxr-xr-x   1 hdadmin supergroup         29 2017-12-17 20:57 /user/hive/warehouse/bucket_student/000001_0
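As an aside, on older Hive releases that lack hive.enforce.bucketing, the same bucketed layout is usually produced by setting the reducer count to the bucket count by hand and clustering the insert explicitly; a hedged sketch (not run here):

set mapreduce.job.reduces = 2;   -- one reducer per bucket
insert overwrite table bucket_student
select name, marks from student cluster by name;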
We can use TABLESAMPLE to fetch data from a particular bucket only. It is similar to the LIMIT operator, but LIMIT scans the entire table, whereas TABLESAMPLE scans only the specified bucket. TABLESAMPLE can also sample a percentage of the table, as the second query below shows.
hive> select name, marks from bucket_student
    > tablesample(bucket 1 out of 2 on name);
rahul   9
Time taken: 10.95 seconds, Fetched: 1 row(s)

hive> select name, marks from bucket_student tablesample(1 Percent);
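The general form of bucket sampling is TABLESAMPLE(BUCKET x OUT OF y [ON column]); for example, to read the second bucket instead of the first (a sketch, not executed here):

select name, marks from bucket_student tablesample(bucket 2 out of 2 on name);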
For comparison, a LIMIT operator example:
hive> select name,marks from student limit 2;
siva    3
sai     6
Time taken: 14.265 seconds, Fetched: 2 row(s)