< Summary

Information
Class: IceRpc.Slice.Operations.PipeReaderExtensions
Assembly: IceRpc.Slice
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Slice/Operations/PipeReaderExtensions.cs
Tag: 1856_27024993493
Line coverage
88%
Covered lines: 44
Uncovered lines: 6
Coverable lines: 50
Total lines: 130
Line coverage: 88%
Branch coverage
72%
Covered branches: 13
Total branches: 18
Branch coverage: 72.2%
Method coverage
100%
Covered methods: 8
Fully covered methods: 6
Total methods: 8
Method coverage: 100%
Full method coverage: 75%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.cctor()100%11100%
CreateEmptySliceStructPayload()100%11100%
ToAsyncStream(...)75%5462.5%
DecodeBuffer()100%22100%
ReadAsync()66.66%6676.92%
ToAsyncStream(...)50%44100%
DecodeBuffer()100%22100%
ReadAsync()100%11100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Slice/Operations/PipeReaderExtensions.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Features;
 4using IceRpc.Internal;
 5using IceRpc.Slice.Operations.Internal;
 6using System.Buffers;
 7using System.IO.Pipelines;
 8using ZeroC.Slice.Codec;
 9
 10namespace IceRpc.Slice.Operations;
 11
 12/// <summary>Provides extension methods for <see cref="PipeReader" />.</summary>
 13public static class PipeReaderExtensions
 14{
 15    // 4 = varuint62 encoding of the size (1)
 16    // 252 = varint32 encoding of the tag end marker (-1)
 117    private static readonly ReadOnlySequence<byte> _emptyStructPayload = new([4, 252]);
 18
 19    extension(PipeReader)
 20    {
 21        /// <summary>Creates a request or response payload holding an empty Slice struct.</summary>
 22        /// <returns>The payload.</returns>
 1223        public static PipeReader CreateEmptySliceStructPayload() => PipeReader.Create(_emptyStructPayload);
 24    }
 25
 26    /// <summary>Creates an async stream over a pipe reader to decode streamed elements.</summary>
 27    /// <typeparam name="T">The type of the element being decoded.</typeparam>
 28    /// <param name="reader">The pipe reader.</param>
 29    /// <param name="decodeFunc">The function used to decode the streamed member.</param>
 30    /// <param name="elementSize">The size in bytes of one element.</param>
 31    /// <param name="sliceFeature">The Slice feature to customize the decoding.</param>
 32    /// <returns>The async stream to decode and return the streamed elements.</returns>
 33    /// <exception cref="ArgumentException">Thrown if <paramref name="elementSize" /> is equal of inferior to
 34    /// <c>0</c>.</exception>
 35    /// <remarks>The reader ownership is transferred to the returned async stream. The caller should no longer use
 36    /// the reader after this call, and must dispose the returned async stream when done to release the reader.
 37    /// </remarks>
 38    public static IAsyncStream<T> ToAsyncStream<T>(
 39        this PipeReader reader,
 40        DecodeFunc<T> decodeFunc,
 41        int elementSize,
 42        ISliceFeature? sliceFeature = null)
 22243    {
 22244        if (elementSize <= 0)
 045        {
 046            reader.Complete();
 047            throw new ArgumentException("The element size must be greater than 0.", nameof(elementSize));
 48        }
 49
 22250        sliceFeature ??= SliceFeature.Default;
 22251        return new AsyncStream<T>(reader, ReadAsync, DecodeBuffer);
 52
 53        IEnumerable<T> DecodeBuffer(ReadOnlySequence<byte> buffer)
 2054        {
 55            // Since the elements are fixed-size, they can't contain service addresses hence baseProxy can remain null.
 2056            var decoder = new SliceDecoder(
 2057                buffer,
 2058                maxCollectionAllocation: sliceFeature.MaxCollectionAllocation);
 59
 2060            var items = new T[buffer.Length / elementSize];
 13143461            for (int i = 0; i < items.Length; ++i)
 6569862            {
 6569863                items[i] = decodeFunc(ref decoder);
 6569764            }
 1965            decoder.CheckEndOfBuffer();
 1966            return items;
 1967        }
 68
 69        async ValueTask<ReadResult> ReadAsync(PipeReader reader, CancellationToken cancellationToken)
 22170        {
 71            // Read the bytes for at least one element.
 72            // Note that the max number of bytes we can read in one shot is limited by the flow control of the
 73            // underlying transport.
 22174            ReadResult readResult = await reader.ReadAtLeastAsync(elementSize, cancellationToken).ConfigureAwait(false);
 75
 76            // Check if the buffer contains extra bytes that we need to remove.
 2277            ReadOnlySequence<byte> buffer = readResult.Buffer;
 2278            if (elementSize > 1 && buffer.Length > elementSize)
 1979            {
 1980                long extra = buffer.Length % elementSize;
 1981                if (extra > 0)
 082                {
 083                    buffer = buffer.Slice(0, buffer.Length - extra);
 084                    return new ReadResult(buffer, isCanceled: readResult.IsCanceled, isCompleted: false);
 85                }
 1986            }
 87
 88            // Return the read result as-is.
 2289            return readResult;
 2290        }
 22291    }
 92
 93    /// <summary>Creates an async stream over a pipe reader to decode variable size streamed elements.</summary>
 94    /// <typeparam name="T">The stream element type.</typeparam>
 95    /// <param name="reader">The pipe reader.</param>
 96    /// <param name="decodeFunc">The function used to decode the streamed member.</param>
 97    /// <param name="sender">The proxy that sent the request, if applicable.</param>
 98    /// <param name="sliceFeature">The slice feature to customize the decoding.</param>
 99    /// <returns>The async stream to decode and return the streamed members.</returns>
 100    /// <remarks>The reader ownership is transferred to the returned async stream. The caller should no longer use
 101    /// the reader after this call, and must dispose the returned async stream when done to release the reader.
 102    /// </remarks>
 103    public static IAsyncStream<T> ToAsyncStream<T>(
 104        this PipeReader reader,
 105        DecodeFunc<T> decodeFunc,
 106        ISliceProxy? sender = null,
 107        ISliceFeature? sliceFeature = null)
 13108    {
 13109        sliceFeature ??= SliceFeature.Default;
 13110        ISliceProxy? baseProxy = sliceFeature.BaseProxy ?? sender;
 13111        return new AsyncStream<T>(reader, ReadAsync, DecodeBuffer);
 112
 113        IEnumerable<T> DecodeBuffer(ReadOnlySequence<byte> buffer)
 13114        {
 13115            var decoder = new SliceDecoder(buffer, baseProxy, sliceFeature.MaxCollectionAllocation);
 116
 13117            var items = new List<T>();
 118            do
 65679119            {
 65679120                items.Add(decodeFunc(ref decoder));
 65678121            }
 65678122            while (decoder.Consumed < buffer.Length);
 123
 12124            return items;
 12125        }
 126
 127        ValueTask<ReadResult> ReadAsync(PipeReader reader, CancellationToken cancellationToken) =>
 14128            reader.ReadSliceSegmentAsync(sliceFeature.MaxSegmentSize, cancellationToken);
 13129    }
 130}