Pig – Practical:
First, we start all the Hadoop HDFS and YARN daemons on the cluster using 'start-dfs.sh' and 'start-yarn.sh', and then check whether all the daemons are running with the 'jps' command.
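A minimal start-up sequence looks roughly like this (a sketch; it assumes the commands are run from the Hadoop installation directory, which may differ on your machine):
hdadmin@ubuntu:~/hadoop-2.5.0-cdh5.3.2$ sbin/start-dfs.sh      # starts NameNode, DataNode and SecondaryNameNode
hdadmin@ubuntu:~/hadoop-2.5.0-cdh5.3.2$ sbin/start-yarn.sh     # starts ResourceManager and NodeManager
hdadmin@ubuntu:~/hadoop-2.5.0-cdh5.3.2$ jps                    # list the running Java daemons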
We also need to start the 'JobHistoryServer' daemon, which keeps track of the MapReduce jobs that Pig submits.
hdadmin@ubuntu:~$ cd hadoop-2.5.0-cdh5.3.2/
hdadmin@ubuntu:~/hadoop-2.5.0-cdh5.3.2$ sbin/mr-jobhistory-daemon.sh start historyserver
starting historyserver, logging to /home/hdadmin/hadoop-2.5.0-cdh5.3.2/logs/mapred-hdadmin-historyserver-ubuntu.out
hdadmin@ubuntu:~/hadoop-2.5.0-cdh5.3.2$ jps
3811 ResourceManager
3957 NodeManager
3030 NameNode
4359 Jps
4329 JobHistoryServer
3308 SecondaryNameNode
3150 DataNode
Now we have all the daemons up and running.
Next we need to start the ‘grunt’ shell.
hdadmin@ubuntu:~$ cd pig-0.12.0-cdh5.3.2/
hdadmin@ubuntu:~/pig-0.12.0-cdh5.3.2$ bin/pig
2017-12-03 04:33:56,085 [main] INFO org.apache.pig.Main - Apache Pig version 0.12.0-cdh5.3.2 (rexported) compiled Feb 24 2015, 12:58:37
2017-12-03 04:33:56,087 [main] INFO org.apache.pig.Main - Logging error messages to: /home/hdadmin/pig-0.12.0-cdh5.3.2/pig_1512304436078.log
2017-12-03 04:33:56,278 [main] INFO org.apache.pig.impl.util.Utils - Default bootup file /home/hdadmin/.pigbootup not found
2017-12-03 04:33:57,364 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
2017-12-03 04:33:57,374 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2017-12-03 04:33:57,374 [main] INFO org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://localhost:9000
2017-12-03 04:33:58,334 [main] WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2017-12-03 04:34:00,026 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
grunt>
We can view Pig's command-line help as follows:
hdadmin@ubuntu:~/pig-0.12.0-cdh5.3.2$ bin/pig -help
Apache Pig version 0.12.0-cdh5.3.2 (rexported) compiled Feb 24 2015, 12:58:37

USAGE: Pig [options] [-] : Run interactively in grunt shell.
       Pig [options] -e[xecute] cmd [cmd ...] : Run cmd(s).
       Pig [options] [-f[ile]] file : Run cmds found in file.
  options include:
    -4, -log4jconf - Log4j configuration file, overrides log conf
    -b, -brief - Brief logging (no timestamps)
    -c, -check - Syntax check
    -d, -debug - Debug level, INFO is default
    -e, -execute - Commands to execute (within quotes)
    -f, -file - Path to the script to execute
    -g, -embedded - ScriptEngine classname or keyword for the ScriptEngine
    -h, -help - Display this message. You can specify topic to get help for that topic.
        properties is the only topic currently supported: -h properties.
    -i, -version - Display version information
    -l, -logfile - Path to client side log file; default is current working directory.
    -m, -param_file - Path to the parameter file
    -p, -param - Key value pair of the form param=val
    -r, -dryrun - Produces script with substituted parameters. Script is not executed.
    -t, -optimizer_off - Turn optimizations off. The following values are supported:
        SplitFilter - Split filter conditions
        PushUpFilter - Filter as early as possible
        MergeFilter - Merge filter conditions
        PushDownForeachFlatten - Join or explode as late as possible
        LimitOptimizer - Limit as early as possible
        ColumnMapKeyPrune - Remove unused data
        AddForEach - Add ForEach to remove unneeded columns
        MergeForEach - Merge adjacent ForEach
        GroupByConstParallelSetter - Force parallel 1 for "group all" statement
        All - Disable all optimizations
        All optimizations listed here are enabled by default. Optimization values are case insensitive.
    -v, -verbose - Print all error messages to screen
    -w, -warning - Turn warning logging on; also turns warning aggregation off
    -x, -exectype - Set execution mode: local|mapreduce, default is mapreduce.
    -F, -stop_on_failure - Aborts execution on the first failed job; default is off
    -M, -no_multiquery - Turn multiquery optimization off; default is on
    -P, -propertyFile - Path to property file
    -printCmdDebug - Overrides anything else and prints the actual command used to run Pig, including any environment variables that are set by the pig command.
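For example, the '-x' and '-e' options listed above let us pick the execution mode or run a single command without entering the shell (a sketch; the path used here is only illustrative):
hdadmin@ubuntu:~/pig-0.12.0-cdh5.3.2$ bin/pig -x local             # start grunt against the local file system, handy for quick testing
hdadmin@ubuntu:~/pig-0.12.0-cdh5.3.2$ bin/pig -e 'fs -ls /data1'   # execute a single command and exit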
Now let us put some data into HDFS and apply a few Pig operators to it.
hdadmin@ubuntu:~$ hdfs dfs -put india.txt /data1
17/12/03 04:43:44 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Now we load the data into Pig and use the 'DUMP' operator to display it on the screen.
grunt> data = LOAD '/data1' AS (line:chararray);
grunt> dump data;
org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer - {RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, DuplicateForEachColumnRewrite, GroupByConstParallelSetter, ImplicitSplitInserter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, NewPartitionFilterOptimizer, PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter], RULES_DISABLED=[FilterLogicExpressionSimplifier, PartitionFilterOptimizer]}
2017-12-03 04:52:29,794 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false
2017-12-03 04:52:29,795 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1
2017-12-03 04:52:29,796 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1
2017-12-03 04:52:29,893 [main] INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at /0.0.0.0:8032
2017-12-03 04:52:29,895 [main] INFO org.apache.pig.tools.pigstats.ScriptState - Pig script settings are added to the job
2017-12-03 04:52:29,967 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3
2017-12-03 04:52:29,973 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - creating jar file Job6049784579156298372.jar
2017-12-03 04:52:39,200 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - jar file Job6049784579156298372.jar created
2017-12-03 04:52:39,214 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job
2017-12-03 04:52:39,215 [main] INFO org.apache.pig.data.SchemaTupleFrontend - Key [pig.schematuple] is false, will not generate code.
2017-12-03 04:52:39,220 [main] INFO org.apache.pig.data.SchemaTupleFrontend - Starting process to move generated code to distributed cache
2017-12-03 04:52:39,221 [main] INFO org.apache.pig.data.SchemaTupleFrontend - Setting key [pig.schematuple.classes] with classes to deserialize []
2017-12-03 04:52:39,288 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission.
2017-12-03 04:52:39,297 [JobControl] INFO org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at /0.0.0.0:8032
2017-12-03 04:52:39,337 [JobControl] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2017-12-03 04:52:40,288 [JobControl] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2017-12-03 04:52:40,288 [JobControl] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
2017-12-03 04:52:40,384 [JobControl] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 1
2017-12-03 04:52:40,548 [JobControl] INFO org.apache.hadoop.mapreduce.JobSubmitter - number of splits:1
2017-12-03 04:52:41,461 [JobControl] INFO org.apache.hadoop.mapreduce.JobSubmitter - Submitting tokens for job: job_1512303991356_0003
2017-12-03 04:52:42,320 [JobControl] INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1512303991356_0003
2017-12-03 04:52:42,585 [JobControl] INFO org.apache.hadoop.mapreduce.Job - The url to track the job: http://ubuntu:8088/proxy/application_1512303991356_0003/
2017-12-03 04:52:42,590 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_1512303991356_0003
2017-12-03 04:52:42,591 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Processing aliases data
2017-12-03 04:52:42,592 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - detailed locations: M: data[3,7],data[-1,-1] C: R:
2017-12-03 04:52:42,763 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete
2017-12-03 04:53:15,205 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 50% complete
2017-12-03 04:53:18,967 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces
2017-12-03 04:53:19,283 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete
2017-12-03 04:53:19,285 [main] INFO org.apache.pig.tools.pigstats.SimplePigStats - Script Statistics:

HadoopVersion    PigVersion         UserId    StartedAt              FinishedAt             Features
2.5.0-cdh5.3.2   0.12.0-cdh5.3.2    hdadmin   2017-12-03 04:52:29    2017-12-03 04:53:19    UNKNOWN

Success!

Job Stats (time in seconds):
JobId    Maps    Reduces    MaxMapTime    MinMapTIme    AvgMapTime    MedianMapTime    MaxReduceTime    MinReduceTime    AvgReduceTime    MedianReducetime    Alias    Feature    Outputs
job_1512303991356_0003    1    0    9    9    9    9    n/a    n/a    n/a    n/a    data    MAP_ONLY    hdfs://localhost:9000/tmp/temp1417536758/tmp-1934090340,

Input(s):
Successfully read 3 records (459 bytes) from: "/data1"

Output(s):
Successfully stored 3 records (121 bytes) in: "hdfs://localhost:9000/tmp/temp1417536758/tmp-1934090340"

Counters:
Total records written : 3
Total bytes written : 121
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_1512303991356_0003

2017-12-03 04:53:19,613 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2017-12-03 04:53:19,621 [main] INFO org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS
2017-12-03 04:53:19,623 [main] INFO org.apache.pig.data.SchemaTupleBackend - Key [pig.schematuple] was not set... will not generate code.
2017-12-03 04:53:19,659 [main] INFO org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1
2017-12-03 04:53:19,664 [main] INFO org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1
(India is one of the best country in the world.)
(India is the best country.)
(Inida has a huge population.)
This is a simple word-count example. First we split each input line into individual words.
grunt> characters = FOREACH data GENERATE FLATTEN(TOKENIZE(line)) AS word;
grunt> dump characters;
(India)
(is)
(one)
(of)
(the)
(best)
(country)
(in)
(the)
(world.)
(India)
(is)
(the)
(best)
(country.)
(Inida)
(has)
(a)
(huge)
(population.)
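To confirm the schema that TOKENIZE and FLATTEN produce, we can ask the grunt shell to describe the relation (a quick check; the output should show a single chararray field named 'word'):
grunt> describe characters;
characters: {word: chararray}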
Now we group identical words together:
grunt> word_groups = GROUP characters BY word;
grunt> dump word_groups;
(a,{(a)})
(in,{(in)})
(is,{(is),(is)})
(of,{(of)})
(has,{(has)})
(one,{(one)})
(the,{(the),(the),(the)})
(best,{(best),(best)})
(huge,{(huge)})
(India,{(India),(India)})
(Inida,{(Inida)})
(country,{(country),(country)})
(world,{(world)})
(population,{(population)})
Now we count the occurrences of each word:
grunt> word_count = FOREACH word_groups GENERATE COUNT(characters) AS count, group AS word;
grunt> dump word_count;
(1,a)
(1,in)
(2,is)
(1,of)
(1,has)
(1,one)
(3,the)
(2,best)
(1,huge)
(2,India)
(1,Inida)
(2,country)
(1,world)
(1,population)
Now we sort the word counts in descending order. (Each time we dump a relation, a new MapReduce job is launched.)
grunt> ordered_word_count = ORDER word_count BY count DESC;
grunt> dump ordered_word_count;
(3,the)
(2,India)
(2,best)
(2,is)
(2,country)
(1,Inida)
(1,huge)
(1,one)
(1,has)
(1,of)
(1,in)
(1,a)
(1,world)
(1,population)
With this we have obtained the count of every word in the input.
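Instead of dumping to the screen, the result can also be written back to HDFS with the STORE operator (a sketch; the output path '/wordcount_out' is only an example and must not already exist):
grunt> store ordered_word_count into '/wordcount_out' using PigStorage('\t');   -- one (count, word) pair per line, tab-separated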
Next we load some comma-separated sample data into Pig.
grunt> a = load '/data1' using PigStorage(',') as (name:chararray, age:int);
grunt> dump a;
(siva,3)
(sai,6)
(rahul,9)
Let us see the FOREACH ... GENERATE operator and its functionality.
grunt> b = foreach a generate name;
grunt> dump b;
(siva)
(sai)
(rahul)
Now let us apply the GROUP operator on it and store the result into the 'output' directory:
grunt> c = group a by name;
grunt> dump c;
(sai,{(sai,6)})
(siva,{(siva,3)})
(rahul,{(rahul,9)})
grunt> store c into 'output';
Input(s):
Successfully read 3 records (379 bytes) from: "/data1"
Output(s):
Successfully stored 3 records (48 bytes) in: "hdfs://localhost:9000/user/hdadmin/output"
The result is stored in the 'output' directory under '/user/hdadmin' in HDFS, and we can view its contents with 'hdfs dfs -cat'.
hdadmin@ubuntu:~/pig-0.12.0-cdh5.3.2$ hdfs dfs -cat /user/hdadmin/output/*
sai	{(sai,6)}
siva	{(siva,3)}
rahul	{(rahul,9)}
Now let us look at the 'JOIN' operator. Here we join two text files.
hdadmin@ubuntu:~$ hdfs dfs -cat /data/data.txt
17/12/09 06:35:56 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
1,anu,3
2,sai,4
3,sri,7
hdadmin@ubuntu:~$ hdfs dfs -cat /data1/student.txt
17/12/09 06:36:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
siva,3
sai,6
rahul,9
We load the data into two relations, 'big' and 'small', and join them on the 'name' field.
grunt> big = load '/data' using PigStorage(',') as (id:int, name:chararray, age:int);
grunt> small = load '/data1' using PigStorage(',') as (name:chararray, age:int);
grunt> c = join big by name, small by name;
grunt> dump c;
Input(s):
Successfully read 3 records from: "/data"
Successfully read 3 records from: "/data1"
Output(s):
Successfully stored 1 records (22 bytes) in: "hdfs://localhost:9000/tmp/temp-1904148403/tmp1496302303"
(2,sai,4,sai,6)
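The default JOIN is an inner join, so only 'sai', the one name present in both files, survives. Pig also supports outer joins; a minimal sketch (the relation name 'lj' is just an example):
grunt> lj = join big by name left outer, small by name;   -- keep every row of 'big'; rows with no match get nulls for the 'small' fields
grunt> dump lj;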
Let us look at a few more Pig operators.
grunt> a = load '/data' using PigStorage(',') as (id:int, name:chararray, age:int);
grunt> b = filter a by $2 < 5;
grunt> dump b;
(1,anu,3)
(2,sai,4)
grunt> a = load '/data1' using PigStorage(',') as (name:chararray, age:int);
grunt> b = foreach a generate age + 5;
grunt> dump b;
(8)
(11)
(14)
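Other commonly used relational operators follow the same pattern; a quick sketch (the relation names are just examples):
grunt> uniq = distinct a;    -- remove duplicate tuples
grunt> top2 = limit a 2;     -- keep only the first two tuples
grunt> dump top2;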
Let us look at the explain plan of a relation.
grunt> describe a;
a: {name: chararray,age: int}
grunt> explain a;
#-----------------------------------------------
# New Logical Plan:
#-----------------------------------------------
a: (Name: LOStore Schema: name#49:chararray,age#50:int)
|
|---a: (Name: LOForEach Schema: name#49:chararray,age#50:int)
    |   |
    |   (Name: LOGenerate[false,false] Schema: name#49:chararray,age#50:int)ColumnPrune:OutputUids=[49, 50]ColumnPrune:InputUids=[49, 50]
    |   |   |
    |   |   (Name: Cast Type: chararray Uid: 49)
    |   |   |
    |   |   |---name:(Name: Project Type: bytearray Uid: 49 Input: 0 Column: (*))
    |   |   |
    |   |   (Name: Cast Type: int Uid: 50)
    |   |   |
    |   |   |---age:(Name: Project Type: bytearray Uid: 50 Input: 1 Column: (*))
    |   |
    |   |---(Name: LOInnerLoad[0] Schema: name#49:bytearray)
    |   |
    |   |---(Name: LOInnerLoad[1] Schema: age#50:bytearray)
    |
    |---a: (Name: LOLoad Schema: name#49:bytearray,age#50:bytearray)RequiredFields:null
#-----------------------------------------------
# Physical Plan:
#-----------------------------------------------
a: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-42
|
|---a: New For Each(false,false)[bag] - scope-41
    |   |
    |   Cast[chararray] - scope-36
    |   |
    |   |---Project[bytearray][0] - scope-35
    |   |
    |   Cast[int] - scope-39
    |   |
    |   |---Project[bytearray][1] - scope-38
    |
    |---a: Load(/data1:PigStorage(',')) - scope-34
#--------------------------------------------------
# Map Reduce Plan
#--------------------------------------------------
MapReduce node scope-43
Map Plan
a: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-42
|
|---a: New For Each(false,false)[bag] - scope-41
    |   |
    |   Cast[chararray] - scope-36
    |   |
    |   |---Project[bytearray][0] - scope-35
    |   |
    |   Cast[int] - scope-39
    |   |
    |   |---Project[bytearray][1] - scope-38
    |
    |---a: Load(/data1:PigStorage(',')) - scope-34--------
Global sort: false
----------------
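Besides EXPLAIN, the grunt shell also provides ILLUSTRATE, which runs the pipeline on a small sample of the input and shows how example tuples flow through each operator (a quick sketch, output omitted here):
grunt> illustrate a;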
Now let us execute Pig using a script file.
First we write a Pig script. This script loads the data from the '/data1' directory and dumps it.
hdadmin@ubuntu:~/pig-0.12.0-cdh5.3.2$ cat id.pig
a = load '/data1' as (name:chararray, age:int);
dump a;
Now we execute the Pig script. (Since the script does not specify PigStorage(','), the default tab delimiter is used, so the whole line lands in the 'name' field and 'age' is null; that is why each tuple below ends with a trailing comma.)
hdadmin@ubuntu:~/pig-0.12.0-cdh5.3.2$ bin/pig id.pig
(siva,3,)
(sai,6,)
(rahul,9,)
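A script can also be launched from inside the grunt shell with 'exec' (runs the script in its own context) or 'run' (runs it as if its statements were typed at the prompt); a short sketch:
grunt> exec id.pig
grunt> run id.pig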
Now we pass an input parameter to the Pig script using the '-param' option.
hdadmin@ubuntu:~/pig-0.12.0-cdh5.3.2$ cat id.pig
a = load '$data' as (name:chararray, age:int);
dump a;
hdadmin@ubuntu:~/pig-0.12.0-cdh5.3.2$ bin/pig -param data=/data1 id.pig
(siva,3,)
(sai,6,)
(rahul,9,)
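When a script takes several parameters, they can be kept in a parameter file and passed with '-param_file' (a sketch; the file name 'myparams.txt' and its contents are only an illustration):
hdadmin@ubuntu:~/pig-0.12.0-cdh5.3.2$ cat myparams.txt
data = /data1
hdadmin@ubuntu:~/pig-0.12.0-cdh5.3.2$ bin/pig -param_file myparams.txt id.pig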
We can also use file-system commands directly in the grunt shell, for example:
grunt> ls
hdfs://localhost:9000/user/hdadmin/output    <dir>
grunt> pwd
hdfs://localhost:9000/user/hdadmin
grunt> cd /data1
grunt> ls
hdfs://localhost:9000/data1/input.pig<r 1>    657
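In addition to these built-in aliases, grunt can invoke any HDFS shell command through 'fs' and any local Linux command through 'sh'; a short sketch ('fs -ls' lists an HDFS path, 'sh date' runs the local date command):
grunt> fs -ls /data1
grunt> sh date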