Skip to content
This repository has been archived by the owner on Mar 1, 2024. It is now read-only.

Commit

Permalink
SingularBusinessLogicPipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
wintonzheng committed Sep 17, 2023
1 parent 20adf41 commit e62f670
Showing 1 changed file with 148 additions and 40 deletions.
188 changes: 148 additions & 40 deletions wyvern/components/business_logic/business_logic.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,23 @@ class BusinessLogicRequest(
"""

request: REQUEST_ENTITY
scored_candidates: List[ScoredCandidate[GENERALIZED_WYVERN_ENTITY]]
scored_candidates: List[ScoredCandidate[GENERALIZED_WYVERN_ENTITY]] = []

# TODO (suchintan): Give business logic layer access to the feature map in the future
# feature_map: FeatureMap

class SingularBusinessLogicRequest(
GenericModel,
Generic[GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY],
):
"""
A request to the business logic layer to perform business logic on a single candidate
Parameters:
request: The request that the business logic layer is being asked to perform business logic on
candidate: The candidate that the business logic layer is being asked to perform business logic on
"""

request: REQUEST_ENTITY
scored_candidate: ScoredCandidate[GENERALIZED_WYVERN_ENTITY]


# TODO (suchintan): Possibly delete this now that events are gone
Expand All @@ -83,6 +96,22 @@ class BusinessLogicResponse(
adjusted_candidates: List[ScoredCandidate[GENERALIZED_WYVERN_ENTITY]]


class SingularBusinessLogicResponse(
GenericModel,
Generic[GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY],
):
"""
The response from the business logic layer after performing business logic on a single candidate
Parameters:
request: The request that the business logic layer was asked to perform business logic on
adjusted_candidate: The candidate that the business logic layer performed business logic on
"""

request: SingularBusinessLogicRequest[GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY]
adjusted_candidate: ScoredCandidate[GENERALIZED_WYVERN_ENTITY]


class BusinessLogicComponent(
Component[
BusinessLogicRequest[GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY],
Expand All @@ -99,11 +128,72 @@ class BusinessLogicComponent(
pass


class SingularBusinessLogicComponent(
Component[
SingularBusinessLogicRequest[GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY],
ScoredCandidate[GENERALIZED_WYVERN_ENTITY],
],
Generic[GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY],
):
"""
A component that performs business logic on an entity with a set of candidates
The request itself could contain more than just entities, for example it may contain a query and so on
"""

pass


class ExtractEventMixin(Generic[GENERALIZED_WYVERN_ENTITY]):
def extract_business_logic_events(
self,
output: List[ScoredCandidate[GENERALIZED_WYVERN_ENTITY]],
pipeline_index: int,
upstream_name: str,
request_id: str,
old_scores: List[float],
) -> List[BusinessLogicEvent]:
"""
Extracts the business logic events from the output of a business logic component
Args:
output: The output of a business logic component
pipeline_index: The index of the business logic component in the business logic pipeline
upstream_name: The name of the business logic component
request_id: The request id of the request that the business logic component was called in
old_scores: The old scores of the candidates that the business logic component was called on
Returns:
The business logic events that were extracted from the output of the business logic component
"""
timestamp = datetime.utcnow()
events = [
BusinessLogicEvent(
request_id=request_id,
api_source=request_context.ensure_current_request().url_path,
event_timestamp=timestamp,
event_data=BusinessLogicEventData(
business_logic_pipeline_order=pipeline_index,
business_logic_name=upstream_name,
old_score=old_scores[j],
new_score=output[j].score,
entity_identifier=candidate.entity.identifier.identifier,
entity_identifier_type=candidate.entity.identifier.identifier_type,
),
)
for (j, candidate) in enumerate(output)
if output[j].score != old_scores[j]
]

return events


class BusinessLogicPipeline(
Component[
BusinessLogicRequest[GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY],
BusinessLogicResponse[GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY],
],
ExtractEventMixin[GENERALIZED_WYVERN_ENTITY],
Generic[GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY],
):
"""
Expand Down Expand Up @@ -141,7 +231,7 @@ async def execute(
"""
argument = input

# Make sure that the inputted candidates are actually sorted
# Make sure that the input candidates are actually sorted
output = await self.sorting_component.execute(input.scored_candidates)

for (pipeline_index, upstream) in enumerate(self.ordered_upstreams):
Expand Down Expand Up @@ -185,44 +275,62 @@ def log_events(
adjusted_candidates=output,
)

def extract_business_logic_events(

class SingularBusinessLogicPipeline(
Component[
SingularBusinessLogicRequest[GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY],
SingularBusinessLogicResponse[GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY],
],
ExtractEventMixin[GENERALIZED_WYVERN_ENTITY],
Generic[GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY],
):
def __init__(
self,
output: List[ScoredCandidate[GENERALIZED_WYVERN_ENTITY]],
pipeline_index: int,
upstream_name: str,
request_id: str,
old_scores: List[float],
) -> List[BusinessLogicEvent]:
"""
Extracts the business logic events from the output of a business logic component
*upstreams: SingularBusinessLogicComponent[
GENERALIZED_WYVERN_ENTITY,
REQUEST_ENTITY,
],
name: Optional[str] = None,
):
self.ordered_upstreams = upstreams
super().__init__(*upstreams, name=name)

Args:
output: The output of a business logic component
pipeline_index: The index of the business logic component in the business logic pipeline
upstream_name: The name of the business logic component
request_id: The request id of the request that the business logic component was called in
old_scores: The old scores of the candidates that the business logic component was called on
async def execute(
self,
input: SingularBusinessLogicRequest[GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY],
**kwargs,
) -> SingularBusinessLogicResponse[GENERALIZED_WYVERN_ENTITY, REQUEST_ENTITY]:
argument = input
for (pipeline_index, upstream) in enumerate(self.ordered_upstreams):
old_scores = [argument.scored_candidate.score]

Returns:
The business logic events that were extracted from the output of the business logic component
"""
timestamp = datetime.utcnow()
events = [
BusinessLogicEvent(
request_id=request_id,
api_source=request_context.ensure_current_request().url_path,
event_timestamp=timestamp,
event_data=BusinessLogicEventData(
business_logic_pipeline_order=pipeline_index,
business_logic_name=upstream_name,
old_score=old_scores[j],
new_score=output[j].score,
entity_identifier=candidate.entity.identifier.identifier,
entity_identifier_type=candidate.entity.identifier.identifier_type,
),
# this output might have the same reference as the argument.scored_candidates
output = await upstream.execute(input=argument, **kwargs)

extracted_events: List[
BusinessLogicEvent
] = self.extract_business_logic_events(
[output],
pipeline_index,
upstream.name,
argument.request.request_id,
old_scores,
)
for (j, candidate) in enumerate(output)
if output[j].score != old_scores[j]
]

return events
def log_events(
extracted_events: List[BusinessLogicEvent] = extracted_events,
):
return extracted_events

# TODO (suchintan): "invariant" list error
event_logger.log_events(log_events) # type: ignore

argument = SingularBusinessLogicRequest(
request=input.request,
scored_candidate=output,
)

return SingularBusinessLogicResponse(
request=input,
adjusted_candidate=argument.scored_candidate,
)

0 comments on commit e62f670

Please sign in to comment.