| | 1 | | // Copyright (c) ZeroC, Inc. |
| | 2 | |
|
| | 3 | | using System.Diagnostics; |
| | 4 | | using System.Diagnostics.CodeAnalysis; |
| | 5 | | using System.IO.Pipelines; |
| | 6 | | using System.Net; |
| | 7 | | using System.Threading.Channels; |
| | 8 | |
|
| | 9 | | namespace IceRpc.Transports.Coloc.Internal; |
| | 10 | |
|
/// <summary>The listener implementation for the colocated transport.</summary>
internal class ColocListener : IListener<IDuplexConnection>
{
    /// <summary>Gets the server address this listener was created with.</summary>
    public ServerAddress ServerAddress { get; }

    // Canceled by DisposeAsync. Its IsCancellationRequested flag doubles as the "disposed" marker checked by
    // AcceptAsync and DisposeAsync.
    private readonly CancellationTokenSource _disposeCts = new();

    // The network address returned with each accepted connection.
    private readonly EndPoint _networkAddress;

    // The options used to create the server-side pipe of each accepted connection.
    private readonly PipeOptions _pipeOptions;

    // The channel used by the client connection ConnectAsync method to queue a connection establishment request. A
    // client connection establishment request is represented by:
    // - a TaskCompletionSource which is completed by AcceptAsync when the connection is accepted. The server
    //   connection pipe reader is set as the result. ClientColocConnection.ConnectAsync waits on the task completion
    //   source task.
    // - the client connection pipe reader provided to the server connection when the server connection is created by
    //   AcceptAsync.
    private readonly Channel<(TaskCompletionSource<PipeReader>, PipeReader)> _channel;

    /// <summary>Accepts the next queued client connection establishment request.</summary>
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
    /// <returns>The server-side connection paired with the listener's network address.</returns>
    /// <exception cref="OperationCanceledException">Thrown if <paramref name="cancellationToken"/> is canceled.
    /// </exception>
    /// <exception cref="ObjectDisposedException">Thrown if the listener is disposed before or while accepting.
    /// </exception>
    public async Task<(IDuplexConnection, EndPoint)> AcceptAsync(CancellationToken cancellationToken)
    {
        ObjectDisposedException.ThrowIf(_disposeCts.IsCancellationRequested, this);

        // The read below is aborted by either the caller's token or the listener disposal.
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(_disposeCts.Token, cancellationToken);
        try
        {
            while (true)
            {
                (TaskCompletionSource<PipeReader> tcs, PipeReader clientPipeReader) =
                    await _channel.Reader.ReadAsync(cts.Token).ConfigureAwait(false);

                var serverPipe = new Pipe(_pipeOptions);
                if (tcs.TrySetResult(serverPipe.Reader))
                {
                    // The client now holds the server pipe reader; the server connection writes to the server pipe
                    // and reads from the client pipe.
                    var serverConnection = new ServerColocConnection(
                        ServerAddress,
                        serverPipe.Writer,
                        clientPipeReader);
                    return (serverConnection, _networkAddress);
                }
                else
                {
                    // The client connection establishment was canceled. Release the unused pipe and wait for the
                    // next request.
                    serverPipe.Writer.Complete();
                    serverPipe.Reader.Complete();
                }
            }
        }
        catch (OperationCanceledException)
        {
            // The caller's cancellation takes precedence over the listener disposal.
            cancellationToken.ThrowIfCancellationRequested();
            // The accept operation was canceled because the listener was disposed.
            Debug.Assert(_disposeCts.IsCancellationRequested);
            throw new ObjectDisposedException($"{typeof(ColocListener)}");
        }
    }

    /// <summary>Disposes the listener: cancels the pending <see cref="AcceptAsync"/> calls and refuses all the
    /// connection establishment requests that are still queued. Calling it again is a no-op.</summary>
    /// <returns>A completed value task.</returns>
    public ValueTask DisposeAsync()
    {
        if (_disposeCts.IsCancellationRequested)
        {
            // Dispose already called.
            return default;
        }

        // Cancel pending AcceptAsync.
        _disposeCts.Cancel();

        // Ensure no more client connection establishment request is queued.
        _channel.Writer.Complete();

        // Complete all the queued client connection establishment requests with IceRpcError.ConnectionRefused. Use
        // TrySetException in case the task has been already canceled.
        while (_channel.Reader.TryRead(out (TaskCompletionSource<PipeReader> Tcs, PipeReader) item))
        {
            item.Tcs.TrySetException(new IceRpcException(IceRpcError.ConnectionRefused));
        }

        _disposeCts.Dispose();

        return default;
    }

    /// <summary>Constructs a colocated listener.</summary>
    /// <param name="serverAddress">The server address of the listener.</param>
    /// <param name="colocTransportOptions">The transport options; provide the listen backlog and the pipe writer
    /// thresholds.</param>
    /// <param name="duplexConnectionOptions">The duplex connection options; provide the memory pool and the minimum
    /// segment size of the pipes.</param>
    internal ColocListener(
        ServerAddress serverAddress,
        ColocTransportOptions colocTransportOptions,
        DuplexConnectionOptions duplexConnectionOptions)
    {
        ServerAddress = serverAddress;

        _networkAddress = new ColocEndPoint(serverAddress);
        _pipeOptions = new PipeOptions(
            pool: duplexConnectionOptions.Pool,
            minimumSegmentSize: duplexConnectionOptions.MinSegmentSize,
            pauseWriterThreshold: colocTransportOptions.PauseWriterThreshold,
            resumeWriterThreshold: colocTransportOptions.ResumeWriterThreshold,
            useSynchronizationContext: false);

        // Create a bounded channel with a capacity that matches the listen backlog, and with
        // the default concurrency settings that allow multiple readers and writers.
        _channel = Channel.CreateBounded<(TaskCompletionSource<PipeReader>, PipeReader)>(
            new BoundedChannelOptions(colocTransportOptions.ListenBacklog));
    }

    /// <summary>Queues a connection establishment request from the client.</summary>
    /// <param name="clientPipeReader">A <see cref="PipeReader"/> for reading from the client connection.</param>
    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
    /// <param name="serverPipeReaderTask">A task that returns a <see cref="PipeReader"/> for reading from the server
    /// connection.</param>
    /// <returns><see langword="true"/> if the connection establishment request has been queued; otherwise,
    /// <see langword="false"/>.</returns>
    internal bool TryQueueConnect(
        PipeReader clientPipeReader,
        CancellationToken cancellationToken,
        [NotNullWhen(true)] out Task<PipeReader>? serverPipeReaderTask)
    {
        // Create a tcs that is completed by AcceptAsync when it accepts the corresponding connection, at which point
        // the client side connect operation will complete.
        // We use RunContinuationsAsynchronously to keep the ConnectAsync continuation from running inside the
        // AcceptAsync loop that completes this tcs.
        var tcs = new TaskCompletionSource<PipeReader>(TaskCreationOptions.RunContinuationsAsynchronously);
        if (!_channel.Writer.TryWrite((tcs, clientPipeReader)))
        {
            // The channel is full or has been completed by DisposeAsync.
            serverPipeReaderTask = null;
            return false;
        }

        if (cancellationToken.CanBeCanceled)
        {
            CancellationTokenRegistration registration =
                cancellationToken.Register(() => tcs.TrySetCanceled(cancellationToken));

            // Remove the callback once the request is accepted, refused or canceled. Otherwise a long-lived
            // cancellation token keeps the tcs (and its pipe reader) reachable for as long as the token source
            // lives. Unregister does not wait for a running callback, so this continuation never blocks.
            _ = tcs.Task.ContinueWith(
                _ => { registration.Unregister(); },
                CancellationToken.None,
                TaskContinuationOptions.ExecuteSynchronously,
                TaskScheduler.Default);
        }

        serverPipeReaderTask = tcs.Task;
        return true;
    }
}