| | | 1 | | // Copyright (c) ZeroC, Inc. |
| | | 2 | | |
| | | 3 | | using System.Diagnostics; |
| | | 4 | | using System.Diagnostics.CodeAnalysis; |
| | | 5 | | using System.IO.Pipelines; |
| | | 6 | | using System.Net; |
| | | 7 | | using System.Threading.Channels; |
| | | 8 | | |
| | | 9 | | namespace IceRpc.Transports.Coloc.Internal; |
| | | 10 | | |
| | | 11 | | /// <summary>The listener implementation for the colocated transport.</summary> |
| | | 12 | | internal class ColocListener : IListener<IDuplexConnection>, IDisposable |
| | | 13 | | { |
| | 1018 | 14 | | public TransportAddress TransportAddress { get; } |
| | | 15 | | |
| | | 16 | | [SuppressMessage( |
| | | 17 | | "Usage", |
| | | 18 | | "CA2213:Disposable fields should be disposed", |
| | | 19 | | Justification = "Disposing this CTS races with AcceptAsync creating a linked token source from its Token; a CTS |
| | 544 | 20 | | private readonly CancellationTokenSource _disposeCts = new(); |
| | | 21 | | private bool _disposed; |
| | | 22 | | private readonly Action<ColocListener> _onDispose; |
| | 544 | 23 | | private readonly Lock _mutex = new(); |
| | | 24 | | private readonly EndPoint _networkAddress; |
| | | 25 | | private readonly PipeOptions _pipeOptions; |
| | | 26 | | |
| | | 27 | | // The channel used by the client connection ConnectAsync method to queue a connection establishment request. A |
| | | 28 | | // client connection establishment request is represented by: |
| | | 29 | | // - a TaskCompletionSource which is completed by AcceptAsync when the connection is accepted. The server connection |
| | | 30 | | // pipe reader is set as the result. ClientColocConnection.ConnectAsync waits on the task completion source task. |
| | | 31 | | // - the client connection pipe reader provided to the server connection when the server connection is created by |
| | | 32 | | // AcceptAsync. |
| | | 33 | | private readonly Channel<(TaskCompletionSource<PipeReader>, PipeReader)> _channel; |
| | | 34 | | |
/// <summary>Waits for a queued client connection establishment request and creates the server side of the
/// corresponding colocated connection.</summary>
/// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
/// <returns>The new server connection together with the network address of this listener.</returns>
/// <exception cref="ObjectDisposedException">Thrown if the listener is disposed before or while this call is
/// waiting for a request.</exception>
/// <exception cref="OperationCanceledException">Thrown if <paramref name="cancellationToken"/> is canceled.
/// </exception>
public async Task<(IDuplexConnection, EndPoint)> AcceptAsync(CancellationToken cancellationToken)
{
    // The disposed check and the creation of the linked source happen under the same mutex that Dispose takes.
    CancellationTokenSource linkedCts;
    lock (_mutex)
    {
        ObjectDisposedException.ThrowIf(_disposed, this);
        linkedCts = CancellationTokenSource.CreateLinkedTokenSource(_disposeCts.Token, cancellationToken);
    }

    using (linkedCts)
    {
        try
        {
            for (; ; )
            {
                (TaskCompletionSource<PipeReader> connectTcs, PipeReader clientReader) =
                    await _channel.Reader.ReadAsync(linkedCts.Token).ConfigureAwait(false);

                var pipe = new Pipe(_pipeOptions);
                if (!connectTcs.TrySetResult(pipe.Reader))
                {
                    // The client connection establishment was canceled: release the pipe and wait for the next
                    // request.
                    pipe.Writer.Complete();
                    pipe.Reader.Complete();
                    continue;
                }

                var accepted = new ServerColocConnection(TransportAddress, pipe.Writer, clientReader);
                return (accepted, _networkAddress);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation requested by the caller takes precedence.
            cancellationToken.ThrowIfCancellationRequested();

            // Otherwise the accept operation was canceled because the listener was disposed.
            Debug.Assert(_disposeCts.IsCancellationRequested);
            throw new ObjectDisposedException($"{typeof(ColocListener)}");
        }
    }
}
| | | 76 | | |
/// <summary>Disposes the listener: notifies the owner, cancels pending <see cref="AcceptAsync"/> calls and
/// refuses the client connection establishment requests that are still queued. Calling this method more than
/// once has no additional effect.</summary>
public void Dispose()
{
    lock (_mutex)
    {
        if (!_disposed)
        {
            _disposed = true;

            // Let the owner (e.g. the server transport) release its reference to this listener.
            _onDispose(this);

            // Wake up any pending AcceptAsync.
            _disposeCts.Cancel();

            // From now on no client connection establishment request can be queued.
            _channel.Writer.Complete();

            // Fail every request still in the queue with IceRpcError.ConnectionRefused. TrySetException is used
            // because the task may already have been canceled.
            while (_channel.Reader.TryRead(out (TaskCompletionSource<PipeReader> Tcs, PipeReader) pending))
            {
                pending.Tcs.TrySetException(new IceRpcException(IceRpcError.ConnectionRefused));
            }
        }
    }
}
| | | 104 | | |
/// <summary>Disposes the listener. The disposal is performed synchronously by <see cref="Dispose"/>, so the
/// returned value task is already completed.</summary>
/// <returns>A completed value task.</returns>
public ValueTask DisposeAsync()
{
    Dispose();
    return ValueTask.CompletedTask;
}
| | | 110 | | |
/// <summary>Constructs a colocated listener.</summary>
/// <param name="transportAddress">The transport address this listener is bound to.</param>
/// <param name="onDispose">The callback invoked with this listener when it is disposed.</param>
/// <param name="colocTransportOptions">The coloc options providing the pipe thresholds and the listen backlog.
/// </param>
/// <param name="duplexConnectionOptions">The duplex connection options providing the memory pool and the minimum
/// segment size of the pipes.</param>
internal ColocListener(
    TransportAddress transportAddress,
    Action<ColocListener> onDispose,
    ColocTransportOptions colocTransportOptions,
    DuplexConnectionOptions duplexConnectionOptions)
{
    _onDispose = onDispose;
    TransportAddress = transportAddress;
    _networkAddress = new ColocEndPoint(transportAddress);

    // Options shared by all the server pipes created by AcceptAsync.
    _pipeOptions = new PipeOptions(
        pool: duplexConnectionOptions.Pool,
        minimumSegmentSize: duplexConnectionOptions.MinSegmentSize,
        pauseWriterThreshold: colocTransportOptions.PauseWriterThreshold,
        resumeWriterThreshold: colocTransportOptions.ResumeWriterThreshold,
        useSynchronizationContext: false);

    // The channel is bounded by the listen backlog and keeps the default concurrency settings, which allow
    // multiple readers and writers.
    var backlogOptions = new BoundedChannelOptions(colocTransportOptions.ListenBacklog);
    _channel = Channel.CreateBounded<(TaskCompletionSource<PipeReader>, PipeReader)>(backlogOptions);
}
| | | 133 | | |
/// <summary>Queues a client connection establishment request from the client.</summary>
/// <param name="clientPipeReader">A <see cref="PipeReader"/> for reading from the client connection.</param>
/// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
/// <param name="serverPipeReaderTask">A task that returns a <see cref="PipeReader"/> for reading from the server
/// connection.</param>
/// <returns>Returns <see langword="true"/> if the connection establishment request has been queued; otherwise,
/// <see langword="false"/>.</returns>
internal bool TryQueueConnect(
    PipeReader clientPipeReader,
    CancellationToken cancellationToken,
    [NotNullWhen(true)] out Task<PipeReader>? serverPipeReaderTask)
{
    // Create a tcs that is completed by AcceptAsync when it accepts the corresponding connection, at which point
    // the client side connect operation will complete.
    // We use RunContinuationsAsynchronously to avoid the ConnectAsync continuation ending up running in the
    // AcceptAsync loop that completes this tcs.
    var tcs = new TaskCompletionSource<PipeReader>(TaskCreationOptions.RunContinuationsAsynchronously);
    if (!_channel.Writer.TryWrite((tcs, clientPipeReader)))
    {
        // The request could not be written to the channel (the channel is bounded by the listen backlog and is
        // completed when the listener is disposed).
        serverPipeReaderTask = null;
        return false;
    }

    if (cancellationToken.CanBeCanceled)
    {
        // Cancel the pending request when the caller cancels. If the token is already canceled, Register runs
        // the callback synchronously.
        CancellationTokenRegistration registration =
            cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken));

        // Release the registration as soon as the request completes (accepted, refused or canceled); otherwise
        // a long-lived token would keep the callback and the tcs it captures alive.
        _ = tcs.Task.ContinueWith(
            completedTask => registration.Dispose(),
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);
    }

    serverPipeReaderTask = tcs.Task;
    return true;
}
| | | 162 | | } |