From 512b5242d51a04c9a17a9fb1fc3cbb6214a78a60 Mon Sep 17 00:00:00 2001 From: Jedr Blaszyk Date: Thu, 13 Jun 2024 14:34:16 +0200 Subject: [PATCH 1/5] WIP use Connector API --- connectors/config.py | 2 +- connectors/es/index.py | 43 ++++++++++++++++ connectors/protocol/connectors.py | 82 ++++++++++++++++++++++--------- connectors/utils.py | 13 +++-- 4 files changed, 112 insertions(+), 28 deletions(-) diff --git a/connectors/config.py b/connectors/config.py index 689c28739..7497577e9 100644 --- a/connectors/config.py +++ b/connectors/config.py @@ -78,7 +78,7 @@ def _default_config(): "initial_backoff_duration": 1, "backoff_multiplier": 2, "log_level": "info", - "feature_use_connectors_api": False, + "feature_use_connectors_api": True, }, "service": { "idling": 30, diff --git a/connectors/es/index.py b/connectors/es/index.py index 09786213a..2b863362d 100644 --- a/connectors/es/index.py +++ b/connectors/es/index.py @@ -34,6 +34,30 @@ async def connector_check_in(self, connector_id): headers={"accept": "application/json"}, ) + async def connector_update_error(self, connector_id, error): + await self.client.perform_request( + "PUT", + f"/_connector/{connector_id}/_error", + headers={"accept": "application/json", "Content-Type": "application/json"}, + body={"error": error}, + ) + + async def connector_update_status(self, connector_id, status): + await self.client.perform_request( + "PUT", + f"/_connector/{connector_id}/_status", + headers={"accept": "application/json", "Content-Type": "application/json"}, + body={"status": status}, + ) + + async def connector_update_last_sync_info(self, connector_id, last_sync_info): + await self.client.perform_request( + "PUT", + f"/_connector/{connector_id}/_last_sync", + headers={"accept": "application/json", "Content-Type": "application/json"}, + body=last_sync_info, + ) + async def connector_update_filtering_draft_validation( self, connector_id, validation_result ): @@ -62,6 +86,25 @@ async def connector_check_in(self, connector_id): partial(self._api_wrapper.connector_check_in, connector_id) ) + async def connector_update_error(self, connector_id, error): + await self._retrier.execute_with_retry( + partial(self._api_wrapper.connector_update_error, connector_id, error) + ) + + async def connector_update_status(self, connector_id, status): + await self._retrier.execute_with_retry( + partial(self._api_wrapper.connector_update_status, connector_id, status) + ) + + async def connector_update_last_sync_info(self, connector_id, last_sync_info): + await self._retrier.execute_with_retry( + partial( + self._api_wrapper.connector_update_last_sync_info, + connector_id, + last_sync_info, + ) + ) + async def connector_update_filtering_draft_validation( self, connector_id, validation_result ): diff --git a/connectors/protocol/connectors.py b/connectors/protocol/connectors.py index 21847cd3f..1d8a50ca7 100644 --- a/connectors/protocol/connectors.py +++ b/connectors/protocol/connectors.py @@ -696,12 +696,17 @@ def next_sync(self, job_type, now): return next_run(scheduling_property.get("interval"), now) async def _update_datetime(self, field, new_ts): - await self.index.update( - doc_id=self.id, - doc={field: iso_utc(new_ts)}, - if_seq_no=self._seq_no, - if_primary_term=self._primary_term, - ) + if self.index.feature_use_connectors_api: + await self.index.api.connector_update_last_sync_info( + connector_id=self.id, last_sync_info={field: iso_utc(new_ts)} + ) + else: + await self.index.update( + doc_id=self.id, + doc={field: iso_utc(new_ts)}, + if_seq_no=self._seq_no, + if_primary_term=self._primary_term, + ) async def update_last_sync_scheduled_at_by_job_type(self, job_type, new_ts): match job_type: @@ -734,24 +739,43 @@ async def sync_starts(self, job_type): msg = f"Unknown job type: {job_type}" raise ValueError(msg) - doc = { - "status": Status.CONNECTED.value, - "error": None, - } | last_sync_information + if self.index.feature_use_connectors_api: + await self.index.api.connector_update_last_sync_info( + connector_id=self.id, last_sync_info=last_sync_information + ) + await self.index.api.connector_update_status( + connector_id=self.id, status=Status.CONNECTED.value + ) + await self.index.api.connector_update_error( + connector_id=self.id, error=None + ) + else: + doc = { + "status": Status.CONNECTED.value, + "error": None, + } | last_sync_information - await self.index.update( - doc_id=self.id, - doc=doc, - if_seq_no=self._seq_no, - if_primary_term=self._primary_term, - ) + await self.index.update( + doc_id=self.id, + doc=doc, + if_seq_no=self._seq_no, + if_primary_term=self._primary_term, + ) async def error(self, error): - doc = { - "status": Status.ERROR.value, - "error": str(error), - } - await self.index.update(doc_id=self.id, doc=doc) + if self.index.feature_use_connectors_api: + await self.index.api.connector_update_error( + connector_id=self.id, error=error + ) + await self.index.api.connector_update_status( + connector_id=self.id, status=Status.ERROR.value + ) + else: + doc = { + "status": Status.ERROR.value, + "error": str(error), + } + await self.index.update(doc_id=self.id, doc=doc) async def sync_done(self, job, cursor=None): job_status = JobStatus.ERROR if job is None else job.status @@ -790,8 +814,6 @@ async def sync_done(self, job, cursor=None): doc = { "last_synced": iso_utc(), - "status": connector_status.value, - "error": job_error, } | last_sync_information # only update sync cursor after a successful content sync job @@ -802,7 +824,19 @@ async def sync_done(self, job, cursor=None): doc["last_indexed_document_count"] = job.indexed_document_count doc["last_deleted_document_count"] = job.deleted_document_count - await self.index.update(doc_id=self.id, doc=doc) + if self.index.feature_use_connectors_api: + await self.index.api.connector_update_status( + connector_id=self.id, status=connector_status.value + ) + await self.index.api.connector_update_error( + connector_id=self.id, error=job_error + ) + await self.index.api.connector_update_last_sync_info( + connector_id=self.id, last_sync_info=last_sync_information + ) + else: + doc = doc | {"status": connector_status.value, "error": job_error} + await self.index.update(doc_id=self.id, doc=doc) @with_concurrency_control() async def prepare(self, config, sources): diff --git a/connectors/utils.py b/connectors/utils.py index b7bc06d32..484009be2 100644 --- a/connectors/utils.py +++ b/connectors/utils.py @@ -456,9 +456,16 @@ def _callback(self, task): f"Task {task.get_name()} was cancelled", ) elif task.exception(): - logger.error( - f"Exception found for task {task.get_name()}: {task.exception()}", - ) + try: + raise task.exception() + except Exception as e: + logger.error( + f"Exception found for task {task.get_name()}: {e}", + exc_info=True, + ) + # logger.error( + # f"Exception found for task {task.get_name()}: {task.exception()} {task}" + # ) def _add_task(self, coroutine, name=None): task = asyncio.create_task(coroutine(), name=name) From 1ffa5dbce1f54024aa9a486f96ff7fed78445ded Mon Sep 17 00:00:00 2001 From: Jedr Blaszyk Date: Mon, 17 Jun 2024 14:54:18 +0200 Subject: [PATCH 2/5] Add tests --- connectors/config.py | 2 +- tests/protocol/test_connectors.py | 258 ++++++++++++++++++++++++++++++ 2 files changed, 259 insertions(+), 1 deletion(-) diff --git a/connectors/config.py b/connectors/config.py index 7497577e9..689c28739 100644 --- a/connectors/config.py +++ b/connectors/config.py @@ -78,7 +78,7 @@ def _default_config(): "initial_backoff_duration": 1, "backoff_multiplier": 2, "log_level": "info", - "feature_use_connectors_api": True, + "feature_use_connectors_api": False, }, "service": { "idling": 30, diff --git a/tests/protocol/test_connectors.py b/tests/protocol/test_connectors.py index 7f7e12001..b269cdf7e 100644 --- a/tests/protocol/test_connectors.py +++ b/tests/protocol/test_connectors.py @@ -444,6 +444,7 @@ async def test_sync_starts(job_type, expected_doc_source_update): connector_doc = {"_id": doc_id, "_seq_no": seq_no, "_primary_term": primary_term} index = Mock() index.update = AsyncMock() + index.feature_use_connectors_api = False connector = Connector(elastic_index=index, doc_source=connector_doc) await connector.sync_starts(job_type) @@ -455,12 +456,65 @@ async def test_sync_starts(job_type, expected_doc_source_update): ) +@pytest.mark.parametrize( + "job_type, last_sync_info, status, error", + [ + ( + JobType.FULL, + { + "last_sync_status": JobStatus.IN_PROGRESS.value, + "last_sync_error": None, + }, + Status.CONNECTED.value, + None, + ), + ( + JobType.INCREMENTAL, + {"last_sync_status": JobStatus.IN_PROGRESS.value, "last_sync_error": None}, + Status.CONNECTED.value, + None, + ), + ( + JobType.ACCESS_CONTROL, + { + "last_access_control_sync_status": JobStatus.IN_PROGRESS.value, + "last_access_control_sync_error": None, + }, + Status.CONNECTED.value, + None, + ), + ], +) +@pytest.mark.asyncio +async def test_sync_starts_with_connector_api(job_type, last_sync_info, status, error): + doc_id = "1" + connector_doc = {"_id": doc_id} + index = Mock() + index.api.connector_update_error = AsyncMock() + index.api.connector_update_status = AsyncMock() + index.api.connector_update_last_sync_info = AsyncMock() + index.feature_use_connectors_api = True + + connector = Connector(elastic_index=index, doc_source=connector_doc) + await connector.sync_starts(job_type) + index.api.connector_update_last_sync_info.assert_called_with( + connector_id=connector.id, last_sync_info=last_sync_info + ) + index.api.connector_update_status.assert_called_with( + connector_id=connector.id, status=status + ) + index.api.connector_update_error.assert_called_with( + connector_id=connector.id, error=error + ) + + @pytest.mark.asyncio async def test_connector_error(): connector_doc = {"_id": "1"} error = "something wrong" index = Mock() index.update = AsyncMock(return_value=1) + index.feature_use_connectors_api = False expected_doc_source_update = { "status": Status.ERROR.value, "error": error, @@ -471,6 +525,24 @@ async def test_connector_error(): index.update.assert_called_with(doc_id=connector.id, doc=expected_doc_source_update) +@pytest.mark.asyncio +async def test_connector_error_with_connector_api(): + connector_doc = {"_id": "1"} + error = "something wrong" + index = Mock() + index.api.connector_update_error = AsyncMock() + index.api.connector_update_status = AsyncMock() + index.feature_use_connectors_api = True + connector = Connector(elastic_index=index, doc_source=connector_doc) + await connector.error(error) + index.api.connector_update_error.assert_called_with( + connector_id=connector.id, error=error + ) + index.api.connector_update_status.assert_called_with( + connector_id=connector.id, status=Status.ERROR.value + ) + + def mock_job( status=JobStatus.COMPLETED, job_type=JobType.FULL, @@ -627,12 +699,167 @@ async def test_sync_done(job, expected_doc_source_update): connector_doc = {"_id": "1"} index = Mock() index.update = AsyncMock(return_value=1) + index.feature_use_connectors_api = False connector = Connector(elastic_index=index, doc_source=connector_doc) await connector.sync_done(job=job, cursor=SYNC_CURSOR) index.update.assert_called_with(doc_id=connector.id, doc=expected_doc_source_update) +@pytest.mark.asyncio +@pytest.mark.parametrize( + "job, last_sync_info, error, status", + [ + ( + None, + { + "last_access_control_sync_error": JOB_NOT_FOUND_ERROR, + "last_access_control_sync_status": JobStatus.ERROR.value, + "last_sync_error": JOB_NOT_FOUND_ERROR, + "last_sync_status": JobStatus.ERROR.value, + "last_synced": ANY, + "last_indexed_document_count": 0, + "last_deleted_document_count": 0, + }, + JOB_NOT_FOUND_ERROR, + Status.ERROR.value, + ), + ( + mock_job( + status=JobStatus.ERROR, job_type=JobType.FULL, error="something wrong" + ), + { + "last_sync_status": JobStatus.ERROR.value, + "last_synced": ANY, + "last_sync_error": "something wrong", + "last_indexed_document_count": 0, + "last_deleted_document_count": 0, + }, + "something wrong", + Status.ERROR.value, + ), + ( + mock_job(status=JobStatus.CANCELED, job_type=JobType.FULL), + { + "last_sync_status": JobStatus.CANCELED.value, + "last_synced": ANY, + "last_sync_error": None, + "last_indexed_document_count": 0, + "last_deleted_document_count": 0, + }, + None, + Status.CONNECTED.value, + ), + ( + mock_job( + status=JobStatus.SUSPENDED, job_type=JobType.FULL, terminated=False + ), + { + "last_sync_status": JobStatus.SUSPENDED.value, + "last_synced": ANY, + "last_sync_error": None, + }, + None, + Status.CONNECTED.value, + ), + ( + mock_job(job_type=JobType.FULL), + { + "last_sync_status": JobStatus.COMPLETED.value, + "last_synced": ANY, + "last_sync_error": None, + "sync_cursor": SYNC_CURSOR, + "last_indexed_document_count": 0, + "last_deleted_document_count": 0, + }, + None, + Status.CONNECTED.value, + ), + ( + mock_job(job_type=JobType.FULL), + { + "last_sync_status": JobStatus.COMPLETED.value, + "last_synced": ANY, + "last_sync_error": None, + "sync_cursor": SYNC_CURSOR, + "last_indexed_document_count": 0, + "last_deleted_document_count": 0, + }, + None, + Status.CONNECTED.value, + ), + ( + mock_job(job_type=JobType.ACCESS_CONTROL), + { + "last_access_control_sync_status": JobStatus.COMPLETED.value, + "last_synced": ANY, + "last_access_control_sync_error": None, + "last_indexed_document_count": 0, + "last_deleted_document_count": 0, + }, + None, + Status.CONNECTED.value, + ), + ( + mock_job( + status=JobStatus.ERROR, + job_type=JobType.ACCESS_CONTROL, + error="something wrong", + ), + { + "last_access_control_sync_status": JobStatus.ERROR.value, + "last_synced": ANY, + "last_access_control_sync_error": "something wrong", + "last_indexed_document_count": 0, + "last_deleted_document_count": 0, + }, + "something wrong", + Status.ERROR.value, + ), + ( + mock_job( + status=JobStatus.SUSPENDED, + job_type=JobType.ACCESS_CONTROL, + terminated=False, + ), + { + "last_access_control_sync_status": JobStatus.SUSPENDED.value, + "last_synced": ANY, + "last_access_control_sync_error": None, + }, + None, + Status.CONNECTED.value, + ), + ( + mock_job(status=JobStatus.CANCELED, job_type=JobType.ACCESS_CONTROL), + { + "last_access_control_sync_status": JobStatus.CANCELED.value, + "last_synced": ANY, + "last_access_control_sync_error": None, + "last_indexed_document_count": 0, + "last_deleted_document_count": 0, + }, + None, + Status.CONNECTED.value, + ), + ], +) +async def test_sync_done_with_connector_api(job, last_sync_info, error, status): + connector_doc = {"_id": "1"} + index = Mock() + index.feature_use_connectors_api = True + index.api.connector_update_error = AsyncMock() + index.api.connector_update_status = AsyncMock() + index.api.connector_update_last_sync_info = AsyncMock() + connector = Connector(elastic_index=index, doc_source=connector_doc) + await connector.sync_done(job=job, cursor=SYNC_CURSOR) + index.api.connector_update_last_sync_info( + connector_id=connector.id, doc=last_sync_info + ) + index.api.connector_update_error(connector_id=connector.id, error=error) + index.api.connector_update_status(connector_id=connector.id, status=status) + + mock_next_run = iso_utc() @@ -1410,6 +1637,7 @@ async def test_connector_update_last_sync_scheduled_at_by_job_type( } index = Mock() index.update = AsyncMock() + index.feature_use_connectors_api = False connector = Connector(elastic_index=index, doc_source=connector_doc) await connector.update_last_sync_scheduled_at_by_job_type(job_type, new_ts) @@ -1421,6 +1649,36 @@ async def test_connector_update_last_sync_scheduled_at_by_job_type( ) +@pytest.mark.parametrize( + "job_type, date_field_to_update", + [ + (JobType.FULL, "last_sync_scheduled_at"), + (JobType.INCREMENTAL, "last_incremental_sync_scheduled_at"), + (JobType.ACCESS_CONTROL, "last_access_control_sync_scheduled_at"), + ], +) +@pytest.mark.asyncio +async def test_connector_update_last_sync_scheduled_at_by_job_type_with_connector_api( + job_type, date_field_to_update +): + doc_id = "2" + new_ts = datetime.now(timezone.utc) + timedelta(seconds=30) + connector_doc = { + "_id": doc_id, + "_source": {}, + } + index = Mock() + index.update = AsyncMock() + index.api.connector_update_last_sync_info = AsyncMock() + index.feature_use_connectors_api = True + connector = Connector(elastic_index=index, doc_source=connector_doc) + await connector.update_last_sync_scheduled_at_by_job_type(job_type, new_ts) + + index.api.connector_update_last_sync_info.assert_awaited_once_with( + connector_id=doc_id, last_sync_info={date_field_to_update: new_ts.isoformat()} + ) + + @pytest.mark.asyncio async def test_connector_validate_filtering_not_edited(): index = Mock() From a2475a09d6c70389f50ab97df2e23078a5b6a40b Mon Sep 17 00:00:00 2001 From: Jedr Blaszyk Date: Tue, 25 Jun 2024 09:14:31 +0200 Subject: [PATCH 3/5] fix changes local --- connectors/utils.py | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/connectors/utils.py b/connectors/utils.py index 484009be2..16a891110 100644 --- a/connectors/utils.py +++ b/connectors/utils.py @@ -456,16 +456,9 @@ def _callback(self, task): f"Task {task.get_name()} was cancelled", ) elif task.exception(): - try: - raise task.exception() - except Exception as e: - logger.error( - f"Exception found for task {task.get_name()}: {e}", - exc_info=True, - ) - # logger.error( - # f"Exception found for task {task.get_name()}: {task.exception()} {task}" - # ) + logger.error( + f"Exception found for task {task.get_name()}: {task.exception()} {task}" + ) def _add_task(self, coroutine, name=None): task = asyncio.create_task(coroutine(), name=name) From b27b41516f8b6384256500a796c5fef413bdc88e Mon Sep 17 00:00:00 2001 From: Jedr Blaszyk Date: Fri, 28 Jun 2024 11:08:20 +0200 Subject: [PATCH 4/5] Consolidate update status and error into a single call --- connectors/protocol/connectors.py | 15 +++------------ tests/protocol/test_connectors.py | 10 ---------- 2 files changed, 3 insertions(+), 22 deletions(-) diff --git a/connectors/protocol/connectors.py b/connectors/protocol/connectors.py index 2d87c5361..1ee3d7a5d 100644 --- a/connectors/protocol/connectors.py +++ b/connectors/protocol/connectors.py @@ -751,15 +751,12 @@ async def sync_starts(self, job_type): raise ValueError(msg) if self.index.feature_use_connectors_api: - await self.index.api.connector_update_last_sync_info( - connector_id=self.id, last_sync_info=last_sync_information - ) - await self.index.api.connector_update_status( - connector_id=self.id, status=Status.CONNECTED.value - ) await self.index.api.connector_update_error( connector_id=self.id, error=None ) + await self.index.api.connector_update_last_sync_info( + connector_id=self.id, last_sync_info=last_sync_information + ) else: doc = { "status": Status.CONNECTED.value, @@ -778,9 +775,6 @@ async def error(self, error): await self.index.api.connector_update_error( connector_id=self.id, error=error ) - await self.index.api.connector_update_status( - connector_id=self.id, status=Status.ERROR.value - ) else: doc = { "status": Status.ERROR.value, @@ -836,9 +830,6 @@ async def sync_done(self, job, cursor=None): doc["last_deleted_document_count"] = job.deleted_document_count if self.index.feature_use_connectors_api: - await self.index.api.connector_update_status( - connector_id=self.id, status=connector_status.value - ) await self.index.api.connector_update_error( connector_id=self.id, error=job_error ) diff --git a/tests/protocol/test_connectors.py b/tests/protocol/test_connectors.py index 3a0023758..54b4f03b4 100644 --- a/tests/protocol/test_connectors.py +++ b/tests/protocol/test_connectors.py @@ -491,7 +491,6 @@ async def test_sync_starts_with_connector_api(job_type, last_sync_info, status, connector_doc = {"_id": doc_id} index = Mock() index.api.connector_update_error = AsyncMock() - index.api.connector_update_status = AsyncMock() index.api.connector_update_last_sync_info = AsyncMock() index.feature_use_connectors_api = True @@ -500,9 +499,6 @@ async def test_sync_starts_with_connector_api(job_type, last_sync_info, status, index.api.connector_update_last_sync_info.assert_called_with( connector_id=connector.id, last_sync_info=last_sync_info ) - index.api.connector_update_status.assert_called_with( - connector_id=connector.id, status=status - ) index.api.connector_update_error.assert_called_with( connector_id=connector.id, error=error ) @@ -531,16 +527,12 @@ async def test_connector_error_with_connector_api(): error = "something wrong" index = Mock() index.api.connector_update_error = AsyncMock() - index.api.connector_update_status = AsyncMock() index.feature_use_connectors_api = True connector = Connector(elastic_index=index, doc_source=connector_doc) await connector.error(error) index.api.connector_update_error.assert_called_with( connector_id=connector.id, error=error ) - index.api.connector_update_status.assert_called_with( - connector_id=connector.id, status=Status.ERROR.value - ) def mock_job( @@ -849,7 +841,6 @@ async def test_sync_done_with_connector_api(job, last_sync_info, error, status): index = Mock() index.feature_use_connectors_api = True index.api.connector_update_error = AsyncMock() - index.api.connector_update_status = AsyncMock() index.api.connector_update_last_sync_info = AsyncMock() connector = Connector(elastic_index=index, doc_source=connector_doc) await connector.sync_done(job=job, cursor=SYNC_CURSOR) @@ -857,7 +848,6 @@ async def test_sync_done_with_connector_api(job, last_sync_info, error, status): connector_id=connector.id, doc=last_sync_info ) index.api.connector_update_error(connector_id=connector.id, error=error) - index.api.connector_update_status(connector_id=connector.id, status=status) mock_next_run = iso_utc() From 2016ac13320c8bee80859daea5d97428820dcd85 Mon Sep 17 00:00:00 2001 From: Jedr Blaszyk Date: Fri, 28 Jun 2024 11:33:43 +0200 Subject: [PATCH 5/5] Fix linting --- connectors/es/index.py | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/connectors/es/index.py b/connectors/es/index.py index 169441153..42fdbbd6c 100644 --- a/connectors/es/index.py +++ b/connectors/es/index.py @@ -42,14 +42,6 @@ async def connector_update_error(self, connector_id, error): body={"error": error}, ) - async def connector_update_status(self, connector_id, status): - await self.client.perform_request( - "PUT", - f"/_connector/{connector_id}/_status", - headers={"accept": "application/json", "Content-Type": "application/json"}, - body={"status": status}, - ) - async def connector_update_last_sync_info(self, connector_id, last_sync_info): await self.client.perform_request( "PUT", @@ -127,11 +119,6 @@ async def connector_update_error(self, connector_id, error): partial(self._api_wrapper.connector_update_error, connector_id, error) ) - async def connector_update_status(self, connector_id, status): - await self._retrier.execute_with_retry( - partial(self._api_wrapper.connector_update_status, connector_id, status) - ) - async def connector_update_last_sync_info(self, connector_id, last_sync_info): await self._retrier.execute_with_retry( partial(