< Summary

Information
Class: IceRpc.Slice.PipeReaderExtensions
Assembly: IceRpc.Slice
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Slice/PipeReaderExtensions.cs
Tag: 275_13775359185
Line coverage
90%
Covered lines: 76
Uncovered lines: 8
Coverable lines: 84
Total lines: 188
Line coverage: 90.4%
Branch coverage
81%
Covered branches: 26
Total branches: 32
Branch coverage: 81.2%
Method coverage
100%
Covered methods: 7
Total methods: 7
Method coverage: 100%

Metrics

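| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
|---|---|---|---|---|
| ToAsyncEnumerable(...) | 75% | 4.84 | 4 | 62.5% |
| DecodeBuffer() | 100% | 2 | 2 | 100% |
| ReadAsync() | 66.66% | 6.44 | 6 | 76.92% |
| ToAsyncEnumerable(...) | 100% | 4 | 4 | 100% |
| DecodeBuffer() | 100% | 2 | 2 | 100% |
| ReadAsync() | 100% | 1 | 1 | 100% |
| ToAsyncEnumerable() | 78.57% | 14.04 | 14 | 94.11% |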

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Slice/PipeReaderExtensions.cs

# | Line | Line coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using System.Buffers;
 5using System.Diagnostics;
 6using System.IO.Pipelines;
 7using System.Runtime.CompilerServices;
 8using ZeroC.Slice;
 9
 10namespace IceRpc.Slice;
 11
 12/// <summary>Provides extension methods for <see cref="PipeReader" /> to decode streamed elements.</summary>
 13public static class PipeReaderExtensions
 14{
 15    /// <summary>Creates an async enumerable over a pipe reader to decode streamed elements.</summary>
 16    /// <typeparam name="T">The type of the element being decoded.</typeparam>
 17    /// <param name="reader">The pipe reader.</param>
 18    /// <param name="encoding">The Slice encoding version.</param>
 19    /// <param name="decodeFunc">The function used to decode the streamed member.</param>
 20    /// <param name="elementSize">The size in bytes of one element.</param>
 21    /// <param name="sliceFeature">The Slice feature to customize the decoding.</param>
 22    /// <returns>The async enumerable to decode and return the streamed elements.</returns>
 23    /// <exception cref="ArgumentException">Thrown if <paramref name="elementSize" /> is equal of inferior to
 24    /// <c>0</c>.</exception>
 25    /// <remarks>The reader ownership is transferred to the returned async enumerable. The caller should no longer use
 26    /// the reader after this call.</remarks>
 27    public static IAsyncEnumerable<T> ToAsyncEnumerable<T>(
 28        this PipeReader reader,
 29        SliceEncoding encoding,
 30        DecodeFunc<T> decodeFunc,
 31        int elementSize,
 32        ISliceFeature? sliceFeature = null)
 1333    {
 1334        if (elementSize <= 0)
 035        {
 036            reader.Complete();
 037            throw new ArgumentException("The element size must be greater than 0.", nameof(elementSize));
 38        }
 39
 1340        sliceFeature ??= SliceFeature.Default;
 1341        return reader.ToAsyncEnumerable(ReadAsync, DecodeBuffer);
 42
 43        IEnumerable<T> DecodeBuffer(ReadOnlySequence<byte> buffer)
 1844        {
 45            // Since the elements are fixed-size, they can't contain service addresses hence baseProxy can remain null.
 1846            var decoder = new SliceDecoder(
 1847                buffer,
 1848                encoding,
 1849                maxCollectionAllocation: sliceFeature.MaxCollectionAllocation,
 1850                maxDepth: sliceFeature.MaxDepth);
 51
 1852            var items = new T[buffer.Length / elementSize];
 13141853            for (int i = 0; i < items.Length; ++i)
 6569254            {
 6569255                items[i] = decodeFunc(ref decoder);
 6569156            }
 1757            decoder.CheckEndOfBuffer();
 1758            return items;
 1759        }
 60
 61        async ValueTask<ReadResult> ReadAsync(PipeReader reader, CancellationToken cancellationToken)
 2162        {
 63            // Read the bytes for at least one element.
 64            // Note that the max number of bytes we can read in one shot is limited by the flow control of the
 65            // underlying transport.
 2166            ReadResult readResult = await reader.ReadAtLeastAsync(elementSize, cancellationToken).ConfigureAwait(false);
 67
 68            // Check if the buffer contains extra bytes that we need to remove.
 2069            ReadOnlySequence<byte> buffer = readResult.Buffer;
 2070            if (elementSize > 1 && buffer.Length > elementSize)
 1771            {
 1772                long extra = buffer.Length % elementSize;
 1773                if (extra > 0)
 074                {
 075                    buffer = buffer.Slice(0, buffer.Length - extra);
 076                    return new ReadResult(buffer, isCanceled: readResult.IsCanceled, isCompleted: false);
 77                }
 1778            }
 79
 80            // Return the read result as-is.
 2081            return readResult;
 2082        }
 1383    }
 84
 85    /// <summary>Creates an async enumerable over a pipe reader to decode variable size streamed elements.</summary>
 86    /// <typeparam name="T">The stream element type.</typeparam>
 87    /// <param name="reader">The pipe reader.</param>
 88    /// <param name="encoding">The Slice encoding version.</param>
 89    /// <param name="decodeFunc">The function used to decode the streamed member.</param>
 90    /// <param name="sender">The proxy that sent the request, if applicable.</param>
 91    /// <param name="sliceFeature">The slice feature to customize the decoding.</param>
 92    /// <returns>The async enumerable to decode and return the streamed members.</returns>
 93    /// <remarks>The reader ownership is transferred to the returned async enumerable. The caller should no longer use
 94    /// the reader after this call.</remarks>
 95    public static IAsyncEnumerable<T> ToAsyncEnumerable<T>(
 96        this PipeReader reader,
 97        SliceEncoding encoding,
 98        DecodeFunc<T> decodeFunc,
 99        IProxy? sender = null,
 100        ISliceFeature? sliceFeature = null)
 13101    {
 13102        sliceFeature ??= SliceFeature.Default;
 13103        IProxy? baseProxy = sliceFeature.BaseProxy ?? sender;
 13104        return reader.ToAsyncEnumerable(ReadAsync, DecodeBuffer);
 105
 106        IEnumerable<T> DecodeBuffer(ReadOnlySequence<byte> buffer)
 13107        {
 108            // No activator or max depth since streams are Slice2+.
 13109            var decoder = new SliceDecoder(buffer, encoding, baseProxy, sliceFeature.MaxCollectionAllocation);
 110
 13111            var items = new List<T>();
 112            do
 65679113            {
 65679114                items.Add(decodeFunc(ref decoder));
 65678115            }
 65678116            while (decoder.Consumed < buffer.Length);
 117
 12118            return items;
 12119        }
 120
 121        ValueTask<ReadResult> ReadAsync(PipeReader reader, CancellationToken cancellationToken) =>
 14122            reader.ReadSegmentAsync(encoding, sliceFeature.MaxSegmentSize, cancellationToken);
 13123    }
 124
 125    /// <summary>Decodes an async enumerable from a pipe reader.</summary>
 126    /// <param name="reader">The pipe reader.</param>
 127    /// <param name="readFunc">The function used to read enough data to decode elements returned by the
 128    /// enumerable.</param>
 129    /// <param name="decodeBufferFunc">The function used to decode an element.</param>
 130    /// <param name="cancellationToken">The cancellation token which is provided to <see
 131    /// cref="IAsyncEnumerable{T}.GetAsyncEnumerator(CancellationToken)" />.</param>
 132    private static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
 133        this PipeReader reader,
 134        Func<PipeReader, CancellationToken, ValueTask<ReadResult>> readFunc,
 135        Func<ReadOnlySequence<byte>, IEnumerable<T>> decodeBufferFunc,
 136        [EnumeratorCancellation] CancellationToken cancellationToken = default)
 25137    {
 138        try
 25139        {
 35140            while (true)
 35141            {
 142                ReadResult readResult;
 143
 144                try
 35145                {
 35146                    readResult = await readFunc(reader, cancellationToken).ConfigureAwait(false);
 147
 34148                    if (readResult.IsCanceled)
 0149                    {
 150                        // We never call CancelPendingRead; an interceptor or middleware can but it's not correct.
 0151                        throw new InvalidOperationException("Unexpected call to CancelPendingRead.");
 152                    }
 34153                    if (readResult.Buffer.IsEmpty)
 3154                    {
 3155                        Debug.Assert(readResult.IsCompleted);
 3156                        yield break;
 157                    }
 31158                }
 1159                catch (OperationCanceledException exception) when (exception.CancellationToken == cancellationToken)
 1160                {
 161                    // Canceling the cancellation token is a normal way to complete an iteration.
 1162                    yield break;
 163                }
 164
 31165                IEnumerable<T> elements = decodeBufferFunc(readResult.Buffer);
 29166                reader.AdvanceTo(readResult.Buffer.End);
 167
 262817168                foreach (T item in elements)
 131366169                {
 131366170                    if (cancellationToken.IsCancellationRequested)
 1171                    {
 1172                        yield break;
 173                    }
 131365174                    yield return item;
 131364175                }
 176
 27177                if (readResult.IsCompleted)
 17178                {
 17179                    yield break;
 180                }
 10181            }
 182        }
 183        finally
 25184        {
 25185            reader.Complete();
 25186        }
 23187    }
 188}