Skip to content

协程

Valkyrie 提供了强大的协程支持,通过 yield 关键字实现协作式多任务处理。协程允许函数在执行过程中暂停和恢复,非常适合处理异步操作和状态机。协程与生成器的主要区别在于协程更注重控制流的暂停和恢复,而不仅仅是产生值序列。

协程状态管理

协程生命周期

valkyrie
# 协程状态枚举
union CoroutineState {
    Created,     # 已创建但未开始
    Running,     # 正在执行
    Suspended,   # 已暂停(yield)
    Completed,   # 已完成
    Error(Any)   # 发生错误
}

# 检查协程状态
micro example_coroutine() {
    print("开始执行")
    yield "第一个值"
    print("继续执行")
    yield "第二个值"
    print("执行完成")
}

let coro = example_coroutine()
print(coro.state())  # Created

let first = coro.next()
print(coro.state())  # Suspended
print(first)         # "第一个值"

let second = coro.next()
print(coro.state())  # Suspended
print(second)        # "第二个值"

coro.next()          # 完成执行
print(coro.state())  # Completed

协程控制

valkyrie
# 手动控制协程执行
micro controlled_coroutine() {
    let mut state = "idle"
    loop {
        let command = yield state
        command.match {
            case "start":
                state = "running"
            case "pause":
                state = "paused"
            case "stop":
                state = "stopped"
                break
            case _:
                state = "unknown_command"
        }
    }
}

let coro = controlled_coroutine()
print(coro.next())           # "idle"
print(coro.send("start"))    # "running"
print(coro.send("pause"))    # "paused"
print(coro.send("stop"))     # "stopped"

异步协程

异步操作

valkyrie
# 异步协程
async micro fetch_data(url: String) -> String {
    print("开始请求: ${ url }")
    let response = http_get(url).await?
    yield "请求已发送"  # 可以在异步函数中使用 yield
    
    if response.status == 200 {
        yield "请求成功"
        response.body
    } else {
        raise "请求失败: ${ response.status }"
    }
}

# 使用异步协程
async micro main() {
    let fetcher = fetch_data("https://api.example.com/data")
    
    # 处理中间状态
    for status in fetcher {
        print("状态: ${ status }")
    }
    
    # 获取最终结果
    try {
        let data = fetcher.await?
        print("数据: ${ data }")
    }
    .catch {
        case _:
            print("错误: ${ error }")
    }
}

并发协程

valkyrie
# 并发执行多个协程
async micro concurrent_processing(items: [String]) {
    let promises = items.map(async micro(item) {
        let result = process_item(item)
        yield "处理完成: ${ item }"
        result
    })
    
    # 等待所有 Promise 完成
    let results = Promise::all(promises).await?
    yield "所有任务完成"
    results
}

# 使用
async micro run_concurrent() {
    let processor = concurrent_processing(["item1", "item2", "item3"])
    
    for update in processor {
        print(update)
    }
    
    let final_results = processor.await?
    print("最终结果: ${ final_results }")
}

高级协程模式

状态机协程

valkyrie
# 状态机实现
union State {
    Idle,
    Processing,
    Waiting,
    Complete
}

micro state_machine() {
    let mut state = State::Idle
    let mut data = null
    
    loop {
        state.match {
            case State::Idle: {
                yield "等待输入"
                data = yield_receive()  # 等待外部输入
                state = State::Processing
            }
            case State::Processing: {
                yield "处理中..."
                let result = process_data(data)
                if result.is_ok() {
                    state = State::Complete
                } else {
                    state = State::Waiting
                }
            }
            case State::Waiting: {
                yield "等待重试"
                sleep(1000)  # 等待1秒
                state = State::Processing
            }
            case State::Complete: {
                yield "处理完成"
                break
            }
        }
    }
}

协程池

valkyrie
# 协程池管理
class CoroutinePool {
    coroutines: [Coroutine],
    max_size: i32,
    active_count: i32
    
    new(max_size: i32) -> Self {
        CoroutinePool {
            coroutines: [],
            max_size: max_size,
            active_count: 0
        }
    }
    
    spawn(task: micro() -> Any) -> bool {
        if self.active_count < self.max_size {
            let coro = Coroutine::new(task)
            self.coroutines.push(coro)
            self.active_count += 1
            true
        } else {
            false  # 池已满
        }
    }
    
    run_all() {
        while self.active_count > 0 {
            for coro in self.coroutines {
                if coro.state() == CoroutineState::Suspended {
                    let result = coro.resume()
                    yield "协程进度: ${ result }"
                    
                    if coro.state() == CoroutineState::Completed {
                        self.active_count -= 1
                    }
                }
            }
        }
        yield "所有协程完成"
    }
}

错误处理

协程异常处理

valkyrie
# 协程中的异常处理
micro error_prone_generator() {
    try {
        yield "开始处理"
        
        let risky_operation = perform_risky_task()
        yield "风险操作完成"
        
        if risky_operation.is_error() {
            raise "操作失败"
        }
        
        yield "处理成功"
    }
    .catch {
        case _:
            yield "发生错误: ${ error }"
            raise error  # 重新抛出异常
    }
}

# 使用带错误处理的协程
let gen = error_prone_generator()
try {
    for status in gen {
        print(status)
    }
}
.catch {
    case _:
        print("协程异常: ${ error }")
}

最佳实践

1. 协程设计原则

valkyrie
# 保持协程简单和专注
micro good_generator(data: [String]) {
    for item in data {
        if item.is_valid() {
            yield item.process()  # 只做一件事
        }
    }
}

# 避免在协程中进行复杂的状态管理
# 不好的例子:
micro bad_generator() {
    let mut complex_state = ComplexState::new()
    # ... 复杂的状态逻辑
}

2. 资源管理

valkyrie
# 确保资源正确释放
micro file_processor(filename: String) {
    let file = open_file(filename)
    try {
        while !file.eof() {
            let line = file.read_line()
            yield process_line(line)
        }
    }
    # 使用using确保文件关闭
    # using file = open_file(filename) { ... }
}

3. 性能考虑

valkyrie
# 避免频繁的小yield
# 不好的例子:
micro inefficient_generator(data: [i32]) {
    for item in data {
        yield item  # 每个元素都yield
    }
}

# 好的例子:
micro efficient_generator(data: [i32]) {
    let mut batch = []
    for item in data {
        batch.push(item)
        if batch.len() >= 100 {
            yield batch  # 批量yield
            batch = []
        }
    }
    if !batch.is_empty() {
        yield batch  # 处理剩余项目
    }
}

4. 测试协程

valkyrie
# 协程测试策略
micro test_generator() {
    let gen = count_up(3)
    
    # 测试生成的值
    @assert_equal(gen.next(), Some(0))
    @assert_equal(gen.next(), Some(1))
    @assert_equal(gen.next(), Some(2))
    @assert_equal(gen.next(), None)
    
    # 测试状态
    @assert_equal(gen.state(), CoroutineState::Completed)
}

# 异步协程测试
async micro test_async_generator() {
    let gen = async_data_processor()
    
    let first_result = gen.next().await?
    @assert(first_result.is_some())
    
    let final_result = gen.collect_all().await?
    @assert_equal(final_result.len(), 5)
}

Released under the MIT License.