Pig – Practical:

First, start the HDFS and YARN daemons on the Hadoop cluster with ‘start-dfs.sh’ and ‘start-yarn.sh’, then use the ‘jps’ command to check that all of them are running.

We also need to start one more Hadoop daemon, ‘JobHistoryServer’, which keeps track of the MapReduce jobs that Pig launches.

hdadmin@ubuntu:~$ cd hadoop-2.5.0-cdh5.3.2/

hdadmin@ubuntu:~/hadoop-2.5.0-cdh5.3.2$ sbin/mr-jobhistory-daemon.sh start historyserver

starting historyserver, logging to /home/hdadmin/hadoop-2.5.0-cdh5.3.2/logs/mapred-hdadmin-historyserver-ubuntu.out


hdadmin@ubuntu:~/hadoop-2.5.0-cdh5.3.2$ jps

3811 ResourceManager

3957 NodeManager

3030 NameNode

4359 Jps

4329 JobHistoryServer

3308 SecondaryNameNode

3150 DataNode

Now we have all the daemons up and running.
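This check can also be automated by comparing the process names printed by ‘jps’ with the daemons we expect. The sketch below is plain Python and not part of Hadoop or Pig. The helper name missing_daemons and the REQUIRED set are assumptions made for this illustration, and the sample text is copied from the output above.

```python
# Hypothetical helper for illustration; not part of Hadoop or Pig.
REQUIRED = {"NameNode", "DataNode", "SecondaryNameNode",
            "ResourceManager", "NodeManager", "JobHistoryServer"}

def missing_daemons(jps_output):
    """Return the required daemons that do not appear in jps output."""
    running = set()
    for line in jps_output.splitlines():
        parts = line.split()
        if len(parts) == 2:  # each line looks like "<pid> <process name>"
            running.add(parts[1])
    return REQUIRED - running

sample = """3811 ResourceManager
3957 NodeManager
3030 NameNode
4359 Jps
4329 JobHistoryServer
3308 SecondaryNameNode
3150 DataNode"""

print(missing_daemons(sample))  # an empty set means nothing is missing
```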

Next we need to start the ‘grunt’ shell.

hdadmin@ubuntu:~$ cd pig-0.12.0-cdh5.3.2/

hdadmin@ubuntu:~/pig-0.12.0-cdh5.3.2$ bin/pig


2017-12-03 04:33:56,085 [main] INFO  org.apache.pig.Main - Apache Pig version 0.12.0-cdh5.3.2 (rexported) compiled Feb 24 2015, 12:58:37

2017-12-03 04:33:56,087 [main] INFO  org.apache.pig.Main - Logging error messages to: /home/hdadmin/pig-0.12.0-cdh5.3.2/pig_1512304436078.log

2017-12-03 04:33:56,278 [main] INFO  org.apache.pig.impl.util.Utils - Default bootup file /home/hdadmin/.pigbootup not found

2017-12-03 04:33:57,364 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address

2017-12-03 04:33:57,374 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS

2017-12-03 04:33:57,374 [main] INFO  org.apache.pig.backend.hadoop.executionengine.HExecutionEngine - Connecting to hadoop file system at: hdfs://localhost:9000

2017-12-03 04:33:58,334 [main] WARN  org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

2017-12-03 04:34:00,026 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS

grunt>

We can list Pig's command-line options with the ‘-help’ option:

hdadmin@ubuntu:~/pig-0.12.0-cdh5.3.2$ bin/pig -help



Apache Pig version 0.12.0-cdh5.3.2 (rexported)

compiled Feb 24 2015, 12:58:37


USAGE: Pig [options] [-] : Run interactively in grunt shell.

            Pig [options] -e[xecute] cmd [cmd ...] : Run cmd(s).

            Pig [options] [-f[ile]] file : Run cmds found in file.

 options include:

            -4, -log4jconf - Log4j configuration file, overrides log conf

            -b, -brief - Brief logging (no timestamps)

            -c, -check - Syntax check

            -d, -debug - Debug level, INFO is default

            -e, -execute - Commands to execute (within quotes)

            -f, -file - Path to the script to execute

            -g, -embedded - ScriptEngine classname or keyword for the ScriptEngine

            -h, -help - Display this message. You can specify topic to get help for that topic.

            properties is the only topic currently supported: -h properties.

            -i, -version - Display version information

            -l, -logfile - Path to client side log file; default is current working directory.

            -m, -param_file - Path to the parameter file

            -p, -param - Key value pair of the form param=val

            -r, -dryrun - Produces script with substituted parameters. Script is not executed.

            -t, -optimizer_off - Turn optimizations off. The following values are supported:

            SplitFilter - Split filter conditions

            PushUpFilter - Filter as early as possible

            MergeFilter - Merge filter conditions

            PushDownForeachFlatten - Join or explode as late as possible

            LimitOptimizer - Limit as early as possible

            ColumnMapKeyPrune - Remove unused data

            AddForEach - Add ForEach to remove unneeded columns

            MergeForEach - Merge adjacent ForEach

            GroupByConstParallelSetter - Force parallel 1 for "group all" statement

            All - Disable all optimizations

            All optimizations listed here are enabled by default. Optimization values are case insensitive.

            -v, -verbose - Print all error messages to screen

            -w, -warning - Turn warning logging on; also turns warning aggregation off

            -x, -exectype - Set execution mode: local|mapreduce, default is mapreduce.

            -F, -stop_on_failure - Aborts execution on the first failed job; default is off

            -M, -no_multiquery - Turn multiquery optimization off; default is on

            -P, -propertyFile - Path to property file

            -printCmdDebug - Overrides anything else and prints the actual command used to run Pig, including
any environment variables that are set by the pig command.

Now let us put some data into HDFS and apply a few Pig operators to it.

hdadmin@ubuntu:~$ hdfs dfs -put india.txt /data1

17/12/03 04:43:44 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Now we load the data into Pig and use the ‘DUMP’ operator to display it on screen.

grunt> data = LOAD '/data1' AS (line:chararray);

grunt> dump data;

org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer - {RULES_ENABLED=[AddForEach, ColumnMapKeyPrune, DuplicateForEachColumnRewrite, GroupByConstParallelSetter, ImplicitSplitInserter, LimitOptimizer, LoadTypeCastInserter, MergeFilter, MergeForEach, NewPartitionFilterOptimizer, PushDownForEachFlatten, PushUpFilter, SplitFilter, StreamTypeCastInserter], RULES_DISABLED=[FilterLogicExpressionSimplifier, PartitionFilterOptimizer]}

2017-12-03 04:52:29,794 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler - File concatenation threshold: 100 optimistic? false

2017-12-03 04:52:29,795 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size before optimization: 1

2017-12-03 04:52:29,796 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MultiQueryOptimizer - MR plan size after optimization: 1

2017-12-03 04:52:29,893 [main] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at /0.0.0.0:8032

2017-12-03 04:52:29,895 [main] INFO  org.apache.pig.tools.pigstats.ScriptState - Pig script settings are added to the job

2017-12-03 04:52:29,967 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - mapred.job.reduce.markreset.buffer.percent is not set, set to default 0.3

2017-12-03 04:52:29,973 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - creating jar file Job6049784579156298372.jar

2017-12-03 04:52:39,200 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - jar file Job6049784579156298372.jar created

2017-12-03 04:52:39,214 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler - Setting up single store job

2017-12-03 04:52:39,215 [main] INFO  org.apache.pig.data.SchemaTupleFrontend - Key [pig.schematuple] is false, will not generate code.

2017-12-03 04:52:39,220 [main] INFO  org.apache.pig.data.SchemaTupleFrontend - Starting process to move generated code to distributed cache

2017-12-03 04:52:39,221 [main] INFO  org.apache.pig.data.SchemaTupleFrontend - Setting key [pig.schematuple.classes] with classes to deserialize []

2017-12-03 04:52:39,288 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 1 map-reduce job(s) waiting for submission.

2017-12-03 04:52:39,297 [JobControl] INFO  org.apache.hadoop.yarn.client.RMProxy - Connecting to ResourceManager at /0.0.0.0:8032

2017-12-03 04:52:39,337 [JobControl] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS

2017-12-03 04:52:40,288 [JobControl] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1

2017-12-03 04:52:40,288 [JobControl] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1

2017-12-03 04:52:40,384 [JobControl] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths (combined) to process : 1

2017-12-03 04:52:40,548 [JobControl] INFO  org.apache.hadoop.mapreduce.JobSubmitter - number of splits:1

2017-12-03 04:52:41,461 [JobControl] INFO  org.apache.hadoop.mapreduce.JobSubmitter - Submitting tokens for job: job_1512303991356_0003

2017-12-03 04:52:42,320 [JobControl] INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl - Submitted application application_1512303991356_0003

2017-12-03 04:52:42,585 [JobControl] INFO  org.apache.hadoop.mapreduce.Job - The url to track the job: http://ubuntu:8088/proxy/application_1512303991356_0003/

2017-12-03 04:52:42,590 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - HadoopJobId: job_1512303991356_0003

2017-12-03 04:52:42,591 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Processing aliases data

2017-12-03 04:52:42,592 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - detailed locations: M: data[3,7],data[-1,-1] C:  R:

2017-12-03 04:52:42,763 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 0% complete

2017-12-03 04:53:15,205 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 50% complete

2017-12-03 04:53:18,967 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - mapred.reduce.tasks is deprecated. Instead, use mapreduce.job.reduces

2017-12-03 04:53:19,283 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - 100% complete

2017-12-03 04:53:19,285 [main] INFO  org.apache.pig.tools.pigstats.SimplePigStats - Script Statistics:



HadoopVersion    PigVersion    UserId    StartedAt    FinishedAt    Features

2.5.0-cdh5.3.2    0.12.0-cdh5.3.2    hdadmin    2017-12-03 04:52:29    2017-12-03 04:53:19    UNKNOWN


Success!

Job Stats (time in seconds):

JobId    Maps    Reduces    MaxMapTime    MinMapTIme    AvgMapTime    MedianMapTime    MaxReduceTime    MinReduceTime    AvgReduceTime    MedianReducetime    Alias    Feature    Outputs

job_1512303991356_0003    1    0    9    9    9    9    n/a    n/a    n/a    n/a    data    MAP_ONLY    hdfs://localhost:9000/tmp/temp1417536758/tmp-1934090340,


Input(s):

Successfully read 3 records (459 bytes) from: "/data1"

Output(s):

Successfully stored 3 records (121 bytes) in: "hdfs://localhost:9000/tmp/temp1417536758/tmp-1934090340"


Counters:

Total records written : 3

Total bytes written : 121

Spillable Memory Manager spill count : 0

Total bags proactively spilled: 0

Total records proactively spilled: 0

Job DAG:

job_1512303991356_0003


2017-12-03 04:53:19,613 [main] INFO  org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!

2017-12-03 04:53:19,621 [main] INFO  org.apache.hadoop.conf.Configuration.deprecation - fs.default.name is deprecated. Instead, use fs.defaultFS

2017-12-03 04:53:19,623 [main] INFO  org.apache.pig.data.SchemaTupleBackend - Key [pig.schematuple] was not set... will not generate code.

2017-12-03 04:53:19,659 [main] INFO  org.apache.hadoop.mapreduce.lib.input.FileInputFormat - Total input paths to process : 1

2017-12-03 04:53:19,664 [main] INFO  org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil - Total input paths to process : 1


(India is one of the best country in the world.)

(India is the best country.)

(Inida has a huge population.)

Next is a simple word count example.

First we split each input line into individual words.

grunt> characters = FOREACH data GENERATE FLATTEN(TOKENIZE(line)) AS word;  

grunt> dump characters;

(India)

(is)

(one)

(of)

(the)

(best)

(country)

(in)

(the)

(world.)

(India)

(is)

(the)

(best)

(country.)

(Inida)

(has)

(a)

(huge)

(population.)
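To see what TOKENIZE and FLATTEN do together, here is a rough model in plain Python. It is only a sketch: the tokenize function is a simplified stand-in that splits on whitespace, whereas Pig's built-in TOKENIZE also treats a few other characters (such as commas) as delimiters. The three input lines are the ones dumped earlier.

```python
lines = [
    "India is one of the best country in the world.",
    "India is the best country.",
    "Inida has a huge population.",
]

def tokenize(line):
    # Simplified stand-in for Pig's TOKENIZE: split on whitespace only.
    return line.split()

# TOKENIZE yields one bag of words per line; FLATTEN un-nests those
# bags so that every word becomes its own record.
characters = [word for line in lines for word in tokenize(line)]

print(len(characters))  # 20
print(characters[:4])   # ['India', 'is', 'one', 'of']
```

Note that punctuation stays attached to the word, which is why ‘world.’ and ‘country.’ appear with a trailing period in the dump above.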

Now we group identical words together.

grunt> word_groups = GROUP characters BY word;

grunt> dump word_groups;

(a,{(a)})

(in,{(in)})

(is,{(is),(is)})

(of,{(of)})

(has,{(has)})

(one,{(one)})

(the,{(the),(the),(the)})

(best,{(best),(best)})

(huge,{(huge)})

(India,{(India),(India)})

(Inida,{(Inida)})

(country,{(country),(country)})

(world,{(world)})

(population,{(population)})
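The shape of the GROUP output, a key followed by a bag of the original tuples, can be sketched in plain Python as below. This is an illustration only, not how Pig implements grouping, and the short word list is a made-up sample rather than the full data set.

```python
from collections import defaultdict

# Shortened sample of the words relation (assumption for illustration).
characters = ["India", "is", "the", "best", "country", "India", "is"]

# GROUP ... BY word: one entry per distinct key, holding a "bag"
# (here a list) of the original one-field tuples.
word_groups = defaultdict(list)
for word in characters:
    word_groups[word].append((word,))

print(word_groups["India"])  # [('India',), ('India',)]
print(word_groups["the"])    # [('the',)]
```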

Now we count the occurrences of each word.

grunt> word_count = FOREACH word_groups GENERATE COUNT(characters) AS count, group AS word;

grunt> dump word_count;

(1,a)

(1,in)

(2,is)

(1,of)

(1,has)

(1,one)

(3,the)

(2,best)

(1,huge)

(2,India)

(1,Inida)

(2,country)

(1,world)

(1,population)
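In other words, COUNT(characters) is simply the size of each group's bag, and ‘group’ is the key the bag was built for. A minimal Python sketch of that step, using a small hand-written sample of groups (an assumption, not the full output above):

```python
# Hand-written sample of grouped data (assumption for illustration).
word_groups = {
    "the": [("the",), ("the",), ("the",)],
    "is": [("is",), ("is",)],
    "a": [("a",)],
}

# FOREACH word_groups GENERATE COUNT(characters) AS count, group AS word
word_count = [(len(bag), group) for group, bag in word_groups.items()]

print(word_count)  # [(3, 'the'), (2, 'is'), (1, 'a')]
```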

Now we sort the word counts in descending order.

grunt> ordered_word_count = ORDER word_count BY count DESC;

grunt> dump ordered_word_count;


Pig evaluates lazily: the MapReduce job is started only when we run ‘dump’.


(3,the)

(2,India)

(2,best)

(2,is)

(2,country)

(1,Inida)

(1,huge)

(1,one)

(1,has)

(1,of)

(1,in)

(1,a)

(1,world)

(1,population)

This gives us the count of every word in the input file.
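The ORDER ... BY count DESC step corresponds to a sort on the first field in reverse order. The Python sketch below uses a small made-up sample. Python's sort keeps tied records in their input order, whereas Pig makes no promise about the order of records with equal counts.

```python
# Made-up sample of (count, word) tuples (assumption for illustration).
word_count = [(1, "a"), (2, "is"), (3, "the"), (2, "best")]

# ORDER word_count BY count DESC
ordered_word_count = sorted(word_count, key=lambda t: t[0], reverse=True)

print(ordered_word_count)  # [(3, 'the'), (2, 'is'), (2, 'best'), (1, 'a')]
```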

Next we load some sample data into Pig. Relation names are case-sensitive, so we use a lowercase ‘a’ throughout.

grunt> a = load '/data1' using PigStorage(',') as (name:chararray, age:int);

grunt> dump a;

(siva,3)

(sai,6)

(rahul,9)

Let us look at the ‘FOREACH ... GENERATE’ operator, which produces a new record from each input record; here it keeps only the name field.

grunt> b = foreach a generate name;

grunt> dump b;

(siva)

(sai)

(rahul)
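As a rough analogy in plain Python (a sketch, not Pig's implementation), projecting one field with FOREACH ... GENERATE behaves like a list comprehension over the tuples of the relation. The data is the sample dumped above.

```python
a = [("siva", 3), ("sai", 6), ("rahul", 9)]

# foreach a generate name: keep only the first field of every tuple.
b = [(name,) for name, _age in a]

print(b)  # [('siva',), ('sai',), ('rahul',)]
```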

Let us apply the ‘GROUP’ operator to it and store the result in ‘output’.

grunt> c = group a by name;

grunt> dump c;

(sai,{(sai,6)})

(siva,{(siva,3)})

(rahul,{(rahul,9)})


grunt> store c into 'output';


Input(s):

Successfully read 3 records (379 bytes) from: "/data1"


Output(s):

Successfully stored 3 records (48 bytes) in: "hdfs://localhost:9000/user/hdadmin/output"

The output is stored in the ‘output’ directory under ‘/user/hdadmin’ in HDFS. We can print the stored file with ‘hdfs dfs -cat’.

hdadmin@ubuntu:~/pig-0.12.0-cdh5.3.2$ hdfs dfs -cat /user/hdadmin/output/part-r-00000

sai    {(sai,6)}

siva    {(siva,3)}

rahul    {(rahul,9)}

Now let us look at the ‘JOIN’ operator. Here we join two text files.

hdadmin@ubuntu:~$ hdfs dfs -cat /data/data.txt

17/12/09 06:35:56 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

1,anu,3

2,sai,4

3,sri,7


hdadmin@ubuntu:~$ hdfs dfs -cat /data1/student.txt

17/12/09 06:36:19 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

siva,3

sai,6

rahul,9

We load the two files into two relations, ‘big’ and ‘small’, and join them on the name field.

grunt> big = load '/data' using PigStorage(',') as (id:int, name:chararray, age:int);

grunt> small = load '/data1' using PigStorage(',') as (name:chararray, age:int);


grunt> c = join big by name, small by name;

grunt> dump c;

Input(s):

Successfully read 3 records from: "/data"

Successfully read 3 records from: "/data1"



Output(s):

Successfully stored 1 records (22 bytes) in: "hdfs://localhost:9000/tmp/temp-1904148403/tmp1496302303"

(2,sai,4,sai,6)
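Only ‘sai’ appears in both files, so the inner join returns a single record that carries the fields of both sides. The Python sketch below reproduces that result with a simple lookup table. It illustrates the semantics of an inner join only and is not how Pig executes it; the data is copied from the two files shown above.

```python
big = [(1, "anu", 3), (2, "sai", 4), (3, "sri", 7)]
small = [("siva", 3), ("sai", 6), ("rahul", 9)]

# Index the second relation by its join key (name, field 0).
by_name = {}
for row in small:
    by_name.setdefault(row[0], []).append(row)

# join big by name, small by name: concatenate every matching pair.
c = [b + s for b in big for s in by_name.get(b[1], [])]

print(c)  # [(2, 'sai', 4, 'sai', 6)]
```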

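Let us look at a few more Pig operators. ‘FILTER’ keeps only the records that satisfy a condition; here, those whose third field (age) is less than 5.

grunt> a = load '/data' using PigStorage(',') as (id:int, name:chararray, age:int);

grunt> b = filter a by $2 < 5;

grunt> dump b;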

(1,anu,3)

(2,sai,4)
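Positional references in Pig start at $0, so $2 is the third field. A plain-Python sketch of the same filter, using the contents of data.txt shown earlier (an analogy, not Pig's implementation):

```python
a = [(1, "anu", 3), (2, "sai", 4), (3, "sri", 7)]

# filter a by $2 < 5: $2 is the third field (age), index 2 in Python.
b = [row for row in a if row[2] < 5]

print(b)  # [(1, 'anu', 3), (2, 'sai', 4)]
```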



grunt> a = load '/data1' using PigStorage(',') as (name:chararray, age:int);

grunt> b = foreach a generate age + 5;

grunt> dump b;

(8)

(11)

(14)
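Here FOREACH ... GENERATE evaluates an expression instead of projecting a field as-is: every age is increased by 5. The same arithmetic as a plain-Python sketch, with the sample rows from student.txt:

```python
a = [("siva", 3), ("sai", 6), ("rahul", 9)]

# foreach a generate age + 5: one output field per record.
b = [(age + 5,) for _name, age in a]

print(b)  # [(8,), (11,), (14,)]
```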

Let us look at the schema and the execution plans of a relation using ‘describe’ and ‘explain’.

grunt> describe a;

a: {name: chararray,age: int}

grunt> explain a;


#-----------------------------------------------

# New Logical Plan:

#-----------------------------------------------

a: (Name: LOStore Schema: name#49:chararray,age#50:int)

|

|---a: (Name: LOForEach Schema: name#49:chararray,age#50:int)

            |   |

            |   (Name: LOGenerate[false,false] Schema: name#49:chararray,age#50:int)ColumnPrune:OutputUids=[49, 50]ColumnPrune:InputUids=[49, 50]

            |   |   |

            |   |   (Name: Cast Type: chararray Uid: 49)

            |   |   |

            |   |   |---name:(Name: Project Type: bytearray Uid: 49 Input: 0 Column: (*))

            |   |   |

            |   |   (Name: Cast Type: int Uid: 50)

            |   |   |

            |   |   |---age:(Name: Project Type: bytearray Uid: 50 Input: 1 Column: (*))

            |   |

            |   |---(Name: LOInnerLoad[0] Schema: name#49:bytearray)

            |   |

            |   |---(Name: LOInnerLoad[1] Schema: age#50:bytearray)

            |

            |---a: (Name: LOLoad Schema: name#49:bytearray,age#50:bytearray)RequiredFields:null

#-----------------------------------------------

# Physical Plan:

#-----------------------------------------------

a: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-42

|

|---a: New For Each(false,false)[bag] - scope-41

            |   |

            |   Cast[chararray] - scope-36

            |   |

            |   |---Project[bytearray][0] - scope-35

            |   |

            |   Cast[int] - scope-39

            |   |

            |   |---Project[bytearray][1] - scope-38

            |

            |---a: Load(/data1:PigStorage(',')) - scope-34


#--------------------------------------------------

# Map Reduce Plan                                   

#--------------------------------------------------

MapReduce node scope-43

Map Plan

a: Store(fakefile:org.apache.pig.builtin.PigStorage) - scope-42

|

|---a: New For Each(false,false)[bag] - scope-41

            |   |

            |   Cast[chararray] - scope-36

            |   |

            |   |---Project[bytearray][0] - scope-35

            |   |

            |   Cast[int] - scope-39

            |   |

            |   |---Project[bytearray][1] - scope-38

            |

            |---a: Load(/data1:PigStorage(',')) - scope-34--------

Global sort: false

----------------

Now let us run Pig from a script file instead of typing commands into grunt.

First we need to write a Pig script. This one loads the data from the ‘/data1’ directory and dumps it.

hdadmin@ubuntu:~/pig-0.12.0-cdh5.3.2$ cat id.pig

a = load '/data1' as (name:chararray, age:int);

dump a;

Now execute the pig script.

hdadmin@ubuntu:~/pig-0.12.0-cdh5.3.2$ bin/pig id.pig

(siva,3,)

(sai,6,)

(rahul,9,)
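The empty field at the end of each tuple is worth a note. The script does not pass PigStorage(',') to load, so Pig falls back to its default tab delimiter: the whole line ends up in ‘name’ and ‘age’ is null. The Python sketch below models that behaviour. The load function here is a hypothetical stand-in written for this illustration, not Pig's loader.

```python
def load(lines, delimiter="\t"):
    """Rough model of: load ... as (name:chararray, age:int)."""
    rows = []
    for line in lines:
        fields = line.split(delimiter)
        name = fields[0]
        # A missing or non-numeric second field becomes null (None here).
        try:
            age = int(fields[1])
        except (IndexError, ValueError):
            age = None
        rows.append((name, age))
    return rows

lines = ["siva,3", "sai,6", "rahul,9"]

print(load(lines))       # [('siva,3', None), ('sai,6', None), ('rahul,9', None)]
print(load(lines, ","))  # [('siva', 3), ('sai', 6), ('rahul', 9)]
```

Adding using PigStorage(',') to the load statement in the script, as in the grunt examples earlier, gives the two-field tuples instead.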

Now we pass an input parameter to the Pig script with the ‘-param’ option. Pig replaces ‘$data’ in the script with the value given on the command line.

hdadmin@ubuntu:~/pig-0.12.0-cdh5.3.2$ cat id.pig

a = load '$data' as (name:chararray, age:int);

dump a;

hdadmin@ubuntu:~/pig-0.12.0-cdh5.3.2$ bin/pig -param data=/data1 id.pig

(siva,3,)

(sai,6,)

(rahul,9,)
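Parameter substitution is a textual replacement that happens before the script runs. Python's string.Template happens to use the same ‘$name’ syntax, so it makes a convenient model. This is only an analogy and not Pig's actual preprocessor. To see the real substituted script, the ‘-dryrun’ option from the help output above can be used.

```python
from string import Template

# The script text from id.pig, with $data as the placeholder.
script = "a = load '$data' as (name:chararray, age:int);\ndump a;"

# Equivalent of: bin/pig -param data=/data1 id.pig
params = {"data": "/data1"}
substituted = Template(script).substitute(params)

print(substituted)
```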

We can also run file system commands such as ‘ls’, ‘pwd’ and ‘cd’ directly in grunt; they operate on HDFS:

grunt> ls                  

hdfs://localhost:9000/user/hdadmin/output    <dir>

grunt> pwd

hdfs://localhost:9000/user/hdadmin

grunt> cd /data1

grunt> ls

hdfs://localhost:9000/data1/input.pig<r 1>    657