< Summary

Information
Class: IceRpc.Protobuf.RpcMethods.Internal.PipeReaderExtensions
Assembly: IceRpc.Protobuf
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Protobuf/RpcMethods/Internal/PipeReaderExtensions.cs
Tag: 1856_27024993493
Line coverage
84%
Covered lines: 66
Uncovered lines: 12
Coverable lines: 78
Total lines: 181
Line coverage: 84.6%
Branch coverage
79%
Covered branches: 19
Total branches: 24
Branch coverage: 79.1%
Method coverage
100%
Covered methods: 4
Fully covered methods: 2
Total methods: 4
Method coverage: 100%
Full method coverage: 50%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
DecodeProtobufMessageAsync()75%8883.33%
ToAsyncStream(...)100%11100%
ReadProtobufMessageAsync()78.57%151481.08%
DecodeMessageLength()100%22100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Protobuf/RpcMethods/Internal/PipeReaderExtensions.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using Google.Protobuf;
 4using System.Buffers;
 5using System.Buffers.Binary;
 6using System.Diagnostics;
 7using System.IO.Pipelines;
 8
 9namespace IceRpc.Protobuf.RpcMethods.Internal;
 10
 11/// <summary>Provides extension methods for <see cref="PipeReader" />.</summary>
 12internal static class PipeReaderExtensions
 13{
 14    /// <summary>Decodes a Protobuf length-prefixed message from a <see cref="PipeReader" />.</summary>
 15    /// <param name="reader">The <see cref="PipeReader" /> containing the Protobuf length-prefixed message.</param>
 16    /// <param name="parser">The <see cref="MessageParser{T}" /> used to parse the message data.</param>
 17    /// <param name="maxMessageLength">The maximum allowed length.</param>
 18    /// <param name="acceptEmptyPayload">Indicates whether to accept or reject empty payloads.</param>
 19    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 20    /// <returns>The decoded message object.</returns>
 21    /// <remarks>The envelope follows the gRPC Length-Prefixed-Message format: a 1-byte compression flag (0 for
 22    /// uncompressed; 1 indicates compressed and is rejected as not supported), followed by the message length
 23    /// as a 4-byte unsigned big-endian integer, followed by the Protobuf-encoded message bytes. See
 24    /// <see href="https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#data-frames" />.</remarks>
 25    internal static async ValueTask<T> DecodeProtobufMessageAsync<T>(
 26        this PipeReader reader,
 27        MessageParser<T> parser,
 28        int maxMessageLength,
 29        bool acceptEmptyPayload,
 30        CancellationToken cancellationToken) where T : class, IMessage<T>
 4231    {
 4232        T? message = await reader.ReadProtobufMessageAsync(
 4233            parser,
 4234            maxMessageLength,
 4235            cancellationToken).ConfigureAwait(false);
 36
 3537        if (message is null)
 138        {
 139            if (acceptEmptyPayload)
 140            {
 41                // An empty payload represents a default-constructed message. Used for oneway requests.
 142                message = parser.ParseFrom([]);
 143            }
 44            else
 045            {
 046                throw new InvalidDataException(
 047                    "The payload is empty; a length-prefixed Protobuf message was expected.");
 48            }
 149        }
 50        else
 3451        {
 52            // A unary payload must contain exactly one message; any trailing bytes indicate a framing error.
 3453            ReadResult readResult = await reader.ReadAsync(cancellationToken).ConfigureAwait(false);
 54            // We never call CancelPendingRead; an interceptor or middleware can but it's not correct.
 3455            if (readResult.IsCanceled)
 056            {
 057                throw new InvalidOperationException("Unexpected call to CancelPendingRead.");
 58            }
 3459            bool hasTrailingBytes = !readResult.Buffer.IsEmpty;
 3460            reader.AdvanceTo(readResult.Buffer.End);
 3461            if (hasTrailingBytes)
 262            {
 263                throw new InvalidDataException(
 264                    "The payload contains unexpected trailing bytes after the Protobuf message.");
 65            }
 3266            Debug.Assert(readResult.IsCompleted);
 3267        }
 68
 3369        return message;
 3370    }
 71
 72    /// <summary>Creates an async stream over a pipe reader to decode Protobuf messages.</summary>
 73    /// <typeparam name="T">The type of the message being decoded.</typeparam>
 74    /// <param name="reader">The pipe reader.</param>
 75    /// <param name="messageParser">The <see cref="MessageParser{T}" /> used to parse the message data.</param>
 76    /// <param name="maxMessageLength">The maximum allowed length.</param>
 77    /// <returns>The async stream to decode and return the streamed messages.</returns>
 78    /// <remarks>The reader ownership is transferred to the returned async stream. The caller should no longer use
 79    /// the reader after this call, and must dispose the returned async stream when done to release the reader.
 80    /// </remarks>
 81    internal static IAsyncStream<T> ToAsyncStream<T>(
 82        this PipeReader reader,
 83        MessageParser<T> messageParser,
 84        int maxMessageLength) where T : class, IMessage<T> =>
 23285        new AsyncStream<T>(reader, messageParser, maxMessageLength);
 86
 87    /// <summary>Reads a single Protobuf length-prefixed message from a <see cref="PipeReader" />.</summary>
 88    /// <typeparam name="T">The type of the message being decoded.</typeparam>
 89    /// <param name="reader">The pipe reader.</param>
 90    /// <param name="messageParser">The <see cref="MessageParser{T}" /> used to parse the message data.</param>
 91    /// <param name="maxMessageLength">The maximum allowed length.</param>
 92    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 93    /// <returns>The decoded message, or <see langword="null" /> when the reader's stream is empty (end of stream).
 94    /// The <see langword="null" /> return value is used as a sentinel to signal end of stream; this is why
 95    /// <typeparamref name="T" /> is constrained to be a reference type.</returns>
 96    internal static async ValueTask<T?> ReadProtobufMessageAsync<T>(
 97        this PipeReader reader,
 98        MessageParser<T> messageParser,
 99        int maxMessageLength,
 100        CancellationToken cancellationToken) where T : class, IMessage<T>
 65956101    {
 65956102        ReadResult readResult = await reader.ReadAtLeastAsync(5, cancellationToken).ConfigureAwait(false);
 103        // We never call CancelPendingRead; an interceptor or middleware can but it's not correct.
 65848104        if (readResult.IsCanceled)
 0105        {
 0106            throw new InvalidOperationException("Unexpected call to CancelPendingRead.");
 107        }
 108
 65848109        if (readResult.Buffer.IsEmpty)
 21110        {
 21111            return null;
 112        }
 113
 65827114        if (readResult.Buffer.Length < 5)
 1115        {
 1116            throw new InvalidDataException(
 1117                $"The payload has {readResult.Buffer.Length} bytes, but 5 bytes were expected.");
 118        }
 119
 65826120        byte compressionFlag = readResult.Buffer.FirstSpan[0];
 65826121        switch (compressionFlag)
 122        {
 123            case 0:
 65823124                break;
 125            case 1:
 126                // The Protobuf envelope defines flag 1 as "compressed". The message is well-formed Protobuf,
 127                // but IceRPC doesn't decompress it.
 1128                throw new NotSupportedException("IceRPC does not support Protobuf compressed messages.");
 129            default:
 130                // The Protobuf envelope only defines flags 0 and 1; any other value is malformed.
 2131                throw new InvalidDataException(
 2132                    $"Invalid Protobuf compression flag {compressionFlag}; expected 0 or 1.");
 133        }
 65823134        int messageLength = DecodeMessageLength(readResult.Buffer.Slice(1, 4), maxMessageLength);
 65821135        reader.AdvanceTo(readResult.Buffer.GetPosition(5));
 136
 65821137        readResult = await reader.ReadAtLeastAsync(messageLength, cancellationToken).ConfigureAwait(false);
 138        // We never call CancelPendingRead; an interceptor or middleware can but it's not correct.
 65821139        if (readResult.IsCanceled)
 0140        {
 0141            throw new InvalidOperationException("Unexpected call to CancelPendingRead.");
 142        }
 143
 65821144        if (readResult.Buffer.Length < messageLength)
 0145        {
 0146            throw new InvalidDataException(
 0147                $"The payload has {readResult.Buffer.Length} bytes, but {messageLength} bytes were expected.");
 148        }
 149
 150        T message;
 151        try
 65821152        {
 153            // ParseFrom reads tags until end-of-input and throws InvalidProtocolBufferException on a
 154            // truncated field, so passing an exact-length slice means it either consumes every byte or
 155            // throws.
 65821156            message = messageParser.ParseFrom(readResult.Buffer.Slice(0, messageLength));
 65819157        }
 2158        catch (InvalidProtocolBufferException exception)
 2159        {
 2160            throw new InvalidDataException("Failed to decode the Protobuf message.", exception);
 161        }
 65819162        reader.AdvanceTo(readResult.Buffer.GetPosition(messageLength));
 65819163        return message;
 164
 165        static int DecodeMessageLength(ReadOnlySequence<byte> buffer, int maxMessageLength)
 65823166        {
 65823167            Debug.Assert(buffer.Length == 4);
 65823168            Span<byte> spanBuffer = stackalloc byte[4];
 65823169            buffer.CopyTo(spanBuffer);
 170            // The Protobuf envelope encodes the length as an unsigned 32-bit big-endian integer. The cast of
 171            // maxMessageLength to uint is safe because it is a non-negative int, and covers both the > int.MaxValue
 172            // case and the > maxMessageLength case in a single comparison.
 65823173            uint messageLength = BinaryPrimitives.ReadUInt32BigEndian(spanBuffer);
 65823174            if (messageLength > (uint)maxMessageLength)
 2175            {
 2176                throw new InvalidDataException("The message length exceeds the maximum value.");
 177            }
 65821178            return (int)messageLength;
 65821179        }
 65840180    }
 181}