Skip to content
This repository has been archived by the owner on Jan 29, 2022. It is now read-only.

Spark Usage

Luke Lovett edited this page Jul 27, 2015 · 24 revisions

This page describes how to use the MongoDB Hadoop Connector with Spark.

Installation

  1. Obtain the MongoDB Hadoop Connector. You can either build it or download the jars. The releases page also includes instructions for use with Maven and Gradle. For Spark, all you need is the "core" jar.
  2. Get a JAR for the MongoDB Java Driver.

These are the only two dependencies for building a project using Spark and MongoDB.

Basic Usage

See example code in Java, Scala or Python (warning: PySpark support is still experimental!).

These examples go through the basics of setting up your Spark project to use MongoDB as a source or a sink. At a high level, here's what we're going to do:

  1. Create a new Configuration so we can set options on the MongoDB Hadoop Connector.
  2. Create a new RDD by calling the newAPIHadoopRDD method on a SparkContext object, passing in the Configuration and the InputFormat class we want to use, based on whether we want to read from a live cluster or a BSON snapshot.
  3. When we're ready to save data back into MongoDB or a BSON file, we'll call the saveAsNewAPIHadoopFile method on the RDD with the OutputFormat class we want.
Java Example
// Read a MongoDB collection into an RDD, then write that RDD back out twice:
// once to a live MongoDB collection and once as BSON files.
// `sc` is the Spark context; it is created outside this snippet.

// Set configuration options for the MongoDB Hadoop Connector.
Configuration mongodbConfig = new Configuration();
// MongoInputFormat allows us to read from a live MongoDB instance.
// We could also use BSONFileInputFormat to read BSON snapshots.
mongodbConfig.set("mongo.job.input.format",
                  "com.mongodb.hadoop.MongoInputFormat");
// MongoDB connection string naming a collection to use.
// If using BSON, use "mapred.input.dir" to configure the directory
// where BSON files are located instead.
mongodbConfig.set("mongo.input.uri",
                  "mongodb://localhost:27017/db.collection");

// Create an RDD backed by the MongoDB collection.
JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
    mongodbConfig,            // Configuration
    MongoInputFormat.class,   // InputFormat: read from a live cluster.
    Object.class,             // Key class
    BSONObject.class          // Value class
);

// Create a separate Configuration for saving data back to MongoDB.
Configuration outputConfig = new Configuration();
// Use the same "mongo.job.*.format" key family as the input and BSON output
// configurations in this example.
outputConfig.set("mongo.job.output.format",
                 "com.mongodb.hadoop.MongoOutputFormat");
outputConfig.set("mongo.output.uri",
                 "mongodb://localhost:27017/output.collection");

// Save this RDD as a Hadoop "file".
// The path argument is unused; all documents will go to 'mongo.output.uri'.
documents.saveAsNewAPIHadoopFile(
    "file:///this-is-completely-unused",
    Object.class,             // Key class
    BSONObject.class,         // Value class
    MongoOutputFormat.class,  // OutputFormat: write to a live cluster.
    outputConfig
);

// We can also save this back to a BSON file.
Configuration bsonOutputConfig = new Configuration();
bsonOutputConfig.set(
    "mongo.job.output.format",
    "com.mongodb.hadoop.BSONFileOutputFormat"
);
// Here the path argument is given as an HDFS location for the BSON output.
documents.saveAsNewAPIHadoopFile(
    "hdfs://localhost:8020/user/spark/bson-demo",
    Object.class,                // Key class
    BSONObject.class,            // Value class
    BSONFileOutputFormat.class,  // OutputFormat: write BSON files.
    bsonOutputConfig
);
Scala Example
import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.rdd.RDD

import org.bson.BSONObject
import com.mongodb.hadoop.{
  MongoInputFormat, MongoOutputFormat,
  BSONFileInputFormat, BSONFileOutputFormat}

/**
 * Example Spark application that reads a MongoDB collection into an RDD through
 * the MongoDB Hadoop Connector and writes it back out twice: once to a live
 * MongoDB collection and once as BSON files.
 *
 * The entry point is an explicit `main` method rather than `extends App`:
 * Spark's documentation advises applications to define `main()` because
 * subclasses of `scala.App` may not work correctly.
 */
object SparkExample {

  /**
   * Runs the example end to end.
   *
   * @param args command-line arguments; not used by this example
   */
  def main(args: Array[String]): Unit = {
    // Set up the configuration for reading from MongoDB.
    val mongoConfig = new Configuration()
    // MongoInputFormat allows us to read from a live MongoDB instance.
    // We could also use BSONFileInputFormat to read BSON snapshots.
    mongoConfig.set("mongo.job.input.format",
      "com.mongodb.hadoop.MongoInputFormat")
    // MongoDB connection string naming a collection to read.
    // If using BSON, use "mapred.input.dir" to configure the directory
    // where the BSON files are located instead.
    mongoConfig.set("mongo.input.uri",
      "mongodb://localhost:27017/db.collection")

    val sparkConf = new SparkConf()
    val sc = new SparkContext("local", "SparkExample", sparkConf)

    // Stop the context even if one of the jobs below fails, so the
    // application does not leave Spark resources behind.
    try {
      // Create an RDD backed by the MongoDB collection.
      val documents = sc.newAPIHadoopRDD(
        mongoConfig,                // Configuration
        classOf[MongoInputFormat],  // InputFormat
        classOf[Object],            // Key type
        classOf[BSONObject])        // Value type

      // Create a separate Configuration for saving data back to MongoDB.
      val outputConfig = new Configuration()
      // Use the same "mongo.job.*.format" key family as the input and BSON
      // output configurations in this example.
      outputConfig.set("mongo.job.output.format",
        "com.mongodb.hadoop.MongoOutputFormat")
      outputConfig.set("mongo.output.uri",
        "mongodb://localhost:27017/output.collection")

      // Save this RDD as a Hadoop "file".
      // The path argument is unused; all documents will go to "mongo.output.uri".
      documents.saveAsNewAPIHadoopFile(
        "file:///this-is-completely-unused",
        classOf[Object],
        classOf[BSONObject],
        classOf[MongoOutputFormat[Object, BSONObject]],
        outputConfig)

      // We can also save this back to a BSON file.
      val bsonOutputConfig = new Configuration()
      bsonOutputConfig.set("mongo.job.output.format",
        "com.mongodb.hadoop.BSONFileOutputFormat")
      // Here the path argument is given as an HDFS location for the BSON output.
      documents.saveAsNewAPIHadoopFile(
        "hdfs://localhost:8020/user/spark/bson-demo",
        classOf[Object],
        classOf[BSONObject],
        classOf[BSONFileOutputFormat[Object, BSONObject]],
        bsonOutputConfig)
    } finally {
      sc.stop()
    }
  }
}
Python Example
from pyspark import SparkContext, SparkConf


def main():
    """Round-trip a MongoDB collection through PySpark.

    Builds an RDD from ``db.collection`` on a local MongoDB instance, then
    saves that RDD twice: to ``output.collection`` through MongoOutputFormat
    and to an HDFS path through BSONFileOutputFormat.
    """
    # Hadoop key/value class names shared by the read and both writes.
    key_class = 'org.apache.hadoop.io.Text'
    value_class = 'org.apache.hadoop.io.MapWritable'

    spark_conf = SparkConf().setAppName("pyspark test")
    sc = SparkContext(conf=spark_conf)

    # MongoInputFormat reads from a live MongoDB instance; BSONFileInputFormat
    # could be used instead to read BSON snapshots.
    input_conf = {
        'mongo.input.uri': 'mongodb://localhost:27017/db.collection'
    }
    documents = sc.newAPIHadoopRDD(
        inputFormatClass='com.mongodb.hadoop.MongoInputFormat',
        keyClass=key_class,
        valueClass=value_class,
        conf=input_conf
    )

    # Write to MongoDB as a Hadoop "file". The path is a placeholder: every
    # document is sent to the collection named by "mongo.output.uri".
    output_conf = {
        'mongo.output.uri': 'mongodb://localhost:27017/output.collection'
    }
    documents.saveAsNewAPIHadoopFile(
        path='file:///this-is-unused',
        outputFormatClass='com.mongodb.hadoop.MongoOutputFormat',
        keyClass=key_class,
        valueClass=value_class,
        conf=output_conf
    )

    # Write the same RDD out again, this time as BSON.
    documents.saveAsNewAPIHadoopFile(
        path='hdfs://localhost:8020/user/spark/bson-demo',
        outputFormatClass='com.mongodb.hadoop.BSONFileOutputFormat',
        keyClass=key_class,
        valueClass=value_class
    )


# Run the example only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
Clone this wiki locally