
Commit 56aeb87

feat: add transfer_manager.upload_chunks_concurrently using the XML MPU API (#1115)
* intermediate commit
* temporary commit
* xml mpu support, unit tests and docstrings
* integration tests
* add support for metadata
* 🦉 Updates from OwlBot post-processor
  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md
* encryption support
* unit tests for mpu
* docs update
* fix unit test issue
* 🦉 Updates from OwlBot post-processor
  See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
1 parent bdd7c6c commit 56aeb87
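
A minimal usage sketch of the new API (the bucket name, object name, and local path below are placeholders, not taken from this commit):

    from google.cloud.storage import Client, transfer_manager

    client = Client()
    bucket = client.bucket("my-bucket")        # placeholder bucket name
    blob = bucket.blob("large-dataset.bin")    # placeholder object name

    # Upload the local file in 32 MiB parts using the default process pool.
    transfer_manager.upload_chunks_concurrently(
        "/tmp/large-dataset.bin",              # placeholder local path
        blob,
        chunk_size=32 * 1024 * 1024,           # TM_DEFAULT_CHUNK_SIZE
        max_workers=8,                         # DEFAULT_MAX_WORKERS
    )

Since the default worker_type is PROCESS, the calling code should be importable without side effects (the usual `if __name__ == "__main__":` guard applies on platforms that spawn subprocesses).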

6 files changed: +759 -23 lines changed

google/cloud/storage/blob.py

Lines changed: 2 additions & 2 deletions
@@ -1697,7 +1697,7 @@ def _get_writable_metadata(self):
 
         return object_metadata
 
-    def _get_upload_arguments(self, client, content_type):
+    def _get_upload_arguments(self, client, content_type, filename=None):
         """Get required arguments for performing an upload.
 
         The content type returned will be determined in order of precedence:
@@ -1716,7 +1716,7 @@ def _get_upload_arguments(self, client, content_type):
         * An object metadata dictionary
         * The ``content_type`` as a string (according to precedence)
         """
-        content_type = self._get_content_type(content_type)
+        content_type = self._get_content_type(content_type, filename=filename)
         headers = {
             **_get_default_headers(client._connection.user_agent, content_type),
             **_get_encryption_headers(self._encryption_key),
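
The new `filename` parameter is threaded into `_get_content_type`, presumably so the content type can fall back to a guess based on the file's name when neither an explicit argument nor the blob's own `content_type` is set. A rough sketch of that precedence using the standard `mimetypes` module (`guess_content_type` here is a hypothetical stand-in, not the library's actual helper):

    import mimetypes

    def guess_content_type(explicit_type, blob_content_type, filename=None):
        # Hypothetical illustration of the precedence: the explicit argument
        # wins, then the blob's own content_type, then a guess from the
        # filename, then a generic default.
        if explicit_type is not None:
            return explicit_type
        if blob_content_type is not None:
            return blob_content_type
        if filename is not None:
            guessed, _ = mimetypes.guess_type(filename)
            if guessed is not None:
                return guessed
        return "application/octet-stream"

    print(guess_content_type(None, None, "report.pdf"))  # application/pdf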

google/cloud/storage/transfer_manager.py

Lines changed: 269 additions & 11 deletions
@@ -26,6 +26,12 @@
 from google.api_core import exceptions
 from google.cloud.storage import Client
 from google.cloud.storage import Blob
+from google.cloud.storage.blob import _get_host_name
+from google.cloud.storage.constants import _DEFAULT_TIMEOUT
+
+from google.resumable_media.requests.upload import XMLMPUContainer
+from google.resumable_media.requests.upload import XMLMPUPart
+
 
 warnings.warn(
     "The module `transfer_manager` is a preview feature. Functionality and API "
@@ -35,7 +41,14 @@
 
 TM_DEFAULT_CHUNK_SIZE = 32 * 1024 * 1024
 DEFAULT_MAX_WORKERS = 8
-
+METADATA_HEADER_TRANSLATION = {
+    "cacheControl": "Cache-Control",
+    "contentDisposition": "Content-Disposition",
+    "contentEncoding": "Content-Encoding",
+    "contentLanguage": "Content-Language",
+    "customTime": "x-goog-custom-time",
+    "storageClass": "x-goog-storage-class",
+}
 
 # Constants to be passed in as `worker_type`.
 PROCESS = "process"
@@ -198,7 +211,7 @@ def upload_many(
             futures.append(
                 executor.submit(
                     _call_method_on_maybe_pickled_blob,
-                    _pickle_blob(blob) if needs_pickling else blob,
+                    _pickle_client(blob) if needs_pickling else blob,
                     "upload_from_filename"
                     if isinstance(path_or_file, str)
                     else "upload_from_file",
@@ -343,7 +356,7 @@ def download_many(
             futures.append(
                 executor.submit(
                     _call_method_on_maybe_pickled_blob,
-                    _pickle_blob(blob) if needs_pickling else blob,
+                    _pickle_client(blob) if needs_pickling else blob,
                     "download_to_filename"
                     if isinstance(path_or_file, str)
                     else "download_to_file",
@@ -733,7 +746,6 @@ def download_chunks_concurrently(
     Checksumming (md5 or crc32c) is not supported for chunked operations. Any
     `checksum` parameter passed in to download_kwargs will be ignored.
 
-    :type bucket: 'google.cloud.storage.bucket.Bucket'
     :param bucket:
         The bucket which contains the blobs to be downloaded
 
@@ -745,6 +757,12 @@ def download_chunks_concurrently(
     :param filename:
        The destination filename or path.
 
+    :type chunk_size: int
+    :param chunk_size:
+        The size in bytes of each chunk to send. The optimal chunk size for
+        maximum throughput may vary depending on the exact network environment
+        and size of the blob.
+
     :type download_kwargs: dict
     :param download_kwargs:
         A dictionary of keyword arguments to pass to the download method. Refer
@@ -809,7 +827,7 @@ def download_chunks_concurrently(
 
     pool_class, needs_pickling = _get_pool_class_and_requirements(worker_type)
     # Pickle the blob ahead of time (just once, not once per chunk) if needed.
-    maybe_pickled_blob = _pickle_blob(blob) if needs_pickling else blob
+    maybe_pickled_blob = _pickle_client(blob) if needs_pickling else blob
 
     futures = []
 
@@ -844,9 +862,249 @@ def download_chunks_concurrently(
     return None
 
 
+def upload_chunks_concurrently(
+    filename,
+    blob,
+    content_type=None,
+    chunk_size=TM_DEFAULT_CHUNK_SIZE,
+    deadline=None,
+    worker_type=PROCESS,
+    max_workers=DEFAULT_MAX_WORKERS,
+    *,
+    checksum="md5",
+    timeout=_DEFAULT_TIMEOUT,
+):
+    """Upload a single file in chunks, concurrently.
+
+    This function uses the XML MPU API to initialize an upload and upload a
+    file in chunks, concurrently with a worker pool.
+
+    The XML MPU API is significantly different from other uploads; please review
+    the documentation at https://cloud.google.com/storage/docs/multipart-uploads
+    before using this feature.
+
+    The library will attempt to cancel uploads that fail due to an exception.
+    If the upload fails in a way that precludes cancellation, such as a
+    hardware failure, process termination, or power outage, then the incomplete
+    upload may persist indefinitely. To mitigate this, set the
+    `AbortIncompleteMultipartUpload` with a nonzero `Age` in bucket lifecycle
+    rules, or refer to the XML API documentation linked above to learn more
+    about how to list and delete individual downloads.
+
+    Using this feature with multiple threads is unlikely to improve upload
+    performance under normal circumstances due to Python interpreter threading
+    behavior. The default is therefore to use processes instead of threads.
+
+    ACL information cannot be sent with this function and should be set
+    separately with :class:`ObjectACL` methods.
+
+    :type filename: str
+    :param filename:
+        The path to the file to upload. File-like objects are not supported.
+
+    :type blob: `google.cloud.storage.Blob`
+    :param blob:
+        The blob to which to upload.
+
+    :type content_type: str
+    :param content_type: (Optional) Type of content being uploaded.
+
+    :type chunk_size: int
+    :param chunk_size:
+        The size in bytes of each chunk to send. The optimal chunk size for
+        maximum throughput may vary depending on the exact network environment
+        and size of the blob. The remote API has restrictions on the minimum
+        and maximum size allowable, see: https://cloud.google.com/storage/quotas#requests
+
+    :type deadline: int
+    :param deadline:
+        The number of seconds to wait for all threads to resolve. If the
+        deadline is reached, all threads will be terminated regardless of their
+        progress and concurrent.futures.TimeoutError will be raised. This can be
+        left as the default of None (no deadline) for most use cases.
+
+    :type worker_type: str
+    :param worker_type:
+        The worker type to use; one of google.cloud.storage.transfer_manager.PROCESS
+        or google.cloud.storage.transfer_manager.THREAD.
+
+        Although the exact performance impact depends on the use case, in most
+        situations the PROCESS worker type will use more system resources (both
+        memory and CPU) and result in faster operations than THREAD workers.
+
+        Because the subprocesses of the PROCESS worker type can't access memory
+        from the main process, Client objects have to be serialized and then
+        recreated in each subprocess. The serialization of the Client object
+        for use in subprocesses is an approximation and may not capture every
+        detail of the Client object, especially if the Client was modified after
+        its initial creation or if `Client._http` was modified in any way.
+
+        THREAD worker types are observed to be relatively efficient for
+        operations with many small files, but not for operations with large
+        files. PROCESS workers are recommended for large file operations.
+
+    :type max_workers: int
+    :param max_workers:
+        The maximum number of workers to create to handle the workload.
+
+        With PROCESS workers, a larger number of workers will consume more
+        system resources (memory and CPU) at once.
+
+        How many workers is optimal depends heavily on the specific use case,
+        and the default is a conservative number that should work okay in most
+        cases without consuming excessive resources.
+
+    :type checksum: str
+    :param checksum:
+        (Optional) The checksum scheme to use: either 'md5', 'crc32c' or None.
+        Each individual part is checksummed. At present, the selected checksum
+        rule is only applied to parts and a separate checksum of the entire
+        resulting blob is not computed. Please compute and compare the checksum
+        of the file to the resulting blob separately if needed, using the
+        'crc32c' algorithm as per the XML MPU documentation.
+
+    :type timeout: float or tuple
+    :param timeout:
+        (Optional) The amount of time, in seconds, to wait
+        for the server response. See: :ref:`configuring_timeouts`
+
+    :raises: :exc:`concurrent.futures.TimeoutError` if deadline is exceeded.
+    """
+
+    bucket = blob.bucket
+    client = blob.client
+    transport = blob._get_transport(client)
+
+    hostname = _get_host_name(client._connection)
+    url = "{hostname}/{bucket}/{blob}".format(
+        hostname=hostname, bucket=bucket.name, blob=blob.name
+    )
+
+    base_headers, object_metadata, content_type = blob._get_upload_arguments(
+        client, content_type, filename=filename
+    )
+    headers = {**base_headers, **_headers_from_metadata(object_metadata)}
+
+    if blob.user_project is not None:
+        headers["x-goog-user-project"] = blob.user_project
+
+    # When a Customer Managed Encryption Key is used to encrypt Cloud Storage object
+    # at rest, object resource metadata will store the version of the Key Management
+    # Service cryptographic material. If a Blob instance with KMS Key metadata set is
+    # used to upload a new version of the object then the existing kmsKeyName version
+    # value can't be used in the upload request and the client instead ignores it.
+    if blob.kms_key_name is not None and "cryptoKeyVersions" not in blob.kms_key_name:
+        headers["x-goog-encryption-kms-key-name"] = blob.kms_key_name
+
+    container = XMLMPUContainer(url, filename, headers=headers)
+    container.initiate(transport=transport, content_type=content_type)
+    upload_id = container.upload_id
+
+    size = os.path.getsize(filename)
+    num_of_parts = -(size // -chunk_size)  # Ceiling division
+
+    pool_class, needs_pickling = _get_pool_class_and_requirements(worker_type)
+    # Pickle the blob ahead of time (just once, not once per chunk) if needed.
+    maybe_pickled_client = _pickle_client(client) if needs_pickling else client
+
+    futures = []
+
+    with pool_class(max_workers=max_workers) as executor:
+
+        for part_number in range(1, num_of_parts + 1):
+            start = (part_number - 1) * chunk_size
+            end = min(part_number * chunk_size, size)
+
+            futures.append(
+                executor.submit(
+                    _upload_part,
+                    maybe_pickled_client,
+                    url,
+                    upload_id,
+                    filename,
+                    start=start,
+                    end=end,
+                    part_number=part_number,
+                    checksum=checksum,
+                    headers=headers,
+                )
+            )
+
+        concurrent.futures.wait(
+            futures, timeout=deadline, return_when=concurrent.futures.ALL_COMPLETED
+        )
+
+    try:
+        # Harvest results and raise exceptions.
+        for future in futures:
+            part_number, etag = future.result()
+            container.register_part(part_number, etag)
+
+        container.finalize(blob._get_transport(client))
+    except Exception:
+        container.cancel(blob._get_transport(client))
+        raise
+
+
+def _upload_part(
+    maybe_pickled_client,
+    url,
+    upload_id,
+    filename,
+    start,
+    end,
+    part_number,
+    checksum,
+    headers,
+):
+    """Helper function that runs inside a thread or subprocess to upload a part.
+
+    `maybe_pickled_client` is either a Client (for threads) or a specially
+    pickled Client (for processes) because the default pickling mangles Client
+    objects."""
+
+    if isinstance(maybe_pickled_client, Client):
+        client = maybe_pickled_client
+    else:
+        client = pickle.loads(maybe_pickled_client)
+    part = XMLMPUPart(
+        url,
+        upload_id,
+        filename,
+        start=start,
+        end=end,
+        part_number=part_number,
+        checksum=checksum,
+        headers=headers,
+    )
+    part.upload(client._http)
+    return (part_number, part.etag)
+
+
+def _headers_from_metadata(metadata):
+    """Helper function to translate object metadata into a header dictionary."""
+
+    headers = {}
+    # Handle standard writable metadata
+    for key, value in metadata.items():
+        if key in METADATA_HEADER_TRANSLATION:
+            headers[METADATA_HEADER_TRANSLATION[key]] = value
+    # Handle custom metadata
+    if "metadata" in metadata:
+        for key, value in metadata["metadata"].items():
+            headers["x-goog-meta-" + key] = value
+    return headers
+
+
 def _download_and_write_chunk_in_place(
     maybe_pickled_blob, filename, start, end, download_kwargs
 ):
+    """Helper function that runs inside a thread or subprocess.
+
+    `maybe_pickled_blob` is either a Blob (for threads) or a specially pickled
+    Blob (for processes) because the default pickling mangles Client objects
+    which are attached to Blobs."""
+
     if isinstance(maybe_pickled_blob, Blob):
         blob = maybe_pickled_blob
     else:
@@ -863,9 +1121,9 @@ def _call_method_on_maybe_pickled_blob(
 ):
     """Helper function that runs inside a thread or subprocess.
 
-    `maybe_pickled_blob` is either a blob (for threads) or a specially pickled
-    blob (for processes) because the default pickling mangles clients which are
-    attached to blobs."""
+    `maybe_pickled_blob` is either a Blob (for threads) or a specially pickled
+    Blob (for processes) because the default pickling mangles Client objects
+    which are attached to Blobs."""
 
     if isinstance(maybe_pickled_blob, Blob):
         blob = maybe_pickled_blob
@@ -894,8 +1152,8 @@ def _reduce_client(cl):
     )
 
 
-def _pickle_blob(blob):
-    """Pickle a Blob (and its Bucket and Client) and return a bytestring."""
+def _pickle_client(obj):
+    """Pickle a Client or an object that owns a Client (like a Blob)"""
 
     # We need a custom pickler to process Client objects, which are attached to
    # Buckets (and therefore to Blobs in turn). Unfortunately, the Python
@@ -907,7 +1165,7 @@ def _pickle_blob(blob):
     p = pickle.Pickler(f)
     p.dispatch_table = copyreg.dispatch_table.copy()
     p.dispatch_table[Client] = _reduce_client
-    p.dump(blob)
+    p.dump(obj)
     return f.getvalue()
 
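
For reference, a self-contained sketch of the part-splitting arithmetic used by upload_chunks_concurrently above; the file size here is a made-up example:

    chunk_size = 32 * 1024 * 1024            # TM_DEFAULT_CHUNK_SIZE
    size = 100 * 1024 * 1024 + 1             # hypothetical file, one byte over 100 MiB

    num_of_parts = -(size // -chunk_size)    # ceiling division without math.ceil -> 4

    ranges = [
        ((part - 1) * chunk_size, min(part * chunk_size, size))
        for part in range(1, num_of_parts + 1)
    ]
    # ranges == [(0, 33554432), (33554432, 67108864),
    #            (67108864, 100663296), (100663296, 104857601)]
    # Each (start, end) pair is what an _upload_part worker receives; only the
    # final part is shorter than chunk_size.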

setup.py

Lines changed: 1 addition & 1 deletion
@@ -31,7 +31,7 @@
     "google-auth >= 1.25.0, < 3.0dev",
     "google-api-core >= 1.31.5, <3.0.0dev,!=2.0.*,!=2.1.*,!=2.2.*,!=2.3.0",
     "google-cloud-core >= 2.3.0, < 3.0dev",
-    "google-resumable-media >= 2.3.2",
+    "google-resumable-media >= 2.6.0",
     "requests >= 2.18.0, < 3.0.0dev",
 ]
 extras = {"protobuf": ["protobuf<5.0.0dev"]}
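
The raised floor reflects that XMLMPUContainer and XMLMPUPart, imported in transfer_manager.py above, ship with newer google-resumable-media releases. A quick, illustrative way to check that an environment satisfies the new requirement (not part of the commit; assumes the `packaging` distribution is installed):

    from importlib.metadata import version
    from packaging.version import Version  # assumes 'packaging' is available

    installed = Version(version("google-resumable-media"))
    assert installed >= Version("2.6.0"), (
        "upload_chunks_concurrently requires google-resumable-media >= 2.6.0 "
        "for XMLMPUContainer / XMLMPUPart"
    )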
