| | 1 | | // Copyright (c) ZeroC, Inc. |
| | 2 | |
|
| | 3 | | using IceRpc.Internal; |
| | 4 | | using IceRpc.Transports.Internal; |
| | 5 | | using System.Diagnostics; |
| | 6 | | using System.IO.Pipelines; |
| | 7 | |
|
| | 8 | | namespace IceRpc.Transports.Slic.Internal; |
| | 9 | |
|
| | 10 | | // The SlicPipeReader doesn't override ReadAtLeastAsyncCore or CopyToAsync methods because: |
| | 11 | | // - we can't forward the calls to the internal pipe reader since reading relies on the AdvanceTo implementation to send |
| | 12 | | // the StreamWindowUpdate frame once the data is examined, |
| | 13 | | // - the default implementation can't be much optimized. |
| | 14 | | internal class SlicPipeReader : PipeReader |
| | 15 | | { |
| | 16 | | private int _examined; |
| | 17 | | private volatile Exception? _exception; |
| | 18 | | private long _lastExaminedOffset; |
| | 19 | | private readonly Pipe _pipe; |
| | 20 | | private ReadResult _readResult; |
| | 21 | | // FlagEnumExtensions operations are used to update the state. These operations are atomic and don't require mutex |
| | 22 | | // locking. |
| | 23 | | private int _state; |
| | 24 | | private readonly SlicStream _stream; |
| | 25 | | private int _windowSize; |
| | 26 | |
|
| 6286 | 27 | | public override void AdvanceTo(SequencePosition consumed) => AdvanceTo(consumed, consumed); |
| | 28 | |
|
| | 29 | | public override void AdvanceTo(SequencePosition consumed, SequencePosition examined) |
| 12043 | 30 | | { |
| 12043 | 31 | | ThrowIfCompleted(); |
| | 32 | |
|
| 12043 | 33 | | long startOffset = _readResult.Buffer.GetOffset(_readResult.Buffer.Start); |
| 12043 | 34 | | long consumedOffset = _readResult.Buffer.GetOffset(consumed) - startOffset; |
| 12043 | 35 | | long examinedOffset = _readResult.Buffer.GetOffset(examined) - startOffset; |
| | 36 | |
|
| | 37 | | // Add the additional examined bytes to the examined bytes total. |
| 12043 | 38 | | _examined += (int)(examinedOffset - _lastExaminedOffset); |
| 12043 | 39 | | _lastExaminedOffset = examinedOffset - consumedOffset; |
| | 40 | |
|
| | 41 | | // If the number of examined bytes is superior to the window update threshold, notifies the stream of the window |
| | 42 | | // update. This will trigger the sending of a window update frame and allow the sender to send additional data. |
| 12043 | 43 | | if (_examined >= _stream.WindowUpdateThreshold) |
| 1987 | 44 | | { |
| 1987 | 45 | | Interlocked.Add(ref _windowSize, _examined); |
| 1987 | 46 | | _stream.WindowUpdate(_examined); |
| 1987 | 47 | | _examined = 0; |
| 1987 | 48 | | } |
| | 49 | |
|
| 12043 | 50 | | _pipe.Reader.AdvanceTo(consumed, examined); |
| 12043 | 51 | | } |
| | 52 | |
|
| 4 | 53 | | public override void CancelPendingRead() => _pipe.Reader.CancelPendingRead(); |
| | 54 | |
|
| | 55 | | public override void Complete(Exception? exception = null) |
| 5279 | 56 | | { |
| 5279 | 57 | | if (_state.TrySetFlag(State.Completed)) |
| 5237 | 58 | | { |
| | 59 | | // Forcefully close the stream reads if reads were not already gracefully closed by ReadAsync or TryRead. |
| 5237 | 60 | | _stream.CloseReads(graceful: false); |
| | 61 | |
|
| 5237 | 62 | | CompleteReads(exception: null); |
| | 63 | |
|
| 5237 | 64 | | _pipe.Reader.Complete(); |
| 5237 | 65 | | } |
| 5279 | 66 | | } |
| | 67 | |
|
| | 68 | | public override async ValueTask<ReadResult> ReadAsync(CancellationToken cancellationToken = default) |
| 12638 | 69 | | { |
| 12638 | 70 | | ThrowIfCompleted(); |
| | 71 | |
|
| 12638 | 72 | | if (_exception is not null) |
| 25 | 73 | | { |
| 25 | 74 | | _stream.ThrowIfConnectionClosed(); |
| 17 | 75 | | } |
| | 76 | |
|
| 12630 | 77 | | ReadResult result = await _pipe.Reader.ReadAsync(cancellationToken).ConfigureAwait(false); |
| 12118 | 78 | | if (result.IsCanceled) |
| 4 | 79 | | { |
| 4 | 80 | | return GetReadResult(result); |
| | 81 | | } |
| | 82 | |
|
| | 83 | | // Cache the read result for the implementation of AdvanceTo that needs to figure out how much data got examined |
| | 84 | | // and consumed. |
| 12114 | 85 | | _readResult = result; |
| | 86 | |
|
| | 87 | | // All the data from the peer is considered read at this point. It's time to close reads on the stream. This |
| | 88 | | // will write the StreamReadsClosed frame to the peer and allow it to release the stream semaphore. |
| 12114 | 89 | | if (result.IsCompleted) |
| 2448 | 90 | | { |
| 2448 | 91 | | _stream.CloseReads(graceful: true); |
| 2448 | 92 | | } |
| | 93 | |
|
| 12114 | 94 | | return result; |
| 12118 | 95 | | } |
| | 96 | |
|
| | 97 | | public override bool TryRead(out ReadResult result) |
| 71 | 98 | | { |
| 71 | 99 | | ThrowIfCompleted(); |
| | 100 | |
|
| 71 | 101 | | if (_exception is not null) |
| 0 | 102 | | { |
| 0 | 103 | | _stream.ThrowIfConnectionClosed(); |
| 0 | 104 | | } |
| | 105 | |
|
| 71 | 106 | | if (_pipe.Reader.TryRead(out result)) |
| 71 | 107 | | { |
| 71 | 108 | | if (result.IsCanceled) |
| 0 | 109 | | { |
| 0 | 110 | | result = GetReadResult(result); |
| 0 | 111 | | return true; |
| | 112 | | } |
| | 113 | |
|
| | 114 | | // Cache the read result for the implementation of AdvanceTo that needs to figure out how much data got |
| | 115 | | // examined and consumed. |
| 71 | 116 | | _readResult = result; |
| | 117 | |
|
| | 118 | | // All the data from the peer is considered read at this point. It's time to close reads on the stream. This |
| | 119 | | // will write the StreamReadsClosed frame to the peer and allow it to release the stream semaphore. |
| 71 | 120 | | if (result.IsCompleted) |
| 71 | 121 | | { |
| 71 | 122 | | _stream.CloseReads(graceful: true); |
| 71 | 123 | | } |
| 71 | 124 | | return true; |
| | 125 | | } |
| | 126 | | else |
| 0 | 127 | | { |
| 0 | 128 | | return false; |
| | 129 | | } |
| 71 | 130 | | } |
| | 131 | |
|
| 5297 | 132 | | internal SlicPipeReader(SlicStream stream, SlicConnection connection) |
| 5297 | 133 | | { |
| 5297 | 134 | | _stream = stream; |
| 5297 | 135 | | _windowSize = connection.InitialStreamWindowSize; |
| | 136 | |
|
| | 137 | | // We keep the default readerScheduler (ThreadPool) because the _pipe.Writer.FlushAsync executes in the |
| | 138 | | // "read loop task" and we don't want this task to continue into application code. The writerScheduler |
| | 139 | | // doesn't matter since _pipe.Writer.FlushAsync never blocks. |
| 5297 | 140 | | _pipe = new(new PipeOptions( |
| 5297 | 141 | | pool: connection.Pool, |
| 5297 | 142 | | pauseWriterThreshold: 0, |
| 5297 | 143 | | minimumSegmentSize: connection.MinSegmentSize, |
| 5297 | 144 | | useSynchronizationContext: false)); |
| 5297 | 145 | | } |
| | 146 | |
|
| | 147 | | /// <summary>Completes reads.</summary> |
| | 148 | | /// <param name="exception">The exception that will be raised by <see cref="ReadAsync" /> or <see cref="TryRead" /> |
| | 149 | | /// operation.</param> |
| | 150 | | internal void CompleteReads(Exception? exception) |
| 5930 | 151 | | { |
| 5930 | 152 | | Interlocked.CompareExchange(ref _exception, exception, null); |
| | 153 | |
|
| 5930 | 154 | | if (_state.TrySetFlag(State.PipeWriterCompleted)) |
| 5291 | 155 | | { |
| 5291 | 156 | | if (_state.HasFlag(State.PipeWriterInUse)) |
| 152 | 157 | | { |
| 152 | 158 | | _pipe.Reader.CancelPendingRead(); |
| 152 | 159 | | } |
| | 160 | | else |
| 5139 | 161 | | { |
| 5139 | 162 | | _pipe.Writer.Complete(exception); |
| 5139 | 163 | | } |
| 5291 | 164 | | } |
| 5930 | 165 | | } |
| | 166 | |
|
| | 167 | | /// <summary>Notifies the reader of the reception of a <see cref="FrameType.Stream" /> or <see |
| | 168 | | /// cref="FrameType.StreamLast" /> frame. The stream data is consumed from the connection and buffered by this |
| | 169 | | /// reader on its internal pipe.</summary> |
| | 170 | | /// <returns><see langword="true" /> if the data was consumed; otherwise, <see langword="false"/> if the reader was |
| | 171 | | /// completed by the application.</returns> |
| | 172 | | internal async ValueTask<bool> ReceivedDataFrameAsync( |
| | 173 | | int dataSize, |
| | 174 | | bool endStream, |
| | 175 | | CancellationToken cancellationToken) |
| 18915 | 176 | | { |
| 18915 | 177 | | if (dataSize == 0 && !endStream) |
| 0 | 178 | | { |
| 0 | 179 | | throw new IceRpcException( |
| 0 | 180 | | IceRpcError.IceRpcError, |
| 0 | 181 | | "An empty Slic stream frame is not allowed unless endStream is true."); |
| | 182 | | } |
| | 183 | |
|
| 18915 | 184 | | if (!_state.TrySetFlag(State.PipeWriterInUse)) |
| 0 | 185 | | { |
| 0 | 186 | | throw new InvalidOperationException( |
| 0 | 187 | | $"The {nameof(ReceivedDataFrameAsync)} operation is not thread safe."); |
| | 188 | | } |
| | 189 | |
|
| | 190 | | try |
| 18915 | 191 | | { |
| 18915 | 192 | | if (_state.HasFlag(State.PipeWriterCompleted)) |
| 1 | 193 | | { |
| 1 | 194 | | return false; // No bytes consumed because the application completed the stream input. |
| | 195 | | } |
| | 196 | |
|
| 18914 | 197 | | int newWindowSize = Interlocked.Add(ref _windowSize, -dataSize); |
| 18914 | 198 | | if (newWindowSize < 0) |
| 0 | 199 | | { |
| 0 | 200 | | throw new IceRpcException( |
| 0 | 201 | | IceRpcError.IceRpcError, |
| 0 | 202 | | "Received more data than flow control permits."); |
| | 203 | | } |
| | 204 | |
|
| | 205 | | // Fill the pipe writer with dataSize bytes. |
| 18914 | 206 | | await _stream.FillBufferWriterAsync( |
| 18914 | 207 | | _pipe.Writer, |
| 18914 | 208 | | dataSize, |
| 18914 | 209 | | cancellationToken).ConfigureAwait(false); |
| | 210 | |
|
| 18913 | 211 | | if (endStream) |
| 2375 | 212 | | { |
| 2375 | 213 | | _pipe.Writer.Complete(); |
| 2375 | 214 | | } |
| | 215 | | else |
| 16538 | 216 | | { |
| 16538 | 217 | | _ = await _pipe.Writer.FlushAsync(CancellationToken.None).ConfigureAwait(false); |
| 16538 | 218 | | } |
| | 219 | |
|
| 18913 | 220 | | return true; |
| | 221 | | } |
| | 222 | | finally |
| 18915 | 223 | | { |
| 18915 | 224 | | if (_state.HasFlag(State.PipeWriterCompleted)) |
| 153 | 225 | | { |
| | 226 | | // If the pipe writer has been completed while we were reading the data from the stream, we make sure to |
| | 227 | | // complete the writer now since Complete or CompleteWriter didn't do it. |
| 153 | 228 | | _pipe.Writer.Complete(_exception); |
| 153 | 229 | | } |
| 18915 | 230 | | _state.ClearFlag(State.PipeWriterInUse); |
| 18915 | 231 | | } |
| 18914 | 232 | | } |
| | 233 | |
|
| | 234 | | private ReadResult GetReadResult(ReadResult readResult) |
| 4 | 235 | | { |
| | 236 | | // This method is called by ReadAsync or TryRead when the read operation on _pipe.Reader returns a canceled read |
| | 237 | | // result (IsCanceled=true). The _pipe.Reader ReadAsync/TryRead operations can return a canceled read result for |
| | 238 | | // two different reasons: |
| | 239 | | // - the application called CancelPendingRead |
| | 240 | | // - the connection is closed while data is written on _pipe.Writer |
| 4 | 241 | | Debug.Assert(readResult.IsCanceled); |
| | 242 | |
|
| 4 | 243 | | if (_state.HasFlag(State.PipeWriterCompleted)) |
| 0 | 244 | | { |
| | 245 | | // The connection was closed while the pipe writer was in use. Either throw or return a non-canceled result |
| | 246 | | // depending on the completion exception. |
| 0 | 247 | | if (_exception is null) |
| 0 | 248 | | { |
| 0 | 249 | | return new ReadResult(readResult.Buffer, isCanceled: false, isCompleted: true); |
| | 250 | | } |
| | 251 | | else |
| 0 | 252 | | { |
| 0 | 253 | | throw ExceptionUtil.Throw(_exception); |
| | 254 | | } |
| | 255 | | } |
| | 256 | | else |
| 4 | 257 | | { |
| | 258 | | // The application called CancelPendingRead, return the read result as-is. |
| 4 | 259 | | return readResult; |
| | 260 | | } |
| 4 | 261 | | } |
| | 262 | |
|
| | 263 | | private void ThrowIfCompleted() |
| 24752 | 264 | | { |
| 24752 | 265 | | if (_state.HasFlag(State.Completed)) |
| 0 | 266 | | { |
| | 267 | | // If the reader is completed, the caller is bogus, it shouldn't call read operations after completing the |
| | 268 | | // pipe reader. |
| 0 | 269 | | throw new InvalidOperationException("Reading is not allowed once the reader is completed."); |
| | 270 | | } |
| 24752 | 271 | | } |
| | 272 | |
|
| | 273 | | /// <summary>The state enumeration is used to ensure the reader is not used after it's completed and to ensure that |
| | 274 | | /// the internal pipe writer isn't completed concurrently when it's being used by <see |
| | 275 | | /// cref="ReceivedDataFrameAsync" />.</summary> |
| | 276 | | private enum State : int |
| | 277 | | { |
| | 278 | | /// <summary><see cref="Complete" /> was called on this Slic pipe reader.</summary> |
| | 279 | | Completed = 1, |
| | 280 | |
|
| | 281 | | /// <summary>Data is being written to the internal pipe writer.</summary> |
| | 282 | | PipeWriterInUse = 2, |
| | 283 | |
|
| | 284 | | /// <summary>The internal pipe writer was completed by <see cref="CompleteReads" />.</summary> |
| | 285 | | PipeWriterCompleted = 4, |
| | 286 | | } |
| | 287 | | } |