
Support: Reading from AsyncIterator #27

Open
gregbrowndev opened this issue Jul 9, 2024 · 2 comments

Comments


gregbrowndev commented Jul 9, 2024

Hi, thanks for the work on this library.

I saw in the docs that aiocsv only supports reading from file-like objects that implement the WithAsyncRead protocol. However, it would be nice to also be able to read from an AsyncIterator, for feature parity with the standard csv library, which can read from any iterable of strings.
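For comparison, the standard library's csv.reader and csv.DictReader accept any iterable that yields strings, not only file objects. A minimal illustration, with an in-memory list of lines standing in for a real source:

```python
import csv

# Any iterable of strings works; a plain list of lines stands in
# for a file, generator, or network stream here.
lines = ["name,age\n", "alice,30\n", "bob,25\n"]

for row in csv.reader(lines, delimiter=","):
    print(row)

# DictReader takes the same kind of iterable and uses the first row as keys.
records = list(csv.DictReader(lines))
print(records)
```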

My specific use case is reading a CSV from S3 with the aioboto3 library, streaming the file via iter_lines or iter_chunks.

Example:

import asyncio

import aioboto3
from aiocsv import AsyncDictReader


async def main():
    session = aioboto3.Session()
    async with session.client("s3") as s3:

        response = await s3.get_object(Bucket="my-bucket", Key="data.csv")
        lines_iter = (
            line.decode('utf-8')
            async for line in response["Body"].iter_lines()
        )

        # Desired API: AsyncDictReader currently expects an object with an
        # async read() method, so passing an async iterator is not supported.
        async for row in AsyncDictReader(lines_iter, delimiter=","):
            print(row)


if __name__ == '__main__':
    asyncio.run(main())

I imagine one could write an adapter from the async iterator to the async read API, but I wasn't sure whether that could be done efficiently without buffering the whole response.
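A rough sketch of such an adapter, assuming the reader only ever calls await read(size) with a positive size and treats an empty string as end of input (which is what the WithAsyncRead shape suggests, but is not confirmed here). AsyncIterTextReader is a hypothetical name, not an aiocsv API:

```python
import asyncio
from typing import AsyncIterable, List


class AsyncIterTextReader:
    """Hypothetical adapter (not part of aiocsv): exposes an async
    read(size) method over an async iterable of str chunks."""

    def __init__(self, source: AsyncIterable[str]) -> None:
        self._source = source.__aiter__()
        self._buffer = ""  # unconsumed text: at most size + one chunk
        self._exhausted = False

    async def read(self, size: int) -> str:
        # Pull chunks until `size` characters are buffered or the source ends.
        while not self._exhausted and len(self._buffer) < size:
            try:
                self._buffer += await self._source.__anext__()
            except StopAsyncIteration:
                self._exhausted = True
        data, self._buffer = self._buffer[:size], self._buffer[size:]
        # For size > 0, this is "" only once the source is exhausted
        # and the buffer has been fully drained.
        return data


async def demo() -> List[str]:
    async def chunks():
        # Chunk boundaries deliberately fall in the middle of fields.
        for piece in ["name,a", "ge\nalice,30\n", "bob,25\n"]:
            yield piece

    reader = AsyncIterTextReader(chunks())
    parts = []
    while True:
        part = await reader.read(4)
        if not part:
            break
        parts.append(part)
    return parts


parts = asyncio.run(demo())
print(parts)
```

Only the unconsumed tail of the current chunk is held in memory, so the response as a whole is not buffered. For the S3 example above, each line from iter_lines would also need "\n" re-appended before being fed in, since botocore's iter_lines strips line endings by default.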

I would be grateful to see this feature considered in the library.

Cheers!

@MKuranowski
Owner

Isn't response["Body"] already a file-like object?

Regardless, yes, there should be support for AsyncIterable[str], but usually there's a file-like object available anyway, so I haven't really prioritized this API mismatch.

@gregbrowndev
Author

Thanks a lot, you're right; I missed that by focusing on the iter_lines method. For anyone else trying to do the same thing, this code now works. I used your answer in #2 to handle the decoding:

import asyncio
import codecs

import aioboto3
from aiocsv import AsyncDictReader
from aiocsv.protocols import WithAsyncRead


class AsyncTextReaderWrapper(WithAsyncRead):
    def __init__(self, obj, encoding: str, errors="strict"):
        self.obj = obj
        decoder_factory = codecs.getincrementaldecoder(encoding)
        self.decoder = decoder_factory(errors)

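    async def read(self, size: int) -> str:
        while True:
            raw_data = await self.obj.read(size)

            if not raw_data:
                # End of stream: flush any bytes still held by the decoder.
                return self.decoder.decode(b"", final=True)

            text = self.decoder.decode(raw_data, final=False)
            # A chunk holding only part of a multi-byte character decodes to "";
            # returning that would look like EOF, so read more instead.
            if text:
                return text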


async def main():
    session = aioboto3.Session()
    async with session.client("s3") as s3:

        response = await s3.get_object(Bucket="my-bucket", Key="data.csv")

        f = AsyncTextReaderWrapper(response["Body"], encoding="utf-8")

        async for row in AsyncDictReader(f, delimiter=","):
            print(row)


if __name__ == '__main__':
    asyncio.run(main())
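The incremental decoder in the wrapper matters because a fixed-size read(size) on a byte stream can split a multi-byte UTF-8 character across two chunks. A small standalone illustration using only the codecs module (the sample string and split point are made up for the example):

```python
import codecs

data = "café,naïve\n".encode("utf-8")
# Split inside the two-byte UTF-8 encoding of "é" (b"\xc3\xa9"),
# as a fixed-size read(size) on a byte stream might.
split = data.index(b"\xc3") + 1
first, second = data[:split], data[split:]

# Decoding the first chunk on its own fails on the dangling lead byte.
try:
    first.decode("utf-8")
except UnicodeDecodeError as exc:
    print("naive decode failed:", exc.reason)

# An incremental decoder holds the partial byte until more data arrives.
decoder = codecs.getincrementaldecoder("utf-8")("strict")
part1 = decoder.decode(first, final=False)   # "caf"
part2 = decoder.decode(second, final=False)  # "é,naïve\n"
tail = decoder.decode(b"", final=True)       # "" (nothing left over)
print(repr(part1), repr(part2), repr(tail))

# A chunk holding only part of a character decodes to "" for now,
# even though the stream has not ended.
partial = codecs.getincrementaldecoder("utf-8")().decode(b"\xc3", final=False)
print(repr(partial))
```

The last case is worth keeping in mind when writing a read() wrapper: an empty string returned mid-stream may be taken as end of input by the caller.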
