Skip to content

Commit

Permalink
Merge pull request #211 from modelorona/clickhouse
Browse files Browse the repository at this point in the history
add ClickHouse support
  • Loading branch information
hkdeman authored Dec 2, 2024
2 parents 74e806d + d302fef commit 899f687
Show file tree
Hide file tree
Showing 18 changed files with 1,100 additions and 27 deletions.
15 changes: 12 additions & 3 deletions core/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ toolchain go1.22.7

require (
github.com/99designs/gqlgen v0.17.56
github.com/ClickHouse/clickhouse-go/v2 v2.30.0
github.com/elastic/go-elasticsearch/v8 v8.16.0
github.com/go-chi/chi/v5 v5.1.0
github.com/go-chi/cors v1.2.1
Expand All @@ -23,13 +24,17 @@ require (
)

require (
github.com/ClickHouse/ch-go v0.61.5 // indirect
github.com/agnivade/levenshtein v1.1.1 // indirect
github.com/andybalholm/brotli v1.1.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.5 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/elastic/elastic-transport-go/v8 v8.6.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/go-faster/city v1.0.1 // indirect
github.com/go-faster/errors v0.7.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-sql-driver/mysql v1.7.0 // indirect
Expand All @@ -47,8 +52,12 @@ require (
github.com/klauspost/compress v1.17.7 // indirect
github.com/mattn/go-sqlite3 v1.14.22 // indirect
github.com/montanaflynn/stats v0.7.1 // indirect
github.com/paulmach/orb v0.11.1 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/samber/lo v1.47.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shopspring/decimal v1.4.0 // indirect
github.com/sosodev/duration v1.3.1 // indirect
github.com/urfave/cli/v2 v2.27.5 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
Expand All @@ -63,11 +72,11 @@ require (
go.opentelemetry.io/otel/sdk v1.30.0 // indirect
go.opentelemetry.io/otel/trace v1.30.0 // indirect
go.opentelemetry.io/proto/otlp v1.3.1 // indirect
golang.org/x/crypto v0.27.0 // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/mod v0.21.0 // indirect
golang.org/x/net v0.29.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/sys v0.25.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.19.0 // indirect
golang.org/x/tools v0.25.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240930140551-af27646dc61f // indirect
Expand Down
77 changes: 71 additions & 6 deletions core/go.sum

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion core/graph/schema.resolvers.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/src/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const (
DatabaseType_MongoDB = "MongoDB"
DatabaseType_Redis = "Redis"
DatabaseType_ElasticSearch = "ElasticSearch"
DatabaseType_ClickHouse = "ClickHouse"
)

type Engine struct {
Expand Down
105 changes: 105 additions & 0 deletions core/src/plugins/clickhouse/add.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package clickhouse

import (
"context"
"fmt"
"strings"

"github.com/clidey/whodb/core/src/engine"
)

func (p *ClickHousePlugin) AddStorageUnit(config *engine.PluginConfig, schema string, storageUnit string, fields map[string]string) (bool, error) {
conn, err := DB(config)
if err != nil {
return false, err
}
defer conn.Close()

// Extract engine settings from advanced configuration
var engineSettings struct {
engine string
orderBy string
partitionBy string
settings map[string]string
}

engineSettings.engine = "MergeTree" // default engine
engineSettings.orderBy = "tuple()" // default order
engineSettings.settings = make(map[string]string)

for _, record := range config.Credentials.Advanced {
switch record.Key {
case "Engine":
engineSettings.engine = record.Value
case "OrderBy":
engineSettings.orderBy = record.Value
case "PartitionBy":
engineSettings.partitionBy = record.Value
default:
if strings.HasPrefix(record.Key, "Setting_") {
key := strings.TrimPrefix(record.Key, "Setting_")
engineSettings.settings[key] = record.Value
}
}
}

// Prepare columns
var columns []string
for field, fieldType := range fields {
columns = append(columns, fmt.Sprintf("%s %s", field, fieldType))
}

// Build the CREATE TABLE query
query := fmt.Sprintf("CREATE TABLE %s.%s (\n\t%s\n) ENGINE = %s",
schema, storageUnit, strings.Join(columns, ",\n\t"), engineSettings.engine)

// Add ORDER BY clause
if engineSettings.orderBy != "" {
query += fmt.Sprintf("\nORDER BY %s", engineSettings.orderBy)
}

// Add PARTITION BY clause if specified
if engineSettings.partitionBy != "" {
query += fmt.Sprintf("\nPARTITION BY %s", engineSettings.partitionBy)
}

// Add engine settings if any
if len(engineSettings.settings) > 0 {
var settingsClauses []string
for key, value := range engineSettings.settings {
settingsClauses = append(settingsClauses, fmt.Sprintf("%s=%s", key, value))
}
query += fmt.Sprintf("\nSETTINGS %s", strings.Join(settingsClauses, ", "))
}

err = conn.Exec(context.Background(), query)
if err != nil {
return false, fmt.Errorf("failed to create table: %w (query: %s)", err, query)
}

return true, nil
}

func (p *ClickHousePlugin) AddRow(config *engine.PluginConfig, schema string, storageUnit string, values []engine.Record) (bool, error) {
conn, err := DB(config)
if err != nil {
return false, err
}
defer conn.Close()

var columns []string
var placeholders []string
var args []interface{}

for _, value := range values {
columns = append(columns, value.Key)
placeholders = append(placeholders, "?")
args = append(args, value.Value)
}

query := fmt.Sprintf("INSERT INTO %s.%s (%s) VALUES (%s)",
schema, storageUnit, strings.Join(columns, ", "), strings.Join(placeholders, ", "))

err = conn.Exec(context.Background(), query, args...)
return err == nil, err
}
144 changes: 144 additions & 0 deletions core/src/plugins/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
package clickhouse

import (
"context"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"strconv"

"github.com/clidey/whodb/core/src/engine"
)

type ClickHousePlugin struct{}

func (p *ClickHousePlugin) IsAvailable(config *engine.PluginConfig) bool {
conn, err := DB(config)
if err != nil {
return false
}
defer conn.Close()
return conn.Ping(context.Background()) == nil
}

func (p *ClickHousePlugin) GetDatabases(config *engine.PluginConfig) ([]string, error) {
conn, err := DB(config)
if err != nil {
return nil, err
}
defer conn.Close()

rows, err := conn.Query(context.Background(), "SHOW DATABASES")
if err != nil {
return nil, err
}
defer rows.Close()

var databases []string
for rows.Next() {
var dbName string
if err := rows.Scan(&dbName); err != nil {
return nil, err
}
databases = append(databases, dbName)
}

return databases, nil
}

func (p *ClickHousePlugin) GetSchema(config *engine.PluginConfig) ([]string, error) {
return []string{config.Credentials.Database}, nil
}

func (p *ClickHousePlugin) GetStorageUnits(config *engine.PluginConfig, schema string) ([]engine.StorageUnit, error) {
conn, err := DB(config)
if err != nil {
return nil, err
}
defer conn.Close()

query := fmt.Sprintf(`
SELECT
name,
engine,
total_rows,
formatReadableSize(total_bytes) as total_size
FROM system.tables
WHERE database = '%s'
`, schema)

rows, err := conn.Query(context.Background(), query)
if err != nil {
return nil, err
}
defer rows.Close()

var storageUnits []engine.StorageUnit
for rows.Next() {
var name, tableType string
var totalRows uint64
var totalSize string
if err := rows.Scan(&name, &tableType, &totalRows, &totalSize); err != nil {
return nil, err
}

attributes := []engine.Record{
{Key: "Table Type", Value: tableType},
{Key: "Total Size", Value: totalSize},
{Key: "Count", Value: strconv.FormatUint(totalRows, 10)},
}

columns, err := getTableSchema(conn, schema, name)
if err != nil {
return nil, err
}
attributes = append(attributes, columns...)

storageUnits = append(storageUnits, engine.StorageUnit{
Name: name,
Attributes: attributes,
})
}

return storageUnits, nil
}

func getTableSchema(conn driver.Conn, schema string, tableName string) ([]engine.Record, error) {
query := fmt.Sprintf(`
SELECT
name,
type
FROM system.columns
WHERE database = '%s' AND table = '%s'
ORDER BY position
`, schema, tableName)

rows, err := conn.Query(context.Background(), query)
if err != nil {
return nil, err
}
defer rows.Close()

var result []engine.Record
for rows.Next() {
var name, dataType string
if err := rows.Scan(&name, &dataType); err != nil {
return nil, err
}
result = append(result, engine.Record{Key: name, Value: dataType})
}

return result, nil
}

func (p *ClickHousePlugin) Chat(config *engine.PluginConfig, schema string, model string, previousConversation string, query string) ([]*engine.ChatMessage, error) {
// Implement chat functionality similar to MySQL implementation
// You may need to adapt this based on ClickHouse specifics
return nil, fmt.Errorf("chat functionality not implemented for ClickHouse")
}

func NewClickHousePlugin() *engine.Plugin {
return &engine.Plugin{
Type: engine.DatabaseType_ClickHouse,
PluginFunctions: &ClickHousePlugin{},
}
}
58 changes: 58 additions & 0 deletions core/src/plugins/clickhouse/db.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package clickhouse

import (
"context"
"fmt"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/clidey/whodb/core/src/common"
"github.com/clidey/whodb/core/src/engine"
)

func DB(config *engine.PluginConfig) (driver.Conn, error) {
port := common.GetRecordValueOrDefault(config.Credentials.Advanced, "Port", "9000")
options := &clickhouse.Options{
Addr: []string{fmt.Sprintf("%s:%s", config.Credentials.Hostname, port)},
Auth: clickhouse.Auth{
Database: config.Credentials.Database,
Username: config.Credentials.Username,
Password: config.Credentials.Password,
},
Settings: clickhouse.Settings{
"max_execution_time": 60,
},
DialTimeout: time.Second * 30,
MaxOpenConns: 5,
MaxIdleConns: 5,
ConnMaxLifetime: time.Hour,
ConnOpenStrategy: clickhouse.ConnOpenInOrder,
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
},
}

return clickhouse.Open(options)
}

func getTableColumns(conn driver.Conn, schema, table string) ([]engine.Record, error) {
query := fmt.Sprintf("DESCRIBE TABLE %s.%s", schema, table)
rows, err := conn.Query(context.Background(), query)
if err != nil {
return nil, err
}
defer rows.Close()

var columns []engine.Record
for rows.Next() {
var name, typ, defaultType, defaultExpression string
var comment *string
if err := rows.Scan(&name, &typ, &defaultType, &defaultExpression, &comment); err != nil {
return nil, err
}
columns = append(columns, engine.Record{Key: name, Value: typ})
}

return columns, nil
}
Loading

0 comments on commit 899f687

Please sign in to comment.