3
3
// SPDX-License-Identifier: LGPL-3.0-only
4
4
5
5
const { promisify : p } = require ( 'util' )
6
- const { fromMessageSigil } = require ( 'ssb-uri2' )
6
+ const { fromMessageSigil, toMessageSigil } = require ( 'ssb-uri2' )
7
7
const pull = require ( 'pull-stream' )
8
8
const pullDefer = require ( 'pull-defer' )
9
+ const pullMany = require ( 'pull-many' )
9
10
const pullFlatMerge = require ( 'pull-flat-merge' )
10
11
const Strategy = require ( '@tangle/strategy' )
11
12
const Reduce = require ( '@tangle/reduce' )
12
13
const OverwriteFields = require ( '@tangle/overwrite-fields' )
13
14
const clarify = require ( 'clarify-error' )
14
15
const Butt64 = require ( 'butt64' )
15
16
const isCanonicalBase64 = require ( 'is-canonical-base64' )
16
- const { where, and, type, live, toPullStream } = require ( 'ssb-db2/operators' )
17
+ const {
18
+ where,
19
+ and,
20
+ type,
21
+ live,
22
+ key,
23
+ isDecrypted,
24
+ toPullStream,
25
+ } = require ( 'ssb-db2/operators' )
17
26
const {
18
27
validator : {
19
28
group : {
@@ -29,9 +38,9 @@ const isSubsetOf = require('set.prototype.issubsetof')
29
38
const intersection = require ( 'set.prototype.intersection' )
30
39
31
40
const GetMembers = require ( './get-members' )
32
-
33
- const { groupRecp } = require ( '../operators' )
34
41
const hookClose = require ( '../hook-close' )
42
+ const { groupRecp } = require ( '../operators' )
43
+
35
44
const getTangleUpdates = require ( '../tangles/get-tangle-updates' )
36
45
37
46
const msgPattern = toPattern ( new Butt64 ( 'ssb:message/[a-zA-Z0-9-]+/' , null , 32 ) )
@@ -179,14 +188,16 @@ module.exports = function Epochs(ssb) {
179
188
// then skip all the preferrentEpochs until we get up to the current
180
189
// This is important for listMembers to not send confusing results
181
190
getPreferredEpoch ( groupId , ( err , preferredEpoch ) => {
182
- if ( err ) return deferredSource . abort ( clarify ( err , 'failed to get initial preferred epoch' ) )
183
-
191
+ // if we're live and this fails we don't really mind, just go straight to live
184
192
if ( ! live ) {
185
- deferredSource . resolve ( pull . once ( preferredEpoch ) )
193
+ if ( err ) deferredSource . abort ( clarify ( err , 'failed to get initial preferred epoch' ) )
194
+ else deferredSource . resolve ( pull . once ( preferredEpoch ) )
195
+
186
196
return
187
197
}
188
198
189
- var sync = false
199
+ // if we couldn't get current preferred, we'll just go live
200
+ var sync = ! ! err
190
201
const source = pull (
191
202
epochsReduce . stream ( ssb , groupId , { getters : allGetters , live } ) ,
192
203
pull . asyncMap ( BuildPreferredEpoch ( ssb , groupId ) ) ,
@@ -411,24 +422,58 @@ function epochNodeStream(ssb, groupId, opts = {}) {
411
422
412
423
return deferredSource
413
424
}
414
- function getGroupInit ( ssb , groupId , cb ) {
415
- ssb . box2 . getGroupInfo ( groupId , ( err , info ) => {
416
- // prettier-ignore
417
- if ( err ) return cb ( clarify ( err , 'Failed to get group info for ' + groupId ) )
418
- if ( ! info ) return cb ( new Error ( 'Unknown group' ) )
419
425
420
- // Fetch the tangle root
421
- ssb . db . get ( info . root , ( err , rootVal ) => {
422
- // prettier-ignore
423
- if ( err ) return cb ( clarify ( err , 'Failed to load group root with id ' + info . root ) )
426
+ function getRootVal ( ssb , msgId , cb ) {
427
+ pull (
428
+ pullMany ( [
429
+ ssb . db . query (
430
+ where ( and ( isDecrypted ( 'box2' ) , key ( toMessageSigil ( msgId ) ) ) ) ,
431
+ live ( { old : true } ) ,
432
+ toPullStream ( )
433
+ ) ,
434
+ pull (
435
+ ssb . db . reindexed ( ) ,
436
+ pull . filter ( ( msg ) => fromMessageSigil ( msg . key ) === msgId )
437
+ ) ,
438
+ ] ) ,
439
+ pull . take ( 1 ) ,
440
+ pull . drain (
441
+ ( msg ) => cb ( null , msg . value ) ,
442
+ ( err ) => {
443
+ if ( err ) cb ( Error ( 'Failed getting root msg async' , { cause : err } ) )
444
+ }
445
+ )
446
+ )
447
+ }
424
448
425
- if ( ! isInitRoot ( rootVal ) )
449
+ function getGroupInit ( ssb , groupId , cb ) {
450
+ pull (
451
+ ssb . box2 . getGroupInfoUpdates ( groupId ) ,
452
+ pull . take ( 1 ) ,
453
+ pull . drain (
454
+ ( info ) => {
455
+ if ( ! info ) return cb ( new Error ( 'Unknown group' ) )
456
+
457
+ // Fetch the tangle root
458
+ // This is based on a live stream since sometimes the group info comes in very quick, before the root msg has had time to get put into the db
459
+ // and sometimes it might take ages (we haven't gotten that feed yet)
460
+ getRootVal ( ssb , info . root , ( err , rootVal ) => {
461
+ // prettier-ignore
462
+ if ( err ) return cb ( clarify ( err , 'Failed to load group root with id ' + info . root ) )
463
+
464
+ if ( ! isInitRoot ( rootVal ) )
465
+ // prettier-ignore
466
+ return cb ( clarify ( new Error ( isInitRoot . string ) , 'Malformed group/init root message' ) )
467
+
468
+ cb ( null , { key : info . root , value : rootVal } )
469
+ } )
470
+ } ,
471
+ ( err ) => {
426
472
// prettier-ignore
427
- return cb ( clarify ( new Error ( isInitRoot . string ) , 'Malformed group/init root message' ) )
428
-
429
- cb ( null , { key : info . root , value : rootVal } )
430
- } )
431
- } )
473
+ if ( err ) return cb ( clarify ( err , 'Failed to get group info for ' + groupId ) )
474
+ }
475
+ )
476
+ )
432
477
}
433
478
434
479
/* HELPERS */
0 commit comments