2
2
// License, v. 2.0. If a copy of the MPL was not distributed with this
3
3
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
4
4
5
- //! APIs to help access bundles
5
+ //! Utilities to access support bundles via the internal API
6
6
7
- use crate :: index:: SupportBundleIndex ;
8
7
use anyhow:: Context as _;
9
- use anyhow:: Result ;
8
+ use anyhow:: bail ;
10
9
use async_trait:: async_trait;
11
10
use bytes:: Buf ;
12
11
use bytes:: Bytes ;
13
12
use camino:: Utf8Path ;
14
13
use camino:: Utf8PathBuf ;
15
14
use futures:: Stream ;
16
15
use futures:: StreamExt ;
16
+ use futures:: TryStreamExt ;
17
+ use nexus_client:: types:: SupportBundleInfo ;
18
+ use nexus_client:: types:: SupportBundleState ;
17
19
use omicron_uuid_kinds:: GenericUuid ;
18
20
use omicron_uuid_kinds:: SupportBundleUuid ;
19
21
use std:: io;
22
+ use std:: io:: Write ;
20
23
use std:: pin:: Pin ;
21
24
use std:: task:: Context ;
22
25
use std:: task:: Poll ;
26
+ use std:: time:: Duration ;
27
+ use support_bundle_viewer:: BoxedFileAccessor ;
28
+ use support_bundle_viewer:: SupportBundleAccessor ;
29
+ use support_bundle_viewer:: SupportBundleIndex ;
23
30
use tokio:: io:: AsyncRead ;
24
31
use tokio:: io:: ReadBuf ;
25
32
26
- /// An I/O source which can read to a buffer
27
- ///
28
- /// This describes access to individual files within the bundle.
29
- pub trait FileAccessor : AsyncRead + Unpin { }
30
- impl < T : AsyncRead + Unpin + ?Sized > FileAccessor for T { }
31
-
32
- pub type BoxedFileAccessor < ' a > = Box < dyn FileAccessor + ' a > ;
33
-
34
- /// Describes how the support bundle's data and metadata are accessed.
35
- #[ async_trait]
36
- pub trait SupportBundleAccessor {
37
- /// Access the index of a support bundle
38
- async fn get_index ( & self ) -> Result < SupportBundleIndex > ;
39
-
40
- /// Access a file within the support bundle
41
- async fn get_file < ' a > (
42
- & mut self ,
43
- path : & Utf8Path ,
44
- ) -> Result < BoxedFileAccessor < ' a > >
45
- where
46
- Self : ' a ;
47
- }
48
-
49
33
pub struct StreamedFile < ' a > {
50
34
client : & ' a nexus_client:: Client ,
51
35
id : SupportBundleUuid ,
@@ -67,7 +51,7 @@ impl<'a> StreamedFile<'a> {
67
51
// use range requests to stream out portions of the file.
68
52
//
69
53
// This means that we would potentially want to restart the stream with a different position.
70
- async fn start_stream ( & mut self ) -> Result < ( ) > {
54
+ async fn start_stream ( & mut self ) -> anyhow :: Result < ( ) > {
71
55
// TODO: Add range headers, for range requests? Though this
72
56
// will require adding support to Progenitor + Nexus too.
73
57
let stream = self
@@ -140,10 +124,22 @@ impl<'a> InternalApiAccess<'a> {
140
124
}
141
125
}
142
126
127
+ async fn utf8_stream_to_string (
128
+ mut stream : impl futures:: Stream < Item = reqwest:: Result < bytes:: Bytes > >
129
+ + std:: marker:: Unpin ,
130
+ ) -> anyhow:: Result < String > {
131
+ let mut bytes = Vec :: new ( ) ;
132
+ while let Some ( chunk) = stream. next ( ) . await {
133
+ let chunk = chunk?;
134
+ bytes. extend_from_slice ( & chunk) ;
135
+ }
136
+ Ok ( String :: from_utf8 ( bytes) ?)
137
+ }
138
+
143
139
// Access for: The nexus internal API
144
140
#[ async_trait]
145
141
impl < ' c > SupportBundleAccessor for InternalApiAccess < ' c > {
146
- async fn get_index ( & self ) -> Result < SupportBundleIndex > {
142
+ async fn get_index ( & self ) -> anyhow :: Result < SupportBundleIndex > {
147
143
let stream = self
148
144
. client
149
145
. support_bundle_index ( self . id . as_untyped_uuid ( ) )
@@ -160,7 +156,7 @@ impl<'c> SupportBundleAccessor for InternalApiAccess<'c> {
160
156
async fn get_file < ' a > (
161
157
& mut self ,
162
158
path : & Utf8Path ,
163
- ) -> Result < BoxedFileAccessor < ' a > >
159
+ ) -> anyhow :: Result < BoxedFileAccessor < ' a > >
164
160
where
165
161
' c : ' a ,
166
162
{
@@ -173,71 +169,105 @@ impl<'c> SupportBundleAccessor for InternalApiAccess<'c> {
173
169
}
174
170
}
175
171
176
- pub struct LocalFileAccess {
177
- archive : zip:: read:: ZipArchive < std:: fs:: File > ,
178
- }
172
+ async fn wait_for_bundle_to_be_collected (
173
+ client : & nexus_client:: Client ,
174
+ id : SupportBundleUuid ,
175
+ ) -> Result < SupportBundleInfo , anyhow:: Error > {
176
+ let mut printed_wait_msg = false ;
177
+ loop {
178
+ let sb = client
179
+ . support_bundle_view ( id. as_untyped_uuid ( ) )
180
+ . await
181
+ . with_context ( || {
182
+ format ! ( "failed to query for support bundle {}" , id)
183
+ } ) ?;
179
184
180
- impl LocalFileAccess {
181
- pub fn new ( path : & Utf8Path ) -> Result < Self > {
182
- let file = std:: fs:: File :: open ( path) ?;
183
- Ok ( Self { archive : zip:: read:: ZipArchive :: new ( file) ? } )
185
+ match sb. state {
186
+ SupportBundleState :: Active => {
187
+ if printed_wait_msg {
188
+ eprintln ! ( "" ) ;
189
+ }
190
+ return Ok ( sb. into_inner ( ) ) ;
191
+ }
192
+ SupportBundleState :: Collecting => {
193
+ if !printed_wait_msg {
194
+ eprint ! ( "Waiting for {} to finish collection..." , id) ;
195
+ printed_wait_msg = true ;
196
+ }
197
+ tokio:: time:: sleep ( Duration :: from_secs ( 1 ) ) . await ;
198
+ eprint ! ( "." ) ;
199
+ std:: io:: stderr ( ) . flush ( ) . context ( "cannot flush stderr" ) ?;
200
+ }
201
+ other => bail ! ( "Unexepcted state: {other}" ) ,
202
+ }
184
203
}
185
204
}
186
205
187
- // Access for: Local zip files
188
- #[ async_trait]
189
- impl SupportBundleAccessor for LocalFileAccess {
190
- async fn get_index ( & self ) -> Result < SupportBundleIndex > {
191
- let names: Vec < & str > = self . archive . file_names ( ) . collect ( ) ;
192
- let all_names = names. join ( "\n " ) ;
193
- Ok ( SupportBundleIndex :: new ( & all_names) )
194
- }
206
+ /// Returns either a specific bundle or the latest active bundle.
207
+ ///
208
+ /// If a bundle is being collected, waits for it.
209
+ pub async fn access_bundle_from_id (
210
+ client : & nexus_client:: Client ,
211
+ id : Option < SupportBundleUuid > ,
212
+ ) -> Result < InternalApiAccess < ' _ > , anyhow:: Error > {
213
+ let id = match id {
214
+ Some ( id) => {
215
+ // Ensure the bundle has been collected
216
+ let sb = wait_for_bundle_to_be_collected (
217
+ client,
218
+ SupportBundleUuid :: from_untyped_uuid ( * id. as_untyped_uuid ( ) ) ,
219
+ )
220
+ . await ?;
221
+ SupportBundleUuid :: from_untyped_uuid ( sb. id . into_untyped_uuid ( ) )
222
+ }
223
+ None => {
224
+ // Grab the latest if one isn't supplied
225
+ let support_bundle_stream =
226
+ client. support_bundle_list_stream ( None , None ) ;
227
+ let mut support_bundles = support_bundle_stream
228
+ . try_collect :: < Vec < _ > > ( )
229
+ . await
230
+ . context ( "listing support bundles" ) ?;
231
+ support_bundles. sort_by_key ( |k| k. time_created ) ;
195
232
196
- async fn get_file < ' a > (
197
- & mut self ,
198
- path : & Utf8Path ,
199
- ) -> Result < BoxedFileAccessor < ' a > > {
200
- let mut file = self . archive . by_name ( path. as_str ( ) ) ?;
201
- let mut buf = Vec :: new ( ) ;
202
- std:: io:: copy ( & mut file, & mut buf) ?;
233
+ let active_sb = support_bundles
234
+ . iter ( )
235
+ . find ( |sb| matches ! ( sb. state, SupportBundleState :: Active ) ) ;
203
236
204
- Ok ( Box :: new ( AsyncZipFile { buf, copied : 0 } ) )
205
- }
206
- }
237
+ let sb = match active_sb {
238
+ Some ( sb) => sb. clone ( ) ,
239
+ None => {
240
+ // This is a special case, but not an uncommon one:
241
+ //
242
+ // - Someone just created a bundle...
243
+ // - ... but collection is still happening.
244
+ //
245
+ // To smooth out this experience for users, we wait for the
246
+ // collection to complete.
247
+ let collecting_sb = support_bundles. iter ( ) . find ( |sb| {
248
+ matches ! ( sb. state, SupportBundleState :: Collecting )
249
+ } ) ;
250
+ if let Some ( collecting_sb) = collecting_sb {
251
+ let id = & collecting_sb. id ;
252
+ wait_for_bundle_to_be_collected (
253
+ client,
254
+ SupportBundleUuid :: from_untyped_uuid (
255
+ * id. as_untyped_uuid ( ) ,
256
+ ) ,
257
+ )
258
+ . await ?
259
+ } else {
260
+ bail ! (
261
+ "Cannot find active support bundle. Try creating one"
262
+ )
263
+ }
264
+ }
265
+ } ;
207
266
208
- // We're currently buffering the entire file into memory, mostly because dealing with the lifetime
209
- // of ZipArchive and ZipFile objects is so difficult.
210
- pub struct AsyncZipFile {
211
- buf : Vec < u8 > ,
212
- copied : usize ,
213
- }
267
+ eprintln ! ( "Inspecting bundle {} from {}" , sb. id, sb. time_created) ;
214
268
215
- impl AsyncRead for AsyncZipFile {
216
- fn poll_read (
217
- mut self : Pin < & mut Self > ,
218
- _cx : & mut Context < ' _ > ,
219
- buf : & mut ReadBuf < ' _ > ,
220
- ) -> Poll < io:: Result < ( ) > > {
221
- let to_copy =
222
- std:: cmp:: min ( self . buf . len ( ) - self . copied , buf. remaining ( ) ) ;
223
- if to_copy == 0 {
224
- return Poll :: Ready ( Ok ( ( ) ) ) ;
269
+ SupportBundleUuid :: from_untyped_uuid ( sb. id . into_untyped_uuid ( ) )
225
270
}
226
- let src = & self . buf [ self . copied ..] ;
227
- buf. put_slice ( & src[ ..to_copy] ) ;
228
- self . copied += to_copy;
229
- Poll :: Ready ( Ok ( ( ) ) )
230
- }
231
- }
232
-
233
- async fn utf8_stream_to_string (
234
- mut stream : impl futures:: Stream < Item = reqwest:: Result < bytes:: Bytes > >
235
- + std:: marker:: Unpin ,
236
- ) -> Result < String > {
237
- let mut bytes = Vec :: new ( ) ;
238
- while let Some ( chunk) = stream. next ( ) . await {
239
- let chunk = chunk?;
240
- bytes. extend_from_slice ( & chunk) ;
241
- }
242
- Ok ( String :: from_utf8 ( bytes) ?)
271
+ } ;
272
+ Ok ( InternalApiAccess :: new ( client, id) )
243
273
}
0 commit comments