 import os
 import shutil
 import urllib
+from collections import defaultdict
 from pathlib import Path
 from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast
 
 from renku.core import errors
 from renku.core.config import get_value, remove_value, set_value
 from renku.core.dataset.datasets_provenance import DatasetsProvenance
-from renku.core.dataset.pointer_file import (
-    create_external_file,
-    delete_external_file,
-    is_linked_file_updated,
-    update_linked_file,
-)
+from renku.core.dataset.pointer_file import delete_external_file, is_linked_file_updated, update_linked_file
+from renku.core.dataset.providers.api import AddProviderInterface, ProviderApi
 from renku.core.dataset.providers.factory import ProviderFactory
-from renku.core.dataset.providers.models import ProviderDataset
+from renku.core.dataset.providers.git import GitProvider
+from renku.core.dataset.providers.models import DatasetUpdateAction, ProviderDataset
 from renku.core.dataset.request_model import ImageRequestModel
 from renku.core.dataset.tag import get_dataset_by_tag, prompt_access_token, prompt_tag_selection
 from renku.core.interface.dataset_gateway import IDatasetGateway
-from renku.core.storage import check_external_storage, pull_paths_from_storage, track_paths_in_storage
+from renku.core.storage import check_external_storage, track_paths_in_storage
 from renku.core.util import communication
 from renku.core.util.datetime8601 import local_now
-from renku.core.util.git import clone_repository, get_cache_directory_for_repository, get_git_user
-from renku.core.util.metadata import is_linked_file, prompt_for_credentials, read_credentials, store_credentials
+from renku.core.util.git import get_git_user
+from renku.core.util.metadata import prompt_for_credentials, read_credentials, store_credentials
 from renku.core.util.os import (
     create_symlink,
     delete_dataset_file,
 
 if TYPE_CHECKING:
     from renku.core.interface.storage import IStorage
-    from renku.infrastructure.repository import Repository
 
 
 @validate_arguments(config=dict(arbitrary_types_allowed=True))
@@ -571,6 +568,7 @@ def remove_files(dataset):
         total_size=calculate_total_size(importer.provider_dataset_files),
         clear_files_before=True,
         datadir=datadir,
+        storage=provider_dataset.storage,
     )
 
     new_dataset.update_metadata_from(provider_dataset)
@@ -714,19 +712,32 @@ def update_datasets(
             raise errors.ParameterError("No files matched the criteria.")
         return imported_dataset_updates_view_models, []
 
-    git_files = []
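+    # Group updatable files by the provider that handles them; linked files are updated separately.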
+    provider_files: Dict[AddProviderInterface, List[DynamicProxy]] = defaultdict(list)
     unique_remotes = set()
     linked_files = []
-    local_files = []
 
     for file in records:
-        if file.based_on:
-            git_files.append(file)
-            unique_remotes.add(file.based_on.url)
-        elif file.linked:
+        if file.linked:
             linked_files.append(file)
         else:
-            local_files.append(file)
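+            # Resolve the file's provider from its origin: the imported dataset's source (same_as),
+            # the Git remote it is based on, or the file's own source URI.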
+            if not getattr(file, "provider", None):
+                if file.based_on:
+                    uri = file.dataset.same_as.value if file.dataset.same_as else file.based_on.url
+                else:
+                    uri = file.source
+                try:
+                    file.provider = cast(
+                        AddProviderInterface,
+                        ProviderFactory.get_add_provider(uri),
+                    )
+                except errors.DatasetProviderNotFound:
+                    communication.warn(f"Couldn't find provider for file {file.path} in dataset {file.dataset.name}")
+                    continue
+
+            provider_files[file.provider].append(file)
+
+            if isinstance(file.provider, GitProvider):
+                unique_remotes.add(file.based_on.url)
 
     if ref and len(unique_remotes) > 1:
         raise errors.ParameterError(
@@ -741,18 +752,24 @@ def update_datasets(
         updated = update_linked_files(linked_files, dry_run=dry_run)
         updated_files.extend(updated)
 
-    if git_files and not no_remote:
-        updated, deleted = update_dataset_git_files(files=git_files, ref=ref, delete=delete, dry_run=dry_run)
-        updated_files.extend(updated)
-        deleted_files.extend(deleted)
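+    # Shared, mutable context passed to every provider's update_files call.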
+    provider_context: Dict[str, Any] = {}
+
+    for provider, files in provider_files.items():
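+        # Skip remote providers when no_remote is set and local providers when no_local is set.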
+        if (no_remote and cast(ProviderApi, provider).is_remote) or (
+            no_local and not cast(ProviderApi, provider).is_remote
+        ):
+            continue
 
-    if local_files and not no_local:
-        updated, deleted, new = update_dataset_local_files(
-            records=local_files, check_data_directory=check_data_directory
+        results = provider.update_files(
+            files=files,
+            dry_run=dry_run,
+            delete=delete,
+            context=provider_context,
+            ref=ref,
+            check_data_directory=check_data_directory,
         )
-        updated_files.extend(updated)
-        deleted_files.extend(deleted)
-        updated_files.extend(new)
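+        # Each result is tagged with the action taken, so updates and deletions can be split out.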
+        updated_files.extend(r.entity for r in results if r.action == DatasetUpdateAction.UPDATE)
+        deleted_files.extend(r.entity for r in results if r.action == DatasetUpdateAction.DELETE)
 
     if not dry_run:
         if deleted_files and not delete:
@@ -974,154 +991,30 @@ def move_files(dataset_gateway: IDatasetGateway, files: Dict[Path, Path], to_dat
     datasets_provenance.add_or_update(to_dataset, creator=creator)
 
 
-def update_dataset_local_files(
-    records: List[DynamicProxy], check_data_directory: bool
-) -> Tuple[List[DynamicProxy], List[DynamicProxy], List[DynamicProxy]]:
-    """Update files metadata from the git history.
-
-    Args:
-        records(List[DynamicProxy]): File records to update.
-        check_data_directory(bool): Whether to check the dataset's data directory for new files.
-    Returns:
-        Tuple[List[DynamicProxy], List[DynamicProxy]]: Tuple of updated and deleted file records.
-    """
-    updated_files: List[DynamicProxy] = []
-    deleted_files: List[DynamicProxy] = []
-    new_files: List[DynamicProxy] = []
-    progress_text = "Checking for local updates"
-
-    try:
-        communication.start_progress(progress_text, len(records))
-        check_paths = []
-        records_to_check = []
-
-        for file in records:
-            communication.update_progress(progress_text, 1)
-
-            if file.based_on or file.linked:
-                continue
-
-            if not (project_context.path / file.entity.path).exists():
-                deleted_files.append(file)
-                continue
-
-            check_paths.append(file.entity.path)
-            records_to_check.append(file)
-
-        checksums = project_context.repository.get_object_hashes(check_paths)
-
-        for file in records_to_check:
-            current_checksum = checksums.get(file.entity.path)
-            if not current_checksum:
-                deleted_files.append(file)
-            elif current_checksum != file.entity.checksum:
-                updated_files.append(file)
-            elif check_data_directory and not any(file.entity.path == f.entity.path for f in file.dataset.files):
-                datadir = file.dataset.get_datadir()
-                try:
-                    get_safe_relative_path(file.entity.path, datadir)
-                except ValueError:
-                    continue
-
-                new_files.append(file)
-    finally:
-        communication.finalize_progress(progress_text)
-
-    return updated_files, deleted_files, new_files
-
-
 def _update_datasets_files_metadata(updated_files: List[DynamicProxy], deleted_files: List[DynamicProxy], delete: bool):
     modified_datasets = {}
     checksums = project_context.repository.get_object_hashes([file.entity.path for file in updated_files])
     for file in updated_files:
         new_file = DatasetFile.from_path(
             path=file.entity.path, based_on=file.based_on, source=file.source, checksum=checksums.get(file.entity.path)
         )
-        modified_datasets[file.dataset.name] = file.dataset
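+        # file.dataset may be wrapped in a DynamicProxy; store the underlying dataset object instead.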
+        modified_datasets[file.dataset.name] = (
+            file.dataset._subject if isinstance(file.dataset, DynamicProxy) else file.dataset
+        )
         file.dataset.add_or_update_files(new_file)
 
     if delete:
         for file in deleted_files:
-            modified_datasets[file.dataset.name] = file.dataset
+            modified_datasets[file.dataset.name] = (
+                file.dataset._subject if isinstance(file.dataset, DynamicProxy) else file.dataset
+            )
             file.dataset.unlink_file(file.entity.path)
 
     datasets_provenance = DatasetsProvenance()
     for dataset in modified_datasets.values():
         datasets_provenance.add_or_update(dataset, creator=get_git_user(repository=project_context.repository))
 
 
-def update_dataset_git_files(
-    files: List[DynamicProxy], ref: Optional[str], delete: bool, dry_run: bool
-) -> Tuple[List[DynamicProxy], List[DynamicProxy]]:
-    """Update files and dataset metadata according to their remotes.
-
-    Args:
-        files(List[DynamicProxy]): List of files to be updated.
-        ref(Optional[str]): Reference to use for update.
-        delete(bool, optional): Indicates whether to delete files or not (Default value = False).
-        dry_run(bool): Whether to perform update or only print changes.
-
-    Returns:
-        Tuple[List[DynamicProxy], List[DynamicProxy]]: Tuple of updated and deleted file records.
-    """
-    visited_repos: Dict[str, "Repository"] = {}
-    updated_files: List[DynamicProxy] = []
-    deleted_files: List[DynamicProxy] = []
-
-    progress_text = "Checking files for updates"
-
-    try:
-        communication.start_progress(progress_text, len(files))
-        for file in files:
-            communication.update_progress(progress_text, 1)
-            if not file.based_on:
-                continue
-
-            based_on = file.based_on
-            url = based_on.url
-            if url in visited_repos:
-                remote_repository = visited_repos[url]
-            else:
-                communication.echo(msg="Cloning remote repository...")
-                path = get_cache_directory_for_repository(url=url)
-                remote_repository = clone_repository(url=url, path=path, checkout_revision=ref)
-                visited_repos[url] = remote_repository
-
-            checksum = remote_repository.get_object_hash(path=based_on.path, revision="HEAD")
-            found = checksum is not None
-            changed = found and based_on.checksum != checksum
-
-            src = remote_repository.path / based_on.path
-            dst = project_context.metadata_path.parent / file.entity.path
-
-            if not found:
-                if not dry_run and delete:
-                    delete_dataset_file(dst, follow_symlinks=True)
-                    project_context.repository.add(dst, force=True)
-                deleted_files.append(file)
-            elif changed:
-                if not dry_run:
-                    # Fetch file if it is tracked by Git LFS
-                    pull_paths_from_storage(remote_repository, remote_repository.path / based_on.path)
-                    if is_linked_file(path=src, project_path=remote_repository.path):
-                        delete_dataset_file(dst, follow_symlinks=True)
-                        create_external_file(target=src.resolve(), path=dst)
-                    else:
-                        shutil.copy(src, dst)
-                    file.based_on = RemoteEntity(
-                        checksum=checksum, path=based_on.path, url=based_on.url  # type: ignore
-                    )
-                updated_files.append(file)
-    finally:
-        communication.finalize_progress(progress_text)
-
-    if not updated_files and (not delete or not deleted_files):
-        # Nothing to commit or update
-        return [], deleted_files
-
-    return updated_files, deleted_files
-
-
 def update_linked_files(records: List[DynamicProxy], dry_run: bool) -> List[DynamicProxy]:
     """Update files linked to other files in the project.
 
@@ -1230,7 +1123,7 @@ def should_include(filepath: Path) -> bool:
                 continue
 
             record = DynamicProxy(file)
-            record.dataset = dataset
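+            # Wrap the dataset in a proxy as well; it is unwrapped via _subject before provenance updates.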
+            record.dataset = DynamicProxy(dataset)
             records.append(record)
 
         if not check_data_directory: