
Cascalog for the Impatient, Part 5

In our fourth installment of this series we showed how to use a join on two generators in combination with a filter to perform “stop words” filtering at scale in a Cascalog application. If you haven’t read that yet, it’s probably best to start there.

Today's lesson builds on that same Word Count app and now implements TF-IDF in Cascalog. We’ll show how to chain queries to calculate the TF-IDF weights. We also continue to show best practices for workflow orchestration and test-driven development (TDD) at scale.

Theory

Fortunately, almost all of the data required to calculate TF-IDF weights was already available in our Word Count example from Part 4. However, we’ll need to revise the overall workflow, adding more pipe assemblies to it.

TF-IDF calculates a metric for each token which indicates how “important” that token is to a document within the context of a collection of documents. The metric is calculated based on relative frequencies. On one hand, tokens which appear in most documents tend to have very low TF-IDF weights. On the other hand, tokens which are less common but appear multiple times in a few documents tend to have very high TF-IDF weights. Consequently, the TF-IDF algorithm gets used to drive the indexing in some text search engines, such as Apache Lucene. In particular, TF-IDF provides an effective way to rank documents for a search query. For a good discussion of this in gory detail, see the Similarity class in Lucene.

Note that in the literature, token and term may be used interchangeably for this sample app. More advanced text analytics might look at sequences of words, in which case a term becomes a more complex structure. However, we’re only looking at single words.

We’ll need the following components to calculate TF-IDF:

  • term count: number of times a given term appears in a given document
  • document frequency: how frequently a given term appears across all documents
  • number of terms: total number of terms in a given document
  • document count: total number of documents

Slight modifications to Word Count provide the means to get both term count and document frequency, while the other two components get calculated almost as by-products. In this sense, we get to leverage Cascalog by re-using the results of some functions within our workflow. A conceptual diagram for this implementation of TF-IDF is shown below:

Conceptual Workflow Diagram

Source

Download source for this example on GitHub. For quick reference, a log and output for this example are listed in a gist.

First, let's decompose the word count query from Part 4 into reusable chunks.

(defn etl-docs-gen [rain stop]
  (<- [?doc-id ?word]
      (rain ?doc-id ?line)                                ; raw doc-id/line tuples
      (split ?line :> ?word-dirty)                        ; tokenize each line
      ((c/comp s/trim s/lower-case) ?word-dirty :> ?word) ; scrub each token
      (stop ?word :> false)                               ; drop stop words
      (:distinct false)))                                 ; keep duplicate doc/word pairs for counting

(defn word-count
  "simple word count across all documents"
  [src]
  (<- [?word ?count]
      (src _ ?word)       ; _ ignores the doc-id field
      (c/count ?count)))

Notice that etl-docs-gen is just for cleaning the data, which we can re-use later.
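As a quick check, we can run it at the REPL with in-memory generators (hypothetical data; assumes the split operation defined in Part 4; output order may vary):

(let [rain [["doc01" "A rain shadow is a dry area"]]
      stop [["a"] ["is"]]]
  (??- (etl-docs-gen rain stop)))
;; => tuples like ["doc01" "rain"] ["doc01" "shadow"] ["doc01" "dry"] ["doc01" "area"]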

We'll be able to execute word-count like so.

(defn -main [in out stop tfidf & args]  ; the tfidf output path gets used later on
  (let [rain (hfs-delimited in :skip-header? true)
        stop (hfs-delimited stop :skip-header? true)]
    (?- (hfs-delimited out)
        (word-count (etl-docs-gen rain stop)))))

Next, let's look at the TF-IDF algorithm to figure out what we need. Here's the mathematical formula:

TF-IDF = TF * IDF = TF * log(D / (1 + DF))

And in Clojure:

(defn tf-idf-formula [tf-count df-count d-count]
  (->> (+ 1.0 df-count)  ; 1 + DF
       (div d-count)     ; D / (1 + DF), as a double
       (Math/log)        ; log(D / (1 + DF))
       (* tf-count)))    ; TF * log(D / (1 + DF))
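For instance, for a term that occurs 3 times in a given document and appears in 9 out of 100 documents (hypothetical numbers):

(tf-idf-formula 3 9 100)  ; => 3 * log(100 / 10.0) ≈ 6.91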

So we need D, DF, and TF, which are merely a series of counts.

Starting with counting the number of documents, D.

(defn D [src]
  (let [src  (select-fields src ["?doc-id"])]
    (<- [?n-docs]
        (src ?doc-id)
        (c/distinct-count ?doc-id :> ?n-docs))))

We use select-fields to pick only the ?doc-id from the source. Then we perform a distinct-count on ?doc-id to count the number of distinct documents in the dataset.

Similarly for document frequency, DF: the number of documents, across the whole collection, that a given term appears in. This is a step towards the inverse document frequency.

(defn DF [src]
  (<- [?df-word ?df-count]
      (src ?doc-id ?df-word)
      (c/distinct-count ?doc-id ?df-word :> ?df-count)))

Since we want the number of documents a term appears in, we use distinct-count over both ?doc-id and ?df-word so that multiple occurrences of a word within the same document do not get double-counted.

The last of the counts is term frequency, TF, which is just a word count per document: how many times each word appears within a given document.

(defn TF [src]
  (<- [?doc-id ?tf-word ?tf-count]
      (src ?doc-id ?tf-word)
      (c/count ?tf-count)))
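As a sanity check, all three queries can be run over a small in-memory source at the REPL. The toy-src below is hypothetical data wrapped in a subquery so that its output fields carry the names that select-fields expects; output order may vary:

(def toy-src
  (let [tuples [["doc01" "rain"] ["doc01" "shadow"] ["doc01" "rain"]
                ["doc02" "rain"] ["doc02" "area"]]]
    (<- [?doc-id ?word]
        (tuples ?doc-id ?word)
        (:distinct false))))  ; keep the duplicate doc01/rain tuple

(??- (D toy-src))   ; one tuple: 2 distinct documents
(??- (DF toy-src))  ; ["rain" 2] ["shadow" 1] ["area" 1]
(??- (TF toy-src))  ; ["doc01" "rain" 2] ["doc01" "shadow" 1]
                    ; ["doc02" "rain" 1] ["doc02" "area" 1]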

Now that we have all the components needed to calculate the TF-IDF weights, we can make use of Cascalog's declarative joins and plug the fields into the regular Clojure function tf-idf-formula directly.

(defn TF-IDF [src]
  (let [n-doc (first (flatten (??- (D src))))]
    (<- [?doc-id ?tf-idf ?tf-word]
        ((TF src) ?doc-id ?tf-word ?tf-count)
        ((DF src) ?tf-word ?df-count)
        (tf-idf-formula ?tf-count ?df-count n-doc :> ?tf-idf))))

Convenient, isn't it?
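Running the full query on the same toy source from above (hypothetical data; output order may vary):

(??- (TF-IDF toy-src))
;; => e.g. ["doc01" -0.81 "rain"] ["doc02" -0.41 "rain"]
;;         ["doc01" 0.0 "shadow"] ["doc02" 0.0 "area"]
;; "rain" appears in every document, so its weight 2 * log(2/3) ≈ -0.81 is lowest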

Note that since D is just a single value, we cache its result in memory using a special query executor, ??-. The D query only needs to run once, with its result passed into the TF-IDF calculation as a plain Clojure value (instead of as tuples). Otherwise another join would be needed to merge the tuples of D with TF and DF for tf-idf-formula. Doing less is good.
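A quick sketch of what that in-memory execution looks like (the exact nesting reflects ??- returning one result seq per query):

(??- (D toy-src))                    ; => something like ([[2]])
(first (flatten (??- (D toy-src)))) ; => 2, a plain number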

Now we can get back to the remainder of the workflow by adding TF-IDF into main. We’ll keep the actual Word Count metrics, since those are useful for testing:

(defn -main [in out stop tfidf & args]
  (let [rain (hfs-delimited in :skip-header? true)
        stop (hfs-delimited stop :skip-header? true)
        src  (etl-docs-gen rain stop)]  ; one cleaned source shared by both queries
    (?- (hfs-delimited tfidf)           ; sink the TF-IDF weights
        (TF-IDF src))
    (?- (hfs-delimited out)             ; sink the word counts
        (word-count src))))

Here we have annotated the dot diagram from the Cascading for the Impatient exercise to show where the mapper and reducer phases are running, along with the sections added since Part 4. The actual flow for this Cascalog example is somewhat different, but the diagram still serves as an illustration of the underlying Cascading flow.

TF-IDF Flow Diagram

Build

To build the sample app from the command line use:

lein uberjar 

What you should have at this point is a JAR file that is nearly ready to drop into a Maven repo. For Cascading libraries and extensions, we provide a community jar repository at http://conjars.org

Run

Before running this sample app, you’ll need to have a supported release of Apache Hadoop installed. Here’s what was used to develop and test our example code:

$ hadoop version
Hadoop 1.0.3

Be sure to clear the output directory (Apache Hadoop insists, if you're running in standalone mode) and run the app:

rm -rf output
hadoop jar ./target/impatient.jar data/rain.txt output/wc data/en.stop output/tfidf

Output text gets stored in partition files under output/tfidf, which you can then verify:

more output/tfidf/part-00000

BTW, did you notice what the TF-IDF weights for the tokens rain and shadow were? Those represent what the documents have in common. How do those compare with weights for the other tokens? Conversely, consider the weight for australia (high weight) or area (different weights).

Here’s a log file from our run of the sample app, part 5. If your run looks terribly different, something is probably not set up correctly. Drop us a line on the cascalog-user email forum.

Stay tuned for the next installments of our Cascalog for the Impatient series.
