< Summary

Information
Class: IceRpc.Slice.Operations.PipeReaderExtensions
Assembly: IceRpc.Slice
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Slice/Operations/PipeReaderExtensions.cs
Tag: 1321_24790053727
Line coverage
90%
Covered lines: 76
Uncovered lines: 8
Coverable lines: 84
Total lines: 193
Line coverage: 90.4%
Branch coverage
81%
Covered branches: 26
Total branches: 32
Branch coverage: 81.2%
Method coverage
100%
Covered methods: 9
Fully covered methods: 6
Total methods: 9
Method coverage: 100%
Full method coverage: 66.6%

Metrics

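| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
| --- | --- | --- | --- | --- |
| .cctor() | 100% | 1 | 1 | 100% |
| CreateEmptySliceStructPayload() | 100% | 1 | 1 | 100% |
| ToAsyncEnumerable(...) | 75% | 5 | 4 | 62.5% |
| DecodeBuffer() | 100% | 2 | 2 | 100% |
| ReadAsync() | 66.66% | 6 | 6 | 76.92% |
| ToAsyncEnumerable(...) | 100% | 4 | 4 | 100% |
| DecodeBuffer() | 100% | 2 | 2 | 100% |
| ReadAsync() | 100% | 1 | 1 | 100% |
| ToAsyncEnumerable() | 78.57% | 14 | 14 | 94.11% |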

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Slice/Operations/PipeReaderExtensions.cs

# | Line | Line coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Features;
 4using IceRpc.Internal;
 5using System.Buffers;
 6using System.Diagnostics;
 7using System.IO.Pipelines;
 8using System.Runtime.CompilerServices;
 9using ZeroC.Slice.Codec;
 10
 11namespace IceRpc.Slice.Operations;
 12
 13/// <summary>Provides extension methods for <see cref="PipeReader" />.</summary>
 14public static class PipeReaderExtensions
 15{
 16    // 4 = varuint62 encoding of the size (1)
 17    // 252 = varint32 encoding of the tag end marker (-1)
 118    private static readonly ReadOnlySequence<byte> _emptyStructPayload = new([4, 252]);
 19
 20    extension(PipeReader)
 21    {
 22        /// <summary>Creates a request or response payload holding an empty Slice struct.</summary>
 23        /// <returns>A new pipe reader over the shared, read-only empty struct payload.</returns>
 1224        public static PipeReader CreateEmptySliceStructPayload() => PipeReader.Create(_emptyStructPayload);
 25    }
 26
 27    /// <summary>Creates an async enumerable over a pipe reader to decode fixed-size streamed elements.</summary>
 28    /// <typeparam name="T">The type of the element being decoded.</typeparam>
 29    /// <param name="reader">The pipe reader.</param>
 30    /// <param name="decodeFunc">The function used to decode one streamed element.</param>
 31    /// <param name="elementSize">The size in bytes of one element.</param>
 32    /// <param name="sliceFeature">The Slice feature to customize the decoding.</param>
 33    /// <returns>The async enumerable to decode and return the streamed elements.</returns>
 34    /// <exception cref="ArgumentException">Thrown if <paramref name="elementSize" /> is less than or equal to
 35    /// <c>0</c>. The reader is completed before this exception is thrown.</exception>
 36    /// <remarks>The reader ownership is transferred to the returned async enumerable. The caller should no longer use
 37    /// the reader after this call.</remarks>
 38    public static IAsyncEnumerable<T> ToAsyncEnumerable<T>(
 39        this PipeReader reader,
 40        DecodeFunc<T> decodeFunc,
 41        int elementSize,
 42        ISliceFeature? sliceFeature = null)
 1343    {
 1344        if (elementSize <= 0)
 045        {
 046            reader.Complete();
 047            throw new ArgumentException("The element size must be greater than 0.", nameof(elementSize));
 48        }
 49
 1350        sliceFeature ??= SliceFeature.Default;
 1351        return reader.ToAsyncEnumerable(ReadAsync, DecodeBuffer);
 52
 53        IEnumerable<T> DecodeBuffer(ReadOnlySequence<byte> buffer)
 1854        {
 55            // Since the elements are fixed-size, they can't contain service addresses, hence baseProxy can remain null.
 1856            var decoder = new SliceDecoder(
 1857                buffer,
 1858                maxCollectionAllocation: sliceFeature.MaxCollectionAllocation);
 59
 1860            var items = new T[buffer.Length / elementSize];
 13141861            for (int i = 0; i < items.Length; ++i)
 6569262            {
 6569263                items[i] = decodeFunc(ref decoder);
 6569164            }
 1765            decoder.CheckEndOfBuffer();
 1766            return items;
 1767        }
 68
 69        async ValueTask<ReadResult> ReadAsync(PipeReader reader, CancellationToken cancellationToken)
 2170        {
 71            // Read the bytes for at least one element.
 72            // Note that the max number of bytes we can read in one shot is limited by the flow control of the
 73            // underlying transport.
 2174            ReadResult readResult = await reader.ReadAtLeastAsync(elementSize, cancellationToken).ConfigureAwait(false);
 75
 76            // Trim a trailing partial element; isCompleted: false makes the caller read again to pick it up.
 2077            ReadOnlySequence<byte> buffer = readResult.Buffer;
 2078            if (elementSize > 1 && buffer.Length > elementSize)
 1779            {
 1780                long extra = buffer.Length % elementSize;
 1781                if (extra > 0)
 082                {
 083                    buffer = buffer.Slice(0, buffer.Length - extra);
 084                    return new ReadResult(buffer, isCanceled: readResult.IsCanceled, isCompleted: false);
 85                }
 1786            }
 87
 88            // No partial element to trim: return the read result as-is.
 2089            return readResult;
 2090        }
 1391    }
 92
 93    /// <summary>Creates an async enumerable over a pipe reader to decode variable-size streamed elements.</summary>
 94    /// <typeparam name="T">The stream element type.</typeparam>
 95    /// <param name="reader">The pipe reader.</param>
 96    /// <param name="decodeFunc">The function used to decode one streamed element.</param>
 97    /// <param name="sender">The proxy that sent the request, if any; used when the feature has no base proxy.</param>
 98    /// <param name="sliceFeature">The Slice feature to customize the decoding.</param>
 99    /// <returns>The async enumerable to decode and return the streamed elements.</returns>
 100    /// <remarks>The reader ownership is transferred to the returned async enumerable. The caller should no longer use
 101    /// the reader after this call.</remarks>
 102    public static IAsyncEnumerable<T> ToAsyncEnumerable<T>(
 103        this PipeReader reader,
 104        DecodeFunc<T> decodeFunc,
 105        ISliceProxy? sender = null,
 106        ISliceFeature? sliceFeature = null)
 13107    {
 13108        sliceFeature ??= SliceFeature.Default;
 13109        ISliceProxy? baseProxy = sliceFeature.BaseProxy ?? sender;
 13110        return reader.ToAsyncEnumerable(ReadAsync, DecodeBuffer);
 111
 112        IEnumerable<T> DecodeBuffer(ReadOnlySequence<byte> buffer)
 13113        {
 13114            var decoder = new SliceDecoder(buffer, baseProxy, sliceFeature.MaxCollectionAllocation);
 115
 13116            var items = new List<T>();
 117            do
 65679118            {
 65679119                items.Add(decodeFunc(ref decoder));
 65678120            }
 65678121            while (decoder.Consumed < buffer.Length);
 122
 12123            return items;
 12124        }
 125
 126        ValueTask<ReadResult> ReadAsync(PipeReader reader, CancellationToken cancellationToken) =>
 14127            reader.ReadSliceSegmentAsync(sliceFeature.MaxSegmentSize, cancellationToken);
 13128    }
 129
 130    /// <summary>Decodes an async enumerable from a pipe reader; the reader is completed when iteration ends.</summary>
 131    /// <param name="reader">The pipe reader.</param>
 132    /// <param name="readFunc">The function used to read enough data to decode elements returned by the
 133    /// enumerable.</param>
 134    /// <param name="decodeBufferFunc">The function used to decode all the elements held in a buffer.</param>
 135    /// <param name="cancellationToken">The cancellation token which is provided to <see
 136    /// cref="IAsyncEnumerable{T}.GetAsyncEnumerator(CancellationToken)" />.</param>
 137    private static async IAsyncEnumerable<T> ToAsyncEnumerable<T>(
 138        this PipeReader reader,
 139        Func<PipeReader, CancellationToken, ValueTask<ReadResult>> readFunc,
 140        Func<ReadOnlySequence<byte>, IEnumerable<T>> decodeBufferFunc,
 141        [EnumeratorCancellation] CancellationToken cancellationToken = default)
 25142    {
 143        try
 25144        {
 35145            while (true)
 35146            {
 147                ReadResult readResult;
 148
 149                try
 35150                {
 35151                    readResult = await readFunc(reader, cancellationToken).ConfigureAwait(false);
 152
 34153                    if (readResult.IsCanceled)
 0154                    {
 155                        // We never call CancelPendingRead; an interceptor or middleware can, but doing so is incorrect.
 0156                        throw new InvalidOperationException("Unexpected call to CancelPendingRead.");
 157                    }
 34158                    if (readResult.Buffer.IsEmpty)
 3159                    {
 3160                        Debug.Assert(readResult.IsCompleted);
 3161                        yield break;
 162                    }
 31163                }
 1164                catch (OperationCanceledException exception) when (exception.CancellationToken == cancellationToken)
 1165                {
 166                    // Canceling the cancellation token is a normal way to complete an iteration.
 1167                    yield break;
 168                }
 169
 31170                IEnumerable<T> elements = decodeBufferFunc(readResult.Buffer);
 29171                reader.AdvanceTo(readResult.Buffer.End);
 172
 262817173                foreach (T item in elements)
 131366174                {
 131366175                    if (cancellationToken.IsCancellationRequested)
 1176                    {
 1177                        yield break;
 178                    }
 131365179                    yield return item;
 131364180                }
 181
 27182                if (readResult.IsCompleted)
 17183                {
 17184                    yield break;
 185                }
 10186            }
 187        }
 188        finally
 25189        {
 25190            reader.Complete();
 25191        }
 23192    }
 193}