MassTransit是一个用于.NET应用程序的开源消息总线框架,支持多种消息传输模式,包括发布/订阅、请求/响应和Saga模式。它与RabbitMQ、Azure Service Bus等消息代理集成良好,提供了可靠的消息传递和处理能力。本文将通过几个完整的C#控制台应用示例,展示如何使用MassTransit。
在开始之前,请确保已安装.NET SDK。然后,创建一个新的控制台项目并添加MassTransit和RabbitMQ的NuGet包:
Bashdotnet add package MassTransit dotnet add package MassTransit.RabbitMQ dotnet add package Microsoft.Extensions.Hosting
在这个示例中,我们将实现一个简单的发布/订阅模式,模拟订单创建的消息传递。
C#using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
namespace AppMassTransit
{
public record OrderCreated
{
public Guid OrderId { get; init; }
public string CustomerName { get; init; }
public decimal Amount { get; init; }
}
public class OrderCreatedConsumer : IConsumer<OrderCreated>
{
public Task Consume(ConsumeContext<OrderCreated> context)
{
Console.WriteLine($"Order received: {context.Message.OrderId}");
Console.WriteLine($"Customer: {context.Message.CustomerName}");
Console.WriteLine($"Amount: {context.Message.Amount:C}");
return Task.CompletedTask;
}
}
public class Program
{
public static async Task Main()
{
var builder = Host.CreateDefaultBuilder()
.ConfigureServices((hostContext, services) =>
{
services.AddMassTransit(x =>
{
x.AddConsumer<OrderCreatedConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ConfigureEndpoints(context);
});
});
});
var host = builder.Build();
await host.StartAsync();
var publishEndpoint = host.Services.GetRequiredService<IPublishEndpoint>();
while (true)
{
Console.WriteLine("\nPress 'P' to publish a message, or 'Q' to quit.");
var key = Console.ReadKey();
Console.WriteLine();
if (key.Key == ConsoleKey.Q)
break;
if (key.Key == ConsoleKey.P)
{
await publishEndpoint.Publish<OrderCreated>(new
{
OrderId = Guid.NewGuid(),
CustomerName = "Rick",
Amount = 99.99m
});
Console.WriteLine("Message published successfully!");
}
}
await host.StopAsync();
}
}
}
在这个示例中,我们将实现一个请求/响应模式,用于检查库存。
C#using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
namespace AppMassTransit
{
public record CheckInventory
{
public string ProductId { get; init; }
public int RequestedQuantity { get; init; }
}
public record InventoryStatus
{
public string ProductId { get; init; }
public bool IsAvailable { get; init; }
public int AvailableQuantity { get; init; }
}
public class InventoryCheckConsumer : IConsumer<CheckInventory>
{
private readonly Dictionary<string, int> _inventory = new()
{
{ "PROD-1", 100 },
{ "PROD-2", 50 },
{ "PROD-3", 0 }
};
public Task Consume(ConsumeContext<CheckInventory> context)
{
var message = context.Message;
var availableQuantity = _inventory.GetValueOrDefault(message.ProductId, 0);
return context.RespondAsync<InventoryStatus>(new
{
ProductId = message.ProductId,
IsAvailable = availableQuantity >= message.RequestedQuantity,
AvailableQuantity = availableQuantity
});
}
}
public class Program
{
public static async Task Main()
{
var builder = Host.CreateDefaultBuilder()
.ConfigureServices((hostContext, services) =>
{
services.AddMassTransit(x =>
{
x.AddConsumer<InventoryCheckConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ConfigureEndpoints(context);
});
});
});
var host = builder.Build();
await host.StartAsync();
var requestClient = host.Services.GetRequiredService<IRequestClient<CheckInventory>>();
while (true)
{
Console.WriteLine("\nEnter product ID to check (or 'quit' to exit):");
var input = Console.ReadLine();
if (input?.ToLower() == "quit")
break;
try
{
var response = await requestClient.GetResponse<InventoryStatus>(new
{
ProductId = input,
RequestedQuantity = 10
});
var status = response.Message;
Console.WriteLine($"Product: {status.ProductId}");
Console.WriteLine($"Available: {status.IsAvailable}");
Console.WriteLine($"Quantity: {status.AvailableQuantity}");
}
catch (Exception ex)
{
Console.WriteLine($"Error: {ex.Message}");
}
}
await host.StopAsync();
}
}
}
在这个示例中,我们将使用状态机来管理订单的生命周期。
C#using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
namespace AppMassTransit
{
public class OrderState : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
public string CustomerName { get; set; }
public decimal OrderTotal { get; set; }
public DateTime? SubmitDate { get; set; }
public DateTime? PaymentDate { get; set; }
public DateTime? CompletedDate { get; set; }
}
public record OrderSubmitted
{
public Guid OrderId { get; init; }
public string CustomerName { get; init; }
public decimal OrderTotal { get; init; }
}
public record PaymentCompleted
{
public Guid OrderId { get; init; }
}
public record OrderCompleted
{
public Guid OrderId { get; init; }
}
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
public OrderStateMachine()
{
InstanceState(x => x.CurrentState);
Event(() => OrderSubmittedEvent, x => x.CorrelateById(context => context.Message.OrderId));
Event(() => PaymentCompletedEvent, x => x.CorrelateById(context => context.Message.OrderId));
Event(() => OrderCompletedEvent, x => x.CorrelateById(context => context.Message.OrderId));
Initially(
When(OrderSubmittedEvent)
.Then(x =>
{
x.Saga.CustomerName = x.Message.CustomerName;
x.Saga.OrderTotal = x.Message.OrderTotal;
x.Saga.SubmitDate = DateTime.UtcNow;
Console.WriteLine($"Order submitted: {x.Saga.CorrelationId}");
})
.TransitionTo(Submitted)
);
During(Submitted,
When(PaymentCompletedEvent)
.Then(x =>
{
x.Saga.PaymentDate = DateTime.UtcNow;
Console.WriteLine($"Payment completed: {x.Saga.CorrelationId}");
})
.TransitionTo(Paid)
);
During(Paid,
When(OrderCompletedEvent)
.Then(x =>
{
x.Saga.CompletedDate = DateTime.UtcNow;
Console.WriteLine($"Order completed: {x.Saga.CorrelationId}");
})
.TransitionTo(Completed)
);
}
public State Submitted { get; private set; }
public State Paid { get; private set; }
public State Completed { get; private set; }
public Event<OrderSubmitted> OrderSubmittedEvent { get; private set; }
public Event<PaymentCompleted> PaymentCompletedEvent { get; private set; }
public Event<OrderCompleted> OrderCompletedEvent { get; private set; }
}
public class Program
{
public static async Task Main()
{
var builder = Host.CreateDefaultBuilder()
.ConfigureServices((hostContext, services) =>
{
services.AddMassTransit(x =>
{
x.AddSagaStateMachine<OrderStateMachine, OrderState>()
.InMemoryRepository();
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ConfigureEndpoints(context);
});
});
});
var host = builder.Build();
await host.StartAsync();
var publishEndpoint = host.Services.GetRequiredService<IPublishEndpoint>();
while (true)
{
Console.WriteLine("\nSelect action:");
Console.WriteLine("1. Submit Order");
Console.WriteLine("2. Complete Payment");
Console.WriteLine("3. Complete Order");
Console.WriteLine("Q. Quit");
var key = Console.ReadKey();
Console.WriteLine();
switch (key.Key)
{
case ConsoleKey.D1:
try
{
// 每次提交订单时生成新的 orderId
var newOrderId = Guid.NewGuid();
Console.WriteLine($"Attempting to publish OrderSubmitted event with OrderId: {newOrderId}");
await publishEndpoint.Publish<OrderSubmitted>(new
{
OrderId = newOrderId,
CustomerName = "John Doe",
OrderTotal = 299.99m
});
Console.WriteLine($"Successfully published OrderSubmitted event with OrderId: {newOrderId}");
}
catch (Exception ex)
{
Console.WriteLine($"Error publishing OrderSubmitted event: {ex.Message}");
}
break;
case ConsoleKey.D2:
Console.WriteLine("Please enter the OrderId to complete payment:");
if (Guid.TryParse(Console.ReadLine(), out Guid paymentOrderId))
{
await publishEndpoint.Publish<PaymentCompleted>(new
{
OrderId = paymentOrderId
});
}
else
{
Console.WriteLine("Invalid OrderId format");
}
break;
case ConsoleKey.D3:
Console.WriteLine("Please enter the OrderId to complete order:");
if (Guid.TryParse(Console.ReadLine(), out Guid completeOrderId))
{
await publishEndpoint.Publish<OrderCompleted>(new
{
OrderId = completeOrderId
});
}
else
{
Console.WriteLine("Invalid OrderId format");
}
break;
case ConsoleKey.Q:
await host.StopAsync();
return;
}
}
}
}
}
MassTransit为.NET应用程序提供了强大的消息传输和分布式系统集成能力。通过以上示例,我们可以看到如何在C#控制台应用中实现发布/订阅、请求/响应和Saga模式。MassTransit的灵活性和易用性使其成为构建可靠消息驱动应用程序的理想选择。
本文作者:rick
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!