< Summary

Information
Class: IceRpc.Internal.IceProtocolConnection
Assembly: IceRpc
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Internal/IceProtocolConnection.cs
Tag: 592_20856082467
Line coverage
88%
Covered lines: 833
Uncovered lines: 105
Coverable lines: 938
Total lines: 1575
Line coverage: 88.8%
Branch coverage
83%
Covered branches: 202
Total branches: 242
Branch coverage: 83.4%
Method coverage
100%
Covered methods: 37
Total methods: 37
Method coverage: 100%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.cctor()100%11100%
get_IsServer()100%11100%
.ctor(...)87.5%88100%
ConnectAsync(...)50%2.02281.81%
PerformConnectAsync()83.33%20.831879.41%
EncodeValidateConnectionFrame()100%11100%
DecodeValidateConnectionFrame()100%11100%
DisposeAsync()100%66100%
PerformDisposeAsync()87.5%88100%
InvokeAsync(...)75%8.1888.23%
PerformInvokeAsync()79.16%24.092494.62%
ShutdownAsync(...)87.5%8.09888.88%
PerformShutdownAsync()100%8.08889.18%
EncodeCloseConnectionFrame()100%11100%
SendHeartbeat()75%44100%
SendValidateConnectionFrameAsync()100%2.98237.5%
EncodeValidateConnectionFrame()100%11100%
DecodeRequestIdAndHeader(...)75%13.041280.64%
DecodeResponseHeader(...)71.42%23.042183.33%
EncodeRequestHeader(...)83.33%6.02692.1%
EncodeResponseHeader(...)94.73%19.141992.68%
ReadFullPayloadAsync()50%4.25475%
AcquireWriteLockAsync()100%22100%
CreateFrameReaderAsync()100%1.02173.33%
DecrementDispatchInvocationCount()100%66100%
DispatchRequestAsync()93.75%16.011696.2%
IncrementDispatchInvocationCount()100%22100%
ScheduleInactivityCheck()100%11100%
ReadFramesAsync()72.22%23.251874.69%
AbortTwowayInvocations()100%22100%
ReadFailed()100%11100%
ReadReplyAsync()80%10.021093.75%
ReadRequestAsync()75%20.62088.57%
<ReadRequestAsync()100%1.02170.83%
RefuseNewInvocations(...)100%22100%
SendControlFrameAsync()100%11100%
WriteFailed(...)100%11100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc/Internal/IceProtocolConnection.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using IceRpc.Transports;
 4using IceRpc.Transports.Internal;
 5using System.Buffers;
 6using System.Collections.Immutable;
 7using System.Diagnostics;
 8using System.IO.Pipelines;
 9using System.Security.Authentication;
 10using ZeroC.Slice;
 11
 12namespace IceRpc.Internal;
 13
 14/// <summary>Implements <see cref="IProtocolConnection" /> for the ice protocol.</summary>
 15internal sealed class IceProtocolConnection : IProtocolConnection
 16{
 117    private static readonly IDictionary<RequestFieldKey, ReadOnlySequence<byte>> _idempotentFields =
 118        new Dictionary<RequestFieldKey, ReadOnlySequence<byte>>
 119        {
 120            [RequestFieldKey.Idempotent] = default
 121        }.ToImmutableDictionary();
 22
 18023    private bool IsServer => _transportConnectionInformation is not null;
 24
 25    private IConnectionContext? _connectionContext; // non-null once the connection is established
 26    private Task? _connectTask;
 27    private readonly IDispatcher _dispatcher;
 28
 29    // The number of outstanding dispatches and invocations.
 30    private int _dispatchInvocationCount;
 31
 32    // We don't want the continuation to run from the dispatch or invocation thread.
 19133    private readonly TaskCompletionSource _dispatchesAndInvocationsCompleted =
 19134        new(TaskCreationOptions.RunContinuationsAsynchronously);
 35
 36    private readonly SemaphoreSlim? _dispatchSemaphore;
 37
 38    // This cancellation token source is canceled when the connection is disposed.
 19139    private readonly CancellationTokenSource _disposedCts = new();
 40
 41    private Task? _disposeTask;
 42    private readonly IDuplexConnection _duplexConnection;
 43    private readonly DuplexConnectionReader _duplexConnectionReader;
 44    private readonly IceDuplexConnectionWriter _duplexConnectionWriter;
 19145    private bool _heartbeatEnabled = true;
 19146    private Task _heartbeatTask = Task.CompletedTask;
 47    private readonly TimeSpan _inactivityTimeout;
 48    private readonly Timer _inactivityTimeoutTimer;
 49    private string? _invocationRefusedMessage;
 50    private int _lastRequestId;
 51    private readonly int _maxFrameSize;
 19152    private readonly Lock _mutex = new();
 53    private readonly PipeOptions _pipeOptions;
 54    private Task? _readFramesTask;
 55
 56    // A connection refuses invocations when it's disposed, shut down, shutting down or merely "shutdown requested".
 57    private bool _refuseInvocations;
 58
 59    // Does ShutdownAsync send a close connection frame?
 19160    private bool _sendCloseConnectionFrame = true;
 61
 62    private Task? _shutdownTask;
 63
 64    // The thread that completes this TCS can run the continuations, and as a result its result must be set without
 65    // holding a lock on _mutex.
 19166    private readonly TaskCompletionSource _shutdownRequestedTcs = new();
 67
 68    // Only set for server connections.
 69    private readonly TransportConnectionInformation? _transportConnectionInformation;
 70
 71    private readonly CancellationTokenSource _twowayDispatchesCts;
 19172    private readonly Dictionary<int, TaskCompletionSource<PipeReader>> _twowayInvocations = new();
 73
 74    private Exception? _writeException; // protected by _writeSemaphore
 19175    private readonly SemaphoreSlim _writeSemaphore = new(1, 1);
 76
 77    public Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> ConnectAsync(
 78        CancellationToken cancellationToken)
 19079    {
 80        Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> result;
 81        lock (_mutex)
 19082        {
 19083            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 84
 18885            if (_connectTask is not null)
 086            {
 087                throw new InvalidOperationException("Cannot call connect more than once.");
 88            }
 89
 18890            result = PerformConnectAsync();
 18891            _connectTask = result;
 18892        }
 18893        return result;
 94
 95        async Task<(TransportConnectionInformation ConnectionInformation, Task ShutdownRequested)> PerformConnectAsync()
 18896        {
 97            // Make sure we execute the function without holding the connection mutex lock.
 18898            await Task.Yield();
 99
 100            // _disposedCts is not disposed at this point because DisposeAsync waits for the completion of _connectTask.
 188101            using var connectCts = CancellationTokenSource.CreateLinkedTokenSource(
 188102                cancellationToken,
 188103                _disposedCts.Token);
 104
 105            TransportConnectionInformation transportConnectionInformation;
 106
 107            try
 188108            {
 109                // If the transport connection information is null, we need to connect the transport connection. It's
 110                // null for client connections. The transport connection of a server connection is established by
 111                // Server.
 188112                transportConnectionInformation = _transportConnectionInformation ??
 188113                    await _duplexConnection.ConnectAsync(connectCts.Token).ConfigureAwait(false);
 114
 180115                if (IsServer)
 87116                {
 117                    // Send ValidateConnection frame.
 87118                    await SendControlFrameAsync(EncodeValidateConnectionFrame, connectCts.Token).ConfigureAwait(false);
 119
 120                    // The SendControlFrameAsync is a "write" that schedules a heartbeat when the idle timeout is not
 121                    // infinite. So no need to call ScheduleHeartbeat.
 85122                }
 123                else
 93124                {
 93125                    ReadOnlySequence<byte> buffer = await _duplexConnectionReader.ReadAtLeastAsync(
 93126                        IceDefinitions.PrologueSize,
 93127                        connectCts.Token).ConfigureAwait(false);
 128
 83129                    (IcePrologue validateConnectionFrame, long consumed) = DecodeValidateConnectionFrame(buffer);
 83130                    _duplexConnectionReader.AdvanceTo(buffer.GetPosition(consumed), buffer.End);
 131
 83132                    IceDefinitions.CheckPrologue(validateConnectionFrame);
 82133                    if (validateConnectionFrame.FrameSize != IceDefinitions.PrologueSize)
 0134                    {
 0135                        throw new InvalidDataException(
 0136                            $"Received ice frame with only '{validateConnectionFrame.FrameSize}' bytes.");
 137                    }
 82138                    if (validateConnectionFrame.FrameType != IceFrameType.ValidateConnection)
 0139                    {
 0140                        throw new InvalidDataException(
 0141                            $"Expected '{nameof(IceFrameType.ValidateConnection)}' frame but received frame type '{valid
 142                    }
 143
 144                    // The client connection is now connected, so we schedule the first heartbeat.
 82145                    if (_duplexConnection is IceDuplexConnectionDecorator decorator)
 82146                    {
 82147                        decorator.ScheduleHeartbeat();
 82148                    }
 82149                }
 167150            }
 11151            catch (OperationCanceledException)
 11152            {
 11153                cancellationToken.ThrowIfCancellationRequested();
 154
 5155                Debug.Assert(_disposedCts.Token.IsCancellationRequested);
 5156                throw new IceRpcException(
 5157                    IceRpcError.OperationAborted,
 5158                    "The connection establishment was aborted because the connection was disposed.");
 159            }
 1160            catch (InvalidDataException exception)
 1161            {
 1162                throw new IceRpcException(
 1163                    IceRpcError.ConnectionAborted,
 1164                    "The connection was aborted by an ice protocol error.",
 1165                    exception);
 166            }
 1167            catch (AuthenticationException)
 1168            {
 1169                throw;
 170            }
 8171            catch (IceRpcException)
 8172            {
 8173                throw;
 174            }
 0175            catch (Exception exception)
 0176            {
 0177                Debug.Fail($"ConnectAsync failed with an unexpected exception: {exception}");
 0178                throw;
 179            }
 180
 181            // We assign _readFramesTask with _mutex locked to make sure this assignment occurs before the start of
 182            // DisposeAsync. Once _disposeTask is not null, _readFramesTask is immutable.
 183            lock (_mutex)
 167184            {
 167185                if (_disposeTask is not null)
 0186                {
 0187                    throw new IceRpcException(
 0188                        IceRpcError.OperationAborted,
 0189                        "The connection establishment was aborted because the connection was disposed.");
 190                }
 191
 192                // This needs to be set before starting the read frames task below.
 167193                _connectionContext = new ConnectionContext(this, transportConnectionInformation);
 194
 167195                _readFramesTask = ReadFramesAsync(_disposedCts.Token);
 167196            }
 197
 198            // The _readFramesTask waits for this PerformConnectAsync completion before reading anything. As soon as
 199            // it receives a request, it will cancel this inactivity check.
 167200            ScheduleInactivityCheck();
 201
 167202            return (transportConnectionInformation, _shutdownRequestedTcs.Task);
 203
 204            static void EncodeValidateConnectionFrame(IBufferWriter<byte> writer)
 87205            {
 87206                var encoder = new SliceEncoder(writer, SliceEncoding.Slice1);
 87207                IceDefinitions.ValidateConnectionFrame.Encode(ref encoder);
 87208            }
 209
 210            static (IcePrologue, long) DecodeValidateConnectionFrame(ReadOnlySequence<byte> buffer)
 83211            {
 83212                var decoder = new SliceDecoder(buffer, SliceEncoding.Slice1);
 83213                return (new IcePrologue(ref decoder), decoder.Consumed);
 83214            }
 167215        }
 188216    }
 217
 218    public ValueTask DisposeAsync()
 211219    {
 220        lock (_mutex)
 211221        {
 211222            if (_disposeTask is null)
 191223            {
 191224                RefuseNewInvocations("The connection was disposed.");
 225
 191226                _shutdownTask ??= Task.CompletedTask;
 191227                if (_dispatchInvocationCount == 0)
 183228                {
 183229                    _dispatchesAndInvocationsCompleted.TrySetResult();
 183230                }
 231
 191232                _heartbeatEnabled = false; // makes _heartbeatTask immutable
 233
 191234                _disposeTask = PerformDisposeAsync();
 191235            }
 211236        }
 211237        return new(_disposeTask);
 238
 239        async Task PerformDisposeAsync()
 191240        {
 241            // Make sure we execute the code below without holding the mutex lock.
 191242            await Task.Yield();
 243
 191244            _disposedCts.Cancel();
 245
 246            // We don't lock _mutex since once _disposeTask is not null, _connectTask etc are immutable.
 247
 191248            if (_connectTask is not null)
 188249            {
 250                // Wait for all writes to complete. This can't take forever since all writes are canceled by
 251                // _disposedCts.Token.
 188252                await _writeSemaphore.WaitAsync().ConfigureAwait(false);
 253
 254                try
 188255                {
 188256                    await Task.WhenAll(
 188257                        _connectTask,
 188258                        _readFramesTask ?? Task.CompletedTask,
 188259                        _heartbeatTask,
 188260                        _dispatchesAndInvocationsCompleted.Task,
 188261                        _shutdownTask).ConfigureAwait(false);
 133262                }
 55263                catch
 55264                {
 265                    // Expected if any of these tasks failed or was canceled. Each task takes care of handling
 266                    // unexpected exceptions so there's no need to handle them here.
 55267                }
 188268            }
 269
 191270            _duplexConnection.Dispose();
 271
 272            // It's safe to dispose the reader/writer since no more threads are sending/receiving data.
 191273            _duplexConnectionReader.Dispose();
 191274            _duplexConnectionWriter.Dispose();
 275
 191276            _disposedCts.Dispose();
 191277            _twowayDispatchesCts.Dispose();
 278
 191279            _dispatchSemaphore?.Dispose();
 191280            _writeSemaphore.Dispose();
 191281            await _inactivityTimeoutTimer.DisposeAsync().ConfigureAwait(false);
 191282        }
 211283    }
 284
 285    public Task<IncomingResponse> InvokeAsync(OutgoingRequest request, CancellationToken cancellationToken = default)
 1385286    {
 1385287        if (request.Protocol != Protocol.Ice)
 1288        {
 1289            throw new InvalidOperationException(
 1290                $"Cannot send {request.Protocol} request on {Protocol.Ice} connection.");
 291        }
 292
 293        lock (_mutex)
 1384294        {
 1384295            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 296
 1383297            if (_refuseInvocations)
 1298            {
 1299                throw new IceRpcException(IceRpcError.InvocationRefused, _invocationRefusedMessage);
 300            }
 1382301            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 0302            {
 0303                throw new InvalidOperationException("Cannot invoke on a connection that is not fully established.");
 304            }
 305
 1382306            IncrementDispatchInvocationCount();
 1382307        }
 308
 1382309        return PerformInvokeAsync();
 310
 311        async Task<IncomingResponse> PerformInvokeAsync()
 1382312        {
 313            // Since _dispatchInvocationCount > 0, _disposedCts is not disposed.
 1382314            using var invocationCts =
 1382315                CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token, cancellationToken);
 316
 1382317            PipeReader? frameReader = null;
 1382318            TaskCompletionSource<PipeReader>? responseCompletionSource = null;
 1382319            int requestId = 0;
 320
 321            try
 1382322            {
 323                // Read the full payload. This can take some time so this needs to be done before acquiring the write
 324                // semaphore.
 1382325                ReadOnlySequence<byte> payloadBuffer = await ReadFullPayloadAsync(request.Payload, invocationCts.Token)
 1382326                    .ConfigureAwait(false);
 327
 328                try
 1382329                {
 330                    // Wait for the writing of other frames to complete.
 1382331                    using SemaphoreLock _ = await AcquireWriteLockAsync(invocationCts.Token).ConfigureAwait(false);
 332
 333                    // Assign the request ID for two-way invocations and keep track of the invocation for receiving the
 334                    // response. The request ID is only assigned once the write semaphore is acquired. We don't want a
 335                    // canceled request to allocate a request ID that won't be used.
 336                    lock (_mutex)
 1382337                    {
 1382338                        if (_refuseInvocations)
 0339                        {
 340                            // It's InvocationCanceled and not InvocationRefused because we've read the payload.
 0341                            throw new IceRpcException(IceRpcError.InvocationCanceled, _invocationRefusedMessage);
 342                        }
 343
 1382344                        if (!request.IsOneway)
 371345                        {
 346                            // wrap around back to 1 if we reach int.MaxValue. 0 means one-way.
 371347                            _lastRequestId = _lastRequestId == int.MaxValue ? 1 : _lastRequestId + 1;
 371348                            requestId = _lastRequestId;
 349
 350                            // RunContinuationsAsynchronously because we don't want the "read frames loop" to run the
 351                            // continuation.
 371352                            responseCompletionSource = new(TaskCreationOptions.RunContinuationsAsynchronously);
 371353                            _twowayInvocations[requestId] = responseCompletionSource;
 371354                        }
 1382355                    }
 356
 1382357                    int payloadSize = checked((int)payloadBuffer.Length);
 358
 359                    try
 1382360                    {
 1382361                        EncodeRequestHeader(_duplexConnectionWriter, request, requestId, payloadSize);
 362
 363                        // We write to the duplex connection with _disposedCts.Token instead of invocationCts.Token.
 364                        // Canceling this write operation is fatal to the connection.
 1382365                        await _duplexConnectionWriter.WriteAsync(payloadBuffer, _disposedCts.Token)
 1382366                            .ConfigureAwait(false);
 1381367                    }
 1368                    catch (Exception exception)
 1369                    {
 1370                        WriteFailed(exception);
 1371                        throw;
 372                    }
 1381373                }
 1374                catch (IceRpcException exception) when (exception.IceRpcError != IceRpcError.InvocationCanceled)
 1375                {
 376                    // Since we could not send the request, the server cannot dispatch it and it's safe to retry.
 377                    // This includes the situation where await AcquireWriteLockAsync throws because a previous write
 378                    // failed.
 1379                    throw new IceRpcException(
 1380                        IceRpcError.InvocationCanceled,
 1381                        "Failed to send ice request.",
 1382                        exception);
 383                }
 384                finally
 1382385                {
 386                    // We've read the payload (see ReadFullPayloadAsync) and we are now done with it.
 1382387                    request.Payload.Complete();
 1382388                }
 389
 1381390                if (request.IsOneway)
 1011391                {
 392                    // We're done, there's no response for one-way requests.
 1011393                    return new IncomingResponse(request, _connectionContext!);
 394                }
 395
 396                // Wait to receive the response.
 370397                Debug.Assert(responseCompletionSource is not null);
 370398                frameReader = await responseCompletionSource.Task.WaitAsync(invocationCts.Token).ConfigureAwait(false);
 399
 351400                if (!frameReader.TryRead(out ReadResult readResult))
 0401                {
 0402                    throw new InvalidDataException($"Received empty response frame for request with id '{requestId}'.");
 403                }
 404
 351405                Debug.Assert(readResult.IsCompleted);
 406
 351407                (StatusCode statusCode, string? errorMessage, SequencePosition consumed) =
 351408                    DecodeResponseHeader(readResult.Buffer, requestId);
 409
 351410                frameReader.AdvanceTo(consumed);
 411
 351412                var response = new IncomingResponse(
 351413                    request,
 351414                    _connectionContext!,
 351415                    statusCode,
 351416                    errorMessage)
 351417                {
 351418                    Payload = frameReader
 351419                };
 420
 351421                frameReader = null; // response now owns frameReader
 351422                return response;
 423            }
 9424            catch (OperationCanceledException)
 9425            {
 9426                cancellationToken.ThrowIfCancellationRequested();
 427
 3428                Debug.Assert(_disposedCts.Token.IsCancellationRequested);
 3429                throw new IceRpcException(
 3430                    IceRpcError.OperationAborted,
 3431                    "The invocation was aborted because the connection was disposed.");
 432            }
 433            finally
 1382434            {
 435                // If responseCompletionSource is not completed, we want to complete it to prevent another method from
 436                // setting an unobservable exception in it. And if it's already completed with an exception, we observe
 437                // this exception.
 1382438                if (responseCompletionSource is not null &&
 1382439                    !responseCompletionSource.TrySetResult(InvalidPipeReader.Instance))
 361440                {
 441                    try
 361442                    {
 361443                        _ = await responseCompletionSource.Task.ConfigureAwait(false);
 351444                    }
 10445                    catch
 10446                    {
 447                        // observe exception, if any
 10448                    }
 361449                }
 450
 451                lock (_mutex)
 1382452                {
 453                    // Unregister the two-way invocation if registered.
 1382454                    if (requestId > 0 && !_refuseInvocations)
 353455                    {
 353456                        _twowayInvocations.Remove(requestId);
 353457                    }
 458
 1382459                    DecrementDispatchInvocationCount();
 1382460                }
 461
 1382462                frameReader?.Complete();
 1382463            }
 0464        }
 2744465    }
 466
 467    public Task ShutdownAsync(CancellationToken cancellationToken = default)
 59468    {
 469        lock (_mutex)
 59470        {
 59471            ObjectDisposedException.ThrowIf(_disposeTask is not null, this);
 472
 58473            if (_shutdownTask is not null)
 0474            {
 0475                throw new InvalidOperationException("Cannot call ShutdownAsync more than once.");
 476            }
 58477            if (_connectTask is null || !_connectTask.IsCompletedSuccessfully)
 3478            {
 3479                throw new InvalidOperationException("Cannot shut down a protocol connection before it's connected.");
 480            }
 481
 55482            RefuseNewInvocations("The connection was shut down.");
 483
 55484            if (_dispatchInvocationCount == 0)
 41485            {
 41486                _dispatchesAndInvocationsCompleted.TrySetResult();
 41487            }
 55488            _shutdownTask = PerformShutdownAsync(_sendCloseConnectionFrame);
 55489        }
 490
 55491        return _shutdownTask;
 492
 493        async Task PerformShutdownAsync(bool sendCloseConnectionFrame)
 55494        {
 55495            await Task.Yield(); // exit mutex lock
 496
 497            try
 55498            {
 55499                Debug.Assert(_readFramesTask is not null);
 500
 501                // Since DisposeAsync waits for the _shutdownTask completion, _disposedCts is not disposed at this
 502                // point.
 55503                using var shutdownCts = CancellationTokenSource.CreateLinkedTokenSource(
 55504                    cancellationToken,
 55505                    _disposedCts.Token);
 506
 507                // Wait for dispatches and invocations to complete.
 55508                await _dispatchesAndInvocationsCompleted.Task.WaitAsync(shutdownCts.Token).ConfigureAwait(false);
 509
 510                // Stops sending heartbeats. We can't do earlier: while we're waiting for dispatches and invocations to
 511                // complete, we need to keep sending heartbeats otherwise the peer could see the connection as idle and
 512                // abort it.
 513                lock (_mutex)
 51514                {
 51515                    _heartbeatEnabled = false; // makes _heartbeatTask immutable
 51516                }
 517
 518                // Wait for the last send heartbeat to complete before sending the CloseConnection frame or disposing
 519                // the duplex connection. _heartbeatTask is immutable once _shutdownTask set. _heartbeatTask can be
 520                // canceled by DisposeAsync.
 51521                await _heartbeatTask.WaitAsync(shutdownCts.Token).ConfigureAwait(false);
 522
 51523                if (sendCloseConnectionFrame)
 23524                {
 525                    // Send CloseConnection frame.
 23526                    await SendControlFrameAsync(EncodeCloseConnectionFrame, shutdownCts.Token).ConfigureAwait(false);
 527
 528                    // Wait for the peer to abort the connection as an acknowledgment for this CloseConnection frame.
 529                    // The peer can also send us a CloseConnection frame if it started shutting down at the same time.
 530                    // We can't just return and dispose the duplex connection since the peer can still be reading frames
 531                    // (including the CloseConnection frame) and we don't want to abort this reading.
 21532                    await _readFramesTask.WaitAsync(shutdownCts.Token).ConfigureAwait(false);
 20533                }
 534                else
 28535                {
 536                    // _readFramesTask should be already completed or nearly completed.
 28537                    await _readFramesTask.WaitAsync(shutdownCts.Token).ConfigureAwait(false);
 538
 539                    // _readFramesTask succeeded means the peer is waiting for us to abort the duplex connection;
 540                    // we oblige.
 19541                    _duplexConnection.Dispose();
 19542                }
 39543            }
 7544            catch (OperationCanceledException)
 7545            {
 7546                cancellationToken.ThrowIfCancellationRequested();
 547
 3548                Debug.Assert(_disposedCts.Token.IsCancellationRequested);
 3549                throw new IceRpcException(
 3550                    IceRpcError.OperationAborted,
 3551                    "The connection shutdown was aborted because the connection was disposed.");
 552            }
 9553            catch (IceRpcException)
 9554            {
 9555                throw;
 556            }
 0557            catch (Exception exception)
 0558            {
 0559                Debug.Fail($"ShutdownAsync failed with an unexpected exception: {exception}");
 0560                throw;
 561            }
 562
 563            static void EncodeCloseConnectionFrame(IBufferWriter<byte> writer)
 22564            {
 22565                var encoder = new SliceEncoder(writer, SliceEncoding.Slice1);
 22566                IceDefinitions.CloseConnectionFrame.Encode(ref encoder);
 22567            }
 39568        }
 55569    }
 570
 191571    internal IceProtocolConnection(
 191572        IDuplexConnection duplexConnection,
 191573        TransportConnectionInformation? transportConnectionInformation,
 191574        ConnectionOptions options)
 191575    {
 191576        _twowayDispatchesCts = CancellationTokenSource.CreateLinkedTokenSource(_disposedCts.Token);
 577
 578        // With ice, we always listen for incoming frames (responses) so we need a dispatcher for incoming requests even
 579        // if we don't expect any. This dispatcher throws an ice ObjectNotExistException back to the client, which makes
 580        // more sense than throwing an UnknownException.
 191581        _dispatcher = options.Dispatcher ?? NotFoundDispatcher.Instance;
 582
 191583        _maxFrameSize = options.MaxIceFrameSize;
 191584        _transportConnectionInformation = transportConnectionInformation;
 585
 191586        if (options.MaxDispatches > 0)
 191587        {
 191588            _dispatchSemaphore = new SemaphoreSlim(
 191589                initialCount: options.MaxDispatches,
 191590                maxCount: options.MaxDispatches);
 191591        }
 592
 191593        _inactivityTimeout = options.InactivityTimeout;
 594
 595        // The readerScheduler doesn't matter (we don't call pipe.Reader.ReadAsync on the resulting pipe), and the
 596        // writerScheduler doesn't matter (pipe.Writer.FlushAsync never blocks).
 191597        _pipeOptions = new PipeOptions(
 191598            pool: options.Pool,
 191599            minimumSegmentSize: options.MinSegmentSize,
 191600            pauseWriterThreshold: 0,
 191601            useSynchronizationContext: false);
 602
 191603        if (options.IceIdleTimeout != Timeout.InfiniteTimeSpan)
 191604        {
 191605            duplexConnection = new IceDuplexConnectionDecorator(
 191606                duplexConnection,
 191607                readIdleTimeout: options.EnableIceIdleCheck ? options.IceIdleTimeout : Timeout.InfiniteTimeSpan,
 191608                writeIdleTimeout: options.IceIdleTimeout,
 191609                SendHeartbeat);
 191610        }
 611
 191612        _duplexConnection = duplexConnection;
 191613        _duplexConnectionReader = new DuplexConnectionReader(_duplexConnection, options.Pool, options.MinSegmentSize);
 191614        _duplexConnectionWriter =
 191615            new IceDuplexConnectionWriter(_duplexConnection, options.Pool, options.MinSegmentSize);
 616
 191617        _inactivityTimeoutTimer = new Timer(_ =>
 5618        {
 5619            bool requestShutdown = false;
 191620
 191621            lock (_mutex)
 5622            {
 5623                if (_dispatchInvocationCount == 0 && _shutdownTask is null)
 5624                {
 5625                    requestShutdown = true;
 5626                    RefuseNewInvocations(
 5627                        $"The connection was shut down because it was inactive for over {_inactivityTimeout.TotalSeconds
 5628                }
 5629            }
 191630
 5631            if (requestShutdown)
 5632            {
 191633                // TrySetResult must be called outside the mutex lock.
 5634                _shutdownRequestedTcs.TrySetResult();
 5635            }
 196636        });
 637
 638        void SendHeartbeat()
 12639        {
 640            lock (_mutex)
 12641            {
 12642                if (_heartbeatTask.IsCompletedSuccessfully && _heartbeatEnabled)
 12643                {
 12644                    _heartbeatTask = SendValidateConnectionFrameAsync(_disposedCts.Token);
 12645                }
 12646            }
 647
 648            async Task SendValidateConnectionFrameAsync(CancellationToken cancellationToken)
 12649            {
 650                // Make sure we execute the function without holding the connection mutex lock.
 12651                await Task.Yield();
 652
 653                try
 12654                {
 12655                    await SendControlFrameAsync(EncodeValidateConnectionFrame, cancellationToken).ConfigureAwait(false);
 12656                }
 0657                catch (OperationCanceledException)
 0658                {
 659                    // Canceled by DisposeAsync
 0660                    throw;
 661                }
 0662                catch (IceRpcException)
 0663                {
 664                    // Expected, typically the peer aborted the connection.
 0665                    throw;
 666                }
 0667                catch (Exception exception)
 0668                {
 0669                    Debug.Fail($"The heartbeat task completed due to an unhandled exception: {exception}");
 0670                    throw;
 671                }
 672
 673                static void EncodeValidateConnectionFrame(IBufferWriter<byte> writer)
 12674                {
 12675                    var encoder = new SliceEncoder(writer, SliceEncoding.Slice1);
 12676                    IceDefinitions.ValidateConnectionFrame.Encode(ref encoder);
 12677                }
 12678            }
 12679        }
 191680    }
 681
 682    private static (int RequestId, IceRequestHeader Header, PipeReader? ContextReader, int Consumed) DecodeRequestIdAndH
 683        ReadOnlySequence<byte> buffer)
 1380684    {
 1380685        var decoder = new SliceDecoder(buffer, SliceEncoding.Slice1);
 686
 1380687        int requestId = decoder.DecodeInt32();
 688
 1380689        var requestHeader = new IceRequestHeader(ref decoder);
 690
 1380691        Pipe? contextPipe = null;
 1380692        long pos = decoder.Consumed;
 1380693        int count = decoder.DecodeSize();
 1380694        if (count > 0)
 7695        {
 28696            for (int i = 0; i < count; ++i)
 7697            {
 7698                decoder.Skip(decoder.DecodeSize()); // Skip the key
 7699                decoder.Skip(decoder.DecodeSize()); // Skip the value
 7700            }
 7701            contextPipe = new Pipe();
 7702            contextPipe.Writer.Write(buffer.Slice(pos, decoder.Consumed - pos));
 7703            contextPipe.Writer.Complete();
 7704        }
 705
 1380706        var encapsulationHeader = new EncapsulationHeader(ref decoder);
 707
 1380708        if (encapsulationHeader.PayloadEncodingMajor != 1 ||
 1380709            encapsulationHeader.PayloadEncodingMinor != 1)
 0710        {
 0711            throw new InvalidDataException(
 0712                $"Unsupported payload encoding '{encapsulationHeader.PayloadEncodingMajor}.{encapsulationHeader.PayloadE
 713        }
 714
 1380715        int payloadSize = encapsulationHeader.EncapsulationSize - 6;
 1380716        if (payloadSize != (buffer.Length - decoder.Consumed))
 0717        {
 0718            throw new InvalidDataException(
 0719                $"Request payload size mismatch: expected {payloadSize} bytes, read {buffer.Length - decoder.Consumed} b
 720        }
 721
 1380722        return (requestId, requestHeader, contextPipe?.Reader, (int)decoder.Consumed);
 1380723    }
 724
 725    private static (StatusCode StatusCode, string? ErrorMessage, SequencePosition Consumed) DecodeResponseHeader(
 726        ReadOnlySequence<byte> buffer,
 727        int requestId)
 351728    {
 351729        ReplyStatus replyStatus = ((int)buffer.FirstSpan[0]).AsReplyStatus();
 730
 351731        if (replyStatus <= ReplyStatus.UserException)
 327732        {
 733            const int headerSize = 7; // reply status byte + encapsulation header
 734
 735            // read and check encapsulation header (6 bytes long)
 736
 327737            if (buffer.Length < headerSize)
 0738            {
 0739                throw new InvalidDataException($"Received invalid frame header for request with id '{requestId}'.");
 740            }
 741
 327742            EncapsulationHeader encapsulationHeader = SliceEncoding.Slice1.DecodeBuffer(
 327743                buffer.Slice(1, 6),
 654744                (ref SliceDecoder decoder) => new EncapsulationHeader(ref decoder));
 745
 746            // Sanity check
 327747            int payloadSize = encapsulationHeader.EncapsulationSize - 6;
 327748            if (payloadSize != buffer.Length - headerSize)
 0749            {
 0750                throw new InvalidDataException(
 0751                    $"Response payload size/frame size mismatch: payload size is {payloadSize} bytes but frame has {buff
 752            }
 753
 327754            SequencePosition consumed = buffer.GetPosition(headerSize);
 755
 327756            return replyStatus == ReplyStatus.Ok ? (StatusCode.Ok, null, consumed) :
 327757                // Set the error message to the empty string, because null is not allowed for status code > Ok.
 327758                (StatusCode.ApplicationError, "", consumed);
 759        }
 760        else
 24761        {
 762            // An ice system exception.
 763
 24764            StatusCode statusCode = replyStatus switch
 24765            {
 13766                ReplyStatus.ObjectNotExistException => StatusCode.NotFound,
 0767                ReplyStatus.FacetNotExistException => StatusCode.NotFound,
 1768                ReplyStatus.OperationNotExistException => StatusCode.NotImplemented,
 1769                ReplyStatus.InvalidData => StatusCode.InvalidData,
 0770                ReplyStatus.Unauthorized => StatusCode.Unauthorized,
 9771                _ => StatusCode.InternalError
 24772            };
 773
 24774            var decoder = new SliceDecoder(buffer.Slice(1), SliceEncoding.Slice1);
 775
 776            string message;
 24777            switch (replyStatus)
 778            {
 779                case ReplyStatus.FacetNotExistException:
 780                case ReplyStatus.ObjectNotExistException:
 781                case ReplyStatus.OperationNotExistException:
 782
 14783                    var requestFailed = new RequestFailedExceptionData(ref decoder);
 784
 14785                    string target = requestFailed.Fragment.Length > 0 ?
 14786                        $"{requestFailed.Identity.ToPath()}#{requestFailed.Fragment}" : requestFailed.Identity.ToPath();
 787
 14788                    message =
 14789                        $"The dispatch failed with status code {statusCode} while dispatching '{requestFailed.Operation}
 14790                    break;
 791                default:
 10792                    message = decoder.DecodeString();
 10793                    break;
 794            }
 24795            decoder.CheckEndOfBuffer();
 24796            return (statusCode, message, buffer.End);
 797        }
 351798    }
 799
 800    private static void EncodeRequestHeader(
 801        IceDuplexConnectionWriter output,
 802        OutgoingRequest request,
 803        int requestId,
 804        int payloadSize)
 1382805    {
 1382806        var encoder = new SliceEncoder(output, SliceEncoding.Slice1);
 807
 808        // Write the request header.
 1382809        encoder.WriteByteSpan(IceDefinitions.FramePrologue);
 1382810        encoder.EncodeIceFrameType(IceFrameType.Request);
 1382811        encoder.EncodeUInt8(0); // compression status
 812
 1382813        Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(4);
 814
 1382815        encoder.EncodeInt32(requestId);
 816
 1382817        byte encodingMajor = 1;
 1382818        byte encodingMinor = 1;
 819
 820        // Request header.
 1382821        var requestHeader = new IceRequestHeader(
 1382822            Identity.Parse(request.ServiceAddress.Path),
 1382823            request.ServiceAddress.Fragment,
 1382824            request.Operation,
 1382825            request.Fields.ContainsKey(RequestFieldKey.Idempotent) ? OperationMode.Idempotent : OperationMode.Normal);
 1382826        requestHeader.Encode(ref encoder);
 1382827        int directWriteSize = 0;
 1382828        if (request.Fields.TryGetValue(RequestFieldKey.Context, out OutgoingFieldValue requestField))
 7829        {
 7830            if (requestField.WriteAction is Action<IBufferWriter<byte>> writeAction)
 7831            {
 832                // This writes directly to the underlying output; we measure how many bytes are written.
 7833                long start = output.UnflushedBytes;
 7834                writeAction(output);
 7835                directWriteSize = (int)(output.UnflushedBytes - start);
 7836            }
 837            else
 0838            {
 0839                encoder.WriteByteSequence(requestField.ByteSequence);
 0840            }
 7841        }
 842        else
 1375843        {
 1375844            encoder.EncodeSize(0);
 1375845        }
 846
 847        // We ignore all other fields. They can't be sent over ice.
 848
 1382849        new EncapsulationHeader(
 1382850            encapsulationSize: payloadSize + 6,
 1382851            encodingMajor,
 1382852            encodingMinor).Encode(ref encoder);
 853
 1382854        int frameSize = checked(encoder.EncodedByteCount + directWriteSize + payloadSize);
 1382855        SliceEncoder.EncodeInt32(frameSize, sizePlaceholder);
 1382856    }
 857
 858    private static void EncodeResponseHeader(
 859        IBufferWriter<byte> writer,
 860        OutgoingResponse response,
 861        IncomingRequest request,
 862        int requestId,
 863        int payloadSize)
 1367864    {
 1367865        var encoder = new SliceEncoder(writer, SliceEncoding.Slice1);
 866
 867        // Write the response header.
 868
 1367869        encoder.WriteByteSpan(IceDefinitions.FramePrologue);
 1367870        encoder.EncodeIceFrameType(IceFrameType.Reply);
 1367871        encoder.EncodeUInt8(0); // compression status
 1367872        Span<byte> sizePlaceholder = encoder.GetPlaceholderSpan(4);
 873
 1367874        encoder.EncodeInt32(requestId);
 875
 1367876        if (response.StatusCode > StatusCode.ApplicationError ||
 1367877            (response.StatusCode == StatusCode.ApplicationError && payloadSize == 0))
 26878        {
 879            // system exception
 26880            switch (response.StatusCode)
 881            {
 882                case StatusCode.NotFound:
 883                case StatusCode.NotImplemented:
 16884                    encoder.EncodeReplyStatus(response.StatusCode == StatusCode.NotFound ?
 16885                        ReplyStatus.ObjectNotExistException : ReplyStatus.OperationNotExistException);
 886
 16887                    new RequestFailedExceptionData(Identity.Parse(request.Path), request.Fragment, request.Operation)
 16888                        .Encode(ref encoder);
 16889                    break;
 890                case StatusCode.InternalError:
 7891                    encoder.EncodeReplyStatus(ReplyStatus.UnknownException);
 7892                    encoder.EncodeString(response.ErrorMessage!);
 7893                    break;
 894                case StatusCode.InvalidData:
 1895                    encoder.EncodeReplyStatus(ReplyStatus.InvalidData);
 1896                    encoder.EncodeString(response.ErrorMessage!);
 1897                    break;
 898                case StatusCode.Unauthorized:
 0899                    encoder.EncodeReplyStatus(ReplyStatus.Unauthorized);
 0900                    encoder.EncodeString(response.ErrorMessage!);
 0901                    break;
 902                default:
 2903                    encoder.EncodeReplyStatus(ReplyStatus.UnknownException);
 2904                    encoder.EncodeString(
 2905                        $"{response.ErrorMessage} {{ Original StatusCode = {response.StatusCode} }}");
 2906                    break;
 907            }
 26908        }
 909        else
 1341910        {
 1341911            encoder.EncodeReplyStatus((ReplyStatus)response.StatusCode);
 912
 913            // When IceRPC receives a response, it ignores the response encoding. So this "1.1" is only relevant to
 914            // a ZeroC Ice client that decodes the response. The only Slice encoding such a client can possibly use
 915            // to decode the response payload is 1.1 or 1.0, and we don't care about interop with 1.0.
 1341916            var encapsulationHeader = new EncapsulationHeader(
 1341917                encapsulationSize: payloadSize + 6,
 1341918                payloadEncodingMajor: 1,
 1341919                payloadEncodingMinor: 1);
 1341920            encapsulationHeader.Encode(ref encoder);
 1341921        }
 922
 1367923        int frameSize = encoder.EncodedByteCount + payloadSize;
 1367924        SliceEncoder.EncodeInt32(frameSize, sizePlaceholder);
 1367925    }
 926
 927    /// <summary>Reads the full Ice payload from the given pipe reader.</summary>
 928    private static async ValueTask<ReadOnlySequence<byte>> ReadFullPayloadAsync(
 929        PipeReader payload,
 930        CancellationToken cancellationToken)
 2727931    {
 932        // We use ReadAtLeastAsync instead of ReadAsync to bypass the PauseWriterThreshold when the payload is
 933        // backed by a Pipe.
 2727934        ReadResult readResult = await payload.ReadAtLeastAsync(int.MaxValue, cancellationToken).ConfigureAwait(false);
 935
 2724936        if (readResult.IsCanceled)
 0937        {
 0938            throw new InvalidOperationException("Unexpected call to CancelPendingRead on ice payload.");
 939        }
 940
 2724941        return readResult.IsCompleted ? readResult.Buffer :
 2724942            throw new ArgumentException("The payload size is greater than int.MaxValue.", nameof(payload));
 2724943    }
 944
 945    /// <summary>Acquires exclusive access to _duplexConnectionWriter.</summary>
 946    /// <returns>A <see cref="SemaphoreLock" /> that releases the acquired semaphore in its Dispose method.</returns>
 947    private async ValueTask<SemaphoreLock> AcquireWriteLockAsync(CancellationToken cancellationToken)
 2871948    {
 2871949        SemaphoreLock semaphoreLock = await _writeSemaphore.AcquireAsync(cancellationToken).ConfigureAwait(false);
 950
 951        // _writeException is protected by _writeSemaphore
 2871952        if (_writeException is not null)
 1953        {
 1954            semaphoreLock.Dispose();
 955
 1956            throw new IceRpcException(
 1957                IceRpcError.ConnectionAborted,
 1958                "The connection was aborted because a previous write operation failed.",
 1959                _writeException);
 960        }
 961
 2870962        return semaphoreLock;
 2870963    }
 964
 965    /// <summary>Creates a pipe reader to simplify the reading of a request or response frame. The frame is read fully
 966    /// and buffered into an internal pipe.</summary>
 967    private async ValueTask<PipeReader> CreateFrameReaderAsync(int size, CancellationToken cancellationToken)
 2742968    {
 2742969        var pipe = new Pipe(_pipeOptions);
 970
 971        try
 2742972        {
 2742973            await _duplexConnectionReader.FillBufferWriterAsync(pipe.Writer, size, cancellationToken)
 2742974                .ConfigureAwait(false);
 2742975        }
 0976        catch
 0977        {
 0978            pipe.Reader.Complete();
 0979            throw;
 980        }
 981        finally
 2742982        {
 2742983            pipe.Writer.Complete();
 2742984        }
 985
 2742986        return pipe.Reader;
 2742987    }
 988
 989    private void DecrementDispatchInvocationCount()
 2760990    {
 991        lock (_mutex)
 2760992        {
 2760993            if (--_dispatchInvocationCount == 0)
 1196994            {
 1196995                if (_shutdownTask is not null)
 20996                {
 20997                    _dispatchesAndInvocationsCompleted.TrySetResult();
 20998                }
 999                // We enable the inactivity check in order to complete ShutdownRequested when inactive for too long.
 1000                // _refuseInvocations is true when the connection is either about to be "shutdown requested", or shut
 1001                // down / disposed. We don't need to complete ShutdownRequested in any of these situations.
 11761002                else if (!_refuseInvocations)
 11651003                {
 11651004                    ScheduleInactivityCheck();
 11651005                }
 11961006            }
 27601007        }
 27601008    }
 1009
 1010    /// <summary>Dispatches an incoming request. This method executes in a task spawn from the read frames loop.
 1011    /// </summary>
 1012    private async Task DispatchRequestAsync(IncomingRequest request, int requestId, PipeReader? contextReader)
 13781013    {
 13781014        CancellationToken cancellationToken = request.IsOneway ? _disposedCts.Token : _twowayDispatchesCts.Token;
 1015
 1016        OutgoingResponse? response;
 1017        try
 13781018        {
 1019            // The dispatcher can complete the incoming request payload to release its memory as soon as possible.
 1020            try
 13781021            {
 1022                // _dispatcher.DispatchAsync may very well ignore the cancellation token and we don't want to keep
 1023                // dispatching when the cancellation token is canceled.
 13781024                cancellationToken.ThrowIfCancellationRequested();
 1025
 13781026                response = await _dispatcher.DispatchAsync(request, cancellationToken).ConfigureAwait(false);
 13611027            }
 1028            finally
 13781029            {
 13781030                _dispatchSemaphore?.Release();
 13781031            }
 1032
 13611033            if (response != request.Response)
 11034            {
 11035                throw new InvalidOperationException(
 11036                    "The dispatcher did not return the last response created for this request.");
 1037            }
 13601038        }
 181039        catch when (request.IsOneway)
 01040        {
 1041            // ignored since we're not returning anything
 01042            response = null;
 01043        }
 101044        catch (OperationCanceledException exception) when (exception.CancellationToken == cancellationToken)
 91045        {
 1046            // expected when the connection is disposed or the request is canceled by the peer's shutdown
 91047            response = null;
 91048        }
 91049        catch (Exception exception)
 91050        {
 91051            if (exception is not DispatchException dispatchException)
 51052            {
 51053                dispatchException = new DispatchException(StatusCode.InternalError, innerException: exception);
 51054            }
 91055            response = dispatchException.ToOutgoingResponse(request);
 91056        }
 1057        finally
 13781058        {
 13781059            request.Payload.Complete();
 13781060            contextReader?.Complete();
 1061
 1062            // The field values are now invalid - they point to potentially recycled and reused memory. We
 1063            // replace Fields by an empty dictionary to prevent accidental access to this reused memory.
 13781064            request.Fields = ImmutableDictionary<RequestFieldKey, ReadOnlySequence<byte>>.Empty;
 13781065        }
 1066
 1067        try
 13781068        {
 13781069            if (response is not null)
 13691070            {
 1071                // Read the full response payload. This can take some time so this needs to be done before acquiring
 1072                // the write semaphore.
 13691073                ReadOnlySequence<byte> payload = ReadOnlySequence<byte>.Empty;
 1074
 13691075                if (response.StatusCode <= StatusCode.ApplicationError)
 13451076                {
 1077                    try
 13451078                    {
 13451079                        payload = await ReadFullPayloadAsync(response.Payload, cancellationToken)
 13451080                            .ConfigureAwait(false);
 13421081                    }
 21082                    catch (OperationCanceledException exception) when (exception.CancellationToken == cancellationToken)
 21083                    {
 21084                        throw;
 1085                    }
 11086                    catch (Exception exception)
 11087                    {
 1088                        // We "encode" the exception in the error message.
 1089
 11090                        response = new OutgoingResponse(
 11091                            request,
 11092                            StatusCode.InternalError,
 11093                            "The dispatch failed to read the response payload.",
 11094                            exception);
 11095                    }
 13431096                }
 1097                // else payload remains empty because the payload of a dispatch exception (if any) cannot be sent
 1098                // over ice.
 1099
 13671100                int payloadSize = checked((int)payload.Length);
 1101
 1102                // Wait for writing of other frames to complete.
 13671103                using SemaphoreLock _ = await AcquireWriteLockAsync(cancellationToken).ConfigureAwait(false);
 1104                try
 13671105                {
 13671106                    EncodeResponseHeader(_duplexConnectionWriter, response, request, requestId, payloadSize);
 1107
 1108                    // We write to the duplex connection with _disposedCts.Token instead of cancellationToken.
 1109                    // Canceling this write operation is fatal to the connection.
 13671110                    await _duplexConnectionWriter.WriteAsync(payload, _disposedCts.Token).ConfigureAwait(false);
 13661111                }
 11112                catch (Exception exception)
 11113                {
 11114                    WriteFailed(exception);
 11115                    throw;
 1116                }
 13661117            }
 13751118        }
 31119        catch (OperationCanceledException exception) when (
 31120            exception.CancellationToken == _disposedCts.Token ||
 31121            exception.CancellationToken == cancellationToken)
 31122        {
 1123            // expected when the connection is disposed or the request is canceled by the peer's shutdown
 31124        }
 1125        finally
 13781126        {
 13781127            DecrementDispatchInvocationCount();
 13781128        }
 13781129    }
 1130
 1131    /// <summary>Increments the dispatch-invocation count.</summary>
 1132    /// <remarks>This method must be called with _mutex locked.</remarks>
 1133    private void IncrementDispatchInvocationCount()
 27601134    {
 27601135        if (_dispatchInvocationCount++ == 0)
 11961136        {
 1137            // Cancel inactivity check.
 11961138            _inactivityTimeoutTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan);
 11961139        }
 27601140    }
 1141
 1142    private void ScheduleInactivityCheck() =>
 13321143        _inactivityTimeoutTimer.Change(_inactivityTimeout, Timeout.InfiniteTimeSpan);
 1144
 1145    /// <summary>Reads incoming frames and returns successfully when a CloseConnection frame is received or when the
 1146    /// connection is aborted during ShutdownAsync or canceled by DisposeAsync.</summary>
 1147    private async Task ReadFramesAsync(CancellationToken cancellationToken)
 1671148    {
 1671149        await Task.Yield(); // exit mutex lock
 1150
 1151        // Wait for _connectTask (which spawned the task running this method) to complete. This way, we won't dispatch
 1152        // any request until _connectTask has completed successfully, and indirectly we won't make any invocation until
 1153        // _connectTask has completed successfully. The creation of the _readFramesTask is the last action taken by
 1154        // _connectTask and as a result this await can't fail.
 1671155        await _connectTask!.ConfigureAwait(false);
 1156
 1157        try
 1671158        {
 29211159            while (!cancellationToken.IsCancellationRequested)
 29191160            {
 29191161                ReadOnlySequence<byte> buffer = await _duplexConnectionReader.ReadAtLeastAsync(
 29191162                    IceDefinitions.PrologueSize,
 29191163                    cancellationToken).ConfigureAwait(false);
 1164
 1165                // First decode and check the prologue.
 1166
 27771167                ReadOnlySequence<byte> prologueBuffer = buffer.Slice(0, IceDefinitions.PrologueSize);
 1168
 27771169                IcePrologue prologue = SliceEncoding.Slice1.DecodeBuffer(
 27771170                    prologueBuffer,
 55541171                    (ref SliceDecoder decoder) => new IcePrologue(ref decoder));
 1172
 27771173                _duplexConnectionReader.AdvanceTo(prologueBuffer.End);
 1174
 27771175                IceDefinitions.CheckPrologue(prologue);
 27761176                if (prologue.FrameSize > _maxFrameSize)
 11177                {
 11178                    throw new InvalidDataException(
 11179                        $"Received frame with size ({prologue.FrameSize}) greater than max frame size.");
 1180                }
 1181
 27751182                if (prologue.CompressionStatus == 2)
 01183                {
 1184                    // The exception handler calls ReadFailed.
 01185                    throw new IceRpcException(
 01186                        IceRpcError.ConnectionAborted,
 01187                        "The connection was aborted because it received a compressed ice frame, and IceRPC does not supp
 1188                }
 1189
 1190                // Then process the frame based on its type.
 27751191                switch (prologue.FrameType)
 1192                {
 1193                    case IceFrameType.CloseConnection:
 211194                    {
 211195                        if (prologue.FrameSize != IceDefinitions.PrologueSize)
 01196                        {
 01197                            throw new InvalidDataException(
 01198                                $"Received {nameof(IceFrameType.CloseConnection)} frame with unexpected data.");
 1199                        }
 1200
 1201                        lock (_mutex)
 211202                        {
 211203                            RefuseNewInvocations(
 211204                                "The connection was shut down because it received a CloseConnection frame from the peer.
 1205
 1206                            // By exiting the "read frames loop" below, we are refusing new dispatches as well.
 1207
 1208                            // Only one side sends the CloseConnection frame.
 211209                            _sendCloseConnectionFrame = false;
 211210                        }
 1211
 1212                        // Even though we're in the "read frames loop", it's ok to cancel CTS and a "synchronous" TCS
 1213                        // below. We won't be reading anything else so it's ok to run continuations synchronously.
 1214
 1215                        // Abort two-way invocations that are waiting for a response (it will never come).
 211216                        AbortTwowayInvocations(
 211217                            IceRpcError.InvocationCanceled,
 211218                            "The invocation was canceled by the shutdown of the peer.");
 1219
 1220                        // Cancel two-way dispatches since the peer is not interested in the responses. This does not
 1221                        // cancel ongoing writes to _duplexConnection: we don't send incomplete/invalid data.
 211222                        _twowayDispatchesCts.Cancel();
 1223
 1224                        // We keep sending heartbeats. If the shutdown request / shutdown is not fulfilled quickly, they
 1225                        // tell the peer we're still alive and maybe stuck waiting for invocations and dispatches to
 1226                        // complete.
 1227
 1228                        // We request a shutdown that will dispose _duplexConnection once all invocations and dispatches
 1229                        // have completed.
 211230                        _shutdownRequestedTcs.TrySetResult();
 211231                        return;
 1232                    }
 1233
 1234                    case IceFrameType.Request:
 13801235                        await ReadRequestAsync(prologue.FrameSize, cancellationToken).ConfigureAwait(false);
 13801236                        break;
 1237
 1238                    case IceFrameType.RequestBatch:
 1239                        // The exception handler calls ReadFailed.
 01240                        throw new IceRpcException(
 01241                            IceRpcError.ConnectionAborted,
 01242                            "The connection was aborted because it received a batch request, and IceRPC does not support
 1243
 1244                    case IceFrameType.Reply:
 13621245                        await ReadReplyAsync(prologue.FrameSize, cancellationToken).ConfigureAwait(false);
 13621246                        break;
 1247
 1248                    case IceFrameType.ValidateConnection:
 121249                    {
 121250                        if (prologue.FrameSize != IceDefinitions.PrologueSize)
 01251                        {
 01252                            throw new InvalidDataException(
 01253                                $"Received {nameof(IceFrameType.ValidateConnection)} frame with unexpected data.");
 1254                        }
 121255                        break;
 1256                    }
 1257
 1258                    default:
 01259                    {
 01260                        throw new InvalidDataException(
 01261                            $"Received Ice frame with unknown frame type '{prologue.FrameType}'.");
 1262                    }
 1263                }
 27541264            } // while
 21265        }
 571266        catch (OperationCanceledException)
 571267        {
 1268            // canceled by DisposeAsync, no need to throw anything
 571269        }
 851270        catch (IceRpcException exception) when (
 851271            exception.IceRpcError == IceRpcError.ConnectionAborted &&
 851272            _dispatchesAndInvocationsCompleted.Task.IsCompleted)
 601273        {
 1274            // The peer acknowledged receipt of the CloseConnection frame by aborting the duplex connection. Return.
 1275            // See ShutdownAsync.
 601276        }
 251277        catch (IceRpcException exception)
 251278        {
 251279            ReadFailed(exception);
 251280            throw;
 1281        }
 21282        catch (InvalidDataException exception)
 21283        {
 21284            ReadFailed(exception);
 21285            throw new IceRpcException(
 21286                IceRpcError.ConnectionAborted,
 21287                "The connection was aborted by an ice protocol error.",
 21288                exception);
 1289        }
 01290        catch (Exception exception)
 01291        {
 01292            Debug.Fail($"The read frames task completed due to an unhandled exception: {exception}");
 01293            ReadFailed(exception);
 01294            throw;
 1295        }
 1296
 1297        // Aborts all pending two-way invocations. Must be called outside the mutex lock after setting
 1298        // _refuseInvocations to true.
 1299        void AbortTwowayInvocations(IceRpcError error, string message, Exception? exception = null)
 481300        {
 481301            Debug.Assert(_refuseInvocations);
 1302
 1303            // _twowayInvocations is immutable once _refuseInvocations is true.
 1681304            foreach (TaskCompletionSource<PipeReader> responseCompletionSource in _twowayInvocations.Values)
 121305            {
 1306                // _twowayInvocations can hold completed completion sources.
 121307                _ = responseCompletionSource.TrySetException(new IceRpcException(error, message, exception));
 121308            }
 481309        }
 1310
 1311        // Takes appropriate action after a read failure.
 1312        void ReadFailed(Exception exception)
 271313        {
 1314            // We also prevent new one-way invocations even though they don't need to read the connection.
 271315            RefuseNewInvocations("The connection was lost because a read operation failed.");
 1316
 1317            // It's ok to cancel CTS and a "synchronous" TCS below. We won't be reading anything else so it's ok to run
 1318            // continuations synchronously.
 1319
 271320            AbortTwowayInvocations(
 271321                IceRpcError.ConnectionAborted,
 271322                "The invocation was aborted because the connection was lost.",
 271323                exception);
 1324
 1325            // ReadFailed is called when the connection is dead or the peer sent us a non-supported frame (e.g. a
 1326            // batch request). We don't need to allow outstanding two-way dispatches to complete in these situations, so
 1327            // we cancel them to speed-up the shutdown.
 271328            _twowayDispatchesCts.Cancel();
 1329
 1330            lock (_mutex)
 271331            {
 1332                // Don't send a close connection frame since we can't wait for the peer's acknowledgment.
 271333                _sendCloseConnectionFrame = false;
 271334            }
 1335
 271336            _ = _shutdownRequestedTcs.TrySetResult();
 271337        }
 1401338    }
 1339
 1340    /// <summary>Reads a reply (incoming response) and completes the invocation response completion source with this
 1341    /// response. This method executes "synchronously" in the read frames loop.</summary>
 1342    private async Task ReadReplyAsync(int replyFrameSize, CancellationToken cancellationToken)
 13621343    {
 1344        // Read the remainder of the frame immediately into frameReader.
 13621345        PipeReader replyFrameReader = await CreateFrameReaderAsync(
 13621346            replyFrameSize - IceDefinitions.PrologueSize,
 13621347            cancellationToken).ConfigureAwait(false);
 1348
 13621349        bool completeFrameReader = true;
 1350
 1351        try
 13621352        {
 1353            // Read and decode request ID
 13621354            if (!replyFrameReader.TryRead(out ReadResult readResult) || readResult.Buffer.Length < 4)
 01355            {
 01356                throw new InvalidDataException("Received a response with an invalid request ID.");
 1357            }
 1358
 13621359            ReadOnlySequence<byte> requestIdBuffer = readResult.Buffer.Slice(0, 4);
 13621360            int requestId = SliceEncoding.Slice1.DecodeBuffer(
 13621361                requestIdBuffer,
 27241362                (ref SliceDecoder decoder) => decoder.DecodeInt32());
 13621363            replyFrameReader.AdvanceTo(requestIdBuffer.End);
 1364
 1365            lock (_mutex)
 13621366            {
 13621367                if (_twowayInvocations.TryGetValue(
 13621368                    requestId,
 13621369                    out TaskCompletionSource<PipeReader>? responseCompletionSource))
 3511370                {
 1371                    // continuation runs asynchronously
 3511372                    if (responseCompletionSource.TrySetResult(replyFrameReader))
 3511373                    {
 3511374                        completeFrameReader = false;
 3511375                    }
 1376                    // else this invocation just completed and is about to remove itself from _twowayInvocations,
 1377                    // or _twowayInvocations is immutable and contains entries for completed invocations.
 3511378                }
 1379                // else the request ID carried by the response is bogus or corresponds to a request that was previously
 1380                // discarded (for example, because its deadline expired).
 13621381            }
 13621382        }
 1383        finally
 13621384        {
 13621385            if (completeFrameReader)
 10111386            {
 10111387                replyFrameReader.Complete();
 10111388            }
 13621389        }
 13621390    }
 1391
 1392    /// <summary>Reads and then dispatches an incoming request in a separate dispatch task. This method executes
 1393    /// "synchronously" in the read frames loop.</summary>
 1394    private async Task ReadRequestAsync(int requestFrameSize, CancellationToken cancellationToken)
 13801395    {
 1396        // Read the request frame.
 13801397        PipeReader requestFrameReader = await CreateFrameReaderAsync(
 13801398            requestFrameSize - IceDefinitions.PrologueSize,
 13801399            cancellationToken).ConfigureAwait(false);
 1400
 1401        // Decode its header.
 1402        int requestId;
 1403        IceRequestHeader requestHeader;
 13801404        PipeReader? contextReader = null;
 1405        IDictionary<RequestFieldKey, ReadOnlySequence<byte>>? fields;
 13801406        Task? dispatchTask = null;
 1407
 1408        try
 13801409        {
 13801410            if (!requestFrameReader.TryRead(out ReadResult readResult))
 01411            {
 01412                throw new InvalidDataException("Received an invalid request frame.");
 1413            }
 1414
 13801415            Debug.Assert(readResult.IsCompleted);
 1416
 13801417            (requestId, requestHeader, contextReader, int consumed) = DecodeRequestIdAndHeader(readResult.Buffer);
 13801418            requestFrameReader.AdvanceTo(readResult.Buffer.GetPosition(consumed));
 1419
 13801420            if (contextReader is null)
 13731421            {
 13731422                fields = requestHeader.OperationMode == OperationMode.Normal ?
 13731423                    ImmutableDictionary<RequestFieldKey, ReadOnlySequence<byte>>.Empty : _idempotentFields;
 13731424            }
 1425            else
 71426            {
 71427                contextReader.TryRead(out ReadResult result);
 71428                Debug.Assert(result.Buffer.Length > 0 && result.IsCompleted);
 71429                fields = new Dictionary<RequestFieldKey, ReadOnlySequence<byte>>()
 71430                {
 71431                    [RequestFieldKey.Context] = result.Buffer
 71432                };
 1433
 71434                if (requestHeader.OperationMode != OperationMode.Normal)
 01435                {
 1436                    // OperationMode can be Idempotent or Nonmutating.
 01437                    fields[RequestFieldKey.Idempotent] = default;
 01438                }
 71439            }
 1440
 13801441            bool releaseDispatchSemaphore = false;
 13801442            if (_dispatchSemaphore is SemaphoreSlim dispatchSemaphore)
 13801443            {
 1444                // This prevents us from receiving any new frames if we're already dispatching the maximum number
 1445                // of requests. We need to do this in the "accept from network loop" to apply back pressure to the
 1446                // caller.
 1447                try
 13801448                {
 13801449                    await dispatchSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
 13801450                    releaseDispatchSemaphore = true;
 13801451                }
 01452                catch (OperationCanceledException)
 01453                {
 1454                    // and return below
 01455                }
 13801456            }
 1457
 1458            lock (_mutex)
 13801459            {
 13801460                if (_shutdownTask is not null)
 21461                {
 1462                    // The connection is (being) disposed or the connection is shutting down and received a request.
 1463                    // We simply discard it. For a graceful shutdown, the two-way invocation in the peer will throw
 1464                    // IceRpcException(InvocationCanceled). We also discard one-way requests: if we accepted them, they
 1465                    // could delay our shutdown and make it time out.
 21466                    if (releaseDispatchSemaphore)
 21467                    {
 21468                        _dispatchSemaphore!.Release();
 21469                    }
 21470                    return;
 1471                }
 1472
 13781473                IncrementDispatchInvocationCount();
 13781474            }
 1475
 1476            // The scheduling of the task can't be canceled since we want to make sure DispatchRequestAsync will
 1477            // cleanup (decrement _dispatchCount etc.) if DisposeAsync is called. dispatchTask takes ownership of the
 1478            // requestFrameReader and contextReader.
 13781479            dispatchTask = Task.Run(
 13781480                async () =>
 13781481                {
 13781482                    using var request = new IncomingRequest(Protocol.Ice, _connectionContext!)
 13781483                    {
 13781484                        Fields = fields,
 13781485                        Fragment = requestHeader.Fragment,
 13781486                        IsOneway = requestId == 0,
 13781487                        Operation = requestHeader.Operation,
 13781488                        Path = requestHeader.Identity.ToPath(),
 13781489                        Payload = requestFrameReader,
 13781490                    };
 13781491
 13781492                    try
 13781493                    {
 13781494                        await DispatchRequestAsync(
 13781495                            request,
 13781496                            requestId,
 13781497                            contextReader).ConfigureAwait(false);
 13781498                    }
 01499                    catch (IceRpcException)
 01500                    {
 13781501                        // expected when the peer aborts the connection.
 01502                    }
 01503                    catch (Exception exception)
 01504                    {
 13781505                        // With ice, a dispatch cannot throw an exception that comes from the application code:
 13781506                        // any exception thrown when reading the response payload is converted into a DispatchException
 13781507                        // response, and the response header has no fields to encode.
 01508                        Debug.Fail($"ice dispatch {request} failed with an unexpected exception: {exception}");
 01509                        throw;
 13781510                    }
 13781511                },
 13781512                CancellationToken.None);
 13781513        }
 1514        finally
 13801515        {
 13801516            if (dispatchTask is null)
 21517            {
 21518                requestFrameReader.Complete();
 21519                contextReader?.Complete();
 21520            }
 13801521        }
 13801522    }
 1523
 1524    private void RefuseNewInvocations(string message)
 3041525    {
 1526        lock (_mutex)
 3041527        {
 3041528            _refuseInvocations = true;
 3041529            _invocationRefusedMessage ??= message;
 3041530        }
 3041531    }
 1532
 1533    /// <summary>Sends a control frame. It takes care of acquiring and releasing the write lock and calls
 1534    /// <see cref="WriteFailed" /> if a failure occurs while writing to _duplexConnectionWriter.</summary>
 1535    /// <param name="encode">Encodes the control frame.</param>
 1536    /// <param name="cancellationToken">The cancellation token.</param>
 1537    /// <remarks>If the cancellation token is canceled while writing to the duplex connection, the connection is
 1538    /// aborted.</remarks>
 1539    private async ValueTask SendControlFrameAsync(
 1540        Action<IBufferWriter<byte>> encode,
 1541        CancellationToken cancellationToken)
 1221542    {
 1221543        using SemaphoreLock _ = await AcquireWriteLockAsync(cancellationToken).ConfigureAwait(false);
 1544
 1545        try
 1211546        {
 1211547            encode(_duplexConnectionWriter);
 1211548            await _duplexConnectionWriter.FlushAsync(cancellationToken).ConfigureAwait(false);
 1181549        }
 31550        catch (Exception exception)
 31551        {
 31552            WriteFailed(exception);
 31553            throw;
 1554        }
 1181555    }
 1556
 1557    /// <summary>Takes appropriate action after a write failure.</summary>
 1558    /// <remarks>Must be called outside the mutex lock but after acquiring _writeSemaphore.</remarks>
 1559    private void WriteFailed(Exception exception)
 51560    {
 51561        Debug.Assert(_writeException is null);
 51562        _writeException = exception; // protected by _writeSemaphore
 1563
 1564        // We can't send new invocations without writing to the connection.
 51565        RefuseNewInvocations("The connection was lost because a write operation failed.");
 1566
 1567        // We can't send responses so these dispatches can be canceled.
 51568        _twowayDispatchesCts.Cancel();
 1569
 1570        // We don't change _sendClosedConnectionFrame. If the _readFrameTask is still running, we want ShutdownAsync
 1571        // to send CloseConnection - and fail.
 1572
 51573        _ = _shutdownRequestedTcs.TrySetResult();
 51574    }
 1575}