From 32b080f769eb1ce761f749fd6362420f8cb9468b Mon Sep 17 00:00:00 2001 From: James Wettenhall Date: Wed, 22 Jul 2015 14:58:19 +1000 Subject: [PATCH] Now using a pool for SSH ControlMaster processes, instead of associating them with Upload or Verification models. --- mydata/utils/openssh.py | 298 ++++++++++++++++++++++------------------ 1 file changed, 163 insertions(+), 135 deletions(-) diff --git a/mydata/utils/openssh.py b/mydata/utils/openssh.py index 54625b0..4d8e8bb 100644 --- a/mydata/utils/openssh.py +++ b/mydata/utils/openssh.py @@ -24,6 +24,8 @@ from datetime import datetime import errno import getpass +import threading +import time from mydata.logs import logger from mydata.utils.exceptions import SshException @@ -31,7 +33,8 @@ from mydata.utils.exceptions import StagingHostRefusedSshConnection from mydata.utils.exceptions import StagingHostSshPermissionDenied from mydata.utils.exceptions import PrivateKeyDoesNotExist -from mydata.models.upload import HumanReadableSizeString +from mydata.utils import PidIsRunning +from mydata.utils import HumanReadableSizeString defaultStartupInfo = None @@ -81,7 +84,6 @@ def __init__(self): self.sshKeyGen = f("bin", "ssh-keygen.exe") self.cipher = "arcfour" self.sh = f("bin", "sh.exe") - self.pwd = f("bin", "pwd.exe") self.dd = f("bin", "dd.exe") self.preferToUseShellInSubprocess = False @@ -118,6 +120,27 @@ def __init__(self): # subprocess to quote the command lists correctly. self.preferToUseShellInSubprocess = True + def GetSshControlMasterPool(self, username=None, privateKeyFilePath=None, + hostname=None, createIfMissing=True): + """ + -oControlMaster is only available in POSIX implementations of ssh. + """ + if sys.platform.startswith("win"): + raise NotImplementedError("-oControlMaster is not implemented " + "in MinGW or Cygwin builds of OpenSSH.") + if not hasattr(self, "sshControlMasterPool"): + if createIfMissing: + if not hasattr(self, "createSshControlMasterPoolThreadingLock"): + self.createSshControlMasterPoolThreadingLock = threading.Lock() + self.createSshControlMasterPoolThreadingLock.acquire() + self.sshControlMasterPool = \ + SshControlMasterPool(username, privateKeyFilePath, + hostname) + self.createSshControlMasterPoolThreadingLock.release() + else: + return None + return self.sshControlMasterPool + class KeyPair(): @@ -350,120 +373,6 @@ def NewKeyPair(keyName=None, raise SshException(stdout) -def GetSshMasterProcessAndControlPath(uploadOrVerificationModel, username, - privateKeyFilePath, hostname): - """ - Unfortunately re-using an SSH connection with -oControlPath=... - only works on POSIX systems, not on Windows. - - To try to achieve a similar effect on Windows, we have the following - options: - - 1. Use an SSH agent (ssh-agent.exe and ssh-add.exe are already bundled). - Subsequent remote commands over SSH channels would still have some - connection overhead (unlike the -oControlPath method), but reading the - key from the agent should be faster than reading it from disk every time. - - 2. We can use a trick to slightly speed up SSH connection time: - mkdir openssh-msys...\home\\.ssh\ - where can be determined from getpass.getuser() - the Msys build of ssh looks here for known_hosts and repeatedly complains - if this directory doesn't exist, so we can easily create it to keep Msys's - SSH happy and hopefully speed up connections slightly. - - 3. Use bigger chunk sizes on Windows to ensure that we create new SSH - connections less often. Maybe return to having two different upload - methods - one for large files and one for small files. - - 4. Re-try the piping method (at least for small files). Basically, - we repeatedly write chunks to a subprocess.stdin PIPE (one for each - upload thread) and keep the "ssh staging_host cat >>" process open. - This seemed to work OK on Windows previously, but it failed so dismally - on Mac OS X (incomplete file transfers probably due to buffering), that - I removed it (but we can resurrect it). - - Whilst it will look messy to do different things for different OS's, - the path of least resistance might be just to resurrect what was working - on Windows before we did the Mac testing, and be careful to check which - OS we're on before deciding which upload method (and submethod) to use. - - So I think I'm voting in favour of 4 (piping for small files on Windows - only) and 3 (the part about returning to different upload methods for - different file sizes, at least on Windows). 2. is easy, so we can - definitely do that, and might get some benefit, but - """ - - if sys.platform.startswith("win"): - raise NotImplementedError("SSH connection caching is not implemented " - "in MinGW or Cygwin builds of OpenSSH.") - if uploadOrVerificationModel.GetSshMasterProcess(): - sshMasterProcess = uploadOrVerificationModel.GetSshMasterProcess() - sshControlPath = uploadOrVerificationModel.GetSshControlPath() - else: - tempFile = tempfile.NamedTemporaryFile(delete=True) - tempFile.close() - if sys.platform.startswith("win"): - sshControlPath = GetMsysPath(tempFile.name) - else: - sshControlPath = tempFile.name - uploadOrVerificationModel.SetSshControlPath(sshControlPath) - sshMasterProcessCommandString = \ - "%s -N -i %s -c %s " \ - "-oControlMaster=yes -oControlPath=%s " \ - "-oIdentitiesOnly=yes -oPasswordAuthentication=no " \ - "-oStrictHostKeyChecking=no " \ - "%s@%s" \ - % (openSSH.DoubleQuote(openSSH.ssh), privateKeyFilePath, - openSSH.cipher, - openSSH.DoubleQuote(sshControlPath), - username, hostname) - logger.debug(sshMasterProcessCommandString) - proc = subprocess.Popen( - sshMasterProcessCommandString, - shell=openSSH.preferToUseShellInSubprocess, - startupinfo=defaultStartupInfo, - creationflags=defaultCreationFlags) - - class SshMasterProcess(): - def __init__(self, proc, openSSH, sshControlPath, - username, hostname, - defaultStartupInfo, defaultCreationFlags): - self.proc = proc - self.openSSH = openSSH - self.sshControlPath = sshControlPath - self.username = username - self.hostname = hostname - self.defaultStartupInfo = defaultStartupInfo - self.defaultCreationFlags = defaultCreationFlags - self.pid = proc.pid - - def terminate(self): - logger.debug("Terminating SSH ControlMaster subprocess...") - exitSshMasterProcessCommandString = \ - "%s -oControlPath=%s -O exit " \ - "%s@%s" \ - % (self.openSSH.DoubleQuote(self.openSSH.ssh), - self.openSSH.DoubleQuote(self.sshControlPath), - self.username, self.hostname) - logger.debug(exitSshMasterProcessCommandString) - proc = subprocess.Popen( - exitSshMasterProcessCommandString, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - shell=self.openSSH.preferToUseShellInSubprocess, - startupinfo=self.defaultStartupInfo, - creationflags=self.defaultCreationFlags) - proc.communicate() - logger.debug("Terminated SSH ControlMaster subprocess.") - - sshMasterProcess = SshMasterProcess(proc, openSSH, sshControlPath, - username, hostname, - defaultStartupInfo, defaultCreationFlags) - uploadOrVerificationModel.SetSshMasterProcess(sshMasterProcess) - - return (sshMasterProcess, sshControlPath) - - def GetBytesUploadedToStaging(remoteFilePath, username, privateKeyFilePath, hostname, uploadOrVerificationModel): if sys.platform.startswith("win"): @@ -482,10 +391,12 @@ def GetBytesUploadedToStaging(remoteFilePath, username, privateKeyFilePath, hostname, openSSH.DoubleQuote("wc -c %s" % quotedRemoteFilePath)] else: - sshMasterProcess, sshControlPath = \ - GetSshMasterProcessAndControlPath(uploadOrVerificationModel, - username, - privateKeyFilePath, hostname) + sshControlMasterPool = \ + openSSH.GetSshControlMasterPool(username, privateKeyFilePath, + hostname) + sshControlMasterProcess = \ + sshControlMasterPool.GetSshControlMasterProcess() + sshControlPath = sshControlMasterProcess.GetControlPath() # The authentication options below (-i privateKeyFilePath etc.) # shouldn't be necessary if the socket created by the SSH master @@ -616,10 +527,6 @@ def UploadFile(filePath, fileSize, username, privateKeyFilePath, if foldersController.IsShuttingDown() or uploadModel.Canceled(): logger.debug("UploadFile 1: Aborting upload for " "%s" % filePath) - if sys.platform.startswith("darwin"): - sshMasterProcess = uploadModel.GetSshMasterProcess() - if sshMasterProcess: - sshMasterProcess.terminate() return if bytesUploaded == fileSize: logger.debug("UploadFile returning because file \"%s\" has already " @@ -677,9 +584,12 @@ def UploadFileFromPosixSystem(filePath, fileSize, username, privateKeyFilePath, # logger.warning("Assuming that the remote shell is Bash.") - sshMasterProcess, sshControlPath = \ - GetSshMasterProcessAndControlPath(uploadModel, username, - privateKeyFilePath, hostname) + sshControlMasterPool = \ + openSSH.GetSshControlMasterPool(username, privateKeyFilePath, + hostname) + sshControlMasterProcess = \ + sshControlMasterPool.GetSshControlMasterProcess() + sshControlPath = sshControlMasterProcess.GetControlPath() remoteDir = os.path.dirname(remoteFilePath) quotedRemoteDir = openSSH.DoubleQuote(remoteDir) @@ -758,7 +668,6 @@ def UploadFileFromPosixSystem(filePath, fileSize, username, privateKeyFilePath, if foldersController.IsShuttingDown() or uploadModel.Canceled(): logger.debug("UploadFileFromPosixSystem 1: Aborting upload for " "%s" % filePath) - sshMasterProcess.terminate() return # Write chunk to temporary file: @@ -870,7 +779,6 @@ def UploadFileFromPosixSystem(filePath, fileSize, username, privateKeyFilePath, if foldersController.IsShuttingDown() or uploadModel.Canceled(): logger.debug("UploadFileFromPosixSystem 2: Aborting upload for " "%s" % filePath) - sshMasterProcess.terminate() return remoteRemoveChunkCommand = \ @@ -896,11 +804,8 @@ def UploadFileFromPosixSystem(filePath, fileSize, username, privateKeyFilePath, creationflags=defaultCreationFlags) stdout, _ = removeRemoteChunkProcess.communicate() if removeRemoteChunkProcess.returncode != 0: - sshMasterProcess.terminate() raise SshException(stdout, removeRemoteChunkProcess.returncode) - sshMasterProcess.terminate() - def UploadSmallFileFromWindows(filePath, fileSize, username, privateKeyFilePath, hostname, remoteFilePath, @@ -1221,10 +1126,133 @@ def GetMsysPath(path): raise Exception("OpenSSH.GetMsysPath: %s doesn't look like " "a valid path." % path) +# Singleton instance of OpenSSH class: openSSH = OpenSSH() ssh = openSSH.ssh scp = openSSH.scp sshKeyGen = openSSH.sshKeyGen -if sys.platform.startswith("win"): - sh = openSSH.sh - pwd = openSSH.pwd + + +class SshControlMasterProcess(): + """ + See "ControlMaster" in "man ssh_config" + Only available on POSIX systems. + """ + def __init__(self, username, privateKeyFilePath, hostname): + self.username = username + self.privateKeyFilePath = privateKeyFilePath + self.hostname = hostname + + tempFile = tempfile.NamedTemporaryFile(delete=True) + tempFile.close() + if sys.platform.startswith("win"): + self.sshControlPath = GetMsysPath(tempFile.name) + else: + self.sshControlPath = tempFile.name + sshControlMasterProcessCommandString = \ + "%s -N -i %s -c %s " \ + "-oControlMaster=yes -oControlPath=%s " \ + "-oIdentitiesOnly=yes -oPasswordAuthentication=no " \ + "-oStrictHostKeyChecking=no " \ + "%s@%s" \ + % (openSSH.DoubleQuote(openSSH.ssh), privateKeyFilePath, + openSSH.cipher, + openSSH.DoubleQuote(self.sshControlPath), + username, hostname) + logger.debug(sshControlMasterProcessCommandString) + self.proc = subprocess.Popen( + sshControlMasterProcessCommandString, + shell=openSSH.preferToUseShellInSubprocess, + startupinfo=defaultStartupInfo, + creationflags=defaultCreationFlags) + self.pid = self.proc.pid + + def Check(self): + checkSshControlMasterProcessCommandString = \ + "%s -oControlPath=%s -O check " \ + "%s@%s" \ + % (openSSH.DoubleQuote(openSSH.ssh), + openSSH.DoubleQuote(self.sshControlPath), + self.username, self.hostname) + logger.debug(checkSshControlMasterProcessCommandString) + proc = subprocess.Popen( + checkSshControlMasterProcessCommandString, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + shell=openSSH.preferToUseShellInSubprocess, + startupinfo=defaultStartupInfo, + creationflags=defaultCreationFlags) + proc.communicate() + return (proc.returncode == 0) + + def Exit(self): + exitSshControlMasterProcessCommandString = \ + "%s -oControlPath=%s -O exit " \ + "%s@%s" \ + % (openSSH.DoubleQuote(openSSH.ssh), + openSSH.DoubleQuote(self.sshControlPath), + self.username, self.hostname) + logger.debug(exitSshControlMasterProcessCommandString) + proc = subprocess.Popen( + exitSshControlMasterProcessCommandString, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + shell=openSSH.preferToUseShellInSubprocess, + startupinfo=defaultStartupInfo, + creationflags=defaultCreationFlags) + proc.communicate() + + def GetControlPath(self): + return self.sshControlPath + + def GetPid(self): + return self.pid + + +class SshControlMasterPool(): + """ + Re-using an SSH connection with -oControlPath=... + only works on POSIX systems, not on Windows. + + To avoid having too many frequent SSH connections on Windows, we can + use larger chunk sizes (see UploadLargeFileFromWindows). + """ + + def __init__(self, username, privateKeyFilePath, hostname): + if sys.platform.startswith("win"): + raise NotImplementedError("-oControlMaster is not implemented " + "in MinGW or Cygwin builds of OpenSSH.") + self.username = username + self.privateKeyFilePath = privateKeyFilePath + self.hostname = hostname + # self.maxConnections should be less than + # MaxSessions in staging server's sshd_config + self.maxConnections = 5 + self.sshControlMasterProcesses = [] + self.timeout = 1 + + def GetSshControlMasterProcess(self): + for sshControlMasterProcess in self.sshControlMasterProcesses: + if sshControlMasterProcess.Check(): + return sshControlMasterProcess + if len(self.sshControlMasterProcesses) < self.maxConnections: + newSshControlMasterProcess = \ + SshControlMasterProcess(self.username, self.privateKeyFilePath, + self.hostname) + self.sshControlMasterProcesses.append(newSshControlMasterProcess) + return newSshControlMasterProcess + else: + wait = 0 + while wait < self.timeout: + time.sleep(0.1) + wait += 0.1 + for sshControlMasterProcess in self.sshControlMasterProcesses: + if sshControlMasterProcess.Check(): + return sshControlMasterProcess + raise Exception("Exceeded max connections in SshControlMasterPool") + + def ShutDown(self): + for sshControlMasterProcess in self.sshControlMasterProcesses: + if PidIsRunning(sshControlMasterProcess.GetPid()): + sshControlMasterProcess.Exit() + self.sshControlMasterProcesses = []