Apache Pig and Distributed Cache

While writing a Java/Python/etc UDF for Apache Pig often you want to use files or libraries that are not available/installed on the Hadoop system. I previously posted a blog on this topic, but in the context of Hadoop Streaming. In this post I will show how to copy files/libraries (also known as archives) to distributed cache while working with Apache Pig.

Short Summary:
Copy the files/archives using mapred.cache.file or mapred.cache.archives settings to distributed cache and use mapred.create.symlink to create symbolic link in the current directory to files/archives in distributed cache. Hadoop will take care of un-archiving archives.

set mapred.create.symlink yes;
set mapred.cache.file hdfs://localhost:8020/user/ragrawal/cache/data.txt#myfile;
set mapred.cache.archives hdfs://localhost:8020/user/ragrawal/cache/data.tar.gz#datafiles;

Detailed Discussion

There are two aspects of copying files to distributed cache. First is the actual copying of files/archives and second is accessing these files in your Java/Python/Ruby/etc code.

1. Copying Files to Distributed Cache

Depending on whether you want to copy file or an archive, you can use one (or both) of the following settings in Apache Pig.

SET mapred.cache.file PATH_TO_HDFS_OR_S3_LOCATION
SET mapred.cache.archives PATH_TO_HDFS_OR_S3_LOCATION

There are few differences in the above two settings.
1. File Versus Archives: “mapred.cache.files” copies a single file from HDFS to distributed cache. Whereas “mapred.cache.archives” copies an archive (a set of files grouped together either using jar, zip, gz, etc) to distributed cache.
2. Post processing: “mapred.cache.files” simply copies the files to distributed cache. “mapred.cache.archives” copies and extract the content of the archive onto distributed cache. For instance, let’s assume data.tar.gz contains three files. In this case, Hadoop will copy data.tar.gz to distributed cache and will extract the three files to distributed cache. As mentioned over here, archives in the following format are automatically extracted: zip, gz, tar.gz, tar and jar.

2. Accessing the files

mapred.cache.[files|archives] copies and extract files/archives to a directory that is set by mapred.local.dir. However, your python/java UDF might not be running from the same directory. In order to easily access copied files, Hadoop can create symbolic link to these files in the current directory (i.e. directory in which your code is running). However you need to specifically instruct Hadoop to do so by using mapred.create.symlink setting as shown below:

set mapred.create.symlink yes;
set mapred.cache.archives hdfs://localhost:8020/user/ragrawal/cache/data.tar.gz#datafiles;

Note that you not only need to set mapred.create.symlink to yes but also need to tell Hadoop what symbolic name to use for a given file/archive. The symbolic name is appended to the files/archives and is separated by # sign as shown above. Based on the above shown settings, Hadoop will do the following three actions:
1. copy data.tar.gz from hdfs to all mappers/reducers,
2. unarchive data.tar.gz to a folder referred by mapred.local.dir
3. create a symbolic link “datafiles” in the current directory that points to the unarchived data.tar.gz

Below is a simple python code that shows how to access content of datafiles.

import os
import sys
print list(os.listdir("datafiles"))

Few Gotachas

  1. In my local hadoop/pig installation, If I used multiple mapred.cache.archives statement, Pig honored only the last one. For instance, in the below code only the last archive “data3.tar.gz” was copied to the distributed cache.
    set mapred.create.symlink yes;
    set mapred.cache.archives hdfs://localhost:8020/ragrawal/data1.tar.gz#a;
    set mapred.cache.archives hdfs://localhost:8020/ragrawal/data2.tar.gz#b;
    set mapred.cache.archives hdfs://localhost:8020/ragrawal/data3.tar.gz#c;
  2. mapred.cache.files vs mapred.cache.file

12 thoughts on “Apache Pig and Distributed Cache

  1. >> hdfs://localhost:8020
    Are these standard port numbers? What service are they referring to? It doesn’t seem to work on my machine so I wonder what port numbers to use.

  2. Wow! this really tied it up for me. I needed to send a python virtualenv so I can safely run it with streaming.
    I’ve followed this article to do it with hadoop streaming: http://henning.kropponline.de/2014/07/18/virtualenv-hadoop-streaming
    Now, using your 2 lines I was able to run it with pig streaming!

    pig code:
    set mapred.create.symlink yes;
    set mapred.cache.archives hdfs://[host]:8020/user/hk/virtualenv/demoenv.zip#demoenv;
    DEFINE CMD token_mapper.py ship(‘token_mapper.py’);
    A = load ‘/user/hk/hotels’ using PigStorage(‘,’);
    B = stream A through CMD;
    store B into ‘/user/hk/token_out’;

    python code:
    head webcat_svm_classify_only_01.py

    import sys

    Thanks for this, Ido

  3. Great article! This really helped me. However, I’d like to point out a few things:
    * “set mapred.create.symlink yes;” is not required in Hadoop 2 because symlinking is always on (see http://hadoop.apache.org/docs/r2.4.1/hadoop-project-dist/hadoop-common/DeprecatedProperties.html).
    * The property should be “mapred.cache.files” (with an ‘s’) (or use the non-deprecated property “mapreduce.job.cache.files”).
    * To add multiple files, you need to comma-separate your files, NOT call “set mapreduce.job.cache.files” multiple times. Example: “set mapreduce.job.cache.files hdfs:///foo1,hdfs:///foo2;”
    * If you do not need to rename the cache files, there is no need to provide a symlink (the “#linkName” part).

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google+ photo

You are commenting using your Google+ account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )


Connecting to %s