< Summary

Information
Class: IceRpc.Internal.IceProtocolConnection
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Internal/IceProtocolConnection.cs
Tag: 1321_24790053727
Line coverage
89%
Covered lines: 836
Uncovered lines: 101
Coverable lines: 937
Total lines: 1578
Line coverage: 89.2%
Branch coverage
84%
Covered branches: 204
Total branches: 242
Branch coverage: 84.2%
Method coverage
100%
Covered methods: 37
Fully covered methods: 18
Total methods: 37
Method coverage: 100%
Full method coverage: 48.6%

Metrics

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Internal/IceProtocolConnection.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Ice.Codec;
 4using IceRpc.Transports;
 5using IceRpc.Transports.Internal;
 6using System.Buffers;
 7using System.Collections.Immutable;
 8using System.Diagnostics;
 9using System.IO.Pipelines;
 10using System.Security.Authentication;
 11
 12namespace IceRpc.Internal;
 13
 14/// <summary>Implements <see cref="IProtocolConnection" /> for the ice protocol.</summary>
 15internal sealed class IceProtocolConnection : IProtocolConnection
 16{
 117    private static readonly IDictionary<RequestFieldKey, ReadOnlySequence<byte>> _idempotentFields =
 118        new Dictionary<RequestFieldKey, ReadOnlySequence<byte>>
 119        {
 120            [RequestFieldKey.Idempotent] = default
 121        }.ToImmutableDictionary();
 22
 20623    private bool IsServer => _transportConnectionInformation is not null;
 24
 25    private IConnectionContext? _connectionContext; // non-null once the connection is established
 26    private Task? _connectTask;
 27    private readonly IDispatcher _dispatcher;
 28
 29    // The number of outstanding dispatches and invocations.
 30    private int _dispatchInvocationCount;
 31
 32    // We don't want the continuation to run from the dispatch or invocation thread.
 21733    private readonly TaskCompletionSource _dispatchesAndInvocationsCompleted =
 21734        new(TaskCreationOptions.RunContinuationsAsynchronously);
 35
 36    private readonly SemaphoreSlim? _dispatchSemaphore;
 37
 38    // This cancellation token source is canceled when the connection is disposed.
 21739    private readonly CancellationTokenSource _disposedCts = new();
 40
 41    private Task? _disposeTask;
 42    private readonly IDuplexConnection _duplexConnection;
 43    private readonly DuplexConnectionReader _duplexConnectionReader;
 44    private readonly IceDuplexConnectionWriter _duplexConnectionWriter;
 21745    private bool _heartbeatEnabled = true;
 21746    private Task _heartbeatTask = Task.CompletedTask;
 47    private readonly TimeSpan _inactivityTimeout;
 48    private readonly Timer _inactivityTimeoutTimer;
 49    private string? _invocationRefusedMessage;
 50    private int _lastRequestId;
 51    private readonly int _maxFrameSize;
 21752    private readonly Lock _mutex = new();
 53    private readonly PipeOptions _pipeOptions;
 54    private Task? _readFramesTask;
 55
 56    // A connection refuses invocations when it's disposed, shut down, shutting down or merely "shutdown requested".
 57    private bool _refuseInvocations;
 58
 59    // Does ShutdownAsync send a close connection frame?
 21760    private bool _sendCloseConnectionFrame = true;
 61
 62    private Task? _shutdownTask;
 63
 64    // The thread that completes this TCS can run the continuations, and as a result its result must be set without
 65    // holding a lock on _mutex.
 21766    private readonly TaskCompletionSource _shutdownRequestedTcs = new();
 67
 68    // Only set for server connections.
 69    private readonly TransportConnectionInformation? _transportConnectionInformation;
 70
 71    private readonly CancellationTokenSource _twowayDispatchesCts;
 21772    private readonly Dictionary<int, TaskCompletionSource<PipeReader>> _twowayInvocations = new();
 73
 74    private Exception? _writeException; // protected by _writeSemaphore
 21775    private readonly SemaphoreSlim _writeSemaphore = new(1, 1);
 76
 77    public Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> ConnectAsync(
 78        CancellationToken cancellationToken)
 21679    {
 80        Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> result;
 81        lock (_mutex)
 21682        {
 21683            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 84
 21485            if (_connectTask is not null)
 086            {
 087                throw new InvalidOperationException("Cannot call connect more than once.");
 88            }
 89
 21490            result = PerformConnectAsync();
 21491            _connectTask = result;
 21492        }
 21493        return result;
 94
 95        async Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> PerformConnectAsync()
 21496        {
 97            // Make sure we execute the function without holding the connection mutex lock.
 21498            await Task.Yield();
 99
 100            // _disposedCts is not disposed at this point because DisposeAsync waits for the completion of _connectTask.
 214101            using var connectCts = CancellationTokenSource.CreateLinkedTokenSource(
 214102                cancellationToken,
 214103                _disposedCts.Token);
 104
 105            TransportConnectionInformation transportConnectionInformation;
 106
 107            try
 214108            {
 109                // If the transport connection information is null, we need to connect the transport connection. It's
 110                // null for client connections. The transport connection of a server connection is established by
 111                // Server.
 214112                transportConnectionInformation = _transportConnectionInformation ??
 214113                    await _duplexConnection.ConnectAsync(connectCts.Token).ConfigureAwait(false);
 114
 206115                if (IsServer)
 100116                {
 117                    // Send ValidateConnection frame.
 100118                    await SendControlFrameAsync(EncodeValidateConnectionFrame, connectCts.Token).ConfigureAwait(false);
 119
 120                    // The SendControlFrameAsync is a "write" that schedules a heartbeat when the idle timeout is not
 121                    // infinite. So no need to call ScheduleHeartbeat.
 98122                }
 123                else
 106124                {
 106125                    ReadOnlySequence<byte> buffer = await _duplexConnectionReader.ReadAtLeastAsync(
 106126                        IceDefinitions.PrologueSize,
 106127                        connectCts.Token).ConfigureAwait(false);
 128
 96129                    (IcePrologue validateConnectionFrame, long consumed) = DecodeValidateConnectionFrame(buffer);
 96130                    _duplexConnectionReader.AdvanceTo(buffer.GetPosition(consumed), buffer.End);
 131
 96132                    IceDefinitions.CheckPrologue(validateConnectionFrame);
 95133                    if (validateConnectionFrame.FrameSize != IceDefinitions.PrologueSize)
 0134                    {
 0135                        throw new InvalidDataException(
 0136                            $"Received ice frame with only '{validateConnectionFrame.FrameSize}' bytes.");
 137                    }
 95138                    if (validateConnectionFrame.FrameType != IceFrameType.ValidateConnection)
 0139                    {
 0140                        throw new InvalidDataException(
 0141                            $"Expected '{nameof(IceFrameType.ValidateConnection)}' frame but received frame type '{valid
 142                    }
 143
 144                    // The client connection is now connected, so we schedule the first heartbeat.
 95145                    if (_duplexConnection is IceDuplexConnectionDecorator decorator)
 95146                    {
 95147                        decorator.ScheduleHeartbeat();
 95148                    }
 95149                }
 193150            }
 11151            catch (OperationCanceledException)
 11152            {
 11153                cancellationToken.ThrowIfCancellationRequested();
 154
 5155                Debug.Assert(_disposedCts.Token.IsCancellationRequested);
 5156                throw new IceRpcException(
 5157                    IceRpcError.OperationAborted,
 5158                    "The connection establishment was aborted because the connection was disposed.");
 159            }
 1160            catch (InvalidDataException exception)
 1161            {
 1162                throw new IceRpcException(
 1163                    IceRpcError.ConnectionAborted,
 1164                    "The connection was aborted by an ice protocol error.",
 1165                    exception);
 166            }
 1167            catch (AuthenticationException)
 1168            {
 1169                throw;
 170            }
 8171            catch (IceRpcException)
 8172            {
 8173                throw;
 174            }
 0175            catch (Exception exception)
 0176            {
 0177                Debug.Fail($"ConnectAsync failed with an unexpected exception: {exception}");
 0178                throw;
 179            }
 180
 181            // We assign _readFramesTask with _mutex locked to make sure this assignment occurs before the start of
 182            // DisposeAsync. Once _disposeTask is not null, _readFramesTask is immutable.
 183            lock (_mutex)
 193184            {
 193185                if (_disposeTask is not null)
 0186                {
 0187                    throw new IceRpcException(
 0188                        IceRpcError.OperationAborted,
 0189                        "The connection establishment was aborted because the connection was disposed.");
 190                }
 191
 192                // This needs to be set before starting the read frames task below.
 193193                _connectionContext = new ConnectionContext(this, transportConnectionInformation);
 194
 193195                _readFramesTask = ReadFramesAsync(_disposedCts.Token);
 193196            }
 197
 198            // The _readFramesTask waits for this PerformConnectAsync completion before reading anything. As soon as
 199            // it receives a request, it will cancel this inactivity check.
 193200            ScheduleInactivityCheck();
 201
 193202            return (transportConnectionInformation, _shutdownRequestedTcs.Task);
 203
 204            static void EncodeValidateConnectionFrame(IBufferWriter<byte> writer)
 100205            {
 100206                var encoder = new IceEncoder(writer);
 100207                IceDefinitions.ValidateConnectionFrame.Encode(ref encoder);
 100208            }
 209
 210            static (IcePrologue, long) DecodeValidateConnectionFrame(ReadOnlySequence<byte> buffer)
 96211            {
 96212                var decoder = new IceDecoder(buffer);
 96213                return (new IcePrologue(ref decoder), decoder.Consumed);
 96214            }
 193215        }
 214216    }
 217
 218    public ValueTask DisposeAsync()
 237219    {
 220        lock (_mutex)
 237221        {
 237222            if (_disposeTask is null)
 217223            {
 217224                RefuseNewInvocations("The connection was disposed.");
 225
 217226                _shutdownTask ??= Task.CompletedTask;
 217227                if (_dispatchInvocationCount == 0)
 208228                {
 208229                    _dispatchesAndInvocationsCompleted.TrySetResult();
 208230                }
 231
 217232                _heartbeatEnabled = false; // makes _heartbeatTask immutable
 233
 217234                _disposeTask = PerformDisposeAsync();
 217235            }
 237236        }
 237237        return new(_disposeTask);
 238
 239        async Task PerformDisposeAsync()
 217240        {
 241            // Make sure we execute the code below without holding the mutex lock.
 217242            await Task.Yield();
 243
 217244            _disposedCts.Cancel();
 245
 246            // We don't lock _mutex since once _disposeTask is not null, _connectTask etc are immutable.
 247
 217248            if (_connectTask is not null)
 214249            {
 250                // Wait for all writes to complete. This can't take forever since all writes are canceled by
 251                // _disposedCts.Token.
 214252                await _writeSemaphore.WaitAsync().ConfigureAwait(false);
 253
 254                try
 214255                {
 214256                    await Task.WhenAll(
 214257                        _connectTask,
 214258                        _readFramesTask ?? Task.CompletedTask,
 214259                        _heartbeatTask,
 214260                        _dispatchesAndInvocationsCompleted.Task,
 214261                        _shutdownTask).ConfigureAwait(false);
 160262                }
 54263                catch
 54264                {
 265                    // Expected if any of these tasks failed or was canceled. Each task takes care of handling
 266                    // unexpected exceptions so there's no need to handle them here.
 54267                }
 214268            }
 269
 217270            _duplexConnection.Dispose();
 271
 272            // It's safe to dispose the reader/writer since no more threads are sending/receiving data.
 217273            _duplexConnectionReader.Dispose();
 217274            _duplexConnectionWriter.Dispose();
 275
 217276            _disposedCts.Dispose();
 217277            _twowayDispatchesCts.Dispose();
 278
 217279            _dispatchSemaphore?.Dispose();
 217280            _writeSemaphore.Dispose();
 217281            await _inactivityTimeoutTimer.DisposeAsync().ConfigureAwait(false);
 217282        }
 237283    }
 284
 285    public Task<IncomingResponse> InvokeAsync(OutgoingRequest request, CancellationToken cancellationToken = default)
 1394286    {
 1394287        if (request.Protocol != Protocol.Ice)
 1288        {
 1289            throw new InvalidOperationException(
 1290                $"Cannot send {request.Protocol} request on {Protocol.Ice} connection.");
 291        }
 292
 293        lock (_mutex)
 1393294        {
 1393295            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 296
 1392297            if (_refuseInvocations)
 1298            {
 1299                throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage);
 300            }
 1391301            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 0302            {
 0303                throw new InvalidOperationException("Cannot invoke on a connection that is not fully established.");
 304            }
 305
 1391306            IncrementDispatchInvocationCount();
 1391307        }
 308
 1391309        return PerformInvokeAsync();
 310
 311        async Task<IncomingResponse> PerformInvokeAsync()
 1391312        {
 313            // Since _dispatchInvocationCount > 0, _disposedCts is not disposed.
 1391314            using var invocationCts =
 1391315                CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token, cancellationToken);
 316
 1391317            PipeReader? frameReader = null;
 1391318            TaskCompletionSource<PipeReader>? responseCompletionSource = null;
 1391319            int requestId = 0;
 320
 321            try
 1391322            {
 323                // Read the full payload. This can take some time so this needs to be done before acquiring the write
 324                // semaphore.
 1391325                ReadOnlySequence<byte> payloadBuffer = await ReadFullPayloadAsync(request.Payload, invocationCts.Token)
 1391326                    .ConfigureAwait(false);
 327
 328                try
 1391329                {
 330                    // Wait for the writing of other frames to complete.
 1391331                    using SemaphoreLock _ = await AcquireWriteLockAsync(invocationCts.Token).ConfigureAwait(false);
 332
 333                    // Assign the request ID for two-way invocations and keep track of the invocation for receiving the
 334                    // response. The request ID is only assigned once the write semaphore is acquired. We don't want a
 335                    // canceled request to allocate a request ID that won't be used.
 336                    lock (_mutex)
 1391337                    {
 1391338                        if (_refuseInvocations)
 0339                        {
 340                            // It's InvocationCanceled and not InvocationRefused because we've read the payload.
 0341                            throw new IceRpcException(IceRpcError.InvocationCanceled, _invocationRefusedMessage);
 342                        }
 343
 1391344                        if (!request.IsOneway)
 386345                        {
 346                            // wrap around back to 1 if we reach int.MaxValue. 0 means one-way.
 386347                            _lastRequestId = _lastRequestId == int.MaxValue ? 1 : _lastRequestId + 1;
 386348                            requestId = _lastRequestId;
 349
 350                            // RunContinuationsAsynchronously because we don't want the "read frames loop" to run the
 351                            // continuation.
 386352                            responseCompletionSource = new(TaskCreationOptions.RunContinuationsAsynchronously);
 386353                            _twowayInvocations[requestId] = responseCompletionSource;
 386354                        }
 1391355                    }
 356
 1391357                    int payloadSize = checked((int)payloadBuffer.Length);
 358
 359                    try
 1391360                    {
 1391361                        EncodeRequestHeader(_duplexConnectionWriter, request, requestId, payloadSize);
 362
 363                        // We write to the duplex connection with _disposedCts.Token instead of invocationCts.Token.
 364                        // Canceling this write operation is fatal to the connection.
 1391365                        await _duplexConnectionWriter.WriteAsync(payloadBuffer, _disposedCts.Token)
 1391366                            .ConfigureAwait(false);
 1390367                    }
 1368                    catch (Exception exception)
 1369                    {
 1370                        WriteFailed(exception);
 1371                        throw;
 372                    }
 1390373                }
 1374                catch (IceRpcException exception) when (exception.IceRpcError != IceRpcError.InvocationCanceled)
 1375                {
 376                    // Since we could not send the request, the server cannot dispatch it and it's safe to retry.
 377                    // This includes the situation where await AcquireWriteLockAsync throws because a previous write
 378                    // failed.
 1379                    throw new IceRpcException(
 1380                        IceRpcError.InvocationCanceled,
 1381                        "Failed to send ice request.",
 1382                        exception);
 383                }
 384                finally
 1391385                {
 386                    // We've read the payload (see ReadFullPayloadAsync) and we are now done with it.
 1391387                    request.Payload.Complete();
 1391388                }
 389
 1390390                if (request.IsOneway)
 1005391                {
 392                    // We're done, there's no response for one-way requests.
 1005393                    return new IncomingResponse(request, _connectionContext!);
 394                }
 395
 396                // Wait to receive the response.
 385397                Debug.Assert(responseCompletionSource is not null);
 385398                frameReader = await responseCompletionSource.Task.WaitAsync(invocationCts.Token).ConfigureAwait(false);
 399
 366400                if (!frameReader.TryRead(out ReadResult readResult))
 0401                {
 0402                    throw new InvalidDataException($"Received empty response frame for request with id '{requestId}'.");
 403                }
 404
 366405                Debug.Assert(readResult.IsCompleted);
 406
 366407                (StatusCode statusCode, string? errorMessage, SequencePosition consumed) =
 366408                    DecodeResponseHeader(readResult.Buffer, requestId);
 409
 366410                frameReader.AdvanceTo(consumed);
 411
 366412                var response = new IncomingResponse(
 366413                    request,
 366414                    _connectionContext!,
 366415                    statusCode,
 366416                    errorMessage)
 366417                {
 366418                    Payload = frameReader
 366419                };
 420
 366421                frameReader = null; // response now owns frameReader
 366422                return response;
 423            }
 9424            catch (OperationCanceledException)
 9425            {
 9426                cancellationToken.ThrowIfCancellationRequested();
 427
 3428                Debug.Assert(_disposedCts.Token.IsCancellationRequested);
 3429                throw new IceRpcException(
 3430                    IceRpcError.OperationAborted,
 3431                    "The invocation was aborted because the connection was disposed.");
 432            }
 433            finally
 1391434            {
 435                // If responseCompletionSource is not completed, we want to complete it to prevent another method from
 436                // setting an unobservable exception in it. And if it's already completed with an exception, we observe
 437                // this exception.
 1391438                if (responseCompletionSource is not null &&
 1391439                    !responseCompletionSource.TrySetResult(InvalidPipeReader.Instance))
 376440                {
 441                    try
 376442                    {
 376443                        _ = await responseCompletionSource.Task.ConfigureAwait(false);
 366444                    }
 10445                    catch
 10446                    {
 447                        // observe exception, if any
 10448                    }
 376449                }
 450
 451                lock (_mutex)
 1391452                {
 453                    // Unregister the two-way invocation if registered.
 1391454                    if (requestId > 0 && !_refuseInvocations)
 368455                    {
 368456                        _twowayInvocations.Remove(requestId);
 368457                    }
 458
 1391459                    DecrementDispatchInvocationCount();
 1391460                }
 461
 1391462                frameReader?.Complete();
 1391463            }
 0464        }
 2762465    }
 466
 467    public Task ShutdownAsync(CancellationToken cancellationToken = default)
 66468    {
 469        lock (_mutex)
 66470        {
 66471            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 472
 65473            if (_shutdownTask is not null)
 0474            {
 0475                throw new InvalidOperationException("Cannot call ShutdownAsync more than once.");
 476            }
 65477            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 3478            {
 3479                throw new InvalidOperationException("Cannot shut down a protocol connection before it's connected.");
 480            }
 481
 62482            RefuseNewInvocations("The connection was shut down.");
 483
 62484            if (_dispatchInvocationCount == 0)
 48485            {
 48486                _dispatchesAndInvocationsCompleted.TrySetResult();
 48487            }
 62488            _shutdownTask = PerformShutdownAsync(_sendCloseConnectionFrame);
 62489        }
 490
 62491        return _shutdownTask;
 492
 493        async Task PerformShutdownAsync(bool sendCloseConnectionFrame)
 62494        {
 62495            await Task.Yield(); // exit mutex lock
 496
 497            try
 62498            {
 62499                Debug.Assert(_readFramesTask is not null);
 500
 501                // Since DisposeAsync waits for the _shutdownTask completion, _disposedCts is not disposed at this
 502                // point.
 62503                using var shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(
 62504                    cancellationToken,
 62505                    _disposedCts.Token);
 506
 507                // Wait for dispatches and invocations to complete.
 62508                await _dispatchesAndInvocationsCompleted.Task.WaitAsync(shutdownCts.Token).ConfigureAwait(false);
 509
 510                // Stops sending heartbeats. We can't do earlier: while we're waiting for dispatches and invocations to
 511                // complete, we need to keep sending heartbeats otherwise the peer could see the connection as idle and
 512                // abort it.
 513                lock (_mutex)
 58514                {
 58515                    _heartbeatEnabled = false; // makes _heartbeatTask immutable
 58516                }
 517
 518                // Wait for the last send heartbeat to complete before sending the CloseConnection frame or disposing
 519                // the duplex connection. _heartbeatTask is immutable once _shutdownTask set. _heartbeatTask can be
 520                // canceled by DisposeAsync.
 58521                await _heartbeatTask.WaitAsync(shutdownCts.Token).ConfigureAwait(false);
 522
 58523                if (sendCloseConnectionFrame)
 27524                {
 525                    // Send CloseConnection frame.
 27526                    await SendControlFrameAsync(EncodeCloseConnectionFrame, shutdownCts.Token).ConfigureAwait(false);
 527
 528                    // Wait for the peer to abort the connection as an acknowledgment for this CloseConnection frame.
 529                    // The peer can also send us a CloseConnection frame if it started shutting down at the same time.
 530                    // We can't just return and dispose the duplex connection since the peer can still be reading frames
 531                    // (including the CloseConnection frame) and we don't want to abort this reading.
 25532                    await _readFramesTask.WaitAsync(shutdownCts.Token).ConfigureAwait(false);
 25533                }
 534                else
 31535                {
 536                    // _readFramesTask should be already completed or nearly completed.
 31537                    await _readFramesTask.WaitAsync(shutdownCts.Token).ConfigureAwait(false);
 538
 539                    // _readFramesTask succeeded means the peer is waiting for us to abort the duplex connection;
 540                    // we oblige.
 23541                    _duplexConnection.Dispose();
 23542                }
 48543            }
 5544            catch (OperationCanceledException)
 5545            {
 5546                cancellationToken.ThrowIfCancellationRequested();
 547
 2548                Debug.Assert(_disposedCts.Token.IsCancellationRequested);
 2549                throw new IceRpcException(
 2550                    IceRpcError.OperationAborted,
 2551                    "The connection shutdown was aborted because the connection was disposed.");
 552            }
 9553            catch (IceRpcException)
 9554            {
 9555                throw;
 556            }
 0557            catch (Exception exception)
 0558            {
 0559                Debug.Fail($"ShutdownAsync failed with an unexpected exception: {exception}");
 0560                throw;
 561            }
 562
 563            static void EncodeCloseConnectionFrame(IBufferWriter<byte> writer)
 26564            {
 26565                var encoder = new IceEncoder(writer);
 26566                IceDefinitions.CloseConnectionFrame.Encode(ref encoder);
 26567            }
 48568        }
 62569    }
 570
 217571    internal IceProtocolConnection(
 217572        IDuplexConnection duplexConnection,
 217573        TransportConnectionInformation? transportConnectionInformation,
 217574        ConnectionOptions options)
 217575    {
 217576        _twowayDispatchesCts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
 577
 578        // With ice, we always listen for incoming frames (responses) so we need a dispatcher for incoming requests even
 579        // if we don't expect any. This dispatcher throws an ice ObjectNotExistException back to the client, which makes
 580        // more sense than throwing an UnknownException.
 217581        _dispatcher = options.Dispatcher ?? NotFoundDispatcher.Instance;
 582
 217583        _maxFrameSize = options.MaxIceFrameSize;
 217584        _transportConnectionInformation = transportConnectionInformation;
 585
 217586        if (options.MaxDispatches > 0)
 217587        {
 217588            _dispatchSemaphore = new SemaphoreSlim(
 217589                initialCount: options.MaxDispatches,
 217590                maxCount: options.MaxDispatches);
 217591        }
 592
 217593        _inactivityTimeout = options.InactivityTimeout;
 594
 595        // The readerScheduler doesn't matter (we don't call pipe.Reader.ReadAsync on the resulting pipe), and the
 596        // writerScheduler doesn't matter (pipe.Writer.FlushAsync never blocks).
 217597        _pipeOptions = new PipeOptions(
 217598            pool: options.Pool,
 217599            minimumSegmentSize: options.MinSegmentSize,
 217600            pauseWriterThreshold: 0,
 217601            useSynchronizationContext: false);
 602
 217603        if (options.IceIdleTimeout != Timeout.InfiniteTimeSpan)
 217604        {
 217605            duplexConnection = new IceDuplexConnectionDecorator(
 217606                duplexConnection,
 217607                readIdleTimeout: options.EnableIceIdleCheck ? options.IceIdleTimeout : Timeout.InfiniteTimeSpan,
 217608                writeIdleTimeout: options.IceIdleTimeout,
 217609                SendHeartbeat);
 217610        }
 611
 217612        _duplexConnection = duplexConnection;
 217613        _duplexConnectionReader = new DuplexConnectionReader(_duplexConnection, options.Pool, options.MinSegmentSize);
 217614        _duplexConnectionWriter =
 217615            new IceDuplexConnectionWriter(_duplexConnection, options.Pool, options.MinSegmentSize);
 616
 217617        _inactivityTimeoutTimer = new Timer(_ =>
 5618        {
 5619            bool requestShutdown = false;
 217620
 217621            lock (_mutex)
 5622            {
 5623                if (_dispatchInvocationCount == 0 && _shutdownTask is null)
 5624                {
 5625                    requestShutdown = true;
 5626                    RefuseNewInvocations(
 5627                        $"The connection was shut down because it was inactive for over {_inactivityTimeout.TotalSeconds
 5628                }
 5629            }
 217630
 5631            if (requestShutdown)
 5632            {
 217633                // TrySetResult must be called outside the mutex lock.
 5634                _shutdownRequestedTcs.TrySetResult();
 5635            }
 222636        });
 637
 638        void SendHeartbeat()
 12639        {
 640            lock (_mutex)
 12641            {
 12642                if (_heartbeatTask.IsCompletedSuccessfully && _heartbeatEnabled)
 12643                {
 12644                    _heartbeatTask = SendValidateConnectionFrameAsync(_disposedCts.Token);
 12645                }
 12646            }
 647
 648            async Task SendValidateConnectionFrameAsync(CancellationToken cancellationToken)
 12649            {
 650                // Make sure we execute the function without holding the connection mutex lock.
 12651                await Task.Yield();
 652
 653                try
 12654                {
 12655                    await SendControlFrameAsync(EncodeValidateConnectionFrame, cancellationToken).ConfigureAwait(false);
 12656                }
 0657                catch (OperationCanceledException)
 0658                {
 659                    // Canceled by DisposeAsync
 0660                    throw;
 661                }
 0662                catch (IceRpcException)
 0663                {
 664                    // Expected, typically the peer aborted the connection.
 0665                    throw;
 666                }
 0667                catch (Exception exception)
 0668                {
 0669                    Debug.Fail($"The heartbeat task completed due to an unhandled exception: {exception}");
 0670                    throw;
 671                }
 672
 673                static void EncodeValidateConnectionFrame(IBufferWriter<byte> writer)
 12674                {
 12675                    var encoder = new IceEncoder(writer);
 12676                    IceDefinitions.ValidateConnectionFrame.Encode(ref encoder);
 12677                }
 12678            }
 12679        }
 217680    }
 681
 682    private static (int RequestId, IceRequestHeader Header, PipeReader? ContextReader, int Consumed) DecodeRequestIdAndH
 683        ReadOnlySequence<byte> buffer)
 1389684    {
 1389685        var decoder = new IceDecoder(buffer);
 686
 1389687        int requestId = decoder.DecodeInt();
 688
 1389689        var requestHeader = new IceRequestHeader(ref decoder);
 1389690        requestHeader.Facet.CheckFacetCount();
 691
 1389692        Pipe? contextPipe = null;
 1389693        long pos = decoder.Consumed;
 1389694        int count = decoder.DecodeSize();
 1389695        if (count > 0)
 7696        {
 28697            for (int i = 0; i < count; ++i)
 7698            {
 7699                decoder.Skip(decoder.DecodeSize()); // Skip the key
 7700                decoder.Skip(decoder.DecodeSize()); // Skip the value
 7701            }
 7702            contextPipe = new Pipe();
 7703            contextPipe.Writer.Write(buffer.Slice(pos, decoder.Consumed - pos));
 7704            contextPipe.Writer.Complete();
 7705        }
 706
 1389707        var encapsulationHeader = new EncapsulationHeader(ref decoder);
 708
 1389709        if (encapsulationHeader.PayloadEncodingMajor != 1 ||
 1389710            encapsulationHeader.PayloadEncodingMinor != 1)
 0711        {
 0712            throw new InvalidDataException(
 0713                $"Unsupported payload encoding '{encapsulationHeader.PayloadEncodingMajor}.{encapsulationHeader.PayloadE
 714        }
 715
 1389716        int payloadSize = encapsulationHeader.EncapsulationSize - 6;
 1389717        if (payloadSize != (buffer.Length - decoder.Consumed))
 0718        {
 0719            throw new InvalidDataException(
 0720                $"Request payload size mismatch: expected {payloadSize} bytes, read {buffer.Length - decoder.Consumed} b
 721        }
 722
 1389723        return (requestId, requestHeader, contextPipe?.Reader, (int)decoder.Consumed);
 1389724    }
 725
 726    private static (StatusCode StatusCode, string? ErrorMessage, SequencePosition Consumed) DecodeResponseHeader(
 727        ReadOnlySequence<byte> buffer,
 728        int requestId)
 366729    {
 366730        var replyStatus = (ReplyStatus)buffer.FirstSpan[0];
 731
 366732        if (replyStatus <= ReplyStatus.UserException)
 333733        {
 734            const int headerSize = 7; // reply status byte + encapsulation header
 735
 736            // read and check encapsulation header (6 bytes long)
 737
 333738            if (buffer.Length < headerSize)
 0739            {
 0740                throw new InvalidDataException($"Received invalid frame header for request with id '{requestId}'.");
 741            }
 742
 333743            EncapsulationHeader encapsulationHeader =
 666744                buffer.Slice(1, 6).DecodeIceBuffer((ref IceDecoder decoder) => new EncapsulationHeader(ref decoder));
 745
 746            // Sanity check
 333747            int payloadSize = encapsulationHeader.EncapsulationSize - 6;
 333748            if (payloadSize != buffer.Length - headerSize)
 0749            {
 0750                throw new InvalidDataException(
 0751                    $"Response payload size/frame size mismatch: payload size is {payloadSize} bytes but frame has {buff
 752            }
 753
 333754            SequencePosition consumed = buffer.GetPosition(headerSize);
 755
 333756            return replyStatus == ReplyStatus.Ok ? (StatusCode.Ok, null, consumed) :
 333757                // Set the error message to the empty string, because null is not allowed for status code > Ok.
 333758                (StatusCode.ApplicationError, "", consumed);
 759        }
 760        else
 33761        {
 762            // An ice system exception.
 763
 33764            StatusCode statusCode = replyStatus switch
 33765            {
 14766                ReplyStatus.ObjectNotExist => StatusCode.NotFound,
 0767                ReplyStatus.FacetNotExist => StatusCode.NotFound,
 2768                ReplyStatus.OperationNotExist => StatusCode.NotImplemented,
 2769                ReplyStatus.InvalidData => StatusCode.InvalidData,
 1770                ReplyStatus.Unauthorized => StatusCode.Unauthorized,
 14771                _ => StatusCode.InternalError
 33772            };
 773
 33774            var decoder = new IceDecoder(buffer.Slice(1));
 775
 776            string message;
 33777            switch (replyStatus)
 778            {
 779                case ReplyStatus.FacetNotExist:
 780                case ReplyStatus.ObjectNotExist:
 781                case ReplyStatus.OperationNotExist:
 782
 16783                    var requestFailed = new RequestFailedExceptionData(ref decoder);
 784
 16785                    string target = requestFailed.Facet.Count > 0 ?
 16786                        $"{requestFailed.Identity.ToPath()}#{requestFailed.Facet.ToFragment()}" : requestFailed.Identity
 787
 16788                    message =
 16789                        $"The dispatch failed with status code {statusCode} while dispatching '{requestFailed.Operation}
 16790                    break;
 791                default:
 17792                    message = decoder.DecodeString();
 17793                    break;
 794            }
 33795            decoder.CheckEndOfBuffer();
 33796            return (statusCode, message, buffer.End);
 797        }
 366798    }
 799
 800    private static void EncodeRequestHeader(
 801        IceDuplexConnectionWriter output,
 802        OutgoingRequest request,
 803        int requestId,
 804        int payloadSize)
 1391805    {
 1391806        var encoder = new IceEncoder(output);
 807
 808        // Write the request header.
 1391809        encoder.WriteByteSpan(IceDefinitions.FramePrologue);
 1391810        encoder.EncodeIceFrameType(IceFrameType.Request);
 1391811        encoder.EncodeByte(0); // compression status
 812
 1391813        Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(4);
 814
 1391815        encoder.EncodeInt(requestId);
 816
 1391817        byte encodingMajor = 1;
 1391818        byte encodingMinor = 1;
 819
 820        // Request header.
 1391821        var requestHeader = new IceRequestHeader(
 1391822            IceIdentity.Parse(request.ServiceAddress.Path),
 1391823            request.ServiceAddress.Fragment.ToFacet(),
 1391824            request.Operation,
 1391825            request.Fields.ContainsKey(RequestFieldKey.Idempotent) ? OperationMode.Idempotent : OperationMode.Normal);
 1391826        requestHeader.Encode(ref encoder);
 1391827        int directWriteSize = 0;
 1391828        if (request.Fields.TryGetValue(RequestFieldKey.Context, out OutgoingFieldValue requestField))
 7829        {
 7830            if (requestField.WriteAction is Action<IBufferWriter<byte>> writeAction)
 7831            {
 832                // This writes directly to the underlying output; we measure how many bytes are written.
 7833                long start = output.UnflushedBytes;
 7834                writeAction(output);
 7835                directWriteSize = (int)(output.UnflushedBytes - start);
 7836            }
 837            else
 0838            {
 0839                encoder.WriteByteSequence(requestField.ByteSequence);
 0840            }
 7841        }
 842        else
 1384843        {
 1384844            encoder.EncodeSize(0);
 1384845        }
 846
 847        // We ignore all other fields. They can't be sent over ice.
 848
 1391849        new EncapsulationHeader(
 1391850            encapsulationSize: payloadSize + 6,
 1391851            encodingMajor,
 1391852            encodingMinor).Encode(ref encoder);
 853
 1391854        int frameSize = checked(encoder.EncodedByteCount + directWriteSize + payloadSize);
 1391855        IceEncoder.EncodeInt(frameSize, sizePlaceholder);
 1391856    }
 857
 858    private static void EncodeResponseHeader(
 859        IBufferWriter<byte> writer,
 860        OutgoingResponse response,
 861        IncomingRequest request,
 862        int requestId,
 863        int payloadSize)
 1376864    {
 1376865        var encoder = new IceEncoder(writer);
 866
 867        // Write the response header.
 868
 1376869        encoder.WriteByteSpan(IceDefinitions.FramePrologue);
 1376870        encoder.EncodeIceFrameType(IceFrameType.Reply);
 1376871        encoder.EncodeByte(0); // compression status
 1376872        Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(4);
 873
 1376874        encoder.EncodeInt(requestId);
 875
 1376876        if (response.StatusCode > StatusCode.ApplicationError ||
 1376877            (response.StatusCode == StatusCode.ApplicationError && payloadSize == 0))
 35878        {
 879            // system exception
 35880            switch (response.StatusCode)
 881            {
 882                case StatusCode.NotFound:
 883                case StatusCode.NotImplemented:
 18884                    encoder.EncodeReplyStatus(response.StatusCode == StatusCode.NotFound ?
 18885                        ReplyStatus.ObjectNotExist : ReplyStatus.OperationNotExist);
 886
 18887                    new RequestFailedExceptionData(
 18888                        IceIdentity.Parse(request.Path),
 18889                        request.Fragment.ToFacet(),
 18890                        request.Operation).Encode(ref encoder);
 18891                    break;
 892                case StatusCode.InternalError:
 8893                    encoder.EncodeReplyStatus(ReplyStatus.UnknownException);
 8894                    encoder.EncodeString(response.ErrorMessage!);
 8895                    break;
 896                case StatusCode.InvalidData:
 2897                    encoder.EncodeReplyStatus(ReplyStatus.InvalidData);
 2898                    encoder.EncodeString(response.ErrorMessage!);
 2899                    break;
 900                case StatusCode.Unauthorized:
 1901                    encoder.EncodeReplyStatus(ReplyStatus.Unauthorized);
 1902                    encoder.EncodeString(response.ErrorMessage!);
 1903                    break;
 904                default:
 6905                    encoder.EncodeReplyStatus(ReplyStatus.UnknownException);
 6906                    encoder.EncodeString(
 6907                        $"{response.ErrorMessage} {{ Original StatusCode = {response.StatusCode} }}");
 6908                    break;
 909            }
 35910        }
 911        else
 1341912        {
 1341913            encoder.EncodeReplyStatus((ReplyStatus)response.StatusCode);
 914
 915            // When IceRPC receives a response, it ignores the response encoding. So this "1.1" is only relevant to
 916            // a ZeroC Ice client that decodes the response. The only Slice encoding such a client can possibly use
 917            // to decode the response payload is 1.1 or 1.0, and we don't care about interop with 1.0.
 1341918            var encapsulationHeader = new EncapsulationHeader(
 1341919                encapsulationSize: payloadSize + 6,
 1341920                payloadEncodingMajor: 1,
 1341921                payloadEncodingMinor: 1);
 1341922            encapsulationHeader.Encode(ref encoder);
 1341923        }
 924
 1376925        int frameSize = encoder.EncodedByteCount + payloadSize;
 1376926        IceEncoder.EncodeInt(frameSize, sizePlaceholder);
 1376927    }
 928
 929    /// <summary>Reads the full Ice payload from the given pipe reader.</summary>
 930    private static async ValueTask<ReadOnlySequence<byte>> ReadFullPayloadAsync(
 931        PipeReader payload,
 932        CancellationToken cancellationToken)
 2737933    {
 934        // We use ReadAtLeastAsync instead of ReadAsync to bypass the PauseWriterThreshold when the payload is
 935        // backed by a Pipe.
 2737936        ReadResult readResult = await payload.ReadAtLeastAsync(int.MaxValue, cancellationToken).ConfigureAwait(false);
 937
 2734938        if (readResult.IsCanceled)
 0939        {
 0940            throw new InvalidOperationException("Unexpected call to CancelPendingRead on ice payload.");
 941        }
 942
 2734943        return readResult.IsCompleted ? readResult.Buffer :
 2734944            throw new ArgumentException("The payload size is greater than int.MaxValue.", nameof(payload));
 2734945    }
 946
 947    /// <summary>Acquires exclusive access to _duplexConnectionWriter.</summary>
 948    /// <returns>A <see cref="SemaphoreLock" /> that releases the acquired semaphore in its Dispose method.</returns>
 949    private async ValueTask<SemaphoreLock> AcquireWriteLockAsync(CancellationToken cancellationToken)
 2906950    {
 2906951        SemaphoreLock semaphoreLock = await _writeSemaphore.AcquireAsync(cancellationToken).ConfigureAwait(false);
 952
 953        // _writeException is protected by _writeSemaphore
 2906954        if (_writeException is not null)
 1955        {
 1956            semaphoreLock.Dispose();
 957
 1958            throw new IceRpcException(
 1959                IceRpcError.ConnectionAborted,
 1960                "The connection was aborted because a previous write operation failed.",
 1961                _writeException);
 962        }
 963
 2905964        return semaphoreLock;
 2905965    }
 966
 967    /// <summary>Creates a pipe reader to simplify the reading of a request or response frame. The frame is read fully
 968    /// and buffered into an internal pipe.</summary>
 969    private async ValueTask<PipeReader> CreateFrameReaderAsync(int size, CancellationToken cancellationToken)
 2760970    {
 2760971        var pipe = new Pipe(_pipeOptions);
 972
 973        try
 2760974        {
 2760975            await _duplexConnectionReader.FillBufferWriterAsync(pipe.Writer, size, cancellationToken)
 2760976                .ConfigureAwait(false);
 2760977        }
 0978        catch
 0979        {
 0980            pipe.Reader.Complete();
 0981            throw;
 982        }
 983        finally
 2760984        {
 2760985            pipe.Writer.Complete();
 2760986        }
 987
 2760988        return pipe.Reader;
 2760989    }
 990
 991    private void DecrementDispatchInvocationCount()
 2778992    {
 993        lock (_mutex)
 2778994        {
 2778995            if (--_dispatchInvocationCount == 0)
 1231996            {
 1231997                if (_shutdownTask is not null)
 21998                {
 21999                    _dispatchesAndInvocationsCompleted.TrySetResult();
 211000                }
 1001                // We enable the inactivity check in order to complete ShutdownRequested when inactive for too long.
 1002                // _refuseInvocations is true when the connection is either about to be "shutdown requested", or shut
 1003                // down / disposed. We don't need to complete ShutdownRequested in any of these situations.
 12101004                else if (!_refuseInvocations)
 12001005                {
 12001006                    ScheduleInactivityCheck();
 12001007                }
 12311008            }
 27781009        }
 27781010    }
 1011
 1012    /// <summary>Dispatches an incoming request. This method executes in a task spawn from the read frames loop.
 1013    /// </summary>
 1014    private async Task DispatchRequestAsync(IncomingRequest request, int requestId, PipeReader? contextReader)
 13871015    {
 13871016        CancellationToken cancellationToken = request.IsOneway ? _disposedCts.Token : _twowayDispatchesCts.Token;
 1017
 1018        OutgoingResponse? response;
 1019        try
 13871020        {
 1021            // The dispatcher can complete the incoming request payload to release its memory as soon as possible.
 1022            try
 13871023            {
 1024                // _dispatcher.DispatchAsync may very well ignore the cancellation token and we don't want to keep
 1025                // dispatching when the cancellation token is canceled.
 13871026                cancellationToken.ThrowIfCancellationRequested();
 1027
 13871028                response = await _dispatcher.DispatchAsync(request, cancellationToken).ConfigureAwait(false);
 13701029            }
 1030            finally
 13871031            {
 13871032                _dispatchSemaphore?.Release();
 13871033            }
 1034
 13701035            if (response != request.Response)
 11036            {
 11037                throw new InvalidOperationException(
 11038                    "The dispatcher did not return the last response created for this request.");
 1039            }
 13691040        }
 181041        catch when (request.IsOneway)
 01042        {
 1043            // ignored since we're not returning anything
 01044            response = null;
 01045        }
 101046        catch (OperationCanceledException exception) when (exception.CancellationToken == cancellationToken)
 91047        {
 1048            // expected when the connection is disposed or the request is canceled by the peer's shutdown
 91049            response = null;
 91050        }
 91051        catch (Exception exception)
 91052        {
 91053            if (exception is not DispatchException dispatchException)
 51054            {
 51055                dispatchException = new DispatchException(StatusCode.InternalError, innerException: exception);
 51056            }
 91057            response = dispatchException.ToOutgoingResponse(request);
 91058        }
 1059        finally
 13871060        {
 13871061            request.Payload.Complete();
 13871062            contextReader?.Complete();
 1063
 1064            // The field values are now invalid - they point to potentially recycled and reused memory. We
 1065            // replace Fields by an empty dictionary to prevent accidental access to this reused memory.
 13871066            request.Fields = ImmutableDictionary<RequestFieldKey, ReadOnlySequence<byte>>.Empty;
 13871067        }
 1068
 1069        try
 13871070        {
 13871071            if (response is not null)
 13781072            {
 1073                // Read the full response payload. This can take some time so this needs to be done before acquiring
 1074                // the write semaphore.
 13781075                ReadOnlySequence<byte> payload = ReadOnlySequence<byte>.Empty;
 1076
 13781077                if (response.StatusCode <= StatusCode.ApplicationError)
 13461078                {
 1079                    try
 13461080                    {
 13461081                        payload = await ReadFullPayloadAsync(response.Payload, cancellationToken)
 13461082                            .ConfigureAwait(false);
 13431083                    }
 21084                    catch (OperationCanceledException exception) when (exception.CancellationToken == cancellationToken)
 21085                    {
 21086                        throw;
 1087                    }
 11088                    catch (Exception exception)
 11089                    {
 1090                        // We "encode" the exception in the error message.
 1091
 11092                        response = new OutgoingResponse(
 11093                            request,
 11094                            StatusCode.InternalError,
 11095                            "The dispatch failed to read the response payload.",
 11096                            exception);
 11097                    }
 13441098                }
 1099                // else payload remains empty because the payload of a dispatch exception (if any) cannot be sent
 1100                // over ice.
 1101
 13761102                int payloadSize = checked((int)payload.Length);
 1103
 1104                // Wait for writing of other frames to complete.
 13761105                using SemaphoreLock _ = await AcquireWriteLockAsync(cancellationToken).ConfigureAwait(false);
 1106                try
 13761107                {
 13761108                    EncodeResponseHeader(_duplexConnectionWriter, response, request, requestId, payloadSize);
 1109
 1110                    // We write to the duplex connection with _disposedCts.Token instead of cancellationToken.
 1111                    // Canceling this write operation is fatal to the connection.
 13761112                    await _duplexConnectionWriter.WriteAsync(payload, _disposedCts.Token).ConfigureAwait(false);
 13751113                }
 11114                catch (Exception exception)
 11115                {
 11116                    WriteFailed(exception);
 11117                    throw;
 1118                }
 13751119            }
 13841120        }
 31121        catch (OperationCanceledException exception) when (
 31122            exception.CancellationToken == _disposedCts.Token ||
 31123            exception.CancellationToken == cancellationToken)
 31124        {
 1125            // expected when the connection is disposed or the request is canceled by the peer's shutdown
 31126        }
 1127        finally
 13871128        {
 13871129            DecrementDispatchInvocationCount();
 13871130        }
 13871131    }
 1132
 1133    /// <summary>Increments the dispatch-invocation count.</summary>
 1134    /// <remarks>This method must be called with _mutex locked.</remarks>
 1135    private void IncrementDispatchInvocationCount()
 27781136    {
 27781137        if (_dispatchInvocationCount++ == 0)
 12311138        {
 1139            // Cancel inactivity check.
 12311140            _inactivityTimeoutTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
 12311141        }
 27781142    }
 1143
 1144    private void ScheduleInactivityCheck() =>
 13931145        _inactivityTimeoutTimer.Change(_inactivityTimeout, Timeout.InfiniteTimeSpan);
 1146
 1147    /// <summary>Reads incoming frames and returns successfully when a CloseConnection frame is received or when the
 1148    /// connection is aborted during ShutdownAsync or canceled by DisposeAsync.</summary>
 1149    private async Task ReadFramesAsync(CancellationToken cancellationToken)
 1931150    {
 1931151        await Task.Yield(); // exit mutex lock
 1152
 1153        // Wait for _connectTask (which spawned the task running this method) to complete. This way, we won't dispatch
 1154        // any request until _connectTask has completed successfully, and indirectly we won't make any invocation until
 1155        // _connectTask has completed successfully. The creation of the _readFramesTask is the last action taken by
 1156        // _connectTask and as a result this await can't fail.
 1931157        await _connectTask!.ConfigureAwait(false);
 1158
 1159        try
 1931160        {
 29651161            while (!cancellationToken.IsCancellationRequested)
 29631162            {
 29631163                ReadOnlySequence<byte> buffer = await _duplexConnectionReader.ReadAtLeastAsync(
 29631164                    IceDefinitions.PrologueSize,
 29631165                    cancellationToken).ConfigureAwait(false);
 1166
 1167                // First decode and check the prologue.
 1168
 27991169                ReadOnlySequence<byte> prologueBuffer = buffer.Slice(0, IceDefinitions.PrologueSize);
 1170
 27991171                IcePrologue prologue =
 55981172                    prologueBuffer.DecodeIceBuffer((ref IceDecoder decoder) => new IcePrologue(ref decoder));
 1173
 27991174                _duplexConnectionReader.AdvanceTo(prologueBuffer.End);
 1175
 27991176                IceDefinitions.CheckPrologue(prologue);
 27981177                if (prologue.FrameSize > _maxFrameSize)
 11178                {
 11179                    throw new InvalidDataException(
 11180                        $"Received frame with size ({prologue.FrameSize}) greater than max frame size.");
 1181                }
 1182
 27971183                if (prologue.CompressionStatus == 2)
 01184                {
 1185                    // The exception handler calls ReadFailed.
 01186                    throw new IceRpcException(
 01187                        IceRpcError.ConnectionAborted,
 01188                        "The connection was aborted because it received a compressed ice frame, and IceRPC does not supp
 1189                }
 1190
 1191                // Then process the frame based on its type.
 27971192                switch (prologue.FrameType)
 1193                {
 1194                    case IceFrameType.CloseConnection:
 251195                    {
 251196                        if (prologue.FrameSize != IceDefinitions.PrologueSize)
 01197                        {
 01198                            throw new InvalidDataException(
 01199                                $"Received {nameof(IceFrameType.CloseConnection)} frame with unexpected data.");
 1200                        }
 1201
 1202                        lock (_mutex)
 251203                        {
 251204                            RefuseNewInvocations(
 251205                                "The connection was shut down because it received a CloseConnection frame from the peer.
 1206
 1207                            // By exiting the "read frames loop" below, we are refusing new dispatches as well.
 1208
 1209                            // Only one side sends the CloseConnection frame.
 251210                            _sendCloseConnectionFrame = false;
 251211                        }
 1212
 1213                        // Even though we're in the "read frames loop", it's ok to cancel CTS and a "synchronous" TCS
 1214                        // below. We won't be reading anything else so it's ok to run continuations synchronously.
 1215
 1216                        // Abort two-way invocations that are waiting for a response (it will never come).
 1217                        // We use InvocationCanceled (not ConnectionAborted) because the ice protocol guarantees the
 1218                        // peer has sent responses for all two-way requests it accepted before sending CloseConnection.
 1219                        // These pending two-way requests were never processed by the peer, so it's safe for a retry
 1220                        // interceptor to retry them unconditionally.
 251221                        AbortTwowayInvocations(
 251222                            IceRpcError.InvocationCanceled,
 251223                            "The invocation was canceled by the shutdown of the peer.");
 1224
 1225                        // Cancel two-way dispatches since the peer is not interested in the responses. This does not
 1226                        // cancel ongoing writes to _duplexConnection: we don't send incomplete/invalid data.
 251227                        _twowayDispatchesCts.Cancel();
 1228
 1229                        // We keep sending heartbeats. If the shutdown request / shutdown is not fulfilled quickly, they
 1230                        // tell the peer we're still alive and maybe stuck waiting for invocations and dispatches to
 1231                        // complete.
 1232
 1233                        // We request a shutdown that will dispose _duplexConnection once all invocations and dispatches
 1234                        // have completed.
 251235                        _shutdownRequestedTcs.TrySetResult();
 251236                        return;
 1237                    }
 1238
 1239                    case IceFrameType.Request:
 13891240                        await ReadRequestAsync(prologue.FrameSize, cancellationToken).ConfigureAwait(false);
 13891241                        break;
 1242
 1243                    case IceFrameType.RequestBatch:
 1244                        // The exception handler calls ReadFailed.
 01245                        throw new IceRpcException(
 01246                            IceRpcError.ConnectionAborted,
 01247                            "The connection was aborted because it received a batch request, and IceRPC does not support
 1248
 1249                    case IceFrameType.Reply:
 13711250                        await ReadReplyAsync(prologue.FrameSize, cancellationToken).ConfigureAwait(false);
 13711251                        break;
 1252
 1253                    case IceFrameType.ValidateConnection:
 121254                    {
 121255                        if (prologue.FrameSize != IceDefinitions.PrologueSize)
 01256                        {
 01257                            throw new InvalidDataException(
 01258                                $"Received {nameof(IceFrameType.ValidateConnection)} frame with unexpected data.");
 1259                        }
 121260                        break;
 1261                    }
 1262
 1263                    default:
 01264                    {
 01265                        throw new InvalidDataException(
 01266                            $"Received Ice frame with unknown frame type '{prologue.FrameType}'.");
 1267                    }
 1268                }
 27721269            } // while
 21270        }
 661271        catch (OperationCanceledException)
 661272        {
 1273            // canceled by DisposeAsync, no need to throw anything
 661274        }
 981275        catch (IceRpcException exception) when (
 981276            exception.IceRpcError == IceRpcError.ConnectionAborted &&
 981277            _dispatchesAndInvocationsCompleted.Task.IsCompleted)
 731278        {
 1279            // The peer acknowledged receipt of the CloseConnection frame by aborting the duplex connection. Return.
 1280            // See ShutdownAsync.
 731281        }
 251282        catch (IceRpcException exception)
 251283        {
 251284            ReadFailed(exception);
 251285            throw;
 1286        }
 21287        catch (InvalidDataException exception)
 21288        {
 21289            ReadFailed(exception);
 21290            throw new IceRpcException(
 21291                IceRpcError.ConnectionAborted,
 21292                "The connection was aborted by an ice protocol error.",
 21293                exception);
 1294        }
 01295        catch (Exception exception)
 01296        {
 01297            Debug.Fail($"The read frames task completed due to an unhandled exception: {exception}");
 01298            ReadFailed(exception);
 01299            throw;
 1300        }
 1301
 1302        // Aborts all pending two-way invocations. Must be called outside the mutex lock after setting
 1303        // _refuseInvocations to true.
 1304        void AbortTwowayInvocations(IceRpcError error, string message, Exception? exception = null)
 521305        {
 521306            Debug.Assert(_refuseInvocations);
 1307
 1308            // _twowayInvocations is immutable once _refuseInvocations is true.
 1801309            foreach (TaskCompletionSource<PipeReader> responseCompletionSource in _twowayInvocations.Values)
 121310            {
 1311                // _twowayInvocations can hold completed completion sources.
 121312                _ = responseCompletionSource.TrySetException(new IceRpcException(error, message, exception));
 121313            }
 521314        }
 1315
 1316        // Takes appropriate action after a read failure.
 1317        void ReadFailed(Exception exception)
 271318        {
 1319            // We also prevent new one-way invocations even though they don't need to read the connection.
 271320            RefuseNewInvocations("The connection was lost because a read operation failed.");
 1321
 1322            // It's ok to cancel CTS and a "synchronous" TCS below. We won't be reading anything else so it's ok to run
 1323            // continuations synchronously.
 1324
 271325            AbortTwowayInvocations(
 271326                IceRpcError.ConnectionAborted,
 271327                "The invocation was aborted because the connection was lost.",
 271328                exception);
 1329
 1330            // ReadFailed is called when the connection is dead or the peer sent us a non-supported frame (e.g. a
 1331            // batch request). We don't need to allow outstanding two-way dispatches to complete in these situations, so
 1332            // we cancel them to speed-up the shutdown.
 271333            _twowayDispatchesCts.Cancel();
 1334
 1335            lock (_mutex)
 271336            {
 1337                // Don't send a close connection frame since we can't wait for the peer's acknowledgment.
 271338                _sendCloseConnectionFrame = false;
 271339            }
 1340
 271341            _ = _shutdownRequestedTcs.TrySetResult();
 271342        }
 1661343    }
 1344
 1345    /// <summary>Reads a reply (incoming response) and completes the invocation response completion source with this
 1346    /// response. This method executes "synchronously" in the read frames loop.</summary>
 1347    private async Task ReadReplyAsync(int replyFrameSize, CancellationToken cancellationToken)
 13711348    {
 1349        // Read the remainder of the frame immediately into frameReader.
 13711350        PipeReader replyFrameReader = await CreateFrameReaderAsync(
 13711351            replyFrameSize - IceDefinitions.PrologueSize,
 13711352            cancellationToken).ConfigureAwait(false);
 1353
 13711354        bool completeFrameReader = true;
 1355
 1356        try
 13711357        {
 1358            // Read and decode request ID
 13711359            if (!replyFrameReader.TryRead(out ReadResult readResult) || readResult.Buffer.Length < 4)
 01360            {
 01361                throw new InvalidDataException("Received a response with an invalid request ID.");
 1362            }
 1363
 13711364            ReadOnlySequence<byte> requestIdBuffer = readResult.Buffer.Slice(0, 4);
 27421365            int requestId = requestIdBuffer.DecodeIceBuffer((ref IceDecoder decoder) => decoder.DecodeInt());
 13711366            replyFrameReader.AdvanceTo(requestIdBuffer.End);
 1367
 1368            lock (_mutex)
 13711369            {
 13711370                if (_twowayInvocations.TryGetValue(
 13711371                    requestId,
 13711372                    out TaskCompletionSource<PipeReader>? responseCompletionSource))
 3661373                {
 1374                    // continuation runs asynchronously
 3661375                    if (responseCompletionSource.TrySetResult(replyFrameReader))
 3661376                    {
 3661377                        completeFrameReader = false;
 3661378                    }
 1379                    // else this invocation just completed and is about to remove itself from _twowayInvocations,
 1380                    // or _twowayInvocations is immutable and contains entries for completed invocations.
 3661381                }
 1382                // else the request ID carried by the response is bogus or corresponds to a request that was previously
 1383                // discarded (for example, because its deadline expired).
 13711384            }
 13711385        }
 1386        finally
 13711387        {
 13711388            if (completeFrameReader)
 10051389            {
 10051390                replyFrameReader.Complete();
 10051391            }
 13711392        }
 13711393    }
 1394
 1395    /// <summary>Reads and then dispatches an incoming request in a separate dispatch task. This method executes
 1396    /// "synchronously" in the read frames loop.</summary>
 1397    private async Task ReadRequestAsync(int requestFrameSize, CancellationToken cancellationToken)
 13891398    {
 1399        // Read the request frame.
 13891400        PipeReader requestFrameReader = await CreateFrameReaderAsync(
 13891401            requestFrameSize - IceDefinitions.PrologueSize,
 13891402            cancellationToken).ConfigureAwait(false);
 1403
 1404        // Decode its header.
 1405        int requestId;
 1406        IceRequestHeader requestHeader;
 13891407        PipeReader? contextReader = null;
 1408        IDictionary<RequestFieldKey, ReadOnlySequence<byte>>? fields;
 13891409        Task? dispatchTask = null;
 1410
 1411        try
 13891412        {
 13891413            if (!requestFrameReader.TryRead(out ReadResult readResult))
 01414            {
 01415                throw new InvalidDataException("Received an invalid request frame.");
 1416            }
 1417
 13891418            Debug.Assert(readResult.IsCompleted);
 1419
 13891420            (requestId, requestHeader, contextReader, int consumed) = DecodeRequestIdAndHeader(readResult.Buffer);
 13891421            requestFrameReader.AdvanceTo(readResult.Buffer.GetPosition(consumed));
 1422
 13891423            if (contextReader is null)
 13821424            {
 13821425                fields = requestHeader.OperationMode == OperationMode.Normal ?
 13821426                    ImmutableDictionary<RequestFieldKey, ReadOnlySequence<byte>>.Empty : _idempotentFields;
 13821427            }
 1428            else
 71429            {
 71430                contextReader.TryRead(out ReadResult result);
 71431                Debug.Assert(result.Buffer.Length > 0 && result.IsCompleted);
 71432                fields = new Dictionary<RequestFieldKey, ReadOnlySequence<byte>>()
 71433                {
 71434                    [RequestFieldKey.Context] = result.Buffer
 71435                };
 1436
 71437                if (requestHeader.OperationMode != OperationMode.Normal)
 01438                {
 1439                    // OperationMode can be Idempotent or Nonmutating.
 01440                    fields[RequestFieldKey.Idempotent] = default;
 01441                }
 71442            }
 1443
 13891444            bool releaseDispatchSemaphore = false;
 13891445            if (_dispatchSemaphore is SemaphoreSlim dispatchSemaphore)
 13891446            {
 1447                // This prevents us from receiving any new frames if we're already dispatching the maximum number
 1448                // of requests. We need to do this in the "accept from network loop" to apply back pressure to the
 1449                // caller.
 1450                try
 13891451                {
 13891452                    await dispatchSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
 13891453                    releaseDispatchSemaphore = true;
 13891454                }
 01455                catch (OperationCanceledException)
 01456                {
 1457                    // and return below
 01458                }
 13891459            }
 1460
 1461            lock (_mutex)
 13891462            {
 13891463                if (_shutdownTask is not null)
 21464                {
 1465                    // The connection is (being) disposed or the connection is shutting down and received a request.
 1466                    // We simply discard it. For a graceful shutdown, the two-way invocation in the peer will throw
 1467                    // IceRpcException(InvocationCanceled). We also discard one-way requests: if we accepted them, they
 1468                    // could delay our shutdown and make it time out.
 21469                    if (releaseDispatchSemaphore)
 21470                    {
 21471                        _dispatchSemaphore!.Release();
 21472                    }
 21473                    return;
 1474                }
 1475
 13871476                IncrementDispatchInvocationCount();
 13871477            }
 1478
 1479            // The scheduling of the task can't be canceled since we want to make sure DispatchRequestAsync will
 1480            // cleanup (decrement _dispatchCount etc.) if DisposeAsync is called. dispatchTask takes ownership of the
 1481            // requestFrameReader and contextReader.
 13871482            dispatchTask = Task.Run(
 13871483                async () =>
 13871484                {
 13871485                    using var request = new IncomingRequest(Protocol.Ice, _connectionContext!)
 13871486                    {
 13871487                        Fields = fields,
 13871488                        Fragment = requestHeader.Facet.ToFragment(),
 13871489                        IsOneway = requestId == 0,
 13871490                        Operation = requestHeader.Operation,
 13871491                        Path = requestHeader.Identity.ToPath(),
 13871492                        Payload = requestFrameReader,
 13871493                    };
 13871494
 13871495                    try
 13871496                    {
 13871497                        await DispatchRequestAsync(
 13871498                            request,
 13871499                            requestId,
 13871500                            contextReader).ConfigureAwait(false);
 13871501                    }
 01502                    catch (IceRpcException)
 01503                    {
 13871504                        // expected when the peer aborts the connection.
 01505                    }
 01506                    catch (Exception exception)
 01507                    {
 13871508                        // With ice, a dispatch cannot throw an exception that comes from the application code:
 13871509                        // any exception thrown when reading the response payload is converted into a DispatchException
 13871510                        // response, and the response header has no fields to encode.
 01511                        Debug.Fail($"ice dispatch {request} failed with an unexpected exception: {exception}");
 01512                        throw;
 13871513                    }
 13871514                },
 13871515                CancellationToken.None);
 13871516        }
 1517        finally
 13891518        {
 13891519            if (dispatchTask is null)
 21520            {
 21521                requestFrameReader.Complete();
 21522                contextReader?.Complete();
 21523            }
 13891524        }
 13891525    }
 1526
 1527    private void RefuseNewInvocations(string message)
 3411528    {
 1529        lock (_mutex)
 3411530        {
 3411531            _refuseInvocations = true;
 3411532            _invocationRefusedMessage ??= message;
 3411533        }
 3411534    }
 1535
 1536    /// <summary>Sends a control frame. It takes care of acquiring and releasing the write lock and calls
 1537    /// <see cref="WriteFailed" /> if a failure occurs while writing to _duplexConnectionWriter.</summary>
 1538    /// <param name="encode">Encodes the control frame.</param>
 1539    /// <param name="cancellationToken">The cancellation token.</param>
 1540    /// <remarks>If the cancellation token is canceled while writing to the duplex connection, the connection is
 1541    /// aborted.</remarks>
 1542    private async ValueTask SendControlFrameAsync(
 1543        Action<IBufferWriter<byte>> encode,
 1544        CancellationToken cancellationToken)
 1391545    {
 1391546        using SemaphoreLock _ = await AcquireWriteLockAsync(cancellationToken).ConfigureAwait(false);
 1547
 1548        try
 1381549        {
 1381550            encode(_duplexConnectionWriter);
 1381551            await _duplexConnectionWriter.FlushAsync(cancellationToken).ConfigureAwait(false);
 1351552        }
 31553        catch (Exception exception)
 31554        {
 31555            WriteFailed(exception);
 31556            throw;
 1557        }
 1351558    }
 1559
 1560    /// <summary>Takes appropriate action after a write failure.</summary>
 1561    /// <remarks>Must be called outside the mutex lock but after acquiring _writeSemaphore.</remarks>
 1562    private void WriteFailed(Exception exception)
 51563    {
 51564        Debug.Assert(_writeException is null);
 51565        _writeException = exception; // protected by _writeSemaphore
 1566
 1567        // We can't send new invocations without writing to the connection.
 51568        RefuseNewInvocations("The connection was lost because a write operation failed.");
 1569
 1570        // We can't send responses so these dispatches can be canceled.
 51571        _twowayDispatchesCts.Cancel();
 1572
 1573        // We don't change _sendClosedConnectionFrame. If the _readFrameTask is still running, we want ShutdownAsync
 1574        // to send CloseConnection - and fail.
 1575
 51576        _ = _shutdownRequestedTcs.TrySetResult();
 51577    }
 1578}