Skip to content

Commit 36a07aa

Browse files
committed
Support Protobuf serialization in WebFlux
This commit introduces Protobuf support in WebFlux via dedicated codecs. Flux<Message> are serialized/deserialized using delimited Protobuf messages with the size of each message specified before the message itself. In that case, a "delimited=true" parameter is added to the content type. Mono<Message> are expected to use regular Protobuf message format (without the size prepended before the message). Related HttpMessageReader/Writer are automatically registered when the "com.google.protobuf:protobuf-java" library is detected in the classpath, and can be customized easily if needed via CodecConfigurer, for example to specify protocol extensions via the ExtensionRegistry based constructors. Both "application/x-protobuf" and "application/octet-stream" mime types are supported. Issue: SPR-15776
1 parent 4475c67 commit 36a07aa

23 files changed

+2225
-18
lines changed

spring-web/src/main/java/org/springframework/http/codec/CodecConfigurer.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,22 @@ interface DefaultCodecs {
110110
*/
111111
void jackson2JsonEncoder(Encoder<?> encoder);
112112

113+
/**
114+
* Override the default Protobuf {@code Decoder}.
115+
* @param decoder the decoder instance to use
116+
* @since 5.1
117+
* @see org.springframework.http.codec.protobuf.ProtobufDecoder
118+
*/
119+
void protobufDecoder(Decoder<?> decoder);
120+
121+
/**
122+
* Override the default Protobuf {@code HttpMessageReader}.
123+
* @param decoder the decoder instance to use
124+
* @since 5.1
125+
* @see org.springframework.http.codec.protobuf.ProtobufHttpMessageWriter
126+
*/
127+
void protobufWriter(HttpMessageWriter<?> decoder);
128+
113129
/**
114130
* Whether to log form data at DEBUG level, and headers at TRACE level.
115131
* Both may contain sensitive information.

spring-web/src/main/java/org/springframework/http/codec/EncoderHttpMessageWriter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,8 @@ private static MediaType addDefaultCharset(MediaType main, @Nullable MediaType d
160160
private boolean isStreamingMediaType(@Nullable MediaType contentType) {
161161
return (contentType != null && this.encoder instanceof HttpMessageEncoder &&
162162
((HttpMessageEncoder<?>) this.encoder).getStreamingMediaTypes().stream()
163-
.anyMatch(contentType::isCompatibleWith));
163+
.anyMatch(streamingMediaType -> contentType.isCompatibleWith(streamingMediaType) &&
164+
contentType.getParameters().entrySet().containsAll(streamingMediaType.getParameters().keySet())));
164165
}
165166

166167

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Copyright 2002-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.http.codec.protobuf;
18+
19+
import java.util.Arrays;
20+
import java.util.Collections;
21+
import java.util.List;
22+
23+
import org.springframework.lang.Nullable;
24+
import org.springframework.util.MimeType;
25+
26+
/**
27+
* Base class providing support methods for Protobuf encoding and decoding.
28+
*
29+
* @author Sebastien Deleuze
30+
* @since 5.1
31+
*/
32+
public abstract class ProtobufCodecSupport {
33+
34+
static final List<MimeType> MIME_TYPES = Collections.unmodifiableList(
35+
Arrays.asList(
36+
new MimeType("application", "x-protobuf"),
37+
new MimeType("application", "octet-stream")));
38+
39+
static final String DELIMITED_KEY = "delimited";
40+
41+
static final String DELIMITED_VALUE = "true";
42+
43+
44+
protected boolean supportsMimeType(@Nullable MimeType mimeType) {
45+
return (mimeType == null || MIME_TYPES.stream().anyMatch(m -> m.isCompatibleWith(mimeType)));
46+
}
47+
48+
protected List<MimeType> getMimeTypes() {
49+
return MIME_TYPES;
50+
}
51+
}
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
/*
2+
* Copyright 2002-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.http.codec.protobuf;
18+
19+
import java.io.IOException;
20+
import java.lang.reflect.Method;
21+
import java.util.List;
22+
import java.util.Map;
23+
import java.util.concurrent.ConcurrentHashMap;
24+
import java.util.function.Function;
25+
26+
import com.google.protobuf.CodedInputStream;
27+
import com.google.protobuf.ExtensionRegistry;
28+
import com.google.protobuf.Message;
29+
import org.reactivestreams.Publisher;
30+
import reactor.core.publisher.Flux;
31+
import reactor.core.publisher.Mono;
32+
33+
import org.springframework.core.ResolvableType;
34+
import org.springframework.core.codec.Decoder;
35+
import org.springframework.core.codec.DecodingException;
36+
import org.springframework.core.io.buffer.DataBuffer;
37+
import org.springframework.core.io.buffer.DataBufferUtils;
38+
import org.springframework.util.Assert;
39+
import org.springframework.util.MimeType;
40+
41+
/**
42+
* A {@code Decoder} that reads {@link com.google.protobuf.Message}s
43+
* using <a href="https://developers.google.com/protocol-buffers/">Google Protocol Buffers</a>.
44+
*
45+
* Flux deserialized via
46+
* {@link #decode(Publisher, ResolvableType, MimeType, Map)} are expected to use
47+
* <a href="https://developers.google.com/protocol-buffers/docs/techniques?hl=en#streaming">delimited Protobuf messages</a>
48+
* with the size of each message specified before the message itself. Single values deserialized
49+
* via {@link #decodeToMono(Publisher, ResolvableType, MimeType, Map)} are expected to use
50+
* regular Protobuf message format (without the size prepended before the message).
51+
*
52+
* <p>To generate {@code Message} Java classes, you need to install the {@code protoc} binary.
53+
*
54+
* <p>This decoder requires Protobuf 3 or higher, and supports
55+
* {@code "application/x-protobuf"} and {@code "application/octet-stream"} with the official
56+
* {@code "com.google.protobuf:protobuf-java"} library.
57+
*
58+
* @author Sébastien Deleuze
59+
* @since 5.1
60+
* @see ProtobufEncoder
61+
*/
62+
public class ProtobufDecoder extends ProtobufCodecSupport implements Decoder<Message> {
63+
64+
/**
65+
* The default max size for aggregating messages.
66+
*/
67+
protected static final int DEFAULT_MESSAGE_MAX_SIZE = 64 * 1024;
68+
69+
private static final ConcurrentHashMap<Class<?>, Method> methodCache = new ConcurrentHashMap<>();
70+
71+
private final ExtensionRegistry extensionRegistry;
72+
73+
private int maxMessageSize = DEFAULT_MESSAGE_MAX_SIZE;
74+
75+
76+
/**
77+
* Construct a new {@code ProtobufDecoder}.
78+
*/
79+
public ProtobufDecoder() {
80+
this(ExtensionRegistry.newInstance());
81+
}
82+
83+
/**
84+
* Construct a new {@code ProtobufDecoder} with an initializer that allows the
85+
* registration of message extensions.
86+
* @param extensionRegistry a message extension registry
87+
*/
88+
public ProtobufDecoder(ExtensionRegistry extensionRegistry) {
89+
Assert.notNull(extensionRegistry, "ExtensionRegistry must not be null");
90+
this.extensionRegistry = extensionRegistry;
91+
}
92+
93+
public void setMaxMessageSize(int maxMessageSize) {
94+
this.maxMessageSize = maxMessageSize;
95+
}
96+
97+
@Override
98+
public boolean canDecode(ResolvableType elementType, MimeType mimeType) {
99+
return Message.class.isAssignableFrom(elementType.getRawClass()) && supportsMimeType(mimeType);
100+
}
101+
102+
@Override
103+
public Flux<Message> decode(Publisher<DataBuffer> inputStream, ResolvableType elementType,
104+
MimeType mimeType, Map<String, Object> hints) {
105+
106+
return Flux.from(inputStream)
107+
.concatMap(new MessageDecoderFunction(elementType, this.maxMessageSize));
108+
}
109+
110+
@Override
111+
public Mono<Message> decodeToMono(Publisher<DataBuffer> inputStream, ResolvableType elementType,
112+
MimeType mimeType, Map<String, Object> hints) {
113+
return DataBufferUtils.join(inputStream).map(dataBuffer -> {
114+
try {
115+
Message.Builder builder = getMessageBuilder(elementType.getRawClass());
116+
builder.mergeFrom(CodedInputStream.newInstance(dataBuffer.asByteBuffer()), this.extensionRegistry);
117+
Message message = builder.build();
118+
DataBufferUtils.release(dataBuffer);
119+
return message;
120+
}
121+
catch (IOException ex) {
122+
throw new DecodingException("I/O error while parsing input stream", ex);
123+
}
124+
catch (Exception ex) {
125+
throw new DecodingException("Could not read Protobuf message: " + ex.getMessage(), ex);
126+
}
127+
}
128+
);
129+
}
130+
131+
/**
132+
* Create a new {@code Message.Builder} instance for the given class.
133+
* <p>This method uses a ConcurrentHashMap for caching method lookups.
134+
*/
135+
private static Message.Builder getMessageBuilder(Class<?> clazz) throws Exception {
136+
Method method = methodCache.get(clazz);
137+
if (method == null) {
138+
method = clazz.getMethod("newBuilder");
139+
methodCache.put(clazz, method);
140+
}
141+
return (Message.Builder) method.invoke(clazz);
142+
}
143+
144+
@Override
145+
public List<MimeType> getDecodableMimeTypes() {
146+
return getMimeTypes();
147+
}
148+
149+
150+
private class MessageDecoderFunction implements Function<DataBuffer, Publisher<? extends Message>> {
151+
152+
private final ResolvableType elementType;
153+
154+
private final int maxMessageSize;
155+
156+
private DataBuffer output;
157+
158+
private int messageBytesToRead;
159+
160+
public MessageDecoderFunction(ResolvableType elementType, int maxMessageSize) {
161+
this.elementType = elementType;
162+
this.maxMessageSize = maxMessageSize;
163+
}
164+
165+
// TODO Instead of the recursive call, loop over the current DataBuffer, produce a list of as many messages as are contained, and save any remaining bytes with flatMapIterable
166+
@Override
167+
public Publisher<? extends Message> apply(DataBuffer input) {
168+
169+
try {
170+
if (this.output == null) {
171+
int firstByte = input.read();
172+
if (firstByte == -1) {
173+
return Flux.error(new DecodingException("Can't parse message size"));
174+
}
175+
this.messageBytesToRead = CodedInputStream.readRawVarint32(firstByte, input.asInputStream());
176+
if (this.messageBytesToRead > this.maxMessageSize) {
177+
return Flux.error(new DecodingException(
178+
"The number of bytes to read parsed in the incoming stream (" +
179+
this.messageBytesToRead + ") exceeds the configured limit (" + this.maxMessageSize + ")"));
180+
}
181+
this.output = input.factory().allocateBuffer(this.messageBytesToRead);
182+
}
183+
int chunkBytesToRead = this.messageBytesToRead >= input.readableByteCount() ?
184+
input.readableByteCount() : this.messageBytesToRead;
185+
int remainingBytesToRead = input.readableByteCount() - chunkBytesToRead;
186+
this.output.write(input.slice(input.readPosition(), chunkBytesToRead));
187+
this.messageBytesToRead -= chunkBytesToRead;
188+
Message message = null;
189+
if (this.messageBytesToRead == 0) {
190+
Message.Builder builder = getMessageBuilder(this.elementType.getRawClass());
191+
builder.mergeFrom(CodedInputStream.newInstance(this.output.asByteBuffer()), extensionRegistry);
192+
message = builder.build();
193+
DataBufferUtils.release(this.output);
194+
this.output = null;
195+
}
196+
if (remainingBytesToRead > 0) {
197+
return Mono.justOrEmpty(message).concatWith(
198+
apply(input.slice(input.readPosition() + chunkBytesToRead, remainingBytesToRead)));
199+
}
200+
else {
201+
return Mono.justOrEmpty(message);
202+
}
203+
}
204+
catch (IOException ex) {
205+
return Flux.error(new DecodingException("I/O error while parsing input stream", ex));
206+
}
207+
catch (Exception ex) {
208+
return Flux.error(new DecodingException("Could not read Protobuf message: " + ex.getMessage(), ex));
209+
}
210+
}
211+
}
212+
}

0 commit comments

Comments
 (0)