Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable cloning of private PMR repositories #7

Merged
merged 3 commits into from
Aug 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 111 additions & 1 deletion src/pmr2/wfctrl/cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import sys
from io import BytesIO

import urllib3

if sys.version_info > (3, 0): # pragma: no cover
from configparser import ConfigParser
else: # pragma: no cover
Expand Down Expand Up @@ -216,6 +218,40 @@ def reset_to_remote(self, workspace, branch=None):
return self.execute(*args)


class AuthenticatedGitDvcsCmd(GitDvcsCmd):
name = 'authenticated_git'

def __init__(self, remote=None, cmd_binary=None):
super().__init__(remote=remote, cmd_binary=cmd_binary)

self._auth_header = None

def set_authorization(self, authorization_header):
"""
Sets the authorization header for requests made by this class. The input should specify the type of authorization along with the
authorization token itself (e.g., "Basic {token}").
"""
self._auth_header = authorization_header

def _authenticate(self, workspace):
args = ['config', 'http.extraHeader', f'Authorization: {self._auth_header}']
return self.execute(*self._args(workspace, *args))

def clone(self, workspace, **kw):
args = ['clone', '--config', f'http.extraHeader=Authorization: {self._auth_header}', self.remote, workspace.working_dir]
return self.execute(*args)

def pull(self, workspace, **kw):
self._authenticate(workspace)
target = self.read_remote(workspace)
return self.execute(*self._args(workspace, 'pull', target))

def push(self, workspace, **kw):
self._authenticate(workspace)
target = self.get_remote(workspace)
return self.execute(*self._args(workspace, 'push', target))


class DulwichDvcsCmd(BaseDvcsCmd):
name = 'dulwich'
marker = '.git'
Expand Down Expand Up @@ -311,8 +347,82 @@ def reset_to_remote(self, workspace, branch=None):
return b'', b'', 0


class AuthenticatedDulwichDvcsCmd(DulwichDvcsCmd):
name = 'authenticated_dulwich'

def __init__(self, remote=None):
super().__init__(remote=remote)

self._auth_header = None

def set_authorization(self, authorization_header):
"""
Sets the authorization header for requests made by this class. The input should specify the type of authorization along with the
authorization token itself (e.g., "Basic {token}").
"""
self._auth_header = authorization_header

def _authenticate_pool_manager(self, *args, **kwargs):
pool_manager = urllib3.PoolManager()
pool_manager.headers['Authorization'] = self._auth_header
return pool_manager

def clone(self, workspace, **kw):
pool_manager = self._authenticate_pool_manager()
out_stream = BytesIO()
err_stream = BytesIO()
porcelain.clone(
self.remote,
workspace.working_dir,
pool_manager=pool_manager,
outstream=out_stream,
errstream=err_stream
)
return out_stream.getvalue(), err_stream.getvalue(), 0

def pull(self, workspace, **kw):
pool_manager = self._authenticate_pool_manager()
target = self.read_remote(workspace)
out_stream = BytesIO()
err_stream = BytesIO()
try:
result = 0
porcelain.pull(
workspace.working_dir,
target.encode(),
pool_manager=pool_manager,
outstream=out_stream,
errstream=err_stream
)
except NotGitRepository as e:
result = 1
err_stream.write(b'Not a Git repository ' + target.encode())

return out_stream.getvalue(), err_stream.getvalue(), result

def push(self, workspace, **kw):
pool_manager = self._authenticate_pool_manager()
target = self.read_remote(workspace)
out_stream = BytesIO()
err_stream = BytesIO()
try:
result = 0
porcelain.push(
workspace.working_dir,
target.encode(),
pool_manager=pool_manager,
outstream=out_stream,
errstream=err_stream
)
except NotGitRepository as e:
result = 1
err_stream.write(b'Not a Git repository ' + target.encode())

return out_stream.getvalue(), err_stream.getvalue(), result


def _register():
register_cmd(MercurialDvcsCmd, DulwichDvcsCmd, GitDvcsCmd)
register_cmd(MercurialDvcsCmd, DulwichDvcsCmd, GitDvcsCmd, AuthenticatedDulwichDvcsCmd, AuthenticatedGitDvcsCmd)


register = _register
Expand Down
140 changes: 138 additions & 2 deletions tests/test_cmd.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from unittest import TestCase, skipIf
from unittest import TestCase, skipIf, skip

import os
import sys
Expand All @@ -12,7 +12,8 @@
from StringIO import StringIO

try:
from dulwich import porcelain
from dulwich import porcelain, client

except ImportError:
pass

Expand All @@ -21,6 +22,8 @@
from pmr2.wfctrl.cmd import GitDvcsCmd
from pmr2.wfctrl.cmd import MercurialDvcsCmd
from pmr2.wfctrl.cmd import DulwichDvcsCmd
from pmr2.wfctrl.cmd import AuthenticatedGitDvcsCmd
from pmr2.wfctrl.cmd import AuthenticatedDulwichDvcsCmd

from pmr2.wfctrl.testing.base import CoreTestCase
from pmr2.wfctrl.testing.base import CoreTests
Expand Down Expand Up @@ -344,3 +347,136 @@ def _ls_root(self, workspace=None):

def test_get_cmd_by_name(self):
self.assertEqual(get_cmd_by_name('dulwich'), self.cmdcls)


@skipIf(not AuthenticatedGitDvcsCmd.available(), 'git is not available')
class AuthenticatedGitDvcsCmdTestCase(GitDvcsCmdTestCase):
cmdcls = AuthenticatedGitDvcsCmd

def setUp(self):
super(AuthenticatedGitDvcsCmdTestCase, self).setUp()
self.cmd = AuthenticatedGitDvcsCmd()
self.workspace = CmdWorkspace(self.workspace_dir, self.cmd)

def _log(self, workspace=None):
return AuthenticatedGitDvcsCmd._execute(self.cmd._args(self.workspace, 'log'))

def _ls_root(self, workspace=None):
branch, _, *_ = self.cmd.execute(
*self.cmd._args(self.workspace, 'branch', '--show-current'))
branch = branch.strip()
return AuthenticatedGitDvcsCmd._execute(
self.cmd._args(self.workspace, 'ls-tree', branch))

def _make_remote(self):
target = os.path.join(self.working_dir, 'remote')
AuthenticatedGitDvcsCmd._execute(['init', target, '--bare'])
return target

def test_get_cmd_by_name(self):
self.assertEqual(get_cmd_by_name('authenticated_git'), self.cmdcls)

def test_push_url_with_creds(self):
credentials = 'Basic username:password'
cmd = self.TrapCmd(remote='http://example.com/')
cmd.set_authorization(credentials)
workspace = CmdWorkspace(self.workspace_dir, cmd)
cmd.push(workspace)
auth_header = cmd.execute(*cmd._args(workspace, 'config', '--get', 'http.extraHeader'))
self.assertEqual(f'Authorization: {credentials}', auth_header[0].decode().strip())

def test_pull_url_with_creds(self):
credentials = 'Basic username:password'
cmd = self.TrapCmd(remote='http://example.com/')
cmd.set_authorization(credentials)
workspace = CmdWorkspace(self.workspace_dir, cmd)
cmd.pull(workspace)
auth_header = cmd.execute(*cmd._args(workspace, 'config', '--get', 'http.extraHeader'))
self.assertEqual(f'Authorization: {credentials}', auth_header[0].decode().strip())

def test_clone(self):
credentials = 'Basic username:password'
cmd = self.TrapCmd(remote='http://example.com/')
cmd.set_authorization(credentials)
target = os.path.join(self.working_dir, 'new_target')
workspace = CmdWorkspace(target, cmd)
cmd.init_new(workspace)
result = cmd.clone(workspace)

self.assertTrue(isdir(join(target, self.marker)))
authorisation = f'http.extraHeader=Authorization: {credentials}'
self.assertIn(authorisation, result[0].decode())

@skip("Not applicable.")
def test_auto_init(self):
pass


class CustomLocalGitClient(client.LocalGitClient):
def __init__(self, pool_manager=None, *args, **kwargs):
super().__init__(*args, **kwargs)


class TrapDulwichCmd(AuthenticatedDulwichDvcsCmd):
def __init__(self, remote=None):
super(TrapDulwichCmd, self).__init__(remote)
self._pool_manager = None

def _authenticate_pool_manager(self, *args, **kwargs):
self._pool_manager = super(TrapDulwichCmd, self)._authenticate_pool_manager(*args, **kwargs)


@skipIf(not AuthenticatedDulwichDvcsCmd.available(), 'dulwich is not available')
class AuthenticatedDulwichDvcsCmdTestCase(DulwichDvcsCmdTestCase):
cmdcls = AuthenticatedDulwichDvcsCmd

def setUp(self):
super(DulwichDvcsCmdTestCase, self).setUp()
self.cmd = AuthenticatedDulwichDvcsCmd()
self.workspace = CmdWorkspace(self.workspace_dir, self.cmd)

# Temporarily override the local dulwich client.
self._default_client = client.default_local_git_client_cls
client.default_local_git_client_cls = CustomLocalGitClient

def tearDown(self):
# Reset the local dulwich client.
client.default_local_git_client_cls = self._default_client

def test_get_cmd_by_name(self):
self.assertEqual(get_cmd_by_name('authenticated_dulwich'), self.cmdcls)

def test_push_url_with_creds(self):
credentials = 'Basic username:password'
cmd = TrapDulwichCmd()
cmd.set_authorization(credentials)
workspace = CmdWorkspace(self.workspace_dir, cmd)
cmd.push(workspace)
auth_header = cmd._pool_manager.headers['Authorization']
self.assertEqual(credentials, auth_header)

def test_pull_url_with_creds(self):
credentials = 'Basic username:password'
cmd = TrapDulwichCmd()
cmd.set_authorization(credentials)
workspace = CmdWorkspace(self.workspace_dir, cmd)
cmd.pull(workspace)
auth_header = cmd._pool_manager.headers['Authorization']
self.assertEqual(credentials, auth_header)

def test_clone(self):
self.cmd.init_new(self.workspace)
target = os.path.join(self.working_dir, 'new_target')
workspace = CmdWorkspace(target)
credentials = 'Basic username:password'
cmd = TrapDulwichCmd(remote=self.workspace_dir)
cmd.set_authorization(credentials)
cmd.clone(workspace)

self.assertTrue(isdir(join(target, self.marker)))
auth_header = cmd._pool_manager.headers['Authorization']
self.assertEqual(credentials, auth_header)

@skip("Not applicable.")
def test_auto_init(self):
pass
Loading