| | 1 | | // Copyright (c) ZeroC, Inc. |
| | 2 | |
|
| | 3 | | using IceRpc.Transports; |
| | 4 | | using System.Buffers; |
| | 5 | | using System.Diagnostics; |
| | 6 | |
|
| | 7 | | namespace IceRpc.Internal; |
| | 8 | |
|
| | 9 | | /// <summary>Decorates <see cref="ReadAsync" /> to fail with ConnectionIdle if no byte is received for over |
| | 10 | | /// readIdleTimeout, and <see cref="WriteAsync" /> to schedule a heartbeat writeIdleTimeout / 2 after each |
| | 11 | | /// successful write. Both sides of the connection are expected to use the same idle timeouts.</summary> |
| | 12 | | internal class IceDuplexConnectionDecorator : IDuplexConnection |
| | 13 | | { |
| | 14 | | private readonly IDuplexConnection _decoratee; |
| 376 | 15 | | private readonly CancellationTokenSource _readCts = new(); |
| | 16 | | private readonly TimeSpan _readIdleTimeout; |
| | 17 | | private readonly TimeSpan _writeIdleTimeout; |
| | 18 | | private readonly Timer _writeTimer; |
| | 19 | |
|
| | 20 | | public Task<TransportConnectionInformation> ConnectAsync(CancellationToken cancellationToken) => |
| 197 | 21 | | _decoratee.ConnectAsync(cancellationToken); |
| | 22 | |
|
| | 23 | | public void Dispose() |
| 410 | 24 | | { |
| 410 | 25 | | _decoratee.Dispose(); |
| 410 | 26 | | _readCts.Dispose(); |
| | 27 | |
|
| | 28 | | // Using Dispose is fine: there's no need to wait for sendHeartbeat to complete if it's running. |
| 410 | 29 | | _writeTimer.Dispose(); |
| 410 | 30 | | } |
| | 31 | |
|
| | 32 | | public ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken) |
| 38897 | 33 | | { |
| 38897 | 34 | | return _readIdleTimeout == Timeout.InfiniteTimeSpan ? |
| 38897 | 35 | | _decoratee.ReadAsync(buffer, cancellationToken) : |
| 38897 | 36 | | PerformReadAsync(); |
| | 37 | |
|
| | 38 | | async ValueTask<int> PerformReadAsync() |
| 38897 | 39 | | { |
| | 40 | | try |
| 38897 | 41 | | { |
| 38897 | 42 | | using CancellationTokenRegistration _ = cancellationToken.UnsafeRegister( |
| 161 | 43 | | cts => ((CancellationTokenSource)cts!).Cancel(), |
| 38897 | 44 | | _readCts); |
| 38897 | 45 | | _readCts.CancelAfter(_readIdleTimeout); // enable idle timeout before reading |
| 38897 | 46 | | return await _decoratee.ReadAsync(buffer, _readCts.Token).ConfigureAwait(false); |
| | 47 | | } |
| 131 | 48 | | catch (OperationCanceledException) |
| 131 | 49 | | { |
| 131 | 50 | | cancellationToken.ThrowIfCancellationRequested(); |
| | 51 | |
|
| 4 | 52 | | throw new IceRpcException( |
| 4 | 53 | | IceRpcError.ConnectionIdle, |
| 4 | 54 | | $"The connection did not receive any bytes for over {_readIdleTimeout.TotalSeconds} s."); |
| | 55 | | } |
| | 56 | | finally |
| 38897 | 57 | | { |
| 38897 | 58 | | _readCts.CancelAfter(Timeout.InfiniteTimeSpan); // disable idle timeout if not canceled; once canceled, _readCts stays canceled |
| 38897 | 59 | | } |
| 38597 | 60 | | } |
| 38897 | 61 | | } |
| | 62 | |
|
| | 63 | | public Task ShutdownWriteAsync(CancellationToken cancellationToken) => |
| 0 | 64 | | _decoratee.ShutdownWriteAsync(cancellationToken); |
| | 65 | |
|
| | 66 | | public ValueTask WriteAsync(ReadOnlySequence<byte> buffer, CancellationToken cancellationToken) |
| 5638 | 67 | | { |
| 5638 | 68 | | return _writeIdleTimeout == Timeout.InfiniteTimeSpan ? |
| 5638 | 69 | | _decoratee.WriteAsync(buffer, cancellationToken) : |
| 5638 | 70 | | PerformWriteAsync(); |
| | 71 | |
|
| | 72 | | async ValueTask PerformWriteAsync() |
| 5638 | 73 | | { |
| | 74 | | // No need to send a heartbeat now since we're about to write. |
| 5638 | 75 | | CancelWriteTimer(); |
| | 76 | |
|
| 5638 | 77 | | await _decoratee.WriteAsync(buffer, cancellationToken).ConfigureAwait(false); |
| | 78 | |
|
| | 79 | | // After each successful write, we schedule a heartbeat at _writeIdleTimeout / 2 in the future. |
| | 80 | | // Since each heartbeat is itself a write, if there is no application activity at all, we'll send successive |
| | 81 | | // heartbeats at _writeIdleTimeout / 2 intervals. |
| 5628 | 82 | | RescheduleWriteTimer(); |
| 5628 | 83 | | } |
| 5638 | 84 | | } |
| | 85 | |
|
| | 86 | | /// <summary>Constructs a decorator that makes <see cref="ReadAsync" /> fail when it receives no byte within |
| | 87 | | /// readIdleTimeout and schedules a heartbeat after each write (see <see cref="RescheduleWriteTimer" />).</summary> |
| 376 | 88 | | internal IceDuplexConnectionDecorator( |
| 376 | 89 | | IDuplexConnection decoratee, |
| 376 | 90 | | TimeSpan readIdleTimeout, |
| 376 | 91 | | TimeSpan writeIdleTimeout, |
| 376 | 92 | | Action sendHeartbeat) |
| 376 | 93 | | { |
| 376 | 94 | | Debug.Assert(writeIdleTimeout != Timeout.InfiniteTimeSpan); |
| 376 | 95 | | _decoratee = decoratee; |
| 376 | 96 | | _readIdleTimeout = readIdleTimeout; // can be infinite i.e. disabled |
| 376 | 97 | | _writeIdleTimeout = writeIdleTimeout; |
| 401 | 98 | | _writeTimer = new Timer(_ => sendHeartbeat()); |
| | 99 | | // We can't schedule the initial heartbeat yet. The heartbeat is an ice protocol frame; we can send it only once |
| | 100 | | // the connection is connected at the ice protocol level. |
| 376 | 101 | | } |
| | 102 | |
|
| | 103 | | /// <summary>Schedules the initial heartbeat. Called by a client IceProtocolConnection after it receives the |
| | 104 | | /// initial ValidateConnection frame from the server.</summary> |
| 159 | 105 | | internal void ScheduleHeartbeat() => RescheduleWriteTimer(); |
| | 106 | |
|
| | 107 | | /// <summary>Cancels the write timer: it stays disarmed until the next call to RescheduleWriteTimer.</summary> |
| 5638 | 108 | | private void CancelWriteTimer() => _writeTimer.Change(Timeout.InfiniteTimeSpan, Timeout.InfiniteTimeSpan); |
| | 109 | |
|
| | 110 | | /// <summary>Arms the one-shot write timer to expire in _writeIdleTimeout / 2; we send a heartbeat when it expires.</summary> |
| 5787 | 111 | | private void RescheduleWriteTimer() => _writeTimer.Change(_writeIdleTimeout / 2, Timeout.InfiniteTimeSpan); |
| | 112 | | } |