11use crate :: error:: * ;
22use crate :: util:: * ;
33use crate :: Result ;
4- use cloud_storage:: { object:: Object , ListRequest } ;
4+ use cloud_storage:: { object:: Object , Client , ListRequest } ;
55use futures:: stream:: FuturesUnordered ;
66use futures:: stream:: { StreamExt , TryStreamExt } ;
77use snafu:: { futures:: TryStreamExt as SnafuTryStreamExt , ResultExt } ;
@@ -10,16 +10,30 @@ use tokio::fs::{self, File};
1010use tokio:: io:: AsyncWriteExt ;
1111
1212#[ derive( Debug ) ]
13- pub struct GCS2Local {
13+ pub struct GcsSource {
1414 pub ( crate ) force_overwrite : bool ,
1515 pub ( crate ) concurrency : usize ,
16+ pub ( crate ) client : Client ,
1617}
1718
18- impl GCS2Local {
19- /// Syncs remote GCS bucket path to a local path
19+ impl GcsSource {
20+ pub fn new ( force_overwrite : bool , concurrency : usize ) -> Self {
21+ let client = Client :: default ( ) ;
22+ Self {
23+ force_overwrite,
24+ concurrency,
25+ client,
26+ }
27+ }
28+
29+ pub fn client ( & self ) -> & Client {
30+ & self . client
31+ }
32+
33+ /// Syncs remote Gcs bucket path to a local path
2034 ///
2135 /// Returns actual downloads count
22- pub async fn sync_gcs_to_local (
36+ pub async fn to_local (
2337 & self ,
2438 bucket_src : & str ,
2539 path_src : & str ,
@@ -32,18 +46,23 @@ impl GCS2Local {
3246 dst_dir. as_ref( )
3347 ) ;
3448 let dst_dir = dst_dir. as_ref ( ) ;
35- let objects_src = Object :: list (
36- bucket_src,
37- ListRequest {
38- prefix : Some ( path_src. to_owned ( ) ) ,
39- ..Default :: default ( )
40- } ,
41- )
42- . await
43- . context ( CloudStorage {
44- object : path_src. to_owned ( ) ,
45- op : OpSource :: pre ( OpSource :: ListPrefix ) ,
46- } ) ?;
49+ log:: trace!( "Requesting objects" ) ;
50+ let objects_src = self
51+ . client
52+ . object ( )
53+ . list (
54+ bucket_src,
55+ ListRequest {
56+ prefix : Some ( path_src. to_owned ( ) ) ,
57+ ..Default :: default ( )
58+ } ,
59+ )
60+ . await
61+ . context ( CloudStorage {
62+ object : path_src. to_owned ( ) ,
63+ op : OpSource :: pre ( OpSource :: ListPrefix ) ,
64+ } ) ?;
65+ log:: trace!( "iterating objects" ) ;
4766 objects_src
4867 . context ( CloudStorage {
4968 object : path_src. to_owned ( ) ,
@@ -53,9 +72,12 @@ impl GCS2Local {
5372 . try_fold (
5473 ( 0usize , dst_dir) ,
5574 |( mut count, dst_dir) , object_srcs| async move {
75+ log:: trace!( "objects: {:?}" , object_srcs) ;
5676 let mut jobs_pool = FuturesUnordered :: new ( ) ;
5777
5878 for object_src in object_srcs. items {
79+ log:: trace!( "object: {:?}" , object_src) ;
80+
5981 if jobs_pool. len ( ) == self . concurrency {
6082 // unwrap because it's not empty
6183 count += jobs_pool. next ( ) . await . unwrap ( ) ?;
@@ -87,6 +109,7 @@ impl GCS2Local {
87109
88110 let path_dst = path_dst. to_str ( ) . expect ( "valid utf8 file name" ) . to_owned ( ) ;
89111
112+ log:: trace!( "downloading object {:?}" , object_src) ;
90113 let job = Self :: download_object (
91114 self . force_overwrite ,
92115 bucket_src,
@@ -96,9 +119,12 @@ impl GCS2Local {
96119
97120 jobs_pool. push ( job) ;
98121 }
122+
123+ log:: trace!( "waiting for jobs completion" ) ;
99124 while let Some ( job) = jobs_pool. next ( ) . await {
100125 count += job?;
101126 }
127+ log:: trace!( "all jobs completed" ) ;
102128
103129 Ok ( ( count, dst_dir) )
104130 } ,
@@ -107,6 +133,56 @@ impl GCS2Local {
107133 . map ( |( count, _) | count)
108134 }
109135
136+ /// Copies remote Gcs bucket file or directory to another remote Gcs bucket file or directory
137+ pub async fn to_gcs (
138+ & self ,
139+ bucket_src : & str ,
140+ path_src : & str ,
141+ bucket_dst : & str ,
142+ path_dst : & str ,
143+ ) -> Result < usize , Error > {
144+ let objects_src = self
145+ . client
146+ . object ( )
147+ . list (
148+ bucket_src,
149+ ListRequest {
150+ prefix : Some ( path_src. to_owned ( ) ) ,
151+ ..Default :: default ( )
152+ } ,
153+ )
154+ . await
155+ . context ( CloudStorage {
156+ object : path_src. to_owned ( ) ,
157+ op : OpSource :: pre ( OpSource :: ListPrefix ) ,
158+ } ) ?;
159+ objects_src
160+ . context ( CloudStorage {
161+ object : path_src. to_owned ( ) ,
162+ op : OpSource :: ListPrefix ,
163+ } )
164+ // .map_err(Error::from)
165+ . try_fold (
166+ ( 0usize , bucket_dst, path_dst) ,
167+ |( mut count, bucket_dst, path_dst) , object_srcs| async move {
168+ for object_src in object_srcs. items {
169+ object_src
170+ . copy ( bucket_dst, path_dst)
171+ . await
172+ . context ( CloudStorage {
173+ object : path_dst. to_owned ( ) ,
174+ op : OpSource :: CopyObject ,
175+ } ) ?;
176+ count += 1 ;
177+ }
178+
179+ Ok ( ( count, bucket_dst, path_dst) )
180+ } ,
181+ )
182+ . await
183+ . map ( |( count, ..) | count)
184+ }
185+
110186 async fn create_parent_dirs ( force_overwrite : bool , path_dst : impl AsRef < Path > ) -> Result < ( ) > {
111187 let path_dst = PathBuf :: from ( path_dst. as_ref ( ) ) ;
112188
0 commit comments