From a9929e933ecee76228f94be1babeec9170f9c391 Mon Sep 17 00:00:00 2001
From: Edgar Brissow
Date: Tue, 19 Nov 2024 10:42:40 -0300
Subject: [PATCH] Testing query with query broker C++

---
 compose.yaml                                    | 132 +++++++++++++
 docker_das_node/Dockerfile                      | 187 ++++++++++++++++++
 hyperon_das/das_node/__init__.py                |   0
 hyperon_das/das_node/das_node.py                |  74 +++++++
 hyperon_das/das_node/query_answer.py            | 164 +++++++++++++++
 hyperon_das/das_node/query_element.py           |  44 +++++
 hyperon_das/das_node/query_node.py              | 134 +++++++++++++
 hyperon_das/das_node/remote_iterator.py         |  30 +++
 hyperon_das/das_node/shared_queue.py            |  63 ++++++
 hyperon_das/das_node/simple_node.py             |  70 +++++++
 hyperon_das/das_node/star_node.py               |  40 ++++
 .../query_engines/das_node_query_engine.py      | 108 ++++++++++
 tests/integration/test_node_das.py              |  22 +++
 13 files changed, 1068 insertions(+)
 create mode 100644 compose.yaml
 create mode 100644 docker_das_node/Dockerfile
 create mode 100644 hyperon_das/das_node/__init__.py
 create mode 100644 hyperon_das/das_node/das_node.py
 create mode 100644 hyperon_das/das_node/query_answer.py
 create mode 100644 hyperon_das/das_node/query_element.py
 create mode 100644 hyperon_das/das_node/query_node.py
 create mode 100644 hyperon_das/das_node/remote_iterator.py
 create mode 100644 hyperon_das/das_node/shared_queue.py
 create mode 100644 hyperon_das/das_node/simple_node.py
 create mode 100644 hyperon_das/das_node/star_node.py
 create mode 100644 hyperon_das/query_engines/das_node_query_engine.py
 create mode 100644 tests/integration/test_node_das.py

diff --git a/compose.yaml b/compose.yaml
new file mode 100644
index 00000000..287930b2
--- /dev/null
+++ b/compose.yaml
@@ -0,0 +1,132 @@
+
+services:
+  redis:
+    image: redis:7.2.3-alpine
+    container_name: test-das-redis
+    restart: always
+    command: redis-server --save 20 1 --loglevel warning
+    ports:
+      - 6379:6379
+    healthcheck:
+      test: [ "CMD", "redis-cli", "--raw", "incr", "ping" ]
+      interval: 10s
+      timeout: 10s
+      retries: 5
+      start_period: 40s
+
+  mongodb:
+    image: mongo:latest
+    container_name: test-das-mongodb
+    restart: always
+    environment:
+      MONGO_INITDB_ROOT_USERNAME: root
+      MONGO_INITDB_ROOT_PASSWORD: root
+    ports:
+      - 27017:27017
+    healthcheck:
+      # An exec-form healthcheck runs a single binary, so the previous
+      # [ "CMD", "echo", ..., "|", "mongo", "das-mongo:27017/test", ... ]
+      # list never actually piped into a shell; ping the server directly:
+      test: [ "CMD", "mongosh", "--quiet", "--eval", "db.runCommand('ping').ok" ]
+      interval: 10s
+      timeout: 10s
+      retries: 5
+      start_period: 40s
+
+  metta-parser:
+#    image: trueagi/das:latest-metta-parser
+    image: trueagi/das:0.5.3-metta-parser
+    container_name: test-das-metta-parser
+    command: db_loader /tmp/animals.metta
+    volumes:
+      - ./examples/data:/tmp
+    environment:
+      DAS_MONGODB_NAME: das
+      # Use the container names here; localhost would point at this
+      # container itself, not at the databases.
+      DAS_MONGODB_HOSTNAME: test-das-mongodb
+      DAS_MONGODB_PORT: 27017
+      DAS_REDIS_HOSTNAME: test-das-redis
+      DAS_REDIS_PORT: 6379
+      DAS_MONGODB_USERNAME: root
+      DAS_MONGODB_PASSWORD: root
+      DAS_DATABASE_USERNAME: root
+      DAS_DATABASE_PASSWORD: root
+#    env_file:
+#      - .env
+    depends_on:
+      mongodb:
+        condition: service_healthy
+      redis:
+        condition: service_healthy
+    restart: on-failure
+
+  das-node:
+    # How to build:
+    # Clone the das-node repository
+    # Run:
+    #   ./scripts/docker_image_build.sh
+#    image: das-node-builder
+    build: ./docker_das_node
+#    build: https://github.com/singnet/das-attention-broker.git#master:docker
+#    build:
+#      context: https://raw.githubusercontent.com/singnet/das-attention-broker/refs/heads/master/docker/Dockerfile
+    container_name: test-das-node
+#    working_dir: /opt/das-attention-broker
+#    command: ./query_broker 35700
+#    command: ./bin 35700
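+    # The container idles on `sleep infinity` below so the broker can be
+    # started by hand while debugging (hypothetical session; the binary's
+    # location depends on where scripts/bazel_build.sh drops it):
+    #   docker exec -it test-das-node bash
+    #   cd /opt/das-attention-broker && ./query_broker 35700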
+    command: sleep infinity
+    network_mode: "host"
+    # NOTE: with network_mode: "host" the port mapping below is ignored and
+    # compose service names do not resolve, so the databases are reached
+    # through the ports they publish on localhost.
+    ports:
+      - 35700:35700
+    depends_on:
+      - mongodb
+      - redis
+      - metta-parser
+    environment:
+      DAS_MONGODB_NAME: das
+      DAS_MONGODB_HOSTNAME: localhost
+      DAS_MONGODB_PORT: 27017
+      DAS_REDIS_HOSTNAME: localhost
+      DAS_REDIS_PORT: 6379
+      DAS_MONGODB_USERNAME: root
+      DAS_MONGODB_PASSWORD: root
+#      DAS_DATABASE_USERNAME: root
+#      DAS_DATABASE_PASSWORD: root
+    # command: bash -c "cd src && ../scripts/bazel_build.sh && cd .. && pip install . && python3 -i examples/docker_server.py"
+
+  faas:
+#    image: trueagi/openfaas:query-engine-1.14.12
+    image: trueagi/openfaas:query-engine-1.14.13
+#    image: trueagi/openfaas:query-engine-latest
+    container_name: test-das-faas
+#    privileged: true
+    depends_on:
+      - mongodb
+      - redis
+      - metta-parser
+      - das-node
+    ports:
+      - 8080:8080
+    environment:
+      DAS_MONGODB_NAME: test-das
+      DAS_MONGODB_HOSTNAME: test-das-mongodb
+      DAS_MONGODB_PORT: 27017
+      DAS_REDIS_HOSTNAME: test-das-redis
+      DAS_REDIS_PORT: 6379
+      DAS_MONGODB_USERNAME: root
+      DAS_MONGODB_PASSWORD: root
+      DAS_DATABASE_USERNAME: root
+      DAS_DATABASE_PASSWORD: root
+
+
+
diff --git a/docker_das_node/Dockerfile b/docker_das_node/Dockerfile
new file mode 100644
index 00000000..34a19bee
--- /dev/null
+++ b/docker_das_node/Dockerfile
@@ -0,0 +1,187 @@
+# FROM das-node-builder
+# ARG BASE_DIR="/opt"
+# ARG ATTENTION_BROKER_DIR="${BASE_DIR}/das-attention-broker"
+# ARG THIRDPARTY="${ATTENTION_BROKER_DIR}/assets"
+#
+# WORKDIR /opt/hyperon_das_node/src
+# RUN ../scripts/bazel_build.sh
+#
+#
+# WORKDIR /opt
+# RUN git clone https://github.com/singnet/das-attention-broker.git
+# WORKDIR /opt/das-attention-broker/
+# RUN ls -la
+# RUN cd ${THIRDPARTY}\
+#     && tar xzvf 3rd-party.tgz\
+#     && rm -f 3rd-party.tgz\
+#     && mkdir -p ${ATTENTION_BROKER_DIR}/src/3rd-party\
+#     && ln -s /opt/3rd-party ${ATTENTION_BROKER_DIR}/src/3rd-party
+#
+# ENV CPLUS_INCLUDE_PATH="/opt/3rd-party/mbedcrypto/include/"
+#
+#
+# WORKDIR /opt/das-attention-broker/src
+# RUN ls -l
+# RUN pwd
+# # RUN ../scripts/bazel_build.sh
+# RUN /opt/bazel/bazelisk build --jobs 6 --noenable_bzlmod :query_broker
+# # RUN mv bazel-bin/query_broker ../bin
+# WORKDIR /opt/das-attention-broker
+#
+# CMD ./bin/query_broker
+#
+# # RUN pip install --no-cache-dir hyperon_das_node
+# #
+# # WORKDIR /opt/hyperon_das_node
+# #
+# # COPY *.py .
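+#
+# To build this image standalone (hypothetical invocation, matching the
+# `build: ./docker_das_node` entry in compose.yaml; run from the repo root):
+#   docker build -t test-das-node docker_das_node/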
+# #
+# #
+# # # Server image for docker example, exposes server port
+# # FROM base AS server
+# #
+# # EXPOSE 35700
+# #
+# # ENTRYPOINT ["python", "-i", "das_node_server.py"]
+
+
+
+FROM ubuntu:22.04
+
+ARG BASE_DIR="/opt"
+ARG TMP_DIR="/tmp"
+
+ARG ATTENTION_BROKER_DIR="${BASE_DIR}/das-attention-broker"
+ARG DATA_DIR="${BASE_DIR}/data"
+ARG GRPC_DIR="${BASE_DIR}/grpc"
+ARG PROTO_DIR="${BASE_DIR}/proto"
+ARG BAZEL_DIR="${BASE_DIR}/bazel"
+ARG THIRDPARTY="${BASE_DIR}/3rd-party"
+
+# RUN mkdir -p ${ATTENTION_BROKER_DIR}
+RUN mkdir -p ${DATA_DIR}
+RUN mkdir -p ${GRPC_DIR}
+RUN mkdir -p ${BAZEL_DIR}
+RUN mkdir -p ${PROTO_DIR}
+RUN mkdir -p ${THIRDPARTY}
+VOLUME ${ATTENTION_BROKER_DIR}
+
+RUN apt-get update -y
+RUN apt-get install -y git
+
+RUN cd ${GRPC_DIR} &&\
+    git clone https://github.com/grpc/grpc &&\
+    cd grpc &&\
+    git submodule update --init
+
+RUN apt-get install -y build-essential autoconf libtool pkg-config curl gcc protobuf-compiler libmbedtls14 libmbedtls-dev libevent-dev libssl-dev
+# RUN apt-get install -y autoconf
+# RUN apt-get install -y libtool
+# RUN apt-get install -y pkg-config
+# RUN apt-get install -y curl
+# RUN apt-get install -y gcc
+# RUN apt-get install -y protobuf-compiler
+# #RUN apt-get install -y libmbedcrypto7
+# RUN apt-get install -y libmbedtls14
+# RUN apt-get install -y libmbedtls-dev
+# RUN apt-get install -y libevent-dev
+# RUN apt-get install -y libssl-dev
+
+################################################################################
+# To be removed when AtomDB is properly integrated
+# Redis client
+RUN apt-get install -y cmake libevent-dev libssl-dev pkg-config cmake-data git
+
+WORKDIR ${BASE_DIR}
+RUN git clone https://github.com/singnet/das-attention-broker.git
+WORKDIR ${ATTENTION_BROKER_DIR}
+
+RUN sed -i '24i\AtomDBSingleton::init();' src/main/query_engine_main.cc
+RUN cat src/main/query_engine_main.cc
+
+RUN cp assets/hiredis-cluster.tgz /tmp
+
+RUN cd /tmp &&\
+    tar xzf hiredis-cluster.tgz &&\
+    cd hiredis-cluster &&\
+    mkdir build &&\
+    cd build &&\
+    cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo -DENABLE_SSL=ON .. &&\
+    make &&\
+    make install &&\
+    echo "/usr/local/lib" > /etc/ld.so.conf.d/local.conf &&\
+    ldconfig
+# MongoDB client
+WORKDIR ${ATTENTION_BROKER_DIR}
+
+# RUN cp assets/mongo-cxx-driver-r3.11.0.tar.gz /tmp &&\
+#     cd /tmp && \
+#     tar xzvf mongo-cxx-driver-r3.11.0.tar.gz &&\
+#     cd /tmp/mongo-cxx-driver-r3.11.0/build/ &&\
+#     cmake .. -DCMAKE_BUILD_TYPE=Release -DMONGOCXX_OVERRIDE_DEFAULT_INSTALL_PREFIX=OFF &&\
+#     cmake --build .
+RUN cp assets/mongo-cxx-driver-r3.11.0.tar.gz /tmp
+WORKDIR ${TMP_DIR}
+RUN tar xzvf mongo-cxx-driver-r3.11.0.tar.gz
+WORKDIR ${TMP_DIR}/mongo-cxx-driver-r3.11.0/build
+RUN cmake .. -DCMAKE_BUILD_TYPE=Release -DMONGOCXX_OVERRIDE_DEFAULT_INSTALL_PREFIX=OFF &&\
+    cmake --build . &&\
+    cmake --build . --target install
+RUN pwd &&\
+    # ls -a &&\
+    # ls /usr/local/include/mongocxx/v_noabi &&\
+    # ls -a .. &&\
+    # mv install/include/* /usr/local/include &&\
+    ln -s /usr/local/include/bsoncxx/v_noabi/bsoncxx/* /usr/local/include/bsoncxx &&\
+    ln -s /usr/local/include/bsoncxx/v_noabi/bsoncxx/third_party/mnmlstc/core/ /usr/local/include/core &&\
+    ln -s /usr/local/include/mongocxx/v_noabi/mongocxx/* /usr/local/include/mongocxx/ &&\
+    # mv install/lib/* /usr/local/lib &&\
+    ldconfig
+# RUN cmake --build . --target install &&\
+#     mv install/include/* /usr/local/include &&\
+#     ln -s /usr/local/include/bsoncxx/v_noabi/bsoncxx/* /usr/local/include/bsoncxx &&\
+#     ln -s /usr/local/include/bsoncxx/v_noabi/bsoncxx/third_party/mnmlstc/core/ /usr/local/include/core &&\
+#     ln -s /usr/local/include/mongocxx/v_noabi/mongocxx/* /usr/local/include/mongocxx/ &&\
+#     mv install/lib/* /usr/local/lib &&\
+#     ldconfig
+################################################################################
+WORKDIR ${ATTENTION_BROKER_DIR}
+RUN mkdir bin
+
+RUN cp assets/3rd-party.tgz ${THIRDPARTY}
+RUN cd ${THIRDPARTY} &&\
+    tar xzvf 3rd-party.tgz &&\
+    rm -f 3rd-party.tgz &&\
+    mkdir -p ${ATTENTION_BROKER_DIR}/src/3rd-party &&\
+    ln -s ${THIRDPARTY} ${ATTENTION_BROKER_DIR}/src/3rd-party &&\
+    mv bazelisk ${BAZEL_DIR}
+
+ENV CPLUS_INCLUDE_PATH="/opt/3rd-party/mbedcrypto/include/"
+
+ENV CC=/usr/bin/gcc
+RUN ln -s ${BAZEL_DIR}/bazelisk /usr/bin/bazel
+# RUN cd ${GRPC_DIR}/grpc &&\
+#     ${BAZEL_DIR}/bazelisk build :all
+
+# RUN cd ${GRPC_DIR}/grpc &&\
+#     ${BAZEL_DIR}/bazelisk build --jobs 6 --noenable_bzlmod :query_broker
+# RUN ${BAZEL_DIR}/bazelisk build --jobs 6 --noenable_bzlmod :query_broker
+WORKDIR ${ATTENTION_BROKER_DIR}
+RUN rm -rf bin
+RUN mkdir bin
+WORKDIR ${ATTENTION_BROKER_DIR}/src
+# RUN ${BAZEL_DIR}/bazelisk build --jobs 6 --noenable_bzlmod :query_broker
+# RUN mv bazel-bin/query_broker ../query_broker
+RUN ../scripts/bazel_build.sh
+# RUN cp bazel-bin/query_broker ../
+# build --jobs 6 --noenable_bzlmod :query_broker
+ADD https://raw.githubusercontent.com/singnet/das-query-engine/master/proto/attention_broker.proto ${PROTO_DIR}
+ADD https://raw.githubusercontent.com/singnet/das-query-engine/master/proto/common.proto ${PROTO_DIR}
+ADD https://raw.githubusercontent.com/singnet/das-query-engine/master/proto/echo.proto ${PROTO_DIR}
+EXPOSE 35700
+WORKDIR /opt/das-attention-broker
+
+RUN ls -la
+# RUN ls -la bin/
+# RUN ls /opt/bazel
+# RUN ls /opt
diff --git a/hyperon_das/das_node/__init__.py b/hyperon_das/das_node/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/hyperon_das/das_node/das_node.py b/hyperon_das/das_node/das_node.py
new file mode 100644
index 00000000..307ae3be
--- /dev/null
+++ b/hyperon_das/das_node/das_node.py
@@ -0,0 +1,74 @@
+from hyperon_das.das_node.star_node import StarNode
+from hyperon_das.das_node.remote_iterator import RemoteIterator
+from hyperon_das_node import Message
+
+
+class DASNode(StarNode):
+    PATTERN_MATCHING_QUERY = "pattern_matching_query"
+
+    def __init__(self, node_id: str = None, server_id: str = None):
+        super().__init__(node_id, server_id)
+        self.initialize()
+
+    def __del__(self):
+        # Destructor (although Python handles memory management automatically)
+        pass
+
+    def pattern_matcher_query(self, tokens: list, context: str = ""):
+        if self.is_server:
+            raise ValueError("pattern_matcher_query() is not available in DASNode server.")
+
+        # TODO: Update this when requestor is set in basic Message
+        query_id = self.next_query_id()
+        # print(query_id, tokens)
+        args = [query_id, context] + tokens
+        print(self.server_id)
+        print(args)
+        self.send(DASNode.PATTERN_MATCHING_QUERY, args, self.server_id)
+
+        print('send')
+        return RemoteIterator(query_id)
+
+    def next_query_id(self) -> str:
+        # Wrap the port back to the start of this node's range *before*
+        # taking it, so the returned id never uses a port past the limit.
+        if self.is_server:
+            limit = (self.first_query_port + self.last_query_port) // 2 - 1
+            if self.next_query_port > limit:
+                self.next_query_port = self.first_query_port
+        else:
+            limit = self.last_query_port
+            if self.next_query_port > limit:
+                self.next_query_port = (self.first_query_port + self.last_query_port) // 2
+
+        query_id = f"{self.local_host}:{self.next_query_port}"
+        self.next_query_port += 1
+        return query_id
+
+    def message_factory(self, command: str, args: list) -> Message:
+        message = super().message_factory(command, args)
+        if message:
+            return message
+        if command == DASNode.PATTERN_MATCHING_QUERY:
+            # NOTE: PatternMatchingQuery is not defined or imported in this
+            # module; this branch is expected to be exercised only by the
+            # server-side implementation.
+            return PatternMatchingQuery(command, args)
+        return None
+
+    def initialize(self):
+        self.first_query_port = 60000
+        self.last_query_port = 61999
+        self.local_host = self.node_id().split(":")[0]  # Extracting the host part of node_id
+        if self.is_server:
+            self.next_query_port = self.first_query_port
+        else:
+            self.next_query_port = (self.first_query_port + self.last_query_port) // 2
+
+    # Private attributes (Python doesn't have access specifiers, but this is by convention)
+    local_host: str
+    next_query_port: int
+    first_query_port: int
+    last_query_port: int
diff --git a/hyperon_das/das_node/query_answer.py b/hyperon_das/das_node/query_answer.py
new file mode 100644
index 00000000..a4ca077e
--- /dev/null
+++ b/hyperon_das/das_node/query_answer.py
@@ -0,0 +1,164 @@
+MAX_NUMBER_OF_VARIABLES_IN_QUERY = 100  # Set to appropriate value
+MAX_VARIABLE_NAME_SIZE = 64  # Set to appropriate value
+HANDLE_HASH_SIZE = 64  # Set to appropriate value
+MAX_NUMBER_OF_OPERATION_CLAUSES = 10  # Set to appropriate value
+
+
+class Assignment:
+    def __init__(self):
+        self.size = 0
+        self.labels = [None] * MAX_NUMBER_OF_VARIABLES_IN_QUERY
+        self.values = [None] * MAX_NUMBER_OF_VARIABLES_IN_QUERY
+
+    def __del__(self):
+        # Destructor logic if needed
+        pass
+
+    def assign(self, label: str, value: str) -> bool:
+        for i in range(self.size):
+            # If label is already present, return True iff its value is the same
+            if label == self.labels[i]:
+                return value == self.values[i]
+
+        # Check capacity before storing, so the table is never overrun
+        if self.size == MAX_NUMBER_OF_VARIABLES_IN_QUERY:
+            raise ValueError(
+                f"Assignment size exceeds the maximal number of allowed variables: {MAX_NUMBER_OF_VARIABLES_IN_QUERY}")
+
+        # Label is not present, so make the assignment and return True
+        self.labels[self.size] = label
+        self.values[self.size] = value
+        self.size += 1
+        return True
+
+    def get(self, label: str) -> str:
+        for i in range(self.size):
+            if label == self.labels[i]:
+                return self.values[i]
+        return None
+
+    def is_compatible(self, other) -> bool:
+        for i in range(self.size):
+            for j in range(other.size):
+                if self.labels[i] == other.labels[j] and self.values[i] != other.values[j]:
+                    return False
+        return True
+
+    def copy_from(self, other):
+        self.size = other.size
+        self.labels = other.labels[:]
+        self.values = other.values[:]
+
+    def add_assignments(self, other):
+        for j in range(other.size):
+            already_contains = False
+            for i in range(self.size):
+                if self.labels[i] == other.labels[j]:
+                    already_contains = True
+                    break
+            if not already_contains:
+                self.labels[self.size] = other.labels[j]
+                self.values[self.size] = other.values[j]
+                self.size += 1
+
+    def variable_count(self) -> int:
+        return self.size
+
+    def to_string(self) -> str:
+        return "{" + ", ".join([f"({self.labels[i]}: {self.values[i]})" for i in range(self.size)]) + "}"
+
+    @staticmethod
+    def read_token(token_string: str, cursor: int, token_size: int) -> tuple[str, int]:
+        # Returns the token and the cursor position just past it
+        token = []
+        while cursor < len(token_string) and token_string[cursor] != ' ':
+            if len(token) >= token_size:
+                raise ValueError("Invalid token string")
+            token.append(token_string[cursor])
+            cursor += 1
+        return ''.join(token), cursor + 1
+
+
+class QueryAnswer:
+    def __init__(self, handle: str = None, importance: float = 0.0):
+        self.handles = [None] * MAX_NUMBER_OF_OPERATION_CLAUSES
+        self.handles_size = 0
+        self.importance = importance
+        self.assignment = Assignment()
+
+        if handle:
+            self.add_handle(handle)
+
+    def __del__(self):
+        # Destructor logic if needed
+        pass
+
+    def add_handle(self, handle: str):
+        self.handles[self.handles_size] = handle
+        self.handles_size += 1
+
+    def merge(self, other, merge_handles: bool = True) -> bool:
+        if self.assignment.is_compatible(other.assignment):
+            self.assignment.add_assignments(other.assignment)
+            if merge_handles:
+                self.importance = max(self.importance, other.importance)
+                for j in range(other.handles_size):
+                    if self.handles_size < MAX_NUMBER_OF_OPERATION_CLAUSES:
+                        if other.handles[j] not in self.handles[:self.handles_size]:
+                            self.handles[self.handles_size] = other.handles[j]
+                            self.handles_size += 1
+            return True
+        else:
+            return False
+
+    @staticmethod
+    def copy(base):
+        new_copy = QueryAnswer(importance=base.importance)
+        new_copy.assignment.copy_from(base.assignment)
+        # Copy into the preallocated slots; replacing the fixed-size list
+        # with a truncated one would break later add_handle()/merge() calls.
+        new_copy.handles[:base.handles_size] = base.handles[:base.handles_size]
+        new_copy.handles_size = base.handles_size
+        return new_copy
+
+    def tokenize(self) -> str:
+        importance_str = f"{self.importance:.10f}"
+        token_representation = f"{importance_str} {self.handles_size} "
+
+        token_representation += " ".join(self.handles[:self.handles_size]) + " "
+        token_representation += f"{self.assignment.size} "
+
+        for i in range(self.assignment.size):
+            token_representation += f"{self.assignment.labels[i]} {self.assignment.values[i]} "
+
+        return token_representation.strip()
+
+    def untokenize(self, tokens: str):
+        cursor = 0
+        token_string = tokens.split(' ')
+
+        importance = token_string[cursor]
+        self.importance = float(importance)
+        cursor += 1
+
+        num_handles = int(token_string[cursor])
+        self.handles_size = num_handles
+        cursor += 1
+
+        # Fill the preallocated slots instead of replacing the list
+        self.handles[:num_handles] = token_string[cursor:cursor + num_handles]
+        cursor += num_handles
+
+        num_assignments = int(token_string[cursor])
+        cursor += 1
+
+        # assign() advances assignment.size itself; pre-setting the size
+        # here would double-count every variable.
+        self.assignment.size = 0
+        for i in range(num_assignments):
+            label = token_string[cursor]
+            value = token_string[cursor + 1]
+            self.assignment.assign(label, value)
+            cursor += 2
+
+    def to_string(self) -> str:
+        handles_str = ", ".join(self.handles[:self.handles_size])
+        return f"QueryAnswer<{self.handles_size},{self.assignment.variable_count()}> [{handles_str}] {self.assignment.to_string()}"
diff --git a/hyperon_das/das_node/query_element.py b/hyperon_das/das_node/query_element.py
new file mode 100644
index 00000000..868d7d8d
--- /dev/null
+++ b/hyperon_das/das_node/query_element.py
@@ -0,0 +1,44 @@
+import threading
+from abc import ABC, abstractmethod
+
+
+class QueryElement(ABC):
+    def __init__(self):
+        self.id = ""
+        self.subsequent_id = ""
+        self.flow_finished = False
+        self.is_terminal = False
+        self.flow_finished_lock = threading.Lock()
+
+    def __del__(self):
+        # Destructor logic (if necessary)
+        pass
+
+    @abstractmethod
+    def setup_buffers(self):
+        """
+        Abstract method that must be implemented in a subclass
+        to setup buffers.
+        """
+        pass
+
+    @abstractmethod
+    def graceful_shutdown(self):
+        """
+        Abstract method that must be implemented in a subclass
+        for graceful shutdown behavior.
+        """
+        pass
+
+    def is_flow_finished(self) -> bool:
+        """
+        Check if the flow has finished with thread safety.
+ """ + with self.flow_finished_lock: + return self.flow_finished + + def set_flow_finished(self): + """ + Set the flow as finished with thread safety. + """ + with self.flow_finished_lock: + self.flow_finished = True diff --git a/hyperon_das/das_node/query_node.py b/hyperon_das/das_node/query_node.py new file mode 100644 index 00000000..79365ce3 --- /dev/null +++ b/hyperon_das/das_node/query_node.py @@ -0,0 +1,134 @@ +import threading +import time +from typing import List, Optional +from hyperon_das.das_node.query_answer import QueryAnswer +from hyperon_das.das_node.shared_queue import SharedQueue +from hyperon_das_node import MessageFactory, Message, AtomSpaceNode, MessageBrokerType, LeadershipBrokerType + +class QueryNode(AtomSpaceNode): + QUERY_ANSWER_FLOW_COMMAND = "query_answer_flow" + QUERY_ANSWER_TOKENS_FLOW_COMMAND = "query_answer_tokens_flow" + QUERY_ANSWERS_FINISHED_COMMAND = "query_answers_finished" + # query_answer_queue = SharedQueue() + def __init__(self, node_id: str, is_server: bool, messaging_backend: MessageBrokerType): + super().__init__(node_id, LeadershipBrokerType.SINGLE_MASTER_SERVER, messaging_backend) + self.is_server = is_server + self.query_answer_processor: Optional[threading.Thread] = None + self.query_answers_finished_flag = False + self.shutdown_flag = False + self.shutdown_flag_mutex = threading.Lock() + self.query_answers_finished_flag_mutex = threading.Lock() + + self.requires_serialization = messaging_backend != MessageBrokerType.RAM + self.query_answer_queue = SharedQueue() + + def __del__(self): + self.graceful_shutdown() + + def message_factory(self, command: str, args: List[str]) -> Optional[Message]: + print(command, args) + message = super().message_factory(command, args) + if message: + return message + if command == QueryNode.QUERY_ANSWER_FLOW_COMMAND: + return QueryAnswerFlow(command, args) + elif command == QueryNode.QUERY_ANSWER_TOKENS_FLOW_COMMAND: + return QueryAnswerTokensFlow(command, args) + elif command == QueryNode.QUERY_ANSWERS_FINISHED_COMMAND: + return QueryAnswersFinished(command, args) + return None + + def graceful_shutdown(self): + if self.is_shutting_down(): + return + with self.shutdown_flag_mutex: + self.shutdown_flag = True + if self.query_answer_processor: + self.query_answer_processor.join() + self.query_answer_processor = None + + def is_shutting_down(self) -> bool: + with self.shutdown_flag_mutex: + return self.shutdown_flag + + def query_answers_finished(self): + with self.query_answers_finished_flag_mutex: + self.query_answers_finished_flag = True + + def is_query_answers_finished(self) -> bool: + with self.query_answers_finished_flag_mutex: + return self.query_answers_finished_flag + + def add_query_answer(self, query_answer: QueryAnswer): + if self.is_query_answers_finished(): + raise ValueError("Invalid addition of new query answer.") + self.query_answer_queue.enqueue(query_answer) + + def pop_query_answer(self) -> QueryAnswer: + return self.query_answer_queue.dequeue() + + def is_query_answers_empty(self) -> bool: + return self.query_answer_queue.empty() + + def query_answer_processor_method(self): + raise NotImplementedError + + +class QueryNodeServer(QueryNode): + + def __init__(self, node_id: str, messaging_backend: MessageBrokerType = MessageBrokerType.RAM): + super().__init__(node_id, is_server=True, messaging_backend=messaging_backend) + + self.join_network() + self.query_answer_processor = threading.Thread(target=self.query_answer_processor_method) + self.query_answer_processor.start() + + def __del__(self): 
+        self.graceful_shutdown()
+
+    def node_joined_network(self, node_id: str):
+        self.add_peer(node_id)
+
+    def cast_leadership_vote(self) -> str:
+        return self.node_id()
+
+    def query_answer_processor_method(self):
+        while not self.is_shutting_down():
+            time.sleep(0.5)  # Sleep to avoid high CPU usage
+
+
+class QueryAnswerFlow(Message):
+
+    def __init__(self, command: str, args: List[str]):
+        # The pybind11 base class has to be initialized explicitly (assumed
+        # to take no arguments, as in SimpleNode's PrintMessage).
+        super().__init__()
+        self.query_answers = [QueryAnswer(pointer) for pointer in args]
+
+    def act(self, arg, *args, **kwargs):
+        print(arg, *args, **kwargs)
+        # query_node = node  # Assuming dynamic_pointer_cast logic is handled here
+        # for query_answer in self.query_answers:
+        #     query_node.add_query_answer(query_answer)
+
+
+class QueryAnswerTokensFlow(Message):
+
+    def __init__(self, command: str, args: List[str]):
+        super().__init__()
+        self.query_answers_tokens = args
+
+    def act(self, arg, *args, **kwargs):
+        print(arg, *args, **kwargs)
+        # query_node = node  # Assuming dynamic_pointer_cast logic is handled here
+        # for tokens in self.query_answers_tokens:
+        #     query_answer = QueryAnswer()
+        #     query_answer.untokenize(tokens)
+        #     query_node.add_query_answer(query_answer)
+
+
+class QueryAnswersFinished(Message):
+
+    def __init__(self, command: str, args: List[str]):
+        super().__init__()
+
+    def act(self, arg, *args, **kwargs):
+        print(arg, *args, **kwargs)
+        # query_node = node  # Assuming dynamic_pointer_cast logic is handled here
+        # query_node.query_answers_finished()
diff --git a/hyperon_das/das_node/remote_iterator.py b/hyperon_das/das_node/remote_iterator.py
new file mode 100644
index 00000000..29164175
--- /dev/null
+++ b/hyperon_das/das_node/remote_iterator.py
@@ -0,0 +1,30 @@
+from hyperon_das.das_node.query_element import QueryElement
+from hyperon_das_node import MessageBrokerType
+from hyperon_das.das_node.query_answer import QueryAnswer
+from hyperon_das.das_node.query_node import QueryNodeServer
+
+
+class RemoteIterator(QueryElement):
+    def __init__(self, local_id: str):
+        # Initialize QueryElement's flags and lock before anything else
+        super().__init__()
+        self.local_id = local_id
+        self.remote_input_buffer = None
+        self.setup_buffers()
+
+    def __del__(self):
+        self.graceful_shutdown()
+
+    def graceful_shutdown(self):
+        if self.remote_input_buffer:
+            self.remote_input_buffer.graceful_shutdown()
+
+    def setup_buffers(self):
+        print("local_server", self.local_id)
+        self.remote_input_buffer = QueryNodeServer(self.local_id, MessageBrokerType.GRPC)
+
+    def finished(self) -> bool:
+        return (self.remote_input_buffer.is_query_answers_finished() and
+                self.remote_input_buffer.is_query_answers_empty())
+
+    def pop(self) -> QueryAnswer:
+        # QueryNode.pop_query_answer() returns None when the queue is empty
+        return self.remote_input_buffer.pop_query_answer()
diff --git a/hyperon_das/das_node/shared_queue.py b/hyperon_das/das_node/shared_queue.py
new file mode 100644
index 00000000..43f6b94b
--- /dev/null
+++ b/hyperon_das/das_node/shared_queue.py
@@ -0,0 +1,63 @@
+import threading
+
+
+class SharedQueue:
+    def __init__(self, initial_size: int = 1000):
+        print("init SharedQueue")
+        self.size = initial_size
+        self.requests = [None] * self.size  # List to hold the requests
+        self.count = 0
+        self.start = 0
+        self.end = 0
+        self.request_queue_mutex = threading.Lock()
+
+    def enqueue(self, request):
+        print("enqueue", request)
+        with self.request_queue_mutex:
+            if self.count == self.size:
+                self.enlarge_request_queue()
+            self.requests[self.end] = request
+            self.end = (self.end + 1) % self.size
+            self.count += 1
+
+    def dequeue(self):
+        print("dequeue")
+        with self.request_queue_mutex:
+            if self.count > 0:
+                answer = self.requests[self.start]
+                self.start = (self.start + 1) % self.size
+                self.count -= 1
+                return answer
+            else:
+                return None
+
+    def empty(self) -> bool:
+        with self.request_queue_mutex:
+            return self.count == 0
+
+    # Protected methods (used internally)
+    def current_size(self) -> int:
+        return self.size
+
+    def current_start(self) -> int:
+        return self.start
+
+    def current_end(self) -> int:
+        return self.end
+
+    def current_count(self) -> int:
+        return self.count
+
+    # Private method to enlarge the queue
+    def enlarge_request_queue(self):
+        new_size = self.size * 2
+        new_queue = [None] * new_size
+        cursor = self.start
+        new_cursor = 0
+        while cursor != self.end:
+            new_queue[new_cursor] = self.requests[cursor]
+            new_cursor += 1
+            cursor = (cursor + 1) % self.size
+        self.size = new_size
+        self.start = 0
+        self.end = new_cursor
+        self.requests = new_queue
diff --git a/hyperon_das/das_node/simple_node.py b/hyperon_das/das_node/simple_node.py
new file mode 100644
index 00000000..0936152e
--- /dev/null
+++ b/hyperon_das/das_node/simple_node.py
@@ -0,0 +1,70 @@
+from hyperon_das_node import AtomSpaceNode, Message, LeadershipBrokerType, MessageBrokerType
+
+
+class PrintMessage(Message):
+    def __init__(self, content: str):
+        super().__init__()
+        self.content = content
+
+    def act(self, node: "SimpleNode") -> None:
+        # ideally we should call a node.method in here
+        node.print_content(self.content)
+
+
+class SimpleNode(AtomSpaceNode):
+    def __init__(self, node_id: str, is_server: bool) -> None:
+        super().__init__(
+            node_id,
+            LeadershipBrokerType.SINGLE_MASTER_SERVER,
+            MessageBrokerType.GRPC,
+        )
+
+        self.is_server = is_server
+        self.known_commands = {
+            "print": PrintMessage,
+        }
+
+    def print_content(self, content: str):
+        print(content)
+
+    def message_factory(self, command: str, args: list[str]) -> Message:
+        message = super().message_factory(command, args)
+        if message is not None:
+            return message
+
+        if klass := self.known_commands.get(command):
+            return klass(*args)
+
+        return None
+
+
+class SimpleNodeServer(SimpleNode):
+    def __init__(self, node_id: str) -> None:
+        super().__init__(node_id, True)
+
+    def node_joined_network(self, node_id: str) -> None:
+        self.add_peer(node_id)
+
+    def cast_leadership_vote(self) -> str:
+        return self.node_id()
+
+
+class SimpleNodeClient(SimpleNode):
+    def __init__(self, node_id: str, server_id: str) -> None:
+        super().__init__(node_id, False)
+        self.server_id = server_id
+        self.add_peer(server_id)
+
+    def node_joined_network(self, node_id: str) -> None:
+        # do nothing
+        pass
+
+    def cast_leadership_vote(self) -> str:
+        return self.server_id
+
+
+if __name__ == "__main__":
+    import time
+
+    server = SimpleNodeServer("localhost:35702")
+    client = SimpleNodeClient("localhost:35701", "localhost:35702")
+    server.join_network()
+    client.join_network()
+
+    client.send("print", ["qqq"], "localhost:35702")
+    time.sleep(10)
diff --git a/hyperon_das/das_node/star_node.py b/hyperon_das/das_node/star_node.py
new file mode 100644
index 00000000..4cd05698
--- /dev/null
+++ b/hyperon_das/das_node/star_node.py
@@ -0,0 +1,40 @@
+from hyperon_das_node import AtomSpaceNode, MessageBrokerType, LeadershipBrokerType
+
+
+class StarNode(AtomSpaceNode):
+    def __init__(self, node_id: str = None, server_id: str = None,
+                 messaging_backend: MessageBrokerType = MessageBrokerType.GRPC):
+        # Call the parent constructor (AtomSpaceNode)
+        super().__init__(node_id or server_id, LeadershipBrokerType.SINGLE_MASTER_SERVER, messaging_backend)
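+        # A client passes node_id and server_id; a server passes only
+        # node_id. `node_id or server_id` keeps the call valid when just a
+        # server_id is given, at the cost of reusing the server's own id.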
+        if server_id:
+            # If server_id is provided, this is a client node
+            self.server_id = server_id
+            self.is_server = False
+            self.add_peer(server_id)
+            print(server_id)
+        else:
+            # If no server_id, this is a server node
+            self.is_server = True
+
+        print(self.is_server)
+        # Join the network regardless of server/client
+        self.join_network()
+
+    def node_joined_network(self, node_id: str):
+        print("join", node_id)
+        if self.is_server:
+            self.add_peer(node_id)
+
+    def cast_leadership_vote(self) -> str:
+        if self.is_server:
+            return self.node_id()
+        else:
+            return self.server_id
+
+    def __del__(self):
+        # Destructor equivalent in Python (called when the object is deleted)
+        pass
+
+    # Protected attributes
+    is_server: bool
+    server_id: str
diff --git a/hyperon_das/query_engines/das_node_query_engine.py b/hyperon_das/query_engines/das_node_query_engine.py
new file mode 100644
index 00000000..6afc7f68
--- /dev/null
+++ b/hyperon_das/query_engines/das_node_query_engine.py
@@ -0,0 +1,108 @@
+import time
+from random import randint
+from time import sleep
+from typing import Any, Union, Iterator, List, Optional, Dict
+
+from hyperon_das_atomdb.database import HandleListT, AtomT, HandleT, IncomingLinksT, HandleSetT, LinkT
+
+from hyperon_das.context import Context
+from hyperon_das.link_filters import LinkFilter
+from hyperon_das.query_engines.query_engine_protocol import QueryEngine
+from hyperon_das.type_alias import Query
+from hyperon_das.das_node.das_node import DASNode
+from hyperon_das.das_node.simple_node import SimpleNodeClient, SimpleNodeServer
+from hyperon_das.das_node.remote_iterator import RemoteIterator
+from hyperon_das.das_node.query_answer import QueryAnswer
+
+
+class DASNodeQueryEngine(QueryEngine):
+
+    def __init__(self, host, port, timeout=60):
+        self.next_query_port = randint(60000, 61999)
+        self.timeout = timeout
+        self.id = "localhost:" + str(self.next_query_port)
+        self.host = host
+        self.port = port
+        self.remote_das_node = ":".join([self.host, str(self.port)])
+        print(self.id)
+        self.requestor = DASNode(self.id, self.remote_das_node)
+        # self.requestor = SimpleNodeClient(self.id, self.remote_das_node)
+        # self.requestor2 = SimpleNodeServer(self.remote_das_node)
+        # self.requestor2.join_network()
+        # self.requestor.join_network()
+
+    def query(
+        self, query: Query, parameters: dict[str, Any] | None = None
+    ) -> Union[Iterator[QueryAnswer], List[QueryAnswer]]:
+        qs: QueryAnswer = None
+        response: RemoteIterator = self.requestor.pattern_matcher_query(query)
+        start = time.time()
+        while not response.finished():
+            while (qs := response.pop()) is None:
+                if response.finished():
+                    break
+                else:
+                    print("sleep")
+                    sleep(5)
+                if time.time() - start > self.timeout:
+                    raise Exception("Timeout")
+            if qs is not None:
+                yield qs
+
+    def get_atom(self, handle: HandleT) -> AtomT:
+        pass
+
+    def get_atoms(self, handles: HandleListT, **kwargs) -> List[AtomT]:
+        pass
+
+    def get_links(self, link_filter: LinkFilter) -> List[LinkT]:
+        pass
+
+    def get_link_handles(self, link_filter: LinkFilter) -> HandleSetT:
+        pass
+
+    def get_incoming_links(self, atom_handle: HandleT, **kwargs) -> IncomingLinksT:
+        pass
+
+    def custom_query(self, index_id: str, query: Query, **kwargs) -> Union[Iterator, List[AtomT]]:
+        pass
+
+    def count_atoms(self, parameters: Optional[Dict[str, Any]] = None) -> Dict[str, int]:
+        pass
+
+    def reindex(self, pattern_index_templates: Optional[Dict[str, Dict[str, Any]]]):
+        pass
+
+    def create_field_index(self, atom_type: str, fields: List[str], named_type: Optional[str] = None,
+                           composite_type: Optional[List[Any]] = None, index_type: Optional[str] = None) -> str:
+        pass
+
+    def fetch(self, query: Query, host: Optional[str] = None, port: Optional[int] = None, **kwargs) -> Any:
+        pass
+
+    def create_context(self, name: str, queries: list[Query] | None = None) -> Context:
+        pass
+
+    def commit(self, **kwargs) -> None:
+        pass
+
+    def get_atoms_by_field(self, query: Query) -> HandleListT:
+        pass
+
+    def get_atoms_by_text_field(self, text_value: str, field: Optional[str] = None,
+                                text_index_id: Optional[str] = None) -> HandleListT:
+        pass
+
+    def get_node_by_name_starting_with(self, node_type: str, startswith: str) -> HandleListT:
+        pass
diff --git a/tests/integration/test_node_das.py b/tests/integration/test_node_das.py
new file mode 100644
index 00000000..46ca67b7
--- /dev/null
+++ b/tests/integration/test_node_das.py
@@ -0,0 +1,22 @@
+from hyperon_das.query_engines.das_node_query_engine import DASNodeQueryEngine
+
+
+class TestNodeDAS:
+
+    def test_node_das(self):
+        qe = DASNodeQueryEngine("localhost", 35700)
+        # query = ["LINK_TEMPLATE", "Expression", "3",
+        #          "NODE", "Symbol", "Similarity",
+        #          "VARIABLE", "v1",
+        #          "VARIABLE", "v2"]
+        query = [
+            "LINK_TEMPLATE", "Expression", "3",
+            "NODE", "Symbol", "Similarity",
+            "NODE", "Symbol", "\"human\"",
+            "VARIABLE", "v1"
+        ]
+        for q in qe.query(query):
+            print(q)
+            assert q
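+
+# To run this end to end (hypothetical invocation; the service names and the
+# broker port 35700 come from compose.yaml at the repo root):
+#   docker compose up -d redis mongodb metta-parser das-node
+#   pytest -s tests/integration/test_node_das.py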