
Feat: Iceberg cache #355

Open · wants to merge 31 commits into main
Conversation

aaronsteers (Contributor) commented Sep 2, 2024

Summary by CodeRabbit

  • New Features

    • Introduced new message classes to ensure data fields are represented as JSON strings.
    • Added IcebergSqlProcessor and IcebergCache for enhanced SQL processing and caching functionalities.
    • Implemented ParquetWriter for writing data in Parquet format.
    • Added a proof of concept for using the Polars library with S3 data.
  • Improvements

    • Enhanced message handling with improved parsing logic across various components.
    • Streamlined version control by updating .gitignore to exclude specific test artifacts.
  • Documentation

    • Updated and added documentation for new classes and methods to improve developer guidance.
  • Chores

    • Added new dependencies for polars and boto3 to support enhanced data processing capabilities.

coderabbitai bot commented Sep 2, 2024

Walkthrough

The changes introduce various modifications across the Airbyte codebase, including the addition of custom message classes for JSON string handling, enhancements to connector execution and parsing methods, and the introduction of SQL processors for Iceberg and DuckDB. Additionally, utility functions for schema conversion have been added, and new dependencies for data manipulation and AWS interactions are now included in the project.

Changes

Files and change summaries:

  • .gitignore: Added patterns to ignore performance test artifacts and files matching *test-artifact*.
  • airbyte/_airbyte_message_overrides.py: Introduced custom message classes to ensure data fields are JSON strings; added a validator function for data type enforcement.
  • airbyte/_connector_base.py: Renamed _execute to _execute_and_parse for clarity; added a new _execute method for executing connectors with error handling.
  • airbyte/_message_iterators.py: Added as_filelike method to provide a file-like interface for message iterators.
  • airbyte/_processors/sql/duckdb.py: Modified the docstring of DuckDBSqlProcessor to a brief description, with no functional changes.
  • airbyte/_processors/sql/iceberg.py: Introduced IcebergSqlProcessor for SQL processing with Iceberg tables, including state management and file writing operations.
  • airbyte/_util/polars.py: Added utility functions for converting JSON schemas to Polars and PyArrow schemas, encapsulated in PolarsStreamSchema.
  • airbyte/_util/telemetry.py: Added import for AirbyteWriterInterface to enhance telemetry functionality.
  • airbyte/_writers/__init__.py: Exported ParquetWriter to make it available for import.
  • airbyte/_writers/file_writers.py: Changed _flush_active_batch to flush_active_batch, altering it from private to public.
  • airbyte/_writers/parquet.py: Introduced ParquetWriter class for writing data in Parquet format, managing record caching and file handling.
  • airbyte/caches/base.py: Added import for AirbyteWriterInterface to enhance caching functionality.
  • airbyte/caches/iceberg.py: Introduced IcebergCache class for caching with Iceberg, including a SQL processor for operations.
  • airbyte/destinations/base.py: Modified _write_airbyte_message_stream to use _execute_and_parse, indicating a shift in message processing.
  • airbyte/progress.py: Added import for AirbyteWriterInterface to enhance progress tracking functionality.
  • airbyte/results.py: Added import for AirbyteWriterInterface, potentially enhancing result handling.
  • airbyte/sources/base.py: Updated _discover, _get_spec, and _read_with_catalog methods to use _execute_and_parse, improving message processing during execution.
  • airbyte/writers.py: Defined AirbyteWriterInterface, establishing a contract for Airbyte message writers with essential methods and properties.
  • examples/run_polars_poc.py: Introduced a proof-of-concept script for using Polars with S3 data, including functions for data retrieval, transformation, and writing.
  • pyproject.toml: Added dependencies for polars and boto3 to support data manipulation and AWS interactions.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant Connector
    participant MessageParser
    participant WriterInterface

    User->>Connector: Execute command
    Connector->>MessageParser: Parse messages
    MessageParser->>Connector: Return parsed messages
    Connector->>WriterInterface: Write data
    WriterInterface-->>User: Confirm data written

wdyt?


Tips

Chat

There are 3 ways to chat with CodeRabbit:

‼️ IMPORTANT
Auto-reply has been disabled for this repository in the CodeRabbit settings. The CodeRabbit bot will not respond to your replies unless it is explicitly tagged.

  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai generate interesting stats about this repository and render them as a table.
    • @coderabbitai show all the console.log statements in this repository.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai or @coderabbitai title anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Tip

Early access features: enabled

We are currently testing the following features in early access:

  • Anthropic claude-3-5-sonnet for code reviews: Anthropic claims that the new Claude model has stronger code understanding and code generation capabilities than their previous models. Note: Our default code review model was also updated late last week. Please compare the quality of the reviews between the two models by toggling the early access feature.

Note:

  • You can enable or disable early access features from the CodeRabbit UI or by updating the CodeRabbit configuration file.
  • Please join our Discord Community to provide feedback and report issues on the discussion post.

coderabbitai bot left a comment

Actionable comments posted: 21

Outside diff range, codebase verification and nitpick comments (18)
airbyte/caches/iceberg.py (3)

17-25: Consider reordering imports for better readability?

The imports look good, but we could improve their organization slightly. What do you think about reordering them according to PEP 8 guidelines? Something like this:

from __future__ import annotations

import warnings

from iceberg_engine import IcebergEngineWarning
from pydantic import PrivateAttr

from airbyte._processors.sql.iceberg import IcebergConfig, IcebergSqlProcessor
from airbyte.caches.base import CacheBase

This groups standard library imports, third-party imports, and local imports separately. Wdyt?


4-14: Enhance the usage example for clarity?

The usage example is helpful, but we could make it even better. How about these suggestions:

  1. Correct the import statement "from airbyte as ab" to "import airbyte as ab".
  2. Add a comment explaining what the db_path and schema_name parameters represent.
  3. Include an example of how to use the cache after initialization.

Something like this:

import airbyte as ab
from airbyte.caches import IcebergCache

# Initialize the Iceberg cache
cache = IcebergCache(
    db_path="/path/to/my/iceberg-file",  # Path to the Iceberg database file
    schema_name="myschema",  # Name of the schema to use in the Iceberg database
)

# Example usage of the cache
cache.set("key", "value")
retrieved_value = cache.get("key")
print(retrieved_value)  # Output: value

What do you think about these changes? They might make the example more informative for users.


37-41: Enhance class definition with type hints and docstring?

The IcebergCache class looks good, but we could make it even better. What do you think about these suggestions:

  1. Add type hints to the class attributes for better clarity.
  2. Expand the class docstring to provide more information about its purpose and usage.

Here's an example of how we could improve it:

from typing import Type

class IcebergCache(IcebergConfig, CacheBase):
    """
    An Iceberg-based cache implementation for PyAirbyte.

    This class provides a caching mechanism using Iceberg as the backend storage.
    It inherits from IcebergConfig for configuration options and CacheBase for
    standard cache operations.

    Attributes:
        _sql_processor_class (Type[IcebergSqlProcessor]): The SQL processor class used for database operations.
    """

    _sql_processor_class: Type[IcebergSqlProcessor] = PrivateAttr(default=IcebergSqlProcessor)

What are your thoughts on these changes? They could make the code more self-documenting and easier to understand for other developers.

airbyte/writers.py (2)

1-16: LGTM! Minor suggestion for the module docstring.

The file structure and imports look good. I like the use of from __future__ import annotations for better type hinting support.

One small suggestion: What do you think about expanding the module docstring to provide a bit more context? Maybe something like:

"""Write interfaces for PyAirbyte.

This module defines abstract base classes for Airbyte writers, providing a common interface
for implementing various writing strategies in the PyAirbyte framework.
"""

WDYT? This could give developers a clearer picture of the module's purpose at a glance.


18-27: LGTM! Small suggestion for the name property docstring.

The abstract class definition looks good, and I like the implementation of the name property. It's a neat way to provide a default name based on the class name.

One small suggestion: How about expanding the docstring for the name property to include an example? Something like:

@property
def name(self) -> str:
    """Return the name of the writer.

    This is used for logging and state tracking.

    Returns:
        str: The name of the writer (e.g., 'MyCustomWriter')
    """
    return self.__class__.__name__

What do you think? This could help developers understand exactly what to expect from this property.

airbyte/_airbyte_message_overrides.py (1)

45-58: Minor optimization in the validator function

The validator function looks good! I noticed a small opportunity for optimization. What do you think about combining the two conditions? Something like:

if "data" in values:
    if isinstance(values["data"], dict):
        values["data"] = json.dumps(values["data"])
    elif not isinstance(values["data"], str):
        raise ValueError("Data must be a string or a dict")

This change would reduce the number of checks and make the logic a bit more straightforward. Does this align with your intentions?
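Pulled out as a standalone, runnable sketch (ensure_str_data is a hypothetical name; in the PR this logic sits inside a Pydantic validator), the combined check might look like:

```python
import json


def ensure_str_data(values: dict) -> dict:
    """Coerce a dict-valued "data" field to a JSON string; reject other types.

    Standalone sketch of the combined condition suggested above; the real
    code would run inside a Pydantic validator hook.
    """
    if "data" in values:
        if isinstance(values["data"], dict):
            values["data"] = json.dumps(values["data"])
        elif not isinstance(values["data"], str):
            raise ValueError("Data must be a string or a dict")
    return values


print(ensure_str_data({"data": {"a": 1}}))  # data is now a JSON string
```

Dicts get serialized, strings pass through unchanged, and anything else raises, matching the original two-branch logic in fewer checks.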

airbyte/_writers/parquet.py (4)

4-31: Consider cleaning up imports for better maintainability

There are a few unused imports and some that could be moved to the type-checking block. Would you like to consider the following changes?

  1. Remove unused imports:

    • Literal from typing
    • AirbyteRecordMessage, AirbyteStateMessage, AirbyteStateType from airbyte_protocol.models
    • exceptions from airbyte
  2. Move these imports to the type-checking block:

    • AirbyteMessage from airbyte_protocol.models
    • StateWriterBase from airbyte._future_cdk.state_writers

This could help reduce clutter and potentially improve import time. What do you think?

Tools
Ruff

9-9: typing.Literal imported but unused

Remove unused import: typing.Literal

(F401)


17-17: Move application import airbyte_protocol.models.AirbyteMessage into a type-checking block

Move into type-checking block

(TCH001)


18-18: airbyte_protocol.models.AirbyteRecordMessage imported but unused

Remove unused import

(F401)


19-19: airbyte_protocol.models.AirbyteStateMessage imported but unused

Remove unused import

(F401)


20-20: airbyte_protocol.models.AirbyteStateType imported but unused

Remove unused import

(F401)


23-23: airbyte.exceptions imported but unused

Remove unused import: airbyte.exceptions

(F401)


24-24: Move application import airbyte._future_cdk.state_writers.StateWriterBase into a type-checking block

Move into type-checking block

(TCH001)


34-41: Class structure looks good, consider adding more documentation

The ParquetWriter class is well-structured. To improve maintainability, would you consider adding more documentation? For example:

  1. Add a more detailed class docstring explaining the purpose and usage of the ParquetWriter.
  2. Add docstrings for the class attributes explaining their purpose.
  3. Consider adding type hints for the private attributes.

This could help future developers (including yourself) understand the class better. What are your thoughts on this?


52-58: _open_new_file method looks good, small suggestion

The method correctly opens a gzip file for writing. One small suggestion: instead of using cast, you could specify the mode as "wt" to open the file in text mode. This would eliminate the need for casting. What do you think about this change?

return gzip.open(file_path, "wt")

This might make the code a bit cleaner. Thoughts?


60-70: _get_new_cache_file_path method looks good, small suggestion

The method effectively generates a new cache file path. One small suggestion: consider using Path.joinpath() for constructing the file path. It can make the code a bit more readable and less prone to errors. What do you think about this change?

return target_dir.joinpath(f"{stream_name}_{batch_id}{self.default_cache_file_suffix}")

This might make the path construction a bit clearer. Thoughts?

airbyte/_processors/sql/iceberg.py (1)

35-35: Question about supports_merge_insert

I noticed that supports_merge_insert is set to False. Is this intentional, or do you plan to support merge inserts in the future? If it's intentional, maybe we could add a brief comment explaining why, wdyt?

airbyte/_util/polars.py (2)

1-11: LGTM! The imports and module docstring look good.

The necessary imports are present, and the module docstring is concise. One small suggestion: Would you consider adding a brief explanation of what Polars is in the module docstring? It might help developers who are not familiar with the library. wdyt?


13-20: The PolarsStreamSchema class looks well-structured.

The use of dataclasses is a good choice here. One suggestion: Have you considered adding type hints for the field default factories? It might improve code readability and IDE support. For example:

expressions: list[pl.Expr] = field(default_factory=list[pl.Expr])
arrow_schema: pa.Schema = field(default_factory=lambda: pa.schema([]))
polars_schema: pl.Schema = field(default_factory=lambda: pl.Schema([]))

What do you think about this change? It's a minor improvement but could help with type checking.

airbyte/_message_iterators.py (1)

13-13: Consider importing Literal from typing instead of typing_extensions?

The static analysis tool suggests importing Literal from typing instead of typing_extensions. This might be possible if you're using Python 3.8+. What do you think about making this change? It could simplify the imports a bit. wdyt?

-from typing_extensions import Literal, final
+from typing import Literal
+from typing_extensions import final
Tools
Ruff

13-13: Import from typing instead: Literal

Import from typing

(UP035)

airbyte/_writers/file_writers.py (1)

Line range hint 73-91: Consider updating method documentation for flush_active_batch

The method _flush_active_batch has been renamed to flush_active_batch, making it public. The existing documentation looks good, but we might want to add a note about its public nature now. What do you think about adding a line like "This method is now public and can be called externally." to the docstring? WDYT?

examples/run_polars_poc.py (1)

8-39: Consider using a logging configuration file for better maintainability.

The current logging setup is inline. What do you think about moving this to a separate configuration file for easier management, especially if this script grows or if you need to reuse this logging setup in other scripts? This could make it easier to adjust log levels and formats across your project. Wdyt?

import logging.config

logging.config.fileConfig('logging.conf')
logger = logging.getLogger(__name__)
airbyte/_connector_base.py (2)

Line range hint 351-397: Nice refactoring of _execute_and_parse!

The separation of execution and parsing logic looks great. It improves readability and maintainability.

One small suggestion: What do you think about adding a docstring to explain the purpose of this method and its return type? Something like:

def _execute_and_parse(
    self,
    args: list[str],
    stdin: IO[str] | AirbyteMessageIterator | None = None,
) -> Generator[AirbyteMessage, None, None]:
    """Execute the connector with given arguments and parse the output into AirbyteMessages.

    Args:
        args: List of command-line arguments for the connector.
        stdin: Optional input stream or AirbyteMessageIterator.

    Yields:
        AirbyteMessage: Parsed messages from the connector output.

    Raises:
        AirbyteConnectorFailedError: If the connector execution fails.
    """

This would provide clear documentation for future developers. WDYT?


398-424: Great addition of the _execute method!

The separation of execution logic into its own method is a solid improvement. It enhances modularity and makes the code easier to maintain and test.

A small suggestion: How about adding a type hint for the return value? It could look like this:

def _execute(
    self,
    args: list[str],
    stdin: IO[str] | AirbyteMessageIterator | None = None,
) -> Generator[str, None, None]:

This would make the return type explicit and consistent with the method's docstring. What do you think?

@@ -11,4 +11,5 @@
"BatchHandle",
"FileWriterBase",
"JsonlWriter",
"ParquetWriter",

Hey there! 👋 Looks like we might be missing an import for ParquetWriter. What do you think?

I noticed that we've added "ParquetWriter" to the __all__ list, which is great for making it part of the public API. However, it seems we forgot to import it in this file. This could lead to issues when someone tries to import ParquetWriter from this module.

Would you like me to suggest an import statement for ParquetWriter? Or maybe you intended to define it in another file? Let me know your thoughts!

If you'd like, I can help draft the import statement or create a new file for the ParquetWriter class. Just say the word!
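Judging only from the files table above (ParquetWriter lives in airbyte/_writers/parquet.py), the fix is likely a one-line import in airbyte/_writers/__init__.py; that path is an assumption, not verified against the PR. The runnable demo below reproduces the failure mode Ruff's F822 warns about: a star-import raises when __all__ names something the module never binds.

```python
import sys
import types

# Likely fix, sketched from the files table (an assumption):
#     # airbyte/_writers/__init__.py
#     from airbyte._writers.parquet import ParquetWriter
#
# Demo: build a throwaway module whose __all__ lists a name it never defines,
# mirroring the F822 situation flagged in this file.
mod = types.ModuleType("fake_writers")
exec(
    "class JsonlWriter: pass\n"
    "__all__ = ['JsonlWriter', 'ParquetWriter']  # ParquetWriter never imported\n",
    mod.__dict__,
)
sys.modules["fake_writers"] = mod

ns: dict = {}
try:
    exec("from fake_writers import *", ns)
except AttributeError as err:
    print(f"star-import failed: {err}")
```

Without the import, `from airbyte._writers import *` would fail the same way at runtime, so the lint finding is worth fixing rather than suppressing.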

Tools
Ruff

14-14: Undefined name ParquetWriter in __all__

(F822)

Comment on lines 29 to 58
def _write_airbyte_io_stream(
self,
stdin: IO[str],
*,
catalog_provider: CatalogProvider,
write_strategy: WriteStrategy,
state_writer: StateWriterBase | None = None,
progress_tracker: ProgressTracker,
) -> None:
"""Read from the connector and write to the cache."""
self._write_airbyte_message_stream(
stdin,
catalog_provider=catalog_provider,
write_strategy=write_strategy,
state_writer=state_writer,
progress_tracker=progress_tracker,
)

@abc.abstractmethod
def _write_airbyte_message_stream(
self,
stdin: IO[str] | AirbyteMessageIterator,
*,
catalog_provider: CatalogProvider,
write_strategy: WriteStrategy,
state_writer: StateWriterBase | None = None,
progress_tracker: ProgressTracker,
) -> None:
"""Write the incoming data."""
...

Suggestions for improving method docstrings and type hinting.

The structure of these methods looks good, but I have a few suggestions that might make them even better:

  1. For _write_airbyte_io_stream, what do you think about adding a more detailed docstring? Maybe something like:
def _write_airbyte_io_stream(
    self,
    stdin: IO[str],
    *,
    catalog_provider: CatalogProvider,
    write_strategy: WriteStrategy,
    state_writer: StateWriterBase | None = None,
    progress_tracker: ProgressTracker,
) -> None:
    """Read from the connector and write to the cache.

    This method serves as a wrapper around _write_airbyte_message_stream,
    providing an interface specifically for IO stream inputs.

    Args:
        stdin (IO[str]): The input stream to read from.
        catalog_provider (CatalogProvider): Provider for the catalog.
        write_strategy (WriteStrategy): Strategy for writing data.
        state_writer (StateWriterBase | None): Writer for state management.
        progress_tracker (ProgressTracker): Tracker for progress updates.
    """
    # ... existing implementation ...
  2. For _write_airbyte_message_stream, how about adding a docstring that explains the expected behavior for subclasses? Something like:
@abc.abstractmethod
def _write_airbyte_message_stream(
    self,
    stdin: IO[str] | AirbyteMessageIterator,
    *,
    catalog_provider: CatalogProvider,
    write_strategy: WriteStrategy,
    state_writer: StateWriterBase | None = None,
    progress_tracker: ProgressTracker,
) -> None:
    """Write the incoming data.

    This abstract method should be implemented by subclasses to define
    the specific behavior for writing Airbyte messages.

    Args:
        stdin (IO[str] | AirbyteMessageIterator): The input stream or iterator to read from.
        catalog_provider (CatalogProvider): Provider for the catalog.
        write_strategy (WriteStrategy): Strategy for writing data.
        state_writer (StateWriterBase | None): Writer for state management.
        progress_tracker (ProgressTracker): Tracker for progress updates.
    """
    ...
  3. Lastly, have you considered using more specific types for catalog_provider, write_strategy, and progress_tracker? If these are custom types defined elsewhere in your project, it might be helpful to import and use them directly in the type hints.

What are your thoughts on these suggestions? Do you think they would improve the clarity and usability of the code?

Comment on lines +33 to +42
AirbyteRecordMessageWithStrData = copy.deepcopy(AirbyteRecordMessage)
AirbyteStateMessageWithStrData = copy.deepcopy(AirbyteStateMessage)
AirbyteMessageWithStrData = copy.deepcopy(AirbyteMessage)

# Modify the data field in the copied class
AirbyteRecordMessageWithStrData.__annotations__["data"] = str
AirbyteStateMessageWithStrData.__annotations__["data"] = str

AirbyteRecordMessageWithStrData.data = Field(..., description="jsonified record data as a str")
AirbyteStateMessageWithStrData.data = Field(..., description="jsonified state data as a str")

Consider using inheritance for custom classes

The current implementation using deep copies works well, but have you considered using inheritance instead? It might lead to a more maintainable and cleaner code structure. Something like:

class AirbyteRecordMessageWithStrData(AirbyteRecordMessage):
    data: str = Field(..., description="jsonified record data as a str")

class AirbyteStateMessageWithStrData(AirbyteStateMessage):
    data: str = Field(..., description="jsonified state data as a str")

class AirbyteMessageWithStrData(AirbyteMessage):
    record: AirbyteRecordMessageWithStrData | None
    state: AirbyteStateMessageWithStrData | None

This approach could potentially simplify the code and make it easier to manage in the future. What do you think? Does this align with your design goals?

Comment on lines +72 to +84
@overrides
def _write_record_dict(
self,
record_dict: StreamRecord,
open_file_writer: IO[str],
) -> None:
# If the record is too nested, `orjson` will fail with error `TypeError: Recursion
# limit reached`. If so, fall back to the slower `json.dumps`.
try:
open_file_writer.write(orjson.dumps(record_dict).decode("utf-8") + "\n")
except TypeError:
# Using isoformat method for datetime serialization
open_file_writer.write(json.dumps(record_dict, default=lambda _: _.isoformat()) + "\n")

_write_record_dict method handles edge cases well, some questions and suggestions

Great job on handling the potential orjson serialization issue! I have a few questions and suggestions:

  1. Have you considered using a custom JSON encoder instead of the lambda function in the json.dumps fallback? This could potentially handle more complex objects. What do you think?

  2. The error message mentions a recursion limit. Would it be helpful to log this error when it occurs, to help with debugging?

  3. Consider extracting the serialization logic into a separate method. This could improve readability and make it easier to test. Something like:

def _serialize_record(self, record: dict) -> str:
    try:
        return orjson.dumps(record).decode("utf-8")
    except TypeError:
        return json.dumps(record, default=lambda obj: obj.isoformat())

def _write_record_dict(self, record_dict: StreamRecord, open_file_writer: IO[str]) -> None:
    open_file_writer.write(self._serialize_record(record_dict) + "\n")

What are your thoughts on these suggestions?
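On suggestion 1, a custom encoder for the json.dumps fallback might look like this sketch (DateTimeEncoder is a hypothetical name; the isoformat behavior matches the existing lambda, and the str() catch-all is an assumed policy for other unserializable objects):

```python
import json
from datetime import date, datetime


class DateTimeEncoder(json.JSONEncoder):
    """Fallback encoder for records orjson rejects: ISO-format datetimes and
    dates, str() for any other unserializable object (an assumed policy)."""

    def default(self, o):
        if isinstance(o, (datetime, date)):
            return o.isoformat()
        return str(o)


record = {"id": 1, "seen_at": datetime(2024, 9, 2, 12, 0)}
print(json.dumps(record, cls=DateTimeEncoder))
```

Unlike the lambda, the encoder class can grow additional type branches (Decimal, UUID, and so on) without cluttering the write path.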

Comment on lines +57 to +82
def write_state(self, state: AirbyteStateMessage) -> None:
"""Write the state to the cache.

Args:
state (AirbyteStateMessage): The state to write.

Implementation:
- State messages are written a separate file.
- Any pending records are written to the cache file and the cache file is closed.
- For stream state messages, the matching stream batches are flushed and closed.
- For global state, all batches are flushed and closed.
"""
stream_names: list[str] = []
if state.type == AirbyteStateType.STREAM:
stream_names = [state.record.stream]
if state.type == AirbyteStateType.GLOBAL:
stream_names = list(self._buffered_records.keys())
else:
msg = f"Unexpected state type: {state.type}"
raise exc.PyAirbyteInternalError(msg)

for stream_name in stream_names:
state_file_name = self.file_writer.get_active_batch(stream_name)
self.file_writer.flush_active_batch(stream_name)
self.file_writer._write_state_to_file(state)
return

Suggestion for write_state method

The implementation looks good, but I have a couple of suggestions:

  1. The if condition for AirbyteStateType.GLOBAL (line 72) should probably be an elif to match the structure of the other conditions.
  2. The else block (lines 74-76) seems to be misplaced. It will only be reached if the state type is neither STREAM nor GLOBAL. Should this be an elif for a specific state type?

What do you think about these changes? They might make the logic a bit clearer.

Also, the return statement on line 82 will exit the function after processing only the first stream. Is this intentional, or should we process all streams in stream_names?

Tools
Ruff

79-79: Local variable state_file_name is assigned to but never used

Remove assignment to unused variable state_file_name

(F841)


81-81: Private member accessed: _write_state_to_file

(SLF001)

Comment on lines +243 to +254
def main() -> None:
"""Run the Polars proof of concept."""
# test_my_source() # We don't need to run this every time - only for debugging
run_polars_perf_test(
file_type="parquet",
lazy=False,
expire_cache=False,
)


if __name__ == "__main__":
main()

Consider adding command-line arguments and global error handling.

The main() function and script execution look good, but what do you think about making it more flexible and adding some global error handling? Here are a few suggestions:

  1. We could use argparse to allow users to specify parameters like file_type, lazy, and expire_cache from the command line.
  2. We could add a global try-except block to catch and log any unexpected errors.

What are your thoughts on these changes?

import argparse

def parse_args():
    parser = argparse.ArgumentParser(description="Run Polars performance test.")
    parser.add_argument("--file-type", choices=["jsonl", "parquet"], default="parquet", help="Output file type")
    parser.add_argument("--lazy", action="store_true", help="Use lazy evaluation")
    parser.add_argument("--expire-cache", action="store_true", help="Expire file cache")
    return parser.parse_args()

def main() -> None:
    args = parse_args()
    run_polars_perf_test(
        file_type=args.file_type,
        lazy=args.lazy,
        expire_cache=args.expire_cache,
    )

if __name__ == "__main__":
    try:
        main()
    except Exception as e:
        logger.exception(f"An unexpected error occurred: {str(e)}")

This approach would make the script more flexible and easier to use in different scenarios. It would also provide better error reporting for unexpected issues. What do you think about these changes?

@@ -24,6 +24,7 @@
from airbyte.caches._state_backend import SqlStateBackend
from airbyte.constants import DEFAULT_ARROW_MAX_CHUNK_SIZE, TEMP_FILE_CLEANUP
from airbyte.datasets._sql import CachedDataset
from airbyte.writers import AirbyteWriterInterface

Redundant import of AirbyteWriterInterface?

I noticed that AirbyteWriterInterface is imported twice, once on line 22 and again on line 27. It seems like we might not need this second import. Would you mind if we removed the import on line 27 to keep things clean? What do you think?

-from airbyte.writers import AirbyteWriterInterface
Tools
Ruff

27-27: Redefinition of unused AirbyteWriterInterface from line 22

(F811)

@@ -55,6 +55,7 @@
from airbyte.caches.base import CacheBase
from airbyte.destinations.base import Destination
from airbyte.sources.base import Source
from airbyte.writers import AirbyteWriterInterface

Consider removing the redundant import?

I noticed that AirbyteWriterInterface is already imported in the TYPE_CHECKING block on line 54. Do you think we could remove this new import to avoid potential confusion or circular import issues? wdyt?

-    from airbyte.writers import AirbyteWriterInterface
Tools
Ruff

58-58: Redefinition of unused AirbyteWriterInterface from line 54

(F811)
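As background for why the runtime import is redundant here: with `from __future__ import annotations`, names used only in type hints resolve lazily, so a TYPE_CHECKING-only import suffices and cannot cause circular imports. A minimal sketch (the `airbyte.writers` module name is taken from the diff; the function itself is hypothetical):

```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Evaluated only by static type checkers; never executed at runtime,
    # so it cannot introduce a circular import.
    from airbyte.writers import AirbyteWriterInterface


def write_all(writer: AirbyteWriterInterface) -> None:
    # The annotation is stored as a string and never evaluated at runtime.
    ...
```

Because the `if TYPE_CHECKING:` branch never runs, this module imports cleanly even when `airbyte` is not installed, which is exactly why the second, unconditional import is unnecessary.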

@@ -30,6 +30,7 @@
from airbyte.results import ReadResult, WriteResult
from airbyte.sources.base import Source
from airbyte.strategies import WriteStrategy
from airbyte.writers import AirbyteWriterInterface

Redundant import detected. Shall we remove it?

Hey there! 👋 I noticed that AirbyteWriterInterface is imported twice - once on line 27 and again on line 33. The second import seems redundant. What do you think about removing the import on line 33 to keep things clean? Or is there a specific reason for having it twice that I might be missing?

If you agree, here's a quick fix:

-from airbyte.writers import AirbyteWriterInterface

Let me know what you think! 🙂

Tools
Ruff

33-33: Redefinition of unused AirbyteWriterInterface from line 27

(F811)

@@ -58,6 +58,7 @@
from airbyte.caches.base import CacheBase
from airbyte.destinations.base import Destination
from airbyte.sources.base import Source
from airbyte.writers import AirbyteWriterInterface
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential duplicate import - can we consolidate?

I noticed that we're importing AirbyteWriterInterface twice, once on line 57 and again on line 61. Could we possibly consolidate these imports to keep things clean? Or is there a specific reason for having both? wdyt?

If you agree, we could remove the duplicate import like this:

-    from airbyte.writers import AirbyteWriterInterface
Tools
Ruff

61-61: Redefinition of unused AirbyteWriterInterface from line 57

(F811)

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Outside diff range, codebase verification and nitpick comments (5)
airbyte/_connector_base.py (3)

172-173: Consider using a more descriptive variable name for the message iterator.

The change from self._execute to self._execute_and_parse looks good. However, to improve readability, would you consider using a more descriptive variable name for the message iterator? For example, message_iterator instead of msg. This could make the code more self-explanatory. WDYT?

-for msg in self._execute_and_parse(["spec"]):
-    if msg.type == Type.SPEC and msg.spec:
-        self._spec = msg.spec
+for message_iterator in self._execute_and_parse(["spec"]):
+    if message_iterator.type == Type.SPEC and message_iterator.spec:
+        self._spec = message_iterator.spec

278-279: Consider adding a comment to explain the purpose of the check operation.

The change from self._execute to self._execute_and_parse looks good. To improve code documentation, would you consider adding a brief comment explaining the purpose of the check operation? This could help future developers understand the intent of this code block more quickly. WDYT?

+# Execute the check operation and process the results
for msg in self._execute_and_parse(["check", "--config", config_file]):
    if msg.type == Type.CONNECTION_STATUS and msg.connectionStatus:

Line range hint 352-432: Refactored execution logic looks good, but consider adding error handling for specific exceptions.

The refactoring of the execution logic into separate _execute_and_parse and _execute methods looks great! It improves the separation of concerns and makes the code more modular. However, I have a few suggestions that might further enhance the robustness of the code:

  1. In the _execute_and_parse method, consider adding specific exception handling for json.JSONDecodeError when parsing the JSON data. This could provide more informative error messages for malformed JSON output.

  2. In the _execute method, the generic Exception catch might be too broad. Would you consider catching more specific exceptions that might occur during execution? This could help in providing more targeted error messages and handling.

  3. The docstring for _execute mentions "Locate the right venv", but this logic isn't visible in the method. Is this handled by the executor.execute method? If so, maybe update the docstring to reflect this.

WDYT about these suggestions? They might make the error handling more robust and the documentation more accurate.

Here's a suggestion for point 1:

 try:
     message: AirbyteMessage = AirbyteMessage.model_validate_json(json_data=line)
     if progress_tracker and message.record:
         progress_tracker.tally_bytes_read(
             len(line),
             stream_name=message.record.stream,
         )
     self._peek_airbyte_message(message)
     yield message
-except Exception:
+except json.JSONDecodeError:
     # This is likely a log message, so log it as INFO.
     self._print_info_message(line)
+except Exception as e:
+    # Handle other unexpected exceptions
+    self._print_error_message(f"Unexpected error processing message: {str(e)}")
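To make the narrowed exception handling concrete, here is a standalone sketch (simplified, and the helper name is hypothetical) of routing non-JSON lines to a log bucket rather than swallowing every exception:

```python
import json


def split_output_lines(lines: list[str]) -> tuple[list[dict], list[str]]:
    """Hypothetical helper: JSON lines become parsed messages; anything that
    fails JSON decoding is treated as plain log output."""
    messages: list[dict] = []
    log_lines: list[str] = []
    for line in lines:
        try:
            messages.append(json.loads(line))
        except json.JSONDecodeError:
            # Not valid JSON -- likely a connector log line, keep for INFO logging.
            log_lines.append(line)
    return messages, log_lines


msgs, logs = split_output_lines(['{"type": "RECORD"}', "starting sync..."])
print(msgs, logs)  # → [{'type': 'RECORD'}] ['starting sync...']
```

Other failures (anything that is not a `JSONDecodeError`) then propagate or get their own handler, so genuine bugs are no longer hidden behind the "probably a log line" path.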
airbyte/sources/base.py (2)

238-238: LGTM! Consider adding a comment for clarity.

The change from _execute to _execute_and_parse is consistent with the previous modification. It's good to see this improvement applied consistently.

Would it be helpful to add a brief comment explaining the purpose of _execute_and_parse? Something like:

# Execute the command and parse the output messages
for msg in self._execute_and_parse(["spec"]):

What do you think?


Line range hint 545-555: Great improvement! Docstring update needed?

The change from _execute to _execute_and_parse is consistent with the previous modifications. The addition of the progress_tracker parameter is a nice enhancement for monitoring the execution progress.

Should we update the method's docstring to include information about the new progress_tracker parameter? Something like:

def _read_with_catalog(
    self,
    catalog: ConfiguredAirbyteCatalog,
    progress_tracker: ProgressTracker,
    state: StateProviderBase | None = None,
) -> Generator[AirbyteMessage, None, None]:
    """Call read on the connector.

    Args:
        catalog: The configured catalog to use for reading.
        progress_tracker: A ProgressTracker instance for monitoring execution progress.
        state: Optional state provider for incremental reads.

    Returns:
        A generator yielding AirbyteMessages.
    """

What do you think about adding this documentation?

@@ -27,6 +27,7 @@
from airbyte.shared.state_providers import StateProviderBase
from airbyte.shared.state_writers import StateWriterBase
from airbyte.sources.base import Source
from airbyte.writers import AirbyteWriterInterface

Consider removing the redundant import?

I noticed that AirbyteWriterInterface is already imported in the TYPE_CHECKING block on line 25. Do we need this additional import outside of TYPE_CHECKING? If not, maybe we could remove this line to avoid potential circular import issues and keep our imports clean. This would also address the Ruff warning about unused redefinition.

I see there was a previous comment about this as well. What are your thoughts on this? Is there a specific reason we need this import here that I might be missing?

If you agree, we could apply this change:

-    from airbyte.writers import AirbyteWriterInterface

Wdyt?

Tools
Ruff

30-30: Redefinition of unused AirbyteWriterInterface from line 22

(F811)
