Orleans 从入门到精通
目录
1. 什么是 Orleans 2. 核心概念 3. 快速入门 4. Grain 详解 5. 状态持久化 6. 流处理 7. 定时器与提醒 8. 分布式事务 9. 高级特性 10. 与其他技术集成 11. 测试策略 12. 生产部署 13. 最佳实践与避坑指南 14. 附录
---
1. 什么是 Orleans
1.1 简介
Microsoft Orleans 是一个跨平台的 .NET 框架,用于构建分布式、可扩展的应用程序。它由 Microsoft Research 发明,采用了创新的 Virtual Actor 模型,让开发者可以像编写单机应用一样编写分布式系统。
Orleans 已被广泛应用于:
- Xbox Live - 数千万玩家在线服务
- Halo 4/5 - 游戏云服务
- Skype - 实时通信后端
- Azure IoT - 物联网设备管理
- Azure Digital Twins - 数字孪生服务
- PlayFab - 游戏后端平台
1.2 为什么选择 Orleans
| 优势 | 说明 |
|---|---|
| Virtual Actor | 无需手动创建/销毁,自动生命周期管理 |
| 位置透明 | 调用者无需知道 Actor 物理位置 |
| 弹性扩展 | 轻松从单节点扩展到数千节点 |
| 容错能力 | 自动故障恢复,状态可持久化 |
| 简化开发 | 用面向对象方式编写分布式系统 |
| 成熟稳定 | 微软生产环境验证超过 10 年 |
1.3 Orleans vs 其他 Actor 框架
| 特性 | Orleans | Akka.NET | Proto.Actor |
|---|---|---|---|
| Actor 模型 | Virtual Actor | 经典 Actor | 经典 Actor |
| 生命周期 | 自动管理 | 手动控制 | 手动控制 |
| 性能 (msg/s) | ~100K | ~120K | ~150K |
| 内存效率 | 最佳 | 中等 | 中等 |
| 学习曲线 | 平缓 | 陡峭 | 中等 |
| 云原生支持 | 最佳 | 良好 | 良好 |
2. 核心概念
2.1 架构概览
┌─────────────────────────────────────────────────────────┐
│ Cluster │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Silo 1 │◄──►│ Silo 2 │◄──►│ Silo 3 │ │
│ │ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │ │
│ │ │ Grain A │ │ │ │ Grain B │ │ │ │ Grain C │ │ │
│ │ │ Grain D │ │ │ │ Grain E │ │ │ │ Grain F │ │ │
│ │ └─────────┘ │ │ └─────────┘ │ │ └─────────┘ │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └──────────────────┼──────────────────┘ │
│ │ │
│ ┌──────┴──────┐ │
│ │ Gateway │ │
│ └──────┬──────┘ │
└────────────────────────────┼────────────────────────────┘
│
┌──────┴──────┐
│ Clients │
└─────────────┘
2.2 Grain(谷物)
Grain 是 Orleans 中的核心抽象,代表一个虚拟 Actor。每个 Grain 有:
- 唯一标识 - 通过 Key 定位
- 状态 - 可持久化的数据
- 行为 - 通过接口定义的方法
// 定义 Grain 接口
public interface IUserGrain : IGrainWithStringKey
{
Task<string> GetName();
Task SetName(string name);
Task AddPoints(int points);
Task<int> GetPoints();
}
// 实现 Grain
public class UserGrain : Grain, IUserGrain
{
private string _name = "";
private int _points = 0;
public Task<string> GetName() => Task.FromResult(_name);
public Task SetName(string name)
{
_name = name;
return Task.CompletedTask;
}
public Task<int> GetPoints() => Task.FromResult(_points);
public Task AddPoints(int points)
{
_points += points;
return Task.CompletedTask;
}
}
2.3 Silo(筒仓)
Silo 是托管 Grain 的运行时容器,负责:
- Grain 的激活和停用
- 消息路由和分发
- 状态持久化
- 故障检测和恢复
// 配置 Silo
var builder = Host.CreateApplicationBuilder(args);
builder.UseOrleans(silo =>
{
silo.UseLocalhostClustering();
silo.AddMemoryGrainStorage("Default");
});
2.4 Cluster(集群)
Cluster 是一组协作的 Silo,提供:
- 横向扩展能力
- 高可用性
- 负载均衡
2.5 Grain Key 类型
| 接口 | Key 类型 | 用例 |
|---|---|---|
IGrainWithGuidKey | Guid | 自动生成的唯一 ID |
IGrainWithIntegerKey | long | 数值 ID(如数据库主键) |
IGrainWithStringKey | string | 字符串标识(如用户名) |
IGrainWithGuidCompoundKey | Guid + string | 复合键 |
IGrainWithIntegerCompoundKey | long + string | 复合键 |
// 不同 Key 类型示例
var userByGuid = GrainFactory.GetGrain<IUserGrain>(Guid.NewGuid());
var userById = GrainFactory.GetGrain<IUserGrain>(123456L);
var userByName = GrainFactory.GetGrain<IUserGrain>("john_doe");
---
3. 快速入门
3.1 安装包
# 服务端
dotnet add package Microsoft.Orleans.Server
# 客户端
dotnet add package Microsoft.Orleans.Client
# SDK(包含代码生成)
dotnet add package Microsoft.Orleans.Sdk
# 分析器(可选,提供编译时检查)
dotnet add package Microsoft.Orleans.Analyzers
3.2 创建第一个 Orleans 应用
Step 1: 定义 Grain 接口
// IHelloGrain.cs
public interface IHelloGrain : IGrainWithStringKey
{
Task<string> SayHello(string name);
}
Step 2: 实现 Grain
// HelloGrain.cs
public class HelloGrain : Grain, IHelloGrain
{
public Task<string> SayHello(string name)
{
return Task.FromResult($"Hello, {name}!");
}
}
Step 3: 配置并运行 Silo
// Program.cs
var builder = WebApplication.CreateBuilder(args);
// 配置 Orleans
builder.Host.UseOrleans(silo =>
{
silo.UseLocalhostClustering();
silo.AddMemoryGrainStorage("Default");
});
var app = builder.Build();
// 添加 API 端点
app.MapGet("/hello/{name}", async (IGrainFactory grains, string name) =>
{
var grain = grains.GetGrain<IHelloGrain>(name);
return await grain.SayHello(name);
});
app.Run();
Step 4: 运行
dotnet run
curl http://localhost:5000/hello/World
# 输出: "Hello, World!"
3.3 独立客户端连接
// 独立客户端程序
using var client = new ClientBuilder()
.UseLocalhostClustering()
.Build();
await client.Connect();
var grain = client.GetGrain<IHelloGrain>("test");
var result = await grain.SayHello("World");
Console.WriteLine(result); // "Hello, World!"
---
4. Grain 详解
4.1 Grain 生命周期
┌─────────────┐
│ Created │ Grain 被请求但尚未激活
└──────┬──────┘
│
▼
┌─────────────┐
│ Activating │ 调用 OnActivateAsync
└──────┬──────┘
│
▼
┌─────────────┐
│ Active │ Grain 可响应请求
└──────┬──────┘
│
▼
┌─────────────┐
│Deactivating │ 调用 OnDeactivateAsync
└──────┬──────┘
│
▼
┌─────────────┐
│ Deactivated │ Grain 从内存移除
└─────────────┘
public class LifecycleGrain : Grain, ILifecycleGrain
{
private readonly ILogger<LifecycleGrain> _logger;
public LifecycleGrain(ILogger<LifecycleGrain> logger)
{
_logger = logger;
}
public override Task OnActivateAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Grain {Id} activating", this.GetPrimaryKeyString());
return base.OnActivateAsync(cancellationToken);
}
public override Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
{
_logger.LogInformation("Grain {Id} deactivating: {Reason}",
this.GetPrimaryKeyString(), reason);
return base.OnDeactivateAsync(reason, cancellationToken);
}
}
4.2 Grain 之间调用
public class OrderGrain : Grain, IOrderGrain
{
public async Task ProcessOrder(Order order)
{
// 获取其他 Grain
var inventory = GrainFactory.GetGrain<IInventoryGrain>(order.ProductId);
var payment = GrainFactory.GetGrain<IPaymentGrain>(order.UserId);
var notification = GrainFactory.GetGrain<INotificationGrain>(order.UserId);
// 检查库存
var available = await inventory.CheckAvailability(order.Quantity);
if (!available)
{
throw new InvalidOperationException("Insufficient inventory");
}
// 处理支付
await payment.Charge(order.Amount);
// 扣减库存
await inventory.Deduct(order.Quantity);
// 发送通知
await notification.Send($"Order {order.Id} processed successfully");
}
}
4.3 不可重入与可重入
默认情况下,Grain 是不可重入的,即同一时刻只能处理一个请求。
// 默认:不可重入(推荐)
public class SafeGrain : Grain, ISafeGrain
{
// 同一时刻只有一个请求进入
}
// 可重入:允许交错执行
[Reentrant]
public class ReentrantGrain : Grain, IReentrantGrain
{
// 允许在等待时处理其他请求
// 警告:需要确保线程安全
}
// 方法级别可重入
public class MixedGrain : Grain, IMixedGrain
{
[AlwaysInterleave]
public Task<int> GetCounter() => Task.FromResult(_counter);
public Task IncrementCounter() // 不可重入
{
_counter++;
return Task.CompletedTask;
}
}
4.4 多重激活与 Stateless Worker
// Stateless Worker: 可在多个 Silo 同时激活
[StatelessWorker(maxLocalWorkers: 10)]
public class StatelessGrain : Grain, IStatelessGrain
{
public Task<string> Process(string input)
{
// 无状态处理
return Task.FromResult(input.ToUpper());
}
}
// 使用场景:计算密集型、无状态转换
4.5 Grain 版本兼容性
// 使用 [Version] 属性标记版本
[Version(2)]
public interface IVersionedGrain : IGrainWithIntegerKey
{
Task<string> GetValue();
Task SetValue(string value);
// 版本 2 新增方法
Task<int> GetVersion();
}
---
5. 状态持久化
5.1 持久化存储提供者
Orleans 支持多种存储后端:
| 存储类型 | NuGet 包 | 特点 |
|---|---|---|
| Memory | 内置 | 仅用于开发测试 |
| Azure Table | Microsoft.Orleans.Persistence.AzureStorage | 经济高效 |
| Azure Blob | Microsoft.Orleans.Persistence.AzureStorage | 大对象存储 |
| Azure Cosmos | Microsoft.Orleans.Persistence.Cosmos | 全球分布 |
| Redis | Orleans.Contrib.Persistence.Redis | 高性能缓存 |
| SQL Server | Microsoft.Orleans.Persistence.AdoNet | 关系数据库 |
| PostgreSQL | Microsoft.Orleans.Persistence.AdoNet | 开源数据库 |
| DynamoDB | Microsoft.Orleans.Persistence.DynamoDB | AWS 原生 |
5.2 配置存储
builder.Host.UseOrleans(silo =>
{
// Azure Table Storage
silo.AddAzureTableGrainStorage("tableStore", options =>
{
options.ConfigureTableServiceClient(connectionString);
});
// Azure Blob Storage
silo.AddAzureBlobGrainStorage("blobStore", options =>
{
options.ConfigureBlobServiceClient(connectionString);
});
// Redis
silo.AddRedisGrainStorage("redis", options =>
{
options.DataConnectionString = redisConnectionString;
});
// ADO.NET (SQL Server)
silo.AddAdoNetGrainStorage("sql", options =>
{
options.ConnectionString = sqlConnectionString;
options.Invariant = "Microsoft.Data.SqlClient";
});
});
5.3 使用 IPersistentState(推荐)
// 定义状态类
[GenerateSerializer]
public class UserProfileState
{
[Id(0)]
public string Name { get; set; } = "";
[Id(1)]
public string Email { get; set; } = "";
[Id(2)]
public int Points { get; set; }
[Id(3)]
public List<string> Achievements { get; set; } = new();
}
// 实现 Grain
public class UserProfileGrain : Grain, IUserProfileGrain
{
private readonly IPersistentState<UserProfileState> _state;
public UserProfileGrain(
[PersistentState("profile", "tableStore")]
IPersistentState<UserProfileState> state)
{
_state = state;
}
public Task<UserProfileState> GetProfile()
{
return Task.FromResult(_state.State);
}
public async Task UpdateName(string name)
{
_state.State.Name = name;
await _state.WriteStateAsync();
}
public async Task AddPoints(int points)
{
_state.State.Points += points;
await _state.WriteStateAsync();
}
public async Task AddAchievement(string achievement)
{
_state.State.Achievements.Add(achievement);
await _state.WriteStateAsync();
}
}
5.4 状态操作
public class StateOperationsGrain : Grain, IStateOperationsGrain
{
private readonly IPersistentState<MyState> _state;
// 写入状态
public async Task SaveState()
{
_state.State.Value = "New Value";
await _state.WriteStateAsync();
}
// 读取状态
public async Task RefreshState()
{
await _state.ReadStateAsync();
}
// 清除状态
public async Task ClearState()
{
await _state.ClearStateAsync();
}
}
5.5 Grain 方式(传统)
[StorageProvider(ProviderName = "tableStore")]
public class LegacyGrain : Grain<LegacyState>, ILegacyGrain
{
public async Task UpdateValue(string value)
{
State.Value = value;
await WriteStateAsync();
}
public Task<string> GetValue()
{
return Task.FromResult(State.Value);
}
}
---
6. 流处理
6.1 流概述
Orleans Streams 提供了一种优雅的方式来实现发布-订阅模式:
┌─────────┐ ┌─────────────┐ ┌─────────┐
│Producer │───►│ Stream │───►│Consumer │
│ Grain │ │ (Queue) │ │ Grain │
└─────────┘ └─────────────┘ └─────────┘
6.2 配置流提供者
builder.Host.UseOrleans(silo =>
{
// Azure Queue Streams
silo.AddAzureQueueStreams("AzureQueue", options =>
{
options.ConfigureQueueServiceClient(connectionString);
});
// Simple Message Stream (内存)
silo.AddMemoryStreams("MemoryStream");
// Azure Event Hubs (可回溯)
silo.AddEventHubStreams("EventHub", options =>
{
options.ConfigureEventHub(builder =>
{
builder.ConfigureEventHubConnection(connectionString, "hubName", "consumerGroup");
});
});
});
6.3 生产者
public class ProducerGrain : Grain, IProducerGrain
{
public async Task ProduceEvents()
{
var streamProvider = this.GetStreamProvider("AzureQueue");
var streamId = StreamId.Create("ChatRoom", "general");
var stream = streamProvider.GetStream<string>(streamId);
// 发送消息
await stream.OnNextAsync("Hello, everyone!");
await stream.OnNextAsync("How are you?");
}
}
6.4 消费者(显式订阅)
public class ConsumerGrain : Grain, IConsumerGrain
{
private StreamSubscriptionHandle<string>? _subscription;
public override async Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider("AzureQueue");
var streamId = StreamId.Create("ChatRoom", this.GetPrimaryKeyString());
var stream = streamProvider.GetStream<string>(streamId);
// 订阅流
_subscription = await stream.SubscribeAsync(OnNextAsync, OnErrorAsync, OnCompletedAsync);
}
private Task OnNextAsync(string item, StreamSequenceToken? token)
{
Console.WriteLine($"Received: {item}");
return Task.CompletedTask;
}
private Task OnErrorAsync(Exception ex)
{
Console.WriteLine($"Error: {ex.Message}");
return Task.CompletedTask;
}
private Task OnCompletedAsync()
{
Console.WriteLine("Stream completed");
return Task.CompletedTask;
}
}
6.5 隐式订阅
// 隐式订阅:Grain 根据属性自动订阅
[ImplicitStreamSubscription("ChatRoom")]
public class ChatRoomGrain : Grain, IChatRoomGrain, IAsyncObserver<string>
{
public override async Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider("AzureQueue");
var streamId = StreamId.Create("ChatRoom", this.GetPrimaryKeyString());
var stream = streamProvider.GetStream<string>(streamId);
// 获取或创建订阅句柄
var handles = await stream.GetAllSubscriptionHandles();
if (handles.Count > 0)
{
await handles[0].ResumeAsync(this);
}
else
{
await stream.SubscribeAsync(this);
}
}
public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
Console.WriteLine($"ChatRoom {GetPrimaryKeyString()}: {item}");
return Task.CompletedTask;
}
}
---
7. 定时器与提醒
7.1 Grain Timer(非持久化)
适用于高频、临时性任务:
public class TimerGrain : Grain, ITimerGrain
{
private IGrainTimer? _timer;
private int _counter = 0;
public override Task OnActivateAsync(CancellationToken cancellationToken)
{
// Orleans 8.2+ 新 API
_timer = this.RegisterGrainTimer(
static (state, ct) => state.DoPeriodicWork(ct),
this,
new GrainTimerCreationOptions
{
DueTime = TimeSpan.FromSeconds(5),
Period = TimeSpan.FromSeconds(10),
KeepAlive = true, // 防止 Grain 被回收
Interleave = false // 不与其他调用交错
});
return Task.CompletedTask;
}
private Task DoPeriodicWork(CancellationToken cancellationToken)
{
_counter++;
Console.WriteLine($"Counter: {_counter}");
return Task.CompletedTask;
}
public override Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
{
_timer?.Dispose();
return Task.CompletedTask;
}
}
7.2 Reminder(持久化)
适用于低频、持久性任务:
// 配置 Reminder 存储
builder.Host.UseOrleans(silo =>
{
silo.UseAzureTableReminderService(options =>
{
options.ConfigureTableServiceClient(connectionString);
});
});
// 实现 IRemindable
public class ReminderGrain : Grain, IReminderGrain, IRemindable
{
private IGrainReminder? _reminder;
public async Task StartReminder()
{
_reminder = await this.RegisterOrUpdateReminder(
"DailyCleanup",
dueTime: TimeSpan.FromMinutes(1),
period: TimeSpan.FromHours(24));
}
public async Task StopReminder()
{
if (_reminder != null)
{
await this.UnregisterReminder(_reminder);
}
}
public Task ReceiveReminder(string reminderName, TickStatus status)
{
Console.WriteLine($"Reminder {reminderName} fired at {status.CurrentTickTime}");
return Task.CompletedTask;
}
}
7.3 Timer vs Reminder
| 特性 | Grain Timer | Reminder |
|---|---|---|
| 持久化 | ❌ | ✅ |
| 频率 | 高(毫秒级) | 低(分钟级) |
| Grain 停用后 | 消失 | 继续触发 |
| 存储 | 内存 | 数据库 |
| 适用场景 | 临时任务 | 定期清理、报告 |
8. 分布式事务
8.1 配置事务
builder.Host.UseOrleans(silo =>
{
silo.UseTransactions();
silo.AddAzureTableTransactionalStateStorage("TransactionStore", options =>
{
options.ConfigureTableServiceClient(connectionString);
});
});
8.2 定义事务性 Grain
// 定义接口
public interface IAccountGrain : IGrainWithStringKey
{
[Transaction(TransactionOption.Join)]
Task<decimal> GetBalance();
[Transaction(TransactionOption.Join)]
Task Withdraw(decimal amount);
[Transaction(TransactionOption.Join)]
Task Deposit(decimal amount);
}
// 实现
[Reentrant]
public class AccountGrain : Grain, IAccountGrain
{
private readonly ITransactionalState<AccountState> _balance;
public AccountGrain(
[TransactionalState("balance", "TransactionStore")]
ITransactionalState<AccountState> balance)
{
_balance = balance;
}
public Task<decimal> GetBalance()
{
return _balance.PerformRead(state => state.Balance);
}
public Task Withdraw(decimal amount)
{
return _balance.PerformUpdate(state =>
{
if (state.Balance < amount)
throw new InvalidOperationException("Insufficient funds");
state.Balance -= amount;
});
}
public Task Deposit(decimal amount)
{
return _balance.PerformUpdate(state =>
{
state.Balance += amount;
});
}
}
8.3 执行事务
public class TransferService
{
private readonly ITransactionClient _transactionClient;
private readonly IGrainFactory _grainFactory;
public async Task Transfer(string fromAccount, string toAccount, decimal amount)
{
await _transactionClient.RunTransaction(
TransactionOption.Create,
async () =>
{
var from = _grainFactory.GetGrain<IAccountGrain>(fromAccount);
var to = _grainFactory.GetGrain<IAccountGrain>(toAccount);
await from.Withdraw(amount);
await to.Deposit(amount);
});
}
}
---
9. 高级特性
9.1 Observer 模式
允许客户端接收 Grain 状态变化通知:
// 定义 Observer 接口
public interface IChatObserver : IGrainObserver
{
void ReceiveMessage(string from, string message);
void UserJoined(string username);
void UserLeft(string username);
}
// Grain 发送通知
public class ChatRoomGrain : Grain, IChatRoomGrain
{
private readonly List<IGrainObserver> _observers = new();
public Task Subscribe(IChatObserver observer)
{
_observers.Add(observer);
return Task.CompletedTask;
}
public Task SendMessage(string from, string message)
{
foreach (var observer in _observers.OfType<IChatObserver>())
{
observer.ReceiveMessage(from, message);
}
return Task.CompletedTask;
}
}
// 客户端使用
public class ChatClient : IChatObserver
{
public void ReceiveMessage(string from, string message)
{
Console.WriteLine($"{from}: {message}");
}
public async Task Join(IChatRoomGrain room)
{
var observer = new ChatClient();
var reference = await this.GrainFactory.CreateObjectReference<IChatObserver>(observer);
await room.Subscribe(reference);
}
}
9.2 Grain Placement 策略
// 随机放置(默认)
[RandomPlacement]
public class RandomGrain : Grain { }
// 基于激活数放置
[ActivationCountBasedPlacement]
public class BalancedGrain : Grain { }
// 资源优化放置(Orleans 9.x 默认)
[ResourceOptimizedPlacement]
public class OptimizedGrain : Grain { }
// 优先本地放置
[PreferLocalPlacement]
public class LocalGrain : Grain { }
// 自定义放置
[PlacementStrategy(typeof(MyCustomPlacement))]
public class CustomGrain : Grain { }
// 配置权重
silo.Configure<ResourceOptimizedPlacementOptions>(options =>
{
options.CpuUsageWeight = 40;
options.MemoryUsageWeight = 30;
options.AvailableMemoryWeight = 20;
options.MaxAvailableMemoryWeight = 10;
});
9.3 Grain 目录
// 自定义 Grain 目录
siloBuilder.UseAdoNetGrainDirectory(options =>
{
options.ConnectionString = connectionString;
options.Invariant = "Microsoft.Data.SqlClient";
});
9.4 请求上下文
// 设置请求上下文(传递元数据)
RequestContext.Set("TraceId", Guid.NewGuid());
RequestContext.Set("UserId", "user123");
// 在 Grain 中读取
public class TracedGrain : Grain, ITracedGrain
{
public Task DoWork()
{
var traceId = RequestContext.Get("TraceId");
var userId = RequestContext.Get("UserId");
Console.WriteLine($"TraceId: {traceId}, UserId: {userId}");
return Task.CompletedTask;
}
}
9.5 IAsyncEnumerable 支持
// Grain 返回异步流
public interface IStreamGrain : IGrainWithStringKey
{
IAsyncEnumerable<string> GetUpdates(CancellationToken cancellationToken = default);
}
public class StreamGrain : Grain, IStreamGrain
{
private readonly Channel<string> _channel = Channel.CreateUnbounded<string>();
public async Task Publish(string message)
{
await _channel.Writer.WriteAsync(message);
}
public IAsyncEnumerable<string> GetUpdates(CancellationToken cancellationToken)
{
return _channel.Reader.ReadAllAsync(cancellationToken);
}
}
// 客户端使用
await foreach (var update in grain.GetUpdates().WithCancellation(cancellationToken))
{
Console.WriteLine(update);
}
9.6 POCO Grain
无需继承 Grain 基类:
public class PocoGrain : IGrainBase, IMyGrain
{
public IGrainContext GrainContext { get; }
private readonly ILogger<PocoGrain> _logger;
private string _value = "";
public PocoGrain(IGrainContext context, ILogger<PocoGrain> logger)
{
GrainContext = context;
_logger = logger;
}
public Task OnActivateAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Activated");
return Task.CompletedTask;
}
public Task<string> GetValue() => Task.FromResult(_value);
public Task SetValue(string value)
{
_value = value;
return Task.CompletedTask;
}
}
---
10. 与其他技术集成
10.1 ASP.NET Core 集成
var builder = WebApplication.CreateBuilder(args);
// 配置 Orleans
builder.Host.UseOrleans(silo =>
{
silo.UseLocalhostClustering();
silo.AddMemoryGrainStorage("Default");
});
var app = builder.Build();
// Minimal API
app.MapGet("/users/{id}", async (IGrainFactory grains, string id) =>
{
var grain = grains.GetGrain<IUserGrain>(id);
return await grain.GetProfile();
});
app.MapPost("/users/{id}", async (IGrainFactory grains, string id, UserProfile profile) =>
{
var grain = grains.GetGrain<IUserGrain>(id);
await grain.UpdateProfile(profile);
return Results.Ok();
});
app.Run();
10.2 SignalR 集成
dotnet add package SignalR.Orleans
// 配置 Silo
builder.Host.UseOrleans(silo =>
{
silo.UseSignalR();
});
// 配置 SignalR
builder.Services.AddSignalR().AddOrleans();
// Hub
public class ChatHub : Hub
{
public async Task SendMessage(string room, string message)
{
await Clients.Group(room).SendAsync("ReceiveMessage", message);
}
}
// 从 Grain 发送消息
public class NotificationGrain : Grain
{
private readonly IHubContext<ChatHub> _hubContext;
public NotificationGrain(IHubContext<ChatHub> hubContext)
{
_hubContext = hubContext;
}
public async Task Broadcast(string room, string message)
{
await _hubContext.Clients.Group(room).SendAsync("ReceiveMessage", message);
}
}
10.3 gRPC 集成
// gRPC 服务
public class GreeterService : Greeter.GreeterBase
{
private readonly IGrainFactory _grainFactory;
public GreeterService(IGrainFactory grainFactory)
{
_grainFactory = grainFactory;
}
public override async Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
{
var grain = _grainFactory.GetGrain<IHelloGrain>(request.Name);
var message = await grain.SayHello(request.Name);
return new HelloReply { Message = message };
}
}
// 配置
builder.Services.AddGrpc();
builder.Host.UseOrleans(silo => silo.UseLocalhostClustering());
var app = builder.Build();
app.MapGrpcService<GreeterService>();
10.4 .NET Aspire 集成
// AppHost
var orleans = builder.AddOrleans("orleans")
.WithClustering(builder.AddAzureTableStorage("clustering"))
.WithGrainStorage("Default", builder.AddAzureBlobStorage("grains"));
builder.AddProject<Projects.Silo>("silo")
.WithReference(orleans);
builder.AddProject<Projects.Client>("client")
.WithReference(orleans.AsClient());
---
11. 测试策略
11.1 集成测试
public class OrleansTestBase : IAsyncLifetime
{
protected TestCluster Cluster { get; private set; } = null!;
public async Task InitializeAsync()
{
var builder = new TestClusterBuilder();
builder.AddSiloBuilderConfigurator<TestSiloConfig>();
Cluster = builder.Build();
await Cluster.DeployAsync();
}
public async Task DisposeAsync()
{
await Cluster.StopAllSilosAsync();
}
}
public class TestSiloConfig : ISiloConfigurator
{
public void Configure(ISiloBuilder siloBuilder)
{
siloBuilder.AddMemoryGrainStorage("Default");
}
}
// 测试类
public class UserGrainTests : OrleansTestBase
{
[Fact]
public async Task GetProfile_ReturnsStoredProfile()
{
// Arrange
var grain = Cluster.GrainFactory.GetGrain<IUserGrain>("test-user");
await grain.SetName("John Doe");
// Act
var profile = await grain.GetProfile();
// Assert
Assert.Equal("John Doe", profile.Name);
}
[Fact]
public async Task AddPoints_IncreasesTotalPoints()
{
// Arrange
var grain = Cluster.GrainFactory.GetGrain<IUserGrain>("test-user");
await grain.AddPoints(100);
await grain.AddPoints(50);
// Act
var points = await grain.GetPoints();
// Assert
Assert.Equal(150, points);
}
}
11.2 Mock 单元测试
public class OrderProcessorTests
{
[Fact]
public async Task ProcessOrder_WhenInventoryAvailable_ProcessesSuccessfully()
{
// Arrange
var mockInventory = new Mock<IInventoryGrain>();
mockInventory.Setup(x => x.CheckAvailability(10))
.ReturnsAsync(true);
var mockPayment = new Mock<IPaymentGrain>();
var mockNotification = new Mock<INotificationGrain>();
var mockFactory = new Mock<IGrainFactory>();
mockFactory.Setup(x => x.GetGrain<IInventoryGrain>("product-1", null))
.Returns(mockInventory.Object);
// Act & Assert
// ... 测试逻辑
}
}
11.3 故障注入测试
public class FaultToleranceTests : OrleansTestBase
{
[Fact]
public async Task GrainRecovers_AfterSiloCrash()
{
// Arrange
var grain = Cluster.GrainFactory.GetGrain<IUserGrain>("test");
await grain.SetName("Before Crash");
// Act - 停止 Silo
var siloToStop = Cluster.Silos.First();
await Cluster.StopSiloAsync(siloToStop);
// 启动新 Silo
await Cluster.StartSiloAsync();
// Assert - Grain 应该能恢复
var name = await grain.GetName();
Assert.Equal("Before Crash", name);
}
}
---
12. 生产部署
12.1 集群配置
builder.Host.UseOrleans(silo =>
{
// 集群配置
silo.Configure<ClusterOptions>(options =>
{
options.ClusterId = "production-cluster";
options.ServiceId = "my-service";
});
// Azure Storage 集群
silo.UseAzureStorageClustering(options =>
{
options.ConfigureTableServiceClient(connectionString);
});
// 端点配置
silo.ConfigureEndpoints(
siloPort: 11111,
gatewayPort: 30000,
listenOnAnyHostAddress: true);
// 持久化
silo.AddAzureTableGrainStorage("Default", options =>
{
options.ConfigureTableServiceClient(connectionString);
});
// Reminders
silo.UseAzureTableReminderService(options =>
{
options.ConfigureTableServiceClient(connectionString);
});
});
12.2 Kubernetes 部署
apiVersion: apps/v1
kind: Deployment
metadata:
name: orleans-silo
spec:
replicas: 3
selector:
matchLabels:
app: orleans-silo
template:
metadata:
labels:
app: orleans-silo
orleans-cluster: production
spec:
containers:
- name: silo
image: myregistry/orleans-app:latest
ports:
- containerPort: 11111
name: silo
- containerPort: 30000
name: gateway
env:
- name: ORLEANS_CLUSTER_ID
value: "production"
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "2000m"
livenessProbe:
httpGet:
path: /health
port: 80
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /health/ready
port: 80
initialDelaySeconds: 10
periodSeconds: 5
12.3 健康检查
builder.Services.AddHealthChecks()
.AddCheck<OrleansHealthCheck>("orleans");
public class OrleansHealthCheck : IHealthCheck
{
private readonly IClusterClient _client;
public OrleansHealthCheck(IClusterClient client)
{
_client = client;
}
public async Task<HealthCheckResult> CheckHealthAsync(
HealthCheckContext context,
CancellationToken cancellationToken = default)
{
try
{
var managementGrain = _client.GetGrain<IManagementGrain>(0);
var hosts = await managementGrain.GetHosts();
return hosts.Count > 0
? HealthCheckResult.Healthy($"{hosts.Count} silos active")
: HealthCheckResult.Unhealthy("No silos in cluster");
}
catch (Exception ex)
{
return HealthCheckResult.Unhealthy("Orleans check failed", ex);
}
}
}
12.4 GC 配置
<!-- 项目文件 -->
<PropertyGroup>
<ServerGarbageCollection>true</ServerGarbageCollection>
<ConcurrentGarbageCollection>true</ConcurrentGarbageCollection>
</PropertyGroup>
12.5 监控
// OpenTelemetry
builder.Services.AddOpenTelemetry()
.WithMetrics(metrics => metrics
.AddPrometheusExporter()
.AddMeter("Microsoft.Orleans"))
.WithTracing(tracing => tracing
.AddSource("Microsoft.Orleans.Runtime")
.AddSource("Microsoft.Orleans.Application"));
// Orleans Dashboard (Orleans 10+)
builder.Host.UseOrleans(silo =>
{
silo.UseDashboard(options =>
{
options.Port = 8080;
options.HostSelf = true;
});
});
---
13. 最佳实践与避坑指南
13.1 设计原则
| 原则 | 说明 |
|---|---|
| 细粒度 Grain | 多个小 Grain > 少量大 Grain |
| 避免 Chatty | 减少 Grain 间频繁通信 |
| 无阻塞 | 所有操作必须异步 |
| 避免热点 | 不要有单一协调 Grain |
13.2 性能优化
// ✅ 好:批量处理
public async Task ProcessBatch(List<Item> items)
{
var tasks = items.Select(item => ProcessItem(item));
await Task.WhenAll(tasks);
}
// ❌ 差:逐个等待
public async Task ProcessSequential(List<Item> items)
{
foreach (var item in items)
{
await ProcessItem(item); // 串行等待
}
}
// ✅ 好:使用 ETag 避免写入冲突
public async Task UpdateWithETag()
{
var (state, etag) = await _state.GetStateAndETag();
// 修改 state
await _state.WriteStateAsync(state, etag);
}
// ✅ 好:配置连接池
silo.Configure<ConnectionOptions>(options =>
{
options.ConnectionPoolSize = 100;
});
13.3 常见陷阱
| 陷阱 | 解决方案 |
|---|---|
| 阻塞调用 | 使用 async/await |
| 死锁 (A→B→A) | 使用 [Reentrant] 或重新设计 |
| 每次调用 new HttpClient | 使用 IHttpClientFactory |
| 忘记 WriteStateAsync | 状态修改后立即持久化 |
| 单一热点 Grain | 分片或使用 StatelessWorker |
13.4 序列化优化
// ✅ 好:使用 GenerateSerializer
[GenerateSerializer]
public class MyData
{
[Id(0)]
public string Name { get; set; } = "";
[Id(1)]
public int Value { get; set; }
}
// ❌ 差:依赖默认序列化(使用反射)
public class MyData
{
public string Name { get; set; }
public int Value { get; set; }
}
// ✅ 好:使用 [Alias] 保证版本兼容
[GenerateSerializer]
[Alias("MyApp.MyData")]
public class MyData
{
[Id(0)]
[Alias("nm")]
public string Name { get; set; } = "";
}
---
14. 附录
14.1 NuGet 包速查
| 包名 | 用途 |
|---|---|
Microsoft.Orleans.Server | Silo 服务端 |
Microsoft.Orleans.Client | 独立客户端 |
Microsoft.Orleans.Sdk | SDK(含代码生成) |
Microsoft.Orleans.Persistence.AzureStorage | Azure 存储 |
Microsoft.Orleans.Persistence.Cosmos | Cosmos DB |
Microsoft.Orleans.Persistence.AdoNet | SQL 数据库 |
Microsoft.Orleans.Persistence.DynamoDB | DynamoDB |
Microsoft.Orleans.Streaming.AzureStorage | Azure Queue |
Microsoft.Orleans.Streaming.EventHubs | Event Hubs |
Orleans.Contrib.Persistence.Redis | Redis |
SignalR.Orleans | SignalR 集成 |
14.2 性能参考
| 场景 | 预期吞吐量 |
|---|---|
| 单 Grain | ~1,000 req/s |
| 单 Silo (8核) | ~10,000 req/s |
| 单 Silo 活跃 Grain | ~100,000 |
| 集群规模 | 1000+ Silos |
14.3 官方资源
- GitHub: https://github.com/dotnet/orleans
- 官方文档: https://learn.microsoft.com/dotnet/orleans
- 示例代码: https://github.com/dotnet/orleans/tree/main/samples
- 社区: https://github.com/dotnet/orleans/discussions
总结
Microsoft Orleans 是构建分布式系统的强大工具,其 Virtual Actor 模型大大简化了分布式编程的复杂性。通过本教程,你应该掌握了:
1. 核心概念 - Grain、Silo、Cluster 2. 状态管理 - 持久化存储配置 3. 消息传递 - Stream 和 Observer 模式 4. 定时任务 - Timer 和 Reminder 5. 分布式事务 - ACID 事务支持 6. 生产部署 - Kubernetes、监控、健康检查
记住核心原则:让每个 Grain 保持简单、独立,让 Orleans 运行时处理分布式复杂性。
---
*文档版本: Orleans 9.x/10.x* *最后更新: 2025年2月*