Pig is a platform that works with large data sets for the purpose of analysis. The Pig dialect is called Pig Latin, and the Pig Latin commands get compiled into MapReduce jobs that can be run on a suitable platform, like Hadoop.

This is a simple getting started example that’s based upon “Pig for Beginners”, with what I feel is a bit more useful information.

(This example is also available using Hive.)

The installation and configuration of Hadoop and Pig is beyond the scope of this article. If you’re just getting started, I would highly recommend grabbing one of Cloudera’s pre-built virtual machines that have everything you need.

I’m assuming that you will be running the following steps using the Cloudera VM, logged in as the cloudera user. If your setup is different, adjust accordingly.

Step 1: Preparing the Data

Like “Pig for Beginners”, we’re going to use the Book Crossing Dataset. Download the CSV dump and extract the files. We’re interested in the BX-Books.csv data file.

Type head BX-Books.csv to see the first few lines of the raw data. You’ll notice that’s it’s not really comma-delimited; the delimiter is ‘;‘. There are also some escaped HTML entities we can clean up, and the quotes around all of the values can be removed.

The first line in the file looks like this:

"ISBN";"Book-Title";"Book-Author";"Year-Of-Publication";"Publisher";"Image-URL-S";"Image-URL-M";"Image-URL-L"

This lines defines the data format of the fields in the file. We’ll want to refer back to it later.

Open a terminal and enter:

sed 's/\&/\&/g' BX-Books.csv | sed -e '1d' |sed 's/;/$$$/g' | sed 's/"$$$"/";"/g' | sed 's/"//g' > BX-BooksCorrected.txt

This will:

  1. Replace all & instances with &
  2. Remove the first (header) line
  3. Change all semicolons to $$$
  4. Change all "$$$" instances into ";"
  5. Remove all " characters

Steps 3 and 4 may look strange, but some of the field content may contain semicolons. In this case, they will be converted to $$$, but they will not match the "$$$" pattern, and will not be converted back into semicolons and mess up the import process.

Step 2: Importing the Data

Now that we have some normalized data, we need to add it to the Hadoop file system (HDFS) so that Pig can access it. This can be accomplished from the command line as well as within Pig itself. For this example, we’ll use the command line. In the terminal, type:

hadoop fs -mkdir input
hadoop fs -put /path/to/BX-BooksCorrected.txt input

(Use the correct path to the BX-BooksCorrected.txt file.)

Step 3: Running Pig

There are several ways to run Pig. We’re going to run in MapReduce mode against the local node. MapReduce mode runs against the HDFS, which is why we needed to import the file in the previous step.

Enter pig at the console to start Pig.

Once Pig has started, you’ll see the grunt> command prompt. Let’s verify that our file did get loaded into HDFS.

ls
hdfs://localhost.localdomain:8020/user/cloudera/input	<dir>

cd input
ls
hdfs://localhost.localdomain:8020/user/cloudera/input/BX-BooksCorrected.txt	<dir>

Step 4: Analyzing the Data

Now that we have the data ready, let’s do something with it. The simple example is to see how many books were published per year. We’ll start with that, then see if we can do a bit more.

Load the data into a Pig collection:

books = LOAD '/user/cloudera/input/BX-BooksCorrected.txt' 
>> USING PigStorage(';') AS 
>> (ISBN:chararray, BookTitle:chararray, 
>> BookAuthor:chararray, YearOfPublication:int, 
>> Publisher:chararray);

(Pig Latin commands are terminated with semicolons. If you press Return on a line without terminating it, you’ll get the >> characters, indicating that the command has been continued and not entered yet.)
This line loads the information from HDFS into a Pig collection named books. Pig expects data to be tab-delimited by default, but ours is not; we have to tell it that semicolons are field separators by providing the PigStorage() argument. The AS clause defines how the fields in the file are mapped into Pig data types. You’ll notice that we left off all of the “Image-URL-XXX” fields; we don’t need them for analysis, and Pig will ignore fields that we don’t tell it to load.

If you want to see what the loaded data structure looks like, you can use the DESCRIBE command:

DESCRIBE books;
books: {ISBN: chararray,BookTitle: chararray,BookAuthor: chararray,YearOfPublication: int,Publisher: chararray}

Finding books by year

We’ll start with the simple analysis of how many books were written by year.

Group the collection by year of publication:

groupByYear = GROUP books BY YearOfPublication;
DESCRIBE groupByYear;
groupByYear: {group: int,books: {(ISBN: chararray,BookTitle: chararray,BookAuthor: chararray,YearOfPublication: int,Publisher: chararray)}}

This is a lot like a Java Map. The groupByYear collection contains all unique year values, along with a list holding the full entry of each book that belongs to that year.

Generate book count by year:

countByYear = FOREACH groupByYear 
>> GENERATE group AS YearOfPublication, COUNT($1) AS BookCount;
DESCRIBE countByYear;
countByYear: {YearOfPublication: int,BookCount: int}

This is the meat of the operation. The FOREACH loops over the groupByYear collection, and we GENERATE values. Our output is defined using some values available to us within the FOREACH. We first take group, which is an alias for the grouping value and say to place it in our new collection as an item named YearOfPublication. (This value is also available as $0.) The next value $1 is an alias for the list of book entries for that group. We only care about the number of books, so we use COUNT and give it the name BookCount.

We have the results, but how do we see them? We could store them back into HDFS and extract them that way, or we can use the DUMP command.

DUMP countByYear;

You’ll see a listing of years, along with the number of books for that year. You may notice that some of the values don’t make much sense; there should be no year 0, nor should there be entries for a blank year. We’ll clean those problems up in the next analysis.

More Advanced Analysis

There’s a lot more data in the set beyond years and books counts. What if we wanted to see books published per year by author? Why don’t we go a step farther and group those results by publisher as well?

You should still have your books collection defined if you haven’t exited your Pig session. You can redefined it easily by following the above steps again. Let’s do a little bit of cleanup on the data this time, however.

books = FILTER books BY YearOfPublication > 0;

This will only keep records where we have a positive year of publication value.

Now, we need to create a set of all authors, and all years they wrote books:

pivot = FOREACH (GROUP books BY BookAuthor) 
>> GENERATE group AS BookAuthor, FLATTEN(books.YearOfPublication) AS Year;

Note that I’ve inlined the group creation in the FOREACH statement. It should be obvious that we’re grouping books by author.
This statement also introduces the FLATTEN operation. We know that the GROUP operation creates a collection where each key corresponds to a list of values; FLATTEN “flattens” this list to generate entries for each list value. (See the Pig Latin reference for a more detailed definition.) You may want to DUMP the pivot collection to see how the flattening works.

Create author book count by year:

authorYearGroup = GROUP pivot BY (BookAuthor, Year);
with_count = FOREACH authorYearGroup 
>> GENERATE FLATTEN(group), COUNT(pivot) as count;
DESCRIBE with_count;
with_count: {group::BookAuthor: chararray,group::Year: int, count: long}

This time, we’re grouping on 2 fields. This will find all (author, year) combinations. We then execute a FOREACH over the group; this time, however, we’re using FLATTEN on the group which will expand to BookAuthor and Year. We also grab the count of items to get the number of books for that author/year combination. You can see the resulting data structure in the DESCRIBE results.

Let’s group those results a little better:

author_result = FOREACH (GROUP with_count BY BookAuthor) {
>> order_by_count = ORDER with_count BY count DESC;
>> GENERATE group AS Author, order_by_count.(Year, count) AS Books;
>> };
DESCRIBE author_result;
author_result: {Author: chararray,Books: {(group::Year: int,count: long)}}

We’re running a FOREACH that looks different than the ones we’ve seen to this point. This nested structure lets us perform some extra steps before generation of values. In this case, we’re sorting the book count, then placing the year/count tuples into the collection with the author.

Group the author results by publisher:

pub_auth = FOREACH books GENERATE Publisher, BookAuthor;
distinct_authors = FOREACH (GROUP pub_auth BY Publisher) {
>> da = DISTINCT pub_auth.BookAuthor;
>> GENERATE group AS Publisher, da AS Author;
>> };
distinct_flat = FOREACH distinct_authors GENERATE Publisher, FLATTEN(Author) AS Author;

First, we use a projection to extract only the publisher and author from the books collection. This is a recommended practice as it helps with performance. In this FOREACH, we’re grabbing the distinct authors per publisher. This collection is then flattened into distinct_flat, and we end up with a publisher/author list, similar to the author/year collection previously.

Join the results so far:

joined = JOIN distinct_flat BY Author, author_result BY Author;
filtered = FOREACH joined GENERATE 
>> distinct_flat::Publisher AS Publisher, 
>> distinct_flat::Author AS Author, 
>> author_result::Books AS Books;

Here we’re joining the author/year results with the publisher/author results. The “::” syntax may look a little different than what we’ve seen before; it’s a dereference operator, and it’s used when you need to unambiguously declare a column. Try DESCRIBE on joined and filtered to see the structure difference.

Generate the final results:

result = FOREACH (GROUP filtered BY Publisher) {
>> order_by_pub = ORDER filtered BY Publisher ASC;
>> GENERATE group AS Publisher, order_by_pub.(Author, Books);
};

This should be familiar by now. We sort publishers, then generate a collection of publishers/authors/books.

At last! We have our results. You can DUMP them, but let’s also save the results back into HDFS.

STORE result INTO '/user/cloudera/publisher_books';

Now you can exit Pig with “quit”.

Let’s get a file listing:

hadoop fs -ls publisher_books

Hadoop stores publisher_books as a directory with associated files. Our actual data is in part-r-00000

hadoop fs -cat publisher_books/part-r-00000

(Want to compare the Pig steps with Hive? Here’s the same example using Hive.)

Posted in Development with : Development, Hadoop, Pig