Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Get vault secret for smtp #1045

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions unskript-ctl/config/unskript_ctl_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -184,18 +184,21 @@ notification:
# - Sendgrid - Sendgrid
provider: ""
SMTP:
vault-secret-path: ""
smtp-host: ""
smtp-user: ""
smtp-password: ""
to-email: ""
from-email: ""
SES:
vault-secret-path: ""
access_key: ""
secret_access: ""
region: ""
to-email: ""
from-email: ""
Sendgrid:
vault-secret-path: ""
api_key: ""
to-email: ""
from-email: ""
Expand Down
81 changes: 81 additions & 0 deletions unskript-ctl/unskript_ctl_notification.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import smtplib
import os
import base64
import hvac

from pathlib import Path
from datetime import datetime
Expand Down Expand Up @@ -494,12 +495,61 @@ def prepare_combined_email(self,

return attachment

def fetch_secret_from_vault(self, secret_path: str = None):
vault_addr = os.environ.get('VAULT_ADDR', '')
vault_token = os.environ.get('VAULT_TOKEN', '')
use_ssl = os.environ.get('VAULT_USE_SSL', False)
if not secret_path:
return None

vault_client = hvac.Client(
url=vault_addr,
token=vault_token,
verify=use_ssl
)
secret_data = None
try:
# Retrieve the secret
response = vault_client.read(secret_path)
if response is None:
self.logger.debug(f"Secret not found at {secret_path}.")
return None
elif 'data' not in response:
self.logger.debug(f"No data found in the response for {secret_path}.")
return None
else:
secret_data = response['data']
if secret_data:
secret_data = json.loads(secret_data)
except hvac.exceptions.InvalidPath as e:
self.logger.debug(f"Invalid path: {e}")
return None
except hvac.exceptions.Forbidden as e:
self.logger.debug(f"Access denied: {e}")
return None
except hvac.exceptions.VaultError as e:
self.logger.debug(f"Vault error: {e}")
return None
except Exception as e:
self.logger.debug(f"Error: {e}")
return None


return secret_data


# Sendgrid specific implementation
class SendgridNotification(EmailNotification):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.sendgrid_config = self.email_config.get('Sendgrid')
# Check if vault_secret_path exist for sendgrid, if it does, use it
if 'vault-secret-path' in self.sendgrid_config:
self.logger.debug(f"Vault secret path is {self.sendgrid_config.get('vault-secret-path')}")
self.logger.debug("CURRENTLY NOT IMPLEMENTED")
else:
self.logger.debug("NO Vault path mentioned for secret, will use YAML file content to send notification")


def notify(self, **kwargs):
super().notify(**kwargs)
Expand Down Expand Up @@ -630,6 +680,13 @@ class AWSEmailNotification(EmailNotification):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.aws_config = self.email_config.get('SES')
# Check if vault_secret_path exist for SES, if it does, use it
if 'vault-secret-path' in self.aws_config:
self.logger.debug(f"Vault secret path is {self.aws_config.get('vault-secret-path')}")
self.logger.debug("CURRENTLY NOT IMPLEMENTED")
else:
self.logger.debug("NO Vault path mentioned for secret, will use YAML file content to send notification")


def notify(self, **kwargs):
super().notify(**kwargs)
Expand Down Expand Up @@ -753,6 +810,30 @@ def __init__(self, **kwargs):
super().__init__(**kwargs)
self.SMTP_TLS_PORT = 587
self.smtp_config = self.email_config.get('SMTP')
# Check if vault-secret-path mentioned for SMTP
if 'vault-secret-path' in self.smtp_config:
self.logger.debug(f"Vault secret path is {self.smtp_config.get('vault-secret-path')}")
vault_secret = self.fetch_secret_from_vault(self.smtp_config.get('vault-secret-path'))
if vault_secret:
smtp_host = vault_secret.get('value', {}).get('credentials', {}).get('smtpHost')
from_email = vault_secret.get('value', {}).get('credentials', {}).get('smtpSender')
smtp_user = vault_secret.get('value', {}).get('credentials', {}).get('smtpUsername')
smtp_password = vault_secret.get('value', {}).get('credentials', {}).get('smtpPassword')
smtp_port = vault_secret.get('value', {}).get('credentials', {}).get('smtpPort')

if not smtp_host:
self.smtp_config['smtp-host'] = smtp_host
if not from_email:
self.smtp_config['from-email'] = from_email
if not smtp_user:
self.smtp_config['smtp-user'] = smtp_user
if not smtp_password:
self.smtp_config['smtp-password'] = smtp_password
if not smtp_port:
self.SMTP_TLS_PORT = smtp_port

else:
self.logger.debug("NO Vault path mentioned for secret, will use YAML file content to send notification")

def notify(self, **kwargs):
super().notify(**kwargs)
Expand Down
Loading