Skip to content

Commit

Permalink
gcp: respect drive parents permissions
Browse files Browse the repository at this point in the history
  • Loading branch information
bobheadxi committed Oct 16, 2020
1 parent 866fb07 commit 140c578
Show file tree
Hide file tree
Showing 3 changed files with 190 additions and 90 deletions.
183 changes: 119 additions & 64 deletions interface/gcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
from googleapiclient.discovery import Resource
import logging

# Set to False to resolve https://github.com/ubclaunchpad/rocket2/issues/510
# temporarily. Longer-term fix is being tracked in the actual problem,
# https://github.com/ubclaunchpad/rocket2/issues/497
DELETE_OLD_DRIVE_PERMISSIONS = False

class GCPDrivePermission:
"""Represents a 'share' on a Google Drive item."""

def __init__(self, id: str, email: str):
self.id = id
self.email = email


class GCPInterface:
Expand All @@ -17,54 +20,110 @@ def __init__(self, drive_client: Resource, subject=None):
self.drive = drive_client
self.subject = subject

def set_drive_permissions(self,
team_name: str,
drive_id: str,
emails: List[str],
delete_permissions=DELETE_OLD_DRIVE_PERMISSIONS):
def get_drive_parents(self, drive_id: str) -> List[str]:
"""
Retrieves list of parents of the given Drive folder, returned as ID
strings.
"""
# pylint: disable=no-member
file = self.drive.files()\
.get(fileId=drive_id, fields="parents")\
.execute()
if 'parents' in file:
parents: List[str] = file['parents']
if len(parents) > 0:
return parents
return []

def get_drive_permissions(self, drive_id: str) -> List[GCPDrivePermission]:
"""
Retrieves list of permissions present on the given Drive item. It
pages through all results and returns a subset of permission fields,
as defined in `GCPPermission`.
"""
perms: List[GCPDrivePermission] = []
page_count = 0
fields = [
"permissions/id",
"permissions/emailAddress",
]
# See http://googleapis.github.io/google-api-python-client/docs/dyn/drive_v3.permissions.html#list # noqa
# pylint: disable=no-member
next_page_req = self.drive.permissions()\
.list(fileId=drive_id,
fields=", ".join(fields),
pageSize=50)
while next_page_req is not None:
list_res = next_page_req.execute()
permissions: List[Any] = list_res['permissions']
for p in permissions:
if 'emailAddress' in p:
perm = GCPDrivePermission(p['id'], p['emailAddress'])
perms.append(perm)

# set state for the next page
# see https://googleapis.github.io/google-api-python-client/docs/dyn/drive_v3.permissions.html#list_next # noqa
# pylint: disable=no-member
next_page_req = self.drive.permissions()\
.list_next(next_page_req, list_res)
page_count += 1

logging.info(f"Found {len(perms)} permissions across {page_count} "
+ f"pages for {drive_id}")
return perms

def ensure_drive_permissions(self,
team_name: str,
drive_id: str,
emails: List[str]):
"""
Create permissions for the given emails, and removes everyone not on
the list.
Create permissions for the given emails on the given Drive item, and
removes everyone not on the list, to ensure the state of shares on the
Drive item matches the emai list provided.
In all cases of API errors, we log and continue, to try and get close
to the desired state of permissions.
It respects permissions inherited by one level of parents of the given
Drive item - permissions inherited from two levels of parents are at
risk of being deleted it the user is not on the provided email list.
In all cases of API errors, we log and continue, to try and get as
close to the desired state of permissions as possible.
:param team_name: name of the team for the drive permissions; serves
aesthetic purposes only
:param drive_id: id of the Google Drive object to share
:param emails: a list of emails to share with
:param bool delete_permissions: option whether we delete the old
permissions in favour of the new emails (previously added emails
would be removed if they aren't in the `emails` parameter)
"""
# Get parents so that we do not remove or duplicate inherited shares.
inherited: List[str] = [] # emails
try:
parents = self.get_drive_parents(drive_id)
for parent_id in parents:
perms = self.get_drive_permissions(parent_id)
for p in perms:
inherited.append(p.email)
except Exception as e:
logging.warn("Unable to fetch parents for drive item"
+ f"({team_name}, {drive_id}): {e}")

# List existing permissions - we use this to avoid duplicate
# permissions, and to delete ones that should not longer exist.
# See http://googleapis.github.io/google-api-python-client/docs/dyn/drive_v3.permissions.html#list # noqa
existing: List[str] = [] # emails
# Collect existing permissions and determine which emails to delete.
existing: List[str] = [] # emails
to_delete: List[str] = [] # permission IDs
try:
# pylint: disable=no-member
list_res = self.drive.permissions()\
.list(fileId=drive_id,
supportsAllDrives=True,
fields="permissions(id, emailAddress, role)")\
.execute()
permissions: List[Any] = list_res['permissions']
logging.info(
f'{team_name} drive currently shared with {permissions}')
for p in permissions:
if 'emailAddress' in p:
email: str = p['emailAddress']
if email in emails:
# track permission we do not need to recreate
existing.append(email)
elif email == self.subject:
# do not remove actor from shared
continue
else:
# delete unknown emails
to_delete.append(p['id'])
perms = self.get_drive_permissions(drive_id)
for p in perms:
if p.email in emails:
# keep shares that should exist
existing.append(p.email)
elif p.email == self.subject:
# do not remove actor from shared (actor needs permissions
# on this Drive item to perform a share)
continue
elif p.email in inherited:
# do not remove inherited permissions
continue
else:
# delete unknown permissions
to_delete.append(p.id)
except Exception as e:
logging.error("Failed to load permissions for drive item"
+ f"({team_name}, {drive_id}): {e}")
Expand All @@ -76,7 +135,8 @@ def set_drive_permissions(self,
# See http://googleapis.github.io/google-api-python-client/docs/dyn/drive_v3.permissions.html#create # noqa
created_shares = 0
for email in emails:
if email in existing:
# Do not re-share (causes email spam)
if email in existing or email in inherited:
continue

body = new_create_permission_body(email)
Expand All @@ -86,35 +146,30 @@ def set_drive_permissions(self,
.create(fileId=drive_id,
body=body,
emailMessage=new_share_message(team_name),
sendNotificationEmail=True,
supportsAllDrives=True)\
sendNotificationEmail=True)\
.execute()
created_shares += 1
except Exception as e:
logging.error("Failed to share drive item"
+ f"({team_name}, {drive_id}) with {email}: {e}")
logging.info(f"Created {created_shares} permissions for {team_name}")

# Delete old permissions
# Delete unknown permissions
# See http://googleapis.github.io/google-api-python-client/docs/dyn/drive_v3.permissions.html#delete # noqa
if delete_permissions is True:
deleted_shares = 0
for p_id in to_delete:
try:
self.drive.permissions()\
.delete(fileId=drive_id,
permissionId=p_id,
supportsAllDrives=True)\
.execute()
deleted_shares += 1
except Exception as e:
logging.error(
f'Failed to delete permission {p_id} for '
+ f'drive item ({team_name}, {drive_id}): {e}')
logging.info(
f'Deleted {deleted_shares} permissions for {team_name}')
else:
logging.info("DELETE_OLD_DRIVE_PERMISSIONS is set to false")
deleted_shares = 0
for p_id in to_delete:
try:
self.drive.permissions()\
.delete(fileId=drive_id,
permissionId=p_id)\
.execute()
deleted_shares += 1
except Exception as e:
logging.error(
f'Failed to delete permission {p_id} for '
+ f'drive item ({team_name}, {drive_id}): {e}')
logging.info(
f'Deleted {deleted_shares} permissions for {team_name}')


def new_share_message(team_name):
Expand All @@ -124,8 +179,8 @@ def new_share_message(team_name):
def new_create_permission_body(email):
return {
"emailAddress": email,
"photoLink": "https://github.com/ubclaunchpad/rocket2/blob/master/docs/rocket-logo.png?raw=true", # noqa
"role": "writer",
"type": "user",
"sendNotificationEmail": True,
"supportsAllDrives": True,
}
2 changes: 1 addition & 1 deletion interface/gcp_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,5 +66,5 @@ def sync_team_email_perms(gcp: Optional[GCPInterface],
logging.info("Synchronizing permissions for "
+ f"{team.github_team_name}'s folder ({team.folder}) "
+ f"to {emails}")
gcp.set_drive_permissions(
gcp.ensure_drive_permissions(
team.github_team_name, team.folder, emails)
95 changes: 70 additions & 25 deletions tests/interface/gcp_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,56 +13,101 @@ def setUp(self):
self.gcp = GCPInterface(self.mock_drive,
subject="[email protected]")

def test_set_drive_permissions(self):
mock_list = mock.MagicMock()
mock_list.execute = mock.MagicMock(return_value={
def test_ensure_drive_permissions(self):
# Mocks for files
mock_files_get = mock.MagicMock()
mock_files_get.execute = mock.MagicMock(return_value={
"parents": [
"parent-drive",
]
})

mock_files = mock.MagicMock()
mock_files.get = mock.MagicMock(return_value=mock_files_get)

# Mocks for permissions
mock_perms_list_parent = mock.MagicMock()
mock_perms_list_parent.execute = mock.MagicMock(return_value={
"permissions": [
{
# should not be removed (inherited)
"id": "99",
"emailAddress": "[email protected]",
},
]
})
mock_perms_list_target = mock.MagicMock()
mock_perms_list_target.execute = mock.MagicMock(return_value={
"permissions": [
{
# should not be removed or created (exists in email list)
"id": "1",
"emailAddress": "[email protected]",
},
{
# should be removed (does not exist in email list)
"id": "2",
"emailAddress": "[email protected]",
},
{
# should not be removed
# should not be removed (actor)
"id": "3",
"emailAddress": "[email protected]"
}
"emailAddress": "[email protected]",
},
{
# should not be removed (inherited)
"id": "99",
"emailAddress": "[email protected]",
},
]
})
mock_perms_create = mock.MagicMock()
mock_perms_create.execute = mock.MagicMock(return_value={})
mock_perms_delete = mock.MagicMock()
mock_perms_delete.execute = mock.MagicMock(return_value={})

mock_create = mock.MagicMock()
mock_create.execute = mock.MagicMock(return_value={})

mock_delete = mock.MagicMock()
mock_delete.execute = mock.MagicMock(return_value={})
def perms_list_effect(**kwargs):
if kwargs['fileId'] == 'target-drive':
return mock_perms_list_target
if kwargs['fileId'] == 'parent-drive':
return mock_perms_list_parent

mock_perms = mock.MagicMock()
mock_perms.list = mock.MagicMock(return_value=mock_list)
mock_perms.create = mock.MagicMock(return_value=mock_create)
mock_perms.delete = mock.MagicMock(return_value=mock_delete)
mock_perms.list = mock.MagicMock(side_effect=perms_list_effect)
mock_perms.list_next = mock.MagicMock(return_value=None)
mock_perms.create = mock.MagicMock(return_value=mock_perms_create)
mock_perms.delete = mock.MagicMock(return_value=mock_perms_delete)

# Create Google Drive API
self.mock_drive.files = mock.MagicMock(return_value=mock_files)
self.mock_drive.permissions = mock.MagicMock(return_value=mock_perms)
self.gcp.set_drive_permissions('team', 'abcde', [
self.gcp.ensure_drive_permissions('team', 'target-drive', [
'[email protected]',
'[email protected]',
], delete_permissions=True)
])

# initial list
mock_perms.list.assert_called()
mock_list.execute.assert_called()
# initial parent search
mock_files.get.assert_called_with(fileId='target-drive',
fields=mock.ANY)
mock_files_get.execute.assert_called()
# perms listing
mock_perms.list.assert_has_calls([
mock.call(fileId='parent-drive',
fields=mock.ANY, pageSize=mock.ANY),
mock.call(fileId='target-drive',
fields=mock.ANY, pageSize=mock.ANY),
])
mock_perms_list_parent.execute.assert_called()
mock_perms_list_target.execute.assert_called()
# one email already exists, share to the new one
mock_perms.create\
.assert_called_with(fileId='abcde',
.assert_called_with(fileId='target-drive',
body=new_create_permission_body(
'[email protected]'),
emailMessage=new_share_message('team'),
sendNotificationEmail=True,
supportsAllDrives=True)
mock_create.execute.assert_called()
sendNotificationEmail=True)
mock_perms_create.execute.assert_called()
# one email should no longer be shared, it is removed
mock_perms.delete.assert_called_with(
fileId='abcde', permissionId='2', supportsAllDrives=True)
mock_delete.execute.assert_called()
fileId='target-drive', permissionId='2')
mock_perms_delete.execute.assert_called()

0 comments on commit 140c578

Please sign in to comment.