Skip to content

Commit

Permalink
Merge pull request #8 from hasura/tristen/update
Browse files Browse the repository at this point in the history
add aggregates first cut
  • Loading branch information
TristenHarr authored Oct 30, 2024
2 parents 1ada467 + d46e9be commit 2592158
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 24 deletions.
17 changes: 0 additions & 17 deletions connector-definition/template/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,6 @@ import { GoogleCalendar, GMail} from "@hasura/ndc-duckduckapi/services";
const connectorConfig: duckduckapi = {
dbSchema: `
DROP TABLE IF EXISTS users;
CREATE TABLE users (
id integer primary key,
name text
);
INSERT INTO users (id, name) VALUES
(1, 'Alice Johnson'),
(2, 'Bob Smith'),
(3, 'Carol Martinez'),
(4, 'David Kim'),
(5, 'Emma Wilson'),
(6, 'Frank Zhang'),
(7, 'Grace Lee'),
(8, 'Henry Garcia'),
(9, 'Isabel Patel'),
(10, 'Jack Thompson');
-- Add your SQL schema here.
-- This SQL will be run on startup every time.
-- CREATE TABLE SAAS_TABLE_NAME (.....);
Expand Down
1 change: 1 addition & 0 deletions ndc-duckduckapi/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ export const CAPABILITIES_RESPONSE: Capabilities = {
query: {
variables: {},
nested_fields: {},
aggregates: {}
},
mutation: {
transactional: null,
Expand Down
5 changes: 5 additions & 0 deletions ndc-duckduckapi/src/duckdb-connection-manager.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// What is the point of this file?
import duckdb from 'duckdb';

export class AsyncConnection {
Expand Down Expand Up @@ -38,6 +39,10 @@ export class DuckDBManager {
this.maxConnections = 5;
this.activeConnections = 0;
}

async releaseSyncConnection(){
this.activeConnections--;
}

async getSyncConnection(): Promise<duckdb.Connection> {
if (this.activeConnections >= this.maxConnections) {
Expand Down
88 changes: 81 additions & 7 deletions ndc-duckduckapi/src/handlers/query.ts
Original file line number Diff line number Diff line change
Expand Up @@ -269,11 +269,57 @@ function build_query(
let order_by_sql = ``;
let collect_rows = [];
let where_conditions = ["WHERE 1"];

if (query.aggregates) {
run_agg = true;
agg_sql = "... todo";
throw new Forbidden("Aggregates not implemented yet!", {});
let agg_columns: string[] = [];

// Process each aggregate
for (const [agg_name, agg_value] of Object.entries(query.aggregates)) {
if (agg_value.type === "star_count") {
agg_columns.push(`COUNT(*) as ${escape_double(agg_name)}`);
} else if (agg_value.type === "column_count") {
// Handle column count aggregation
const column_expr = agg_value.distinct
? `COUNT(DISTINCT ${escape_double(collection_alias)}.${escape_double(agg_value.column)})`
: `COUNT(${escape_double(collection_alias)}.${escape_double(agg_value.column)})`;
agg_columns.push(`${column_expr} as ${escape_double(agg_name)}`);
} else {
throw new Forbidden(`Aggregate type ${agg_value.type} not implemented yet!`, {});
}
}

let from_sql = `${collection} as ${escape_double(collection_alias)}`;
if (path.length > 1 && relationship_key !== null) {
let relationship = query_request.collection_relationships[relationship_key];
let parent_alias = path.slice(0, -1).join("_");
let relationship_alias = config.duckdbConfig.collection_aliases[relationship.target_collection];
from_sql = `${relationship_alias} as ${escape_double(collection_alias)}`;
where_conditions.push(
...Object.entries(relationship.column_mapping).map(([from, to]) => {
return `${escape_double(parent_alias)}.${escape_double(from)} = ${escape_double(collection_alias)}.${escape_double(to)}`;
}),
);
}

if (query.predicate) {
where_conditions.push(
`(${build_where(query.predicate, query_request.collection_relationships, agg_args, variables, collection_alias, config.duckdbConfig.collection_aliases)})`
);
}

agg_sql = wrap_data(`
SELECT JSON_OBJECT(
${agg_columns.map(col => {
const parts = col.split(' as ');
return `${escape_single(parts[1].replace(/"/g, ''))}, ${parts[0]}`;
}).join(',')}
) as data
FROM ${from_sql}
${where_conditions.join(" AND ")}
`);
}

if (query.fields) {
run_sql = true;
for (let [field_name, field_value] of Object.entries(query.fields)) {
Expand Down Expand Up @@ -493,17 +539,45 @@ export async function perform_query(
state: State,
query_plans: SQLQuery[],
): Promise<QueryResponse> {
// const con = state.client.connect();
const response: RowSet[] = [];
for (let query_plan of query_plans) {
try {
const connection = await state.client.getSyncConnection();
const res = await do_all(connection, query_plan);
const row_set = JSON.parse(res[0]["data"] as string) as RowSet;
let row_set: RowSet = { rows: [] };

// Handle aggregate query if present
if (query_plan.runAgg) {
const aggRes = await do_all(connection, {
runSql: true,
runAgg: false,
sql: query_plan.aggSql,
args: query_plan.aggArgs,
aggSql: "",
aggArgs: []
});
const parsedAggData = JSON.parse(aggRes[0]["data"]);
row_set.aggregates = parsedAggData;
}

// Handle regular query if present
if (query_plan.runSql) {
const res = await do_all(connection, {
runSql: true,
runAgg: false,
sql: query_plan.sql,
args: query_plan.args,
aggSql: "",
aggArgs: []
});
const regular_results = JSON.parse(res[0]["data"]);
row_set.rows = regular_results.rows;
}
response.push(row_set);
await state.client.releaseSyncConnection();
} catch (err) {
console.error('RAAAAAAAAAAAAAAAAAAAAAAR.perform_query:: ' + "Error performing query: " + err);
console.error('Error performing query: ' + err);
throw err;
}
}
return response;
}
}

0 comments on commit 2592158

Please sign in to comment.