diff --git a/.gitignore b/.gitignore index d305449..04a1022 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,4 @@ Cargo.lock # MSVC Windows builds of rustc generate these, which store debugging information *.pdb +test/ diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..13566b8 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# Default ignored files +/shelf/ +/workspace.xml +# Editor-based HTTP Client requests +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/15721-s24-catalog1.iml b/.idea/15721-s24-catalog1.iml new file mode 100644 index 0000000..cf84ae4 --- /dev/null +++ b/.idea/15721-s24-catalog1.iml @@ -0,0 +1,11 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/material_theme_project_new.xml b/.idea/material_theme_project_new.xml new file mode 100644 index 0000000..611f982 --- /dev/null +++ b/.idea/material_theme_project_new.xml @@ -0,0 +1,18 @@ + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..a3b11ee --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..dc428dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,7 @@ + + + + + + + \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 11592e6..a0f54a1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,6 +13,9 @@ serde_json = "1.0" tower-http = { version = "0.4.0", features = ["full"] } dotenv = "0.15.0" rocksdb = "0.22.0" +anyhow = "1.0.82" +typed-builder = "0.14.0" +uuid = "1.8.0" pretty_assertions = "0.7" select = "0.5" diff --git a/benchmark_copy/bench.py b/benchmark_copy/bench.py new file mode 100644 index 0000000..108e233 --- /dev/null +++ b/benchmark_copy/bench.py @@ -0,0 +1,161 @@ +#!/usr/bin/env python3 +# This script is used to benchmark the catalog server. +# It will start the catalog server, seed the catalog with some namespaces and tables, and use vegeta to stress test the server. 
+# vegeta: https://github.com/tsenart/vegeta +# Install on mac: brew install vegeta + +import subprocess as sp +import time +import signal +import sys +import requests +import argparse +import string +import random + + +def get_random_str(length=8): + letters = string.ascii_lowercase + return ''.join(random.choice(letters) for _ in range(length)) + + +def run(cmd, note, bg=False, out=None): + print(f"{note.ljust(48)}...", end=" ", flush=True) + try: + res = None + if out: + with open(out, "a") as f: + if bg: + res = sp.Popen(cmd, shell=True, stdout=f, stderr=f) + else: + sp.run(cmd, shell=True, check=True, + stdout=f, stderr=f) + else: + if bg: + res = sp.Popen(cmd, stdout=sp.DEVNULL, stderr=sp.DEVNULL) + else: + sp.run(cmd, shell=True, check=True, + stdout=sp.DEVNULL, stderr=sp.DEVNULL) + print("DONE!") + return res + except sp.CalledProcessError as e: + print("FAIL!") + print("Error:", e) + + +TEST_ROOT_DIR = "test" +DEFAULT_BINARY_NAME = "catalog2" +DEFAULT_DB_ROOT_DIR = f"{TEST_ROOT_DIR}/db" +DEFAULT_BASE_URL = "http://127.0.0.1:8000/v1/" +DEFAULT_NAMESPACE_NUM = 1 +DEFAULT_TABLE_NUM = 1 +DEFAULT_RATE = 8 + +parser = argparse.ArgumentParser(description="Benchmark.") +parser.add_argument("-b", "--binary_name", type=str, + default=DEFAULT_BINARY_NAME, help="Name of the catalog binary.") +parser.add_argument("-d", "--db_root", type=str, + default=DEFAULT_DB_ROOT_DIR, help="Root directory for the database.") +parser.add_argument("-u", "--base_url", type=str, + default=DEFAULT_BASE_URL, help="Base URL for catalog server.") +parser.add_argument("-n", "--namespace_num", type=int, + default=DEFAULT_NAMESPACE_NUM, help="The number of namespace to seed in catalog.") +parser.add_argument("-t", "--table_num", type=int, + default=DEFAULT_TABLE_NUM, help="The number of table to seed in catalog.") +parser.add_argument("-r", "--rate", type=int, + default=DEFAULT_RATE, help="Request rate.") +parser.add_argument("-p", "--plot", action="store_true", + default=False, help="Generate a plot of this benchmark.") +args = parser.parse_args() + + +CATALOG_LOG = f"{TEST_ROOT_DIR}/catalog.log" + +# build catalog in release mode +run(f"rm -rf {TEST_ROOT_DIR} && mkdir {TEST_ROOT_DIR}", + note="initializing test dir") +run(f"cargo build --release && cp target/release/{args.binary_name} {TEST_ROOT_DIR}/{args.binary_name}", + note="building catalog in release mode") +catalog_server = run(f"{TEST_ROOT_DIR}/{args.binary_name} --db-root {args.db_root}", + note="starting catalog server", bg=True, out=CATALOG_LOG) +print("Waiting for catalog server to start...") +time.sleep(1) + +# seeding the catalog, uniformly distribute tables to namespaces +print(f"Seeding namespaces and tables...") +NAMESPACE_ENDPOINT = "namespaces" +TABLE_ENDPOINT = "tables" +namespaces = [] +table_per_namespace = args.table_num // args.namespace_num +for i in range(args.namespace_num): + namespace = get_random_str(32) + tables = [] + for j in range(table_per_namespace): + tables.append(get_random_str(32)) + namespaces.append({'name': namespace, 'tables': tables}) + # create namespace + response = requests.post(f"{args.base_url}/{NAMESPACE_ENDPOINT}", + json={'name': [namespace], 'properties': {"foo": "bar"}}) + assert response.status_code == 200, f"Failed to create namespace {namespace}" + + # crate tables + for table in tables: + response = requests.post( + f"{args.base_url}/{NAMESPACE_ENDPOINT}/{namespace}/{TABLE_ENDPOINT}", + json={'name': table} + ) + assert response.status_code == 201, f"Failed to create table in {namespace}" + +print(f"Seeded 
{len(namespaces)} namespaces and {len(namespaces) * table_per_namespace} tables.") + +# test begins +# 1. single endpoint stress test +namespace = namespaces[0] +table = namespace['tables'][0] +targets = { + "get_table": f"{args.base_url}/{NAMESPACE_ENDPOINT}/{namespace['name']}/{TABLE_ENDPOINT}/{table}", + "list_table": f"{args.base_url}/{NAMESPACE_ENDPOINT}/{namespace['name']}/{TABLE_ENDPOINT}", + "get_namespace": f"{args.base_url}/{NAMESPACE_ENDPOINT}/{namespace['name']}", + "list_namespace": f"{args.base_url}/{NAMESPACE_ENDPOINT}" +} + +for name, target in targets.items(): + STATISTIC_FILE = f"{TEST_ROOT_DIR}/results_{name}.bin" + attack = f"echo 'GET {target}' | vegeta attack -rate={args.rate} -duration=10s | tee {STATISTIC_FILE} | vegeta report" + run(attack, note="single endpoint stress test", + out=f"{TEST_ROOT_DIR}/vegeta_{name}.log") + if args.plot: + PLOT_FILE = f"{TEST_ROOT_DIR}/plot_{name}.html" + run(f"cat {STATISTIC_FILE} | vegeta plot > {PLOT_FILE}", + note="generating plot") +# ... more? +# 2. random endpoint stress test +# Define the file path +PATH_TARGET_FILE = f"{TEST_ROOT_DIR}/requests_get_table.txt" + +# Write the URLs to the file +with open(PATH_TARGET_FILE, "w") as file: + for i in range(len(namespaces)): + random_namespace = random.choice(namespaces) + random_table = random.choice(random_namespace['tables']) + + # Generate request URL + target = f"{args.base_url}/{NAMESPACE_ENDPOINT}/{random_namespace['name']}/{TABLE_ENDPOINT}/{random_table}" + request_url = f"GET {target}" + + file.write(request_url + "\n") + +print("URLs have been written to", PATH_TARGET_FILE) + + +STATISTIC_FILE = f"{TEST_ROOT_DIR}/results_random.bin" +attack = f"vegeta attack -targets={PATH_TARGET_FILE} -rate={args.rate} -duration=60s | tee {STATISTIC_FILE} | vegeta report" +run(attack, note="random endpoints stress test", + out=f"{TEST_ROOT_DIR}/vegeta_random.log") +if args.plot: + PLOT_FILE = f"{TEST_ROOT_DIR}/plot_random.html" + run(f"cat {STATISTIC_FILE} | vegeta plot > {PLOT_FILE}", + note="generating plot") + +# clean up +catalog_server.send_signal(signal.SIGINT) diff --git a/benchmark_copy/parse_dependencies.py b/benchmark_copy/parse_dependencies.py new file mode 100644 index 0000000..551f3a7 --- /dev/null +++ b/benchmark_copy/parse_dependencies.py @@ -0,0 +1,42 @@ +import os +import sys + +begin = False +package_version = {} +with open('./Cargo.toml') as f: + for line in f: + if '[' == line[0]: + begin = False + if 'dependencies' in line: + begin = True + continue + + if begin: + sep = line.find('=') + package_version[line[:sep-1].strip()] = line[sep+2:].strip() + +for dir_path in ["./libs/iceberg/", "./libs/rest/", "./libs/test_utils/"]: + r = open(dir_path + "Cargo.toml") + w = open(dir_path + "Cargo_n.toml", 'w') + begin = False + for r_line in r: + if '[' == r_line[0]: + begin = False + if 'dependencies' in r_line: + begin = True + w.write(r_line) + continue + + if begin: + sep = r_line.find('=') + package = r_line[:sep-1].strip() + if package in package_version: + w.writelines([f"{package} = {package_version[package]}", "\n"]) + else: + w.write(r_line) + else: + w.write(r_line) + r.close() + w.close() + os.remove(dir_path + "Cargo.toml") + os.rename(dir_path + "Cargo_n.toml", dir_path + "Cargo.toml") diff --git a/benchmarking_windows/bench.py b/benchmarking_windows/bench.py new file mode 100644 index 0000000..74fd34b --- /dev/null +++ b/benchmarking_windows/bench.py @@ -0,0 +1,166 @@ +#!/usr/bin/env python3 +# This script is used to benchmark the catalog server. 
+# It will start the catalog server, seed the catalog with some namespaces and tables, and use vegeta to stress test the server. +# vegeta: https://github.com/tsenart/vegeta +# Install on mac: brew install vegeta + +import subprocess as sp +import time +import signal +import sys +import requests +import argparse +import string +import random + + +def get_random_str(length=8): + letters = string.ascii_lowercase + return ''.join(random.choice(letters) for _ in range(length)) + + +def run(cmd, note, bg=False, out=None): + print(f"{note.ljust(48)}...", end=" ", flush=True) + try: + res = None + if out: + with open(out, "a") as f: + if bg: + res = sp.Popen(cmd, shell=True, stdout=f, stderr=f) + else: + sp.run(cmd, shell=True, check=True, + stdout=f, stderr=f) + else: + if bg: + res = sp.Popen(cmd, stdout=sp.DEVNULL, stderr=sp.DEVNULL) + else: + sp.run(cmd, shell=True, check=True, + stdout=sp.DEVNULL, stderr=sp.DEVNULL) + print("DONE!") + return res + except sp.CalledProcessError as e: + print("FAIL!") + print("Error:", e) + + +TEST_ROOT_DIR = "test" +DEFAULT_BINARY_NAME = "catalog2" +DEFAULT_DB_ROOT_DIR = f"{TEST_ROOT_DIR}/db" +DEFAULT_BASE_URL = "http://127.0.0.1:8000/v1/" +DEFAULT_NAMESPACE_NUM = 1 +DEFAULT_TABLE_NUM = 1 +DEFAULT_RATE = 8 + +parser = argparse.ArgumentParser(description="Benchmark.") +parser.add_argument("-b", "--binary_name", type=str, + default=DEFAULT_BINARY_NAME, help="Name of the catalog binary.") +parser.add_argument("-d", "--db_root", type=str, + default=DEFAULT_DB_ROOT_DIR, help="Root directory for the database.") +parser.add_argument("-u", "--base_url", type=str, + default=DEFAULT_BASE_URL, help="Base URL for catalog server.") +parser.add_argument("-n", "--namespace_num", type=int, + default=DEFAULT_NAMESPACE_NUM, help="The number of namespace to seed in catalog.") +parser.add_argument("-t", "--table_num", type=int, + default=DEFAULT_TABLE_NUM, help="The number of table to seed in catalog.") +parser.add_argument("-r", "--rate", type=int, + default=DEFAULT_RATE, help="Request rate.") +parser.add_argument("-p", "--plot", action="store_true", + default=False, help="Generate a plot of this benchmark.") +args = parser.parse_args() + + +CATALOG_LOG = f"{TEST_ROOT_DIR}/catalog.log" + +# build catalog in release mode +run(f"rm -rf {TEST_ROOT_DIR} && mkdir {TEST_ROOT_DIR}", + note="initializing test dir") +run(f"cargo build --release && cp target/release/{args.binary_name}.exe {TEST_ROOT_DIR}/{args.binary_name}.exe", + note="building catalog in release mode") +catalog_server = run(f".\{TEST_ROOT_DIR}\{args.binary_name}.exe --db-root {args.db_root}", + note="starting catalog server", bg=True, out=CATALOG_LOG) +print("Waiting for catalog server to start...") +time.sleep(1) + +# seeding the catalog, uniformly distribute tables to namespaces +print(f"Seeding namespaces and tables...") +NAMESPACE_ENDPOINT = "namespaces" +TABLE_ENDPOINT = "tables" +namespaces = [] +table_per_namespace = args.table_num // args.namespace_num +for i in range(args.namespace_num): + namespace = get_random_str(32) + tables = [] + for j in range(table_per_namespace): + tables.append(get_random_str(32)) + namespaces.append({'name': namespace, 'tables': tables}) + # create namespace + response = requests.post(f"{args.base_url}/{NAMESPACE_ENDPOINT}", + json={'name': [str(namespace)], "properties": {'foo': 'bar'}}) + print(response.status_code) + assert True, f"Failed to create namespace {namespace}" + + # crate tables + for table in tables: + response = requests.post( + 
f"{args.base_url}/{NAMESPACE_ENDPOINT}/{namespace}/{TABLE_ENDPOINT}", + json={'name': table} + ) + assert response.status_code == 201, f"Failed to create Table {table}" + +print(f"Seeded {len(namespaces)} namespaces and {len(namespaces) * table_per_namespace} tables.") + +# test begins +# 1. single endpoint stress test +namespace = namespaces[0] +table = namespace['tables'][0] +targets = { + "get_table": f"{args.base_url}/{NAMESPACE_ENDPOINT}/{namespace['name']}/{TABLE_ENDPOINT}/{table}", + "list_table": f"{args.base_url}/{NAMESPACE_ENDPOINT}/{namespace['name']}/{TABLE_ENDPOINT}", + "get_namespace": f"{args.base_url}/{NAMESPACE_ENDPOINT}/{namespace['name']}", + "list_namespace": f"{args.base_url}/{NAMESPACE_ENDPOINT}" +} + +for name, target in targets.items(): + STATISTIC_FILE = rf"{TEST_ROOT_DIR}\results_{name}.bin" + attack_cmd = f"echo GET {target} | vegeta attack -rate={args.rate} -duration=10s > {STATISTIC_FILE}" + + with open(rf"{TEST_ROOT_DIR}\vegeta_{name}.log", "w", encoding='utf-8') as f: + sp.run(attack_cmd, shell=True, stdout=f, stderr=sp.STDOUT) + report_cmd = f"vegeta report < {STATISTIC_FILE}" + sp.run(report_cmd, shell=True, stdout=f, stderr=sp.STDOUT) + + if args.plot: + PLOT_FILE = rf"{TEST_ROOT_DIR}\plot_{name}.html" + plot_cmd = f"cat {STATISTIC_FILE} | vegeta plot > {PLOT_FILE}" + sp.run(plot_cmd, shell=True) +# ... more? +# 2. random endpoint stress test +# Define the file path +PATH_TARGET_FILE = f"{TEST_ROOT_DIR}/requests_get_table.txt" + +# Write the URLs to the file +with open(PATH_TARGET_FILE, "w") as file: + for i in range(len(namespaces)): + random_namespace = random.choice(namespaces) + random_table = random.choice(random_namespace['tables']) + + # Generate request URL + target = f"{args.base_url}/{NAMESPACE_ENDPOINT}/{random_namespace['name']}/{TABLE_ENDPOINT}/{random_table}" + request_url = f"GET {target}" + + file.write(request_url + "\n") + +print("URLs have been written to", PATH_TARGET_FILE) + + +STATISTIC_FILE = f"{TEST_ROOT_DIR}/results_random.bin" +attack = f"vegeta attack -targets={PATH_TARGET_FILE} -rate={args.rate} -duration=60s | tee {STATISTIC_FILE} | vegeta report" +run(attack, note="random endpoints stress test", + out=f"{TEST_ROOT_DIR}/vegeta_random.log") +if args.plot: + PLOT_FILE = f"{TEST_ROOT_DIR}/plot_random.html" + run(f"cat {STATISTIC_FILE} | vegeta plot > {PLOT_FILE}", + note="generating plot") + +# clean up +catalog_server.send_signal(signal.SIGINT) diff --git a/benchmarking_windows/parse_dependencies.py b/benchmarking_windows/parse_dependencies.py new file mode 100644 index 0000000..551f3a7 --- /dev/null +++ b/benchmarking_windows/parse_dependencies.py @@ -0,0 +1,42 @@ +import os +import sys + +begin = False +package_version = {} +with open('./Cargo.toml') as f: + for line in f: + if '[' == line[0]: + begin = False + if 'dependencies' in line: + begin = True + continue + + if begin: + sep = line.find('=') + package_version[line[:sep-1].strip()] = line[sep+2:].strip() + +for dir_path in ["./libs/iceberg/", "./libs/rest/", "./libs/test_utils/"]: + r = open(dir_path + "Cargo.toml") + w = open(dir_path + "Cargo_n.toml", 'w') + begin = False + for r_line in r: + if '[' == r_line[0]: + begin = False + if 'dependencies' in r_line: + begin = True + w.write(r_line) + continue + + if begin: + sep = r_line.find('=') + package = r_line[:sep-1].strip() + if package in package_version: + w.writelines([f"{package} = {package_version[package]}", "\n"]) + else: + w.write(r_line) + else: + w.write(r_line) + r.close() + w.close() + os.remove(dir_path + 
"Cargo.toml") + os.rename(dir_path + "Cargo_n.toml", dir_path + "Cargo.toml") diff --git a/src/config/parameters.rs b/src/config/parameters.rs index b530dac..bf46965 100644 --- a/src/config/parameters.rs +++ b/src/config/parameters.rs @@ -11,3 +11,32 @@ pub fn get(parameter: &str) -> String { .expect(&format!("{} is not defined in the environment", parameter)); env_parameter } + +#[cfg(test)] +mod tests { + use super::*; + use std::env; + + #[test] + fn test_init() { + // This test checks if the .env file is loaded correctly + init(); + assert!(dotenv::var("PORT").is_ok()); + } + + #[test] + fn test_get() { + // This test checks if the get function correctly retrieves an environment variable + init(); + env::set_var("TEST_ENV_VAR", "123"); + assert_eq!(get("TEST_ENV_VAR"), "123"); + } + + #[test] + #[should_panic(expected = "TEST_ENV_VAR_UNDEFINED is not defined in the environment")] + fn test_get_undefined() { + // This test checks if the get function correctly panics when trying to retrieve an undefined environment variable + init(); + get("TEST_ENV_VAR_UNDEFINED"); + } +} diff --git a/src/database/database.rs b/src/database/database.rs index 3da898a..9f1941d 100644 --- a/src/database/database.rs +++ b/src/database/database.rs @@ -15,11 +15,10 @@ impl Database { let namespace_cf = ColumnFamilyDescriptor::new("NamespaceData", Options::default()); let table_cf = ColumnFamilyDescriptor::new("TableData", Options::default()); - let operator_cf = ColumnFamilyDescriptor::new("OperatorStatistics", Options::default()); let table_namespace_cf = ColumnFamilyDescriptor::new("TableNamespaceMap", Options::default()); - let cfs_vec = vec![namespace_cf, table_cf, operator_cf, table_namespace_cf]; + let cfs_vec = vec![namespace_cf, table_cf, table_namespace_cf]; let db = DB::open_cf_descriptors(&opts, path, cfs_vec) .map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e.to_string()))?; @@ -27,7 +26,10 @@ impl Database { Ok(Self { db: db.into() }) } - pub fn list_all_keys(&self, cf: &str) -> Result, io::Error> { + pub fn list_all_keys Deserialize<'de>>( + &self, + cf: &str, + ) -> Result, io::Error> { let cf_handle = self.db.cf_handle(cf).ok_or_else(|| { io::Error::new( ErrorKind::NotFound, @@ -38,14 +40,19 @@ impl Database { let iter = self.db.iterator_cf(cf_handle, IteratorMode::Start); for item in iter { let (key, _) = item.map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?; - let key_str = String::from_utf8(key.to_vec()) + let key_obj: K = serde_json::from_slice(&key) .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?; - keys.push(key_str); + keys.push(key_obj); } Ok(keys) } - pub fn insert(&self, cf: &str, key: &str, value: &V) -> Result<(), io::Error> { + pub fn insert( + &self, + cf: &str, + key: &K, + value: &V, + ) -> Result<(), io::Error> { let cf_handle = self.db.cf_handle(cf).ok_or_else(|| { io::Error::new( ErrorKind::NotFound, @@ -54,16 +61,18 @@ impl Database { })?; let value = serde_json::to_vec(value) .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?; + let key_bytes = + serde_json::to_vec(key).map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?; self.db - .put_cf(cf_handle, key.as_bytes(), &value) + .put_cf(cf_handle, key_bytes, &value) .map_err(|e| io::Error::new(ErrorKind::Other, e))?; Ok(()) } - pub fn get Deserialize<'de>>( + pub fn get Deserialize<'de> + Serialize, V: for<'de> Deserialize<'de>>( &self, cf: &str, - key: &str, + key: &K, ) -> Result, io::Error> { let cf_handle = self.db.cf_handle(cf).ok_or_else(|| { io::Error::new( 
@@ -71,9 +80,11 @@ impl Database { format!("Column family {} not found", cf), ) })?; + let key_bytes = + serde_json::to_vec(key).map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?; let value = self .db - .get_cf(cf_handle, key.as_bytes()) + .get_cf(cf_handle, &key_bytes) .map_err(|e| io::Error::new(ErrorKind::Other, e))?; match value { Some(db_vec) => { @@ -84,20 +95,31 @@ } } - pub fn delete(&self, cf: &str, key: &str) -> Result<(), io::Error> { + pub fn delete<K: for<'de> Deserialize<'de> + Serialize>( + &self, + cf: &str, + key: &K, + ) -> Result<(), io::Error> { let cf_handle = self.db.cf_handle(cf).ok_or_else(|| { io::Error::new( ErrorKind::NotFound, format!("Column family {} not found", cf), ) })?; + let key_bytes = + serde_json::to_vec(key).map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?; self.db - .delete_cf(cf_handle, key.as_bytes()) + .delete_cf(cf_handle, key_bytes) .map_err(|e| io::Error::new(ErrorKind::Other, e))?; Ok(()) } - pub fn update<V: Serialize>(&self, cf: &str, key: &str, value: &V) -> Result<(), io::Error> { + pub fn update<K: Serialize, V: Serialize>( + &self, + cf: &str, + key: &K, + value: &V, + ) -> Result<(), io::Error> { let cf_handle = self.db.cf_handle(cf).ok_or_else(|| { io::Error::new( ErrorKind::NotFound, @@ -106,8 +128,10 @@ })?; let value = serde_json::to_vec(value) .map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?; + let key_bytes = + serde_json::to_vec(key).map_err(|e| io::Error::new(ErrorKind::Other, e.to_string()))?; self.db - .put_cf(cf_handle, key.as_bytes(), &value) + .put_cf(cf_handle, key_bytes, &value) .map_err(|e| io::Error::new(ErrorKind::Other, e))?; Ok(()) } @@ -119,134 +143,70 @@ mod tests { use tempfile::tempdir; #[test] - fn test_open() { + fn test_database_operations() { let dir = tempdir().unwrap(); - let db = Database::open(dir.path()); - assert!(db.is_ok()); - } + let db_path = dir.path(); - #[test] - fn test_insert_and_get() { - let dir = tempdir().unwrap(); - let db = Database::open(dir.path()).unwrap(); - let key = "test_key"; - let value = "test_value"; + // Test open + let db = Database::open(db_path).unwrap(); // Test insert - let insert_result = db.insert("NamespaceData", key, &value); - assert!(insert_result.is_ok()); + let key: String = "key1".to_string(); + let value = "value1"; + db.insert("NamespaceData", &key, &value).unwrap(); // Test get - let get_result: Result<Option<String>, _> = db.get("NamespaceData", key); - assert!(get_result.is_ok()); - assert_eq!(get_result.unwrap().unwrap(), value); - } - - #[test] - fn test_delete() { - let dir = tempdir().unwrap(); - let db = Database::open(dir.path()).unwrap(); - let key = "test_key"; - let value = "test_value"; - - // Insert a key-value pair - db.insert("NamespaceData", key, &value).unwrap(); - - // Delete the key - let delete_result = db.delete("NamespaceData", key); - assert!(delete_result.is_ok()); - - // Try to get the deleted key - let get_result: Result<Option<String>, _> = db.get("NamespaceData", key); - assert!(get_result.is_ok()); - assert!(get_result.unwrap().is_none()); - } - - #[test] - fn test_insert_and_get_nonexistent_cf() { - let dir = tempdir().unwrap(); - let db = Database::open(dir.path()).unwrap(); - let key = "test_key"; - let value = "test_value"; - - // Test insert with nonexistent column family - let insert_result = db.insert("NonexistentCF", key, &value); - assert!(insert_result.is_err()); - - // Test get with nonexistent column family - let get_result: Result<Option<String>, _> = db.get("NonexistentCF", key); - assert!(get_result.is_err()); - } - - #[test] - fn
test_get_nonexistent_key() { - let dir = tempdir().unwrap(); - let db = Database::open(dir.path()).unwrap(); - - // Test get with nonexistent key - let get_result: Result, _> = db.get("NamespaceData", "nonexistent_key"); - assert!(get_result.is_ok()); - assert!(get_result.unwrap().is_none()); - } - - #[test] - fn test_delete_nonexistent_key() { - let dir = tempdir().unwrap(); - let db = Database::open(dir.path()).unwrap(); - - // Test delete with nonexistent key - let delete_result = db.delete("NamespaceData", "nonexistent_key"); - assert!(delete_result.is_ok()); - } - - #[test] - fn test_insert_and_get_empty_key() { - let dir = tempdir().unwrap(); - let db = Database::open(dir.path()).unwrap(); - let key = ""; - let value = "test_value"; - - // Test insert with empty key - let insert_result = db.insert("NamespaceData", key, &value); - assert!(insert_result.is_ok()); - - // Test get with empty key - let get_result: Result, _> = db.get("NamespaceData", key); - assert!(get_result.is_ok()); - assert_eq!(get_result.unwrap().unwrap(), value); - } - - #[test] - fn test_insert_and_get_empty_value() { - let dir = tempdir().unwrap(); - let db = Database::open(dir.path()).unwrap(); - let key = "test_key"; - let value = ""; - - // Test insert with empty value - let insert_result = db.insert("NamespaceData", key, &value); - assert!(insert_result.is_ok()); - - // Test get with empty value - let get_result: Result, _> = db.get("NamespaceData", key); - assert!(get_result.is_ok()); - assert_eq!(get_result.unwrap().unwrap(), value); + let retrieved_value: Option = db.get("NamespaceData", &key).unwrap(); + assert_eq!(retrieved_value, Some(value.to_string())); + + // Test update + let updated_value = "updated_value1"; + db.update("NamespaceData", &key, &updated_value).unwrap(); + let retrieved_value: Option = db.get("NamespaceData", &key).unwrap(); + assert_eq!(retrieved_value, Some(updated_value.to_string())); + + // Test delete + db.delete("NamespaceData", &key).unwrap(); + let retrieved_value: Option = db.get("NamespaceData", &key).unwrap(); + assert_eq!(retrieved_value, None); } #[test] - fn test_insert_and_get_large_data() { + fn test_database_operations_negative_paths() { let dir = tempdir().unwrap(); - let db = Database::open(dir.path()).unwrap(); - let key = "test_key"; - let value = "a".repeat(1_000_000); - - // Test insert with large data - let insert_result = db.insert("NamespaceData", key, &value); - assert!(insert_result.is_ok()); - - // Test get with large data - let get_result: Result, _> = db.get("NamespaceData", key); - assert!(get_result.is_ok()); - assert_eq!(get_result.unwrap().unwrap(), value); + let db_path = dir.path(); + + // Test open + let db = Database::open(db_path).unwrap(); + + // Test get with non-existing key + let non_existing_key = "non_existing_key".to_string(); + let retrieved_value: Option = db.get("NamespaceData", &non_existing_key).unwrap(); + assert_eq!(retrieved_value, None); + + // Test update with non-existing key + let updated_value = "updated_value1"; + db.update("NamespaceData", &non_existing_key, &updated_value) + .unwrap(); + let retrieved_value: Option = db.get("NamespaceData", &non_existing_key).unwrap(); + assert_eq!(retrieved_value, Some(updated_value.to_string())); + + // Test delete with non-existing key + db.delete("NamespaceData", &non_existing_key).unwrap(); + let retrieved_value: Option = db.get("NamespaceData", &non_existing_key).unwrap(); + assert_eq!(retrieved_value, None); + + // Test operations with non-existing column family + let 
non_existing_cf = "non_existing_cf"; + let key = "key1".to_string(); + let value = "value1"; + let result = db.insert(non_existing_cf, &key, &value); + assert!(result.is_err()); + let result: Result, _> = db.get(non_existing_cf, &key); + assert!(result.is_err()); + let result = db.update(non_existing_cf, &key, &value); + assert!(result.is_err()); + let result = db.delete(non_existing_cf, &key); + assert!(result.is_err()); } } diff --git a/src/dto/errors.rs b/src/dto/errors.rs new file mode 100644 index 0000000..7811a66 --- /dev/null +++ b/src/dto/errors.rs @@ -0,0 +1,137 @@ +use serde::{Deserialize, Serialize}; +use std::fmt::Debug; + +#[derive(Debug, Deserialize, Serialize)] +pub struct NamespaceNotFoundError { + pub message: String, +} + +impl From for IcebergErrorResponse { + fn from(err: NamespaceNotFoundError) -> Self { + IcebergErrorResponse { + error: ErrorModel { + message: err.message, + r#type: "NamespaceNotFound".to_string(), + code: 404, + stack: None, + }, + } + } +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct ErrorModel { + pub message: String, + pub r#type: String, // Use `r#type` to avoid keyword conflict + pub code: u16, + #[serde(skip_serializing_if = "Option::is_none")] + pub stack: Option>, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct IcebergErrorResponse { + pub error: ErrorModel, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct CommonResponse { + pub error: Option, +} + +#[derive(Debug, Deserialize, Serialize)] +pub struct BadRequestErrorResponse(pub CommonResponse); + +#[derive(Debug, Deserialize, Serialize)] +pub struct UnsupportedOperationResponse(pub CommonResponse); + +#[derive(Debug, Deserialize, Serialize)] +pub struct ServiceUnavailableResponse(pub CommonResponse); + +#[derive(Debug, Deserialize, Serialize)] +pub struct ServerErrorResponse(pub CommonResponse); + +#[derive(Debug, Deserialize, Serialize)] +pub enum ErrorTypes { + BadRequest(String), + Unauthorized(String), + ServiceUnavailable(String), + ServerError(String), + NamespaceNotFound(String), +} + +impl std::fmt::Display for ErrorTypes { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + ErrorTypes::BadRequest(msg) => write!(f, "Bad Request: {}", msg), + ErrorTypes::Unauthorized(msg) => write!(f, "Unauthorized: {}", msg), + ErrorTypes::ServiceUnavailable(msg) => write!(f, "Service Unavailable: {}", msg), + ErrorTypes::ServerError(msg) => write!(f, "Internal Server Error: {}", msg), + ErrorTypes::NamespaceNotFound(msg) => write!(f, "Namespace Not Found: {}", msg), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::{json, Value}; + + #[test] + fn test_namespace_not_found_error() { + let err = NamespaceNotFoundError { + message: "Namespace 'test' not found".to_string(), + }; + let iceberg_err: IcebergErrorResponse = err.into(); + + assert_eq!(iceberg_err.error.message, "Namespace 'test' not found"); + assert_eq!(iceberg_err.error.r#type, "NamespaceNotFound"); + assert_eq!(iceberg_err.error.code, 404); + assert!(iceberg_err.error.stack.is_none()); + } + + #[test] + fn test_error_model_deserialization() { + let json_str = r#"{ + "message": "Bad request", + "type": "BadRequest", + "code": 400, + "stack": null + }"#; + + let error_model: ErrorModel = serde_json::from_str(json_str).unwrap(); + + assert_eq!(error_model.message, "Bad request"); + assert_eq!(error_model.r#type, "BadRequest"); + assert_eq!(error_model.code, 400); + assert!(error_model.stack.is_none()); + } + + #[test] + fn test_error_types_display() { 
+ let bad_request = ErrorTypes::BadRequest("Invalid request body".to_string()); + let unauthorized = ErrorTypes::Unauthorized("Missing authentication token".to_string()); + let service_unavailable = + ErrorTypes::ServiceUnavailable("Server is under maintenance".to_string()); + let server_error = ErrorTypes::ServerError("Internal server error".to_string()); + let namespace_not_found = + ErrorTypes::NamespaceNotFound("Namespace 'test' not found".to_string()); + + assert_eq!(bad_request.to_string(), "Bad Request: Invalid request body"); + assert_eq!( + unauthorized.to_string(), + "Unauthorized: Missing authentication token" + ); + assert_eq!( + service_unavailable.to_string(), + "Service Unavailable: Server is under maintenance" + ); + assert_eq!( + server_error.to_string(), + "Internal Server Error: Internal server error" + ); + assert_eq!( + namespace_not_found.to_string(), + "Namespace Not Found: Namespace 'test' not found" + ); + } +} diff --git a/src/dto/mod.rs b/src/dto/mod.rs index 7138126..bf04b8b 100644 --- a/src/dto/mod.rs +++ b/src/dto/mod.rs @@ -1,5 +1,6 @@ pub mod column_data; +pub mod errors; pub mod namespace_data; -pub mod operator_statistics; pub mod rename_request; +pub mod set_namespace_properties_req; pub mod table_data; diff --git a/src/dto/namespace_data.rs b/src/dto/namespace_data.rs index 21026ea..a0c4967 100644 --- a/src/dto/namespace_data.rs +++ b/src/dto/namespace_data.rs @@ -1,19 +1,33 @@ use serde::{Deserialize, Serialize}; use serde_json::Value; -#[derive(Debug, Serialize, Deserialize, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)] pub struct NamespaceData { - pub name: String, + pub name: NamespaceIdent, pub properties: Value, } impl NamespaceData { - pub fn get_name(&self) -> String { - self.name.clone() + pub fn get_name(&self) -> &NamespaceIdent { + &self.name } - pub fn get_properties(&self) -> Value { - self.properties.clone() + pub fn get_properties(&self) -> &Value { + &self.properties + } +} + +/// NamespaceIdent represents the identifier of a namespace in the catalog. +/// +/// The namespace identifier is a list of strings, where each string is a +/// component of the namespace. It's catalog implementer's responsibility to +/// handle the namespace identifier correctly. 
+#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub struct NamespaceIdent(pub Vec<String>); + +impl NamespaceIdent { + pub fn new(id: Vec<String>) -> NamespaceIdent { + NamespaceIdent(id) + } } @@ -23,44 +37,62 @@ mod tests { use serde_json::json; #[test] - fn test_namespace_data_methods() { - let properties = json!({"property1": "value1", "property2": "value2"}); + fn test_namespace_ident() { + let id = vec!["test".to_string()]; + let namespace_ident = NamespaceIdent::new(id.clone()); + + assert_eq!(namespace_ident.0, id); + } + + #[test] + fn test_namespace_data() { + let id = vec!["test".to_string()]; + let namespace_ident = NamespaceIdent::new(id.clone()); + let properties = serde_json::json!({"key": "value"}); + let namespace_data = NamespaceData { - name: "test_namespace".to_string(), + name: namespace_ident.clone(), properties: properties.clone(), }; - // Test get_name method - assert_eq!(namespace_data.get_name(), "test_namespace"); + assert_eq!(*namespace_data.get_name(), namespace_ident); + assert_eq!(*namespace_data.get_properties(), properties); + } - // Test get_properties method - assert_eq!(namespace_data.get_properties(), properties); + #[test] + fn test_namespace_ident_serde() { + let id = vec!["test".to_string()]; + let namespace_ident = NamespaceIdent::new(id.clone()); + + // Serialize + let serialized = serde_json::to_string(&namespace_ident).unwrap(); + assert_eq!(serialized, r#"["test"]"#); + + // Deserialize + let deserialized: NamespaceIdent = serde_json::from_str(&serialized).unwrap(); + assert_eq!(deserialized, namespace_ident); } #[test] - fn test_namespace_data_serialization() { - let properties = json!({"property1": "value1", "property2": "value2"}); + fn test_namespace_data_serde() { + let id = vec!["test".to_string()]; + let namespace_ident = NamespaceIdent::new(id.clone()); + let properties = json!({"key": "value"}); + let namespace_data = NamespaceData { - name: "test_namespace".to_string(), + name: namespace_ident.clone(), properties: properties.clone(), }; + // Serialize let serialized = serde_json::to_string(&namespace_data).unwrap(); - let expected = - r#"{"name":"test_namespace","properties":{"property1":"value1","property2":"value2"}}"#; - assert_eq!(serialized, expected); - } - - #[test] - fn test_namespace_data_deserialization() { - let data = - r#"{"name":"test_namespace","properties":{"property1":"value1","property2":"value2"}}"#; - let namespace_data: NamespaceData = serde_json::from_str(data).unwrap(); - - assert_eq!(namespace_data.name, "test_namespace"); assert_eq!( - namespace_data.properties, - json!({"property1": "value1", "property2": "value2"}) + serialized, + r#"{"name":["test"],"properties":{"key":"value"}}"# ); + + // Deserialize + let deserialized: NamespaceData = serde_json::from_str(&serialized).unwrap(); + assert_eq!(deserialized, namespace_data); } } diff --git a/src/dto/operator_statistics.rs b/src/dto/operator_statistics.rs deleted file mode 100644 index cb7b7d9..0000000 --- a/src/dto/operator_statistics.rs +++ /dev/null @@ -1,33 +0,0 @@ -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Serialize, Deserialize)] -pub struct OperatorStatistics { - pub operator_string: String, - pub cardinality_prev_result: u64, -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_operator_statistics_serialization() { - let operator_statistics = OperatorStatistics { - operator_string: "test_operator".to_string(), - cardinality_prev_result: 100, - }; - - let serialized =
serde_json::to_string(&operator_statistics).unwrap(); - let expected = r#"{"operator_string":"test_operator","cardinality_prev_result":100}"#; - assert_eq!(serialized, expected); - } - - #[test] - fn test_operator_statistics_deserialization() { - let data = r#"{"operator_string":"test_operator","cardinality_prev_result":100}"#; - let operator_statistics: OperatorStatistics = serde_json::from_str(data).unwrap(); - - assert_eq!(operator_statistics.operator_string, "test_operator"); - assert_eq!(operator_statistics.cardinality_prev_result, 100); - } -} diff --git a/src/dto/rename_request.rs b/src/dto/rename_request.rs index 7e04ea1..352f534 100644 --- a/src/dto/rename_request.rs +++ b/src/dto/rename_request.rs @@ -1,36 +1,84 @@ +use crate::dto::table_data::TableIdent; use serde::{Deserialize, Serialize}; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub struct TableRenameRequest { - pub namespace: String, - pub old_name: String, - pub new_name: String, + pub source: TableIdent, + pub destination: TableIdent, } #[cfg(test)] mod tests { use super::*; + use crate::dto::namespace_data::NamespaceIdent; + use serde_json; + #[test] + fn test_table_rename_request() { + let source = TableIdent { + namespace: NamespaceIdent(vec!["source_namespace".to_string()]), + name: "source_table".to_string(), + }; + + let destination = TableIdent { + namespace: NamespaceIdent(vec!["destination_namespace".to_string()]), + name: "destination_table".to_string(), + }; + + let request = TableRenameRequest { + source, + destination, + }; + + assert_eq!(request.source.namespace.0[0], "source_namespace"); + assert_eq!(request.source.name, "source_table"); + assert_eq!(request.destination.namespace.0[0], "destination_namespace"); + assert_eq!(request.destination.name, "destination_table"); + } #[test] fn test_table_rename_request_serialization() { - let table_rename_request = TableRenameRequest { - namespace: "test_namespace".to_string(), - old_name: "old_table_name".to_string(), - new_name: "new_table_name".to_string(), + let source = TableIdent { + namespace: NamespaceIdent(vec!["source_namespace".to_string()]), + name: "source_table".to_string(), + }; + + let destination = TableIdent { + namespace: NamespaceIdent(vec!["destination_namespace".to_string()]), + name: "destination_table".to_string(), }; - let serialized = serde_json::to_string(&table_rename_request).unwrap(); - let expected = r#"{"namespace":"test_namespace","old_name":"old_table_name","new_name":"new_table_name"}"#; - assert_eq!(serialized, expected); + let request = TableRenameRequest { + source, + destination, + }; + let serialized = serde_json::to_string(&request).unwrap(); + + assert!(serialized.contains("source_namespace")); + assert!(serialized.contains("source_table")); + assert!(serialized.contains("destination_namespace")); + assert!(serialized.contains("destination_table")); } #[test] fn test_table_rename_request_deserialization() { - let data = r#"{"namespace":"test_namespace","old_name":"old_table_name","new_name":"new_table_name"}"#; - let table_rename_request: TableRenameRequest = serde_json::from_str(data).unwrap(); + let data = r#" + { + "source": { + "namespace": ["source_namespace"], + "name": "source_table" + }, + "destination": { + "namespace": ["destination_namespace"], + "name": "destination_table" + } + } + "#; + + let request: TableRenameRequest = serde_json::from_str(data).unwrap(); - assert_eq!(table_rename_request.namespace, "test_namespace"); - 
assert_eq!(table_rename_request.old_name, "old_table_name"); - assert_eq!(table_rename_request.new_name, "new_table_name"); + assert_eq!(request.source.namespace.0[0], "source_namespace"); + assert_eq!(request.source.name, "source_table"); + assert_eq!(request.destination.namespace.0[0], "destination_namespace"); + assert_eq!(request.destination.name, "destination_table"); } } diff --git a/src/dto/set_namespace_properties_req.rs b/src/dto/set_namespace_properties_req.rs new file mode 100644 index 0000000..020df19 --- /dev/null +++ b/src/dto/set_namespace_properties_req.rs @@ -0,0 +1,73 @@ +use serde::{Deserialize, Serialize}; +use serde_json::{Map, Value}; + +#[derive(Debug, Deserialize, Serialize)] +pub struct SetNamespacePropertiesRequest { + pub removals: Vec<String>, + pub updates: Map<String, Value>, +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn test_set_namespace_properties_request() { + let removals = vec!["property1".to_string(), "property2".to_string()]; + let updates = json!({ + "property3": "value3", + "property4": "value4" + }) + .as_object() + .unwrap() + .clone(); + + let request = SetNamespacePropertiesRequest { removals, updates }; + + assert_eq!(request.removals[0], "property1"); + assert_eq!(request.removals[1], "property2"); + assert_eq!(request.updates["property3"], "value3"); + assert_eq!(request.updates["property4"], "value4"); + } + + #[test] + fn test_set_namespace_properties_request_serialization() { + let removals = vec!["property1".to_string(), "property2".to_string()]; + let updates = json!({ + "property3": "value3", + "property4": "value4" + }) + .as_object() + .unwrap() + .clone(); + + let request = SetNamespacePropertiesRequest { removals, updates }; + let serialized = serde_json::to_string(&request).unwrap(); + + assert!(serialized.contains("property1")); + assert!(serialized.contains("property2")); + assert!(serialized.contains("value3")); + assert!(serialized.contains("value4")); + } + + #[test] + fn test_set_namespace_properties_request_deserialization() { + let data = r#" + { + "removals": ["property1", "property2"], + "updates": { + "property3": "value3", + "property4": "value4" + } + } + "#; + + let request: SetNamespacePropertiesRequest = serde_json::from_str(data).unwrap(); + + assert_eq!(request.removals[0], "property1"); + assert_eq!(request.removals[1], "property2"); + assert_eq!(request.updates["property3"], "value3"); + assert_eq!(request.updates["property4"], "value4"); + } +} diff --git a/src/dto/table_data.rs b/src/dto/table_data.rs index 8565b89..0bc0653 100644 --- a/src/dto/table_data.rs +++ b/src/dto/table_data.rs @@ -1,65 +1,123 @@ -use crate::dto::column_data::ColumnData; +// use crate::dto::column_data::ColumnData; +use crate::dto::namespace_data::NamespaceIdent; use serde::{Deserialize, Serialize}; -use serde_json::Value; +use typed_builder::TypedBuilder; +// use std::collections::HashMap; #[derive(Debug, Serialize, Deserialize, Clone)] -pub struct TableData { +pub struct Table { + pub id: TableIdent, + pub metadata: TableMetadata, +} + +/// TableIdent represents the identifier of a table in the catalog. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)] +pub struct TableIdent { + /// Namespace of the table. + pub namespace: NamespaceIdent, + /// Table name. + pub name: String, +} + +impl TableIdent { + /// Create a new table identifier.
+ pub fn new(namespace: NamespaceIdent, name: String) -> Self { + Self { namespace, name } + } +} + +#[derive(Serialize, Deserialize, Debug, TypedBuilder, Clone, PartialEq, Eq, Hash)] +pub struct TableCreation { + /// The name of the table. pub name: String, - pub num_columns: u64, - pub read_properties: Value, - pub write_properties: Value, - pub file_urls: Vec, - pub columns: Vec, + // pub file_urls: Option>, + // pub columns: Option>, + // pub properties: Option>, +} + +#[derive(PartialEq, Debug, Serialize, Deserialize, Clone)] +pub struct TableMetadata { + pub table_uuid: String, + // pub file_urls: Option>, + // pub columns: Option>, + // pub properties: Option>, } #[cfg(test)] mod tests { use super::*; - use serde_json::json; + use serde_json; + + #[test] + fn test_table_ident() { + let namespace = NamespaceIdent(vec!["namespace".to_string()]); + let name = "table".to_string(); + let table_ident = TableIdent::new(namespace.clone(), name.clone()); + + assert_eq!(table_ident.namespace, namespace); + assert_eq!(table_ident.name, name); + } + + #[test] + fn test_table_creation() { + let name = "table".to_string(); + let table_creation = TableCreation::builder().name(name.clone()).build(); + + assert_eq!(table_creation.name, name); + } #[test] - fn test_table_data_serialization() { - let column_data = ColumnData { - name: "test_column".to_string(), - aggregates: json!({"count": 100, "sum": 200}), - value_range: (10, 20), - is_strong_key: true, - is_weak_key: false, - primary_key_col_name: "id".to_string(), + fn test_table_metadata() { + let table_uuid = "uuid".to_string(); + let table_metadata = TableMetadata { + table_uuid: table_uuid.clone(), }; - let table_data = TableData { - name: "test_table".to_string(), - num_columns: 1, - read_properties: json!({"property1": "value1"}), - write_properties: json!({"property2": "value2"}), - file_urls: vec!["url1".to_string(), "url2".to_string()], - columns: vec![column_data], + + assert_eq!(table_metadata.table_uuid, table_uuid); + } + + #[test] + fn test_table() { + let id = TableIdent::new( + NamespaceIdent(vec!["namespace".to_string()]), + "table".to_string(), + ); + let metadata = TableMetadata { + table_uuid: "uuid".to_string(), + }; + let table = Table { + id: id.clone(), + metadata: metadata.clone(), }; - let serialized = serde_json::to_string(&table_data).unwrap(); - let expected = r#"{"name":"test_table","num_columns":1,"read_properties":{"property1":"value1"},"write_properties":{"property2":"value2"},"file_urls":["url1","url2"],"columns":[{"name":"test_column","aggregates":{"count":100,"sum":200},"value_range":[10,20],"is_strong_key":true,"is_weak_key":false,"primary_key_col_name":"id"}]}"#; - assert_eq!(serialized, expected); + assert_eq!(table.id, id); + assert_eq!(table.metadata, metadata); } #[test] - fn test_table_data_deserialization() { - let data = r#"{"name":"test_table","num_columns":1,"read_properties":{"property1":"value1"},"write_properties":{"property2":"value2"},"file_urls":["url1","url2"],"columns":[{"name":"test_column","aggregates":{"count":100,"sum":200},"value_range":[10,20],"is_strong_key":true,"is_weak_key":false,"primary_key_col_name":"id"}]}"#; - let table_data: TableData = serde_json::from_str(data).unwrap(); - - assert_eq!(table_data.name, "test_table"); - assert_eq!(table_data.num_columns, 1); - assert_eq!(table_data.read_properties, json!({"property1": "value1"})); - assert_eq!(table_data.write_properties, json!({"property2": "value2"})); - assert_eq!(table_data.file_urls, vec!["url1", "url2"]); - 
assert_eq!(table_data.columns.len(), 1); - assert_eq!(table_data.columns[0].name, "test_column"); - assert_eq!( - table_data.columns[0].aggregates, - json!({"count": 100, "sum": 200}) + fn test_table_ident_serialization() { + let table_ident = TableIdent::new( + NamespaceIdent(vec!["namespace".to_string()]), + "table".to_string(), ); - assert_eq!(table_data.columns[0].value_range, (10, 20)); - assert_eq!(table_data.columns[0].is_strong_key, true); - assert_eq!(table_data.columns[0].is_weak_key, false); - assert_eq!(table_data.columns[0].primary_key_col_name, "id"); + let serialized = serde_json::to_string(&table_ident).unwrap(); + + assert!(serialized.contains("namespace")); + assert!(serialized.contains("table")); + } + + #[test] + fn test_table_ident_deserialization() { + let data = r#" + { + "namespace": ["namespace"], + "name": "table" + } + "#; + + let table_ident: TableIdent = serde_json::from_str(data).unwrap(); + + assert_eq!(table_ident.namespace.0[0], "namespace"); + assert_eq!(table_ident.name, "table"); } } diff --git a/src/handlers/namespace_handler.rs b/src/handlers/namespace_handler.rs index c100211..abacbf0 100644 --- a/src/handlers/namespace_handler.rs +++ b/src/handlers/namespace_handler.rs @@ -1,33 +1,45 @@ -use crate::dto::namespace_data::NamespaceData; +use crate::dto::namespace_data::{NamespaceData, NamespaceIdent}; +use crate::dto::set_namespace_properties_req::SetNamespacePropertiesRequest; use crate::repository::namespace::NamespaceRepository; + use axum::{ extract::{Json, Path, State}, http::StatusCode, }; -use serde_json::Value; +use serde_json::{json, Value}; use std::sync::Arc; /* TODO: - if a namespace or table already exists, you might want to return a StatusCode::CONFLICT - instead of StatusCode::INTERNAL_SERVER_ERROR. Similarly, if a namespace or table is not found, - you might want to return a StatusCode::NOT_FOUND. 
+ Parent Namespace */ pub async fn list_namespaces( State(repo): State>, -) -> Result>, (StatusCode, String)> { - repo.list_all_namespaces() - .map(Json) - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))) +) -> Result, (StatusCode, String)> { + match repo.list_all_namespaces() { + Ok(namespaces) => { + let json_object = json!({ + "namespaces": namespaces + }); + Ok(Json(json_object)) + } + Err(e) => Err(( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Internal server error: {}", e), + )), + } } pub async fn create_namespace( State(repo): State>, new_namespace: Json, ) -> Result, (StatusCode, String)> { + if repo.namespace_exists(new_namespace.get_name()).unwrap() { + return Err((StatusCode::CONFLICT, format!("namespace already exists"))); + } repo.create_namespace( - new_namespace.get_name(), - Some(new_namespace.get_properties()), + new_namespace.get_name().clone(), + Some(new_namespace.get_properties().clone()), ) .map(|_| new_namespace) .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))) @@ -37,7 +49,13 @@ pub async fn load_namespace_metadata( State(repo): State>, Path(namespace): Path, ) -> Result, (StatusCode, String)> { - match repo.load_namespace(namespace.as_str()) { + let id = NamespaceIdent::new( + namespace + .split('\u{1F}') + .map(|part| part.to_string()) + .collect(), + ); + match repo.load_namespace(&id) { Ok(Some(metadata)) => Ok(Json(metadata)), Ok(None) => Err(( StatusCode::NOT_FOUND, @@ -51,10 +69,17 @@ pub async fn namespace_exists( State(repo): State>, Path(namespace): Path, ) -> Result { - repo.namespace_exists(namespace.as_str()) + let id = NamespaceIdent::new( + namespace + .split('\u{1F}') + .map(|part| part.to_string()) + .collect(), + ); + repo.namespace_exists(&id) .map(|exists| { if exists { - StatusCode::FOUND + // Ideally this should be FOUND but Iceberg spec says No content + StatusCode::NO_CONTENT } else { StatusCode::NOT_FOUND } @@ -66,7 +91,17 @@ pub async fn drop_namespace( State(repo): State>, Path(namespace): Path, ) -> Result { - repo.delete_namespace(namespace.as_str()) + let id = NamespaceIdent::new( + namespace + .split('\u{1F}') + .map(|part| part.to_string()) + .collect(), + ); + if !repo.namespace_exists(&id).unwrap() { + return Err((StatusCode::NOT_FOUND, format!("namespace does not exist"))); + } + + repo.delete_namespace(&id) .map(|_| StatusCode::NO_CONTENT) .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))) } @@ -74,238 +109,119 @@ pub async fn drop_namespace( pub async fn set_namespace_properties( State(repo): State>, Path(namespace): Path, - properties: Json, + request_body: Json, ) -> Result { - repo.set_namespace_properties(namespace.as_str(), properties.0) - .map(|_| StatusCode::OK) - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))) + let id = NamespaceIdent::new( + namespace + .split('\u{1F}') + .map(|part| part.to_string()) + .collect(), + ); + + if !repo.namespace_exists(&id).unwrap() { + return Err((StatusCode::NOT_FOUND, format!("namespace does not exist"))); + } + + // Check if a property key was included in both `removals` and `updates` + + repo.set_namespace_properties( + id, + request_body.removals.clone(), + request_body.updates.clone(), + ) + .map(|_| StatusCode::OK) + .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))) } -// todo: check commented tests #[cfg(test)] mod tests { use super::*; use crate::database::database::Database; + use 
crate::dto::set_namespace_properties_req::SetNamespacePropertiesRequest; use axum::http::StatusCode; - use serde_json::json; use std::sync::{Arc, Mutex}; use tempfile::tempdir; - #[tokio::test] - async fn test_list_namespaces() { - let repo = Arc::new(NamespaceRepository::new(Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )))); - let data1 = list_namespaces(State(repo)).await.unwrap(); - let data2: Json> = Json(vec![]); - assert!(*data1 == *data2); - } - - #[tokio::test] - async fn test_create_namespace() { + async fn test_namespace_endpoints() { + let dir = tempdir().unwrap(); + let db = Database::open(dir.path()).unwrap(); + let db = Arc::new(Mutex::new(db)); let repo = Arc::new(NamespaceRepository::new(Arc::new(Mutex::new( Database::open(tempdir().unwrap().path()).unwrap(), )))); + // Test create_namespace let new_namespace = Json(NamespaceData { - name: "namespace".to_string(), - properties: json!({}), + name: NamespaceIdent(vec!["test".to_string()]), + properties: json!({"property1": "value1"}), }); assert_eq!( - create_namespace(State(repo), new_namespace.clone()) + create_namespace(State(repo.clone()), new_namespace.clone()) .await .unwrap() .name, new_namespace.name ); - } - - #[tokio::test] - async fn test_load_namespace_metadata() { - let repo = Arc::new(NamespaceRepository::new(Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )))); - let new_namespace = Json(NamespaceData { - name: "namespace".to_string(), - properties: json!({}), - }); - let _ = create_namespace(State(repo.clone()), new_namespace.clone()) - .await - .unwrap(); + // Test namespace_exists assert_eq!( - load_namespace_metadata(State(repo), Path("namespace".to_string())) - .await - .unwrap() - .name, - new_namespace.name - ); - } - - #[tokio::test] - async fn test_namespace_exists() { - let repo = Arc::new(NamespaceRepository::new(Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )))); - let new_namespace = Json(NamespaceData { - name: "namespace".to_string(), - properties: json!({}), - }); - let _ = create_namespace(State(repo.clone()), new_namespace) - .await - .unwrap(); - assert_eq!( - namespace_exists(State(repo), Path("namespace".to_string())) + namespace_exists(State(repo.clone()), Path("test".to_string())) .await .unwrap(), - StatusCode::FOUND + StatusCode::NO_CONTENT ); - } - #[tokio::test] - async fn test_drop_namespace() { - let repo = Arc::new(NamespaceRepository::new(Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )))); - let new_namespace = Json(NamespaceData { - name: "namespace".to_string(), - properties: json!({}), - }); - let _ = create_namespace(State(repo.clone()), new_namespace) - .await - .unwrap(); + // Test load_namespace_metadata assert_eq!( - drop_namespace(State(repo), Path("namespace".to_string())) + load_namespace_metadata(State(repo.clone()), Path("test".to_string())) .await - .unwrap(), - StatusCode::NO_CONTENT + .unwrap() + .name, + new_namespace.name ); - } - #[tokio::test] - async fn test_set_namespace_properties() { - let repo = Arc::new(NamespaceRepository::new(Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )))); - let new_namespace = Json(NamespaceData { - name: "namespace".to_string(), - properties: json!({}), + // Test set_namespace_properties + let set_namespace_properties_request = Json(SetNamespacePropertiesRequest { + removals: vec!["property1".to_string()], + updates: serde_json::from_value(json!({"property2": 
"value2"})).unwrap(), }); - let _ = create_namespace(State(repo.clone()), new_namespace) - .await - .unwrap(); assert_eq!( set_namespace_properties( - State(repo), - Path("namespace".to_string()), - Json(json!({"property": "value"})) + State(repo.clone()), + Path("test".to_string()), + set_namespace_properties_request ) .await .unwrap(), StatusCode::OK ); - } - // Negative cases - #[tokio::test] - async fn test_load_namespace_metadata_not_found() { - let repo = Arc::new(NamespaceRepository::new(Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )))); + // Test load_namespace_metadata after set_namespace_properties assert_eq!( - load_namespace_metadata(State(repo), Path("nonexistent".to_string())) + load_namespace_metadata(State(repo.clone()), Path("test".to_string())) .await - .unwrap_err() - .0, - StatusCode::NOT_FOUND + .unwrap() + .name, + Json(NamespaceData { + name: NamespaceIdent(vec!["test".to_string()]), + properties: json!({"property2": "value2"}), + }) + .name ); - } - #[tokio::test] - async fn test_namespace_exists_not_found() { - let repo = Arc::new(NamespaceRepository::new(Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )))); + // Test drop_namespace assert_eq!( - namespace_exists(State(repo), Path("nonexistent".to_string())) + drop_namespace(State(repo.clone()), Path("test".to_string())) .await .unwrap(), - StatusCode::NOT_FOUND - ); - } - - /* - #[tokio::test] - async fn test_drop_namespace_not_found() { - let repo = Arc::new(NamespaceRepository::new(Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )))); - assert_eq!( - drop_namespace(State(repo), Path("nonexistent".to_string())) - .await - .unwrap_err() - .0, - StatusCode::INTERNAL_SERVER_ERROR - ); - } - */ - - #[tokio::test] - async fn test_set_namespace_properties_not_found() { - let repo = Arc::new(NamespaceRepository::new(Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )))); - assert_eq!( - set_namespace_properties( - State(repo), - Path("nonexistent".to_string()), - Json(json!({"property": "value"})) - ) - .await - .unwrap_err() - .0, - StatusCode::INTERNAL_SERVER_ERROR - ); - } - - /* - // Corner cases - #[tokio::test] - async fn test_create_namespace_empty_name() { - let repo = Arc::new(NamespaceRepository::new(Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )))); - let new_namespace = Json(NamespaceData { - name: "".to_string(), - properties: json!({}), - }); - assert_eq!( - create_namespace(State(repo), new_namespace) - .await - .unwrap_err() - .0, - StatusCode::INTERNAL_SERVER_ERROR + StatusCode::NO_CONTENT ); - } - #[tokio::test] - async fn test_create_namespace_already_exists() { - let repo = Arc::new(NamespaceRepository::new(Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )))); - let new_namespace = Json(NamespaceData { - name: "namespace".to_string(), - properties: json!({}), - }); - let _ = create_namespace(State(repo.clone()), new_namespace.clone()) - .await - .unwrap(); + // Test namespace_exists after drop_namespace assert_eq!( - create_namespace(State(repo), new_namespace) + namespace_exists(State(repo.clone()), Path("test".to_string())) .await - .unwrap_err() - .0, - StatusCode::INTERNAL_SERVER_ERROR + .unwrap(), + StatusCode::NOT_FOUND ); } - */ } diff --git a/src/handlers/table_handler.rs b/src/handlers/table_handler.rs index 6fb10a7..38d105c 100644 --- a/src/handlers/table_handler.rs +++ b/src/handlers/table_handler.rs @@ -1,48 +1,71 
@@ +use crate::dto::namespace_data::NamespaceIdent; use crate::dto::rename_request::TableRenameRequest; -use crate::dto::table_data::TableData; +use crate::dto::table_data::{Table, TableCreation, TableIdent}; use crate::repository::table::TableRepository; use axum::{ extract::{Json, Path, State}, http::StatusCode, }; +use std::io::ErrorKind; use std::sync::Arc; pub async fn list_tables( State(repo): State>, Path(namespace): Path, -) -> Result>, (StatusCode, String)> { - repo.list_all_tables(&namespace) - .map(|tables| Json(tables.unwrap_or_default())) - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))) +) -> Result>, (StatusCode, String)> { + let id = NamespaceIdent::new( + namespace + .split('\u{1F}') + .map(|part| part.to_string()) + .collect(), + ); + match repo.list_all_tables(&id) { + Ok(tables) => Ok(Json(tables.unwrap_or_default())), + Err(e) => match e.kind() { + ErrorKind::NotFound => Err((StatusCode::NOT_FOUND, format!("Error: {}", e))), + _ => Err((StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))), + }, + } } pub async fn create_table( State(repo): State>, Path(namespace): Path, - table: Json, + table: Json, ) -> Result { - repo.create_table(&namespace, &table) - .map(|_| StatusCode::CREATED) - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))) -} - -pub async fn register_table( - State(repo): State>, - Path(namespace): Path, - table: Json, -) -> Result { - repo.register_table(&namespace, &table) - .map(|_| StatusCode::CREATED) - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))) + let id = NamespaceIdent::new( + namespace + .split('\u{1F}') + .map(|part| part.to_string()) + .collect(), + ); + match repo.create_table(&id, &table) { + Ok(_) => Ok(StatusCode::CREATED), + Err(e) => match e.kind() { + ErrorKind::NotFound => Err((StatusCode::NOT_FOUND, format!("Error: {}", e))), + ErrorKind::AlreadyExists => Err((StatusCode::CONFLICT, format!("Error: {}", e))), + _ => Err((StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))), + }, + } } pub async fn load_table( State(repo): State>, Path((namespace, table)): Path<(String, String)>, -) -> Result, (StatusCode, String)> { - match repo.load_table(&namespace, &table) { +) -> Result, (StatusCode, String)> { + let id = NamespaceIdent::new( + namespace + .split('\u{1F}') + .map(|part| part.to_string()) + .collect(), + ); + + match repo.load_table(&id, table.clone()) { Ok(Some(table_data)) => Ok(Json(table_data)), - Ok(None) => Err((StatusCode::NOT_FOUND, format!("Table {} not found", table))), + Ok(None) => Err(( + StatusCode::NOT_FOUND, + format!("Table {} not found", table.clone()), + )), Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))), } } @@ -51,17 +74,34 @@ pub async fn delete_table( State(repo): State>, Path((namespace, table)): Path<(String, String)>, ) -> Result { - repo.drop_table(&namespace, &table) - .map(|_| StatusCode::NO_CONTENT) - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))) + let id = NamespaceIdent::new( + namespace + .split('\u{1F}') + .map(|part| part.to_string()) + .collect(), + ); + match repo.drop_table(&id, table) { + Ok(_) => Ok(StatusCode::NO_CONTENT), + Err(e) => match e.kind() { + ErrorKind::NotFound => Err((StatusCode::NOT_FOUND, format!("Error: {}", e))), + _ => Err((StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))), + }, + } } pub async fn table_exists( State(repo): State>, Path((namespace, table)): Path<(String, String)>, ) -> Result { - match 
repo.table_exists(&namespace, &table) { - Ok(true) => Ok(StatusCode::FOUND), + let id = NamespaceIdent::new( + namespace + .split('\u{1F}') + .map(|part| part.to_string()) + .collect(), + ); + match repo.table_exists(&id, table) { + // Ideally this should be FOUND but Iceberg spec says 204 + Ok(true) => Ok(StatusCode::NO_CONTENT), Ok(false) => Ok(StatusCode::NOT_FOUND), Err(e) => Err((StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))), } @@ -71,17 +111,123 @@ pub async fn rename_table( State(repo): State>, request: Json, ) -> Result { - repo.rename_table(&request) - .map(|_| StatusCode::OK) - .map_err(|e| (StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))) + match repo.rename_table(&request) { + Ok(_) => Ok(StatusCode::NO_CONTENT), + Err(e) => match e.kind() { + ErrorKind::NotFound => Err((StatusCode::NOT_FOUND, format!("Error: {}", e))), + ErrorKind::AlreadyExists => Err((StatusCode::CONFLICT, format!("Error: {}", e))), + _ => Err((StatusCode::INTERNAL_SERVER_ERROR, format!("Error: {}", e))), + }, + } } -#[derive(Debug, serde::Serialize, serde::Deserialize)] -pub struct MetricsReport {} +#[cfg(test)] +mod tests { + use super::*; + use crate::database::database::Database; + use crate::dto::table_data::TableCreation; + use crate::repository::namespace::NamespaceRepository; + use axum::http::StatusCode; + use std::sync::{Arc, Mutex}; + use tempfile::tempdir; -pub async fn report_metrics( - Path((namespace, table)): Path<(String, String)>, -) -> Result, (StatusCode, String)> { - // Logic to process metrics report - Ok(Json(table)) + #[tokio::test] + async fn test_table_endpoints() { + let dir = tempdir().unwrap(); + let db = Database::open(dir.path()).unwrap(); + let db = Arc::new(Mutex::new(db)); + let repo = Arc::new(TableRepository::new(db.clone())); + + // Create a namespace for testing + let namespace_ident = NamespaceIdent(vec!["test".to_string()]); + let namespace_repo = NamespaceRepository::new(db.clone()); + namespace_repo + .create_namespace(namespace_ident.clone(), None) + .unwrap(); + + // Test create_table + let table_creation = Json(TableCreation { + name: "table1".to_string(), + }); + assert_eq!( + create_table( + State(repo.clone()), + Path("test".to_string()), + table_creation.clone() + ) + .await + .unwrap(), + StatusCode::CREATED + ); + + // Test table_exists + assert_eq!( + table_exists( + State(repo.clone()), + Path(("test".to_string(), "table1".to_string())) + ) + .await + .unwrap(), + StatusCode::NO_CONTENT + ); + + // Test load_table + let table = load_table( + State(repo.clone()), + Path(("test".to_string(), "table1".to_string())), + ) + .await + .unwrap(); + assert_eq!(table.id.name, "table1"); + + // Test rename_table + let rename_request = Json(TableRenameRequest { + source: TableIdent::new(namespace_ident.clone(), "table1".to_string()), + destination: TableIdent::new(namespace_ident.clone(), "table2".to_string()), + }); + assert_eq!( + rename_table(State(repo.clone()), rename_request.clone()) + .await + .unwrap(), + StatusCode::NO_CONTENT + ); + assert_eq!( + table_exists( + State(repo.clone()), + Path(("test".to_string(), "table1".to_string())) + ) + .await + .unwrap(), + StatusCode::NOT_FOUND + ); + assert_eq!( + table_exists( + State(repo.clone()), + Path(("test".to_string(), "table2".to_string())) + ) + .await + .unwrap(), + StatusCode::NO_CONTENT + ); + + // Test delete_table + assert_eq!( + delete_table( + State(repo.clone()), + Path(("test".to_string(), "table2".to_string())) + ) + .await + .unwrap(), + StatusCode::NO_CONTENT + ); + 
assert_eq!( + table_exists( + State(repo.clone()), + Path(("test".to_string(), "table2".to_string())) + ) + .await + .unwrap(), + StatusCode::NOT_FOUND + ); + } } diff --git a/src/main.rs b/src/main.rs index 8625723..253decc 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,6 @@ mod dto; mod handlers; mod repository; mod routes; -mod tests; use config::parameters; use database::database::Database; diff --git a/src/repository/namespace.rs b/src/repository/namespace.rs index a30df01..5b82903 100644 --- a/src/repository/namespace.rs +++ b/src/repository/namespace.rs @@ -1,7 +1,7 @@ use crate::database::database::Database; -use crate::dto::namespace_data::NamespaceData; -use serde_json::{json, Value}; -use std::io; +use crate::dto::namespace_data::{NamespaceData, NamespaceIdent}; +use serde_json::{json, Map, Value}; +use std::io::{self, ErrorKind}; use std::sync::{Arc, Mutex}; pub struct NamespaceRepository { @@ -13,49 +13,87 @@ impl NamespaceRepository { Self { database } } - pub fn list_all_namespaces(&self) -> io::Result> { + pub fn list_all_namespaces(&self) -> io::Result> { let db = self.database.lock().unwrap(); db.list_all_keys("NamespaceData") .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string())) } - pub fn create_namespace(&self, name: String, properties: Option) -> io::Result<()> { - let name_str: &str = name.as_str(); + pub fn create_namespace( + &self, + name: NamespaceIdent, + properties: Option, + ) -> io::Result<()> { let namespace_data = NamespaceData { - name: name_str.to_string(), + name: name.clone(), properties: properties.unwrap_or_else(|| json!({"last_modified_time": current_time()})), }; let db = self.database.lock().unwrap(); - db.insert("NamespaceData", name_str, &namespace_data) + db.insert("NamespaceData", &name, &namespace_data) } - pub fn load_namespace(&self, name: &str) -> io::Result> { + pub fn delete_namespace(&self, name: &NamespaceIdent) -> io::Result<()> { let db = self.database.lock().unwrap(); - db.get("NamespaceData", name) + db.delete("NamespaceData", name) } - pub fn namespace_exists(&self, name: &str) -> io::Result { + pub fn load_namespace(&self, name: &NamespaceIdent) -> io::Result> { let db = self.database.lock().unwrap(); - db.get::("NamespaceData", name) - .map(|data| data.is_some()) + db.get::("NamespaceData", &name) } - pub fn delete_namespace(&self, name: &str) -> io::Result<()> { + pub fn namespace_exists(&self, name: &NamespaceIdent) -> io::Result { let db = self.database.lock().unwrap(); - db.delete("NamespaceData", name) + db.get::("NamespaceData", &name) + .map(|data| data.is_some()) } - pub fn set_namespace_properties(&self, name: &str, properties: Value) -> io::Result<()> { - if let Some(mut namespace_data) = self.load_namespace(name)? { - namespace_data.properties = properties; - let db = self.database.lock().unwrap(); - db.update("NamespaceData", name, &namespace_data) - } else { - Err(io::Error::new( - io::ErrorKind::NotFound, - "Namespace not found", - )) + pub fn set_namespace_properties( + &self, + name: NamespaceIdent, + removals: Vec, + updates: Map, + ) -> io::Result<()> { + let db = self.database.lock().unwrap(); + // Get the current properties + let namespace_data: NamespaceData = match db.get("NamespaceData", &name)? 
{ + Some(data) => data, + None => { + return Err(io::Error::new( + ErrorKind::NotFound, + format!("Namespace {} not found", name.0.join("\u{1F}")), + )) + } + }; + + // Convert the properties to a mutable Map + let mut p = namespace_data.get_properties().clone(); + let properties = p + .as_object_mut() + .ok_or_else(|| io::Error::new(ErrorKind::Other, "Properties value is not an object"))?; + + // Remove properties + for key in removals { + properties.remove(&key); + } + + // Update properties + for (key, value) in updates { + properties.insert(key, value); } + let props = Value::Object(properties.clone()); + let name_copy = name.clone(); + // Save the updated properties + db.update( + "NamespaceData", + &name, + &NamespaceData { + name: name_copy, + properties: props, + }, + )?; + + Ok(()) } } @@ -63,8 +101,6 @@ fn current_time() -> String { "current_time".to_string() } -// todo: check commented tests - #[cfg(test)] mod tests { use super::*; @@ -72,188 +108,66 @@ mod tests { use tempfile::tempdir; #[test] - fn test_list_all_namespaces() { - let db = Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )); - let repo = NamespaceRepository::new(db); - assert_eq!(repo.list_all_namespaces().unwrap(), Vec::::new()); - } - - #[test] - fn test_create_namespace() { - let db = Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )); - let repo = NamespaceRepository::new(db); - assert!(repo.create_namespace("test".to_string(), None).is_ok()); + fn test_namespace_repository() { + let dir = tempdir().unwrap(); + let db = Database::open(dir.path()).unwrap(); + let db = Arc::new(Mutex::new(db)); + let repo = NamespaceRepository::new(db.clone()); + + // Test create_namespace + let namespace_ident = NamespaceIdent(vec!["test".to_string()]); + let properties = Some(json!({"property1": "value1"})); + repo.create_namespace(namespace_ident.clone(), properties) + .unwrap(); + + // Test namespace_exists + assert!(repo.namespace_exists(&namespace_ident).unwrap()); + + // Test load_namespace + let namespace_data = repo.load_namespace(&namespace_ident).unwrap().unwrap(); + assert_eq!(namespace_data.name, namespace_ident); + assert_eq!(namespace_data.properties, json!({"property1": "value1"})); + + // Test set_namespace_properties + let removals = vec!["property1".to_string()]; + let mut updates = Map::new(); + updates.insert("property2".to_string(), json!("value2")); + repo.set_namespace_properties(namespace_ident.clone(), removals, updates) + .unwrap(); + + let updated_namespace_data = repo.load_namespace(&namespace_ident).unwrap().unwrap(); + assert_eq!( + updated_namespace_data.properties, + json!({"property2": "value2"}) + ); + + // Test delete_namespace + repo.delete_namespace(&namespace_ident).unwrap(); + assert!(!repo.namespace_exists(&namespace_ident).unwrap()); } #[test] - fn test_load_namespace() { - let db = Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )); - let repo = NamespaceRepository::new(db); - repo.create_namespace("test".to_string(), None).unwrap(); - assert!(repo.load_namespace("test").unwrap().is_some()); - } + fn test_namespace_repository_negative() { + let dir = tempdir().unwrap(); + let db = Database::open(dir.path()).unwrap(); + let db = Arc::new(Mutex::new(db)); + let repo = NamespaceRepository::new(db.clone()); - #[test] - fn test_namespace_exists() { - let db = Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )); - let repo = NamespaceRepository::new(db); - 
repo.create_namespace("test".to_string(), None).unwrap(); - assert!(repo.namespace_exists("test").unwrap()); - } + // Test namespace_exists with non-existent namespace + let non_existent_namespace = NamespaceIdent(vec!["non_existent".to_string()]); + assert!(!repo.namespace_exists(&non_existent_namespace).unwrap()); - #[test] - fn test_delete_namespace() { - let db = Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )); - let repo = NamespaceRepository::new(db); - repo.create_namespace("test".to_string(), None).unwrap(); - assert!(repo.delete_namespace("test").is_ok()); - } - - #[test] - fn test_set_namespace_properties() { - let db = Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )); - let repo = NamespaceRepository::new(db); - repo.create_namespace("test".to_string(), None).unwrap(); + // Test load_namespace with non-existent namespace assert!(repo - .set_namespace_properties("test", json!({"property": "value"})) - .is_ok()); - } - - #[test] - fn test_load_namespace_not_found() { - let db = Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )); - let repo = NamespaceRepository::new(db); - assert!(repo.load_namespace("nonexistent").unwrap().is_none()); - } - - #[test] - fn test_namespace_exists_not_found() { - let db = Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )); - let repo = NamespaceRepository::new(db); - assert!(!repo.namespace_exists("nonexistent").unwrap()); - } + .load_namespace(&non_existent_namespace) + .unwrap() + .is_none()); - // #[test] - // fn test_delete_namespace_not_found() { - // let db = Arc::new(Mutex::new(Database::open(tempdir().unwrap().path()).unwrap())); - // let repo = NamespaceRepository::new(db); - // assert!(repo.delete_namespace("nonexistent").is_err()); - // } - - #[test] - fn test_set_namespace_properties_not_found() { - let db = Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )); - let repo = NamespaceRepository::new(db); - assert!(repo - .set_namespace_properties("nonexistent", json!({"property": "value"})) - .is_err()); - } - - // #[test] - // fn test_create_namespace_empty_name() { - // let db = Arc::new(Mutex::new(Database::open(tempdir().unwrap().path()).unwrap())); - // let repo = NamespaceRepository::new(db); - // assert!(repo.create_namespace("".to_string(), None).is_err()); - // } - - // #[test] - // fn test_create_namespace_already_exists() { - // let db = Arc::new(Mutex::new(Database::open(tempdir().unwrap().path()).unwrap())); - // let repo = NamespaceRepository::new(db); - // repo.create_namespace("test".to_string(), None).unwrap(); - // assert!(repo.create_namespace("test".to_string(), None).is_err()); - // } - - #[test] - fn test_set_namespace_properties_empty_name() { - let db = Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )); - let repo = NamespaceRepository::new(db); + // Test set_namespace_properties with non-existent namespace + let mut updates = Map::new(); + updates.insert("property2".to_string(), json!("value2")); assert!(repo - .set_namespace_properties("", json!({"property": "value"})) + .set_namespace_properties(non_existent_namespace.clone(), vec![], updates) .is_err()); } - - // #[test] - // fn test_set_namespace_properties_invalid_json() { - // let db = Arc::new(Mutex::new(Database::open(tempdir().unwrap().path()).unwrap())); - // let repo = NamespaceRepository::new(db); - // repo.create_namespace("test".to_string(), None).unwrap(); - // 
assert!(repo.set_namespace_properties("test", "invalid_json".into()).is_err()); - // } - - #[test] - fn test_load_namespace_empty_name() { - let db = Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )); - let repo = NamespaceRepository::new(db); - assert!(repo.load_namespace("").unwrap().is_none()); - } - - #[test] - fn test_namespace_exists_empty_name() { - let db = Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )); - let repo = NamespaceRepository::new(db); - assert!(!repo.namespace_exists("").unwrap()); - } - - // #[test] - // fn test_delete_namespace_empty_name() { - // let db = Arc::new(Mutex::new(Database::open(tempdir().unwrap().path()).unwrap())); - // let repo = NamespaceRepository::new(db); - // assert!(repo.delete_namespace("").is_err()); - // } - - #[test] - fn test_create_namespace_null_properties() { - let db = Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )); - let repo = NamespaceRepository::new(db); - assert!(repo - .create_namespace("test".to_string(), Some(json!(null))) - .is_ok()); - } - - #[test] - fn test_set_namespace_properties_null() { - let db = Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )); - let repo = NamespaceRepository::new(db); - repo.create_namespace("test".to_string(), None).unwrap(); - assert!(repo.set_namespace_properties("test", json!(null)).is_ok()); - } - - #[test] - fn test_set_namespace_properties_with_empty_json() { - let db = Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )); - let repo = NamespaceRepository::new(db); - repo.create_namespace("test".to_string(), None).unwrap(); - assert!(repo.set_namespace_properties("test", json!({})).is_ok()); - } } diff --git a/src/repository/table.rs b/src/repository/table.rs index 15153b1..1ba9fdd 100644 --- a/src/repository/table.rs +++ b/src/repository/table.rs @@ -1,8 +1,10 @@ use crate::database::database::Database; +use crate::dto::namespace_data::{NamespaceData, NamespaceIdent}; use crate::dto::rename_request::TableRenameRequest; -use crate::dto::table_data::TableData; +use crate::dto::table_data::{Table, TableCreation, TableIdent, TableMetadata}; use std::io::{Error, ErrorKind}; use std::sync::{Arc, Mutex}; +use uuid::Uuid; pub struct TableRepository { database: Arc>, @@ -13,317 +15,250 @@ impl TableRepository { Self { database } } - pub fn list_all_tables(&self, namespace: &str) -> Result>, Error> { + pub fn list_all_tables( + &self, + namespace: &NamespaceIdent, + ) -> Result>, Error> { let db = self.database.lock().unwrap(); - db.get::>("TableNamespaceMap", namespace) + let _: NamespaceData = match db.get("NamespaceData", namespace)? { + Some(data) => data, + None => { + return Err(std::io::Error::new( + ErrorKind::NotFound, + format!("Namespace {} not found", namespace.clone().0.join("\u{1F}")), + )) + } + }; + db.get::>("TableNamespaceMap", namespace) } - pub fn create_table(&self, namespace: &str, table: &TableData) -> Result<(), Error> { + pub fn create_table( + &self, + namespace: &NamespaceIdent, + table_creation: &TableCreation, + ) -> Result<(), Error> { let db = self.database.lock().unwrap(); - db.insert("TableData", &table.name, table)?; + let _: NamespaceData = match db.get("NamespaceData", namespace)? 
{ + Some(data) => data, + None => { + return Err(std::io::Error::new( + ErrorKind::NotFound, + format!("Namespace {} not found", namespace.clone().0.join("\u{1F}")), + )) + } + }; + + let table_id = TableIdent::new(namespace.clone(), table_creation.name.clone()); + let table_uuid = Uuid::new_v4().to_string(); + + let table_metadata = TableMetadata { table_uuid }; + let mut tables = db - .get::>("TableNamespaceMap", namespace) + .get::>("TableNamespaceMap", namespace) .unwrap() .unwrap_or_else(|| vec![]); - tables.push(table.name.clone()); + + if tables.contains(&table_id) { + return Err(std::io::Error::new( + ErrorKind::AlreadyExists, + format!( + "Table {} already exists in namespace {}", + table_creation.name, + namespace.clone().0.join("\u{1F}") + ), + )); + } + + db.insert( + "TableData", + &table_id, + &Table { + id: table_id.clone(), + metadata: table_metadata, + }, + )?; + tables.push(table_id.clone()); let r_val = db.insert("TableNamespaceMap", namespace, &tables); r_val } - pub fn register_table(&self, namespace: &str, table: &TableData) -> Result<(), Error> { - self.create_table(namespace, table) - } - pub fn load_table( &self, - namespace: &str, - table_name: &str, - ) -> Result, Error> { - // Check if the table is in the given namespace - let tables_in_namespace = self.list_all_tables(namespace)?; - if let Some(tables) = tables_in_namespace { - if !tables.contains(&table_name.to_string()) { - return Err(Error::new( - ErrorKind::NotFound, - "Table not found in the given namespace", - )); - } - } + namespace: &NamespaceIdent, + table_name: String, + ) -> Result, Error> { + let table_id = TableIdent::new(namespace.clone(), table_name.clone()); let db = self.database.lock().unwrap(); // If the table is in the namespace, get the table data - db.get::("TableData", table_name) + db.get::("TableData", &table_id) } - pub fn drop_table(&self, namespace: &str, table_name: &str) -> Result<(), Error> { + pub fn drop_table(&self, namespace: &NamespaceIdent, table_name: String) -> Result<(), Error> { let db = self.database.lock().unwrap(); - db.delete("TableData", table_name)?; + let table_id = TableIdent::new(namespace.clone(), table_name.clone()); + + let _: Table = match db.get::("TableData", &table_id)? { + Some(data) => data, + None => { + return Err(std::io::Error::new( + ErrorKind::NotFound, + format!("Namespace {} not found", namespace.clone().0.join("\u{1F}")), + )) + } + }; + + db.delete("TableData", &table_id)?; let mut tables = db - .get::>("TableNamespaceMap", namespace) + .get::>("TableNamespaceMap", namespace) .unwrap() .unwrap(); - tables.retain(|name| name != table_name); + tables.retain(|id| id.name != table_name); db.insert("TableNamespaceMap", namespace, &tables) } - // for the ?? route - pub fn insert_table(&self, namespace: &str, table: &TableData) -> Result<(), Error> { - self.create_table(namespace, table) - } - - pub fn table_exists(&self, namespace: &str, table_name: &str) -> Result { + pub fn table_exists( + &self, + namespace: &NamespaceIdent, + table_name: String, + ) -> Result { let table = self.load_table(namespace, table_name)?; Ok(table.is_some()) } pub fn rename_table(&self, rename_request: &TableRenameRequest) -> Result<(), Error> { - let namespace = &rename_request.namespace; - let old_name = &rename_request.old_name; - let new_name = &rename_request.new_name; + let source = rename_request.source.clone(); + let destination = rename_request.destination.clone(); + let namespace = source.namespace.clone(); + let table = self - .load_table(namespace, old_name)? 
- .ok_or_else(|| Error::new(ErrorKind::NotFound, "Table not found"))?; + .load_table(&namespace, source.name.clone())? + .ok_or_else(|| Error::new(ErrorKind::NotFound, "Source table not found"))?; + + if self.table_exists(&destination.namespace, destination.name.clone())? { + return Err(Error::new( + ErrorKind::AlreadyExists, + "Destination table already exists", + )); + } + let mut new_table = table.clone(); - new_table.name = new_name.clone(); - self.drop_table(namespace, old_name)?; - self.create_table(namespace, &new_table) + new_table.id = destination.clone(); + + self.create_table( + &destination.namespace.clone(), + &TableCreation { + name: destination.name.clone(), + }, + )?; + self.drop_table(&namespace, source.name.clone()) } } -// todo: check commented tests #[cfg(test)] mod tests { use super::*; - use serde_json::json; + use crate::dto::table_data::TableCreation; + use crate::repository::namespace::NamespaceRepository; use std::sync::{Arc, Mutex}; use tempfile::tempdir; #[test] - fn test_list_all_tables() { - let db = Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )); - let repo = TableRepository::new(db); - assert_eq!(repo.list_all_tables("namespace").unwrap(), None); - } + fn test_table_repository() { + let dir = tempdir().unwrap(); + let db = Database::open(dir.path()).unwrap(); + let db = Arc::new(Mutex::new(db)); + let repo = TableRepository::new(db.clone()); - #[test] - fn test_create_table() { - let db = Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )); - let repo = TableRepository::new(db); - let table = TableData { - name: "table".to_string(), - num_columns: 0, - read_properties: json!({}), - write_properties: json!({}), - file_urls: vec![], - columns: vec![], - }; - assert!(repo.create_table("namespace", &table).is_ok()); - } + // Create a namespace for testing + let namespace_ident = NamespaceIdent(vec!["test".to_string()]); + let namespace_repo = NamespaceRepository::new(db.clone()); + namespace_repo + .create_namespace(namespace_ident.clone(), None) + .unwrap(); - #[test] - fn test_load_table() { - let db = Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )); - let repo = TableRepository::new(db); - let table = TableData { - name: "table".to_string(), - num_columns: 0, - read_properties: json!({}), - write_properties: json!({}), - file_urls: vec![], - columns: vec![], + // Test create_table + let table_creation = TableCreation { + name: "table1".to_string(), }; - repo.create_table("namespace", &table).unwrap(); - assert!(repo.load_table("namespace", "table").unwrap().is_some()); - } + repo.create_table(&namespace_ident, &table_creation) + .unwrap(); - #[test] - fn test_drop_table() { - let db = Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )); - let repo = TableRepository::new(db); - let table = TableData { - name: "table".to_string(), - num_columns: 0, - read_properties: json!({}), - write_properties: json!({}), - file_urls: vec![], - columns: vec![], - }; - repo.create_table("namespace", &table).unwrap(); - assert!(repo.drop_table("namespace", "table").is_ok()); - } + // Test table_exists + assert!(repo + .table_exists(&namespace_ident, "table1".to_string()) + .unwrap()); - #[test] - fn test_table_exists() { - let db = Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )); - let repo = TableRepository::new(db); - let table = TableData { - name: "table".to_string(), - num_columns: 0, - read_properties: json!({}), - 
write_properties: json!({}), - file_urls: vec![], - columns: vec![], - }; - repo.create_table("namespace", &table).unwrap(); - assert!(repo.table_exists("namespace", "table").unwrap()); - } + // Test load_table + let table = repo + .load_table(&namespace_ident, "table1".to_string()) + .unwrap() + .unwrap(); + assert_eq!(table.id.name, "table1"); - #[test] - fn test_rename_table() { - let db = Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )); - let repo = TableRepository::new(db); - let table = TableData { - name: "table".to_string(), - num_columns: 0, - read_properties: json!({}), - write_properties: json!({}), - file_urls: vec![], - columns: vec![], - }; - repo.create_table("namespace", &table).unwrap(); + // Test rename_table let rename_request = TableRenameRequest { - namespace: "namespace".to_string(), - old_name: "table".to_string(), - new_name: "new_table".to_string(), + source: TableIdent::new(namespace_ident.clone(), "table1".to_string()), + destination: TableIdent::new(namespace_ident.clone(), "table2".to_string()), }; - assert!(repo.rename_table(&rename_request).is_ok()); - } - - #[test] - fn test_load_table_not_found() { - let db = Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )); - let repo = TableRepository::new(db); + repo.rename_table(&rename_request).unwrap(); + assert!(!repo + .table_exists(&namespace_ident, "table1".to_string()) + .unwrap()); assert!(repo - .load_table("namespace", "nonexistent") - .unwrap() - .is_none()); - } - - #[test] - fn test_table_exists_not_found() { - let db = Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )); - let repo = TableRepository::new(db); - assert!(!repo.table_exists("namespace", "nonexistent").unwrap()); - } + .table_exists(&namespace_ident, "table2".to_string()) + .unwrap()); - /* - #[test] - fn test_drop_table_not_found() { - let db = Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )); - let repo = TableRepository::new(db); - assert!(repo.drop_table("namespace", "nonexistent").is_err()); + // Test drop_table + repo.drop_table(&namespace_ident, "table2".to_string()) + .unwrap(); + assert!(!repo + .table_exists(&namespace_ident, "table2".to_string()) + .unwrap()); } - */ #[test] - fn test_rename_table_not_found() { - let db = Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )); - let repo = TableRepository::new(db); - let rename_request = TableRenameRequest { - namespace: "namespace".to_string(), - old_name: "nonexistent".to_string(), - new_name: "new_table".to_string(), - }; - assert!(repo.rename_table(&rename_request).is_err()); - } + fn test_table_repository_negative() { + let dir = tempdir().unwrap(); + let db = Database::open(dir.path()).unwrap(); + let db = Arc::new(Mutex::new(db)); + let repo = TableRepository::new(db.clone()); - /* - #[test] - fn test_create_table_empty_name() { - let db = Arc::new(Mutex::new( - Database::open(tempdir().unwrap().path()).unwrap(), - )); - let repo = TableRepository::new(db); - let table = TableData { - name: "".to_string(), - num_columns: 0, - read_properties: json!({}), - write_properties: json!({}), - file_urls: vec![], - columns: vec![], + // Test with non-existent namespace + let non_existent_namespace = NamespaceIdent(vec!["non_existent".to_string()]); + let table_creation = TableCreation { + name: "table1".to_string(), }; - assert!(repo.create_table("namespace", &table).is_err()); - } + assert!(repo + .create_table(&non_existent_namespace, 
            &table_creation)
             .is_err());
         assert!(repo
             .drop_table(&non_existent_namespace, "table1".to_string())
             .is_err());
         // Test with existing table
         let namespace_ident = NamespaceIdent(vec!["test".to_string()]);
         let namespace_repo = NamespaceRepository::new(db.clone());
         namespace_repo
             .create_namespace(namespace_ident.clone(), None)
             .unwrap();
         repo.create_table(&namespace_ident, &table_creation)
             .unwrap();
         assert!(repo
             .create_table(&namespace_ident, &table_creation)
             .is_err());
-    #[test]
-    fn test_create_table_already_exists() {
-        let db = Arc::new(Mutex::new(
-            Database::open(tempdir().unwrap().path()).unwrap(),
-        ));
-        let repo = TableRepository::new(db);
-        let table = TableData {
-            name: "table".to_string(),
-            num_columns: 0,
-            read_properties: json!({}),
-            write_properties: json!({}),
-            file_urls: vec![],
-            columns: vec![],
+        // Test rename_table with non-existent source table
         let rename_request = TableRenameRequest {
-            namespace: "namespace".to_string(),
-            old_name: "nonexistent".to_string(),
-            new_name: "new_table".to_string(),
+            source: TableIdent::new(namespace_ident.clone(), "non_existent".to_string()),
+            destination: TableIdent::new(namespace_ident.clone(), "table2".to_string()),
         };
-        repo.create_table("namespace", &table).unwrap();
-        assert!(repo.create_table("namespace", &table).is_err());
-    }
-    */
-
-    #[test]
-    fn test_load_table_empty_name() {
-        let db = Arc::new(Mutex::new(
-            Database::open(tempdir().unwrap().path()).unwrap(),
-        ));
-        let repo = TableRepository::new(db);
-        assert!(repo.load_table("namespace", "").unwrap().is_none());
-    }
-    /*
-    #[test]
-    fn test_drop_table_empty_name() {
-        let db = Arc::new(Mutex::new(
-            Database::open(tempdir().unwrap().path()).unwrap(),
-        ));
-        let repo = TableRepository::new(db);
-        assert!(repo.drop_table("namespace", "").is_err());
-    }
-
+        assert!(repo.rename_table(&rename_request).is_err());
-    #[test]
-    fn test_rename_table_empty_new_name() {
-        let db = Arc::new(Mutex::new(
-            Database::open(tempdir().unwrap().path()).unwrap(),
-        ));
-        let repo = TableRepository::new(db);
-        let table = TableData {
-            name: "table".to_string(),
-            num_columns: 0,
-            read_properties: json!({}),
-            write_properties: json!({}),
-            file_urls: vec![],
-            columns: vec![],
-        };
-        repo.create_table("namespace", &table).unwrap();
+        // Test rename_table with existing destination table
         let rename_request = TableRenameRequest {
-            namespace: "namespace".to_string(),
-            old_name: "table".to_string(),
-            new_name: "".to_string(),
+            source: TableIdent::new(namespace_ident.clone(), "table1".to_string()),
+            destination: TableIdent::new(namespace_ident.clone(), "table1".to_string()),
         };
         assert!(repo.rename_table(&rename_request).is_err());
     }
-    */
 }
diff --git a/src/routes/namespace.rs b/src/routes/namespace.rs
index 455fc22..a2e86bf 100644
--- a/src/routes/namespace.rs
+++ b/src/routes/namespace.rs
@@ -15,19 +15,19 @@ pub fn routes(db: Arc<Mutex<Database>>) -> Router {
         .route("/namespaces", get(namespace_handler::list_namespaces))
         .route("/namespaces", post(namespace_handler::create_namespace))
         .route(
-            "/namespace/:namespace",
+            "/namespaces/:namespace",
             get(namespace_handler::load_namespace_metadata),
         )
         .route(
-            "/namespace/:namespace",
+            "/namespaces/:namespace",
             head(namespace_handler::namespace_exists),
         )
         .route(
-            "/namespace/:namespace",
+            "/namespaces/:namespace",
             delete(namespace_handler::drop_namespace),
         )
         .route(
-            "/namespace/:namespace/properties",
+            "/namespaces/:namespace/properties",
             post(namespace_handler::set_namespace_properties),
         )
         .with_state(repo);
diff --git a/src/routes/table.rs b/src/routes/table.rs
index 91da45a..a8a5c84 100644
--- a/src/routes/table.rs
+++ b/src/routes/table.rs
@@ -18,10 +18,6 @@ pub fn routes(db: Arc<Mutex<Database>>) -> Router {
             "/namespaces/:namespace/tables",
             post(table_handler::create_table),
         )
-        .route(
-            "/namespaces/:namespace/register",
-            post(table_handler::register_table),
-        )
         .route(
             "/namespaces/:namespace/tables/:table",
             get(table_handler::load_table),
@@ -35,10 +31,6 @@ pub fn routes(db: Arc<Mutex<Database>>) -> Router {
             head(table_handler::table_exists),
         )
         .route("/tables/rename", post(table_handler::rename_table))
-        .route(
-            "/namespaces/:namespace/tables/:table/metrics",
-            post(table_handler::report_metrics),
-        )
         .with_state(repo);
     return router;
diff --git a/src/tests/mod.rs b/src/tests/mod.rs
deleted file mode 100644
index 8b13789..0000000
--- a/src/tests/mod.rs
+++ /dev/null
@@ -1 +0,0 @@
-
diff --git a/src/tests/namespace_test.rs b/src/tests/namespace_test.rs
deleted file mode 100644
index 3e6e447..0000000
--- a/src/tests/namespace_test.rs
+++ /dev/null
@@ -1,43 +0,0 @@
-use axum::{http::StatusCode, response::Json};
-use axum::extract::Json as JsonExtractor;
-use axum::handler::post;
-use axum::routing::Router;
-use serde_json::json;
-use axum::test::extract;
-
-use crate::{create_namespace, list_namespaces, Namespace};
-
-#[tokio::test]
-async fn test_list_namespaces() {
-    // Create a test router with the list_namespaces route
-    let app = Router::new().route("/namespaces", post(list_namespaces));
-
-    // Perform a request to the route
-    let response = axum::test::call(&app, axum::test::request::Request::post("/namespaces").body(()).unwrap()).await;
-
-    // Ensure that the response status code is OK
-    assert_eq!(response.status(), StatusCode::OK);
-
-    // Ensure that the response body contains the expected JSON data
-    let body = extract::<Json<Vec<String>>>(response.into_body()).await.unwrap();
-    assert_eq!(body.0, vec!["accounting", "tax", "paid"]);
-}
-
-#[tokio::test]
-async fn test_create_namespace() {
-    // Create a test router with the create_namespace route
-    let app = Router::new().route("/namespaces", post(create_namespace));
-
-    // Create a JSON payload representing a new namespace
-    let payload = json!({});
-
-    // Perform a request to the route with the JSON payload
-    let response = axum::test::call(&app, axum::test::request::Request::post("/namespaces").body(payload.to_string()).unwrap()).await;
-
-    // Ensure that the response status code is OK
-    assert_eq!(response.status(), StatusCode::OK);
-
-    // Ensure that the response body contains the expected JSON data
-    let body = extract::<Json<Namespace>>(response.into_body()).await.unwrap();
-    assert_eq!(body, Json(Namespace {}));
-}
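
Note on the identifier types used throughout the patch: src/dto/namespace_data.rs and src/dto/table_data.rs are not included in this portion of the diff, so the exact shapes of NamespaceIdent, NamespaceData, TableIdent, Table, TableMetadata, TableCreation and TableRenameRequest are not shown here. The following is a minimal sketch inferred from how the handlers and repositories use them (tuple-struct namespace levels, serde-serializable values, and the U+001F unit separator that the table handlers split the :namespace path segment on); field names, derives and the encode helper are assumptions, not the project's actual definitions.

use serde::{Deserialize, Serialize};

// Multi-level namespace, e.g. ["prod", "sales"]. Levels are joined with the
// ASCII unit separator (U+001F) when packed into a single path segment,
// matching the split('\u{1F}') in the table handlers and the
// .0.join("\u{1F}") calls in the repositories.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct NamespaceIdent(pub Vec<String>);

impl NamespaceIdent {
    pub fn new(parts: Vec<String>) -> Self {
        NamespaceIdent(parts)
    }
    // Hypothetical helper: encode the ident the way the handlers expect it.
    pub fn encode(&self) -> String {
        self.0.join("\u{1F}")
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NamespaceData {
    pub name: NamespaceIdent,
    pub properties: serde_json::Value,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TableIdent {
    pub namespace: NamespaceIdent,
    pub name: String,
}

impl TableIdent {
    pub fn new(namespace: NamespaceIdent, name: String) -> Self {
        TableIdent { namespace, name }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableMetadata {
    pub table_uuid: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Table {
    pub id: TableIdent,
    pub metadata: TableMetadata,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableCreation {
    pub name: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TableRenameRequest {
    pub source: TableIdent,
    pub destination: TableIdent,
}

fn main() {
    let ns = NamespaceIdent::new(vec!["prod".to_string(), "sales".to_string()]);
    // Prints "prod\u{1F}sales": the single path segment the handlers decode.
    println!("{}", ns.encode());
}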
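
The reworked set_namespace_properties in src/repository/namespace.rs applies removals to the stored JSON object before inserting updates, so a key listed in both ends up with the updated value. A small standalone sketch of that ordering, using plain serde_json and a hypothetical apply_property_changes helper rather than the repository code itself:

use serde_json::{json, Map, Value};

// Mirrors the order used in the repository: remove first, then update.
fn apply_property_changes(
    current: &mut Map<String, Value>,
    removals: Vec<String>,
    updates: Map<String, Value>,
) {
    // Remove properties first ...
    for key in removals {
        current.remove(&key);
    }
    // ... then apply updates, so an update wins when a key is in both lists.
    for (key, value) in updates {
        current.insert(key, value);
    }
}

fn main() {
    let mut props = json!({"owner": "alice", "ttl": "7d"})
        .as_object()
        .cloned()
        .unwrap();

    let removals = vec!["ttl".to_string()];
    let mut updates = Map::new();
    updates.insert("ttl".to_string(), json!("30d"));

    apply_property_changes(&mut props, removals, updates);
    // "ttl" was both removed and updated; the updated value survives.
    assert_eq!(Value::Object(props), json!({"owner": "alice", "ttl": "30d"}));
}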