1.前言

在Dart库中,有两种实现异步编程的方式(FutureStream),使用它们只需要在代码中引入dart:async即可。
本文主要介绍Stream的相关概念及利用其异步特性来实现简单的响应式编程。

2.什么是Stream?

为了将Stream的概念可视化与简单化,可以将它想成是管道(pipe)的两端,它只允许从一端插入数据并通过管道从另外一端流出数据。
在Flutter中,

我们将这样的管道称作Stream
为了控制Stream,我们通常可以使用StreamController来进行管理;
为了向Stream中插入数据,StreamController提供了类型为StreamSink的属性sink作为入口;
StreamController提供stream属性作为数据的出口。

通常在本文范围内我们会使用StreamController来管理Stream,后续文章在引入rxdart这个库之后会更多的使用Subject。

3.Stream可以传输什么?

任何东西都可以!包括简单的值,事件,对象,集合,map,error或者其他的Stream,任何类型的数据都可以使用Stream来传输。

4.如何感知Stream中传输的数据?

当你需要使用Stream中传输的数据时,可以简单地使用listen函数来监听StreamController的stream属性。
在定义完listener(监听者)之后,我们会收到StreamSubscription(订阅)对象,通过这个订阅对象我们就可以接收到Stream发送数据变更的通知。
只要至少有一个活跃的监听者,Stream会创建以下事件来通知订阅对象:

数据从Stream中流出;
Stream接收到错误信息;
Stream关闭。

同时,你也可以通过订阅对象来:

停止监听;
暂停监听;
恢复监听。

5.Stream是单纯的管道?

肯定不是!Stream均可以在数据流入之前和流出之前对数据进行处理。
我们可以使用StreamTransformer来处理Stream中的数据,它具有以下特点:

它是Stream中能够捕获数据流的一系列函数;
可以对Stream中的数据进行处理加工;
经过处理的Stream依然是Stream(链式编码的前提)。

StreamTransformer可以用于以下场景(包括但不仅限于):

过滤:过滤基于条件的任何类型数据;
修改:对任何类型数据进行修改;
重新分组:对数据进行重新分组;
向其他Stream注入数据;
缓存
其他基于数据的行为和操作;

6.Stream的类型

有两种类型的Stream。

单点订阅(单播)Stream

该类型的Stream在其整个生命周期中只有一个监听者。

如果订阅已经被取消,就不能再次监听这个Stream。

多点订阅(广播)Stream

对比前者,该类型的Stream不限制监听者的数量。

我们可以在广播Stream的任何周期添加监听者,新的监听者会在监听的同时收到相应的订阅。

7.如何使用Stream流出数据构建Widget?

Flutter提供了名为StreamBuilder的类,它监听Stream,并且在Stream数据流出时会自动重新构建widget,通过其builder进行回调。
以下是示例代码:

StreamBuilder<T>(key: <#该组件的唯一ID,可选#>, stream: <#被监听的Stream#>, initialData: <#初始数据,可选#>, builder:(BuildContext context, AsyncSnapshot<T> snapshot) {
 return <#实际的widget构造代码#>;
})

demo

import 'dart:async';

import 'package:flutter/material.dart';

class StreamDemo extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return Scaffold(
      appBar: AppBar(
        title: Text('StreamDemo'),
        elevation: 0.0,
      ),
      body: StreamDemoHome(),
    );
  }
}

class StreamDemoHome extends StatefulWidget {
  @override
  _StreamDemoHomeState createState() => _StreamDemoHomeState();
}

class _StreamDemoHomeState extends State<StreamDemoHome> {
  StreamSubscription _streamDemoSubscription;
  StreamController<String> _streamDemo;
  StreamSink _sinkDemo;
  String _data = '...';

  @override
  void dispose() {
    _streamDemo.close();
    super.dispose();
  }

  @override
  void initState() {
    super.initState();

    print('Create a stream.');
    // Stream<String> _streamDemo = Stream.fromFuture(fetchData());
    _streamDemo = StreamController.broadcast();
    _sinkDemo = _streamDemo.sink;

    print('Start listening on a stream.');
    _streamDemoSubscription =
        _streamDemo.stream.listen(onData, onError: onError, onDone: onDone);
    
    _streamDemo.stream.listen(onDataTwo, onError: onError, onDone: onDone);

    print('Initialize completed.');
  }

  void onDone() {
    print('Done!');
  }

  void onError(error) {
    print('Error: $error');
  }

  void onData(String data) {
    setState(() {
      _data = data;
    });
    print('$data');
  }

  void onDataTwo(String data) {
    print('onDataTwo: $data');
  }

  void _pauseStream() {
    print('Pause subscription');
    _streamDemoSubscription.pause();
  }

  void _resumeStream() {
    print('Resume subscription');
    _streamDemoSubscription.resume();
  }

  void _cancelStream() {
    print('Cancel subscription');
    _streamDemoSubscription.cancel();
  }

  void _addDataToStream() async {
    print('Add data to stream.');

    String data = await fetchData();
    // _streamDemo.add(data);
    _sinkDemo.add(data);
  }

  Future<String> fetchData() async {
    await Future.delayed(Duration(seconds: 5));
    // throw 'Something happened';
    return 'hello ~';
  }

  @override
  Widget build(BuildContext context) {
    return Container(
      child: Center(
        child: Column(
          mainAxisAlignment: MainAxisAlignment.center,
          children: <Widget>[
            // Text(_data),
            StreamBuilder(
              stream: _streamDemo.stream,
              initialData: '...',
              builder: (context, snapshot) {
                return Text('${snapshot.data}');
              },
            ),
            Row(
              mainAxisAlignment: MainAxisAlignment.center,
              children: <Widget>[
                FlatButton(
                  child: Text('Add'),
                  onPressed: _addDataToStream,
                ),
                FlatButton(
                  child: Text('Pause'),
                  onPressed: _pauseStream,
                ),
                FlatButton(
                  child: Text('Resume'),
                  onPressed: _resumeStream,
                ),
                FlatButton(
                  child: Text('Cancel'),
                  onPressed: _cancelStream,
                ),
              ],
            ),
          ],
        ),
      ),
    );
  }
}

效果:

Flutter响应式编程-风君雪科技博客