Skip to content

Commit 3fd7f1e

Browse files
committed
fix(stream_io): Finalize temporary files before the world ends
Previously, automatically uploaded temporary files for the s3 streams from `open_stream` were not guaranteed to trigger their upload and cleanup before the interpreter began shutting down. This change guarantees that the upload and deletion will happen at the earliest of: 1. The stream being closed, 2. The stream being garbage collected, or 3. The program ending — in every case before the interpreter self-destructs.
1 parent 9cdfc65 commit 3fd7f1e

File tree

1 file changed

+65
-43
lines changed

1 file changed

+65
-43
lines changed

tensorizer/stream_io.py

Lines changed: 65 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
import functools
2+
import io
23
import logging
34
import os
45
import subprocess
56
import sys
67
import tempfile
78
import typing
8-
from types import MethodType
9+
import weakref
910
from urllib.parse import urlparse
1011

1112
import boto3
@@ -399,6 +400,58 @@ def _infer_credentials(
399400
)
400401

401402

403+
def _temp_file_closer(
404+
file: io.IOBase,
405+
file_name: str,
406+
*upload_args
407+
):
408+
"""
409+
Close, upload by name, and then delete the file.
410+
Meant to replace .close() on a particular instance
411+
of a temporary file-like wrapper object, as an unbound
412+
callback to a weakref.finalize() registration on the wrapper.
413+
414+
The reason this implementation is necessary is really complicated.
415+
416+
---
417+
418+
boto3's upload_fileobj could be used before closing the
419+
file, instead of closing it and then uploading it by
420+
name, but upload_fileobj is less performant than
421+
upload_file as of boto3's s3 library s3transfer
422+
version 0.6.0.
423+
424+
For details, see the implementation & comments:
425+
https://github.com/boto/s3transfer/blob/0.6.0/s3transfer/upload.py#L351
426+
427+
TL;DR: s3transfer does multithreaded transfers
428+
that require multiple file handles to work properly,
429+
but Python cannot duplicate file handles such that
430+
they can be accessed in a thread-safe way,
431+
so they have to buffer it all in memory.
432+
"""
433+
434+
if file.closed:
435+
# Makes closure idempotent.
436+
437+
# If the file object is used as a context
438+
# manager, close() is called twice (once in the
439+
# serializer code, once after, when leaving the
440+
# context).
441+
442+
# Without this check, this would trigger two
443+
# separate uploads.
444+
return
445+
try:
446+
file.close()
447+
s3_upload(file_name, *upload_args)
448+
finally:
449+
try:
450+
os.unlink(file_name)
451+
except OSError:
452+
pass
453+
454+
402455
def open_stream(
403456
path_uri: Union[str, os.PathLike],
404457
mode: str = "rb",
@@ -468,48 +521,17 @@ def open_stream(
468521
# with primitive temporary file support (e.g. Windows)
469522
temp_file = tempfile.NamedTemporaryFile(mode="wb+", delete=False)
470523

471-
def close(self: temp_file.__class__):
472-
# Close, upload by name, and then delete the file.
473-
#
474-
# boto3's upload_fileobj could be used before closing the
475-
# file, instead of closing it and then uploading it by
476-
# name, but upload_fileobj is less performant than
477-
# upload_file as of boto3's s3 library s3transfer,
478-
# version 0.6.0.
479-
480-
# For details, see the implementation & comments:
481-
# https://github.com/boto/s3transfer/blob/0.6.0/s3transfer/upload.py#L351
482-
483-
# TL;DR: s3transfer does multithreaded transfers
484-
# that require multiple file handles to work properly,
485-
# but Python cannot duplicate file handles such that
486-
# they can be accessed in a thread-safe way,
487-
# so they have to buffer it all in memory.
488-
if self.closed:
489-
# Makes close() idempotent.
490-
491-
# If the resulting object is used as a context
492-
# manager, close() is called twice (once in the
493-
# serializer code, once after, when leaving the
494-
# context).
495-
496-
# Without this check, this would trigger two
497-
# separate uploads.
498-
return
499-
try:
500-
# Use the original closing method tied to the class,
501-
# rather than the one on this instance. (like super())
502-
self.__class__.close(self)
503-
s3_upload(self.name,
504-
path_uri,
505-
s3_access_key_id,
506-
s3_secret_access_key,
507-
s3_endpoint)
508-
finally:
509-
os.unlink(self.name)
510-
511-
# Bind the method to the instance
512-
temp_file.close = MethodType(close, temp_file)
524+
guaranteed_closer = weakref.finalize(
525+
temp_file,
526+
_temp_file_closer,
527+
temp_file.file,
528+
temp_file.name,
529+
path_uri,
530+
s3_access_key_id,
531+
s3_secret_access_key,
532+
s3_endpoint
533+
)
534+
temp_file.close = guaranteed_closer
513535
return temp_file
514536
else:
515537
s3_endpoint = s3_endpoint or default_s3_read_endpoint

0 commit comments

Comments
 (0)