diff --git a/internal/cli/cli.go b/internal/cli/cli.go index 73f0986..610e453 100644 --- a/internal/cli/cli.go +++ b/internal/cli/cli.go @@ -88,6 +88,10 @@ func Run(ctx context.Context, args []string, stdout, stderr io.Writer) error { } func (r *runtime) dispatch(args []string) error { + if hasHelpFlag(args[1:]) { + printUsage(r.stdout) + return nil + } switch args[0] { case "metadata": return r.print(controlManifest()) @@ -118,6 +122,15 @@ func (r *runtime) dispatch(args []string) error { } } +func hasHelpFlag(args []string) bool { + for _, arg := range args { + if arg == "--help" || arg == "-h" { + return true + } + } + return false +} + func (r *runtime) withStore(fn func(*store.Store) error) error { st, err := store.Open(r.ctx, r.dbPath) if err != nil { diff --git a/internal/cli/cli_test.go b/internal/cli/cli_test.go index f81461a..c15dc32 100644 --- a/internal/cli/cli_test.go +++ b/internal/cli/cli_test.go @@ -433,6 +433,21 @@ func TestUsageDocumentsMediaFetchOptIn(t *testing.T) { } } +func TestSubcommandHelpPrintsUsage(t *testing.T) { + t.Parallel() + var out, errOut bytes.Buffer + err := Run(context.Background(), []string{"import", "--help"}, &out, &errOut) + if err != nil { + t.Fatalf("help returned error: %v stderr=%s", err, errOut.String()) + } + if errOut.Len() != 0 { + t.Fatalf("stderr = %q, want empty", errOut.String()) + } + if !strings.Contains(out.String(), "telecrawl [--json] import") { + t.Fatalf("help output missing import usage:\n%s", out.String()) + } +} + func accountScopedImportResult(label string) telegramdesktop.ImportResult { now := time.Unix(1_800_000_000, 0).UTC() return telegramdesktop.ImportResult{ diff --git a/internal/telegramdesktop/importer_test.go b/internal/telegramdesktop/importer_test.go index 9c15121..234a317 100644 --- a/internal/telegramdesktop/importer_test.go +++ b/internal/telegramdesktop/importer_test.go @@ -6,15 +6,18 @@ import ( "crypto/cipher" "crypto/sha512" "encoding/binary" + "errors" "io" "os" "path/filepath" "reflect" "strings" "testing" + "time" querymessages "github.com/gotd/td/telegram/query/messages" "github.com/gotd/td/tg" + "github.com/gotd/td/tgerr" "github.com/openclaw/telecrawl/internal/store" postboxpkg "github.com/openclaw/telecrawl/internal/telegramdesktop/postbox" ) @@ -354,6 +357,51 @@ func TestTDataExistingMediaRefsRequireFetchAndSameSource(t *testing.T) { } } +func TestTDataWithFloodWaitRetriesAfterDelay(t *testing.T) { + t.Parallel() + var waits []time.Duration + attempts := 0 + err := tdataWithFloodWaitSleep(context.Background(), func(_ context.Context, delay time.Duration) error { + waits = append(waits, delay) + return nil + }, func(context.Context) error { + attempts++ + if attempts == 1 { + return tgerr.New(420, "FLOOD_WAIT_2") + } + return nil + }) + if err != nil { + t.Fatal(err) + } + if attempts != 2 { + t.Fatalf("attempts = %d, want 2", attempts) + } + if !reflect.DeepEqual(waits, []time.Duration{3 * time.Second}) { + t.Fatalf("waits = %v, want [3s]", waits) + } +} + +func TestTDataWithFloodWaitStopsWhenContextIsDone(t *testing.T) { + t.Parallel() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + attempts := 0 + err := tdataWithFloodWaitSleep(ctx, func(context.Context, time.Duration) error { + cancel() + return ctx.Err() + }, func(context.Context) error { + attempts++ + return tgerr.New(420, "FLOOD_WAIT_2") + }) + if !errors.Is(err, context.Canceled) { + t.Fatalf("err = %v, want context canceled", err) + } + if attempts != 1 { + t.Fatalf("attempts = %d, want 1", attempts) + } +} + func TestTDataReplyTopicMapping(t *testing.T) { t.Parallel() reply := &tg.MessageReplyHeader{} diff --git a/internal/telegramdesktop/tdata.go b/internal/telegramdesktop/tdata.go index 26595e3..533d61e 100644 --- a/internal/telegramdesktop/tdata.go +++ b/internal/telegramdesktop/tdata.go @@ -23,6 +23,7 @@ import ( "github.com/gotd/td/telegram/query/dialogs" querymessages "github.com/gotd/td/telegram/query/messages" "github.com/gotd/td/tg" + "github.com/gotd/td/tgerr" "github.com/openclaw/telecrawl/internal/store" "golang.org/x/crypto/blake2b" ) @@ -179,40 +180,42 @@ func (s *tdataImportSession) loadDialogs(ctx context.Context) ([]tdataDialog, er limit = 0 } count := 0 - err := query.GetDialogs(s.raw).BatchSize(tdataBatchSize).ForEach(ctx, func(ctx context.Context, elem dialogs.Elem) error { - if elem.Deleted() { - return nil - } - chatID := tdataPeerIDString(elem.Dialog.GetPeer(), s.selfID) - if chatID == "" { - return nil - } - if _, ok := seen[chatID]; ok { - return nil - } - seen[chatID] = struct{}{} - info := tdataPeerInfo(elem.Dialog.GetPeer(), elem.Entities, s.selfID) - folderID := tdataDialogFolderID(elem.Dialog) - if chatFilter != "" && !tdataChatFilterMatches(chatID, chatFilter) { + err := tdataWithFloodWait(ctx, func(ctx context.Context) error { + return query.GetDialogs(s.raw).BatchSize(tdataBatchSize).ForEach(ctx, func(ctx context.Context, elem dialogs.Elem) error { + if elem.Deleted() { + return nil + } + chatID := tdataPeerIDString(elem.Dialog.GetPeer(), s.selfID) + if chatID == "" { + return nil + } + if _, ok := seen[chatID]; ok { + return nil + } + seen[chatID] = struct{}{} + info := tdataPeerInfo(elem.Dialog.GetPeer(), elem.Entities, s.selfID) + folderID := tdataDialogFolderID(elem.Dialog) + if chatFilter != "" && !tdataChatFilterMatches(chatID, chatFilter) { + return nil + } + out = append(out, tdataDialog{ + elem: elem, + chatID: chatID, + chatName: firstNonEmpty(info.name, chatID), + kind: firstNonEmpty(info.kind, "unknown"), + username: info.username, + folderID: folderID, + forum: info.forum, + }) + count++ + if chatFilter != "" { + return errTDataStop + } + if limit > 0 && count >= limit { + return errTDataStop + } return nil - } - out = append(out, tdataDialog{ - elem: elem, - chatID: chatID, - chatName: firstNonEmpty(info.name, chatID), - kind: firstNonEmpty(info.kind, "unknown"), - username: info.username, - folderID: folderID, - forum: info.forum, }) - count++ - if chatFilter != "" { - return errTDataStop - } - if limit > 0 && count >= limit { - return errTDataStop - } - return nil }) if errors.Is(err, errTDataStop) { err = nil @@ -293,18 +296,26 @@ func (s *tdataImportSession) loadMessages(ctx context.Context, row tdataDialog) limit := s.opts.MessagesLimit count := 0 var out []store.Message - err := row.elem.Messages(s.raw).BatchSize(tdataBatchSize).ForEach(ctx, func(ctx context.Context, elem querymessages.Elem) error { - msg := elem.Msg - if msg.GetID() == 0 { + seen := make(map[int]struct{}) + err := tdataWithFloodWait(ctx, func(ctx context.Context) error { + return row.elem.Messages(s.raw).BatchSize(tdataBatchSize).ForEach(ctx, func(ctx context.Context, elem querymessages.Elem) error { + msg := elem.Msg + msgID := msg.GetID() + if msgID == 0 { + return nil + } + if _, ok := seen[msgID]; ok { + return nil + } + seen[msgID] = struct{}{} + converted := s.convertMessage(ctx, row, elem) + out = append(out, converted) + count++ + if limit > 0 && count >= limit { + return errTDataStop + } return nil - } - converted := s.convertMessage(ctx, row, elem) - out = append(out, converted) - count++ - if limit > 0 && count >= limit { - return errTDataStop - } - return nil + }) }) if errors.Is(err, errTDataStop) { err = nil @@ -509,7 +520,12 @@ func tdataLargestPhotoThumbSize(photo *tg.Photo) string { } func (s *tdataImportSession) loadFolders(ctx context.Context) ([]store.Folder, []store.FolderChat) { - result, err := s.raw.MessagesGetDialogFilters(ctx) + var result *tg.MessagesDialogFilters + err := tdataWithFloodWait(ctx, func(ctx context.Context) error { + var callErr error + result, callErr = s.raw.MessagesGetDialogFilters(ctx) + return callErr + }) if err != nil || result == nil { return nil, nil } @@ -532,18 +548,20 @@ func (s *tdataImportSession) loadFolders(ctx context.Context) ([]store.Folder, [ } } if id, err := strconv.Atoi(folder.ID); err == nil && id != 0 { - _ = query.GetDialogs(s.raw).FolderID(id).BatchSize(tdataBatchSize).ForEach(ctx, func(ctx context.Context, elem dialogs.Elem) error { - chatID := tdataPeerIDString(elem.Dialog.GetPeer(), s.selfID) - if chatID == "" { + _ = tdataWithFloodWait(ctx, func(ctx context.Context) error { + return query.GetDialogs(s.raw).FolderID(id).BatchSize(tdataBatchSize).ForEach(ctx, func(ctx context.Context, elem dialogs.Elem) error { + chatID := tdataPeerIDString(elem.Dialog.GetPeer(), s.selfID) + if chatID == "" { + return nil + } + set := memberships[folder.ID] + if set == nil { + set = make(map[string]struct{}) + memberships[folder.ID] = set + } + set[chatID] = struct{}{} return nil - } - set := memberships[folder.ID] - if set == nil { - set = make(map[string]struct{}) - memberships[folder.ID] = set - } - set[chatID] = struct{}{} - return nil + }) }) } } @@ -585,7 +603,12 @@ func (s *tdataImportSession) loadTopics(ctx context.Context, row tdataDialog) [] Limit: tdataBatchSize, } for { - result, err := s.raw.MessagesGetForumTopics(ctx, &req) + var result *tg.MessagesForumTopics + err := tdataWithFloodWait(ctx, func(ctx context.Context) error { + var callErr error + result, callErr = s.raw.MessagesGetForumTopics(ctx, &req) + return callErr + }) if err != nil || result == nil || len(result.Topics) == 0 { return out } @@ -628,6 +651,37 @@ func (s *tdataImportSession) loadTopics(ctx context.Context, row tdataDialog) [] } } +func tdataWithFloodWait(ctx context.Context, fn func(context.Context) error) error { + return tdataWithFloodWaitSleep(ctx, tdataSleep, fn) +} + +func tdataWithFloodWaitSleep(ctx context.Context, sleep func(context.Context, time.Duration) error, fn func(context.Context) error) error { + for { + err := fn(ctx) + if err == nil { + return nil + } + delay, ok := tgerr.AsFloodWait(err) + if !ok { + return err + } + if err := sleep(ctx, delay+time.Second); err != nil { + return err + } + } +} + +func tdataSleep(ctx context.Context, delay time.Duration) error { + timer := time.NewTimer(delay) + defer timer.Stop() + select { + case <-timer.C: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + type tdataPeerDetails struct { kind string name string