< Summary

Information
Class: IceRpc.Protobuf.Internal.PipeReaderExtensions
Assembly: IceRpc.Protobuf
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Protobuf/Internal/PipeReaderExtensions.cs
Tag: 275_13775359185
Line coverage
87%
Covered lines: 64
Uncovered lines: 9
Coverable lines: 73
Total lines: 147
Line coverage: 87.6%
Branch coverage
72%
Covered branches: 16
Total branches: 22
Branch coverage: 72.7%
Method coverage
100%
Covered methods: 4
Total methods: 4
Method coverage: 100%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
DecodeProtobufMessageAsync()100%11100%
ToAsyncEnumerable()75%88100%
ReadMessageAsync()71.42%18.361471.87%
DecodeMessageLength()100%11100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Protobuf/Internal/PipeReaderExtensions.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using Google.Protobuf;
 4using System.Buffers;
 5using System.Buffers.Binary;
 6using System.Diagnostics;
 7using System.IO.Pipelines;
 8using System.Runtime.CompilerServices;
 9
 10namespace IceRpc.Protobuf.Internal;
 11
 12/// <summary>Provides extension methods for <see cref="PipeReader" />.</summary>
 13internal static class PipeReaderExtensions
 14{
 15    /// <summary>Decodes a Protobuf length prefixed message from a <see cref="PipeReader" />.</summary>
 16    /// <param name="reader">The <see cref="PipeReader" /> containing the Protobuf length prefixed message.</param>
 17    /// <param name="parser">The <see cref="MessageParser{T}" /> used to parse the message data.</param>
 18    /// <param name="maxMessageLength">The maximum allowed length.</param>
 19    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 20    /// <returns>The decoded message object.</returns>
 21    internal static async ValueTask<T> DecodeProtobufMessageAsync<T>(
 22        this PipeReader reader,
 23        MessageParser<T> parser,
 24        int maxMessageLength,
 25        CancellationToken cancellationToken) where T : IMessage<T>
 2726    {
 2727        T? message = await ReadMessageAsync(
 2728            reader,
 2729            parser,
 2730            maxMessageLength,
 2731            cancellationToken).ConfigureAwait(false);
 32
 2633        Debug.Assert(message is not null);
 2634        return message;
 2635    }
 36
 37    /// <summary>Decodes an async enumerable from a pipe reader.</summary>
 38    /// <param name="reader">The pipe reader.</param>
 39    /// <param name="messageParser">The <see cref="MessageParser{T}" /> used to parse the message data.</param>
 40    /// <param name="maxMessageLength">The maximum allowed length.</param>
 41    /// <param name="cancellationToken">The cancellation token which is provided to <see
 42    /// cref="IAsyncEnumerable{T}.GetAsyncEnumerator(CancellationToken)" />.</param>
 43    internal static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
 44        this PipeReader reader,
 45        MessageParser<T> messageParser,
 46        int maxMessageLength,
 47        [EnumeratorCancellation] CancellationToken cancellationToken = default) where T : IMessage<T>
 2348    {
 49        try
 2350        {
 6580351            while (true)
 6580352            {
 6580353                if (cancellationToken.IsCancellationRequested)
 154                {
 155                    yield break;
 56                }
 57
 58                T? message;
 59                try
 6580260                {
 6580261                    message = await ReadMessageAsync(
 6580262                        reader,
 6580263                        messageParser,
 6580264                        maxMessageLength,
 6580265                        cancellationToken).ConfigureAwait(false);
 6580066                }
 167                catch (OperationCanceledException exception) when (exception.CancellationToken == cancellationToken)
 168                {
 69                    // Canceling the cancellation token is a normal way to complete an iteration.
 170                    yield break;
 71                }
 72
 6580073                if (message is null)
 1974                {
 1975                    yield break;
 76                }
 6578177                yield return message;
 6578078            }
 79        }
 80        finally
 2381        {
 2382            reader.Complete();
 2383        }
 2284    }
 85
 86    private static async ValueTask<T?> ReadMessageAsync<T>(
 87        PipeReader reader,
 88        MessageParser<T> messageParser,
 89        int maxMessageLength,
 90        CancellationToken cancellationToken) where T : IMessage<T>
 6582991    {
 6582992        ReadResult readResult = await reader.ReadAtLeastAsync(5, cancellationToken).ConfigureAwait(false);
 93        // We never call CancelPendingRead; an interceptor or middleware can but it's not correct.
 6582894        if (readResult.IsCanceled)
 095        {
 096            throw new InvalidOperationException("Unexpected call to CancelPendingRead.");
 97        }
 98
 6582899        if (readResult.Buffer.IsEmpty)
 19100        {
 19101            return default;
 102        }
 103
 65809104        if (readResult.Buffer.Length < 5)
 1105        {
 1106            throw new InvalidDataException(
 1107                $"The payload has {readResult.Buffer.Length} bytes, but 5 bytes were expected.");
 108        }
 109
 65808110        if (readResult.Buffer.FirstSpan[0] == 1)
 0111        {
 0112            throw new NotSupportedException("IceRPC does not support Protobuf compressed messages");
 113        }
 65808114        int messageLength = DecodeMessageLength(readResult.Buffer.Slice(1, 4));
 65808115        reader.AdvanceTo(readResult.Buffer.GetPosition(5));
 65808116        if (messageLength >= maxMessageLength)
 1117        {
 1118            throw new InvalidDataException("The message length exceeds the maximum value.");
 119        }
 120
 65807121        readResult = await reader.ReadAtLeastAsync(messageLength, cancellationToken).ConfigureAwait(false);
 122        // We never call CancelPendingRead; an interceptor or middleware can but it's not correct.
 65807123        if (readResult.IsCanceled)
 0124        {
 0125            throw new InvalidOperationException("Unexpected call to CancelPendingRead.");
 126        }
 127
 65807128        if (readResult.Buffer.Length < messageLength)
 0129        {
 0130            throw new InvalidDataException(
 0131                $"The payload has {readResult.Buffer.Length} bytes, but {messageLength} bytes were expected.");
 132        }
 133
 134        // TODO: Does ParseFrom check it read all the bytes?
 65807135        T message = messageParser.ParseFrom(readResult.Buffer.Slice(0, messageLength));
 65807136        reader.AdvanceTo(readResult.Buffer.GetPosition(messageLength));
 65807137        return message;
 138
 139        static int DecodeMessageLength(ReadOnlySequence<byte> buffer)
 65808140        {
 65808141            Debug.Assert(buffer.Length == 4);
 65808142            Span<byte> spanBuffer = stackalloc byte[4];
 65808143            buffer.CopyTo(spanBuffer);
 65808144            return BinaryPrimitives.ReadInt32BigEndian(spanBuffer);
 65808145        }
 65826146    }
 147}