| | | 1 | | // Copyright (c) ZeroC, Inc. |
| | | 2 | | |
| | | 3 | | using Google.Protobuf; |
| | | 4 | | using System.Buffers; |
| | | 5 | | using System.Buffers.Binary; |
| | | 6 | | using System.Diagnostics; |
| | | 7 | | using System.IO.Pipelines; |
| | | 8 | | |
| | | 9 | | namespace IceRpc.Protobuf.RpcMethods.Internal; |
| | | 10 | | |
/// <summary>Provides extension methods for <see cref="PipeReader" /> to decode Protobuf length-prefixed messages.
/// </summary>
internal static class PipeReaderExtensions
{
    /// <summary>The size in bytes of the length-prefixed message header: a 1-byte compression flag followed by a
    /// 4-byte big-endian message length.</summary>
    private const int HeaderLength = 5;

    /// <summary>Decodes a Protobuf length-prefixed message from a <see cref="PipeReader" />.</summary>
    /// <typeparam name="T">The type of the message being decoded.</typeparam>
    /// <param name="reader">The <see cref="PipeReader" /> containing the Protobuf length-prefixed message.</param>
    /// <param name="parser">The <see cref="MessageParser{T}" /> used to parse the message data.</param>
    /// <param name="maxMessageLength">The maximum allowed length.</param>
    /// <param name="acceptEmptyPayload">Indicates whether to accept or reject empty payloads.</param>
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
    /// <returns>The decoded message object.</returns>
    /// <exception cref="InvalidDataException">Thrown when the payload is empty and
    /// <paramref name="acceptEmptyPayload" /> is <see langword="false" />, when the payload is malformed, or when
    /// bytes follow the message.</exception>
    /// <exception cref="NotSupportedException">Thrown when the message is flagged as compressed.</exception>
    /// <exception cref="InvalidOperationException">Thrown when a pending read was canceled.</exception>
    /// <remarks>The envelope follows the gRPC Length-Prefixed-Message format: a 1-byte compression flag (0 for
    /// uncompressed; 1 indicates compressed and is rejected as not supported), followed by the message length
    /// as a 4-byte unsigned big-endian integer, followed by the Protobuf-encoded message bytes. See
    /// <see href="https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#data-frames" />.</remarks>
    internal static async ValueTask<T> DecodeProtobufMessageAsync<T>(
        this PipeReader reader,
        MessageParser<T> parser,
        int maxMessageLength,
        bool acceptEmptyPayload,
        CancellationToken cancellationToken) where T : class, IMessage<T>
    {
        T? message = await reader.ReadProtobufMessageAsync(
            parser,
            maxMessageLength,
            cancellationToken).ConfigureAwait(false);

        if (message is null)
        {
            if (!acceptEmptyPayload)
            {
                throw new InvalidDataException(
                    "The payload is empty; a length-prefixed Protobuf message was expected.");
            }

            // An empty payload represents a default-constructed message. Used for oneway requests.
            return parser.ParseFrom([]);
        }

        // A unary payload must contain exactly one message; any trailing bytes indicate a framing error.
        ReadResult readResult = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
        ThrowIfCanceled(readResult);

        // Capture the check before AdvanceTo: the buffer must not be used once the reader has advanced.
        bool hasTrailingBytes = !readResult.Buffer.IsEmpty;
        reader.AdvanceTo(readResult.Buffer.End);
        if (hasTrailingBytes)
        {
            throw new InvalidDataException(
                "The payload contains unexpected trailing bytes after the Protobuf message.");
        }
        Debug.Assert(readResult.IsCompleted);

        return message;
    }

    /// <summary>Creates an async stream over a pipe reader to decode Protobuf messages.</summary>
    /// <typeparam name="T">The type of the message being decoded.</typeparam>
    /// <param name="reader">The pipe reader.</param>
    /// <param name="messageParser">The <see cref="MessageParser{T}" /> used to parse the message data.</param>
    /// <param name="maxMessageLength">The maximum allowed length.</param>
    /// <returns>The async stream to decode and return the streamed messages.</returns>
    /// <remarks>The reader ownership is transferred to the returned async stream. The caller should no longer use
    /// the reader after this call, and must dispose the returned async stream when done to release the reader.
    /// </remarks>
    internal static IAsyncStream<T> ToAsyncStream<T>(
        this PipeReader reader,
        MessageParser<T> messageParser,
        int maxMessageLength) where T : class, IMessage<T> =>
        new AsyncStream<T>(reader, messageParser, maxMessageLength);

    /// <summary>Reads a single Protobuf length-prefixed message from a <see cref="PipeReader" />.</summary>
    /// <typeparam name="T">The type of the message being decoded.</typeparam>
    /// <param name="reader">The pipe reader.</param>
    /// <param name="messageParser">The <see cref="MessageParser{T}" /> used to parse the message data.</param>
    /// <param name="maxMessageLength">The maximum allowed length.</param>
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
    /// <returns>The decoded message, or <see langword="null" /> when the reader's stream is empty (end of stream).
    /// The <see langword="null" /> return value is used as a sentinel to signal end of stream; this is why
    /// <typeparamref name="T" /> is constrained to be a reference type.</returns>
    /// <exception cref="InvalidDataException">Thrown when the header or the message is truncated, the compression
    /// flag is neither 0 nor 1, the message length exceeds <paramref name="maxMessageLength" />, or the message
    /// bytes cannot be parsed.</exception>
    /// <exception cref="NotSupportedException">Thrown when the message is flagged as compressed.</exception>
    /// <exception cref="InvalidOperationException">Thrown when a pending read was canceled.</exception>
    internal static async ValueTask<T?> ReadProtobufMessageAsync<T>(
        this PipeReader reader,
        MessageParser<T> messageParser,
        int maxMessageLength,
        CancellationToken cancellationToken) where T : class, IMessage<T>
    {
        ReadResult readResult = await reader.ReadAtLeastAsync(
            HeaderLength,
            cancellationToken).ConfigureAwait(false);
        ThrowIfCanceled(readResult);

        if (readResult.Buffer.IsEmpty)
        {
            // Nothing left to read: end of stream.
            return null;
        }

        if (readResult.Buffer.Length < HeaderLength)
        {
            throw new InvalidDataException(
                $"The payload has {readResult.Buffer.Length} bytes, but {HeaderLength} bytes were expected.");
        }

        int messageLength = DecodeHeader(readResult.Buffer.Slice(0, HeaderLength), maxMessageLength);
        reader.AdvanceTo(readResult.Buffer.GetPosition(HeaderLength));

        if (messageLength == 0)
        {
            // A zero-length message is complete as soon as its header is read. Reading again here would wait
            // for the peer to send more data (or complete the pipe) before delivering a message that is
            // already fully received.
            return ParseMessage(messageParser, ReadOnlySequence<byte>.Empty);
        }

        readResult = await reader.ReadAtLeastAsync(messageLength, cancellationToken).ConfigureAwait(false);
        ThrowIfCanceled(readResult);

        if (readResult.Buffer.Length < messageLength)
        {
            throw new InvalidDataException(
                $"The payload has {readResult.Buffer.Length} bytes, but {messageLength} bytes were expected.");
        }

        // ParseFrom reads tags until end-of-input and throws InvalidProtocolBufferException on a truncated
        // field, so passing an exact-length slice means it either consumes every byte or throws.
        T message = ParseMessage(messageParser, readResult.Buffer.Slice(0, messageLength));
        reader.AdvanceTo(readResult.Buffer.GetPosition(messageLength));
        return message;

        // Validates the compression flag and decodes the message length from the 5-byte header. This is a
        // non-async local function because it uses a stack-allocated span.
        static int DecodeHeader(ReadOnlySequence<byte> header, int maxLength)
        {
            Debug.Assert(header.Length == HeaderLength);

            // Copy the header into a contiguous buffer: the sequence can span several segments, and its first
            // segment can even be empty, so indexing into FirstSpan is not safe.
            Span<byte> headerBytes = stackalloc byte[HeaderLength];
            header.CopyTo(headerBytes);

            byte compressionFlag = headerBytes[0];
            switch (compressionFlag)
            {
                case 0:
                    break;
                case 1:
                    // The Protobuf envelope defines flag 1 as "compressed". The message is well-formed Protobuf,
                    // but IceRPC doesn't decompress it.
                    throw new NotSupportedException("IceRPC does not support Protobuf compressed messages.");
                default:
                    // The Protobuf envelope only defines flags 0 and 1; any other value is malformed.
                    throw new InvalidDataException(
                        $"Invalid Protobuf compression flag {compressionFlag}; expected 0 or 1.");
            }

            // The Protobuf envelope encodes the length as an unsigned 32-bit big-endian integer. The cast of
            // maxLength to uint is safe because it is a non-negative int, and covers both the > int.MaxValue
            // case and the > maxLength case in a single comparison.
            uint messageLength = BinaryPrimitives.ReadUInt32BigEndian(headerBytes[1..]);
            if (messageLength > (uint)maxLength)
            {
                throw new InvalidDataException("The message length exceeds the maximum value.");
            }
            return (int)messageLength;
        }

        // Parses the message bytes and reports a Protobuf decoding failure as an InvalidDataException.
        static T ParseMessage(MessageParser<T> parser, ReadOnlySequence<byte> payload)
        {
            try
            {
                return parser.ParseFrom(payload);
            }
            catch (InvalidProtocolBufferException exception)
            {
                throw new InvalidDataException("Failed to decode the Protobuf message.", exception);
            }
        }
    }

    /// <summary>Throws when a read was canceled by a call to <see cref="PipeReader.CancelPendingRead" />.</summary>
    /// <param name="readResult">The result of the read to check.</param>
    /// <exception cref="InvalidOperationException">Thrown when <paramref name="readResult" /> is canceled.
    /// </exception>
    private static void ThrowIfCanceled(ReadResult readResult)
    {
        // We never call CancelPendingRead; an interceptor or middleware can but it's not correct.
        if (readResult.IsCanceled)
        {
            throw new InvalidOperationException("Unexpected call to CancelPendingRead.");
        }
    }
}