Skip to content

Commit

Permalink
Update main.py
Browse files Browse the repository at this point in the history
  • Loading branch information
lmangani authored Oct 18, 2024
1 parent e6a3be7 commit 8ed8abc
Showing 1 changed file with 26 additions and 17 deletions.
43 changes: 26 additions & 17 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,23 @@ def verify(username, password):
conn = duckdb.connect(path)
return True

def convert_to_ndjson(result):
columns = result.description
data = result.fetchall()

ndjson_lines = []
for row in data:
row_dict = {columns[i][0]: row[i] for i in range(len(columns))}
ndjson_lines.append(json.dumps(row_dict))

return '\n'.join(ndjson_lines).encode()

def convert_to_clickhouse_jsoncompact(result, query_time):
columns = result.description
data = result.fetchall()

meta = [{"name": col[0], "type": col[1]} for col in columns]

json_result = {
"meta": meta,
"data": data,
Expand All @@ -40,29 +51,28 @@ def convert_to_clickhouse_jsoncompact(result, query_time):
"bytes_read": sum(len(str(item)) for row in data for item in row)
}
}

return json.dumps(json_result)

def duckdb_query_with_errmsg(query, format):
try:
start_time = time.time()
result = conn.execute(query)
query_time = time.time() - start_time

if format.lower() == 'jsoncompact':
output = convert_to_clickhouse_jsoncompact(result, query_time)
elif format.lower() == 'jsoneachrow':
rows = result.fetchall()
output = '\n'.join(json.dumps(row) for row in rows).encode()
output = convert_to_ndjson(result)
elif format.lower() == 'tsv':
output = result.df().to_csv(sep='\t', index=False)
else:
output = result.fetchall()

# Ensure output is in bytes before returning
if isinstance(output, list):
output = json.dumps(output).encode() # Convert list to JSON string and then encode

return output, b""
except Exception as e:
return b"", str(e).encode()
Expand Down Expand Up @@ -92,13 +102,13 @@ def clickhouse():
response = app.response_class(status=200)
response.headers['Content-Type'] = 'application/json'
response.headers['Accept-Ranges'] = 'bytes' # Indicate that range requests are supported

# Set Content-Length for HEAD request
content_length = len(result) if isinstance(result, bytes) else len(result.decode())
response.headers['Content-Length'] = content_length

return response

return result, 200

# Log any warnings or errors
Expand All @@ -116,20 +126,20 @@ def play():
body = request.get_data() or None
format = request.args.get('default_format', default="JSONCompact", type=str)
database = request.args.get('database', default="", type=str)

if query is None:
query = ""

if body is not None:
data = " ".join(body.decode('utf-8').strip().splitlines())
query = f"{query} {data}"

if not query:
return "Error: no query parameter provided", 400

if database:
query = f"ATTACH '{database}' AS db; USE db; {query}"

result, errmsg = duckdb_query_with_errmsg(query.strip(), format)
if len(errmsg) == 0:
return result, 200
Expand All @@ -156,4 +166,3 @@ def handle_404(e):

if __name__ == '__main__':
app.run(host=host, port=port)

0 comments on commit 8ed8abc

Please sign in to comment.