feat: add Blob.uploadFrom(InputStream) #162

Merged: 7 commits, Mar 15, 2020
@@ -38,9 +38,11 @@
import com.google.common.io.BaseEncoding;
import com.google.common.io.CountingOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.OutputStream;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.Key;
@@ -73,7 +75,8 @@ public Blob apply(Tuple<Storage, StorageObject> pb) {
}
};

private static final int DEFAULT_CHUNK_SIZE = 2 * 1024 * 1024;
private static final int DEFAULT_CHUNK_SIZE = 15 * 1024 * 1024;
private static final int MIN_BUFFER_SIZE = 256 * 1024;

/** Class for specifying blob source options when {@code Blob} methods are used. */
public static class BlobSourceOption extends Option {
@@ -260,6 +263,88 @@ public void downloadTo(Path path) {
downloadTo(path, new BlobSourceOption[0]);
}

/**
* Uploads the given file path to this blob using specified blob write options.
Contributor:
I probably just don't understand how this API works, but this sounds off to me. I'd expect a static method that uploads the path to a new Blob and returns it. What does it mean to "upload to this blob"? I'm missing something here.

Contributor Author:
The storage hierarchy is the following:
Class BlobId: a BlobId object includes the name of the containing bucket, the blob's name, and possibly the blob's generation. If getGeneration() is null, the identifier refers to the blob's latest generation.

Class BlobInfo: encapsulates a BlobId and a set of properties.

Class Blob: extends BlobInfo and adds a number of methods to modify the blob, like update, copyTo, downloadTo, etc. Objects of this class are immutable; operations that modify the blob return a new object.

New Blob.uploadFrom() methods will allow those who already have a blob instance to upload some content to it as simply as blob = blob.uploadFrom(myFile);

I'd expect a static method that uploads the path to a new Blob and returns it.

A static method would require an extra storage parameter, which is not very convenient.
As I said previously, I'm going to extend the Storage interface with new methods:

void upload(BlobInfo blobInfo, Path path, BlobWriteOption... options);
void upload(BlobInfo blobInfo, InputStream content, BlobWriteOption... options);
public Blob create(BlobInfo blobInfo, Path path, BlobWriteOption... options);
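
A minimal usage sketch of the instance method under discussion; the bucket, object, and file names are placeholders, and the client setup assumes default credentials:

import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;
import java.io.IOException;
import java.nio.file.Paths;

public class UploadFromSketch {
  public static void main(String[] args) throws IOException {
    Storage storage = StorageOptions.getDefaultInstance().getService();
    // Look up an existing object ("my-bucket" / "notes.txt" are placeholders).
    Blob blob = storage.get(BlobId.of("my-bucket", "notes.txt"));
    // Upload local content to it; reassign, because the returned Blob is the
    // one that points at the newly written generation.
    blob = blob.uploadFrom(Paths.get("/tmp/notes.txt"));
  }
}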

Contributor:
This is what's throwing me: the object is immutable but has methods to modify it? Why?

This seems to be the existing API, but it's still extremely wonky. After you call uploadFrom (or downloadTo), how does the new Blob object returned differ from the original Blob object?

Contributor Author:
You can consider a Blob instance as a pointer to {bucket, name, generation} plus methods that operate on that pointer. Each modification of the blob changes the generation, so the existing pointer becomes outdated and the Blob instance useless (immutability does not allow updating the generation in place). The new object returned by uploadFrom carries the new generation value and points to the latest blob version.
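
A short sketch of that staleness behavior, reusing the storage client from the earlier sketch; names and generation values are illustrative:

Blob v1 = storage.get(BlobId.of("my-bucket", "report.csv")); // points at generation N
Blob v2 = v1.uploadFrom(Paths.get("report.csv"));            // writes generation N+1 (or later)
// v1 still holds the old generation and is now a stale pointer;
// v2 is the handle to the latest version of the object.
assert !v1.getGeneration().equals(v2.getGeneration());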

Contributor:
It would be nice if the docs said that, or the classes were named more accurately, but c'est la vie.

#176

My immediate question is what exactly has changed in the object? What field has been modified and how?

Contributor Author:
New Blob objects are constructed from data received from GCS.
In the case of uploadFrom, only 'generation' and 'size' change. (These fields are inherited from the BlobInfo class.)
uploadFrom uses a WriteChannel, so it has to perform a separate GET request to obtain the fresh data.
The upload method performs the RPC request and obtains the data to construct the new object from the response.
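
Roughly, the two steps described here look like the fragment below; it mirrors the implementation added in this diff rather than replacing it, and assumes an initialized Storage client plus a BlobInfo for the target object:

try (InputStream input = Files.newInputStream(Paths.get("data.bin"));
    WriteChannel writer = storage.writer(blobInfo)) {
  Blob.uploadFrom(input, writer, 15 * 1024 * 1024);       // step 1: stream the content
}
Blob refreshed = storage.get(                              // step 2: separate GET for the
    BlobId.of(blobInfo.getBucket(), blobInfo.getName()));  //         new generation and size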

*
* @param path file to upload
* @param options blob write options
* @return updated blob
* @throws IOException on I/O error
* @throws StorageException on failure
*/
public Blob uploadFrom(Path path, BlobWriteOption... options) throws IOException {
if (Files.isDirectory(path)) {
throw new StorageException(0, path + " is a directory");
}
try (InputStream input = Files.newInputStream(path)) {
return uploadFrom(input, options);
}
}

/**
* Uploads the given content to this blob using specified blob write options.
*
* @param input content to upload
* @param options blob write options
* @return updated blob
* @throws IOException on I/O error
* @throws StorageException on failure
*/
public Blob uploadFrom(InputStream input, BlobWriteOption... options) throws IOException {
try (WriteChannel writer = storage.writer(this, options)) {
uploadFrom(input, writer);
}
BlobId blobId = getBlobId();
try {
return storage.get(BlobId.of(blobId.getBucket(), blobId.getName()));
} catch (StorageException e) {
throw new StorageException(
e.getCode(), "Content has been uploaded successfully. Failed to retrieve blob.", e);
}
}

static void uploadFrom(InputStream input, WriteChannel writer) throws IOException {
Contributor (@elharo, Mar 12, 2020):
Can this be private?

Contributor Author:
This method will be invoked from the StorageImpl class.
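
A hypothetical sketch of that reuse; the method name and signature follow the proposal above, not necessarily the merged code. Because Blob.uploadFrom(InputStream, WriteChannel) is package-private, a class in com.google.cloud.storage such as StorageImpl can call it:

// Hypothetical StorageImpl method, shown only to illustrate the visibility choice.
public void upload(BlobInfo blobInfo, InputStream content, BlobWriteOption... options)
    throws IOException {
  try (WriteChannel writer = writer(blobInfo, options)) {
    Blob.uploadFrom(content, writer); // reuses the shared buffered copy loop
  }
}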

uploadFrom(input, writer, DEFAULT_CHUNK_SIZE);
}

/**
* Uploads the given content to the storage using specified write channel and the given buffer
* size. Other uploadFrom() methods invoke this one with a buffer size of 15 MiB. Users can pass
* alternative values. Larger buffer sizes might improve the upload performance but require more
* memory. This can cause an OutOfMemoryError or add significant garbage collection overhead.
* Smaller buffer sizes reduce memory consumption, which is noticeable when uploading many objects
* in parallel. Buffer sizes less than 256 KiB are treated as 256 KiB.
*
* <p>This method does not close either the InputStream or the WriteChannel.
*
* <p>Example of uploading:
*
* <pre>{@code
* BlobId blobId = BlobId.of(bucketName, blobName);
* BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setContentType("video/webm").build();
* Path file = Paths.get("humongous.file");
* try (InputStream input = Files.newInputStream(file); WriteChannel writer = storage.writer(blobInfo)) {
* Blob.uploadFrom(input, writer, 150 * 1024 * 1024);
* } catch (IOException e) {
* // your handler
* }
* }</pre>
*
* @param input content to upload
* @param writer channel
* @param bufferSize size of the buffer to read from input and send over writer
* @throws IOException on I/O error
*/
public static void uploadFrom(InputStream input, WriteChannel writer, int bufferSize)
throws IOException {
bufferSize = Math.max(bufferSize, MIN_BUFFER_SIZE);
byte[] buffer = new byte[bufferSize];
int length;
while ((length = input.read(buffer)) >= 0) {
writer.write(ByteBuffer.wrap(buffer, 0, length));
}
}

/** Builder for {@code Blob}. */
public static class Builder extends BlobInfo.Builder {

@@ -32,11 +32,13 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.api.core.ApiClock;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.services.storage.model.StorageObject;
import com.google.cloud.ReadChannel;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.Acl.Project;
import com.google.cloud.storage.Acl.Project.ProjectRole;
import com.google.cloud.storage.Acl.Role;
@@ -48,10 +50,17 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.BaseEncoding;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.security.Key;
import java.util.List;
import java.util.Map;
@@ -586,7 +595,7 @@ public void testBuilder() {
}

@Test
public void testDownload() throws Exception {
public void testDownloadTo() throws Exception {
final byte[] expected = {1, 2};
StorageRpc mockStorageRpc = createNiceMock(StorageRpc.class);
expect(storage.getOptions()).andReturn(mockOptions).times(1);
@@ -618,7 +627,7 @@ public Long answer() throws Throwable {
}

@Test
public void testDownloadWithRetries() throws Exception {
public void testDownloadToWithRetries() throws Exception {
final byte[] expected = {1, 2};
StorageRpc mockStorageRpc = createNiceMock(StorageRpc.class);
expect(storage.getOptions()).andReturn(mockOptions);
@@ -662,4 +671,135 @@ public Long answer() throws Throwable {
byte actual[] = Files.readAllBytes(file.toPath());
assertArrayEquals(expected, actual);
}

@Test
public void testUploadFromNonExistentFile() {
initializeExpectedBlob(1);
expect(storage.getOptions()).andReturn(mockOptions);
replay(storage);
blob = new Blob(storage, new BlobInfo.BuilderImpl(BLOB_INFO));
String fileName = "non_existing_file.txt";
try {
blob.uploadFrom(Paths.get(fileName));
fail();
} catch (IOException e) {
assertEquals(NoSuchFileException.class, e.getClass());
assertEquals(fileName, e.getMessage());
}
}

@Test
public void testUploadFromDirectory() throws IOException {
initializeExpectedBlob(1);
expect(storage.getOptions()).andReturn(mockOptions);
replay(storage);
blob = new Blob(storage, new BlobInfo.BuilderImpl(BLOB_INFO));
Path dir = Files.createTempDirectory("unit_");
try {
blob.uploadFrom(dir);
fail();
} catch (StorageException e) {
assertEquals(dir + " is a directory", e.getMessage());
}
}

private WriteChannel createWriteChannelMock(byte[] bytes) throws Exception {
WriteChannel channel = createMock(WriteChannel.class);
ByteBuffer expectedByteBuffer = ByteBuffer.wrap(bytes, 0, bytes.length);
expect(channel.write(expectedByteBuffer)).andReturn(bytes.length);
channel.close();
replay(channel);
return channel;
}

private Blob createBlobForUpload(WriteChannel channel) {
initializeExpectedBlob(1);
BlobId blobId = BlobId.of(BLOB_INFO.getBucket(), BLOB_INFO.getName());
expect(storage.getOptions()).andReturn(mockOptions);
expect(storage.writer(eq(expectedBlob))).andReturn(channel);
expect(storage.get(blobId)).andReturn(expectedBlob);
replay(storage);
return new Blob(storage, new BlobInfo.BuilderImpl(BLOB_INFO));
}

@Test
public void testUploadFromFile() throws Exception {
byte[] dataToSend = {1, 2, 3};
WriteChannel channel = createWriteChannelMock(dataToSend);
blob = createBlobForUpload(channel);
Path tempFile = Files.createTempFile("testUpload", ".tmp");
Files.write(tempFile, dataToSend);
blob = blob.uploadFrom(tempFile);
assertSame(expectedBlob, blob);
}

@Test
public void testUploadFromStream() throws Exception {
byte[] dataToSend = {1, 2, 3, 4, 5};
WriteChannel channel = createWriteChannelMock(dataToSend);
blob = createBlobForUpload(channel);
InputStream input = new ByteArrayInputStream(dataToSend);
blob = blob.uploadFrom(input);
assertSame(expectedBlob, blob);
}

@Test
public void testUploadFromStreamRetrieveFailed() throws Exception {
byte[] dataToSend = {1, 2, 3, 4, 5};
StorageException storageException = new StorageException(123, "message");
WriteChannel channel = createWriteChannelMock(dataToSend);
initializeExpectedBlob(1);
BlobId blobId = BlobId.of(BLOB_INFO.getBucket(), BLOB_INFO.getName());
expect(storage.getOptions()).andReturn(mockOptions);
expect(storage.writer(eq(expectedBlob))).andReturn(channel);
expect(storage.get(blobId)).andThrow(storageException);
replay(storage);
Blob blob = new Blob(storage, new BlobInfo.BuilderImpl(BLOB_INFO));
InputStream input = new ByteArrayInputStream(dataToSend);
try {
blob.uploadFrom(input);
fail();
} catch (StorageException e) {
assertEquals(
"Content has been uploaded successfully. Failed to retrieve blob.", e.getMessage());
assertSame(e.getCause(), storageException);
}
}

@Test
public void testUpload() throws Exception {
replay(storage);
byte[] dataToSend = {1, 2, 3, 4, 5};
WriteChannel channel = createWriteChannelMock(dataToSend);
InputStream input = new ByteArrayInputStream(dataToSend);
Blob.uploadFrom(input, channel);
}

@Test
public void testUploadSmallBufferSize() throws Exception {
replay(storage);
byte[] dataToSend = new byte[100_000];
WriteChannel channel = createWriteChannelMock(dataToSend);
InputStream input = new ByteArrayInputStream(dataToSend);
Blob.uploadFrom(input, channel, 100);
}

@Test
public void testUploadMultiplePortions() throws Exception {
replay(storage);
int totalSize = 400_000;
int bufferSize = 300_000;
byte[] dataToSend = new byte[totalSize];
dataToSend[0] = 42;
dataToSend[bufferSize] = 43;

WriteChannel channel = createMock(WriteChannel.class);
expect(channel.write(ByteBuffer.wrap(dataToSend, 0, bufferSize))).andReturn(bufferSize);
expect(channel.write(ByteBuffer.wrap(dataToSend, bufferSize, totalSize - bufferSize)))
.andReturn(totalSize - bufferSize);
channel.close();
replay(channel);
InputStream input = new ByteArrayInputStream(dataToSend);
Blob.uploadFrom(input, channel, bufferSize);
}
}
@@ -39,6 +39,7 @@
import com.google.cloud.Policy;
import com.google.cloud.ReadChannel;
import com.google.cloud.RestorableState;
import com.google.cloud.RetryHelper;
import com.google.cloud.TransportOptions;
import com.google.cloud.WriteChannel;
import com.google.cloud.http.HttpTransportOptions;
@@ -101,6 +102,8 @@
import java.net.URL;
import java.net.URLConnection;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.Key;
import java.util.Arrays;
import java.util.Collections;
@@ -2939,4 +2942,56 @@ public void testBucketLogging() throws ExecutionException, InterruptedException
RemoteStorageHelper.forceDelete(storage, loggingBucket, 5, TimeUnit.SECONDS);
}
}

@Test
public void testUpload() throws Exception {
String blobName = "test-upload-static";
BlobId blobId = BlobId.of(BUCKET, blobName);
try (WriteChannel writer = storage.writer(BlobInfo.newBuilder(blobId).build())) {
Blob.uploadFrom(new ByteArrayInputStream(BLOB_STRING_CONTENT.getBytes(UTF_8)), writer, 1);
}
Blob blob = storage.get(blobId);
String readString = new String(blob.getContent(), UTF_8);
assertEquals(BLOB_STRING_CONTENT, readString);
}

@Test
public void testUploadFromDownloadTo() throws Exception {
String blobName = "test-uploadFrom-downloadTo-blob";
BlobInfo blobInfo = BlobInfo.newBuilder(BUCKET, blobName).build();

Path tempFileFrom = Files.createTempFile("ITStorageTest_", ".tmp");
Files.write(tempFileFrom, BLOB_BYTE_CONTENT);
Blob blob = storage.create(blobInfo);
blob = blob.uploadFrom(tempFileFrom);

Path tempFileTo = Files.createTempFile("ITStorageTest_", ".tmp");
blob.downloadTo(tempFileTo);
byte[] readBytes = Files.readAllBytes(tempFileTo);
assertArrayEquals(BLOB_BYTE_CONTENT, readBytes);
}

@Test
public void testUploadFromDownloadToWithEncryption() throws Exception {
String blobName = "test-uploadFrom-downloadTo-withEncryption-blob";
BlobInfo blobInfo = BlobInfo.newBuilder(BUCKET, blobName).build();

Path tempFileFrom = Files.createTempFile("ITStorageTest_", ".tmp");
Files.write(tempFileFrom, BLOB_BYTE_CONTENT);
Blob blob = storage.create(blobInfo);
blob = blob.uploadFrom(tempFileFrom, Storage.BlobWriteOption.encryptionKey(KEY));

Path tempFileTo = Files.createTempFile("ITStorageTest_", ".tmp");
try {
blob.downloadTo(tempFileTo);
} catch (RetryHelper.RetryHelperException e) {
// Expected to be StorageException
String expectedMessage =
"The target object is encrypted by a customer-supplied encryption key.";
assertTrue(e.getMessage().contains(expectedMessage));
}
blob.downloadTo(tempFileTo, Blob.BlobSourceOption.decryptionKey(KEY));
byte[] readBytes = Files.readAllBytes(tempFileTo);
assertArrayEquals(BLOB_BYTE_CONTENT, readBytes);
}
}