Skip to content

Commit 7291e67

Browse files
authored
fix(llc, persistence): improve sync reliability and error handling (#2390)
1 parent 6fb8bc8 commit 7291e67

File tree

10 files changed

+219
-37
lines changed

10 files changed

+219
-37
lines changed

packages/stream_chat/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
## Upcoming
22

3+
🐞 Fixed
4+
5+
- Improved sync reliability and error handling with enhanced `lastSyncAt` initialization, 400
6+
error recovery, and automatic flushing of stale persistence data after 30 days of inactivity.
7+
38
✅ Added
49

510
- Added support for `Channel.messageCount` field.

packages/stream_chat/lib/src/client/client.dart

Lines changed: 33 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import 'dart:async';
22

3+
import 'package:collection/collection.dart';
34
import 'package:dio/dio.dart';
45
import 'package:logging/logging.dart';
56
import 'package:meta/meta.dart';
@@ -186,9 +187,6 @@ class StreamChatClient {
186187

187188
late final RetryPolicy _retryPolicy;
188189

189-
/// the last dateTime at the which all the channels were synced
190-
DateTime? _lastSyncedAt;
191-
192190
/// The retry policy options getter
193191
RetryPolicy get retryPolicy => _retryPolicy;
194192

@@ -519,25 +517,17 @@ class StreamChatClient {
519517

520518
if (connectionRecovered) {
521519
// connection recovered
522-
final cids = state.channels.keys.toList(growable: false);
520+
final cids = [...state.channels.keys.toSet()];
523521
if (cids.isNotEmpty) {
524522
await queryChannelsOnline(
525523
filter: Filter.in_('cid', cids),
526524
paginationParams: const PaginationParams(limit: 30),
527525
);
528-
if (persistenceEnabled) {
529-
await sync(cids: cids, lastSyncAt: _lastSyncedAt);
530-
}
531-
} else {
532-
// channels are empty, assuming it's a fresh start
533-
// and making sure `lastSyncAt` is initialized
534-
if (persistenceEnabled) {
535-
final lastSyncAt = await chatPersistenceClient?.getLastSyncAt();
536-
if (lastSyncAt == null) {
537-
await chatPersistenceClient?.updateLastSyncAt(DateTime.now());
538-
}
539-
}
526+
527+
// Sync the persistence client if available
528+
if (persistenceEnabled) await sync(cids: cids);
540529
}
530+
541531
handleEvent(Event(
542532
type: EventType.connectionRecovered,
543533
online: true,
@@ -569,34 +559,45 @@ class StreamChatClient {
569559
Future<void> sync({List<String>? cids, DateTime? lastSyncAt}) {
570560
return _syncLock.synchronized(() async {
571561
final channels = cids ?? await chatPersistenceClient?.getChannelCids();
572-
if (channels == null || channels.isEmpty) {
573-
return;
574-
}
562+
if (channels == null || channels.isEmpty) return;
575563

576564
final syncAt = lastSyncAt ?? await chatPersistenceClient?.getLastSyncAt();
577565
if (syncAt == null) {
578-
return;
566+
logger.info('Fresh sync start: lastSyncAt initialized to now.');
567+
return chatPersistenceClient?.updateLastSyncAt(DateTime.now());
579568
}
580569

581570
try {
571+
logger.info('Syncing events since $syncAt for channels: $channels');
572+
582573
final res = await _chatApi.general.sync(channels, syncAt);
583-
final events = res.events
584-
..sort((a, b) => a.createdAt.compareTo(b.createdAt));
574+
final events = res.events.sorted(
575+
(a, b) => a.createdAt.compareTo(b.createdAt),
576+
);
585577

586578
for (final event in events) {
587-
logger.fine('event.type: ${event.type}');
588-
final messageText = event.message?.text;
589-
if (messageText != null) {
590-
logger.fine('event.message.text: $messageText');
591-
}
579+
logger.fine('Syncing event: ${event.type}');
592580
handleEvent(event);
593581
}
594582

595-
final now = DateTime.now();
596-
_lastSyncedAt = now;
597-
chatPersistenceClient?.updateLastSyncAt(now);
598-
} catch (e, stk) {
599-
logger.severe('Error during sync', e, stk);
583+
final updatedSyncAt = events.lastOrNull?.createdAt ?? DateTime.now();
584+
return chatPersistenceClient?.updateLastSyncAt(updatedSyncAt);
585+
} catch (error, stk) {
586+
// If we got a 400 error, it means that either the sync time is too
587+
// old or the channel list is too long or too many events need to be
588+
// synced. In this case, we should just flush the persistence client
589+
// and start over.
590+
if (error is StreamChatNetworkError && error.statusCode == 400) {
591+
logger.warning(
592+
'Failed to sync events due to stale or oversized state. '
593+
'Resetting the persistence client to enable a fresh start.',
594+
);
595+
596+
await chatPersistenceClient?.flush();
597+
return chatPersistenceClient?.updateLastSyncAt(DateTime.now());
598+
}
599+
600+
logger.warning('Error syncing events', error, stk);
600601
}
601602
});
602603
}
@@ -2102,7 +2103,6 @@ class StreamChatClient {
21022103
// resetting state.
21032104
state.dispose();
21042105
state = ClientState(this);
2105-
_lastSyncedAt = null;
21062106

21072107
// resetting credentials.
21082108
_tokenManager.reset();

packages/stream_chat/lib/src/db/chat_persistence_client.dart

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ abstract class ChatPersistenceClient {
3434
/// If [flush] is true, the data will also be deleted
3535
Future<void> disconnect({bool flush = false});
3636

37+
/// Clears all the data stored in the persistence client.
38+
Future<void> flush();
39+
3740
/// Get stored replies by messageId
3841
Future<List<Message>> getReplies(
3942
String parentId, {

packages/stream_chat/test/src/client/client_test.dart

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
// ignore_for_file: avoid_redundant_argument_values
2+
13
import 'package:mocktail/mocktail.dart';
24
import 'package:stream_chat/src/core/http/token.dart';
35
import 'package:stream_chat/stream_chat.dart';
@@ -3668,5 +3670,69 @@ void main() {
36683670
);
36693671
},
36703672
);
3673+
3674+
group('Sync Method Tests', () {
3675+
test(
3676+
'should retrieve data from persistence client and sync successfully',
3677+
() async {
3678+
final cids = ['channel1', 'channel2'];
3679+
final lastSyncAt = DateTime.now().subtract(const Duration(hours: 1));
3680+
final fakeClient = FakePersistenceClient(
3681+
channelCids: cids,
3682+
lastSyncAt: lastSyncAt,
3683+
);
3684+
3685+
client.chatPersistenceClient = fakeClient;
3686+
when(() => api.general.sync(cids, lastSyncAt)).thenAnswer(
3687+
(_) async => SyncResponse()..events = [],
3688+
);
3689+
3690+
await client.sync();
3691+
3692+
verify(() => api.general.sync(cids, lastSyncAt)).called(1);
3693+
3694+
final newLastSyncAt = await fakeClient.getLastSyncAt();
3695+
expect(newLastSyncAt?.isAfter(lastSyncAt), isTrue);
3696+
},
3697+
);
3698+
3699+
test('should set lastSyncAt on first sync when null', () async {
3700+
final fakeClient = FakePersistenceClient(
3701+
channelCids: ['channel1'],
3702+
lastSyncAt: null,
3703+
);
3704+
3705+
client.chatPersistenceClient = fakeClient;
3706+
3707+
await client.sync();
3708+
3709+
expectLater(fakeClient.getLastSyncAt(), completion(isNotNull));
3710+
verifyNever(() => api.general.sync(any(), any()));
3711+
});
3712+
3713+
test('should flush persistence client on 400 error', () async {
3714+
final cids = ['channel1'];
3715+
final lastSyncAt = DateTime.now().subtract(const Duration(hours: 1));
3716+
final fakeClient = FakePersistenceClient(
3717+
channelCids: cids,
3718+
lastSyncAt: lastSyncAt,
3719+
);
3720+
3721+
client.chatPersistenceClient = fakeClient;
3722+
when(() => api.general.sync(cids, lastSyncAt)).thenThrow(
3723+
StreamChatNetworkError.raw(
3724+
code: 4,
3725+
statusCode: 400,
3726+
message: 'Too many events',
3727+
),
3728+
);
3729+
3730+
await client.sync();
3731+
3732+
expect(await fakeClient.getChannelCids(), isEmpty); // Should be flushed
3733+
3734+
verify(() => api.general.sync(cids, lastSyncAt)).called(1);
3735+
});
3736+
});
36713737
});
36723738
}

packages/stream_chat/test/src/db/chat_persistence_client_test.dart

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ class TestPersistenceClient extends ChatPersistenceClient {
6767
@override
6868
Future<void> disconnect({bool flush = false}) => throw UnimplementedError();
6969

70+
@override
71+
Future<void> flush() => throw UnimplementedError();
72+
7073
@override
7174
Future<ChannelModel?> getChannelByCid(String cid) async =>
7275
ChannelModel(cid: cid);

packages/stream_chat/test/src/fakes.dart

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,63 @@ class FakeTokenManager extends Fake implements TokenManager {
4343

4444
class FakeMultiPartFile extends Fake implements MultipartFile {}
4545

46+
/// Fake persistence client for testing persistence client reliability features
47+
class FakePersistenceClient extends Fake implements ChatPersistenceClient {
48+
FakePersistenceClient({
49+
DateTime? lastSyncAt,
50+
List<String>? channelCids,
51+
}) : _lastSyncAt = lastSyncAt,
52+
_channelCids = channelCids ?? [];
53+
54+
String? _userId;
55+
bool _isConnected = false;
56+
DateTime? _lastSyncAt;
57+
List<String> _channelCids;
58+
59+
// Track method calls for testing
60+
int connectCallCount = 0;
61+
int disconnectCallCount = 0;
62+
63+
@override
64+
bool get isConnected => _isConnected;
65+
66+
@override
67+
String? get userId => _userId;
68+
69+
@override
70+
Future<void> connect(String userId) async {
71+
_userId = userId;
72+
_isConnected = true;
73+
connectCallCount++;
74+
}
75+
76+
@override
77+
Future<void> disconnect({bool flush = false}) async {
78+
if (flush) await this.flush();
79+
80+
_userId = null;
81+
_isConnected = false;
82+
disconnectCallCount++;
83+
}
84+
85+
@override
86+
Future<void> flush() async {
87+
_lastSyncAt = null;
88+
_channelCids = [];
89+
}
90+
91+
@override
92+
Future<DateTime?> getLastSyncAt() async => _lastSyncAt;
93+
94+
@override
95+
Future<void> updateLastSyncAt(DateTime lastSyncAt) async {
96+
_lastSyncAt = lastSyncAt;
97+
}
98+
99+
@override
100+
Future<List<String>> getChannelCids() async => _channelCids;
101+
}
102+
46103
class FakeChatApi extends Fake implements StreamChatApi {
47104
UserApi? _user;
48105

packages/stream_chat_persistence/CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
## Upcoming
22

3+
✅ Added
4+
5+
- Added support for `client.flush()` method to clear database.
36
- Added support for `Channel.messageCount` field.
47

58
## 9.17.0

packages/stream_chat_persistence/lib/src/db/drift_chat_database.dart

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -73,11 +73,18 @@ class DriftChatDatabase extends _$DriftChatDatabase {
7373
);
7474

7575
/// Deletes all the tables
76-
Future<void> flush() => batch((batch) {
77-
allTables.forEach((table) {
78-
delete(table).go();
79-
});
76+
Future<void> flush() async {
77+
await customStatement('PRAGMA foreign_keys = OFF');
78+
try {
79+
await transaction(() async {
80+
for (final table in allTables) {
81+
await delete(table).go();
82+
}
8083
});
84+
} finally {
85+
await customStatement('PRAGMA foreign_keys = ON');
86+
}
87+
}
8188

8289
/// Closes the database instance
8390
Future<void> disconnect() => close();

packages/stream_chat_persistence/lib/src/stream_chat_persistence_client.dart

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,6 +461,13 @@ class StreamChatPersistenceClient extends ChatPersistenceClient {
461461
return db!.transaction(() => super.updateChannelStates(channelStates));
462462
}
463463

464+
@override
465+
Future<void> flush() {
466+
assert(_debugIsConnected, '');
467+
_logger.info('flush');
468+
return db!.flush();
469+
}
470+
464471
@override
465472
Future<void> disconnect({bool flush = false}) async {
466473
_logger.info('disconnect');

packages/stream_chat_persistence/test/stream_chat_persistence_client_test.dart

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,37 @@ void main() {
5454
expect(client.isConnected, false);
5555
});
5656

57+
test('flush', () async {
58+
const userId = 'testUserId';
59+
final client = StreamChatPersistenceClient(logLevel: Level.ALL);
60+
61+
await client.connect(userId, databaseProvider: testDatabaseProvider);
62+
addTearDown(() async => client.disconnect());
63+
64+
final connectionEvent = Event(
65+
type: EventType.healthCheck,
66+
createdAt: DateTime.timestamp(),
67+
me: OwnUser(id: userId, name: 'Test User'),
68+
);
69+
70+
await client.updateConnectionInfo(connectionEvent);
71+
72+
// Add some test data
73+
final testDate = DateTime.now();
74+
await client.updateLastSyncAt(testDate);
75+
76+
// Verify data exists
77+
final lastSyncAtBeforeFlush = await client.getLastSyncAt();
78+
expect(lastSyncAtBeforeFlush, isNotNull);
79+
80+
// Flush the database
81+
await client.flush();
82+
83+
// Verify data is cleared
84+
final lastSyncAtAfterFlush = await client.getLastSyncAt();
85+
expect(lastSyncAtAfterFlush, isNull);
86+
});
87+
5788
test('client function throws stateError if db is not yet connected', () {
5889
final client = StreamChatPersistenceClient(logLevel: Level.ALL);
5990
expect(

0 commit comments

Comments
 (0)