< Summary

Information
Class: IceRpc.Transports.Slic.Internal.SlicConnection
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicConnection.cs
Tag: 592_20856082467
Line coverage
90%
Covered lines: 906
Uncovered lines: 96
Coverable lines: 1002
Total lines: 1578
Line coverage: 90.4%
Branch coverage
90%
Covered branches: 298
Total branches: 330
Branch coverage: 90.3%
Method coverage
97%
Covered methods: 45
Total methods: 46
Method coverage: 97.8%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
get_IsServer()100%11100%
get_MinSegmentSize()100%11100%
get_PeerInitialStreamWindowSize()100%11100%
get_PeerMaxStreamFrameSize()100%11100%
get_Pool()100%11100%
get_InitialStreamWindowSize()100%11100%
get_StreamWindowUpdateThreshold()100%11100%
.ctor(...)100%44100%
AcceptStreamAsync()100%66100%
ConnectAsync(...)75%4.06484.61%
PerformConnectAsync()100%24.052495.5%
DecodeInitialize()75%4.02490%
DecodeInitializeAckOrVersion()66.66%6.01692.85%
ReadFrameAsync()100%88100%
CloseAsync()100%1414100%
CreateStreamAsync()100%141497.61%
DisposeAsync()100%22100%
PerformDisposeAsync()93.75%16.221690.47%
SendPing()100%1.04166.66%
SendReadPing()100%22100%
SendWritePing()0%620%
FillBufferWriterAsync(...)100%11100%
ReleaseStream(...)100%88100%
ThrowIfClosed()100%22100%
WriteConnectionFrame(...)100%22100%
WriteStreamFrame(...)75%8.09888.88%
WriteStreamDataFrameAsync()94.44%36.463692.94%
EncodeStreamFrameHeader()100%22100%
AddStream(...)100%66100%
DecodeParameters(...)75%34.52473.68%
DecodeParamValue()100%1.03170%
EncodeParameters()100%66100%
EncodeParameter()100%11100%
IsUnknownStream(...)100%1212100%
ReadFrameAsync(...)95.65%23.072394.87%
ReadCloseFrameAsync()100%1313100%
ReadPingFrameAndWritePongFrameAsync()100%11100%
ReadPongFrameAsync()66.66%6.09686.66%
ReadStreamWindowUpdateFrameAsync()100%22100%
ReadFrameBodyAsync()100%44100%
ReadFrameHeaderAsync()83.33%6.22681.81%
TryDecodeHeader()75%13.511278.12%
ReadFramesAsync()83.33%6.11685.41%
ReadStreamDataFrameAsync()80.55%64.183672.09%
TryClose(...)100%66100%
WriteFrame(...)100%44100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Transports/Slic/Internal/SlicConnection.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Internal;
 4using IceRpc.Transports.Internal;
 5using System.Buffers;
 6using System.Collections.Concurrent;
 7using System.Diagnostics;
 8using System.IO.Pipelines;
 9using System.Security.Authentication;
 10using System.Threading.Channels;
 11using ZeroC.Slice;
 12
 13namespace IceRpc.Transports.Slic.Internal;
 14
 15/// <summary>The Slic connection implements an <see cref="IMultiplexedConnection" /> on top of a <see
 16/// cref="IDuplexConnection" />.</summary>
 17internal class SlicConnection : IMultiplexedConnection
 18{
 19    /// <summary>Gets a value indicating whether or not this is the server-side of the connection.</summary>
 1692720    internal bool IsServer { get; }
 21
 22    /// <summary>Gets the minimum size of the segment requested from <see cref="Pool" />.</summary>
 552223    internal int MinSegmentSize { get; }
 24
 25    /// <summary>Gets the peer's initial stream window size. This property is set to the <see
 26    /// cref="ParameterKey.InitialStreamWindowSize"/> value carried by the <see cref="FrameType.Initialize" />
 27    /// frame.</summary>
 335228    internal int PeerInitialStreamWindowSize { get; private set; }
 29
 30    /// <summary>Gets the maximum size of stream frames accepted by the peer. This property is set to the <see
 31    /// cref="ParameterKey.MaxStreamFrameSize"/> value carried by the <see cref="FrameType.Initialize" />
 32    /// frame.</summary>
 979733    internal int PeerMaxStreamFrameSize { get; private set; }
 34
 35    /// <summary>Gets the <see cref="MemoryPool{T}" /> used for obtaining memory buffers.</summary>
 552236    internal MemoryPool<byte> Pool { get; }
 37
 38    /// <summary>Gets the initial stream window size.</summary>
 972739    internal int InitialStreamWindowSize { get; }
 40
 41    /// <summary>Gets the window update threshold. When the window size is increased and this threshold reached, a <see
 42    /// cref="FrameType.StreamWindowUpdate" /> frame is sent.</summary>
 636343    internal int StreamWindowUpdateThreshold => InitialStreamWindowSize / StreamWindowUpdateRatio;
 44
 45    // The ratio used to compute the StreamWindowUpdateThreshold. For now, the stream window update is sent when the
 46    // window size grows over InitialStreamWindowSize / StreamWindowUpdateRatio.
 47    private const int StreamWindowUpdateRatio = 2;
 48
 49    private readonly Channel<IMultiplexedStream> _acceptStreamChannel;
 50    private int _bidirectionalStreamCount;
 51    private SemaphoreSlim? _bidirectionalStreamSemaphore;
 52    private readonly CancellationToken _closedCancellationToken;
 70053    private readonly CancellationTokenSource _closedCts = new();
 54    private string? _closedMessage;
 55    private Task<TransportConnectionInformation>? _connectTask;
 70056    private readonly CancellationTokenSource _disposedCts = new();
 57    private Task? _disposeTask;
 58    private readonly SlicDuplexConnectionDecorator _duplexConnection;
 59    private readonly DuplexConnectionReader _duplexConnectionReader;
 60    private readonly SlicDuplexConnectionWriter _duplexConnectionWriter;
 61    private bool _isClosed;
 62    private ulong? _lastRemoteBidirectionalStreamId;
 63    private ulong? _lastRemoteUnidirectionalStreamId;
 64    private readonly TimeSpan _localIdleTimeout;
 65    private readonly int _maxBidirectionalStreams;
 66    private readonly int _maxStreamFrameSize;
 67    private readonly int _maxUnidirectionalStreams;
 68    // _mutex ensure the assignment of _lastRemoteXxx members and the addition of the stream to _streams is
 69    // an atomic operation.
 70070    private readonly Lock _mutex = new();
 71    private ulong _nextBidirectionalId;
 72    private ulong _nextUnidirectionalId;
 73    private IceRpcError? _peerCloseError;
 70074    private TimeSpan _peerIdleTimeout = Timeout.InfiniteTimeSpan;
 75    private int _pendingPongCount;
 76    private Task? _readFramesTask;
 77
 70078    private readonly ConcurrentDictionary<ulong, SlicStream> _streams = new();
 79    private int _streamSemaphoreWaitCount;
 70080    private readonly TaskCompletionSource _streamSemaphoreWaitClosed =
 70081        new(TaskCreationOptions.RunContinuationsAsynchronously);
 82
 83    private int _unidirectionalStreamCount;
 84    private SemaphoreSlim? _unidirectionalStreamSemaphore;
 85
 86    // This is only set for server connections to ensure that _duplexConnectionWriter.Write is not called after
 87    // _duplexConnectionWriter.Shutdown. This can occur if the client-side of the connection sends the close frame
 88    // followed by the shutdown of the duplex connection and if CloseAsync is called at the same time on the server
 89    // connection.
 90    private bool _writerIsShutdown;
 91
 92    public async ValueTask<IMultiplexedStream> AcceptStreamAsync(CancellationToken cancellationToken)
 237593    {
 94        lock (_mutex)
 237595        {
 237596            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 97
 237498            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 199            {
 1100                throw new InvalidOperationException("Cannot accept stream before connecting the Slic connection.");
 101            }
 2373102            if (_isClosed)
 12103            {
 12104                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 105            }
 2361106        }
 107
 108        try
 2361109        {
 2361110            return await _acceptStreamChannel.Reader.ReadAsync(cancellationToken).ConfigureAwait(false);
 111        }
 113112        catch (ChannelClosedException exception)
 113113        {
 113114            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 112115            Debug.Assert(exception.InnerException is not null);
 116            // The exception given to ChannelWriter.Complete(Exception? exception) is the InnerException.
 112117            throw ExceptionUtil.Throw(exception.InnerException);
 118        }
 2024119    }
 120
 121    public Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken)
 681122    {
 123        lock (_mutex)
 681124        {
 681125            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 126
 681127            if (_connectTask is not null)
 1128            {
 1129                throw new InvalidOperationException("Cannot connect twice a Slic connection.");
 130            }
 680131            if (_isClosed)
 0132            {
 0133                throw new InvalidOperationException("Cannot connect a closed Slic connection.");
 134            }
 680135            _connectTask = PerformConnectAsync();
 680136        }
 680137        return _connectTask;
 138
 139        async Task<TransportConnectionInformation> PerformConnectAsync()
 680140        {
 680141            await Task.Yield(); // Exit mutex lock
 142
 143            // Connect the duplex connection.
 144            TransportConnectionInformation transportConnectionInformation;
 680145            TimeSpan peerIdleTimeout = TimeSpan.MaxValue;
 146
 147            try
 680148            {
 680149                transportConnectionInformation = await _duplexConnection.ConnectAsync(cancellationToken)
 680150                    .ConfigureAwait(false);
 151
 152                // Initialize the Slic connection.
 658153                if (IsServer)
 331154                {
 155                    // Read the Initialize frame.
 331156                    (ulong version, InitializeBody? initializeBody) = await ReadFrameAsync(
 331157                        DecodeInitialize,
 331158                        cancellationToken).ConfigureAwait(false);
 159
 325160                    if (initializeBody is null)
 2161                    {
 162                        // Unsupported version, try to negotiate another version by sending a Version frame with the
 163                        // Slic versions supported by this server.
 2164                        ulong[] supportedVersions = new ulong[] { SlicDefinitions.V1 };
 165
 2166                        WriteConnectionFrame(FrameType.Version, new VersionBody(supportedVersions).Encode);
 167
 2168                        (version, initializeBody) = await ReadFrameAsync(
 2169                            (frameType, buffer) =>
 2170                            {
 2171                                if (frameType is null)
 1172                                {
 2173                                    // The client shut down the connection because it doesn't support any of the
 2174                                    // server's supported Slic versions.
 1175                                    throw new IceRpcException(
 1176                                        IceRpcError.ConnectionRefused,
 1177                                        $"The connection was refused because the client Slic version {version} is not su
 2178                                }
 2179                                else
 1180                                {
 1181                                    return DecodeInitialize(frameType, buffer);
 2182                                }
 1183                            },
 2184                            cancellationToken).ConfigureAwait(false);
 1185                    }
 186
 324187                    Debug.Assert(initializeBody is not null);
 188
 324189                    DecodeParameters(initializeBody.Value.Parameters);
 190
 191                    // Write back an InitializeAck frame.
 324192                    WriteConnectionFrame(FrameType.InitializeAck, new InitializeAckBody(EncodeParameters()).Encode);
 324193                }
 194                else
 327195                {
 196                    // Write the Initialize frame.
 327197                    WriteConnectionFrame(
 327198                        FrameType.Initialize,
 327199                        (ref SliceEncoder encoder) =>
 327200                        {
 327201                            encoder.EncodeVarUInt62(SlicDefinitions.V1);
 327202                            new InitializeBody(EncodeParameters()).Encode(ref encoder);
 654203                        });
 204
 205                    // Read and decode the InitializeAck or Version frame.
 327206                    (InitializeAckBody? initializeAckBody, VersionBody? versionBody) = await ReadFrameAsync(
 327207                        DecodeInitializeAckOrVersion,
 327208                        cancellationToken).ConfigureAwait(false);
 209
 304210                    Debug.Assert(initializeAckBody is not null || versionBody is not null);
 211
 304212                    if (initializeAckBody is not null)
 302213                    {
 302214                        DecodeParameters(initializeAckBody.Value.Parameters);
 302215                    }
 216
 304217                    if (versionBody is not null)
 2218                    {
 2219                        if (versionBody.Value.Versions.Contains(SlicDefinitions.V1))
 1220                        {
 1221                            throw new InvalidDataException(
 1222                                "The server supported versions include the version initially requested.");
 223                        }
 224                        else
 1225                        {
 226                            // We only support V1 and the peer rejected V1.
 1227                            throw new IceRpcException(
 1228                                IceRpcError.ConnectionRefused,
 1229                                $"The connection was refused because the server only supports Slic version(s) {string.Jo
 230                        }
 231                    }
 302232                }
 626233            }
 6234            catch (InvalidDataException exception)
 6235            {
 6236                throw new IceRpcException(
 6237                    IceRpcError.IceRpcError,
 6238                    "The connection was aborted by a Slic protocol error.",
 6239                    exception);
 240            }
 24241            catch (OperationCanceledException)
 24242            {
 24243                throw;
 244            }
 4245            catch (AuthenticationException)
 4246            {
 4247                throw;
 248            }
 20249            catch (IceRpcException)
 20250            {
 20251                throw;
 252            }
 0253            catch (Exception exception)
 0254            {
 0255                Debug.Fail($"ConnectAsync failed with an unexpected exception: {exception}");
 0256                throw;
 257            }
 258
 259            // Enable the idle timeout checks after the connection establishment. The Ping frames sent by the keep alive
 260            // check are not expected until the Slic connection initialization completes. The idle timeout check uses
 261            // the smallest idle timeout.
 626262            TimeSpan idleTimeout = _peerIdleTimeout == Timeout.InfiniteTimeSpan ? _localIdleTimeout :
 626263                (_peerIdleTimeout < _localIdleTimeout ? _peerIdleTimeout : _localIdleTimeout);
 264
 626265            if (idleTimeout != Timeout.InfiniteTimeSpan)
 624266            {
 624267                _duplexConnection.Enable(idleTimeout);
 624268            }
 269
 626270            _readFramesTask = ReadFramesAsync(_disposedCts.Token);
 271
 626272            return transportConnectionInformation;
 626273        }
 274
 275        static (ulong, InitializeBody?) DecodeInitialize(FrameType? frameType, ReadOnlySequence<byte> buffer)
 327276        {
 327277            if (frameType != FrameType.Initialize)
 0278            {
 0279                throw new InvalidDataException($"Received unexpected {frameType} frame.");
 280            }
 281
 327282            return SliceEncoding.Slice2.DecodeBuffer<(ulong, InitializeBody?)>(
 327283                buffer,
 327284                (ref SliceDecoder decoder) =>
 327285                {
 327286                    ulong version = decoder.DecodeVarUInt62();
 326287                    if (version == SlicDefinitions.V1)
 324288                    {
 324289                        return (version, new InitializeBody(ref decoder));
 327290                    }
 327291                    else
 2292                    {
 2293                        decoder.Skip((int)(buffer.Length - decoder.Consumed));
 2294                        return (version, null);
 327295                    }
 653296                });
 326297        }
 298
 299        static (InitializeAckBody?, VersionBody?) DecodeInitializeAckOrVersion(
 300            FrameType? frameType,
 301            ReadOnlySequence<byte> buffer) =>
 306302            frameType switch
 306303            {
 303304                FrameType.InitializeAck => (
 303305                    SliceEncoding.Slice2.DecodeBuffer(
 303306                        buffer,
 303307                        (ref SliceDecoder decoder) => new InitializeAckBody(ref decoder)),
 303308                    null),
 3309                FrameType.Version => (
 3310                    null,
 3311                    SliceEncoding.Slice2.DecodeBuffer(
 3312                        buffer,
 6313                        (ref SliceDecoder decoder) => new VersionBody(ref decoder))),
 0314                _ => throw new InvalidDataException($"Received unexpected Slic frame: '{frameType}'."),
 306315            };
 316
 317        async ValueTask<T> ReadFrameAsync<T>(
 318            Func<FrameType?, ReadOnlySequence<byte>, T> decodeFunc,
 319            CancellationToken cancellationToken)
 660320        {
 660321            (FrameType FrameType, int FrameSize, ulong?)? header =
 660322                await ReadFrameHeaderAsync(cancellationToken).ConfigureAwait(false);
 323
 324            ReadOnlySequence<byte> buffer;
 634325            if (header is null || header.Value.FrameSize == 0)
 4326            {
 4327                buffer = ReadOnlySequence<byte>.Empty;
 4328            }
 329            else
 630330            {
 630331                buffer = await _duplexConnectionReader.ReadAtLeastAsync(
 630332                    header.Value.FrameSize,
 630333                    cancellationToken).ConfigureAwait(false);
 630334                if (buffer.Length > header.Value.FrameSize)
 8335                {
 8336                    buffer = buffer.Slice(0, header.Value.FrameSize);
 8337                }
 630338            }
 339
 634340            T decodedFrame = decodeFunc(header?.FrameType, buffer);
 630341            _duplexConnectionReader.AdvanceTo(buffer.End);
 630342            return decodedFrame;
 630343        }
 680344    }
 345
 346    public async Task CloseAsync(MultiplexedConnectionCloseError closeError, CancellationToken cancellationToken)
 106347    {
 348        lock (_mutex)
 106349        {
 106350            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 351
 106352            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 1353            {
 1354                throw new InvalidOperationException("Cannot close a Slic connection before connecting it.");
 355            }
 105356        }
 357
 105358        bool waitForWriterShutdown = false;
 105359        if (TryClose(new IceRpcException(IceRpcError.OperationAborted), "The connection was closed."))
 85360        {
 361            lock (_mutex)
 85362            {
 363                // The duplex connection writer of a server connection might already be shutdown
 364                // (_writerIsShutdown=true) if the client-side sent the Close frame and shut down the duplex connection.
 365                // This doesn't apply to the client-side since the server-side doesn't shutdown the duplex connection
 366                // writer after sending the Close frame.
 85367                if (!IsServer || !_writerIsShutdown)
 85368                {
 85369                    WriteFrame(FrameType.Close, streamId: null, new CloseBody((ulong)closeError).Encode);
 85370                    if (IsServer)
 45371                    {
 45372                        _duplexConnectionWriter.Flush();
 45373                    }
 374                    else
 40375                    {
 376                        // The sending of the client-side Close frame is followed by the shutdown of the duplex
 377                        // connection. For TCP, it's important to always shutdown the connection on the client-side
 378                        // first to avoid TIME_WAIT states on the server-side.
 40379                        _duplexConnectionWriter.Shutdown();
 40380                        waitForWriterShutdown = true;
 40381                    }
 85382                }
 85383            }
 85384        }
 385
 105386        if (waitForWriterShutdown)
 40387        {
 40388            await _duplexConnectionWriter.WriterTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 40389        }
 390
 391        // Now, wait for the peer to close the write side of the connection, which will terminate the read frames task.
 105392        Debug.Assert(_readFramesTask is not null);
 105393        await _readFramesTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 103394    }
 395
 396    public async ValueTask<IMultiplexedStream> CreateStreamAsync(
 397        bool bidirectional,
 398        CancellationToken cancellationToken)
 2090399    {
 400        lock (_mutex)
 2090401        {
 2090402            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 403
 2087404            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 2405            {
 2406                throw new InvalidOperationException("Cannot create stream before connecting the Slic connection.");
 407            }
 2085408            if (_isClosed)
 6409            {
 6410                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 411            }
 412
 2079413            ++_streamSemaphoreWaitCount;
 2079414        }
 415
 416        try
 2079417        {
 2079418            using var createStreamCts = CancellationTokenSource.CreateLinkedTokenSource(
 2079419                _closedCancellationToken,
 2079420                cancellationToken);
 421
 2079422            SemaphoreSlim? streamCountSemaphore = bidirectional ?
 2079423                _bidirectionalStreamSemaphore :
 2079424                _unidirectionalStreamSemaphore;
 425
 2079426            if (streamCountSemaphore is null)
 1427            {
 428                // The stream semaphore is null if the peer's max streams configuration is 0. In this case, we let
 429                // CreateStreamAsync hang indefinitely until the connection is closed.
 1430                await Task.Delay(-1, createStreamCts.Token).ConfigureAwait(false);
 0431            }
 432            else
 2078433            {
 2078434                await streamCountSemaphore.WaitAsync(createStreamCts.Token).ConfigureAwait(false);
 2068435            }
 436
 2068437            return new SlicStream(this, bidirectional, isRemote: false);
 438        }
 11439        catch (OperationCanceledException)
 11440        {
 11441            cancellationToken.ThrowIfCancellationRequested();
 7442            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 6443            Debug.Assert(_isClosed);
 6444            throw new IceRpcException(_peerCloseError ?? IceRpcError.OperationAborted, _closedMessage);
 445        }
 446        finally
 2079447        {
 448            lock (_mutex)
 2079449            {
 2079450                --_streamSemaphoreWaitCount;
 2079451                if (_isClosed && _streamSemaphoreWaitCount == 0)
 7452                {
 7453                    _streamSemaphoreWaitClosed.SetResult();
 7454                }
 2079455            }
 2079456        }
 2068457    }
 458
 459    public ValueTask DisposeAsync()
 963460    {
 461        lock (_mutex)
 963462        {
 963463            _disposeTask ??= PerformDisposeAsync();
 963464        }
 963465        return new(_disposeTask);
 466
 467        async Task PerformDisposeAsync()
 699468        {
 469            // Make sure we execute the code below without holding the mutex lock.
 699470            await Task.Yield();
 699471            TryClose(new IceRpcException(IceRpcError.OperationAborted), "The connection was disposed.");
 472
 699473            _disposedCts.Cancel();
 474
 475            try
 699476            {
 699477                await Task.WhenAll(
 699478                    _connectTask ?? Task.CompletedTask,
 699479                    _readFramesTask ?? Task.CompletedTask,
 699480                    _streamSemaphoreWaitClosed.Task).ConfigureAwait(false);
 399481            }
 300482            catch
 300483            {
 484                // Expected if any of these tasks failed or was canceled. Each task takes care of handling unexpected
 485                // exceptions so there's no need to handle them here.
 300486            }
 487
 488            // Clean-up the streams that might still be queued on the channel.
 724489            while (_acceptStreamChannel.Reader.TryRead(out IMultiplexedStream? stream))
 25490            {
 25491                if (stream.IsBidirectional)
 5492                {
 5493                    stream.Output.Complete();
 5494                    stream.Input.Complete();
 5495                }
 20496                else if (stream.IsRemote)
 20497                {
 20498                    stream.Input.Complete();
 20499                }
 500                else
 0501                {
 0502                    stream.Output.Complete();
 0503                }
 25504            }
 505
 506            try
 699507            {
 508                // Prevents unobserved task exceptions.
 699509                await _acceptStreamChannel.Reader.Completion.ConfigureAwait(false);
 0510            }
 699511            catch
 699512            {
 699513            }
 514
 699515            await _duplexConnectionWriter.DisposeAsync().ConfigureAwait(false);
 699516            _duplexConnectionReader.Dispose();
 699517            _duplexConnection.Dispose();
 518
 699519            _disposedCts.Dispose();
 699520            _bidirectionalStreamSemaphore?.Dispose();
 699521            _unidirectionalStreamSemaphore?.Dispose();
 699522            _closedCts.Dispose();
 699523        }
 963524    }
 525
 700526    internal SlicConnection(
 700527        IDuplexConnection duplexConnection,
 700528        MultiplexedConnectionOptions options,
 700529        SlicTransportOptions slicOptions,
 700530        bool isServer)
 700531    {
 700532        IsServer = isServer;
 533
 700534        Pool = options.Pool;
 700535        MinSegmentSize = options.MinSegmentSize;
 700536        _maxBidirectionalStreams = options.MaxBidirectionalStreams;
 700537        _maxUnidirectionalStreams = options.MaxUnidirectionalStreams;
 538
 700539        InitialStreamWindowSize = slicOptions.InitialStreamWindowSize;
 700540        _localIdleTimeout = slicOptions.IdleTimeout;
 700541        _maxStreamFrameSize = slicOptions.MaxStreamFrameSize;
 542
 700543        _acceptStreamChannel = Channel.CreateUnbounded<IMultiplexedStream>(new UnboundedChannelOptions
 700544        {
 700545            SingleReader = true,
 700546            SingleWriter = true
 700547        });
 548
 700549        _closedCancellationToken = _closedCts.Token;
 550
 551        // Only the client-side sends pings to keep the connection alive when idle timeout (set later) is not infinite.
 700552        _duplexConnection = IsServer ?
 700553            new SlicDuplexConnectionDecorator(duplexConnection) :
 700554            new SlicDuplexConnectionDecorator(duplexConnection, SendReadPing, SendWritePing);
 555
 700556        _duplexConnectionReader = new DuplexConnectionReader(_duplexConnection, options.Pool, options.MinSegmentSize);
 700557        _duplexConnectionWriter = new SlicDuplexConnectionWriter(
 700558            _duplexConnection,
 700559            options.Pool,
 700560            options.MinSegmentSize);
 561
 562        // We use the same stream ID numbering scheme as QUIC.
 700563        if (IsServer)
 345564        {
 345565            _nextBidirectionalId = 1;
 345566            _nextUnidirectionalId = 3;
 345567        }
 568        else
 355569        {
 355570            _nextBidirectionalId = 0;
 355571            _nextUnidirectionalId = 2;
 355572        }
 573
 574        void SendPing(long payload)
 15575        {
 576            try
 15577            {
 15578                WriteConnectionFrame(FrameType.Ping, new PingBody(payload).Encode);
 14579            }
 1580            catch (IceRpcException)
 1581            {
 582                // Expected if the connection is closed.
 1583            }
 0584            catch (Exception exception)
 0585            {
 0586                Debug.Fail($"The sending of a Ping frame failed with an unexpected exception: {exception}");
 0587                throw;
 588            }
 15589        }
 590
 591        void SendReadPing()
 15592        {
 593            // This local function is no-op if there is already a pending Pong.
 15594            if (Interlocked.CompareExchange(ref _pendingPongCount, 1, 0) == 0)
 15595            {
 15596                SendPing(1L);
 15597            }
 15598        }
 599
 600        void SendWritePing()
 0601        {
 602            // _pendingPongCount can be <= 0 if an unexpected pong is received. If it's the case, the connection is
 603            // being torn down and there's no point in sending a ping frame.
 0604            if (Interlocked.Increment(ref _pendingPongCount) > 0)
 0605            {
 0606                SendPing(0L);
 0607            }
 0608        }
 700609    }
 610
 611    /// <summary>Fills the given writer with stream data received on the connection.</summary>
 612    /// <param name="bufferWriter">The destination buffer writer.</param>
 613    /// <param name="byteCount">The amount of stream data to read.</param>
 614    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 615    internal ValueTask FillBufferWriterAsync(
 616        IBufferWriter<byte> bufferWriter,
 617        int byteCount,
 618        CancellationToken cancellationToken) =>
 9536619        _duplexConnectionReader.FillBufferWriterAsync(bufferWriter, byteCount, cancellationToken);
 620
 621    /// <summary>Releases a stream from the connection. The connection stream count is decremented and if this is a
 622    /// client allow a new stream to be started.</summary>
 623    /// <param name="stream">The released stream.</param>
 624    internal void ReleaseStream(SlicStream stream)
 4101625    {
 4101626        Debug.Assert(stream.IsStarted);
 627
 4101628        _streams.Remove(stream.Id, out SlicStream? _);
 629
 4101630        if (stream.IsRemote)
 2049631        {
 2049632            if (stream.IsBidirectional)
 656633            {
 656634                Interlocked.Decrement(ref _bidirectionalStreamCount);
 656635            }
 636            else
 1393637            {
 1393638                Interlocked.Decrement(ref _unidirectionalStreamCount);
 1393639            }
 2049640        }
 2052641        else if (!_isClosed)
 1668642        {
 1668643            if (stream.IsBidirectional)
 579644            {
 579645                _bidirectionalStreamSemaphore!.Release();
 579646            }
 647            else
 1089648            {
 1089649                _unidirectionalStreamSemaphore!.Release();
 1089650            }
 1668651        }
 4101652    }
 653
 654    /// <summary>Throws the connection closure exception if the connection is already closed.</summary>
 655    internal void ThrowIfClosed()
 7899656    {
 657        lock (_mutex)
 7899658        {
 7899659            if (_isClosed)
 9660            {
 9661                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 662            }
 7890663        }
 7890664    }
 665
 666    /// <summary>Writes a connection frame.</summary>
 667    /// <param name="frameType">The frame type.</param>
 668    /// <param name="encode">The action to encode the frame.</param>
 669    internal void WriteConnectionFrame(FrameType frameType, EncodeAction? encode)
 682670    {
 682671        Debug.Assert(frameType < FrameType.Stream);
 672
 673        lock (_mutex)
 682674        {
 682675            if (_isClosed)
 1676            {
 1677                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 678            }
 681679            WriteFrame(frameType, streamId: null, encode);
 681680            _duplexConnectionWriter.Flush();
 681681        }
 681682    }
 683
 684    /// <summary>Writes a stream frame.</summary>
 685    /// <param name="stream">The stream to write the frame for.</param>
 686    /// <param name="frameType">The frame type.</param>
 687    /// <param name="encode">The action to encode the frame.</param>
 688    /// <param name="writeReadsClosedFrame"><see langword="true" /> if a <see cref="FrameType.StreamReadsClosed" />
 689    /// frame should be written after the stream frame.</param>
 690    /// <remarks>This method is called by streams and might be called on a closed connection. The connection might
 691    /// also be closed concurrently while it's in progress.</remarks>
 692    internal void WriteStreamFrame(
 693        SlicStream stream,
 694        FrameType frameType,
 695        EncodeAction? encode,
 696        bool writeReadsClosedFrame)
 2959697    {
 698        // Ensure that this method is called for any FrameType.StreamXxx frame type except FrameType.Stream.
 2959699        Debug.Assert(frameType >= FrameType.StreamLast && stream.IsStarted);
 700
 701        lock (_mutex)
 2959702        {
 2959703            if (_isClosed)
 0704            {
 0705                return;
 706            }
 707
 2959708            WriteFrame(frameType, stream.Id, encode);
 2959709            if (writeReadsClosedFrame)
 79710            {
 79711                WriteFrame(FrameType.StreamReadsClosed, stream.Id, encode: null);
 79712            }
 2959713            if (frameType == FrameType.StreamLast)
 566714            {
 715                // Notify the stream that the last stream frame is considered sent at this point. This will close
 716                // writes on the stream and allow the stream to be released if reads are also closed.
 566717                stream.WroteLastStreamFrame();
 566718            }
 2959719            _duplexConnectionWriter.Flush();
 2959720        }
 2959721    }
 722
 723    /// <summary>Writes a stream data frame.</summary>
 724    /// <param name="stream">The stream to write the frame for.</param>
 725    /// <param name="source1">The first stream frame data source.</param>
 726    /// <param name="source2">The second stream frame data source.</param>
 727    /// <param name="endStream"><see langword="true" /> to write a <see cref="FrameType.StreamLast" /> frame and
 728    /// <see langword="false" /> to write a <see cref="FrameType.Stream" /> frame.</param>
 729    /// <param name="writeReadsClosedFrame"><see langword="true" /> if a <see cref="FrameType.StreamReadsClosed" />
 730    /// frame should be written after the stream frame.</param>
 731    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 732    /// <remarks>This method is called by streams and might be called on a closed connection. The connection might
 733    /// also be closed concurrently while it's in progress.</remarks>
 734    internal async ValueTask<FlushResult> WriteStreamDataFrameAsync(
 735        SlicStream stream,
 736        ReadOnlySequence<byte> source1,
 737        ReadOnlySequence<byte> source2,
 738        bool endStream,
 739        bool writeReadsClosedFrame,
 740        CancellationToken cancellationToken)
 7869741    {
 7869742        Debug.Assert(!source1.IsEmpty || endStream);
 743
 7869744        if (_connectTask is null)
 0745        {
 0746            throw new InvalidOperationException("Cannot send a stream frame before calling ConnectAsync.");
 747        }
 748
 7869749        using var writeCts = CancellationTokenSource.CreateLinkedTokenSource(
 7869750            _closedCancellationToken,
 7869751            cancellationToken);
 752
 753        try
 7869754        {
 755            do
 10183756            {
 757                // Next, ensure send credit is available. If not, this will block until the receiver allows sending
 758                // additional data.
 10183759                int sendCredit = 0;
 10183760                if (!source1.IsEmpty || !source2.IsEmpty)
 10178761                {
 10178762                    sendCredit = await stream.AcquireSendCreditAsync(writeCts.Token).ConfigureAwait(false);
 9164763                    Debug.Assert(sendCredit > 0);
 9164764                }
 765
 766                // Gather data from source1 or source2 up to sendCredit bytes or the peer maximum stream frame size.
 9169767                int sendMaxSize = Math.Min(sendCredit, PeerMaxStreamFrameSize);
 768                ReadOnlySequence<byte> sendSource1;
 769                ReadOnlySequence<byte> sendSource2;
 9169770                if (!source1.IsEmpty)
 8164771                {
 8164772                    int length = Math.Min((int)source1.Length, sendMaxSize);
 8164773                    sendSource1 = source1.Slice(0, length);
 8164774                    source1 = source1.Slice(length);
 8164775                }
 776                else
 1005777                {
 1005778                    sendSource1 = ReadOnlySequence<byte>.Empty;
 1005779                }
 780
 9169781                if (source1.IsEmpty && !source2.IsEmpty)
 2057782                {
 2057783                    int length = Math.Min((int)source2.Length, sendMaxSize - (int)sendSource1.Length);
 2057784                    sendSource2 = source2.Slice(0, length);
 2057785                    source2 = source2.Slice(length);
 2057786                }
 787                else
 7112788                {
 7112789                    sendSource2 = ReadOnlySequence<byte>.Empty;
 7112790                }
 791
 792                // If there's no data left to send and endStream is true, it's the last stream frame.
 9169793                bool lastStreamFrame = endStream && source1.IsEmpty && source2.IsEmpty;
 794
 795                lock (_mutex)
 9169796                {
 9169797                    if (_isClosed)
 0798                    {
 0799                        throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 800                    }
 801
 9169802                    if (!stream.IsStarted)
 2052803                    {
 2052804                        if (stream.IsBidirectional)
 655805                        {
 655806                            AddStream(_nextBidirectionalId, stream);
 655807                            _nextBidirectionalId += 4;
 655808                        }
 809                        else
 1397810                        {
 1397811                            AddStream(_nextUnidirectionalId, stream);
 1397812                            _nextUnidirectionalId += 4;
 1397813                        }
 2052814                    }
 815
 816                    // Notify the stream that we're consuming sendSize credit. It's important to call this before
 817                    // sending the stream frame to avoid race conditions where the StreamWindowUpdate frame could be
 818                    // received before the send credit was updated.
 9169819                    if (sendCredit > 0)
 9164820                    {
 9164821                        stream.ConsumedSendCredit((int)(sendSource1.Length + sendSource2.Length));
 9164822                    }
 823
 9169824                    EncodeStreamFrameHeader(stream.Id, sendSource1.Length + sendSource2.Length, lastStreamFrame);
 825
 9169826                    if (lastStreamFrame)
 781827                    {
 828                        // Notify the stream that the last stream frame is considered sent at this point. This will
 829                        // complete writes on the stream and allow the stream to be released if reads are also
 830                        // completed.
 781831                        stream.WroteLastStreamFrame();
 781832                    }
 833
 834                    // Write and flush the stream frame.
 9169835                    if (!sendSource1.IsEmpty)
 8164836                    {
 8164837                        _duplexConnectionWriter.Write(sendSource1);
 8164838                    }
 9169839                    if (!sendSource2.IsEmpty)
 2057840                    {
 2057841                        _duplexConnectionWriter.Write(sendSource2);
 2057842                    }
 843
 9169844                    if (writeReadsClosedFrame)
 371845                    {
 371846                        WriteFrame(FrameType.StreamReadsClosed, stream.Id, encode: null);
 371847                    }
 9169848                    _duplexConnectionWriter.Flush();
 9169849                }
 9169850            }
 9169851            while (!source1.IsEmpty || !source2.IsEmpty); // Loop until there's no data left to send.
 6855852        }
 1014853        catch (OperationCanceledException)
 1014854        {
 1014855            cancellationToken.ThrowIfCancellationRequested();
 856
 0857            Debug.Assert(_isClosed);
 0858            throw new IceRpcException(_peerCloseError ?? IceRpcError.OperationAborted, _closedMessage);
 859        }
 860
 6855861        return new FlushResult(isCanceled: false, isCompleted: false);
 862
 863        void EncodeStreamFrameHeader(ulong streamId, long size, bool lastStreamFrame)
 9169864        {
 9169865            var encoder = new SliceEncoder(_duplexConnectionWriter, SliceEncoding.Slice2);
 9169866            encoder.EncodeFrameType(!lastStreamFrame ? FrameType.Stream : FrameType.StreamLast);
 9169867            Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(4);
 9169868            int startPos = encoder.EncodedByteCount;
 9169869            encoder.EncodeVarUInt62(streamId);
 9169870            SliceEncoder.EncodeVarUInt62((ulong)(encoder.EncodedByteCount - startPos + size), sizePlaceholder);
 9169871        }
 6855872    }
 873
 874    private void AddStream(ulong id, SlicStream stream)
 4102875    {
 876        lock (_mutex)
 4102877        {
 4102878            if (_isClosed)
 1879            {
 1880                throw new IceRpcException(_peerCloseError ?? IceRpcError.ConnectionAborted, _closedMessage);
 881            }
 882
 4101883            _streams[id] = stream;
 884
 885            // Assign the stream ID within the mutex to ensure that the addition of the stream to the connection and the
 886            // stream ID assignment are atomic.
 4101887            stream.Id = id;
 888
 889            // Keep track of the last assigned stream ID. This is used to figure out if the stream is known or unknown.
 4101890            if (stream.IsRemote)
 2049891            {
 2049892                if (stream.IsBidirectional)
 656893                {
 656894                    _lastRemoteBidirectionalStreamId = id;
 656895                }
 896                else
 1393897                {
 1393898                    _lastRemoteUnidirectionalStreamId = id;
 1393899                }
 2049900            }
 4101901        }
 4101902    }
 903
 904    private void DecodeParameters(IDictionary<ParameterKey, IList<byte>> parameters)
 626905    {
 626906        int? maxStreamFrameSize = null;
 626907        int? peerInitialStreamWindowSize = null;
 7916908        foreach ((ParameterKey key, IList<byte> buffer) in parameters)
 3019909        {
 3019910            switch (key)
 911            {
 912                case ParameterKey.MaxBidirectionalStreams:
 555913                {
 555914                    int value = DecodeParamValue(buffer);
 555915                    if (value > 0)
 555916                    {
 555917                        _bidirectionalStreamSemaphore = new SemaphoreSlim(value, value);
 555918                    }
 555919                    break;
 920                }
 921                case ParameterKey.MaxUnidirectionalStreams:
 607922                {
 607923                    int value = DecodeParamValue(buffer);
 607924                    if (value > 0)
 607925                    {
 607926                        _unidirectionalStreamSemaphore = new SemaphoreSlim(value, value);
 607927                    }
 607928                    break;
 929                }
 930                case ParameterKey.IdleTimeout:
 605931                {
 605932                    _peerIdleTimeout = TimeSpan.FromMilliseconds(DecodeParamValue(buffer));
 605933                    if (_peerIdleTimeout == TimeSpan.Zero)
 0934                    {
 0935                        throw new InvalidDataException(
 0936                            "The IdleTimeout Slic connection parameter is invalid, it must be greater than 0 s.");
 937                    }
 605938                    break;
 939                }
 940                case ParameterKey.MaxStreamFrameSize:
 626941                {
 626942                    maxStreamFrameSize = DecodeParamValue(buffer);
 626943                    if (maxStreamFrameSize < 1024)
 0944                    {
 0945                        throw new InvalidDataException(
 0946                            "The MaxStreamFrameSize connection parameter is invalid, it must be greater than 1KB.");
 947                    }
 626948                    break;
 949                }
 950                case ParameterKey.InitialStreamWindowSize:
 626951                {
 626952                    peerInitialStreamWindowSize = DecodeParamValue(buffer);
 626953                    if (peerInitialStreamWindowSize < 1024)
 0954                    {
 0955                        throw new InvalidDataException(
 0956                            "The InitialStreamWindowSize connection parameter is invalid, it must be greater than 1KB.")
 957                    }
 626958                    break;
 959                }
 960                // Ignore unsupported parameter.
 961            }
 3019962        }
 963
 626964        if (maxStreamFrameSize is null)
 0965        {
 0966            throw new InvalidDataException(
 0967                "The peer didn't send the required MaxStreamFrameSize connection parameter.");
 968        }
 969        else
 626970        {
 626971            PeerMaxStreamFrameSize = maxStreamFrameSize.Value;
 626972        }
 973
 626974        if (peerInitialStreamWindowSize is null)
 0975        {
 0976            throw new InvalidDataException(
 0977                "The peer didn't send the required InitialStreamWindowSize connection parameter.");
 978        }
 979        else
 626980        {
 626981            PeerInitialStreamWindowSize = peerInitialStreamWindowSize.Value;
 626982        }
 983
 984        // all parameter values are currently integers in the range 0..Int32Max encoded as varuint62.
 985        static int DecodeParamValue(IList<byte> buffer)
 3019986        {
 987            // The IList<byte> decoded by the IceRPC + Slice integration is backed by an array
 3019988            ulong value = SliceEncoding.Slice2.DecodeBuffer(
 3019989                new ReadOnlySequence<byte>((byte[])buffer),
 6038990                (ref SliceDecoder decoder) => decoder.DecodeVarUInt62());
 991            try
 3019992            {
 3019993                return checked((int)value);
 994            }
 0995            catch (OverflowException exception)
 0996            {
 0997                throw new InvalidDataException("The value is out of the varuint32 accepted range.", exception);
 998            }
 3019999        }
 6261000    }
 1001
 1002    private Dictionary<ParameterKey, IList<byte>> EncodeParameters()
 6511003    {
 6511004        var parameters = new List<KeyValuePair<ParameterKey, IList<byte>>>
 6511005        {
 6511006            // Required parameters.
 6511007            EncodeParameter(ParameterKey.MaxStreamFrameSize, (ulong)_maxStreamFrameSize),
 6511008            EncodeParameter(ParameterKey.InitialStreamWindowSize, (ulong)InitialStreamWindowSize)
 6511009        };
 1010
 1011        // Optional parameters.
 6511012        if (_localIdleTimeout != Timeout.InfiniteTimeSpan)
 6491013        {
 6491014            parameters.Add(EncodeParameter(ParameterKey.IdleTimeout, (ulong)_localIdleTimeout.TotalMilliseconds));
 6491015        }
 6511016        if (_maxBidirectionalStreams > 0)
 5921017        {
 5921018            parameters.Add(EncodeParameter(ParameterKey.MaxBidirectionalStreams, (ulong)_maxBidirectionalStreams));
 5921019        }
 6511020        if (_maxUnidirectionalStreams > 0)
 6511021        {
 6511022            parameters.Add(EncodeParameter(ParameterKey.MaxUnidirectionalStreams, (ulong)_maxUnidirectionalStreams));
 6511023        }
 1024
 6511025        return new Dictionary<ParameterKey, IList<byte>>(parameters);
 1026
 1027        static KeyValuePair<ParameterKey, IList<byte>> EncodeParameter(ParameterKey key, ulong value)
 31941028        {
 31941029            int sizeLength = SliceEncoder.GetVarUInt62EncodedSize(value);
 31941030            byte[] buffer = new byte[sizeLength];
 31941031            SliceEncoder.EncodeVarUInt62(value, buffer);
 31941032            return new(key, buffer);
 31941033        }
 6511034    }
 1035
 1036    private bool IsUnknownStream(ulong streamId)
 48841037    {
 48841038        bool isRemote = streamId % 2 == (IsServer ? 0ul : 1ul);
 48841039        bool isBidirectional = streamId % 4 < 2;
 48841040        if (isRemote)
 26401041        {
 26401042            if (isBidirectional)
 12121043            {
 12121044                return _lastRemoteBidirectionalStreamId is null || streamId > _lastRemoteBidirectionalStreamId;
 1045            }
 1046            else
 14281047            {
 14281048                return _lastRemoteUnidirectionalStreamId is null || streamId > _lastRemoteUnidirectionalStreamId;
 1049            }
 1050        }
 1051        else
 22441052        {
 22441053            if (isBidirectional)
 11141054            {
 11141055                return streamId >= _nextBidirectionalId;
 1056            }
 1057            else
 11301058            {
 11301059                return streamId >= _nextUnidirectionalId;
 1060            }
 1061        }
 48841062    }
 1063
 1064    private Task ReadFrameAsync(FrameType frameType, int size, ulong? streamId, CancellationToken cancellationToken)
 125161065    {
 125161066        if (frameType >= FrameType.Stream && streamId is null)
 01067        {
 01068            throw new InvalidDataException("Received stream frame without stream ID.");
 1069        }
 1070
 125161071        switch (frameType)
 1072        {
 1073            case FrameType.Close:
 871074            {
 871075                return ReadCloseFrameAsync(size, cancellationToken);
 1076            }
 1077            case FrameType.Ping:
 161078            {
 161079                return ReadPingFrameAndWritePongFrameAsync(size, cancellationToken);
 1080            }
 1081            case FrameType.Pong:
 171082            {
 171083                return ReadPongFrameAsync(size, cancellationToken);
 1084            }
 1085            case FrameType.Stream:
 1086            case FrameType.StreamLast:
 96221087            {
 96221088                return ReadStreamDataFrameAsync(frameType, size, streamId!.Value, cancellationToken);
 1089            }
 1090            case FrameType.StreamWindowUpdate:
 10111091            {
 10111092                if (IsUnknownStream(streamId!.Value))
 11093                {
 11094                    throw new InvalidDataException($"Received {frameType} frame for unknown stream.");
 1095                }
 1096
 10101097                return ReadStreamWindowUpdateFrameAsync(size, streamId!.Value, cancellationToken);
 1098            }
 1099            case FrameType.StreamReadsClosed:
 1100            case FrameType.StreamWritesClosed:
 17601101            {
 17601102                if (size > 0)
 21103                {
 21104                    throw new InvalidDataException($"Unexpected body for {frameType} frame.");
 1105                }
 17581106                if (IsUnknownStream(streamId!.Value))
 21107                {
 21108                    throw new InvalidDataException($"Received {frameType} frame for unknown stream.");
 1109                }
 1110
 17561111                if (_streams.TryGetValue(streamId.Value, out SlicStream? stream))
 13041112                {
 13041113                    if (frameType == FrameType.StreamWritesClosed)
 401114                    {
 401115                        stream.ReceivedWritesClosedFrame();
 401116                    }
 1117                    else
 12641118                    {
 12641119                        stream.ReceivedReadsClosedFrame();
 12641120                    }
 13041121                }
 17561122                return Task.CompletedTask;
 1123            }
 1124            default:
 31125            {
 31126                throw new InvalidDataException($"Received unexpected {frameType} frame.");
 1127            }
 1128        }
 1129
 1130        async Task ReadCloseFrameAsync(int size, CancellationToken cancellationToken)
 871131        {
 871132            CloseBody closeBody = await ReadFrameBodyAsync(
 871133                FrameType.Close,
 871134                size,
 861135                (ref SliceDecoder decoder) => new CloseBody(ref decoder),
 871136                cancellationToken).ConfigureAwait(false);
 1137
 851138            IceRpcError? peerCloseError = closeBody.ApplicationErrorCode switch
 851139            {
 631140                (ulong)MultiplexedConnectionCloseError.NoError => IceRpcError.ConnectionClosedByPeer,
 41141                (ulong)MultiplexedConnectionCloseError.Refused => IceRpcError.ConnectionRefused,
 81142                (ulong)MultiplexedConnectionCloseError.ServerBusy => IceRpcError.ServerBusy,
 51143                (ulong)MultiplexedConnectionCloseError.Aborted => IceRpcError.ConnectionAborted,
 51144                _ => null
 851145            };
 1146
 1147            bool notAlreadyClosed;
 851148            if (peerCloseError is null)
 51149            {
 51150                notAlreadyClosed = TryClose(
 51151                    new IceRpcException(IceRpcError.ConnectionAborted),
 51152                    $"The connection was closed by the peer with an unknown application error code: '{closeBody.Applicat
 51153                    IceRpcError.ConnectionAborted);
 51154            }
 1155            else
 801156            {
 801157                notAlreadyClosed = TryClose(
 801158                    new IceRpcException(peerCloseError.Value),
 801159                    "The connection was closed by the peer.",
 801160                    peerCloseError);
 801161            }
 1162
 1163            // The server-side of the duplex connection is only shutdown once the client-side is shutdown. When using
 1164            // TCP, this ensures that the server TCP connection won't end-up in the TIME_WAIT state on the server-side.
 851165            if (notAlreadyClosed && !IsServer)
 311166            {
 1167                // DisposeAsync waits for the reads frames task to complete before disposing the writer.
 1168                lock (_mutex)
 311169                {
 311170                    _duplexConnectionWriter.Shutdown();
 311171                }
 311172                await _duplexConnectionWriter.WriterTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 291173            }
 831174        }
 1175
 1176        async Task ReadPingFrameAndWritePongFrameAsync(int size, CancellationToken cancellationToken)
 161177        {
 1178            // Read the ping frame.
 161179            PingBody pingBody = await ReadFrameBodyAsync(
 161180                FrameType.Ping,
 161181                size,
 151182                (ref SliceDecoder decoder) => new PingBody(ref decoder),
 161183                cancellationToken).ConfigureAwait(false);
 1184
 1185            // Return a pong frame with the ping payload.
 141186            WriteConnectionFrame(FrameType.Pong, new PongBody(pingBody.Payload).Encode);
 141187        }
 1188
 1189        async Task ReadPongFrameAsync(int size, CancellationToken cancellationToken)
 171190        {
 171191            if (Interlocked.Decrement(ref _pendingPongCount) >= 0)
 141192            {
 1193                // Ensure the pong frame payload value is expected.
 1194
 141195                PongBody pongBody = await ReadFrameBodyAsync(
 141196                    FrameType.Pong,
 141197                    size,
 141198                    (ref SliceDecoder decoder) => new PongBody(ref decoder),
 141199                    cancellationToken).ConfigureAwait(false);
 1200
 1201                // For now, we only send a 0 or 1 payload value (0 for "write ping" and 1 for "read ping").
 141202                if (pongBody.Payload != 0L && pongBody.Payload != 1L)
 01203                {
 01204                    throw new InvalidDataException($"Received {nameof(FrameType.Pong)} with unexpected payload.");
 1205                }
 141206            }
 1207            else
 31208            {
 1209                // If not waiting for a pong frame, this pong frame is unexpected.
 31210                throw new InvalidDataException($"Received unexpected {nameof(FrameType.Pong)} frame.");
 1211            }
 141212        }
 1213
 1214        async Task ReadStreamWindowUpdateFrameAsync(int size, ulong streamId, CancellationToken cancellationToken)
 10101215        {
 10101216            StreamWindowUpdateBody frame = await ReadFrameBodyAsync(
 10101217                FrameType.StreamWindowUpdate,
 10101218                size,
 10101219                (ref SliceDecoder decoder) => new StreamWindowUpdateBody(ref decoder),
 10101220                cancellationToken).ConfigureAwait(false);
 10101221            if (_streams.TryGetValue(streamId, out SlicStream? stream))
 9681222            {
 9681223                stream.ReceivedWindowUpdateFrame(frame);
 9681224            }
 10101225        }
 1226
 1227        async Task<T> ReadFrameBodyAsync<T>(
 1228            FrameType frameType,
 1229            int size,
 1230            DecodeFunc<T> decodeFunc,
 1231            CancellationToken cancellationToken)
 11271232        {
 11271233            if (size <= 0)
 21234            {
 21235                throw new InvalidDataException($"Unexpected empty body for {frameType} frame.");
 1236            }
 1237
 11251238            ReadOnlySequence<byte> buffer = await _duplexConnectionReader.ReadAtLeastAsync(size, cancellationToken)
 11251239                .ConfigureAwait(false);
 1240
 11251241            if (buffer.Length > size)
 6491242            {
 6491243                buffer = buffer.Slice(0, size);
 6491244            }
 1245
 11251246            T decodedFrame = SliceEncoding.Slice2.DecodeBuffer(buffer, decodeFunc);
 11231247            _duplexConnectionReader.AdvanceTo(buffer.End);
 11231248            return decodedFrame;
 11231249        }
 125081250    }
 1251
 1252    private async ValueTask<(FrameType FrameType, int FrameSize, ulong? StreamId)?> ReadFrameHeaderAsync(
 1253        CancellationToken cancellationToken)
 137821254    {
 137821255        while (true)
 137821256        {
 1257            // Read data from the pipe reader.
 137821258            if (!_duplexConnectionReader.TryRead(out ReadOnlySequence<byte> buffer))
 87561259            {
 87561260                buffer = await _duplexConnectionReader.ReadAsync(cancellationToken).ConfigureAwait(false);
 82651261            }
 1262
 132911263            if (buffer.IsEmpty)
 1371264            {
 1371265                return null;
 1266            }
 1267
 131541268            if (TryDecodeHeader(
 131541269                buffer,
 131541270                out (FrameType FrameType, int FrameSize, ulong? StreamId) header,
 131541271                out int consumed))
 131491272            {
 131491273                _duplexConnectionReader.AdvanceTo(buffer.GetPosition(consumed));
 131491274                return header;
 1275            }
 1276            else
 01277            {
 01278                _duplexConnectionReader.AdvanceTo(buffer.Start, buffer.End);
 01279            }
 01280        }
 1281
 1282        static bool TryDecodeHeader(
 1283            ReadOnlySequence<byte> buffer,
 1284            out (FrameType FrameType, int FrameSize, ulong? StreamId) header,
 1285            out int consumed)
 131541286        {
 131541287            header = default;
 131541288            consumed = default;
 1289
 131541290            var decoder = new SliceDecoder(buffer, SliceEncoding.Slice2);
 1291
 1292            // Decode the frame type and frame size.
 131541293            if (!decoder.TryDecodeUInt8(out byte frameType) || !decoder.TryDecodeVarUInt62(out ulong frameSize))
 01294            {
 01295                return false;
 1296            }
 1297
 131541298            header.FrameType = frameType.AsFrameType();
 1299            try
 131511300            {
 131511301                header.FrameSize = checked((int)frameSize);
 131511302            }
 01303            catch (OverflowException exception)
 01304            {
 01305                throw new InvalidDataException("The frame size can't be larger than int.MaxValue.", exception);
 1306            }
 1307
 1308            // If it's a stream frame, try to decode the stream ID
 131511309            if (header.FrameType >= FrameType.Stream)
 123951310            {
 123951311                if (header.FrameSize == 0)
 11312                {
 11313                    throw new InvalidDataException("Invalid stream frame size.");
 1314                }
 1315
 123941316                consumed = (int)decoder.Consumed;
 123941317                if (!decoder.TryDecodeVarUInt62(out ulong streamId))
 01318                {
 01319                    return false;
 1320                }
 123941321                header.StreamId = streamId;
 123941322                header.FrameSize -= (int)decoder.Consumed - consumed;
 1323
 123941324                if (header.FrameSize < 0)
 11325                {
 11326                    throw new InvalidDataException("Invalid stream frame size.");
 1327                }
 123931328            }
 1329
 131491330            consumed = (int)decoder.Consumed;
 131491331            return true;
 131491332        }
 132861333    }
 1334
 1335    private async Task ReadFramesAsync(CancellationToken cancellationToken)
 6261336    {
 1337        try
 6261338        {
 131221339            while (true)
 131221340            {
 131221341                (FrameType Type, int Size, ulong? StreamId)? header = await ReadFrameHeaderAsync(cancellationToken)
 131221342                    .ConfigureAwait(false);
 1343
 126521344                if (header is null)
 1361345                {
 1346                    lock (_mutex)
 1361347                    {
 1361348                        if (!_isClosed)
 01349                        {
 1350                            // Unexpected duplex connection shutdown.
 01351                            throw new IceRpcException(IceRpcError.ConnectionAborted);
 1352                        }
 1361353                    }
 1354                    // The peer has shut down the duplex connection.
 1361355                    break;
 1356                }
 1357
 125161358                await ReadFrameAsync(header.Value.Type, header.Value.Size, header.Value.StreamId, cancellationToken)
 125161359                    .ConfigureAwait(false);
 124961360            }
 1361
 1361362            if (IsServer)
 701363            {
 701364                Debug.Assert(_isClosed);
 1365
 1366                // The server-side of the duplex connection is only shutdown once the client-side is shutdown. When
 1367                // using TCP, this ensures that the server TCP connection won't end-up in the TIME_WAIT state on the
 1368                // server-side.
 1369
 1370                // DisposeAsync waits for the reads frames task to complete before disposing the writer.
 1371                lock (_mutex)
 701372                {
 701373                    _duplexConnectionWriter.Shutdown();
 1374
 1375                    // Make sure that CloseAsync doesn't call Write on the writer if it's called shortly after the peer
 1376                    // shutdown its side of the connection (which triggers ReadFrameHeaderAsync to return null).
 701377                    _writerIsShutdown = true;
 701378                }
 1379
 701380                await _duplexConnectionWriter.WriterTask.WaitAsync(cancellationToken).ConfigureAwait(false);
 681381            }
 1341382        }
 2451383        catch (OperationCanceledException)
 2451384        {
 1385            // Expected, DisposeAsync was called.
 2451386        }
 2281387        catch (IceRpcException exception)
 2281388        {
 2281389            TryClose(exception, "The connection was lost.", IceRpcError.ConnectionAborted);
 2281390            throw;
 1391        }
 191392        catch (InvalidDataException exception)
 191393        {
 191394            var rpcException = new IceRpcException(
 191395                IceRpcError.IceRpcError,
 191396                "The connection was aborted by a Slic protocol error.",
 191397                exception);
 191398            TryClose(rpcException, rpcException.Message, IceRpcError.IceRpcError);
 191399            throw rpcException;
 1400        }
 01401        catch (Exception exception)
 01402        {
 01403            Debug.Fail($"The read frames task completed due to an unhandled exception: {exception}");
 01404            TryClose(exception, "The connection was lost.", IceRpcError.ConnectionAborted);
 01405            throw;
 1406        }
 3791407    }
 1408
 1409    private async Task ReadStreamDataFrameAsync(
 1410        FrameType type,
 1411        int size,
 1412        ulong streamId,
 1413        CancellationToken cancellationToken)
 96221414    {
 96221415        bool endStream = type == FrameType.StreamLast;
 96221416        bool isRemote = streamId % 2 == (IsServer ? 0ul : 1ul);
 96221417        bool isBidirectional = streamId % 4 < 2;
 1418
 96221419        if (!isBidirectional && !isRemote)
 01420        {
 01421            throw new InvalidDataException(
 01422                "Received unexpected stream frame on local unidirectional stream.");
 1423        }
 96221424        else if (size == 0 && !endStream)
 11425        {
 11426            throw new InvalidDataException($"Received invalid {nameof(FrameType.Stream)} frame.");
 1427        }
 1428
 96211429        if (!_streams.TryGetValue(streamId, out SlicStream? stream) && isRemote && IsUnknownStream(streamId))
 20501430        {
 1431            // Create a new remote stream.
 1432
 20501433            if (size == 0)
 01434            {
 01435                throw new InvalidDataException("Received empty stream frame on new stream.");
 1436            }
 1437
 20501438            if (isBidirectional)
 6561439            {
 6561440                if (streamId > _lastRemoteBidirectionalStreamId + 4)
 01441                {
 01442                    throw new InvalidDataException("Invalid stream ID.");
 1443                }
 1444
 6561445                if (_bidirectionalStreamCount == _maxBidirectionalStreams)
 01446                {
 01447                    throw new IceRpcException(
 01448                        IceRpcError.IceRpcError,
 01449                        $"The maximum bidirectional stream count {_maxBidirectionalStreams} was reached.");
 1450                }
 6561451                Interlocked.Increment(ref _bidirectionalStreamCount);
 6561452            }
 1453            else
 13941454            {
 13941455                if (streamId > _lastRemoteUnidirectionalStreamId + 4)
 01456                {
 01457                    throw new InvalidDataException("Invalid stream ID.");
 1458                }
 1459
 13941460                if (_unidirectionalStreamCount == _maxUnidirectionalStreams)
 01461                {
 01462                    throw new IceRpcException(
 01463                        IceRpcError.IceRpcError,
 01464                        $"The maximum unidirectional stream count {_maxUnidirectionalStreams} was reached");
 1465                }
 13941466                Interlocked.Increment(ref _unidirectionalStreamCount);
 13941467            }
 1468
 1469            // The stream is registered with the connection and queued on the channel. The caller of AcceptStreamAsync
 1470            // is responsible for cleaning up the stream.
 20501471            stream = new SlicStream(this, isBidirectional, isRemote: true);
 1472
 1473            try
 20501474            {
 20501475                AddStream(streamId, stream);
 1476
 1477                try
 20491478                {
 20491479                    await _acceptStreamChannel.Writer.WriteAsync(
 20491480                        stream,
 20491481                        cancellationToken).ConfigureAwait(false);
 20491482                }
 01483                catch (ChannelClosedException exception)
 01484                {
 1485                    // The exception given to ChannelWriter.Complete(Exception? exception) is the InnerException.
 01486                    Debug.Assert(exception.InnerException is not null);
 01487                    throw ExceptionUtil.Throw(exception.InnerException);
 1488                }
 20491489            }
 11490            catch (IceRpcException)
 11491            {
 1492                // The two methods above throw IceRpcException if the connection has been closed (either by CloseAsync
 1493                // or because the close frame was received). We cleanup up the stream but don't throw to not abort the
 1494                // reading. The connection graceful closure still needs to read on the connection to figure out when the
 1495                // peer shuts down the duplex connection.
 11496                Debug.Assert(_isClosed);
 11497                stream.Input.Complete();
 11498                if (isBidirectional)
 01499                {
 01500                    stream.Output.Complete();
 01501                }
 11502            }
 20501503        }
 1504
 96211505        bool isDataConsumed = false;
 96211506        if (stream is not null)
 95471507        {
 1508            // Let the stream consume the stream frame data.
 95471509            isDataConsumed = await stream.ReceivedDataFrameAsync(
 95471510                size,
 95471511                endStream,
 95471512                cancellationToken).ConfigureAwait(false);
 95471513        }
 1514
 96211515        if (!isDataConsumed)
 851516        {
 1517            // The stream (if any) didn't consume the data. Read and ignore the data using a helper pipe.
 851518            var pipe = new Pipe(
 851519                new PipeOptions(
 851520                    pool: Pool,
 851521                    pauseWriterThreshold: 0,
 851522                    minimumSegmentSize: MinSegmentSize,
 851523                    useSynchronizationContext: false));
 1524
 851525            await _duplexConnectionReader.FillBufferWriterAsync(
 851526                    pipe.Writer,
 851527                    size,
 851528                    cancellationToken).ConfigureAwait(false);
 1529
 831530            pipe.Writer.Complete();
 831531            pipe.Reader.Complete();
 831532        }
 96191533    }
 1534
 1535    private bool TryClose(Exception exception, string closeMessage, IceRpcError? peerCloseError = null)
 11361536    {
 1537        lock (_mutex)
 11361538        {
 11361539            if (_isClosed)
 4371540            {
 4371541                return false;
 1542            }
 6991543            _isClosed = true;
 6991544            _closedMessage = closeMessage;
 6991545            _peerCloseError = peerCloseError;
 6991546            if (_streamSemaphoreWaitCount == 0)
 6921547            {
 6921548                _streamSemaphoreWaitClosed.SetResult();
 6921549            }
 6991550        }
 1551
 1552        // Cancel pending CreateStreamAsync, AcceptStreamAsync and WriteStreamDataFrameAsync operations.
 6991553        _closedCts.Cancel();
 6991554        _acceptStreamChannel.Writer.TryComplete(exception);
 1555
 1556        // Close streams.
 34431557        foreach (SlicStream stream in _streams.Values)
 6731558        {
 6731559            stream.Close(exception);
 6731560        }
 1561
 6991562        return true;
 11361563    }
 1564
 1565    private void WriteFrame(FrameType frameType, ulong? streamId, EncodeAction? encode)
 41751566    {
 41751567        var encoder = new SliceEncoder(_duplexConnectionWriter, SliceEncoding.Slice2);
 41751568        encoder.EncodeFrameType(frameType);
 41751569        Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(4);
 41751570        int startPos = encoder.EncodedByteCount;
 41751571        if (streamId is not null)
 34091572        {
 34091573            encoder.EncodeVarUInt62(streamId.Value);
 34091574        }
 41751575        encode?.Invoke(ref encoder);
 41751576        SliceEncoder.EncodeVarUInt62((ulong)(encoder.EncodedByteCount - startPos), sizePlaceholder);
 41751577    }
 1578}

Methods/Properties

get_IsServer()
get_MinSegmentSize()
get_PeerInitialStreamWindowSize()
get_PeerMaxStreamFrameSize()
get_Pool()
get_InitialStreamWindowSize()
get_StreamWindowUpdateThreshold()
.ctor(IceRpc.Transports.IDuplexConnection,IceRpc.Transports.MultiplexedConnectionOptions,IceRpc.Transports.Slic.SlicTransportOptions,System.Boolean)
AcceptStreamAsync()
ConnectAsync(System.Threading.CancellationToken)
PerformConnectAsync()
DecodeInitialize()
DecodeInitializeAckOrVersion()
ReadFrameAsync()
CloseAsync()
CreateStreamAsync()
DisposeAsync()
PerformDisposeAsync()
SendPing()
SendReadPing()
SendWritePing()
FillBufferWriterAsync(System.Buffers.IBufferWriter`1<System.Byte>,System.Int32,System.Threading.CancellationToken)
ReleaseStream(IceRpc.Transports.Slic.Internal.SlicStream)
ThrowIfClosed()
WriteConnectionFrame(IceRpc.Transports.Slic.Internal.FrameType,ZeroC.Slice.EncodeAction)
WriteStreamFrame(IceRpc.Transports.Slic.Internal.SlicStream,IceRpc.Transports.Slic.Internal.FrameType,ZeroC.Slice.EncodeAction,System.Boolean)
WriteStreamDataFrameAsync()
EncodeStreamFrameHeader()
AddStream(System.UInt64,IceRpc.Transports.Slic.Internal.SlicStream)
DecodeParameters(System.Collections.Generic.IDictionary`2<IceRpc.Transports.Slic.Internal.ParameterKey,System.Collections.Generic.IList`1<System.Byte>>)
DecodeParamValue()
EncodeParameters()
EncodeParameter()
IsUnknownStream(System.UInt64)
ReadFrameAsync(IceRpc.Transports.Slic.Internal.FrameType,System.Int32,System.Nullable`1<System.UInt64>,System.Threading.CancellationToken)
ReadCloseFrameAsync()
ReadPingFrameAndWritePongFrameAsync()
ReadPongFrameAsync()
ReadStreamWindowUpdateFrameAsync()
ReadFrameBodyAsync()
ReadFrameHeaderAsync()
TryDecodeHeader()
ReadFramesAsync()
ReadStreamDataFrameAsync()
TryClose(System.Exception,System.String,System.Nullable`1<IceRpc.IceRpcError>)
WriteFrame(IceRpc.Transports.Slic.Internal.FrameType,System.Nullable`1<System.UInt64>,ZeroC.Slice.EncodeAction)