| | 1 | | // Copyright (c) ZeroC, Inc. |
| | 2 | |
|
| | 3 | | using IceRpc.Internal; |
| | 4 | | using System.Buffers; |
| | 5 | | using System.Diagnostics; |
| | 6 | | using System.IO.Pipelines; |
| | 7 | |
|
| | 8 | | namespace IceRpc.Transports.Slic.Internal; |
| | 9 | |
|
| | 10 | | // Type owns disposable field(s) '_completeWritesCts' and '_sendCreditSemaphore' but is not disposable |
| | 11 | | #pragma warning disable CA1001 |
| | 12 | | internal class SlicPipeWriter : ReadOnlySequencePipeWriter |
| | 13 | | #pragma warning restore CA1001 |
| | 14 | | { |
| 2 | 15 | | public override bool CanGetUnflushedBytes => true; |
| | 16 | |
|
| 7094 | 17 | | public override long UnflushedBytes => _pipe.Writer.UnflushedBytes; |
| | 18 | |
|
| | 19 | | // We can avoid disposing _completeWritesCts because it was not created using CreateLinkedTokenSource, and it |
| | 20 | | // doesn't use a timer. It is not easy to dispose it because CompleteWrites can be called by another thread after |
| | 21 | | // Complete has been called. |
| 5316 | 22 | | private readonly CancellationTokenSource _completeWritesCts = new(); |
| | 23 | | private Exception? _exception; |
| | 24 | | private bool _isCompleted; |
| 5316 | 25 | | private volatile int _peerWindowSize = SlicTransportOptions.MaxWindowSize; |
| | 26 | | private readonly Pipe _pipe; |
| | 27 | | // The semaphore is used when flow control is enabled to wait for additional send credit to be available. |
| 5316 | 28 | | private readonly SemaphoreSlim _sendCreditSemaphore = new(1, 1); |
| | 29 | | private readonly SlicStream _stream; |
| | 30 | |
|
| | 31 | | public override void Advance(int bytes) |
| 18956 | 32 | | { |
| 18956 | 33 | | if (_isCompleted) |
| 0 | 34 | | { |
| 0 | 35 | | throw new InvalidOperationException("Writing is not allowed once the writer is completed."); |
| | 36 | | } |
| 18956 | 37 | | _pipe.Writer.Advance(bytes); |
| 18956 | 38 | | } |
| | 39 | |
|
| | 40 | | // SlicPipeWriter does not support this method: the IceRPC core does not need it. And when the application code |
| | 41 | | // installs a payload writer interceptor, this interceptor should never call it on "next". |
| 0 | 42 | | public override void CancelPendingFlush() => throw new NotSupportedException(); |
| | 43 | |
|
| | 44 | | public override void Complete(Exception? exception = null) |
| 6023 | 45 | | { |
| 6023 | 46 | | if (!_isCompleted) |
| 5262 | 47 | | { |
| 5262 | 48 | | _isCompleted = true; |
| | 49 | |
|
| 5262 | 50 | | if (exception is null && _pipe.Writer.UnflushedBytes > 0) |
| 2 | 51 | | { |
| 2 | 52 | | throw new InvalidOperationException( |
| 2 | 53 | | $"Completing a {nameof(SlicPipeWriter)} without an exception is not allowed when this pipe writer ha |
| | 54 | | } |
| | 55 | |
|
| | 56 | | // If the exception is set, forcefully close the stream writes if writes were not already gracefully closed |
| | 57 | | // by WriteAsync called with endStream=true. Otherwise, if exception is null, writes are gracefully closed. |
| 5260 | 58 | | _stream.CloseWrites(graceful: exception is null); |
| | 59 | |
|
| 5260 | 60 | | _pipe.Writer.Complete(); |
| 5260 | 61 | | _pipe.Reader.Complete(); |
| | 62 | |
|
| | 63 | | // Don't dispose the semaphore. It's not needed and we don't want to have to catch ObjectDisposedException |
| | 64 | | // from AdjustPeerWindowSize if a StreamWindowUpdate is received after the application completed the stream |
| | 65 | | // output. An alternative would be to add a lock but it's a bit overkill given than disposing the semaphore |
| | 66 | | // is only useful when using SemaphoreSlim.AvailableWaitHandle. |
| | 67 | | // _sendCreditSemaphore.Dispose(); |
| 5260 | 68 | | } |
| 6021 | 69 | | } |
| | 70 | |
|
| | 71 | | public override ValueTask<FlushResult> FlushAsync(CancellationToken cancellationToken) => |
| | 72 | | // WriteAsync will flush the internal buffer |
| 792 | 73 | | WriteAsync(ReadOnlySequence<byte>.Empty, endStream: false, cancellationToken); |
| | 74 | |
|
| 4 | 75 | | public override Memory<byte> GetMemory(int sizeHint) => _pipe.Writer.GetMemory(sizeHint); |
| | 76 | |
|
| 18956 | 77 | | public override Span<byte> GetSpan(int sizeHint) => _pipe.Writer.GetSpan(sizeHint); |
| | 78 | |
|
| | 79 | | public override ValueTask<FlushResult> WriteAsync( |
| | 80 | | ReadOnlyMemory<byte> source, |
| | 81 | | CancellationToken cancellationToken) => |
| 11278 | 82 | | WriteAsync(new ReadOnlySequence<byte>(source), endStream: false, cancellationToken); |
| | 83 | |
|
| | 84 | | public override async ValueTask<FlushResult> WriteAsync( |
| | 85 | | ReadOnlySequence<byte> source, |
| | 86 | | bool endStream, |
| | 87 | | CancellationToken cancellationToken) |
| 15614 | 88 | | { |
| 15614 | 89 | | if (_isCompleted) |
| 0 | 90 | | { |
| 0 | 91 | | throw new InvalidOperationException("Writing is not allowed once the writer is completed."); |
| | 92 | | } |
| | 93 | |
|
| | 94 | | // Flush the pipe before the check for the close connection. This makes sure that the check for unflushed data |
| | 95 | | // on successful compete succeeds. See the Complete implementation above. |
| 15614 | 96 | | if (_pipe.Writer.UnflushedBytes > 0) |
| 4299 | 97 | | { |
| 4299 | 98 | | await _pipe.Writer.FlushAsync(CancellationToken.None).ConfigureAwait(false); |
| 4299 | 99 | | } |
| | 100 | |
|
| 15614 | 101 | | _stream.ThrowIfConnectionClosed(); |
| | 102 | |
|
| | 103 | | // Abort the stream if the invocation is canceled. |
| 15604 | 104 | | using CancellationTokenRegistration cancelTokenRegistration = cancellationToken.UnsafeRegister( |
| 8 | 105 | | cts => ((CancellationTokenSource)cts!).Cancel(), |
| 15604 | 106 | | _completeWritesCts); |
| | 107 | |
|
| | 108 | | ReadOnlySequence<byte> source1; |
| | 109 | | ReadOnlySequence<byte> source2; |
| 15604 | 110 | | if (_pipe.Reader.TryRead(out ReadResult readResult)) |
| 4299 | 111 | | { |
| 4299 | 112 | | Debug.Assert(!readResult.IsCanceled && !readResult.IsCompleted && readResult.Buffer.Length > 0); |
| 4299 | 113 | | source1 = readResult.Buffer; |
| 4299 | 114 | | source2 = source; |
| 4299 | 115 | | } |
| | 116 | | else |
| 11305 | 117 | | { |
| 11305 | 118 | | source1 = source; |
| 11305 | 119 | | source2 = ReadOnlySequence<byte>.Empty; |
| 11305 | 120 | | } |
| | 121 | |
|
| 15604 | 122 | | if (source1.IsEmpty && source2.IsEmpty && !endStream) |
| 9 | 123 | | { |
| | 124 | | // WriteAsync is called with an empty buffer, typically by a call to FlushAsync. Some payload writers such |
| | 125 | | // as the deflate compressor might do this. |
| 9 | 126 | | return new FlushResult(isCanceled: false, isCompleted: false); |
| | 127 | | } |
| | 128 | |
|
| | 129 | | try |
| 15595 | 130 | | { |
| 15595 | 131 | | return await _stream.WriteStreamFrameAsync( |
| 15595 | 132 | | source1, |
| 15595 | 133 | | source2, |
| 15595 | 134 | | endStream, |
| 15595 | 135 | | _completeWritesCts.Token).ConfigureAwait(false); |
| | 136 | | } |
| 2027 | 137 | | catch (OperationCanceledException) |
| 2027 | 138 | | { |
| 2027 | 139 | | cancellationToken.ThrowIfCancellationRequested(); |
| 2019 | 140 | | return _exception is null ? |
| 2019 | 141 | | new FlushResult(isCanceled: false, isCompleted: true) : |
| 2019 | 142 | | throw ExceptionUtil.Throw(_exception); |
| | 143 | | } |
| | 144 | | finally |
| 15595 | 145 | | { |
| 15595 | 146 | | if (readResult.Buffer.Length > 0) |
| 4299 | 147 | | { |
| 4299 | 148 | | _pipe.Reader.AdvanceTo(readResult.Buffer.End); |
| | 149 | |
|
| | 150 | | // Make sure there's no more data to consume from the pipe. |
| 4299 | 151 | | Debug.Assert(!_pipe.Reader.TryRead(out ReadResult _)); |
| 4299 | 152 | | } |
| 15595 | 153 | | } |
| 15596 | 154 | | } |
| | 155 | |
|
| 5316 | 156 | | internal SlicPipeWriter(SlicStream stream, SlicConnection connection) |
| 5316 | 157 | | { |
| 5316 | 158 | | _stream = stream; |
| 5316 | 159 | | _peerWindowSize = connection.PeerInitialStreamWindowSize; |
| | 160 | |
|
| | 161 | | // Create a pipe that never pauses on flush or write. The SlicePipeWriter will pause the flush or write if |
| | 162 | | // the Slic flow control doesn't permit sending more data. |
| | 163 | | // The readerScheduler doesn't matter (we don't call _pipe.Reader.ReadAsync) and the writerScheduler doesn't |
| | 164 | | // matter (_pipe.Writer.FlushAsync never blocks). |
| 5316 | 165 | | _pipe = new(new PipeOptions( |
| 5316 | 166 | | pool: connection.Pool, |
| 5316 | 167 | | minimumSegmentSize: connection.MinSegmentSize, |
| 5316 | 168 | | pauseWriterThreshold: 0, |
| 5316 | 169 | | useSynchronizationContext: false)); |
| 5316 | 170 | | } |
| | 171 | |
|
| | 172 | | /// <summary>Acquires send credit.</summary> |
| | 173 | | /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param> |
| | 174 | | /// <returns>The available send credit.</returns> |
| | 175 | | /// <remarks>The send credit matches the size of the peer's flow-control window.</remarks> |
| | 176 | | internal async ValueTask<int> AcquireSendCreditAsync(CancellationToken cancellationToken) |
| 20176 | 177 | | { |
| | 178 | | // Acquire the semaphore to ensure flow control allows sending additional data. It's important to acquire the |
| | 179 | | // semaphore before checking the peer window size. The semaphore acquisition will block if we can't send |
| | 180 | | // additional data (_peerWindowSize <= 0). |
| 20176 | 181 | | await _sendCreditSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false); |
| 18149 | 182 | | return _peerWindowSize; |
| 18149 | 183 | | } |
| | 184 | |
|
| | 185 | | /// <summary>Complete writes.</summary> |
| | 186 | | /// <param name="exception">The exception that will be raised by <see cref="PipeWriter.WriteAsync" /> or <see |
| | 187 | | /// cref="FlushAsync" />.</param> |
| | 188 | | internal void CompleteWrites(Exception? exception) |
| 3290 | 189 | | { |
| 3290 | 190 | | Interlocked.CompareExchange(ref _exception, exception, null); |
| 3290 | 191 | | _completeWritesCts.Cancel(); |
| 3290 | 192 | | } |
| | 193 | |
|
| | 194 | | /// <summary>Notifies the writer of the amount of send credit consumed by the sending of a stream frame.</summary> |
| | 195 | | /// <param name="size">The size of the stream frame.</param> |
| | 196 | | internal void ConsumedSendCredit(int size) |
| 18149 | 197 | | { |
| 18149 | 198 | | Debug.Assert(_sendCreditSemaphore.CurrentCount == 0); // Can only be called with the semaphore acquired. |
| | 199 | |
|
| | 200 | | // Release the semaphore if the peer's window size is still superior to 0 |
| 18149 | 201 | | int newPeerWindowSize = Interlocked.Add(ref _peerWindowSize, -size); |
| 18149 | 202 | | if (newPeerWindowSize > 0) |
| 14909 | 203 | | { |
| 14909 | 204 | | _sendCreditSemaphore.Release(); |
| 14909 | 205 | | } |
| 18149 | 206 | | } |
| | 207 | |
|
| | 208 | | /// <summary>Notifies the writer of the reception of a <see cref="FrameType.StreamWindowUpdate" /> frame.</summary> |
| | 209 | | /// <param name="size">The window size increment.</param> |
| | 210 | | internal void ReceivedWindowUpdateFrame(int size) |
| 2064 | 211 | | { |
| 2064 | 212 | | Debug.Assert(size > 0); |
| | 213 | |
|
| 2064 | 214 | | int newPeerWindowSize = Interlocked.Add(ref _peerWindowSize, size); |
| 2064 | 215 | | if (newPeerWindowSize > SlicTransportOptions.MaxWindowSize) |
| 0 | 216 | | { |
| 0 | 217 | | throw new IceRpcException( |
| 0 | 218 | | IceRpcError.IceRpcError, |
| 0 | 219 | | $"The window update is trying to increase the window size to a value larger than allowed."); |
| | 220 | | } |
| | 221 | |
|
| 2064 | 222 | | int previousPeerWindowSize = newPeerWindowSize - size; |
| | 223 | |
|
| | 224 | | // A zero peer window size indicates that the last write consumed all the send credit and as a result didn't |
| | 225 | | // release the semaphore. We can now release the semaphore to allow another write to send data. |
| 2064 | 226 | | if (previousPeerWindowSize == 0) |
| 1222 | 227 | | { |
| 1222 | 228 | | Debug.Assert(_sendCreditSemaphore.CurrentCount == 0); |
| 1222 | 229 | | _sendCreditSemaphore.Release(); |
| 1222 | 230 | | } |
| 2064 | 231 | | } |
| | 232 | | } |