反应式编程
反应式编程是一种基于数据流和变化传播的编程范式。在 Valkyrie 中,反应式编程通过 Observable、Signal 和 Reactive 等抽象提供了强大的数据流处理能力。
核心概念
Observable
Observable 是反应式编程的基础,表示一个可观察的数据流:
valkyrie
# Observable 特征定义
trait Observable<T> {
subscribe<F>(self, observer: F) -> Subscription where F: micro(T) -> Unit
map<U, F>(self, f: F) -> Observable<U> where F: micro(T) -> U
filter<F>(self, predicate: F) -> Observable<T> where F: micro(T) -> Boolean
merge(self, other: Observable<T>) -> Observable<T>
take(self, count: usize) -> Observable<T>
}
# 创建 Observable
let numbers = Observable::from([1, 2, 3, 4, 5])
let mouse_clicks = Observable::from_events("click")
let timer = Observable::interval(1000) # 每秒触发一次Signal
Signal 是具有当前值的反应式状态:
valkyrie
# Signal 定义
class Signal<T> {
private value: T
private observers: [micro(T) -> Unit]
new(initial: T) -> Signal<T>
get(self) -> T
set(mut self, new_value: T)
update<F>(mut self, updater: F) where F: micro(T) -> T
subscribe<F>(self, observer: F) -> Subscription where F: micro(T) -> Unit
}
# 使用 Signal
let counter = Signal::new(0)
let doubled = counter.map(|x| x * 2)
# 订阅变化
counter.subscribe(|value| {
print("计数器值: ${value}")
})
# 更新值
counter.set(5) # 输出: 计数器值: 5
counter.update(|x| x + 1) # 输出: 计数器值: 6基本操作符
转换操作符
valkyrie
# map - 转换每个值
let numbers = Observable::from([1, 2, 3, 4, 5])
let squares = numbers.map(|x| x * x)
# flat_map - 扁平化映射
let words = Observable::from(["hello", "world"])
let characters = words.flat_map(|word| {
Observable::from(word.chars())
})
# scan - 累积操作
let numbers = Observable::from([1, 2, 3, 4, 5])
let running_sum = numbers.scan(0, |acc, x| acc + x)
# 输出: 1, 3, 6, 10, 15过滤操作符
valkyrie
# filter - 过滤值
let numbers = Observable::from([1, 2, 3, 4, 5, 6])
let evens = numbers.filter(|x| x % 2 == 0)
# take - 取前 N 个值
let first_three = numbers.take(3)
# skip - 跳过前 N 个值
let after_two = numbers.skip(2)
# distinct - 去重
let unique = Observable::from([1, 1, 2, 2, 3, 3]).distinct()组合操作符
valkyrie
# merge - 合并多个流
let stream1 = Observable::from([1, 3, 5])
let stream2 = Observable::from([2, 4, 6])
let merged = stream1.merge(stream2)
# zip - 配对组合
let names = Observable::from(["Alice", "Bob", "Charlie"])
let ages = Observable::from([25, 30, 35])
let people = names.zip(ages).map(|(name, age)| {
Person { name, age }
})
# combine_latest - 最新值组合
let temperature = Signal::new(20.0)
let humidity = Signal::new(60.0)
let comfort_index = temperature.combine_latest(humidity).map(|(temp, hum)| {
calculate_comfort(temp, hum)
})实际应用示例
用户界面反应式更新
valkyrie
# 反应式 UI 组件
class CounterComponent {
private count: Signal<i32>
private increment_clicks: Observable<Unit>
private decrement_clicks: Observable<Unit>
new() -> CounterComponent {
let count = Signal::new(0)
let increment_clicks = Observable::from_events("increment")
let decrement_clicks = Observable::from_events("decrement")
# 响应点击事件
increment_clicks.subscribe(|| {
count.update(|x| x + 1)
})
decrement_clicks.subscribe(|| {
count.update(|x| x - 1)
})
CounterComponent {
count,
increment_clicks,
decrement_clicks
}
}
render(self) -> Widget {
let count_text = self.count.map(|value| "计数: ${value}")
VStack {
Text(count_text)
HStack {
Button("增加").on_click(self.increment_clicks)
Button("减少").on_click(self.decrement_clicks)
}
}
}
}数据流处理
valkyrie
# 实时数据处理管道
class DataProcessor {
process_sensor_data(sensor_stream: Observable<SensorReading>) -> Observable<ProcessedData> {
sensor_stream
.filter(|reading| reading.is_valid()) # 过滤无效数据
.map(|reading| reading.normalize()) # 标准化数据
.buffer(Duration::seconds(5)) # 5秒缓冲窗口
.map(|batch| self.analyze_batch(batch)) # 批量分析
.filter(|result| result.confidence > 0.8) # 过滤低置信度结果
}
private micro analyze_batch(batch: [SensorReading]) -> ProcessedData {
let average = batch.iter().map(|r| r.value).sum() / batch.len()
let variance = calculate_variance(batch)
ProcessedData {
timestamp: now(),
average,
variance,
confidence: calculate_confidence(variance)
}
}
}
# 使用数据处理器
let processor = DataProcessor::new()
let sensor_stream = Observable::from_websocket("ws://sensor.example.com")
let processed_stream = processor.process_sensor_data(sensor_stream)
processed_stream.subscribe(|data| {
print("处理结果: 平均值=${data.average}, 置信度=${data.confidence}")
if data.confidence > 0.95 {
alert_system.notify("高置信度数据: ${data}")
}
})异步操作组合
valkyrie
# 反应式 HTTP 客户端
class ReactiveHttpClient {
get<T>(url: String) -> Observable<Result<T, HttpError>> {
Observable::create(|observer| {
async {
try {
let response = http_get(url).await?
let data = response.json::<T>().await?
observer.next(Ok(data))
observer.complete()
}
.catch {
case e:
observer.error(e)
}
}
})
}
retry<T>(observable: Observable<Result<T, HttpError>>, max_retries: usize) -> Observable<Result<T, HttpError>> {
observable.catch_error(|error| {
if max_retries > 0 {
print("重试请求,剩余次数: ${max_retries}")
Observable::timer(Duration::seconds(1))
.flat_map(|| self.retry(observable, max_retries - 1))
} else {
Observable::error(error)
}
})
}
}
# 使用示例
let client = ReactiveHttpClient::new()
let user_data = client.get::<User>("https://api.example.com/user/123")
.retry(3) # 最多重试3次
.timeout(Duration::seconds(10)) # 10秒超时
user_data.subscribe(|result| {
result.match {
case Ok(user):
print("用户信息: ${user.name}")
case Err(error):
print("获取用户信息失败: ${error}")
}
})错误处理
错误恢复策略
valkyrie
# 错误处理操作符
trait ObservableErrorHandling<T> {
# 捕获错误并提供默认值
catch_error<F>(self, handler: F) -> Observable<T> where F: micro(Error) -> Observable<T>
# 重试操作
retry(self, count: usize) -> Observable<T>
# 超时处理
timeout(self, duration: Duration) -> Observable<T>
}
# 实际使用
let unreliable_stream = fetch_data_stream()
.catch_error(|error| {
print("发生错误: ${error},使用缓存数据")
Observable::from(cached_data)
})
.retry(3)
.timeout(Duration::seconds(30))
unreliable_stream.subscribe(|data| {
process_data(data)
})错误传播控制
valkyrie
# 部分错误处理
let mixed_stream = Observable::from([1, 2, 3, 4, 5])
.map(|x| {
if x == 3 {
raise ValueError("无效值: 3")
}
x * 2
})
.on_error_resume_next(|error| {
print("跳过错误: ${error}")
Observable::empty() # 跳过错误项
})
mixed_stream.subscribe(|value| {
print("处理值: ${value}")
}) # 输出: 2, 4, 8, 10 (跳过了3)资源管理
订阅生命周期
valkyrie
# Subscription 管理
class Subscription {
private is_disposed: Boolean
private cleanup: micro() -> Unit
dispose(mut self) {
if !self.is_disposed {
self.cleanup()
self.is_disposed = true
}
}
is_disposed(self) -> Boolean {
self.is_disposed
}
}
# CompositeSubscription 用于管理多个订阅
class CompositeSubscription {
private subscriptions: [Subscription]
add(mut self, subscription: Subscription) {
self.subscriptions.push(subscription)
}
dispose_all(mut self) {
for subscription in self.subscriptions {
subscription.dispose()
}
self.subscriptions.clear()
}
}
# 使用示例
let composite = CompositeSubscription::new()
let sub1 = timer_stream.subscribe(|_| print("定时器触发"))
let sub2 = click_stream.subscribe(|_| print("点击事件"))
composite.add(sub1)
composite.add(sub2)
# 在组件销毁时清理所有订阅
composite.dispose_all()背压处理
valkyrie
# 背压策略
enum BackpressureStrategy {
Buffer(capacity: usize), # 缓冲策略
Drop, # 丢弃策略
Latest, # 保留最新策略
Error # 错误策略
}
# 应用背压控制
let fast_producer = Observable::interval(Duration::milliseconds(1)) # 每毫秒产生数据
let slow_consumer = fast_producer
.observe_on(Scheduler::computation()) # 在计算线程池处理
.buffer(100) # 缓冲100个元素
.sample(Duration::seconds(1)) # 每秒采样一次
slow_consumer.subscribe(|batch| {
print("处理批次,大小: ${batch.len()}")
# 慢速处理逻辑
Thread::sleep(Duration::milliseconds(100))
})调度器
线程调度
valkyrie
# 调度器类型
enum Scheduler {
CurrentThread, # 当前线程
Computation, # 计算线程池
IO, # I/O 线程池
NewThread, # 新线程
Trampoline # 蹦床调度器
}
# 指定调度器
let data_stream = Observable::from_file("large_file.txt")
.subscribe_on(Scheduler::IO) # 在 I/O 线程读取文件
.observe_on(Scheduler::Computation) # 在计算线程处理数据
.map(|line| expensive_computation(line))
.observe_on(Scheduler::CurrentThread) # 在主线程更新 UI
data_stream.subscribe(|result| {
update_ui(result) # UI 更新必须在主线程
})测试支持
测试调度器
valkyrie
# 测试用的虚拟时间调度器
class TestScheduler {
private virtual_time: Duration
private scheduled_actions: [(Duration, micro() -> Unit)]
advance_time_by(mut self, duration: Duration) {
let target_time = self.virtual_time + duration
while let Some((time, action)) = self.scheduled_actions.first() {
if time <= target_time {
self.virtual_time = time
action()
self.scheduled_actions.remove(0)
} else {
break
}
}
self.virtual_time = target_time
}
}
# 测试示例
↯[test]
micro test_timer_observable() {
let scheduler = TestScheduler::new()
let timer = Observable::timer(Duration::seconds(5), scheduler)
let mut received_values = []
timer.subscribe(|value| {
received_values.push(value)
})
# 推进虚拟时间
scheduler.advance_time_by(Duration::seconds(3))
@assert_eq(received_values.len(), 0) # 还没到时间
scheduler.advance_time_by(Duration::seconds(3))
@assert_eq(received_values.len(), 1) # 定时器触发
}最佳实践
1. 避免内存泄漏
valkyrie
# 正确的订阅管理
class Component {
private subscriptions: CompositeSubscription
new() -> Component {
let subscriptions = CompositeSubscription::new()
# 订阅数据流
let sub = data_stream.subscribe(|data| {
self.handle_data(data)
})
subscriptions.add(sub)
Component { subscriptions }
}
destroy(mut self) {
# 组件销毁时清理订阅
self.subscriptions.dispose_all()
}
}2. 合理使用操作符
valkyrie
# 优化操作符链
let optimized_stream = source_stream
.filter(|x| x.is_valid()) # 尽早过滤
.take(1000) # 限制数量
.map(|x| x.transform()) # 转换数据
.distinct() # 去重
.buffer(Duration::seconds(1)) # 批处理
# 避免过长的操作符链
let intermediate = source_stream
.filter(|x| x.is_valid())
.map(|x| x.normalize())
let final_stream = intermediate
.group_by(|x| x.category)
.flat_map(|group| group.buffer(10))3. 错误边界
valkyrie
# 设置错误边界防止整个流崩溃
let resilient_stream = risky_stream
.map(|item| {
try {
process_item(item)
}
.catch {
case ProcessingError(e):
log_error("处理失败: ${e}")
default_value() # 提供默认值
case _:
raise # 重新抛出严重错误
}
})
.filter(|result| result.is_some())通过这些反应式编程模式,Valkyrie 提供了强大而灵活的数据流处理能力,使开发者能够构建响应式、可维护的应用程序。