Apache Pig and Distributed Cache

While writing a Java/Python/etc. UDF for Apache Pig, you often want to use files or libraries that are not available or installed on the Hadoop cluster. I previously posted a blog on this topic, but in the context of Hadoop Streaming. In this post I will show how to copy files and libraries (the latter usually packaged as archives) to the distributed cache while working with Apache Pig.

Short Summary:
Copy the files/archives to the distributed cache using the mapred.cache.file or mapred.cache.archives setting, and use mapred.create.symlink to create symbolic links in the current directory that point to the files/archives in the distributed cache. Hadoop will take care of un-archiving the archives.

set mapred.create.symlink yes;
set mapred.cache.file hdfs://localhost:8020/user/ragrawal/cache/data.txt#myfile;
set mapred.cache.archives hdfs://localhost:8020/user/ragrawal/cache/data.tar.gz#datafiles;

Detailed Discussion

There are two aspects to using the distributed cache: first, actually copying the files/archives, and second, accessing these files from your Java/Python/Ruby/etc. code.

1. Copying Files to Distributed Cache

Depending on whether you want to copy a file or an archive, you can use one (or both) of the following settings in Apache Pig:

SET mapred.cache.file PATH_TO_HDFS_OR_S3_LOCATION;
SET mapred.cache.archives PATH_TO_HDFS_OR_S3_LOCATION;

There are a few differences between these two settings:
1. Files versus archives: “mapred.cache.files” copies a single file from HDFS to the distributed cache, whereas “mapred.cache.archives” copies an archive (a set of files bundled together using jar, zip, tar, etc.) to the distributed cache.
2. Post-processing: “mapred.cache.files” simply copies the file to the distributed cache, while “mapred.cache.archives” copies the archive and then extracts its contents in the distributed cache. For instance, let’s assume data.tar.gz contains three files. In this case, Hadoop will copy data.tar.gz to the distributed cache and extract the three files there. Archives in the following formats are extracted automatically: zip, tar, tar.gz (tgz) and jar.
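To make the file-versus-archive difference concrete, here is a local simulation in plain Python (no Hadoop involved) of what happens to each kind of cache entry: a file is copied unchanged, while an archive is unpacked. The function name localize and the ".dir" naming of the unpacked directory are hypothetical and do not reproduce Hadoop's actual directory layout.

```python
import os
import shutil
import tarfile
import tempfile


def localize(source, cache_dir, is_archive):
    """Local simulation of how one cache entry lands on a worker node.

    Files are copied unchanged; archives are unpacked into a directory.
    The layout used here is illustrative, not Hadoop's exact layout.
    """
    os.makedirs(cache_dir, exist_ok=True)
    if not is_archive:
        # A plain file is copied byte-for-byte; nothing is extracted.
        return shutil.copy(source, cache_dir)
    # An archive is expanded; shutil infers the format (zip, tar, tar.gz, ...)
    # from the file extension.
    target = os.path.join(cache_dir, os.path.basename(source) + ".dir")
    shutil.unpack_archive(source, target)
    return target


if __name__ == "__main__":
    work = tempfile.mkdtemp()
    # Build a data.tar.gz containing three small files.
    names = ["a.txt", "b.txt", "c.txt"]
    for name in names:
        with open(os.path.join(work, name), "w") as fh:
            fh.write(name + "\n")
    archive = os.path.join(work, "data.tar.gz")
    with tarfile.open(archive, "w:gz") as tar:
        for name in names:
            tar.add(os.path.join(work, name), arcname=name)

    cache = os.path.join(work, "cache")
    # Treated as a file: the archive itself is copied.
    print(os.path.basename(localize(archive, cache, is_archive=False)))  # data.tar.gz
    # Treated as an archive: its three members are extracted.
    print(sorted(os.listdir(localize(archive, cache, is_archive=True))))  # ['a.txt', 'b.txt', 'c.txt']
```

Note that shutil.unpack_archive does not recognize a .jar extension by itself; passing format="zip" would be needed for that case.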

2. Accessing the files

mapred.cache.[files|archives] copies (and, for archives, extracts) the data into a directory under mapred.local.dir. However, your Python/Java UDF might not be running from that directory. To make the copied files easy to access, Hadoop can create symbolic links to them in the current directory (i.e. the directory in which your code is running). However, you need to explicitly instruct Hadoop to do so with the mapred.create.symlink setting, as shown below:

set mapred.create.symlink yes;
set mapred.cache.archives hdfs://localhost:8020/user/ragrawal/cache/data.tar.gz#datafiles;

Note that you need not only to set mapred.create.symlink to yes but also to tell Hadoop which symbolic name to use for each file/archive. The symbolic name is appended to the file/archive path, separated by a # sign, as shown above. Based on these settings, Hadoop will do the following three things:
1. Copy data.tar.gz from HDFS to the nodes running the mappers/reducers.
2. Unarchive data.tar.gz into a folder under the directory referred to by mapred.local.dir.
3. Create a symbolic link “datafiles” in the current directory that points to the unarchived contents of data.tar.gz.
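The three steps can be mimicked locally to show why the symlink matters: the link name is taken from the text after # in the cache URI, and once the link exists, code running in the task directory can use that short relative name instead of the path under mapred.local.dir. In this sketch the directories are temporary stand-ins, link_name_from_uri and link_into_task_dir are hypothetical helpers, and os.symlink assumes a POSIX system.

```python
import os
import tempfile


def link_name_from_uri(uri):
    """Return the symbolic link name after '#', or None if there is none."""
    _, sep, name = uri.partition("#")
    return name if sep and name else None


def link_into_task_dir(uri, localized_path, task_dir):
    """Create task_dir/<link name> pointing at the localized file or directory.

    localized_path stands in for the copy (or unpacked directory) under
    mapred.local.dir; task_dir stands in for the task's working directory.
    """
    name = link_name_from_uri(uri)
    if name is None:
        raise ValueError("no '#name' suffix in %r" % uri)
    link = os.path.join(task_dir, name)
    os.symlink(localized_path, link)
    return link


if __name__ == "__main__":
    root = tempfile.mkdtemp()
    # Hypothetical unpacked copy of data.tar.gz under a mapred.local.dir stand-in.
    unpacked = os.path.join(root, "local", "data.tar.gz")
    os.makedirs(unpacked)
    for fname in ("a.txt", "b.txt", "c.txt"):
        open(os.path.join(unpacked, fname), "w").close()
    # Stand-in for the task's current working directory.
    task_dir = os.path.join(root, "task")
    os.makedirs(task_dir)

    uri = "hdfs://localhost:8020/user/ragrawal/cache/data.tar.gz#datafiles"
    link_into_task_dir(uri, unpacked, task_dir)
    os.chdir(task_dir)
    # Code running in the task directory can now use the short relative name.
    print(sorted(os.listdir("datafiles")))  # ['a.txt', 'b.txt', 'c.txt']
```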

Below is simple Python code that lists the contents of datafiles:

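import os

# "datafiles" is the symlink created from the #datafiles suffix; it points to
# the unarchived contents of data.tar.gz.
print(os.listdir("datafiles"))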
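A common use of a cached file is a small lookup table that the UDF reads once per task rather than once per record. The sketch below assumes that data.txt (exposed as the symlink myfile in the mapred.cache.file example of the summary) is a hypothetical tab-separated key/value file; load_lookup and lookup_value are hypothetical names, and the code targets CPython 3, so it may need adjustments for Pig's Jython runtime.

```python
import os

_LOOKUP = None  # cached per process so the file is read only once


def load_lookup(path="myfile"):
    """Read a tab-separated key/value file into a dict.

    'myfile' is the symlink name from the mapred.cache.file example; the
    key<TAB>value format is a hypothetical assumption about data.txt.
    """
    lookup = {}
    with open(path) as fh:
        for line in fh:
            line = line.rstrip("\n")
            if not line:
                continue  # skip blank lines
            key, _, value = line.partition("\t")
            lookup[key] = value
    return lookup


def lookup_value(key, path="myfile"):
    """Hypothetical UDF body: return the value for key, or None if absent.

    The table is loaded on the first call; later calls reuse it (and ignore path).
    """
    global _LOOKUP
    if _LOOKUP is None:
        if not os.path.exists(path):
            raise FileNotFoundError(
                "%s not found; is mapred.create.symlink set to yes?" % path)
        _LOOKUP = load_lookup(path)
    return _LOOKUP.get(key)
```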

A Few Gotchas

  1. In my local Hadoop/Pig installation, if I used multiple mapred.cache.archives statements, Pig honored only the last one. For instance, with the code below, only the last archive, “data3.tar.gz”, was copied to the distributed cache.
    set mapred.create.symlink yes;
    set mapred.cache.archives hdfs://localhost:8020/ragrawal/data1.tar.gz#a;
    set mapred.cache.archives hdfs://localhost:8020/ragrawal/data2.tar.gz#b;
    set mapred.cache.archives hdfs://localhost:8020/ragrawal/data3.tar.gz#c;
  2. mapred.cache.files vs mapred.cache.file
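Regarding the first gotcha: Hadoop itself stores mapred.cache.archives as a comma-separated list of URIs, so one possible workaround is a single set statement that lists every archive. The hypothetical helper below only builds such a statement; whether Pig passes the quoted, comma-separated value through unchanged is an untested assumption.

```python
def cache_archives_statement(entries):
    """Build a single Pig 'set mapred.cache.archives' statement.

    entries is a list of (hdfs_uri, link_name) pairs. All archives go into one
    comma-separated value instead of one 'set' per archive. Hypothetical
    helper, not part of Pig or Hadoop; untested against a real Pig install.
    """
    forbidden = set(",#'")  # would break the list, the #name suffix, or the quotes
    parts = []
    for uri, link in entries:
        if forbidden & set(uri + link):
            raise ValueError("unsupported character in %r / %r" % (uri, link))
        parts.append("%s#%s" % (uri, link))
    return "set mapred.cache.archives '%s';" % ",".join(parts)


if __name__ == "__main__":
    print(cache_archives_statement([
        ("hdfs://localhost:8020/ragrawal/data1.tar.gz", "a"),
        ("hdfs://localhost:8020/ragrawal/data2.tar.gz", "b"),
        ("hdfs://localhost:8020/ragrawal/data3.tar.gz", "c"),
    ]))
```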