|
| 1 | +from asyncio import run |
| 2 | +from typing import Annotated |
| 3 | + |
| 4 | +from typer import Argument, Option |
| 5 | + |
| 6 | +from ..app import app |
| 7 | +from .utils import ( |
| 8 | + ARRAPIClient, |
| 9 | + CustomFormat, |
| 10 | + CreatedCustomFormat, |
| 11 | + QualityProfile, |
| 12 | + SONARR_ONLY_IMPLEMENTATIONS, |
| 13 | + RADARR_ONLY_IMPLEMENTATIONS, |
| 14 | + QualityProfileFormatItem, |
| 15 | +) |
| 16 | + |
| 17 | + |
| 18 | +async def copy_all_custom_formats_and_scores_inner( |
| 19 | + *, |
| 20 | + source_arr_url: str, |
| 21 | + source_api_key: str, |
| 22 | + target_arr_url: str, |
| 23 | + target_api_key: str, |
| 24 | + source_arr_headers: dict[str, str] | None = None, |
| 25 | + target_arr_headers: dict[str, str] | None = None, |
| 26 | + source_profile_name: str | None = None, |
| 27 | + target_profile_name: str | None = None, |
| 28 | + delete_extra_custom_formats: bool = False, |
| 29 | +): |
| 30 | + async with ( |
| 31 | + ARRAPIClient( |
| 32 | + source_arr_url, api_key=source_api_key, extra_headers=source_arr_headers |
| 33 | + ) as source_session, |
| 34 | + ARRAPIClient( |
| 35 | + target_arr_url, api_key=target_api_key, extra_headers=target_arr_headers |
| 36 | + ) as target_session, |
| 37 | + ): |
| 38 | + source_type = await source_session.get_arr_type() |
| 39 | + target_type = await target_session.get_arr_type() |
| 40 | + source_custom_formats = await source_session.get_custom_formats() |
| 41 | + target_custom_formats = await target_session.get_custom_formats() |
| 42 | + source_cf_dict = {cf.name: cf for cf in source_custom_formats} |
| 43 | + target_cf_dict = {cf.name: cf for cf in target_custom_formats} |
| 44 | + # extra_custom_formats_names = target_cf_dict.keys() - source_cf_dict.keys() |
| 45 | + extra_custom_format_ids = { |
| 46 | + cf.id for name, cf in target_cf_dict.items() if name not in source_cf_dict |
| 47 | + } |
| 48 | + seen_custom_formats: list[CreatedCustomFormat] = [] |
| 49 | + skipped_source_incompatible_custom_formats: list[CreatedCustomFormat] = [] |
| 50 | + attributes_to_skip = ( |
| 51 | + [] |
| 52 | + if source_type == target_type |
| 53 | + else ( |
| 54 | + SONARR_ONLY_IMPLEMENTATIONS |
| 55 | + if source_type == "Sonarr" |
| 56 | + else RADARR_ONLY_IMPLEMENTATIONS |
| 57 | + ) |
| 58 | + ) |
| 59 | + created_count = 0 |
| 60 | + updated_count = 0 |
| 61 | + |
| 62 | + for source_cf in source_custom_formats: |
| 63 | + if any( |
| 64 | + spec.implementation in attributes_to_skip |
| 65 | + for spec in source_cf.specifications |
| 66 | + ): |
| 67 | + skipped_source_incompatible_custom_formats.append(source_cf) |
| 68 | + print( |
| 69 | + f"Skipped incompatible custom format '{source_cf.name}' for target ARR type '{target_type}'." |
| 70 | + ) |
| 71 | + continue |
| 72 | + if source_cf.name in target_cf_dict: |
| 73 | + target_cf = target_cf_dict[source_cf.name] |
| 74 | + if ( |
| 75 | + source_cf.includeCustomFormatWhenRenaming |
| 76 | + != target_cf.includeCustomFormatWhenRenaming |
| 77 | + or source_cf.specifications != target_cf.specifications |
| 78 | + ): |
| 79 | + updated_cf = await target_session.update_custom_format( |
| 80 | + target_cf.id, |
| 81 | + CustomFormat( |
| 82 | + name=source_cf.name, |
| 83 | + includeCustomFormatWhenRenaming=source_cf.includeCustomFormatWhenRenaming, |
| 84 | + specifications=source_cf.specifications, |
| 85 | + ), |
| 86 | + ) |
| 87 | + seen_custom_formats.append(updated_cf) |
| 88 | + updated_count += 1 |
| 89 | + print(f"Updated custom format '{source_cf.name}' in target ARR.") |
| 90 | + else: |
| 91 | + seen_custom_formats.append(target_cf) |
| 92 | + # print( |
| 93 | + # f"Custom format '{source_cf.name}' already up to date in target ARR." |
| 94 | + # ) |
| 95 | + else: |
| 96 | + created_cf = await target_session.create_custom_format( |
| 97 | + CustomFormat( |
| 98 | + name=source_cf.name, |
| 99 | + includeCustomFormatWhenRenaming=source_cf.includeCustomFormatWhenRenaming, |
| 100 | + specifications=source_cf.specifications, |
| 101 | + ) |
| 102 | + ) |
| 103 | + seen_custom_formats.append(created_cf) |
| 104 | + created_count += 1 |
| 105 | + print(f"Created custom format '{source_cf.name}' in target ARR.") |
| 106 | + print( |
| 107 | + f"Created {created_count} and updated {updated_count} custom formats in target ARR." |
| 108 | + ) |
| 109 | + if delete_extra_custom_formats and extra_custom_format_ids: |
| 110 | + await target_session.bulk_delete_custom_formats(extra_custom_format_ids) |
| 111 | + print( |
| 112 | + f"Deleted {len(extra_custom_format_ids)} extra custom formats from target ARR." |
| 113 | + ) |
| 114 | + if source_profile_name: |
| 115 | + if not target_profile_name: |
| 116 | + raise ValueError( |
| 117 | + "If source_profile_name is provided, target_profile_name must also be provided." |
| 118 | + ) |
| 119 | + if target_profile_name: |
| 120 | + if not source_profile_name: |
| 121 | + raise ValueError( |
| 122 | + "If target_profile_name is provided, source_profile_name must also be provided." |
| 123 | + ) |
| 124 | + if not source_profile_name or not target_profile_name: |
| 125 | + return |
| 126 | + source_quality_profiles = await source_session.get_quality_profiles() |
| 127 | + target_quality_profiles = await target_session.get_quality_profiles() |
| 128 | + source_profile_dict = {qp.name: qp for qp in source_quality_profiles} |
| 129 | + target_profile_dict = {qp.name: qp for qp in target_quality_profiles} |
| 130 | + if source_profile_name not in source_profile_dict: |
| 131 | + raise ValueError( |
| 132 | + f"Source profile '{source_profile_name}' not found in source ARR." |
| 133 | + ) |
| 134 | + if target_profile_name not in target_profile_dict: |
| 135 | + raise ValueError( |
| 136 | + f"Target profile '{target_profile_name}' not found in target ARR." |
| 137 | + ) |
| 138 | + source_profile = source_profile_dict[source_profile_name] |
| 139 | + target_profile = target_profile_dict[target_profile_name] |
| 140 | + skipped_source_format_ids = { |
| 141 | + cf.id for cf in skipped_source_incompatible_custom_formats |
| 142 | + } |
| 143 | + seen_custom_format_by_name = {cf.name: cf for cf in seen_custom_formats} |
| 144 | + source_profile_scores_by_format_name = { |
| 145 | + item.name: item.score for item in source_profile.formatItems |
| 146 | + } |
| 147 | + new_formats = [ |
| 148 | + QualityProfileFormatItem( |
| 149 | + format=item.id, |
| 150 | + name=item.name, |
| 151 | + score=source_profile_scores_by_format_name[item.name], |
| 152 | + ) |
| 153 | + for item in seen_custom_format_by_name.values() |
| 154 | + ] |
| 155 | + not_seen_but_used_formats = [ |
| 156 | + item |
| 157 | + for item in target_profile.formatItems |
| 158 | + if item.name not in seen_custom_format_by_name |
| 159 | + ] # These are needed since every custom format needs to be in the quality profile in order for the update to work |
| 160 | + all_new_formats = new_formats + not_seen_but_used_formats |
| 161 | + all_new_formats.sort(key=lambda item: item.format, reverse=True) |
| 162 | + await target_session.update_quality_profile( |
| 163 | + target_profile.id, |
| 164 | + QualityProfile( |
| 165 | + name=target_profile.name, |
| 166 | + id=target_profile.id, |
| 167 | + cutoff=target_profile.cutoff, |
| 168 | + cutoffFormatScore=target_profile.cutoffFormatScore, |
| 169 | + items=target_profile.items, |
| 170 | + minFormatScore=target_profile.minFormatScore, |
| 171 | + minUpgradeFormatScore=target_profile.minUpgradeFormatScore, |
| 172 | + language=target_profile.language, |
| 173 | + upgradeAllowed=target_profile.upgradeAllowed, |
| 174 | + formatItems=all_new_formats, |
| 175 | + ), |
| 176 | + ) |
| 177 | + print( |
| 178 | + f"Updated target quality profile '{target_profile.name}' with scores from source profile '{source_profile.name}'." |
| 179 | + ) |
| 180 | + |
| 181 | + |
| 182 | +@app.command() |
| 183 | +def copy_all_custom_formats_and_scores( |
| 184 | + source_arr_url: Annotated[ |
| 185 | + str, |
| 186 | + Argument(help="URL of the source ARR instance."), |
| 187 | + ], |
| 188 | + source_api_key: Annotated[ |
| 189 | + str, |
| 190 | + Argument(help="API key for the source ARR instance."), |
| 191 | + ], |
| 192 | + target_arr_url: Annotated[ |
| 193 | + str, |
| 194 | + Argument(help="URL of the target ARR instance."), |
| 195 | + ], |
| 196 | + target_api_key: Annotated[ |
| 197 | + str, |
| 198 | + Argument(help="API key for the target ARR instance."), |
| 199 | + ], |
| 200 | + source_profile_name: Annotated[ |
| 201 | + str | None, |
| 202 | + Option( |
| 203 | + help="Name of the quality profile in the source ARR to copy from if copying profile info.", |
| 204 | + ), |
| 205 | + ] = None, |
| 206 | + target_profile_name: Annotated[ |
| 207 | + str | None, |
| 208 | + Option( |
| 209 | + help="Name of the quality profile in the target ARR to update.", |
| 210 | + ), |
| 211 | + ] = None, |
| 212 | + delete_extra_custom_formats: Annotated[ |
| 213 | + bool, |
| 214 | + Option( |
| 215 | + help="Whether to delete custom formats in the target ARR that do not exist in the source ARR.", |
| 216 | + ), |
| 217 | + ] = False, |
| 218 | + extra_source_arr_headers: Annotated[ |
| 219 | + list[str] | None, |
| 220 | + Option( |
| 221 | + help="Additional headers to include in requests to the source ARR instance, in 'Key: Value' format. Specify option multiple times for multiple headers.", |
| 222 | + ), |
| 223 | + ] = None, |
| 224 | + extra_target_arr_headers: Annotated[ |
| 225 | + list[str] | None, |
| 226 | + Option( |
| 227 | + help="Additional headers to include in requests to the target ARR instance, in 'Key: Value' format. Specify option multiple times for multiple headers.", |
| 228 | + ), |
| 229 | + ] = None, |
| 230 | +): |
| 231 | + """Copy all custom formats and their scores from one ARR instance to another.""" |
| 232 | + print(extra_source_arr_headers, extra_target_arr_headers) |
| 233 | + run( |
| 234 | + copy_all_custom_formats_and_scores_inner( |
| 235 | + source_arr_url=source_arr_url, |
| 236 | + source_api_key=source_api_key, |
| 237 | + target_arr_url=target_arr_url, |
| 238 | + target_api_key=target_api_key, |
| 239 | + source_profile_name=source_profile_name, |
| 240 | + target_profile_name=target_profile_name, |
| 241 | + delete_extra_custom_formats=delete_extra_custom_formats, |
| 242 | + source_arr_headers={ |
| 243 | + k: v |
| 244 | + for header in extra_source_arr_headers or [] |
| 245 | + for k, v in [header.split(":", 1)] |
| 246 | + } |
| 247 | + if extra_source_arr_headers |
| 248 | + else None, |
| 249 | + target_arr_headers={ |
| 250 | + k: v |
| 251 | + for header in extra_target_arr_headers or [] |
| 252 | + for k, v in [header.split(":", 1)] |
| 253 | + } |
| 254 | + if extra_target_arr_headers |
| 255 | + else None, |
| 256 | + ) |
| 257 | + ) |
0 commit comments