Skip to content

Commit

Permalink
Leverage duckdb views to enable "... FROM table1, table2"
Browse files Browse the repository at this point in the history
  • Loading branch information
exAspArk committed Nov 5, 2024
1 parent 4fc1fc2 commit 48f6db9
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 42 deletions.
27 changes: 0 additions & 27 deletions src/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,33 +243,6 @@ func MakeStatementNode(targetList []*pgQuery.Node) *pgQuery.Node {
}
}

// MakeIcebergTableNode builds the FROM-clause node for
//
//	iceberg_scan('<tablePath>')
//
// The result is a simple range function wrapping a one-element list that
// holds a single call to iceberg_scan, whose only argument is tablePath as a
// string constant.
func MakeIcebergTableNode(tablePath string) *pgQuery.Node {
	funcName := []*pgQuery.Node{pgQuery.MakeStrNode("iceberg_scan")}

	// The `allow_moved_paths = true` argument is currently disabled; the
	// construction that would add it is kept here for reference:
	//
	//	pgQuery.MakeAExprNode(
	//		pgQuery.A_Expr_Kind_AEXPR_OP,
	//		[]*pgQuery.Node{pgQuery.MakeStrNode("=")},
	//		pgQuery.MakeColumnRefNode([]*pgQuery.Node{pgQuery.MakeStrNode("allow_moved_paths")}, 0),
	//		MakeAConstBoolNode(true),
	//		0,
	//	),
	funcArgs := []*pgQuery.Node{pgQuery.MakeAConstStrNode(tablePath, 0)}

	scanCall := pgQuery.MakeFuncCallNode(funcName, funcArgs, 0)
	callList := pgQuery.MakeListNode([]*pgQuery.Node{scanCall})
	return pgQuery.MakeSimpleRangeFunctionNode([]*pgQuery.Node{callList})
}

func makeSubselectNode(columns []string, rowsValues [][]string) *pgQuery.Node {
var columnNodes []*pgQuery.Node
for _, column := range columns {
Expand Down
8 changes: 4 additions & 4 deletions src/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

"github.com/jackc/pgx/v5/pgproto3"
"github.com/jackc/pgx/v5/pgtype"
duckDB "github.com/marcboeker/go-duckdb"
duckDb "github.com/marcboeker/go-duckdb"
pgQuery "github.com/pganalyze/pg_query_go/v5"
)

Expand All @@ -27,7 +27,7 @@ type Proxy struct {
// NewProxy constructs a Proxy that stores the given config and DuckDB handle
// and creates the SelectRemapper it uses, wired to the same config and to
// the Iceberg reader.
//
// NOTE(review): this span is a diff hunk, so the selectRemapper field appears
// twice: the first line is the pre-change version and the second, which also
// passes duckdb to the remapper, is the post-change version. Only one of them
// exists in the real file.
func NewProxy(config *Config, duckdb *Duckdb, icebergReader *IcebergReader) *Proxy {
return &Proxy{
duckdb: duckdb,
selectRemapper: &SelectRemapper{config: config, icebergReader: icebergReader},
selectRemapper: &SelectRemapper{config: config, duckdb: duckdb, icebergReader: icebergReader},
config: config,
}
}
Expand Down Expand Up @@ -129,7 +129,7 @@ func (proxy *Proxy) generateDataRow(rows *sql.Rows, cols []*sql.ColumnType) (*pg
var value time.Time
valuePtrs[i] = &value
case "Decimal":
var value duckDB.Decimal
var value duckDb.Decimal
valuePtrs[i] = &value
default:
panic("Unsupported type: " + col.ScanType().Name())
Expand Down Expand Up @@ -159,7 +159,7 @@ func (proxy *Proxy) generateDataRow(rows *sql.Rows, cols []*sql.ColumnType) (*pg
default:
panic("Unsupported type: " + cols[i].DatabaseTypeName())
}
case *duckDB.Decimal:
case *duckDb.Decimal:
float64Value := (*value).Float64()
values = append(values, []byte(fmt.Sprintf("%v", float64Value)))
default:
Expand Down
6 changes: 3 additions & 3 deletions src/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ func TestHandleQuery(t *testing.T) {
"description": {"count"},
"values": {"5"},
},
"SELECT AVG(decimal_value) AS average FROM public.test_table": {
"description": {"average"},
"values": {"7"},
"SELECT AVG(decimal_value) / 2 AS half_average FROM public.test_table": {
"description": {"half_average"},
"values": {"3.5"},
},
}

Expand Down
27 changes: 19 additions & 8 deletions src/select_remapper.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package main

import (
pgQuery "github.com/pganalyze/pg_query_go/v5"
"context"
"strconv"
"strings"

pgQuery "github.com/pganalyze/pg_query_go/v5"
)

const (
Expand All @@ -28,6 +31,7 @@ var REMAPPED_CONSTANT_BY_PG_FUNCTION_NAME = map[string]string{

// SelectRemapper rewrites parsed SELECT statements and carries the
// collaborators that rewriting needs.
type SelectRemapper struct {
	// icebergReader resolves a schema/table pair to its Iceberg metadata
	// file path.
	icebergReader *IcebergReader

	// duckdb is the DuckDB handle used to execute the CREATE VIEW
	// statements issued while remapping tables.
	duckdb *Duckdb

	// config is passed to LogDebug when tracing the remapping steps.
	config *Config
}

Expand Down Expand Up @@ -56,13 +60,18 @@ func (selectRemapper *SelectRemapper) remapSelectStatement(selectStatement *pgQu
if len(selectStatement.FromClause) > 0 && selectStatement.FromClause[0].GetJoinExpr() != nil {
selectStatement = selectRemapper.remapSelect(selectStatement, indentLevel)
selectRemapper.remapJoinExpressions(selectStatement.FromClause[0], indentLevel)
return selectStatement
}

if len(selectStatement.FromClause) > 0 && selectStatement.FromClause[0].GetRangeVar() != nil {
LogDebug(selectRemapper.config, strings.Repeat(">", indentLevel+1)+" SELECT statement")
if len(selectStatement.FromClause) > 0 {
selectStatement = selectRemapper.remapWhere(selectStatement)
selectStatement.FromClause[0] = selectRemapper.remapTable(selectStatement.FromClause[0])
selectStatement = selectRemapper.remapSelect(selectStatement, indentLevel)
for i, fromNode := range selectStatement.FromClause {
if fromNode.GetRangeVar() != nil {
LogDebug(selectRemapper.config, strings.Repeat(">", indentLevel+1)+" SELECT statement #"+strconv.Itoa(i+1))
selectStatement.FromClause[i] = selectRemapper.remapTable(fromNode)
}
}
return selectStatement
}

Expand Down Expand Up @@ -159,15 +168,17 @@ func (selectRemapper *SelectRemapper) remapTable(node *pgQuery.Node) *pgQuery.No
}
}

// FROM iceberg.table => FROM iceberg_scan('iceberg/schema.db/table', allow_moved_paths = true)
// iceberg.table (CREATE VIEW)
schemaTable := SchemaTable{Schema: schemaName, Table: tableName}
if schemaTable.Schema == "" {
schemaTable.Schema = PG_DEFAULT_SCHEMA
}

// TODO: cache created views
icebergPath := selectRemapper.icebergReader.MetadataFilePath(schemaTable)
tableNode := MakeIcebergTableNode(icebergPath)
return selectRemapper.overrideTable(node, tableNode)
query := "CREATE VIEW IF NOT EXISTS " + schemaTable.Schema + "." + schemaTable.Table + " AS SELECT * FROM iceberg_scan('" + icebergPath + "')"
LogDebug(selectRemapper.config, "Querying DuckDB:", query)
selectRemapper.duckdb.Db.ExecContext(context.Background(), query)
return node
}

func (selectRemapper *SelectRemapper) appendWhereCondition(selectStatement *pgQuery.SelectStmt, whereCondition *pgQuery.Node) *pgQuery.SelectStmt {
Expand Down

0 comments on commit 48f6db9

Please sign in to comment.