Allow Async Generators for Source of Files #8

Open · wants to merge 5 commits into base: master
17 changes: 17 additions & 0 deletions README.md
@@ -140,6 +140,23 @@ loop.run_until_complete(zip_async('example.zip', files))
loop.stop()
```

The list of files may also be an async generator:
```python
async def files():
    yield {
        'file': '/tmp/car.jpeg'
    }
    yield {
        'file': '/tmp/aaa.mp3',
        'name': 'music.mp3'
    }
    yield {
        'stream': content_generator(),
        'name': 'random_stuff.txt'
    }
```
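The generator is passed to `AioZipStream` exactly like a list of dicts. A minimal sketch of consuming it, assuming `aiofiles` is used for output as in the earlier example:

```python
aiozip = zipstream.AioZipStream(files())
async with aiofiles.open('example.zip', mode='wb') as z:
    async for chunk in aiozip.stream():
        await z.write(chunk)
```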

## Examples

See `examples` directory for complete code and working examples of ZipStream and AioZipStream.
62 changes: 62 additions & 0 deletions examples/async_gen_files.py
@@ -0,0 +1,62 @@
import asyncio
import random

import aiofiles

import zipstream


async def generate_task(doc):
    # simulate work that takes a random amount of time
    # before the file entry is ready
    delay = random.randrange(0, 5)
    await asyncio.sleep(delay)
    data = {"stream": doc["data"],
            "name": doc["name"],
            "compression": "deflate"}
    print(f"finished {data['name']} in {delay}s")
    return data


async def generated_content(size):
    """
    An asynchronous source of random data of unknown length,
    which we stream into the zip.
    """
    chars = '0123456789 abcdefghijklmnopqrstuvwxyz \n'
    for m in range(size):
        t = ""
        for n in range(random.randint(20, 200)):
            t += random.choice(chars)
        yield bytes(t, 'ascii')


files = [
    {'name': '/tmp/z/1.txt', 'data': generated_content(50)},
    {'name': '/tmp/z/2.txt', 'data': generated_content(50)},
    {'name': '/tmp/z/3.txt', 'data': generated_content(50)},
    {'name': '/tmp/z/4.txt', 'data': generated_content(50)},
    {'name': '/tmp/z/5.txt', 'data': generated_content(50)},
]

async def fileslistgen(docs):
    """
    Yield file entries as their tasks complete,
    allowing concurrency while files get streamed.
    """
    futures = []
    for doc in docs:
        futures.append(generate_task(doc))

    for coroutine in asyncio.as_completed(futures):
        yield await coroutine


async def zip_async(zipname, files):
    # a larger chunk size will increase performance
    aiozip = zipstream.AioZipStream(
        fileslistgen(files), chunksize=32768
    )
    async with aiofiles.open(zipname, mode='wb') as z:
        async for chunk in aiozip.stream():
            await z.write(chunk)


loop = asyncio.get_event_loop()
loop.run_until_complete(zip_async('example.zip', files))
loop.stop()
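As an aside, on Python 3.7+ the event-loop boilerplate at the bottom of the example can be replaced by a single call; a minimal sketch, reusing `zip_async` as defined above:

```python
asyncio.run(zip_async('example.zip', files))
```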
1 change: 1 addition & 0 deletions examples/async_zip_stream.py
@@ -37,3 +37,4 @@ async def zip_async(zipname, files):
loop = asyncio.get_event_loop()
loop.run_until_complete(zip_async('example.zip', files))
loop.stop()

2 changes: 1 addition & 1 deletion setup.py
@@ -12,7 +12,7 @@

setup(
name='zipstream',
version='0.5',
version='0.5.1',

description='Creating zip files on the fly',
long_description=long_description,
94 changes: 94 additions & 0 deletions tests/test_async.py
@@ -0,0 +1,94 @@
import io
import pytest
import zipstream
import zipfile
import aiofiles

pytestmark = pytest.mark.asyncio


async def asyncfileslist():
    with open("/tmp/_tempik_1.txt", "w") as f:
        f.write("foo baz bar")
    with open("/tmp/_tempik_2.txt", "w") as f:
        f.write("baz trololo something")
    yield {"file": "/tmp/_tempik_1.txt"}
    yield {"file": "/tmp/_tempik_2.txt"}


def syncfileslist():
    with open("/tmp/_tempik_1.txt", "w") as f:
        f.write("foo baz bar")
    with open("/tmp/_tempik_2.txt", "w") as f:
        f.write("baz trololo something")
    yield {"file": "/tmp/_tempik_1.txt"}
    yield {"file": "/tmp/_tempik_2.txt"}


async def asyncfileslist_streams():
    with open("/tmp/_tempik_1.txt", "w") as f:
        f.write("foo baz bar")
    with open("/tmp/_tempik_2.txt", "w") as f:
        f.write("baz trololo something")
    async with aiofiles.open("/tmp/_tempik_1.txt", "rb") as f:
        yield {
            "stream": f,
            "name": "_tempik_1.txt"
        }
    async with aiofiles.open("/tmp/_tempik_2.txt", "rb") as f:
        yield {
            "stream": f,
            "name": "_tempik_2.txt"
        }


async def asyncfileslist_stream_iter():
    with open("/tmp/_tempik_1.txt", "w") as f:
        f.write("foo baz bar")
    with open("/tmp/_tempik_2.txt", "w") as f:
        f.write("baz trololo something")
    with open("/tmp/_tempik_1.txt", "rb") as f:
        yield {"stream": f,
               "name": "_tempik_1.txt"}
    with open("/tmp/_tempik_2.txt", "rb") as f:
        yield {"stream": f,
               "name": "_tempik_2.txt"}


async def test_async_generator_for_files():
    gen = asyncfileslist()
    await zip_gen_and_check(gen)


async def test_sync_generator_for_files():
    gen = syncfileslist()
    await zip_gen_and_check(gen)


async def test_sync_list_for_files():
    gen = list(syncfileslist())
    await zip_gen_and_check(gen)


async def test_async_list_for_async_get():
    gen = asyncfileslist_streams()
    await zip_gen_and_check(gen)


async def test_async_list_for_iter():
    gen = asyncfileslist_stream_iter()
    await zip_gen_and_check(gen)


async def zip_gen_and_check(gen):
    FILENAMES = set(fd["file"].split("/")[-1] for fd in syncfileslist())
    zs = zipstream.AioZipStream(gen)
    res = b""

    async for f in zs.stream():
        res += f
    zf = zipfile.ZipFile(io.BytesIO(res))
    filenames = set(zipinfo.filename for zipinfo in zf.filelist)
    assert filenames == FILENAMES
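Note that the module-level `pytestmark = pytest.mark.asyncio` means these coroutine tests require the `pytest-asyncio` plugin; without it, pytest cannot run `async def` test functions.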
50 changes: 23 additions & 27 deletions tests/test_zipstream.py
@@ -74,44 +74,40 @@ def test_one_file_add(self):
res+=f

# check header
self.assertEqual( res[:4], zipfile.stringFileHeader )
self.assertEqual( res[4:6], b"\x14\x00" ) # version
self.assertEqual( res[6:8], b"\x08\x00" ) # flags
self.assertEqual( res[8:10], b"\x00\x00" ) # compression method
self.assertEqual( res[14:18], b"\x00\x00\x00\x00" ) # crc is set to 0
self.assertEqual( res[18:22], b"\x00\x00\x00\x00" ) # compressed size is 0
self.assertEqual( res[22:26], b"\x00\x00\x00\x00" ) # uncompressed size is 0

pos = res.find( zipstream.consts.LF_MAGIC ,10)

self.check_result(res)

def check_result(self, res):
self.assertEqual(res[:4], zipfile.stringFileHeader)
self.assertEqual(res[4:6], b"\x14\x00") # version
self.assertEqual(res[6:8], b"\x08\x00") # flags
self.assertEqual(res[8:10], b"\x00\x00") # compression method
self.assertEqual(res[14:18], b"\x00\x00\x00\x00") # crc is set to 0
self.assertEqual(res[18:22], b"\x00\x00\x00\x00") # compressed size is 0
self.assertEqual(res[22:26], b"\x00\x00\x00\x00") # uncompressed size is 0
pos = res.find(zipstream.consts.LF_MAGIC, 10)
# data descriptor has 16 bytes
dd = res[pos-16:pos]
dd = res[pos - 16:pos]
self.assertEqual(dd[:4], zipstream.consts.DD_MAGIC)

# check CRC
crc = dd[4:8]
crc2 = zlib.crc32(b"foo baz bar") & 0xffffffff
crc2 = struct.pack(b"<L", crc2 )
self.assertEqual( crc, crc2 )
crc2 = struct.pack(b"<L", crc2)
self.assertEqual(crc, crc2)
# check file len compressed and uncompressed
self.assertEqual( dd[8:12], b"\x0b\x00\x00\x00" )
self.assertEqual( dd[12:16], b"\x0b\x00\x00\x00" )

self.assertEqual(dd[8:12], b"\x0b\x00\x00\x00")
self.assertEqual(dd[12:16], b"\x0b\x00\x00\x00")
# check end file descriptor
endstruct = res[ -zipstream.consts.CD_END_STRUCT.size: ]
self.assertEqual( endstruct[:4], zipstream.consts.CD_END_MAGIC )
self.assertEqual( endstruct[8:10], b"\x02\x00" ) # two files in disc
self.assertEqual( endstruct[10:12], b"\x02\x00" ) # two files total

endstruct = res[-zipstream.consts.CD_END_STRUCT.size:]
self.assertEqual(endstruct[:4], zipstream.consts.CD_END_MAGIC)
self.assertEqual(endstruct[8:10], b"\x02\x00") # two files in disc
self.assertEqual(endstruct[10:12], b"\x02\x00") # two files total
zdsize = len(res) - zipstream.consts.CD_END_STRUCT.size
cdsize = struct.unpack("<L", endstruct[12:16])[0]
cdpos = struct.unpack("<L", endstruct[16:20])[0]

# position of ender is equal of cd + cd size
self.assertEqual( cdpos+cdsize, len(res)-zipstream.consts.CD_END_STRUCT.size )
self.assertEqual( res[cdpos:cdpos+4], zipstream.consts.CDFH_MAGIC )

cdentry = res[ cdpos: cdpos+cdsize ]
self.assertEqual(cdpos + cdsize, len(res) - zipstream.consts.CD_END_STRUCT.size)
self.assertEqual(res[cdpos:cdpos + 4], zipstream.consts.CDFH_MAGIC)
cdentry = res[cdpos: cdpos + cdsize]


if __name__ == '__main__':
60 changes: 47 additions & 13 deletions zipstream/aiozipstream.py
@@ -47,9 +47,14 @@ def _create_file_struct(self, data):

async def data_generator(self, src, src_type):
if src_type == 's':
async for chunk in src:
yield chunk
return
if hasattr(src, "__anext__"):
async for chunk in src:
yield chunk
return
else:
for chunk in src:
yield chunk
return
if src_type == 'f':
async with aiofiles.open(src, "rb") as fh:
while True:
@@ -74,17 +79,46 @@ async def _stream_single_file(self, file_struct):
yield self._make_data_descriptor(file_struct, *pcs.state())

async def stream(self):
# stream files
for idx, source in enumerate(self._source_of_files):
file_struct = self._create_file_struct(source)
# file offset in archive
file_struct['offset'] = self._offset_get()
self._add_file_to_cdir(file_struct)
# file data
async for chunk in self._stream_single_file(file_struct):
self._offset_add(len(chunk))
yield chunk
# stream files from an iterable or an async generator
if hasattr(self._source_of_files, "__anext__"):
    async for chunk in self._stream_async_gen_fileslist():
        yield chunk
    return
else:
    async for chunk in self._stream_iterable_fileslist():
        yield chunk

async def _stream_iterable_fileslist(self):
    """
    Stream from _source_of_files when it is not an async generator.
    """
    for source in self._source_of_files:
        async for chunk in self._stream_file_and_local_headers(source):
            yield chunk
    # stream zip structures
    for chunk in self._make_end_structures():
        yield chunk
    self._cleanup()

async def _stream_file_and_local_headers(self, source):
    file_struct = self._create_file_struct(source)
    # file offset in archive
    file_struct['offset'] = self._offset_get()
    self._add_file_to_cdir(file_struct)
    # file data
    async for chunk in self._stream_single_file(file_struct):
        self._offset_add(len(chunk))
        yield chunk

async def _stream_async_gen_fileslist(self):
    """
    Stream files from _source_of_files when it is an async generator.
    """
    async for source in self._source_of_files:
        async for chunk in self._stream_file_and_local_headers(source):
            yield chunk

    for chunk in self._make_end_structures():
        yield chunk
    self._cleanup()
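The dispatch in `stream()` and `data_generator()` relies on duck typing: any object exposing `__anext__` is treated as an async iterator, everything else as a plain iterable. A self-contained sketch of the same pattern (the helper `iter_any` is for illustration only, not part of the library):

```python
import asyncio


async def iter_any(source):
    """Yield items from either an async or a sync iterable (sketch)."""
    if hasattr(source, "__anext__"):
        async for item in source:
            yield item
    else:
        for item in source:
            yield item


async def demo():
    async def agen():
        yield 1
        yield 2

    # both source types are consumed through the same code path
    print([i async for i in iter_any(agen())])  # [1, 2]
    print([i async for i in iter_any([3, 4])])  # [3, 4]


asyncio.run(demo())
```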