diff --git a/commune/module/_task.py b/commune/executor/executor.py similarity index 97% rename from commune/module/_task.py rename to commune/executor/executor.py index e21ff711..c7eb18f1 100644 --- a/commune/module/_task.py +++ b/commune/executor/executor.py @@ -3,11 +3,8 @@ import threading from typing import * import threading - - - -class Task: - +import commune as c +class Executor(c.Module): thread_map = {} @@ -225,16 +222,6 @@ def tasks(cls, task = None, mode='pm2',**kwargs) -> List[str]: return tasks - @classmethod - def asubmit(cls, fn:str, *args, **kwargs): - - async def _asubmit(): - kwargs.update(kwargs.pop('kwargs',{})) - return fn(*args, **kwargs) - return _asubmit() - - - thread_map = {} @classmethod diff --git a/commune/module/_crypto.py b/commune/module/_crypto.py deleted file mode 100644 index 9943c7e6..00000000 --- a/commune/module/_crypto.py +++ /dev/null @@ -1,30 +0,0 @@ -import hashlib - -class Crypto: - @classmethod - def hash(cls, x, mode: str='sha256',*args,**kwargs) -> str: - x = cls.python2str(x) - if mode == 'keccak': - return cls.import_object('web3.main.Web3').keccak(text=x, *args, **kwargs).hex() - elif mode == 'ss58': - return cls.import_object('scalecodec.utils.ss58.ss58_encode')(x, *args,**kwargs) - elif mode == 'python': - return hash(x) - elif mode == 'md5': - return hashlib.md5(x.encode()).hexdigest() - elif mode == 'sha256': - return hashlib.sha256(x.encode()).hexdigest() - elif mode == 'sha512': - return hashlib.sha512(x.encode()).hexdigest() - elif mode =='sha3_512': - return hashlib.sha3_512(x.encode()).hexdigest() - else: - raise ValueError(f'unknown mode {mode}') - - - @classmethod - def hash_modes(cls): - return ['keccak', 'ss58', 'python', 'md5', 'sha256', 'sha512', 'sha3_512'] - - str2hash = hash - \ No newline at end of file diff --git a/commune/module/_endpoint.py b/commune/module/_endpoint.py deleted file mode 100644 index 81e01447..00000000 --- a/commune/module/_endpoint.py +++ /dev/null @@ -1,161 +0,0 @@ -from typing import * -class Endpoint: - - helper_functions = ['info', - 'metadata', - 'schema', - 'server_name', - 'is_admin', - 'namespace', - 'whitelist', - 'endpoints', - 'forward', - 'module_name', - 'class_name', - 'name', - 'address', - 'fns'] # whitelist of helper functions to load - - def add_endpoint(self, name, fn): - setattr(self, name, fn) - self.endpoints.append(name) - assert hasattr(self, name), f'{name} not added to {self.__class__.__name__}' - return {'success':True, 'message':f'Added {fn} to {self.__class__.__name__}'} - - def is_endpoint(self, fn) -> bool: - if isinstance(fn, str): - fn = getattr(self, fn) - return hasattr(fn, '__metadata__') - - def get_endpoints(self, search: str =None , helper_fn_attributes = ['helper_functions', - 'whitelist', - '_endpoints', - '__endpoints___']): - endpoints = [] - for k in helper_fn_attributes: - if hasattr(self, k): - fn_obj = getattr(self, k) - if callable(fn_obj): - endpoints += fn_obj() - else: - endpoints += fn_obj - for f in dir(self): - try: - if not callable(getattr(self, f)) or (search != None and search not in f): - continue - fn_obj = getattr(self, f) # you need to watchout for properties - is_endpoint = hasattr(fn_obj, '__metadata__') - if is_endpoint: - endpoints.append(f) - except Exception as e: - print(f'Error in get_endpoints: {e} for {f}') - return sorted(list(set(endpoints))) - - endpoints = get_endpoints - - - def cost_fn(self, fn:str, args:list, kwargs:dict): - return 1 - - @classmethod - def endpoint(cls, - cost=1, # cost per call - user2rate : dict = None, - rate_limit : int = 100, # calls per minute - timestale : int = 60, - public:bool = False, - cost_keys = ['cost', 'w', 'weight'], - **kwargs): - - for k in cost_keys: - if k in kwargs: - cost = kwargs[k] - break - - def decorator_fn(fn): - metadata = { - **cls.fn_schema(fn), - 'cost': cost, - 'rate_limit': rate_limit, - 'user2rate': user2rate, - 'timestale': timestale, - 'public': public, - } - import commune as c - fn.__dict__['__metadata__'] = metadata - - return fn - - return decorator_fn - - - - def metadata(self, to_string=False): - if hasattr(self, '_metadata'): - return self._metadata - metadata = {} - metadata['schema'] = self.schema() - metadata['description'] = self.description - metadata['urls'] = {k: v for k,v in self.urls.items() if v != None} - if to_string: - return self.python2str(metadata) - self._metadata = metadata - return metadata - - def info(self , - module = None, - lite_features = ['name', 'address', 'schema', 'key', 'description'], - lite = True, - cost = False, - **kwargs - ) -> Dict[str, Any]: - ''' - hey, whadup hey how is it going - ''' - info = self.metadata() - info['name'] = self.server_name or self.module_name() - info['address'] = self.address - info['key'] = self.key.ss58_address - return info - - @classmethod - def is_public(cls, fn): - if not cls.is_endpoint(fn): - return False - return getattr(fn, '__metadata__')['public'] - - - urls = {'github': None, - 'website': None, - 'docs': None, - 'twitter': None, - 'discord': None, - 'telegram': None, - 'linkedin': None, - 'email': None} - - - - def schema(self, - search = None, - docs: bool = True, - defaults:bool = True, - cache=True) -> 'Schema': - if self.is_str_fn(search): - return self.fn_schema(search, docs=docs, defaults=defaults) - schema = {} - if cache and self._schema != None: - return self._schema - fns = self.get_endpoints() - for fn in fns: - if search != None and search not in fn: - continue - if callable(getattr(self, fn )): - schema[fn] = self.fn_schema(fn, defaults=defaults,docs=docs) - # sort by keys - schema = dict(sorted(schema.items())) - if cache: - self._schema = schema - - return schema - diff --git a/commune/module/_manager.py b/commune/module/_manager.py index 860423aa..a71b776a 100644 --- a/commune/module/_manager.py +++ b/commune/module/_manager.py @@ -547,7 +547,7 @@ def get_module(cls, except Exception as e: return c.detailed_error(e) if path in ['module', 'c']: - return c + return c.Module # if the module is a valid import path shortcuts = c.shortcuts() if path in shortcuts: diff --git a/commune/module/_misc.py b/commune/module/_misc.py index d94c0d43..459e3ac0 100644 --- a/commune/module/_misc.py +++ b/commune/module/_misc.py @@ -1018,3 +1018,31 @@ def pip_install(cls, def pip_exists(cls, lib:str, verbose:str=True): return bool(lib in cls.pip_libs()) + + @classmethod + def hash(cls, x, mode: str='sha256',*args,**kwargs) -> str: + import hashlib + x = cls.python2str(x) + if mode == 'keccak': + return cls.import_object('web3.main.Web3').keccak(text=x, *args, **kwargs).hex() + elif mode == 'ss58': + return cls.import_object('scalecodec.utils.ss58.ss58_encode')(x, *args,**kwargs) + elif mode == 'python': + return hash(x) + elif mode == 'md5': + return hashlib.md5(x.encode()).hexdigest() + elif mode == 'sha256': + return hashlib.sha256(x.encode()).hexdigest() + elif mode == 'sha512': + return hashlib.sha512(x.encode()).hexdigest() + elif mode =='sha3_512': + return hashlib.sha3_512(x.encode()).hexdigest() + else: + raise ValueError(f'unknown mode {mode}') + + @classmethod + def hash_modes(cls): + return ['keccak', 'ss58', 'python', 'md5', 'sha256', 'sha512', 'sha3_512'] + + str2hash = hash + diff --git a/commune/module/_network.py b/commune/module/_network.py index d016493a..9461d651 100644 --- a/commune/module/_network.py +++ b/commune/module/_network.py @@ -156,59 +156,7 @@ def external_ip(cls, default_ip='0.0.0.0') -> str: return ip return default_ip - - @staticmethod - def upnpc_create_port_map(port: int): - r""" Creates a upnpc port map on your router from passed external_port to local port. - - Args: - port (int, `required`): - The local machine port to map from your external port. - - Return: - external_port (int, `required`): - The external port mappeclass to the local port on your machine. - - Raises: - Exception (Exception): - Raised if UPNPC port mapping fails, for instance, if upnpc is not enabled on your router. - """ - - try: - import miniupnpc - upnp = miniupnpc.UPnP() - upnp.discoverdelay = 200 - logger.debug('UPNPC: Using UPnP to open a port on your router ...') - logger.debug('UPNPC: Discovering... delay={}ms', upnp.discoverdelay) - ndevices = upnp.discover() - upnp.selectigd() - logger.debug('UPNPC: ' + str(ndevices) + ' device(s) detected') - - ip = upnp.lanaddr - external_ip = upnp.externalipaddress() - - logger.debug('UPNPC: your local ip address: ' + str(ip)) - logger.debug('UPNPC: your external ip address: ' + str(external_ip)) - logger.debug('UPNPC: status = ' + str(upnp.statusinfo()) + " connection type = " + str(upnp.connectiontype())) - - # find a free port for the redirection - external_port = port - rc = upnp.getspecificportmapping(external_port, 'TCP') - while rc != None and external_port < 65536: - external_port += 1 - rc = upnp.getspecificportmapping(external_port, 'TCP') - if rc != None: - raise Exception("UPNPC: No available external ports for port mapping.") - - logger.info('UPNPC: trying to redirect remote: {}:{} => local: {}:{} over TCP', external_ip, external_port, ip, port) - upnp.addportmapping(external_port, 'TCP', ip, port, 'Bittensor: %u' % external_port, '') - logger.info('UPNPC: Create Success') - - return external_port - - except Exception as e: - raise Exception(e) from e - + @classmethod def unreserve_port(cls,port:int, var_path='reserved_ports'): diff --git a/commune/module/module.py b/commune/module/module.py index d499173c..d4d11804 100755 --- a/commune/module/module.py +++ b/commune/module/module.py @@ -694,6 +694,168 @@ def set_params(self,*args, **kwargs): def init_module(self,*args, **kwargs): return self.set_config(*args, **kwargs) + + + + helper_functions = ['info', + 'metadata', + 'schema', + 'server_name', + 'is_admin', + 'namespace', + 'whitelist', + 'endpoints', + 'forward', + 'module_name', + 'class_name', + 'name', + 'address', + 'fns'] # whitelist of helper functions to load + + def add_endpoint(self, name, fn): + setattr(self, name, fn) + self.endpoints.append(name) + assert hasattr(self, name), f'{name} not added to {self.__class__.__name__}' + return {'success':True, 'message':f'Added {fn} to {self.__class__.__name__}'} + + def is_endpoint(self, fn) -> bool: + if isinstance(fn, str): + fn = getattr(self, fn) + return hasattr(fn, '__metadata__') + + def get_endpoints(self, search: str =None , helper_fn_attributes = ['helper_functions', + 'whitelist', + '_endpoints', + '__endpoints___']): + endpoints = [] + for k in helper_fn_attributes: + if hasattr(self, k): + fn_obj = getattr(self, k) + if callable(fn_obj): + endpoints += fn_obj() + else: + endpoints += fn_obj + for f in dir(self): + try: + if not callable(getattr(self, f)) or (search != None and search not in f): + continue + fn_obj = getattr(self, f) # you need to watchout for properties + is_endpoint = hasattr(fn_obj, '__metadata__') + if is_endpoint: + endpoints.append(f) + except Exception as e: + print(f'Error in get_endpoints: {e} for {f}') + return sorted(list(set(endpoints))) + + endpoints = get_endpoints + + + def cost_fn(self, fn:str, args:list, kwargs:dict): + return 1 + + @classmethod + def endpoint(cls, + cost=1, # cost per call + user2rate : dict = None, + rate_limit : int = 100, # calls per minute + timestale : int = 60, + public:bool = False, + cost_keys = ['cost', 'w', 'weight'], + **kwargs): + + for k in cost_keys: + if k in kwargs: + cost = kwargs[k] + break + + def decorator_fn(fn): + metadata = { + **cls.fn_schema(fn), + 'cost': cost, + 'rate_limit': rate_limit, + 'user2rate': user2rate, + 'timestale': timestale, + 'public': public, + } + import commune as c + fn.__dict__['__metadata__'] = metadata + + return fn + + return decorator_fn + + + + def metadata(self, to_string=False): + if hasattr(self, '_metadata'): + return self._metadata + metadata = {} + metadata['schema'] = self.schema() + metadata['description'] = self.description + metadata['urls'] = {k: v for k,v in self.urls.items() if v != None} + if to_string: + return self.python2str(metadata) + self._metadata = metadata + return metadata + + def info(self , + module = None, + lite_features = ['name', 'address', 'schema', 'key', 'description'], + lite = True, + cost = False, + **kwargs + ) -> Dict[str, Any]: + ''' + hey, whadup hey how is it going + ''' + info = self.metadata() + info['name'] = self.server_name or self.module_name() + info['address'] = self.address + info['key'] = self.key.ss58_address + return info + + @classmethod + def is_public(cls, fn): + if not cls.is_endpoint(fn): + return False + return getattr(fn, '__metadata__')['public'] + + + urls = {'github': None, + 'website': None, + 'docs': None, + 'twitter': None, + 'discord': None, + 'telegram': None, + 'linkedin': None, + 'email': None} + + + + def schema(self, + search = None, + docs: bool = True, + defaults:bool = True, + cache=True) -> 'Schema': + if self.is_str_fn(search): + return self.fn_schema(search, docs=docs, defaults=defaults) + schema = {} + if cache and self._schema != None: + return self._schema + fns = self.get_endpoints() + for fn in fns: + if search != None and search not in fn: + continue + if callable(getattr(self, fn )): + schema[fn] = self.fn_schema(fn, defaults=defaults,docs=docs) + # sort by keys + schema = dict(sorted(schema.items())) + if cache: + self._schema = schema + + return schema + + c.enable_routes() Module = c # Module is alias of c Module.run(__name__) diff --git a/commune/module/module.yaml b/commune/module/module.yaml index 14c20204..db533279 100644 --- a/commune/module/module.yaml +++ b/commune/module/module.yaml @@ -155,6 +155,7 @@ routes: - kill_many - kill_all - wait_for_server + - remote_fn pm2: - [kill_many, pm2_kill_many] @@ -206,6 +207,17 @@ routes: - generate - ask - models + executor: + - wait + - gather + - submit + - submit_batch + - thread + - threads + - as_completed + - is_coroutine + - obj2typestr + - tasks + - thread_map + - chat: - - ask s diff --git a/commune/server/server.py b/commune/server/server.py index 1ec880b0..73d5d291 100644 --- a/commune/server/server.py +++ b/commune/server/server.py @@ -1,9 +1,9 @@ import commune as c -import pandas as pd from typing import * from fastapi import FastAPI, Request from fastapi.middleware.cors import CORSMiddleware import uvicorn +import json from .middleware import ServerMiddleware from sse_starlette.sse import EventSourceResponse @@ -314,80 +314,56 @@ def kill_all(cls, network='local', timeout=20, verbose=True): @classmethod def serve(cls, - module:Any = None, - kwargs:dict = None, # kwargs for the module - params = None, # kwargs for the module - tag:str=None, - server_network = 'local', # network to run the server - port :int = None, # name of the server if None, it will be the module name + module: Any = None, + kwargs:Optional[dict] = None, # kwargs for the module + params: Optional[dict] = None, # kwargs for the module + tag:Optional[str]=None, + server_network: Optional[str] = 'local', # network to run the server + port :Optional[int] = None, # name of the server if None, it will be the module name server_name:str=None, # name of the server if None, it will be the module name name = None, # name of the server if None, it will be the module name refresh:bool = True, # refreshes the server's key remote:bool = True, # runs the server remotely (pm2, ray) tag_seperator:str='::', max_workers:int = None, - free: bool = False, + public: bool = False, mnemonic = None, # mnemonic for the server key = None, **extra_kwargs ): - module = module or c.module_name() - if module.endswith('.py'): - module = module[:-3] - if tag_seperator in str(module): - module, tag = module.split(tag_seperator) kwargs = {**(params or kwargs or {}), **extra_kwargs} - name = name or server_name or module + name = (name or server_name or module) or c.module_name() if tag_seperator in name: module, tag = name.split(tag_seperator) - else: - if tag != None: - name = f'{name}{tag_seperator}{tag}' - + if tag != None: + name = f'{module}{tag_seperator}{tag}' if port == None: # now if we have the server_name, we can repeat the server - address = c.get_address(name, network=server_network) - try: - port = int(address.split(':')[-1]) - if c.port_used(port): - c.kill_port(port) - except Exception as e: - port = c.free_port() + namespace = c.namespace(network=server_network) + port = int(namespace.get(name).split(':')[-1]) if name in namespace else c.free_port() + address = '0.0.0.0:' + str(port) # RESOLVE THE PORT FROM THE ADDRESS IF IT ALREADY EXISTS - # # NOTE REMOVE is FROM THE KWARGS REMOTE + response = { 'module':module, 'name': name, 'address':address, 'kwargs':kwargs} if remote: + remote = False remote_kwargs = c.locals2kwargs(locals()) # GET THE LOCAL KWARGS FOR SENDING TO THE REMOTE - remote_kwargs['remote'] = False # SET THIS TO FALSE TO AVOID RECURSION - for _ in ['extra_kwargs', 'address']: + for _ in ['extra_kwargs', 'address', 'response']: remote_kwargs.pop(_, None) # WE INTRODUCED THE ADDRES - response = cls.remote_fn('serve', name=name, kwargs=remote_kwargs) - if response['success'] == False: - return response - return {'success':True, - 'name': name, - 'address':c.ip() + ':' + str(remote_kwargs['port']), - 'kwargs':kwargs, - 'module':module - } - - module_class = c.module(module) - kwargs.update(extra_kwargs) - module = module_class(**kwargs) + cls.remote_fn('serve', name=name, kwargs=remote_kwargs) + return response + + module = c.module(module)(**kwargs) cls(module=module, - name=name, - port=port, - network=server_network, - max_workers=max_workers, - mnemonic = mnemonic, - free=free, - key=key) - - return {'success':True, - 'address': f'{c.default_ip}:{port}' , - 'name':name, - 'kwargs': kwargs, - 'module':module} + name=name, + port=port, + network=server_network, + max_workers=max_workers, + mnemonic = mnemonic, + public=public, + key=key) + + return response diff --git a/commune/subspace/subspace.py b/commune/subspace/subspace.py index 18449ce1..72bf6f78 100644 --- a/commune/subspace/subspace.py +++ b/commune/subspace/subspace.py @@ -434,44 +434,28 @@ def query_map(self, name: str = 'StakeFrom', # if all lowercase then we want to capitalize the first letter module = self.resolve_query_module_from_name(name) - path = f'query_map/{self.config.network}/{module}.{name}' # resolving the params params = params or [] - is_single_subnet = bool(netuid != 'all' and netuid != None) - if is_single_subnet: - params = [netuid] + params - if not isinstance(params, list): - params = [params] + params = [netuid] + params if bool(netuid != 'all' and netuid != None) else params + params = params if isinstance(params, list) else [params] + path = f'query_map/{self.network}/{module}.{name}' if len(params) > 0 : path = path + f'::params::' + '-'.join([str(p) for p in params]) - value = self.get(path, None , max_age=max_age, update=update) - if value == None: # if the value is a tuple then we want to convert it to a list - - while trials > 0: - try: - substrate = self.get_substrate( mode=mode) - print(f'Querying {name} with params {params} and block {block}') - qmap = substrate.query_map( - module=module, - storage_function = name, - params = params, - page_size = page_size, - max_results = max_results, - block_hash =substrate.get_block_hash(block) - ) - break - except Exception as e: - trials = trials - 1 - if trials == 0: - raise e - + substrate = self.get_substrate( mode=mode) + c.print(f'Querying {name} with params {params} and block {block}') + qmap = substrate.query_map( + module=module, + storage_function = name, + params = params, + page_size = page_size, + max_results = max_results, + block_hash =substrate.get_block_hash(block) + ) new_qmap = {} - progress_bar = c.progress(qmap, desc=f'Querying {name}(network={self.network})') for (k,v) in qmap: - progress_bar.update(1) if not isinstance(k, tuple): k = [k] if type(k) in [tuple,list]: @@ -501,9 +485,7 @@ def process_qmap(d): # sort the dictionary by key d = dict(sorted(d.items())) return d - new_map = process_qmap(new_qmap) - return new_map def runtime_spec_version(self): @@ -3057,10 +3039,6 @@ def netuid2module(self, update=False, fmt:str='j', **kwargs) -> 'ModuleInfo': return netuid2module - - - - def netuid2uid(self, key=None, update=False, **kwargs) -> Dict[str, str]: key = self.resolve_key_ss58(key) netuids = self.netuids(update=update)