Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add query client support #8

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions src/poktroll_clients/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# Add generated protobuf types to the module path.
from os import path

from .go_memory import ffi, libpoktroll_clients, go_ref, check_err, check_ref, GoManagedMem
from .depinject import Supply, SupplyMany
from .events_query_client import EventsQueryClient
from .block_client import BlockClient, BlockQueryClient
from .tx_context import TxContext
from .tx_client import TxClient
from .events_query_client import EventsQueryClient
from .depinject import Supply, SupplyMany
from .go_memory import go_ref

__all__ = [
'BlockClient',
Expand All @@ -16,5 +16,10 @@
'EventsQueryClient',
'Supply',
'SupplyMany',
'ffi',
'go_ref',
'check_err',
'check_ref',
'GoManagedMem',
'libpoktroll_clients',
]
2 changes: 1 addition & 1 deletion src/poktroll_clients/block_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class BlockQueryClient(GoManagedMem):
TODO_IN_THIS_COMMIT: comment
"""

self_ref: go_ref
go_ref: go_ref
err_ptr: ffi.CData

def __init__(self, query_node_rpc_url: str):
Expand Down
4 changes: 2 additions & 2 deletions src/poktroll_clients/events_query_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ class EventsQueryClient(GoManagedMem):
err_ptr: ffi.CData

def __init__(self, query_node_rpc_websocket_url: str):
go_ref = libpoktroll_clients.NewEventsQueryClient(query_node_rpc_websocket_url.encode('utf-8'))
super().__init__(go_ref)
self_ref = libpoktroll_clients.NewEventsQueryClient(query_node_rpc_websocket_url.encode('utf-8'))
super().__init__(self_ref)

def EventsBytes(self, query: str) -> go_ref:
return libpoktroll_clients.EventsQueryClientEventsBytes(self.go_ref, query.encode('utf-8'))
86 changes: 86 additions & 0 deletions src/poktroll_clients/ffi.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
long long int __align;
} pthread_cond_t;

// TODO: convert to snake case
typedef struct AsyncContext {
pthread_mutex_t mutex;
pthread_cond_t cond;
Expand All @@ -45,6 +46,7 @@
typedef void (*error_callback)(AsyncContext* ctx, const char* error);
typedef void (*cleanup_callback)(AsyncContext* ctx);

// TODO: convert to snake case
typedef struct AsyncOperation {
AsyncContext* ctx;
success_callback on_success;
Expand Down Expand Up @@ -78,6 +80,8 @@
serialized_proto* messages;
size_t num_messages;
} proto_message_array;

serialized_proto* GetGoProtoAsSerializedProto(go_ref go_proto_ref, char **err);

go_ref NewEventsQueryClient(const char* comet_websocket_url);
go_ref EventsQueryClientEventsBytes(go_ref selfRef, const char* query);
Expand All @@ -91,6 +95,88 @@
go_ref NewTxClient(go_ref deps_ref, char *signing_key_name, char **err);
go_ref TxClient_SignAndBroadcast(AsyncOperation* op, go_ref self_ref, serialized_proto *msg);
go_ref TxClient_SignAndBroadcastMany(AsyncOperation* op, go_ref self_ref, proto_message_array *msgs);

// Params update methods (all modules)
go_ref TxClient_UpdateSharedParams(AsyncOperation* op, go_ref self_ref, char *params);
go_ref TxClient_UpdateApplicationParams(AsyncOperation* op, go_ref self_ref, char *params);
go_ref TxClient_UpdateGatewayParams(AsyncOperation* op, go_ref self_ref, char *params);
go_ref TxClient_UpdateSupplierParams(AsyncOperation* op, go_ref self_ref, char *params);
go_ref TxClient_UpdateSessionParams(AsyncOperation* op, go_ref self_ref, char *params);
go_ref TxClient_UpdateServiceParams(AsyncOperation* op, go_ref self_ref, char *params);
go_ref TxClient_UpdateProofParams(AsyncOperation* op, go_ref self_ref, char *params);
go_ref TxClient_UpdateTokenomicsParams(AsyncOperation* op, go_ref self_ref, char *params);

// Param (individual) update methods (all modules)
go_ref TxClient_UpdateSharedParam(AsyncOperation* op, go_ref self_ref, char *params);
go_ref TxClient_UpdateApplicationParam(AsyncOperation* op, go_ref self_ref, char *params);
go_ref TxClient_UpdateGatewayParam(AsyncOperation* op, go_ref self_ref, char *params);
go_ref TxClient_UpdateSupplierParam(AsyncOperation* op, go_ref self_ref, char *params);
go_ref TxClient_UpdateSessionParam(AsyncOperation* op, go_ref self_ref, char *params);
go_ref TxClient_UpdateServiceParam(AsyncOperation* op, go_ref self_ref, char *params);
go_ref TxClient_UpdateProofParam(AsyncOperation* op, go_ref self_ref, char *params);
go_ref TxClient_UpdateTokenomicsParam(AsyncOperation* op, go_ref self_ref, char *params);

// Application module message methods
go_ref TxClient_StakeApplication(AsyncOperation* op, go_ref self_ref, char *address, char *stake, proto_message_array *services);
go_ref TxClient_UnstakeApplication(AsyncOperation* op, go_ref self_ref, char *address, char *stake, proto_message_array *services);
go_ref TxClient_DelegateToGateway(AsyncOperation* op, go_ref self_ref, char *address, char *stake, proto_message_array *services);
go_ref TxClient_UndelegateFromGateway(AsyncOperation* op, go_ref self_ref, char *address, char *stake, proto_message_array *services);
go_ref TxClient_TransferApplication(AsyncOperation* op, go_ref self_ref, char *address, char *stake, proto_message_array *services);

// Gateway module message methods
go_ref TxClient_StakeGateway(AsyncOperation* op, go_ref self_ref, char *address, char *stake);
go_ref TxClient_UnstakeGateway(AsyncOperation* op, go_ref self_ref, char *address, char *stake);

// Supplier module message methods
go_ref TxClient_StakeSupplier(AsyncOperation* op, go_ref self_ref, char *address, char *stake);
go_ref TxClient_UnstakeSupplier(AsyncOperation* op, go_ref self_ref, char *address, char *stake);

// Service module message methods
go_ref TxClient_AddService(AsyncOperation* op, go_ref self_ref, char *owner_address, serialized_proto *service);

// Proof module message methods
go_ref TxClient_CreateClaim(AsyncOperation* op, go_ref self_ref, char *owner_address, char *session_header, char *root_hash, char *proof);
go_ref TxClient_SubmitProof(AsyncOperation* op, go_ref self_ref, char *owner_address, char *session_header, char *proof);

go_ref NewQueryClient(go_ref deps_ref, char *query_node_rpc_url, char **err);

// Params query methods (all modules)
// go_ref QueryClient_GetSharedParams(AsyncOperation* op, go_ref self_ref);
go_ref QueryClient_GetSharedParams(go_ref self_ref, char **err);
go_ref QueryClient_GetApplicationParams(AsyncOperation* op, go_ref self_ref);
go_ref QueryClient_GetGatewayParams(AsyncOperation* op, go_ref self_ref);
go_ref QueryClient_GetSupplierParams(AsyncOperation* op, go_ref self_ref);
go_ref QueryClient_GetSessionParams(AsyncOperation* op, go_ref self_ref);
go_ref QueryClient_GetServiceParams(AsyncOperation* op, go_ref self_ref);
go_ref QueryClient_GetProofParams(AsyncOperation* op, go_ref self_ref);
go_ref QueryClient_GetTokenomicsParams(AsyncOperation* op, go_ref self_ref);

// Application module query methods
go_ref QueryClient_GetApplication(AsyncOperation* op, go_ref self_ref, char *address);
go_ref QueryClient_GetAllApplications(AsyncOperation* op, go_ref self_ref, char *address);

// Gateway module query methods
go_ref QueryClient_GetGateway(AsyncOperation* op, go_ref self_ref, char *address);
go_ref QueryClient_GetAllGateways(AsyncOperation* op, go_ref self_ref, char *address);

// Supplier module query methods
go_ref QueryClient_GetSupplier(AsyncOperation* op, go_ref self_ref, char *address);
go_ref QueryClient_GetAllSuppliers(AsyncOperation* op, go_ref self_ref, char *address);

// Session module query methods
go_ref QueryClient_GetSession(AsyncOperation* op, go_ref self_ref, char *address);

// Service module query methods
go_ref QueryClient_GetService(AsyncOperation* op, go_ref self_ref, char *address);
go_ref QueryClient_GetAllServices(AsyncOperation* op, go_ref self_ref, char *address);
go_ref QueryClient_GetRelayMiningDifficulty(AsyncOperation* op, go_ref self_ref, char *address);
go_ref QueryClient_GetAllRelayMiningDifficulties(AsyncOperation* op, go_ref self_ref, char *address);

// Proof module query methods
go_ref QueryClient_GetClaim(AsyncOperation* op, go_ref self_ref, char *address);
go_ref QueryClient_GetAllClaims(AsyncOperation* op, go_ref self_ref, char *address);
go_ref QueryClient_GetProof(AsyncOperation* op, go_ref self_ref, char *address);
go_ref QueryClient_GetAllProofs(AsyncOperation* op, go_ref self_ref, char *address);
""")


Expand Down
10 changes: 5 additions & 5 deletions src/poktroll_clients/go_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@

# TODO_IN_THIS_COMMIT: switch to an err_msg[] array

def check_err(err_ptr: ffi.CData):
def check_err(err_ptr: ffi.CData) -> None:
"""
TODO_IN_THIS_COMMIT: comment...
"""
if err_ptr[0] != ffi.NULL:
raise FFIError(ffi.string(err_ptr[0]))


def check_ref(go_ref: go_ref):
def check_ref(go_ref: go_ref) -> None:
if go_ref < 1:
raise FFIError("unexpected emtpy go_ref")

Expand All @@ -34,15 +34,15 @@ class GoManagedMem:
go_ref: go_ref
err_ptr: ffi.CData = ffi.new("char **")

def __init__(self, go_ref: go_ref):
def __init__(self, self_ref: go_ref):
"""
Constructor for GoManagedMem. Stores the Go-managed memory reference.
"""

self.go_ref = go_ref
self.go_ref = self_ref

check_err(self.err_ptr)
check_ref(go_ref)
check_ref(self_ref)

def __del__(self):
"""
Expand Down
98 changes: 98 additions & 0 deletions src/poktroll_clients/protobuf.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
import importlib
from dataclasses import dataclass
from typing import List

from google.protobuf import symbol_database, message
from google.protobuf.json_format import MessageToDict

from poktroll_clients import go_ref, libpoktroll_clients, check_err
from poktroll_clients.ffi import ffi


Expand All @@ -13,6 +18,21 @@ class SerializedProto:
type_url: str
data: bytes

@staticmethod
def from_c_struct(c_serialized_proto: ffi.CData):
return SerializedProto(
type_url=(ffi.string(c_serialized_proto.type_url, c_serialized_proto.type_url_length).decode('utf-8')),
data=(bytes(ffi.buffer(c_serialized_proto.data, c_serialized_proto.data_length))),
)

def __init__(self, c_serialized_proto: ffi.CData = None, type_url: str = "", data: bytes = b""):
self.type_url = type_url
self.data = data

if c_serialized_proto is not None:
self.type_url = ffi.string(c_serialized_proto.type_url, c_serialized_proto.type_url_length).decode('utf-8')
self.data = bytes(ffi.buffer(c_serialized_proto.data, c_serialized_proto.data_length))

def to_c_struct(self) -> ffi.CData:
"""
Converts the Python protobuf data to a C struct while preserving the underlying memory.
Expand Down Expand Up @@ -68,3 +88,81 @@ def to_c_struct(self) -> ffi.CData:
proto_message_array.messages[i].data_length = c_msg.data_length

return proto_message_array


def get_serialized_proto(go_proto_ref: go_ref) -> SerializedProto:
"""
TODO_IN_THIS_COMMIT: move and comment...
"""
err_ptr = ffi.new("char **")

c_serialized_proto = libpoktroll_clients.GetGoProtoAsSerializedProto(go_proto_ref, err_ptr)

check_err(err_ptr)

return SerializedProto.from_c_struct(c_serialized_proto)


def deserialize_protobuf(serialized_data: bytes, type_url: str) -> message.Message:
"""
Deserialize protocol buffer data given a type URL.

Args:
serialized_data: Bytes containing the serialized protobuf message
type_url: Type URL in format "type.googleapis.com/package.MessageType"
or "package.MessageType"
Returns:
dict: Deserialized protobuf message as a dictionary

Raises:
ValueError: If type URL is invalid or message type cannot be found
ImportError: If the protobuf module cannot be imported
"""
try:
# First, import the module containing the protobuf classes
# This ensures the types are registered in the symbol database
type_url = type_url.lstrip("/")
poktroll_namespace = type_url.rsplit(".", 1)[0]
package_filename = f"{type_url.rsplit('.', 1)[1].lower()}_pb2"
package_module = f"poktroll_clients.proto.{poktroll_namespace}.{package_filename}"
importlib.import_module(package_module)
except ImportError as e:
raise ImportError(f"Could not import protobuf module {package_module}: {str(e)}")

# Extract the full message type from the type URL
if '/' in type_url:
_, full_type = type_url.split('/', 1)
else:
full_type = type_url

# Split into package and message type to validate format
parts = full_type.split('.')
if len(parts) < 2:
raise ValueError("Invalid type URL format")

try:
# Get the message class from the symbol database
db = symbol_database.Default()
message_class = db.GetSymbol(full_type)

# Create a new message instance and parse the data
message = message_class()
message.ParseFromString(serialized_data)

return message
# # Convert to dictionary for easier handling
# return MessageToDict(message)

except KeyError as e:
raise ValueError(
f"Could not find message type: {full_type}. Make sure it's registered in the symbol database.") from e
except Exception as e:
raise ValueError(f"Error deserializing protobuf: {str(e)}") from e


def get_proto_from_go_ref(go_proto_ref: go_ref) -> message.Message:
"""
TODO_IN_THIS_COMMIT: move and comment...
"""
serialized_proto = get_serialized_proto(go_proto_ref)
return deserialize_protobuf(serialized_proto.data, serialized_proto.type_url)
Loading