Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions internal/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ func Run(ctx context.Context, args []string, stdout, stderr io.Writer) error {
}

func (r *runtime) dispatch(args []string) error {
if hasHelpFlag(args[1:]) {
printUsage(r.stdout)
return nil
}
switch args[0] {
case "metadata":
return r.print(controlManifest())
Expand Down Expand Up @@ -118,6 +122,15 @@ func (r *runtime) dispatch(args []string) error {
}
}

func hasHelpFlag(args []string) bool {
for _, arg := range args {
if arg == "--help" || arg == "-h" {
return true
}
}
return false
}

func (r *runtime) withStore(fn func(*store.Store) error) error {
st, err := store.Open(r.ctx, r.dbPath)
if err != nil {
Expand Down
15 changes: 15 additions & 0 deletions internal/cli/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,21 @@ func TestUsageDocumentsMediaFetchOptIn(t *testing.T) {
}
}

func TestSubcommandHelpPrintsUsage(t *testing.T) {
t.Parallel()
var out, errOut bytes.Buffer
err := Run(context.Background(), []string{"import", "--help"}, &out, &errOut)
if err != nil {
t.Fatalf("help returned error: %v stderr=%s", err, errOut.String())
}
if errOut.Len() != 0 {
t.Fatalf("stderr = %q, want empty", errOut.String())
}
if !strings.Contains(out.String(), "telecrawl [--json] import") {
t.Fatalf("help output missing import usage:\n%s", out.String())
}
}

func accountScopedImportResult(label string) telegramdesktop.ImportResult {
now := time.Unix(1_800_000_000, 0).UTC()
return telegramdesktop.ImportResult{
Expand Down
48 changes: 48 additions & 0 deletions internal/telegramdesktop/importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,18 @@ import (
"crypto/cipher"
"crypto/sha512"
"encoding/binary"
"errors"
"io"
"os"
"path/filepath"
"reflect"
"strings"
"testing"
"time"

querymessages "github.com/gotd/td/telegram/query/messages"
"github.com/gotd/td/tg"
"github.com/gotd/td/tgerr"
"github.com/openclaw/telecrawl/internal/store"
postboxpkg "github.com/openclaw/telecrawl/internal/telegramdesktop/postbox"
)
Expand Down Expand Up @@ -354,6 +357,51 @@ func TestTDataExistingMediaRefsRequireFetchAndSameSource(t *testing.T) {
}
}

func TestTDataWithFloodWaitRetriesAfterDelay(t *testing.T) {
t.Parallel()
var waits []time.Duration
attempts := 0
err := tdataWithFloodWaitSleep(context.Background(), func(_ context.Context, delay time.Duration) error {
waits = append(waits, delay)
return nil
}, func(context.Context) error {
attempts++
if attempts == 1 {
return tgerr.New(420, "FLOOD_WAIT_2")
}
return nil
})
if err != nil {
t.Fatal(err)
}
if attempts != 2 {
t.Fatalf("attempts = %d, want 2", attempts)
}
if !reflect.DeepEqual(waits, []time.Duration{3 * time.Second}) {
t.Fatalf("waits = %v, want [3s]", waits)
}
}

func TestTDataWithFloodWaitStopsWhenContextIsDone(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
attempts := 0
err := tdataWithFloodWaitSleep(ctx, func(context.Context, time.Duration) error {
cancel()
return ctx.Err()
}, func(context.Context) error {
attempts++
return tgerr.New(420, "FLOOD_WAIT_2")
})
if !errors.Is(err, context.Canceled) {
t.Fatalf("err = %v, want context canceled", err)
}
if attempts != 1 {
t.Fatalf("attempts = %d, want 1", attempts)
}
}

func TestTDataReplyTopicMapping(t *testing.T) {
t.Parallel()
reply := &tg.MessageReplyHeader{}
Expand Down
166 changes: 110 additions & 56 deletions internal/telegramdesktop/tdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/gotd/td/telegram/query/dialogs"
querymessages "github.com/gotd/td/telegram/query/messages"
"github.com/gotd/td/tg"
"github.com/gotd/td/tgerr"
"github.com/openclaw/telecrawl/internal/store"
"golang.org/x/crypto/blake2b"
)
Expand Down Expand Up @@ -179,40 +180,42 @@ func (s *tdataImportSession) loadDialogs(ctx context.Context) ([]tdataDialog, er
limit = 0
}
count := 0
err := query.GetDialogs(s.raw).BatchSize(tdataBatchSize).ForEach(ctx, func(ctx context.Context, elem dialogs.Elem) error {
if elem.Deleted() {
return nil
}
chatID := tdataPeerIDString(elem.Dialog.GetPeer(), s.selfID)
if chatID == "" {
return nil
}
if _, ok := seen[chatID]; ok {
return nil
}
seen[chatID] = struct{}{}
info := tdataPeerInfo(elem.Dialog.GetPeer(), elem.Entities, s.selfID)
folderID := tdataDialogFolderID(elem.Dialog)
if chatFilter != "" && !tdataChatFilterMatches(chatID, chatFilter) {
err := tdataWithFloodWait(ctx, func(ctx context.Context) error {
return query.GetDialogs(s.raw).BatchSize(tdataBatchSize).ForEach(ctx, func(ctx context.Context, elem dialogs.Elem) error {
if elem.Deleted() {
return nil
}
chatID := tdataPeerIDString(elem.Dialog.GetPeer(), s.selfID)
if chatID == "" {
return nil
}
if _, ok := seen[chatID]; ok {
return nil
}
seen[chatID] = struct{}{}
info := tdataPeerInfo(elem.Dialog.GetPeer(), elem.Entities, s.selfID)
folderID := tdataDialogFolderID(elem.Dialog)
if chatFilter != "" && !tdataChatFilterMatches(chatID, chatFilter) {
return nil
}
out = append(out, tdataDialog{
elem: elem,
chatID: chatID,
chatName: firstNonEmpty(info.name, chatID),
kind: firstNonEmpty(info.kind, "unknown"),
username: info.username,
folderID: folderID,
forum: info.forum,
})
count++
if chatFilter != "" {
return errTDataStop
}
if limit > 0 && count >= limit {
return errTDataStop
}
return nil
}
out = append(out, tdataDialog{
elem: elem,
chatID: chatID,
chatName: firstNonEmpty(info.name, chatID),
kind: firstNonEmpty(info.kind, "unknown"),
username: info.username,
folderID: folderID,
forum: info.forum,
})
count++
if chatFilter != "" {
return errTDataStop
}
if limit > 0 && count >= limit {
return errTDataStop
}
return nil
})
if errors.Is(err, errTDataStop) {
err = nil
Expand Down Expand Up @@ -293,18 +296,26 @@ func (s *tdataImportSession) loadMessages(ctx context.Context, row tdataDialog)
limit := s.opts.MessagesLimit
count := 0
var out []store.Message
err := row.elem.Messages(s.raw).BatchSize(tdataBatchSize).ForEach(ctx, func(ctx context.Context, elem querymessages.Elem) error {
msg := elem.Msg
if msg.GetID() == 0 {
seen := make(map[int]struct{})
err := tdataWithFloodWait(ctx, func(ctx context.Context) error {
return row.elem.Messages(s.raw).BatchSize(tdataBatchSize).ForEach(ctx, func(ctx context.Context, elem querymessages.Elem) error {
msg := elem.Msg
msgID := msg.GetID()
if msgID == 0 {
return nil
}
if _, ok := seen[msgID]; ok {
return nil
}
seen[msgID] = struct{}{}
converted := s.convertMessage(ctx, row, elem)
out = append(out, converted)
count++
if limit > 0 && count >= limit {
return errTDataStop
}
return nil
}
converted := s.convertMessage(ctx, row, elem)
out = append(out, converted)
count++
if limit > 0 && count >= limit {
return errTDataStop
}
return nil
})
})
if errors.Is(err, errTDataStop) {
err = nil
Expand Down Expand Up @@ -509,7 +520,12 @@ func tdataLargestPhotoThumbSize(photo *tg.Photo) string {
}

func (s *tdataImportSession) loadFolders(ctx context.Context) ([]store.Folder, []store.FolderChat) {
result, err := s.raw.MessagesGetDialogFilters(ctx)
var result *tg.MessagesDialogFilters
err := tdataWithFloodWait(ctx, func(ctx context.Context) error {
var callErr error
result, callErr = s.raw.MessagesGetDialogFilters(ctx)
return callErr
})
if err != nil || result == nil {
return nil, nil
}
Expand All @@ -532,18 +548,20 @@ func (s *tdataImportSession) loadFolders(ctx context.Context) ([]store.Folder, [
}
}
if id, err := strconv.Atoi(folder.ID); err == nil && id != 0 {
_ = query.GetDialogs(s.raw).FolderID(id).BatchSize(tdataBatchSize).ForEach(ctx, func(ctx context.Context, elem dialogs.Elem) error {
chatID := tdataPeerIDString(elem.Dialog.GetPeer(), s.selfID)
if chatID == "" {
_ = tdataWithFloodWait(ctx, func(ctx context.Context) error {
return query.GetDialogs(s.raw).FolderID(id).BatchSize(tdataBatchSize).ForEach(ctx, func(ctx context.Context, elem dialogs.Elem) error {
chatID := tdataPeerIDString(elem.Dialog.GetPeer(), s.selfID)
if chatID == "" {
return nil
}
set := memberships[folder.ID]
if set == nil {
set = make(map[string]struct{})
memberships[folder.ID] = set
}
set[chatID] = struct{}{}
return nil
}
set := memberships[folder.ID]
if set == nil {
set = make(map[string]struct{})
memberships[folder.ID] = set
}
set[chatID] = struct{}{}
return nil
})
})
}
}
Expand Down Expand Up @@ -585,7 +603,12 @@ func (s *tdataImportSession) loadTopics(ctx context.Context, row tdataDialog) []
Limit: tdataBatchSize,
}
for {
result, err := s.raw.MessagesGetForumTopics(ctx, &req)
var result *tg.MessagesForumTopics
err := tdataWithFloodWait(ctx, func(ctx context.Context) error {
var callErr error
result, callErr = s.raw.MessagesGetForumTopics(ctx, &req)
return callErr
})
if err != nil || result == nil || len(result.Topics) == 0 {
return out
}
Expand Down Expand Up @@ -628,6 +651,37 @@ func (s *tdataImportSession) loadTopics(ctx context.Context, row tdataDialog) []
}
}

func tdataWithFloodWait(ctx context.Context, fn func(context.Context) error) error {
return tdataWithFloodWaitSleep(ctx, tdataSleep, fn)
}

func tdataWithFloodWaitSleep(ctx context.Context, sleep func(context.Context, time.Duration) error, fn func(context.Context) error) error {
for {
err := fn(ctx)
if err == nil {
return nil
}
delay, ok := tgerr.AsFloodWait(err)
if !ok {
return err
}
if err := sleep(ctx, delay+time.Second); err != nil {
return err
}
}
}

func tdataSleep(ctx context.Context, delay time.Duration) error {
timer := time.NewTimer(delay)
defer timer.Stop()
select {
case <-timer.C:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

type tdataPeerDetails struct {
kind string
name string
Expand Down