编辑
2025-02-14
C# 应用
00
请注意,本文编写于 81 天前,最后修改于 81 天前,其中某些信息可能已经过时。

目录

简介
环境准备
示例实现
示例1:基础发布/订阅
示例2:请求/响应模式
示例3:状态机(Saga)
总结

简介

MassTransit是一个用于.NET应用程序的开源消息总线框架,支持多种消息传输模式,包括发布/订阅、请求/响应和Saga模式。它与RabbitMQ、Azure Service Bus等消息代理集成良好,提供了可靠的消息传递和处理能力。本文将通过几个完整的C#控制台应用示例,展示如何使用MassTransit。

环境准备

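在开始之前,请确保已安装.NET SDK,并且本机已有一个正在运行的RabbitMQ实例(示例代码使用 localhost、虚拟主机 "/" 以及 guest/guest 账号进行连接)。然后,创建一个新的控制台项目,并添加MassTransit、MassTransit.RabbitMQ和Microsoft.Extensions.Hosting这三个NuGet包: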

Bash
# Run these inside the console project directory.
# Each package is added with its own `dotnet add package` invocation.
dotnet add package MassTransit
dotnet add package MassTransit.RabbitMQ
dotnet add package Microsoft.Extensions.Hosting

示例实现

示例1:基础发布/订阅

在这个示例中,我们将实现一个简单的发布/订阅模式,模拟订单创建的消息传递。

C#
using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace AppMassTransit
{
    /// <summary>
    /// Message published whenever the user asks the sample to create an order.
    /// </summary>
    public record OrderCreated
    {
        public Guid OrderId { get; init; }
        public string CustomerName { get; init; }
        public decimal Amount { get; init; }
    }

    /// <summary>
    /// Consumes <see cref="OrderCreated"/> messages and prints their content to the console.
    /// </summary>
    public class OrderCreatedConsumer : IConsumer<OrderCreated>
    {
        /// <summary>Writes the order id, customer name and amount of the received message.</summary>
        /// <param name="context">Consume context carrying the <see cref="OrderCreated"/> message.</param>
        /// <returns>An already completed task; no asynchronous work is performed.</returns>
        public Task Consume(ConsumeContext<OrderCreated> context)
        {
            var order = context.Message;

            Console.WriteLine($"Order received: {order.OrderId}");
            Console.WriteLine($"Customer: {order.CustomerName}");
            // "C" formats the amount as currency using the current culture.
            Console.WriteLine($"Amount: {order.Amount:C}");

            return Task.CompletedTask;
        }
    }

    /// <summary>
    /// Console entry point: starts a MassTransit bus over RabbitMQ and publishes
    /// <see cref="OrderCreated"/> messages on demand.
    /// </summary>
    public class Program
    {
        /// <summary>
        /// Builds and starts the host, then loops reading keys: 'P' publishes a message, 'Q' quits.
        /// </summary>
        public static async Task Main()
        {
            var builder = Host.CreateDefaultBuilder()
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddMassTransit(x =>
                    {
                        x.AddConsumer<OrderCreatedConsumer>();

                        x.UsingRabbitMq((context, cfg) =>
                        {
                            cfg.Host("localhost", "/", h =>
                            {
                                h.Username("guest");
                                h.Password("guest");
                            });

                            cfg.ConfigureEndpoints(context);
                        });
                    });
                });

            using var host = builder.Build();
            await host.StartAsync();

            try
            {
                // IPublishEndpoint is registered as a scoped service, so resolving it from the
                // root provider fails when scope validation is enabled. IBus is the singleton
                // intended for publishing outside of a consumer or request scope.
                var bus = host.Services.GetRequiredService<IBus>();

                while (true)
                {
                    Console.WriteLine("\nPress 'P' to publish a message, or 'Q' to quit.");
                    var key = Console.ReadKey();
                    Console.WriteLine();

                    if (key.Key == ConsoleKey.Q)
                    {
                        break;
                    }

                    if (key.Key == ConsoleKey.P)
                    {
                        await bus.Publish<OrderCreated>(new
                        {
                            OrderId = Guid.NewGuid(),
                            CustomerName = "Rick",
                            Amount = 99.99m
                        });

                        Console.WriteLine("Message published successfully!");
                    }
                }
            }
            finally
            {
                // Stop the bus even when publishing throws, so the broker connection is closed.
                await host.StopAsync();
            }
        }
    }
}

image.png

示例2:请求/响应模式

在这个示例中,我们将实现一个请求/响应模式,用于检查库存。

C#
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace AppMassTransit
{
    /// <summary>Request asking whether a quantity of a product is in stock.</summary>
    public record CheckInventory
    {
        public string ProductId { get; init; }
        public int RequestedQuantity { get; init; }
    }

    /// <summary>Response to <see cref="CheckInventory"/>.</summary>
    public record InventoryStatus
    {
        public string ProductId { get; init; }
        public bool IsAvailable { get; init; }
        public int AvailableQuantity { get; init; }
    }

    /// <summary>
    /// Answers <see cref="CheckInventory"/> requests from a fixed, hard-coded stock table.
    /// </summary>
    public class InventoryCheckConsumer : IConsumer<CheckInventory>
    {
        // Shared and never mutated, so one table serves every consumer instance.
        private static readonly Dictionary<string, int> s_inventory = new()
        {
            { "PROD-1", 100 },
            { "PROD-2", 50 },
            { "PROD-3", 0 }
        };

        /// <summary>
        /// Looks up the requested product and responds with an <see cref="InventoryStatus"/>.
        /// Unknown (or missing) product ids are reported with an available quantity of 0.
        /// </summary>
        /// <param name="context">Consume context carrying the request.</param>
        /// <returns>The task returned by <c>RespondAsync</c>.</returns>
        public Task Consume(ConsumeContext<CheckInventory> context)
        {
            var message = context.Message;

            // A null key would make the dictionary lookup throw; treat it like an unknown product.
            var availableQuantity = message.ProductId is null
                ? 0
                : s_inventory.GetValueOrDefault(message.ProductId, 0);

            return context.RespondAsync<InventoryStatus>(new
            {
                ProductId = message.ProductId,
                IsAvailable = availableQuantity >= message.RequestedQuantity,
                AvailableQuantity = availableQuantity
            });
        }
    }

    /// <summary>
    /// Console entry point: starts a MassTransit bus over RabbitMQ and sends
    /// <see cref="CheckInventory"/> requests for product ids typed by the user.
    /// </summary>
    public class Program
    {
        /// <summary>
        /// Builds and starts the host, then loops reading product ids until the user
        /// types "quit" or the input stream ends.
        /// </summary>
        public static async Task Main()
        {
            var builder = Host.CreateDefaultBuilder()
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddMassTransit(x =>
                    {
                        x.AddConsumer<InventoryCheckConsumer>();

                        x.UsingRabbitMq((context, cfg) =>
                        {
                            cfg.Host("localhost", "/", h =>
                            {
                                h.Username("guest");
                                h.Password("guest");
                            });

                            cfg.ConfigureEndpoints(context);
                        });
                    });
                });

            using var host = builder.Build();
            await host.StartAsync();

            try
            {
                // IRequestClient<T> is a scoped service; resolve it from an explicit scope
                // rather than from the root provider, which fails under scope validation.
                await using (var scope = host.Services.CreateAsyncScope())
                {
                    var requestClient = scope.ServiceProvider
                        .GetRequiredService<IRequestClient<CheckInventory>>();

                    while (true)
                    {
                        Console.WriteLine("\nEnter product ID to check (or 'quit' to exit):");
                        var input = Console.ReadLine();

                        // ReadLine returns null at end of input; without this check the loop
                        // would spin forever sending requests with a null product id.
                        if (input is null || string.Equals(input, "quit", StringComparison.OrdinalIgnoreCase))
                        {
                            break;
                        }

                        try
                        {
                            var response = await requestClient.GetResponse<InventoryStatus>(new
                            {
                                ProductId = input,
                                RequestedQuantity = 10
                            });

                            var status = response.Message;
                            Console.WriteLine($"Product: {status.ProductId}");
                            Console.WriteLine($"Available: {status.IsAvailable}");
                            Console.WriteLine($"Quantity: {status.AvailableQuantity}");
                        }
                        catch (Exception ex)
                        {
                            // Keep the interactive loop alive whatever the request failure was
                            // (presumably a timeout or a consumer fault — see MassTransit docs).
                            Console.WriteLine($"Error: {ex.Message}");
                        }
                    }
                }
            }
            finally
            {
                await host.StopAsync();
            }
        }
    }
}

image.png

示例3:状态机(Saga)

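在这个示例中,我们将使用状态机来管理订单的生命周期:订单依次经历 Submitted(已提交)→ Paid(已支付)→ Completed(已完成)三个状态。Saga 实例保存在内存仓储(InMemoryRepository)中,因此程序退出后状态不会保留。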

C#
using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;

namespace AppMassTransit
{
    /// <summary>
    /// Saga instance holding the data tracked for one order.
    /// <see cref="CorrelationId"/> is correlated with the <c>OrderId</c> of the messages.
    /// </summary>
    public class OrderState : SagaStateMachineInstance
    {
        public Guid CorrelationId { get; set; }
        public string CurrentState { get; set; }
        public string CustomerName { get; set; }
        public decimal OrderTotal { get; set; }
        public DateTime? SubmitDate { get; set; }
        public DateTime? PaymentDate { get; set; }
        public DateTime? CompletedDate { get; set; }
    }

    /// <summary>Raised when an order is submitted; starts a new saga instance.</summary>
    public record OrderSubmitted
    {
        public Guid OrderId { get; init; }
        public string CustomerName { get; init; }
        public decimal OrderTotal { get; init; }
    }

    /// <summary>Raised when the payment of an order has been completed.</summary>
    public record PaymentCompleted
    {
        public Guid OrderId { get; init; }
    }

    /// <summary>Raised when an order has been completed.</summary>
    public record OrderCompleted
    {
        public Guid OrderId { get; init; }
    }

    /// <summary>
    /// State machine driving an order through Submitted, Paid and Completed.
    /// Each transition records a UTC timestamp on the saga and logs to the console.
    /// </summary>
    public class OrderStateMachine : MassTransitStateMachine<OrderState>
    {
        public OrderStateMachine()
        {
            // The current state is stored on the instance in the CurrentState property.
            InstanceState(x => x.CurrentState);

            // Every event is matched to its saga instance through the message's OrderId.
            Event(() => OrderSubmittedEvent, x => x.CorrelateById(context => context.Message.OrderId));
            Event(() => PaymentCompletedEvent, x => x.CorrelateById(context => context.Message.OrderId));
            Event(() => OrderCompletedEvent, x => x.CorrelateById(context => context.Message.OrderId));

            Initially(
                When(OrderSubmittedEvent)
                    .Then(x =>
                    {
                        x.Saga.CustomerName = x.Message.CustomerName;
                        x.Saga.OrderTotal = x.Message.OrderTotal;
                        x.Saga.SubmitDate = DateTime.UtcNow;
                        Console.WriteLine($"Order submitted: {x.Saga.CorrelationId}");
                    })
                    .TransitionTo(Submitted)
            );

            During(Submitted,
                When(PaymentCompletedEvent)
                    .Then(x =>
                    {
                        x.Saga.PaymentDate = DateTime.UtcNow;
                        Console.WriteLine($"Payment completed: {x.Saga.CorrelationId}");
                    })
                    .TransitionTo(Paid)
            );

            // NOTE(review): Completed is an ordinary state; nothing here finalizes or removes
            // the instance once it is reached — confirm whether that is intended.
            During(Paid,
                When(OrderCompletedEvent)
                    .Then(x =>
                    {
                        x.Saga.CompletedDate = DateTime.UtcNow;
                        Console.WriteLine($"Order completed: {x.Saga.CorrelationId}");
                    })
                    .TransitionTo(Completed)
            );
        }

        public State Submitted { get; private set; }
        public State Paid { get; private set; }
        public State Completed { get; private set; }

        public Event<OrderSubmitted> OrderSubmittedEvent { get; private set; }
        public Event<PaymentCompleted> PaymentCompletedEvent { get; private set; }
        public Event<OrderCompleted> OrderCompletedEvent { get; private set; }
    }

    /// <summary>
    /// Console entry point: hosts the order saga (in-memory repository) on RabbitMQ and
    /// lets the user publish the three order events from a menu.
    /// </summary>
    public class Program
    {
        /// <summary>
        /// Builds and starts the host, then processes menu selections until 'Q' is pressed.
        /// </summary>
        public static async Task Main()
        {
            var builder = Host.CreateDefaultBuilder()
                .ConfigureServices((hostContext, services) =>
                {
                    services.AddMassTransit(x =>
                    {
                        x.AddSagaStateMachine<OrderStateMachine, OrderState>()
                            .InMemoryRepository();

                        x.UsingRabbitMq((context, cfg) =>
                        {
                            cfg.Host("localhost", "/", h =>
                            {
                                h.Username("guest");
                                h.Password("guest");
                            });

                            cfg.ConfigureEndpoints(context);
                        });
                    });
                });

            using var host = builder.Build();
            await host.StartAsync();

            try
            {
                // IPublishEndpoint is registered as a scoped service, so resolving it from the
                // root provider fails when scope validation is enabled. IBus is the singleton
                // intended for publishing outside of a consumer or request scope.
                var bus = host.Services.GetRequiredService<IBus>();

                while (true)
                {
                    Console.WriteLine("\nSelect action:");
                    Console.WriteLine("1. Submit Order");
                    Console.WriteLine("2. Complete Payment");
                    Console.WriteLine("3. Complete Order");
                    Console.WriteLine("Q. Quit");

                    var key = Console.ReadKey();
                    Console.WriteLine();

                    switch (key.Key)
                    {
                        case ConsoleKey.D1:
                        case ConsoleKey.NumPad1:
                            await SubmitNewOrderAsync(bus);
                            break;

                        case ConsoleKey.D2:
                        case ConsoleKey.NumPad2:
                            await PublishForEnteredOrderAsync<PaymentCompleted>(
                                bus, "Please enter the OrderId to complete payment:");
                            break;

                        case ConsoleKey.D3:
                        case ConsoleKey.NumPad3:
                            await PublishForEnteredOrderAsync<OrderCompleted>(
                                bus, "Please enter the OrderId to complete order:");
                            break;

                        case ConsoleKey.Q:
                            return; // the finally block stops the host
                    }
                }
            }
            finally
            {
                await host.StopAsync();
            }
        }

        /// <summary>Publishes an <see cref="OrderSubmitted"/> event with a freshly generated order id.</summary>
        private static async Task SubmitNewOrderAsync(IPublishEndpoint endpoint)
        {
            try
            {
                // A new order id is generated for every submitted order.
                var newOrderId = Guid.NewGuid();
                Console.WriteLine($"Attempting to publish OrderSubmitted event with OrderId: {newOrderId}");

                await endpoint.Publish<OrderSubmitted>(new
                {
                    OrderId = newOrderId,
                    CustomerName = "John Doe",
                    OrderTotal = 299.99m
                });

                Console.WriteLine($"Successfully published OrderSubmitted event with OrderId: {newOrderId}");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error publishing OrderSubmitted event: {ex.Message}");
            }
        }

        /// <summary>
        /// Prompts for an order id and publishes a <typeparamref name="TMessage"/> carrying it.
        /// Invalid input and publish failures are reported on the console instead of thrown,
        /// matching the handling used when submitting an order.
        /// </summary>
        private static async Task PublishForEnteredOrderAsync<TMessage>(IPublishEndpoint endpoint, string prompt)
            where TMessage : class
        {
            Console.WriteLine(prompt);

            // Guid.TryParse also rejects null, which ReadLine returns at end of input.
            if (!Guid.TryParse(Console.ReadLine(), out var orderId))
            {
                Console.WriteLine("Invalid OrderId format");
                return;
            }

            var eventName = typeof(TMessage).Name;

            try
            {
                await endpoint.Publish<TMessage>(new { OrderId = orderId });
                Console.WriteLine($"Successfully published {eventName} event with OrderId: {orderId}");
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Error publishing {eventName} event: {ex.Message}");
            }
        }
    }
}

image.png

总结

MassTransit为.NET应用程序提供了强大的消息传输和分布式系统集成能力。通过以上示例,我们可以看到如何在C#控制台应用中实现发布/订阅、请求/响应和Saga模式。MassTransit的灵活性和易用性使其成为构建可靠消息驱动应用程序的理想选择。

本文作者:rick

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!