Skip to content

Commit

Permalink
fs_storage: support SSH private keys authentication
Browse files Browse the repository at this point in the history
SSH connections can now be done with private keys by setting the `pkey`
+ `passphrase` options.
Coupled with the `eval_options_from_env` this allows to set these ones
from the environment, e.g:

`{"host": "sftp.example.net", "username": "sftp", "pkey": "$SSH_KEY", "passphrase": "$SSH_PASSPHRASE", "port": 22}`
  • Loading branch information
sebalix committed Jan 29, 2024
1 parent 96a648b commit c0b3315
Showing 1 changed file with 53 additions and 0 deletions.
53 changes: 53 additions & 0 deletions fs_storage/models/fs_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import base64
import functools
import inspect
import io
import json
import logging
import os.path
Expand All @@ -21,6 +22,19 @@

_logger = logging.getLogger(__name__)

try:
import paramiko

SSH_PKEYS = {

Check warning on line 28 in fs_storage/models/fs_storage.py

View check run for this annotation

Codecov / codecov/patch

fs_storage/models/fs_storage.py#L28

Added line #L28 was not covered by tests
"DSS": paramiko.DSSKey,
"RSA": paramiko.RSAKey,
"ECDSA": paramiko.ECDSAKey,
"OPENSSH": paramiko.Ed25519Key,
}
except ImportError: # pragma: no cover
_logger.debug("Cannot `import paramiko`.")
SSH_PKEYS = {}


# TODO: useful for the whole OCA?
def deprecated(reason):
Expand Down Expand Up @@ -358,13 +372,52 @@ def _get_filesystem(self) -> fsspec.AbstractFileSystem:
and isinstance(options["auth"], list)
):
options["auth"] = tuple(options["auth"])
if (
self.protocol in ("sftp", "ssh")
and "pkey" in options
and isinstance(options["pkey"], str)
):
# Handle SSH private keys by replacing 'pkey' parameter by a
# paramiko.pkey.PKey object
pkey_file = io.StringIO(options["pkey"])
pkey = self._get_ssh_private_key(

Check warning on line 383 in fs_storage/models/fs_storage.py

View check run for this annotation

Codecov / codecov/patch

fs_storage/models/fs_storage.py#L382-L383

Added lines #L382 - L383 were not covered by tests
pkey_file, passphrase=options.get("passphrase")
)
options["pkey"] = pkey

Check warning on line 386 in fs_storage/models/fs_storage.py

View check run for this annotation

Codecov / codecov/patch

fs_storage/models/fs_storage.py#L386

Added line #L386 was not covered by tests
options = self._recursive_add_odoo_storage_path(options)
fs = fsspec.filesystem(self.protocol, **options)
directory_path = self.directory_path
if directory_path:
fs = fsspec.filesystem("rooted_dir", path=directory_path, fs=fs)
return fs

def _detect_ssh_private_key_type(self, pkey_file):
"""Detect SSH private key type (RSA, DSS...)."""
# Code copied and adapted from 'paramiko.pkey.PKey._read_private_key' method
# https://github.com/paramiko/paramiko/blob/main/paramiko/pkey.py#L498C9-L498C26
pkey_file.seek(0)
lines = pkey_file.readlines()
pkey_file.seek(0)

Check warning on line 400 in fs_storage/models/fs_storage.py

View check run for this annotation

Codecov / codecov/patch

fs_storage/models/fs_storage.py#L398-L400

Added lines #L398 - L400 were not covered by tests
if not lines:
raise paramiko.SSHException("no lines in private key file")
start = 0
m = paramiko.pkey.PKey.BEGIN_TAG.match(lines[start])
line_range = len(lines) - 1

Check warning on line 405 in fs_storage/models/fs_storage.py

View check run for this annotation

Codecov / codecov/patch

fs_storage/models/fs_storage.py#L402-L405

Added lines #L402 - L405 were not covered by tests
while start < line_range and not m:
start += 1
m = paramiko.pkey.PKey.BEGIN_TAG.match(lines[start])
start += 1
keytype = m.group(1) if m else None

Check warning on line 410 in fs_storage/models/fs_storage.py

View check run for this annotation

Codecov / codecov/patch

fs_storage/models/fs_storage.py#L407-L410

Added lines #L407 - L410 were not covered by tests
if keytype:
return keytype

Check warning on line 412 in fs_storage/models/fs_storage.py

View check run for this annotation

Codecov / codecov/patch

fs_storage/models/fs_storage.py#L412

Added line #L412 was not covered by tests

def _get_ssh_private_key(self, pkey_file, passphrase=None):
"""Build the expected `paramiko.pkey.PKey` object."""
keytype = self._detect_ssh_private_key_type(pkey_file)

Check warning on line 416 in fs_storage/models/fs_storage.py

View check run for this annotation

Codecov / codecov/patch

fs_storage/models/fs_storage.py#L416

Added line #L416 was not covered by tests
if not keytype:
raise paramiko.SSHException("not a valid private key file")
return SSH_PKEYS[keytype].from_private_key(pkey_file, password=passphrase)

Check warning on line 419 in fs_storage/models/fs_storage.py

View check run for this annotation

Codecov / codecov/patch

fs_storage/models/fs_storage.py#L418-L419

Added lines #L418 - L419 were not covered by tests

# Deprecated methods used to ease the migration from the storage_backend addons
# to the fs_storage addons. These methods will be removed in the future (Odoo 18)
@deprecated("Please use _get_filesystem() instead and the fsspec API directly.")
Expand Down

0 comments on commit c0b3315

Please sign in to comment.