Skip to content

Commit ad2c5f0

Browse files
committed
refactor: store cache
1 parent c23aebd commit ad2c5f0

File tree

14 files changed

+889
-115
lines changed

14 files changed

+889
-115
lines changed

server/profiler/profiler.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package profiler
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"log/slog"
7+
"net/http"
8+
"net/http/pprof"
9+
"runtime"
10+
"time"
11+
12+
"github.com/labstack/echo/v4"
13+
)
14+
15+
// Profiler provides HTTP endpoints for memory profiling and a
// background monitor that periodically logs runtime memory statistics.
type Profiler struct {
	// memStatsLogInterval is how often StartMemoryMonitor samples and
	// logs runtime.MemStats (set by NewProfiler).
	memStatsLogInterval time.Duration
}
19+
20+
// NewProfiler creates a new profiler
21+
func NewProfiler() *Profiler {
22+
return &Profiler{
23+
memStatsLogInterval: 1 * time.Minute,
24+
}
25+
}
26+
27+
// RegisterRoutes adds profiling endpoints to the Echo server
28+
func (p *Profiler) RegisterRoutes(e *echo.Echo) {
29+
// Register pprof handlers
30+
g := e.Group("/debug/pprof")
31+
g.GET("", echo.WrapHandler(http.HandlerFunc(pprof.Index)))
32+
g.GET("/cmdline", echo.WrapHandler(http.HandlerFunc(pprof.Cmdline)))
33+
g.GET("/profile", echo.WrapHandler(http.HandlerFunc(pprof.Profile)))
34+
g.POST("/symbol", echo.WrapHandler(http.HandlerFunc(pprof.Symbol)))
35+
g.GET("/symbol", echo.WrapHandler(http.HandlerFunc(pprof.Symbol)))
36+
g.GET("/trace", echo.WrapHandler(http.HandlerFunc(pprof.Trace)))
37+
g.GET("/allocs", echo.WrapHandler(http.HandlerFunc(pprof.Handler("allocs").ServeHTTP)))
38+
g.GET("/block", echo.WrapHandler(http.HandlerFunc(pprof.Handler("block").ServeHTTP)))
39+
g.GET("/goroutine", echo.WrapHandler(http.HandlerFunc(pprof.Handler("goroutine").ServeHTTP)))
40+
g.GET("/heap", echo.WrapHandler(http.HandlerFunc(pprof.Handler("heap").ServeHTTP)))
41+
g.GET("/mutex", echo.WrapHandler(http.HandlerFunc(pprof.Handler("mutex").ServeHTTP)))
42+
g.GET("/threadcreate", echo.WrapHandler(http.HandlerFunc(pprof.Handler("threadcreate").ServeHTTP)))
43+
44+
// Add a custom memory stats endpoint
45+
g.GET("/memstats", func(c echo.Context) error {
46+
var m runtime.MemStats
47+
runtime.ReadMemStats(&m)
48+
return c.JSON(http.StatusOK, map[string]interface{}{
49+
"alloc": m.Alloc,
50+
"totalAlloc": m.TotalAlloc,
51+
"sys": m.Sys,
52+
"numGC": m.NumGC,
53+
"heapAlloc": m.HeapAlloc,
54+
"heapSys": m.HeapSys,
55+
"heapInuse": m.HeapInuse,
56+
"heapObjects": m.HeapObjects,
57+
})
58+
})
59+
}
60+
61+
// StartMemoryMonitor starts a goroutine that periodically logs memory stats
62+
func (p *Profiler) StartMemoryMonitor(ctx context.Context) {
63+
go func() {
64+
ticker := time.NewTicker(p.memStatsLogInterval)
65+
defer ticker.Stop()
66+
67+
// Store previous heap allocation to track growth
68+
var lastHeapAlloc uint64
69+
var lastNumGC uint32
70+
71+
for {
72+
select {
73+
case <-ticker.C:
74+
var m runtime.MemStats
75+
runtime.ReadMemStats(&m)
76+
77+
// Calculate heap growth since last check
78+
heapGrowth := int64(m.HeapAlloc) - int64(lastHeapAlloc)
79+
gcCount := m.NumGC - lastNumGC
80+
81+
slog.Info("memory stats",
82+
"heapAlloc", byteCountIEC(m.HeapAlloc),
83+
"heapSys", byteCountIEC(m.HeapSys),
84+
"heapObjects", m.HeapObjects,
85+
"heapGrowth", byteCountIEC(uint64(heapGrowth)),
86+
"numGoroutine", runtime.NumGoroutine(),
87+
"numGC", m.NumGC,
88+
"gcSince", gcCount,
89+
"nextGC", byteCountIEC(m.NextGC),
90+
"gcPause", time.Duration(m.PauseNs[(m.NumGC+255)%256]).String(),
91+
)
92+
93+
// Track values for next iteration
94+
lastHeapAlloc = m.HeapAlloc
95+
lastNumGC = m.NumGC
96+
97+
// Force GC if memory usage is high to see if objects can be reclaimed
98+
if m.HeapAlloc > 500*1024*1024 { // 500 MB threshold
99+
slog.Info("forcing garbage collection due to high memory usage")
100+
runtime.GC()
101+
}
102+
case <-ctx.Done():
103+
return
104+
}
105+
}
106+
}()
107+
}
108+
109+
// byteCountIEC renders a byte count as a human-readable IEC string
// ("512 B", "1.5 KiB", "2.0 GiB"), using 1024-based units.
func byteCountIEC(b uint64) string {
	const unit = 1024
	if b < unit {
		return fmt.Sprintf("%d B", b)
	}
	// Scale down until the value fits one unit step above the suffix,
	// tracking which suffix applies.
	const suffixes = "KMGTPE"
	value := float64(b)
	idx := 0
	for value >= unit*unit && idx < len(suffixes)-1 {
		value /= unit
		idx++
	}
	return fmt.Sprintf("%.1f %ciB", value/unit, suffixes[idx])
}

server/router/api/v1/resource_service.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/usememos/memos/plugin/storage/s3"
2727
v1pb "github.com/usememos/memos/proto/gen/api/v1"
2828
storepb "github.com/usememos/memos/proto/gen/store"
29+
"github.com/usememos/memos/server/profile"
2930
"github.com/usememos/memos/store"
3031
)
3132

@@ -71,7 +72,7 @@ func (s *APIV1Service) CreateResource(ctx context.Context, request *v1pb.CreateR
7172
}
7273
create.Size = int64(size)
7374
create.Blob = request.Resource.Content
74-
if err := SaveResourceBlob(ctx, s.Store, create); err != nil {
75+
if err := SaveResourceBlob(ctx, s.Profile, s.Store, create); err != nil {
7576
return nil, status.Errorf(codes.Internal, "failed to save resource blob: %v", err)
7677
}
7778

@@ -286,8 +287,8 @@ func (s *APIV1Service) convertResourceFromStore(ctx context.Context, resource *s
286287
}
287288

288289
// SaveResourceBlob save the blob of resource based on the storage config.
289-
func SaveResourceBlob(ctx context.Context, s *store.Store, create *store.Resource) error {
290-
workspaceStorageSetting, err := s.GetWorkspaceStorageSetting(ctx)
290+
func SaveResourceBlob(ctx context.Context, profile *profile.Profile, stores *store.Store, create *store.Resource) error {
291+
workspaceStorageSetting, err := stores.GetWorkspaceStorageSetting(ctx)
291292
if err != nil {
292293
return errors.Wrap(err, "Failed to find workspace storage setting")
293294
}
@@ -308,7 +309,7 @@ func SaveResourceBlob(ctx context.Context, s *store.Store, create *store.Resourc
308309
// Ensure the directory exists.
309310
osPath := filepath.FromSlash(internalPath)
310311
if !filepath.IsAbs(osPath) {
311-
osPath = filepath.Join(s.Profile.Data, osPath)
312+
osPath = filepath.Join(profile.Data, osPath)
312313
}
313314
dir := filepath.Dir(osPath)
314315
if err = os.MkdirAll(dir, os.ModePerm); err != nil {

server/runner/memopayload/runner.go

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package memopayload
33
import (
44
"context"
55
"log/slog"
6+
"runtime"
67
"slices"
78

89
"github.com/pkg/errors"
@@ -26,23 +27,52 @@ func NewRunner(store *store.Store) *Runner {
2627

2728
// RunOnce rebuilds the payload of all memos.
2829
func (r *Runner) RunOnce(ctx context.Context) {
29-
memos, err := r.Store.ListMemos(ctx, &store.FindMemo{})
30-
if err != nil {
31-
slog.Error("failed to list memos", "err", err)
32-
return
33-
}
30+
// Process memos in batches to avoid loading all memos into memory at once
31+
const batchSize = 100
32+
offset := 0
33+
processed := 0
3434

35-
for _, memo := range memos {
36-
if err := RebuildMemoPayload(memo); err != nil {
37-
slog.Error("failed to rebuild memo payload", "err", err)
38-
continue
35+
for {
36+
limit := batchSize
37+
memos, err := r.Store.ListMemos(ctx, &store.FindMemo{
38+
Limit: &limit,
39+
Offset: &offset,
40+
})
41+
if err != nil {
42+
slog.Error("failed to list memos", "err", err)
43+
return
3944
}
40-
if err := r.Store.UpdateMemo(ctx, &store.UpdateMemo{
41-
ID: memo.ID,
42-
Payload: memo.Payload,
43-
}); err != nil {
44-
slog.Error("failed to update memo", "err", err)
45+
46+
// Break if no more memos
47+
if len(memos) == 0 {
48+
break
49+
}
50+
51+
// Process batch
52+
batchSuccessCount := 0
53+
for _, memo := range memos {
54+
if err := RebuildMemoPayload(memo); err != nil {
55+
slog.Error("failed to rebuild memo payload", "err", err, "memoID", memo.ID)
56+
continue
57+
}
58+
if err := r.Store.UpdateMemo(ctx, &store.UpdateMemo{
59+
ID: memo.ID,
60+
Payload: memo.Payload,
61+
}); err != nil {
62+
slog.Error("failed to update memo", "err", err, "memoID", memo.ID)
63+
continue
64+
}
65+
batchSuccessCount++
4566
}
67+
68+
processed += len(memos)
69+
slog.Info("Processed memo batch", "batchSize", len(memos), "successCount", batchSuccessCount, "totalProcessed", processed)
70+
71+
// Move to next batch
72+
offset += len(memos)
73+
74+
// Force garbage collection between batches to prevent memory accumulation
75+
runtime.GC()
4676
}
4777
}
4878

server/runner/s3presign/runner.go

Lines changed: 74 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package s3presign
33
import (
44
"context"
55
"log/slog"
6+
"runtime"
67
"time"
78

89
"google.golang.org/protobuf/types/known/timestamppb"
@@ -50,59 +51,88 @@ func (r *Runner) CheckAndPresign(ctx context.Context) {
5051
}
5152

5253
s3StorageType := storepb.ResourceStorageType_S3
53-
resources, err := r.Store.ListResources(ctx, &store.FindResource{
54-
GetBlob: false,
55-
StorageType: &s3StorageType,
56-
})
57-
if err != nil {
58-
return
59-
}
54+
// Limit resources to a reasonable batch size
55+
const batchSize = 100
56+
offset := 0
57+
58+
for {
59+
limit := batchSize
60+
resources, err := r.Store.ListResources(ctx, &store.FindResource{
61+
GetBlob: false,
62+
StorageType: &s3StorageType,
63+
Limit: &limit,
64+
Offset: &offset,
65+
})
66+
if err != nil {
67+
slog.Error("Failed to list resources for presigning", "error", err)
68+
return
69+
}
6070

61-
for _, resource := range resources {
62-
s3ObjectPayload := resource.Payload.GetS3Object()
63-
if s3ObjectPayload == nil {
64-
continue
71+
// Break if no more resources
72+
if len(resources) == 0 {
73+
break
6574
}
6675

67-
if s3ObjectPayload.LastPresignedTime != nil {
68-
// Skip if the presigned URL is still valid for the next 4 days.
69-
// The expiration time is set to 5 days.
70-
if time.Now().Before(s3ObjectPayload.LastPresignedTime.AsTime().Add(4 * 24 * time.Hour)) {
76+
// Process batch of resources
77+
presignCount := 0
78+
for _, resource := range resources {
79+
s3ObjectPayload := resource.Payload.GetS3Object()
80+
if s3ObjectPayload == nil {
7181
continue
7282
}
73-
}
7483

75-
s3Config := workspaceStorageSetting.GetS3Config()
76-
if s3ObjectPayload.S3Config != nil {
77-
s3Config = s3ObjectPayload.S3Config
78-
}
79-
if s3Config == nil {
80-
slog.Error("S3 config is not found")
81-
continue
82-
}
84+
if s3ObjectPayload.LastPresignedTime != nil {
85+
// Skip if the presigned URL is still valid for the next 4 days.
86+
// The expiration time is set to 5 days.
87+
if time.Now().Before(s3ObjectPayload.LastPresignedTime.AsTime().Add(4 * 24 * time.Hour)) {
88+
continue
89+
}
90+
}
8391

84-
s3Client, err := s3.NewClient(ctx, s3Config)
85-
if err != nil {
86-
slog.Error("Failed to create S3 client", "error", err)
87-
continue
88-
}
92+
s3Config := workspaceStorageSetting.GetS3Config()
93+
if s3ObjectPayload.S3Config != nil {
94+
s3Config = s3ObjectPayload.S3Config
95+
}
96+
if s3Config == nil {
97+
slog.Error("S3 config is not found")
98+
continue
99+
}
89100

90-
presignURL, err := s3Client.PresignGetObject(ctx, s3ObjectPayload.Key)
91-
if err != nil {
92-
return
93-
}
94-
s3ObjectPayload.S3Config = s3Config
95-
s3ObjectPayload.LastPresignedTime = timestamppb.New(time.Now())
96-
if err := r.Store.UpdateResource(ctx, &store.UpdateResource{
97-
ID: resource.ID,
98-
Reference: &presignURL,
99-
Payload: &storepb.ResourcePayload{
100-
Payload: &storepb.ResourcePayload_S3Object_{
101-
S3Object: s3ObjectPayload,
101+
s3Client, err := s3.NewClient(ctx, s3Config)
102+
if err != nil {
103+
slog.Error("Failed to create S3 client", "error", err)
104+
continue
105+
}
106+
107+
presignURL, err := s3Client.PresignGetObject(ctx, s3ObjectPayload.Key)
108+
if err != nil {
109+
slog.Error("Failed to presign URL", "error", err, "resourceID", resource.ID)
110+
continue
111+
}
112+
113+
s3ObjectPayload.S3Config = s3Config
114+
s3ObjectPayload.LastPresignedTime = timestamppb.New(time.Now())
115+
if err := r.Store.UpdateResource(ctx, &store.UpdateResource{
116+
ID: resource.ID,
117+
Reference: &presignURL,
118+
Payload: &storepb.ResourcePayload{
119+
Payload: &storepb.ResourcePayload_S3Object_{
120+
S3Object: s3ObjectPayload,
121+
},
102122
},
103-
},
104-
}); err != nil {
105-
return
123+
}); err != nil {
124+
slog.Error("Failed to update resource", "error", err, "resourceID", resource.ID)
125+
continue
126+
}
127+
presignCount++
106128
}
129+
130+
slog.Info("Presigned batch of S3 resources", "batchSize", len(resources), "presigned", presignCount)
131+
132+
// Move to next batch
133+
offset += len(resources)
134+
135+
// Prevent memory accumulation between batches
136+
runtime.GC()
107137
}
108138
}

0 commit comments

Comments
 (0)