< Summary

Information
Class: IceRpc.Slice.Operations.Internal.AsyncStream<T>
Assembly: IceRpc.Slice
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Slice/Operations/Internal/AsyncStream.cs
Tag: 1856_27024993493
Line coverage
97%
Covered lines: 67
Uncovered lines: 2
Coverable lines: 69
Total lines: 164
Line coverage: 97.1%
Branch coverage
92%
Covered branches: 13
Total branches: 14
Branch coverage: 92.8%
Method coverage
100%
Covered methods: 4
Fully covered methods: 3
Total methods: 4
Method coverage: 100%
Full method coverage: 75%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.ctor(...)100%11100%
Dispose()100%44100%
GetAsyncEnumerator(...)100%22100%
EnumerateAsync()87.5%8890.47%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Slice/Operations/Internal/AsyncStream.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using System.Buffers;
 4using System.Diagnostics;
 5using System.IO.Pipelines;
 6using System.Runtime.CompilerServices;
 7
 8namespace IceRpc.Slice.Operations.Internal;
 9
 10/// <summary>The default <see cref="IAsyncStream{T}" /> implementation. It wraps a <see cref="PipeReader" /> and
 11/// decodes its bytes into elements of type <typeparamref name="T"/> using a read function and a decode function.
 12/// </summary>
 13internal sealed class AsyncStream<T> : IAsyncStream<T>
 14{
 15    private readonly PipeReader _reader;
 16    private readonly Func<PipeReader, CancellationToken, ValueTask<ReadResult>> _readFunc;
 17    private readonly Func<ReadOnlySequence<byte>, IEnumerable<T>> _decodeBufferFunc;
 18
 19    // Canceled by Dispose when iteration has started, to unblock any pending ReadAsync.
 23520    private readonly CancellationTokenSource _disposeCts = new();
 21
 22    // Set when GetAsyncEnumerator is called. This enforces the single-enumerator contract even if the created
 23    // enumerator is never advanced.
 24    private bool _enumeratorCreated;
 25
 26    // Atomic state used to safely arbitrate ownership of _reader.Complete() between Dispose and the first
 27    // MoveNextAsync.
 28    private int _state;
 29
 30    public void Dispose()
 22331    {
 22332        int original = Interlocked.Exchange(ref _state, (int)State.Disposed);
 33
 22334        switch ((State)original)
 35        {
 36            case State.Initial:
 37                // No iteration could have started (and any future MoveNextAsync will see Disposed and throw).
 38                // Safe to complete the reader directly from this thread.
 939                _reader.Complete();
 940                _disposeCts.Dispose();
 941                break;
 42
 43            case State.Iterating:
 44                // The iterator owns the reader; its finally will complete it. We only signal cancellation here.
 45                // We must not dispose _disposeCts here: a linked CTS inside the iterator may still hold a
 46                // registration on _disposeCts.Token.
 21247                _disposeCts.Cancel();
 21248                break;
 49
 50            case State.Disposed:
 51                // no-op (Dispose called more than once).
 252                break;
 53        }
 22354    }
 55
 56    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken)
 23357    {
 58        // We don't check for Disposed here: if the stream was disposed, the first MoveNextAsync call on the
 59        // returned enumerator throws ObjectDisposedException (see EnumerateAsync).
 23360        if (_enumeratorCreated)
 161        {
 162            throw new InvalidOperationException($"An {nameof(IAsyncStream<T>)} can only be enumerated once.");
 63        }
 23264        _enumeratorCreated = true;
 23265        return EnumerateAsync(cancellationToken).GetAsyncEnumerator(cancellationToken);
 23266    }
 67
 23568    internal AsyncStream(
 23569        PipeReader reader,
 23570        Func<PipeReader, CancellationToken, ValueTask<ReadResult>> readFunc,
 23571        Func<ReadOnlySequence<byte>, IEnumerable<T>> decodeBufferFunc)
 23572    {
 23573        _reader = reader;
 23574        _readFunc = readFunc;
 23575        _decodeBufferFunc = decodeBufferFunc;
 23576    }
 77
 78    private async IAsyncEnumerable<T> EnumerateAsync([EnumeratorCancellation] CancellationToken cancellationToken)
 23079    {
 80        // Because this async method returns an IAsyncEnumerable<T>, it only starts executing when the caller starts
 81        // iterating (calls MoveNextAsync on the enumerator). It does not execute when EnumerateAsync is called, or
 82        // even when GetAsyncEnumerator is called on the returned IAsyncEnumerable<T>.
 83
 84        // Atomically claim the reader (Idle -> Iterating). This races with Dispose's atomic transition to Disposed;
 85        // whichever transition wins from Idle owns _reader.Complete().
 23086        int original = Interlocked.CompareExchange(ref _state, (int)State.Iterating, (int)State.Initial);
 23087        ObjectDisposedException.ThrowIf(original == (int)State.Disposed, this);
 22588        Debug.Assert(original == (int)State.Initial); // _enumeratorCreated forbids a second iteration.
 89
 90        // Link the caller-provided token with our internal dispose token so that Dispose can unblock ReadAsync.
 22591        using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(
 22592            cancellationToken,
 22593            _disposeCts.Token);
 22594        CancellationToken linkedToken = linkedCts.Token;
 95
 96        try
 22597        {
 23598            while (true)
 23599            {
 100                ReadResult readResult;
 101
 102                try
 235103                {
 235104                    readResult = await _readFunc(_reader, linkedToken).ConfigureAwait(false);
 105
 36106                    if (readResult.IsCanceled)
 0107                    {
 108                        // We never call CancelPendingRead; an interceptor or middleware can but it's not correct.
 0109                        throw new InvalidOperationException("Unexpected call to CancelPendingRead.");
 110                    }
 36111                    if (readResult.Buffer.IsEmpty)
 3112                    {
 3113                        Debug.Assert(readResult.IsCompleted);
 3114                        yield break;
 115                    }
 33116                }
 199117                catch (OperationCanceledException) when (linkedToken.IsCancellationRequested)
 199118                {
 119                    // Re-issue the cancellation with the caller's token so the OCE that propagates carries the
 120                    // token the caller passed in (not our internal linkedToken). When dispose is the only source,
 121                    // surface dispose-mid-iteration as ObjectDisposedException.
 199122                    cancellationToken.ThrowIfCancellationRequested();
 123
 124                    // Safe to read _state without a barrier: Dispose writes State.Disposed before calling
 125                    // _disposeCts.Cancel(), and observing the cancellation here establishes happens-before
 126                    // with that write.
 197127                    Debug.Assert(_state == (int)State.Disposed);
 197128                    throw new ObjectDisposedException(nameof(AsyncStream<>), "The stream was disposed while reading.");
 129                }
 130
 33131                IEnumerable<T> elements = _decodeBufferFunc(readResult.Buffer);
 31132                _reader.AdvanceTo(readResult.Buffer.End);
 133
 262830134                foreach (T item in elements)
 131370135                {
 131370136                    cancellationToken.ThrowIfCancellationRequested();
 137
 138                    // No memory barrier needed: this read is just an early-out optimization. If we miss a
 139                    // concurrent transition to Disposed, the next ReadAsync call observes the cancellation of
 140                    // _disposeCts (which has its own synchronization) and we surface ObjectDisposedException
 141                    // from the catch block above.
 131369142                    ObjectDisposedException.ThrowIf(_state == (int)State.Disposed, this);
 131369143                    yield return item;
 131367144                }
 145
 28146                if (readResult.IsCompleted)
 18147                {
 18148                    yield break;
 149                }
 10150            }
 151        }
 152        finally
 225153        {
 225154            _reader.Complete();
 225155        }
 23156    }
 157
 158    private enum State
 159    {
 160        Initial = 0,
 161        Iterating = 1,
 162        Disposed = 2
 163    }
 164}