< Summary

Information
Class: IceRpc.Protobuf.RpcMethods.InvokerExtensions
Assembly: IceRpc.Protobuf
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Protobuf/RpcMethods/InvokerExtensions.cs
Tag: 1321_24790053727
Line coverage
86%
Covered lines: 102
Uncovered lines: 16
Coverable lines: 118
Total lines: 281
Line coverage: 86.4%
Branch coverage
84%
Covered branches: 27
Total branches: 32
Branch coverage: 84.3%
Method coverage
100%
Covered methods: 7
Fully covered methods: 3
Total methods: 7
Method coverage: 100%
Full method coverage: 42.8%

Metrics

MethodBranch coverage Crap Score Cyclomatic complexity Line coverage
.cctor()100%11100%
InvokeUnaryAsync(...)87.5%9878.94%
InvokeClientStreamingAsync(...)75%4477.77%
InvokeServerStreamingAsync(...)75%9878.94%
InvokeBidiStreamingAsync(...)75%4477.77%
ReceiveResponseAsync()100%44100%
ReceiveStreamingResponseAsync()100%44100%

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Protobuf/RpcMethods/InvokerExtensions.cs

#LineLine coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using Google.Protobuf;
 4using IceRpc.Features;
 5using IceRpc.Protobuf.RpcMethods.Internal;
 6using System.Collections.Immutable;
 7using System.IO.Pipelines;
 8
 9namespace IceRpc.Protobuf.RpcMethods;
 10
 11/// <summary>Provides extension methods for <see cref="IInvoker" />.</summary>
 12public static class InvokerExtensions
 13{
 114    private static readonly IDictionary<RequestFieldKey, OutgoingFieldValue> _idempotentFields =
 115        new Dictionary<RequestFieldKey, OutgoingFieldValue>
 116        {
 117            [RequestFieldKey.Idempotent] = default
 118        }.ToImmutableDictionary();
 19
 20    /// <summary>Sends a request to a service and decodes the response. This method is for Protobuf unary RPCs.
 21    /// </summary>
 22    /// <typeparam name="TOutput">The type of the output message.</typeparam>
 23    /// <param name="invoker">The invoker used to send the request.</param>
 24    /// <param name="serviceAddress">The address of the target service.</param>
 25    /// <param name="operation">The name of the operation, as specified in Protobuf.</param>
 26    /// <param name="inputMessage">The input message to encode in the request payload.</param>
 27    /// <param name="messageParser">The <see cref="MessageParser{T}"/> used to decode the response payload.</param>
 28    /// <param name="encodeOptions">The options to customize the encoding of the request payload.</param>
 29    /// <param name="features">The invocation features.</param>
 30    /// <param name="idempotent">When <see langword="true" />, the request is idempotent.</param>
 31    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 32    /// <returns>The operation's return value.</returns>
 33    public static Task<TOutput> InvokeUnaryAsync<TOutput>(
 34        this IInvoker invoker,
 35        ServiceAddress serviceAddress,
 36        string operation,
 37        IMessage inputMessage,
 38        MessageParser<TOutput> messageParser,
 39        ProtobufEncodeOptions? encodeOptions = null,
 40        IFeatureCollection? features = null,
 41        bool idempotent = false,
 42        CancellationToken cancellationToken = default) where TOutput : IMessage<TOutput>
 1243    {
 1244        var request = new OutgoingRequest(serviceAddress)
 1245        {
 1246            Features = features ?? FeatureCollection.Empty,
 1247            Fields = idempotent ?
 1248                _idempotentFields : ImmutableDictionary<RequestFieldKey, OutgoingFieldValue>.Empty,
 1249            Operation = operation,
 1250            Payload = inputMessage.EncodeAsLengthPrefixedMessage(
 1251                encodeOptions?.PipeOptions ?? ProtobufEncodeOptions.Default.PipeOptions),
 1252        };
 53
 54        Task<IncomingResponse> responseTask;
 55        try
 1256        {
 1257            responseTask = invoker.InvokeAsync(request, cancellationToken);
 1258        }
 059        catch
 060        {
 061            request.Dispose();
 062            throw;
 63        }
 64
 65        // ReceiveResponseAsync is responsible for disposing the request
 1266        return ReceiveResponseAsync(messageParser, responseTask, request, cancellationToken);
 1267    }
 68
 69    /// <summary>Sends a request to a service and decodes the response. This method is for Protobuf client-streaming
 70    /// RPCs.</summary>
 71    /// <typeparam name="TInput">The type of the input message.</typeparam>
 72    /// <typeparam name="TOutput">The type of the output message.</typeparam>
 73    /// <param name="invoker">The invoker used to send the request.</param>
 74    /// <param name="serviceAddress">The address of the target service.</param>
 75    /// <param name="operation">The name of the operation, as specified in Protobuf.</param>
 76    /// <param name="stream">The stream of input message to encode in the request payload continuation.</param>
 77    /// <param name="messageParser">The <see cref="MessageParser{T}"/> used to decode the response payload.</param>
 78    /// <param name="encodeOptions">The options to customize the encoding of the request payload continuation.</param>
 79    /// <param name="features">The invocation features.</param>
 80    /// <param name="idempotent">When <see langword="true" />, the request is idempotent.</param>
 81    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 82    /// <returns>The operation's return value.</returns>
 83    public static Task<TOutput> InvokeClientStreamingAsync<TInput, TOutput>(
 84        this IInvoker invoker,
 85        ServiceAddress serviceAddress,
 86        string operation,
 87        IAsyncEnumerable<TInput> stream,
 88        MessageParser<TOutput> messageParser,
 89        ProtobufEncodeOptions? encodeOptions = null,
 90        IFeatureCollection? features = null,
 91        bool idempotent = false,
 92        CancellationToken cancellationToken = default) where TInput : IMessage<TInput>
 93                                                       where TOutput : IMessage<TOutput>
 494    {
 495        var request = new OutgoingRequest(serviceAddress)
 496        {
 497            Features = features ?? FeatureCollection.Empty,
 498            Fields = idempotent ?
 499                _idempotentFields : ImmutableDictionary<RequestFieldKey, OutgoingFieldValue>.Empty,
 4100            Operation = operation,
 4101            PayloadContinuation = stream.ToPipeReader(encodeOptions),
 4102        };
 103
 104        Task<IncomingResponse> responseTask;
 105        try
 4106        {
 4107            responseTask = invoker.InvokeAsync(request, cancellationToken);
 4108        }
 0109        catch
 0110        {
 0111            request.Dispose();
 0112            throw;
 113        }
 114
 115        // ReceiveResponseAsync is responsible for disposing the request
 4116        return ReceiveResponseAsync(messageParser, responseTask, request, cancellationToken);
 4117    }
 118
 119    /// <summary>Sends a request to a service and decodes the response. This method is for Protobuf server-streaming
 120    /// RPCs.</summary>
 121    /// <typeparam name="TOutput">The type of the output message.</typeparam>
 122    /// <param name="invoker">The invoker used to send the request.</param>
 123    /// <param name="serviceAddress">The address of the target service.</param>
 124    /// <param name="operation">The name of the operation, as specified in Protobuf.</param>
 125    /// <param name="inputMessage">The input message to encode in the request payload.</param>
 126    /// <param name="messageParser">The <see cref="MessageParser{T}"/> used to decode the response payload.</param>
 127    /// <param name="encodeOptions">The options to customize the encoding of the request payload.</param>
 128    /// <param name="features">The invocation features.</param>
 129    /// <param name="idempotent">When <see langword="true" />, the request is idempotent.</param>
 130    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 131    /// <returns>The operation's return value.</returns>
 132    public static Task<IAsyncEnumerable<TOutput>> InvokeServerStreamingAsync<TOutput>(
 133        this IInvoker invoker,
 134        ServiceAddress serviceAddress,
 135        string operation,
 136        IMessage inputMessage,
 137        MessageParser<TOutput> messageParser,
 138        ProtobufEncodeOptions? encodeOptions = null,
 139        IFeatureCollection? features = null,
 140        bool idempotent = false,
 141        CancellationToken cancellationToken = default) where TOutput : IMessage<TOutput>
 5142    {
 5143        var request = new OutgoingRequest(serviceAddress)
 5144        {
 5145            Features = features ?? FeatureCollection.Empty,
 5146            Fields = idempotent ?
 5147                _idempotentFields : ImmutableDictionary<RequestFieldKey, OutgoingFieldValue>.Empty,
 5148            Operation = operation,
 5149            Payload = inputMessage.EncodeAsLengthPrefixedMessage(
 5150                encodeOptions?.PipeOptions ?? ProtobufEncodeOptions.Default.PipeOptions),
 5151        };
 152
 153        Task<IncomingResponse> responseTask;
 154        try
 5155        {
 5156            responseTask = invoker.InvokeAsync(request, cancellationToken);
 5157        }
 0158        catch
 0159        {
 0160            request.Dispose();
 0161            throw;
 162        }
 163
 164        // ReceiveStreamingResponseAsync is responsible for disposing the request
 5165        return ReceiveStreamingResponseAsync(messageParser, responseTask, request);
 5166    }
 167
 168    /// <summary>Sends a request to a service and decodes the response. This method is for Protobuf bidi-streaming
 169    /// RPCs.</summary>
 170    /// <typeparam name="TInput">The type of the input message.</typeparam>
 171    /// <typeparam name="TOutput">The type of the output message.</typeparam>
 172    /// <param name="invoker">The invoker used to send the request.</param>
 173    /// <param name="serviceAddress">The address of the target service.</param>
 174    /// <param name="operation">The name of the operation, as specified in Protobuf.</param>
 175    /// <param name="stream">The stream of input message to encode in the request payload continuation.</param>
 176    /// <param name="messageParser">The <see cref="MessageParser{T}"/> used to decode the response payload.</param>
 177    /// <param name="encodeOptions">The options to customize the encoding of the request payload continuation.</param>//
 178    /// <param name="features">The invocation features.</param>
 179    /// <param name="idempotent">When <see langword="true" />, the request is idempotent.</param>
 180    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 181    /// <returns>The operation's return value.</returns>
 182    public static Task<IAsyncEnumerable<TOutput>> InvokeBidiStreamingAsync<TInput, TOutput>(
 183        this IInvoker invoker,
 184        ServiceAddress serviceAddress,
 185        string operation,
 186        IAsyncEnumerable<TInput> stream,
 187        MessageParser<TOutput> messageParser,
 188        ProtobufEncodeOptions? encodeOptions = null,
 189        IFeatureCollection? features = null,
 190        bool idempotent = false,
 191        CancellationToken cancellationToken = default) where TInput : IMessage<TInput>
 192                                                       where TOutput : IMessage<TOutput>
 4193    {
 4194        var request = new OutgoingRequest(serviceAddress)
 4195        {
 4196            Features = features ?? FeatureCollection.Empty,
 4197            Fields = idempotent ?
 4198                _idempotentFields : ImmutableDictionary<RequestFieldKey, OutgoingFieldValue>.Empty,
 4199            Operation = operation,
 4200            PayloadContinuation = stream.ToPipeReader(encodeOptions),
 4201        };
 202
 203        Task<IncomingResponse> responseTask;
 204        try
 4205        {
 4206            responseTask = invoker.InvokeAsync(request, cancellationToken);
 4207        }
 0208        catch
 0209        {
 0210            request.Dispose();
 0211            throw;
 212        }
 213
 214        // ReceiveStreamingResponseAsync is responsible for disposing the request
 4215        return ReceiveStreamingResponseAsync(messageParser, responseTask, request);
 4216    }
 217
 218    private static async Task<TOutput> ReceiveResponseAsync<TOutput>(
 219        MessageParser<TOutput> messageParser,
 220        Task<IncomingResponse> responseTask,
 221        OutgoingRequest request,
 222        CancellationToken cancellationToken) where TOutput : IMessage<TOutput>
 16223    {
 224        try
 16225        {
 16226            IncomingResponse response = await responseTask.ConfigureAwait(false);
 16227            if (response.StatusCode == StatusCode.Ok)
 13228            {
 13229                IProtobufFeature protobufFeature = request.Features.Get<IProtobufFeature>() ?? ProtobufFeature.Default;
 13230                return await response.Payload.DecodeProtobufMessageAsync(
 13231                    messageParser,
 13232                    protobufFeature.MaxMessageLength,
 13233                    cancellationToken).ConfigureAwait(false);
 234            }
 235            else
 3236            {
 237                // IceRPC guarantees the error message is non-null when StatusCode > Ok.
 3238                throw new DispatchException(response.StatusCode, response.ErrorMessage!)
 3239                {
 3240                    ConvertToInternalError = true
 3241                };
 242            }
 243        }
 244        finally
 16245        {
 16246            request.Dispose();
 16247        }
 13248    }
 249
 250    private static async Task<IAsyncEnumerable<TOutput>> ReceiveStreamingResponseAsync<TOutput>(
 251        MessageParser<TOutput> messageParser,
 252        Task<IncomingResponse> responseTask,
 253        OutgoingRequest request) where TOutput : IMessage<TOutput>
 9254    {
 255        try
 9256        {
 9257            IncomingResponse response = await responseTask.ConfigureAwait(false);
 9258            if (response.StatusCode == StatusCode.Ok)
 6259            {
 6260                IProtobufFeature protobufFeature = request.Features.Get<IProtobufFeature>() ?? ProtobufFeature.Default;
 6261                PipeReader payload = response.DetachPayload();
 6262                return payload.ToAsyncEnumerable(
 6263                    messageParser,
 6264                    protobufFeature.MaxMessageLength,
 6265                    CancellationToken.None);
 266            }
 267            else
 3268            {
 269                // IceRPC guarantees the error message is non-null when StatusCode > Ok.
 3270                throw new DispatchException(response.StatusCode, response.ErrorMessage!)
 3271                {
 3272                    ConvertToInternalError = true
 3273                };
 274            }
 275        }
 276        finally
 9277        {
 9278            request.Dispose();
 9279        }
 6280    }
 281}