< Summary

Information
Class: IceRpc.Transports.Slic.Internal.SlicConnection
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicConnection.cs
Tag: 275_13775359185
Line coverage
89%
Covered lines: 916
Uncovered lines: 102
Coverable lines: 1018
Total lines: 1578
Line coverage: 89.9%
Branch coverage
90%
Covered branches: 297
Total branches: 330
Branch coverage: 90%
Method coverage
97%
Covered methods: 45
Total methods: 46
Method coverage: 97.8%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
get_IsServer()100%11100%
get_MinSegmentSize()100%11100%
get_PeerInitialStreamWindowSize()100%11100%
get_PeerMaxStreamFrameSize()100%11100%
get_Pool()100%11100%
get_InitialStreamWindowSize()100%11100%
get_StreamWindowUpdateThreshold()100%11100%
.ctor(...)100%44100%
AcceptStreamAsync()100%66100%
ConnectAsync(...)75%4.05485.71%
PerformConnectAsync()100%24.052495.5%
DecodeInitialize()75%4.02490%
DecodeInitializeAckOrVersion()66.66%6.01692.85%
ReadFrameAsync()100%88100%
CloseAsync()100%1414100%
CreateStreamAsync()100%141497.72%
DisposeAsync()100%22100%
PerformDisposeAsync()93.75%16.221690.47%
SendPing()100%1.04166.66%
SendReadPing()100%22100%
SendWritePing()0%620%
FillBufferWriterAsync(...)100%11100%
ReleaseStream(...)100%88100%
ThrowIfClosed()100%22100%
WriteConnectionFrame(...)100%22100%
WriteStreamFrame(...)87.5%88100%
WriteStreamDataFrameAsync()94.44%36.443693.02%
EncodeStreamFrameHeader()100%22100%
AddStream(...)83.33%6.04690%
DecodeParameters(...)75%34.52473.68%
DecodeParamValue()100%1.03170%
EncodeParameters()100%66100%
EncodeParameter()100%11100%
IsUnknownStream(...)100%1212100%
ReadFrameAsync(...)95.65%23.072394.87%
ReadCloseFrameAsync()100%1313100%
ReadPingFrameAndWritePongFrameAsync()100%11100%
ReadPongFrameAsync()66.66%6.09686.66%
ReadStreamWindowUpdateFrameAsync()100%22100%
ReadFrameBodyAsync()100%44100%
ReadFrameHeaderAsync()83.33%6.22681.81%
TryDecodeHeader()75%13.511278.12%
ReadFramesAsync()83.33%6.1686%
ReadStreamDataFrameAsync()77.77%91.043665.11%
TryClose(...)100%66100%
WriteFrame(...)100%44100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicConnection.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using IceRpc.Transports.Internal;
 5using System.Buffers;
 6using System.Collections.Concurrent;
 7using System.Diagnostics;
 8using System.IO.Pipelines;
 9using System.Security.Authentication;
 10using System.Threading.Channels;
 11using ZeroC.Slice;
 12
 13namespace IceRpc.Transports.Slic.Internal;
 14
 15/// <summary>The Slic connection implements an <see cref="IMultiplexedConnection" /> on top of a <see
 16/// cref="IDuplexConnection" />.</summary>
 17internal class SlicConnection : IMultiplexedConnection
 18{
 19    /// <summary>Gets a value indicating whether or not this is the server-side of the connection.</summary>
 3344020    internal bool IsServer { get; }
 21
 22    /// <summary>Gets the minimum size of the segment requested from <see cref="Pool" />.</summary>
 1081423    internal int MinSegmentSize { get; }
 24
 25    /// <summary>Gets the peer's initial stream window size. This property is set to the <see
 26    /// cref="ParameterKey.InitialStreamWindowSize"/> value carried by the <see cref="FrameType.Initialize" />
 27    /// frame.</summary>
 654528    internal int PeerInitialStreamWindowSize { get; private set; }
 29
 30    /// <summary>Gets the maximum size of stream frames accepted by the peer. This property is set to the <see
 31    /// cref="ParameterKey.MaxStreamFrameSize"/> value carried by the <see cref="FrameType.Initialize" />
 32    /// frame.</summary>
 1944833    internal int PeerMaxStreamFrameSize { get; private set; }
 34
 35    /// <summary>Gets the <see cref="MemoryPool{T}" /> used for obtaining memory buffers.</summary>
 1081436    internal MemoryPool<byte> Pool { get; }
 37
 38    /// <summary>Gets the initial stream window size.</summary>
 1861539    internal int InitialStreamWindowSize { get; }
 40
 41    /// <summary>Gets the window update threshold. When the window size is increased and this threshold reached, a <see
 42    /// cref="FrameType.StreamWindowUpdate" /> frame is sent.</summary>
 1204343    internal int StreamWindowUpdateThreshold => InitialStreamWindowSize / StreamWindowUpdateRatio;
 44
 45    // The ratio used to compute the StreamWindowUpdateThreshold. For now, the stream window update is sent when the
 46    // window size grows over InitialStreamWindowSize / StreamWindowUpdateRatio.
 47    private const int StreamWindowUpdateRatio = 2;
 48
 49    private readonly Channel<IMultiplexedStream> _acceptStreamChannel;
 50    private int _bidirectionalStreamCount;
 51    private SemaphoreSlim? _bidirectionalStreamSemaphore;
 52    private readonly CancellationToken _closedCancellationToken;
 137353    private readonly CancellationTokenSource _closedCts = new();
 54    private string? _closedMessage;
 55    private Task<TransportConnectionInformation>? _connectTask;
 137356    private readonly CancellationTokenSource _disposedCts = new();
 57    private Task? _disposeTask;
 58    private readonly SlicDuplexConnectionDecorator _duplexConnection;
 59    private readonly DuplexConnectionReader _duplexConnectionReader;
 60    private readonly SlicDuplexConnectionWriter _duplexConnectionWriter;
 61    private bool _isClosed;
 62    private ulong? _lastRemoteBidirectionalStreamId;
 63    private ulong? _lastRemoteUnidirectionalStreamId;
 64    private readonly TimeSpan _localIdleTimeout;
 65    private readonly int _maxBidirectionalStreams;
 66    private readonly int _maxStreamFrameSize;
 67    private readonly int _maxUnidirectionalStreams;
 68    // _mutex ensure the assignment of _lastRemoteXxx members and the addition of the stream to _streams is
 69    // an atomic operation.
 137370    private readonly object _mutex = new();
 71    private ulong _nextBidirectionalId;
 72    private ulong _nextUnidirectionalId;
 73    private IceRpcError? _peerCloseError;
 137374    private TimeSpan _peerIdleTimeout = Timeout.InfiniteTimeSpan;
 75    private int _pendingPongCount;
 76    private Task? _readFramesTask;
 77
 137378    private readonly ConcurrentDictionary<ulong, SlicStream> _streams = new();
 79    private int _streamSemaphoreWaitCount;
 137380    private readonly TaskCompletionSource _streamSemaphoreWaitClosed =
 137381        new(TaskCreationOptions.RunContinuationsAsynchronously);
 82
 83    private int _unidirectionalStreamCount;
 84    private SemaphoreSlim? _unidirectionalStreamSemaphore;
 85
 86    // This is only set for server connections to ensure that _duplexConnectionWriter.Write is not called after
 87    // _duplexConnectionWriter.Shutdown. This can occur if the client-side of the connection sends the close frame
 88    // followed by the shutdown of the duplex connection and if CloseAsync is called at the same time on the server
 89    // connection.
 90    private bool _writerIsShutdown;
 91
 92    public async ValueTask<IMultiplexedStream> AcceptStreamAsync(CancellationToken cancellationToken)
 464593    {
 464594        lock (_mutex)
 464595        {
 464596            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 97
 464398            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 299            {
 2100                throw new InvalidOperationException("Cannot accept stream before connecting the Slic connection.");
 101            }
 4641102            if (_isClosed)
 15103            {
 15104                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 105            }
 4626106        }
 107
 108        try
 4626109        {
 4626110            return await _acceptStreamChannel.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
 111        }
 232112        catch (ChannelClosedException exception)
 232113        {
 232114            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 231115            Debug.Assert(exception.InnerException is not null);
 116            // The exception given to ChannelWriter.Complete(Exception? exception) is the InnerException.
 231117            throw ExceptionUtil.Throw(exception.InnerException);
 118        }
 3968119    }
 120
 121    public Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken)
 1335122    {
 1335123        lock (_mutex)
 1335124        {
 1335125            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 126
 1335127            if (_connectTask is not null)
 2128            {
 2129                throw new InvalidOperationException("Cannot connect twice a Slic connection.");
 130            }
 1333131            if (_isClosed)
 0132            {
 0133                throw new InvalidOperationException("Cannot connect a closed Slic connection.");
 134            }
 1333135            _connectTask = PerformConnectAsync();
 1333136        }
 1333137        return _connectTask;
 138
 139        async Task<TransportConnectionInformation> PerformConnectAsync()
 1333140        {
 1333141            await Task.Yield(); // Exit mutex lock
 142
 143            // Connect the duplex connection.
 144            TransportConnectionInformation transportConnectionInformation;
 1333145            TimeSpan peerIdleTimeout = TimeSpan.MaxValue;
 146
 147            try
 1333148            {
 1333149                transportConnectionInformation = await _duplexConnection.ConnectAsync(cancellationToken)
 1333150                    .ConfigureAwait(false);
 151
 152                // Initialize the Slic connection.
 1289153                if (IsServer)
 648154                {
 155                    // Read the Initialize frame.
 648156                    (ulong version, InitializeBody? initializeBody) = await ReadFrameAsync(
 648157                        DecodeInitialize,
 648158                        cancellationToken).ConfigureAwait(false);
 159
 636160                    if (initializeBody is null)
 4161                    {
 162                        // Unsupported version, try to negotiate another version by sending a Version frame with the
 163                        // Slic versions supported by this server.
 4164                        ulong[] supportedVersions = new ulong[] { SlicDefinitions.V1 };
 165
 4166                        WriteConnectionFrame(FrameType.Version, new VersionBody(supportedVersions).Encode);
 167
 4168                        (version, initializeBody) = await ReadFrameAsync(
 4169                            (frameType, buffer) =>
 4170                            {
 4171                                if (frameType is null)
 2172                                {
 4173                                    // The client shut down the connection because it doesn't support any of the
 4174                                    // server's supported Slic versions.
 2175                                    throw new IceRpcException(
 2176                                        IceRpcError.ConnectionRefused,
 2177                                        $"The connection was refused because the client Slic version {version} is not su
 4178                                }
 4179                                else
 2180                                {
 2181                                    return DecodeInitialize(frameType, buffer);
 4182                                }
 2183                            },
 4184                            cancellationToken).ConfigureAwait(false);
 2185                    }
 186
 634187                    Debug.Assert(initializeBody is not null);
 188
 634189                    DecodeParameters(initializeBody.Value.Parameters);
 190
 191                    // Write back an InitializeAck frame.
 634192                    WriteConnectionFrame(FrameType.InitializeAck, new InitializeAckBody(EncodeParameters()).Encode);
 634193                }
 194                else
 641195                {
 196                    // Write the Initialize frame.
 641197                    WriteConnectionFrame(
 641198                        FrameType.Initialize,
 641199                        (ref SliceEncoder encoder) =>
 641200                        {
 641201                            encoder.EncodeVarUInt62(SlicDefinitions.V1);
 641202                            new InitializeBody(EncodeParameters()).Encode(ref encoder);
 1282203                        });
 204
 205                    // Read and decode the InitializeAck or Version frame.
 641206                    (InitializeAckBody? initializeAckBody, VersionBody? versionBody) = await ReadFrameAsync(
 641207                        DecodeInitializeAckOrVersion,
 641208                        cancellationToken).ConfigureAwait(false);
 209
 594210                    Debug.Assert(initializeAckBody is not null || versionBody is not null);
 211
 594212                    if (initializeAckBody is not null)
 590213                    {
 590214                        DecodeParameters(initializeAckBody.Value.Parameters);
 590215                    }
 216
 594217                    if (versionBody is not null)
 4218                    {
 4219                        if (versionBody.Value.Versions.Contains(SlicDefinitions.V1))
 2220                        {
 2221                            throw new InvalidDataException(
 2222                                "The server supported versions include the version initially requested.");
 223                        }
 224                        else
 2225                        {
 226                            // We only support V1 and the peer rejected V1.
 2227                            throw new IceRpcException(
 2228                                IceRpcError.ConnectionRefused,
 2229                                $"The connection was refused because the server only supports Slic version(s) {string.Jo
 230                        }
 231                    }
 590232                }
 1224233            }
 12234            catch (InvalidDataException exception)
 12235            {
 12236                throw new IceRpcException(
 12237                    IceRpcError.IceRpcError,
 12238                    "The connection was aborted by a Slic protocol error.",
 12239                    exception);
 240            }
 49241            catch (OperationCanceledException)
 49242            {
 49243                throw;
 244            }
 8245            catch (AuthenticationException)
 8246            {
 8247                throw;
 248            }
 40249            catch (IceRpcException)
 40250            {
 40251                throw;
 252            }
 0253            catch (Exception exception)
 0254            {
 0255                Debug.Fail($"ConnectAsync failed with an unexpected exception: {exception}");
 0256                throw;
 257            }
 258
 259            // Enable the idle timeout checks after the connection establishment. The Ping frames sent by the keep alive
 260            // check are not expected until the Slic connection initialization completes. The idle timeout check uses
 261            // the smallest idle timeout.
 1224262            TimeSpan idleTimeout = _peerIdleTimeout == Timeout.InfiniteTimeSpan ? _localIdleTimeout :
 1224263                (_peerIdleTimeout < _localIdleTimeout ? _peerIdleTimeout : _localIdleTimeout);
 264
 1224265            if (idleTimeout != Timeout.InfiniteTimeSpan)
 1220266            {
 1220267                _duplexConnection.Enable(idleTimeout);
 1220268            }
 269
 1224270            _readFramesTask = ReadFramesAsync(_disposedCts.Token);
 271
 1224272            return transportConnectionInformation;
 1224273        }
 274
 275        static (ulong, InitializeBody?) DecodeInitialize(FrameType? frameType, ReadOnlySequence<byte> buffer)
 640276        {
 640277            if (frameType != FrameType.Initialize)
 0278            {
 0279                throw new InvalidDataException($"Received unexpected {frameType} frame.");
 280            }
 281
 640282            return SliceEncoding.Slice2.DecodeBuffer<(ulong, InitializeBody?)>(
 640283                buffer,
 640284                (ref SliceDecoder decoder) =>
 640285                {
 640286                    ulong version = decoder.DecodeVarUInt62();
 638287                    if (version == SlicDefinitions.V1)
 634288                    {
 634289                        return (version, new InitializeBody(ref decoder));
 640290                    }
 640291                    else
 4292                    {
 4293                        decoder.Skip((int)(buffer.Length - decoder.Consumed));
 4294                        return (version, null);
 640295                    }
 1278296                });
 638297        }
 298
 299        static (InitializeAckBody?, VersionBody?) DecodeInitializeAckOrVersion(
 300            FrameType? frameType,
 301            ReadOnlySequence<byte> buffer) =>
 598302            frameType switch
 598303            {
 592304                FrameType.InitializeAck => (
 592305                    SliceEncoding.Slice2.DecodeBuffer(
 592306                        buffer,
 592307                        (ref SliceDecoder decoder) => new InitializeAckBody(ref decoder)),
 592308                    null),
 6309                FrameType.Version => (
 6310                    null,
 6311                    SliceEncoding.Slice2.DecodeBuffer(
 6312                        buffer,
 12313                        (ref SliceDecoder decoder) => new VersionBody(ref decoder))),
 0314                _ => throw new InvalidDataException($"Received unexpected Slic frame: '{frameType}'."),
 598315            };
 316
 317        async ValueTask<T> ReadFrameAsync<T>(
 318            Func<FrameType?, ReadOnlySequence<byte>, T> decodeFunc,
 319            CancellationToken cancellationToken)
 1293320        {
 1293321            (FrameType FrameType, int FrameSize, ulong?)? header =
 1293322                await ReadFrameHeaderAsync(cancellationToken).ConfigureAwait(false);
 323
 324            ReadOnlySequence<byte> buffer;
 1240325            if (header is null || header.Value.FrameSize == 0)
 8326            {
 8327                buffer = ReadOnlySequence<byte>.Empty;
 8328            }
 329            else
 1232330            {
 1232331                buffer = await _duplexConnectionReader.ReadAtLeastAsync(
 1232332                    header.Value.FrameSize,
 1232333                    cancellationToken).ConfigureAwait(false);
 1232334                if (buffer.Length > header.Value.FrameSize)
 13335                {
 13336                    buffer = buffer.Slice(0, header.Value.FrameSize);
 13337                }
 1232338            }
 339
 1240340            T decodedFrame = decodeFunc(header?.FrameType, buffer);
 1232341            _duplexConnectionReader.AdvanceTo(buffer.End);
 1232342            return decodedFrame;
 1232343        }
 1333344    }
 345
 346    public async Task CloseAsync(MultiplexedConnectionCloseError closeError, CancellationToken cancellationToken)
 199347    {
 199348        lock (_mutex)
 199349        {
 199350            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 351
 199352            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 2353            {
 2354                throw new InvalidOperationException("Cannot close a Slic connection before connecting it.");
 355            }
 197356        }
 357
 197358        bool waitForWriterShutdown = false;
 197359        if (TryClose(new IceRpcException(IceRpcError.OperationAborted), "The connection was closed."))
 152360        {
 152361            lock (_mutex)
 152362            {
 363                // The duplex connection writer of a server connection might already be shutdown
 364                // (_writerIsShutdown=true) if the client-side sent the Close frame and shut down the duplex connection.
 365                // This doesn't apply to the client-side since the server-side doesn't shutdown the duplex connection
 366                // writer after sending the Close frame.
 152367                if (!IsServer || !_writerIsShutdown)
 150368                {
 150369                    WriteFrame(FrameType.Close, streamId: null, new CloseBody((ulong)closeError).Encode);
 150370                    if (IsServer)
 77371                    {
 77372                        _duplexConnectionWriter.Flush();
 77373                    }
 374                    else
 73375                    {
 376                        // The sending of the client-side Close frame is followed by the shutdown of the duplex
 377                        // connection. For TCP, it's important to always shutdown the connection on the client-side
 378                        // first to avoid TIME_WAIT states on the server-side.
 73379                        _duplexConnectionWriter.Shutdown();
 73380                        waitForWriterShutdown = true;
 73381                    }
 150382                }
 152383            }
 152384        }
 385
 197386        if (waitForWriterShutdown)
 73387        {
 73388            await _duplexConnectionWriter.WriterTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 73389        }
 390
 391        // Now, wait for the peer to close the write side of the connection, which will terminate the read frames task.
 197392        Debug.Assert(_readFramesTask is not null);
 197393        await _readFramesTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 194394    }
 395
 396    public async ValueTask<IMultiplexedStream> CreateStreamAsync(
 397        bool bidirectional,
 398        CancellationToken cancellationToken)
 4097399    {
 4097400        lock (_mutex)
 4097401        {
 4097402            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 403
 4091404            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 4405            {
 4406                throw new InvalidOperationException("Cannot create stream before connecting the Slic connection.");
 407            }
 4087408            if (_isClosed)
 13409            {
 13410                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 411            }
 412
 4074413            ++_streamSemaphoreWaitCount;
 4074414        }
 415
 416        try
 4074417        {
 4074418            using var createStreamCts = CancellationTokenSource.CreateLinkedTokenSource(
 4074419                _closedCancellationToken,
 4074420                cancellationToken);
 421
 4074422            SemaphoreSlim? streamCountSemaphore = bidirectional ?
 4074423                _bidirectionalStreamSemaphore :
 4074424                _unidirectionalStreamSemaphore;
 425
 4074426            if (streamCountSemaphore is null)
 2427            {
 428                // The stream semaphore is null if the peer's max streams configuration is 0. In this case, we let
 429                // CreateStreamAsync hang indefinitely until the connection is closed.
 2430                await Task.Delay(-1, createStreamCts.Token).ConfigureAwait(false);
 0431            }
 432            else
 4072433            {
 4072434                await streamCountSemaphore.WaitAsync(createStreamCts.Token).ConfigureAwait(false);
 4055435            }
 436
 4055437            return new SlicStream(this, bidirectional, isRemote: false);
 438        }
 19439        catch (OperationCanceledException)
 19440        {
 19441            cancellationToken.ThrowIfCancellationRequested();
 13442            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 12443            Debug.Assert(_isClosed);
 12444            throw new IceRpcException(_peerCloseError ?? IceRpcError.OperationAborted, _closedMessage);
 445        }
 446        finally
 4074447        {
 4074448            lock (_mutex)
 4074449            {
 4074450                --_streamSemaphoreWaitCount;
 4074451                if (_isClosed && _streamSemaphoreWaitCount == 0)
 13452                {
 13453                    _streamSemaphoreWaitClosed.SetResult();
 13454                }
 4074455            }
 4074456        }
 4055457    }
 458
 459    public ValueTask DisposeAsync()
 1898460    {
 1898461        lock (_mutex)
 1898462        {
 1898463            _disposeTask ??= PerformDisposeAsync();
 1898464        }
 1898465        return new(_disposeTask);
 466
 467        async Task PerformDisposeAsync()
 1371468        {
 469            // Make sure we execute the code below without holding the mutex lock.
 1371470            await Task.Yield();
 1371471            TryClose(new IceRpcException(IceRpcError.OperationAborted), "The connection was disposed.");
 472
 1371473            _disposedCts.Cancel();
 474
 475            try
 1371476            {
 1371477                await Task.WhenAll(
 1371478                    _connectTask ?? Task.CompletedTask,
 1371479                    _readFramesTask ?? Task.CompletedTask,
 1371480                    _streamSemaphoreWaitClosed.Task).ConfigureAwait(false);
 774481            }
 597482            catch
 597483            {
 484                // Expected if any of these tasks failed or was canceled. Each task takes care of handling unexpected
 485                // exceptions so there's no need to handle them here.
 597486            }
 487
 488            // Clean-up the streams that might still be queued on the channel.
 1424489            while (_acceptStreamChannel.Reader.TryRead(out IMultiplexedStream? stream))
 53490            {
 53491                if (stream.IsBidirectional)
 10492                {
 10493                    stream.Output.Complete();
 10494                    stream.Input.Complete();
 10495                }
 43496                else if (stream.IsRemote)
 43497                {
 43498                    stream.Input.Complete();
 43499                }
 500                else
 0501                {
 0502                    stream.Output.Complete();
 0503                }
 53504            }
 505
 506            try
 1371507            {
 508                // Prevents unobserved task exceptions.
 1371509                await _acceptStreamChannel.Reader.Completion.ConfigureAwait(false);
 0510            }
 1371511            catch
 1371512            {
 1371513            }
 514
 1371515            await _duplexConnectionWriter.DisposeAsync().ConfigureAwait(false);
 1371516            _duplexConnectionReader.Dispose();
 1371517            _duplexConnection.Dispose();
 518
 1371519            _disposedCts.Dispose();
 1371520            _bidirectionalStreamSemaphore?.Dispose();
 1371521            _unidirectionalStreamSemaphore?.Dispose();
 1371522            _closedCts.Dispose();
 1371523        }
 1898524    }
 525
 1373526    internal SlicConnection(
 1373527        IDuplexConnection duplexConnection,
 1373528        MultiplexedConnectionOptions options,
 1373529        SlicTransportOptions slicOptions,
 1373530        bool isServer)
 1373531    {
 1373532        IsServer = isServer;
 533
 1373534        Pool = options.Pool;
 1373535        MinSegmentSize = options.MinSegmentSize;
 1373536        _maxBidirectionalStreams = options.MaxBidirectionalStreams;
 1373537        _maxUnidirectionalStreams = options.MaxUnidirectionalStreams;
 538
 1373539        InitialStreamWindowSize = slicOptions.InitialStreamWindowSize;
 1373540        _localIdleTimeout = slicOptions.IdleTimeout;
 1373541        _maxStreamFrameSize = slicOptions.MaxStreamFrameSize;
 542
 1373543        _acceptStreamChannel = Channel.CreateUnbounded<IMultiplexedStream>(new UnboundedChannelOptions
 1373544        {
 1373545            SingleReader = true,
 1373546            SingleWriter = true
 1373547        });
 548
 1373549        _closedCancellationToken = _closedCts.Token;
 550
 551        // Only the client-side sends pings to keep the connection alive when idle timeout (set later) is not infinite.
 1373552        _duplexConnection = IsServer ?
 1373553            new SlicDuplexConnectionDecorator(duplexConnection) :
 1373554            new SlicDuplexConnectionDecorator(duplexConnection, SendReadPing, SendWritePing);
 555
 1373556        _duplexConnectionReader = new DuplexConnectionReader(_duplexConnection, options.Pool, options.MinSegmentSize);
 1373557        _duplexConnectionWriter = new SlicDuplexConnectionWriter(
 1373558            _duplexConnection,
 1373559            options.Pool,
 1373560            options.MinSegmentSize);
 561
 562        // We use the same stream ID numbering scheme as Quic.
 1373563        if (IsServer)
 677564        {
 677565            _nextBidirectionalId = 1;
 677566            _nextUnidirectionalId = 3;
 677567        }
 568        else
 696569        {
 696570            _nextBidirectionalId = 0;
 696571            _nextUnidirectionalId = 2;
 696572        }
 573
 574        void SendPing(long payload)
 27575        {
 576            try
 27577            {
 27578                WriteConnectionFrame(FrameType.Ping, new PingBody(payload).Encode);
 25579            }
 2580            catch (IceRpcException)
 2581            {
 582                // Expected if the connection is closed.
 2583            }
 0584            catch (Exception exception)
 0585            {
 0586                Debug.Fail($"The sending of a Ping frame failed with an unexpected exception: {exception}");
 0587                throw;
 588            }
 27589        }
 590
 591        void SendReadPing()
 27592        {
 593            // This local function is no-op if there is already a pending Pong.
 27594            if (Interlocked.CompareExchange(ref _pendingPongCount, 1, 0) == 0)
 27595            {
 27596                SendPing(1L);
 27597            }
 27598        }
 599
 600        void SendWritePing()
 0601        {
 602            // _pendingPongCount can be <= 0 if an unexpected pong is received. If it's the case, the connection is
 603            // being torn down and there's no point in sending a ping frame.
 0604            if (Interlocked.Increment(ref _pendingPongCount) > 0)
 0605            {
 0606                SendPing(0L);
 0607            }
 0608        }
 1373609    }
 610
 611    /// <summary>Fills the given writer with stream data received on the connection.</summary>
 612    /// <param name="bufferWriter">The destination buffer writer.</param>
 613    /// <param name="byteCount">The amount of stream data to read.</param>
 614    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 615    internal ValueTask FillBufferWriterAsync(
 616        IBufferWriter<byte> bufferWriter,
 617        int byteCount,
 618        CancellationToken cancellationToken) =>
 18914619        _duplexConnectionReader.FillBufferWriterAsync(bufferWriter, byteCount, cancellationToken);
 620
 621    /// <summary>Releases a stream from the connection. The connection stream count is decremented and if this is a
 622    /// client allow a new stream to be started.</summary>
 623    /// <param name="stream">The released stream.</param>
 624    internal void ReleaseStream(SlicStream stream)
 8046625    {
 8046626        Debug.Assert(stream.IsStarted);
 627
 8046628        _streams.Remove(stream.Id, out SlicStream? _);
 629
 8046630        if (stream.IsRemote)
 4021631        {
 4021632            if (stream.IsBidirectional)
 1262633            {
 1262634                Interlocked.Decrement(ref _bidirectionalStreamCount);
 1262635            }
 636            else
 2759637            {
 2759638                Interlocked.Decrement(ref _unidirectionalStreamCount);
 2759639            }
 4021640        }
 4025641        else if (!_isClosed)
 3287642        {
 3287643            if (stream.IsBidirectional)
 1102644            {
 1102645                _bidirectionalStreamSemaphore!.Release();
 1102646            }
 647            else
 2185648            {
 2185649                _unidirectionalStreamSemaphore!.Release();
 2185650            }
 3287651        }
 8046652    }
 653
 654    /// <summary>Throws the connection closure exception if the connection is already closed.</summary>
 655    internal void ThrowIfClosed()
 15637656    {
 15637657        lock (_mutex)
 15637658        {
 15637659            if (_isClosed)
 19660            {
 19661                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 662            }
 15618663        }
 15618664    }
 665
 666    /// <summary>Writes a connection frame.</summary>
 667    /// <param name="frameType">The frame type.</param>
 668    /// <param name="encode">The action to encode the frame.</param>
 669    internal void WriteConnectionFrame(FrameType frameType, EncodeAction? encode)
 1331670    {
 1331671        Debug.Assert(frameType < FrameType.Stream);
 672
 1331673        lock (_mutex)
 1331674        {
 1331675            if (_isClosed)
 2676            {
 2677                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 678            }
 1329679            WriteFrame(frameType, streamId: null, encode);
 1329680            _duplexConnectionWriter.Flush();
 1329681        }
 1329682    }
 683
 684    /// <summary>Writes a stream frame.</summary>
 685    /// <param name="stream">The stream to write the frame for.</param>
 686    /// <param name="frameType">The frame type.</param>
 687    /// <param name="encode">The action to encode the frame.</param>
 688    /// <param name="writeReadsClosedFrame"><see langword="true" /> if a <see cref="FrameType.StreamReadsClosed" />
 689    /// frame should be written after the stream frame.</param>
 690    /// <remarks>This method is called by streams and might be called on a closed connection. The connection might
 691    /// also be closed concurrently while it's in progress.</remarks>
 692    internal void WriteStreamFrame(
 693        SlicStream stream,
 694        FrameType frameType,
 695        EncodeAction? encode,
 696        bool writeReadsClosedFrame)
 5850697    {
 698        // Ensure that this method is called for any FrameType.StreamXxx frame type except FrameType.Stream.
 5850699        Debug.Assert(frameType >= FrameType.StreamLast && stream.IsStarted);
 700
 5850701        lock (_mutex)
 5850702        {
 5850703            if (_isClosed)
 1704            {
 1705                return;
 706            }
 707
 5849708            WriteFrame(frameType, stream.Id, encode);
 5849709            if (writeReadsClosedFrame)
 153710            {
 153711                WriteFrame(FrameType.StreamReadsClosed, stream.Id, encode: null);
 153712            }
 5849713            if (frameType == FrameType.StreamLast)
 1114714            {
 715                // Notify the stream that the last stream frame is considered sent at this point. This will close
 716                // writes on the stream and allow the stream to be released if reads are also closed.
 1114717                stream.WroteLastStreamFrame();
 1114718            }
 5849719            _duplexConnectionWriter.Flush();
 5849720        }
 5850721    }
 722
 723    /// <summary>Writes a stream data frame.</summary>
 724    /// <param name="stream">The stream to write the frame for.</param>
 725    /// <param name="source1">The first stream frame data source.</param>
 726    /// <param name="source2">The second stream frame data source.</param>
 727    /// <param name="endStream"><see langword="true" /> to write a <see cref="FrameType.StreamLast" /> frame and
 728    /// <see langword="false" /> to write a <see cref="FrameType.Stream" /> frame.</param>
 729    /// <param name="writeReadsClosedFrame"><see langword="true" /> if a <see cref="FrameType.StreamReadsClosed" />
 730    /// frame should be written after the stream frame.</param>
 731    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 732    /// <remarks>This method is called by streams and might be called on a closed connection. The connection might
 733    /// also be closed concurrently while it's in progress.</remarks>
 734    internal async ValueTask<FlushResult> WriteStreamDataFrameAsync(
 735        SlicStream stream,
 736        ReadOnlySequence<byte> source1,
 737        ReadOnlySequence<byte> source2,
 738        bool endStream,
 739        bool writeReadsClosedFrame,
 740        CancellationToken cancellationToken)
 15592741    {
 15592742        Debug.Assert(!source1.IsEmpty || endStream);
 743
 15592744        if (_connectTask is null)
 0745        {
 0746            throw new InvalidOperationException("Cannot send a stream frame before calling ConnectAsync.");
 747        }
 748
 15592749        using var writeCts = CancellationTokenSource.CreateLinkedTokenSource(
 15592750            _closedCancellationToken,
 15592751            cancellationToken);
 752
 753        try
 15592754        {
 755            do
 20248756            {
 757                // Next, ensure send credit is available. If not, this will block until the receiver allows sending
 758                // additional data.
 20248759                int sendCredit = 0;
 20248760                if (!source1.IsEmpty || !source2.IsEmpty)
 20238761                {
 20238762                    sendCredit = await stream.AcquireSendCreditAsync(writeCts.Token).ConfigureAwait(false);
 18210763                    Debug.Assert(sendCredit > 0);
 18210764                }
 765
 766                // Gather data from source1 or source2 up to sendCredit bytes or the peer maximum stream frame size.
 18220767                int sendMaxSize = Math.Min(sendCredit, PeerMaxStreamFrameSize);
 768                ReadOnlySequence<byte> sendSource1;
 769                ReadOnlySequence<byte> sendSource2;
 18220770                if (!source1.IsEmpty)
 16211771                {
 16211772                    int length = Math.Min((int)source1.Length, sendMaxSize);
 16211773                    sendSource1 = source1.Slice(0, length);
 16211774                    source1 = source1.Slice(length);
 16211775                }
 776                else
 2009777                {
 2009778                    sendSource1 = ReadOnlySequence<byte>.Empty;
 2009779                }
 780
 18220781                if (source1.IsEmpty && !source2.IsEmpty)
 4072782                {
 4072783                    int length = Math.Min((int)source2.Length, sendMaxSize - (int)sendSource1.Length);
 4072784                    sendSource2 = source2.Slice(0, length);
 4072785                    source2 = source2.Slice(length);
 4072786                }
 787                else
 14148788                {
 14148789                    sendSource2 = ReadOnlySequence<byte>.Empty;
 14148790                }
 791
 792                // If there's no data left to send and endStream is true, it's the last stream frame.
 18220793                bool lastStreamFrame = endStream && source1.IsEmpty && source2.IsEmpty;
 794
 18220795                lock (_mutex)
 18220796                {
 18220797                    if (_isClosed)
 0798                    {
 0799                        throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 800                    }
 801
 18220802                    if (!stream.IsStarted)
 4025803                    {
 4025804                        if (stream.IsBidirectional)
 1260805                        {
 1260806                            AddStream(_nextBidirectionalId, stream);
 1260807                            _nextBidirectionalId += 4;
 1260808                        }
 809                        else
 2765810                        {
 2765811                            AddStream(_nextUnidirectionalId, stream);
 2765812                            _nextUnidirectionalId += 4;
 2765813                        }
 4025814                    }
 815
 816                    // Notify the stream that we're consuming sendSize credit. It's important to call this before
 817                    // sending the stream frame to avoid race conditions where the StreamWindowUpdate frame could be
 818                    // received before the send credit was updated.
 18220819                    if (sendCredit > 0)
 18210820                    {
 18210821                        stream.ConsumedSendCredit((int)(sendSource1.Length + sendSource2.Length));
 18210822                    }
 823
 18220824                    EncodeStreamFrameHeader(stream.Id, sendSource1.Length + sendSource2.Length, lastStreamFrame);
 825
 18220826                    if (lastStreamFrame)
 1462827                    {
 828                        // Notify the stream that the last stream frame is considered sent at this point. This will
 829                        // complete writes on the stream and allow the stream to be released if reads are also
 830                        // completed.
 1462831                        stream.WroteLastStreamFrame();
 1462832                    }
 833
 834                    // Write and flush the stream frame.
 18220835                    if (!sendSource1.IsEmpty)
 16211836                    {
 16211837                        _duplexConnectionWriter.Write(sendSource1);
 16211838                    }
 18220839                    if (!sendSource2.IsEmpty)
 4072840                    {
 4072841                        _duplexConnectionWriter.Write(sendSource2);
 4072842                    }
 843
 18220844                    if (writeReadsClosedFrame)
 695845                    {
 695846                        WriteFrame(FrameType.StreamReadsClosed, stream.Id, encode: null);
 695847                    }
 18220848                    _duplexConnectionWriter.Flush();
 18220849                }
 18220850            }
 18220851            while (!source1.IsEmpty || !source2.IsEmpty); // Loop until there's no data left to send.
 13564852        }
 2028853        catch (OperationCanceledException)
 2028854        {
 2028855            cancellationToken.ThrowIfCancellationRequested();
 856
 0857            Debug.Assert(_isClosed);
 0858            throw new IceRpcException(_peerCloseError ?? IceRpcError.OperationAborted, _closedMessage);
 859        }
 860
 13564861        return new FlushResult(isCanceled: false, isCompleted: false);
 862
 863        void EncodeStreamFrameHeader(ulong streamId, long size, bool lastStreamFrame)
 18220864        {
 18220865            var encoder = new SliceEncoder(_duplexConnectionWriter, SliceEncoding.Slice2);
 18220866            encoder.EncodeFrameType(!lastStreamFrame ? FrameType.Stream : FrameType.StreamLast);
 18220867            Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(4);
 18220868            int startPos = encoder.EncodedByteCount;
 18220869            encoder.EncodeVarUInt62(streamId);
 18220870            SliceEncoder.EncodeVarUInt62((ulong)(encoder.EncodedByteCount - startPos + size), sizePlaceholder);
 18220871        }
 13564872    }
 873
 874    private void AddStream(ulong id, SlicStream stream)
 8046875    {
 8046876        lock (_mutex)
 8046877        {
 8046878            if (_isClosed)
 0879            {
 0880                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 881            }
 882
 8046883            _streams[id] = stream;
 884
 885            // Assign the stream ID within the mutex to ensure that the addition of the stream to the connection and the
 886            // stream ID assignment are atomic.
 8046887            stream.Id = id;
 888
 889            // Keep track of the last assigned stream ID. This is used to figure out if the stream is known or unknown.
 8046890            if (stream.IsRemote)
 4021891            {
 4021892                if (stream.IsBidirectional)
 1262893                {
 1262894                    _lastRemoteBidirectionalStreamId = id;
 1262895                }
 896                else
 2759897                {
 2759898                    _lastRemoteUnidirectionalStreamId = id;
 2759899                }
 4021900            }
 8046901        }
 8046902    }
 903
 904    private void DecodeParameters(IDictionary<ParameterKey, IList<byte>> parameters)
 1224905    {
 1224906        int? maxStreamFrameSize = null;
 1224907        int? peerInitialStreamWindowSize = null;
 15494908        foreach ((ParameterKey key, IList<byte> buffer) in parameters)
 5911909        {
 5911910            switch (key)
 911            {
 912                case ParameterKey.MaxBidirectionalStreams:
 1095913                {
 1095914                    int value = DecodeParamValue(buffer);
 1095915                    if (value > 0)
 1095916                    {
 1095917                        _bidirectionalStreamSemaphore = new SemaphoreSlim(value, value);
 1095918                    }
 1095919                    break;
 920                }
 921                case ParameterKey.MaxUnidirectionalStreams:
 1186922                {
 1186923                    int value = DecodeParamValue(buffer);
 1186924                    if (value > 0)
 1186925                    {
 1186926                        _unidirectionalStreamSemaphore = new SemaphoreSlim(value, value);
 1186927                    }
 1186928                    break;
 929                }
 930                case ParameterKey.IdleTimeout:
 1182931                {
 1182932                    _peerIdleTimeout = TimeSpan.FromMilliseconds(DecodeParamValue(buffer));
 1182933                    if (_peerIdleTimeout == TimeSpan.Zero)
 0934                    {
 0935                        throw new InvalidDataException(
 0936                            "The IdleTimeout Slic connection parameter is invalid, it must be greater than 0 s.");
 937                    }
 1182938                    break;
 939                }
 940                case ParameterKey.MaxStreamFrameSize:
 1224941                {
 1224942                    maxStreamFrameSize = DecodeParamValue(buffer);
 1224943                    if (maxStreamFrameSize < 1024)
 0944                    {
 0945                        throw new InvalidDataException(
 0946                            "The MaxStreamFrameSize connection parameter is invalid, it must be greater than 1KB.");
 947                    }
 1224948                    break;
 949                }
 950                case ParameterKey.InitialStreamWindowSize:
 1224951                {
 1224952                    peerInitialStreamWindowSize = DecodeParamValue(buffer);
 1224953                    if (peerInitialStreamWindowSize < 1024)
 0954                    {
 0955                        throw new InvalidDataException(
 0956                            "The InitialStreamWindowSize connection parameter is invalid, it must be greater than 1KB.")
 957                    }
 1224958                    break;
 959                }
 960                // Ignore unsupported parameter.
 961            }
 5911962        }
 963
 1224964        if (maxStreamFrameSize is null)
 0965        {
 0966            throw new InvalidDataException(
 0967                "The peer didn't send the required MaxStreamFrameSize connection parameter.");
 968        }
 969        else
 1224970        {
 1224971            PeerMaxStreamFrameSize = maxStreamFrameSize.Value;
 1224972        }
 973
 1224974        if (peerInitialStreamWindowSize is null)
 0975        {
 0976            throw new InvalidDataException(
 0977                "The peer didn't send the required InitialStreamWindowSize connection parameter.");
 978        }
 979        else
 1224980        {
 1224981            PeerInitialStreamWindowSize = peerInitialStreamWindowSize.Value;
 1224982        }
 983
 984        // all parameter values are currently integers in the range 0..Int32Max encoded as varuint62.
 985        static int DecodeParamValue(IList<byte> buffer)
 5911986        {
 987            // The IList<byte> decoded by the IceRPC + Slice integration is backed by an array
 5911988            ulong value = SliceEncoding.Slice2.DecodeBuffer(
 5911989                new ReadOnlySequence<byte>((byte[])buffer),
 11822990                (ref SliceDecoder decoder) => decoder.DecodeVarUInt62());
 991            try
 5911992            {
 5911993                return checked((int)value);
 994            }
 0995            catch (OverflowException exception)
 0996            {
 0997                throw new InvalidDataException("The value is out of the varuint32 accepted range.", exception);
 998            }
 5911999        }
 12241000    }
 1001
 1002    private Dictionary<ParameterKey, IList<byte>> EncodeParameters()
 12751003    {
 12751004        var parameters = new List<KeyValuePair<ParameterKey, IList<byte>>>
 12751005        {
 12751006            // Required parameters.
 12751007            EncodeParameter(ParameterKey.MaxStreamFrameSize, (ulong)_maxStreamFrameSize),
 12751008            EncodeParameter(ParameterKey.InitialStreamWindowSize, (ulong)InitialStreamWindowSize)
 12751009        };
 1010
 1011        // Optional parameters.
 12751012        if (_localIdleTimeout != Timeout.InfiniteTimeSpan)
 12711013        {
 12711014            parameters.Add(EncodeParameter(ParameterKey.IdleTimeout, (ulong)_localIdleTimeout.TotalMilliseconds));
 12711015        }
 12751016        if (_maxBidirectionalStreams > 0)
 11691017        {
 11691018            parameters.Add(EncodeParameter(ParameterKey.MaxBidirectionalStreams, (ulong)_maxBidirectionalStreams));
 11691019        }
 12751020        if (_maxUnidirectionalStreams > 0)
 12751021        {
 12751022            parameters.Add(EncodeParameter(ParameterKey.MaxUnidirectionalStreams, (ulong)_maxUnidirectionalStreams));
 12751023        }
 1024
 12751025        return new Dictionary<ParameterKey, IList<byte>>(parameters);
 1026
 1027        static KeyValuePair<ParameterKey, IList<byte>> EncodeParameter(ParameterKey key, ulong value)
 62651028        {
 62651029            int sizeLength = SliceEncoder.GetVarUInt62EncodedSize(value);
 62651030            byte[] buffer = new byte[sizeLength];
 62651031            SliceEncoder.EncodeVarUInt62(value, buffer);
 62651032            return new(key, buffer);
 62651033        }
 12751034    }
 1035
 1036    private bool IsUnknownStream(ulong streamId)
 96071037    {
 96071038        bool isRemote = streamId % 2 == (IsServer ? 0ul : 1ul);
 96071039        bool isBidirectional = streamId % 4 < 2;
 96071040        if (isRemote)
 52381041        {
 52381042            if (isBidirectional)
 24101043            {
 24101044                return _lastRemoteBidirectionalStreamId is null || streamId > _lastRemoteBidirectionalStreamId;
 1045            }
 1046            else
 28281047            {
 28281048                return _lastRemoteUnidirectionalStreamId is null || streamId > _lastRemoteUnidirectionalStreamId;
 1049            }
 1050        }
 1051        else
 43691052        {
 43691053            if (isBidirectional)
 21331054            {
 21331055                return streamId >= _nextBidirectionalId;
 1056            }
 1057            else
 22361058            {
 22361059                return streamId >= _nextUnidirectionalId;
 1060            }
 1061        }
 96071062    }
 1063
 1064    private Task ReadFrameAsync(FrameType frameType, int size, ulong? streamId, CancellationToken cancellationToken)
 247551065    {
 247551066        if (frameType >= FrameType.Stream && streamId is null)
 01067        {
 01068            throw new InvalidDataException("Received stream frame without stream ID.");
 1069        }
 1070
 247551071        switch (frameType)
 1072        {
 1073            case FrameType.Close:
 1541074            {
 1541075                return ReadCloseFrameAsync(size, cancellationToken);
 1076            }
 1077            case FrameType.Ping:
 291078            {
 291079                return ReadPingFrameAndWritePongFrameAsync(size, cancellationToken);
 1080            }
 1081            case FrameType.Pong:
 311082            {
 311083                return ReadPongFrameAsync(size, cancellationToken);
 1084            }
 1085            case FrameType.Stream:
 1086            case FrameType.StreamLast:
 191161087            {
 191161088                return ReadStreamDataFrameAsync(frameType, size, streamId!.Value, cancellationToken);
 1089            }
 1090            case FrameType.StreamWindowUpdate:
 19831091            {
 19831092                if (IsUnknownStream(streamId!.Value))
 21093                {
 21094                    throw new InvalidDataException($"Received {frameType} frame for unknown stream.");
 1095                }
 1096
 19811097                return ReadStreamWindowUpdateFrameAsync(size, streamId!.Value, cancellationToken);
 1098            }
 1099            case FrameType.StreamReadsClosed:
 1100            case FrameType.StreamWritesClosed:
 34361101            {
 34361102                if (size > 0)
 41103                {
 41104                    throw new InvalidDataException($"Unexpected body for {frameType} frame.");
 1105                }
 34321106                if (IsUnknownStream(streamId!.Value))
 41107                {
 41108                    throw new InvalidDataException($"Received {frameType} frame for unknown stream.");
 1109                }
 1110
 34281111                if (_streams.TryGetValue(streamId.Value, out SlicStream? stream))
 26021112                {
 26021113                    if (frameType == FrameType.StreamWritesClosed)
 821114                    {
 821115                        stream.ReceivedWritesClosedFrame();
 821116                    }
 1117                    else
 25201118                    {
 25201119                        stream.ReceivedReadsClosedFrame();
 25201120                    }
 26021121                }
 34281122                return Task.CompletedTask;
 1123            }
 1124            default:
 61125            {
 61126                throw new InvalidDataException($"Received unexpected {frameType} frame.");
 1127            }
 1128        }
 1129
 1130        async Task ReadCloseFrameAsync(int size, CancellationToken cancellationToken)
 1541131        {
 1541132            CloseBody closeBody = await ReadFrameBodyAsync(
 1541133                FrameType.Close,
 1541134                size,
 1521135                (ref SliceDecoder decoder) => new CloseBody(ref decoder),
 1541136                cancellationToken).ConfigureAwait(false);
 1137
 1501138            IceRpcError? peerCloseError = closeBody.ApplicationErrorCode switch
 1501139            {
 1061140                (ulong)MultiplexedConnectionCloseError.NoError => IceRpcError.ConnectionClosedByPeer,
 81141                (ulong)MultiplexedConnectionCloseError.Refused => IceRpcError.ConnectionRefused,
 161142                (ulong)MultiplexedConnectionCloseError.ServerBusy => IceRpcError.ServerBusy,
 101143                (ulong)MultiplexedConnectionCloseError.Aborted => IceRpcError.ConnectionAborted,
 101144                _ => null
 1501145            };
 1146
 1147            bool notAlreadyClosed;
 1501148            if (peerCloseError is null)
 101149            {
 101150                notAlreadyClosed = TryClose(
 101151                    new IceRpcException(IceRpcError.ConnectionAborted),
 101152                    $"The connection was closed by the peer with an unknown application error code: '{closeBody.Applicat
 101153                    IceRpcError.ConnectionAborted);
 101154            }
 1155            else
 1401156            {
 1401157                notAlreadyClosed = TryClose(
 1401158                    new IceRpcException(peerCloseError.Value),
 1401159                    "The connection was closed by the peer.",
 1401160                    peerCloseError);
 1401161            }
 1162
 1163            // The server-side of the duplex connection is only shutdown once the client-side is shutdown. When using
 1164            // TCP, this ensures that the server TCP connection won't end-up in the TIME_WAIT state on the server-side.
 1501165            if (notAlreadyClosed && !IsServer)
 631166            {
 1167                // DisposeAsync waits for the reads frames task to complete before disposing the writer.
 631168                lock (_mutex)
 631169                {
 631170                    _duplexConnectionWriter.Shutdown();
 631171                }
 631172                await _duplexConnectionWriter.WriterTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 611173            }
 1481174        }
 1175
 1176        async Task ReadPingFrameAndWritePongFrameAsync(int size, CancellationToken cancellationToken)
 291177        {
 1178            // Read the ping frame.
 291179            PingBody pingBody = await ReadFrameBodyAsync(
 291180                FrameType.Ping,
 291181                size,
 271182                (ref SliceDecoder decoder) => new PingBody(ref decoder),
 291183                cancellationToken).ConfigureAwait(false);
 1184
 1185            // Return a pong frame with the ping payload.
 251186            WriteConnectionFrame(FrameType.Pong, new PongBody(pingBody.Payload).Encode);
 251187        }
 1188
 1189        async Task ReadPongFrameAsync(int size, CancellationToken cancellationToken)
 311190        {
 311191            if (Interlocked.Decrement(ref _pendingPongCount) >= 0)
 251192            {
 1193                // Ensure the pong frame payload value is expected.
 1194
 251195                PongBody pongBody = await ReadFrameBodyAsync(
 251196                    FrameType.Pong,
 251197                    size,
 251198                    (ref SliceDecoder decoder) => new PongBody(ref decoder),
 251199                    cancellationToken).ConfigureAwait(false);
 1200
 1201                // For now, we only send a 0 or 1 payload value (0 for "write ping" and 1 for "read ping").
 251202                if (pongBody.Payload != 0L && pongBody.Payload != 1L)
 01203                {
 01204                    throw new InvalidDataException($"Received {nameof(FrameType.Pong)} with unexpected payload.");
 1205                }
 251206            }
 1207            else
 61208            {
 1209                // If not waiting for a pong frame, this pong frame is unexpected.
 61210                throw new InvalidDataException($"Received unexpected {nameof(FrameType.Pong)} frame.");
 1211            }
 251212        }
 1213
 1214        async Task ReadStreamWindowUpdateFrameAsync(int size, ulong streamId, CancellationToken cancellationToken)
 19811215        {
 19811216            StreamWindowUpdateBody frame = await ReadFrameBodyAsync(
 19811217                FrameType.StreamWindowUpdate,
 19811218                size,
 19811219                (ref SliceDecoder decoder) => new StreamWindowUpdateBody(ref decoder),
 19811220                cancellationToken).ConfigureAwait(false);
 19811221            if (_streams.TryGetValue(streamId, out SlicStream? stream))
 19111222            {
 19111223                stream.ReceivedWindowUpdateFrame(frame);
 19111224            }
 19811225        }
 1226
 1227        async Task<T> ReadFrameBodyAsync<T>(
 1228            FrameType frameType,
 1229            int size,
 1230            DecodeFunc<T> decodeFunc,
 1231            CancellationToken cancellationToken)
 21891232        {
 21891233            if (size <= 0)
 41234            {
 41235                throw new InvalidDataException($"Unexpected empty body for {frameType} frame.");
 1236            }
 1237
 21851238            ReadOnlySequence<byte> buffer = await _duplexConnectionReader.ReadAtLeastAsync(size, cancellationToken)
 21851239                .ConfigureAwait(false);
 1240
 21851241            if (buffer.Length > size)
 14051242            {
 14051243                buffer = buffer.Slice(0, size);
 14051244            }
 1245
 21851246            T decodedFrame = SliceEncoding.Slice2.DecodeBuffer(buffer, decodeFunc);
 21811247            _duplexConnectionReader.AdvanceTo(buffer.End);
 21811248            return decodedFrame;
 21811249        }
 247391250    }
 1251
 1252    private async ValueTask<(FrameType FrameType, int FrameSize, ulong? StreamId)?> ReadFrameHeaderAsync(
 1253        CancellationToken cancellationToken)
 272321254    {
 272321255        while (true)
 272321256        {
 1257            // Read data from the pipe reader.
 272321258            if (!_duplexConnectionReader.TryRead(out ReadOnlySequence<byte> buffer))
 170211259            {
 170211260                buffer = await _duplexConnectionReader.ReadAsync(cancellationToken).ConfigureAwait(false);
 160541261            }
 1262
 262651263            if (buffer.IsEmpty)
 2621264            {
 2621265                return null;
 1266            }
 1267
 260031268            if (TryDecodeHeader(
 260031269                buffer,
 260031270                out (FrameType FrameType, int FrameSize, ulong? StreamId) header,
 260031271                out int consumed))
 259931272            {
 259931273                _duplexConnectionReader.AdvanceTo(buffer.GetPosition(consumed));
 259931274                return header;
 1275            }
 1276            else
 01277            {
 01278                _duplexConnectionReader.AdvanceTo(buffer.Start, buffer.End);
 01279            }
 01280        }
 1281
 1282        static bool TryDecodeHeader(
 1283            ReadOnlySequence<byte> buffer,
 1284            out (FrameType FrameType, int FrameSize, ulong? StreamId) header,
 1285            out int consumed)
 260031286        {
 260031287            header = default;
 260031288            consumed = default;
 1289
 260031290            var decoder = new SliceDecoder(buffer, SliceEncoding.Slice2);
 1291
 1292            // Decode the frame type and frame size.
 260031293            if (!decoder.TryDecodeUInt8(out byte frameType) || !decoder.TryDecodeVarUInt62(out ulong frameSize))
 01294            {
 01295                return false;
 1296            }
 1297
 260031298            header.FrameType = frameType.AsFrameType();
 1299            try
 259971300            {
 259971301                header.FrameSize = checked((int)frameSize);
 259971302            }
 01303            catch (OverflowException exception)
 01304            {
 01305                throw new InvalidDataException("The frame size can't be larger than int.MaxValue.", exception);
 1306            }
 1307
 1308            // If it's a stream frame, try to decode the stream ID
 259971309            if (header.FrameType >= FrameType.Stream)
 245391310            {
 245391311                if (header.FrameSize == 0)
 21312                {
 21313                    throw new InvalidDataException("Invalid stream frame size.");
 1314                }
 1315
 245371316                consumed = (int)decoder.Consumed;
 245371317                if (!decoder.TryDecodeVarUInt62(out ulong streamId))
 01318                {
 01319                    return false;
 1320                }
 245371321                header.StreamId = streamId;
 245371322                header.FrameSize -= (int)decoder.Consumed - consumed;
 1323
 245371324                if (header.FrameSize < 0)
 21325                {
 21326                    throw new InvalidDataException("Invalid stream frame size.");
 1327                }
 245351328            }
 1329
 259931330            consumed = (int)decoder.Consumed;
 259931331            return true;
 259931332        }
 262551333    }
 1334
 1335    private async Task ReadFramesAsync(CancellationToken cancellationToken)
 12241336    {
 1337        try
 12241338        {
 259391339            while (true)
 259391340            {
 259391341                (FrameType Type, int Size, ulong? StreamId)? header = await ReadFrameHeaderAsync(cancellationToken)
 259391342                    .ConfigureAwait(false);
 1343
 250151344                if (header is null)
 2601345                {
 2601346                    lock (_mutex)
 2601347                    {
 2601348                        if (!_isClosed)
 01349                        {
 1350                            // Unexpected duplex connection shutdown.
 01351                            throw new IceRpcException(IceRpcError.ConnectionAborted);
 1352                        }
 2601353                    }
 1354                    // The peer has shut down the duplex connection.
 2601355                    break;
 1356                }
 1357
 247551358                await ReadFrameAsync(header.Value.Type, header.Value.Size, header.Value.StreamId, cancellationToken)
 247551359                    .ConfigureAwait(false);
 247151360            }
 1361
 2601362            if (IsServer)
 1341363            {
 1341364                Debug.Assert(_isClosed);
 1365
 1366                // The server-side of the duplex connection is only shutdown once the client-side is shutdown. When
 1367                // using TCP, this ensures that the server TCP connection won't end-up in the TIME_WAIT state on the
 1368                // server-side.
 1369
 1370                // DisposeAsync waits for the reads frames task to complete before disposing the writer.
 1341371                lock (_mutex)
 1341372                {
 1341373                    _duplexConnectionWriter.Shutdown();
 1374
 1375                    // Make sure that CloseAsync doesn't call Write on the writer if it's called shortly after the peer
 1376                    // shutdown its side of the connection (which triggers ReadFrameHeaderAsync to return null).
 1341377                    _writerIsShutdown = true;
 1341378                }
 1379
 1341380                await _duplexConnectionWriter.WriterTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 1321381            }
 2581382        }
 4761383        catch (OperationCanceledException)
 4761384        {
 1385            // Expected, DisposeAsync was called.
 4761386        }
 4521387        catch (IceRpcException exception)
 4521388        {
 4521389            TryClose(exception, "The connection was lost.", IceRpcError.ConnectionAborted);
 4521390            throw;
 1391        }
 381392        catch (InvalidDataException exception)
 381393        {
 381394            var rpcException = new IceRpcException(
 381395                IceRpcError.IceRpcError,
 381396                "The connection was aborted by a Slic protocol error.",
 381397                exception);
 381398            TryClose(rpcException, rpcException.Message, IceRpcError.IceRpcError);
 381399            throw rpcException;
 1400        }
 01401        catch (Exception exception)
 01402        {
 01403            Debug.Fail($"The read frames task completed due to an unhandled exception: {exception}");
 01404            TryClose(exception, "The connection was lost.", IceRpcError.ConnectionAborted);
 01405            throw;
 1406        }
 7341407    }
 1408
 1409    private async Task ReadStreamDataFrameAsync(
 1410        FrameType type,
 1411        int size,
 1412        ulong streamId,
 1413        CancellationToken cancellationToken)
 191161414    {
 191161415        bool endStream = type == FrameType.StreamLast;
 191161416        bool isRemote = streamId % 2 == (IsServer ? 0ul : 1ul);
 191161417        bool isBidirectional = streamId % 4 < 2;
 1418
 191161419        if (!isBidirectional && !isRemote)
 01420        {
 01421            throw new InvalidDataException(
 01422                "Received unexpected stream frame on local unidirectional stream.");
 1423        }
 191161424        else if (size == 0 && !endStream)
 21425        {
 21426            throw new InvalidDataException($"Received invalid {nameof(FrameType.Stream)} frame.");
 1427        }
 1428
 191141429        if (!_streams.TryGetValue(streamId, out SlicStream? stream) && isRemote && IsUnknownStream(streamId))
 40211430        {
 1431            // Create a new remote stream.
 1432
 40211433            if (size == 0)
 01434            {
 01435                throw new InvalidDataException("Received empty stream frame on new stream.");
 1436            }
 1437
 40211438            if (isBidirectional)
 12621439            {
 12621440                if (streamId > _lastRemoteBidirectionalStreamId + 4)
 01441                {
 01442                    throw new InvalidDataException("Invalid stream ID.");
 1443                }
 1444
 12621445                if (_bidirectionalStreamCount == _maxBidirectionalStreams)
 01446                {
 01447                    throw new IceRpcException(
 01448                        IceRpcError.IceRpcError,
 01449                        $"The maximum bidirectional stream count {_maxBidirectionalStreams} was reached.");
 1450                }
 12621451                Interlocked.Increment(ref _bidirectionalStreamCount);
 12621452            }
 1453            else
 27591454            {
 27591455                if (streamId > _lastRemoteUnidirectionalStreamId + 4)
 01456                {
 01457                    throw new InvalidDataException("Invalid stream ID.");
 1458                }
 1459
 27591460                if (_unidirectionalStreamCount == _maxUnidirectionalStreams)
 01461                {
 01462                    throw new IceRpcException(
 01463                        IceRpcError.IceRpcError,
 01464                        $"The maximum unidirectional stream count {_maxUnidirectionalStreams} was reached");
 1465                }
 27591466                Interlocked.Increment(ref _unidirectionalStreamCount);
 27591467            }
 1468
 1469            // The stream is registered with the connection and queued on the channel. The caller of AcceptStreamAsync
 1470            // is responsible for cleaning up the stream.
 40211471            stream = new SlicStream(this, isBidirectional, isRemote: true);
 1472
 1473            try
 40211474            {
 40211475                AddStream(streamId, stream);
 1476
 1477                try
 40211478                {
 40211479                    await _acceptStreamChannel.Writer.WriteAsync(
 40211480                        stream,
 40211481                        cancellationToken).ConfigureAwait(false);
 40211482                }
 01483                catch (ChannelClosedException exception)
 01484                {
 1485                    // The exception given to ChannelWriter.Complete(Exception? exception) is the InnerException.
 01486                    Debug.Assert(exception.InnerException is not null);
 01487                    throw ExceptionUtil.Throw(exception.InnerException);
 1488                }
 40211489            }
 01490            catch (IceRpcException)
 01491            {
 1492                // The two methods above throw IceRpcException if the connection has been closed (either by CloseAsync
 1493                // or because the close frame was received). We cleanup up the stream but don't throw to not abort the
 1494                // reading. The connection graceful closure still needs to read on the connection to figure out when the
 1495                // peer shuts down the duplex connection.
 01496                Debug.Assert(_isClosed);
 01497                stream.Input.Complete();
 01498                if (isBidirectional)
 01499                {
 01500                    stream.Output.Complete();
 01501                }
 01502            }
 40211503        }
 1504
 191141505        bool isDataConsumed = false;
 191141506        if (stream is not null)
 189291507        {
 1508            // Let the stream consume the stream frame data.
 189291509            isDataConsumed = await stream.ReceivedDataFrameAsync(
 189291510                size,
 189291511                endStream,
 189291512                cancellationToken).ConfigureAwait(false);
 189281513        }
 1514
 191131515        if (!isDataConsumed)
 2001516        {
 1517            // The stream (if any) didn't consume the data. Read and ignore the data using a helper pipe.
 2001518            var pipe = new Pipe(
 2001519                new PipeOptions(
 2001520                    pool: Pool,
 2001521                    pauseWriterThreshold: 0,
 2001522                    minimumSegmentSize: MinSegmentSize,
 2001523                    useSynchronizationContext: false));
 1524
 2001525            await _duplexConnectionReader.FillBufferWriterAsync(
 2001526                    pipe.Writer,
 2001527                    size,
 2001528                    cancellationToken).ConfigureAwait(false);
 1529
 1951530            pipe.Writer.Complete();
 1951531            pipe.Reader.Complete();
 1951532        }
 191081533    }
 1534
 1535    private bool TryClose(Exception exception, string closeMessage, IceRpcError? peerCloseError = null)
 22081536    {
 22081537        lock (_mutex)
 22081538        {
 22081539            if (_isClosed)
 8371540            {
 8371541                return false;
 1542            }
 13711543            _isClosed = true;
 13711544            _closedMessage = closeMessage;
 13711545            _peerCloseError = peerCloseError;
 13711546            if (_streamSemaphoreWaitCount == 0)
 13581547            {
 13581548                _streamSemaphoreWaitClosed.SetResult();
 13581549            }
 13711550        }
 1551
 1552        // Cancel pending CreateStreamAsync, AcceptStreamAsync and WriteStreamDataFrameAsync operations.
 13711553        _closedCts.Cancel();
 13711554        _acceptStreamChannel.Writer.TryComplete(exception);
 1555
 1556        // Close streams.
 67371557        foreach (SlicStream stream in _streams.Values)
 13121558        {
 13121559            stream.Close(exception);
 13121560        }
 1561
 13711562        return true;
 22081563    }
 1564
 1565    private void WriteFrame(FrameType frameType, ulong? streamId, EncodeAction? encode)
 81761566    {
 81761567        var encoder = new SliceEncoder(_duplexConnectionWriter, SliceEncoding.Slice2);
 81761568        encoder.EncodeFrameType(frameType);
 81761569        Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(4);
 81761570        int startPos = encoder.EncodedByteCount;
 81761571        if (streamId is not null)
 66971572        {
 66971573            encoder.EncodeVarUInt62(streamId.Value);
 66971574        }
 81761575        encode?.Invoke(ref encoder);
 81761576        SliceEncoder.EncodeVarUInt62((ulong)(encoder.EncodedByteCount - startPos), sizePlaceholder);
 81761577    }
 1578}

Methods/Properties

get_IsServer()
get_MinSegmentSize()
get_PeerInitialStreamWindowSize()
get_PeerMaxStreamFrameSize()
get_Pool()
get_InitialStreamWindowSize()
get_StreamWindowUpdateThreshold()
.ctor(IceRpc.Transports.IDuplexConnection,IceRpc.Transports.MultiplexedConnectionOptions,IceRpc.Transports.Slic.SlicTransportOptions,System.Boolean)
AcceptStreamAsync()
ConnectAsync(System.Threading.CancellationToken)
PerformConnectAsync()
DecodeInitialize()
DecodeInitializeAckOrVersion()
ReadFrameAsync()
CloseAsync()
CreateStreamAsync()
DisposeAsync()
PerformDisposeAsync()
SendPing()
SendReadPing()
SendWritePing()
FillBufferWriterAsync(System.Buffers.IBufferWriter`1<System.Byte>,System.Int32,System.Threading.CancellationToken)
ReleaseStream(IceRpc.Transports.Slic.Internal.SlicStream)
ThrowIfClosed()
WriteConnectionFrame(IceRpc.Transports.Slic.Internal.FrameType,ZeroC.Slice.EncodeAction)
WriteStreamFrame(IceRpc.Transports.Slic.Internal.SlicStream,IceRpc.Transports.Slic.Internal.FrameType,ZeroC.Slice.EncodeAction,System.Boolean)
WriteStreamDataFrameAsync()
EncodeStreamFrameHeader()
AddStream(System.UInt64,IceRpc.Transports.Slic.Internal.SlicStream)
DecodeParameters(System.Collections.Generic.IDictionary`2<IceRpc.Transports.Slic.Internal.ParameterKey,System.Collections.Generic.IList`1<System.Byte>>)
DecodeParamValue()
EncodeParameters()
EncodeParameter()
IsUnknownStream(System.UInt64)
ReadFrameAsync(IceRpc.Transports.Slic.Internal.FrameType,System.Int32,System.Nullable`1<System.UInt64>,System.Threading.CancellationToken)
ReadCloseFrameAsync()
ReadPingFrameAndWritePongFrameAsync()
ReadPongFrameAsync()
ReadStreamWindowUpdateFrameAsync()
ReadFrameBodyAsync()
ReadFrameHeaderAsync()
TryDecodeHeader()
ReadFramesAsync()
ReadStreamDataFrameAsync()
TryClose(System.Exception,System.String,System.Nullable`1<IceRpc.IceRpcError>)
WriteFrame(IceRpc.Transports.Slic.Internal.FrameType,System.Nullable`1<System.UInt64>,ZeroC.Slice.EncodeAction)