Skip to content

Commit

Permalink
Now using a pool for SSH ControlMaster processes, instead of associat…
Browse files Browse the repository at this point in the history
…ing them

with Upload or Verification models.
  • Loading branch information
wettenhj committed Jul 22, 2015
1 parent 9977c5f commit 32b080f
Showing 1 changed file with 163 additions and 135 deletions.
298 changes: 163 additions & 135 deletions mydata/utils/openssh.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,17 @@
from datetime import datetime
import errno
import getpass
import threading
import time

from mydata.logs import logger
from mydata.utils.exceptions import SshException
from mydata.utils.exceptions import ScpException
from mydata.utils.exceptions import StagingHostRefusedSshConnection
from mydata.utils.exceptions import StagingHostSshPermissionDenied
from mydata.utils.exceptions import PrivateKeyDoesNotExist
from mydata.models.upload import HumanReadableSizeString
from mydata.utils import PidIsRunning
from mydata.utils import HumanReadableSizeString


defaultStartupInfo = None
Expand Down Expand Up @@ -81,7 +84,6 @@ def __init__(self):
self.sshKeyGen = f("bin", "ssh-keygen.exe")
self.cipher = "arcfour"
self.sh = f("bin", "sh.exe")
self.pwd = f("bin", "pwd.exe")
self.dd = f("bin", "dd.exe")
self.preferToUseShellInSubprocess = False

Expand Down Expand Up @@ -118,6 +120,27 @@ def __init__(self):
# subprocess to quote the command lists correctly.
self.preferToUseShellInSubprocess = True

def GetSshControlMasterPool(self, username=None, privateKeyFilePath=None,
                            hostname=None, createIfMissing=True):
    """
    Return this object's SshControlMasterPool, creating it on first use.

    -oControlMaster is only available in POSIX implementations of ssh.

    :param username: passed to SshControlMasterPool when the pool has to
                     be created; ignored if a pool already exists.
    :param privateKeyFilePath: as for username.
    :param hostname: as for username.
    :param createIfMissing: when False and no pool exists yet, return
                            None instead of creating one.
    :raises NotImplementedError: on Windows.
    """
    if sys.platform.startswith("win"):
        raise NotImplementedError("-oControlMaster is not implemented "
                                  "in MinGW or Cygwin builds of OpenSSH.")
    if hasattr(self, "sshControlMasterPool"):
        return self.sshControlMasterPool
    if not createIfMissing:
        return None
    # setdefault() either stores our new lock or hands back the one another
    # thread stored first, so every caller ends up sharing a single lock
    # (a separate hasattr-then-assign could give each thread its own).
    # NOTE(review): relies on dict.setdefault being a single atomic
    # operation in CPython - confirm for other interpreters.
    creationLock = self.__dict__.setdefault(
        "createSshControlMasterPoolThreadingLock", threading.Lock())
    # "with" releases the lock even if the pool's constructor raises.
    with creationLock:
        # Re-check now that we hold the lock: another thread may have
        # created the pool while we were waiting.
        if not hasattr(self, "sshControlMasterPool"):
            self.sshControlMasterPool = \
                SshControlMasterPool(username, privateKeyFilePath,
                                     hostname)
    return self.sshControlMasterPool


class KeyPair():

Expand Down Expand Up @@ -350,120 +373,6 @@ def NewKeyPair(keyName=None,
raise SshException(stdout)


def GetSshMasterProcessAndControlPath(uploadOrVerificationModel, username,
                                      privateKeyFilePath, hostname):
    r"""
    Return a (sshMasterProcess, sshControlPath) tuple for the given model.

    If the model already has an SSH master process, that process and the
    model's stored control path are returned.  Otherwise a new
    "ssh -N -oControlMaster=yes" subprocess is started, recorded on the
    model (SetSshControlPath / SetSshMasterProcess) and returned.

    Raises NotImplementedError on Windows.

    Unfortunately re-using an SSH connection with -oControlPath=...
    only works on POSIX systems, not on Windows.
    To try to achieve a similar effect on Windows, we have the following
    options:

    1. Use an SSH agent (ssh-agent.exe and ssh-add.exe are already
       bundled).  Subsequent remote commands over SSH channels would still
       have some connection overhead (unlike the -oControlPath method), but
       reading the key from the agent should be faster than reading it from
       disk every time.
    2. We can use a trick to slightly speed up SSH connection time:
           mkdir openssh-msys...\home\<username>\.ssh
       where <username> can be determined from getpass.getuser().
       The Msys build of ssh looks here for known_hosts and repeatedly
       complains if this directory doesn't exist, so we can easily create
       it to keep Msys's SSH happy and hopefully speed up connections
       slightly.
    3. Use bigger chunk sizes on Windows to ensure that we create new SSH
       connections less often.  Maybe return to having two different
       upload methods - one for large files and one for small files.
    4. Re-try the piping method (at least for small files).  Basically,
       we repeatedly write chunks to a subprocess.stdin PIPE (one for each
       upload thread) and keep the "ssh staging_host cat >>" process open.
       This seemed to work OK on Windows previously, but it failed so
       dismally on Mac OS X (incomplete file transfers probably due to
       buffering), that I removed it (but we can resurrect it).

    Whilst it will look messy to do different things for different OS's,
    the path of least resistance might be just to resurrect what was
    working on Windows before we did the Mac testing, and be careful to
    check which OS we're on before deciding which upload method (and
    submethod) to use.
    So I think I'm voting in favour of 4 (piping for small files on Windows
    only) and 3 (the part about returning to different upload methods for
    different file sizes, at least on Windows).  2. is easy, so we can
    definitely do that, and might get some benefit.
    """

    if sys.platform.startswith("win"):
        raise NotImplementedError("SSH connection caching is not implemented "
                                  "in MinGW or Cygwin builds of OpenSSH.")

    # Re-use the master process already recorded on the model, if any.
    existingMasterProcess = uploadOrVerificationModel.GetSshMasterProcess()
    if existingMasterProcess:
        return (existingMasterProcess,
                uploadOrVerificationModel.GetSshControlPath())

    # The temporary file is closed (and, with delete=True, removed) straight
    # away; only its unique name is kept, to be used as the control path.
    # No Windows path conversion is needed here: Windows was rejected above.
    tempFile = tempfile.NamedTemporaryFile(delete=True)
    tempFile.close()
    sshControlPath = tempFile.name
    uploadOrVerificationModel.SetSshControlPath(sshControlPath)

    sshMasterProcessCommandString = \
        "%s -N -i %s -c %s " \
        "-oControlMaster=yes -oControlPath=%s " \
        "-oIdentitiesOnly=yes -oPasswordAuthentication=no " \
        "-oStrictHostKeyChecking=no " \
        "%s@%s" \
        % (openSSH.DoubleQuote(openSSH.ssh), privateKeyFilePath,
           openSSH.cipher,
           openSSH.DoubleQuote(sshControlPath),
           username, hostname)
    logger.debug(sshMasterProcessCommandString)
    proc = subprocess.Popen(
        sshMasterProcessCommandString,
        shell=openSSH.preferToUseShellInSubprocess,
        startupinfo=defaultStartupInfo,
        creationflags=defaultCreationFlags)

    class SshMasterProcess():
        """
        Holds the master subprocess together with everything needed to
        ask it to exit through its control path.
        """
        def __init__(self, proc, openSSH, sshControlPath,
                     username, hostname,
                     defaultStartupInfo, defaultCreationFlags):
            self.proc = proc
            self.openSSH = openSSH
            self.sshControlPath = sshControlPath
            self.username = username
            self.hostname = hostname
            self.defaultStartupInfo = defaultStartupInfo
            self.defaultCreationFlags = defaultCreationFlags
            self.pid = proc.pid

        def terminate(self):
            """
            Run "ssh -O exit" against the control path and wait for
            that command to finish.
            """
            logger.debug("Terminating SSH ControlMaster subprocess...")
            exitSshMasterProcessCommandString = \
                "%s -oControlPath=%s -O exit " \
                "%s@%s" \
                % (self.openSSH.DoubleQuote(self.openSSH.ssh),
                   self.openSSH.DoubleQuote(self.sshControlPath),
                   self.username, self.hostname)
            logger.debug(exitSshMasterProcessCommandString)
            proc = subprocess.Popen(
                exitSshMasterProcessCommandString,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                shell=self.openSSH.preferToUseShellInSubprocess,
                startupinfo=self.defaultStartupInfo,
                creationflags=self.defaultCreationFlags)
            proc.communicate()
            logger.debug("Terminated SSH ControlMaster subprocess.")

    sshMasterProcess = SshMasterProcess(proc, openSSH, sshControlPath,
                                        username, hostname,
                                        defaultStartupInfo,
                                        defaultCreationFlags)
    uploadOrVerificationModel.SetSshMasterProcess(sshMasterProcess)

    return (sshMasterProcess, sshControlPath)


def GetBytesUploadedToStaging(remoteFilePath, username, privateKeyFilePath,
hostname, uploadOrVerificationModel):
if sys.platform.startswith("win"):
Expand All @@ -482,10 +391,12 @@ def GetBytesUploadedToStaging(remoteFilePath, username, privateKeyFilePath,
hostname,
openSSH.DoubleQuote("wc -c %s" % quotedRemoteFilePath)]
else:
sshMasterProcess, sshControlPath = \
GetSshMasterProcessAndControlPath(uploadOrVerificationModel,
username,
privateKeyFilePath, hostname)
sshControlMasterPool = \
openSSH.GetSshControlMasterPool(username, privateKeyFilePath,
hostname)
sshControlMasterProcess = \
sshControlMasterPool.GetSshControlMasterProcess()
sshControlPath = sshControlMasterProcess.GetControlPath()

# The authentication options below (-i privateKeyFilePath etc.)
# shouldn't be necessary if the socket created by the SSH master
Expand Down Expand Up @@ -616,10 +527,6 @@ def UploadFile(filePath, fileSize, username, privateKeyFilePath,
if foldersController.IsShuttingDown() or uploadModel.Canceled():
logger.debug("UploadFile 1: Aborting upload for "
"%s" % filePath)
if sys.platform.startswith("darwin"):
sshMasterProcess = uploadModel.GetSshMasterProcess()
if sshMasterProcess:
sshMasterProcess.terminate()
return
if bytesUploaded == fileSize:
logger.debug("UploadFile returning because file \"%s\" has already "
Expand Down Expand Up @@ -677,9 +584,12 @@ def UploadFileFromPosixSystem(filePath, fileSize, username, privateKeyFilePath,

# logger.warning("Assuming that the remote shell is Bash.")

sshMasterProcess, sshControlPath = \
GetSshMasterProcessAndControlPath(uploadModel, username,
privateKeyFilePath, hostname)
sshControlMasterPool = \
openSSH.GetSshControlMasterPool(username, privateKeyFilePath,
hostname)
sshControlMasterProcess = \
sshControlMasterPool.GetSshControlMasterProcess()
sshControlPath = sshControlMasterProcess.GetControlPath()

remoteDir = os.path.dirname(remoteFilePath)
quotedRemoteDir = openSSH.DoubleQuote(remoteDir)
Expand Down Expand Up @@ -758,7 +668,6 @@ def UploadFileFromPosixSystem(filePath, fileSize, username, privateKeyFilePath,
if foldersController.IsShuttingDown() or uploadModel.Canceled():
logger.debug("UploadFileFromPosixSystem 1: Aborting upload for "
"%s" % filePath)
sshMasterProcess.terminate()
return

# Write chunk to temporary file:
Expand Down Expand Up @@ -870,7 +779,6 @@ def UploadFileFromPosixSystem(filePath, fileSize, username, privateKeyFilePath,
if foldersController.IsShuttingDown() or uploadModel.Canceled():
logger.debug("UploadFileFromPosixSystem 2: Aborting upload for "
"%s" % filePath)
sshMasterProcess.terminate()
return

remoteRemoveChunkCommand = \
Expand All @@ -896,11 +804,8 @@ def UploadFileFromPosixSystem(filePath, fileSize, username, privateKeyFilePath,
creationflags=defaultCreationFlags)
stdout, _ = removeRemoteChunkProcess.communicate()
if removeRemoteChunkProcess.returncode != 0:
sshMasterProcess.terminate()
raise SshException(stdout, removeRemoteChunkProcess.returncode)

sshMasterProcess.terminate()


def UploadSmallFileFromWindows(filePath, fileSize, username,
privateKeyFilePath, hostname, remoteFilePath,
Expand Down Expand Up @@ -1221,10 +1126,133 @@ def GetMsysPath(path):
raise Exception("OpenSSH.GetMsysPath: %s doesn't look like "
"a valid path." % path)

# Singleton instance of OpenSSH class, created once when this module is
# loaded and referenced by the functions and classes in this module:
openSSH = OpenSSH()
# Module-level shortcuts to attributes of the singleton:
ssh = openSSH.ssh
scp = openSSH.scp
sshKeyGen = openSSH.sshKeyGen
# sh and pwd are only exposed at module level on Windows:
if sys.platform.startswith("win"):
sh = openSSH.sh
pwd = openSSH.pwd


class SshControlMasterProcess():
    """
    Starts an "ssh -N -oControlMaster=yes" subprocess and remembers the
    control path it was started with, so that the master can later be
    checked or asked to exit.

    See "ControlMaster" in "man ssh_config"
    Only available on POSIX systems.
    """
    def __init__(self, username, privateKeyFilePath, hostname):
        self.username = username
        self.privateKeyFilePath = privateKeyFilePath
        self.hostname = hostname

        # Only the unique name is wanted; the temporary file itself is
        # closed (and so removed, because of delete=True) immediately.
        placeholder = tempfile.NamedTemporaryFile(delete=True)
        placeholder.close()
        onWindows = sys.platform.startswith("win")
        self.sshControlPath = \
            GetMsysPath(placeholder.name) if onWindows else placeholder.name

        startTemplate = ("%s -N -i %s -c %s "
                         "-oControlMaster=yes -oControlPath=%s "
                         "-oIdentitiesOnly=yes -oPasswordAuthentication=no "
                         "-oStrictHostKeyChecking=no "
                         "%s@%s")
        startCommand = startTemplate % (
            openSSH.DoubleQuote(openSSH.ssh),
            privateKeyFilePath,
            openSSH.cipher,
            openSSH.DoubleQuote(self.sshControlPath),
            username,
            hostname)
        logger.debug(startCommand)
        self.proc = subprocess.Popen(
            startCommand,
            shell=openSSH.preferToUseShellInSubprocess,
            startupinfo=defaultStartupInfo,
            creationflags=defaultCreationFlags)
        self.pid = self.proc.pid

    def _RunControlCommand(self, controlCommand):
        """
        Run "ssh -O <controlCommand>" against this master's control path,
        wait for it to finish and return its exit code.
        """
        commandString = "%s -oControlPath=%s -O %s %s@%s" % (
            openSSH.DoubleQuote(openSSH.ssh),
            openSSH.DoubleQuote(self.sshControlPath),
            controlCommand,
            self.username,
            self.hostname)
        logger.debug(commandString)
        controlProc = subprocess.Popen(
            commandString,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=openSSH.preferToUseShellInSubprocess,
            startupinfo=defaultStartupInfo,
            creationflags=defaultCreationFlags)
        controlProc.communicate()
        return controlProc.returncode

    def Check(self):
        """
        Return True if "ssh -O check" exits with status 0.
        """
        exitCode = self._RunControlCommand("check")
        return (exitCode == 0)

    def Exit(self):
        """
        Run "ssh -O exit" against the control path; the exit status of
        that command is ignored.
        """
        self._RunControlCommand("exit")

    def GetControlPath(self):
        """
        Return the control path this master was started with.
        """
        return self.sshControlPath

    def GetPid(self):
        """
        Return the pid recorded when the master subprocess was started.
        """
        return self.pid


class SshControlMasterPool():
    """
    Keeps up to maxConnections SshControlMasterProcess instances for one
    username / private key / hostname combination and hands out one that
    passes its Check().

    Re-using an SSH connection with -oControlPath=...
    only works on POSIX systems, not on Windows.
    To avoid having too many frequent SSH connections on Windows, we can
    use larger chunk sizes (see UploadLargeFileFromWindows).
    """

    def __init__(self, username, privateKeyFilePath, hostname):
        """
        Create an empty pool.  Raises NotImplementedError on Windows.
        """
        if sys.platform.startswith("win"):
            raise NotImplementedError("-oControlMaster is not implemented "
                                      "in MinGW or Cygwin builds of OpenSSH.")
        self.username = username
        self.privateKeyFilePath = privateKeyFilePath
        self.hostname = hostname
        # self.maxConnections should be less than
        # MaxSessions in staging server's sshd_config
        self.maxConnections = 5
        self.sshControlMasterProcesses = []
        # Roughly how long (in seconds of sleeping) a full pool is polled
        # for a usable master before GetSshControlMasterProcess gives up.
        self.timeout = 1

    def _DiscardExitedProcesses(self):
        """
        Drop pool entries whose ssh subprocess has already exited, so that
        dead masters stop counting towards maxConnections.
        """
        # Iterate over a copy because entries are removed while looping.
        for candidate in list(self.sshControlMasterProcesses):
            proc = getattr(candidate, "proc", None)
            # poll() returns None while the subprocess is still running.  A
            # failed Check() is deliberately NOT used here: it cannot tell
            # an exited master from one that is still running.
            if proc is not None and proc.poll() is not None:
                try:
                    self.sshControlMasterProcesses.remove(candidate)
                except ValueError:
                    # Another caller already removed this entry.
                    pass

    def _FindUsableProcess(self):
        """
        Return the first pooled master whose Check() succeeds, or None.
        """
        for candidate in self.sshControlMasterProcesses:
            if candidate.Check():
                return candidate
        return None

    def GetSshControlMasterProcess(self):
        """
        Return a master process from the pool, starting a new one if none
        passes Check() and the pool is not full.

        Raises Exception if the pool is full and no master passes Check()
        before the timeout expires.
        """
        self._DiscardExitedProcesses()
        usable = self._FindUsableProcess()
        if usable is not None:
            return usable
        if len(self.sshControlMasterProcesses) < self.maxConnections:
            newSshControlMasterProcess = \
                SshControlMasterProcess(self.username,
                                        self.privateKeyFilePath,
                                        self.hostname)
            self.sshControlMasterProcesses.append(newSshControlMasterProcess)
            return newSshControlMasterProcess
        # Pool is full: poll the existing masters until one passes Check()
        # or the timeout is used up.
        wait = 0
        while wait < self.timeout:
            time.sleep(0.1)
            wait += 0.1
            usable = self._FindUsableProcess()
            if usable is not None:
                return usable
        raise Exception("Exceeded max connections in SshControlMasterPool")

    def ShutDown(self):
        """
        Ask every pooled master whose pid is still running to exit, then
        empty the pool.
        """
        for sshControlMasterProcess in self.sshControlMasterProcesses:
            if PidIsRunning(sshControlMasterProcess.GetPid()):
                sshControlMasterProcess.Exit()
        self.sshControlMasterProcesses = []

0 comments on commit 32b080f

Please sign in to comment.