| | | 1 | | // Copyright (c) ZeroC, Inc. |
| | | 2 | | |
| | | 3 | | using System.Diagnostics; |
| | | 4 | | using System.Diagnostics.CodeAnalysis; |
| | | 5 | | using System.IO.Pipelines; |
| | | 6 | | using System.Net; |
| | | 7 | | using System.Threading.Channels; |
| | | 8 | | |
| | | 9 | | namespace IceRpc.Transports.Coloc.Internal; |
| | | 10 | | |
| | | 11 | | /// <summary>The listener implementation for the colocated transport.</summary> |
| | | 12 | | internal class ColocListener : IListener<IDuplexConnection> |
| | | 13 | | { |
| | 962 | 14 | | public TransportAddress TransportAddress { get; } |
| | | 15 | | |
| | 516 | 16 | | private readonly CancellationTokenSource _disposeCts = new(); |
| | | 17 | | private readonly Action<ColocListener> _onDispose; |
| | | 18 | | private readonly EndPoint _networkAddress; |
| | | 19 | | private readonly PipeOptions _pipeOptions; |
| | | 20 | | |
| | | 21 | | // The channel used by the client connection ConnectAsync method to queue a connection establishment request. A |
| | | 22 | | // client connection establishment request is represented by: |
| | | 23 | | // - a TaskCompletionSource which is completed by AcceptAsync when the connection is accepted. The server connection |
| | | 24 | | // pipe reader is set as the result. ClientColocConnection.ConnectAsync waits on the task completion source task. |
| | | 25 | | // - the client connection pipe reader provided to the server connection when the server connection is created by |
| | | 26 | | // AcceptAsync. |
| | | 27 | | private readonly Channel<(TaskCompletionSource<PipeReader>, PipeReader)> _channel; |
| | | 28 | | |
| | | 29 | | public async Task<(IDuplexConnection, EndPoint)> AcceptAsync(CancellationToken cancellationToken) |
| | 547 | 30 | | { |
| | 547 | 31 | | ObjectDisposedException.ThrowIf(_disposeCts.IsCancellationRequested, this); |
| | | 32 | | |
| | 545 | 33 | | using var cts = CancellationTokenSource.CreateLinkedTokenSource(_disposeCts.Token, cancellationToken); |
| | | 34 | | try |
| | 545 | 35 | | { |
| | 545 | 36 | | while (true) |
| | 545 | 37 | | { |
| | 545 | 38 | | (TaskCompletionSource<PipeReader> tcs, PipeReader clientPipeReader) = |
| | 545 | 39 | | await _channel.Reader.ReadAsync(cts.Token).ConfigureAwait(false); |
| | | 40 | | |
| | 468 | 41 | | var serverPipe = new Pipe(_pipeOptions); |
| | 468 | 42 | | if (tcs.TrySetResult(serverPipe.Reader)) |
| | 468 | 43 | | { |
| | 468 | 44 | | var serverConnection = new ServerColocConnection( |
| | 468 | 45 | | TransportAddress, |
| | 468 | 46 | | serverPipe.Writer, |
| | 468 | 47 | | clientPipeReader); |
| | 468 | 48 | | return (serverConnection, _networkAddress); |
| | | 49 | | } |
| | | 50 | | else |
| | 0 | 51 | | { |
| | | 52 | | // The client connection establishment was canceled. |
| | 0 | 53 | | serverPipe.Writer.Complete(); |
| | 0 | 54 | | serverPipe.Reader.Complete(); |
| | 0 | 55 | | } |
| | 0 | 56 | | } |
| | | 57 | | } |
| | 77 | 58 | | catch (OperationCanceledException) |
| | 77 | 59 | | { |
| | 77 | 60 | | cancellationToken.ThrowIfCancellationRequested(); |
| | | 61 | | // The accept operation was canceled because the listener was disposed. |
| | 11 | 62 | | Debug.Assert(_disposeCts.IsCancellationRequested); |
| | 11 | 63 | | throw new ObjectDisposedException($"{typeof(ColocListener)}"); |
| | | 64 | | } |
| | 468 | 65 | | } |
| | | 66 | | |
| | | 67 | | public ValueTask DisposeAsync() |
| | 919 | 68 | | { |
| | 919 | 69 | | if (_disposeCts.IsCancellationRequested) |
| | 405 | 70 | | { |
| | | 71 | | // Dispose already called. |
| | 405 | 72 | | return default; |
| | | 73 | | } |
| | | 74 | | |
| | | 75 | | // Notify the owner (e.g. the server transport) so it can release its reference to this listener. |
| | 514 | 76 | | _onDispose(this); |
| | | 77 | | |
| | | 78 | | // Cancel pending AcceptAsync. |
| | 514 | 79 | | _disposeCts.Cancel(); |
| | | 80 | | |
| | | 81 | | // Ensure no more client connection establishment request is queued. |
| | 514 | 82 | | _channel.Writer.Complete(); |
| | | 83 | | |
| | | 84 | | // Complete all the queued client connection establishment requests with IceRpcError.ConnectionRefused. Use |
| | | 85 | | // TrySetException in case the task has been already canceled. |
| | 533 | 86 | | while (_channel.Reader.TryRead(out (TaskCompletionSource<PipeReader> Tcs, PipeReader) item)) |
| | 19 | 87 | | { |
| | 19 | 88 | | item.Tcs.TrySetException(new IceRpcException(IceRpcError.ConnectionRefused)); |
| | 19 | 89 | | } |
| | | 90 | | |
| | 514 | 91 | | _disposeCts.Dispose(); |
| | | 92 | | |
| | 514 | 93 | | return default; |
| | 919 | 94 | | } |
| | | 95 | | |
| | 516 | 96 | | internal ColocListener( |
| | 516 | 97 | | TransportAddress transportAddress, |
| | 516 | 98 | | Action<ColocListener> onDispose, |
| | 516 | 99 | | ColocTransportOptions colocTransportOptions, |
| | 516 | 100 | | DuplexConnectionOptions duplexConnectionOptions) |
| | 516 | 101 | | { |
| | 516 | 102 | | TransportAddress = transportAddress; |
| | | 103 | | |
| | 516 | 104 | | _onDispose = onDispose; |
| | 516 | 105 | | _networkAddress = new ColocEndPoint(transportAddress); |
| | 516 | 106 | | _pipeOptions = new PipeOptions( |
| | 516 | 107 | | pool: duplexConnectionOptions.Pool, |
| | 516 | 108 | | minimumSegmentSize: duplexConnectionOptions.MinSegmentSize, |
| | 516 | 109 | | pauseWriterThreshold: colocTransportOptions.PauseWriterThreshold, |
| | 516 | 110 | | resumeWriterThreshold: colocTransportOptions.ResumeWriterThreshold, |
| | 516 | 111 | | useSynchronizationContext: false); |
| | | 112 | | |
| | | 113 | | // Create a bounded channel with a capacity that matches the listen backlog, and with |
| | | 114 | | // the default concurrency settings that allow multiple reader and writers. |
| | 516 | 115 | | _channel = Channel.CreateBounded<(TaskCompletionSource<PipeReader>, PipeReader)>( |
| | 516 | 116 | | new BoundedChannelOptions(colocTransportOptions.ListenBacklog)); |
| | 516 | 117 | | } |
| | | 118 | | |
| | | 119 | | /// <summary>Queue client connection establishment requests from the client.</summary> |
| | | 120 | | /// <param name="clientPipeReader">A <see cref="PipeReader"/> for reading from the client connection.</param> |
| | | 121 | | /// <param name="cancellationToken">>A cancellation token that receives the cancellation requests.</param> |
| | | 122 | | /// <param name="serverPipeReaderTask">A task that returns a <see cref="PipeReader"/> for reading from the server |
| | | 123 | | /// connection.</param> |
| | | 124 | | /// <returns>Returns true if the connection establishment request has been queue otherwise, false.</returns> |
| | | 125 | | internal bool TryQueueConnect( |
| | | 126 | | PipeReader clientPipeReader, |
| | | 127 | | CancellationToken cancellationToken, |
| | | 128 | | [NotNullWhen(true)] out Task<PipeReader>? serverPipeReaderTask) |
| | 490 | 129 | | { |
| | | 130 | | // Create a tcs that is completed by AcceptAsync when accepts the corresponding connection, at which point |
| | | 131 | | // the client side connect operation will complete. |
| | | 132 | | // We use RunContinuationsAsynchronously to avoid the ConnectAsync continuation end up running in the AcceptAsyn |
| | | 133 | | // loop that completes this tcs. |
| | 490 | 134 | | var tcs = new TaskCompletionSource<PipeReader>(TaskCreationOptions.RunContinuationsAsynchronously); |
| | 490 | 135 | | if (_channel.Writer.TryWrite((tcs, clientPipeReader))) |
| | 487 | 136 | | { |
| | 523 | 137 | | cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken)); |
| | 487 | 138 | | serverPipeReaderTask = tcs.Task; |
| | 487 | 139 | | return true; |
| | | 140 | | } |
| | | 141 | | else |
| | 3 | 142 | | { |
| | 3 | 143 | | serverPipeReaderTask = null; |
| | 3 | 144 | | return false; |
| | | 145 | | } |
| | 490 | 146 | | } |
| | | 147 | | } |