Summary

Information
Class: IceRpc.Protobuf.InvokerExtensions
Assembly: IceRpc.Protobuf
File(s): /home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Protobuf/InvokerExtensions.cs
Tag: 275_13775359185
Line coverage
86%
Covered lines: 102
Uncovered lines: 16
Coverable lines: 118
Total lines: 282
Line coverage: 86.4%
Branch coverage
84%
Covered branches: 27
Total branches: 32
Branch coverage: 84.3%
Method coverage
100%
Covered methods: 7
Total methods: 7
Method coverage: 100%

Metrics

| Method | Branch coverage | Crap Score | Cyclomatic complexity | Line coverage |
|---|---|---|---|---|
| .cctor() | 100% | 1 | 1 | 100% |
| InvokeUnaryAsync(...) | 87.5% | 8.68 | 8 | 78.94% |
| InvokeClientStreamingAsync(...) | 75% | 4.18 | 4 | 77.77% |
| InvokeServerStreamingAsync(...) | 75% | 8.68 | 8 | 78.94% |
| InvokeBidiStreamingAsync(...) | 75% | 4.18 | 4 | 77.77% |
| ReceiveResponseAsync() | 100% | 4 | 4 | 100% |
| ReceiveStreamingResponseAsync() | 100% | 4 | 4 | 100% |

File(s)

/home/runner/work/icerpc-csharp/icerpc-csharp/src/IceRpc.Protobuf/InvokerExtensions.cs

# | Line | Line coverage
 1// Copyright (c) ZeroC, Inc.
 2
 3using Google.Protobuf;
 4using IceRpc.Features;
 5using IceRpc.Protobuf.Internal;
 6using System.Collections.Immutable;
 7using System.IO.Pipelines;
 8
 9namespace IceRpc.Protobuf;
 10
 11/// <summary>Provides extension methods for <see cref="IInvoker" />.</summary>
 12public static class InvokerExtensions
 13{
 114    private static readonly IDictionary<RequestFieldKey, OutgoingFieldValue> _idempotentFields =
 115        new Dictionary<RequestFieldKey, OutgoingFieldValue>
 116        {
 117            [RequestFieldKey.Idempotent] = default
 118        }.ToImmutableDictionary();
 19
 20    /// <summary>Sends a request to a service and decodes the response. This method is for Protobuf unary RPCs.
 21    /// </summary>
 22    /// <typeparam name="TOutput">The type of the output message.</typeparam>
 23    /// <param name="invoker">The invoker used to send the request.</param>
 24    /// <param name="serviceAddress">The address of the target service.</param>
 25    /// <param name="operation">The name of the operation, as specified in Protobuf.</param>
 26    /// <param name="inputMessage">The input message to encode in the request payload.</param>
 27    /// <param name="messageParser">The <see cref="MessageParser{T}"/> used to decode the response payload.</param>
 28    /// <param name="encodeOptions">The options to customize the encoding of the request payload.</param>
 29    /// <param name="features">The invocation features.</param>
 30    /// <param name="idempotent">When <see langword="true" />, the request is idempotent.</param>
 31    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 32    /// <returns>The operation's return value.</returns>
 33    public static Task<TOutput> InvokeUnaryAsync<TOutput>(
 34        this IInvoker invoker,
 35        ServiceAddress serviceAddress,
 36        string operation,
 37        IMessage inputMessage,
 38        MessageParser<TOutput> messageParser,
 39        ProtobufEncodeOptions? encodeOptions = null,
 40        IFeatureCollection? features = null,
 41        bool idempotent = false,
 42        CancellationToken cancellationToken = default) where TOutput : IMessage<TOutput>
 1243    {
 1244        var request = new OutgoingRequest(serviceAddress)
 1245        {
 1246            Features = features ?? FeatureCollection.Empty,
 1247            Fields = idempotent ?
 1248                _idempotentFields : ImmutableDictionary<RequestFieldKey, OutgoingFieldValue>.Empty,
 1249            Operation = operation,
 1250            Payload = inputMessage.EncodeAsLengthPrefixedMessage(
 1251                encodeOptions?.PipeOptions ?? ProtobufEncodeOptions.Default.PipeOptions),
 1252        };
 53
 54        Task<IncomingResponse> responseTask;
 55        try
 1256        {
 1257            responseTask = invoker.InvokeAsync(request, cancellationToken);
 1258        }
 059        catch
 060        {
 061            request.Dispose();
 062            throw;
 63        }
 64
 65        // ReceiveResponseAsync is responsible for disposing the request
 1266        return ReceiveResponseAsync(messageParser, responseTask, request, cancellationToken);
 1267    }
 68
 69    /// <summary>Sends a request to a service and decodes the response. This method is for Protobuf client-streaming
 70    /// RPCs.</summary>
 71    /// <typeparam name="TInput">The type of the input message.</typeparam>
 72    /// <typeparam name="TOutput">The type of the output message.</typeparam>
 73    /// <param name="invoker">The invoker used to send the request.</param>
 74    /// <param name="serviceAddress">The address of the target service.</param>
 75    /// <param name="operation">The name of the operation, as specified in Protobuf.</param>
 76    /// <param name="stream">The stream of input messages to encode in the request payload continuation.</param>
 77    /// <param name="messageParser">The <see cref="MessageParser{T}"/> used to decode the response payload.</param>
 78    /// <param name="encodeOptions">The options to customize the encoding of the request payload continuation.</param>
 79    /// <param name="features">The invocation features.</param>
 80    /// <param name="idempotent">When <see langword="true" />, the request is idempotent.</param>
 81    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 82    /// <returns>The operation's return value.</returns>
 83    public static Task<TOutput> InvokeClientStreamingAsync<TInput, TOutput>(
 84        this IInvoker invoker,
 85        ServiceAddress serviceAddress,
 86        string operation,
 87        IAsyncEnumerable<TInput> stream,
 88        MessageParser<TOutput> messageParser,
 89        ProtobufEncodeOptions? encodeOptions = null,
 90        IFeatureCollection? features = null,
 91        bool idempotent = false,
 92        CancellationToken cancellationToken = default) where TInput : IMessage<TInput>
 93                                                       where TOutput : IMessage<TOutput>
 494    {
 495        var request = new OutgoingRequest(serviceAddress)
 496        {
 497            Features = features ?? FeatureCollection.Empty,
 498            Fields = idempotent ?
 499                _idempotentFields : ImmutableDictionary<RequestFieldKey, OutgoingFieldValue>.Empty,
 4100            Operation = operation,
 4101            PayloadContinuation = stream.ToPipeReader(encodeOptions),
 4102        };
 103
 104        Task<IncomingResponse> responseTask;
 105        try
 4106        {
 4107            responseTask = invoker.InvokeAsync(request, cancellationToken);
 4108        }
 0109        catch
 0110        {
 0111            request.Dispose();
 0112            throw;
 113        }
 114
 115        // ReceiveResponseAsync is responsible for disposing the request
 4116        return ReceiveResponseAsync(messageParser, responseTask, request, cancellationToken);
 4117    }
 118
 119    /// <summary>Sends a request to a service and decodes the response. This method is for Protobuf server-streaming
 120    /// RPCs.</summary>
 121    /// <typeparam name="TOutput">The type of the output message.</typeparam>
 122    /// <param name="invoker">The invoker used to send the request.</param>
 123    /// <param name="serviceAddress">The address of the target service.</param>
 124    /// <param name="operation">The name of the operation, as specified in Protobuf.</param>
 125    /// <param name="inputMessage">The input message to encode in the request payload.</param>
 126    /// <param name="messageParser">The <see cref="MessageParser{T}"/> used to decode the response payload.</param>
 127    /// <param name="encodeOptions">The options to customize the encoding of the request payload.</param>
 128    /// <param name="features">The invocation features.</param>
 129    /// <param name="idempotent">When <see langword="true" />, the request is idempotent.</param>
 130    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 131    /// <returns>The operation's return value.</returns>
 132    public static Task<IAsyncEnumerable<TOutput>> InvokeServerStreamingAsync<TOutput>(
 133        this IInvoker invoker,
 134        ServiceAddress serviceAddress,
 135        string operation,
 136        IMessage inputMessage,
 137        MessageParser<TOutput> messageParser,
 138        ProtobufEncodeOptions? encodeOptions = null,
 139        IFeatureCollection? features = null,
 140        bool idempotent = false,
 141        CancellationToken cancellationToken = default) where TOutput : IMessage<TOutput>
 5142    {
 5143        var request = new OutgoingRequest(serviceAddress)
 5144        {
 5145            Features = features ?? FeatureCollection.Empty,
 5146            Fields = idempotent ?
 5147                _idempotentFields : ImmutableDictionary<RequestFieldKey, OutgoingFieldValue>.Empty,
 5148            Operation = operation,
 5149            Payload = inputMessage.EncodeAsLengthPrefixedMessage(
 5150                encodeOptions?.PipeOptions ?? ProtobufEncodeOptions.Default.PipeOptions),
 5151        };
 152
 153        Task<IncomingResponse> responseTask;
 154        try
 5155        {
 5156            responseTask = invoker.InvokeAsync(request, cancellationToken);
 5157        }
 0158        catch
 0159        {
 0160            request.Dispose();
 0161            throw;
 162        }
 163
 164        // ReceiveStreamingResponseAsync is responsible for disposing the request
 5165        return ReceiveStreamingResponseAsync(messageParser, responseTask, request);
 5166    }
 167
 168    /// <summary>Sends a request to a service and decodes the response. This method is for Protobuf bidi-streaming
 169    /// RPCs.</summary>
 170    /// <typeparam name="TInput">The type of the input message.</typeparam>
 171    /// <typeparam name="TOutput">The type of the output message.</typeparam>
 172    /// <param name="invoker">The invoker used to send the request.</param>
 173    /// <param name="serviceAddress">The address of the target service.</param>
 174    /// <param name="operation">The name of the operation, as specified in Protobuf.</param>
 175    /// <param name="stream">The stream of input messages to encode in the request payload continuation.</param>
 176    /// <param name="messageParser">The <see cref="MessageParser{T}"/> used to decode the response payload.</param>
 177    /// <param name="encodeOptions">The options to customize the encoding of the request payload continuation.</param>
 178    /// <param name="features">The invocation features.</param>
 179    /// <param name="idempotent">When <see langword="true" />, the request is idempotent.</param>
 180    /// <param name="cancellationToken">A cancellation token that receives the cancellation requests.</param>
 181    /// <returns>The operation's return value.</returns>
 182    public static Task<IAsyncEnumerable<TOutput>> InvokeBidiStreamingAsync<TInput, TOutput>(
 183        this IInvoker invoker,
 184        ServiceAddress serviceAddress,
 185        string operation,
 186        IAsyncEnumerable<TInput> stream,
 187        MessageParser<TOutput> messageParser,
 188        ProtobufEncodeOptions? encodeOptions = null,
 189        IFeatureCollection? features = null,
 190        bool idempotent = false,
 191        CancellationToken cancellationToken = default) where TInput : IMessage<TInput>
 192                                                       where TOutput : IMessage<TOutput>
 193
 4194    {
 4195        var request = new OutgoingRequest(serviceAddress)
 4196        {
 4197            Features = features ?? FeatureCollection.Empty,
 4198            Fields = idempotent ?
 4199                _idempotentFields : ImmutableDictionary<RequestFieldKey, OutgoingFieldValue>.Empty,
 4200            Operation = operation,
 4201            PayloadContinuation = stream.ToPipeReader(encodeOptions),
 4202        };
 203
 204        Task<IncomingResponse> responseTask;
 205        try
 4206        {
 4207            responseTask = invoker.InvokeAsync(request, cancellationToken);
 4208        }
 0209        catch
 0210        {
 0211            request.Dispose();
 0212            throw;
 213        }
 214
 215        // ReceiveStreamingResponseAsync is responsible for disposing the request
 4216        return ReceiveStreamingResponseAsync(messageParser, responseTask, request);
 4217    }
 218
 219    private static async Task<TOutput> ReceiveResponseAsync<TOutput>(
 220        MessageParser<TOutput> messageParser,
 221        Task<IncomingResponse> responseTask,
 222        OutgoingRequest request,
 223        CancellationToken cancellationToken) where TOutput : IMessage<TOutput>
 16224    {
 225        try
 16226        {
 16227            IncomingResponse response = await responseTask.ConfigureAwait(false);
 16228            if (response.StatusCode == StatusCode.Ok)
 13229            {
 13230                IProtobufFeature protobufFeature = request.Features.Get<IProtobufFeature>() ?? ProtobufFeature.Default;
 13231                return await response.Payload.DecodeProtobufMessageAsync(
 13232                    messageParser,
 13233                    protobufFeature.MaxMessageLength,
 13234                    cancellationToken).ConfigureAwait(false);
 235            }
 236            else
 3237            {
 238                // IceRPC guarantees the error message is non-null when StatusCode > Ok.
 3239                throw new DispatchException(response.StatusCode, response.ErrorMessage!)
 3240                {
 3241                    ConvertToInternalError = true
 3242                };
 243            }
 244        }
 245        finally
 16246        {
 16247            request.Dispose();
 16248        }
 13249    }
 250
 251    private static async Task<IAsyncEnumerable<TOutput>> ReceiveStreamingResponseAsync<TOutput>(
 252        MessageParser<TOutput> messageParser,
 253        Task<IncomingResponse> responseTask,
 254        OutgoingRequest request) where TOutput : IMessage<TOutput>
 9255    {
 256        try
 9257        {
 9258            IncomingResponse response = await responseTask.ConfigureAwait(false);
 9259            if (response.StatusCode == StatusCode.Ok)
 6260            {
 6261                IProtobufFeature protobufFeature = request.Features.Get<IProtobufFeature>() ?? ProtobufFeature.Default;
 6262                PipeReader payload = response.DetachPayload();
 6263                return payload.ToAsyncEnumerable(
 6264                    messageParser,
 6265                    protobufFeature.MaxMessageLength,
 6266                    CancellationToken.None);
 267            }
 268            else
 3269            {
 270                // IceRPC guarantees the error message is non-null when StatusCode > Ok.
 3271                throw new DispatchException(response.StatusCode, response.ErrorMessage!)
 3272                {
 3273                    ConvertToInternalError = true
 3274                };
 275            }
 276        }
 277        finally
 9278        {
 9279            request.Dispose();
 9280        }
 6281    }
 282}