
Simple Example


Quick Start has updated instructions for setting up an environment with the TPCH flattened and star schema datasets. This page is still useful for getting an idea of the high-level flow and architecture.

Here we walk through the rewrite of a basic SQL aggregation query on the TPCH dataset. The goal is to give you a sense of the components involved and the DDL required to link the raw data with the Druid index.

Setup

The setup for this example requires denormalizing the TPCH dataset and creating a Druid index on it.

Generate TPCH data

We chose the TPCH dataset because of its familiarity and its set of 22 queries, which make a good starting point. In order to build the Druid index, the TPCH data needs to be denormalized. We built a simple load tool to generate a denormalized fact table from the output of the TPCHGen program, along the lines of the sketch below.
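
A minimal sketch of what such a load step could look like in Spark SQL, assuming the TPCHGen output has already been registered as the usual TPCH tables (lineitem, orders, partsupp, part, supplier, customer); the table names, join keys and output path here are illustrative only and are not the actual load tool:

// Illustrative sketch only: join the normalized TPCH tables into a single flat
// fact table and write it out pipe-delimited, mirroring the denormalized layout
// used on this page. Table names, join keys and the output path are assumptions.
val flat = sqlContext.sql("""
  select *
  from lineitem l
  join orders o    on l.l_orderkey = o.o_orderkey
  join partsupp ps on l.l_partkey = ps.ps_partkey and l.l_suppkey = ps.ps_suppkey
  join part p      on l.l_partkey = p.p_partkey
  join supplier s  on l.l_suppkey = s.s_suppkey
  join customer c  on o.o_custkey = c.c_custkey
""")

flat.write
  .format("com.databricks.spark.csv")
  .option("delimiter", "|")
  .save("/path/to/orderLineItemPartSupplierCustomer")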

Load Druid Index

We built a Druid index of this fact data. The index configuration we used, and instructions on creating a quick index (for datascale1), can be found here.

The Overall Picture

The assumptions are:

  • 'Fact' data is landing in HDFS/S3, and a Druid index is kept up to date.
  • The raw 'Fact' data is accessible from Spark.

The Spark-Druid DataSource and rewrite rules then provide the capability to rewrite eligible SQL queries against the raw data into Druid queries.

(Diagram: Overall Picture)

Setting up DataSources in Spark

For this example, we used the Spark-CSV package to load the raw fact table into Spark, and our Spark-Datetime package for dateTime expressions. We ran a spark-shell with the following settings:

export SPARK_SUBMIT_OPTS="-Xmx8g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
bin/spark-shell --executor-memory 4G --properties-file  /Users/hbutani/sparkline/spark-csv/src/test/scala/com/databricks/spark/csv/tpch/sparkshell/spark.properties --packages com.databricks:spark-csv_2.10:1.1.0 --jars /Users/hbutani/sparkline/spark-datetime/target/scala-2.10/spark-datetime-assembly-0.0.1.jar,/Users/hbutani/sparkline/spark-druid-olap/target/scala-2.10/spark-druid-olap-assembly-0.0.1.jar
  • pass the spark-csv package and the spark-datetime and spark-druid-olap assembly jars
  • we use this properties file

Set up the DruidPlanner

Add the Druid DataSource rewrite rules as extraStrategies to the sqlContext:

import org.apache.spark.sql.sources.druid.DruidPlanner
DruidPlanner(sqlContext)

Define the Base Fact Table

sql(
      s"""CREATE TEMPORARY TABLE orderLineItemPartSupplierBase(o_orderkey integer, o_custkey integer, 
      o_orderstatus string, o_totalprice double, o_orderdate string, o_orderpriority string, o_clerk string, 
      o_shippriority integer, o_comment string, l_partkey integer, l_suppkey integer, l_linenumber integer, 
      l_quantity double, l_extendedprice double, l_discount double, l_tax double, l_returnflag string, 
      l_linestatus string, l_shipdate string, l_commitdate string, l_receiptdate string, l_shipinstruct string, 
      l_shipmode string, l_comment string, order_year string, ps_partkey integer, ps_suppkey integer, 
      ps_availqty integer, ps_supplycost double, ps_comment string, s_name string, s_address string, 
      s_phone string, s_acctbal double, s_comment string, s_nation string, s_region string, p_name string, 
      p_mfgr string, p_brand string, p_type string, p_size integer, p_container string, p_retailprice double, 
      p_comment string, c_name string , c_address string , c_phone string , c_acctbal double , 
      c_mktsegment string , c_comment string , c_nation string , c_region string)
USING com.databricks.spark.csv
OPTIONS (path "/Users/hbutani/tpch/datascale1/orderLineItemPartSupplierCustomer.sample/", header "false", delimiter "|")"""
    )

var df = sqlContext.table("orderLineItemPartSupplierBase")
df = df.coalesce(5)
df.registerTempTable("orderLineItemPartSupplier")
df.cache
  • use the 'com.databricks.spark.csv' DataSource to set up a DataFrame against the raw data.
  • we then coalesce this into 5 partitions and cache the data, so queries against this table run against in-memory data (note that caching is lazy; see the sketch below).
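
Because cache is lazy, the table is only materialized in memory once an action runs against it. A quick count (a sketch) forces materialization:

// cache() is lazy: trigger an action so the coalesced data is actually loaded
// and pinned in memory before running the example queries.
df.count()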

Define the Druid-connected Fact Table

sql(s"""CREATE TEMPORARY TABLE orderLineItemPartSupplier
      USING org.sparklinedata.druid
      OPTIONS (sourceDataframe "orderLineItemPartSupplierBase",
      timeDimensionColumn "l_shipdate",
      druidDatasource "tpch",
      druidHost "localhost",
      druidPort "8082",
      columnMapping '{  "l_quantity" : "sum_l_quantity", "ps_availqty" : "sum_ps_availqty" }     ',
      functionalDependencies '[   {"col1" : "c_name", "col2" : "c_address", "type" : "1-1"},   {"col1" : "c_phone", "col2" : "c_address", "type" : "1-1"},   {"col1" : "c_name", "col2" : "c_mktsegment", "type" : "n-1"},   {"col1" : "c_name", "col2" : "c_comment", "type" : "1-1"},   {"col1" : "c_name", "col2" : "c_nation", "type" : "n-1"},   {"col1" : "c_nation", "col2" : "c_region", "type" : "n-1"} ]     ')
""")

The orderLineItemPartSupplier table connects the orderLineItemPartSupplierBase table with the Druid "tpch" datasource. It is based on the org.sparklinedata.druid DataSource, which can be configured with the following settings:

Name                   | Description
-----------------------|------------
sourceDataframe        | The DataFrame containing the raw data
timeDimensionColumn    | The column that represents time in the Druid index
druidDatasource        | The dataSource name in Druid
druidHost / druidPort  | Connection information for the Druid broker
columnMapping          | Mapping of column names from the raw schema to the Druid index
functionalDependencies | Reserved for future use
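
Since org.sparklinedata.druid is a regular Spark SQL DataSource, the same relation could in principle also be created programmatically through the generic DataFrame reader. This is only a sketch assuming the option names above; the DDL form is what we use on this page:

// Sketch only: build the Druid-backed relation with the generic DataFrame reader,
// passing the same options as the CREATE TEMPORARY TABLE statement above
// (functionalDependencies omitted here for brevity).
val druidDF = sqlContext.read
  .format("org.sparklinedata.druid")
  .option("sourceDataframe", "orderLineItemPartSupplierBase")
  .option("timeDimensionColumn", "l_shipdate")
  .option("druidDatasource", "tpch")
  .option("druidHost", "localhost")
  .option("druidPort", "8082")
  .option("columnMapping",
    """{ "l_quantity" : "sum_l_quantity", "ps_availqty" : "sum_ps_availqty" }""")
  .load()
druidDF.registerTempTable("orderLineItemPartSupplier")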

The Sample Query

We are finally ready to run queries. Consider the following aggregation query:

val q1 = sql("""select l_returnflag, l_linestatus, count(*), sum(l_extendedprice) as s, max(ps_supplycost) as m, 
       avg(ps_availqty) as a,count(distinct o_orderkey)  
       from orderLineItemPartSupplierBase 
       group by l_returnflag, l_linestatus""")

The Logical Plan for this query is:

Aggregate [l_returnflag#16,l_linestatus#17], [l_returnflag#16,l_linestatus#17,COUNT(1) AS _c2#644L,SUM(l_extendedprice#13) AS s#641,MAX(ps_supplycost#28) AS m#642,AVG(CAST(ps_availqty#27, LongType)) AS a#643,COUNT(DISTINCT o_orderkey#0) AS _c6#645L]
 Project [l_returnflag#16,l_linestatus#17,ps_availqty#27,ps_supplycost#28,l_extendedprice#13,o_orderkey#0]
  InMemoryRelation [o_orderkey#0,o_custkey#1....

and the Physical Plan is:

Aggregate false, [l_returnflag#16,l_linestatus#17], [l_returnflag#16,l_linestatus#17,Coalesce(SUM(PartialCount#1817L),0) AS _c2#639L,CombineSum(PartialSum#1818) AS s#636,MAX(PartialMax#1819) AS m#637,(CAST(SUM(PartialSum#1820L), DoubleType) / CAST(SUM(PartialCount#1821L), DoubleType)) AS a#638,CombineAndCount(partialSets#1822) AS _c6#640L]
 Aggregate true, [l_returnflag#16,l_linestatus#17], [l_returnflag#16,l_linestatus#17,COUNT(CAST(ps_availqty#27, LongType)) AS PartialCount#1821L,SUM(CAST(ps_availqty#27, LongType)) AS PartialSum#1820L,MAX(ps_supplycost#28) AS PartialMax#1819,SUM(l_extendedprice#13) AS PartialSum#1818,AddToHashSet(o_orderkey#0) AS partialSets#1822,COUNT(1) AS PartialCount#1817L]
  InMemoryColumnarTableScan [l_returnfla...

This is executed entirely in Spark by scanning the in-memory relation and performing the aggregation.

Running the same query against the Druid-connected orderLineItemPartSupplier table produces the same Logical Plan:

val q1OLAP = sql("""select l_returnflag, l_linestatus, count(*), sum(l_extendedprice) as s, max(ps_supplycost) as m, 
       avg(ps_availqty) as a,count(distinct o_orderkey)  
       from orderLineItemPartSupplier
       group by l_returnflag, l_linestatus""")

But the Physical Plan is different: the aggregation is handled as a DruidQuery.

Project [l_returnflag#69,l_linestatus#70,alias-1#112L AS c2#109L,alias-2#111 AS s#106,alias-3#115 AS m#107,alias-4#113 AS a#108,alias-7#114L AS c6#110L]
 PhysicalRDD [alias-2#111,alias-3#115,alias-7#114L,alias-4#113,l_returnflag#69,l_linestatus#70,alias-1#112L], DruidRDD[2] at RDD at DruidRDD.scala:16

The query sent to Druid is:

{
  "jsonClass" : "GroupByQuerySpec",
  "queryType" : "groupBy",
  "dataSource" : "tpch",
  "dimensions" : [ {
    "jsonClass" : "DefaultDimensionSpec",
    "type" : "default",
    "dimension" : "l_returnflag",
    "outputName" : "l_returnflag"
  }, {
    "jsonClass" : "DefaultDimensionSpec",
    "type" : "default",
    "dimension" : "l_linestatus",
    "outputName" : "l_linestatus"
  } ],
  "granularity" : "all",
  "aggregations" : [ {
    "jsonClass" : "FunctionAggregationSpec",
    "type" : "count",
    "name" : "alias-1",
    "fieldName" : "count"
  }, {
    "jsonClass" : "FunctionAggregationSpec",
    "type" : "doubleSum",
    "name" : "alias-2",
    "fieldName" : "l_extendedprice"
  }, {
    "jsonClass" : "FunctionAggregationSpec",
    "type" : "doubleMax",
    "name" : "alias-3",
    "fieldName" : "ps_supplycost"
  }, {
    "jsonClass" : "FunctionAggregationSpec",
    "type" : "longSum",
    "name" : "alias-5",
    "fieldName" : "sum_ps_availqty"
  }, {
    "jsonClass" : "FunctionAggregationSpec",
    "type" : "count",
    "name" : "alias-6",
    "fieldName" : "count"
  }, {
    "jsonClass" : "CardinalityAggregationSpec",
    "type" : "cardinality",
    "name" : "alias-7",
    "fieldNames" : [ "o_orderkey" ],
    "byRow" : true
  } ],
  "postAggregations" : [ {
    "jsonClass" : "ArithmeticPostAggregationSpec",
    "type" : "arithmetic",
    "name" : "alias-4",
    "fn" : "/",
    "fields" : [ {
      "jsonClass" : "FieldAccessPostAggregationSpec",
      "type" : "fieldAccess",
      "fieldName" : "alias-5"
    }, {
      "jsonClass" : "FieldAccessPostAggregationSpec",
      "type" : "fieldAccess",
      "fieldName" : "alias-6"
    } ]
  } ],
  "intervals" : [ "1992-12-31T16:00:00.000-08:00/1997-12-31T16:00:00.000-08:00" ]
}
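
To actually execute the rewritten query, run any action on it; the aggregation happens inside Druid and only the small grouped result comes back to Spark. The result can be cross-checked against the in-memory plan, keeping in mind that count(distinct ...) is mapped to Druid's approximate cardinality aggregator, so that column may differ slightly:

// Execute the Druid-rewritten query; only the grouped result is returned to Spark.
q1OLAP.show()

// Cross-check against the purely in-memory plan
// (count(distinct) is approximate on the Druid side).
q1.show()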