Allow splitting of CSV file if it is larger than 10MB #154
base: main
Conversation
Added Integration Test.
✅ 38/38 passed, 2 skipped, 1m26s total (running from acceptance #219)
Added some pointers; please add more tests.
@@ -40,6 +40,8 @@

__all__ = ["Installation", "MockInstallation", "IllegalState", "NotInstalled", "SerdeError"]

FILE_SIZE_LIMIT: int = 1024 * 1024 * 10
Type hint looks redundant.
Suggested change:
-FILE_SIZE_LIMIT: int = 1024 * 1024 * 10
+FILE_SIZE_LIMIT = 1024 * 1024 * 10
Irrespective of whether it's redundant or not, it's incorrect: it should be `ClassVar[int]`.
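For illustration, a minimal sketch of the suggested annotation, assuming the constant ends up on a class (`ClassVar` only has meaning inside a class body; the class name here is just taken from this module's exports):

```python
from typing import ClassVar


class Installation:
    # Shared class-level constant; ClassVar tells type checkers (and
    # dataclass machinery) this is not a per-instance field.
    FILE_SIZE_LIMIT: ClassVar[int] = 1024 * 1024 * 10
```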
@@ -132,6 +134,10 @@ def check_folder(install_folder: str) -> Installation | None:
            tasks.append(functools.partial(check_folder, service_principal_folder))
        return Threads.strict(f"finding {product} installations", tasks)

    @staticmethod
    def extension(filename):
Missing type hints, and this looks like a redundant method; typically we use `Path.suffix`.
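For reference, the standard-library behaviour the comment points at (the filenames here are made up):

```python
from pathlib import Path

Path("backups/state.csv").suffix    # '.csv'
Path("backups/state.1.csv").suffix  # '.csv' (only the last suffix)
```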
@@ -804,9 +841,21 @@ def _dump_csv(raw: list[Json], type_ref: type) -> bytes:
        writer = csv.DictWriter(buffer, field_names, dialect="excel")
        writer.writeheader()
        for as_dict in raw:
            # Check if the buffer + the current row is over the file size limit
@asnare: could you have a look at this? From experience I know you are familiar with looping over these situations in a smart way. Is there a way to chunk `raw` so that you have a minimum number of chunks, while each chunk has a size smaller than `FILE_SIZE_LIMIT` (after writing the chunk to the `io.StringIO` buffer)?
This is fiddly, partly because we're treating the limit as a hard limit and there's no way to know in advance whether a row will exceed the limit.
A few things spring to mind:
- If we treat the limit as a target and don't try to back out the last line, then things become a lot simpler.
- Even though we try to treat the maximum size as a limit, this code can still make files larger, because we're counting characters but some will be more than a single byte. (We'd need to count bytes, e.g. via `BytesIO`, to fix this.)
- An alternative approach, but not necessarily better, would be to first encode the header and rows as lines (in bytes), and then make a second pass to chunk things up (see the sketch below). I don't think this would be less code, though.
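To make that two-pass idea concrete, here is a minimal sketch; it is not the PR's actual code. `FILE_SIZE_LIMIT`, `field_names`, and the `excel` dialect come from the diff above, everything else is made up:

```python
import csv
import io

FILE_SIZE_LIMIT = 1024 * 1024 * 10  # the limit introduced by this PR


def _dump_csv_chunks(raw: list[dict], field_names: list[str]) -> list[bytes]:
    def encode(rows: list[dict], *, header: bool) -> bytes:
        buffer = io.StringIO()
        writer = csv.DictWriter(buffer, field_names, dialect="excel")
        if header:
            writer.writeheader()
        writer.writerows(rows)
        return buffer.getvalue().encode("utf-8")

    # First pass: encode the header and each row once, so sizes are exact bytes.
    header = encode([], header=True)
    lines = [encode([row], header=False) for row in raw]

    # Second pass: greedily pack encoded rows into chunks that stay under the
    # limit; each chunk starts with its own copy of the header. Since row order
    # is preserved, greedy packing yields the fewest contiguous chunks. A
    # single oversized row still gets its own chunk.
    chunks: list[bytes] = []
    current, size = [header], len(header)
    for line in lines:
        if size + len(line) > FILE_SIZE_LIMIT and len(current) > 1:
            chunks.append(b"".join(current))
            current, size = [header], len(header)
        current.append(line)
        size += len(line)
    chunks.append(b"".join(current))
    return chunks
```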
"""The `_dump_csv` method is a private method that is used to serialize a list of dictionaries to a CSV string. | ||
This method is called by the `save` method.""" | ||
raws = [] |
Move this closer to where it is used.
            with self._ws.workspace.download(f"{self.install_folder()}/{filename}") as f:
                return self._convert_content(filename, f)
        except NotFound:
            # If the file is not found, check if it is a multi-part csv file
Move this into a separate method.
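One possible shape for that extraction, as a sketch only: the name `_load_multipart_csv` is made up, and the body just mirrors the fallback logic implied above (sequentially numbered parts until a `NotFound`), reusing `self._ws`, `install_folder()`, and `NotFound` from the surrounding class:

```python
def _load_multipart_csv(self, filename: str) -> bytes:
    """Hypothetical helper: reassemble `<name>.1.csv`, `<name>.2.csv`, ..."""
    parts = []
    stem = filename.removesuffix(".csv")
    index = 1
    while True:
        try:
            with self._ws.workspace.download(f"{self.install_folder()}/{stem}.{index}.csv") as f:
                parts.append(f.read())
        except NotFound:
            break
        index += 1
    if not parts:
        raise NotFound(filename)
    return b"".join(parts)
```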
        raws = converters[extension](as_dict, type_ref)
        if len(raws) > 1:
            for i, raw in enumerate(raws):
                self.upload(f"{filename[0:-4]}.{i + 1}.csv", raw)
Why `filename[0:-4]`?
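Presumably it drops the last four characters, i.e. a trailing `.csv`, by position. A more explicit equivalent (assuming Python 3.9+):

```python
# Same effect as filename[0:-4] for names ending in ".csv", but self-documenting:
part_name = f"{filename.removesuffix('.csv')}.{i + 1}.csv"
```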
        self.upload(filename, raw)
        raws = converters[extension](as_dict, type_ref)
        if len(raws) > 1:
            for i, raw in enumerate(raws):
In the Spark world it is common to use a folder instead:
- Name the folder like the file
- Combine all files inside the folder to get the final file
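A minimal sketch of that convention, reusing `raws`, `filename`, and `upload` from the snippet above; the `part-*` naming is an assumption, borrowed from how Spark names its output files:

```python
# Hypothetical folder-per-file layout: parts live under a folder named like
# the file, and a reader concatenates every part inside it.
for i, raw in enumerate(raws):
    self.upload(f"{filename}/part-{i:04d}.csv", raw)
```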
closes #151