Shark is a large-scale data warehouse system for Spark designed to be compatible with Apache Hive. It can answer HiveQL queries up to 30 times faster than Hive without modification to the existing data or queries. Shark supports Hive's query language, metastore, serialization formats, and user-defined functions.

The Shark Query Language

Shark supports a subset of SQL nearly identical to that implemented by Hive. This guide assumes some familiarity with Hive and focuses on the extra functionality included in Shark. Those who need a refresher can refer to the Hive Documentation.

Unlike Hive, Shark allows users to exploit the temporal locality in their workloads by caching their working set of data, or, in database terms, by creating in-memory materialized views. Common data types can be cached in a columnar format (as arrays of Java primitives), which is very efficient for storage and garbage collection, while still providing high performance (orders of magnitude faster than reading data from disk).

To create a cached table from the rows (or subset of rows) of an existing table, set the shark.cache table property:

CREATE TABLE ... TBLPROPERTIES ("shark.cache" = "true") AS SELECT ...

We also extend HiveQL to provide a shortcut for this syntax. Simply append _cached to the table name when using CREATE TABLE AS SELECT, and that table will be cached in memory. To disable this shortcut, see the configuration options section. Below is an example:

CREATE TABLE logs_last_month_cached AS
SELECT * FROM logs WHERE time > date(...);

Once this table has been created, we can query it like any other Hive table.

SELECT count(*) FROM logs_last_month_cached;

Note that queries which shuffle data require you to set the number of reducers:

set mapred.reduce.tasks=[num_tasks];
SELECT page, count(*) c FROM logs_last_month_cached
GROUP BY page ORDER BY c DESC LIMIT 10;

In addition to caching, Shark employs a number of optimization techniques, such as limit pushdown and hash-based shuffle, which can provide significant speedups in query processing. You can directly inspect the execution plan for a given query using the explain statement:

explain SELECT * FROM logs_last_month ORDER BY timestamp LIMIT 10;

From v0.8.1 onwards

Hive-partitioned tables may be cached using the same syntax as regular cached tables. For example:

CREATE TABLE srcpart_cached (key int, value string) PARTITIONED BY (keypart int);

Since partitioned tables can be very large, Shark allows users to define partition-level cache eviction policies by providing a class name as the value of the shark.cache.policy table property. For example, a table with shark.cache.policy set to shark.memstore2.LRUCachePolicy and shark.cache.policy.maxSize set to 10 will keep at most 10 partitions in memory at a time, dropping the least recently used partition when that bound is exceeded.
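
Putting these properties together, a cached, partitioned table with an LRU eviction policy might be declared as follows (the table and column names here are illustrative):

CREATE TABLE part_logs_cached (key int, value string)
PARTITIONED BY (keypart int)
TBLPROPERTIES (
  "shark.cache" = "true",
  "shark.cache.policy" = "shark.memstore2.LRUCachePolicy",
  "shark.cache.policy.maxSize" = "10");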

The default policy is to cache all partitions (CacheAllPolicy). In addition to LRUCachePolicy, Shark includes a FIFOCachePolicy. Users may define custom behavior by extending the shark.memstore2.CachePolicy trait and passing the class name of the implementation as the value for shark.cache.policy. The main methods to implement are:

trait CachePolicy[K, V] {
  ...

  // Called when an entry (e.g. a table partition) is added to the cache.
  def notifyPut(key: K, value: V): Unit

  // Called when an entry is explicitly removed from the cache.
  def notifyRemove(key: K): Unit

  // Called when an entry is read (e.g. to update recency for LRU ordering).
  def notifyGet(key: K): Unit

  ...
}
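
As a sketch, a custom policy can be built on just the three callbacks shown above. The class below keeps keys in insertion order, FIFO-style; the class name is hypothetical, and the elided parts of the trait (such as how Shark initializes the policy and how evictions are actually triggered) are left as comments rather than guessed at:

import scala.collection.mutable.LinkedHashSet

import shark.memstore2.CachePolicy

// Illustrative FIFO-style policy; not part of Shark itself.
class OldestFirstCachePolicy[K, V] extends CachePolicy[K, V] {

  // Keys in the order they were inserted; the head is the eviction candidate.
  private val insertionOrder = LinkedHashSet.empty[K]

  override def notifyPut(key: K, value: V): Unit = synchronized {
    insertionOrder += key
    // A complete policy would enforce a size bound here and ask Shark to
    // evict insertionOrder.head once the bound is exceeded.
  }

  override def notifyRemove(key: K): Unit = synchronized {
    insertionOrder -= key
  }

  override def notifyGet(key: K): Unit = {
    // FIFO ignores reads; an LRU policy would move `key` to the back here.
  }
}

The fully qualified name of such a class would then be passed as the value of the shark.cache.policy table property, as described above.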

Executing Queries

Shark CLI

The easiest way to run Shark is to start a Shark Command Line Client (CLI) and begin executing queries. The Shark CLI connects directly to the Hive metastore, so it is compatible with existing Hive deployments. Shark executables are available in the bin/ directory. To start the Shark CLI, simply run:

$ ./bin/shark                            # Start CLI for interactive session
$ ./bin/shark -e "SELECT * FROM foo"     # Run a specific query and exit
$ ./bin/shark -i queries.hql             # Run initialization queries from a file, then enter the CLI
$ ./bin/shark -H                         # Start CLI and print help

You can enter queries into the CLI directly, or use a flag to pass it a file. The Shark CLI will only work correctly if the HIVE_HOME environment variable is set (see Configuration). Alternative versions of the CLI exist which print out more information: bin/shark-withinfo and bin/shark-withdebug.

Table Persistence

Cached tables are ephemeral: they are dropped from memory once a user's session terminates. To create long-lived cached tables, use the SharkServer described below.

From v0.8.1 onwards

Cached tables follow a write-through cache policy by default. When a user's Shark session is terminated, any cached tables previously created will be automatically recovered by subsequent sessions, as long as those sessions connect to the same shared Hive metastore. To disable automatic table recovery, launch Shark with the skipRddReload option, e.g.:

$ ./bin/shark --skipRddReload

To get the pre-0.8.1 ephemeral caching behavior, create a table with the shark.cache table property set to MEMORY_ONLY:

CREATE TABLE src_memory_only (key int, value string) TBLPROPERTIES ('shark.cache' = 'MEMORY_ONLY');

SharkServer

It is also possible to run a persistent SharkServer which retains session state, such as cached tables, across executions of the Shark CLI. This can be very useful for those hoping to keep a few tables in cache for long periods of time.

To start a SharkServer, run

$ ./bin/shark --service sharkserver <port>

A client can connect to the server using

$ ./bin/shark -h <server-host> -p <server-port>
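
Because the server retains session state across client connections, a cached table created by one client invocation remains available to the next. As an illustrative sketch (the host, port, and table name are assumptions):

$ ./bin/shark -h localhost -p 10000 -e "CREATE TABLE logs_cached AS SELECT * FROM logs"
$ ./bin/shark -h localhost -p 10000 -e "SELECT count(*) FROM logs_cached"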

Spark Scala Shell With Shark Bindings (Advanced)

Shark provides a simple API for programmers to convert results from SQL queries into a special type of RDD (Resilient Distributed Dataset). RDDs are a key abstraction in Spark. This lets you integrate SQL query processing with machine learning, and provides a unified system for data analysis using both SQL and sophisticated statistical learning functions. To launch a Spark shell with Shark functionality, run:

$ ./bin/shark-shell
scala> val youngUsers = sc.sql2rdd("SELECT * FROM users WHERE age < 20")
scala> println(youngUsers.count)
...
scala> val featureMatrix = youngUsers.map(extractFeatures(_))  // extractFeatures is user-defined
scala> kmeans(featureMatrix)                                   // kmeans is user-defined

Gaining familiarity with the Spark API will be helpful if you want to use this feature.

Configuration Options

Shark has two types of configuration options. The first are options that relate to a given Shark session. These can be set either in the CLI itself (using the set key=val command) or in your existing Hive XML configuration files, which are loaded from HIVE_HOME/conf.

shark.exec.mode     # 'hive' or 'shark', whether to use Hive or Shark based execution
shark.explain.mode  # 'hive' or 'shark', whether to use Hive or Shark based explain
shark.cache.flag.checkTableName # 'true' or 'false', whether to cache tables ending in "_cached"
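
For example, to fall back to Hive-based execution for the remainder of a session:

set shark.exec.mode=hive;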

Advanced users can find the full list of configuration options in src/main/scala/shark/SharkConfVars.scala.

The second type of configuration options are environment variables that must be set for the Shark driver and slaves to run correctly. These are specified in conf/shark-env.sh. A few of the more important ones are described here:

HIVE_HOME     # Path to directory containing patched Hive jars
HIVE_CONF_DIR # Optional, a different path containing Hive configuration files 
SPARK_MEM     # How much memory to allocate to slaves (e.g. '1500m', '5g')
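
A minimal conf/shark-env.sh might look like the following; the paths and memory size below are placeholders for your own deployment:

export HIVE_HOME=/path/to/hive
export HIVE_CONF_DIR=$HIVE_HOME/conf
export SPARK_MEM=5g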