< Summary

Information
Class: IceRpc.Slice.AsyncEnumerableExtensions
Assembly: IceRpc.Slice
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Slice/AsyncEnumerableExtensions.cs
Tag: 275_13775359185
Line coverage
93%
Covered lines: 124
Uncovered lines: 8
Coverable lines: 132
Total lines: 243
Line coverage: 93.9%
Branch coverage
91%
Covered branches: 31
Total branches: 34
Branch coverage: 91.1%
Method coverage
80%
Covered methods: 8
Total methods: 10
Method coverage: 80%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
ToPipeReader(...)100%11100%
.ctor(...)75%4.02489.47%
AdvanceTo(...)100%11100%
AdvanceTo(...)100%210%
CancelPendingRead()100%11100%
Complete(...)100%22100%
DisposeEnumeratorAsync()100%2291.66%
ReadAsync()85.71%14.081492.5%
EncodeElements()100%1212100%
TryRead(...)100%210%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Slice/AsyncEnumerableExtensions.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Diagnostics;
 4using System.IO.Pipelines;
 5using ZeroC.Slice;
 6
 7namespace IceRpc.Slice;
 8
 9/// <summary>Provides an extension method for <see cref="IAsyncEnumerable{T}" /> to encode elements into a <see
 10/// cref="PipeReader"/>.</summary>
 11public static class AsyncEnumerableExtensions
 12{
 13    /// <summary>Encodes an async enumerable into a stream of bytes represented by a <see cref="PipeReader"/>.</summary>
 14    /// <typeparam name="T">The async enumerable element type.</typeparam>
 15    /// <param name="asyncEnumerable">The async enumerable to encode into a stream of bytes.</param>
 16    /// <param name="encodeAction">The action used to encode one element.</param>
 17    /// <param name="useSegments"><see langword="true" /> if an element can be encoded on a variable number of bytes;
 18    /// otherwise, <see langword="false" />.</param>
 19    /// <param name="encoding">The Slice encoding to use.</param>
 20    /// <param name="encodeOptions">The Slice encode options.</param>
 21    /// <returns>A pipe reader that represents the encoded stream of bytes.</returns>
 22    /// <remarks>This extension method is used to encode streaming parameters and streaming return values with the
 23    /// Slice2 encoding.</remarks>
 24    public static PipeReader ToPipeReader<T>(
 25        this IAsyncEnumerable<T> asyncEnumerable,
 26        EncodeAction<T> encodeAction,
 27        bool useSegments,
 28        SliceEncoding encoding = SliceEncoding.Slice2,
 29        SliceEncodeOptions? encodeOptions = null) =>
 2530        new AsyncEnumerablePipeReader<T>(
 2531            asyncEnumerable,
 2532            encodeAction,
 2533            useSegments,
 2534            encoding,
 2535            encodeOptions);
 36
 37    // Overriding ReadAtLeastAsyncCore or CopyToAsync methods for this reader is not critical since this reader is
 38    // mostly used by the IceRPC core to copy the encoded data for the enumerable to the network stream. This copy
 39    // doesn't use these methods.
 40#pragma warning disable CA1001 // Types that own disposable fields should be disposable.
 41    private class AsyncEnumerablePipeReader<T> : PipeReader
 42#pragma warning restore CA1001
 43    {
 44        // Disposed in Complete.
 45        private readonly IAsyncEnumerator<T> _asyncEnumerator;
 46
 47        // We don't dispose _cts because it's not necessary
 48        // (see https://github.com/dotnet/runtime/issues/29970#issuecomment-717840778) and we can't easily dispose it
 49        // when no one is using it since CancelPendingRead can be called by another thread after Complete is called.
 2550        private readonly CancellationTokenSource _cts = new();
 51        private readonly EncodeAction<T> _encodeAction;
 52        private readonly SliceEncoding _encoding;
 53        private bool _isCompleted;
 54        private readonly bool _useSegments;
 55        private readonly int _streamFlushThreshold;
 56        private Task<bool>? _moveNext;
 57        private readonly Pipe _pipe;
 58
 9659        public override void AdvanceTo(SequencePosition consumed) => _pipe.Reader.AdvanceTo(consumed);
 60
 61        public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) =>
 062            _pipe.Reader.AdvanceTo(consumed, examined);
 63
 64        public override void CancelPendingRead()
 165        {
 166            _pipe.Reader.CancelPendingRead();
 167            _cts.Cancel();
 168        }
 69
 70        public override void Complete(Exception? exception = null)
 2671        {
 2672            if (!_isCompleted)
 1973            {
 1974                _isCompleted = true;
 75
 76                // Cancel MoveNextAsync if it's still running.
 1977                _cts.Cancel();
 78
 1979                _pipe.Reader.Complete();
 1980                _pipe.Writer.Complete();
 81
 1982                _ = DisposeEnumeratorAsync();
 1983            }
 84
 85            async Task DisposeEnumeratorAsync()
 1986            {
 87                // Make sure MoveNextAsync is completed before disposing the enumerator. Calling DisposeAsync on the
 88                // enumerator while MoveNextAsync is still running is disallowed.
 1989                if (_moveNext is not null)
 290                {
 91                    try
 292                    {
 293                        _ = await _moveNext.ConfigureAwait(false);
 094                    }
 295                    catch
 296                    {
 297                    }
 298                }
 1999                await _asyncEnumerator.DisposeAsync().ConfigureAwait(false);
 19100            }
 26101        }
 102
 103        public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default)
 99104        {
 99105            if (!_pipe.Reader.TryRead(out ReadResult readResult))
 99106            {
 107                // If no more buffered data to read, fill the pipe with new data.
 108
 109                // If ReadAsync is canceled, cancel the enumerator iteration to ensure MoveNextAsync below completes.
 99110                using CancellationTokenRegistration registration = cancellationToken.UnsafeRegister(
 1111                    cts => ((CancellationTokenSource)cts!).Cancel(),
 99112                    _cts);
 113
 114                bool hasNext;
 115                try
 99116                {
 99117                    if (_moveNext is null)
 24118                    {
 24119                        hasNext = await _asyncEnumerator.MoveNextAsync().ConfigureAwait(false);
 22120                    }
 121                    else
 75122                    {
 75123                        hasNext = await _moveNext.ConfigureAwait(false);
 74124                        _moveNext = null;
 74125                    }
 126
 96127                    if (hasNext && EncodeElements() is Task<bool> moveNext)
 76128                    {
 129                        // Flush does not block because the pipe is configured to not pause flush.
 76130                        ValueTask<FlushResult> valueTask = _pipe.Writer.FlushAsync(CancellationToken.None);
 76131                        Debug.Assert(valueTask.IsCompletedSuccessfully);
 132
 76133                        _moveNext = moveNext;
 134                        // And the next ReadAsync will await _moveNext.
 76135                    }
 136                    else
 20137                    {
 138                        // No need to flush the writer, complete takes care of it.
 20139                        _pipe.Writer.Complete();
 20140                    }
 141
 142                    // There are bytes in the reader or it's completed since we've just flushed or completed the writer.
 96143                    bool ok = _pipe.Reader.TryRead(out readResult);
 96144                    Debug.Assert(ok);
 96145                }
 2146                catch (OperationCanceledException exception)
 2147                {
 2148                    Debug.Assert(exception.CancellationToken == _cts.Token);
 2149                    cancellationToken.ThrowIfCancellationRequested();
 150
 1151                    if (_pipe.Reader.TryRead(out readResult) && readResult.IsCanceled)
 1152                    {
 153                        // Ok: return canceled readResult once after calling CancelPendingRead.
 154                        // Note that we can't return a canceled read result with a bogus buffer since the caller must
 155                        // be able to call reader.AdvanceTo with this buffer.
 1156                    }
 157                    else
 0158                    {
 0159                        throw new NotSupportedException(
 0160                            "Cannot resume reading an AsyncEnumerablePipeReader after canceling a ReadAsync or calling C
 161                    }
 1162                }
 97163            }
 164
 97165            return readResult;
 166
 167            Task<bool>? EncodeElements()
 94168            {
 94169                var encoder = new SliceEncoder(_pipe.Writer, _encoding);
 170
 94171                Span<byte> sizePlaceholder = default;
 94172                if (_useSegments)
 59173                {
 59174                    sizePlaceholder = encoder.GetPlaceholderSpan(4);
 59175                }
 176
 94177                Task<bool>? result = null;
 178                bool keepEncoding;
 179
 180                do
 131337181                {
 131337182                    _encodeAction(ref encoder, _asyncEnumerator.Current);
 131337183                    ValueTask<bool> moveNext = _asyncEnumerator.MoveNextAsync();
 184
 131337185                    if (moveNext.IsCompletedSuccessfully)
 131323186                    {
 131323187                        bool hasNext = moveNext.Result;
 188
 189                        // If we reached the stream flush threshold, it's time to flush.
 131323190                        if (encoder.EncodedByteCount - sizePlaceholder.Length >= _streamFlushThreshold)
 63191                        {
 63192                            result = hasNext ? moveNext.AsTask() : null;
 63193                            keepEncoding = false;
 63194                        }
 195                        else
 131260196                        {
 131260197                            keepEncoding = hasNext;
 131260198                        }
 131323199                    }
 200                    else
 14201                    {
 202                        // If we can't get the next element synchronously, we return the move next task and end the loop
 203                        // to flush the encoded elements.
 14204                        result = moveNext.AsTask();
 14205                        keepEncoding = false;
 14206                    }
 131337207                }
 131337208                while (keepEncoding);
 209
 94210                if (_useSegments)
 59211                {
 59212                    SliceEncoder.EncodeVarUInt62(
 59213                        (ulong)(encoder.EncodedByteCount - sizePlaceholder.Length),
 59214                        sizePlaceholder);
 59215                }
 94216                return result;
 94217            }
 97218        }
 219
 0220        public override bool TryRead(out ReadResult result) => _pipe.Reader.TryRead(out result);
 221
 25222        internal AsyncEnumerablePipeReader(
 25223            IAsyncEnumerable<T> asyncEnumerable,
 25224            EncodeAction<T> encodeAction,
 25225            bool useSegments,
 25226            SliceEncoding encoding,
 25227            SliceEncodeOptions? encodeOptions)
 25228        {
 25229            if (encoding == SliceEncoding.Slice1)
 0230            {
 0231                throw new NotSupportedException("Streaming is not supported by the Slice1 encoding.");
 232            }
 233
 25234            encodeOptions ??= SliceEncodeOptions.Default;
 25235            _pipe = new Pipe(encodeOptions.PipeOptions);
 25236            _streamFlushThreshold = encodeOptions.StreamFlushThreshold;
 25237            _encodeAction = encodeAction;
 25238            _encoding = encoding;
 25239            _useSegments = useSegments;
 25240            _asyncEnumerator = asyncEnumerable.GetAsyncEnumerator(_cts.Token);
 25241        }
 242    }
 243}