feat: implement some api; add md editor to app; login and signup logic.

This commit is contained in:
2021-12-30 20:04:00 +08:00
parent 1a0e8f8de7
commit 853132f1a8
61 changed files with 3205 additions and 149 deletions
+77
View File
@@ -0,0 +1,77 @@
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:async';
import 'neat_cache.dart' show Cache;
/// Low-level interface for a cache.
///
/// This can be an in-memory cache, something that writes to disk or to an
/// cache service such as memcached or redis.
///
/// The [Cache] provided by `package:neat_cache`, is intended to wrap
/// a [CacheProvider] and provide a more convinient high-level interface.
///
/// Implementers of [CacheProvider] can implement something that stores a value
/// of any type `T`, but usually implementers should aim to implement
/// `CacheProvider<List<int>>` which stores binary data.
///
/// Implementations of the [CacheProvider] interface using a remote backing
/// store should throw [IntermittentCacheException] when an intermittent network
/// issue occurs. The [CacheProvider] should obviously attempt to reconnect to
/// the remote backing store, but it should not retry operations.
///
/// Operations will be retried by [Cache], if necessary. Many use-cases of
/// caching are resilient to intermittent failures.
abstract class CacheProvider<T> {
/// Fetch data stored under [key].
///
/// If nothing is cached for [key], this **must** return `null`.
Future<T?> get(String key);
/// Set [value] stored at [key] with optional [ttl].
///
/// If a value is already stored at [key], that value should be overwritten
/// with the new [value] given here.
///
/// When given [ttl] is advisory, however, implementers should avoid returning
/// entries that are far past their [ttl].
Future<void> set(String key, T value, [Duration? ttl]);
/// Clear value stored at [key].
///
/// After this has returned future calls to [get] for the given [key] should
/// not return any value, unless a new value have been set.
Future<void> purge(String key);
/// Close all connections, causing all future operations to throw.
///
/// This method frees resources used by this [CacheProvider], if backed by
/// a remote service like redis, this should close the connection.
///
/// Calling [close] multiple times does not throw. But after this has returned
/// all future operations should throw [StateError].
Future<void> close();
}
/// Exception thrown when there is an intermittent exception.
///
/// This is typically thrown if there is an intermittent connection error.
class IntermittentCacheException implements Exception {
final String _message;
IntermittentCacheException(this._message);
@override
String toString() => _message;
}
+228
View File
@@ -0,0 +1,228 @@
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:async';
import 'dart:convert';
import 'package:convert/convert.dart' show IdentityCodec;
import 'package:logging/logging.dart';
import 'package:retry/retry.dart';
import 'cache_provider.dart';
import 'src/providers/inmemory.dart';
import 'src/providers/redis.dart';
final _logger = Logger('neat_cache');
/// Cache for objects of type [T], wrapping a [CacheProvider] to provide a
/// high-level interface.
///
/// Cache entries are accessed using the indexing operator `[]`, this returns a
/// [Entry<T>] wrapper that can be used to get/set data cached at given key.
///
/// **Example**
/// ```dart
/// final Cache<List<int>> cache = Cache.inMemoryCache(4096);
///
/// // Write data to cache
/// await cache['cached-zeros'].set([0, 0, 0, 0]);
///
/// // Read data from cache
/// var r = await cache['cached-zeros'].get();
/// expect(r, equals([0, 0, 0, 0]));
/// ```
///
/// A [Cache] can be _fused_ with a [Codec] using [withCodec] to get a cache
/// that stores a different kind of objects. It is also possible to create
/// a chuild cache using [withPrefix], such that all entries in the child
/// cache have a given prefix.
abstract class Cache<T> {
/// Get [Entry] wrapping data cached at [key].
Entry<T> operator [](String key);
/// Get a [Cache] wrapping of this cache with given [prefix].
Cache<T> withPrefix(String prefix);
/// Get a [Cache] wrapping of this cache by encoding objects of type [S] as
/// [T] using the given [codec].
Cache<S> withCodec<S>(Codec<S, T> codec);
/// Get a [Cache] wrapping of this cache with given [ttl] as default for all
/// entries being set using [Entry.set].
///
/// This only specifies a different default [ttl], to be used when [Entry.set]
/// is called without a [ttl] parameter.
Cache<T> withTTL(Duration ttl);
/// Create a [Cache] wrapping a [CacheProvider].
factory Cache(CacheProvider<T> provider) {
return _Cache<T, T>(provider, '', IdentityCodec());
}
/// Create an in-memory [CacheProvider] holding a maximum of [maxSize] cache
/// entries.
static CacheProvider<List<int>> inMemoryCacheProvider(int maxSize) {
return InMemoryCacheProvider(maxSize);
}
/// Create a redis [CacheProvider] by connecting using a [connectionString] on
/// the form `redis://<host>:<port>`.
static CacheProvider<List<int>> redisCacheProvider(Uri connectionString, {Duration commandTimeLimit = const Duration(milliseconds: 200)}) {
return RedisCacheProvider(connectionString, commandTimeLimit: commandTimeLimit);
}
}
/// Pointer to a location in the cache.
///
/// This simply wraps a cache key, such that you don't need to supply a cache
/// key for [get], [set] and [purge] operations.
abstract class Entry<T> {
/// Get value stored in this cache entry.
///
/// If used without [create], this function simply gets the value or `null` if
/// no value is stored.
///
/// If used with [create], this function becomes an upsert, returning the
/// value stored if any, otherwise creating a new value and storing it with
/// optional [ttl]. If multiple callers are using the same cache this is an
/// inherently racy operation, that is multiple instances of the value may
/// be created.
///
/// The [get] method is a best-effort method. In case of intermittent failures
/// from the underlying [CacheProvider] the [get] method will ignore failures
/// and return `null` (or result from [create] if specified).
Future<T?> get([Future<T?> Function() create, Duration ttl]);
/// Set the value stored in this cache entry.
///
/// If given [ttl] specifies the time-to-live. Notice that this is advisatory,
/// the underlying [CacheProvider] may choose to evit cache entries at any
/// time. However, it can be assumed that entries will not live far past
/// their [ttl].
///
/// The [set] method is a best-effort method. In case of intermittent failures
/// from the underlying [CacheProvider] the [set] method will ignore failures.
///
/// To ensure that cache entries are purged, use the [purge] method with
/// `retries` not set to zero.
Future<T?> set(T? value, [Duration ttl]);
/// Clear the value stored in this cache entry.
///
/// If [retries] is `0` (default), this is a best-effort method, which will
/// ignore intermittent failures. If [retries] is non-zero the operation will
/// be retried with exponential back-off, and [IntermittentCacheException]
/// will be thrown if all retries fails.
Future purge({int retries = 0});
}
class _Cache<T, V> implements Cache<T> {
final CacheProvider<V> _provider;
final String _prefix;
final Codec<T, V> _codec;
final Duration? _ttl;
_Cache(this._provider, this._prefix, this._codec, [this._ttl]);
@override
Entry<T> operator [](String key) => _Entry(this, _prefix + key);
@override
Cache<T> withPrefix(String prefix) =>
_Cache(_provider, _prefix + prefix, _codec, _ttl);
@override
Cache<S> withCodec<S>(Codec<S, T> codec) =>
_Cache(_provider, _prefix, codec.fuse(_codec), _ttl);
@override
Cache<T> withTTL(Duration ttl) => _Cache(_provider, _prefix, _codec, ttl);
}
class _Entry<T, V> implements Entry<T> {
final _Cache<T, V> _owner;
final String _key;
_Entry(this._owner, this._key);
@override
Future<T?> get([Future<T?> Function()? create, Duration? ttl]) async {
V? value;
try {
_logger.finest(() => 'reading cache entry for "$_key"');
value = await _owner._provider.get(_key);
} on IntermittentCacheException {
_logger.fine(
// embedding [intermittent-cache-failure] to allow for easy log metrics
'[intermittent-cache-failure], failed to get cache entry for "$_key"',
);
value = null;
}
if (value == null) {
if (create == null) {
return null;
}
final created = await create();
if (created != null) {
// Calling `set(null)` is equivalent to `purge()`, we can skip that here
await set(created, ttl);
}
return created;
}
return _owner._codec.decode(value);
}
@override
Future<T?> set(T? value, [Duration? ttl]) async {
if (value == null) {
await purge();
return null;
}
ttl ??= _owner._ttl;
final raw = _owner._codec.encode(value);
try {
await _owner._provider.set(_key, raw, ttl);
} on IntermittentCacheException {
_logger.fine(
// embedding [intermittent-cache-failure] to allow for easy log metrics
'[intermittent-cache-failure], failed to set cache entry for "$_key"',
);
}
return value;
}
@override
Future<void> purge({int retries = 0}) async {
// Common path is that we have no retries
if (retries == 0) {
try {
await _owner._provider.purge(_key);
} on IntermittentCacheException {
_logger.fine(
// embedding [intermittent-cache-failure] to allow for easy log metrics
'[intermittent-cache-failure], failed to purge cache entry for "$_key"',
);
}
return;
}
// Test that we have a positive number of retries.
if (retries < 0) {
ArgumentError.value(retries, 'retries', 'retries < 0 is not allowed');
}
return await retry(
() => _owner._provider.purge(_key),
retryIf: (e) => e is IntermittentCacheException,
maxAttempts: 1 + retries,
);
}
}
+110
View File
@@ -0,0 +1,110 @@
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:async';
import '../../cache_provider.dart';
class _InMemoryEntry<T> {
final T value;
final DateTime? _expires;
_InMemoryEntry(this.value, [this._expires]);
bool get isExpired => _expires != null && _expires!.isBefore(DateTime.now());
}
/// Simple two-generational LRU cache inspired by:
/// https://github.com/sindresorhus/quick-lru
class InMemoryCacheProvider<T> extends CacheProvider<T> {
/// New generation of cache entries.
Map<String, _InMemoryEntry<T>> _new = <String, _InMemoryEntry<T>>{};
/// Old generation of cache entries.
Map<String, _InMemoryEntry<T>> _old = <String, _InMemoryEntry<T>>{};
/// Maximum size before clearing old generation.
final int _maxSize;
/// Have this been closed.
bool _isClosed = false;
InMemoryCacheProvider(this._maxSize);
/// Clear old generation, if _maxSize have been reached.
void _maintainGenerations() {
if (_new.length >= _maxSize) {
_old = _new;
_new = {};
}
}
@override
Future<T?> get(String key) async {
if (_isClosed) {
throw StateError('CacheProvider.close() have been called');
}
// Lookup in the new generation
var entry = _new[key];
if (entry != null) {
if (!entry.isExpired) {
return entry.value;
}
// Remove, if expired
_new.remove(key);
}
// Lookup in the old generation
entry = _old[key];
if (entry != null) {
if (!entry.isExpired) {
// If not expired, we insert the entry into the new generation
_new[key] = entry;
_maintainGenerations();
return entry.value;
}
// Remove, if expired
_old.remove(key);
}
return null;
}
@override
Future<void> set(String key, T value, [Duration? ttl]) async {
if (_isClosed) {
throw StateError('CacheProvider.close() have been called');
}
if (ttl == null) {
_new[key] = _InMemoryEntry(value);
} else {
_new[key] = _InMemoryEntry(value, DateTime.now().add(ttl));
}
// Always remove key from old generation to avoid risks of looking up there
// if it's overwritten by an entry with a shorter ttl
_old.remove(key);
_maintainGenerations();
}
@override
Future<void> purge(String key) async {
if (_isClosed) {
throw StateError('CacheProvider.close() have been called');
}
_new.remove(key);
_old.remove(key);
}
@override
Future<void> close() async {
_isClosed = true;
_old = {};
_new = {};
}
}
+210
View File
@@ -0,0 +1,210 @@
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import 'dart:async';
import 'dart:io' show IOException, Socket, SocketOption;
import 'dart:typed_data';
import 'resp.dart';
import 'package:logging/logging.dart';
import '../../cache_provider.dart';
final _log = Logger('neat_cache');
class _RedisContext {
final RespClient client;
_RedisContext({
required this.client,
});
}
class RedisCacheProvider extends CacheProvider<List<int>> {
final Uri _connectionString;
final Duration _connectTimeLimit;
final Duration _commandTimeLimit;
final Duration _reconnectDelay;
Future<_RedisContext>? _context;
bool _isClosed = false;
RedisCacheProvider(
Uri connectionString, {
Duration connectTimeLimit = const Duration(seconds: 30),
Duration commandTimeLimit = const Duration(milliseconds: 200),
Duration reconnectDelay = const Duration(seconds: 30),
}) : _connectionString = connectionString,
_connectTimeLimit = connectTimeLimit,
_commandTimeLimit = commandTimeLimit,
_reconnectDelay = reconnectDelay {
if (!connectionString.isScheme('redis')) {
throw ArgumentError.value(
connectionString, 'connectionString', 'must have scheme redis://');
}
if (!connectionString.hasEmptyPath) {
throw ArgumentError.value(
connectionString, 'connectionString', 'cannot have a path');
}
if (connectTimeLimit.isNegative) {
throw ArgumentError.value(
connectTimeLimit, 'connectTimeLimit', 'must be positive');
}
if (commandTimeLimit.isNegative) {
throw ArgumentError.value(
commandTimeLimit, 'commandTimeLimit', 'must be positive');
}
if (reconnectDelay.isNegative) {
throw ArgumentError.value(
reconnectDelay, 'reconnectDelay', 'must be positive');
}
}
Future<_RedisContext> _createContext() async {
try {
_log.info('Connecting to redis');
final socket = await Socket.connect(
_connectionString.host,
_connectionString.port,
).timeout(_connectTimeLimit);
socket.setOption(SocketOption.tcpNoDelay, true);
var client = RespClient(socket, socket);
if (_connectionString.userInfo.isNotEmpty) {
await client.command(['AUTH', _connectionString.userInfo]);
}
// Create context
return _RedisContext(
client: client,
);
} on RedisConnectionException {
throw IntermittentCacheException('connection failed');
} on TimeoutException {
throw IntermittentCacheException('connect failed with timeout');
} on IOException catch (e) {
throw IntermittentCacheException('connect failed with IOException: $e');
} on Exception {
throw IntermittentCacheException('connect failed with exception');
}
}
Future<_RedisContext> _getContext() {
if (_context != null) {
return _context!;
}
_context = _createContext();
scheduleMicrotask(() async {
_RedisContext ctx;
try {
ctx = await _context!;
} on IntermittentCacheException {
// If connecting fails, then we sleep and try again
await Future.delayed(_reconnectDelay);
_context = null; // reset _context, so next operation creates a new
return;
} catch (e) {
_log.shout('unknown error/exception connecting to redis', e);
_context = null; // reset _context, so next operation creates a new
rethrow; // propagate the error to crash to application.
}
// If connecting was successful, then we await the connection being
// closed or error, and we reset _context.
try {
await ctx.client.closed;
} catch (e) {
// ignore error
}
_context = null;
});
return _context!;
}
Future<T> _withResp<T>(Future<T> Function(RespClient) fn) async {
if (_isClosed) {
throw StateError('CacheProvider.closed() has been called');
}
final ctx = await _getContext();
try {
return await fn(ctx.client).timeout(_commandTimeLimit);
} on RedisCommandException catch (e) {
throw AssertionError('error from redis command: $e');
} on TimeoutException {
// If we had a timeout, doing the command we forcibly disconnect
// from the server, such that next retry will use a new connection.
await ctx.client.close(force: true);
throw IntermittentCacheException('redis command timeout');
} on RedisConnectionException catch (e) {
throw IntermittentCacheException('redis error: $e');
} on IOException catch (e) {
throw IntermittentCacheException('socket broken: $e');
}
}
@override
Future<void> close() async {
_isClosed = true;
if (_context != null) {
try {
final ctx = await _context!;
await ctx.client.close();
} catch (e) {
// ignore
}
}
}
@override
Future<List<int>?> get(String key) => _withResp((client) async {
final r = await client.command(['GET', key]).catchError((e) {
throw e;
});
if (r == null) {
return null;
}
if (r is Uint8List) {
return r;
}
assert(false, 'unexpected response from redis server');
// Force close the client
scheduleMicrotask(() => client.close(force: true));
});
@override
Future<void> set(String key, List<int> value, [Duration? ttl]) =>
_withResp((client) async {
final r = await client.command([
'SET',
key,
value,
if (ttl != null) ...<Object>['EX', ttl.inSeconds],
]);
if (r != 'OK') {
assert(false, 'unexpected response from redis server');
// Force close the client
scheduleMicrotask(() => client.close(force: true));
}
});
@override
Future<void> purge(String key) => _withResp((client) async {
final r = await client.command(['DEL', key]);
if (r is! int) {
assert(false, 'unexpected response from redis server');
// Force close the client
scheduleMicrotask(() => client.close(force: true));
}
});
}
+526
View File
@@ -0,0 +1,526 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
/// Minimalistic [RESP[1] protocol implementation in Dart.
///
/// [1]: https://redis.io/topics/protocol
library resp;
import 'dart:async';
import 'dart:collection';
import 'dart:convert';
import 'dart:typed_data';
import 'package:logging/logging.dart';
final _log = Logger('neat_cache:redis');
/// Thrown when the server returns an error in response to a command.
class RedisCommandException implements Exception {
final String type;
final String message;
RedisCommandException._(this.type, this.message);
@override
String toString() => 'RedisCommandException: $type $message';
}
/// Thrown if the redis connection is broken.
///
/// This typically happens if the connection is unexpectedly closed, the
/// [RESP protocol][1] is violated, or there is an internal error.
///
/// [1]: https://redis.io/topics/protocol
class RedisConnectionException implements Exception {
final String message;
RedisConnectionException._(this.message);
@override
String toString() => 'RedisConnectionException: $message';
}
/// Client implementing the [RESP protocol][1].
///
/// [1]: https://redis.io/topics/protocol
class RespClient {
static final _newLine = ascii.encode('\r\n');
/// No value in redis can be more than [512 MB][1].
///
/// [1]: https://redis.io/topics/data-types
static const _maxValueSize = 512 * 1024 * 1024;
final _ByteStreamScanner _input;
final StreamSink<List<int>> _output;
Future _pendingStream = Future.value(null);
final _pending = Queue<Completer<Object?>>();
bool _closing = false;
final _closed = Completer<void>();
/// Creates an instance of [RespClient] given an [input]stream and an [output]
/// sink.
///
/// If connecting over TCP as usual the `Socket` object from `dart:io`
/// implements both [Stream<Uint8List>] and [StreamSink<List<int>>]. Thus,
/// the following example is a reasonable way to make a client:
///
/// ```dart
/// // Connect to redis server
/// final socket = await Socket.connect(host, port);
/// socket.setOption(SocketOption.tcpNoDelay, true);
///
/// // Create client
/// final client = RespClient(socket, socket);
/// ```
RespClient(
Stream<Uint8List> input,
StreamSink<List<int>> output,
) : _input = _ByteStreamScanner(input),
_output = output {
scheduleMicrotask(_readInput);
scheduleMicrotask(() async {
try {
await _output.done;
} catch (e, st) {
if (!_closing) {
return await _abort(e, st);
}
}
if (!_closing) {
await _abort(
RedisConnectionException._('outgoing connection closed'),
StackTrace.current,
);
}
});
}
/// Returns a [Future] that is resolved when the connection is closed.
Future<void> get closed => _closed.future;
/// Send command to redis and return the result.
///
/// The [args] is a list of:
/// * [String],
/// * [List<int>], and,
/// * [int].
/// This is always encoded as an _RESP Array_ of _RESP Bulk Strings_.
///
/// Response will decoded as follows:
/// * RESP Simple String: returns [String],
/// * RESP Error: throws [RedisCommandException],
/// * RESP Integer: returns [int],
/// * RESP Bulk String: returns [Uint8List],
/// * RESP nil Bulk String: returns `null`,
/// * RESP Array: returns [List<Object?>], and,
/// * RESP nil Arrray: returns `null`.
///
/// Throws [RedisConnectionException] if underlying connection as been broken
/// or if the [RESP protocol][1] has been violated. After this, the client
/// should not be used further.
///
/// Forwards any [Exception] thrown by the underlying connection and aborts
/// the [RespClient]. Once aborted [closed] will be resolved, and further
/// attempts to call [command] will throw [RedisConnectionException].
///
/// Consumers are encouraged to handle [RedisConnectionException] and
/// reconnect, creating a new [RespClient], when [RedisConnectionException] is
/// encountered.
///
/// [1]: https://redis.io/topics/protocol
Future<Object?> command(List<Object> args) async {
if (_closing) {
throw RedisConnectionException._('redis connection is closed');
}
final out = BytesBuilder(copy: false);
out.addByte('*'.codeUnitAt(0));
out.add(ascii.encode(args.length.toString()));
out.add(_newLine);
for (final arg in args) {
List<int> bytes;
if (arg is String) {
bytes = utf8.encode(arg);
} else if (arg is List<int>) {
bytes = arg;
} else if (arg is int) {
bytes = ascii.encode(arg.toString());
} else {
throw ArgumentError.value(
args,
'args',
'arguments for redis must be String, List<int>, int',
);
}
out.addByte(r'$'.codeUnitAt(0));
out.add(ascii.encode(bytes.length.toString()));
out.add(_newLine);
out.add(bytes);
out.add(_newLine);
}
final c = Completer<Object?>();
_pending.addLast(c);
try {
_pendingStream = _pendingStream
.then((value) => _output.addStream(Stream.value(out.toBytes())));
} on Exception catch (e, st) {
await _abort(e, st);
}
try {
return await c.future;
} on RedisCommandException catch (e) {
// Don't use rethrow because the stack-trace really should start here.
// we always throw RedisCommandException with a StackTrace.empty, because
// it's a thing that happens on the server, and that stack-trace of the
// code that reads the value from the server is uninteresting.
throw e; // ignore: use_rethrow_when_possible
}
}
/// Send `QUIT` command to redis and close the connection.
///
/// If [force] is `true`, then the connection will be forcibly closed
/// immediately. Otherwise, connection will reject new commands, but wait for
/// existing commands to complete.
Future<void> close({bool force = false}) async {
_closing = true;
if (!_closing) {
// Always send QUIT message to be nice
try {
final quit = command(['QUIT']);
scheduleMicrotask(() async {
await quit.catchError((_) {/* ignore */});
});
} catch (_) {
// ignore
}
}
if (!force) {
scheduleMicrotask(() async {
await _output.close().catchError((_) {/* ignore */});
});
await _closed.future;
} else {
await _output.close().catchError((_) {/* ignore */});
// Resolve all outstanding requests
final pending = _pending.toList(growable: false);
_pending.clear();
final e = RedisConnectionException._('redis client forcibly closed');
final st = StackTrace.current;
pending.forEach((c) => c.completeError(e, st));
}
await _input.cancel();
assert(_pending.isEmpty, 'new pending requests added after close()');
}
/// Abort due to internal error
Future<void> _abort(Object e, StackTrace st) async {
if (!_closing) {
_log.warning('redis connection broken:', e, st);
}
_closing = true;
// Resolve all outstanding requests
final pending = _pending.toList(growable: false);
_pending.clear();
scheduleMicrotask(() {
pending.forEach((c) => c.completeError(e, st));
});
if (!_closed.isCompleted) {
_closed.complete();
}
await _input.cancel();
assert(_pending.isEmpty, 'new pending requests added after aborting');
}
void _readInput() async {
try {
while (true) {
Object? value;
try {
value = await _readValue();
} on RedisConnectionException catch (e, st) {
return await _abort(e, st);
}
if (_pending.isEmpty) {
return await _abort(
RedisConnectionException._('unexpected value from server'),
StackTrace.current,
);
}
final c = _pending.removeFirst();
if (value is RedisCommandException) {
// This is an error code returned by the server, it doesn't have a
// stack-trace!
c.completeError(value, StackTrace.empty);
} else {
c.complete(value);
}
if (_closing && _pending.isEmpty) {
return _closed.complete();
}
}
} catch (e, st) {
_log.shout('internal redis client error:', e, st);
await _abort(
RedisConnectionException._('internal redis client error: $e'),
st,
);
}
}
static final _whitespacePattern = RegExp(r'\s');
Future<Object?> _readValue() async {
Uint8List line;
try {
line = await _input.readLine(maxSize: _maxValueSize);
} on Exception catch (e, st) {
await _abort(e, st);
throw RedisConnectionException._('exception reading line: $e');
}
if (line.isEmpty) {
throw RedisConnectionException._('Incoming stream from server closed');
}
if (!_endsWithNewLine(line)) {
throw RedisConnectionException._(
'Invalid server message: missing newline',
);
}
final type = line[0];
final rest = Uint8List.sublistView(line, 1, line.length - 2);
// Handle simple strings
if (type == '+'.codeUnitAt(0)) {
try {
return utf8.decode(rest);
} on FormatException catch (e) {
throw RedisConnectionException._(
'Invalid simple string from server: $e',
);
}
}
// Handle errors
if (type == '-'.codeUnitAt(0)) {
final message = utf8.decode(
rest,
allowMalformed: true,
);
final i = message.indexOf(_whitespacePattern);
if (i != -1 && i + 1 < message.length) {
return RedisCommandException._(
message.substring(0, i),
message.substring(i + 1),
);
}
return RedisCommandException._('ERR', message);
}
// Handle integers
if (type == ':'.codeUnitAt(0)) {
int value;
try {
value = int.parse(ascii.decode(rest));
} on FormatException catch (e) {
throw RedisConnectionException._(
'Invalid integer from server: $e',
);
}
if (value < 0) {
throw RedisConnectionException._(
'Invalid integer from server: value < 0',
);
}
return value;
}
// Handle bulk strings (binary blobs)
if (type == r'$'.codeUnitAt(0)) {
int length;
try {
length = int.parse(ascii.decode(rest));
} on FormatException catch (e) {
throw RedisConnectionException._(
'Invalid bulk string length from server: $e',
);
}
if (length == -1) {
return null; // Special case for nil value
}
if (length < 0 || length > _maxValueSize) {
throw RedisConnectionException._(
'Invalid bulk string length from server: $length',
);
}
Uint8List bytes;
try {
bytes = await _input.readBytes(length + 2);
} on Exception catch (e, st) {
await _abort(e, st);
throw RedisConnectionException._('exception reading bytes: $e');
}
if (bytes.length != length + 2) {
throw RedisConnectionException._('Incoming stream from server closed');
}
if (!_endsWithNewLine(bytes)) {
throw RedisConnectionException._('Invalid bulk string from server');
}
return Uint8List.sublistView(bytes, 0, length);
}
// Handle arrays
if (type == '*'.codeUnitAt(0)) {
int length;
try {
length = int.parse(ascii.decode(rest));
} on FormatException catch (e) {
throw RedisConnectionException._(
'Invalid array length from server: $e',
);
}
if (length == -1) {
return null; // Special case for nil value
}
if (length < 0) {
throw RedisConnectionException._(
'Invalid array length from server: $length',
);
}
final values = <Object?>[];
for (var i = 0; i < length; i++) {
values.add(await _readValue());
}
return values;
}
throw RedisConnectionException._(
'Unknown type from server: ${String.fromCharCode(type)}',
);
}
}
bool _endsWithNewLine(Uint8List line) {
final N = line.length;
return (N >= 2 &&
line[N - 2] == '\r'.codeUnitAt(0) &&
line[N - 1] == '\n'.codeUnitAt(0));
}
/// An stream wrapper for reading line-by-line or reading N bytes.
class _ByteStreamScanner {
static final _emptyList = Uint8List.fromList([]);
final StreamIterator<Uint8List> _input;
Uint8List _buffer = _emptyList;
_ByteStreamScanner(Stream<Uint8List> stream)
: _input = StreamIterator(stream);
/// Read a single byte, return zero if stream has ended.
Future<int?> readByte() async {
final bytes = await readBytes(1);
if (bytes.isEmpty) {
return null;
}
return bytes[0];
}
/// Read up to [size] bytes from stream, returns less than [size] bytes if
/// stream ends before [size] bytes are read.
Future<Uint8List> readBytes(int size) async {
RangeError.checkNotNegative(size, 'size');
final out = BytesBuilder(copy: false);
while (size > 0) {
if (_buffer.isEmpty) {
if (!(await _input.moveNext())) {
// Don't attempt to read more data, as there is no more data.
break;
}
_buffer = _input.current;
}
if (_buffer.isNotEmpty) {
if (size < _buffer.length) {
out.add(Uint8List.sublistView(_buffer, 0, size));
_buffer = Uint8List.sublistView(_buffer, size);
break;
}
out.add(_buffer);
size -= _buffer.length;
_buffer = _emptyList;
}
}
return out.toBytes();
}
/// Read until the next `\n` inclusive.
///
/// Throws [RedisConnectionException] if [maxSize] is exceeded.
Future<Uint8List> readLine({int? maxSize}) async {
if (maxSize != null) {
RangeError.checkNotNegative(maxSize, 'maxSize');
}
final out = BytesBuilder(copy: false);
while (true) {
if (_buffer.isEmpty) {
if (!(await _input.moveNext())) {
// Don't attempt to read more data, as there is no more data.
break;
}
_buffer = _input.current;
}
if (_buffer.isNotEmpty) {
final i = _buffer.indexOf('\n'.codeUnitAt(0));
if (i != -1) {
out.add(Uint8List.sublistView(_buffer, 0, i + 1));
_buffer = Uint8List.sublistView(_buffer, i + 1);
break;
}
out.add(_buffer);
_buffer = _emptyList;
}
if (maxSize != null && out.length > maxSize) {
throw RedisConnectionException._('Line exceeds maxSize: $maxSize');
}
}
return out.toBytes();
}
/// Cancel underlying stream, ending it prematurely.
Future<void> cancel() async => await _input.cancel();
}