< Summary

Information
Class: IceRpc.Transports.Slic.Internal.SlicStream
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicStream.cs
Tag: 701_22528036593
Line coverage
92%
Covered lines: 213
Uncovered lines: 18
Coverable lines: 231
Total lines: 448
Line coverage: 92.2%
Branch coverage
92%
Covered branches: 78
Total branches: 84
Branch coverage: 92.8%
Method coverage
100%
Covered methods: 28
Total methods: 28
Method coverage: 100%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
get_Id()50%2.09271.42%
set_Id(...)100%11100%
get_Input()50%22100%
get_IsBidirectional()100%11100%
get_IsRemote()100%11100%
get_IsStarted()100%11100%
get_Output()50%22100%
get_WritesClosed()100%11100%
get_WindowUpdateThreshold()100%11100%
.ctor(...)100%1212100%
AcquireSendCreditAsync(...)100%11100%
Close(...)100%44100%
CloseReads(...)100%24.242492.5%
CloseWrites(...)100%14.531486.04%
ConsumedSendCredit(...)100%11100%
FillBufferWriterAsync(...)100%11100%
ReceivedDataFrameAsync(...)100%66100%
ReceivedReadsClosedFrame()50%22100%
ReceivedWindowUpdateFrame(...)50%2.5250%
ReceivedWritesClosedFrame()50%22100%
WindowUpdate(...)100%1.02172.72%
WriteStreamFrameAsync(...)100%22100%
WroteLastStreamFrame()100%22100%
ThrowIfConnectionClosed()100%11100%
TrySetReadsClosed()100%11100%
TrySetWritesClosed()100%22100%
TrySetState(...)100%66100%
WriteStreamFrame(...)100%11100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicStream.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Transports.Internal;
 4using System.Buffers;
 5using System.Diagnostics;
 6using System.IO.Pipelines;
 7using ZeroC.Slice;
 8
 9namespace IceRpc.Transports.Slic.Internal;
 10
 11/// <summary>The stream implementation for Slic.</summary>
 12/// <remarks>The stream implementation implements flow control to ensure data isn't buffered indefinitely if the
 13/// application doesn't consume it.</remarks>
 14internal class SlicStream : IMultiplexedStream
 15{
 16    public ulong Id
 17    {
 18        get
 1830819        {
 1830820            ulong id = Volatile.Read(ref _id);
 1830821            if (id == ulong.MaxValue)
 022            {
 023                throw new InvalidOperationException("The stream ID isn't allocated yet.");
 24            }
 1830825            return id;
 1830826        }
 27
 28        set
 410929        {
 410930            Debug.Assert(_id == ulong.MaxValue);
 410931            Volatile.Write(ref _id, value);
 410932        }
 33    }
 34
 35    public PipeReader Input =>
 1253436        _inputPipeReader ?? throw new InvalidOperationException("A local unidirectional stream has no Input.");
 37
 38    /// <inheritdoc/>
 3506139    public bool IsBidirectional { get; }
 40
 41    /// <inheritdoc/>
 3166242    public bool IsRemote { get; }
 43
 44    /// <inheritdoc/>
 2775745    public bool IsStarted => Volatile.Read(ref _id) != ulong.MaxValue;
 46
 47    public PipeWriter Output =>
 898648        _outputPipeWriter ?? throw new InvalidOperationException("A remote unidirectional stream has no Output.");
 49
 224650    public Task WritesClosed => _writesClosedTcs.Task;
 51
 678052    internal int WindowUpdateThreshold => _connection.StreamWindowUpdateThreshold;
 53
 54    private bool _closeReadsOnWritesClosure;
 55    private readonly SlicConnection _connection;
 412556    private ulong _id = ulong.MaxValue;
 57    private readonly SlicPipeReader? _inputPipeReader;
 58    // This mutex protects _writesClosePending, _closeReadsOnWritesClosure.
 412559    private readonly Lock _mutex = new();
 60    private readonly SlicPipeWriter? _outputPipeWriter;
 61    // FlagEnumExtensions operations are used to update the state. These operations are atomic and don't require mutex
 62    // locking.
 63    private int _state;
 412564    private readonly TaskCompletionSource _writesClosedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
 65    private bool _writesClosePending;
 66
 412567    internal SlicStream(SlicConnection connection, bool isBidirectional, bool isRemote)
 412568    {
 412569        _connection = connection;
 70
 412571        IsBidirectional = isBidirectional;
 412572        IsRemote = isRemote;
 73
 412574        if (!IsBidirectional)
 280775        {
 280776            if (IsRemote)
 139877            {
 78                // Write-side of remote unidirectional stream is marked as closed.
 139879                TrySetWritesClosed();
 139880            }
 81            else
 140982            {
 83                // Read-side of local unidirectional stream is marked as closed.
 140984                TrySetReadsClosed();
 140985            }
 280786        }
 87
 412588        if (IsRemote || IsBidirectional)
 271689        {
 271690            _inputPipeReader = new SlicPipeReader(this, _connection);
 271691        }
 92
 412593        if (!IsRemote || IsBidirectional)
 272794        {
 272795            _outputPipeWriter = new SlicPipeWriter(this, _connection);
 272796        }
 412597    }
 98
 99    /// <summary>Acquires send credit.</summary>
 100    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 101    /// <returns>The available send credit.</returns>
 102    /// <remarks>This method should be called before sending a <see cref="FrameType.Stream"/> or <see
 103    /// cref="FrameType.StreamLast"/> frame to ensure enough send credit is available. If no send credit is available,
 104    /// it will block until send credit is available. The send credit matches the size of the peer's flow-control
 105    /// window.</remarks>
 106    internal ValueTask<int> AcquireSendCreditAsync(CancellationToken cancellationToken) =>
 10148107        _outputPipeWriter!.AcquireSendCreditAsync(cancellationToken);
 108
 109    /// <summary>Closes the read and write sides of the stream and notifies the stream <see cref="Input" /> and <see
 110    /// cref="Output" /> of the reads and writes closure.</summary>
 111    internal void Close(Exception closeException)
 636112    {
 636113        if (TrySetReadsClosed())
 310114        {
 310115            Debug.Assert(_inputPipeReader is not null);
 310116            _inputPipeReader.CompleteReads(closeException);
 310117        }
 636118        if (TrySetWritesClosed())
 358119        {
 358120            Debug.Assert(_outputPipeWriter is not null);
 358121            _outputPipeWriter.CompleteWrites(closeException);
 358122        }
 636123    }
 124
 125    /// <summary>Closes the read-side of the stream. It's only called by <see cref="SlicPipeReader.Complete" />, <see
 126    /// cref="SlicPipeReader.TryRead" /> or <see cref="SlicPipeReader.ReadAsync" /> and never called concurrently.
 127    /// </summary>
 128    /// <param name="graceful"><see langword="true" /> if the application consumed all the stream data from the stream
 129    /// <see cref="Input" />; otherwise, <see langword="false" />.</param>
 130    internal void CloseReads(bool graceful)
 4602131    {
 4602132        bool writeReadsClosedFrame = false;
 133
 134        lock (_mutex)
 4602135        {
 4602136            if (IsStarted && !_state.HasFlag(State.ReadsClosed))
 2358137            {
 138                // As an optimization, if reads are gracefully closed once the buffered data is consumed but before
 139                // writes are closed, we don't send the StreamReadsClosed frame just yet. Instead, when writes are
 140                // closed, CloseWrites will bundle the sending of the StreamReadsClosed with the sending of the
 141                // StreamLast or StreamWritesClosed frame. This allows to send both frames with a single write on the
 142                // duplex connection.
 2358143                if (graceful &&
 2358144                    IsBidirectional &&
 2358145                    IsRemote &&
 2358146                    !_state.HasFlag(State.WritesClosed) &&
 2358147                    !_writesClosePending)
 460148                {
 460149                    _closeReadsOnWritesClosure = true;
 460150                }
 1898151                else if (!graceful || IsRemote)
 1342152                {
 153                    // If forcefully closed because the input was completed before the data was fully read or if writes
 154                    // are already closed and the stream is a remote stream, we send the StreamReadsClosed frame to
 155                    // notify the peer that reads are closed.
 1342156                    writeReadsClosedFrame = true;
 1342157                }
 2358158            }
 4602159        }
 160
 4602161        if (writeReadsClosedFrame)
 1342162        {
 1342163            if (IsRemote)
 1278164            {
 165                // If it's a remote stream, we close writes before sending the StreamReadsClosed frame to ensure
 166                // _connection._bidirectionalStreamCount or _connection._unidirectionalStreamCount is decreased before
 167                // the peer receives the frame. This is necessary to prevent a race condition where the peer could
 168                // release the connection's bidirectional or unidirectional stream semaphore before this connection's
 169                // stream count is actually decreased.
 1278170                TrySetReadsClosed();
 1278171            }
 172
 173            try
 1342174            {
 1342175                WriteStreamFrame(FrameType.StreamReadsClosed, encode: null, writeReadsClosedFrame: false);
 1342176            }
 0177            catch (IceRpcException)
 0178            {
 179                // Ignore connection failures.
 0180            }
 181
 1342182            if (!IsRemote)
 64183            {
 184                // We can now close reads to permit a new stream to be started. The peer will receive the
 185                // StreamReadsClosed frame before the new stream sends a Stream frame.
 64186                TrySetReadsClosed();
 64187            }
 1342188        }
 189        else
 3260190        {
 3260191            TrySetReadsClosed();
 3260192        }
 4602193    }
 194
 195    /// <summary>Closes the write-side of the stream. It's only called by <see cref="SlicPipeWriter.Complete" /> and
 196    /// never called concurrently.</summary>
 197    /// <param name="graceful"><see langword="true" /> if the application wrote all the stream data on the stream <see
 198    /// cref="Output" />; otherwise, <see langword="false" />.</param>
 199    internal void CloseWrites(bool graceful)
 2699200    {
 2699201        bool writeWritesClosedFrame = false;
 2699202        bool writeReadsClosedFrame = false;
 203
 204        lock (_mutex)
 2699205        {
 2699206            if (IsStarted && !_state.HasFlag(State.WritesClosed) && !_writesClosePending)
 606207            {
 606208                writeReadsClosedFrame = _closeReadsOnWritesClosure;
 606209                _writesClosePending = true;
 606210                writeWritesClosedFrame = true;
 606211            }
 2699212        }
 213
 2699214        if (writeWritesClosedFrame)
 606215        {
 606216            if (IsRemote)
 241217            {
 218                // If it's a remote stream, we close writes before sending the StreamLast or StreamWritesClosed
 219                // frame to ensure _connection._bidirectionalStreamCount or _connection._unidirectionalStreamCount
 220                // is decreased before the peer receives the frame. This is necessary to prevent a race condition
 221                // where the peer could release the connection's bidirectional or unidirectional stream semaphore
 222                // before this connection's stream count is actually decreased.
 241223                TrySetWritesClosed();
 241224            }
 225
 606226            if (graceful)
 565227            {
 228                try
 565229                {
 565230                    WriteStreamFrame(FrameType.StreamLast, encode: null, writeReadsClosedFrame);
 565231                }
 0232                catch (IceRpcException)
 0233                {
 234                    // Ignore connection failures.
 0235                }
 236
 237                // If the stream is a local stream, writes are not closed until the StreamReadsClosed frame is
 238                // received from the peer (see ReceivedReadsClosedFrame). This ensures that the connection's
 239                // bidirectional or unidirectional stream semaphore is released only once the peer consumed the
 240                // buffered data.
 565241            }
 242            else
 41243            {
 244                try
 41245                {
 41246                    WriteStreamFrame(FrameType.StreamWritesClosed, encode: null, writeReadsClosedFrame);
 41247                }
 0248                catch (IceRpcException)
 0249                {
 250                    // Ignore connection failures.
 0251                }
 252
 41253                if (!IsRemote)
 26254                {
 255                    // We can now close writes to allow starting a new stream. Since the sending of frames is
 256                    // serialized over the connection, the peer will receive this StreamWritesClosed frame before
 257                    // a new stream sends a StreamFrame frame.
 26258                    TrySetWritesClosed();
 26259                }
 41260            }
 606261        }
 262        else
 2093263        {
 2093264            TrySetWritesClosed();
 2093265        }
 2699266    }
 267
 268    /// <summary>Notifies the stream of the amount of data consumed by the connection to send a <see
 269    /// cref="FrameType.Stream" /> or <see cref="FrameType.StreamLast" /> frame.</summary>
 270    /// <param name="size">The size of the stream frame.</param>
 9135271    internal void ConsumedSendCredit(int size) => _outputPipeWriter!.ConsumedSendCredit(size);
 272
 273    /// <summary>Fills the given writer with stream data received on the connection.</summary>
 274    /// <param name="bufferWriter">The destination buffer writer.</param>
 275    /// <param name="byteCount">The amount of stream data to read.</param>
 276    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 277    internal ValueTask FillBufferWriterAsync(
 278        IBufferWriter<byte> bufferWriter,
 279        int byteCount,
 280        CancellationToken cancellationToken) =>
 9515281        _connection.FillBufferWriterAsync(bufferWriter, byteCount, cancellationToken);
 282
 283    /// <summary>Notifies the stream of the reception of a <see cref="FrameType.Stream" /> or <see
 284    /// cref="FrameType.StreamLast" /> frame.</summary>
 285    /// <param name="size">The size of the data carried by the stream frame.</param>
 286    /// <param name="endStream"><see langword="true" /> if the received stream frame is the <see
 287    /// cref="FrameType.StreamLast" /> frame; otherwise, <see langword="false" />.</param>
 288    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 289    internal ValueTask<bool> ReceivedDataFrameAsync(int size, bool endStream, CancellationToken cancellationToken)
 9529290    {
 9529291        Debug.Assert(_inputPipeReader is not null);
 9529292        if (_state.HasFlag(State.ReadsClosed))
 14293        {
 14294            return new(false);
 295        }
 296        else
 9515297        {
 9515298            if (endStream && !IsRemote)
 556299            {
 300                // For a local stream we can close reads after we have received the StreamLast frame. For remote
 301                // streams reads are closed after the application has consumed all the data.
 556302                CloseReads(graceful: true);
 556303            }
 9515304            return _inputPipeReader.ReceivedDataFrameAsync(size, endStream, cancellationToken);
 305        }
 9529306    }
 307
 308    /// <summary>Notifies the stream of the reception of a <see cref="FrameType.StreamReadsClosed" /> frame.</summary>
 309    internal void ReceivedReadsClosedFrame()
 1307310    {
 1307311        TrySetWritesClosed();
 1307312        _outputPipeWriter?.CompleteWrites(exception: null);
 1307313    }
 314
 315    /// <summary>Notifies the stream of the reception of a <see cref="FrameType.StreamWindowUpdate" /> frame.</summary>
 316    /// <param name="frame">The body of the <see cref="FrameType.StreamWindowUpdate" /> frame.</param>
 317    internal void ReceivedWindowUpdateFrame(StreamWindowUpdateBody frame)
 1076318    {
 1076319        if (frame.WindowSizeIncrement > SlicTransportOptions.MaxWindowSize)
 0320        {
 0321            throw new IceRpcException(
 0322                IceRpcError.IceRpcError,
 0323                $"The window update is trying to increase the window size to a value larger than allowed.");
 324        }
 1076325        _outputPipeWriter!.ReceivedWindowUpdateFrame((int)frame.WindowSizeIncrement);
 1076326    }
 327
 328    /// <summary>Notifies the stream of the reception of a <see cref="FrameType.StreamWritesClosed" /> frame.</summary>
 329    internal void ReceivedWritesClosedFrame()
 39330    {
 39331        TrySetReadsClosed();
 332
 333        // Read operations will return a TruncatedData error if the peer closed writes.
 39334        _inputPipeReader?.CompleteReads(new IceRpcException(IceRpcError.TruncatedData));
 39335    }
 336
 337    /// <summary>Notifies the stream of the window update.</summary>
 338    /// <param name="size">The amount of data consumed by the application on the stream <see cref="Input" />.</param>
 339    internal void WindowUpdate(int size)
 1123340    {
 341        try
 1123342        {
 343            // Notify the sender of the window update to permit the sending of additional data.
 1123344            WriteStreamFrame(
 1123345                FrameType.StreamWindowUpdate,
 1123346                new StreamWindowUpdateBody((ulong)size).Encode,
 1123347                writeReadsClosedFrame: false);
 1123348        }
 0349        catch (IceRpcException)
 0350        {
 351            // Ignore connection failures.
 0352        }
 1123353    }
 354
 355    /// <summary>Writes a <see cref="FrameType.Stream" /> or <see cref="FrameType.StreamLast" /> frame on the
 356    /// connection.</summary>
 357    /// <param name="source1">The first stream frame data source.</param>
 358    /// <param name="source2">The second stream frame data source.</param>
 359    /// <param name="endStream"><see langword="true" /> to write a <see cref="FrameType.StreamLast" /> frame; otherwise,
 360    /// <see langword="false" />.</param>
 361    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 362    internal ValueTask<FlushResult> WriteStreamFrameAsync(
 363        ReadOnlySequence<byte> source1,
 364        ReadOnlySequence<byte> source2,
 365        bool endStream,
 366        CancellationToken cancellationToken)
 7872367    {
 7872368        bool writeReadsClosedFrame = false;
 7872369        if (endStream)
 1785370        {
 371            lock (_mutex)
 1785372            {
 1785373                writeReadsClosedFrame = _closeReadsOnWritesClosure;
 1785374                _writesClosePending = true;
 1785375            }
 1785376        }
 377
 7872378        return _connection.WriteStreamDataFrameAsync(
 7872379            this,
 7872380            source1,
 7872381            source2,
 7872382            endStream,
 7872383            writeReadsClosedFrame,
 7872384            cancellationToken);
 7872385    }
 386
 387    /// <summary>Notifies the stream that the <see cref="FrameType.StreamLast" /> was written by the
 388    /// connection.</summary>
 389    internal void WroteLastStreamFrame()
 1345390    {
 1345391        if (IsRemote)
 603392        {
 603393            TrySetWritesClosed();
 603394        }
 395        // For local streams, writes will be closed only once the peer sends the StreamReadsClosed frame.
 396
 1345397        _writesClosedTcs.TrySetResult();
 1345398    }
 399
 400    /// <summary>Throws the connection closure exception if the connection is closed.</summary>
 7892401    internal void ThrowIfConnectionClosed() => _connection.ThrowIfClosed();
 402
 6686403    private bool TrySetReadsClosed() => TrySetState(State.ReadsClosed);
 404
 405    private bool TrySetWritesClosed()
 6304406    {
 6304407        if (TrySetState(State.WritesClosed))
 4120408        {
 4120409            _writesClosedTcs.TrySetResult();
 4120410            return true;
 411        }
 412        else
 2184413        {
 2184414            return false;
 415        }
 6304416    }
 417
 418    private bool TrySetState(State state)
 12990419    {
 12990420        if (_state.TrySetFlag(state, out int newState))
 8243421        {
 8243422            if (newState.HasFlag(State.ReadsClosed | State.WritesClosed))
 4120423            {
 424                // The stream reads and writes are closed, it's time to release the stream to either allow creating or
 425                // accepting a new stream.
 4120426                if (IsStarted)
 4109427                {
 4109428                    _connection.ReleaseStream(this);
 4109429                }
 4120430            }
 8243431            return true;
 432        }
 433        else
 4747434        {
 4747435            return false;
 436        }
 12990437    }
 438
 439    private void WriteStreamFrame(FrameType frameType, EncodeAction? encode, bool writeReadsClosedFrame) =>
 3071440        _connection.WriteStreamFrame(stream: this, frameType, encode, writeReadsClosedFrame);
 441
 442    [Flags]
 443    private enum State : int
 444    {
 445        ReadsClosed = 1,
 446        WritesClosed = 2
 447    }
 448}