# Orleans 从入门到精通
## 目录
1. [什么是 Orleans](#1-什么是-orleans)
2. [核心概念](#2-核心概念)
3. [快速入门](#3-快速入门)
4. [Grain 详解](#4-grain-详解)
5. [状态持久化](#5-状态持久化)
6. [流处理](#6-流处理)
7. [定时器与提醒](#7-定时器与提醒)
8. [分布式事务](#8-分布式事务)
9. [高级特性](#9-高级特性)
10. [与其他技术集成](#10-与其他技术集成)
11. [测试策略](#11-测试策略)
12. [生产部署](#12-生产部署)
13. [最佳实践与避坑指南](#13-最佳实践与避坑指南)
14. [附录](#14-附录)
---
## 1. 什么是 Orleans
### 1.1 简介
**Microsoft Orleans** 是一个跨平台的 .NET 框架,用于构建分布式、可扩展的应用程序。它由 Microsoft Research 发明,采用了创新的 **Virtual Actor 模型**,让开发者可以像编写单机应用一样编写分布式系统。
Orleans 已被广泛应用于:
- **Xbox Live** - 数千万玩家在线服务
- **Halo 4/5** - 游戏云服务
- **Skype** - 实时通信后端
- **Azure IoT** - 物联网设备管理
- **Azure Digital Twins** - 数字孪生服务
- **PlayFab** - 游戏后端平台
### 1.2 为什么选择 Orleans
| 优势 | 说明 |
|------|------|
| **Virtual Actor** | 无需手动创建/销毁,自动生命周期管理 |
| **位置透明** | 调用者无需知道 Actor 物理位置 |
| **弹性扩展** | 轻松从单节点扩展到数千节点 |
| **容错能力** | 自动故障恢复,状态可持久化 |
| **简化开发** | 用面向对象方式编写分布式系统 |
| **成熟稳定** | 微软生产环境验证超过 10 年 |
### 1.3 Orleans vs 其他 Actor 框架
| 特性 | Orleans | Akka.NET | Proto.Actor |
|------|---------|----------|-------------|
| Actor 模型 | Virtual Actor | 经典 Actor | 经典 Actor |
| 生命周期 | 自动管理 | 手动控制 | 手动控制 |
| 性能 (msg/s) | ~100K | ~120K | ~150K |
| 内存效率 | **最佳** | 中等 | 中等 |
| 学习曲线 | 平缓 | 陡峭 | 中等 |
| 云原生支持 | **最佳** | 良好 | 良好 |
---
## 2. 核心概念
### 2.1 架构概览
```
┌─────────────────────────────────────────────────────────┐
│ Cluster │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ Silo 1 │◄──►│ Silo 2 │◄──►│ Silo 3 │ │
│ │ ┌─────────┐ │ │ ┌─────────┐ │ │ ┌─────────┐ │ │
│ │ │ Grain A │ │ │ │ Grain B │ │ │ │ Grain C │ │ │
│ │ │ Grain D │ │ │ │ Grain E │ │ │ │ Grain F │ │ │
│ │ └─────────┘ │ │ └─────────┘ │ │ └─────────┘ │ │
│ └──────┬──────┘ └──────┬──────┘ └──────┬──────┘ │
│ │ │ │ │
│ └──────────────────┼──────────────────┘ │
│ │ │
│ ┌──────┴──────┐ │
│ │ Gateway │ │
│ └──────┬──────┘ │
└────────────────────────────┼────────────────────────────┘
│
┌──────┴──────┐
│ Clients │
└─────────────┘
```
### 2.2 Grain(谷物)
**Grain** 是 Orleans 中的核心抽象,代表一个虚拟 Actor。每个 Grain 有:
- **唯一标识** - 通过 Key 定位
- **状态** - 可持久化的数据
- **行为** - 通过接口定义的方法
```csharp
// 定义 Grain 接口
public interface IUserGrain : IGrainWithStringKey
{
Task<string> GetName();
Task SetName(string name);
Task AddPoints(int points);
Task<int> GetPoints();
}
// 实现 Grain
public class UserGrain : Grain, IUserGrain
{
private string _name = "";
private int _points = 0;
public Task<string> GetName() => Task.FromResult(_name);
public Task SetName(string name)
{
_name = name;
return Task.CompletedTask;
}
public Task<int> GetPoints() => Task.FromResult(_points);
public Task AddPoints(int points)
{
_points += points;
return Task.CompletedTask;
}
}
```
### 2.3 Silo(筒仓)
**Silo** 是托管 Grain 的运行时容器,负责:
- Grain 的激活和停用
- 消息路由和分发
- 状态持久化
- 故障检测和恢复
```csharp
// 配置 Silo
var builder = Host.CreateApplicationBuilder(args);
builder.UseOrleans(silo =>
{
silo.UseLocalhostClustering();
silo.AddMemoryGrainStorage("Default");
});
```
### 2.4 Cluster(集群)
**Cluster** 是一组协作的 Silo,提供:
- 横向扩展能力
- 高可用性
- 负载均衡
### 2.5 Grain Key 类型
| 接口 | Key 类型 | 用例 |
|------|---------|------|
| `IGrainWithGuidKey` | Guid | 自动生成的唯一 ID |
| `IGrainWithIntegerKey` | long | 数值 ID(如数据库主键) |
| `IGrainWithStringKey` | string | 字符串标识(如用户名) |
| `IGrainWithGuidCompoundKey` | Guid + string | 复合键 |
| `IGrainWithIntegerCompoundKey` | long + string | 复合键 |
```csharp
// 不同 Key 类型示例
var userByGuid = GrainFactory.GetGrain<IUserGrain>(Guid.NewGuid());
var userById = GrainFactory.GetGrain<IUserGrain>(123456L);
var userByName = GrainFactory.GetGrain<IUserGrain>("john_doe");
```
---
## 3. 快速入门
### 3.1 安装包
```bash
# 服务端
dotnet add package Microsoft.Orleans.Server
# 客户端
dotnet add package Microsoft.Orleans.Client
# SDK(包含代码生成)
dotnet add package Microsoft.Orleans.Sdk
# 分析器(可选,提供编译时检查)
dotnet add package Microsoft.Orleans.Analyzers
```
### 3.2 创建第一个 Orleans 应用
**Step 1: 定义 Grain 接口**
```csharp
// IHelloGrain.cs
public interface IHelloGrain : IGrainWithStringKey
{
Task<string> SayHello(string name);
}
```
**Step 2: 实现 Grain**
```csharp
// HelloGrain.cs
public class HelloGrain : Grain, IHelloGrain
{
public Task<string> SayHello(string name)
{
return Task.FromResult($"Hello, {name}!");
}
}
```
**Step 3: 配置并运行 Silo**
```csharp
// Program.cs
var builder = WebApplication.CreateBuilder(args);
// 配置 Orleans
builder.Host.UseOrleans(silo =>
{
silo.UseLocalhostClustering();
silo.AddMemoryGrainStorage("Default");
});
var app = builder.Build();
// 添加 API 端点
app.MapGet("/hello/{name}", async (IGrainFactory grains, string name) =>
{
var grain = grains.GetGrain<IHelloGrain>(name);
return await grain.SayHello(name);
});
app.Run();
```
**Step 4: 运行**
```bash
dotnet run
curl http://localhost:5000/hello/World
# 输出: "Hello, World!"
```
### 3.3 独立客户端连接
```csharp
// 独立客户端程序
using var client = new ClientBuilder()
.UseLocalhostClustering()
.Build();
await client.Connect();
var grain = client.GetGrain<IHelloGrain>("test");
var result = await grain.SayHello("World");
Console.WriteLine(result); // "Hello, World!"
```
---
## 4. Grain 详解
### 4.1 Grain 生命周期
```
┌─────────────┐
│ Created │ Grain 被请求但尚未激活
└──────┬──────┘
│
▼
┌─────────────┐
│ Activating │ 调用 OnActivateAsync
└──────┬──────┘
│
▼
┌─────────────┐
│ Active │ Grain 可响应请求
└──────┬──────┘
│
▼
┌─────────────┐
│Deactivating │ 调用 OnDeactivateAsync
└──────┬──────┘
│
▼
┌─────────────┐
│ Deactivated │ Grain 从内存移除
└─────────────┘
```
```csharp
public class LifecycleGrain : Grain, ILifecycleGrain
{
private readonly ILogger<LifecycleGrain> _logger;
public LifecycleGrain(ILogger<LifecycleGrain> logger)
{
_logger = logger;
}
public override Task OnActivateAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Grain {Id} activating", this.GetPrimaryKeyString());
return base.OnActivateAsync(cancellationToken);
}
public override Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
{
_logger.LogInformation("Grain {Id} deactivating: {Reason}",
this.GetPrimaryKeyString(), reason);
return base.OnDeactivateAsync(reason, cancellationToken);
}
}
```
### 4.2 Grain 之间调用
```csharp
public class OrderGrain : Grain, IOrderGrain
{
public async Task ProcessOrder(Order order)
{
// 获取其他 Grain
var inventory = GrainFactory.GetGrain<IInventoryGrain>(order.ProductId);
var payment = GrainFactory.GetGrain<IPaymentGrain>(order.UserId);
var notification = GrainFactory.GetGrain<INotificationGrain>(order.UserId);
// 检查库存
var available = await inventory.CheckAvailability(order.Quantity);
if (!available)
{
throw new InvalidOperationException("Insufficient inventory");
}
// 处理支付
await payment.Charge(order.Amount);
// 扣减库存
await inventory.Deduct(order.Quantity);
// 发送通知
await notification.Send($"Order {order.Id} processed successfully");
}
}
```
### 4.3 不可重入与可重入
默认情况下,Grain 是**不可重入**的,即同一时刻只能处理一个请求。
```csharp
// 默认:不可重入(推荐)
public class SafeGrain : Grain, ISafeGrain
{
// 同一时刻只有一个请求进入
}
// 可重入:允许交错执行
[Reentrant]
public class ReentrantGrain : Grain, IReentrantGrain
{
// 允许在等待时处理其他请求
// 警告:需要确保线程安全
}
// 方法级别可重入
public class MixedGrain : Grain, IMixedGrain
{
[AlwaysInterleave]
public Task<int> GetCounter() => Task.FromResult(_counter);
public Task IncrementCounter() // 不可重入
{
_counter++;
return Task.CompletedTask;
}
}
```
### 4.4 多重激活与 Stateless Worker
```csharp
// Stateless Worker: 可在多个 Silo 同时激活
[StatelessWorker(maxLocalWorkers: 10)]
public class StatelessGrain : Grain, IStatelessGrain
{
public Task<string> Process(string input)
{
// 无状态处理
return Task.FromResult(input.ToUpper());
}
}
// 使用场景:计算密集型、无状态转换
```
### 4.5 Grain 版本兼容性
```csharp
// 使用 [Version] 属性标记版本
[Version(2)]
public interface IVersionedGrain : IGrainWithIntegerKey
{
Task<string> GetValue();
Task SetValue(string value);
// 版本 2 新增方法
Task<int> GetVersion();
}
```
---
## 5. 状态持久化
### 5.1 持久化存储提供者
Orleans 支持多种存储后端:
| 存储类型 | NuGet 包 | 特点 |
|---------|---------|------|
| Memory | 内置 | 仅用于开发测试 |
| Azure Table | `Microsoft.Orleans.Persistence.AzureStorage` | 经济高效 |
| Azure Blob | `Microsoft.Orleans.Persistence.AzureStorage` | 大对象存储 |
| Azure Cosmos | `Microsoft.Orleans.Persistence.Cosmos` | 全球分布 |
| Redis | `Orleans.Contrib.Persistence.Redis` | 高性能缓存 |
| SQL Server | `Microsoft.Orleans.Persistence.AdoNet` | 关系数据库 |
| PostgreSQL | `Microsoft.Orleans.Persistence.AdoNet` | 开源数据库 |
| DynamoDB | `Microsoft.Orleans.Persistence.DynamoDB` | AWS 原生 |
### 5.2 配置存储
```csharp
builder.Host.UseOrleans(silo =>
{
// Azure Table Storage
silo.AddAzureTableGrainStorage("tableStore", options =>
{
options.ConfigureTableServiceClient(connectionString);
});
// Azure Blob Storage
silo.AddAzureBlobGrainStorage("blobStore", options =>
{
options.ConfigureBlobServiceClient(connectionString);
});
// Redis
silo.AddRedisGrainStorage("redis", options =>
{
options.DataConnectionString = redisConnectionString;
});
// ADO.NET (SQL Server)
silo.AddAdoNetGrainStorage("sql", options =>
{
options.ConnectionString = sqlConnectionString;
options.Invariant = "Microsoft.Data.SqlClient";
});
});
```
### 5.3 使用 IPersistentState(推荐)
```csharp
// 定义状态类
[GenerateSerializer]
public class UserProfileState
{
[Id(0)]
public string Name { get; set; } = "";
[Id(1)]
public string Email { get; set; } = "";
[Id(2)]
public int Points { get; set; }
[Id(3)]
public List<string> Achievements { get; set; } = new();
}
// 实现 Grain
public class UserProfileGrain : Grain, IUserProfileGrain
{
private readonly IPersistentState<UserProfileState> _state;
public UserProfileGrain(
[PersistentState("profile", "tableStore")]
IPersistentState<UserProfileState> state)
{
_state = state;
}
public Task<UserProfileState> GetProfile()
{
return Task.FromResult(_state.State);
}
public async Task UpdateName(string name)
{
_state.State.Name = name;
await _state.WriteStateAsync();
}
public async Task AddPoints(int points)
{
_state.State.Points += points;
await _state.WriteStateAsync();
}
public async Task AddAchievement(string achievement)
{
_state.State.Achievements.Add(achievement);
await _state.WriteStateAsync();
}
}
```
### 5.4 状态操作
```csharp
public class StateOperationsGrain : Grain, IStateOperationsGrain
{
private readonly IPersistentState<MyState> _state;
// 写入状态
public async Task SaveState()
{
_state.State.Value = "New Value";
await _state.WriteStateAsync();
}
// 读取状态
public async Task RefreshState()
{
await _state.ReadStateAsync();
}
// 清除状态
public async Task ClearState()
{
await _state.ClearStateAsync();
}
}
```
### 5.5 Grain<TState> 方式(传统)
```csharp
[StorageProvider(ProviderName = "tableStore")]
public class LegacyGrain : Grain<LegacyState>, ILegacyGrain
{
public async Task UpdateValue(string value)
{
State.Value = value;
await WriteStateAsync();
}
public Task<string> GetValue()
{
return Task.FromResult(State.Value);
}
}
```
---
## 6. 流处理
### 6.1 流概述
Orleans Streams 提供了一种优雅的方式来实现发布-订阅模式:
```
┌─────────┐ ┌─────────────┐ ┌─────────┐
│Producer │───►│ Stream │───►│Consumer │
│ Grain │ │ (Queue) │ │ Grain │
└─────────┘ └─────────────┘ └─────────┘
```
### 6.2 配置流提供者
```csharp
builder.Host.UseOrleans(silo =>
{
// Azure Queue Streams
silo.AddAzureQueueStreams("AzureQueue", options =>
{
options.ConfigureQueueServiceClient(connectionString);
});
// Simple Message Stream (内存)
silo.AddMemoryStreams("MemoryStream");
// Azure Event Hubs (可回溯)
silo.AddEventHubStreams("EventHub", options =>
{
options.ConfigureEventHub(builder =>
{
builder.ConfigureEventHubConnection(connectionString, "hubName", "consumerGroup");
});
});
});
```
### 6.3 生产者
```csharp
public class ProducerGrain : Grain, IProducerGrain
{
public async Task ProduceEvents()
{
var streamProvider = this.GetStreamProvider("AzureQueue");
var streamId = StreamId.Create("ChatRoom", "general");
var stream = streamProvider.GetStream<string>(streamId);
// 发送消息
await stream.OnNextAsync("Hello, everyone!");
await stream.OnNextAsync("How are you?");
}
}
```
### 6.4 消费者(显式订阅)
```csharp
public class ConsumerGrain : Grain, IConsumerGrain
{
private StreamSubscriptionHandle<string>? _subscription;
public override async Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider("AzureQueue");
var streamId = StreamId.Create("ChatRoom", this.GetPrimaryKeyString());
var stream = streamProvider.GetStream<string>(streamId);
// 订阅流
_subscription = await stream.SubscribeAsync(OnNextAsync, OnErrorAsync, OnCompletedAsync);
}
private Task OnNextAsync(string item, StreamSequenceToken? token)
{
Console.WriteLine($"Received: {item}");
return Task.CompletedTask;
}
private Task OnErrorAsync(Exception ex)
{
Console.WriteLine($"Error: {ex.Message}");
return Task.CompletedTask;
}
private Task OnCompletedAsync()
{
Console.WriteLine("Stream completed");
return Task.CompletedTask;
}
}
```
### 6.5 隐式订阅
```csharp
// 隐式订阅:Grain 根据属性自动订阅
[ImplicitStreamSubscription("ChatRoom")]
public class ChatRoomGrain : Grain, IChatRoomGrain, IAsyncObserver<string>
{
public override async Task OnActivateAsync(CancellationToken cancellationToken)
{
var streamProvider = this.GetStreamProvider("AzureQueue");
var streamId = StreamId.Create("ChatRoom", this.GetPrimaryKeyString());
var stream = streamProvider.GetStream<string>(streamId);
// 获取或创建订阅句柄
var handles = await stream.GetAllSubscriptionHandles();
if (handles.Count > 0)
{
await handles[0].ResumeAsync(this);
}
else
{
await stream.SubscribeAsync(this);
}
}
public Task OnNextAsync(string item, StreamSequenceToken? token = null)
{
Console.WriteLine($"ChatRoom {GetPrimaryKeyString()}: {item}");
return Task.CompletedTask;
}
}
```
---
## 7. 定时器与提醒
### 7.1 Grain Timer(非持久化)
适用于高频、临时性任务:
```csharp
public class TimerGrain : Grain, ITimerGrain
{
private IGrainTimer? _timer;
private int _counter = 0;
public override Task OnActivateAsync(CancellationToken cancellationToken)
{
// Orleans 8.2+ 新 API
_timer = this.RegisterGrainTimer(
static (state, ct) => state.DoPeriodicWork(ct),
this,
new GrainTimerCreationOptions
{
DueTime = TimeSpan.FromSeconds(5),
Period = TimeSpan.FromSeconds(10),
KeepAlive = true, // 防止 Grain 被回收
Interleave = false // 不与其他调用交错
});
return Task.CompletedTask;
}
private Task DoPeriodicWork(CancellationToken cancellationToken)
{
_counter++;
Console.WriteLine($"Counter: {_counter}");
return Task.CompletedTask;
}
public override Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
{
_timer?.Dispose();
return Task.CompletedTask;
}
}
```
### 7.2 Reminder(持久化)
适用于低频、持久性任务:
```csharp
// 配置 Reminder 存储
builder.Host.UseOrleans(silo =>
{
silo.UseAzureTableReminderService(options =>
{
options.ConfigureTableServiceClient(connectionString);
});
});
// 实现 IRemindable
public class ReminderGrain : Grain, IReminderGrain, IRemindable
{
private IGrainReminder? _reminder;
public async Task StartReminder()
{
_reminder = await this.RegisterOrUpdateReminder(
"DailyCleanup",
dueTime: TimeSpan.FromMinutes(1),
period: TimeSpan.FromHours(24));
}
public async Task StopReminder()
{
if (_reminder != null)
{
await this.UnregisterReminder(_reminder);
}
}
public Task ReceiveReminder(string reminderName, TickStatus status)
{
Console.WriteLine($"Reminder {reminderName} fired at {status.CurrentTickTime}");
return Task.CompletedTask;
}
}
```
### 7.3 Timer vs Reminder
| 特性 | Grain Timer | Reminder |
|------|-------------|----------|
| 持久化 | ❌ | ✅ |
| 频率 | 高(毫秒级) | 低(分钟级) |
| Grain 停用后 | 消失 | 继续触发 |
| 存储 | 内存 | 数据库 |
| 适用场景 | 临时任务 | 定期清理、报告 |
---
## 8. 分布式事务
### 8.1 配置事务
```csharp
builder.Host.UseOrleans(silo =>
{
silo.UseTransactions();
silo.AddAzureTableTransactionalStateStorage("TransactionStore", options =>
{
options.ConfigureTableServiceClient(connectionString);
});
});
```
### 8.2 定义事务性 Grain
```csharp
// 定义接口
public interface IAccountGrain : IGrainWithStringKey
{
[Transaction(TransactionOption.Join)]
Task<decimal> GetBalance();
[Transaction(TransactionOption.Join)]
Task Withdraw(decimal amount);
[Transaction(TransactionOption.Join)]
Task Deposit(decimal amount);
}
// 实现
[Reentrant]
public class AccountGrain : Grain, IAccountGrain
{
private readonly ITransactionalState<AccountState> _balance;
public AccountGrain(
[TransactionalState("balance", "TransactionStore")]
ITransactionalState<AccountState> balance)
{
_balance = balance;
}
public Task<decimal> GetBalance()
{
return _balance.PerformRead(state => state.Balance);
}
public Task Withdraw(decimal amount)
{
return _balance.PerformUpdate(state =>
{
if (state.Balance < amount)
throw new InvalidOperationException("Insufficient funds");
state.Balance -= amount;
});
}
public Task Deposit(decimal amount)
{
return _balance.PerformUpdate(state =>
{
state.Balance += amount;
});
}
}
```
### 8.3 执行事务
```csharp
public class TransferService
{
private readonly ITransactionClient _transactionClient;
private readonly IGrainFactory _grainFactory;
public async Task Transfer(string fromAccount, string toAccount, decimal amount)
{
await _transactionClient.RunTransaction(
TransactionOption.Create,
async () =>
{
var from = _grainFactory.GetGrain<IAccountGrain>(fromAccount);
var to = _grainFactory.GetGrain<IAccountGrain>(toAccount);
await from.Withdraw(amount);
await to.Deposit(amount);
});
}
}
```
---
## 9. 高级特性
### 9.1 Observer 模式
允许客户端接收 Grain 状态变化通知:
```csharp
// 定义 Observer 接口
public interface IChatObserver : IGrainObserver
{
void ReceiveMessage(string from, string message);
void UserJoined(string username);
void UserLeft(string username);
}
// Grain 发送通知
public class ChatRoomGrain : Grain, IChatRoomGrain
{
private readonly List<IGrainObserver> _observers = new();
public Task Subscribe(IChatObserver observer)
{
_observers.Add(observer);
return Task.CompletedTask;
}
public Task SendMessage(string from, string message)
{
foreach (var observer in _observers.OfType<IChatObserver>())
{
observer.ReceiveMessage(from, message);
}
return Task.CompletedTask;
}
}
// 客户端使用
public class ChatClient : IChatObserver
{
public void ReceiveMessage(string from, string message)
{
Console.WriteLine($"{from}: {message}");
}
public async Task Join(IChatRoomGrain room)
{
var observer = new ChatClient();
var reference = await this.GrainFactory.CreateObjectReference<IChatObserver>(observer);
await room.Subscribe(reference);
}
}
```
### 9.2 Grain Placement 策略
```csharp
// 随机放置(默认)
[RandomPlacement]
public class RandomGrain : Grain { }
// 基于激活数放置
[ActivationCountBasedPlacement]
public class BalancedGrain : Grain { }
// 资源优化放置(Orleans 9.x 默认)
[ResourceOptimizedPlacement]
public class OptimizedGrain : Grain { }
// 优先本地放置
[PreferLocalPlacement]
public class LocalGrain : Grain { }
// 自定义放置
[PlacementStrategy(typeof(MyCustomPlacement))]
public class CustomGrain : Grain { }
// 配置权重
silo.Configure<ResourceOptimizedPlacementOptions>(options =>
{
options.CpuUsageWeight = 40;
options.MemoryUsageWeight = 30;
options.AvailableMemoryWeight = 20;
options.MaxAvailableMemoryWeight = 10;
});
```
### 9.3 Grain 目录
```csharp
// 自定义 Grain 目录
siloBuilder.UseAdoNetGrainDirectory(options =>
{
options.ConnectionString = connectionString;
options.Invariant = "Microsoft.Data.SqlClient";
});
```
### 9.4 请求上下文
```csharp
// 设置请求上下文(传递元数据)
RequestContext.Set("TraceId", Guid.NewGuid());
RequestContext.Set("UserId", "user123");
// 在 Grain 中读取
public class TracedGrain : Grain, ITracedGrain
{
public Task DoWork()
{
var traceId = RequestContext.Get("TraceId");
var userId = RequestContext.Get("UserId");
Console.WriteLine($"TraceId: {traceId}, UserId: {userId}");
return Task.CompletedTask;
}
}
```
### 9.5 IAsyncEnumerable 支持
```csharp
// Grain 返回异步流
public interface IStreamGrain : IGrainWithStringKey
{
IAsyncEnumerable<string> GetUpdates(CancellationToken cancellationToken = default);
}
public class StreamGrain : Grain, IStreamGrain
{
private readonly Channel<string> _channel = Channel.CreateUnbounded<string>();
public async Task Publish(string message)
{
await _channel.Writer.WriteAsync(message);
}
public IAsyncEnumerable<string> GetUpdates(CancellationToken cancellationToken)
{
return _channel.Reader.ReadAllAsync(cancellationToken);
}
}
// 客户端使用
await foreach (var update in grain.GetUpdates().WithCancellation(cancellationToken))
{
Console.WriteLine(update);
}
```
### 9.6 POCO Grain
无需继承 `Grain` 基类:
```csharp
public class PocoGrain : IGrainBase, IMyGrain
{
public IGrainContext GrainContext { get; }
private readonly ILogger<PocoGrain> _logger;
private string _value = "";
public PocoGrain(IGrainContext context, ILogger<PocoGrain> logger)
{
GrainContext = context;
_logger = logger;
}
public Task OnActivateAsync(CancellationToken cancellationToken)
{
_logger.LogInformation("Activated");
return Task.CompletedTask;
}
public Task<string> GetValue() => Task.FromResult(_value);
public Task SetValue(string value)
{
_value = value;
return Task.CompletedTask;
}
}
```
---
## 10. 与其他技术集成
### 10.1 ASP.NET Core 集成
```csharp
var builder = WebApplication.CreateBuilder(args);
// 配置 Orleans
builder.Host.UseOrleans(silo =>
{
silo.UseLocalhostClustering();
silo.AddMemoryGrainStorage("Default");
});
var app = builder.Build();
// Minimal API
app.MapGet("/users/{id}", async (IGrainFactory grains, string id) =>
{
var grain = grains.GetGrain<IUserGrain>(id);
return await grain.GetProfile();
});
app.MapPost("/users/{id}", async (IGrainFactory grains, string id, UserProfile profile) =>
{
var grain = grains.GetGrain<IUserGrain>(id);
await grain.UpdateProfile(profile);
return Results.Ok();
});
app.Run();
```
### 10.2 SignalR 集成
```bash
dotnet add package SignalR.Orleans
```
```csharp
// 配置 Silo
builder.Host.UseOrleans(silo =>
{
silo.UseSignalR();
});
// 配置 SignalR
builder.Services.AddSignalR().AddOrleans();
// Hub
public class ChatHub : Hub
{
public async Task SendMessage(string room, string message)
{
await Clients.Group(room).SendAsync("ReceiveMessage", message);
}
}
// 从 Grain 发送消息
public class NotificationGrain : Grain
{
private readonly IHubContext<ChatHub> _hubContext;
public NotificationGrain(IHubContext<ChatHub> hubContext)
{
_hubContext = hubContext;
}
public async Task Broadcast(string room, string message)
{
await _hubContext.Clients.Group(room).SendAsync("ReceiveMessage", message);
}
}
```
### 10.3 gRPC 集成
```csharp
// gRPC 服务
public class GreeterService : Greeter.GreeterBase
{
private readonly IGrainFactory _grainFactory;
public GreeterService(IGrainFactory grainFactory)
{
_grainFactory = grainFactory;
}
public override async Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
{
var grain = _grainFactory.GetGrain<IHelloGrain>(request.Name);
var message = await grain.SayHello(request.Name);
return new HelloReply { Message = message };
}
}
// 配置
builder.Services.AddGrpc();
builder.Host.UseOrleans(silo => silo.UseLocalhostClustering());
var app = builder.Build();
app.MapGrpcService<GreeterService>();
```
### 10.4 .NET Aspire 集成
```csharp
// AppHost
var orleans = builder.AddOrleans("orleans")
.WithClustering(builder.AddAzureTableStorage("clustering"))
.WithGrainStorage("Default", builder.AddAzureBlobStorage("grains"));
builder.AddProject<Projects.Silo>("silo")
.WithReference(orleans);
builder.AddProject<Projects.Client>("client")
.WithReference(orleans.AsClient());
```
---
## 11. 测试策略
### 11.1 集成测试
```csharp
public class OrleansTestBase : IAsyncLifetime
{
protected TestCluster Cluster { get; private set; } = null!;
public async Task InitializeAsync()
{
var builder = new TestClusterBuilder();
builder.AddSiloBuilderConfigurator<TestSiloConfig>();
Cluster = builder.Build();
await Cluster.DeployAsync();
}
public async Task DisposeAsync()
{
await Cluster.StopAllSilosAsync();
}
}
public class TestSiloConfig : ISiloConfigurator
{
public void Configure(ISiloBuilder siloBuilder)
{
siloBuilder.AddMemoryGrainStorage("Default");
}
}
// 测试类
public class UserGrainTests : OrleansTestBase
{
[Fact]
public async Task GetProfile_ReturnsStoredProfile()
{
// Arrange
var grain = Cluster.GrainFactory.GetGrain<IUserGrain>("test-user");
await grain.SetName("John Doe");
// Act
var profile = await grain.GetProfile();
// Assert
Assert.Equal("John Doe", profile.Name);
}
[Fact]
public async Task AddPoints_IncreasesTotalPoints()
{
// Arrange
var grain = Cluster.GrainFactory.GetGrain<IUserGrain>("test-user");
await grain.AddPoints(100);
await grain.AddPoints(50);
// Act
var points = await grain.GetPoints();
// Assert
Assert.Equal(150, points);
}
}
```
### 11.2 Mock 单元测试
```csharp
public class OrderProcessorTests
{
[Fact]
public async Task ProcessOrder_WhenInventoryAvailable_ProcessesSuccessfully()
{
// Arrange
var mockInventory = new Mock<IInventoryGrain>();
mockInventory.Setup(x => x.CheckAvailability(10))
.ReturnsAsync(true);
var mockPayment = new Mock<IPaymentGrain>();
var mockNotification = new Mock<INotificationGrain>();
var mockFactory = new Mock<IGrainFactory>();
mockFactory.Setup(x => x.GetGrain<IInventoryGrain>("product-1", null))
.Returns(mockInventory.Object);
// Act & Assert
// ... 测试逻辑
}
}
```
### 11.3 故障注入测试
```csharp
public class FaultToleranceTests : OrleansTestBase
{
[Fact]
public async Task GrainRecovers_AfterSiloCrash()
{
// Arrange
var grain = Cluster.GrainFactory.GetGrain<IUserGrain>("test");
await grain.SetName("Before Crash");
// Act - 停止 Silo
var siloToStop = Cluster.Silos.First();
await Cluster.StopSiloAsync(siloToStop);
// 启动新 Silo
await Cluster.StartSiloAsync();
// Assert - Grain 应该能恢复
var name = await grain.GetName();
Assert.Equal("Before Crash", name);
}
}
```
---
## 12. 生产部署
### 12.1 集群配置
```csharp
builder.Host.UseOrleans(silo =>
{
// 集群配置
silo.Configure<ClusterOptions>(options =>
{
options.ClusterId = "production-cluster";
options.ServiceId = "my-service";
});
// Azure Storage 集群
silo.UseAzureStorageClustering(options =>
{
options.ConfigureTableServiceClient(connectionString);
});
// 端点配置
silo.ConfigureEndpoints(
siloPort: 11111,
gatewayPort: 30000,
listenOnAnyHostAddress: true);
// 持久化
silo.AddAzureTableGrainStorage("Default", options =>
{
options.ConfigureTableServiceClient(connectionString);
});
// Reminders
silo.UseAzureTableReminderService(options =>
{
options.ConfigureTableServiceClient(connectionString);
});
});
```
### 12.2 Kubernetes 部署
```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: orleans-silo
spec:
replicas: 3
selector:
matchLabels:
app: orleans-silo
template:
metadata:
labels:
app: orleans-silo
orleans-cluster: production
spec:
containers:
- name: silo
image: myregistry/orleans-app:latest
ports:
- containerPort: 11111
name: silo
- containerPort: 30000
name: gateway
env:
- name: ORLEANS_CLUSTER_ID
value: "production"
- name: POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
resources:
requests:
memory: "512Mi"
cpu: "500m"
limits:
memory: "2Gi"
cpu: "2000m"
livenessProbe:
httpGet:
path: /health
port: 80
initialDelaySeconds: 30
periodSeconds: 10
readinessProbe:
httpGet:
path: /health/ready
port: 80
initialDelaySeconds: 10
periodSeconds: 5
```
### 12.3 健康检查
```csharp
builder.Services.AddHealthChecks()
.AddCheck<OrleansHealthCheck>("orleans");
public class OrleansHealthCheck : IHealthCheck
{
private readonly IClusterClient _client;
public OrleansHealthCheck(IClusterClient client)
{
_client = client;
}
public async Task<HealthCheckResult> CheckHealthAsync(
HealthCheckContext context,
CancellationToken cancellationToken = default)
{
try
{
var managementGrain = _client.GetGrain<IManagementGrain>(0);
var hosts = await managementGrain.GetHosts();
return hosts.Count > 0
? HealthCheckResult.Healthy($"{hosts.Count} silos active")
: HealthCheckResult.Unhealthy("No silos in cluster");
}
catch (Exception ex)
{
return HealthCheckResult.Unhealthy("Orleans check failed", ex);
}
}
}
```
### 12.4 GC 配置
```xml
<!-- 项目文件 -->
<PropertyGroup>
<ServerGarbageCollection>true</ServerGarbageCollection>
<ConcurrentGarbageCollection>true</ConcurrentGarbageCollection>
</PropertyGroup>
```
### 12.5 监控
```csharp
// OpenTelemetry
builder.Services.AddOpenTelemetry()
.WithMetrics(metrics => metrics
.AddPrometheusExporter()
.AddMeter("Microsoft.Orleans"))
.WithTracing(tracing => tracing
.AddSource("Microsoft.Orleans.Runtime")
.AddSource("Microsoft.Orleans.Application"));
// Orleans Dashboard (Orleans 10+)
builder.Host.UseOrleans(silo =>
{
silo.UseDashboard(options =>
{
options.Port = 8080;
options.HostSelf = true;
});
});
```
---
## 13. 最佳实践与避坑指南
### 13.1 设计原则
| 原则 | 说明 |
|------|------|
| **细粒度 Grain** | 多个小 Grain > 少量大 Grain |
| **避免 Chatty** | 减少 Grain 间频繁通信 |
| **无阻塞** | 所有操作必须异步 |
| **避免热点** | 不要有单一协调 Grain |
### 13.2 性能优化
```csharp
// ✅ 好:批量处理
public async Task ProcessBatch(List<Item> items)
{
var tasks = items.Select(item => ProcessItem(item));
await Task.WhenAll(tasks);
}
// ❌ 差:逐个等待
public async Task ProcessSequential(List<Item> items)
{
foreach (var item in items)
{
await ProcessItem(item); // 串行等待
}
}
// ✅ 好:使用 ETag 避免写入冲突
public async Task UpdateWithETag()
{
var (state, etag) = await _state.GetStateAndETag();
// 修改 state
await _state.WriteStateAsync(state, etag);
}
// ✅ 好:配置连接池
silo.Configure<ConnectionOptions>(options =>
{
options.ConnectionPoolSize = 100;
});
```
### 13.3 常见陷阱
| 陷阱 | 解决方案 |
|------|---------|
| 阻塞调用 | 使用 async/await |
| 死锁 (A→B→A) | 使用 [Reentrant] 或重新设计 |
| 每次调用 new HttpClient | 使用 IHttpClientFactory |
| 忘记 WriteStateAsync | 状态修改后立即持久化 |
| 单一热点 Grain | 分片或使用 StatelessWorker |
### 13.4 序列化优化
```csharp
// ✅ 好:使用 GenerateSerializer
[GenerateSerializer]
public class MyData
{
[Id(0)]
public string Name { get; set; } = "";
[Id(1)]
public int Value { get; set; }
}
// ❌ 差:依赖默认序列化(使用反射)
public class MyData
{
public string Name { get; set; }
public int Value { get; set; }
}
// ✅ 好:使用 [Alias] 保证版本兼容
[GenerateSerializer]
[Alias("MyApp.MyData")]
public class MyData
{
[Id(0)]
[Alias("nm")]
public string Name { get; set; } = "";
}
```
---
## 14. 附录
### 14.1 NuGet 包速查
| 包名 | 用途 |
|------|------|
| `Microsoft.Orleans.Server` | Silo 服务端 |
| `Microsoft.Orleans.Client` | 独立客户端 |
| `Microsoft.Orleans.Sdk` | SDK(含代码生成) |
| `Microsoft.Orleans.Persistence.AzureStorage` | Azure 存储 |
| `Microsoft.Orleans.Persistence.Cosmos` | Cosmos DB |
| `Microsoft.Orleans.Persistence.AdoNet` | SQL 数据库 |
| `Microsoft.Orleans.Persistence.DynamoDB` | DynamoDB |
| `Microsoft.Orleans.Streaming.AzureStorage` | Azure Queue |
| `Microsoft.Orleans.Streaming.EventHubs` | Event Hubs |
| `Orleans.Contrib.Persistence.Redis` | Redis |
| `SignalR.Orleans` | SignalR 集成 |
### 14.2 性能参考
| 场景 | 预期吞吐量 |
|------|-----------|
| 单 Grain | ~1,000 req/s |
| 单 Silo (8核) | ~10,000 req/s |
| 单 Silo 活跃 Grain | ~100,000 |
| 集群规模 | 1000+ Silos |
### 14.3 官方资源
- **GitHub**: https://github.com/dotnet/orleans
- **官方文档**: https://learn.microsoft.com/dotnet/orleans
- **示例代码**: https://github.com/dotnet/orleans/tree/main/samples
- **社区**: https://github.com/dotnet/orleans/discussions
---
## 总结
Microsoft Orleans 是构建分布式系统的强大工具,其 Virtual Actor 模型大大简化了分布式编程的复杂性。通过本教程,你应该掌握了:
1. **核心概念** - Grain、Silo、Cluster
2. **状态管理** - 持久化存储配置
3. **消息传递** - Stream 和 Observer 模式
4. **定时任务** - Timer 和 Reminder
5. **分布式事务** - ACID 事务支持
6. **生产部署** - Kubernetes、监控、健康检查
记住核心原则:**让每个 Grain 保持简单、独立,让 Orleans 运行时处理分布式复杂性**。
---
*文档版本: Orleans 9.x/10.x*
*最后更新: 2025年2月*
登录后可参与表态
讨论回复
0 条回复还没有人回复,快来发表你的看法吧!