@@ -82,6 +82,153 @@ func getSourceModTimeKey(metadata map[string]string) string {
82
82
return ""
83
83
}
84
84
85
+ func layerDifference (ctx context.Context , sourceClnt , targetClnt Client , opts mirrorOptions ) chan diffMessage {
86
+ diffCh := make (chan diffMessage , 10000 )
87
+
88
+ go func () {
89
+ defer close (diffCh )
90
+
91
+ // Channels to feed items found by BFS into the difference engine
92
+ srcClientCh := make (chan * ClientContent , 1000 )
93
+ tgtClientCh := make (chan * ClientContent , 1000 )
94
+
95
+ // Goroutine to perform BFS on the source
96
+ go func () {
97
+ defer close (srcClientCh )
98
+ // Queue for *relative object prefixes* to explore
99
+ queue := []string {"" } // "" represents the root prefix
100
+
101
+ for len (queue ) > 0 {
102
+ // Dequeue the next relative prefix
103
+ prefix := queue [0 ]
104
+ queue = queue [1 :]
105
+
106
+ // List items at the current prefix level using the relative prefix
107
+ listCtx , listCancel := context .WithCancel (ctx )
108
+ contentsCh := sourceClnt .List (listCtx , ListOptions {
109
+ Recursive : false , // List only the current level
110
+ WithMetadata : opts .isMetadata ,
111
+ ShowDir : DirLast , // Ensure directories are listed
112
+ Prefix : prefix , // Pass the relative prefix
113
+ })
114
+
115
+ for content := range contentsCh {
116
+ select {
117
+ case <- ctx .Done ():
118
+ listCancel ()
119
+ return
120
+ default :
121
+ if content != nil && content .Err != nil {
122
+ srcClientCh <- content
123
+ listCancel ()
124
+ continue
125
+ }
126
+ if content == nil {
127
+ continue
128
+ }
129
+
130
+ // Send the valid content (file or directory) for comparison
131
+ srcClientCh <- content
132
+
133
+ // If it's a directory, queue its *relative object key* for the next level
134
+ if content .Type .IsDir () {
135
+ relativeKey := content .ObjectKey // Get the relative key
136
+ // Prevent infinite loops: don't re-queue the prefix we just listed,
137
+ // especially the root ("") which might list itself as "/" depending on backend.
138
+ // Also check if ObjectKey is populated.
139
+ if relativeKey != "" && relativeKey != prefix {
140
+ // Ensure the key ends with a separator if it's a directory prefix
141
+ // The S3 ListObjects usually returns directory keys ending with '/'
142
+ if ! strings .HasSuffix (relativeKey , string (content .URL .Separator )) {
143
+ // This case might indicate a non-standard directory representation, handle cautiously
144
+ // For standard S3, common prefixes already end in '/'
145
+ // If needed, append separator: relativeKey += string(content.URL.Separator)
146
+ }
147
+ // Add the relative key (prefix) to the queue
148
+ queue = append (queue , relativeKey )
149
+ }
150
+ }
151
+ }
152
+ }
153
+ listCancel ()
154
+ }
155
+ }()
156
+
157
+ // Goroutine to perform BFS on the target (symmetric to the source)
158
+ go func () {
159
+ defer close (tgtClientCh )
160
+ // Queue for *relative object prefixes*
161
+ queue := []string {"" }
162
+
163
+ for len (queue ) > 0 {
164
+ prefix := queue [0 ]
165
+ queue = queue [1 :]
166
+
167
+ listCtx , listCancel := context .WithCancel (ctx )
168
+ contentsCh := targetClnt .List (listCtx , ListOptions {
169
+ Recursive : false ,
170
+ WithMetadata : opts .isMetadata ,
171
+ ShowDir : DirLast ,
172
+ Prefix : prefix , // Pass the relative prefix
173
+ })
174
+
175
+ for content := range contentsCh {
176
+ select {
177
+ case <- ctx .Done ():
178
+ listCancel ()
179
+ return
180
+ default :
181
+ if content != nil && content .Err != nil {
182
+ tgtClientCh <- content
183
+ listCancel ()
184
+ continue
185
+ }
186
+ if content == nil {
187
+ continue
188
+ }
189
+
190
+ tgtClientCh <- content
191
+
192
+ // If it's a directory, queue its *relative object key*
193
+ if content .Type .IsDir () {
194
+ relativeKey := content .ObjectKey
195
+ if relativeKey != "" && relativeKey != prefix {
196
+ // Ensure trailing slash if needed (usually present from S3 List)
197
+ if ! strings .HasSuffix (relativeKey , string (content .URL .Separator )) {
198
+ // Handle non-standard directory representation if necessary
199
+ }
200
+ queue = append (queue , relativeKey )
201
+ }
202
+ }
203
+ }
204
+ }
205
+ listCancel ()
206
+ }
207
+ }()
208
+
209
+ // Comparison logic remains the same
210
+ err := differenceInternal (
211
+ sourceClnt .GetURL ().String (),
212
+ srcClientCh ,
213
+ targetClnt .GetURL ().String (),
214
+ tgtClientCh ,
215
+ opts ,
216
+ false , // returnSimilar is false
217
+ diffCh ,
218
+ )
219
+
220
+ if err != nil {
221
+ select {
222
+ case <- ctx .Done ():
223
+ default :
224
+ diffCh <- diffMessage {Error : err }
225
+ }
226
+ }
227
+ }()
228
+
229
+ return diffCh
230
+ }
231
+
85
232
// activeActiveModTimeUpdated tries to calculate if the object copy in the target
86
233
// is older than the one in the source by comparing the modtime of the data.
87
234
func activeActiveModTimeUpdated (src , dst * ClientContent ) bool {
@@ -167,7 +314,12 @@ func bucketObjectDifference(ctx context.Context, sourceClnt, targetClnt Client)
167
314
})
168
315
}
169
316
170
- func objectDifference (ctx context.Context , sourceClnt , targetClnt Client , opts mirrorOptions ) (diffCh chan diffMessage ) {
317
+ func objectDifference (ctx context.Context , sourceClnt , targetClnt Client , opts mirrorOptions ) chan diffMessage {
318
+ if opts .bfs {
319
+ // Use layer-by-layer difference for regular objects
320
+ return layerDifference (ctx , sourceClnt , targetClnt , opts )
321
+ }
322
+
171
323
sourceURL := sourceClnt .GetURL ().String ()
172
324
sourceCh := sourceClnt .List (ctx , ListOptions {Recursive : true , WithMetadata : opts .isMetadata , ShowDir : DirNone })
173
325
0 commit comments