diff --git a/LICENSE b/LICENSE index 8cdb845..eda48e2 100644 --- a/LICENSE +++ b/LICENSE @@ -1,3 +1,29 @@ +Ethereum stratum proxy - for ethereum-pools using stratum protocol RPCv2 + +Copyright (C) 2015 Atrides http://DwarfPool.com/eth + +# Stratum proxy + +Copyright (C) slush0 +https://github.com/slush0/stratum-mining-proxy + +# Stratum protocol +https://github.com/slush0/stratum + +Copyright (C) 2012 Marek Palatinus + +This program is free software: you can redistribute it and/or modify +it under the terms of the GNU Affero General Public License as +published by the Free Software Foundation, either version 3 of the +License, or any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU Affero General Public License for more details. + +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + GNU GENERAL PUBLIC LICENSE Version 2, June 1991 diff --git a/README.md b/README.md index d4fc882..11ba050 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,108 @@ -# eth-proxy -Stratum proxy for Ethereum +#Description + +This is Stratum Proxy for Ethereum-pools (RPCv2) using asynchronous networking written in Python Twisted. +Originally developed for DwarfPool http://dwarfpool.com/eth + +**NOTE:** This fork is still in development. Some features may be broken. Please report any broken features or issues. 
+ + +#Features + +* Additional 10%~20% increase of earning compared to standard pools +* ETH stratum proxy +* Only one connection to the pool +* Workers get new jobs immediately +* Submit of shares without network delay, it's like solo-mining but with benefits of professional pool +* Central Wallet configuration, miners doesn't need wallet as username +* Support monitoring via email +* Bypass worker_id for detailed statistic and per rig monitoring + + +#How it works + + Pool A <---+ +-------------+ Rig1 / PC1 + (Active) | | + | +-------------+ Rig2 / PC2 + | | + Pool B <---+-----StratumProxy <-----+-------------+ Rig3 / PC3 +(FailOver) | + +-------------+ Rig4 / PC4 + | + +-------------+ Leaserigs + + +#ToDo + +* Automatically failover via proxy +* Create for Windows users compiled .exe file +* pass submitHashrate to pool + + +#Configuration + +* all configs in file config.py + + +#Command line to miner start, recommended farm-recheck to use with stratum-proxy is 200 + +* ./ethminer --farm-recheck 200 -G -F http://127.0.0.1:8080/rig1 + + +#Donations + +* ETH: 0xb7302f5988cd483db920069a5c88f049a3707e2f + + +#Requirements + +eth-proxy is built in python. I have been testing it with 2.7.3, but it should work with other versions. The requirements for running the software are below. 
+ +* Python 2.7+ +* python-twisted + + +#Installation and start + +* [Linux] +1) install twisted + apt-get install python-twisted +2) start proxy with + python ./eth-proxy.py + +* [Windows] +1) Download Python Version 2.7.10 for Windows +https://www.python.org/downloads/ + +2) Modify PATH variable (how-to http://www.java.com/en/download/help/path.xml) and add + C:\Python27;C:\Python27\Scripts; + +3) Install python setuptools +https://pypi.python.org/pypi/setuptools/#windows-7-or-graphical-install + +4) Install Python-Twisted +https://pypi.python.org/pypi/Twisted/15.4.0 +File Twisted-15.4.0.win32-py2.7.msi (32bit) or Twisted-15.4.0.win-amd64-py2.7.msi (64bit) + +5) Install zope.interface, in console run: + easy_install -U zope.interface + +6) Install PyWin32 v2.7 +pywin32-219.win32-py2.7.exe or pywin32-219.win-amd64-py2.7.exe +http://sourceforge.net/projects/pywin32/files/pywin32/ + +7) Download eth-proxy. Extract eth-proxy.zip. Change settings in config.py and start with command: + python eth-proxy.py + + +#Contact + +* I am available via admin@dwarfpool.com + +#Credits + +* Original version by Slush0 (original stratum code) +* More Features added by GeneralFault, Wadee Womersley and Moopless + +#License + +* This software is provided AS-IS without any warranties of any kind. Please use at your own risk. 
diff --git a/eth-proxy.conf b/eth-proxy.conf new file mode 100644 index 0000000..0e7af06 --- /dev/null +++ b/eth-proxy.conf @@ -0,0 +1,45 @@ +### +# Command line for miners: +# +# ethminer.exe -G -F http://HOST:PORT/ +# ethminer.exe -G -F http://HOST:PORT/rig1 +# +# ethminer.exe -G -F http://127.0.0.1:8080/ +# ethminer.exe -G -F http://192.168.0.33:8080/rig1 +# +# You can submit shares without workername or +# You can provide workername: +# - with url like "/rig1" +# - or use automatically numbering(integer) based on IP of miner +### + +# Host and port for your workers +HOST = "0.0.0.0" +PORT = 8080 + +# Coin address where money goes +WALLET = "0x2a65aca4d5fc5b5c859090a6c34d164135398226" + +# It's useful for individually monitoring and statistic +ENABLE_WORKER_ID = True + +# On DwarfPool you have option to monitor your workers via email. +# If WORKER_ID is enabled, you can monitor every worker/rig separately. +MONITORING = False +MONITORING_EMAIL = "mail@example.com" + +# Main pool +POOL_HOST = "eth-ru.dwarfpool.com" +POOL_PORT = 8008 + +# Failover pool. CURRENTLY DOESN'T WORK! 
+POOL_FAILOVER_ENABLE = False +POOL_HOST_FAILOVER = "eth-eu.dwarfpool.com" +POOL_PORT_FAILOVER = 8008 + +# Logging +LOG_TO_FILE = True + +# Enable debug +DEBUG = False + \ No newline at end of file diff --git a/eth-proxy.py b/eth-proxy.py new file mode 100644 index 0000000..0010696 --- /dev/null +++ b/eth-proxy.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python +# -*- coding:utf-8 -*- + +import time +import os +import socket + +from stratum import settings +import stratum.logger +log = stratum.logger.get_logger('proxy') + +if __name__ == '__main__': + if len(settings.WALLET)!=42 and len(settings.WALLET)!=40: + log.error("Wrong WALLET!") + quit() + settings.CUSTOM_EMAIL = settings.MONITORING_EMAIL if settings.MONITORING_EMAIL and settings.MONITORING else "" + +from twisted.internet import reactor, defer, protocol +from twisted.internet import reactor as reactor2 +from stratum.socket_transport import SocketTransportFactory, SocketTransportClientFactory +from stratum.services import ServiceEventHandler +from twisted.web.server import Site +from stratum.custom_exceptions import TransportException + +from mining_libs import getwork_listener +from mining_libs import client_service +from mining_libs import jobs +from mining_libs import version +from mining_libs.jobs import Job + +def on_shutdown(f): + '''Clean environment properly''' + log.info("Shutting down proxy...") + if os.path.isfile('eth-proxy.pid'): + os.remove('eth-proxy.pid') + f.is_reconnecting = False # Don't let stratum factory to reconnect again + +# Support main connection +@defer.inlineCallbacks +def ping(f): + if not f.is_reconnecting: + return + try: + yield (f.rpc('eth_getWork', [], '')) + reactor.callLater(60, ping, f) + except Exception: + pass + +@defer.inlineCallbacks +def on_connect(f): + '''Callback when proxy get connected to the pool''' + log.info("Connected to Stratum pool at %s:%d" % f.main_host) + #reactor.callLater(30, f.client.transport.loseConnection) + + # Hook to on_connect again + 
f.on_connect.addCallback(on_connect) + + # Get first job and user_id + initial_job = (yield f.rpc('eth_submitLogin', [settings.WALLET, settings.CUSTOM_EMAIL], 'Proxy_'+version.VERSION)) + + reactor.callLater(0, ping, f) + + defer.returnValue(f) + +def on_disconnect(f): + '''Callback when proxy get disconnected from the pool''' + log.info("Disconnected from Stratum pool at %s:%d" % f.main_host) + f.on_disconnect.addCallback(on_disconnect) + + # Prepare to failover, currently works very bad + #if f.main_host==(settings.POOL_HOST, settings.POOL_PORT): + # main() + #else: + # f.is_reconnecting = False + #return f + +@defer.inlineCallbacks +def main(): + reactor.disconnectAll() + failover = False + if settings.POOL_FAILOVER_ENABLE: + failover = settings.failover_pool + settings.failover_pool = not settings.failover_pool + + pool_host = settings.POOL_HOST + pool_port = settings.POOL_PORT + if failover and settings.POOL_FAILOVER_ENABLE: + pool_host = settings.POOL_HOST_FAILOVER + pool_port = settings.POOL_PORT_FAILOVER + + log.warning("Ethereum Stratum proxy version: %s" % version.VERSION) + log.warning("Trying to connect to Stratum pool at %s:%d" % (pool_host, pool_port)) + + # Connect to Stratum pool, main monitoring connection + f = SocketTransportClientFactory(pool_host, pool_port, + debug=settings.DEBUG, proxy=None, + event_handler=client_service.ClientMiningService) + + job_registry = jobs.JobRegistry(f) + client_service.ClientMiningService.job_registry = job_registry + client_service.ClientMiningService.reset_timeout() + + f.on_connect.addCallback(on_connect) + f.on_disconnect.addCallback(on_disconnect) + # Cleanup properly on shutdown + reactor.addSystemEventTrigger('before', 'shutdown', on_shutdown, f) + + # Block until proxy connect to the pool + try: + yield f.on_connect + except TransportException: + log.warning("First pool server must be online first time to start failover") + return + + conn = reactor.listenTCP(settings.PORT, 
Site(getwork_listener.Root(job_registry, settings.ENABLE_WORKER_ID)), interface=settings.HOST) + + try: + conn.socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) # Enable keepalive packets + conn.socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 60) # Seconds before sending keepalive probes + conn.socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 1) # Interval in seconds between keepalive probes + conn.socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 5) # Failed keepalive probes before declaring other end dead + except: + pass # Some socket features are not available on all platforms (you can guess which one) + + log.warning("-----------------------------------------------------------------------") + if settings.HOST == '0.0.0.0': + log.warning("PROXY IS LISTENING ON ALL IPs ON PORT %d" % settings.PORT) + else: + log.warning("LISTENING FOR MINERS ON http://%s:%d" % (settings.HOST, settings.PORT)) + log.warning("-----------------------------------------------------------------------") + log.warning("Wallet: %s" % settings.WALLET) + log.warning("Worker ID enabled: %s" % settings.ENABLE_WORKER_ID) + if settings.MONITORING: + log.warning("Email monitoring on %s" % settings.MONITORING_EMAIL) + else: + log.warning("Email monitoring disabled") + #log.warning("Failover enabled: %s" % settings.POOL_FAILOVER_ENABLE) + log.warning("-----------------------------------------------------------------------") + +if __name__ == '__main__': + fp = file("eth-proxy.pid", 'w') + fp.write(str(os.getpid())) + fp.close() + settings.failover_pool = False + main() + reactor.run() diff --git a/mining_libs/__init__.py b/mining_libs/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/mining_libs/client_service.py b/mining_libs/client_service.py new file mode 100644 index 0000000..8d08036 --- /dev/null +++ b/mining_libs/client_service.py @@ -0,0 +1,52 @@ +from twisted.internet import reactor + +from stratum.event_handler import GenericEventHandler 
+from jobs import Job +import version as _version + +import stratum.logger +log = stratum.logger.get_logger('proxy') + +class ClientMiningService(GenericEventHandler): + job_registry = None # Reference to JobRegistry instance + timeout = None # Reference to IReactorTime object + + @classmethod + def reset_timeout(cls): + if cls.timeout != None: + if not cls.timeout.called: + cls.timeout.cancel() + cls.timeout = None + + cls.timeout = reactor.callLater(960, cls.on_timeout) + + @classmethod + def on_timeout(cls): + ''' + Try to reconnect to the pool after 16 minutes of no activity on the connection. + It will also drop all Stratum connections to sub-miners + to indicate connection issues. + ''' + log.error("Connection to upstream pool timed out") + cls.reset_timeout() + cls.job_registry.f.reconnect() + + def handle_event(self, method, params, connection_ref): + '''Handle RPC calls and notifications from the pool''' + # Yay, we received something from the pool, + # let's restart the timeout. 
+ self.reset_timeout() + + if method == 'eth_getWork': + '''Proxy just received information about new mining job''' + # Broadcast to getwork clients + job = Job.build_from_pool(params) + if stratum.logger.settings.DEBUG: + log.debug("NEW_JOB %s" % params) + else: + log.info("NEW_JOB") + self.job_registry.replace_job(job) + + else: + '''Pool just asked us for something which we don't support...''' + log.error("Unhandled method %s with params %s" % (method, params)) diff --git a/mining_libs/getwork_listener.py b/mining_libs/getwork_listener.py new file mode 100644 index 0000000..756f952 --- /dev/null +++ b/mining_libs/getwork_listener.py @@ -0,0 +1,64 @@ +import json +import time + +from twisted.internet import defer, threads +from twisted.web.resource import Resource +from twisted.web.server import NOT_DONE_YET + +import stratum.logger +log = stratum.logger.get_logger('proxy') + +class Root(Resource): + isLeaf = True + + def __init__(self, job_registry, enable_worker_id): + Resource.__init__(self) + self.job_registry = job_registry + self.isWorkerID = enable_worker_id + + def json_response(self, msg_id, result): + resp = json.dumps({'id': msg_id, 'jsonrpc': '2.0', 'result': result}) + return resp + + def json_error(self, msg_id, message): + resp = json.dumps({'id': msg_id, 'jsonrpc': '2.0', 'result': False, 'error': message}) + return resp + + def render_POST(self, request): + request.setHeader('content-type', 'application/json') + data = json.loads(request.content.read()) + + if not self.job_registry.jobs: + log.warning('Proxy is waiting for a job...') + request.write(self.json_error(data.get('id', 0), "Proxy is waiting for a job...")+'\n') + request.finish() + return NOT_DONE_YET + + if not data.has_key('method'): + response = self.json_error(data.get('id'), "Need methods")+'\n' + if data['method'] == 'eth_getWork': + response = self.json_response(data.get('id', 0), self.job_registry.jobs.params) + elif data['method'] == 'eth_submitWork' or data['method'] == 
'eth_submitHashrate': + if self.isWorkerID: + worker_name = request.uri[1:15] + if not worker_name: + ip_temp = request.getClientIP().split('.') + worker_name = str( int(ip_temp[0])*16777216 + int(ip_temp[1])*65536 + int(ip_temp[2])*256 + int(ip_temp[3]) ) + else: + worker_name = '' + + if data['method'] == 'eth_submitWork': # ToFix!!!! + threads.deferToThread(self.job_registry.submit, data['method'], data['params'], worker_name) + response = self.json_response(data.get('id', 0), True) + else: + response = self.json_error(data.get('id'), "Unsupported method '%s'" % data['method']) + + try: + request.write(response+'\n') + request.finish() + return NOT_DONE_YET + except Exception: + return + + def render_GET(self, request): + return "Ethereum startum proxy" diff --git a/mining_libs/jobs.py b/mining_libs/jobs.py new file mode 100644 index 0000000..9ab7ef8 --- /dev/null +++ b/mining_libs/jobs.py @@ -0,0 +1,37 @@ +from twisted.internet import defer + +from stratum import settings +import stratum.logger +log = stratum.logger.get_logger('proxy') + +class Job(object): + def __init__(self): + self.params = '' + + @classmethod + def build_from_pool(cls, getWorkParams): + '''Build job object from Stratum server broadcast''' + job = Job() + job.params = getWorkParams + return job + +class JobRegistry(object): + def __init__(self, f): + self.f = f + self.jobs = None + # Hook for LP broadcasts + self.on_block = defer.Deferred() + + def replace_job(self, newjob): + self.jobs = newjob + # Force miners to reload jobs + on_block = self.on_block + self.on_block = defer.Deferred() + on_block.callback(True) + + def submit(self, method, params, worker_name): + if settings.DEBUG: + log.info("%s by %s %s" % (method, worker_name, params)) + else: + log.info("%s by %s" % (method, worker_name) ) + self.f.rpc(method, params, worker_name) diff --git a/mining_libs/version.py b/mining_libs/version.py new file mode 100644 index 0000000..d814d9d --- /dev/null +++ b/mining_libs/version.py @@ -0,0 
+1 @@ +VERSION='0.0.3' diff --git a/stratum/__init__.py b/stratum/__init__.py new file mode 100644 index 0000000..552a734 --- /dev/null +++ b/stratum/__init__.py @@ -0,0 +1 @@ +from server import setup diff --git a/stratum/config_default.py b/stratum/config_default.py new file mode 100644 index 0000000..a3114d5 --- /dev/null +++ b/stratum/config_default.py @@ -0,0 +1,162 @@ +''' +This is example configuration for Stratum server. +Please rename it to config.py and fill correct values. +''' + +# ******************** GENERAL SETTINGS *************** + +# Enable some verbose debug (logging requests and responses). +DEBUG = False + +# Destination for application logs, files rotated once per day. +LOGDIR = 'log/' + +# Main application log file. +LOGFILE = None +LOG_TO_FILE = False + +# Possible values: DEBUG, INFO, WARNING, ERROR, CRITICAL +LOGLEVEL = 'INFO' + +# How many threads use for synchronous methods (services). +# 30 is enough for small installation, for real usage +# it should be slightly more, say 100-300. +THREAD_POOL_SIZE = 30 + +# RPC call throws TimeoutServiceException once total time since request has been +# placed (time to delivery to client + time for processing on the client) +# crosses _TOTAL (in second). +# _TOTAL reflects the fact that not all transports deliver RPC requests to the clients +# instantly, so request can wait some time in the buffer on server side. +# NOT IMPLEMENTED YET +#RPC_TIMEOUT_TOTAL = 600 + +# RPC call throws TimeoutServiceException once client is processing request longer +# than _PROCESS (in second) +# NOT IMPLEMENTED YET +#RPC_TIMEOUT_PROCESS = 30 + +# ******************** TRANSPORTS ********************* + +# Hostname or external IP to expose +HOSTNAME = 'stratum.example.com' + +# Port used for Socket transport. Use 'None' for disabling the transport. +LISTEN_SOCKET_TRANSPORT = 3333 + +# Port used for HTTP Poll transport. 
Use 'None' for disabling the transport +LISTEN_HTTP_TRANSPORT = 8000 + +# Port used for HTTPS Poll transport +LISTEN_HTTPS_TRANSPORT = 8001 + +# Port used for WebSocket transport, 'None' for disabling WS +LISTEN_WS_TRANSPORT = 8002 + +# Port used for secure WebSocket, 'None' for disabling WSS +LISTEN_WSS_TRANSPORT = 8003 + +# ******************** SSL SETTINGS ****************** + +# Private key and certification file for SSL protected transports +# You can find howto for generating self-signed certificate in README file +SSL_PRIVKEY = 'server.key' +SSL_CACERT = 'server.crt' + +# ******************** TCP SETTINGS ****************** + +# Enables support for socket encapsulation, which is compatible +# with haproxy 1.5+. By enabling this, first line of received +# data will represent some metadata about proxied stream: +# PROXY \n +# +# Full specification: http://haproxy.1wt.eu/download/1.5/doc/proxy-protocol.txt +TCP_PROXY_PROTOCOL = False + +# ******************** HTTP SETTINGS ***************** + +# Keepalive for HTTP transport sessions (at this time for both poll and push) +# High value leads to higher memory usage (all sessions are stored in memory ATM). +# Low value leads to more frequent session reinitializing (like downloading address history). +HTTP_SESSION_TIMEOUT = 3600 # in seconds + +# Maximum number of messages (notifications, responses) waiting to delivery to HTTP Poll clients. +# Buffer length is PER CONNECTION. High value will consume a lot of RAM, +# short history will cause that in some edge cases clients won't receive older events. +HTTP_BUFFER_LIMIT = 10000 + +# User agent used in HTTP requests (for both HTTP transports and for proxy calls from services) +#USER_AGENT = 'Stratum/0.1' +USER_AGENT = 'PoolServer' + +# Provide human-friendly user interface on HTTP transports for browsing exposed services. 
+BROWSER_ENABLE = True + +SIGNING_ID = None + +# ******************** BITCOIND SETTINGS ************ + +# Hostname and credentials for one trusted Bitcoin node ("Satoshi's client"). +# Stratum uses both P2P port (which is 8333 everytime) and RPC port +BITCOIN_TRUSTED_HOST = '127.0.0.1' +BITCOIN_TRUSTED_PORT = 8332 # RPC port +BITCOIN_TRUSTED_USER = 'stratum' +BITCOIN_TRUSTED_PASSWORD = '***somepassword***' + +# ******************** OTHER CORE SETTINGS ********************* +# Use "echo -n '' | sha256sum | cut -f1 -d' ' " +# for calculating SHA256 of your preferred password +ADMIN_PASSWORD_SHA256 = None # Admin functionality is disabled +#ADMIN_PASSWORD_SHA256 = '9e6c0c1db1e0dfb3fa5159deb4ecd9715b3c8cd6b06bd4a3ad77e9a8c5694219' # SHA256 of the password + +# IP from which admin calls are allowed. +# Set None to allow admin calls from all IPs +ADMIN_RESTRICT_INTERFACE = '127.0.0.1' + +# *********************** PEER CONFIGURATION ************* + +# Hardcoded list of Stratum nodes for clients to switch when this node is not available. +PEERS = [ + { + 'hostname': 'stratum.bitcoin.cz', + 'trusted': True, # This node is trustworthy + 'weight': -1, # Higher number means higher priority for selection. + # -1 will work mostly as a backup when other servers won't work. + }, +] +### +# Command line for miners: +# ethminer.exe -G -F http://YOUR_PROXY_IP:8080/ +# ethminer.exe -G -F http://YOUR_PROXY_IP:8080/rig1 +# +# You can submit shares without workername or +# You can provide workername: +# - with url like "/rig1" +# - or use automatically numbering(integer) based on IP of miner +### + +# Default settings for proxy + +# Host and port for your workers +HOST = "0.0.0.0" +PORT = 8080 + +# Coin address where money goes. +WALLET = '0x2a65aca4d5fc5b5c859090a6c34d164135398226' + +# It's useful for individually monitoring and statistic. +ENABLE_WORKER_ID = True + +# On DwarfPool you have option to monitor your workers via email. 
+# If WORKER_ID is enabled, you can monitor every worker/rig separately. +MONITORING = False +MONITORING_EMAIL = 'mail@example.com' + +# Main pool +POOL_HOST = 'eth-ru.dwarfpool.com' +POOL_PORT = 8008 + +# Failover pool. CURRENTLY DOESN'T WORK! +POOL_FAILOVER_ENABLE = False +POOL_HOST_FAILOVER = 'eth-eu.dwarfpool.com' +POOL_PORT_FAILOVER = 8008 diff --git a/stratum/connection_registry.py b/stratum/connection_registry.py new file mode 100644 index 0000000..5065f2c --- /dev/null +++ b/stratum/connection_registry.py @@ -0,0 +1,42 @@ +import weakref +from twisted.internet import reactor +from services import GenericService + +class ConnectionRegistry(object): + __connections = weakref.WeakKeyDictionary() + + @classmethod + def add_connection(cls, conn): + cls.__connections[conn] = True + + @classmethod + def remove_connection(cls, conn): + try: + del cls.__connections[conn] + except: + print "Warning: Cannot remove connection from ConnectionRegistry" + + @classmethod + def get_session(cls, conn): + if isinstance(conn, weakref.ref): + conn = conn() + + if isinstance(conn, GenericService): + conn = conn.connection_ref() + + if conn == None: + return None + + return conn.get_session() + + @classmethod + def iterate(cls): + return cls.__connections.iterkeyrefs() + +def dump_connections(): + for x in ConnectionRegistry.iterate(): + c = x() + c.transport.write('cus') + reactor.callLater(5, dump_connections) + +#reactor.callLater(0, dump_connections) diff --git a/stratum/custom_exceptions.py b/stratum/custom_exceptions.py new file mode 100644 index 0000000..d140adf --- /dev/null +++ b/stratum/custom_exceptions.py @@ -0,0 +1,44 @@ +class ProtocolException(Exception): + pass + +class TransportException(Exception): + pass + +class ServiceException(Exception): + code = -2 + +class UnauthorizedException(ServiceException): + pass + +class PubsubException(ServiceException): + pass + +class AlreadySubscribedException(PubsubException): + pass + +class 
MissingServiceTypeException(ServiceException): + code = -2 + +class MissingServiceVendorException(ServiceException): + code = -2 + +class MissingServiceIsDefaultException(ServiceException): + code = -2 + +class DefaultServiceAlreadyExistException(ServiceException): + code = -2 + +class ServiceNotFoundException(ServiceException): + code = -2 + +class MethodNotFoundException(ServiceException): + code = -3 + +class FeeRequiredException(ServiceException): + code = -10 + +class TimeoutServiceException(ServiceException): + pass + +class RemoteServiceException(Exception): + pass \ No newline at end of file diff --git a/stratum/event_handler.py b/stratum/event_handler.py new file mode 100644 index 0000000..c6ded12 --- /dev/null +++ b/stratum/event_handler.py @@ -0,0 +1,12 @@ +import custom_exceptions +from twisted.internet import defer +from services import wrap_result_object + +class GenericEventHandler(object): + def _handle_event(self, msg_method, msg_result, connection_ref): + return defer.maybeDeferred(wrap_result_object, self.handle_event(msg_method, msg_result, connection_ref)) + + def handle_event(self, msg_method, msg_params, connection_ref): + '''In most cases you'll only need to overload this method.''' + print "Other side called method", msg_method, "with params", msg_params + raise custom_exceptions.MethodNotFoundException("Method '%s' not implemented" % msg_method) \ No newline at end of file diff --git a/stratum/helpers.py b/stratum/helpers.py new file mode 100644 index 0000000..11d200b --- /dev/null +++ b/stratum/helpers.py @@ -0,0 +1,92 @@ +from zope.interface import implements +from twisted.internet import defer +from twisted.internet import reactor +from twisted.internet.protocol import Protocol +from twisted.web.iweb import IBodyProducer +from twisted.web.client import Agent +from twisted.web.http_headers import Headers + +import settings + +class ResponseCruncher(Protocol): + '''Helper for get_page()''' + def __init__(self, finished): + self.finished = 
finished + self.response = "" + + def dataReceived(self, data): + self.response += data + + def connectionLost(self, reason): + self.finished.callback(self.response) + +class StringProducer(object): + '''Helper for get_page()''' + implements(IBodyProducer) + + def __init__(self, body): + self.body = body + self.length = len(body) + + def startProducing(self, consumer): + consumer.write(self.body) + return defer.succeed(None) + + def pauseProducing(self): + pass + + def stopProducing(self): + pass + +@defer.inlineCallbacks +def get_page(url, method='GET', payload=None, headers=None): + '''Downloads the page from given URL, using asynchronous networking''' + agent = Agent(reactor) + + producer = None + if payload: + producer = StringProducer(payload) + + _headers = {'User-Agent': [settings.USER_AGENT,]} + if headers: + for key, value in headers.items(): + _headers[key] = [value,] + + response = (yield agent.request( + method, + str(url), + Headers(_headers), + producer)) + + #for h in response.headers.getAllRawHeaders(): + # print h + + try: + finished = defer.Deferred() + (yield response).deliverBody(ResponseCruncher(finished)) + except: + raise Exception("Downloading page '%s' failed" % url) + + defer.returnValue((yield finished)) + +@defer.inlineCallbacks +def ask_old_server(method, *args): + '''Perform request in old protocol to electrum servers. 
+ This is deprecated, used only for proxying some calls.''' + import urllib + import ast + + # Hack for methods without arguments + if not len(args): + args = ['',] + + res = (yield get_page('http://electrum.bitcoin.cz/electrum.php', method='POST', + headers={"Content-type": "application/x-www-form-urlencoded", "Accept": "text/plain"}, + payload=urllib.urlencode({'q': repr([method,] + list(args))}))) + + try: + data = ast.literal_eval(res) + except SyntaxError: + print "Data received from server:", res + raise Exception("Corrupted data from old electrum server") + defer.returnValue(data) diff --git a/stratum/http_transport.py b/stratum/http_transport.py new file mode 100644 index 0000000..b855306 --- /dev/null +++ b/stratum/http_transport.py @@ -0,0 +1,223 @@ +from twisted.web.resource import Resource +from twisted.web.server import Request, Session, NOT_DONE_YET +from twisted.internet import defer +from twisted.python.failure import Failure +import hashlib +import json +import string + +import helpers +import semaphore +#from storage import Storage +from protocol import Protocol, RequestCounter +from event_handler import GenericEventHandler +import settings + +import logger +log = logger.get_logger('http_transport') + +class Transport(object): + def __init__(self, session_id, lock): + self.buffer = [] + self.session_id = session_id + self.lock = lock + self.push_url = None # None or full URL for HTTP Push + self.peer = None + + # For compatibility with generic transport, not used in HTTP transport + self.disconnecting = False + + def getPeer(self): + return self.peer + + def write(self, data): + if len(self.buffer) >= settings.HTTP_BUFFER_LIMIT: + # Drop first (oldest) item in buffer + # if buffer crossed allowed limit. 
+ # This isn't totally exact, because one record in buffer + # can teoretically contains more than one message (divided by \n), + # but current server implementation don't store responses in this way, + # so counting exact number of messages will lead to unnecessary overhead. + self.buffer.pop(0) + + self.buffer.append(data) + + if not self.lock.is_locked() and self.push_url: + # Push the buffer to callback URL + # TODO: Buffer responses and perform callbgitacks in batches + self.push_buffer() + + def push_buffer(self): + '''Push the content of the buffer into callback URL''' + if not self.push_url: + return + + # FIXME: Don't expect any response + helpers.get_page(self.push_url, method='POST', + headers={"content-type": "application/stratum", + "x-session-id": self.session_id}, + payload=self.fetch_buffer()) + + def fetch_buffer(self): + ret = ''.join(self.buffer) + self.buffer = [] + return ret + + def set_push_url(self, url): + self.push_url = url + +def monkeypatch_method(cls): + '''Perform monkey patch for given class.''' + def decorator(func): + setattr(cls, func.__name__, func) + return func + return decorator + +@monkeypatch_method(Request) +def getSession(self, sessionInterface=None, cookie_prefix='TWISTEDSESSION'): + '''Monkey patch for Request object, providing backward-compatible + getSession method which can handle custom cookie as a session ID + (which is necessary for following Stratum protocol specs). + Unfortunately twisted developers rejected named-cookie feature, + which is pressing me into this ugly solution... + + TODO: Especially this would deserve some unit test to be sure it doesn't break + in future twisted versions. + ''' + # Session management + if not self.session: + cookiename = string.join([cookie_prefix] + self.sitepath, "_") + sessionCookie = self.getCookie(cookiename) + if sessionCookie: + try: + self.session = self.site.getSession(sessionCookie) + except KeyError: + pass + # if it still hasn't been set, fix it up. 
+ if not self.session: + self.session = self.site.makeSession() + self.addCookie(cookiename, self.session.uid, path='/') + self.session.touch() + if sessionInterface: + return self.session.getComponent(sessionInterface) + return self.session + +class HttpSession(Session): + sessionTimeout = settings.HTTP_SESSION_TIMEOUT + + def __init__(self, *args, **kwargs): + Session.__init__(self, *args, **kwargs) + #self.storage = Storage() + + # Reference to connection object (Protocol instance) + self.protocol = None + + # Synchronizing object for avoiding race condition on session + self.lock = semaphore.Semaphore(1) + + # Output buffering + self.transport = Transport(self.uid, self.lock) + + # Setup cleanup method on session expiration + self.notifyOnExpire(lambda: HttpSession.on_expire(self)) + + @classmethod + def on_expire(cls, sess_obj): + # FIXME: Close protocol connection + print "EXPIRING SESSION", sess_obj + + if sess_obj.protocol: + sess_obj.protocol.connectionLost(Failure(Exception("HTTP session closed"))) + + sess_obj.protocol = None + +class Root(Resource): + isLeaf = True + + def __init__(self, debug=False, signing_key=None, signing_id=None, + event_handler=GenericEventHandler): + Resource.__init__(self) + self.signing_key = signing_key + self.signing_id = signing_id + self.debug = debug # This class acts as a 'factory', debug is used by Protocol + self.event_handler = event_handler + + def render_GET(self, request): + if not settings.BROWSER_ENABLE: + return "Welcome to %s server. Use HTTP POST to talk with the server." 
% settings.USER_AGENT + + # TODO: Web browser + return "Web browser not implemented yet" + + def render_OPTIONS(self, request): + session = request.getSession(cookie_prefix='STRATUM_SESSION') + + request.setHeader('server', settings.USER_AGENT) + request.setHeader('x-session-timeout', session.sessionTimeout) + request.setHeader('access-control-allow-origin', '*') # Allow access from any other domain + request.setHeader('access-control-allow-methods', 'POST, OPTIONS') + request.setHeader('access-control-allow-headers', 'Content-Type') + return '' + + def render_POST(self, request): + session = request.getSession(cookie_prefix='STRATUM_SESSION') + + l = session.lock.acquire() + l.addCallback(self._perform_request, request, session) + return NOT_DONE_YET + + def _perform_request(self, _, request, session): + request.setHeader('content-type', 'application/stratum') + request.setHeader('server', settings.USER_AGENT) + request.setHeader('x-session-timeout', session.sessionTimeout) + request.setHeader('access-control-allow-origin', '*') # Allow access from any other domain + + # Update client's IP address + session.transport.peer = request.getHost() + + # Although it isn't intuitive at all, request.getHeader reads request headers, + # but request.setHeader (few lines above) writes response headers... + if 'application/stratum' not in request.getHeader('content-type'): + session.transport.write("%s\n" % json.dumps({'id': None, 'result': None, 'error': (-1, "Content-type must be 'application/stratum'. 
See http://stratum.bitcoin.cz for more info.", "")})) + self._finish(None, request, session.transport, session.lock) + return + + if not session.protocol: + # Build a "protocol connection" + proto = Protocol() + proto.transport = session.transport + proto.factory = self + proto.connectionMade() + session.protocol = proto + else: + proto = session.protocol + + # Update callback URL if presented + callback_url = request.getHeader('x-callback-url') + if callback_url != None: + if callback_url == '': + # Blank value of callback URL switches HTTP Push back to HTTP Poll + session.transport.push_url = None + else: + session.transport.push_url = callback_url + + data = request.content.read() + if data: + counter = RequestCounter() + counter.on_finish.addCallback(self._finish, request, session.transport, session.lock) + proto.dataReceived(data, request_counter=counter) + else: + # Ping message (empty request) of HTTP Polling + self._finish(None, request, session.transport, session.lock) + + + @classmethod + def _finish(cls, _, request, transport, lock): + # First parameter is callback result; not used here + data = transport.fetch_buffer() + request.setHeader('content-length', len(data)) + request.setHeader('content-md5', hashlib.md5(data).hexdigest()) + request.setHeader('x-content-sha256', hashlib.sha256(data).hexdigest()) + request.write(data) + request.finish() + lock.release() diff --git a/stratum/jsonical.py b/stratum/jsonical.py new file mode 100644 index 0000000..3c9cc78 --- /dev/null +++ b/stratum/jsonical.py @@ -0,0 +1,116 @@ +# Copyright 2009 New England Biolabs +# +# This file is part of the nebgbhist package released under the MIT license. +# +r"""Canonical JSON serialization. + +Basic approaches for implementing canonical JSON serialization. 
+ +Encoding basic Python object hierarchies:: + + >>> import jsonical + >>> jsonical.dumps(['foo', {'bar': ('baz', None, 1.0, 2)}]) + '["foo",{"bar":["baz",null,1.0,2]}]' + >>> print jsonical.dumps("\"foo\bar") + "\"foo\bar" + >>> print jsonical.dumps(u'\u1234') + "\u1234" + >>> print jsonical.dumps('\\') + "\\" + >>> print jsonical.dumps({"c": 0, "b": 0, "a": 0}) + {"a":0,"b":0,"c":0} + >>> from StringIO import StringIO + >>> io = StringIO() + >>> json.dump(['streaming API'], io) + >>> io.getvalue() + '["streaming API"]' + +Decoding JSON:: + + >>> import jsonical + >>> jsonical.loads('["foo", {"bar":["baz", null, 1.0, 2]}]') + [u'foo', {u'bar': [u'baz', None, Decimal('1.0'), 2]}] + >>> jsonical.loads('"\\"foo\\bar"') + u'"foo\x08ar' + >>> from StringIO import StringIO + >>> io = StringIO('["streaming API"]') + >>> jsonical.load(io) + [u'streaming API'] + +Using jsonical from the shell to canonicalize: + + $ echo '{"json":"obj","bar":2.333333}' | python -mjsonical + {"bar":2.333333,"json":"obj"} + $ echo '{1.2:3.4}' | python -mjson.tool + Expecting property name: line 1 column 2 (char 2) + +""" +import datetime +import decimal +import sys +import types +import unicodedata + +try: + import json +except ImportError: + import simplejson as json + +class Encoder(json.JSONEncoder): + def __init__(self, *args, **kwargs): + kwargs.pop("sort_keys", None) + super(Encoder, self).__init__(sort_keys=True, *args, **kwargs) + + def default(self, obj): + """This is slightly different than json.JSONEncoder.default(obj) + in that it should returned the serialized representation of the + passed object, not a serializable representation. 
+ """ + if isinstance(obj, (datetime.date, datetime.time, datetime.datetime)): + return '"%s"' % obj.isoformat() + elif isinstance(obj, unicode): + return '"%s"' % unicodedata.normalize('NFD', obj).encode('utf-8') + elif isinstance(obj, decimal.Decimal): + return str(obj) + return super(Encoder, self).default(obj) + + def _iterencode_default(self, o, markers=None): + yield self.default(o) + +def dump(obj, fp, indent=None): + return json.dump(obj, fp, separators=(',', ':'), indent=indent, cls=Encoder) + +def dumps(obj, indent=None): + return json.dumps(obj, separators=(',', ':'), indent=indent, cls=Encoder) + +class Decoder(json.JSONDecoder): + def raw_decode(self, s, **kw): + obj, end = super(Decoder, self).raw_decode(s, **kw) + if isinstance(obj, types.StringTypes): + obj = unicodedata.normalize('NFD', unicode(obj)) + return obj, end + +def load(fp): + return json.load(fp, cls=Decoder, parse_float=decimal.Decimal) + +def loads(s): + return json.loads(s, cls=Decoder, parse_float=decimal.Decimal) + +def tool(): + infile = sys.stdin + outfile = sys.stdout + if len(sys.argv) > 1: + infile = open(sys.argv[1], 'rb') + if len(sys.argv) > 2: + outfile = open(sys.argv[2], 'wb') + if len(sys.argv) > 3: + raise SystemExit("{0} [infile [outfile]]".format(sys.argv[0])) + try: + obj = load(infile) + except ValueError, e: + raise SystemExit(e) + dump(obj, outfile) + outfile.write('\n') + +if __name__ == '__main__': + tool() \ No newline at end of file diff --git a/stratum/logger.py b/stratum/logger.py new file mode 100644 index 0000000..5534ee3 --- /dev/null +++ b/stratum/logger.py @@ -0,0 +1,49 @@ +'''Simple wrapper around python's logging package''' + +import os +import logging +from twisted.python import log as twisted_log + +import settings + +''' +class Logger(object): + def debug(self, msg): + twisted_log.msg(msg) + + def info(self, msg): + twisted_log.msg(msg) + + def warning(self, msg): + twisted_log.msg(msg) + + def error(self, msg): + twisted_log.msg(msg) + + def 
critical(self, msg): + twisted_log.msg(msg) +''' + +def get_logger(name): + logger = logging.getLogger(name) + logger.addHandler(stream_handler) + logger.setLevel(getattr(logging, settings.LOGLEVEL)) + + if settings.LOGFILE != None: + logger.addHandler(file_handler) + + logger.debug("Logging initialized") + return logger + #return Logger() + +if settings.DEBUG: + fmt = logging.Formatter("%(asctime)s %(levelname)s %(name)s %(module)s.%(funcName)s # %(message)s") +else: + fmt = logging.Formatter("%(asctime)s %(levelname)s %(name)s # %(message)s") + +if settings.LOGFILE != None: + file_handler = logging.FileHandler(os.path.join(settings.LOGDIR, settings.LOGFILE)) + file_handler.setFormatter(fmt) + +stream_handler = logging.StreamHandler() +stream_handler.setFormatter(fmt) \ No newline at end of file diff --git a/stratum/protocol.py b/stratum/protocol.py new file mode 100644 index 0000000..4b1ef05 --- /dev/null +++ b/stratum/protocol.py @@ -0,0 +1,324 @@ +import json +import time +import socket + +from twisted.protocols.basic import LineOnlyReceiver +from twisted.internet import defer, reactor, error +from twisted.python.failure import Failure + +import stats +import custom_exceptions +import connection_registry +import settings + +import logger +log = logger.get_logger('protocol') + +class RequestCounter(object): + def __init__(self): + self.on_finish = defer.Deferred() + self.counter = 0 + + def set_count(self, cnt): + self.counter = cnt + + def decrease(self): + self.counter -= 1 + if self.counter <= 0: + self.finish() + + def finish(self): + if not self.on_finish.called: + self.on_finish.callback(True) + +class Protocol(LineOnlyReceiver): + delimiter = '\n' + + def _get_id(self): + self.request_id += 1 + if self.request_id>65534: + self.request_id = 2 + return self.request_id + + def _get_ip(self): + return self.proxied_ip or self.transport.getPeer().host + + def get_ident(self): + # Get global unique ID of connection + return "%s:%s" % (self.proxied_ip or 
self.transport.getPeer().host, "%x" % id(self))
+
+ def get_session(self):
+ return self.session
+
+ def connectionMade(self):
+ try:
+ self.transport.setTcpNoDelay(True)
+ self.transport.setTcpKeepAlive(True)
+ self.transport.socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 120) # Seconds before sending keepalive probes
+ self.transport.socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 1) # Interval in seconds between keepalive probes
+ self.transport.socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 5) # Failed keepalive probes before declaring other end dead
+ except:
+ # Supported only by the socket transport,
+ # but there's really no better place in code to trigger this.
+ pass
+
+ # Read settings.TCP_PROXY_PROTOCOL documentation
+ self.expect_tcp_proxy_protocol_header = self.factory.__dict__.get('tcp_proxy_protocol_enable', False)
+ self.proxied_ip = None # IP obtained from TCP proxy protocol
+
+ self.request_id = 1
+ self.lookup_table = {}
+ self.event_handler = self.factory.event_handler()
+ self.on_disconnect = defer.Deferred()
+ self.on_finish = None # Will point to defer which is called
+ # once all client requests are processed
+
+ # Initiate connection session
+ self.session = {}
+
+ stats.PeerStats.client_connected(self._get_ip())
+ log.debug("Connected %s" % self.transport.getPeer().host)
+ connection_registry.ConnectionRegistry.add_connection(self)
+
+ def transport_write(self, data):
+ '''Overwrite this if transport needs some extra care about data written
+ to the socket, like adding message format in websocket.'''
+ try:
+ self.transport.write(data)
+ except AttributeError:
+ # Transport is disconnected
+ pass
+
+ def connectionLost(self, reason):
+ if self.on_disconnect != None and not self.on_disconnect.called:
+ self.on_disconnect.callback(self)
+ self.on_disconnect = None
+
+ stats.PeerStats.client_disconnected(self._get_ip())
+ connection_registry.ConnectionRegistry.remove_connection(self)
+ self.transport = None # Fixes
memory leak (cyclic reference) + + def writeJsonRequest(self, method, params, worker, is_notification=False): + request_id = None if is_notification else self._get_id() + serialized = json.dumps({'id': request_id, 'method': method, 'params': params, 'jsonrpc':'2.0', 'worker': worker}) + + if self.factory.debug: + log.debug("< %s" % serialized) + + self.transport_write("%s\n" % serialized) + return request_id + + def writeJsonResponse(self, data, message_id): + if not data: + return + serialized = json.dumps({'id': message_id, 'result': data, 'error': None, 'jsonrpc':'2.0'}) + + if self.factory.debug: + log.debug("< %s" % serialized) + + self.transport_write("%s\n" % serialized) + + def writeJsonError(self, code, message, traceback, message_id): + serialized = json.dumps({'id': message_id, 'result': None, 'error': (code, message, traceback)}) + self.transport_write("%s\n" % serialized) + + def writeGeneralError(self, message, code=-1): + log.error(message) + return self.writeJsonError(code, message, None, None) + + def process_response(self, data, message_id, sign_method, sign_params, request_counter): + self.writeJsonResponse(data.result, message_id) + request_counter.decrease() + + + def process_failure(self, failure, message_id, request_counter): + if not isinstance(failure.value, custom_exceptions.ServiceException): + # All handled exceptions should inherit from ServiceException class. + # Throwing other exception class means that it is unhandled error + # and we should log it. + log.exception(failure) + + code = getattr(failure.value, 'code', -1) + + if message_id != None: + # Other party doesn't care of error state for notifications + if settings.DEBUG: + tb = failure.getBriefTraceback() + else: + tb = None + self.writeJsonError(code, failure.getErrorMessage(), tb, message_id) + + request_counter.decrease() + + def dataReceived(self, data, request_counter=None): + '''Original code from Twisted, hacked for request_counter proxying. 
+ request_counter is a hack for the HTTP transport; couldn't find a cleaner way
+ to indicate end of request processing in asynchronous manner.
+
+ TODO: This would deserve some unit test to be sure that future twisted versions
+ will work nicely with this.'''
+
+ if request_counter == None:
+ request_counter = RequestCounter()
+
+ lines = (self._buffer+data).split(self.delimiter)
+ self._buffer = lines.pop(-1)
+ request_counter.set_count(len(lines))
+ self.on_finish = request_counter.on_finish
+
+ for line in lines:
+ if self.transport.disconnecting:
+ request_counter.finish()
+ return
+ if len(line) > self.MAX_LENGTH:
+ request_counter.finish()
+ return self.lineLengthExceeded(line)
+ elif line:
+ try:
+ self.lineReceived(line, request_counter)
+ except Exception as exc:
+ request_counter.finish()
+ #log.exception("Processing of message failed")
+ log.warning("Failed message: %s from %s" % (str(exc), self._get_ip()))
+ return error.ConnectionLost('Processing of message failed')
+
+ if len(self._buffer) > self.MAX_LENGTH:
+ request_counter.finish()
+ return self.lineLengthExceeded(self._buffer)
+
+ def lineReceived(self, line, request_counter):
+ if self.expect_tcp_proxy_protocol_header:
+ # This flag may be set only for TCP transport AND when TCP_PROXY_PROTOCOL
+ # is enabled in server config. Then we expect the first line of the stream
+ # may contain proxy metadata.
+ + # We don't expect this header during this session anymore + self.expect_tcp_proxy_protocol_header = False + + if line.startswith('PROXY'): + self.proxied_ip = line.split()[2] + + # Let's process next line + request_counter.decrease() + return + + try: + message = json.loads(line) + except: + #self.writeGeneralError("Cannot decode message '%s'" % line) + request_counter.finish() + raise custom_exceptions.ProtocolException("Cannot decode message '%s'" % line.strip()) + + if self.factory.debug: + log.debug("> %s" % message) + + msg_id = message.get('id', 0) + #msg_method = message.get('method', None) + #msg_params = message.get('params', None) + msg_result = message.get('result', None) + msg_error = message.get('error', None) + + # If there's an error, handle it as errback + if msg_error != None: + meta['defer'].errback(custom_exceptions.RemoteServiceException(msg_error)) + return + + if not msg_id: + # It's a RPC newWork notification + try: + result = self.event_handler._handle_event("eth_getWork", msg_result, connection_ref=self) + except: + failure = Failure() + self.process_failure(failure, msg_id, request_counter) + else: + # It's notification, don't expect the response + request_counter.decrease() + + elif msg_id: + # It's a RPC response + # Perform lookup to the table of waiting requests. + request_counter.decrease() + + try: + meta = self.lookup_table[msg_id] + if meta['method'] == "eth_submitWork": + response_time = (time.time() - meta['start_time']) * 1000 + if msg_result == True: + log.info("[%dms] %s from '%s' accepted" % (response_time, meta['method'], meta['worker_name'])) + else: + log.warning("[%dms] %s from '%s' REJECTED" % (response_time, meta['method'], meta['worker_name'])) + del self.lookup_table[msg_id] + except KeyError: + # When deferred object for given message ID isn't found, it's an error + raise custom_exceptions.ProtocolException("Lookup for deferred object for message ID '%s' failed." 
% msg_id) + + # If both result and error are null, handle it as a success with blank result + meta['defer'].callback(msg_result) + if not isinstance(msg_result, bool): + try: + result = self.event_handler._handle_event("eth_getWork", msg_result, connection_ref=self) + if result == None: + # event handler must return Deferred object or raise an exception for RPC request + raise custom_exceptions.MethodNotFoundException("Event handler cannot process '%s'" % msg_result) + except: + pass + + #else: + # request_counter.decrease() + # raise custom_exceptions.ProtocolException("Cannot handle message '%s'" % line) + + def rpc(self, method, params, worker, is_notification=False): + ''' + This method performs remote RPC call. + + If method should expect an response, it store + request ID to lookup table and wait for corresponding + response message. + ''' + + request_id = self.writeJsonRequest(method, params, worker, is_notification) + + if is_notification: + return + + d = defer.Deferred() + start_time = time.time() + self.lookup_table[request_id] = {'defer': d, 'method': method, 'params': params, 'start_time':start_time, 'worker_name':worker} + return d + +class ClientProtocol(Protocol): + def connectionMade(self): + Protocol.connectionMade(self) + self.factory.client = self + + if self.factory.timeout_handler: + self.factory.timeout_handler.cancel() + self.factory.timeout_handler = None + + if isinstance(getattr(self.factory, 'after_connect', None), list): + log.debug("Resuming connection: %s" % self.factory.after_connect) + for cmd in self.factory.after_connect: + self.rpc(cmd[0], cmd[1], cmd[2]) + + if not self.factory.on_connect.called: + d = self.factory.on_connect + self.factory.on_connect = defer.Deferred() + d.callback(self.factory) + + + #d = self.rpc('node.get_peers', []) + #d.addCallback(self.factory.add_peers) + + def connectionLost(self, reason): + self.factory.client = None + + if self.factory.timeout_handler: + self.factory.timeout_handler.cancel() + 
self.factory.timeout_handler = None + + if not self.factory.on_disconnect.called: + d = self.factory.on_disconnect + self.factory.on_disconnect = defer.Deferred() + d.callback(self.factory) + + Protocol.connectionLost(self, reason) diff --git a/stratum/pubsub.py b/stratum/pubsub.py new file mode 100644 index 0000000..057b491 --- /dev/null +++ b/stratum/pubsub.py @@ -0,0 +1,180 @@ +import weakref +from connection_registry import ConnectionRegistry +import custom_exceptions +import hashlib + +def subscribe(func): + '''Decorator detect Subscription object in result and subscribe connection''' + def inner(self, *args, **kwargs): + subs = func(self, *args, **kwargs) + return Pubsub.subscribe(self.connection_ref(), subs) + return inner + +def unsubscribe(func): + '''Decorator detect Subscription object in result and unsubscribe connection''' + def inner(self, *args, **kwargs): + subs = func(self, *args, **kwargs) + if isinstance(subs, Subscription): + return Pubsub.unsubscribe(self.connection_ref(), subscription=subs) + else: + return Pubsub.unsubscribe(self.connection_ref(), key=subs) + return inner + +class Subscription(object): + def __init__(self, event=None, **params): + if hasattr(self, 'event'): + if event: + raise Exception("Event name already defined in Subscription object") + else: + if not event: + raise Exception("Please define event name in constructor") + else: + self.event = event + + self.params = params # Internal parameters for subscription object + self.connection_ref = None + + def process(self, *args, **kwargs): + return args + + def get_key(self): + '''This is an identifier for current subscription. 
It is sent to the client, + so result should not contain any sensitive information.''' + #return hashlib.md5(str((self.event, self.params))).hexdigest() + return "%s" % int(hashlib.md5( str((self.event, self.params)) ).hexdigest()[:12], 16) + + def get_session(self): + '''Connection session may be useful in filter or process functions''' + return self.connection_ref().get_session() + + @classmethod + def emit(cls, *args, **kwargs): + '''Shortcut for emiting this event to all subscribers.''' + if not hasattr(cls, 'event'): + raise Exception("Subscription.emit() can be used only for subclasses with filled 'event' class variable.") + return Pubsub.emit(cls.event, *args, **kwargs) + + def emit_single(self, *args, **kwargs): + '''Perform emit of this event just for current subscription.''' + conn = self.connection_ref() + if conn == None: + # Connection is closed + return + + payload = self.process(*args, **kwargs) + if payload != None: + if isinstance(payload, (tuple, list)): + if len(payload)==1 and isinstance(payload[0], dict): + payload = payload[0] + conn.writeJsonRequest(self.event, payload, '', is_notification=True) + self.after_emit(*args, **kwargs) + else: + raise Exception("Return object from process() method must be list or None") + + def after_emit(self, *args, **kwargs): + pass + + # Once function is defined, it will be called every time + #def after_subscribe(self, _): + # pass + + def __eq__(self, other): + return (isinstance(other, Subscription) and other.get_key() == self.get_key()) + + def __ne__(self, other): + return not self.__eq__(other) + +class Pubsub(object): + __subscriptions = {} + + @classmethod + def subscribe(cls, connection, subscription): + if connection == None: + raise custom_exceptions.PubsubException("Subscriber not connected") + + key = subscription.get_key() + session = ConnectionRegistry.get_session(connection) + if session == None: + raise custom_exceptions.PubsubException("No session found") + + subscription.connection_ref = 
weakref.ref(connection) + session.setdefault('subscriptions', {}) + + if key in session['subscriptions']: + raise custom_exceptions.AlreadySubscribedException("This connection is already subscribed for such event.") + + session['subscriptions'][key] = subscription + + cls.__subscriptions.setdefault(subscription.event, weakref.WeakKeyDictionary()) + cls.__subscriptions[subscription.event][subscription] = None + + if hasattr(subscription, 'after_subscribe'): + if connection.on_finish != None: + # If subscription is processed during the request, wait to + # finish and then process the callback + connection.on_finish.addCallback(subscription.after_subscribe) + else: + # If subscription is NOT processed during the request (any real use case?), + # process callback instantly (better now than never). + subscription.after_subscribe(True) + + # List of 2-tuples is prepared for future multi-subscriptions + return ((subscription.event, key, subscription),) + + @classmethod + def unsubscribe(cls, connection, subscription=None, key=None): + if connection == None: + raise custom_exceptions.PubsubException("Subscriber not connected") + + session = ConnectionRegistry.get_session(connection) + if session == None: + raise custom_exceptions.PubsubException("No session found") + + if subscription: + key = subscription.get_key() + + try: + # Subscription don't need to be removed from cls.__subscriptions, + # because it uses weak reference there. 
+ del session['subscriptions'][key] + except KeyError: + print "Warning: Cannot remove subscription from connection session" + return False + + return True + + @classmethod + def get_subscription_count(cls, event): + return len(cls.__subscriptions.get(event, {})) + + @classmethod + def get_subscription(cls, connection, event, key=None): + '''Return subscription object for given connection and event''' + session = ConnectionRegistry.get_session(connection) + if session == None: + raise custom_exceptions.PubsubException("No session found") + + if key == None: + sub = [ sub for sub in session.get('subscriptions', {}).values() if sub.event == event ] + try: + return sub[0] + except IndexError: + raise custom_exceptions.PubsubException("Not subscribed for event %s" % event) + + else: + raise Exception("Searching subscriptions by key is not implemented yet") + + @classmethod + def iterate_subscribers(cls, event): + for subscription in cls.__subscriptions.get(event, weakref.WeakKeyDictionary()).iterkeyrefs(): + subscription = subscription() + if subscription == None: + # Subscriber is no more connected + continue + + yield subscription + + @classmethod + def emit(cls, event, *args, **kwargs): + for subscription in cls.iterate_subscribers(event): + subscription.emit_single(*args, **kwargs) \ No newline at end of file diff --git a/stratum/semaphore.py b/stratum/semaphore.py new file mode 100644 index 0000000..a260311 --- /dev/null +++ b/stratum/semaphore.py @@ -0,0 +1,54 @@ +from twisted.internet import defer + +class Semaphore: + """A semaphore for event driven systems.""" + + def __init__(self, tokens): + self.waiting = [] + self.tokens = tokens + self.limit = tokens + + def is_locked(self): + return (bool)(not self.tokens) + + def acquire(self): + """Attempt to acquire the token. + + @return Deferred which returns on token acquisition. 
+ """ + assert self.tokens >= 0 + d = defer.Deferred() + if not self.tokens: + self.waiting.append(d) + else: + self.tokens = self.tokens - 1 + d.callback(self) + return d + + def release(self): + """Release the token. + + Should be called by whoever did the acquire() when the shared + resource is free. + """ + assert self.tokens < self.limit + self.tokens = self.tokens + 1 + if self.waiting: + # someone is waiting to acquire token + self.tokens = self.tokens - 1 + d = self.waiting.pop(0) + d.callback(self) + + def _releaseAndReturn(self, r): + self.release() + return r + + def run(self, f, *args, **kwargs): + """Acquire token, run function, release token. + + @return Deferred of function result. + """ + d = self.acquire() + d.addCallback(lambda r: defer.maybeDeferred(f, *args, + **kwargs).addBoth(self._releaseAndReturn)) + return d diff --git a/stratum/server.py b/stratum/server.py new file mode 100644 index 0000000..61c9a44 --- /dev/null +++ b/stratum/server.py @@ -0,0 +1,109 @@ +def setup(setup_event=None): + try: + from twisted.internet import epollreactor + epollreactor.install() + except ImportError: + print "Failed to install epoll reactor, default reactor will be used instead." + + try: + import settings + except ImportError: + print "***** Is configs.py missing? Maybe you want to copy and customize config_default.py?" 
+ + from twisted.application import service + application = service.Application("stratum-server") + + # Setting up logging + from twisted.python.log import ILogObserver, FileLogObserver + from twisted.python.logfile import DailyLogFile + + #logfile = DailyLogFile(settings.LOGFILE, settings.LOGDIR) + #application.setComponent(ILogObserver, FileLogObserver(logfile).emit) + + if setup_event == None: + setup_finalize(None, application) + else: + setup_event.addCallback(setup_finalize, application) + + return application + +def setup_finalize(event, application): + + from twisted.application import service, internet + from twisted.internet import reactor, ssl + from twisted.web.server import Site + from twisted.python import log + #from twisted.enterprise import adbapi + import OpenSSL.SSL + + from services import ServiceEventHandler + + import socket_transport + import http_transport + import websocket_transport + + from stratum import settings + + signing_key = None + + # Attach HTTPS Poll Transport service to application + try: + sslContext = ssl.DefaultOpenSSLContextFactory(settings.SSL_PRIVKEY, settings.SSL_CACERT) + except OpenSSL.SSL.Error: + sslContext = None + print "Cannot initiate SSL context, are SSL_PRIVKEY or SSL_CACERT missing?" + print "This will skip all SSL-based transports." 
+ + # Set up thread pool size for service threads + reactor.suggestThreadPoolSize(settings.THREAD_POOL_SIZE) + + if settings.LISTEN_SOCKET_TRANSPORT: + # Attach Socket Transport service to application + socket = internet.TCPServer(settings.LISTEN_SOCKET_TRANSPORT, + socket_transport.SocketTransportFactory(debug=settings.DEBUG, + signing_key=signing_key, + signing_id=settings.SIGNING_ID, + event_handler=ServiceEventHandler, + tcp_proxy_protocol_enable=settings.TCP_PROXY_PROTOCOL)) + socket.setServiceParent(application) + + # Build the HTTP interface + httpsite = Site(http_transport.Root(debug=settings.DEBUG, signing_key=signing_key, signing_id=settings.SIGNING_ID, + event_handler=ServiceEventHandler)) + httpsite.sessionFactory = http_transport.HttpSession + + if settings.LISTEN_HTTP_TRANSPORT: + # Attach HTTP Poll Transport service to application + http = internet.TCPServer(settings.LISTEN_HTTP_TRANSPORT, httpsite) + http.setServiceParent(application) + + if settings.LISTEN_HTTPS_TRANSPORT and sslContext: + https = internet.SSLServer(settings.LISTEN_HTTPS_TRANSPORT, httpsite, contextFactory = sslContext) + https.setServiceParent(application) + + if settings.LISTEN_WS_TRANSPORT: + from autobahn.websocket import listenWS + log.msg("Starting WS transport on %d" % settings.LISTEN_WS_TRANSPORT) + ws = websocket_transport.WebsocketTransportFactory(settings.LISTEN_WS_TRANSPORT, + debug=settings.DEBUG, + signing_key=signing_key, + signing_id=settings.SIGNING_ID, + event_handler=ServiceEventHandler, + tcp_proxy_protocol_enable=settings.TCP_PROXY_PROTOCOL) + listenWS(ws) + + if settings.LISTEN_WSS_TRANSPORT and sslContext: + from autobahn.websocket import listenWS + log.msg("Starting WSS transport on %d" % settings.LISTEN_WSS_TRANSPORT) + wss = websocket_transport.WebsocketTransportFactory(settings.LISTEN_WSS_TRANSPORT, is_secure=True, + debug=settings.DEBUG, + signing_key=signing_key, + signing_id=settings.SIGNING_ID, + event_handler=ServiceEventHandler) + listenWS(wss, 
contextFactory=sslContext) + + + return event + +if __name__ == '__main__': + print "This is not executable script. Try 'twistd -ny launcher.tac instead!" \ No newline at end of file diff --git a/stratum/services.py b/stratum/services.py new file mode 100644 index 0000000..9ad65aa --- /dev/null +++ b/stratum/services.py @@ -0,0 +1,240 @@ +from twisted.internet import defer, threads +from twisted.python import log +import hashlib +import weakref +import re + +import custom_exceptions + +VENDOR_RE = re.compile(r'\[(.*)\]') + +class ServiceEventHandler(object): # reimplements event_handler.GenericEventHandler + def _handle_event(self, msg_method, msg_params, connection_ref): + return ServiceFactory.call(msg_method, msg_params, connection_ref=connection_ref) + +class ResultObject(object): + def __init__(self, result=None, sign=False, sign_algo=None, sign_id=None): + self.result = result + self.sign = sign + self.sign_algo = sign_algo + self.sign_id = sign_id + +def wrap_result_object(obj): + def _wrap(o): + if isinstance(o, ResultObject): + return o + return ResultObject(result=o) + + if isinstance(obj, defer.Deferred): + # We don't have result yet, just wait for it and wrap it later + obj.addCallback(_wrap) + return obj + + return _wrap(obj) + +class ServiceFactory(object): + registry = {} # Mapping service_type -> vendor -> cls + + @classmethod + def _split_method(cls, method): + '''Parses "some.service[vendor].method" string + and returns 3-tuple with (service_type, vendor, rpc_method)''' + + # Splits the service type and method name + (service_type, method_name) = method.rsplit('.', 1) + vendor = None + + if '[' in service_type: + # Use regular expression only when brackets found + try: + vendor = VENDOR_RE.search(service_type).group(1) + service_type = service_type.replace('[%s]' % vendor, '') + except: + raise + #raise custom_exceptions.ServiceNotFoundException("Invalid syntax in service name '%s'" % type_name[0]) + + return (service_type, vendor, method_name) + 
+ @classmethod + def call(cls, method, params, connection_ref=None): + if method in ['submit','login']: + method = 'mining.%s' % method + params = [params,] + try: + (service_type, vendor, func_name) = cls._split_method(method) + except ValueError: + raise custom_exceptions.MethodNotFoundException("Method name parsing failed. You *must* use format ., e.g. 'example.ping'") + + try: + if func_name.startswith('_'): + raise + + _inst = cls.lookup(service_type, vendor=vendor)() + _inst.connection_ref = weakref.ref(connection_ref) + func = _inst.__getattribute__(func_name) + if not callable(func): + raise + except: + raise custom_exceptions.MethodNotFoundException("Method '%s' not found for service '%s'" % (func_name, service_type)) + + def _run(func, *params): + return wrap_result_object(func(*params)) + + # Returns Defer which will lead to ResultObject sometimes + return defer.maybeDeferred(_run, func, *params) + + @classmethod + def lookup(cls, service_type, vendor=None): + # Lookup for service type provided by specific vendor + if vendor: + try: + return cls.registry[service_type][vendor] + except KeyError: + raise custom_exceptions.ServiceNotFoundException("Class for given service type and vendor isn't registered") + + # Lookup for any vendor, prefer default one + try: + vendors = cls.registry[service_type] + except KeyError: + raise custom_exceptions.ServiceNotFoundException("Class for given service type isn't registered") + + last_found = None + for _, _cls in vendors.items(): + last_found = _cls + if last_found.is_default: + return last_found + + if not last_found: + raise custom_exceptions.ServiceNotFoundException("Class for given service type isn't registered") + + return last_found + + @classmethod + def register_service(cls, _cls, meta): + # Register service class to ServiceFactory + service_type = meta.get('service_type') + service_vendor = meta.get('service_vendor') + is_default = meta.get('is_default') + + if str(_cls.__name__) in ('GenericService',): + # 
str() is ugly hack, but it is avoiding circular references + return + + if not service_type: + raise custom_exceptions.MissingServiceTypeException("Service class '%s' is missing 'service_type' property." % _cls) + + if not service_vendor: + raise custom_exceptions.MissingServiceVendorException("Service class '%s' is missing 'service_vendor' property." % _cls) + + if is_default == None: + raise custom_exceptions.MissingServiceIsDefaultException("Service class '%s' is missing 'is_default' property." % _cls) + + if is_default: + # Check if there's not any other default service + + try: + current = cls.lookup(service_type) + if current.is_default: + raise custom_exceptions.DefaultServiceAlreadyExistException("Default service already exists for type '%s'" % service_type) + except custom_exceptions.ServiceNotFoundException: + pass + + setup_func = meta.get('_setup', None) + if setup_func != None: + _cls()._setup() + + ServiceFactory.registry.setdefault(service_type, {}) + ServiceFactory.registry[service_type][service_vendor] = _cls + + log.msg("Registered %s for service '%s', vendor '%s' (default: %s)" % (_cls, service_type, service_vendor, is_default)) + +def synchronous(func): + '''Run given method synchronously in separate thread and return the result.''' + def inner(*args, **kwargs): + return threads.deferToThread(func, *args, **kwargs) + return inner + +def admin(func): + '''Requires an extra first parameter with superadministrator password''' + import settings + def inner(*args, **kwargs): + if not len(args): + raise custom_exceptions.UnauthorizedException("Missing password") + + if settings.ADMIN_RESTRICT_INTERFACE != None: + ip = args[0].connection_ref()._get_ip() + if settings.ADMIN_RESTRICT_INTERFACE != ip: + raise custom_exceptions.UnauthorizedException("RPC call not allowed from your IP") + + if not settings.ADMIN_PASSWORD_SHA256: + raise custom_exceptions.UnauthorizedException("Admin password not set, RPC call disabled") + + (password, args) = (args[1], 
[args[0],] + list(args[2:])) + + if hashlib.sha256(password).hexdigest() != settings.ADMIN_PASSWORD_SHA256: + raise custom_exceptions.UnauthorizedException("Wrong password") + + return func(*args, **kwargs) + return inner + +class ServiceMetaclass(type): + def __init__(cls, name, bases, _dict): + super(ServiceMetaclass, cls).__init__(name, bases, _dict) + ServiceFactory.register_service(cls, _dict) + +class GenericService(object): + __metaclass__ = ServiceMetaclass + service_type = None + service_vendor = None + is_default = None + + # Keep weak reference to connection which asked for current + # RPC call. Useful for pubsub mechanism, but use it with care. + # It does not need to point to actual and valid data, so + # you have to check if connection still exists every time. + connection_ref = None + +class ServiceDiscovery(GenericService): + service_type = 'discovery' + service_vendor = 'Stratum' + is_default = True + + def list_services(self): + return ServiceFactory.registry.keys() + + def list_vendors(self, service_type): + return ServiceFactory.registry[service_type].keys() + + def list_methods(self, service_name): + # Accepts also vendors in square brackets: firstbits[firstbits.com] + + # Parse service type and vendor. We don't care about the method name, + # but _split_method needs full path to some RPC method. 
+ (service_type, vendor, _) = ServiceFactory._split_method("%s.foo" % service_name) + service = ServiceFactory.lookup(service_type, vendor) + out = [] + + for name, obj in service.__dict__.items(): + + if name.startswith('_'): + continue + + if not callable(obj): + continue + + out.append(name) + + return out + + def list_params(self, method): + (service_type, vendor, meth) = ServiceFactory._split_method(method) + service = ServiceFactory.lookup(service_type, vendor) + + # Load params and helper text from method attributes + func = service.__dict__[meth] + params = getattr(func, 'params', None) + help_text = getattr(func, 'help_text', None) + + return (help_text, params) + list_params.help_text = "Accepts name of method and returns its description and available parameters. Example: 'firstbits.resolve'" + list_params.params = [('method', 'string', 'Method to lookup for description and parameters.'),] \ No newline at end of file diff --git a/stratum/settings.py b/stratum/settings.py new file mode 100644 index 0000000..fd7fad8 --- /dev/null +++ b/stratum/settings.py @@ -0,0 +1,74 @@ +import os.path + + +def setup(): + ''' + This will import modules config_default and config and move their variables + into current module (variables in config have higher priority than config_default). + Thanks to this, you can import settings anywhere in the application and you'll get + actual application settings. + + This config is related to server side. You don't need config.py if you + want to use client part only. 
+ ''' + + def read_values_file(cfg): + values = [] + for line in cfg: + if not line.count('#') and line.count('='): + cfgvars = line.split('=') + varname = cfgvars[0].strip() + value = cfgvars[1].strip() + if value in ('True','False'): + value = value == 'True' + elif not value.count('"'): + value = int(value) + else: + value = value.replace('"','') + yield (varname, value) + + def read_values(cfg): + for varname in cfg.__dict__.keys(): + if varname.startswith('__'): + continue + value = getattr(cfg, varname) + yield (varname, value) + + import config_default + + if os.path.isfile('eth-proxy.conf'): + config = open('eth-proxy.conf','r').readlines() + else: + # Custom config not presented, but we can still use defaults + config = None + + import sys + module = sys.modules[__name__] + + for name,value in read_values(config_default): + module.__dict__[name] = value + + changes = {} + if config: + for name,value in read_values_file(config): + if value != module.__dict__.get(name, None): + if name=="DEBUG" and value: + changes["LOGLEVEL"] = "DEBUG" + module.__dict__["LOGLEVEL"] = "DEBUG" + if name=="LOG_TO_FILE" and value: + changes["LOGFILE"] = "proxy.log" + module.__dict__["LOGFILE"] = "proxy.log" + changes[name] = value + module.__dict__[name] = value + + if module.__dict__['DEBUG'] and changes: + print "----------------" + print "Custom settings:" + for k, v in changes.items(): + if 'passw' in k.lower(): + print k, ": ********" + else: + print k, ":", v + print "----------------" + +setup() diff --git a/stratum/socket_transport.py b/stratum/socket_transport.py new file mode 100644 index 0000000..5dc6f40 --- /dev/null +++ b/stratum/socket_transport.py @@ -0,0 +1,158 @@ +from twisted.internet.protocol import ServerFactory +from twisted.internet.protocol import ReconnectingClientFactory +from twisted.internet import reactor, defer, endpoints + +import socksclient +import custom_exceptions +from protocol import Protocol, ClientProtocol +from event_handler import 
GenericEventHandler + +import logger +log = logger.get_logger('socket_transport') + +def sockswrapper(proxy, dest): + endpoint = endpoints.TCP4ClientEndpoint(reactor, dest[0], dest[1]) + return socksclient.SOCKSWrapper(reactor, proxy[0], proxy[1], endpoint) + +class SocketTransportFactory(ServerFactory): + def __init__(self, debug=False, signing_key=None, signing_id=None, event_handler=GenericEventHandler, + tcp_proxy_protocol_enable=False): + self.debug = debug + self.signing_key = signing_key + self.signing_id = signing_id + self.event_handler = event_handler + self.protocol = Protocol + + # Read settings.TCP_PROXY_PROTOCOL documentation + self.tcp_proxy_protocol_enable = tcp_proxy_protocol_enable + +class SocketTransportClientFactory(ReconnectingClientFactory): + def __init__(self, host, port, allow_trusted=True, allow_untrusted=False, + debug=False, signing_key=None, signing_id=None, + is_reconnecting=True, proxy=None, + event_handler=GenericEventHandler): + self.debug = debug + self.is_reconnecting = is_reconnecting + self.signing_key = signing_key + self.signing_id = signing_id + self.client = None # Reference to open connection + self.on_disconnect = defer.Deferred() + self.on_connect = defer.Deferred() + self.peers_trusted = {} + self.peers_untrusted = {} + self.main_host = (host, port) + self.new_host = None + self.proxy = proxy + + self.event_handler = event_handler + self.protocol = ClientProtocol + self.after_connect = [] + + self.connect() + + def connect(self): + if self.proxy: + self.timeout_handler = reactor.callLater(60, self.connection_timeout) + sw = sockswrapper(self.proxy, self.main_host) + sw.connect(self) + else: + self.timeout_handler = reactor.callLater(30, self.connection_timeout) + reactor.connectTCP(self.main_host[0], self.main_host[1], self) + + ''' + This shouldn't be a part of transport layer + def add_peers(self, peers): + # FIXME: Use this list when current connection fails + for peer in peers: + hash = "%s%s%s" % (peer['hostname'], 
peer['ipv4'], peer['ipv6']) + + which = self.peers_trusted if peer['trusted'] else self.peers_untrusted + which[hash] = peer + + #print self.peers_trusted + #print self.peers_untrusted + ''' + + def connection_timeout(self): + self.timeout_handler = None + + if self.client: + return + + e = custom_exceptions.TransportException("SocketTransportClientFactory connection timed out") + if not self.on_connect.called: + d = self.on_connect + self.on_connect = defer.Deferred() + d.errback(e) + + else: + raise e + + def rpc(self, method, params, worker, *args, **kwargs): + if not self.client: + raise custom_exceptions.TransportException("Not connected") + + return self.client.rpc(method, params, worker, *args, **kwargs) + + def subscribe(self, method, params, *args, **kwargs): + ''' + This is like standard RPC call, except that parameters are stored + into after_connect list, so the same command will perform again + on restored connection. + ''' + if not self.client: + raise custom_exceptions.TransportException("Not connected") + + self.after_connect.append((method, params)) + return self.client.rpc(method, params, worker, *args, **kwargs) + + def reconnect(self, host=None, port=None, wait=None): + '''Close current connection and start new one. 
+ If host or port specified, it will be used for new connection.''' + + new = list(self.main_host) + if host: + new[0] = host + if port: + new[1] = port + self.new_host = tuple(new) + + if self.client and self.client.connected: + if wait != None: + self.delay = wait + self.client.transport.connector.disconnect() + + def retry(self, connector=None): + if not self.is_reconnecting: + return + + if connector is None: + if self.connector is None: + raise ValueError("no connector to retry") + else: + connector = self.connector + + if self.new_host: + # Switch to new host if any + connector.host = self.new_host[0] + connector.port = self.new_host[1] + self.main_host = self.new_host + self.new_host = None + + return ReconnectingClientFactory.retry(self, connector) + + def buildProtocol(self, addr): + self.resetDelay() + #if not self.is_reconnecting: raise + return ReconnectingClientFactory.buildProtocol(self, addr) + + def clientConnectionLost(self, connector, reason): + if self.is_reconnecting: + log.debug(reason) + ReconnectingClientFactory.clientConnectionLost(self, connector, reason) + + def clientConnectionFailed(self, connector, reason): + if self.is_reconnecting: + log.debug(reason) + ReconnectingClientFactory.clientConnectionFailed(self, connector, reason) + diff --git a/stratum/socksclient.py b/stratum/socksclient.py new file mode 100644 index 0000000..b82c98c --- /dev/null +++ b/stratum/socksclient.py @@ -0,0 +1,106 @@ +# Copyright (c) 2011-2012, Linus Nordberg +# Taken from https://github.com/ln5/twisted-socks/ + +import socket +import struct +from zope.interface import implements +from twisted.internet import defer +from twisted.internet.interfaces import IStreamClientEndpoint +from twisted.internet.protocol import Protocol, ClientFactory +from twisted.internet.endpoints import _WrappingFactory + +class SOCKSError(Exception): + def __init__(self, val): + self.val = val + def __str__(self): + return repr(self.val) + +class SOCKSv4ClientProtocol(Protocol): + buf 
= '' + + def SOCKSConnect(self, host, port): + # only socksv4a for now + ver = 4 + cmd = 1 # stream connection + user = '\x00' + dnsname = '' + try: + addr = socket.inet_aton(host) + except socket.error: + addr = '\x00\x00\x00\x01' + dnsname = '%s\x00' % host + msg = struct.pack('!BBH', ver, cmd, port) + addr + user + dnsname + self.transport.write(msg) + + def verifySocksReply(self, data): + """ + Return True on success, False on need-more-data. + Raise SOCKSError on request rejected or failed. + """ + if len(data) < 8: + return False + if ord(data[0]) != 0: + self.transport.loseConnection() + raise SOCKSError((1, "bad data")) + status = ord(data[1]) + if status != 0x5a: + self.transport.loseConnection() + raise SOCKSError((status, "request not granted: %d" % status)) + return True + + def isSuccess(self, data): + self.buf += data + return self.verifySocksReply(self.buf) + + def connectionMade(self): + self.SOCKSConnect(self.postHandshakeEndpoint._host, + self.postHandshakeEndpoint._port) + + def dataReceived(self, data): + if self.isSuccess(data): + # Build protocol from provided factory and transfer control to it. 
+ self.transport.protocol = self.postHandshakeFactory.buildProtocol( + self.transport.getHost()) + self.transport.protocol.transport = self.transport + self.transport.protocol.connected = 1 + self.transport.protocol.connectionMade() + self.handshakeDone.callback(self.transport.getPeer()) + +class SOCKSv4ClientFactory(ClientFactory): + protocol = SOCKSv4ClientProtocol + + def buildProtocol(self, addr): + r=ClientFactory.buildProtocol(self, addr) + r.postHandshakeEndpoint = self.postHandshakeEndpoint + r.postHandshakeFactory = self.postHandshakeFactory + r.handshakeDone = self.handshakeDone + return r + +class SOCKSWrapper(object): + implements(IStreamClientEndpoint) + factory = SOCKSv4ClientFactory + + def __init__(self, reactor, host, port, endpoint): + self._host = host + self._port = port + self._reactor = reactor + self._endpoint = endpoint + + def connect(self, protocolFactory): + """ + Return a deferred firing when the SOCKS connection is established. + """ + + try: + # Connect with an intermediate SOCKS factory/protocol, + # which then hands control to the provided protocolFactory + # once a SOCKS connection has been established. 
+ f = self.factory() + f.postHandshakeEndpoint = self._endpoint + f.postHandshakeFactory = protocolFactory + f.handshakeDone = defer.Deferred() + wf = _WrappingFactory(f) + self._reactor.connectTCP(self._host, self._port, wf) + return f.handshakeDone + except: + return defer.fail() \ No newline at end of file diff --git a/stratum/stats.py b/stratum/stats.py new file mode 100644 index 0000000..63b3026 --- /dev/null +++ b/stratum/stats.py @@ -0,0 +1,46 @@ +import time +import logger +log = logger.get_logger('stats') + +class PeerStats(object): + '''Stub for server statistics''' + counter = 0 + changes = 0 + + @classmethod + def client_connected(cls, ip): + cls.counter += 1 + cls.changes += 1 + + cls.print_stats() + + @classmethod + def client_disconnected(cls, ip): + cls.counter -= 1 + cls.changes += 1 + + cls.print_stats() + + @classmethod + def print_stats(cls): + if cls.counter and float(cls.changes) / cls.counter < 0.05: + # Print connection stats only when more than + # 5% connections change to avoid log spam + return + + log.info("%d peers connected, state changed %d times" % (cls.counter, cls.changes)) + cls.changes = 0 + + @classmethod + def get_connected_clients(cls): + return cls.counter + +''' +class CpuStats(object): + start_time = time.time() + + @classmethod + def get_time(cls): + diff = time.time() - cls.start_time + return resource.getrusage(resource.RUSAGE_SELF)[0] / diff +''' \ No newline at end of file diff --git a/stratum/storage.py b/stratum/storage.py new file mode 100644 index 0000000..fd3dddd --- /dev/null +++ b/stratum/storage.py @@ -0,0 +1,17 @@ +#class StorageFactory(object): + +class Storage(object): + #def __new__(self, session_id): + # pass + + def __init__(self): + self.__services = {} + self.session = None + + def get(self, service_type, vendor, default_object): + self.__services.setdefault(service_type, {}) + self.__services[service_type].setdefault(vendor, default_object) + return self.__services[service_type][vendor] + + def 
__repr__(self): + return str(self.__services) \ No newline at end of file diff --git a/stratum/version.py b/stratum/version.py new file mode 100644 index 0000000..e2f81b8 --- /dev/null +++ b/stratum/version.py @@ -0,0 +1 @@ +VERSION='0.2.13' diff --git a/stratum/websocket_transport.py b/stratum/websocket_transport.py new file mode 100644 index 0000000..2f26e2f --- /dev/null +++ b/stratum/websocket_transport.py @@ -0,0 +1,37 @@ +from autobahn.websocket import WebSocketServerProtocol, WebSocketServerFactory +from protocol import Protocol +from event_handler import GenericEventHandler + +class WebsocketServerProtocol(WebSocketServerProtocol, Protocol): + def connectionMade(self): + WebSocketServerProtocol.connectionMade(self) + Protocol.connectionMade(self) + + def connectionLost(self, reason): + WebSocketServerProtocol.connectionLost(self, reason) + Protocol.connectionLost(self, reason) + + def onMessage(self, msg, is_binary): + Protocol.dataReceived(self, msg) + + def transport_write(self, data): + self.sendMessage(data, False) + +class WebsocketTransportFactory(WebSocketServerFactory): + def __init__(self, port, is_secure=False, debug=False, signing_key=None, signing_id=None, + event_handler=GenericEventHandler): + self.debug = debug + self.signing_key = signing_key + self.signing_id = signing_id + self.protocol = WebsocketServerProtocol + self.event_handler = event_handler + + if is_secure: + uri = "wss://0.0.0.0:%d" % port + else: + uri = "ws://0.0.0.0:%d" % port + + WebSocketServerFactory.__init__(self, uri) + +# P.S. There's not Websocket client implementation yet +# P.P.S. And it probably won't be for long time...' \ No newline at end of file