diff --git a/dataduct/etl/etl_pipeline.py b/dataduct/etl/etl_pipeline.py index 66dd2ed..c67907c 100644 --- a/dataduct/etl/etl_pipeline.py +++ b/dataduct/etl/etl_pipeline.py @@ -193,7 +193,7 @@ def create_base_objects(self): else: self.sns = self.create_pipeline_object( object_class=SNSAlarm, - topic_arn=self.topic_arn, + topic_arn=self.topic_arn.replace('all:',''), pipeline_name=self.name, ) if self.frequency == 'on-demand': @@ -481,10 +481,12 @@ def create_steps(self, steps_params, is_bootstrap=False, 'input_path' not in step_param: step_param['input_node'] = input_node - # if is_teardown: - ## Instead of just teardown set sns for every step so as to get SNS alerts with error stack trace if hasattr(self.sns,'fields'): - step_param['sns_object'] = self.sns + if self.topic_arn.startswith("all:"): + ## Instead of just teardown set sns for every step so as to get SNS alerts with error stack trace + step_param['sns_object'] = self.sns + elif is_teardown: + step_param['sns_object'] = self.sns try: step_class = step_param.pop('step_class') diff --git a/dataduct/pipeline/sns_alarm.py b/dataduct/pipeline/sns_alarm.py index e7dfa8a..95fe464 100644 --- a/dataduct/pipeline/sns_alarm.py +++ b/dataduct/pipeline/sns_alarm.py @@ -42,7 +42,7 @@ def __init__(self, 'Error Stack Trace: #{node.errorStackTrace}' ]) - subject = 'Data Pipeline %s failed' % pipeline_name + subject = 'Data Pipeline %s #{node.@status}' % pipeline_name if topic_arn is None: topic_arn = SNS_TOPIC_ARN_FAILURE