Orleans 从入门到精通

Orleans 从入门到精通

目录

  1. 什么是 Orleans
  2. 核心概念
  3. 快速入门
  4. Grain 详解
  5. 状态持久化
  6. 流处理
  7. 定时器与提醒
  8. 分布式事务
  9. 高级特性
  10. 与其他技术集成
  11. 测试策略
  12. 生产部署
  13. 最佳实践与避坑指南
  14. 附录

1. 什么是 Orleans

1.1 简介

Microsoft Orleans 是一个跨平台的 .NET 框架,用于构建分布式、可扩展的应用程序。它由 Microsoft Research 发明,采用了创新的 Virtual Actor 模型,让开发者可以像编写单机应用一样编写分布式系统。

Orleans 已被广泛应用于:

  • Xbox Live - 数千万玩家在线服务
  • Halo 4/5 - 游戏云服务
  • Skype - 实时通信后端
  • Azure IoT - 物联网设备管理
  • Azure Digital Twins - 数字孪生服务
  • PlayFab - 游戏后端平台

1.2 为什么选择 Orleans

优势说明
Virtual Actor无需手动创建/销毁,自动生命周期管理
位置透明调用者无需知道 Actor 物理位置
弹性扩展轻松从单节点扩展到数千节点
容错能力自动故障恢复,状态可持久化
简化开发用面向对象方式编写分布式系统
成熟稳定微软生产环境验证超过 10 年

1.3 Orleans vs 其他 Actor 框架

特性OrleansAkka.NETProto.Actor
Actor 模型Virtual Actor经典 Actor经典 Actor
生命周期自动管理手动控制手动控制
性能 (msg/s)~100K~120K~150K
内存效率最佳中等中等
学习曲线平缓陡峭中等
云原生支持最佳良好良好

2. 核心概念

2.1 架构概览

┌─────────────────────────────────────────────────────────┐
│                      Cluster                             │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐  │
│  │   Silo 1    │◄──►│   Silo 2    │◄──►│   Silo 3    │  │
│  │ ┌─────────┐ │    │ ┌─────────┐ │    │ ┌─────────┐ │  │
│  │ │ Grain A │ │    │ │ Grain B │ │    │ │ Grain C │ │  │
│  │ │ Grain D │ │    │ │ Grain E │ │    │ │ Grain F │ │  │
│  │ └─────────┘ │    │ └─────────┘ │    │ └─────────┘ │  │
│  └──────┬──────┘    └──────┬──────┘    └──────┬──────┘  │
│         │                  │                  │          │
│         └──────────────────┼──────────────────┘          │
│                            │                             │
│                     ┌──────┴──────┐                      │
│                     │   Gateway   │                      │
│                     └──────┬──────┘                      │
└────────────────────────────┼────────────────────────────┘
                             │
                      ┌──────┴──────┐
                      │   Clients   │
                      └─────────────┘

2.2 Grain(谷物)

Grain 是 Orleans 中的核心抽象,代表一个虚拟 Actor。每个 Grain 有:

  • 唯一标识 - 通过 Key 定位
  • 状态 - 可持久化的数据
  • 行为 - 通过接口定义的方法
// 定义 Grain 接口
public interface IUserGrain : IGrainWithStringKey
{
    Task<string> GetName();
    Task SetName(string name);
    Task AddPoints(int points);
    Task<int> GetPoints();
}

// 实现 Grain
public class UserGrain : Grain, IUserGrain
{
    private string _name = "";
    private int _points = 0;

    public Task<string> GetName() => Task.FromResult(_name);
    
    public Task SetName(string name)
    {
        _name = name;
        return Task.CompletedTask;
    }

    public Task<int> GetPoints() => Task.FromResult(_points);
    
    public Task AddPoints(int points)
    {
        _points += points;
        return Task.CompletedTask;
    }
}

2.3 Silo(筒仓)

Silo 是托管 Grain 的运行时容器,负责:

  • Grain 的激活和停用
  • 消息路由和分发
  • 状态持久化
  • 故障检测和恢复
// 配置 Silo
var builder = Host.CreateApplicationBuilder(args);
builder.UseOrleans(silo =>
{
    silo.UseLocalhostClustering();
    silo.AddMemoryGrainStorage("Default");
});

2.4 Cluster(集群)

Cluster 是一组协作的 Silo,提供:

  • 横向扩展能力
  • 高可用性
  • 负载均衡

2.5 Grain Key 类型

接口Key 类型用例
IGrainWithGuidKeyGuid自动生成的唯一 ID
IGrainWithIntegerKeylong数值 ID(如数据库主键)
IGrainWithStringKeystring字符串标识(如用户名)
IGrainWithGuidCompoundKeyGuid + string复合键
IGrainWithIntegerCompoundKeylong + string复合键
// 不同 Key 类型示例
var userByGuid = GrainFactory.GetGrain<IUserGrain>(Guid.NewGuid());
var userById = GrainFactory.GetGrain<IUserGrain>(123456L);
var userByName = GrainFactory.GetGrain<IUserGrain>("john_doe");

3. 快速入门

3.1 安装包

# 服务端
dotnet add package Microsoft.Orleans.Server

# 客户端
dotnet add package Microsoft.Orleans.Client

# SDK(包含代码生成)
dotnet add package Microsoft.Orleans.Sdk

# 分析器(可选,提供编译时检查)
dotnet add package Microsoft.Orleans.Analyzers

3.2 创建第一个 Orleans 应用

Step 1: 定义 Grain 接口

// IHelloGrain.cs
public interface IHelloGrain : IGrainWithStringKey
{
    Task<string> SayHello(string name);
}

Step 2: 实现 Grain

// HelloGrain.cs
public class HelloGrain : Grain, IHelloGrain
{
    public Task<string> SayHello(string name)
    {
        return Task.FromResult($"Hello, {name}!");
    }
}

Step 3: 配置并运行 Silo

// Program.cs
var builder = WebApplication.CreateBuilder(args);

// 配置 Orleans
builder.Host.UseOrleans(silo =>
{
    silo.UseLocalhostClustering();
    silo.AddMemoryGrainStorage("Default");
});

var app = builder.Build();

// 添加 API 端点
app.MapGet("/hello/{name}", async (IGrainFactory grains, string name) =>
{
    var grain = grains.GetGrain<IHelloGrain>(name);
    return await grain.SayHello(name);
});

app.Run();

Step 4: 运行

dotnet run
curl http://localhost:5000/hello/World
# 输出: "Hello, World!"

3.3 独立客户端连接

// 独立客户端程序
using var client = new ClientBuilder()
    .UseLocalhostClustering()
    .Build();

await client.Connect();

var grain = client.GetGrain<IHelloGrain>("test");
var result = await grain.SayHello("World");
Console.WriteLine(result); // "Hello, World!"

4. Grain 详解

4.1 Grain 生命周期

┌─────────────┐
│   Created   │  Grain 被请求但尚未激活
└──────┬──────┘
       │
       ▼
┌─────────────┐
│ Activating  │  调用 OnActivateAsync
└──────┬──────┘
       │
       ▼
┌─────────────┐
│   Active    │  Grain 可响应请求
└──────┬──────┘
       │
       ▼
┌─────────────┐
│Deactivating │  调用 OnDeactivateAsync
└──────┬──────┘
       │
       ▼
┌─────────────┐
│ Deactivated │  Grain 从内存移除
└─────────────┘
public class LifecycleGrain : Grain, ILifecycleGrain
{
    private readonly ILogger<LifecycleGrain> _logger;

    public LifecycleGrain(ILogger<LifecycleGrain> logger)
    {
        _logger = logger;
    }

    public override Task OnActivateAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Grain {Id} activating", this.GetPrimaryKeyString());
        return base.OnActivateAsync(cancellationToken);
    }

    public override Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
    {
        _logger.LogInformation("Grain {Id} deactivating: {Reason}", 
            this.GetPrimaryKeyString(), reason);
        return base.OnDeactivateAsync(reason, cancellationToken);
    }
}

4.2 Grain 之间调用

public class OrderGrain : Grain, IOrderGrain
{
    public async Task ProcessOrder(Order order)
    {
        // 获取其他 Grain
        var inventory = GrainFactory.GetGrain<IInventoryGrain>(order.ProductId);
        var payment = GrainFactory.GetGrain<IPaymentGrain>(order.UserId);
        var notification = GrainFactory.GetGrain<INotificationGrain>(order.UserId);

        // 检查库存
        var available = await inventory.CheckAvailability(order.Quantity);
        if (!available)
        {
            throw new InvalidOperationException("Insufficient inventory");
        }

        // 处理支付
        await payment.Charge(order.Amount);

        // 扣减库存
        await inventory.Deduct(order.Quantity);

        // 发送通知
        await notification.Send($"Order {order.Id} processed successfully");
    }
}

4.3 不可重入与可重入

默认情况下,Grain 是不可重入的,即同一时刻只能处理一个请求。

// 默认:不可重入(推荐)
public class SafeGrain : Grain, ISafeGrain
{
    // 同一时刻只有一个请求进入
}

// 可重入:允许交错执行
[Reentrant]
public class ReentrantGrain : Grain, IReentrantGrain
{
    // 允许在等待时处理其他请求
    // 警告:需要确保线程安全
}

// 方法级别可重入
public class MixedGrain : Grain, IMixedGrain
{
    [AlwaysInterleave]
    public Task<int> GetCounter() => Task.FromResult(_counter);

    public Task IncrementCounter()  // 不可重入
    {
        _counter++;
        return Task.CompletedTask;
    }
}

4.4 多重激活与 Stateless Worker

// Stateless Worker: 可在多个 Silo 同时激活
[StatelessWorker(maxLocalWorkers: 10)]
public class StatelessGrain : Grain, IStatelessGrain
{
    public Task<string> Process(string input)
    {
        // 无状态处理
        return Task.FromResult(input.ToUpper());
    }
}

// 使用场景:计算密集型、无状态转换

4.5 Grain 版本兼容性

// 使用 [Version] 属性标记版本
[Version(2)]
public interface IVersionedGrain : IGrainWithIntegerKey
{
    Task<string> GetValue();
    Task SetValue(string value);
    
    // 版本 2 新增方法
    Task<int> GetVersion();
}

5. 状态持久化

5.1 持久化存储提供者

Orleans 支持多种存储后端:

存储类型NuGet 包特点
Memory内置仅用于开发测试
Azure TableMicrosoft.Orleans.Persistence.AzureStorage经济高效
Azure BlobMicrosoft.Orleans.Persistence.AzureStorage大对象存储
Azure CosmosMicrosoft.Orleans.Persistence.Cosmos全球分布
RedisOrleans.Contrib.Persistence.Redis高性能缓存
SQL ServerMicrosoft.Orleans.Persistence.AdoNet关系数据库
PostgreSQLMicrosoft.Orleans.Persistence.AdoNet开源数据库
DynamoDBMicrosoft.Orleans.Persistence.DynamoDBAWS 原生

5.2 配置存储

builder.Host.UseOrleans(silo =>
{
    // Azure Table Storage
    silo.AddAzureTableGrainStorage("tableStore", options =>
    {
        options.ConfigureTableServiceClient(connectionString);
    });

    // Azure Blob Storage
    silo.AddAzureBlobGrainStorage("blobStore", options =>
    {
        options.ConfigureBlobServiceClient(connectionString);
    });

    // Redis
    silo.AddRedisGrainStorage("redis", options =>
    {
        options.DataConnectionString = redisConnectionString;
    });

    // ADO.NET (SQL Server)
    silo.AddAdoNetGrainStorage("sql", options =>
    {
        options.ConnectionString = sqlConnectionString;
        options.Invariant = "Microsoft.Data.SqlClient";
    });
});

5.3 使用 IPersistentState(推荐)

// 定义状态类
[GenerateSerializer]
public class UserProfileState
{
    [Id(0)]
    public string Name { get; set; } = "";
    
    [Id(1)]
    public string Email { get; set; } = "";
    
    [Id(2)]
    public int Points { get; set; }
    
    [Id(3)]
    public List<string> Achievements { get; set; } = new();
}

// 实现 Grain
public class UserProfileGrain : Grain, IUserProfileGrain
{
    private readonly IPersistentState<UserProfileState> _state;

    public UserProfileGrain(
        [PersistentState("profile", "tableStore")]
        IPersistentState<UserProfileState> state)
    {
        _state = state;
    }

    public Task<UserProfileState> GetProfile()
    {
        return Task.FromResult(_state.State);
    }

    public async Task UpdateName(string name)
    {
        _state.State.Name = name;
        await _state.WriteStateAsync();
    }

    public async Task AddPoints(int points)
    {
        _state.State.Points += points;
        await _state.WriteStateAsync();
    }

    public async Task AddAchievement(string achievement)
    {
        _state.State.Achievements.Add(achievement);
        await _state.WriteStateAsync();
    }
}

5.4 状态操作

public class StateOperationsGrain : Grain, IStateOperationsGrain
{
    private readonly IPersistentState<MyState> _state;

    // 写入状态
    public async Task SaveState()
    {
        _state.State.Value = "New Value";
        await _state.WriteStateAsync();
    }

    // 读取状态
    public async Task RefreshState()
    {
        await _state.ReadStateAsync();
    }

    // 清除状态
    public async Task ClearState()
    {
        await _state.ClearStateAsync();
    }
}

5.5 Grain 方式(传统)

[StorageProvider(ProviderName = "tableStore")]
public class LegacyGrain : Grain<LegacyState>, ILegacyGrain
{
    public async Task UpdateValue(string value)
    {
        State.Value = value;
        await WriteStateAsync();
    }

    public Task<string> GetValue()
    {
        return Task.FromResult(State.Value);
    }
}

6. 流处理

6.1 流概述

Orleans Streams 提供了一种优雅的方式来实现发布-订阅模式:

┌─────────┐    ┌─────────────┐    ┌─────────┐
│Producer │───►│   Stream    │───►│Consumer │
│ Grain   │    │  (Queue)    │    │ Grain   │
└─────────┘    └─────────────┘    └─────────┘

6.2 配置流提供者

builder.Host.UseOrleans(silo =>
{
    // Azure Queue Streams
    silo.AddAzureQueueStreams("AzureQueue", options =>
    {
        options.ConfigureQueueServiceClient(connectionString);
    });

    // Simple Message Stream (内存)
    silo.AddMemoryStreams("MemoryStream");

    // Azure Event Hubs (可回溯)
    silo.AddEventHubStreams("EventHub", options =>
    {
        options.ConfigureEventHub(builder => 
        {
            builder.ConfigureEventHubConnection(connectionString, "hubName", "consumerGroup");
        });
    });
});

6.3 生产者

public class ProducerGrain : Grain, IProducerGrain
{
    public async Task ProduceEvents()
    {
        var streamProvider = this.GetStreamProvider("AzureQueue");
        var streamId = StreamId.Create("ChatRoom", "general");
        var stream = streamProvider.GetStream<string>(streamId);

        // 发送消息
        await stream.OnNextAsync("Hello, everyone!");
        await stream.OnNextAsync("How are you?");
    }
}

6.4 消费者(显式订阅)

public class ConsumerGrain : Grain, IConsumerGrain
{
    private StreamSubscriptionHandle<string>? _subscription;

    public override async Task OnActivateAsync(CancellationToken cancellationToken)
    {
        var streamProvider = this.GetStreamProvider("AzureQueue");
        var streamId = StreamId.Create("ChatRoom", this.GetPrimaryKeyString());
        var stream = streamProvider.GetStream<string>(streamId);

        // 订阅流
        _subscription = await stream.SubscribeAsync(OnNextAsync, OnErrorAsync, OnCompletedAsync);
    }

    private Task OnNextAsync(string item, StreamSequenceToken? token)
    {
        Console.WriteLine($"Received: {item}");
        return Task.CompletedTask;
    }

    private Task OnErrorAsync(Exception ex)
    {
        Console.WriteLine($"Error: {ex.Message}");
        return Task.CompletedTask;
    }

    private Task OnCompletedAsync()
    {
        Console.WriteLine("Stream completed");
        return Task.CompletedTask;
    }
}

6.5 隐式订阅

// 隐式订阅:Grain 根据属性自动订阅
[ImplicitStreamSubscription("ChatRoom")]
public class ChatRoomGrain : Grain, IChatRoomGrain, IAsyncObserver<string>
{
    public override async Task OnActivateAsync(CancellationToken cancellationToken)
    {
        var streamProvider = this.GetStreamProvider("AzureQueue");
        var streamId = StreamId.Create("ChatRoom", this.GetPrimaryKeyString());
        var stream = streamProvider.GetStream<string>(streamId);

        // 获取或创建订阅句柄
        var handles = await stream.GetAllSubscriptionHandles();
        if (handles.Count > 0)
        {
            await handles[0].ResumeAsync(this);
        }
        else
        {
            await stream.SubscribeAsync(this);
        }
    }

    public Task OnNextAsync(string item, StreamSequenceToken? token = null)
    {
        Console.WriteLine($"ChatRoom {GetPrimaryKeyString()}: {item}");
        return Task.CompletedTask;
    }
}

7. 定时器与提醒

7.1 Grain Timer(非持久化)

适用于高频、临时性任务:

public class TimerGrain : Grain, ITimerGrain
{
    private IGrainTimer? _timer;
    private int _counter = 0;

    public override Task OnActivateAsync(CancellationToken cancellationToken)
    {
        // Orleans 8.2+ 新 API
        _timer = this.RegisterGrainTimer(
            static (state, ct) => state.DoPeriodicWork(ct),
            this,
            new GrainTimerCreationOptions
            {
                DueTime = TimeSpan.FromSeconds(5),
                Period = TimeSpan.FromSeconds(10),
                KeepAlive = true,      // 防止 Grain 被回收
                Interleave = false     // 不与其他调用交错
            });

        return Task.CompletedTask;
    }

    private Task DoPeriodicWork(CancellationToken cancellationToken)
    {
        _counter++;
        Console.WriteLine($"Counter: {_counter}");
        return Task.CompletedTask;
    }

    public override Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
    {
        _timer?.Dispose();
        return Task.CompletedTask;
    }
}

7.2 Reminder(持久化)

适用于低频、持久性任务:

// 配置 Reminder 存储
builder.Host.UseOrleans(silo =>
{
    silo.UseAzureTableReminderService(options =>
    {
        options.ConfigureTableServiceClient(connectionString);
    });
});

// 实现 IRemindable
public class ReminderGrain : Grain, IReminderGrain, IRemindable
{
    private IGrainReminder? _reminder;

    public async Task StartReminder()
    {
        _reminder = await this.RegisterOrUpdateReminder(
            "DailyCleanup",
            dueTime: TimeSpan.FromMinutes(1),
            period: TimeSpan.FromHours(24));
    }

    public async Task StopReminder()
    {
        if (_reminder != null)
        {
            await this.UnregisterReminder(_reminder);
        }
    }

    public Task ReceiveReminder(string reminderName, TickStatus status)
    {
        Console.WriteLine($"Reminder {reminderName} fired at {status.CurrentTickTime}");
        return Task.CompletedTask;
    }
}

7.3 Timer vs Reminder

特性Grain TimerReminder
持久化
频率高(毫秒级)低(分钟级)
Grain 停用后消失继续触发
存储内存数据库
适用场景临时任务定期清理、报告

8. 分布式事务

8.1 配置事务

builder.Host.UseOrleans(silo =>
{
    silo.UseTransactions();
    
    silo.AddAzureTableTransactionalStateStorage("TransactionStore", options =>
    {
        options.ConfigureTableServiceClient(connectionString);
    });
});

8.2 定义事务性 Grain

// 定义接口
public interface IAccountGrain : IGrainWithStringKey
{
    [Transaction(TransactionOption.Join)]
    Task<decimal> GetBalance();

    [Transaction(TransactionOption.Join)]
    Task Withdraw(decimal amount);

    [Transaction(TransactionOption.Join)]
    Task Deposit(decimal amount);
}

// 实现
[Reentrant]
public class AccountGrain : Grain, IAccountGrain
{
    private readonly ITransactionalState<AccountState> _balance;

    public AccountGrain(
        [TransactionalState("balance", "TransactionStore")]
        ITransactionalState<AccountState> balance)
    {
        _balance = balance;
    }

    public Task<decimal> GetBalance()
    {
        return _balance.PerformRead(state => state.Balance);
    }

    public Task Withdraw(decimal amount)
    {
        return _balance.PerformUpdate(state =>
        {
            if (state.Balance < amount)
                throw new InvalidOperationException("Insufficient funds");
            state.Balance -= amount;
        });
    }

    public Task Deposit(decimal amount)
    {
        return _balance.PerformUpdate(state =>
        {
            state.Balance += amount;
        });
    }
}

8.3 执行事务

public class TransferService
{
    private readonly ITransactionClient _transactionClient;
    private readonly IGrainFactory _grainFactory;

    public async Task Transfer(string fromAccount, string toAccount, decimal amount)
    {
        await _transactionClient.RunTransaction(
            TransactionOption.Create,
            async () =>
            {
                var from = _grainFactory.GetGrain<IAccountGrain>(fromAccount);
                var to = _grainFactory.GetGrain<IAccountGrain>(toAccount);

                await from.Withdraw(amount);
                await to.Deposit(amount);
            });
    }
}

9. 高级特性

9.1 Observer 模式

允许客户端接收 Grain 状态变化通知:

// 定义 Observer 接口
public interface IChatObserver : IGrainObserver
{
    void ReceiveMessage(string from, string message);
    void UserJoined(string username);
    void UserLeft(string username);
}

// Grain 发送通知
public class ChatRoomGrain : Grain, IChatRoomGrain
{
    private readonly List<IGrainObserver> _observers = new();

    public Task Subscribe(IChatObserver observer)
    {
        _observers.Add(observer);
        return Task.CompletedTask;
    }

    public Task SendMessage(string from, string message)
    {
        foreach (var observer in _observers.OfType<IChatObserver>())
        {
            observer.ReceiveMessage(from, message);
        }
        return Task.CompletedTask;
    }
}

// 客户端使用
public class ChatClient : IChatObserver
{
    public void ReceiveMessage(string from, string message)
    {
        Console.WriteLine($"{from}: {message}");
    }

    public async Task Join(IChatRoomGrain room)
    {
        var observer = new ChatClient();
        var reference = await this.GrainFactory.CreateObjectReference<IChatObserver>(observer);
        await room.Subscribe(reference);
    }
}

9.2 Grain Placement 策略

// 随机放置(默认)
[RandomPlacement]
public class RandomGrain : Grain { }

// 基于激活数放置
[ActivationCountBasedPlacement]
public class BalancedGrain : Grain { }

// 资源优化放置(Orleans 9.x 默认)
[ResourceOptimizedPlacement]
public class OptimizedGrain : Grain { }

// 优先本地放置
[PreferLocalPlacement]
public class LocalGrain : Grain { }

// 自定义放置
[PlacementStrategy(typeof(MyCustomPlacement))]
public class CustomGrain : Grain { }

// 配置权重
silo.Configure<ResourceOptimizedPlacementOptions>(options =>
{
    options.CpuUsageWeight = 40;
    options.MemoryUsageWeight = 30;
    options.AvailableMemoryWeight = 20;
    options.MaxAvailableMemoryWeight = 10;
});

9.3 Grain 目录

// 自定义 Grain 目录
siloBuilder.UseAdoNetGrainDirectory(options =>
{
    options.ConnectionString = connectionString;
    options.Invariant = "Microsoft.Data.SqlClient";
});

9.4 请求上下文

// 设置请求上下文(传递元数据)
RequestContext.Set("TraceId", Guid.NewGuid());
RequestContext.Set("UserId", "user123");

// 在 Grain 中读取
public class TracedGrain : Grain, ITracedGrain
{
    public Task DoWork()
    {
        var traceId = RequestContext.Get("TraceId");
        var userId = RequestContext.Get("UserId");
        Console.WriteLine($"TraceId: {traceId}, UserId: {userId}");
        return Task.CompletedTask;
    }
}

9.5 IAsyncEnumerable 支持

// Grain 返回异步流
public interface IStreamGrain : IGrainWithStringKey
{
    IAsyncEnumerable<string> GetUpdates(CancellationToken cancellationToken = default);
}

public class StreamGrain : Grain, IStreamGrain
{
    private readonly Channel<string> _channel = Channel.CreateUnbounded<string>();

    public async Task Publish(string message)
    {
        await _channel.Writer.WriteAsync(message);
    }

    public IAsyncEnumerable<string> GetUpdates(CancellationToken cancellationToken)
    {
        return _channel.Reader.ReadAllAsync(cancellationToken);
    }
}

// 客户端使用
await foreach (var update in grain.GetUpdates().WithCancellation(cancellationToken))
{
    Console.WriteLine(update);
}

9.6 POCO Grain

无需继承 Grain 基类:

public class PocoGrain : IGrainBase, IMyGrain
{
    public IGrainContext GrainContext { get; }
    private readonly ILogger<PocoGrain> _logger;
    private string _value = "";

    public PocoGrain(IGrainContext context, ILogger<PocoGrain> logger)
    {
        GrainContext = context;
        _logger = logger;
    }

    public Task OnActivateAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Activated");
        return Task.CompletedTask;
    }

    public Task<string> GetValue() => Task.FromResult(_value);

    public Task SetValue(string value)
    {
        _value = value;
        return Task.CompletedTask;
    }
}

10. 与其他技术集成

10.1 ASP.NET Core 集成

var builder = WebApplication.CreateBuilder(args);

// 配置 Orleans
builder.Host.UseOrleans(silo =>
{
    silo.UseLocalhostClustering();
    silo.AddMemoryGrainStorage("Default");
});

var app = builder.Build();

// Minimal API
app.MapGet("/users/{id}", async (IGrainFactory grains, string id) =>
{
    var grain = grains.GetGrain<IUserGrain>(id);
    return await grain.GetProfile();
});

app.MapPost("/users/{id}", async (IGrainFactory grains, string id, UserProfile profile) =>
{
    var grain = grains.GetGrain<IUserGrain>(id);
    await grain.UpdateProfile(profile);
    return Results.Ok();
});

app.Run();

10.2 SignalR 集成

dotnet add package SignalR.Orleans
// 配置 Silo
builder.Host.UseOrleans(silo =>
{
    silo.UseSignalR();
});

// 配置 SignalR
builder.Services.AddSignalR().AddOrleans();

// Hub
public class ChatHub : Hub
{
    public async Task SendMessage(string room, string message)
    {
        await Clients.Group(room).SendAsync("ReceiveMessage", message);
    }
}

// 从 Grain 发送消息
public class NotificationGrain : Grain
{
    private readonly IHubContext<ChatHub> _hubContext;

    public NotificationGrain(IHubContext<ChatHub> hubContext)
    {
        _hubContext = hubContext;
    }

    public async Task Broadcast(string room, string message)
    {
        await _hubContext.Clients.Group(room).SendAsync("ReceiveMessage", message);
    }
}

10.3 gRPC 集成

// gRPC 服务
public class GreeterService : Greeter.GreeterBase
{
    private readonly IGrainFactory _grainFactory;

    public GreeterService(IGrainFactory grainFactory)
    {
        _grainFactory = grainFactory;
    }

    public override async Task<HelloReply> SayHello(HelloRequest request, ServerCallContext context)
    {
        var grain = _grainFactory.GetGrain<IHelloGrain>(request.Name);
        var message = await grain.SayHello(request.Name);
        return new HelloReply { Message = message };
    }
}

// 配置
builder.Services.AddGrpc();
builder.Host.UseOrleans(silo => silo.UseLocalhostClustering());

var app = builder.Build();
app.MapGrpcService<GreeterService>();

10.4 .NET Aspire 集成

// AppHost
var orleans = builder.AddOrleans("orleans")
    .WithClustering(builder.AddAzureTableStorage("clustering"))
    .WithGrainStorage("Default", builder.AddAzureBlobStorage("grains"));

builder.AddProject<Projects.Silo>("silo")
    .WithReference(orleans);

builder.AddProject<Projects.Client>("client")
    .WithReference(orleans.AsClient());

11. 测试策略

11.1 集成测试

public class OrleansTestBase : IAsyncLifetime
{
    protected TestCluster Cluster { get; private set; } = null!;

    public async Task InitializeAsync()
    {
        var builder = new TestClusterBuilder();
        builder.AddSiloBuilderConfigurator<TestSiloConfig>();
        
        Cluster = builder.Build();
        await Cluster.DeployAsync();
    }

    public async Task DisposeAsync()
    {
        await Cluster.StopAllSilosAsync();
    }
}

public class TestSiloConfig : ISiloConfigurator
{
    public void Configure(ISiloBuilder siloBuilder)
    {
        siloBuilder.AddMemoryGrainStorage("Default");
    }
}

// 测试类
public class UserGrainTests : OrleansTestBase
{
    [Fact]
    public async Task GetProfile_ReturnsStoredProfile()
    {
        // Arrange
        var grain = Cluster.GrainFactory.GetGrain<IUserGrain>("test-user");
        await grain.SetName("John Doe");

        // Act
        var profile = await grain.GetProfile();

        // Assert
        Assert.Equal("John Doe", profile.Name);
    }

    [Fact]
    public async Task AddPoints_IncreasesTotalPoints()
    {
        // Arrange
        var grain = Cluster.GrainFactory.GetGrain<IUserGrain>("test-user");
        await grain.AddPoints(100);
        await grain.AddPoints(50);

        // Act
        var points = await grain.GetPoints();

        // Assert
        Assert.Equal(150, points);
    }
}

11.2 Mock 单元测试

public class OrderProcessorTests
{
    [Fact]
    public async Task ProcessOrder_WhenInventoryAvailable_ProcessesSuccessfully()
    {
        // Arrange
        var mockInventory = new Mock<IInventoryGrain>();
        mockInventory.Setup(x => x.CheckAvailability(10))
            .ReturnsAsync(true);

        var mockPayment = new Mock<IPaymentGrain>();
        var mockNotification = new Mock<INotificationGrain>();

        var mockFactory = new Mock<IGrainFactory>();
        mockFactory.Setup(x => x.GetGrain<IInventoryGrain>("product-1", null))
            .Returns(mockInventory.Object);

        // Act & Assert
        // ... 测试逻辑
    }
}

11.3 故障注入测试

public class FaultToleranceTests : OrleansTestBase
{
    [Fact]
    public async Task GrainRecovers_AfterSiloCrash()
    {
        // Arrange
        var grain = Cluster.GrainFactory.GetGrain<IUserGrain>("test");
        await grain.SetName("Before Crash");

        // Act - 停止 Silo
        var siloToStop = Cluster.Silos.First();
        await Cluster.StopSiloAsync(siloToStop);

        // 启动新 Silo
        await Cluster.StartSiloAsync();

        // Assert - Grain 应该能恢复
        var name = await grain.GetName();
        Assert.Equal("Before Crash", name);
    }
}

12. 生产部署

12.1 集群配置

builder.Host.UseOrleans(silo =>
{
    // 集群配置
    silo.Configure<ClusterOptions>(options =>
    {
        options.ClusterId = "production-cluster";
        options.ServiceId = "my-service";
    });

    // Azure Storage 集群
    silo.UseAzureStorageClustering(options =>
    {
        options.ConfigureTableServiceClient(connectionString);
    });

    // 端点配置
    silo.ConfigureEndpoints(
        siloPort: 11111,
        gatewayPort: 30000,
        listenOnAnyHostAddress: true);

    // 持久化
    silo.AddAzureTableGrainStorage("Default", options =>
    {
        options.ConfigureTableServiceClient(connectionString);
    });

    // Reminders
    silo.UseAzureTableReminderService(options =>
    {
        options.ConfigureTableServiceClient(connectionString);
    });
});

12.2 Kubernetes 部署

apiVersion: apps/v1
kind: Deployment
metadata:
  name: orleans-silo
spec:
  replicas: 3
  selector:
    matchLabels:
      app: orleans-silo
  template:
    metadata:
      labels:
        app: orleans-silo
        orleans-cluster: production
    spec:
      containers:
      - name: silo
        image: myregistry/orleans-app:latest
        ports:
        - containerPort: 11111
          name: silo
        - containerPort: 30000
          name: gateway
        env:
        - name: ORLEANS_CLUSTER_ID
          value: "production"
        - name: POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "2Gi"
            cpu: "2000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 80
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /health/ready
            port: 80
          initialDelaySeconds: 10
          periodSeconds: 5

12.3 健康检查

builder.Services.AddHealthChecks()
    .AddCheck<OrleansHealthCheck>("orleans");

public class OrleansHealthCheck : IHealthCheck
{
    private readonly IClusterClient _client;

    public OrleansHealthCheck(IClusterClient client)
    {
        _client = client;
    }

    public async Task<HealthCheckResult> CheckHealthAsync(
        HealthCheckContext context,
        CancellationToken cancellationToken = default)
    {
        try
        {
            var managementGrain = _client.GetGrain<IManagementGrain>(0);
            var hosts = await managementGrain.GetHosts();
            
            return hosts.Count > 0
                ? HealthCheckResult.Healthy($"{hosts.Count} silos active")
                : HealthCheckResult.Unhealthy("No silos in cluster");
        }
        catch (Exception ex)
        {
            return HealthCheckResult.Unhealthy("Orleans check failed", ex);
        }
    }
}

12.4 GC 配置

<!-- 项目文件 -->
<PropertyGroup>
  <ServerGarbageCollection>true</ServerGarbageCollection>
  <ConcurrentGarbageCollection>true</ConcurrentGarbageCollection>
</PropertyGroup>

12.5 监控

// OpenTelemetry
builder.Services.AddOpenTelemetry()
    .WithMetrics(metrics => metrics
        .AddPrometheusExporter()
        .AddMeter("Microsoft.Orleans"))
    .WithTracing(tracing => tracing
        .AddSource("Microsoft.Orleans.Runtime")
        .AddSource("Microsoft.Orleans.Application"));

// Orleans Dashboard (Orleans 10+)
builder.Host.UseOrleans(silo =>
{
    silo.UseDashboard(options =>
    {
        options.Port = 8080;
        options.HostSelf = true;
    });
});

13. 最佳实践与避坑指南

13.1 设计原则

原则说明
细粒度 Grain多个小 Grain > 少量大 Grain
避免 Chatty减少 Grain 间频繁通信
无阻塞所有操作必须异步
避免热点不要有单一协调 Grain

13.2 性能优化

// ✅ 好:批量处理
public async Task ProcessBatch(List<Item> items)
{
    var tasks = items.Select(item => ProcessItem(item));
    await Task.WhenAll(tasks);
}

// ❌ 差:逐个等待
public async Task ProcessSequential(List<Item> items)
{
    foreach (var item in items)
    {
        await ProcessItem(item); // 串行等待
    }
}

// ✅ 好:使用 ETag 避免写入冲突
public async Task UpdateWithETag()
{
    var (state, etag) = await _state.GetStateAndETag();
    // 修改 state
    await _state.WriteStateAsync(state, etag);
}

// ✅ 好:配置连接池
silo.Configure<ConnectionOptions>(options =>
{
    options.ConnectionPoolSize = 100;
});

13.3 常见陷阱

陷阱解决方案
阻塞调用使用 async/await
死锁 (A→B→A)使用 [Reentrant] 或重新设计
每次调用 new HttpClient使用 IHttpClientFactory
忘记 WriteStateAsync状态修改后立即持久化
单一热点 Grain分片或使用 StatelessWorker

13.4 序列化优化

// ✅ 好:使用 GenerateSerializer
[GenerateSerializer]
public class MyData
{
    [Id(0)]
    public string Name { get; set; } = "";
    
    [Id(1)]
    public int Value { get; set; }
}

// ❌ 差:依赖默认序列化(使用反射)
public class MyData
{
    public string Name { get; set; }
    public int Value { get; set; }
}

// ✅ 好:使用 [Alias] 保证版本兼容
[GenerateSerializer]
[Alias("MyApp.MyData")]
public class MyData
{
    [Id(0)]
    [Alias("nm")]
    public string Name { get; set; } = "";
}

14. 附录

14.1 NuGet 包速查

包名用途
Microsoft.Orleans.ServerSilo 服务端
Microsoft.Orleans.Client独立客户端
Microsoft.Orleans.SdkSDK(含代码生成)
Microsoft.Orleans.Persistence.AzureStorageAzure 存储
Microsoft.Orleans.Persistence.CosmosCosmos DB
Microsoft.Orleans.Persistence.AdoNetSQL 数据库
Microsoft.Orleans.Persistence.DynamoDBDynamoDB
Microsoft.Orleans.Streaming.AzureStorageAzure Queue
Microsoft.Orleans.Streaming.EventHubsEvent Hubs
Orleans.Contrib.Persistence.RedisRedis
SignalR.OrleansSignalR 集成

14.2 性能参考

场景预期吞吐量
单 Grain~1,000 req/s
单 Silo (8核)~10,000 req/s
单 Silo 活跃 Grain~100,000
集群规模1000+ Silos

14.3 官方资源

  • GitHub: https://github.com/dotnet/orleans
  • 官方文档: https://learn.microsoft.com/dotnet/orleans
  • 示例代码: https://github.com/dotnet/orleans/tree/main/samples
  • 社区: https://github.com/dotnet/orleans/discussions

总结

Microsoft Orleans 是构建分布式系统的强大工具,其 Virtual Actor 模型大大简化了分布式编程的复杂性。通过本教程,你应该掌握了:

  1. 核心概念 - Grain、Silo、Cluster
  2. 状态管理 - 持久化存储配置
  3. 消息传递 - Stream 和 Observer 模式
  4. 定时任务 - Timer 和 Reminder
  5. 分布式事务 - ACID 事务支持
  6. 生产部署 - Kubernetes、监控、健康检查

记住核心原则:让每个 Grain 保持简单、独立,让 Orleans 运行时处理分布式复杂性


文档版本: Orleans 9.x/10.x 最后更新: 2025年2月

← 返回目录