| | | 1 | | // Copyright (c) ZeroC, Inc. |
| | | 2 | | |
| | | 3 | | using System.Buffers; |
| | | 4 | | using System.Diagnostics; |
| | | 5 | | using System.IO.Pipelines; |
| | | 6 | | using System.Runtime.CompilerServices; |
| | | 7 | | |
| | | 8 | | namespace IceRpc.Slice.Operations.Internal; |
| | | 9 | | |
| | | 10 | | /// <summary>The default <see cref="IAsyncStream{T}" /> implementation. It wraps a <see cref="PipeReader" /> and |
| | | 11 | | /// decodes its bytes into elements of type <typeparamref name="T"/> using a read function and a decode function. |
| | | 12 | | /// </summary> |
| | | 13 | | internal sealed class AsyncStream<T> : IAsyncStream<T> |
| | | 14 | | { |
| | | 15 | | private readonly PipeReader _reader; |
| | | 16 | | private readonly Func<PipeReader, CancellationToken, ValueTask<ReadResult>> _readFunc; |
| | | 17 | | private readonly Func<ReadOnlySequence<byte>, IEnumerable<T>> _decodeBufferFunc; |
| | | 18 | | |
| | | 19 | | // Canceled by Dispose when iteration has started, to unblock any pending ReadAsync. |
| | 235 | 20 | | private readonly CancellationTokenSource _disposeCts = new(); |
| | | 21 | | |
| | | 22 | | // Set when GetAsyncEnumerator is called. This enforces the single-enumerator contract even if the created |
| | | 23 | | // enumerator is never advanced. |
| | | 24 | | private bool _enumeratorCreated; |
| | | 25 | | |
| | | 26 | | // Atomic state used to safely arbitrate ownership of _reader.Complete() between Dispose and the first |
| | | 27 | | // MoveNextAsync. |
| | | 28 | | private int _state; |
| | | 29 | | |
| | | 30 | | public void Dispose() |
| | 223 | 31 | | { |
| | 223 | 32 | | int original = Interlocked.Exchange(ref _state, (int)State.Disposed); |
| | | 33 | | |
| | 223 | 34 | | switch ((State)original) |
| | | 35 | | { |
| | | 36 | | case State.Initial: |
| | | 37 | | // No iteration could have started (and any future MoveNextAsync will see Disposed and throw). |
| | | 38 | | // Safe to complete the reader directly from this thread. |
| | 9 | 39 | | _reader.Complete(); |
| | 9 | 40 | | _disposeCts.Dispose(); |
| | 9 | 41 | | break; |
| | | 42 | | |
| | | 43 | | case State.Iterating: |
| | | 44 | | // The iterator owns the reader; its finally will complete it. We only signal cancellation here. |
| | | 45 | | // We must not dispose _disposeCts here: a linked CTS inside the iterator may still hold a |
| | | 46 | | // registration on _disposeCts.Token. |
| | 212 | 47 | | _disposeCts.Cancel(); |
| | 212 | 48 | | break; |
| | | 49 | | |
| | | 50 | | case State.Disposed: |
| | | 51 | | // no-op (Dispose called more than once). |
| | 2 | 52 | | break; |
| | | 53 | | } |
| | 223 | 54 | | } |
| | | 55 | | |
| | | 56 | | public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken) |
| | 233 | 57 | | { |
| | | 58 | | // We don't check for Disposed here: if the stream was disposed, the first MoveNextAsync call on the |
| | | 59 | | // returned enumerator throws ObjectDisposedException (see EnumerateAsync). |
| | 233 | 60 | | if (_enumeratorCreated) |
| | 1 | 61 | | { |
| | 1 | 62 | | throw new InvalidOperationException($"An {nameof(IAsyncStream<T>)} can only be enumerated once."); |
| | | 63 | | } |
| | 232 | 64 | | _enumeratorCreated = true; |
| | 232 | 65 | | return EnumerateAsync(cancellationToken).GetAsyncEnumerator(cancellationToken); |
| | 232 | 66 | | } |
| | | 67 | | |
| | 235 | 68 | | internal AsyncStream( |
| | 235 | 69 | | PipeReader reader, |
| | 235 | 70 | | Func<PipeReader, CancellationToken, ValueTask<ReadResult>> readFunc, |
| | 235 | 71 | | Func<ReadOnlySequence<byte>, IEnumerable<T>> decodeBufferFunc) |
| | 235 | 72 | | { |
| | 235 | 73 | | _reader = reader; |
| | 235 | 74 | | _readFunc = readFunc; |
| | 235 | 75 | | _decodeBufferFunc = decodeBufferFunc; |
| | 235 | 76 | | } |
| | | 77 | | |
| | | 78 | | private async IAsyncEnumerable<T> EnumerateAsync([EnumeratorCancellation] CancellationToken cancellationToken) |
| | 230 | 79 | | { |
| | | 80 | | // Because this async method returns an IAsyncEnumerable<T>, it only starts executing when the caller starts |
| | | 81 | | // iterating (calls MoveNextAsync on the enumerator). It does not execute when EnumerateAsync is called, or |
| | | 82 | | // even when GetAsyncEnumerator is called on the returned IAsyncEnumerable<T>. |
| | | 83 | | |
| | | 84 | | // Atomically claim the reader (Idle -> Iterating). This races with Dispose's atomic transition to Disposed; |
| | | 85 | | // whichever transition wins from Idle owns _reader.Complete(). |
| | 230 | 86 | | int original = Interlocked.CompareExchange(ref _state, (int)State.Iterating, (int)State.Initial); |
| | 230 | 87 | | ObjectDisposedException.ThrowIf(original == (int)State.Disposed, this); |
| | 225 | 88 | | Debug.Assert(original == (int)State.Initial); // _enumeratorCreated forbids a second iteration. |
| | | 89 | | |
| | | 90 | | // Link the caller-provided token with our internal dispose token so that Dispose can unblock ReadAsync. |
| | 225 | 91 | | using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource( |
| | 225 | 92 | | cancellationToken, |
| | 225 | 93 | | _disposeCts.Token); |
| | 225 | 94 | | CancellationToken linkedToken = linkedCts.Token; |
| | | 95 | | |
| | | 96 | | try |
| | 225 | 97 | | { |
| | 235 | 98 | | while (true) |
| | 235 | 99 | | { |
| | | 100 | | ReadResult readResult; |
| | | 101 | | |
| | | 102 | | try |
| | 235 | 103 | | { |
| | 235 | 104 | | readResult = await _readFunc(_reader, linkedToken).ConfigureAwait(false); |
| | | 105 | | |
| | 36 | 106 | | if (readResult.IsCanceled) |
| | 0 | 107 | | { |
| | | 108 | | // We never call CancelPendingRead; an interceptor or middleware can but it's not correct. |
| | 0 | 109 | | throw new InvalidOperationException("Unexpected call to CancelPendingRead."); |
| | | 110 | | } |
| | 36 | 111 | | if (readResult.Buffer.IsEmpty) |
| | 3 | 112 | | { |
| | 3 | 113 | | Debug.Assert(readResult.IsCompleted); |
| | 3 | 114 | | yield break; |
| | | 115 | | } |
| | 33 | 116 | | } |
| | 199 | 117 | | catch (OperationCanceledException) when (linkedToken.IsCancellationRequested) |
| | 199 | 118 | | { |
| | | 119 | | // Re-issue the cancellation with the caller's token so the OCE that propagates carries the |
| | | 120 | | // token the caller passed in (not our internal linkedToken). When dispose is the only source, |
| | | 121 | | // surface dispose-mid-iteration as ObjectDisposedException. |
| | 199 | 122 | | cancellationToken.ThrowIfCancellationRequested(); |
| | | 123 | | |
| | | 124 | | // Safe to read _state without a barrier: Dispose writes State.Disposed before calling |
| | | 125 | | // _disposeCts.Cancel(), and observing the cancellation here establishes happens-before |
| | | 126 | | // with that write. |
| | 197 | 127 | | Debug.Assert(_state == (int)State.Disposed); |
| | 197 | 128 | | throw new ObjectDisposedException(nameof(AsyncStream<>), "The stream was disposed while reading."); |
| | | 129 | | } |
| | | 130 | | |
| | 33 | 131 | | IEnumerable<T> elements = _decodeBufferFunc(readResult.Buffer); |
| | 31 | 132 | | _reader.AdvanceTo(readResult.Buffer.End); |
| | | 133 | | |
| | 262830 | 134 | | foreach (T item in elements) |
| | 131370 | 135 | | { |
| | 131370 | 136 | | cancellationToken.ThrowIfCancellationRequested(); |
| | | 137 | | |
| | | 138 | | // No memory barrier needed: this read is just an early-out optimization. If we miss a |
| | | 139 | | // concurrent transition to Disposed, the next ReadAsync call observes the cancellation of |
| | | 140 | | // _disposeCts (which has its own synchronization) and we surface ObjectDisposedException |
| | | 141 | | // from the catch block above. |
| | 131369 | 142 | | ObjectDisposedException.ThrowIf(_state == (int)State.Disposed, this); |
| | 131369 | 143 | | yield return item; |
| | 131367 | 144 | | } |
| | | 145 | | |
| | 28 | 146 | | if (readResult.IsCompleted) |
| | 18 | 147 | | { |
| | 18 | 148 | | yield break; |
| | | 149 | | } |
| | 10 | 150 | | } |
| | | 151 | | } |
| | | 152 | | finally |
| | 225 | 153 | | { |
| | 225 | 154 | | _reader.Complete(); |
| | 225 | 155 | | } |
| | 23 | 156 | | } |
| | | 157 | | |
| | | 158 | | private enum State |
| | | 159 | | { |
| | | 160 | | Initial = 0, |
| | | 161 | | Iterating = 1, |
| | | 162 | | Disposed = 2 |
| | | 163 | | } |
| | | 164 | | } |