11use std:: collections:: HashMap ;
22use std:: path:: { Path , PathBuf } ;
33use std:: str:: FromStr as _;
4+ use std:: sync:: Arc ;
45
56use anyhow:: { Context as _, Result } ;
67use clap:: { Arg , ArgMatches , Command } ;
@@ -10,7 +11,8 @@ use log::{debug, info, warn};
1011use objectstore_client:: { ClientBuilder , ExpirationPolicy , Usecase } ;
1112use secrecy:: ExposeSecret as _;
1213use sha2:: { Digest as _, Sha256 } ;
13- use tokio:: fs:: File ;
14+ use tokio:: io:: AsyncReadExt as _;
15+ use tokio:: sync:: Mutex ;
1416use walkdir:: WalkDir ;
1517
1618use crate :: api:: { Api , CreateSnapshotResponse , ImageMetadata , SnapshotsManifest } ;
@@ -209,16 +211,16 @@ fn validate_image_sizes(images: &[ImageInfo]) -> Result<()> {
209211 Ok ( ( ) )
210212}
211213
212- fn compute_sha256_hash ( path : & Path ) -> Result < String > {
213- use std:: io:: Read as _;
214-
215- let mut file = std:: fs:: File :: open ( path)
214+ async fn compute_sha256_hash ( path : & Path ) -> Result < String > {
215+ let mut file = tokio:: fs:: File :: open ( path)
216+ . await
216217 . with_context ( || format ! ( "Failed to open image for hashing: {}" , path. display( ) ) ) ?;
217218 let mut hasher = Sha256 :: new ( ) ;
218219 let mut buffer = [ 0u8 ; 8192 ] ;
219220 loop {
220221 let bytes_read = file
221222 . read ( & mut buffer)
223+ . await
222224 . with_context ( || format ! ( "Failed to read image for hashing: {}" , path. display( ) ) ) ?;
223225 if bytes_read == 0 {
224226 break ;
@@ -290,66 +292,72 @@ fn upload_images(
290292 . build ( )
291293 . context ( "Failed to create tokio runtime" ) ?;
292294
293- let mut many_builder = session. many ( ) ;
294- let mut manifest_entries = HashMap :: new ( ) ;
295295 let image_count = images. len ( ) ;
296+ let manifest_entries = Arc :: new ( Mutex :: new ( HashMap :: new ( ) ) ) ;
296297
297- for image in images {
298- debug ! ( "Processing image: {}" , image. path. display( ) ) ;
299-
300- let hash = compute_sha256_hash ( & image. path ) ?;
301- let file = runtime. block_on ( File :: open ( & image. path ) ) . with_context ( || {
302- format ! ( "Failed to open image for upload: {}" , image. path. display( ) )
303- } ) ?;
298+ runtime. block_on ( async {
299+ let mut many_builder = session. many ( ) ;
304300
305- let key = format ! ( "{org_id}/{project_id}/{hash}" ) ;
306- info ! ( "Queueing {} as {key }" , image. relative_path . display( ) ) ;
301+ for image in images {
302+ debug ! ( "Processing image: { }" , image. path . display( ) ) ;
307303
308- many_builder = many_builder. push (
309- session
310- . put_file ( file)
311- . key ( & key)
312- . expiration_policy ( expiration) ,
313- ) ;
304+ let hash = compute_sha256_hash ( & image. path ) . await ?;
305+ let file = tokio:: fs:: File :: open ( & image. path ) . await . with_context ( || {
306+ format ! ( "Failed to open image for upload: {}" , image. path. display( ) )
307+ } ) ?;
314308
315- let image_file_name = image
316- . relative_path
317- . file_name ( )
318- . unwrap_or_default ( )
319- . to_string_lossy ( )
320- . into_owned ( ) ;
321- manifest_entries. insert (
322- hash,
323- ImageMetadata {
324- image_file_name,
325- width : image. width ,
326- height : image. height ,
327- } ,
328- ) ;
329- }
309+ let key = format ! ( "{org_id}/{project_id}/{hash}" ) ;
310+ info ! ( "Queueing {} as {key}" , image. relative_path. display( ) ) ;
330311
331- let result = runtime. block_on ( async { many_builder. send ( ) . error_for_failures ( ) . await } ) ;
312+ many_builder = many_builder. push (
313+ session
314+ . put_file ( file)
315+ . key ( & key)
316+ . expiration_policy ( expiration) ,
317+ ) ;
332318
333- match result {
334- Ok ( ( ) ) => {
335- println ! (
336- "{} Uploaded {} image {}" ,
337- style( ">" ) . dim( ) ,
338- style( image_count) . yellow( ) ,
339- if image_count == 1 { "file" } else { "files" }
319+ let image_file_name = image
320+ . relative_path
321+ . file_name ( )
322+ . unwrap_or_default ( )
323+ . to_string_lossy ( )
324+ . into_owned ( ) ;
325+ manifest_entries. lock ( ) . await . insert (
326+ hash,
327+ ImageMetadata {
328+ image_file_name,
329+ width : image. width ,
330+ height : image. height ,
331+ } ,
340332 ) ;
341- Ok ( manifest_entries)
342333 }
343- Err ( errors) => {
344- eprintln ! ( "There were errors uploading images:" ) ;
345- let mut error_count = 0 ;
346- for error in errors {
347- eprintln ! ( " {}" , style( error) . red( ) ) ;
348- error_count += 1 ;
334+
335+ let result = many_builder. send ( ) . error_for_failures ( ) . await ;
336+ match result {
337+ Ok ( ( ) ) => {
338+ println ! (
339+ "{} Uploaded {} image {}" ,
340+ style( ">" ) . dim( ) ,
341+ style( image_count) . yellow( ) ,
342+ if image_count == 1 { "file" } else { "files" }
343+ ) ;
344+ Ok ( ( ) )
345+ }
346+ Err ( errors) => {
347+ eprintln ! ( "There were errors uploading images:" ) ;
348+ let mut error_count = 0 ;
349+ for error in errors {
350+ eprintln ! ( " {}" , style( error) . red( ) ) ;
351+ error_count += 1 ;
352+ }
353+ anyhow:: bail!( "Failed to upload {error_count} out of {image_count} images" )
349354 }
350- anyhow:: bail!( "Failed to upload {error_count} out of {image_count} images" )
351355 }
352- }
356+ } ) ?;
357+
358+ Ok ( Arc :: try_unwrap ( manifest_entries)
359+ . expect ( "all references should be dropped after runtime completes" )
360+ . into_inner ( ) )
353361}
354362
355363#[ cfg( test) ]
0 commit comments