< Summary

Information
Class: IceRpc.Transports.Slic.Internal.SlicStream
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicStream.cs
Tag: 276_17717543480
Line coverage
92%
Covered lines: 216
Uncovered lines: 18
Coverable lines: 234
Total lines: 448
Line coverage: 92.3%
Branch coverage
92%
Covered branches: 78
Total branches: 84
Branch coverage: 92.8%
Method coverage
100%
Covered methods: 28
Total methods: 28
Method coverage: 100%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
get_Id()50%2.09271.42%
set_Id(...)100%11100%
get_Input()50%22100%
get_IsBidirectional()100%11100%
get_IsRemote()100%11100%
get_IsStarted()100%11100%
get_Output()50%22100%
get_WritesClosed()100%11100%
get_WindowUpdateThreshold()100%11100%
.ctor(...)100%1212100%
AcquireSendCreditAsync(...)100%11100%
Close(...)100%44100%
CloseReads(...)100%24.232492.68%
CloseWrites(...)100%14.51486.36%
ConsumedSendCredit(...)100%11100%
FillBufferWriterAsync(...)100%11100%
ReceivedDataFrameAsync(...)100%66100%
ReceivedReadsClosedFrame()50%22100%
ReceivedWindowUpdateFrame(...)50%2.5250%
ReceivedWritesClosedFrame()50%22100%
WindowUpdate(...)100%1.02172.72%
WriteStreamFrameAsync(...)100%22100%
WroteLastStreamFrame()100%22100%
ThrowIfConnectionClosed()100%11100%
TrySetReadsClosed()100%11100%
TrySetWritesClosed()100%22100%
TrySetState(...)100%66100%
WriteStreamFrame(...)100%11100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicStream.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Transports.Internal;
 4using System.Buffers;
 5using System.Diagnostics;
 6using System.IO.Pipelines;
 7using ZeroC.Slice;
 8
 9namespace IceRpc.Transports.Slic.Internal;
 10
 11/// <summary>The stream implementation for Slic.</summary>
 12/// <remarks>The stream implementation implements flow control to ensure data isn't buffered indefinitely if the
 13/// application doesn't consume it.</remarks>
 14internal class SlicStream : IMultiplexedStream
 15{
 16    public ulong Id
 17    {
 18        get
 3607519        {
 3607520            ulong id = Volatile.Read(ref _id);
 3607521            if (id == ulong.MaxValue)
 022            {
 023                throw new InvalidOperationException("The stream ID isn't allocated yet.");
 24            }
 3607525            return id;
 3607526        }
 27
 28        set
 804229        {
 804230            Debug.Assert(_id == ulong.MaxValue);
 804231            Volatile.Write(ref _id, value);
 804232        }
 33    }
 34
 35    public PipeReader Input =>
 2385536        _inputPipeReader ?? throw new InvalidOperationException("A local unidirectional stream has no Input.");
 37
 38    /// <inheritdoc/>
 6870239    public bool IsBidirectional { get; }
 40
 41    /// <inheritdoc/>
 6212042    public bool IsRemote { get; }
 43
 44    /// <inheritdoc/>
 5439245    public bool IsStarted => Volatile.Read(ref _id) != ulong.MaxValue;
 46
 47    public PipeWriter Output =>
 1778248        _outputPipeWriter ?? throw new InvalidOperationException("A remote unidirectional stream has no Output.");
 49
 435150    public Task WritesClosed => _writesClosedTcs.Task;
 51
 1284252    internal int WindowUpdateThreshold => _connection.StreamWindowUpdateThreshold;
 53
 54    private bool _closeReadsOnWritesClosure;
 55    private readonly SlicConnection _connection;
 807256    private ulong _id = ulong.MaxValue;
 57    private readonly SlicPipeReader? _inputPipeReader;
 58    // This mutex protects _writesClosePending, _closeReadsOnWritesClosure.
 807259    private readonly object _mutex = new();
 60    private readonly SlicPipeWriter? _outputPipeWriter;
 61    // FlagEnumExtensions operations are used to update the state. These operations are atomic and don't require mutex
 62    // locking.
 63    private int _state;
 807264    private readonly TaskCompletionSource _writesClosedTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
 65    private bool _writesClosePending;
 66
 807267    internal SlicStream(SlicConnection connection, bool isBidirectional, bool isRemote)
 807268    {
 807269        _connection = connection;
 70
 807271        IsBidirectional = isBidirectional;
 807272        IsRemote = isRemote;
 73
 807274        if (!IsBidirectional)
 553075        {
 553076            if (IsRemote)
 275677            {
 78                // Write-side of remote unidirectional stream is marked as closed.
 275679                TrySetWritesClosed();
 275680            }
 81            else
 277482            {
 83                // Read-side of local unidirectional stream is marked as closed.
 277484                TrySetReadsClosed();
 277485            }
 553086        }
 87
 807288        if (IsRemote || IsBidirectional)
 529889        {
 529890            _inputPipeReader = new SlicPipeReader(this, _connection);
 529891        }
 92
 807293        if (!IsRemote || IsBidirectional)
 531694        {
 531695            _outputPipeWriter = new SlicPipeWriter(this, _connection);
 531696        }
 807297    }
 98
 99    /// <summary>Acquires send credit.</summary>
 100    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 101    /// <returns>The available send credit.</returns>
 102    /// <remarks>This method should be called before sending a <see cref="FrameType.Stream"/> or <see
 103    /// cref="FrameType.StreamLast"/> frame to ensure enough send credit is available. If no send credit is available,
 104    /// it will block until send credit is available. The send credit matches the size of the peer's flow-control
 105    /// window.</remarks>
 106    internal ValueTask<int> AcquireSendCreditAsync(CancellationToken cancellationToken) =>
 20176107        _outputPipeWriter!.AcquireSendCreditAsync(cancellationToken);
 108
 109    /// <summary>Closes the read and write sides of the stream and notifies the stream <see cref="Input" /> and <see
 110    /// cref="Output" /> of the reads and writes closure.</summary>
 111    internal void Close(Exception closeException)
 1231112    {
 1231113        if (TrySetReadsClosed())
 593114        {
 593115            Debug.Assert(_inputPipeReader is not null);
 593116            _inputPipeReader.CompleteReads(closeException);
 593117        }
 1231118        if (TrySetWritesClosed())
 699119        {
 699120            Debug.Assert(_outputPipeWriter is not null);
 699121            _outputPipeWriter.CompleteWrites(closeException);
 699122        }
 1231123    }
 124
 125    /// <summary>Closes the read-side of the stream. It's only called by <see cref="SlicPipeReader.Complete" />, <see
 126    /// cref="SlicPipeReader.TryRead" /> or <see cref="SlicPipeReader.ReadAsync" /> and never called concurrently.
 127    /// </summary>
 128    /// <param name="graceful"><see langword="true" /> if the application consumed all the stream data from the stream
 129    /// <see cref="Input" />; otherwise, <see langword="false" />.</param>
 130    internal void CloseReads(bool graceful)
 8811131    {
 8811132        bool writeReadsClosedFrame = false;
 133
 8811134        lock (_mutex)
 8811135        {
 8811136            if (IsStarted && !_state.HasFlag(State.ReadsClosed))
 4611137            {
 138                // As an optimization, if reads are gracefully closed once the buffered data is consumed but before
 139                // writes are closed, we don't send the StreamReadsClosed frame just yet. Instead, when writes are
 140                // closed, CloseWrites will bundle the sending of the StreamReadsClosed with the sending of the
 141                // StreamLast or StreamWritesClosed frame. This allows to send both frames with a single write on the
 142                // duplex connection.
 4611143                if (graceful &&
 4611144                    IsBidirectional &&
 4611145                    IsRemote &&
 4611146                    !_state.HasFlag(State.WritesClosed) &&
 4611147                    !_writesClosePending)
 857148                {
 857149                    _closeReadsOnWritesClosure = true;
 857150                }
 3754151                else if (!graceful || IsRemote)
 2688152                {
 153                    // If forcefully closed because the input was completed before the data was fully read or if writes
 154                    // are already closed and the stream is a remote stream, we send the StreamReadsClosed frame to
 155                    // notify the peer that reads are closed.
 2688156                    writeReadsClosedFrame = true;
 2688157                }
 4611158            }
 8811159        }
 160
 8811161        if (writeReadsClosedFrame)
 2688162        {
 2688163            if (IsRemote)
 2558164            {
 165                // If it's a remote stream, we close writes before sending the StreamReadsClosed frame to ensure
 166                // _connection._bidirectionalStreamCount or _connection._unidirectionalStreamCount is decreased before
 167                // the peer receives the frame. This is necessary to prevent a race condition where the peer could
 168                // release the connection's bidirectional or unidirectional stream semaphore before this connection's
 169                // stream count is actually decreased.
 2558170                TrySetReadsClosed();
 2558171            }
 172
 173            try
 2688174            {
 2688175                WriteStreamFrame(FrameType.StreamReadsClosed, encode: null, writeReadsClosedFrame: false);
 2688176            }
 0177            catch (IceRpcException)
 0178            {
 179                // Ignore connection failures.
 0180            }
 181
 2688182            if (!IsRemote)
 130183            {
 184                // We can now close reads to permit a new stream to be started. The peer will receive the
 185                // StreamReadsClosed frame before the new stream sends a Stream frame.
 130186                TrySetReadsClosed();
 130187            }
 2688188        }
 189        else
 6123190        {
 6123191            TrySetReadsClosed();
 6123192        }
 8811193    }
 194
 195    /// <summary>Closes the write-side of the stream. It's only called by <see cref="SlicPipeWriter.Complete" /> and
 196    /// never called concurrently.</summary>
 197    /// <param name="graceful"><see langword="true" /> if the application wrote all the stream data on the stream <see
 198    /// cref="Output" />; otherwise, <see langword="false" />.</param>
 199    internal void CloseWrites(bool graceful)
 5260200    {
 5260201        bool writeWritesClosedFrame = false;
 5260202        bool writeReadsClosedFrame = false;
 203
 5260204        lock (_mutex)
 5260205        {
 5260206            if (IsStarted && !_state.HasFlag(State.WritesClosed) && !_writesClosePending)
 1194207            {
 1194208                writeReadsClosedFrame = _closeReadsOnWritesClosure;
 1194209                _writesClosePending = true;
 1194210                writeWritesClosedFrame = true;
 1194211            }
 5260212        }
 213
 5260214        if (writeWritesClosedFrame)
 1194215        {
 1194216            if (IsRemote)
 479217            {
 218                // If it's a remote stream, we close writes before sending the StreamLast or StreamWritesClosed
 219                // frame to ensure _connection._bidirectionalStreamCount or _connection._unidirectionalStreamCount
 220                // is decreased before the peer receives the frame. This is necessary to prevent a race condition
 221                // where the peer could release the connection's bidirectional or unidirectional stream semaphore
 222                // before this connection's stream count is actually decreased.
 479223                TrySetWritesClosed();
 479224            }
 225
 1194226            if (graceful)
 1111227            {
 228                try
 1111229                {
 1111230                    WriteStreamFrame(FrameType.StreamLast, encode: null, writeReadsClosedFrame);
 1111231                }
 0232                catch (IceRpcException)
 0233                {
 234                    // Ignore connection failures.
 0235                }
 236
 237                // If the stream is a local stream, writes are not closed until the StreamReadsClosed frame is
 238                // received from the peer (see ReceivedReadsClosedFrame). This ensures that the connection's
 239                // bidirectional or unidirectional stream semaphore is released only once the peer consumed the
 240                // buffered data.
 1111241            }
 242            else
 83243            {
 244                try
 83245                {
 83246                    WriteStreamFrame(FrameType.StreamWritesClosed, encode: null, writeReadsClosedFrame);
 83247                }
 0248                catch (IceRpcException)
 0249                {
 250                    // Ignore connection failures.
 0251                }
 252
 83253                if (!IsRemote)
 53254                {
 255                    // We can now close writes to allow starting a new stream. Since the sending of frames is
 256                    // serialized over the connection, the peer will receive this StreamWritesClosed frame before
 257                    // a new stream sends a StreamFrame frame.
 53258                    TrySetWritesClosed();
 53259                }
 83260            }
 1194261        }
 262        else
 4066263        {
 4066264            TrySetWritesClosed();
 4066265        }
 5260266    }
 267
 268    /// <summary>Notifies the stream of the amount of data consumed by the connection to send a <see
 269    /// cref="FrameType.Stream" /> or <see cref="FrameType.StreamLast" /> frame.</summary>
 270    /// <param name="size">The size of the stream frame.</param>
 18149271    internal void ConsumedSendCredit(int size) => _outputPipeWriter!.ConsumedSendCredit(size);
 272
 273    /// <summary>Fills the given writer with stream data received on the connection.</summary>
 274    /// <param name="bufferWriter">The destination buffer writer.</param>
 275    /// <param name="byteCount">The amount of stream data to read.</param>
 276    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 277    internal ValueTask FillBufferWriterAsync(
 278        IBufferWriter<byte> bufferWriter,
 279        int byteCount,
 280        CancellationToken cancellationToken) =>
 18882281        _connection.FillBufferWriterAsync(bufferWriter, byteCount, cancellationToken);
 282
 283    /// <summary>Notifies the stream of the reception of a <see cref="FrameType.Stream" /> or <see
 284    /// cref="FrameType.StreamLast" /> frame.</summary>
 285    /// <param name="size">The size of the data carried by the stream frame.</param>
 286    /// <param name="endStream"><see langword="true" /> if the received stream frame is the <see
 287    /// cref="FrameType.StreamLast" /> frame; otherwise, <see langword="false" />.</param>
 288    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 289    internal ValueTask<bool> ReceivedDataFrameAsync(int size, bool endStream, CancellationToken cancellationToken)
 18913290    {
 18913291        Debug.Assert(_inputPipeReader is not null);
 18913292        if (_state.HasFlag(State.ReadsClosed))
 30293        {
 30294            return new(false);
 295        }
 296        else
 18883297        {
 18883298            if (endStream && !IsRemote)
 1066299            {
 300                // For a local stream we can close reads after we have received the StreamLast frame. For remote
 301                // streams reads are closed after the application has consumed all the data.
 1066302                CloseReads(graceful: true);
 1066303            }
 18883304            return _inputPipeReader.ReceivedDataFrameAsync(size, endStream, cancellationToken);
 305        }
 18913306    }
 307
 308    /// <summary>Notifies the stream of the reception of a <see cref="FrameType.StreamReadsClosed" /> frame.</summary>
 309    internal void ReceivedReadsClosedFrame()
 2591310    {
 2591311        TrySetWritesClosed();
 2591312        _outputPipeWriter?.CompleteWrites(exception: null);
 2591313    }
 314
 315    /// <summary>Notifies the stream of the reception of a <see cref="FrameType.StreamWindowUpdate" /> frame.</summary>
 316    /// <param name="frame">The body of the <see cref="FrameType.StreamWindowUpdate" /> frame.</param>
 317    internal void ReceivedWindowUpdateFrame(StreamWindowUpdateBody frame)
 2064318    {
 2064319        if (frame.WindowSizeIncrement > SlicTransportOptions.MaxWindowSize)
 0320        {
 0321            throw new IceRpcException(
 0322                IceRpcError.IceRpcError,
 0323                $"The window update is trying to increase the window size to a value larger than allowed.");
 324        }
 2064325        _outputPipeWriter!.ReceivedWindowUpdateFrame((int)frame.WindowSizeIncrement);
 2064326    }
 327
 328    /// <summary>Notifies the stream of the reception of a <see cref="FrameType.StreamWritesClosed" /> frame.</summary>
 329    internal void ReceivedWritesClosedFrame()
 80330    {
 80331        TrySetReadsClosed();
 332
 333        // Read operations will return a TruncatedData error if the peer closed writes.
 80334        _inputPipeReader?.CompleteReads(new IceRpcException(IceRpcError.TruncatedData));
 80335    }
 336
 337    /// <summary>Notifies the stream of the window update.</summary>
 338    /// <param name="size">The amount of data consumed by the application on the stream <see cref="Input" />.</param>
 339    internal void WindowUpdate(int size)
 2146340    {
 341        try
 2146342        {
 343            // Notify the sender of the window update to permit the sending of additional data.
 2146344            WriteStreamFrame(
 2146345                FrameType.StreamWindowUpdate,
 2146346                new StreamWindowUpdateBody((ulong)size).Encode,
 2146347                writeReadsClosedFrame: false);
 2146348        }
 0349        catch (IceRpcException)
 0350        {
 351            // Ignore connection failures.
 0352        }
 2146353    }
 354
 355    /// <summary>Writes a <see cref="FrameType.Stream" /> or <see cref="FrameType.StreamLast" /> frame on the
 356    /// connection.</summary>
 357    /// <param name="source1">The first stream frame data source.</param>
 358    /// <param name="source2">The second stream frame data source.</param>
 359    /// <param name="endStream"><see langword="true" /> to write a <see cref="FrameType.StreamLast" /> frame; otherwise,
 360    /// <see langword="false" />.</param>
 361    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 362    internal ValueTask<FlushResult> WriteStreamFrameAsync(
 363        ReadOnlySequence<byte> source1,
 364        ReadOnlySequence<byte> source2,
 365        bool endStream,
 366        CancellationToken cancellationToken)
 15595367    {
 15595368        bool writeReadsClosedFrame = false;
 15595369        if (endStream)
 3478370        {
 3478371            lock (_mutex)
 3478372            {
 3478373                writeReadsClosedFrame = _closeReadsOnWritesClosure;
 3478374                _writesClosePending = true;
 3478375            }
 3478376        }
 377
 15595378        return _connection.WriteStreamDataFrameAsync(
 15595379            this,
 15595380            source1,
 15595381            source2,
 15595382            endStream,
 15595383            writeReadsClosedFrame,
 15595384            cancellationToken);
 15595385    }
 386
 387    /// <summary>Notifies the stream that the <see cref="FrameType.StreamLast" /> was written by the
 388    /// connection.</summary>
 389    internal void WroteLastStreamFrame()
 2579390    {
 2579391        if (IsRemote)
 1160392        {
 1160393            TrySetWritesClosed();
 1160394        }
 395        // For local streams, writes will be closed only once the peer sends the StreamReadsClosed frame.
 396
 2579397        _writesClosedTcs.TrySetResult();
 2579398    }
 399
 400    /// <summary>Throws the connection closure exception if the connection is closed.</summary>
 15638401    internal void ThrowIfConnectionClosed() => _connection.ThrowIfClosed();
 402
 12896403    private bool TrySetReadsClosed() => TrySetState(State.ReadsClosed);
 404
 405    private bool TrySetWritesClosed()
 12336406    {
 12336407        if (TrySetState(State.WritesClosed))
 8062408        {
 8062409            _writesClosedTcs.TrySetResult();
 8062410            return true;
 411        }
 412        else
 4274413        {
 4274414            return false;
 415        }
 12336416    }
 417
 418    private bool TrySetState(State state)
 25232419    {
 25232420        if (_state.TrySetFlag(state, out int newState))
 16130421        {
 16130422            if (newState.HasFlag(State.ReadsClosed | State.WritesClosed))
 8062423            {
 424                // The stream reads and writes are closed, it's time to release the stream to either allow creating or
 425                // accepting a new stream.
 8062426                if (IsStarted)
 8042427                {
 8042428                    _connection.ReleaseStream(this);
 8042429                }
 8062430            }
 16130431            return true;
 432        }
 433        else
 9102434        {
 9102435            return false;
 436        }
 25232437    }
 438
 439    private void WriteStreamFrame(FrameType frameType, EncodeAction? encode, bool writeReadsClosedFrame) =>
 6028440        _connection.WriteStreamFrame(stream: this, frameType, encode, writeReadsClosedFrame);
 441
 442    [Flags]
 443    private enum State : int
 444    {
 445        ReadsClosed = 1,
 446        WritesClosed = 2
 447    }
 448}