From ca26d1685adf9f7bed42f6bef6f77ce9b0f96d86 Mon Sep 17 00:00:00 2001 From: Tom Bocklisch Date: Fri, 27 Oct 2023 13:10:26 +0200 Subject: [PATCH] refactored and added tests to policy --- rasa/core/policies/flow_policy.py | 610 +----------------- rasa/core/policies/flows/__init__.py | 0 rasa/core/policies/flows/flow_exceptions.py | 31 + rasa/core/policies/flows/flow_executor.py | 496 ++++++++++++++ rasa/core/policies/flows/flow_step_result.py | 40 ++ .../generator/llm_command_generator.py | 2 +- .../stack/dialogue_stack.py | 4 + rasa/dialogue_understanding/stack/utils.py | 7 +- rasa/shared/core/flows/flow.py | 157 ----- tests/core/policies/flows/__init__.py | 0 .../policies/flows/test_flow_exceptions.py | 19 + .../core/policies/flows/test_flow_executor.py | 597 +++++++++++++++++ tests/core/policies/test_flow_policy.py | 237 +------ 13 files changed, 1212 insertions(+), 988 deletions(-) create mode 100644 rasa/core/policies/flows/__init__.py create mode 100644 rasa/core/policies/flows/flow_exceptions.py create mode 100644 rasa/core/policies/flows/flow_executor.py create mode 100644 rasa/core/policies/flows/flow_step_result.py create mode 100644 tests/core/policies/flows/__init__.py create mode 100644 tests/core/policies/flows/test_flow_exceptions.py create mode 100644 tests/core/policies/flows/test_flow_executor.py diff --git a/rasa/core/policies/flow_policy.py b/rasa/core/policies/flow_policy.py index fcd282e2233e..9d23638236aa 100644 --- a/rasa/core/policies/flow_policy.py +++ b/rasa/core/policies/flow_policy.py @@ -1,12 +1,9 @@ from __future__ import annotations -from dataclasses import dataclass from typing import Any, Dict, Text, List, Optional +from rasa.core.policies.flows import flow_executor -from jinja2 import Template -from structlog.contextvars import ( - bound_contextvars, -) +from rasa.core.policies.flows.flow_exceptions import FlowCircuitBreakerTrippedException from rasa.dialogue_understanding.patterns.internal_error import ( InternalErrorPatternFlowStackFrame, ) @@ -14,21 +11,9 @@ from rasa.dialogue_understanding.stack.frames import ( BaseFlowStackFrame, DialogueStackFrame, - UserFlowStackFrame, -) -from rasa.dialogue_understanding.patterns.collect_information import ( - CollectInformationPatternFlowStackFrame, -) -from rasa.dialogue_understanding.patterns.completed import ( - CompletedPatternFlowStackFrame, ) -from rasa.dialogue_understanding.patterns.continue_interrupted import ( - ContinueInterruptedPatternFlowStackFrame, -) -from rasa.dialogue_understanding.stack.frames.flow_stack_frame import FlowStackFrameType from rasa.dialogue_understanding.stack.utils import ( end_top_user_flow, - top_user_flow_frame, ) from rasa.core.constants import ( @@ -36,33 +21,10 @@ POLICY_MAX_HISTORY, POLICY_PRIORITY, ) -from pypred import Predicate -from rasa.shared.constants import FLOW_PREFIX -from rasa.shared.core.constants import ( - ACTION_LISTEN_NAME, - ACTION_SEND_TEXT_NAME, -) -from rasa.shared.core.events import Event, SlotSet +from rasa.shared.core.events import Event from rasa.shared.core.flows.flow import ( - END_STEP, - ActionFlowStep, - BranchFlowStep, - ContinueFlowStep, - ElseFlowLink, - EndFlowStep, - Flow, - FlowStep, FlowsList, - GenerateResponseFlowStep, - IfFlowLink, - SlotRejection, - StepThatCanStartAFlow, - UserMessageStep, - LinkFlowStep, - SetSlotsFlowStep, - CollectInformationFlowStep, - StaticFlowLink, ) from rasa.core.featurizers.tracker_featurizers import TrackerFeaturizer from rasa.core.policies.policy import Policy, PolicyPrediction @@ -77,38 +39,8 @@ ) import structlog -from rasa.shared.exceptions import RasaException - structlogger = structlog.get_logger() -MAX_NUMBER_OF_STEPS = 250 - - -class FlowException(RasaException): - """Exception that is raised when there is a problem with a flow.""" - - pass - - -class FlowCircuitBreakerTrippedException(FlowException): - """Exception that is raised when there is a problem with a flow.""" - - def __init__( - self, dialogue_stack: DialogueStack, number_of_steps_taken: int - ) -> None: - """Creates a `FlowCircuitBreakerTrippedException`. - - Args: - dialogue_stack: The dialogue stack. - number_of_steps_taken: The number of steps that were taken. - """ - super().__init__( - f"Flow circuit breaker tripped after {number_of_steps_taken} steps. " - "There appears to be an infinite loop in the flows." - ) - self.dialogue_stack = dialogue_stack - self.number_of_steps_taken = number_of_steps_taken - @DefaultV1Recipe.register( DefaultV1Recipe.ComponentType.POLICY_WITHOUT_END_TO_END_SUPPORT, is_trainable=False @@ -203,11 +135,10 @@ def predict_action_probabilities( return self._prediction(self._default_predictions(domain)) flows = flows or FlowsList([]) - executor = FlowExecutor.from_tracker(tracker, flows, domain) # create executor and predict next action try: - prediction = executor.advance_flows(tracker) + prediction = flow_executor.advance_flows(tracker, domain, flows) return self._create_prediction_result( prediction.action_name, domain, @@ -226,15 +157,18 @@ def predict_action_probabilities( ), ) # end the current flow and start the internal error flow - end_top_user_flow(executor.dialogue_stack) - executor.dialogue_stack.push(InternalErrorPatternFlowStackFrame()) + updated_stack = end_top_user_flow(DialogueStack.from_tracker(tracker)) + updated_stack.push(InternalErrorPatternFlowStackFrame()) # we retry, with the internal error frame on the stack - prediction = executor.advance_flows(tracker) + event = updated_stack.persist_as_event() + tracker.update(event) + prediction = flow_executor.advance_flows(tracker, domain, flows) + collected_events = [event] + (prediction.events or []) return self._create_prediction_result( prediction.action_name, domain, prediction.score, - prediction.events, + collected_events, prediction.metadata, ) @@ -262,525 +196,3 @@ def _create_prediction_result( return self._prediction( result, optional_events=events, action_metadata=action_metadata ) - - -@dataclass -class ActionPrediction: - """Represents an action prediction.""" - - action_name: Optional[Text] - """The name of the predicted action.""" - score: float - """The score of the predicted action.""" - metadata: Optional[Dict[Text, Any]] = None - """The metadata of the predicted action.""" - events: Optional[List[Event]] = None - """The events attached to the predicted action.""" - - -class FlowExecutor: - """Executes a flow.""" - - def __init__( - self, dialogue_stack: DialogueStack, all_flows: FlowsList, domain: Domain - ) -> None: - """Initializes the `FlowExecutor`. - - Args: - dialogue_stack: State of the flow. - all_flows: All flows. - domain: The domain. - """ - self.dialogue_stack = dialogue_stack - self.all_flows = all_flows - self.domain = domain - - @staticmethod - def from_tracker( - tracker: DialogueStateTracker, flows: FlowsList, domain: Domain - ) -> FlowExecutor: - """Creates a `FlowExecutor` from a tracker. - - Args: - tracker: The tracker to create the `FlowExecutor` from. - flows: The flows to use. - domain: The domain to use. - - Returns: - The created `FlowExecutor`. - """ - dialogue_stack = DialogueStack.from_tracker(tracker) - return FlowExecutor(dialogue_stack, flows or FlowsList([]), domain) - - def find_startable_flow(self, tracker: DialogueStateTracker) -> Optional[Flow]: - """Finds a flow which can be started. - - Args: - tracker: The tracker containing the conversation history up to now. - - Returns: - The predicted action and the events to run. - """ - if ( - not tracker.latest_message - or tracker.latest_action_name != ACTION_LISTEN_NAME - ): - # flows can only be started automatically as a response to a user message - return None - - for flow in self.all_flows.underlying_flows: - first_step = flow.first_step_in_flow() - if not first_step or not isinstance(first_step, StepThatCanStartAFlow): - continue - - if first_step.is_triggered(tracker): - return flow - return None - - def is_condition_satisfied( - self, predicate: Text, tracker: "DialogueStateTracker" - ) -> bool: - """Evaluate a predicate condition.""" - - # attach context to the predicate evaluation to allow conditions using it - context = {"context": self.dialogue_stack.current_context()} - document: Dict[str, Any] = context.copy() - for slot in self.domain.slots: - document[slot.name] = tracker.get_slot(slot.name) - p = Predicate(self.render_template_variables(predicate, context)) - try: - return p.evaluate(document) - except (TypeError, Exception) as e: - structlogger.error( - "flow.predicate.error", - predicate=predicate, - document=document, - error=str(e), - ) - return False - - def _select_next_step_id( - self, current: FlowStep, tracker: "DialogueStateTracker" - ) -> Optional[Text]: - """Selects the next step id based on the current step.""" - next = current.next - if len(next.links) == 1 and isinstance(next.links[0], StaticFlowLink): - return next.links[0].target - - # evaluate if conditions - for link in next.links: - if isinstance(link, IfFlowLink) and link.condition: - if self.is_condition_satisfied(link.condition, tracker): - return link.target - - # evaluate else condition - for link in next.links: - if isinstance(link, ElseFlowLink): - return link.target - - if next.links: - structlogger.error( - "flow.link.failed_to_select_branch", - current=current, - links=next.links, - tracker=tracker, - ) - return None - if current.id == END_STEP: - # we are already at the very end of the flow. There is no next step. - return None - elif isinstance(current, LinkFlowStep): - # link steps don't have a next step, so we'll return the end step - return END_STEP - else: - structlogger.error( - "flow.step.failed_to_select_next_step", - step=current, - tracker=tracker, - ) - return None - - def _select_next_step( - self, - tracker: "DialogueStateTracker", - current_step: FlowStep, - flow: Flow, - ) -> Optional[FlowStep]: - """Get the next step to execute.""" - next_id = self._select_next_step_id(current_step, tracker) - step = flow.step_by_id(next_id) - structlogger.debug( - "flow.step.next", - next_id=step.id if step else None, - current_id=current_step.id, - flow_id=flow.id, - ) - return step - - @staticmethod - def render_template_variables(text: str, context: Dict[Text, Any]) -> str: - """Replace context variables in a text.""" - return Template(text).render(context) - - def _is_step_completed( - self, step: FlowStep, tracker: "DialogueStateTracker" - ) -> bool: - """Check if a step is completed.""" - if isinstance(step, CollectInformationFlowStep): - return tracker.get_slot(step.collect) is not None - else: - return True - - def consider_flow_switch(self, tracker: DialogueStateTracker) -> ActionPrediction: - """Consider switching to a new flow. - - Args: - tracker: The tracker to get the next action for. - - Returns: - The predicted action and the events to run. - """ - if new_flow := self.find_startable_flow(tracker): - # there are flows available, but we are not in a flow - # it looks like we can start a flow, so we'll predict the trigger action - structlogger.debug("flow.startable", flow_id=new_flow.id) - return ActionPrediction(FLOW_PREFIX + new_flow.id, 1.0) - else: - structlogger.debug("flow.nostartable") - return ActionPrediction(None, 0.0) - - def advance_flows(self, tracker: DialogueStateTracker) -> ActionPrediction: - """Advance the flows. - - Either start a new flow or advance the current flow. - - Args: - tracker: The tracker to get the next action for. - - Returns: - The predicted action and the events to run. - """ - prediction = self.consider_flow_switch(tracker) - - if prediction.action_name: - # if a flow can be started, we'll start it - return prediction - if self.dialogue_stack.is_empty(): - # if there are no flows, there is nothing to do - return ActionPrediction(None, 0.0) - else: - previous_stack = DialogueStack.get_persisted_stack(tracker) - prediction = self.select_next_action(tracker) - if previous_stack != self.dialogue_stack.as_dict(): - # we need to update dialogue stack to persist the state of the executor - if not prediction.events: - prediction.events = [] - prediction.events.append(self.dialogue_stack.persist_as_event()) - return prediction - - def select_next_action( - self, - tracker: DialogueStateTracker, - ) -> ActionPrediction: - """Select the next action to execute. - - Advances the current flow and returns the next action to execute. A flow - is advanced until it is completed or until it predicts an action. If - the flow is completed, the next flow is popped from the stack and - advanced. If there are no more flows, the action listen is predicted. - - Args: - tracker: The tracker to get the next action for. - - Returns: - The next action to execute, the events that should be applied to the - tracker and the confidence of the prediction. - """ - step_result: FlowStepResult = ContinueFlowWithNextStep() - - tracker = tracker.copy() - - number_of_initial_events = len(tracker.events) - - number_of_steps_taken = 0 - - while isinstance(step_result, ContinueFlowWithNextStep): - - number_of_steps_taken += 1 - if number_of_steps_taken > MAX_NUMBER_OF_STEPS: - raise FlowCircuitBreakerTrippedException( - self.dialogue_stack, number_of_steps_taken - ) - - active_frame = self.dialogue_stack.top() - if not isinstance(active_frame, BaseFlowStackFrame): - # If there is no current flow, we assume that all flows are done - # and there is nothing to do. The assumption here is that every - # flow ends with an action listen. - step_result = PauseFlowReturnPrediction( - ActionPrediction(ACTION_LISTEN_NAME, 1.0) - ) - else: - with bound_contextvars(flow_id=active_frame.flow_id): - structlogger.debug( - "flow.execution.loop", previous_step_id=active_frame.step_id - ) - current_flow = active_frame.flow(self.all_flows) - current_step = self._select_next_step( - tracker, active_frame.step(self.all_flows), current_flow - ) - - if current_step: - self._advance_top_flow_on_stack(current_step.id) - - with bound_contextvars(step_id=current_step.id): - step_result = self.run_step( - current_flow, current_step, tracker - ) - tracker.update_with_events(step_result.events, self.domain) - - gathered_events = list(tracker.events)[number_of_initial_events:] - if isinstance(step_result, PauseFlowReturnPrediction): - prediction = step_result.action_prediction - # make sure we really return all events that got created during the - # step execution of all steps (not only the last one) - prediction.events = gathered_events - return prediction - else: - structlogger.warning("flow.step.execution.no_action") - return ActionPrediction(None, 0.0) - - def _advance_top_flow_on_stack(self, updated_id: str) -> None: - if (top := self.dialogue_stack.top()) and isinstance(top, BaseFlowStackFrame): - top.step_id = updated_id - - def _reset_scoped_slots( - self, current_flow: Flow, tracker: DialogueStateTracker - ) -> List[Event]: - """Reset all scoped slots.""" - - def _reset_slot( - slot_name: Text, dialogue_tracker: DialogueStateTracker - ) -> None: - slot = dialogue_tracker.slots.get(slot_name, None) - initial_value = slot.initial_value if slot else None - events.append(SlotSet(slot_name, initial_value)) - - events: List[Event] = [] - - not_resettable_slot_names = set() - - for step in current_flow.steps: - if isinstance(step, CollectInformationFlowStep): - # reset all slots scoped to the flow - if step.reset_after_flow_ends: - _reset_slot(step.collect, tracker) - else: - not_resettable_slot_names.add(step.collect) - - # slots set by the set slots step should be reset after the flow ends - # unless they are also used in a collect step where `reset_after_flow_ends` - # is set to `False` - resettable_set_slots = [ - slot["key"] - for step in current_flow.steps - if isinstance(step, SetSlotsFlowStep) - for slot in step.slots - if slot["key"] not in not_resettable_slot_names - ] - - for name in resettable_set_slots: - _reset_slot(name, tracker) - - return events - - def run_step( - self, - flow: Flow, - step: FlowStep, - tracker: DialogueStateTracker, - ) -> FlowStepResult: - """Run a single step of a flow. - - Returns the predicted action and a list of events that were generated - during the step. The predicted action can be `None` if the step - doesn't generate an action. The list of events can be empty if the - step doesn't generate any events. - - Raises a `FlowException` if the step is invalid. - - Args: - flow: The flow that the step belongs to. - step: The step to run. - tracker: The tracker to run the step on. - - Returns: - A result of running the step describing where to transition to. - """ - if isinstance(step, CollectInformationFlowStep): - structlogger.debug("flow.step.run.collect") - self.trigger_pattern_ask_collect_information( - step.collect, step.rejections, step.utter - ) - - # reset the slot if its already filled and the collect information shouldn't - # be skipped - slot = tracker.slots.get(step.collect, None) - - if slot and slot.has_been_set and step.ask_before_filling: - events = [SlotSet(step.collect, slot.initial_value)] - else: - events = [] - - return ContinueFlowWithNextStep(events=events) - - elif isinstance(step, ActionFlowStep): - if not step.action: - raise FlowException(f"Action not specified for step {step}") - - context = {"context": self.dialogue_stack.current_context()} - action_name = self.render_template_variables(step.action, context) - - if action_name in self.domain.action_names_or_texts: - structlogger.debug("flow.step.run.action", context=context) - return PauseFlowReturnPrediction(ActionPrediction(action_name, 1.0)) - else: - structlogger.warning("flow.step.run.action.unknown", action=action_name) - return ContinueFlowWithNextStep() - - elif isinstance(step, LinkFlowStep): - structlogger.debug("flow.step.run.link") - self.dialogue_stack.push( - UserFlowStackFrame( - flow_id=step.link, - frame_type=FlowStackFrameType.LINK, - ), - # push this below the current stack frame so that we can - # complete the current flow first and then continue with the - # linked flow - index=-1, - ) - return ContinueFlowWithNextStep() - - elif isinstance(step, SetSlotsFlowStep): - structlogger.debug("flow.step.run.slot") - return ContinueFlowWithNextStep( - events=[SlotSet(slot["key"], slot["value"]) for slot in step.slots], - ) - - elif isinstance(step, UserMessageStep): - structlogger.debug("flow.step.run.user_message") - return ContinueFlowWithNextStep() - - elif isinstance(step, BranchFlowStep): - structlogger.debug("flow.step.run.branch") - return ContinueFlowWithNextStep() - - elif isinstance(step, GenerateResponseFlowStep): - structlogger.debug("flow.step.run.generate_response") - generated = step.generate(tracker) - return PauseFlowReturnPrediction( - ActionPrediction( - ACTION_SEND_TEXT_NAME, - 1.0, - metadata={"message": {"text": generated}}, - ) - ) - - elif isinstance(step, EndFlowStep): - # this is the end of the flow, so we'll pop it from the stack - structlogger.debug("flow.step.run.flow_end") - current_frame = self.dialogue_stack.pop() - self.trigger_pattern_continue_interrupted(current_frame) - self.trigger_pattern_completed(current_frame) - reset_events = self._reset_scoped_slots(flow, tracker) - return ContinueFlowWithNextStep(events=reset_events) - - else: - raise FlowException(f"Unknown flow step type {type(step)}") - - def trigger_pattern_continue_interrupted( - self, current_frame: DialogueStackFrame - ) -> None: - """Trigger the pattern to continue an interrupted flow if needed.""" - # get previously started user flow that will be continued - previous_user_flow_frame = top_user_flow_frame(self.dialogue_stack) - previous_user_flow_step = ( - previous_user_flow_frame.step(self.all_flows) - if previous_user_flow_frame - else None - ) - previous_user_flow = ( - previous_user_flow_frame.flow(self.all_flows) - if previous_user_flow_frame - else None - ) - - if ( - isinstance(current_frame, UserFlowStackFrame) - and previous_user_flow_step - and previous_user_flow - and current_frame.frame_type == FlowStackFrameType.INTERRUPT - and not self.is_step_end_of_flow(previous_user_flow_step) - ): - self.dialogue_stack.push( - ContinueInterruptedPatternFlowStackFrame( - previous_flow_name=previous_user_flow.readable_name(), - ) - ) - - def trigger_pattern_completed(self, current_frame: DialogueStackFrame) -> None: - """Trigger the pattern indicating that the stack is empty, if needed.""" - if self.dialogue_stack.is_empty() and isinstance( - current_frame, UserFlowStackFrame - ): - completed_flow = current_frame.flow(self.all_flows) - completed_flow_name = ( - completed_flow.readable_name() if completed_flow else None - ) - self.dialogue_stack.push( - CompletedPatternFlowStackFrame( - previous_flow_name=completed_flow_name, - ) - ) - - def trigger_pattern_ask_collect_information( - self, - collect: str, - rejections: List[SlotRejection], - utter: str, - ) -> None: - """Trigger the pattern to ask for a slot value.""" - self.dialogue_stack.push( - CollectInformationPatternFlowStackFrame( - collect=collect, - utter=utter, - rejections=rejections, - ) - ) - - @staticmethod - def is_step_end_of_flow(step: FlowStep) -> bool: - """Check if a step is the end of a flow.""" - return ( - step.id == END_STEP - or - # not quite at the end but almost, so we'll treat it as the end - step.id == ContinueFlowStep.continue_step_for_id(END_STEP) - ) - - -class FlowStepResult: - def __init__(self, events: Optional[List[Event]] = None) -> None: - self.events = events or [] - - -class ContinueFlowWithNextStep(FlowStepResult): - def __init__(self, events: Optional[List[Event]] = None) -> None: - super().__init__(events=events) - - -class PauseFlowReturnPrediction(FlowStepResult): - def __init__(self, action_prediction: ActionPrediction) -> None: - self.action_prediction = action_prediction - super().__init__(events=action_prediction.events) diff --git a/rasa/core/policies/flows/__init__.py b/rasa/core/policies/flows/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/rasa/core/policies/flows/flow_exceptions.py b/rasa/core/policies/flows/flow_exceptions.py new file mode 100644 index 000000000000..c4f76959e4d2 --- /dev/null +++ b/rasa/core/policies/flows/flow_exceptions.py @@ -0,0 +1,31 @@ +from rasa.dialogue_understanding.stack.dialogue_stack import DialogueStack +from rasa.shared.exceptions import RasaException + + +class FlowException(RasaException): + """Exception that is raised when there is a problem with a flow.""" + + pass + + +class FlowCircuitBreakerTrippedException(FlowException): + """Exception that is raised when the flow circuit breaker tripped. + + The circuit breaker gets tripped when a flow seems to be stuck in + executing steps and does not make any progress.""" + + def __init__( + self, dialogue_stack: DialogueStack, number_of_steps_taken: int + ) -> None: + """Creates a `FlowCircuitBreakerTrippedException`. + + Args: + dialogue_stack: The dialogue stack. + number_of_steps_taken: The number of steps that were taken. + """ + super().__init__( + f"Flow circuit breaker tripped after {number_of_steps_taken} steps. " + "There appears to be an infinite loop in the flows." + ) + self.dialogue_stack = dialogue_stack + self.number_of_steps_taken = number_of_steps_taken diff --git a/rasa/core/policies/flows/flow_executor.py b/rasa/core/policies/flows/flow_executor.py new file mode 100644 index 000000000000..342ee9bd02b2 --- /dev/null +++ b/rasa/core/policies/flows/flow_executor.py @@ -0,0 +1,496 @@ +from __future__ import annotations + +from typing import Any, Dict, Text, List, Optional + +from jinja2 import Template +from structlog.contextvars import ( + bound_contextvars, +) +from rasa.core.policies.flows.flow_exceptions import ( + FlowCircuitBreakerTrippedException, + FlowException, +) +from rasa.core.policies.flows.flow_step_result import ( + FlowActionPrediction, + ContinueFlowWithNextStep, + FlowStepResult, + PauseFlowReturnPrediction, +) +from rasa.dialogue_understanding.stack.dialogue_stack import DialogueStack +from rasa.dialogue_understanding.stack.frames import ( + BaseFlowStackFrame, + DialogueStackFrame, + UserFlowStackFrame, +) +from rasa.dialogue_understanding.patterns.collect_information import ( + CollectInformationPatternFlowStackFrame, +) +from rasa.dialogue_understanding.patterns.completed import ( + CompletedPatternFlowStackFrame, +) +from rasa.dialogue_understanding.patterns.continue_interrupted import ( + ContinueInterruptedPatternFlowStackFrame, +) +from rasa.dialogue_understanding.stack.frames.flow_stack_frame import FlowStackFrameType +from rasa.dialogue_understanding.stack.utils import ( + top_user_flow_frame, +) + +from pypred import Predicate + +from rasa.shared.core.constants import ( + ACTION_LISTEN_NAME, + ACTION_SEND_TEXT_NAME, +) +from rasa.shared.core.events import Event, SlotSet +from rasa.shared.core.flows.flow import ( + END_STEP, + ActionFlowStep, + BranchFlowStep, + ContinueFlowStep, + ElseFlowLink, + EndFlowStep, + Flow, + FlowStep, + FlowsList, + GenerateResponseFlowStep, + IfFlowLink, + SlotRejection, + LinkFlowStep, + SetSlotsFlowStep, + CollectInformationFlowStep, + StaticFlowLink, +) +from rasa.shared.core.domain import Domain +from rasa.shared.core.trackers import ( + DialogueStateTracker, +) +import structlog + +structlogger = structlog.get_logger() + +MAX_NUMBER_OF_STEPS = 250 + + +def render_template_variables(text: str, context: Dict[Text, Any]) -> str: + """Replace context variables in a text.""" + return Template(text).render(context) + + +def is_condition_satisfied( + predicate: Text, context: Dict[str, Any], tracker: DialogueStateTracker +) -> bool: + """Evaluate a predicate condition.""" + + # attach context to the predicate evaluation to allow conditions using it + context = {"context": context} + + document: Dict[str, Any] = context.copy() + document.update(tracker.current_slot_values()) + + p = Predicate(render_template_variables(predicate, context)) + try: + return p.evaluate(document) + except (TypeError, Exception) as e: + structlogger.error( + "flow.predicate.error", + predicate=predicate, + document=document, + error=str(e), + ) + return False + + +def is_step_end_of_flow(step: FlowStep) -> bool: + """Check if a step is the end of a flow.""" + return ( + step.id == END_STEP + or + # not quite at the end but almost, so we'll treat it as the end + step.id == ContinueFlowStep.continue_step_for_id(END_STEP) + ) + + +def select_next_step_id( + current: FlowStep, + condition_evaluation_context: Dict[str, Any], + tracker: DialogueStateTracker, +) -> Optional[Text]: + """Selects the next step id based on the current step.""" + next = current.next + if len(next.links) == 1 and isinstance(next.links[0], StaticFlowLink): + return next.links[0].target + + # evaluate if conditions + for link in next.links: + if isinstance(link, IfFlowLink) and link.condition: + if is_condition_satisfied( + link.condition, condition_evaluation_context, tracker + ): + return link.target + + # evaluate else condition + for link in next.links: + if isinstance(link, ElseFlowLink): + return link.target + + if next.links: + structlogger.error( + "flow.link.failed_to_select_branch", + current=current, + links=next.links, + tracker=tracker, + ) + return None + if current.id == END_STEP: + # we are already at the very end of the flow. There is no next step. + return None + elif isinstance(current, LinkFlowStep): + # link steps don't have a next step, so we'll return the end step + return END_STEP + else: + structlogger.error( + "flow.step.failed_to_select_next_step", + step=current, + tracker=tracker, + ) + return None + + +def select_next_step( + current_step: FlowStep, + current_flow: Flow, + stack: DialogueStack, + tracker: DialogueStateTracker, +) -> Optional[FlowStep]: + """Get the next step to execute.""" + next_id = select_next_step_id(current_step, stack.current_context(), tracker) + step = current_flow.step_by_id(next_id) + structlogger.debug( + "flow.step.next", + next_id=step.id if step else None, + current_id=current_step.id, + flow_id=current_flow.id, + ) + return step + + +def advance_top_flow_on_stack(updated_id: str, stack: DialogueStack) -> None: + """Advance the top flow on the stack.""" + if (top := stack.top()) and isinstance(top, BaseFlowStackFrame): + top.step_id = updated_id + + +def events_from_set_slots_step(step: SetSlotsFlowStep) -> List[Event]: + """Create events from a set slots step.""" + return [SlotSet(slot["key"], slot["value"]) for slot in step.slots] + + +def events_for_collect_step( + step: CollectInformationFlowStep, tracker: DialogueStateTracker +) -> List[Event]: + """Create events for a collect step.""" + # reset the slot if its already filled and the collect information shouldn't + # be skipped + slot = tracker.slots.get(step.collect, None) + + if slot and slot.has_been_set and step.ask_before_filling: + return [SlotSet(step.collect, slot.initial_value)] + else: + return [] + + +def trigger_pattern_continue_interrupted( + current_frame: DialogueStackFrame, stack: DialogueStack, flows: FlowsList +) -> None: + """Trigger the pattern to continue an interrupted flow if needed.""" + # get previously started user flow that will be continued + previous_user_flow_frame = top_user_flow_frame(stack) + previous_user_flow_step = ( + previous_user_flow_frame.step(flows) if previous_user_flow_frame else None + ) + previous_user_flow = ( + previous_user_flow_frame.flow(flows) if previous_user_flow_frame else None + ) + + if ( + isinstance(current_frame, UserFlowStackFrame) + and previous_user_flow_step is not None + and previous_user_flow is not None + and current_frame.frame_type == FlowStackFrameType.INTERRUPT + and not is_step_end_of_flow(previous_user_flow_step) + ): + stack.push( + ContinueInterruptedPatternFlowStackFrame( + previous_flow_name=previous_user_flow.readable_name(), + ) + ) + + +def trigger_pattern_completed( + current_frame: DialogueStackFrame, stack: DialogueStack, flows: FlowsList +) -> None: + """Trigger the pattern indicating that the stack is empty, if needed.""" + if not stack.is_empty() or not isinstance(current_frame, UserFlowStackFrame): + return + + completed_flow = current_frame.flow(flows) + completed_flow_name = completed_flow.readable_name() if completed_flow else None + stack.push( + CompletedPatternFlowStackFrame( + previous_flow_name=completed_flow_name, + ) + ) + + +def trigger_pattern_ask_collect_information( + collect: str, + stack: DialogueStack, + rejections: List[SlotRejection], + utter: str, +) -> None: + """Trigger the pattern to ask for a slot value.""" + stack.push( + CollectInformationPatternFlowStackFrame( + collect=collect, + utter=utter, + rejections=rejections, + ) + ) + + +def reset_scoped_slots( + current_flow: Flow, tracker: DialogueStateTracker +) -> List[Event]: + """Reset all scoped slots.""" + + def _reset_slot(slot_name: Text, dialogue_tracker: DialogueStateTracker) -> None: + slot = dialogue_tracker.slots.get(slot_name, None) + initial_value = slot.initial_value if slot else None + events.append(SlotSet(slot_name, initial_value)) + + events: List[Event] = [] + + not_resettable_slot_names = set() + + for step in current_flow.steps: + if isinstance(step, CollectInformationFlowStep): + # reset all slots scoped to the flow + if step.reset_after_flow_ends: + _reset_slot(step.collect, tracker) + else: + not_resettable_slot_names.add(step.collect) + + # slots set by the set slots step should be reset after the flow ends + # unless they are also used in a collect step where `reset_after_flow_ends` + # is set to `False` + resettable_set_slots = [ + slot["key"] + for step in current_flow.steps + if isinstance(step, SetSlotsFlowStep) + for slot in step.slots + if slot["key"] not in not_resettable_slot_names + ] + + for name in resettable_set_slots: + _reset_slot(name, tracker) + + return events + + +def advance_flows( + tracker: DialogueStateTracker, domain: Domain, flows: FlowsList +) -> FlowActionPrediction: + """Advance the flows. + + Either start a new flow or advance the current flow. + + Args: + tracker: The tracker to get the next action for. + + Returns: + The predicted action and the events to run. + """ + stack = DialogueStack.from_tracker(tracker) + if stack.is_empty(): + # if there are no flows, there is nothing to do + return FlowActionPrediction(None, 0.0) + + previous_stack = stack.as_dict() + prediction = select_next_action(stack, tracker, domain, flows) + if previous_stack != stack.as_dict(): + # we need to update dialogue stack to persist the state of the executor + if not prediction.events: + prediction.events = [] + prediction.events.append(stack.persist_as_event()) + return prediction + + +def select_next_action( + stack: DialogueStack, + tracker: DialogueStateTracker, + domain: Domain, + flows: FlowsList, +) -> FlowActionPrediction: + """Select the next action to execute. + + Advances the current flow and returns the next action to execute. A flow + is advanced until it is completed or until it predicts an action. If + the flow is completed, the next flow is popped from the stack and + advanced. If there are no more flows, the action listen is predicted. + + Args: + tracker: The tracker to get the next action for. + + Returns: + The next action to execute, the events that should be applied to the + tracker and the confidence of the prediction. + """ + step_result: FlowStepResult = ContinueFlowWithNextStep() + + tracker = tracker.copy() + + number_of_initial_events = len(tracker.events) + + number_of_steps_taken = 0 + + while isinstance(step_result, ContinueFlowWithNextStep): + + number_of_steps_taken += 1 + if number_of_steps_taken > MAX_NUMBER_OF_STEPS: + raise FlowCircuitBreakerTrippedException(stack, number_of_steps_taken) + + active_frame = stack.top() + if not isinstance(active_frame, BaseFlowStackFrame): + # If there is no current flow, we assume that all flows are done + # and there is nothing to do. The assumption here is that every + # flow ends with an action listen. + step_result = PauseFlowReturnPrediction( + FlowActionPrediction(ACTION_LISTEN_NAME, 1.0) + ) + break + + with bound_contextvars(flow_id=active_frame.flow_id): + structlogger.debug( + "flow.execution.loop", previous_step_id=active_frame.step_id + ) + current_flow = active_frame.flow(flows) + current_step = select_next_step( + active_frame.step(flows), current_flow, stack, tracker + ) + + if not current_step: + continue + + advance_top_flow_on_stack(current_step.id, stack) + + with bound_contextvars(step_id=current_step.id): + step_result = run_step( + current_step, current_flow, stack, tracker, domain, flows + ) + tracker.update_with_events(step_result.events, domain) + + gathered_events = list(tracker.events)[number_of_initial_events:] + if isinstance(step_result, PauseFlowReturnPrediction): + prediction = step_result.action_prediction + # make sure we really return all events that got created during the + # step execution of all steps (not only the last one) + prediction.events = gathered_events + return prediction + else: + structlogger.warning("flow.step.execution.no_action") + return FlowActionPrediction(None, 0.0) + + +def run_step( + step: FlowStep, + flow: Flow, + stack: DialogueStack, + tracker: DialogueStateTracker, + domain: Domain, + flows: FlowsList, +) -> FlowStepResult: + """Run a single step of a flow. + + Returns the predicted action and a list of events that were generated + during the step. The predicted action can be `None` if the step + doesn't generate an action. The list of events can be empty if the + step doesn't generate any events. + + Raises a `FlowException` if the step is invalid. + + Args: + flow: The flow that the step belongs to. + step: The step to run. + tracker: The tracker to run the step on. + + Returns: + A result of running the step describing where to transition to. + """ + if isinstance(step, CollectInformationFlowStep): + structlogger.debug("flow.step.run.collect") + trigger_pattern_ask_collect_information( + step.collect, stack, step.rejections, step.utter + ) + + events = events_for_collect_step(step, tracker) + return ContinueFlowWithNextStep(events=events) + + elif isinstance(step, ActionFlowStep): + if not step.action: + raise FlowException(f"Action not specified for step {step}") + + context = {"context": stack.current_context()} + action_name = render_template_variables(step.action, context) + + if action_name in domain.action_names_or_texts: + structlogger.debug("flow.step.run.action", context=context) + return PauseFlowReturnPrediction(FlowActionPrediction(action_name, 1.0)) + else: + structlogger.warning("flow.step.run.action.unknown", action=action_name) + return ContinueFlowWithNextStep() + + elif isinstance(step, LinkFlowStep): + structlogger.debug("flow.step.run.link") + stack.push( + UserFlowStackFrame( + flow_id=step.link, + frame_type=FlowStackFrameType.LINK, + ), + # push this below the current stack frame so that we can + # complete the current flow first and then continue with the + # linked flow + index=-1, + ) + return ContinueFlowWithNextStep() + + elif isinstance(step, SetSlotsFlowStep): + structlogger.debug("flow.step.run.slot") + events = events_from_set_slots_step(step) + return ContinueFlowWithNextStep(events=events) + + elif isinstance(step, BranchFlowStep): + structlogger.debug("flow.step.run.branch") + return ContinueFlowWithNextStep() + + elif isinstance(step, GenerateResponseFlowStep): + structlogger.debug("flow.step.run.generate_response") + generated = step.generate(tracker) + action_prediction = FlowActionPrediction( + ACTION_SEND_TEXT_NAME, + 1.0, + metadata={"message": {"text": generated}}, + ) + return PauseFlowReturnPrediction(action_prediction) + + elif isinstance(step, EndFlowStep): + # this is the end of the flow, so we'll pop it from the stack + structlogger.debug("flow.step.run.flow_end") + current_frame = stack.pop() + trigger_pattern_continue_interrupted(current_frame, stack, flows) + trigger_pattern_completed(current_frame, stack, flows) + reset_events = reset_scoped_slots(flow, tracker) + return ContinueFlowWithNextStep(events=reset_events) + + else: + raise FlowException(f"Unknown flow step type {type(step)}") diff --git a/rasa/core/policies/flows/flow_step_result.py b/rasa/core/policies/flows/flow_step_result.py new file mode 100644 index 000000000000..00a884a0a65f --- /dev/null +++ b/rasa/core/policies/flows/flow_step_result.py @@ -0,0 +1,40 @@ +from dataclasses import dataclass +from typing import Any, Dict, List, Optional + +from rasa.shared.core.events import Event + + +@dataclass +class FlowActionPrediction: + """Represents an action prediction.""" + + action_name: Optional[str] + """The name of the predicted action.""" + score: float + """The score of the predicted action.""" + metadata: Optional[Dict[str, Any]] = None + """The metadata of the predicted action.""" + events: Optional[List[Event]] = None + """The events attached to the predicted action.""" + + +class FlowStepResult: + """Represents the result of a flow step.""" + + def __init__(self, events: Optional[List[Event]] = None) -> None: + self.events = events or [] + + +class ContinueFlowWithNextStep(FlowStepResult): + """Represents the result of a flow step that should continue with the next step.""" + + def __init__(self, events: Optional[List[Event]] = None) -> None: + super().__init__(events=events) + + +class PauseFlowReturnPrediction(FlowStepResult): + """Result where the flow execution should be paused after this step.""" + + def __init__(self, action_prediction: FlowActionPrediction) -> None: + self.action_prediction = action_prediction + super().__init__(events=action_prediction.events) diff --git a/rasa/dialogue_understanding/generator/llm_command_generator.py b/rasa/dialogue_understanding/generator/llm_command_generator.py index aad8e01e90d0..891625674547 100644 --- a/rasa/dialogue_understanding/generator/llm_command_generator.py +++ b/rasa/dialogue_understanding/generator/llm_command_generator.py @@ -4,6 +4,7 @@ from jinja2 import Template import structlog +from rasa.dialogue_understanding.stack.dialogue_stack import DialogueStack from rasa.dialogue_understanding.stack.utils import top_flow_frame from rasa.dialogue_understanding.generator import CommandGenerator @@ -18,7 +19,6 @@ KnowledgeAnswerCommand, ClarifyCommand, ) -from rasa.core.policies.flow_policy import DialogueStack from rasa.engine.graph import GraphComponent, ExecutionContext from rasa.engine.recipes.default_recipe import DefaultV1Recipe from rasa.engine.storage.resource import Resource diff --git a/rasa/dialogue_understanding/stack/dialogue_stack.py b/rasa/dialogue_understanding/stack/dialogue_stack.py index 45911b0207c6..c1759f39b045 100644 --- a/rasa/dialogue_understanding/stack/dialogue_stack.py +++ b/rasa/dialogue_understanding/stack/dialogue_stack.py @@ -1,4 +1,5 @@ from __future__ import annotations +import copy from dataclasses import dataclass from typing import Any, Callable, Dict, List, Optional @@ -43,6 +44,9 @@ def as_dict(self) -> List[Dict[str, Any]]: """ return [frame.as_dict() for frame in self.frames] + def copy(self) -> DialogueStack: + return copy.deepcopy(self) + def push(self, frame: DialogueStackFrame, index: Optional[int] = None) -> None: """Pushes a new frame onto the stack. diff --git a/rasa/dialogue_understanding/stack/utils.py b/rasa/dialogue_understanding/stack/utils.py index 71e59b90d4ba..ecfea8503ae3 100644 --- a/rasa/dialogue_understanding/stack/utils.py +++ b/rasa/dialogue_understanding/stack/utils.py @@ -108,7 +108,7 @@ def user_flows_on_the_stack(dialogue_stack: DialogueStack) -> Set[str]: } -def end_top_user_flow(stack: DialogueStack) -> None: +def end_top_user_flow(stack: DialogueStack) -> DialogueStack: """Ends all frames on top of the stack including the topmost user frame. Ends all flows until the next user flow is reached. This is useful @@ -119,8 +119,11 @@ def end_top_user_flow(stack: DialogueStack) -> None: stack: The dialogue stack. """ - for frame in reversed(stack.frames): + updated_stack = stack.copy() + + for frame in reversed(updated_stack.frames): if isinstance(frame, BaseFlowStackFrame): frame.step_id = ContinueFlowStep.continue_step_for_id(END_STEP) if isinstance(frame, UserFlowStackFrame): break + return updated_stack diff --git a/rasa/shared/core/flows/flow.py b/rasa/shared/core/flows/flow.py index 656309c23564..716227645aa2 100644 --- a/rasa/shared/core/flows/flow.py +++ b/rasa/shared/core/flows/flow.py @@ -12,14 +12,12 @@ Set, Text, Union, - runtime_checkable, ) import structlog from rasa.shared.core.trackers import DialogueStateTracker from rasa.shared.constants import RASA_DEFAULT_FLOW_PATTERN_PREFIX, UTTER_PREFIX from rasa.shared.exceptions import RasaException -from rasa.shared.nlu.constants import ENTITY_ATTRIBUTE_TYPE, INTENT_NAME_KEY import rasa.shared.utils.io from rasa.shared.utils.llm import ( @@ -524,26 +522,6 @@ def _previously_asked_collect( return _previously_asked_collect(step_id or START_STEP, set()) - def get_trigger_intents(self) -> Set[str]: - """Returns the trigger intents of the flow""" - results: Set[str] = set() - if len(self.steps) == 0: - return results - - first_step = self.steps[0] - - if not isinstance(first_step, UserMessageStep): - return results - - for condition in first_step.trigger_conditions: - results.add(condition.intent) - - return results - - def is_user_triggerable(self) -> bool: - """Test whether a user can trigger the flow with an intent.""" - return len(self.get_trigger_intents()) > 0 - @property def is_rasa_default_flow(self) -> bool: """Test whether something is a rasa default flow.""" @@ -631,8 +609,6 @@ def step_from_json(flow_step_config: Dict[Text, Any]) -> FlowStep: """ if "action" in flow_step_config: return ActionFlowStep.from_json(flow_step_config) - if "intent" in flow_step_config: - return UserMessageStep.from_json(flow_step_config) if "collect" in flow_step_config: return CollectInformationFlowStep.from_json(flow_step_config) if "link" in flow_step_config: @@ -925,139 +901,6 @@ def default_id_postfix(self) -> str: return f"link_{self.link}" -@dataclass -class TriggerCondition: - """Represents the configuration of a trigger condition.""" - - intent: Text - """The intent to trigger the flow.""" - entities: List[Text] - """The entities to trigger the flow.""" - - def is_triggered(self, intent: Text, entities: List[Text]) -> bool: - """Check if condition is triggered by the given intent and entities. - - Args: - intent: The intent to check. - entities: The entities to check. - - Returns: - Whether the trigger condition is triggered by the given intent and entities. - """ - if self.intent != intent: - return False - if len(self.entities) == 0: - return True - return all(entity in entities for entity in self.entities) - - -@runtime_checkable -class StepThatCanStartAFlow(Protocol): - """Represents a step that can start a flow.""" - - def is_triggered(self, tracker: DialogueStateTracker) -> bool: - """Check if a flow should be started for the tracker - - Args: - tracker: The tracker to check. - - Returns: - Whether a flow should be started for the tracker. - """ - ... - - -@dataclass -class UserMessageStep(FlowStep, StepThatCanStartAFlow): - """Represents the configuration of an intent flow step.""" - - trigger_conditions: List[TriggerCondition] - """The trigger conditions of the flow step.""" - - @classmethod - def from_json(cls, flow_step_config: Dict[Text, Any]) -> UserMessageStep: - """Used to read flow steps from parsed YAML. - - Args: - flow_step_config: The parsed YAML as a dictionary. - - Returns: - The parsed flow step. - """ - base = super()._from_json(flow_step_config) - - trigger_conditions = [] - if "intent" in flow_step_config: - trigger_conditions.append( - TriggerCondition( - intent=flow_step_config["intent"], - entities=flow_step_config.get("entities", []), - ) - ) - elif "or" in flow_step_config: - for trigger_condition in flow_step_config["or"]: - trigger_conditions.append( - TriggerCondition( - intent=trigger_condition.get("intent", ""), - entities=trigger_condition.get("entities", []), - ) - ) - - return UserMessageStep( - trigger_conditions=trigger_conditions, - **base.__dict__, - ) - - def as_json(self) -> Dict[Text, Any]: - """Returns the flow step as a dictionary. - - Returns: - The flow step as a dictionary. - """ - dump = super().as_json() - - if len(self.trigger_conditions) == 1: - dump["intent"] = self.trigger_conditions[0].intent - if self.trigger_conditions[0].entities: - dump["entities"] = self.trigger_conditions[0].entities - elif len(self.trigger_conditions) > 1: - dump["or"] = [ - { - "intent": trigger_condition.intent, - "entities": trigger_condition.entities, - } - for trigger_condition in self.trigger_conditions - ] - - return dump - - def is_triggered(self, tracker: DialogueStateTracker) -> bool: - """Returns whether the flow step is triggered by the given intent and entities. - - Args: - intent: The intent to check. - entities: The entities to check. - - Returns: - Whether the flow step is triggered by the given intent and entities. - """ - if not tracker.latest_message: - return False - - intent: Text = tracker.latest_message.intent.get(INTENT_NAME_KEY, "") - entities: List[Text] = [ - e.get(ENTITY_ATTRIBUTE_TYPE, "") for e in tracker.latest_message.entities - ] - return any( - trigger_condition.is_triggered(intent, entities) - for trigger_condition in self.trigger_conditions - ) - - def default_id_postfix(self) -> str: - """Returns the default id postfix of the flow step.""" - return "intent" - - DEFAULT_LLM_CONFIG = { "_type": "openai", "request_timeout": 5, diff --git a/tests/core/policies/flows/__init__.py b/tests/core/policies/flows/__init__.py new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/tests/core/policies/flows/test_flow_exceptions.py b/tests/core/policies/flows/test_flow_exceptions.py new file mode 100644 index 000000000000..541dd6607d35 --- /dev/null +++ b/tests/core/policies/flows/test_flow_exceptions.py @@ -0,0 +1,19 @@ +from rasa.core.policies.flows.flow_exceptions import ( + FlowCircuitBreakerTrippedException, + FlowException, +) +from rasa.dialogue_understanding.stack.dialogue_stack import DialogueStack +from rasa.shared.exceptions import RasaException + + +def test_flow_circuit_breaker_tripped_exception_is_rasa_exception(): + # important, because we treat internal exceptiosn differently + stack = DialogueStack(frames=[]) + e = FlowCircuitBreakerTrippedException(stack, 42) + assert isinstance(e, RasaException) + + +def test_flow_exception_is_rasa_exception(): + # important, because we treat internal exceptiosn differently + e = FlowException() + assert isinstance(e, RasaException) diff --git a/tests/core/policies/flows/test_flow_executor.py b/tests/core/policies/flows/test_flow_executor.py new file mode 100644 index 000000000000..1df413404694 --- /dev/null +++ b/tests/core/policies/flows/test_flow_executor.py @@ -0,0 +1,597 @@ +import textwrap +from typing import List, Optional, Tuple +import pytest +from rasa.core.policies.flows import flow_executor +from rasa.core.policies.flows.flow_exceptions import FlowCircuitBreakerTrippedException +from rasa.dialogue_understanding.stack.dialogue_stack import DialogueStack +from rasa.dialogue_understanding.stack.frames.flow_stack_frame import UserFlowStackFrame +from rasa.dialogue_understanding.stack.frames.search_frame import SearchStackFrame +from rasa.shared.core.domain import Domain +from rasa.shared.core.events import ActionExecuted, Event, SlotSet +from rasa.shared.core.flows.flow import ( + END_STEP, + ContinueFlowStep, + EndFlowStep, + FlowLinks, + FlowsList, + SetSlotsFlowStep, +) +from rasa.shared.core.flows.yaml_flows_io import YAMLFlowsReader, flows_from_str +from rasa.shared.core.slots import TextSlot +from rasa.shared.core.trackers import DialogueStateTracker + + +def test_render_template_variables(): + assert ( + flow_executor.render_template_variables("foo {{bar}}", {"bar": "bar baz"}) + == "foo bar baz" + ) + + +def test_render_template_empty_context(): + assert flow_executor.render_template_variables("foo {{bar}}", {}) == "foo " + + +def test_render_template_empty_text(): + assert flow_executor.render_template_variables("", {"bar": "bar baz"}) == "" + + +def test_evaluate_simple_predicate(): + predicate = "2 > 1" + stack = DialogueStack(frames=[]) + tracker = DialogueStateTracker.from_events("test", []) + assert flow_executor.is_condition_satisfied(predicate, stack, tracker) + + +def test_evaluate_simple_predicate_failing(): + predicate = "2 < 1" + stack = DialogueStack(frames=[]) + tracker = DialogueStateTracker.from_events("test", []) + assert not flow_executor.is_condition_satisfied(predicate, stack, tracker) + + +def test_invalid_predicate(): + predicate = "2 >!= 1" + stack = DialogueStack(frames=[]) + tracker = DialogueStateTracker.from_events("test", []) + assert not flow_executor.is_condition_satisfied(predicate, stack, tracker) + + +def test_evaluate_predicate_with_context_unsuccessfully(): + predicate = "'foo' = context.flow_id" + unsatisfied_tracker = DialogueStateTracker.from_events("test", []) + assert not flow_executor.is_condition_satisfied( + predicate, + context={}, + tracker=unsatisfied_tracker, + ) + + +def test_evaluate_predicate_with_context_successfully(): + predicate = "'foo' = context.flow_id" + stack = DialogueStack( + frames=[ + UserFlowStackFrame( + flow_id="foo", step_id="first_step", frame_id="some-frame-id" + ) + ] + ) + + satisfied_tracker = DialogueStateTracker.from_events( + "test", [stack.persist_as_event()] + ) + assert flow_executor.is_condition_satisfied( + predicate, + stack.current_context(), + satisfied_tracker, + ) + + +def test_evaluate_predicate_with_slots(): + predicate = "'foo' = my_slot" + + satisfied_tracker = DialogueStateTracker.from_events( + "test", [SlotSet("my_slot", "foo")] + ) + assert flow_executor.is_condition_satisfied( + predicate, + context={}, + tracker=satisfied_tracker, + ) + + unsatisfied_tracker = DialogueStateTracker.from_events("test", []) + assert not flow_executor.is_condition_satisfied( + predicate, + context={}, + tracker=unsatisfied_tracker, + ) + + +def test_is_step_end_of_flow_is_false_for_set_slot(): + step = SetSlotsFlowStep( + custom_id="foo", + description="", + idx=1, + slots=[], + metadata={}, + next=FlowLinks(links=[]), + ) + assert not flow_executor.is_step_end_of_flow(step) + + +def test_is_step_end_of_flow_is_true_for_end(): + step = EndFlowStep() + assert flow_executor.is_step_end_of_flow(step) + + +def test_is_step_end_of_flow_is_true_for_step_continuing_at_end(): + step = ContinueFlowStep(next=END_STEP) + assert flow_executor.is_step_end_of_flow(step) + + +def test_select_next_step_static_link(): + all_flows = flows_from_str( + """ + flows: + my_flow: + steps: + - id: collect_foo + collect: foo + next: collect_bar + - id: collect_bar + collect: bar + """ + ) + + user_flow_frame = UserFlowStackFrame( + flow_id="my_flow", step_id="collect_foo", frame_id="some-frame-id" + ) + stack = DialogueStack(frames=[user_flow_frame]) + tracker = DialogueStateTracker.from_events("test", [stack.persist_as_event()]) + step = user_flow_frame.step(all_flows) + + assert ( + flow_executor.select_next_step_id(step, stack.current_context(), tracker) + == "collect_bar" + ) + + +def test_select_next_step_branch_if(): + all_flows = flows_from_str( + """ + flows: + my_flow: + steps: + - id: collect_foo + collect: foo + next: + - if: foo is 'foobar' + then: collect_bar + - else: + - id: collect_baz + collect: baz + next: END + - id: collect_bar + collect: bar + """ + ) + + user_flow_frame = UserFlowStackFrame( + flow_id="my_flow", step_id="collect_foo", frame_id="some-frame-id" + ) + stack = DialogueStack(frames=[user_flow_frame]) + tracker = DialogueStateTracker.from_events( + "test", [stack.persist_as_event(), SlotSet("foo", "foobar")] + ) + step = user_flow_frame.step(all_flows) + + assert ( + flow_executor.select_next_step_id(step, stack.current_context(), tracker) + == "collect_bar" + ) + + +def test_select_next_step_branch_else(): + all_flows = flows_from_str( + """ + flows: + my_flow: + steps: + - id: collect_foo + collect: foo + next: + - if: foo is 'foobar' + then: collect_bar + - else: + - id: collect_baz + collect: baz + next: END + - id: collect_bar + collect: bar + """ + ) + + user_flow_frame = UserFlowStackFrame( + flow_id="my_flow", step_id="collect_foo", frame_id="some-frame-id" + ) + stack = DialogueStack(frames=[user_flow_frame]) + tracker = DialogueStateTracker.from_events( + "test", [stack.persist_as_event(), SlotSet("foo", "bazbaz")] + ) + step = user_flow_frame.step(all_flows) + + assert ( + flow_executor.select_next_step_id(step, stack.current_context(), tracker) + == "collect_baz" + ) + + +def test_select_next_step_branch_not_possible(): + # the flow is missing an else so we can't select a next step + all_flows = flows_from_str( + """ + flows: + my_flow: + steps: + - id: collect_foo + collect: foo + next: + - if: foo is 'foobar' + then: collect_bar + - if: foo is 'fooooobar' + then: + - id: collect_baz + collect: baz + next: END + # we need to add this when parsing, otherwise it fails. but + # we will remove it later in the test code + - else: END + - id: collect_bar + collect: bar + """ + ) + + user_flow_frame = UserFlowStackFrame( + flow_id="my_flow", step_id="collect_foo", frame_id="some-frame-id" + ) + stack = DialogueStack(frames=[user_flow_frame]) + tracker = DialogueStateTracker.from_events( + "test", [stack.persist_as_event(), SlotSet("foo", "bazbaz")] + ) + step = user_flow_frame.step(all_flows) + + step.next.links = step.next.links[:-1] # removes the else branch + + assert ( + flow_executor.select_next_step_id(step, stack.current_context(), tracker) + is None + ) + + +def test_select_handles_END_next(): + all_flows = flows_from_str( + """ + flows: + my_flow: + steps: + - id: collect_foo + collect: foo + """ + ) + + user_flow_frame = UserFlowStackFrame( + flow_id="my_flow", step_id="collect_foo", frame_id="some-frame-id" + ) + stack = DialogueStack(frames=[user_flow_frame]) + tracker = DialogueStateTracker.from_events("test", [stack.persist_as_event()]) + step = user_flow_frame.step(all_flows) + + assert ( + flow_executor.select_next_step_id(step, stack.current_context(), tracker) + == "END" + ) + + +def test_select_handles_no_next(): + all_flows = flows_from_str( + """ + flows: + my_flow: + steps: + - id: collect_foo + collect: foo + """ + ) + + user_flow_frame = UserFlowStackFrame( + flow_id="my_flow", step_id="collect_foo", frame_id="some-frame-id" + ) + stack = DialogueStack(frames=[user_flow_frame]) + tracker = DialogueStateTracker.from_events("test", [stack.persist_as_event()]) + step = user_flow_frame.step(all_flows) + + # we need to manually create this case as the YAML parser doesn't allow + # for empty nexts. so actually, we shouldn't even get into this situation + # but still good to make sure that the function handles it + step.next = FlowLinks(links=[]) + + assert ( + flow_executor.select_next_step_id(step, stack.current_context(), tracker) + is None + ) + + +def test_select_handles_current_node_being_END(): + all_flows = flows_from_str( + """ + flows: + my_flow: + steps: + - id: collect_foo + collect: foo + """ + ) + + user_flow_frame = UserFlowStackFrame( + flow_id="my_flow", step_id="END", frame_id="some-frame-id" + ) + stack = DialogueStack(frames=[user_flow_frame]) + tracker = DialogueStateTracker.from_events("test", [stack.persist_as_event()]) + step = user_flow_frame.step(all_flows) + + assert ( + flow_executor.select_next_step_id(step, stack.current_context(), tracker) + is None + ) + + +def test_select_handles_current_node_being_link(): + all_flows = flows_from_str( + """ + flows: + my_flow: + steps: + - id: link_to_foo + link: foo + """ + ) + + user_flow_frame = UserFlowStackFrame( + flow_id="my_flow", step_id="link_to_foo", frame_id="some-frame-id" + ) + stack = DialogueStack(frames=[user_flow_frame]) + tracker = DialogueStateTracker.from_events("test", [stack.persist_as_event()]) + step = user_flow_frame.step(all_flows) + + assert ( + flow_executor.select_next_step_id(step, stack.current_context(), tracker) + == "END" + ) + + +def test_advance_top_flow_on_stack_handles_empty_stack(): + stack = DialogueStack(frames=[]) + flow_executor.advance_top_flow_on_stack("foo", stack) + assert stack == DialogueStack(frames=[]) + + +def test_advance_top_flow_on_stack_handles_non_user_flow_stack(): + search_frame = SearchStackFrame(frame_id="some-frame-id") + stack = DialogueStack(frames=[search_frame]) + flow_executor.advance_top_flow_on_stack("foo", stack) + assert stack == DialogueStack(frames=[search_frame]) + + +def test_advance_top_flow_on_stack_advances_user_flow(): + user_frame = UserFlowStackFrame( + flow_id="foo", step_id="first_step", frame_id="some-frame-id" + ) + stack = DialogueStack(frames=[user_frame]) + flow_executor.advance_top_flow_on_stack("bar", stack) + top = stack.top() + assert isinstance(top, UserFlowStackFrame) + assert top.step_id == "bar" + + +def test_executor_does_not_get_tripped_if_an_action_is_predicted_in_loop(): + flow_with_loop = flows_from_str( + """ + flows: + foo_flow: + steps: + - id: "1" + set_slots: + - foo: bar + next: "2" + - id: "2" + action: action_listen + next: "1" + """ + ) + + domain = Domain.empty() + + stack = DialogueStack( + frames=[UserFlowStackFrame(flow_id="foo_flow", step_id="1", frame_id="some-id")] + ) + + tracker = DialogueStateTracker.from_events( + "test", + evts=[ActionExecuted(action_name="action_listen"), stack.persist_as_event()], + domain=domain, + slots=domain.slots, + ) + + selection = flow_executor.select_next_action(stack, tracker, domain, flow_with_loop) + assert selection.action_name == "action_listen" + + +def test_resets_all_slots_after_flow_ends() -> None: + flows = flows_from_str( + """ + flows: + foo_flow: + steps: + - id: "1" + collect: my_slot + - id: "2" + set_slots: + - foo: bar + - other_slot: other_value + - id: "3" + action: action_listen + """ + ) + tracker = DialogueStateTracker.from_events( + "test", + [ + SlotSet("my_slot", "my_value"), + SlotSet("foo", "bar"), + SlotSet("other_slot", "other_value"), + ActionExecuted("action_listen"), + ], + slots=[ + TextSlot("my_slot", mappings=[], initial_value="initial_value"), + TextSlot("foo", mappings=[]), + TextSlot("other_slot", mappings=[]), + ], + ) + + current_flow = flows.flow_by_id("foo_flow") + events = flow_executor.reset_scoped_slots(current_flow, tracker) + assert events == [ + SlotSet("my_slot", "initial_value"), + SlotSet("foo", None), + SlotSet("other_slot", None), + ] + + +def test_set_slots_inherit_reset_from_collect_step() -> None: + """Test that `reset_after_flow_ends` is inherited from the collect step.""" + slot_name = "my_slot" + flows = flows_from_str( + f""" + flows: + foo_flow: + steps: + - id: "1" + collect: {slot_name} + reset_after_flow_ends: false + - id: "2" + set_slots: + - foo: bar + - {slot_name}: my_value + - id: "3" + action: action_listen + """ + ) + tracker = DialogueStateTracker.from_events( + "test123", + [ + SlotSet("my_slot", "my_value"), + SlotSet("foo", "bar"), + ActionExecuted("action_listen"), + ], + slots=[ + TextSlot("my_slot", mappings=[], initial_value="initial_value"), + TextSlot("foo", mappings=[]), + ], + ) + + current_flow = flows.flow_by_id("foo_flow") + events = flow_executor.reset_scoped_slots(current_flow, tracker) + assert events == [ + SlotSet("foo", None), + ] + + +def test_executor_trips_internal_circuit_breaker(): + flow_with_loop = flows_from_str( + """ + flows: + foo_flow: + steps: + - id: "1" + set_slots: + - foo: bar + next: "2" + - id: "2" + set_slots: + - foo: barbar + next: "1" + """ + ) + + domain = Domain.empty() + + stack = DialogueStack( + frames=[UserFlowStackFrame(flow_id="foo_flow", step_id="1", frame_id="some-id")] + ) + + tracker = DialogueStateTracker.from_events( + "test", + evts=[ActionExecuted(action_name="action_listen"), stack.persist_as_event()], + domain=domain, + slots=domain.slots, + ) + + with pytest.raises(FlowCircuitBreakerTrippedException): + flow_executor.select_next_action(stack, tracker, domain, flow_with_loop) + + +def _run_flow_until_listen( + tracker: DialogueStateTracker, domain: Domain, flows: FlowsList +) -> Tuple[List[Optional[str]], List[Event]]: + # Run the flow until we reach a listen action. + # Collect and return all events and intermediate actions. + events = [] + actions = [] + while True: + action_prediction = flow_executor.advance_flows(tracker, domain, flows) + if not action_prediction: + break + + events.extend(action_prediction.events or []) + actions.append(action_prediction.action_name) + tracker.update_with_events(action_prediction.events or [], domain) + if action_prediction.action_name: + tracker.update(ActionExecuted(action_prediction.action_name), domain) + if action_prediction.action_name == "action_listen": + break + if action_prediction.action_name is None and not action_prediction.events: + # No action was executed and no events were generated. This means that + # the flow isn't doing anything anymore + break + return actions, events + + +@pytest.mark.skip(reason="Skip until intent gets replaced by nlu_trigger") +def test_select_next_action() -> None: + flows = YAMLFlowsReader.read_from_string( + textwrap.dedent( + """ + flows: + test_flow: + description: Test flow + steps: + - id: "1" + intent: transfer_money + next: "2" + - id: "2" + action: utter_ask_name + """ + ) + ) + tracker = DialogueStateTracker.from_dict( + "test", + [ + {"event": "action", "name": "action_listen"}, + {"event": "user", "parse_data": {"intent": {"name": "transfer_money"}}}, + ], + ) + domain = Domain.empty() + + actions, events = _run_flow_until_listen(tracker, domain, flows) + + assert actions == ["flow_test_flow", None] + assert events == [] diff --git a/tests/core/policies/test_flow_policy.py b/tests/core/policies/test_flow_policy.py index 5a84a9dcca1c..5b9d75cd1d0b 100644 --- a/tests/core/policies/test_flow_policy.py +++ b/tests/core/policies/test_flow_policy.py @@ -1,11 +1,6 @@ -import textwrap -from typing import List, Optional, Text, Tuple - import pytest from rasa.core.policies.flow_policy import ( - FlowCircuitBreakerTrippedException, - FlowExecutor, FlowPolicy, ) from rasa.dialogue_understanding.stack.dialogue_stack import DialogueStack @@ -13,10 +8,8 @@ from rasa.engine.storage.resource import Resource from rasa.engine.storage.storage import ModelStorage from rasa.shared.core.domain import Domain -from rasa.shared.core.events import ActionExecuted, Event, SlotSet +from rasa.shared.core.events import ActionExecuted, SlotSet from rasa.shared.core.flows.flow import FlowsList -from rasa.shared.core.flows.yaml_flows_io import YAMLFlowsReader -from rasa.shared.core.slots import TextSlot from rasa.shared.core.trackers import DialogueStateTracker from rasa.dialogue_understanding.stack.frames import ( UserFlowStackFrame, @@ -68,65 +61,6 @@ def default_flows() -> FlowsList: ) -def _run_flow_until_listen( - executor: FlowExecutor, tracker: DialogueStateTracker, domain: Domain -) -> Tuple[List[Optional[Text]], List[Event]]: - # Run the flow until we reach a listen action. - # Collect and return all events and intermediate actions. - events = [] - actions = [] - while True: - action_prediction = executor.advance_flows(tracker) - if not action_prediction: - break - - events.extend(action_prediction.events or []) - actions.append(action_prediction.action_name) - tracker.update_with_events(action_prediction.events or [], domain) - if action_prediction.action_name: - tracker.update(ActionExecuted(action_prediction.action_name), domain) - if action_prediction.action_name == "action_listen": - break - if action_prediction.action_name is None and not action_prediction.events: - # No action was executed and no events were generated. This means that - # the flow isn't doing anything anymore - break - return actions, events - - -@pytest.mark.skip(reason="Skip until intent gets replaced by nlu_trigger") -def test_select_next_action() -> None: - flows = YAMLFlowsReader.read_from_string( - textwrap.dedent( - """ - flows: - test_flow: - description: Test flow - steps: - - id: "1" - intent: transfer_money - next: "2" - - id: "2" - action: utter_ask_name - """ - ) - ) - tracker = DialogueStateTracker.from_dict( - "test", - [ - {"event": "action", "name": "action_listen"}, - {"event": "user", "parse_data": {"intent": {"name": "transfer_money"}}}, - ], - ) - domain = Domain.empty() - executor = FlowExecutor.from_tracker(tracker, flows, domain) - - actions, events = _run_flow_until_listen(executor, tracker, domain) - - assert actions == ["flow_test_flow", None] - assert events == [] - - def test_flow_policy_does_support_user_flowstack_frame(): frame = UserFlowStackFrame(flow_id="foo", step_id="first_step", frame_id="some-id") assert FlowPolicy.does_support_stack_frame(frame) @@ -207,42 +141,6 @@ def test_predict_action_probabilities_advances_topmost_flow( ] -def test_executor_trips_internal_circuit_breaker(): - flow_with_loop = flows_from_str( - """ - flows: - foo_flow: - steps: - - id: "1" - set_slots: - - foo: bar - next: "2" - - id: "2" - set_slots: - - foo: barbar - next: "1" - """ - ) - - domain = Domain.empty() - - stack = DialogueStack( - frames=[UserFlowStackFrame(flow_id="foo_flow", step_id="1", frame_id="some-id")] - ) - - tracker = DialogueStateTracker.from_events( - "test", - evts=[ActionExecuted(action_name="action_listen"), stack.persist_as_event()], - domain=domain, - slots=domain.slots, - ) - - executor = FlowExecutor.from_tracker(tracker, flow_with_loop, domain) - - with pytest.raises(FlowCircuitBreakerTrippedException): - executor.select_next_action(tracker) - - def test_policy_triggers_error_pattern_if_internal_circuit_breaker_is_tripped( default_flow_policy: FlowPolicy, ): @@ -284,133 +182,14 @@ def test_policy_triggers_error_pattern_if_internal_circuit_breaker_is_tripped( predicted_idx = prediction.max_confidence_index assert domain.action_names_or_texts[predicted_idx] == "utter_internal_error_rasa" # check that the stack was updated. - assert len(prediction.optional_events) == 1 - assert isinstance(prediction.optional_events[0], SlotSet) + assert len(prediction.optional_events) == 2 + event = prediction.optional_events[1] + assert isinstance(event, SlotSet) - assert prediction.optional_events[0].key == "dialogue_stack" + assert event.key == "dialogue_stack" # the user flow should be on the stack as well as the error pattern - assert len(prediction.optional_events[0].value) == 2 + assert len(event.value) == 2 # the user flow should be about to end - assert prediction.optional_events[0].value[0]["step_id"] == "NEXT:END" + assert event.value[0]["step_id"] == "NEXT:END" # the pattern should be the other frame - assert prediction.optional_events[0].value[1]["flow_id"] == "pattern_internal_error" - - -def test_executor_does_not_get_tripped_if_an_action_is_predicted_in_loop(): - flow_with_loop = flows_from_str( - """ - flows: - foo_flow: - steps: - - id: "1" - set_slots: - - foo: bar - next: "2" - - id: "2" - action: action_listen - next: "1" - """ - ) - - domain = Domain.empty() - - stack = DialogueStack( - frames=[UserFlowStackFrame(flow_id="foo_flow", step_id="1", frame_id="some-id")] - ) - - tracker = DialogueStateTracker.from_events( - "test", - evts=[ActionExecuted(action_name="action_listen"), stack.persist_as_event()], - domain=domain, - slots=domain.slots, - ) - - executor = FlowExecutor.from_tracker(tracker, flow_with_loop, domain) - - selection = executor.select_next_action(tracker) - assert selection.action_name == "action_listen" - - -def test_flow_policy_resets_all_slots_after_flow_ends() -> None: - flows = flows_from_str( - """ - flows: - foo_flow: - steps: - - id: "1" - collect: my_slot - - id: "2" - set_slots: - - foo: bar - - other_slot: other_value - - id: "3" - action: action_listen - """ - ) - tracker = DialogueStateTracker.from_events( - "test", - [ - SlotSet("my_slot", "my_value"), - SlotSet("foo", "bar"), - SlotSet("other_slot", "other_value"), - ActionExecuted("action_listen"), - ], - slots=[ - TextSlot("my_slot", mappings=[], initial_value="initial_value"), - TextSlot("foo", mappings=[]), - TextSlot("other_slot", mappings=[]), - ], - ) - - domain = Domain.empty() - executor = FlowExecutor.from_tracker(tracker, flows, domain) - - current_flow = flows.flow_by_id("foo_flow") - events = executor._reset_scoped_slots(current_flow, tracker) - assert events == [ - SlotSet("my_slot", "initial_value"), - SlotSet("foo", None), - SlotSet("other_slot", None), - ] - - -def test_flow_policy_set_slots_inherit_reset_from_collect_step() -> None: - """Test that `reset_after_flow_ends` is inherited from the collect step.""" - slot_name = "my_slot" - flows = flows_from_str( - f""" - flows: - foo_flow: - steps: - - id: "1" - collect: {slot_name} - reset_after_flow_ends: false - - id: "2" - set_slots: - - foo: bar - - {slot_name}: my_value - - id: "3" - action: action_listen - """ - ) - tracker = DialogueStateTracker.from_events( - "test123", - [ - SlotSet("my_slot", "my_value"), - SlotSet("foo", "bar"), - ActionExecuted("action_listen"), - ], - slots=[ - TextSlot("my_slot", mappings=[], initial_value="initial_value"), - TextSlot("foo", mappings=[]), - ], - ) - - domain = Domain.empty() - executor = FlowExecutor.from_tracker(tracker, flows, domain) - - current_flow = flows.flow_by_id("foo_flow") - events = executor._reset_scoped_slots(current_flow, tracker) - assert events == [ - SlotSet("foo", None), - ] + assert event.value[1]["flow_id"] == "pattern_internal_error"