Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gcp: respect permissions inherited from parent drives #563

Merged
merged 6 commits into from
Oct 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 133 additions & 65 deletions interface/gcp.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
"""Utility classes for interacting with Google APIs"""
from typing import Any, List
from typing import Any, List, Iterator
from googleapiclient.discovery import Resource
import logging

# Set to False to resolve https://github.com/ubclaunchpad/rocket2/issues/510
# temporarily. Longer-term fix is being tracked in the actual problem,
# https://github.com/ubclaunchpad/rocket2/issues/497
DELETE_OLD_DRIVE_PERMISSIONS = False

class GCPDrivePermission:
"""Represents a 'share' on a Google Drive item."""

def __init__(self, id: str, email: str):
self.id = id
self.email = email


class GCPInterface:
Expand All @@ -17,54 +20,123 @@ def __init__(self, drive_client: Resource, subject=None):
self.drive = drive_client
self.subject = subject

def set_drive_permissions(self,
team_name: str,
drive_id: str,
emails: List[str],
delete_permissions=DELETE_OLD_DRIVE_PERMISSIONS):
def get_drive_parents(self, drive_id: str) -> List[str]:
"""
Retrieves list of parents of the given Drive folder, returned as ID
strings.
"""
Create permissions for the given emails, and removes everyone not on
the list.
# pylint: disable=no-member
file = self.drive.files()\
.get(fileId=drive_id, fields="parents")\
.execute()
if 'parents' in file:
parents: List[str] = file['parents']
if len(parents) > 0:
return parents
return []

def get_drive_permissions(self, drive_id: str) -> List[GCPDrivePermission]:
"""
Retrieves list of permissions present on the given Drive item. It
pages through all results and returns a subset of permission fields,
as defined in :class:`GCPDrivePermission`.
"""
fields = [
"permissions/id",
"permissions/emailAddress",
]

In all cases of API errors, we log and continue, to try and get close
to the desired state of permissions.
def paginated_permissions() -> Iterator[Any]:
# See http://googleapis.github.io/google-api-python-client/docs/dyn/drive_v3.permissions.html#list # noqa
# pylint: disable=no-member
req = self.drive.permissions()\
.list(fileId=drive_id,
fields=', '.join(fields),
pageSize=50)
while req is not None:
resp = req.execute()
for perm in resp['permissions']:
yield perm
# see https://googleapis.github.io/google-api-python-client/docs/dyn/drive_v3.permissions.html#list_next # noqa
# pylint: disable=no-member
req = self.drive.permissions().list_next(req, resp)

# collect all permissions for this drive
perms: List[GCPDrivePermission] = []
page_count = 0
for p in paginated_permissions():
if 'emailAddress' in p:
perm = GCPDrivePermission(p['id'], p['emailAddress'])
perms.append(perm)
page_count += 1

logging.info(f"Found {len(perms)} permissions across {page_count} "
+ f"pages for {drive_id}")
return perms

def get_parents_permissions(self,
drive_id: str) -> List[GCPDrivePermission]:
"""
Retrieves list of permissions associated with one level of parents to
the given Drive.
"""
parents = self.get_drive_parents(drive_id)
perms: List[GCPDrivePermission] = []
for parent_id in parents:
parent_perms = self.get_drive_permissions(parent_id)
for p in parent_perms:
perms.append(p)
return perms

def ensure_drive_permissions(self,
team_name: str,
drive_id: str,
emails: List[str]):
"""
Create permissions for the given emails on the given Drive item, and
removes everyone not on the list, to ensure the state of shares on the
Drive item matches the email list provided.

It respects permissions inherited by one level of parents of the given
Drive item - permissions inherited from two levels of parents are at
risk of being deleted if the user is not on the provided email list.

In all cases of API errors, we log and continue, to try and get as
close to the desired state of permissions as possible.

:param team_name: name of the team for the drive permissions; serves
aesthetic purposes only
:param drive_id: id of the Google Drive object to share
:param emails: a list of emails to share with
:param bool delete_permissions: option whether we delete the old
permissions in favour of the new emails (previously added emails
would be removed if they aren't in the `emails` parameter)
"""
# Get parents so that we do not remove or duplicate inherited shares.
inherited: List[str] = [] # emails
try:
parents_perms = self.get_parents_permissions(drive_id)
inherited = [p.email for p in parents_perms]
except Exception as e:
logging.warning("Unable to fetch parents for drive item"
+ f"({team_name}, {drive_id}): {e}")

# List existing permissions - we use this to avoid duplicate
# permissions, and to delete ones that should not longer exist.
# See http://googleapis.github.io/google-api-python-client/docs/dyn/drive_v3.permissions.html#list # noqa
existing: List[str] = [] # emails
# Collect existing permissions and determine which emails to delete.
existing: List[str] = [] # emails
to_delete: List[str] = [] # permission IDs
try:
# pylint: disable=no-member
list_res = self.drive.permissions()\
.list(fileId=drive_id,
supportsAllDrives=True,
fields="permissions(id, emailAddress, role)")\
.execute()
permissions: List[Any] = list_res['permissions']
logging.info(
f'{team_name} drive currently shared with {permissions}')
for p in permissions:
if 'emailAddress' in p:
email: str = p['emailAddress']
if email in emails:
# track permission we do not need to recreate
existing.append(email)
elif email == self.subject:
# do not remove actor from shared
continue
else:
# delete unknown emails
to_delete.append(p['id'])
perms = self.get_drive_permissions(drive_id)
for p in perms:
if p.email in emails:
# keep shares that should exist
existing.append(p.email)
elif p.email == self.subject:
# do not remove actor from shared (actor needs permissions
# on this Drive item to perform a share)
continue
elif p.email in inherited:
# do not remove inherited permissions
continue
else:
# delete unknown permissions
to_delete.append(p.id)
except Exception as e:
logging.error("Failed to load permissions for drive item"
+ f"({team_name}, {drive_id}): {e}")
Expand All @@ -76,7 +148,8 @@ def set_drive_permissions(self,
# See http://googleapis.github.io/google-api-python-client/docs/dyn/drive_v3.permissions.html#create # noqa
created_shares = 0
for email in emails:
if email in existing:
# Do not re-share (causes email spam)
if email in existing or email in inherited:
continue

body = new_create_permission_body(email)
Expand All @@ -86,35 +159,30 @@ def set_drive_permissions(self,
.create(fileId=drive_id,
body=body,
emailMessage=new_share_message(team_name),
sendNotificationEmail=True,
supportsAllDrives=True)\
sendNotificationEmail=True)\
.execute()
created_shares += 1
except Exception as e:
logging.error("Failed to share drive item"
+ f"({team_name}, {drive_id}) with {email}: {e}")
logging.info(f"Created {created_shares} permissions for {team_name}")

# Delete old permissions
# Delete unknown permissions
# See http://googleapis.github.io/google-api-python-client/docs/dyn/drive_v3.permissions.html#delete # noqa
if delete_permissions is True:
deleted_shares = 0
for p_id in to_delete:
try:
self.drive.permissions()\
.delete(fileId=drive_id,
permissionId=p_id,
supportsAllDrives=True)\
.execute()
deleted_shares += 1
except Exception as e:
logging.error(
f'Failed to delete permission {p_id} for '
+ f'drive item ({team_name}, {drive_id}): {e}')
logging.info(
f'Deleted {deleted_shares} permissions for {team_name}')
else:
logging.info("DELETE_OLD_DRIVE_PERMISSIONS is set to false")
deleted_shares = 0
for p_id in to_delete:
try:
self.drive.permissions()\
.delete(fileId=drive_id,
permissionId=p_id)\
.execute()
deleted_shares += 1
except Exception as e:
logging.error(
f'Failed to delete permission {p_id} for '
+ f'drive item ({team_name}, {drive_id}): {e}')
logging.info(
f'Deleted {deleted_shares} permissions for {team_name}')
chuck-sys marked this conversation as resolved.
Show resolved Hide resolved


def new_share_message(team_name):
Expand All @@ -124,8 +192,8 @@ def new_share_message(team_name):
def new_create_permission_body(email):
return {
"emailAddress": email,
"photoLink": "https://github.com/ubclaunchpad/rocket2/blob/master/docs/rocket-logo.png?raw=true", # noqa
"role": "writer",
"type": "user",
"sendNotificationEmail": True,
"supportsAllDrives": True,
}
2 changes: 1 addition & 1 deletion interface/gcp_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,5 @@ def sync_team_email_perms(gcp: Optional[GCPInterface],
logging.info("Synchronizing permissions for "
+ f"{team.github_team_name}'s folder ({team.folder}) "
+ f"to {emails}")
gcp.set_drive_permissions(
gcp.ensure_drive_permissions(
team.github_team_name, team.folder, emails)
95 changes: 70 additions & 25 deletions tests/interface/gcp_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,56 +13,101 @@ def setUp(self):
self.gcp = GCPInterface(self.mock_drive,
subject="[email protected]")

def test_set_drive_permissions(self):
mock_list = mock.MagicMock()
mock_list.execute = mock.MagicMock(return_value={
def test_ensure_drive_permissions(self):
# Mocks for files
mock_files_get = mock.MagicMock()
mock_files_get.execute = mock.MagicMock(return_value={
"parents": [
"parent-drive",
]
})

mock_files = mock.MagicMock()
mock_files.get = mock.MagicMock(return_value=mock_files_get)

# Mocks for permissions
mock_perms_list_parent = mock.MagicMock()
mock_perms_list_parent.execute = mock.MagicMock(return_value={
"permissions": [
{
# should not be removed (inherited)
"id": "99",
"emailAddress": "[email protected]",
},
]
})
mock_perms_list_target = mock.MagicMock()
mock_perms_list_target.execute = mock.MagicMock(return_value={
"permissions": [
{
# should not be removed or created (exists in email list)
"id": "1",
"emailAddress": "[email protected]",
},
{
# should be removed (does not exist in email list)
"id": "2",
"emailAddress": "[email protected]",
},
{
# should not be removed
# should not be removed (actor)
"id": "3",
"emailAddress": "[email protected]"
}
"emailAddress": "[email protected]",
},
{
# should not be removed (inherited)
"id": "99",
"emailAddress": "[email protected]",
},
]
})
mock_perms_create = mock.MagicMock()
mock_perms_create.execute = mock.MagicMock(return_value={})
mock_perms_delete = mock.MagicMock()
mock_perms_delete.execute = mock.MagicMock(return_value={})

mock_create = mock.MagicMock()
mock_create.execute = mock.MagicMock(return_value={})

mock_delete = mock.MagicMock()
mock_delete.execute = mock.MagicMock(return_value={})
def perms_list_effect(**kwargs):
if kwargs['fileId'] == 'target-drive':
return mock_perms_list_target
if kwargs['fileId'] == 'parent-drive':
return mock_perms_list_parent

mock_perms = mock.MagicMock()
mock_perms.list = mock.MagicMock(return_value=mock_list)
mock_perms.create = mock.MagicMock(return_value=mock_create)
mock_perms.delete = mock.MagicMock(return_value=mock_delete)
mock_perms.list = mock.MagicMock(side_effect=perms_list_effect)
mock_perms.list_next = mock.MagicMock(return_value=None)
mock_perms.create = mock.MagicMock(return_value=mock_perms_create)
mock_perms.delete = mock.MagicMock(return_value=mock_perms_delete)

# Create Google Drive API
self.mock_drive.files = mock.MagicMock(return_value=mock_files)
self.mock_drive.permissions = mock.MagicMock(return_value=mock_perms)
self.gcp.set_drive_permissions('team', 'abcde', [
self.gcp.ensure_drive_permissions('team', 'target-drive', [
'[email protected]',
'[email protected]',
], delete_permissions=True)
])

# initial list
mock_perms.list.assert_called()
mock_list.execute.assert_called()
# initial parent search
mock_files.get.assert_called_with(fileId='target-drive',
fields=mock.ANY)
mock_files_get.execute.assert_called()
# perms listing
mock_perms.list.assert_has_calls([
mock.call(fileId='parent-drive',
fields=mock.ANY, pageSize=mock.ANY),
mock.call(fileId='target-drive',
fields=mock.ANY, pageSize=mock.ANY),
])
mock_perms_list_parent.execute.assert_called()
mock_perms_list_target.execute.assert_called()
# one email already exists, share to the new one
mock_perms.create\
.assert_called_with(fileId='abcde',
.assert_called_with(fileId='target-drive',
body=new_create_permission_body(
'[email protected]'),
emailMessage=new_share_message('team'),
sendNotificationEmail=True,
supportsAllDrives=True)
mock_create.execute.assert_called()
sendNotificationEmail=True)
mock_perms_create.execute.assert_called()
# one email should no longer be shared, it is removed
mock_perms.delete.assert_called_with(
fileId='abcde', permissionId='2', supportsAllDrives=True)
mock_delete.execute.assert_called()
fileId='target-drive', permissionId='2')
mock_perms_delete.execute.assert_called()