< Summary

Information
Class: IceRpc.Transports.Slic.Internal.SlicStream
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicStream.cs
Tag: 275_13775359185
Line coverage
92%
Covered lines: 216
Uncovered lines: 18
Coverable lines: 234
Total lines: 448
Line coverage: 92.3%
Branch coverage
92%
Covered branches: 78
Total branches: 84
Branch coverage: 92.8%
Method coverage
100%
Covered methods: 28
Total methods: 28
Method coverage: 100%

Metrics

Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage
get_Id() | 50% | 2.09 | 2 | 71.42%
set_Id(...) | 100% | 1 | 1 | 100%
get_Input() | 50% | 2 | 2 | 100%
get_IsBidirectional() | 100% | 1 | 1 | 100%
get_IsRemote() | 100% | 1 | 1 | 100%
get_IsStarted() | 100% | 1 | 1 | 100%
get_Output() | 50% | 2 | 2 | 100%
get_WritesClosed() | 100% | 1 | 1 | 100%
get_WindowUpdateThreshold() | 100% | 1 | 1 | 100%
.ctor(...) | 100% | 12 | 12 | 100%
AcquireSendCreditAsync(...) | 100% | 1 | 1 | 100%
Close(...) | 100% | 4 | 4 | 100%
CloseReads(...) | 100% | 24.23 | 24 | 92.68%
CloseWrites(...) | 100% | 14.5 | 14 | 86.36%
ConsumedSendCredit(...) | 100% | 1 | 1 | 100%
FillBufferWriterAsync(...) | 100% | 1 | 1 | 100%
ReceivedDataFrameAsync(...) | 100% | 6 | 6 | 100%
ReceivedReadsClosedFrame() | 50% | 2 | 2 | 100%
ReceivedWindowUpdateFrame(...) | 50% | 2.5 | 2 | 50%
ReceivedWritesClosedFrame() | 50% | 2 | 2 | 100%
WindowUpdate(...) | 100% | 1.02 | 1 | 72.72%
WriteStreamFrameAsync(...) | 100% | 2 | 2 | 100%
WroteLastStreamFrame() | 100% | 2 | 2 | 100%
ThrowIfConnectionClosed() | 100% | 1 | 1 | 100%
TrySetReadsClosed() | 100% | 1 | 1 | 100%
TrySetWritesClosed() | 100% | 2 | 2 | 100%
TrySetState(...) | 100% | 6 | 6 | 100%
WriteStreamFrame(...) | 100% | 1 | 1 | 100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicStream.cs

# | Line | Line coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Transports.Internal;
 4using System.Buffers;
 5using System.Diagnostics;
 6using System.IO.Pipelines;
 7using ZeroC.Slice;
 8
 9namespace IceRpc.Transports.Slic.Internal;
 10
 11/// <summary>The stream implementation for Slic.</summary>
 12/// <remarks>The stream implementation implements flow control to ensure data isn't buffered indefinitely if the
 13/// application doesn't consume it.</remarks>
 14internal class SlicStream : IMultiplexedStream
 15{
 16    public ulong Id
 17    {
 18        get
 3597019        {
 3597020            ulong id = Volatile.Read(ref _id);
 3597021            if (id == ulong.MaxValue)
 022            {
 023                throw new InvalidOperationException("The stream ID isn't allocated yet.");
 24            }
 3597025            return id;
 3597026        }
 27
 28        set
 804629        {
 804630            Debug.Assert(_id == ulong.MaxValue);
 804631            Volatile.Write(ref _id, value);
 804632        }
 33    }
 34
 35    public PipeReader Input =>
 2237936        _inputPipeReader ?? throw new InvalidOperationException("A local unidirectional stream has no Input.");
 37
 38    /// <inheritdoc/>
 6861239    public bool IsBidirectional { get; }
 40
 41    /// <inheritdoc/>
 6205442    public bool IsRemote { get; }
 43
 44    /// <inheritdoc/>
 5429245    public bool IsStarted => Volatile.Read(ref _id) != ulong.MaxValue;
 46
 47    public PipeWriter Output =>
 1778648        _outputPipeWriter ?? throw new InvalidOperationException("A remote unidirectional stream has no Output.");
 49
 434350    public Task WritesClosed => _writesClosedTcs.Task;
 51
 1204352    internal int WindowUpdateThreshold => _connection.StreamWindowUpdateThreshold;
 53
 54    private bool _closeReadsOnWritesClosure;
 55    private readonly SlicConnection _connection;
 807656    private ulong _id = ulong.MaxValue;
 57    private readonly SlicPipeReader? _inputPipeReader;
 58    // This mutex protects _writesClosePending, _closeReadsOnWritesClosure.
 807659    private readonly object _mutex = new();
 60    private readonly SlicPipeWriter? _outputPipeWriter;
 61    // FlagEnumExtensions operations are used to update the state. These operations are atomic and don't require mutex
 62    // locking.
 63    private int _state;
 807664    private readonly TaskCompletionSource _writesClosedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
 65    private bool _writesClosePending;
 66
 807667    internal SlicStream(SlicConnection connection, bool isBidirectional, bool isRemote)
 807668    {
 807669        _connection = connection;
 70
 807671        IsBidirectional = isBidirectional;
 807672        IsRemote = isRemote;
 73
 807674        if (!IsBidirectional)
 553875        {
 553876            if (IsRemote)
 275977            {
 78                // Write-side of remote unidirectional stream is marked as closed.
 275979                TrySetWritesClosed();
 275980            }
 81            else
 277982            {
 83                // Read-side of local unidirectional stream is marked as closed.
 277984                TrySetReadsClosed();
 277985            }
 553886        }
 87
 807688        if (IsRemote || IsBidirectional)
 529789        {
 529790            _inputPipeReader = new SlicPipeReader(this, _connection);
 529791        }
 92
 807693        if (!IsRemote || IsBidirectional)
 531794        {
 531795            _outputPipeWriter = new SlicPipeWriter(this, _connection);
 531796        }
 807697    }
 98
 99    /// <summary>Acquires send credit.</summary>
 100    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 101    /// <returns>The available send credit.</returns>
 102    /// <remarks>This method should be called before sending a <see cref="FrameType.Stream"/> or <see
 103    /// cref="FrameType.StreamLast"/> frame to ensure enough send credit is available. If no send credit is available,
 104    /// it will block until send credit is available. The send credit matches the size of the peer's flow-control
 105    /// window.</remarks>
 106    internal ValueTask<int> AcquireSendCreditAsync(CancellationToken cancellationToken) =>
 20238107        _outputPipeWriter!.AcquireSendCreditAsync(cancellationToken);
 108
 109    /// <summary>Closes the read and write sides of the stream and notifies the stream <see cref="Input" /> and <see
 110    /// cref="Output" /> of the reads and writes closure.</summary>
 111    internal void Close(Exception closeException)
 1312112    {
 1312113        if (TrySetReadsClosed())
 611114        {
 611115            Debug.Assert(_inputPipeReader is not null);
 611116            _inputPipeReader.CompleteReads(closeException);
 611117        }
 1312118        if (TrySetWritesClosed())
 771119        {
 771120            Debug.Assert(_outputPipeWriter is not null);
 771121            _outputPipeWriter.CompleteWrites(closeException);
 771122        }
 1312123    }
 124
 125    /// <summary>Closes the read-side of the stream. It's only called by <see cref="SlicPipeReader.Complete" />, <see
 126    /// cref="SlicPipeReader.TryRead" /> or <see cref="SlicPipeReader.ReadAsync" /> and never called concurrently.
 127    /// </summary>
 128    /// <param name="graceful"><see langword="true" /> if the application consumed all the stream data from the stream
 129    /// <see cref="Input" />; otherwise, <see langword="false" />.</param>
 130    internal void CloseReads(bool graceful)
 8819131    {
 8819132        bool writeReadsClosedFrame = false;
 133
 8819134        lock (_mutex)
 8819135        {
 8819136            if (IsStarted && !_state.HasFlag(State.ReadsClosed))
 4588137            {
 138                // As an optimization, if reads are gracefully closed once the buffered data is consumed but before
 139                // writes are closed, we don't send the StreamReadsClosed frame just yet. Instead, when writes are
 140                // closed, CloseWrites will bundle the sending of the StreamReadsClosed with the sending of the
 141                // StreamLast or StreamWritesClosed frame. This allows sending both frames with a single write on the
 142                // duplex connection.
 4588143                if (graceful &&
 4588144                    IsBidirectional &&
 4588145                    IsRemote &&
 4588146                    !_state.HasFlag(State.WritesClosed) &&
 4588147                    !_writesClosePending)
 862148                {
 862149                    _closeReadsOnWritesClosure = true;
 862150                }
 3726151                else if (!graceful || IsRemote)
 2663152                {
 153                    // If forcefully closed because the input was completed before the data was fully read or if writes
 154                    // are already closed and the stream is a remote stream, we send the StreamReadsClosed frame to
 155                    // notify the peer that reads are closed.
 2663156                    writeReadsClosedFrame = true;
 2663157                }
 4588158            }
 8819159        }
 160
 8819161        if (writeReadsClosedFrame)
 2663162        {
 2663163            if (IsRemote)
 2535164            {
 165                // If it's a remote stream, we close reads before sending the StreamReadsClosed frame to ensure
 166                // _connection._bidirectionalStreamCount or _connection._unidirectionalStreamCount is decreased before
 167                // the peer receives the frame. This is necessary to prevent a race condition where the peer could
 168                // release the connection's bidirectional or unidirectional stream semaphore before this connection's
 169                // stream count is actually decreased.
 2535170                TrySetReadsClosed();
 2535171            }
 172
 173            try
 2663174            {
 2663175                WriteStreamFrame(FrameType.StreamReadsClosed, encode: null, writeReadsClosedFrame: false);
 2663176            }
 0177            catch (IceRpcException)
 0178            {
 179                // Ignore connection failures.
 0180            }
 181
 2663182            if (!IsRemote)
 128183            {
 184                // We can now close reads to permit a new stream to be started. The peer will receive the
 185                // StreamReadsClosed frame before the new stream sends a Stream frame.
 128186                TrySetReadsClosed();
 128187            }
 2663188        }
 189        else
 6156190        {
 6156191            TrySetReadsClosed();
 6156192        }
 8819193    }
 194
 195    /// <summary>Closes the write-side of the stream. It's only called by <see cref="SlicPipeWriter.Complete" /> and
 196    /// never called concurrently.</summary>
 197    /// <param name="graceful"><see langword="true" /> if the application wrote all the stream data on the stream <see
 198    /// cref="Output" />; otherwise, <see langword="false" />.</param>
 199    internal void CloseWrites(bool graceful)
 5261200    {
 5261201        bool writeWritesClosedFrame = false;
 5261202        bool writeReadsClosedFrame = false;
 203
 5261204        lock (_mutex)
 5261205        {
 5261206            if (IsStarted && !_state.HasFlag(State.WritesClosed) && !_writesClosePending)
 1200207            {
 1200208                writeReadsClosedFrame = _closeReadsOnWritesClosure;
 1200209                _writesClosePending = true;
 1200210                writeWritesClosedFrame = true;
 1200211            }
 5261212        }
 213
 5261214        if (writeWritesClosedFrame)
 1200215        {
 1200216            if (IsRemote)
 486217            {
 218                // If it's a remote stream, we close writes before sending the StreamLast or StreamWritesClosed
 219                // frame to ensure _connection._bidirectionalStreamCount or _connection._unidirectionalStreamCount
 220                // is decreased before the peer receives the frame. This is necessary to prevent a race condition
 221                // where the peer could release the connection's bidirectional or unidirectional stream semaphore
 222                // before this connection's stream count is actually decreased.
 486223                TrySetWritesClosed();
 486224            }
 225
 1200226            if (graceful)
 1114227            {
 228                try
 1114229                {
 1114230                    WriteStreamFrame(FrameType.StreamLast, encode: null, writeReadsClosedFrame);
 1114231                }
 0232                catch (IceRpcException)
 0233                {
 234                    // Ignore connection failures.
 0235                }
 236
 237                // If the stream is a local stream, writes are not closed until the StreamReadsClosed frame is
 238                // received from the peer (see ReceivedReadsClosedFrame). This ensures that the connection's
 239                // bidirectional or unidirectional stream semaphore is released only once the peer consumed the
 240                // buffered data.
 1114241            }
 242            else
 86243            {
 244                try
 86245                {
 86246                    WriteStreamFrame(FrameType.StreamWritesClosed, encode: null, writeReadsClosedFrame);
 86247                }
 0248                catch (IceRpcException)
 0249                {
 250                    // Ignore connection failures.
 0251                }
 252
 86253                if (!IsRemote)
 54254                {
 255                    // We can now close writes to allow starting a new stream. Since the sending of frames is
 256                    // serialized over the connection, the peer will receive this StreamWritesClosed frame before
 257                    // a new stream sends a Stream frame.
 54258                    TrySetWritesClosed();
 54259                }
 86260            }
 1200261        }
 262        else
 4061263        {
 4061264            TrySetWritesClosed();
 4061265        }
 5261266    }
 267
 268    /// <summary>Notifies the stream of the amount of data consumed by the connection to send a <see
 269    /// cref="FrameType.Stream" /> or <see cref="FrameType.StreamLast" /> frame.</summary>
 270    /// <param name="size">The size of the stream frame.</param>
 18210271    internal void ConsumedSendCredit(int size) => _outputPipeWriter!.ConsumedSendCredit(size);
 272
 273    /// <summary>Fills the given writer with stream data received on the connection.</summary>
 274    /// <param name="bufferWriter">The destination buffer writer.</param>
 275    /// <param name="byteCount">The amount of stream data to read.</param>
 276    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 277    internal ValueTask FillBufferWriterAsync(
 278        IBufferWriter<byte> bufferWriter,
 279        int byteCount,
 280        CancellationToken cancellationToken) =>
 18914281        _connection.FillBufferWriterAsync(bufferWriter, byteCount, cancellationToken);
 282
 283    /// <summary>Notifies the stream of the reception of a <see cref="FrameType.Stream" /> or <see
 284    /// cref="FrameType.StreamLast" /> frame.</summary>
 285    /// <param name="size">The size of the data carried by the stream frame.</param>
 286    /// <param name="endStream"><see langword="true" /> if the received stream frame is the <see
 287    /// cref="FrameType.StreamLast" /> frame; otherwise, <see langword="false" />.</param>
 288    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 289    internal ValueTask<bool> ReceivedDataFrameAsync(int size, bool endStream, CancellationToken cancellationToken)
 18929290    {
 18929291        Debug.Assert(_inputPipeReader is not null);
 18929292        if (_state.HasFlag(State.ReadsClosed))
 14293        {
 14294            return new(false);
 295        }
 296        else
 18915297        {
 18915298            if (endStream && !IsRemote)
 1063299            {
 300                // For a local stream we can close reads after we have received the StreamLast frame. For remote
 301                // streams reads are closed after the application has consumed all the data.
 1063302                CloseReads(graceful: true);
 1063303            }
 18915304            return _inputPipeReader.ReceivedDataFrameAsync(size, endStream, cancellationToken);
 305        }
 18929306    }
 307
 308    /// <summary>Notifies the stream of the reception of a <see cref="FrameType.StreamReadsClosed" /> frame.</summary>
 309    internal void ReceivedReadsClosedFrame()
 2520310    {
 2520311        TrySetWritesClosed();
 2520312        _outputPipeWriter?.CompleteWrites(exception: null);
 2520313    }
 314
 315    /// <summary>Notifies the stream of the reception of a <see cref="FrameType.StreamWindowUpdate" /> frame.</summary>
 316    /// <param name="frame">The body of the <see cref="FrameType.StreamWindowUpdate" /> frame.</param>
 317    internal void ReceivedWindowUpdateFrame(StreamWindowUpdateBody frame)
 1911318    {
 1911319        if (frame.WindowSizeIncrement > SlicTransportOptions.MaxWindowSize)
 0320        {
 0321            throw new IceRpcException(
 0322                IceRpcError.IceRpcError,
 0323                $"The window update is trying to increase the window size to a value larger than allowed.");
 324        }
 1911325        _outputPipeWriter!.ReceivedWindowUpdateFrame((int)frame.WindowSizeIncrement);
 1911326    }
 327
 328    /// <summary>Notifies the stream of the reception of a <see cref="FrameType.StreamWritesClosed" /> frame.</summary>
 329    internal void ReceivedWritesClosedFrame()
 82330    {
 82331        TrySetReadsClosed();
 332
 333        // Read operations will return a TruncatedData error if the peer closed writes.
 82334        _inputPipeReader?.CompleteReads(new IceRpcException(IceRpcError.TruncatedData));
 82335    }
 336
 337    /// <summary>Notifies the stream of the window update.</summary>
 338    /// <param name="size">The amount of data consumed by the application on the stream <see cref="Input" />.</param>
 339    internal void WindowUpdate(int size)
 1987340    {
 341        try
 1987342        {
 343            // Notify the sender of the window update to permit the sending of additional data.
 1987344            WriteStreamFrame(
 1987345                FrameType.StreamWindowUpdate,
 1987346                new StreamWindowUpdateBody((ulong)size).Encode,
 1987347                writeReadsClosedFrame: false);
 1987348        }
 0349        catch (IceRpcException)
 0350        {
 351            // Ignore connection failures.
 0352        }
 1987353    }
 354
 355    /// <summary>Writes a <see cref="FrameType.Stream" /> or <see cref="FrameType.StreamLast" /> frame on the
 356    /// connection.</summary>
 357    /// <param name="source1">The first stream frame data source.</param>
 358    /// <param name="source2">The second stream frame data source.</param>
 359    /// <param name="endStream"><see langword="true" /> to write a <see cref="FrameType.StreamLast" /> frame; otherwise,
 360    /// <see langword="false" />.</param>
 361    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 362    internal ValueTask<FlushResult> WriteStreamFrameAsync(
 363        ReadOnlySequence<byte> source1,
 364        ReadOnlySequence<byte> source2,
 365        bool endStream,
 366        CancellationToken cancellationToken)
 15592367    {
 15592368        bool writeReadsClosedFrame = false;
 15592369        if (endStream)
 3471370        {
 3471371            lock (_mutex)
 3471372            {
 3471373                writeReadsClosedFrame = _closeReadsOnWritesClosure;
 3471374                _writesClosePending = true;
 3471375            }
 3471376        }
 377
 15592378        return _connection.WriteStreamDataFrameAsync(
 15592379            this,
 15592380            source1,
 15592381            source2,
 15592382            endStream,
 15592383            writeReadsClosedFrame,
 15592384            cancellationToken);
 15592385    }
 386
 387    /// <summary>Notifies the stream that the <see cref="FrameType.StreamLast" /> was written by the
 388    /// connection.</summary>
 389    internal void WroteLastStreamFrame()
 2576390    {
 2576391        if (IsRemote)
 1161392        {
 1161393            TrySetWritesClosed();
 1161394        }
 395        // For local streams, writes will be closed only once the peer sends the StreamReadsClosed frame.
 396
 2576397        _writesClosedTcs.TrySetResult();
 2576398    }
 399
 400    /// <summary>Throws the connection closure exception if the connection is closed.</summary>
 15637401    internal void ThrowIfConnectionClosed() => _connection.ThrowIfClosed();
 402
 12992403    private bool TrySetReadsClosed() => TrySetState(State.ReadsClosed);
 404
 405    private bool TrySetWritesClosed()
 12353406    {
 12353407        if (TrySetState(State.WritesClosed))
 8066408        {
 8066409            _writesClosedTcs.TrySetResult();
 8066410            return true;
 411        }
 412        else
 4287413        {
 4287414            return false;
 415        }
 12353416    }
 417
 418    private bool TrySetState(State state)
 25345419    {
 25345420        if (_state.TrySetFlag(state, out int newState))
 16138421        {
 16138422            if (newState.HasFlag(State.ReadsClosed | State.WritesClosed))
 8066423            {
 424                // The stream reads and writes are closed, it's time to release the stream to either allow creating or
 425                // accepting a new stream.
 8066426                if (IsStarted)
 8046427                {
 8046428                    _connection.ReleaseStream(this);
 8046429                }
 8066430            }
 16138431            return true;
 432        }
 433        else
 9207434        {
 9207435            return false;
 436        }
 25345437    }
 438
 439    private void WriteStreamFrame(FrameType frameType, EncodeAction? encode, bool writeReadsClosedFrame) =>
 5850440        _connection.WriteStreamFrame(stream: this, frameType, encode, writeReadsClosedFrame);
 441
 442    [Flags]
 443    private enum State : int
 444    {
 445        ReadsClosed = 1,
 446        WritesClosed = 2
 447    }
 448}