< Summary

Information
Class: IceRpc.Transports.Slic.Internal.SlicConnection
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicConnection.cs
Tag: 1321_24790053727
Line coverage
90%
Covered lines: 910
Uncovered lines: 99
Coverable lines: 1009
Total lines: 1598
Line coverage: 90.1%
Branch coverage
90%
Covered branches: 304
Total branches: 336
Branch coverage: 90.4%
Method coverage
97%
Covered methods: 45
Fully covered methods: 26
Total methods: 46
Method coverage: 97.8%
Full method coverage: 56.5%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
get_IsServer()100%11100%
get_MinSegmentSize()100%11100%
get_PeerInitialStreamWindowSize()100%11100%
get_PeerMaxStreamFrameSize()100%11100%
get_Pool()100%11100%
get_InitialStreamWindowSize()100%11100%
get_StreamWindowUpdateThreshold()100%11100%
.ctor(...)100%44100%
AcceptStreamAsync()100%66100%
ConnectAsync(...)75%4484.61%
PerformConnectAsync()100%262695.91%
DecodeInitialize()75%4489.47%
DecodeInitializeAckOrVersion()66.66%6690%
ReadFrameAsync()100%88100%
CloseAsync()100%1414100%
CreateStreamAsync()100%141497.61%
DisposeAsync()100%22100%
PerformDisposeAsync()93.75%161690.47%
SendPing()100%1141.66%
SendReadPing()100%22100%
SendWritePing()0%620%
FillBufferWriterAsync(...)100%11100%
ReleaseStream(...)100%88100%
ThrowIfClosed()100%22100%
WriteConnectionFrame(...)50%2280%
WriteStreamFrame(...)87.5%88100%
WriteStreamDataFrameAsync()94.44%363692.94%
EncodeStreamFrameHeader()100%22100%
AddStream(...)100%66100%
DecodeParameters(...)75%352473.68%
DecodeParamValue()100%1166.66%
EncodeParameters()100%66100%
EncodeParameter()100%11100%
IsUnknownStream(...)100%1212100%
ReadFrameAsync(...)95.65%232394.87%
ReadCloseFrameAsync()100%1313100%
ReadPingFrameAndWritePongFrameAsync()100%11100%
ReadPongFrameAsync()66.66%6686.66%
ReadStreamWindowUpdateFrameAsync()100%22100%
ReadFrameBodyAsync()100%44100%
ReadFrameHeaderAsync()83.33%6681.81%
TryDecodeHeader()81.25%181680.55%
ReadFramesAsync()83.33%6685.41%
ReadStreamDataFrameAsync()80.55%643672.09%
TryClose(...)100%66100%
WriteFrame(...)100%44100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicConnection.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using IceRpc.Transports.Internal;
 5using System.Buffers;
 6using System.Collections.Concurrent;
 7using System.Diagnostics;
 8using System.IO.Pipelines;
 9using System.Security.Authentication;
 10using System.Threading.Channels;
 11using ZeroC.Slice.Codec;
 12
 13namespace IceRpc.Transports.Slic.Internal;
 14
 15/// <summary>The Slic connection implements an <see cref="IMultiplexedConnection" /> on top of a <see
 16/// cref="IDuplexConnection" />.</summary>
 17internal class SlicConnection : IMultiplexedConnection
 18{
 19    /// <summary>Gets a value indicating whether or not this is the server-side of the connection.</summary>
 1718620    internal bool IsServer { get; }
 21
 22    /// <summary>Gets the minimum size of the segment requested from <see cref="Pool" />.</summary>
 556223    internal int MinSegmentSize { get; }
 24
 25    /// <summary>Gets the peer's initial stream window size. This property is set to the <see
 26    /// cref="ParameterKey.InitialStreamWindowSize"/> value carried by the <see cref="FrameType.Initialize" />
 27    /// frame.</summary>
 337228    internal int PeerInitialStreamWindowSize { get; private set; }
 29
 30    /// <summary>Gets the maximum size of stream frames accepted by the peer. This property is set to the <see
 31    /// cref="ParameterKey.MaxStreamFrameSize"/> value carried by the <see cref="FrameType.Initialize" />
 32    /// frame.</summary>
 978433    internal int PeerMaxStreamFrameSize { get; private set; }
 34
 35    /// <summary>Gets the <see cref="MemoryPool{T}" /> used for obtaining memory buffers.</summary>
 556236    internal MemoryPool<byte> Pool { get; }
 37
 38    /// <summary>Gets the initial stream window size.</summary>
 1038139    internal int InitialStreamWindowSize { get; }
 40
 41    /// <summary>Gets the window update threshold. When the window size is increased and this threshold reached, a <see
 42    /// cref="FrameType.StreamWindowUpdate" /> frame is sent.</summary>
 699443    internal int StreamWindowUpdateThreshold => InitialStreamWindowSize / StreamWindowUpdateRatio;
 44
 45    // The maximum body size for non-stream frames (Initialize, InitializeAck, Version, Close, Ping, Pong). This
 46    // value is the maximum value that can be encoded as a 2-byte varuint62, which allows WriteFrame to use a 2-byte
 47    // size placeholder. Stream data frames are not subject to this limit; they are gated by per-stream flow control.
 48    private const int MaxControlFrameBodySize = 16_383;
 49
 50    // The ratio used to compute the StreamWindowUpdateThreshold. For now, the stream window update is sent when the
 51    // window size grows over InitialStreamWindowSize / StreamWindowUpdateRatio.
 52    private const int StreamWindowUpdateRatio = 2;
 53
 54    private readonly Channel<IMultiplexedStream> _acceptStreamChannel;
 55    private int _bidirectionalStreamCount;
 56    private SemaphoreSlim? _bidirectionalStreamSemaphore;
 57    private readonly CancellationToken _closedCancellationToken;
 70658    private readonly CancellationTokenSource _closedCts = new();
 59    private string? _closedMessage;
 60    private Task<TransportConnectionInformation>? _connectTask;
 70661    private readonly CancellationTokenSource _disposedCts = new();
 62    private Task? _disposeTask;
 63    private readonly SlicDuplexConnectionDecorator _duplexConnection;
 64    private readonly DuplexConnectionReader _duplexConnectionReader;
 65    private readonly SlicDuplexConnectionWriter _duplexConnectionWriter;
 66    private bool _isClosed;
 67    private ulong? _lastRemoteBidirectionalStreamId;
 68    private ulong? _lastRemoteUnidirectionalStreamId;
 69    private readonly TimeSpan _localIdleTimeout;
 70    private readonly int _maxBidirectionalStreams;
 71    private readonly int _maxStreamFrameSize;
 72    private readonly int _maxUnidirectionalStreams;
 73    // _mutex ensure the assignment of _lastRemoteXxx members and the addition of the stream to _streams is
 74    // an atomic operation.
 70675    private readonly Lock _mutex = new();
 76    private ulong _nextBidirectionalId;
 77    private ulong _nextUnidirectionalId;
 78    private IceRpcError? _peerCloseError;
 70679    private TimeSpan _peerIdleTimeout = Timeout.InfiniteTimeSpan;
 80    private int _pendingPongCount;
 81    private Task? _readFramesTask;
 82
 70683    private readonly ConcurrentDictionary<ulong, SlicStream> _streams = new();
 84    private int _streamSemaphoreWaitCount;
 70685    private readonly TaskCompletionSource _streamSemaphoreWaitClosed =
 70686        new(TaskCreationOptions.RunContinuationsAsynchronously);
 87
 88    private int _unidirectionalStreamCount;
 89    private SemaphoreSlim? _unidirectionalStreamSemaphore;
 90
 91    // This is only set for server connections to ensure that _duplexConnectionWriter.Write is not called after
 92    // _duplexConnectionWriter.Shutdown. This can occur if the client-side of the connection sends the close frame
 93    // followed by the shutdown of the duplex connection and if CloseAsync is called at the same time on the server
 94    // connection.
 95    private bool _writerIsShutdown;
 96
 97    public async ValueTask<IMultiplexedStream> AcceptStreamAsync(CancellationToken cancellationToken)
 239098    {
 99        lock (_mutex)
 2390100        {
 2390101            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 102
 2389103            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 1104            {
 1105                throw new InvalidOperationException("Cannot accept stream before connecting the Slic connection.");
 106            }
 2388107            if (_isClosed)
 9108            {
 9109                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 110            }
 2379111        }
 112
 113        try
 2379114        {
 2379115            return await _acceptStreamChannel.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
 116        }
 92117        catch (ChannelClosedException exception)
 92118        {
 92119            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 91120            Debug.Assert(exception.InnerException is not null);
 121            // The exception given to ChannelWriter.Complete(Exception? exception) is the InnerException.
 91122            throw ExceptionUtil.Throw(exception.InnerException);
 123        }
 2033124    }
 125
 126    public Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken)
 687127    {
 128        lock (_mutex)
 687129        {
 687130            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 131
 687132            if (_connectTask is not null)
 1133            {
 1134                throw new InvalidOperationException("Cannot connect twice a Slic connection.");
 135            }
 686136            if (_isClosed)
 0137            {
 0138                throw new InvalidOperationException("Cannot connect a closed Slic connection.");
 139            }
 686140            _connectTask = PerformConnectAsync();
 686141        }
 686142        return _connectTask;
 143
 144        async Task<TransportConnectionInformation> PerformConnectAsync()
 686145        {
 686146            await Task.Yield(); // Exit mutex lock
 147
 148            // Connect the duplex connection.
 149            TransportConnectionInformation transportConnectionInformation;
 686150            TimeSpan peerIdleTimeout = TimeSpan.MaxValue;
 151
 152            try
 686153            {
 686154                transportConnectionInformation = await _duplexConnection.ConnectAsync(cancellationToken)
 686155                    .ConfigureAwait(false);
 156
 157                // Initialize the Slic connection.
 664158                if (IsServer)
 334159                {
 160                    // Read the Initialize frame.
 334161                    (ulong version, InitializeBody? initializeBody) = await ReadFrameAsync(
 334162                        DecodeInitialize,
 334163                        cancellationToken).ConfigureAwait(false);
 164
 327165                    if (initializeBody is null)
 2166                    {
 167                        // Unsupported version, try to negotiate another version by sending a Version frame with the
 168                        // Slic versions supported by this server.
 2169                        ulong[] supportedVersions = new ulong[] { SlicDefinitions.V1 };
 170
 2171                        WriteConnectionFrame(FrameType.Version, new VersionBody(supportedVersions).Encode);
 172
 2173                        (version, initializeBody) = await ReadFrameAsync(
 2174                            (frameType, buffer) =>
 2175                            {
 2176                                if (frameType is null)
 1177                                {
 2178                                    // The client shut down the connection because it doesn't support any of the
 2179                                    // server's supported Slic versions.
 1180                                    throw new IceRpcException(
 1181                                        IceRpcError.ConnectionRefused,
 1182                                        $"The connection was refused because the client Slic version {version} is not su
 2183                                }
 2184                                else
 1185                                {
 1186                                    return DecodeInitialize(frameType, buffer);
 2187                                }
 1188                            },
 2189                            cancellationToken).ConfigureAwait(false);
 1190                    }
 191
 326192                    Debug.Assert(initializeBody is not null);
 193
 326194                    DecodeParameters(initializeBody.Value.Parameters);
 195
 196                    // Write back an InitializeAck frame.
 326197                    WriteConnectionFrame(FrameType.InitializeAck, new InitializeAckBody(EncodeParameters()).Encode);
 326198                }
 199                else
 330200                {
 201                    // Write the Initialize frame.
 330202                    WriteConnectionFrame(
 330203                        FrameType.Initialize,
 330204                        (ref SliceEncoder encoder) =>
 330205                        {
 330206                            encoder.EncodeVarUInt62(SlicDefinitions.V1);
 330207                            new InitializeBody(EncodeParameters()).Encode(ref encoder);
 660208                        });
 209
 210                    // Read and decode the InitializeAck or Version frame.
 330211                    (InitializeAckBody? initializeAckBody, VersionBody? versionBody) = await ReadFrameAsync(
 330212                        DecodeInitializeAckOrVersion,
 330213                        cancellationToken).ConfigureAwait(false);
 214
 306215                    Debug.Assert(initializeAckBody is not null || versionBody is not null);
 216
 306217                    if (initializeAckBody is not null)
 304218                    {
 304219                        DecodeParameters(initializeAckBody.Value.Parameters);
 304220                    }
 221
 306222                    if (versionBody is not null)
 2223                    {
 2224                        if (versionBody.Value.Versions.Contains(SlicDefinitions.V1))
 1225                        {
 1226                            throw new InvalidDataException(
 1227                                "The server supported versions include the version initially requested.");
 228                        }
 229                        else
 1230                        {
 231                            // We only support V1 and the peer rejected V1.
 1232                            throw new IceRpcException(
 1233                                IceRpcError.ConnectionRefused,
 1234                                $"The connection was refused because the server only supports Slic version(s) {string.Jo
 235                        }
 236                    }
 304237                }
 630238            }
 7239            catch (InvalidDataException exception)
 7240            {
 7241                throw new IceRpcException(
 7242                    IceRpcError.IceRpcError,
 7243                    "The connection was aborted by a Slic protocol error.",
 7244                    exception);
 245            }
 25246            catch (OperationCanceledException)
 25247            {
 25248                throw;
 249            }
 4250            catch (AuthenticationException)
 4251            {
 4252                throw;
 253            }
 20254            catch (IceRpcException)
 20255            {
 20256                throw;
 257            }
 0258            catch (Exception exception)
 0259            {
 0260                Debug.Fail($"ConnectAsync failed with an unexpected exception: {exception}");
 0261                throw;
 262            }
 263
 264            // Enable the idle timeout checks after the connection establishment. The Ping frames sent by the keep alive
 265            // check are not expected until the Slic connection initialization completes. The idle timeout check uses
 266            // the smallest idle timeout. Timeout.InfiniteTimeSpan is -1 ms so we can't compare it directly with
 267            // positive timeouts.
 268            TimeSpan idleTimeout;
 630269            if (_localIdleTimeout == Timeout.InfiniteTimeSpan)
 2270            {
 2271                idleTimeout = _peerIdleTimeout;
 2272            }
 628273            else if (_peerIdleTimeout == Timeout.InfiniteTimeSpan)
 19274            {
 19275                idleTimeout = _localIdleTimeout;
 19276            }
 277            else
 609278            {
 609279                idleTimeout = _peerIdleTimeout < _localIdleTimeout ? _peerIdleTimeout : _localIdleTimeout;
 609280            }
 281
 630282            if (idleTimeout != Timeout.InfiniteTimeSpan)
 628283            {
 628284                _duplexConnection.Enable(idleTimeout);
 628285            }
 286
 630287            _readFramesTask = ReadFramesAsync(_disposedCts.Token);
 288
 630289            return transportConnectionInformation;
 630290        }
 291
 292        static (ulong, InitializeBody?) DecodeInitialize(FrameType? frameType, ReadOnlySequence<byte> buffer)
 329293        {
 329294            if (frameType != FrameType.Initialize)
 0295            {
 0296                throw new InvalidDataException($"Received unexpected {frameType} frame.");
 297            }
 298
 329299            return buffer.DecodeSliceBuffer<(ulong, InitializeBody?)>(
 329300                (ref SliceDecoder decoder) =>
 329301                {
 329302                    ulong version = decoder.DecodeVarUInt62();
 328303                    if (version == SlicDefinitions.V1)
 326304                    {
 326305                        return (version, new InitializeBody(ref decoder));
 329306                    }
 329307                    else
 2308                    {
 2309                        decoder.Skip((int)(buffer.Length - decoder.Consumed));
 2310                        return (version, null);
 329311                    }
 657312                });
 328313        }
 314
 315        static (InitializeAckBody?, VersionBody?) DecodeInitializeAckOrVersion(
 316            FrameType? frameType,
 317            ReadOnlySequence<byte> buffer) =>
 308318            frameType switch
 308319            {
 305320                FrameType.InitializeAck => (
 305321                    buffer.DecodeSliceBuffer((ref SliceDecoder decoder) => new InitializeAckBody(ref decoder)),
 305322                    null),
 3323                FrameType.Version => (
 3324                    null,
 6325                    buffer.DecodeSliceBuffer((ref SliceDecoder decoder) => new VersionBody(ref decoder))),
 0326                _ => throw new InvalidDataException($"Received unexpected Slic frame: '{frameType}'."),
 308327            };
 328
 329        async ValueTask<T> ReadFrameAsync<T>(
 330            Func<FrameType?, ReadOnlySequence<byte>, T> decodeFunc,
 331            CancellationToken cancellationToken)
 666332        {
 666333            (FrameType FrameType, int FrameSize, ulong?)? header =
 666334                await ReadFrameHeaderAsync(cancellationToken).ConfigureAwait(false);
 335
 336            ReadOnlySequence<byte> buffer;
 638337            if (header is null || header.Value.FrameSize == 0)
 4338            {
 4339                buffer = ReadOnlySequence<byte>.Empty;
 4340            }
 341            else
 634342            {
 634343                buffer = await _duplexConnectionReader.ReadAtLeastAsync(
 634344                    header.Value.FrameSize,
 634345                    cancellationToken).ConfigureAwait(false);
 634346                if (buffer.Length > header.Value.FrameSize)
 1347                {
 1348                    buffer = buffer.Slice(0, header.Value.FrameSize);
 1349                }
 634350            }
 351
 638352            T decodedFrame = decodeFunc(header?.FrameType, buffer);
 634353            _duplexConnectionReader.AdvanceTo(buffer.End);
 634354            return decodedFrame;
 634355        }
 686356    }
 357
 358    public async Task CloseAsync(MultiplexedConnectionCloseError closeError, CancellationToken cancellationToken)
 110359    {
 360        lock (_mutex)
 110361        {
 110362            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 363
 110364            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 1365            {
 1366                throw new InvalidOperationException("Cannot close a Slic connection before connecting it.");
 367            }
 109368        }
 369
 109370        bool waitForWriterShutdown = false;
 109371        if (TryClose(new IceRpcException(IceRpcError.OperationAborted), "The connection was closed."))
 98372        {
 373            lock (_mutex)
 98374            {
 375                // The duplex connection writer of a server connection might already be shutdown
 376                // (_writerIsShutdown=true) if the client-side sent the Close frame and shut down the duplex connection.
 377                // This doesn't apply to the client-side since the server-side doesn't shutdown the duplex connection
 378                // writer after sending the Close frame.
 98379                if (!IsServer || !_writerIsShutdown)
 98380                {
 98381                    WriteFrame(FrameType.Close, streamId: null, new CloseBody((ulong)closeError).Encode);
 98382                    if (IsServer)
 50383                    {
 50384                        _duplexConnectionWriter.Flush();
 50385                    }
 386                    else
 48387                    {
 388                        // The sending of the client-side Close frame is followed by the shutdown of the duplex
 389                        // connection. For TCP, it's important to always shutdown the connection on the client-side
 390                        // first to avoid TIME_WAIT states on the server-side.
 48391                        _duplexConnectionWriter.Shutdown();
 48392                        waitForWriterShutdown = true;
 48393                    }
 98394                }
 98395            }
 98396        }
 397
 109398        if (waitForWriterShutdown)
 48399        {
 48400            await _duplexConnectionWriter.WriterTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 48401        }
 402
 403        // Now, wait for the peer to close the write side of the connection, which will terminate the read frames task.
 109404        Debug.Assert(_readFramesTask is not null);
 109405        await _readFramesTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 107406    }
 407
 408    public async ValueTask<IMultiplexedStream> CreateStreamAsync(
 409        bool bidirectional,
 410        CancellationToken cancellationToken)
 2096411    {
 412        lock (_mutex)
 2096413        {
 2096414            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 415
 2093416            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 2417            {
 2418                throw new InvalidOperationException("Cannot create stream before connecting the Slic connection.");
 419            }
 2091420            if (_isClosed)
 5421            {
 5422                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 423            }
 424
 2086425            ++_streamSemaphoreWaitCount;
 2086426        }
 427
 428        try
 2086429        {
 2086430            using var createStreamCts = CancellationTokenSource.CreateLinkedTokenSource(
 2086431                _closedCancellationToken,
 2086432                cancellationToken);
 433
 2086434            SemaphoreSlim? streamCountSemaphore = bidirectional ?
 2086435                _bidirectionalStreamSemaphore :
 2086436                _unidirectionalStreamSemaphore;
 437
 2086438            if (streamCountSemaphore is null)
 1439            {
 440                // The stream semaphore is null if the peer's max streams configuration is 0. In this case, we let
 441                // CreateStreamAsync hang indefinitely until the connection is closed.
 1442                await Task.Delay(-1, createStreamCts.Token).ConfigureAwait(false);
 0443            }
 444            else
 2085445            {
 2085446                await streamCountSemaphore.WaitAsync(createStreamCts.Token).ConfigureAwait(false);
 2076447            }
 448
 2076449            return new SlicStream(this, bidirectional, isRemote: false);
 450        }
 10451        catch (OperationCanceledException)
 10452        {
 10453            cancellationToken.ThrowIfCancellationRequested();
 7454            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 6455            Debug.Assert(_isClosed);
 6456            throw new IceRpcException(_peerCloseError ?? IceRpcError.OperationAborted, _closedMessage);
 457        }
 458        finally
 2086459        {
 460            lock (_mutex)
 2086461            {
 2086462                --_streamSemaphoreWaitCount;
 2086463                if (_isClosed && _streamSemaphoreWaitCount == 0)
 7464                {
 7465                    _streamSemaphoreWaitClosed.SetResult();
 7466                }
 2086467            }
 2086468        }
 2076469    }
 470
 471    public ValueTask DisposeAsync()
 973472    {
 473        lock (_mutex)
 973474        {
 973475            _disposeTask ??= PerformDisposeAsync();
 973476        }
 973477        return new(_disposeTask);
 478
 479        async Task PerformDisposeAsync()
 705480        {
 481            // Make sure we execute the code below without holding the mutex lock.
 705482            await Task.Yield();
 705483            TryClose(new IceRpcException(IceRpcError.OperationAborted), "The connection was disposed.");
 484
 705485            _disposedCts.Cancel();
 486
 487            try
 705488            {
 705489                await Task.WhenAll(
 705490                    _connectTask ?? Task.CompletedTask,
 705491                    _readFramesTask ?? Task.CompletedTask,
 705492                    _streamSemaphoreWaitClosed.Task).ConfigureAwait(false);
 398493            }
 307494            catch
 307495            {
 496                // Expected if any of these tasks failed or was canceled. Each task takes care of handling unexpected
 497                // exceptions so there's no need to handle them here.
 307498            }
 499
 500            // Clean-up the streams that might still be queued on the channel.
 729501            while (_acceptStreamChannel.Reader.TryRead(out IMultiplexedStream? stream))
 24502            {
 24503                if (stream.IsBidirectional)
 5504                {
 5505                    stream.Output.Complete();
 5506                    stream.Input.Complete();
 5507                }
 19508                else if (stream.IsRemote)
 19509                {
 19510                    stream.Input.Complete();
 19511                }
 512                else
 0513                {
 0514                    stream.Output.Complete();
 0515                }
 24516            }
 517
 518            try
 705519            {
 520                // Prevents unobserved task exceptions.
 705521                await _acceptStreamChannel.Reader.Completion.ConfigureAwait(false);
 0522            }
 705523            catch
 705524            {
 705525            }
 526
 705527            await _duplexConnectionWriter.DisposeAsync().ConfigureAwait(false);
 705528            _duplexConnectionReader.Dispose();
 705529            _duplexConnection.Dispose();
 530
 705531            _disposedCts.Dispose();
 705532            _bidirectionalStreamSemaphore?.Dispose();
 705533            _unidirectionalStreamSemaphore?.Dispose();
 705534            _closedCts.Dispose();
 705535        }
 973536    }
 537
 706538    internal SlicConnection(
 706539        IDuplexConnection duplexConnection,
 706540        MultiplexedConnectionOptions options,
 706541        SlicTransportOptions slicOptions,
 706542        bool isServer)
 706543    {
 706544        IsServer = isServer;
 545
 706546        Pool = options.Pool;
 706547        MinSegmentSize = options.MinSegmentSize;
 706548        _maxBidirectionalStreams = options.MaxBidirectionalStreams;
 706549        _maxUnidirectionalStreams = options.MaxUnidirectionalStreams;
 550
 706551        InitialStreamWindowSize = slicOptions.InitialStreamWindowSize;
 706552        _localIdleTimeout = slicOptions.IdleTimeout;
 706553        _maxStreamFrameSize = slicOptions.MaxStreamFrameSize;
 554
 706555        _acceptStreamChannel = Channel.CreateUnbounded<IMultiplexedStream>(new UnboundedChannelOptions
 706556        {
 706557            SingleReader = true,
 706558            SingleWriter = true
 706559        });
 560
 706561        _closedCancellationToken = _closedCts.Token;
 562
 563        // Only the client-side sends pings to keep the connection alive when idle timeout (set later) is not infinite.
 706564        _duplexConnection = IsServer ?
 706565            new SlicDuplexConnectionDecorator(duplexConnection) :
 706566            new SlicDuplexConnectionDecorator(duplexConnection, SendReadPing, SendWritePing);
 567
 706568        _duplexConnectionReader = new DuplexConnectionReader(_duplexConnection, options.Pool, options.MinSegmentSize);
 706569        _duplexConnectionWriter = new SlicDuplexConnectionWriter(
 706570            _duplexConnection,
 706571            options.Pool,
 706572            options.MinSegmentSize);
 573
 574        // We use the same stream ID numbering scheme as QUIC.
 706575        if (IsServer)
 349576        {
 349577            _nextBidirectionalId = 1;
 349578            _nextUnidirectionalId = 3;
 349579        }
 580        else
 357581        {
 357582            _nextBidirectionalId = 0;
 357583            _nextUnidirectionalId = 2;
 357584        }
 585
 586        void SendPing(long payload)
 14587        {
 588            try
 14589            {
 14590                WriteConnectionFrame(FrameType.Ping, new PingBody(payload).Encode);
 14591            }
 0592            catch (IceRpcException)
 0593            {
 594                // Expected if the connection is closed.
 0595            }
 0596            catch (Exception exception)
 0597            {
 0598                Debug.Fail($"The sending of a Ping frame failed with an unexpected exception: {exception}");
 0599                throw;
 600            }
 14601        }
 602
 603        void SendReadPing()
 14604        {
 605            // This local function is no-op if there is already a pending Pong.
 14606            if (Interlocked.CompareExchange(ref _pendingPongCount, 1, 0) == 0)
 14607            {
 14608                SendPing(1L);
 14609            }
 14610        }
 611
 612        void SendWritePing()
 0613        {
 614            // _pendingPongCount can be <= 0 if an unexpected pong is received. If it's the case, the connection is
 615            // being torn down and there's no point in sending a ping frame.
 0616            if (Interlocked.Increment(ref _pendingPongCount) > 0)
 0617            {
 0618                SendPing(0L);
 0619            }
 0620        }
 706621    }
 622
 623    /// <summary>Fills the given writer with stream data received on the connection.</summary>
 624    /// <param name="bufferWriter">The destination buffer writer.</param>
 625    /// <param name="byteCount">The amount of stream data to read.</param>
 626    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 627    internal ValueTask FillBufferWriterAsync(
 628        IBufferWriter<byte> bufferWriter,
 629        int byteCount,
 630        CancellationToken cancellationToken) =>
 9529631        _duplexConnectionReader.FillBufferWriterAsync(bufferWriter, byteCount, cancellationToken);
 632
 633    /// <summary>Releases a stream from the connection. The connection stream count is decremented and if this is a
 634    /// client allow a new stream to be started.</summary>
 635    /// <param name="stream">The released stream.</param>
 636    internal void ReleaseStream(SlicStream stream)
 4118637    {
 4118638        Debug.Assert(stream.IsStarted);
 639
 4118640        _streams.Remove(stream.Id, out SlicStream? _);
 641
 4118642        if (stream.IsRemote)
 2057643        {
 2057644            if (stream.IsBidirectional)
 664645            {
 664646                Interlocked.Decrement(ref _bidirectionalStreamCount);
 664647            }
 648            else
 1393649            {
 1393650                Interlocked.Decrement(ref _unidirectionalStreamCount);
 1393651            }
 2057652        }
 2061653        else if (!_isClosed)
 1707654        {
 1707655            if (stream.IsBidirectional)
 612656            {
 612657                _bidirectionalStreamSemaphore!.Release();
 612658            }
 659            else
 1095660            {
 1095661                _unidirectionalStreamSemaphore!.Release();
 1095662            }
 1707663        }
 4118664    }
 665
 666    /// <summary>Throws the connection closure exception if the connection is already closed.</summary>
 667    internal void ThrowIfClosed()
 7912668    {
 669        lock (_mutex)
 7912670        {
 7912671            if (_isClosed)
 8672            {
 8673                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 674            }
 7904675        }
 7904676    }
 677
 678    /// <summary>Writes a connection frame.</summary>
 679    /// <param name="frameType">The frame type.</param>
 680    /// <param name="encode">The action to encode the frame.</param>
 681    internal void WriteConnectionFrame(FrameType frameType, EncodeAction? encode)
 686682    {
 686683        Debug.Assert(frameType < FrameType.Stream);
 684
 685        lock (_mutex)
 686686        {
 686687            if (_isClosed)
 0688            {
 0689                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 690            }
 686691            WriteFrame(frameType, streamId: null, encode);
 686692            _duplexConnectionWriter.Flush();
 686693        }
 686694    }
 695
 696    /// <summary>Writes a stream frame.</summary>
 697    /// <param name="stream">The stream to write the frame for.</param>
 698    /// <param name="frameType">The frame type.</param>
 699    /// <param name="encode">The action to encode the frame.</param>
 700    /// <param name="writeReadsClosedFrame"><see langword="true" /> if a <see cref="FrameType.StreamReadsClosed" />
 701    /// frame should be written after the stream frame.</param>
 702    /// <remarks>This method is called by streams and might be called on a closed connection. The connection might
 703    /// also be closed concurrently while it's in progress.</remarks>
 704    internal void WriteStreamFrame(
 705        SlicStream stream,
 706        FrameType frameType,
 707        EncodeAction? encode,
 708        bool writeReadsClosedFrame)
 3118709    {
 710        // Ensure that this method is called for any FrameType.StreamXxx frame type except FrameType.Stream.
 3118711        Debug.Assert(frameType >= FrameType.StreamLast && stream.IsStarted);
 712
 713        lock (_mutex)
 3118714        {
 3118715            if (_isClosed)
 2716            {
 2717                return;
 718            }
 719
 3116720            WriteFrame(frameType, stream.Id, encode);
 3116721            if (writeReadsClosedFrame)
 83722            {
 83723                WriteFrame(FrameType.StreamReadsClosed, stream.Id, encode: null);
 83724            }
 3116725            if (frameType == FrameType.StreamLast)
 564726            {
 727                // Notify the stream that the last stream frame is considered sent at this point. This will close
 728                // writes on the stream and allow the stream to be released if reads are also closed.
 564729                stream.WroteLastStreamFrame();
 564730            }
 3116731            _duplexConnectionWriter.Flush();
 3116732        }
 3118733    }
 734
 735    /// <summary>Writes a stream data frame.</summary>
 736    /// <param name="stream">The stream to write the frame for.</param>
 737    /// <param name="source1">The first stream frame data source.</param>
 738    /// <param name="source2">The second stream frame data source.</param>
 739    /// <param name="endStream"><see langword="true" /> to write a <see cref="FrameType.StreamLast" /> frame and
 740    /// <see langword="false" /> to write a <see cref="FrameType.Stream" /> frame.</param>
 741    /// <param name="writeReadsClosedFrame"><see langword="true" /> if a <see cref="FrameType.StreamReadsClosed" />
 742    /// frame should be written after the stream frame.</param>
 743    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 744    /// <remarks>This method is called by streams and might be called on a closed connection. The connection might
 745    /// also be closed concurrently while it's in progress.</remarks>
 746    internal async ValueTask<FlushResult> WriteStreamDataFrameAsync(
 747        SlicStream stream,
 748        ReadOnlySequence<byte> source1,
 749        ReadOnlySequence<byte> source2,
 750        bool endStream,
 751        bool writeReadsClosedFrame,
 752        CancellationToken cancellationToken)
 7889753    {
 7889754        Debug.Assert(!source1.IsEmpty || endStream);
 755
 7889756        if (_connectTask is null)
 0757        {
 0758            throw new InvalidOperationException("Cannot send a stream frame before calling ConnectAsync.");
 759        }
 760
 7889761        using var writeCts = CancellationTokenSource.CreateLinkedTokenSource(
 7889762            _closedCancellationToken,
 7889763            cancellationToken);
 764
 765        try
 7889766        {
 767            do
 10165768            {
 769                // Next, ensure send credit is available. If not, this will block until the receiver allows sending
 770                // additional data.
 10165771                int sendCredit = 0;
 10165772                if (!source1.IsEmpty || !source2.IsEmpty)
 10160773                {
 10160774                    sendCredit = await stream.AcquireSendCreditAsync(writeCts.Token).ConfigureAwait(false);
 9147775                    Debug.Assert(sendCredit > 0);
 9147776                }
 777
 778                // Gather data from source1 or source2 up to sendCredit bytes or the peer maximum stream frame size.
 9152779                int sendMaxSize = Math.Min(sendCredit, PeerMaxStreamFrameSize);
 780                ReadOnlySequence<byte> sendSource1;
 781                ReadOnlySequence<byte> sendSource2;
 9152782                if (!source1.IsEmpty)
 8147783                {
 8147784                    int length = Math.Min((int)source1.Length, sendMaxSize);
 8147785                    sendSource1 = source1.Slice(0, length);
 8147786                    source1 = source1.Slice(length);
 8147787                }
 788                else
 1005789                {
 1005790                    sendSource1 = ReadOnlySequence<byte>.Empty;
 1005791                }
 792
 9152793                if (source1.IsEmpty && !source2.IsEmpty)
 2057794                {
 2057795                    int length = Math.Min((int)source2.Length, sendMaxSize - (int)sendSource1.Length);
 2057796                    sendSource2 = source2.Slice(0, length);
 2057797                    source2 = source2.Slice(length);
 2057798                }
 799                else
 7095800                {
 7095801                    sendSource2 = ReadOnlySequence<byte>.Empty;
 7095802                }
 803
 804                // If there's no data left to send and endStream is true, it's the last stream frame.
 9152805                bool lastStreamFrame = endStream && source1.IsEmpty && source2.IsEmpty;
 806
 807                lock (_mutex)
 9152808                {
 9152809                    if (_isClosed)
 0810                    {
 0811                        throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 812                    }
 813
 9152814                    if (!stream.IsStarted)
 2061815                    {
 2061816                        if (stream.IsBidirectional)
 663817                        {
 663818                            AddStream(_nextBidirectionalId, stream);
 663819                            _nextBidirectionalId += 4;
 663820                        }
 821                        else
 1398822                        {
 1398823                            AddStream(_nextUnidirectionalId, stream);
 1398824                            _nextUnidirectionalId += 4;
 1398825                        }
 2061826                    }
 827
 828                    // Notify the stream that we're consuming sendSize credit. It's important to call this before
 829                    // sending the stream frame to avoid race conditions where the StreamWindowUpdate frame could be
 830                    // received before the send credit was updated.
 9152831                    if (sendCredit > 0)
 9147832                    {
 9147833                        stream.ConsumedSendCredit((int)(sendSource1.Length + sendSource2.Length));
 9147834                    }
 835
 9152836                    EncodeStreamFrameHeader(stream.Id, sendSource1.Length + sendSource2.Length, lastStreamFrame);
 837
 9152838                    if (lastStreamFrame)
 791839                    {
 840                        // Notify the stream that the last stream frame is considered sent at this point. This will
 841                        // complete writes on the stream and allow the stream to be released if reads are also
 842                        // completed.
 791843                        stream.WroteLastStreamFrame();
 791844                    }
 845
 846                    // Write and flush the stream frame.
 9152847                    if (!sendSource1.IsEmpty)
 8147848                    {
 8147849                        _duplexConnectionWriter.Write(sendSource1);
 8147850                    }
 9152851                    if (!sendSource2.IsEmpty)
 2057852                    {
 2057853                        _duplexConnectionWriter.Write(sendSource2);
 2057854                    }
 855
 9152856                    if (writeReadsClosedFrame)
 378857                    {
 378858                        WriteFrame(FrameType.StreamReadsClosed, stream.Id, encode: null);
 378859                    }
 9152860                    _duplexConnectionWriter.Flush();
 9152861                }
 9152862            }
 9152863            while (!source1.IsEmpty || !source2.IsEmpty); // Loop until there's no data left to send.
 6876864        }
 1013865        catch (OperationCanceledException)
 1013866        {
 1013867            cancellationToken.ThrowIfCancellationRequested();
 868
 0869            Debug.Assert(_isClosed);
 0870            throw new IceRpcException(_peerCloseError ?? IceRpcError.OperationAborted, _closedMessage);
 871        }
 872
 6876873        return new FlushResult(isCanceled: false, isCompleted: false);
 874
 875        void EncodeStreamFrameHeader(ulong streamId, long size, bool lastStreamFrame)
 9152876        {
 9152877            var encoder = new SliceEncoder(_duplexConnectionWriter);
 9152878            encoder.EncodeFrameType(!lastStreamFrame ? FrameType.Stream : FrameType.StreamLast);
 9152879            Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(4);
 9152880            int startPos = encoder.EncodedByteCount;
 9152881            encoder.EncodeVarUInt62(streamId);
 9152882            SliceEncoder.EncodeVarUInt62((ulong)(encoder.EncodedByteCount - startPos + size), sizePlaceholder);
 9152883        }
 6876884    }
 885
 886    private void AddStream(ulong id, SlicStream stream)
 4121887    {
 888        lock (_mutex)
 4121889        {
 4121890            if (_isClosed)
 3891            {
 3892                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 893            }
 894
 4118895            _streams[id] = stream;
 896
 897            // Assign the stream ID within the mutex to ensure that the addition of the stream to the connection and the
 898            // stream ID assignment are atomic.
 4118899            stream.Id = id;
 900
 901            // Keep track of the last assigned stream ID. This is used to figure out if the stream is known or unknown.
 4118902            if (stream.IsRemote)
 2057903            {
 2057904                if (stream.IsBidirectional)
 664905                {
 664906                    _lastRemoteBidirectionalStreamId = id;
 664907                }
 908                else
 1393909                {
 1393910                    _lastRemoteUnidirectionalStreamId = id;
 1393911                }
 2057912            }
 4118913        }
 4118914    }
 915
 916    private void DecodeParameters(IDictionary<ParameterKey, IList<byte>> parameters)
 630917    {
 630918        int? maxStreamFrameSize = null;
 630919        int? peerInitialStreamWindowSize = null;
 7968920        foreach ((ParameterKey key, IList<byte> buffer) in parameters)
 3039921        {
 3039922            switch (key)
 923            {
 924                case ParameterKey.MaxBidirectionalStreams:
 559925                {
 559926                    int value = DecodeParamValue(buffer);
 559927                    if (value > 0)
 559928                    {
 559929                        _bidirectionalStreamSemaphore = new SemaphoreSlim(value, value);
 559930                    }
 559931                    break;
 932                }
 933                case ParameterKey.MaxUnidirectionalStreams:
 611934                {
 611935                    int value = DecodeParamValue(buffer);
 611936                    if (value > 0)
 611937                    {
 611938                        _unidirectionalStreamSemaphore = new SemaphoreSlim(value, value);
 611939                    }
 611940                    break;
 941                }
 942                case ParameterKey.IdleTimeout:
 609943                {
 609944                    _peerIdleTimeout = TimeSpan.FromMilliseconds(DecodeParamValue(buffer));
 609945                    if (_peerIdleTimeout == TimeSpan.Zero)
 0946                    {
 0947                        throw new InvalidDataException(
 0948                            "The IdleTimeout Slic connection parameter is invalid, it must be greater than 0 s.");
 949                    }
 609950                    break;
 951                }
 952                case ParameterKey.MaxStreamFrameSize:
 630953                {
 630954                    maxStreamFrameSize = DecodeParamValue(buffer);
 630955                    if (maxStreamFrameSize < 1024)
 0956                    {
 0957                        throw new InvalidDataException(
 0958                            "The MaxStreamFrameSize connection parameter is invalid, it must be greater than 1 KB.");
 959                    }
 630960                    break;
 961                }
 962                case ParameterKey.InitialStreamWindowSize:
 630963                {
 630964                    peerInitialStreamWindowSize = DecodeParamValue(buffer);
 630965                    if (peerInitialStreamWindowSize < 1024)
 0966                    {
 0967                        throw new InvalidDataException(
 0968                            "The InitialStreamWindowSize connection parameter is invalid, it must be greater than 1 KB."
 969                    }
 630970                    break;
 971                }
 972                // Ignore unsupported parameter.
 973            }
 3039974        }
 975
 630976        if (maxStreamFrameSize is null)
 0977        {
 0978            throw new InvalidDataException(
 0979                "The peer didn't send the required MaxStreamFrameSize connection parameter.");
 980        }
 981        else
 630982        {
 630983            PeerMaxStreamFrameSize = maxStreamFrameSize.Value;
 630984        }
 985
 630986        if (peerInitialStreamWindowSize is null)
 0987        {
 0988            throw new InvalidDataException(
 0989                "The peer didn't send the required InitialStreamWindowSize connection parameter.");
 990        }
 991        else
 630992        {
 630993            PeerInitialStreamWindowSize = peerInitialStreamWindowSize.Value;
 630994        }
 995
 996        // all parameter values are currently integers in the range 0..Int32Max encoded as varuint62.
 997        static int DecodeParamValue(IList<byte> buffer)
 3039998        {
 999            // The IList<byte> decoded by the IceRPC + Slice integration is backed by an array
 30391000            ulong value = new ReadOnlySequence<byte>((byte[])buffer).DecodeSliceBuffer(
 60781001                (ref SliceDecoder decoder) => decoder.DecodeVarUInt62());
 1002            try
 30391003            {
 30391004                return checked((int)value);
 1005            }
 01006            catch (OverflowException exception)
 01007            {
 01008                throw new InvalidDataException("The value is out of the varuint32 accepted range.", exception);
 1009            }
 30391010        }
 6301011    }
 1012
 1013    private Dictionary<ParameterKey, IList<byte>> EncodeParameters()
 6561014    {
 6561015        var parameters = new List<KeyValuePair<ParameterKey, IList<byte>>>
 6561016        {
 6561017            // Required parameters.
 6561018            EncodeParameter(ParameterKey.MaxStreamFrameSize, (ulong)_maxStreamFrameSize),
 6561019            EncodeParameter(ParameterKey.InitialStreamWindowSize, (ulong)InitialStreamWindowSize)
 6561020        };
 1021
 1022        // Optional parameters.
 6561023        if (_localIdleTimeout != Timeout.InfiniteTimeSpan)
 6541024        {
 6541025            parameters.Add(EncodeParameter(ParameterKey.IdleTimeout, (ulong)_localIdleTimeout.TotalMilliseconds));
 6541026        }
 6561027        if (_maxBidirectionalStreams > 0)
 5961028        {
 5961029            parameters.Add(EncodeParameter(ParameterKey.MaxBidirectionalStreams, (ulong)_maxBidirectionalStreams));
 5961030        }
 6561031        if (_maxUnidirectionalStreams > 0)
 6561032        {
 6561033            parameters.Add(EncodeParameter(ParameterKey.MaxUnidirectionalStreams, (ulong)_maxUnidirectionalStreams));
 6561034        }
 1035
 6561036        return new Dictionary<ParameterKey, IList<byte>>(parameters);
 1037
 1038        static KeyValuePair<ParameterKey, IList<byte>> EncodeParameter(ParameterKey key, ulong value)
 32181039        {
 32181040            int sizeLength = SliceEncoder.GetVarUInt62EncodedSize(value);
 32181041            byte[] buffer = new byte[sizeLength];
 32181042            SliceEncoder.EncodeVarUInt62(value, buffer);
 32181043            return new(key, buffer);
 32181044        }
 6561045    }
 1046
 1047    private bool IsUnknownStream(ulong streamId)
 51041048    {
 51041049        bool isRemote = streamId % 2 == (IsServer ? 0ul : 1ul);
 51041050        bool isBidirectional = streamId % 4 < 2;
 51041051        if (isRemote)
 27131052        {
 27131053            if (isBidirectional)
 12791054            {
 12791055                return _lastRemoteBidirectionalStreamId is null || streamId > _lastRemoteBidirectionalStreamId;
 1056            }
 1057            else
 14341058            {
 14341059                return _lastRemoteUnidirectionalStreamId is null || streamId > _lastRemoteUnidirectionalStreamId;
 1060            }
 1061        }
 1062        else
 23911063        {
 23911064            if (isBidirectional)
 12291065            {
 12291066                return streamId >= _nextBidirectionalId;
 1067            }
 1068            else
 11621069            {
 11621070                return streamId >= _nextUnidirectionalId;
 1071            }
 1072        }
 51041073    }
 1074
 1075    private Task ReadFrameAsync(FrameType frameType, int size, ulong? streamId, CancellationToken cancellationToken)
 127411076    {
 127411077        if (frameType >= FrameType.Stream && streamId is null)
 01078        {
 01079            throw new InvalidDataException("Received stream frame without stream ID.");
 1080        }
 1081
 127411082        switch (frameType)
 1083        {
 1084            case FrameType.Close:
 1001085            {
 1001086                return ReadCloseFrameAsync(size, cancellationToken);
 1087            }
 1088            case FrameType.Ping:
 161089            {
 161090                return ReadPingFrameAndWritePongFrameAsync(size, cancellationToken);
 1091            }
 1092            case FrameType.Pong:
 171093            {
 171094                return ReadPongFrameAsync(size, cancellationToken);
 1095            }
 1096            case FrameType.Stream:
 1097            case FrameType.StreamLast:
 96211098            {
 96211099                return ReadStreamDataFrameAsync(frameType, size, streamId!.Value, cancellationToken);
 1100            }
 1101            case FrameType.StreamWindowUpdate:
 11661102            {
 11661103                if (IsUnknownStream(streamId!.Value))
 11104                {
 11105                    throw new InvalidDataException($"Received {frameType} frame for unknown stream.");
 1106                }
 1107
 11651108                return ReadStreamWindowUpdateFrameAsync(size, streamId!.Value, cancellationToken);
 1109            }
 1110            case FrameType.StreamReadsClosed:
 1111            case FrameType.StreamWritesClosed:
 18181112            {
 18181113                if (size > 0)
 21114                {
 21115                    throw new InvalidDataException($"Unexpected body for {frameType} frame.");
 1116                }
 18161117                if (IsUnknownStream(streamId!.Value))
 21118                {
 21119                    throw new InvalidDataException($"Received {frameType} frame for unknown stream.");
 1120                }
 1121
 18141122                if (_streams.TryGetValue(streamId.Value, out SlicStream? stream))
 13501123                {
 13501124                    if (frameType == FrameType.StreamWritesClosed)
 401125                    {
 401126                        stream.ReceivedWritesClosedFrame();
 401127                    }
 1128                    else
 13101129                    {
 13101130                        stream.ReceivedReadsClosedFrame();
 13101131                    }
 13501132                }
 18141133                return Task.CompletedTask;
 1134            }
 1135            default:
 31136            {
 31137                throw new InvalidDataException($"Received unexpected {frameType} frame.");
 1138            }
 1139        }
 1140
 1141        async Task ReadCloseFrameAsync(int size, CancellationToken cancellationToken)
 1001142        {
 1001143            CloseBody closeBody = await ReadFrameBodyAsync(
 1001144                FrameType.Close,
 1001145                size,
 991146                (ref SliceDecoder decoder) => new CloseBody(ref decoder),
 1001147                cancellationToken).ConfigureAwait(false);
 1148
 981149            IceRpcError? peerCloseError = closeBody.ApplicationErrorCode switch
 981150            {
 761151                (ulong)MultiplexedConnectionCloseError.NoError => IceRpcError.ConnectionClosedByPeer,
 41152                (ulong)MultiplexedConnectionCloseError.Refused => IceRpcError.ConnectionRefused,
 81153                (ulong)MultiplexedConnectionCloseError.ServerBusy => IceRpcError.ServerBusy,
 51154                (ulong)MultiplexedConnectionCloseError.Aborted => IceRpcError.ConnectionAborted,
 51155                _ => null
 981156            };
 1157
 1158            bool notAlreadyClosed;
 981159            if (peerCloseError is null)
 51160            {
 51161                notAlreadyClosed = TryClose(
 51162                    new IceRpcException(IceRpcError.ConnectionAborted),
 51163                    $"The connection was closed by the peer with an unknown application error code: '{closeBody.Applicat
 51164                    IceRpcError.ConnectionAborted);
 51165            }
 1166            else
 931167            {
 931168                notAlreadyClosed = TryClose(
 931169                    new IceRpcException(peerCloseError.Value),
 931170                    "The connection was closed by the peer.",
 931171                    peerCloseError);
 931172            }
 1173
 1174            // The server-side of the duplex connection is only shutdown once the client-side is shutdown. When using
 1175            // TCP, this ensures that the server TCP connection won't end-up in the TIME_WAIT state on the server-side.
 981176            if (notAlreadyClosed && !IsServer)
 251177            {
 1178                // DisposeAsync waits for the reads frames task to complete before disposing the writer.
 1179                lock (_mutex)
 251180                {
 251181                    _duplexConnectionWriter.Shutdown();
 251182                }
 251183                await _duplexConnectionWriter.WriterTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 251184            }
 981185        }
 1186
 1187        async Task ReadPingFrameAndWritePongFrameAsync(int size, CancellationToken cancellationToken)
 161188        {
 1189            // Read the ping frame.
 161190            PingBody pingBody = await ReadFrameBodyAsync(
 161191                FrameType.Ping,
 161192                size,
 151193                (ref SliceDecoder decoder) => new PingBody(ref decoder),
 161194                cancellationToken).ConfigureAwait(false);
 1195
 1196            // Return a pong frame with the ping payload.
 141197            WriteConnectionFrame(FrameType.Pong, new PongBody(pingBody.Payload).Encode);
 141198        }
 1199
 1200        async Task ReadPongFrameAsync(int size, CancellationToken cancellationToken)
 171201        {
 171202            if (Interlocked.Decrement(ref _pendingPongCount) >= 0)
 141203            {
 1204                // Ensure the pong frame payload value is expected.
 1205
 141206                PongBody pongBody = await ReadFrameBodyAsync(
 141207                    FrameType.Pong,
 141208                    size,
 141209                    (ref SliceDecoder decoder) => new PongBody(ref decoder),
 141210                    cancellationToken).ConfigureAwait(false);
 1211
 1212                // For now, we only send a 0 or 1 payload value (0 for "write ping" and 1 for "read ping").
 141213                if (pongBody.Payload != 0L && pongBody.Payload != 1L)
 01214                {
 01215                    throw new InvalidDataException($"Received {nameof(FrameType.Pong)} with unexpected payload.");
 1216                }
 141217            }
 1218            else
 31219            {
 1220                // If not waiting for a pong frame, this pong frame is unexpected.
 31221                throw new InvalidDataException($"Received unexpected {nameof(FrameType.Pong)} frame.");
 1222            }
 141223        }
 1224
 1225        async Task ReadStreamWindowUpdateFrameAsync(int size, ulong streamId, CancellationToken cancellationToken)
 11651226        {
 11651227            StreamWindowUpdateBody frame = await ReadFrameBodyAsync(
 11651228                FrameType.StreamWindowUpdate,
 11651229                size,
 11651230                (ref SliceDecoder decoder) => new StreamWindowUpdateBody(ref decoder),
 11651231                cancellationToken).ConfigureAwait(false);
 11651232            if (_streams.TryGetValue(streamId, out SlicStream? stream))
 11231233            {
 11231234                stream.ReceivedWindowUpdateFrame(frame);
 11231235            }
 11651236        }
 1237
 1238        async Task<T> ReadFrameBodyAsync<T>(
 1239            FrameType frameType,
 1240            int size,
 1241            DecodeFunc<T> decodeFunc,
 1242            CancellationToken cancellationToken)
 12951243        {
 12951244            if (size <= 0)
 21245            {
 21246                throw new InvalidDataException($"Unexpected empty body for {frameType} frame.");
 1247            }
 1248
 12931249            ReadOnlySequence<byte> buffer = await _duplexConnectionReader.ReadAtLeastAsync(size, cancellationToken)
 12931250                .ConfigureAwait(false);
 1251
 12931252            if (buffer.Length > size)
 7371253            {
 7371254                buffer = buffer.Slice(0, size);
 7371255            }
 1256
 12931257            T decodedFrame = buffer.DecodeSliceBuffer(decodeFunc);
 12911258            _duplexConnectionReader.AdvanceTo(buffer.End);
 12911259            return decodedFrame;
 12911260        }
 127331261    }
 1262
 1263    private async ValueTask<(FrameType FrameType, int FrameSize, ulong? StreamId)?> ReadFrameHeaderAsync(
 1264        CancellationToken cancellationToken)
 140201265    {
 140201266        while (true)
 140201267        {
 1268            // Read data from the pipe reader.
 140201269            if (!_duplexConnectionReader.TryRead(out ReadOnlySequence<byte> buffer))
 94651270            {
 94651271                buffer = await _duplexConnectionReader.ReadAsync(cancellationToken).ConfigureAwait(false);
 89711272            }
 1273
 135261274            if (buffer.IsEmpty)
 1421275            {
 1421276                return null;
 1277            }
 1278
 133841279            if (TryDecodeHeader(
 133841280                buffer,
 133841281                out (FrameType FrameType, int FrameSize, ulong? StreamId) header,
 133841282                out int consumed))
 133781283            {
 133781284                _duplexConnectionReader.AdvanceTo(buffer.GetPosition(consumed));
 133781285                return header;
 1286            }
 1287            else
 01288            {
 01289                _duplexConnectionReader.AdvanceTo(buffer.Start, buffer.End);
 01290            }
 01291        }
 1292
 1293        static bool TryDecodeHeader(
 1294            ReadOnlySequence<byte> buffer,
 1295            out (FrameType FrameType, int FrameSize, ulong? StreamId) header,
 1296            out int consumed)
 133841297        {
 133841298            header = default;
 133841299            consumed = default;
 1300
 133841301            var decoder = new SliceDecoder(buffer);
 1302
 1303            // Decode the frame type and frame size.
 133841304            if (!decoder.TryDecodeUInt8(out byte frameType) || !decoder.TryDecodeVarUInt62(out ulong frameSize))
 01305            {
 01306                return false;
 1307            }
 1308
 133841309            header.FrameType = frameType.AsFrameType();
 1310            try
 133811311            {
 133811312                header.FrameSize = checked((int)frameSize);
 133811313            }
 01314            catch (OverflowException exception)
 01315            {
 01316                throw new InvalidDataException("The frame size can't be larger than int.MaxValue.", exception);
 1317            }
 1318
 1319            // Reject oversized control frame bodies before any buffering occurs.
 133811320            if (header.FrameType < FrameType.Stream && header.FrameSize > MaxControlFrameBodySize)
 11321            {
 11322                throw new InvalidDataException(
 11323                    $"The {header.FrameType} frame body size ({header.FrameSize}) exceeds the maximum allowed size ({Max
 1324            }
 1325
 1326            // If it's a stream frame, try to decode the stream ID
 133801327            if (header.FrameType >= FrameType.Stream)
 126071328            {
 126071329                if (header.FrameSize == 0)
 11330                {
 11331                    throw new InvalidDataException("Invalid stream frame size.");
 1332                }
 1333
 126061334                consumed = (int)decoder.Consumed;
 126061335                if (!decoder.TryDecodeVarUInt62(out ulong streamId))
 01336                {
 01337                    return false;
 1338                }
 126061339                header.StreamId = streamId;
 126061340                header.FrameSize -= (int)decoder.Consumed - consumed;
 1341
 126061342                if (header.FrameSize < 0)
 11343                {
 11344                    throw new InvalidDataException("Invalid stream frame size.");
 1345                }
 126051346            }
 1347
 133781348            consumed = (int)decoder.Consumed;
 133781349            return true;
 133781350        }
 135201351    }
 1352
 1353    private async Task ReadFramesAsync(CancellationToken cancellationToken)
 6301354    {
 1355        try
 6301356        {
 133541357            while (true)
 133541358            {
 133541359                (FrameType Type, int Size, ulong? StreamId)? header = await ReadFrameHeaderAsync(cancellationToken)
 133541360                    .ConfigureAwait(false);
 1361
 128821362                if (header is null)
 1411363                {
 1364                    lock (_mutex)
 1411365                    {
 1411366                        if (!_isClosed)
 01367                        {
 1368                            // Unexpected duplex connection shutdown.
 01369                            throw new IceRpcException(IceRpcError.ConnectionAborted);
 1370                        }
 1411371                    }
 1372                    // The peer has shut down the duplex connection.
 1411373                    break;
 1374                }
 1375
 127411376                await ReadFrameAsync(header.Value.Type, header.Value.Size, header.Value.StreamId, cancellationToken)
 127411377                    .ConfigureAwait(false);
 127241378            }
 1379
 1411380            if (IsServer)
 721381            {
 721382                Debug.Assert(_isClosed);
 1383
 1384                // The server-side of the duplex connection is only shutdown once the client-side is shutdown. When
 1385                // using TCP, this ensures that the server TCP connection won't end-up in the TIME_WAIT state on the
 1386                // server-side.
 1387
 1388                // DisposeAsync waits for the reads frames task to complete before disposing the writer.
 1389                lock (_mutex)
 721390                {
 721391                    _duplexConnectionWriter.Shutdown();
 1392
 1393                    // Make sure that CloseAsync doesn't call Write on the writer if it's called shortly after the peer
 1394                    // shutdown its side of the connection (which triggers ReadFrameHeaderAsync to return null).
 721395                    _writerIsShutdown = true;
 721396                }
 1397
 721398                await _duplexConnectionWriter.WriterTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 711399            }
 1401400        }
 2381401        catch (OperationCanceledException)
 2381402        {
 1403            // Expected, DisposeAsync was called.
 2381404        }
 2331405        catch (IceRpcException exception)
 2331406        {
 2331407            TryClose(exception, "The connection was lost.", IceRpcError.ConnectionAborted);
 2331408            throw;
 1409        }
 191410        catch (InvalidDataException exception)
 191411        {
 191412            var rpcException = new IceRpcException(
 191413                IceRpcError.IceRpcError,
 191414                "The connection was aborted by a Slic protocol error.",
 191415                exception);
 191416            TryClose(rpcException, rpcException.Message, IceRpcError.IceRpcError);
 191417            throw rpcException;
 1418        }
 01419        catch (Exception exception)
 01420        {
 01421            Debug.Fail($"The read frames task completed due to an unhandled exception: {exception}");
 01422            TryClose(exception, "The connection was lost.", IceRpcError.ConnectionAborted);
 01423            throw;
 1424        }
 3781425    }
 1426
 1427    private async Task ReadStreamDataFrameAsync(
 1428        FrameType type,
 1429        int size,
 1430        ulong streamId,
 1431        CancellationToken cancellationToken)
 96211432    {
 96211433        bool endStream = type == FrameType.StreamLast;
 96211434        bool isRemote = streamId % 2 == (IsServer ? 0ul : 1ul);
 96211435        bool isBidirectional = streamId % 4 < 2;
 1436
 96211437        if (!isBidirectional && !isRemote)
 01438        {
 01439            throw new InvalidDataException(
 01440                "Received unexpected stream frame on local unidirectional stream.");
 1441        }
 96211442        else if (size == 0 && !endStream)
 11443        {
 11444            throw new InvalidDataException($"Received invalid {nameof(FrameType.Stream)} frame.");
 1445        }
 1446
 96201447        if (!_streams.TryGetValue(streamId, out SlicStream? stream) && isRemote && IsUnknownStream(streamId))
 20601448        {
 1449            // Create a new remote stream.
 1450
 20601451            if (size == 0)
 01452            {
 01453                throw new InvalidDataException("Received empty stream frame on new stream.");
 1454            }
 1455
 20601456            if (isBidirectional)
 6641457            {
 6641458                if (streamId > _lastRemoteBidirectionalStreamId + 4)
 01459                {
 01460                    throw new InvalidDataException("Invalid stream ID.");
 1461                }
 1462
 6641463                if (_bidirectionalStreamCount == _maxBidirectionalStreams)
 01464                {
 01465                    throw new IceRpcException(
 01466                        IceRpcError.IceRpcError,
 01467                        $"The maximum bidirectional stream count {_maxBidirectionalStreams} was reached.");
 1468                }
 6641469                Interlocked.Increment(ref _bidirectionalStreamCount);
 6641470            }
 1471            else
 13961472            {
 13961473                if (streamId > _lastRemoteUnidirectionalStreamId + 4)
 01474                {
 01475                    throw new InvalidDataException("Invalid stream ID.");
 1476                }
 1477
 13961478                if (_unidirectionalStreamCount == _maxUnidirectionalStreams)
 01479                {
 01480                    throw new IceRpcException(
 01481                        IceRpcError.IceRpcError,
 01482                        $"The maximum unidirectional stream count {_maxUnidirectionalStreams} was reached.");
 1483                }
 13961484                Interlocked.Increment(ref _unidirectionalStreamCount);
 13961485            }
 1486
 1487            // The stream is registered with the connection and queued on the channel. The caller of AcceptStreamAsync
 1488            // is responsible for cleaning up the stream.
 20601489            stream = new SlicStream(this, isBidirectional, isRemote: true);
 1490
 1491            try
 20601492            {
 20601493                AddStream(streamId, stream);
 1494
 1495                try
 20571496                {
 20571497                    await _acceptStreamChannel.Writer.WriteAsync(
 20571498                        stream,
 20571499                        cancellationToken).ConfigureAwait(false);
 20571500                }
 01501                catch (ChannelClosedException exception)
 01502                {
 1503                    // The exception given to ChannelWriter.Complete(Exception? exception) is the InnerException.
 01504                    Debug.Assert(exception.InnerException is not null);
 01505                    throw ExceptionUtil.Throw(exception.InnerException);
 1506                }
 20571507            }
 31508            catch (IceRpcException)
 31509            {
 1510                // The two methods above throw IceRpcException if the connection has been closed (either by CloseAsync
 1511                // or because the close frame was received). We cleanup up the stream but don't throw to not abort the
 1512                // reading. The connection graceful closure still needs to read on the connection to figure out when the
 1513                // peer shuts down the duplex connection.
 31514                Debug.Assert(_isClosed);
 31515                stream.Input.Complete();
 31516                if (isBidirectional)
 01517                {
 01518                    stream.Output.Complete();
 01519                }
 31520            }
 20601521        }
 1522
 96201523        bool isDataConsumed = false;
 96201524        if (stream is not null)
 95471525        {
 1526            // Let the stream consume the stream frame data.
 95471527            isDataConsumed = await stream.ReceivedDataFrameAsync(
 95471528                size,
 95471529                endStream,
 95471530                cancellationToken).ConfigureAwait(false);
 95471531        }
 1532
 96201533        if (!isDataConsumed)
 911534        {
 1535            // The stream (if any) didn't consume the data. Read and ignore the data using a helper pipe.
 911536            var pipe = new Pipe(
 911537                new PipeOptions(
 911538                    pool: Pool,
 911539                    pauseWriterThreshold: 0,
 911540                    minimumSegmentSize: MinSegmentSize,
 911541                    useSynchronizationContext: false));
 1542
 911543            await _duplexConnectionReader.FillBufferWriterAsync(
 911544                    pipe.Writer,
 911545                    size,
 911546                    cancellationToken).ConfigureAwait(false);
 1547
 901548            pipe.Writer.Complete();
 901549            pipe.Reader.Complete();
 901550        }
 96191551    }
 1552
 1553    private bool TryClose(Exception exception, string closeMessage, IceRpcError? peerCloseError = null)
 11641554    {
 1555        lock (_mutex)
 11641556        {
 11641557            if (_isClosed)
 4591558            {
 4591559                return false;
 1560            }
 7051561            _isClosed = true;
 7051562            _closedMessage = closeMessage;
 7051563            _peerCloseError = peerCloseError;
 7051564            if (_streamSemaphoreWaitCount == 0)
 6981565            {
 6981566                _streamSemaphoreWaitClosed.SetResult();
 6981567            }
 7051568        }
 1569
 1570        // Cancel pending CreateStreamAsync, AcceptStreamAsync and WriteStreamDataFrameAsync operations.
 7051571        _closedCts.Cancel();
 7051572        _acceptStreamChannel.Writer.TryComplete(exception);
 1573
 1574        // Close streams.
 33871575        foreach (SlicStream stream in _streams.Values)
 6361576        {
 6361577            stream.Close(exception);
 6361578        }
 1579
 7051580        return true;
 11641581    }
 1582
 1583    private void WriteFrame(FrameType frameType, ulong? streamId, EncodeAction? encode)
 43611584    {
 43611585        var encoder = new SliceEncoder(_duplexConnectionWriter);
 43611586        encoder.EncodeFrameType(frameType);
 1587        // 2 bytes is sufficient: control frame bodies are limited to MaxControlFrameBodySize (16,383) and the
 1588        // stream frames encoded by WriteFrame carry at most a stream ID + a small body (e.g., StreamWindowUpdate).
 43611589        Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(2);
 43611590        int startPos = encoder.EncodedByteCount;
 43611591        if (streamId is not null)
 35771592        {
 35771593            encoder.EncodeVarUInt62(streamId.Value);
 35771594        }
 43611595        encode?.Invoke(ref encoder);
 43611596        SliceEncoder.EncodeVarUInt62((ulong)(encoder.EncodedByteCount - startPos), sizePlaceholder);
 43611597    }
 1598}

Methods/Properties

get_IsServer()
get_MinSegmentSize()
get_PeerInitialStreamWindowSize()
get_PeerMaxStreamFrameSize()
get_Pool()
get_InitialStreamWindowSize()
get_StreamWindowUpdateThreshold()
.ctor(IceRpc.Transports.IDuplexConnection,IceRpc.Transports.MultiplexedConnectionOptions,IceRpc.Transports.Slic.SlicTransportOptions,System.Boolean)
AcceptStreamAsync()
ConnectAsync(System.Threading.CancellationToken)
PerformConnectAsync()
DecodeInitialize()
DecodeInitializeAckOrVersion()
ReadFrameAsync()
CloseAsync()
CreateStreamAsync()
DisposeAsync()
PerformDisposeAsync()
SendPing()
SendReadPing()
SendWritePing()
FillBufferWriterAsync(System.Buffers.IBufferWriter`1<System.Byte>,System.Int32,System.Threading.CancellationToken)
ReleaseStream(IceRpc.Transports.Slic.Internal.SlicStream)
ThrowIfClosed()
WriteConnectionFrame(IceRpc.Transports.Slic.Internal.FrameType,ZeroC.Slice.Codec.EncodeAction)
WriteStreamFrame(IceRpc.Transports.Slic.Internal.SlicStream,IceRpc.Transports.Slic.Internal.FrameType,ZeroC.Slice.Codec.EncodeAction,System.Boolean)
WriteStreamDataFrameAsync()
EncodeStreamFrameHeader()
AddStream(System.UInt64,IceRpc.Transports.Slic.Internal.SlicStream)
DecodeParameters(System.Collections.Generic.IDictionary`2<IceRpc.Transports.Slic.Internal.ParameterKey,System.Collections.Generic.IList`1<System.Byte>>)
DecodeParamValue()
EncodeParameters()
EncodeParameter()
IsUnknownStream(System.UInt64)
ReadFrameAsync(IceRpc.Transports.Slic.Internal.FrameType,System.Int32,System.Nullable`1<System.UInt64>,System.Threading.CancellationToken)
ReadCloseFrameAsync()
ReadPingFrameAndWritePongFrameAsync()
ReadPongFrameAsync()
ReadStreamWindowUpdateFrameAsync()
ReadFrameBodyAsync()
ReadFrameHeaderAsync()
TryDecodeHeader()
ReadFramesAsync()
ReadStreamDataFrameAsync()
TryClose(System.Exception,System.String,System.Nullable`1<IceRpc.IceRpcError>)
WriteFrame(IceRpc.Transports.Slic.Internal.FrameType,System.Nullable`1<System.UInt64>,ZeroC.Slice.Codec.EncodeAction)