diff --git a/acceptance/bundle/dms/databricks.yml b/acceptance/bundle/dms/databricks.yml new file mode 100644 index 0000000000..c21c8a9392 --- /dev/null +++ b/acceptance/bundle/dms/databricks.yml @@ -0,0 +1,7 @@ +bundle: + name: metadata-service-test + +resources: + jobs: + test_job: + name: test-job diff --git a/acceptance/bundle/dms/deploy-error/databricks.yml b/acceptance/bundle/dms/deploy-error/databricks.yml new file mode 100644 index 0000000000..4786eeddf7 --- /dev/null +++ b/acceptance/bundle/dms/deploy-error/databricks.yml @@ -0,0 +1,7 @@ +bundle: + name: metadata-service-error-test + +resources: + jobs: + test_job: + name: test-job diff --git a/acceptance/bundle/dms/deploy-error/out.test.toml b/acceptance/bundle/dms/deploy-error/out.test.toml new file mode 100644 index 0000000000..6ce208a048 --- /dev/null +++ b/acceptance/bundle/dms/deploy-error/out.test.toml @@ -0,0 +1,6 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["direct"] + DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] diff --git a/acceptance/bundle/dms/deploy-error/output.txt b/acceptance/bundle/dms/deploy-error/output.txt new file mode 100644 index 0000000000..db0a3d43e1 --- /dev/null +++ b/acceptance/bundle/dms/deploy-error/output.txt @@ -0,0 +1,61 @@ + +>>> musterr [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/metadata-service-error-test/default/files... +Deploying resources... +Error: cannot create resources.jobs.test_job: Invalid job configuration. (400 INVALID_PARAMETER_VALUE) + +Endpoint: POST [DATABRICKS_URL]/api/2.2/jobs/create +HTTP Status: 400 Bad Request +API error_code: INVALID_PARAMETER_VALUE +API message: Invalid job configuration. + +Updating deployment state... + +>>> print_requests.py --get //bundle +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "default" + } +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]" +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "version_type": "VERSION_TYPE_DEPLOY", + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/operations", + "q": { + "resource_key": "resources.jobs.test_job" + }, + "body": { + "resource_key": "resources.jobs.test_job", + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "status": "OPERATION_STATUS_FAILED", + "error_message": "Invalid job configuration." + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "name": "deployments/[UUID]/versions/1", + "completion_reason": "VERSION_COMPLETE_FAILURE" + } +} diff --git a/acceptance/bundle/dms/deploy-error/script b/acceptance/bundle/dms/deploy-error/script new file mode 100644 index 0000000000..806beae3de --- /dev/null +++ b/acceptance/bundle/dms/deploy-error/script @@ -0,0 +1,5 @@ +# Deploy with the metadata service enabled, expecting a resource creation failure. +trace musterr $CLI bundle deploy + +# Print the metadata service requests to verify the failed operation is reported. +trace print_requests.py --get //bundle diff --git a/acceptance/bundle/dms/deploy-error/test.toml b/acceptance/bundle/dms/deploy-error/test.toml new file mode 100644 index 0000000000..9d7f2c1348 --- /dev/null +++ b/acceptance/bundle/dms/deploy-error/test.toml @@ -0,0 +1,8 @@ +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] +RecordRequests = true + +[[Server]] +Pattern = "POST /api/2.2/jobs/create" +Response.StatusCode = 400 +Response.Body = '{"error_code": "INVALID_PARAMETER_VALUE", "message": "Invalid job configuration."}' diff --git a/acceptance/bundle/dms/initial-register/databricks.yml b/acceptance/bundle/dms/initial-register/databricks.yml new file mode 100644 index 0000000000..04d35740b6 --- /dev/null +++ b/acceptance/bundle/dms/initial-register/databricks.yml @@ -0,0 +1,7 @@ +bundle: + name: initial-register-test + +resources: + jobs: + test_job: + name: test-job diff --git a/acceptance/bundle/dms/initial-register/out.test.toml b/acceptance/bundle/dms/initial-register/out.test.toml new file mode 100644 index 0000000000..991ce54dfa --- /dev/null +++ b/acceptance/bundle/dms/initial-register/out.test.toml @@ -0,0 +1,6 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["direct"] + DATABRICKS_BUNDLE_MANAGED_STATE = ["false"] diff --git a/acceptance/bundle/dms/initial-register/output.txt b/acceptance/bundle/dms/initial-register/output.txt new file mode 100644 index 0000000000..b48d0b6630 --- /dev/null +++ b/acceptance/bundle/dms/initial-register/output.txt @@ -0,0 +1,61 @@ + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/initial-register-test/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/initial-register-test/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> print_requests.py --get //bundle +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "default" + } +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]" +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "version_type": "VERSION_TYPE_DEPLOY", + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/operations", + "q": { + "resource_key": "resources.jobs.test_job" + }, + "body": { + "resource_key": "resources.jobs.test_job", + "action_type": "OPERATION_ACTION_TYPE_INITIAL_REGISTER", + "resource_id": "[NUMID]", + "status": "OPERATION_STATUS_SUCCEEDED" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "name": "deployments/[UUID]/versions/1", + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} diff --git a/acceptance/bundle/dms/initial-register/script b/acceptance/bundle/dms/initial-register/script new file mode 100644 index 0000000000..a971338f43 --- /dev/null +++ b/acceptance/bundle/dms/initial-register/script @@ -0,0 +1,11 @@ +# First deploy without metadata service - creates the job using filesystem lock. +trace $CLI bundle deploy + +# Enable metadata service and deploy again. +# Since this is the first DMS deployment (version 1) and the job hasn't changed +# (Skip action), it should be reported as INITIAL_REGISTER. +export DATABRICKS_BUNDLE_MANAGED_STATE=true +trace $CLI bundle deploy + +# Print metadata service requests from the second deploy. +trace print_requests.py --get //bundle diff --git a/acceptance/bundle/dms/initial-register/test.toml b/acceptance/bundle/dms/initial-register/test.toml new file mode 100644 index 0000000000..270de6ce44 --- /dev/null +++ b/acceptance/bundle/dms/initial-register/test.toml @@ -0,0 +1,5 @@ +# Override parent: start without managed state so the first deploy uses filesystem lock. +# The script enables managed state before the second deploy. +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.DATABRICKS_BUNDLE_MANAGED_STATE = ["false"] +RecordRequests = true diff --git a/acceptance/bundle/dms/out.test.toml b/acceptance/bundle/dms/out.test.toml new file mode 100644 index 0000000000..6ce208a048 --- /dev/null +++ b/acceptance/bundle/dms/out.test.toml @@ -0,0 +1,6 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["direct"] + DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] diff --git a/acceptance/bundle/dms/output.txt b/acceptance/bundle/dms/output.txt new file mode 100644 index 0000000000..536622a866 --- /dev/null +++ b/acceptance/bundle/dms/output.txt @@ -0,0 +1,117 @@ + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/metadata-service-test/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> print_requests.py --get //bundle +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "default" + } +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]" +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "version_type": "VERSION_TYPE_DEPLOY", + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/operations", + "q": { + "resource_key": "resources.jobs.test_job" + }, + "body": { + "resource_key": "resources.jobs.test_job", + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "resource_id": "[NUMID]", + "status": "OPERATION_STATUS_SUCCEEDED" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "name": "deployments/[UUID]/versions/1", + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} + +>>> [CLI] bundle destroy --auto-approve +The following resources will be deleted: + delete resources.jobs.test_job + +All files and directories at the following location will be deleted: /Workspace/Users/[USERNAME]/.bundle/metadata-service-test/default + +Deleting files... +Destroy complete! + +>>> print_requests.py --get //bundle +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "default" + } +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]" +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "2" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "version_type": "VERSION_TYPE_DESTROY", + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/2/operations", + "q": { + "resource_key": "resources.jobs.test_job" + }, + "body": { + "resource_key": "resources.jobs.test_job", + "action_type": "OPERATION_ACTION_TYPE_DELETE", + "resource_id": "[NUMID]", + "status": "OPERATION_STATUS_SUCCEEDED" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/2/complete", + "body": { + "name": "deployments/[UUID]/versions/2", + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} +{ + "method": "DELETE", + "path": "/api/2.0/bundle/deployments/[UUID]" +} diff --git a/acceptance/bundle/dms/release-lock-error/databricks.yml b/acceptance/bundle/dms/release-lock-error/databricks.yml new file mode 100644 index 0000000000..94323b84d9 --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/databricks.yml @@ -0,0 +1,11 @@ +bundle: + name: dms-release-lock-error + +targets: + fail-complete: + default: true + +resources: + jobs: + test_job: + name: test-job diff --git a/acceptance/bundle/dms/release-lock-error/out.test.toml b/acceptance/bundle/dms/release-lock-error/out.test.toml new file mode 100644 index 0000000000..6ce208a048 --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/out.test.toml @@ -0,0 +1,6 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["direct"] + DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] diff --git a/acceptance/bundle/dms/release-lock-error/output.txt b/acceptance/bundle/dms/release-lock-error/output.txt new file mode 100644 index 0000000000..253593e8d9 --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/output.txt @@ -0,0 +1,56 @@ + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/dms-release-lock-error/fail-complete/files... +Deploying resources... +Updating deployment state... +Deployment complete! +Warn: Failed to release deployment lock: complete version: simulated complete version failure + +>>> print_requests.py --get //bundle +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "fail-complete" + } +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]" +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "version_type": "VERSION_TYPE_DEPLOY", + "target_name": "fail-complete" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/operations", + "q": { + "resource_key": "resources.jobs.test_job" + }, + "body": { + "resource_key": "resources.jobs.test_job", + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "resource_id": "[NUMID]", + "status": "OPERATION_STATUS_SUCCEEDED" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "name": "deployments/[UUID]/versions/1", + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} diff --git a/acceptance/bundle/dms/release-lock-error/script b/acceptance/bundle/dms/release-lock-error/script new file mode 100644 index 0000000000..deff401bef --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/script @@ -0,0 +1,8 @@ +# Deploy with the metadata service enabled. +# The target name "fail-complete" triggers a simulated error on the +# CompleteVersion endpoint (release lock), so deploy should warn about +# the failed lock release. +trace $CLI bundle deploy + +# Print the metadata service requests to verify the lock release was attempted. +trace print_requests.py --get //bundle diff --git a/acceptance/bundle/dms/release-lock-error/test.toml b/acceptance/bundle/dms/release-lock-error/test.toml new file mode 100644 index 0000000000..1910e96135 --- /dev/null +++ b/acceptance/bundle/dms/release-lock-error/test.toml @@ -0,0 +1,2 @@ +# Override target to "fail-complete" which makes the test server's +# CompleteVersion endpoint return an error, simulating a release failure. diff --git a/acceptance/bundle/dms/script b/acceptance/bundle/dms/script new file mode 100644 index 0000000000..fa4f54bd57 --- /dev/null +++ b/acceptance/bundle/dms/script @@ -0,0 +1,11 @@ +# Deploy with the metadata service enabled. +trace $CLI bundle deploy + +# Print all metadata service requests made during deploy. +trace print_requests.py --get //bundle + +# Destroy with the metadata service enabled. +trace $CLI bundle destroy --auto-approve + +# Print all metadata service requests made during destroy. +trace print_requests.py --get //bundle diff --git a/acceptance/bundle/dms/sequential-deploys/databricks.yml b/acceptance/bundle/dms/sequential-deploys/databricks.yml new file mode 100644 index 0000000000..0d7c1fb63b --- /dev/null +++ b/acceptance/bundle/dms/sequential-deploys/databricks.yml @@ -0,0 +1,7 @@ +bundle: + name: sequential-deploys-test + +resources: + jobs: + test_job: + name: test-job diff --git a/acceptance/bundle/dms/sequential-deploys/out.test.toml b/acceptance/bundle/dms/sequential-deploys/out.test.toml new file mode 100644 index 0000000000..6ce208a048 --- /dev/null +++ b/acceptance/bundle/dms/sequential-deploys/out.test.toml @@ -0,0 +1,6 @@ +Local = true +Cloud = false + +[EnvMatrix] + DATABRICKS_BUNDLE_ENGINE = ["direct"] + DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] diff --git a/acceptance/bundle/dms/sequential-deploys/output.txt b/acceptance/bundle/dms/sequential-deploys/output.txt new file mode 100644 index 0000000000..35dbc6626f --- /dev/null +++ b/acceptance/bundle/dms/sequential-deploys/output.txt @@ -0,0 +1,135 @@ + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/sequential-deploys-test/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/sequential-deploys-test/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> [CLI] bundle deploy +Uploading bundle files to /Workspace/Users/[USERNAME]/.bundle/sequential-deploys-test/default/files... +Deploying resources... +Updating deployment state... +Deployment complete! + +>>> print_requests.py --get //bundle +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "default" + } +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]" +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "1" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "version_type": "VERSION_TYPE_DEPLOY", + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/operations", + "q": { + "resource_key": "resources.jobs.test_job" + }, + "body": { + "resource_key": "resources.jobs.test_job", + "action_type": "OPERATION_ACTION_TYPE_CREATE", + "resource_id": "[NUMID]", + "status": "OPERATION_STATUS_SUCCEEDED" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/1/complete", + "body": { + "name": "deployments/[UUID]/versions/1", + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "default" + } +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]" +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "2" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "version_type": "VERSION_TYPE_DEPLOY", + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/2/complete", + "body": { + "name": "deployments/[UUID]/versions/2", + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments", + "q": { + "deployment_id": "[UUID]" + }, + "body": { + "target_name": "default" + } +} +{ + "method": "GET", + "path": "/api/2.0/bundle/deployments/[UUID]" +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions", + "q": { + "version_id": "3" + }, + "body": { + "cli_version": "[DEV_VERSION]", + "version_type": "VERSION_TYPE_DEPLOY", + "target_name": "default" + } +} +{ + "method": "POST", + "path": "/api/2.0/bundle/deployments/[UUID]/versions/3/complete", + "body": { + "name": "deployments/[UUID]/versions/3", + "completion_reason": "VERSION_COMPLETE_SUCCESS" + } +} diff --git a/acceptance/bundle/dms/sequential-deploys/script b/acceptance/bundle/dms/sequential-deploys/script new file mode 100644 index 0000000000..42850217b8 --- /dev/null +++ b/acceptance/bundle/dms/sequential-deploys/script @@ -0,0 +1,7 @@ +# Deploy three times in sequence to verify version numbers increment. +trace $CLI bundle deploy +trace $CLI bundle deploy +trace $CLI bundle deploy + +# Print metadata service requests. Version IDs should be 1, 2, 3. +trace print_requests.py --get //bundle diff --git a/acceptance/bundle/dms/test.toml b/acceptance/bundle/dms/test.toml new file mode 100644 index 0000000000..4cebdfc83a --- /dev/null +++ b/acceptance/bundle/dms/test.toml @@ -0,0 +1,3 @@ +EnvMatrix.DATABRICKS_BUNDLE_ENGINE = ["direct"] +EnvMatrix.DATABRICKS_BUNDLE_MANAGED_STATE = ["true"] +RecordRequests = true diff --git a/bundle/bundle.go b/bundle/bundle.go index 97824eb839..2d6e691886 100644 --- a/bundle/bundle.go +++ b/bundle/bundle.go @@ -225,10 +225,10 @@ func TryLoad(ctx context.Context) *Bundle { return b } -func (b *Bundle) WorkspaceClientE() (*databricks.WorkspaceClient, error) { +func (b *Bundle) WorkspaceClientE(ctx context.Context) (*databricks.WorkspaceClient, error) { b.clientOnce.Do(func() { var err error - b.client, err = b.Config.Workspace.Client() + b.client, err = b.Config.Workspace.Client(ctx) if err != nil { b.clientErr = fmt.Errorf("cannot resolve bundle auth configuration: %w", err) } @@ -238,7 +238,7 @@ func (b *Bundle) WorkspaceClientE() (*databricks.WorkspaceClient, error) { } func (b *Bundle) WorkspaceClient() *databricks.WorkspaceClient { - client, err := b.WorkspaceClientE() + client, err := b.WorkspaceClientE(context.TODO()) if err != nil { panic(err) } diff --git a/bundle/bundle_test.go b/bundle/bundle_test.go index fa8282f343..b571cf8e16 100644 --- a/bundle/bundle_test.go +++ b/bundle/bundle_test.go @@ -184,12 +184,13 @@ func TestClearWorkspaceClient(t *testing.T) { b.Config.Workspace.Host = "https://nonexistent.example.com" b.Config.Workspace.Profile = "profile-A" - _, err1 := b.WorkspaceClientE() + ctx := t.Context() + _, err1 := b.WorkspaceClientE(ctx) require.Error(t, err1) assert.Contains(t, err1.Error(), "profile-A") // Without retry, second call returns the same cached error (same object). - _, err1b := b.WorkspaceClientE() + _, err1b := b.WorkspaceClientE(ctx) assert.Same(t, err1, err1b, "expected same cached error without retry") // After retry, change the profile to "profile-B" and call again. @@ -197,7 +198,7 @@ func TestClearWorkspaceClient(t *testing.T) { b.ClearWorkspaceClient() b.Config.Workspace.Profile = "profile-B" - _, err2 := b.WorkspaceClientE() + _, err2 := b.WorkspaceClientE(ctx) require.Error(t, err2) assert.Contains(t, err2.Error(), "profile-B", "expected re-execution to pick up new profile") assert.NotContains(t, err2.Error(), "profile-A", "stale cached error should not appear") diff --git a/bundle/config/workspace.go b/bundle/config/workspace.go index c699dc070b..608c8aab63 100644 --- a/bundle/config/workspace.go +++ b/bundle/config/workspace.go @@ -1,11 +1,14 @@ package config import ( + "context" + "net/http" "os" "path/filepath" "github.com/databricks/cli/libs/auth" "github.com/databricks/cli/libs/databrickscfg" + "github.com/databricks/cli/libs/env" "github.com/databricks/databricks-sdk-go" "github.com/databricks/databricks-sdk-go/config" "github.com/databricks/databricks-sdk-go/marshal" @@ -156,7 +159,7 @@ func (w *Workspace) NormalizeHostURL() { } } -func (w *Workspace) Client() (*databricks.WorkspaceClient, error) { +func (w *Workspace) Client(ctx context.Context) (*databricks.WorkspaceClient, error) { // Extract query parameters (?o=, ?a=) from the host URL before building // the SDK config. This ensures workspace_id and account_id are available // for profile resolution during EnsureResolved(). @@ -193,9 +196,34 @@ func (w *Workspace) Client() (*databricks.WorkspaceClient, error) { } } + // If DATABRICKS_LITESWAP_ID is set, wrap the transport to inject the + // x-databricks-traffic-id header for routing to the liteswap instance. + if liteswapID := env.Get(ctx, "DATABRICKS_LITESWAP_ID"); liteswapID != "" { + inner := cfg.HTTPTransport + if inner == nil { + inner = http.DefaultTransport + } + cfg.HTTPTransport = &liteswapTransport{ + inner: inner, + trafficID: "testenv://liteswap/" + liteswapID, + } + } + return databricks.NewWorkspaceClient((*databricks.Config)(cfg)) } +// liteswapTransport injects the x-databricks-traffic-id header to route +// requests to a liteswap service instance. +type liteswapTransport struct { + inner http.RoundTripper + trafficID string +} + +func (t *liteswapTransport) RoundTrip(req *http.Request) (*http.Response, error) { + req.Header.Set("x-databricks-traffic-id", t.trafficID) + return t.inner.RoundTrip(req) +} + func init() { arg0 := os.Args[0] diff --git a/bundle/config/workspace_test.go b/bundle/config/workspace_test.go index 4181d17170..4d87503a03 100644 --- a/bundle/config/workspace_test.go +++ b/bundle/config/workspace_test.go @@ -34,7 +34,7 @@ func TestWorkspaceResolveProfileFromHost(t *testing.T) { t.Run("no config file", func(t *testing.T) { setupWorkspaceTest(t) - _, err := w.Client() + _, err := w.Client(t.Context()) assert.NoError(t, err) }) @@ -49,7 +49,7 @@ func TestWorkspaceResolveProfileFromHost(t *testing.T) { }) require.NoError(t, err) - client, err := w.Client() + client, err := w.Client(t.Context()) assert.NoError(t, err) assert.Equal(t, "default", client.Config.Profile) }) @@ -67,7 +67,7 @@ func TestWorkspaceResolveProfileFromHost(t *testing.T) { require.NoError(t, err) t.Setenv("DATABRICKS_CONFIG_FILE", filepath.Join(home, "customcfg")) - client, err := w.Client() + client, err := w.Client(t.Context()) assert.NoError(t, err) assert.Equal(t, "custom", client.Config.Profile) }) @@ -149,7 +149,7 @@ func TestWorkspaceClientNormalizesHostBeforeProfileResolution(t *testing.T) { w := Workspace{ Host: "https://spog.databricks.com/?o=222", } - client, err := w.Client() + client, err := w.Client(t.Context()) require.NoError(t, err) assert.Equal(t, "ws2", client.Config.Profile) } @@ -165,7 +165,7 @@ func TestWorkspaceVerifyProfileForHost(t *testing.T) { t.Run("no config file", func(t *testing.T) { setupWorkspaceTest(t) - _, err := w.Client() + _, err := w.Client(t.Context()) assert.ErrorIs(t, err, fs.ErrNotExist) }) @@ -179,7 +179,7 @@ func TestWorkspaceVerifyProfileForHost(t *testing.T) { }) require.NoError(t, err) - _, err = w.Client() + _, err = w.Client(t.Context()) assert.NoError(t, err) }) @@ -193,7 +193,7 @@ func TestWorkspaceVerifyProfileForHost(t *testing.T) { }) require.NoError(t, err) - _, err = w.Client() + _, err = w.Client(t.Context()) assert.ErrorContains(t, err, "doesn’t match the host configured in the bundle") }) @@ -209,7 +209,7 @@ func TestWorkspaceVerifyProfileForHost(t *testing.T) { require.NoError(t, err) t.Setenv("DATABRICKS_CONFIG_FILE", filepath.Join(home, "customcfg")) - _, err = w.Client() + _, err = w.Client(t.Context()) assert.NoError(t, err) }) @@ -225,7 +225,7 @@ func TestWorkspaceVerifyProfileForHost(t *testing.T) { require.NoError(t, err) t.Setenv("DATABRICKS_CONFIG_FILE", filepath.Join(home, "customcfg")) - _, err = w.Client() + _, err = w.Client(t.Context()) assert.ErrorContains(t, err, "doesn’t match the host configured in the bundle") }) } diff --git a/bundle/deploy/lock/acquire.go b/bundle/deploy/lock/acquire.go deleted file mode 100644 index d4f788c3ca..0000000000 --- a/bundle/deploy/lock/acquire.go +++ /dev/null @@ -1,69 +0,0 @@ -package lock - -import ( - "context" - "errors" - "io/fs" - - "github.com/databricks/cli/bundle" - "github.com/databricks/cli/bundle/permissions" - "github.com/databricks/cli/libs/diag" - "github.com/databricks/cli/libs/locker" - "github.com/databricks/cli/libs/log" -) - -type acquire struct{} - -func Acquire() bundle.Mutator { - return &acquire{} -} - -func (m *acquire) Name() string { - return "lock:acquire" -} - -func (m *acquire) init(b *bundle.Bundle) error { - user := b.Config.Workspace.CurrentUser.UserName - dir := b.Config.Workspace.StatePath - l, err := locker.CreateLocker(user, dir, b.WorkspaceClient()) - if err != nil { - return err - } - - b.Locker = l - return nil -} - -func (m *acquire) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { - // Return early if locking is disabled. - if !b.Config.Bundle.Deployment.Lock.IsEnabled() { - log.Infof(ctx, "Skipping; locking is disabled") - return nil - } - - err := m.init(b) - if err != nil { - return diag.FromErr(err) - } - - force := b.Config.Bundle.Deployment.Lock.Force - log.Infof(ctx, "Acquiring deployment lock (force: %v)", force) - err = b.Locker.Lock(ctx, force) - if err != nil { - log.Errorf(ctx, "Failed to acquire deployment lock: %v", err) - - if errors.Is(err, fs.ErrPermission) { - return permissions.ReportPossiblePermissionDenied(ctx, b, b.Config.Workspace.StatePath) - } - - if errors.Is(err, fs.ErrNotExist) { - // If we get a "doesn't exist" error from the API this indicates - // we either don't have permissions or the path is invalid. - return permissions.ReportPossiblePermissionDenied(ctx, b, b.Config.Workspace.StatePath) - } - - return diag.FromErr(err) - } - - return nil -} diff --git a/bundle/deploy/lock/deployment_metadata_service.go b/bundle/deploy/lock/deployment_metadata_service.go new file mode 100644 index 0000000000..9df6d59f99 --- /dev/null +++ b/bundle/deploy/lock/deployment_metadata_service.go @@ -0,0 +1,397 @@ +package lock + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "io/fs" + "net/http" + "strconv" + "strings" + "time" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/deploy" + "github.com/databricks/cli/bundle/deployplan" + "github.com/databricks/cli/bundle/direct" + "github.com/databricks/cli/bundle/direct/dstate" + "github.com/databricks/cli/internal/build" + "github.com/databricks/cli/libs/filer" + "github.com/databricks/cli/libs/log" + "github.com/databricks/cli/libs/tmpdms" + "github.com/databricks/databricks-sdk-go/apierr" + "github.com/google/uuid" +) + +const defaultHeartbeatInterval = 30 * time.Second + +type metadataServiceLock struct { + b *bundle.Bundle + versionType tmpdms.VersionType + + svc *tmpdms.DeploymentMetadataAPI + deploymentID string + versionID string + + stopHeartbeat func() + +} + +func newMetadataServiceLock(b *bundle.Bundle, versionType tmpdms.VersionType) *metadataServiceLock { + return &metadataServiceLock{b: b, versionType: versionType} +} + +func (l *metadataServiceLock) Acquire(ctx context.Context) error { + if l.b.Config.Bundle.Deployment.Lock.Force { + return fmt.Errorf("force lock is not supported with the deployment metadata service") + } + + svc, err := tmpdms.NewDeploymentMetadataAPI(l.b.WorkspaceClient()) + if err != nil { + return fmt.Errorf("failed to create metadata service client: %w", err) + } + l.svc = svc + + deploymentID, versionID, lastVersionID, err := acquireLock(ctx, l.b, svc, l.versionType) + if err != nil { + return err + } + + l.deploymentID = deploymentID + l.versionID = versionID + l.stopHeartbeat = startHeartbeat(ctx, svc, deploymentID, versionID) + + // Store the last version ID on the bundle so that the plan can record + // which deployment version it was computed against (OCC). + l.b.DeploymentBundle.DeploymentVersion = lastVersionID + + // Set the initial registration reporter if this is the first DMS deployment. + // This will be called after the state DB is loaded (during plan computation) + // to register all existing resources before any CRUD operations. + if versionID == "1" { + l.b.DeploymentBundle.InitialRegistrationReporter = makeInitialRegistrationReporter(svc, deploymentID, versionID) + } + + l.b.DeploymentBundle.OperationReporter = makeOperationReporter(svc, deploymentID, versionID) + return nil +} + +func (l *metadataServiceLock) Release(ctx context.Context, status DeploymentStatus) error { + if l.stopHeartbeat != nil { + l.stopHeartbeat() + } + + reason := tmpdms.VersionCompleteSuccess + if status == DeploymentFailure { + reason = tmpdms.VersionCompleteFailure + } + + // Use a separate context for cleanup so the lock is released even if the + // parent context was cancelled (e.g. user hit Ctrl+C). + cleanupCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) //nolint:gocritic // Intentional: cleanup must survive parent cancellation + defer cancel() + + _, completeErr := l.svc.CompleteVersion(cleanupCtx, tmpdms.CompleteVersionRequest{ + DeploymentID: l.deploymentID, + VersionID: l.versionID, + Name: fmt.Sprintf("deployments/%s/versions/%s", l.deploymentID, l.versionID), + CompletionReason: reason, + }) + if completeErr == nil { + log.Infof(ctx, "Released deployment lock: deployment=%s version=%s reason=%s", l.deploymentID, l.versionID, reason) + } + + // For destroy operations, delete the deployment record after + // successfully releasing the lock. + if status == DeploymentSuccess && l.versionType == tmpdms.VersionTypeDestroy { + _, deleteErr := l.svc.DeleteDeployment(cleanupCtx, tmpdms.DeleteDeploymentRequest{ + DeploymentID: l.deploymentID, + }) + if deleteErr != nil { + log.Warnf(ctx, "Failed to delete deployment: %v", deleteErr) + } + } + + return completeErr +} + +// acquireLock implements the lock acquisition protocol using the deployment +// metadata service: resolve deployment ID, ensure deployment, create version. +// The returned lastVersionID is the deployment's last_version_id before this +// version was created (empty string for brand-new deployments). +func acquireLock(ctx context.Context, b *bundle.Bundle, svc *tmpdms.DeploymentMetadataAPI, versionType tmpdms.VersionType) (deploymentID string, versionID string, lastVersionID string, err error) { + deploymentID, err = resolveDeploymentID(ctx, b) + if err != nil { + return "", "", "", err + } + + // Ensure the deployment exists in the metadata service. + _, createErr := svc.CreateDeployment(ctx, tmpdms.CreateDeploymentRequest{ + DeploymentID: deploymentID, + Deployment: &tmpdms.Deployment{ + TargetName: b.Config.Bundle.Target, + }, + }) + if createErr != nil && !isAlreadyExists(createErr) { + return "", "", "", fmt.Errorf("failed to create deployment: %w", createErr) + } + + // Get the deployment to determine the next version ID. + dep, getErr := svc.GetDeployment(ctx, tmpdms.GetDeploymentRequest{ + DeploymentID: deploymentID, + }) + if getErr != nil { + return "", "", "", fmt.Errorf("failed to get deployment: %w", getErr) + } + + lastVersionID = dep.LastVersionID + if lastVersionID == "" { + versionID = "1" + } else { + lastVersion, parseErr := strconv.ParseInt(lastVersionID, 10, 64) + if parseErr != nil { + return "", "", "", fmt.Errorf("failed to parse last_version_id %q: %w", lastVersionID, parseErr) + } + versionID = strconv.FormatInt(lastVersion+1, 10) + } + + // Create a version to acquire the deployment lock. + version, versionErr := svc.CreateVersion(ctx, tmpdms.CreateVersionRequest{ + DeploymentID: deploymentID, + Parent: "deployments/" + deploymentID, + VersionID: versionID, + Version: &tmpdms.Version{ + CliVersion: build.GetInfo().Version, + VersionType: versionType, + TargetName: b.Config.Bundle.Target, + }, + }) + if versionErr != nil { + return "", "", "", fmt.Errorf("failed to acquire deployment lock: %w", versionErr) + } + + log.Infof(ctx, "Acquired deployment lock: deployment=%s version=%s", deploymentID, version.VersionID) + return deploymentID, versionID, lastVersionID, nil +} + +const deploymentIDFilename = "_deployment_id" + +// resolveDeploymentID reads the deployment ID from the workspace state directory. +// It first checks for a _deployment_id file. If not found, it falls back to +// reading the lineage from resources.json (for first-time DMS migration). +// The resolved ID is written to _deployment_id in the workspace for future use. +func resolveDeploymentID(ctx context.Context, b *bundle.Bundle) (string, error) { + f, err := deploy.StateFiler(b) + if err != nil { + return "", fmt.Errorf("failed to create state filer: %w", err) + } + + // Try reading _deployment_id from the workspace state directory. + deploymentID, err := readDeploymentIDFile(ctx, f) + if err != nil { + return "", err + } + if deploymentID != "" { + return deploymentID, nil + } + + // Fall back to reading lineage from resources.json in the workspace. + deploymentID, err = readLineageFromResourcesJSON(ctx, f) + if err != nil { + return "", err + } + if deploymentID == "" { + // Fresh deployment: generate a new ID. + deploymentID = uuid.New().String() + } + + // Persist the deployment ID to the workspace for future deployments. + err = f.Write(ctx, deploymentIDFilename, strings.NewReader(deploymentID), filer.CreateParentDirectories, filer.OverwriteIfExists) + if err != nil { + return "", fmt.Errorf("failed to write %s to workspace: %w", deploymentIDFilename, err) + } + + return deploymentID, nil +} + +// readDeploymentIDFile reads the _deployment_id file from the workspace. +// Returns ("", nil) if the file does not exist. +func readDeploymentIDFile(ctx context.Context, f filer.Filer) (string, error) { + reader, err := f.Read(ctx, deploymentIDFilename) + if errors.Is(err, fs.ErrNotExist) { + return "", nil + } + if err != nil { + return "", fmt.Errorf("failed to read %s from workspace: %w", deploymentIDFilename, err) + } + defer reader.Close() + + data, err := io.ReadAll(reader) + if err != nil { + return "", fmt.Errorf("failed to read %s content: %w", deploymentIDFilename, err) + } + + id := strings.TrimSpace(string(data)) + if id == "" { + log.Warnf(ctx, "Found empty %s in workspace, falling back to resources.json", deploymentIDFilename) + return "", nil + } + return id, nil +} + +// readLineageFromResourcesJSON reads the lineage field from resources.json in the workspace. +// Returns ("", nil) if the file does not exist or has no lineage. +func readLineageFromResourcesJSON(ctx context.Context, f filer.Filer) (string, error) { + reader, err := f.Read(ctx, "resources.json") + if errors.Is(err, fs.ErrNotExist) { + return "", nil + } + if err != nil { + return "", fmt.Errorf("failed to read resources.json from workspace: %w", err) + } + defer reader.Close() + + data, err := io.ReadAll(reader) + if err != nil { + return "", fmt.Errorf("failed to read resources.json content: %w", err) + } + + var db dstate.Database + if err := json.Unmarshal(data, &db); err != nil { + return "", fmt.Errorf("failed to parse resources.json: %w", err) + } + return db.Lineage, nil +} + +// makeInitialRegistrationReporter returns a reporter that registers a single +// existing resource with the deployment metadata service. +func makeInitialRegistrationReporter(svc *tmpdms.DeploymentMetadataAPI, deploymentID, versionID string) direct.InitialRegistrationReporter { + return func(ctx context.Context, resourceKey, resourceID string) error { + _, err := svc.CreateOperation(ctx, tmpdms.CreateOperationRequest{ + DeploymentID: deploymentID, + VersionID: versionID, + Parent: fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID), + ResourceKey: resourceKey, + Operation: &tmpdms.Operation{ + ResourceKey: resourceKey, + ResourceID: resourceID, + Status: tmpdms.OperationStatusSucceeded, + ActionType: tmpdms.OperationActionTypeInitRegister, + }, + }) + if err != nil { + return fmt.Errorf("registering existing resource %s: %w", resourceKey, err) + } + return nil + } +} + +// makeOperationReporter returns an OperationReporter that reports each resource +// operation (success or failure) to the deployment metadata service. +func makeOperationReporter(svc *tmpdms.DeploymentMetadataAPI, deploymentID, versionID string) direct.OperationReporter { + return func( + ctx context.Context, + resourceKey string, + resourceID string, + action deployplan.ActionType, + operationErr error, + ) error { + actionType, err := planActionToOperationAction(action) + if err != nil { + return fmt.Errorf("mapping action for resource %s: %w", resourceKey, err) + } + if actionType == "" { + return nil + } + + status := tmpdms.OperationStatusSucceeded + var errorMessage string + if operationErr != nil { + status = tmpdms.OperationStatusFailed + errorMessage = operationErr.Error() + } + + _, err = svc.CreateOperation(ctx, tmpdms.CreateOperationRequest{ + DeploymentID: deploymentID, + VersionID: versionID, + Parent: fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID), + ResourceKey: resourceKey, + Operation: &tmpdms.Operation{ + ResourceKey: resourceKey, + ResourceID: resourceID, + Status: status, + ActionType: actionType, + ErrorMessage: errorMessage, + }, + }) + if err != nil { + return fmt.Errorf("reporting operation for resource %s: %w", resourceKey, err) + } + return nil + } +} + +// planActionToOperationAction maps a deploy plan action to a metadata service +// operation action type. No-op actions like Skip return ("", nil) and should +// be ignored. Initial registration is handled separately by makeInitialRegistrationReporter. +func planActionToOperationAction(action deployplan.ActionType) (tmpdms.OperationActionType, error) { + switch action { + case deployplan.Skip: + return "", nil + case deployplan.Create: + return tmpdms.OperationActionTypeCreate, nil + case deployplan.Update: + return tmpdms.OperationActionTypeUpdate, nil + case deployplan.UpdateWithID: + return tmpdms.OperationActionTypeUpdateWithID, nil + case deployplan.Delete: + return tmpdms.OperationActionTypeDelete, nil + case deployplan.Recreate: + return tmpdms.OperationActionTypeRecreate, nil + case deployplan.Resize: + return tmpdms.OperationActionTypeResize, nil + default: + return "", fmt.Errorf("unsupported operation action type: %s", action) + } +} + +// startHeartbeat starts a background goroutine that sends heartbeats to keep +// the deployment lock alive. Returns a cancel function to stop the heartbeat. +func startHeartbeat(ctx context.Context, svc *tmpdms.DeploymentMetadataAPI, deploymentID, versionID string) context.CancelFunc { + ctx, cancel := context.WithCancel(ctx) + + go func() { + ticker := time.NewTicker(defaultHeartbeatInterval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + _, err := svc.Heartbeat(ctx, tmpdms.HeartbeatRequest{ + DeploymentID: deploymentID, + VersionID: versionID, + }) + if err != nil { + log.Warnf(ctx, "Failed to send deployment heartbeat: %v", err) + } else { + log.Debugf(ctx, "Deployment heartbeat sent for deployment=%s version=%s", deploymentID, versionID) + } + } + } + }() + + return cancel +} + +// isAlreadyExists checks if an error indicates the resource already exists (HTTP 409). +func isAlreadyExists(err error) bool { + var apiErr *apierr.APIError + if errors.As(err, &apiErr) && apiErr.StatusCode == http.StatusConflict { + return true + } + return false +} diff --git a/bundle/deploy/lock/deployment_metadata_service_test.go b/bundle/deploy/lock/deployment_metadata_service_test.go new file mode 100644 index 0000000000..433d403a64 --- /dev/null +++ b/bundle/deploy/lock/deployment_metadata_service_test.go @@ -0,0 +1,71 @@ +package lock + +import ( + "net/http" + "testing" + + "github.com/databricks/cli/bundle/deployplan" + "github.com/databricks/cli/libs/tmpdms" + "github.com/databricks/databricks-sdk-go/apierr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestPlanActionToOperationAction(t *testing.T) { + tests := []struct { + action deployplan.ActionType + expected tmpdms.OperationActionType + isNoop bool + }{ + {deployplan.Skip, "", true}, + {deployplan.Create, tmpdms.OperationActionTypeCreate, false}, + {deployplan.Update, tmpdms.OperationActionTypeUpdate, false}, + {deployplan.UpdateWithID, tmpdms.OperationActionTypeUpdateWithID, false}, + {deployplan.Delete, tmpdms.OperationActionTypeDelete, false}, + {deployplan.Recreate, tmpdms.OperationActionTypeRecreate, false}, + {deployplan.Resize, tmpdms.OperationActionTypeResize, false}, + } + + for _, tt := range tests { + t.Run(string(tt.action), func(t *testing.T) { + result, err := planActionToOperationAction(tt.action) + require.NoError(t, err) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestPlanActionToOperationActionSkipIsNoop(t *testing.T) { + result, err := planActionToOperationAction(deployplan.Skip) + require.NoError(t, err) + assert.Equal(t, tmpdms.OperationActionType(""), result) +} + +func TestPlanActionToOperationActionUnsupported(t *testing.T) { + _, err := planActionToOperationAction("unknown_action") + assert.ErrorContains(t, err, "unsupported operation action type") +} + +func TestIsAlreadyExists(t *testing.T) { + assert.True(t, isAlreadyExists(&apierr.APIError{StatusCode: http.StatusConflict})) + assert.False(t, isAlreadyExists(&apierr.APIError{StatusCode: http.StatusNotFound})) + assert.False(t, isAlreadyExists(&apierr.APIError{StatusCode: http.StatusInternalServerError})) + assert.False(t, isAlreadyExists(assert.AnError)) + assert.False(t, isAlreadyExists(nil)) +} + +func TestGoalToVersionType(t *testing.T) { + vt, ok := goalToVersionType(GoalDeploy) + assert.True(t, ok) + assert.Equal(t, tmpdms.VersionTypeDeploy, vt) + + vt, ok = goalToVersionType(GoalDestroy) + assert.True(t, ok) + assert.Equal(t, tmpdms.VersionTypeDestroy, vt) + + _, ok = goalToVersionType(GoalBind) + assert.False(t, ok) + + _, ok = goalToVersionType(GoalUnbind) + assert.False(t, ok) +} diff --git a/bundle/deploy/lock/lock.go b/bundle/deploy/lock/lock.go new file mode 100644 index 0000000000..c51b17c912 --- /dev/null +++ b/bundle/deploy/lock/lock.go @@ -0,0 +1,62 @@ +package lock + +import ( + "context" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/bundle/env" + "github.com/databricks/cli/libs/tmpdms" +) + +// Goal describes the purpose of a deployment operation. +type Goal string + +const ( + GoalBind = Goal("bind") + GoalUnbind = Goal("unbind") + GoalDeploy = Goal("deploy") + GoalDestroy = Goal("destroy") +) + +// DeploymentStatus indicates whether the deployment operation succeeded or failed. +type DeploymentStatus int + +const ( + DeploymentSuccess DeploymentStatus = iota + DeploymentFailure +) + +// DeploymentLock manages the deployment lock lifecycle. +type DeploymentLock interface { + // Acquire acquires the deployment lock. + Acquire(ctx context.Context) error + + // Release releases the deployment lock with the given deployment status. + Release(ctx context.Context, status DeploymentStatus) error +} + +// NewDeploymentLock returns a DeploymentLock implementation based on the +// current environment. If managed state is enabled and the goal maps to a +// supported version type, a metadata service lock is returned. Otherwise, +// a workspace filesystem lock is returned. +func NewDeploymentLock(ctx context.Context, b *bundle.Bundle, goal Goal) DeploymentLock { + useManagedState, _ := env.ManagedState(ctx) + if useManagedState == "true" { + versionType, ok := goalToVersionType(goal) + if ok { + return newMetadataServiceLock(b, versionType) + } + } + return newWorkspaceFilesystemLock(b, goal) +} + +func goalToVersionType(goal Goal) (tmpdms.VersionType, bool) { + switch goal { + case GoalDeploy: + return tmpdms.VersionTypeDeploy, true + case GoalDestroy: + return tmpdms.VersionTypeDestroy, true + default: + return "", false + } +} diff --git a/bundle/deploy/lock/release.go b/bundle/deploy/lock/release.go deleted file mode 100644 index 26f95edfc9..0000000000 --- a/bundle/deploy/lock/release.go +++ /dev/null @@ -1,58 +0,0 @@ -package lock - -import ( - "context" - - "github.com/databricks/cli/bundle" - "github.com/databricks/cli/libs/diag" - "github.com/databricks/cli/libs/locker" - "github.com/databricks/cli/libs/log" -) - -type Goal string - -const ( - GoalBind = Goal("bind") - GoalUnbind = Goal("unbind") - GoalDeploy = Goal("deploy") - GoalDestroy = Goal("destroy") -) - -type release struct { - goal Goal -} - -func Release(goal Goal) bundle.Mutator { - return &release{goal} -} - -func (m *release) Name() string { - return "lock:release" -} - -func (m *release) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { - // Return early if locking is disabled. - if !b.Config.Bundle.Deployment.Lock.IsEnabled() { - log.Infof(ctx, "Skipping; locking is disabled") - return nil - } - - // Return early if the locker is not set. - // It is likely an error occurred prior to initialization of the locker instance. - if b.Locker == nil { - log.Warnf(ctx, "Unable to release lock if locker is not configured") - return nil - } - - log.Infof(ctx, "Releasing deployment lock") - switch m.goal { - case GoalDeploy: - return diag.FromErr(b.Locker.Unlock(ctx)) - case GoalBind, GoalUnbind: - return diag.FromErr(b.Locker.Unlock(ctx)) - case GoalDestroy: - return diag.FromErr(b.Locker.Unlock(ctx, locker.AllowLockFileNotExist)) - default: - return diag.Errorf("unknown goal for lock release: %s", m.goal) - } -} diff --git a/bundle/deploy/lock/workspace_filesystem.go b/bundle/deploy/lock/workspace_filesystem.go new file mode 100644 index 0000000000..e84f327d84 --- /dev/null +++ b/bundle/deploy/lock/workspace_filesystem.go @@ -0,0 +1,66 @@ +package lock + +import ( + "context" + + "github.com/databricks/cli/bundle" + "github.com/databricks/cli/libs/locker" + "github.com/databricks/cli/libs/log" +) + +type workspaceFilesystemLock struct { + b *bundle.Bundle + goal Goal +} + +func newWorkspaceFilesystemLock(b *bundle.Bundle, goal Goal) *workspaceFilesystemLock { + return &workspaceFilesystemLock{b: b, goal: goal} +} + +func (l *workspaceFilesystemLock) Acquire(ctx context.Context) error { + b := l.b + + if !b.Config.Bundle.Deployment.Lock.IsEnabled() { + log.Infof(ctx, "Skipping; locking is disabled") + return nil + } + + user := b.Config.Workspace.CurrentUser.UserName + dir := b.Config.Workspace.StatePath + lk, err := locker.CreateLocker(user, dir, b.WorkspaceClient()) + if err != nil { + return err + } + + b.Locker = lk + + force := b.Config.Bundle.Deployment.Lock.Force + log.Infof(ctx, "Acquiring deployment lock (force: %v)", force) + err = lk.Lock(ctx, force) + if err != nil { + log.Errorf(ctx, "Failed to acquire deployment lock: %v", err) + return err + } + + return nil +} + +func (l *workspaceFilesystemLock) Release(ctx context.Context, _ DeploymentStatus) error { + b := l.b + + if !b.Config.Bundle.Deployment.Lock.IsEnabled() { + log.Infof(ctx, "Skipping; locking is disabled") + return nil + } + + if b.Locker == nil { + log.Warnf(ctx, "Unable to release lock if locker is not configured") + return nil + } + + log.Infof(ctx, "Releasing deployment lock") + if l.goal == GoalDestroy { + return b.Locker.Unlock(ctx, locker.AllowLockFileNotExist) + } + return b.Locker.Unlock(ctx) +} diff --git a/bundle/deployplan/plan.go b/bundle/deployplan/plan.go index e0dcd9b288..e7eae034b1 100644 --- a/bundle/deployplan/plan.go +++ b/bundle/deployplan/plan.go @@ -22,6 +22,12 @@ type Plan struct { Serial int `json:"serial,omitempty"` Plan map[string]*PlanEntry `json:"plan,omitzero"` + // DeploymentVersion is the DMS deployment version that this plan was + // computed against. Used for optimistic concurrency control: the plan + // can only be applied when the deployment's last_version_id still equals + // this value, ensuring no other deployment happened in between. + DeploymentVersion string `json:"deployment_version,omitempty"` + mutex sync.Mutex `json:"-"` lockmap lockmap `json:"-"` } diff --git a/bundle/direct/bundle_apply.go b/bundle/direct/bundle_apply.go index af1cadfaf2..79211da2fa 100644 --- a/bundle/direct/bundle_apply.go +++ b/bundle/direct/bundle_apply.go @@ -26,6 +26,20 @@ func (b *DeploymentBundle) Apply(ctx context.Context, client *databricks.Workspa } b.StateDB.AssertOpened() + + // On the first DMS deployment, register all existing resources from state + // before any CRUD operations. This is a separate phase so that registration + // errors are treated as deployment failures. + if b.InitialRegistrationReporter != nil { + for resourceKey, entry := range b.StateDB.Data.State { + if err := b.InitialRegistrationReporter(ctx, resourceKey, entry.ID); err != nil { + logdiag.LogError(ctx, err) + return + } + } + b.InitialRegistrationReporter = nil + } + b.RemoteStateCache.Clear() g, err := makeGraph(plan) @@ -84,7 +98,23 @@ func (b *DeploymentBundle) Apply(ctx context.Context, client *databricks.Workspa logdiag.LogError(ctx, fmt.Errorf("%s: Unexpected delete action during migration", errorPrefix)) return false } + + // Capture the resource ID before deletion for operation reporting. + var deleteResourceID string + if b.OperationReporter != nil { + if dbentry, ok := b.StateDB.GetResourceEntry(resourceKey); ok { + deleteResourceID = dbentry.ID + } + } + err = d.Destroy(ctx, &b.StateDB) + if b.OperationReporter != nil { + reportErr := b.OperationReporter(ctx, resourceKey, deleteResourceID, action, err) + if reportErr != nil { + logdiag.LogError(ctx, fmt.Errorf("%s: failed to report operation: %w", errorPrefix, reportErr)) + return false + } + } if err != nil { logdiag.LogError(ctx, fmt.Errorf("%s: %w", errorPrefix, err)) return false @@ -93,7 +123,6 @@ func (b *DeploymentBundle) Apply(ctx context.Context, client *databricks.Workspa } // We don't keep NewState around for 'skip' nodes - if action != deployplan.Skip { if !b.resolveReferences(ctx, resourceKey, entry, errorPrefix, false) { return false @@ -124,6 +153,18 @@ func (b *DeploymentBundle) Apply(ctx context.Context, client *databricks.Workspa err = d.Deploy(ctx, &b.StateDB, sv.Value, action, entry) } + // Report the operation inline to the metadata service. + if b.OperationReporter != nil && !migrateMode { + var resourceID string + if dbentry, ok := b.StateDB.GetResourceEntry(resourceKey); ok { + resourceID = dbentry.ID + } + if reportErr := b.OperationReporter(ctx, resourceKey, resourceID, action, err); reportErr != nil { + logdiag.LogError(ctx, fmt.Errorf("%s: failed to report operation: %w", errorPrefix, reportErr)) + return false + } + } + if err != nil { logdiag.LogError(ctx, fmt.Errorf("%s: %w", errorPrefix, err)) return false diff --git a/bundle/direct/bundle_plan.go b/bundle/direct/bundle_plan.go index 8bd2b4bfd1..96f5bf013f 100644 --- a/bundle/direct/bundle_plan.go +++ b/bundle/direct/bundle_plan.go @@ -126,6 +126,11 @@ func (b *DeploymentBundle) CalculatePlan(ctx context.Context, client *databricks return nil, fmt.Errorf("reading config: %w", err) } + // Record the DMS deployment version for optimistic concurrency control. + // When applying a pre-computed plan, this version is validated against + // the current deployment state to detect stale plans. + plan.DeploymentVersion = b.DeploymentVersion + b.Plan = plan g, err := makeGraph(plan) diff --git a/bundle/direct/pkg.go b/bundle/direct/pkg.go index 74e72e79b0..a005d61a93 100644 --- a/bundle/direct/pkg.go +++ b/bundle/direct/pkg.go @@ -37,6 +37,27 @@ type DeploymentUnit struct { DependsOn []deployplan.DependsOnEntry } +// OperationReporter is called after each resource operation (success or failure) +// to report it to the deployment metadata service. If operationErr is non-nil the +// operation is recorded as failed with the error message. Returns an error if +// reporting fails; callers must treat this as a deployment failure. +type OperationReporter func( + ctx context.Context, + resourceKey string, + resourceID string, + action deployplan.ActionType, + operationErr error, +) error + +// InitialRegistrationReporter registers a single existing resource with the +// metadata service during the first DMS deployment. Called for each resource +// in the state DB before any CRUD operations. +type InitialRegistrationReporter func( + ctx context.Context, + resourceKey string, + resourceID string, +) error + // DeploymentBundle holds everything needed to deploy a bundle type DeploymentBundle struct { StateDB dstate.DeploymentState @@ -44,6 +65,18 @@ type DeploymentBundle struct { Plan *deployplan.Plan RemoteStateCache sync.Map StateCache structvar.Cache + + // DeploymentVersion is the DMS last_version_id before the current version + // was created. Set during lock acquisition and copied to the plan for OCC. + DeploymentVersion string + + // OperationReporter, when set, is called inline after each successful + // resource Create/Update/Delete to report the operation to the metadata service. + OperationReporter OperationReporter + + // InitialRegistrationReporter, when set, is called during the first DMS + // deployment to register all existing resources from state before CRUD. + InitialRegistrationReporter InitialRegistrationReporter } // SetRemoteState updates the remote state with type validation and marks as fresh. diff --git a/bundle/env/deployment_metadata.go b/bundle/env/deployment_metadata.go new file mode 100644 index 0000000000..a4d08c7cd0 --- /dev/null +++ b/bundle/env/deployment_metadata.go @@ -0,0 +1,15 @@ +package env + +import "context" + +// managedStateVariable names the environment variable that controls whether +// server-managed state is used for locking and resource state management. +const managedStateVariable = "DATABRICKS_BUNDLE_MANAGED_STATE" + +// ManagedState returns the environment variable that controls whether +// server-managed state is used for locking and resource state management. +func ManagedState(ctx context.Context) (string, bool) { + return get(ctx, []string{ + managedStateVariable, + }) +} diff --git a/bundle/phases/bind.go b/bundle/phases/bind.go index a8f99b28e8..22635e57a7 100644 --- a/bundle/phases/bind.go +++ b/bundle/phases/bind.go @@ -27,13 +27,15 @@ func Bind(ctx context.Context, b *bundle.Bundle, opts *terraform.BindOptions) { return } - bundle.ApplyContext(ctx, b, lock.Acquire()) - if logdiag.HasError(ctx) { + dl := lock.NewDeploymentLock(ctx, b, lock.GoalBind) + if err := dl.Acquire(ctx); err != nil { + logdiag.LogError(ctx, err) return } - defer func() { - bundle.ApplyContext(ctx, b, lock.Release(lock.GoalBind)) + if err := dl.Release(ctx, lock.DeploymentSuccess); err != nil { + log.Warnf(ctx, "Failed to release deployment lock: %v", err) + } }() if engine.IsDirect() { @@ -129,13 +131,15 @@ func Unbind(ctx context.Context, b *bundle.Bundle, bundleType, tfResourceType, r return } - bundle.ApplyContext(ctx, b, lock.Acquire()) - if logdiag.HasError(ctx) { + dl := lock.NewDeploymentLock(ctx, b, lock.GoalUnbind) + if err := dl.Acquire(ctx); err != nil { + logdiag.LogError(ctx, err) return } - defer func() { - bundle.ApplyContext(ctx, b, lock.Release(lock.GoalUnbind)) + if err := dl.Release(ctx, lock.DeploymentSuccess); err != nil { + log.Warnf(ctx, "Failed to release deployment lock: %v", err) + } }() if engine.IsDirect() { diff --git a/bundle/phases/deploy.go b/bundle/phases/deploy.go index 4613a7a211..8b1eca3284 100644 --- a/bundle/phases/deploy.go +++ b/bundle/phases/deploy.go @@ -15,6 +15,7 @@ import ( "github.com/databricks/cli/bundle/deploy/terraform" "github.com/databricks/cli/bundle/deployplan" "github.com/databricks/cli/bundle/direct" + "github.com/databricks/cli/bundle/env" "github.com/databricks/cli/bundle/libraries" "github.com/databricks/cli/bundle/metrics" "github.com/databricks/cli/bundle/permissions" @@ -97,28 +98,31 @@ func approvalForDeploy(ctx context.Context, b *bundle.Bundle, plan *deployplan.P return approved, nil } -func deployCore(ctx context.Context, b *bundle.Bundle, plan *deployplan.Plan, targetEngine engine.EngineType) { +func deployCore(ctx context.Context, b *bundle.Bundle, plan *deployplan.Plan, engine engine.EngineType) { // Core mutators that CRUD resources and modify deployment state. These // mutators need informed consent if they are potentially destructive. cmdio.LogString(ctx, "Deploying resources...") - if targetEngine.IsDirect() { + if engine.IsDirect() { b.DeploymentBundle.Apply(ctx, b.WorkspaceClient(), plan, direct.MigrateMode(false)) } else { bundle.ApplyContext(ctx, b, terraform.Apply()) } - // Even if deployment failed, there might be updates in states that we need to upload - statemgmt.PushResourcesState(ctx, b, targetEngine) + // Even if deployment failed, there might be updates in state that we need + // to upload. For the filesystem-based state, this uploads the state file to + // the workspace. For the metadata service, this is a no-op since operation + // results are reported inline during deployment. + statemgmt.PushResourcesState(ctx, b, engine) if logdiag.HasError(ctx) { return } bundle.ApplySeqContext(ctx, b, - statemgmt.Load(targetEngine), + statemgmt.Load(engine), metadata.Compute(), metadata.Upload(), - statemgmt.UploadStateForYamlSync(targetEngine), + statemgmt.UploadStateForYamlSync(engine), ) if !logdiag.HasError(ctx) { @@ -139,27 +143,44 @@ func uploadLibraries(ctx context.Context, b *bundle.Bundle, libs map[string][]li // The deploy phase deploys artifacts and resources. // If readPlanPath is provided, the plan is loaded from that file instead of being calculated. func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHandler, engine engine.EngineType, libs map[string][]libraries.LocationToUpdate, plan *deployplan.Plan) { - log.Info(ctx, "Phase: deploy") + useMetadataService, _ := env.ManagedState(ctx) - // Core mutators that CRUD resources and modify deployment state. These - // mutators need informed consent if they are potentially destructive. - bundle.ApplySeqContext(ctx, b, - scripts.Execute(config.ScriptPreDeploy), - lock.Acquire(), - ) + if useMetadataService == "true" { + log.Info(ctx, "Phase: deploy (with metadata service)") + if !engine.IsDirect() { + logdiag.LogError(ctx, errors.New("managed state is only supported with the direct deployment engine")) + return + } + } else { + log.Info(ctx, "Phase: deploy") + } + bundle.ApplyContext(ctx, b, scripts.Execute(config.ScriptPreDeploy)) if logdiag.HasError(ctx) { - // lock is not acquired here return } - // lock is acquired here + // Acquire the deployment lock. + var failed bool + + dl := lock.NewDeploymentLock(ctx, b, lock.GoalDeploy) + if err := dl.Acquire(ctx); err != nil { + logdiag.LogError(ctx, err) + return + } defer func() { - bundle.ApplyContext(ctx, b, lock.Release(lock.GoalDeploy)) + status := lock.DeploymentSuccess + if failed || logdiag.HasError(ctx) { + status = lock.DeploymentFailure + } + if err := dl.Release(ctx, status); err != nil { + log.Warnf(ctx, "Failed to release deployment lock: %v", err) + } }() uploadLibraries(ctx, b, libs) if logdiag.HasError(ctx) { + failed = true return } @@ -171,40 +192,42 @@ func Deploy(ctx context.Context, b *bundle.Bundle, outputHandler sync.OutputHand metrics.TrackUsedCompute(), deploy.ResourcePathMkdir(), ) - if logdiag.HasError(ctx) { + failed = true return } if plan != nil { - // Initialize DeploymentBundle for applying the loaded plan + // Initialize DeploymentBundle for applying the loaded plan. _, localPath := b.StateFilenameDirect(ctx) err := b.DeploymentBundle.InitForApply(ctx, b.WorkspaceClient(), localPath, plan) if err != nil { logdiag.LogError(ctx, err) + failed = true return } } else { plan = RunPlan(ctx, b, engine) } - if logdiag.HasError(ctx) { + failed = true return } haveApproval, err := approvalForDeploy(ctx, b, plan) if err != nil { logdiag.LogError(ctx, err) + failed = true return } - if haveApproval { - deployCore(ctx, b, plan, engine) - } else { + if !haveApproval { cmdio.LogString(ctx, "Deployment cancelled!") return } + deployCore(ctx, b, plan, engine) if logdiag.HasError(ctx) { + failed = true return } diff --git a/bundle/phases/destroy.go b/bundle/phases/destroy.go index e6be00b579..07d3cc8ddd 100644 --- a/bundle/phases/destroy.go +++ b/bundle/phases/destroy.go @@ -13,6 +13,7 @@ import ( "github.com/databricks/cli/bundle/deploy/terraform" "github.com/databricks/cli/bundle/deployplan" "github.com/databricks/cli/bundle/direct" + "github.com/databricks/cli/bundle/env" "github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/log" "github.com/databricks/cli/libs/logdiag" @@ -98,7 +99,6 @@ func destroyCore(ctx context.Context, b *bundle.Bundle, plan *deployplan.Plan, e if engine.IsDirect() { b.DeploymentBundle.Apply(ctx, b.WorkspaceClient(), plan, direct.MigrateMode(false)) } else { - // Core destructive mutators for destroy. These require informed user consent. bundle.ApplyContext(ctx, b, terraform.Apply()) } @@ -115,26 +115,44 @@ func destroyCore(ctx context.Context, b *bundle.Bundle, plan *deployplan.Plan, e // The destroy phase deletes artifacts and resources. func Destroy(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) { - log.Info(ctx, "Phase: destroy") + useMetadataService, _ := env.ManagedState(ctx) + + if useMetadataService == "true" { + log.Info(ctx, "Phase: destroy (with metadata service)") + if !engine.IsDirect() { + logdiag.LogError(ctx, errors.New("managed state is only supported with the direct deployment engine")) + return + } + } else { + log.Info(ctx, "Phase: destroy") + } ok, err := assertRootPathExists(ctx, b) if err != nil { logdiag.LogError(ctx, err) return } - if !ok { cmdio.LogString(ctx, "No active deployment found to destroy!") return } - bundle.ApplyContext(ctx, b, lock.Acquire()) - if logdiag.HasError(ctx) { + // Acquire the deployment lock. + var failed bool + + dl := lock.NewDeploymentLock(ctx, b, lock.GoalDestroy) + if err := dl.Acquire(ctx); err != nil { + logdiag.LogError(ctx, err) return } - defer func() { - bundle.ApplyContext(ctx, b, lock.Release(lock.GoalDestroy)) + status := lock.DeploymentSuccess + if failed || logdiag.HasError(ctx) { + status = lock.DeploymentFailure + } + if err := dl.Release(ctx, status); err != nil { + log.Warnf(ctx, "Failed to release deployment lock: %v", err) + } }() if !engine.IsDirect() { @@ -152,6 +170,7 @@ func Destroy(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) { } if logdiag.HasError(ctx) { + failed = true return } @@ -161,18 +180,21 @@ func Destroy(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) { plan, err = b.DeploymentBundle.CalculatePlan(ctx, b.WorkspaceClient(), nil, localPath) if err != nil { logdiag.LogError(ctx, err) + failed = true return } } else { tf := b.Terraform if tf == nil { logdiag.LogError(ctx, errors.New("terraform not initialized")) + failed = true return } plan, err = terraform.ShowPlanFile(ctx, tf, b.TerraformPlanPath) if err != nil { logdiag.LogError(ctx, err) + failed = true return } } @@ -180,11 +202,15 @@ func Destroy(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) { hasApproval, err := approvalForDestroy(ctx, b, plan) if err != nil { logdiag.LogError(ctx, err) + failed = true return } if hasApproval { destroyCore(ctx, b, plan, engine) + if logdiag.HasError(ctx) { + failed = true + } } else { cmdio.LogString(ctx, "Destroy cancelled!") } diff --git a/bundle/statemgmt/state_load.go b/bundle/statemgmt/state_load.go index ef9e1f829d..c0dfe45c97 100644 --- a/bundle/statemgmt/state_load.go +++ b/bundle/statemgmt/state_load.go @@ -45,7 +45,6 @@ func (l *load) Apply(ctx context.Context, b *bundle.Bundle) diag.Diagnostics { return diag.FromErr(err) } } else { - var err error state, err = terraform.ParseResourcesState(ctx, b) if err != nil { return diag.FromErr(err) diff --git a/bundle/statemgmt/state_push.go b/bundle/statemgmt/state_push.go index b2da9f893c..13f2926369 100644 --- a/bundle/statemgmt/state_push.go +++ b/bundle/statemgmt/state_push.go @@ -9,6 +9,7 @@ import ( "github.com/databricks/cli/bundle" "github.com/databricks/cli/bundle/config/engine" "github.com/databricks/cli/bundle/deploy" + "github.com/databricks/cli/bundle/env" "github.com/databricks/cli/libs/cmdio" "github.com/databricks/cli/libs/filer" "github.com/databricks/cli/libs/log" @@ -16,6 +17,9 @@ import ( ) // PushResourcesState uploads the local state file to the remote location. +// When the deployment metadata service is enabled, the state is written to a +// backup path (resources.json.backup) instead. This gives users a safety net +// to revert from DMS to file-based state if needed. func PushResourcesState(ctx context.Context, b *bundle.Bundle, engine engine.EngineType) { f, err := deploy.StateFiler(b) if err != nil { @@ -31,6 +35,13 @@ func PushResourcesState(ctx context.Context, b *bundle.Bundle, engine engine.Eng remotePath, localPath = b.StateFilenameTerraform(ctx) } + // When DMS is enabled, write state to a backup path instead of the + // primary path. The DMS is the source of truth for resource state, + // but we keep a backup for rollback purposes. + if useDMS, _ := env.ManagedState(ctx); useDMS == "true" { + remotePath += ".backup" + } + local, err := os.Open(localPath) if errors.Is(err, fs.ErrNotExist) { // The state file can be absent if terraform apply is skipped because diff --git a/cmd/root/auth.go b/cmd/root/auth.go index 477db14337..8446b9b525 100644 --- a/cmd/root/auth.go +++ b/cmd/root/auth.go @@ -261,7 +261,7 @@ func MustWorkspaceClient(cmd *cobra.Command, args []string) error { if b != nil { ctx = cmdctx.SetConfigUsed(ctx, b.Config.Workspace.Config()) cmd.SetContext(ctx) - client, err := b.WorkspaceClientE() + client, err := b.WorkspaceClientE(ctx) if err != nil { return err } diff --git a/cmd/root/bundle.go b/cmd/root/bundle.go index 0b2ba1cfc6..602710d722 100644 --- a/cmd/root/bundle.go +++ b/cmd/root/bundle.go @@ -161,7 +161,7 @@ func configureBundle(cmd *cobra.Command, b *bundle.Bundle) { // // Note that just initializing a workspace client and loading auth configuration // is a fast operation. It does not perform network I/O or invoke processes (for example the Azure CLI). - client, err := b.WorkspaceClientE() + client, err := b.WorkspaceClientE(ctx) if err != nil { names, isMulti := databrickscfg.AsMultipleProfiles(err) if !isMulti { @@ -177,7 +177,7 @@ func configureBundle(cmd *cobra.Command, b *bundle.Bundle) { b.Config.Workspace.Profile = selected b.ClearWorkspaceClient() - client, err = b.WorkspaceClientE() + client, err = b.WorkspaceClientE(ctx) if err != nil { logdiag.LogError(ctx, err) return diff --git a/libs/testserver/deployment_metadata.go b/libs/testserver/deployment_metadata.go new file mode 100644 index 0000000000..1554712c4e --- /dev/null +++ b/libs/testserver/deployment_metadata.go @@ -0,0 +1,414 @@ +package testserver + +import ( + "encoding/json" + "fmt" + "net/http" + "strconv" + "strings" + "time" + + "github.com/databricks/cli/libs/tmpdms" +) + +// deploymentMetadata holds in-memory state for the deployment metadata service. +// Stored per-workspace inside FakeWorkspace. +type deploymentMetadata struct { + // deployments keyed by deployment_id + deployments map[string]tmpdms.Deployment + + // versions keyed by "deploymentId/versionId" + versions map[string]tmpdms.Version + + // operations keyed by "deploymentId/versionId/resourceKey" + operations map[string]tmpdms.Operation + + // resources keyed by "deploymentId/resourceKey" + resources map[string]tmpdms.Resource + + // lock state per deployment: which version holds the lock and when it expires + lockHolder map[string]string // deploymentId -> "deployments/{id}/versions/{vid}" + lockExpiry map[string]time.Time // deploymentId -> expiry time +} + +func newDeploymentMetadata() *deploymentMetadata { + return &deploymentMetadata{ + deployments: map[string]tmpdms.Deployment{}, + versions: map[string]tmpdms.Version{}, + operations: map[string]tmpdms.Operation{}, + resources: map[string]tmpdms.Resource{}, + lockHolder: map[string]string{}, + lockExpiry: map[string]time.Time{}, + } +} + +const lockDuration = 5 * time.Minute + +func (s *FakeWorkspace) DeploymentMetadataCreateDeployment(req Request) Response { + defer s.LockUnlock()() + + // deployment_id is a query parameter, not in the body. + deploymentID := req.URL.Query().Get("deployment_id") + if deploymentID == "" { + return Response{ + StatusCode: http.StatusBadRequest, + Body: map[string]string{"error_code": "INVALID_PARAMETER_VALUE", "message": "deployment_id is required"}, + } + } + + // The body maps to the Deployment sub-message. + var bodyDeployment tmpdms.Deployment + if len(req.Body) > 0 { + if err := json.Unmarshal(req.Body, &bodyDeployment); err != nil { + return Response{ + StatusCode: http.StatusBadRequest, + Body: map[string]string{"error_code": "INVALID_PARAMETER_VALUE", "message": fmt.Sprintf("invalid request: %s", err)}, + } + } + } + + state := s.deploymentMetadata + if _, exists := state.deployments[deploymentID]; exists { + return Response{ + StatusCode: http.StatusConflict, + Body: map[string]string{"error_code": "ALREADY_EXISTS", "message": fmt.Sprintf("deployment %s already exists", deploymentID)}, + } + } + + now := time.Now().UTC() + deployment := tmpdms.Deployment{ + Name: "deployments/" + deploymentID, + DisplayName: deploymentID, + TargetName: bodyDeployment.TargetName, + Status: tmpdms.DeploymentStatusActive, + CreatedBy: s.CurrentUser().UserName, + CreateTime: &now, + UpdateTime: &now, + } + + state.deployments[deploymentID] = deployment + return Response{Body: deployment} +} + +func (s *FakeWorkspace) DeploymentMetadataGetDeployment(deploymentID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + deployment, ok := state.deployments[deploymentID] + if !ok { + return Response{ + StatusCode: http.StatusNotFound, + Body: map[string]string{"error_code": "NOT_FOUND", "message": fmt.Sprintf("deployment %s not found", deploymentID)}, + } + } + return Response{Body: deployment} +} + +func (s *FakeWorkspace) DeploymentMetadataDeleteDeployment(deploymentID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + deployment, ok := state.deployments[deploymentID] + if !ok { + return Response{ + StatusCode: http.StatusNotFound, + Body: map[string]string{"error_code": "NOT_FOUND", "message": fmt.Sprintf("deployment %s not found", deploymentID)}, + } + } + + now := time.Now().UTC() + deployment.Status = tmpdms.DeploymentStatusDeleted + deployment.DestroyTime = &now + deployment.DestroyedBy = s.CurrentUser().UserName + deployment.UpdateTime = &now + state.deployments[deploymentID] = deployment + + return Response{Body: deployment} +} + +func (s *FakeWorkspace) DeploymentMetadataCreateVersion(req Request, deploymentID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + deployment, ok := state.deployments[deploymentID] + if !ok { + return Response{ + StatusCode: http.StatusNotFound, + Body: map[string]string{"error_code": "NOT_FOUND", "message": fmt.Sprintf("deployment %s not found", deploymentID)}, + } + } + + // version_id is a query parameter, not in the body. + versionID := req.URL.Query().Get("version_id") + if versionID == "" { + return Response{ + StatusCode: http.StatusBadRequest, + Body: map[string]string{"error_code": "INVALID_PARAMETER_VALUE", "message": "version_id is required"}, + } + } + + // The body maps to the Version sub-message. + var bodyVersion tmpdms.Version + if len(req.Body) > 0 { + if err := json.Unmarshal(req.Body, &bodyVersion); err != nil { + return Response{ + StatusCode: http.StatusBadRequest, + Body: map[string]string{"error_code": "INVALID_PARAMETER_VALUE", "message": fmt.Sprintf("invalid request: %s", err)}, + } + } + } + + // Validate version_id == last_version_id + 1 (matching server behavior). + var expectedVersionID string + if deployment.LastVersionID == "" { + expectedVersionID = "1" + } else { + lastVersion, err := strconv.ParseInt(deployment.LastVersionID, 10, 64) + if err != nil { + return Response{ + StatusCode: http.StatusInternalServerError, + Body: map[string]string{"error_code": "INTERNAL_ERROR", "message": "stored last_version_id is not a valid number: " + deployment.LastVersionID}, + } + } + expectedVersionID = strconv.FormatInt(lastVersion+1, 10) + } + if versionID != expectedVersionID { + return Response{ + StatusCode: http.StatusConflict, + Body: map[string]string{ + "error_code": "ABORTED", + "message": fmt.Sprintf("version_id must be %s (last_version_id + 1), got: %s", expectedVersionID, versionID), + }, + } + } + + // Check lock: if a lock is held and not expired, reject with 409. + now := time.Now().UTC() + if holder, hasLock := state.lockHolder[deploymentID]; hasLock { + if expiry, ok := state.lockExpiry[deploymentID]; ok && expiry.After(now) { + return Response{ + StatusCode: http.StatusConflict, + Body: map[string]string{ + "error_code": "ABORTED", + "message": fmt.Sprintf("deployment is locked by %s until %s", holder, expiry.Format(time.RFC3339)), + }, + } + } + } + + versionKey := deploymentID + "/" + versionID + version := tmpdms.Version{ + Name: fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID), + VersionID: versionID, + CreatedBy: s.CurrentUser().UserName, + CreateTime: &now, + Status: tmpdms.VersionStatusInProgress, + } + version.CliVersion = bodyVersion.CliVersion + version.VersionType = bodyVersion.VersionType + + state.versions[versionKey] = version + + // Acquire the lock. + lockExpiry := now.Add(lockDuration) + state.lockHolder[deploymentID] = version.Name + state.lockExpiry[deploymentID] = lockExpiry + + // Update the deployment's last_version_id and status. + deployment.LastVersionID = versionID + deployment.Status = tmpdms.DeploymentStatusInProgress + deployment.UpdateTime = &now + state.deployments[deploymentID] = deployment + + return Response{Body: version} +} + +func (s *FakeWorkspace) DeploymentMetadataGetVersion(deploymentID, versionID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + versionKey := deploymentID + "/" + versionID + version, ok := state.versions[versionKey] + if !ok { + return Response{ + StatusCode: http.StatusNotFound, + Body: map[string]string{"error_code": "NOT_FOUND", "message": fmt.Sprintf("version %s not found", versionKey)}, + } + } + return Response{Body: version} +} + +func (s *FakeWorkspace) DeploymentMetadataHeartbeat(req Request, deploymentID, versionID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + versionKey := deploymentID + "/" + versionID + version, ok := state.versions[versionKey] + if !ok { + return Response{ + StatusCode: http.StatusNotFound, + Body: map[string]string{"error_code": "NOT_FOUND", "message": fmt.Sprintf("version %s not found", versionKey)}, + } + } + + if version.Status != tmpdms.VersionStatusInProgress { + return Response{ + StatusCode: http.StatusConflict, + Body: map[string]string{"error_code": "ABORTED", "message": "version is no longer in progress"}, + } + } + + // Verify this version holds the lock. + expectedHolder := fmt.Sprintf("deployments/%s/versions/%s", deploymentID, versionID) + if state.lockHolder[deploymentID] != expectedHolder { + return Response{ + StatusCode: http.StatusConflict, + Body: map[string]string{"error_code": "ABORTED", "message": "lock is not held by this version"}, + } + } + + // Renew the lock. + now := time.Now().UTC() + newExpiry := now.Add(lockDuration) + state.lockExpiry[deploymentID] = newExpiry + + return Response{Body: tmpdms.HeartbeatResponse{ExpireTime: &newExpiry}} +} + +func (s *FakeWorkspace) DeploymentMetadataCompleteVersion(req Request, deploymentID, versionID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + + // Allow tests to simulate a complete version failure. If the deployment's + // target_name is "fail-complete", return a 500 error. + if deployment, ok := state.deployments[deploymentID]; ok && deployment.TargetName == "fail-complete" { + return Response{ + StatusCode: http.StatusInternalServerError, + Body: map[string]string{"error_code": "INTERNAL_ERROR", "message": "simulated complete version failure"}, + } + } + + versionKey := deploymentID + "/" + versionID + version, ok := state.versions[versionKey] + if !ok { + return Response{ + StatusCode: http.StatusNotFound, + Body: map[string]string{"error_code": "NOT_FOUND", "message": fmt.Sprintf("version %s not found", versionKey)}, + } + } + + if version.Status != tmpdms.VersionStatusInProgress { + return Response{ + StatusCode: http.StatusConflict, + Body: map[string]string{"error_code": "ABORTED", "message": "version is already completed"}, + } + } + + var completeReq tmpdms.CompleteVersionRequest + if err := json.Unmarshal(req.Body, &completeReq); err != nil { + return Response{ + StatusCode: http.StatusBadRequest, + Body: map[string]string{"error_code": "INVALID_PARAMETER_VALUE", "message": fmt.Sprintf("invalid request: %s", err)}, + } + } + + now := time.Now().UTC() + version.Status = tmpdms.VersionStatusCompleted + version.CompleteTime = &now + version.CompletionReason = completeReq.CompletionReason + version.CompletedBy = s.CurrentUser().UserName + state.versions[versionKey] = version + + // Release the lock. + delete(state.lockHolder, deploymentID) + delete(state.lockExpiry, deploymentID) + + // Update deployment status based on completion reason. + if deployment, ok := state.deployments[deploymentID]; ok { + switch completeReq.CompletionReason { + case tmpdms.VersionCompleteSuccess: + deployment.Status = tmpdms.DeploymentStatusActive + case tmpdms.VersionCompleteFailure, tmpdms.VersionCompleteForceAbort, tmpdms.VersionCompleteLeaseExpired: + deployment.Status = tmpdms.DeploymentStatusFailed + case tmpdms.VersionCompleteUnspecified: + // No status change for unspecified completion reason. + } + deployment.UpdateTime = &now + state.deployments[deploymentID] = deployment + } + + return Response{Body: version} +} + +func (s *FakeWorkspace) DeploymentMetadataCreateOperation(req Request, deploymentID, versionID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + + // resource_key is a query parameter, not in the body. + resourceKey := req.URL.Query().Get("resource_key") + if resourceKey == "" { + return Response{ + StatusCode: http.StatusBadRequest, + Body: map[string]string{"error_code": "INVALID_PARAMETER_VALUE", "message": "resource_key is required"}, + } + } + + // The body maps to the Operation sub-message. + var bodyOperation tmpdms.Operation + if len(req.Body) > 0 { + if err := json.Unmarshal(req.Body, &bodyOperation); err != nil { + return Response{ + StatusCode: http.StatusBadRequest, + Body: map[string]string{"error_code": "INVALID_PARAMETER_VALUE", "message": fmt.Sprintf("invalid request: %s", err)}, + } + } + } + + now := time.Now().UTC() + opKey := deploymentID + "/" + versionID + "/" + resourceKey + operation := tmpdms.Operation{ + Name: fmt.Sprintf("deployments/%s/versions/%s/operations/%s", deploymentID, versionID, resourceKey), + ResourceKey: resourceKey, + CreateTime: &now, + ActionType: bodyOperation.ActionType, + State: bodyOperation.State, + ResourceID: bodyOperation.ResourceID, + Status: bodyOperation.Status, + ErrorMessage: bodyOperation.ErrorMessage, + } + + state.operations[opKey] = operation + + // Upsert the deployment-level resource. + resKey := deploymentID + "/" + resourceKey + resource := tmpdms.Resource{ + Name: fmt.Sprintf("deployments/%s/resources/%s", deploymentID, resourceKey), + ResourceKey: resourceKey, + State: bodyOperation.State, + ResourceID: bodyOperation.ResourceID, + LastActionType: bodyOperation.ActionType, + LastVersionID: versionID, + } + state.resources[resKey] = resource + + return Response{Body: operation} +} + +func (s *FakeWorkspace) DeploymentMetadataListResources(deploymentID string) Response { + defer s.LockUnlock()() + + state := s.deploymentMetadata + prefix := deploymentID + "/" + var resources []tmpdms.Resource + for key, resource := range state.resources { + if strings.HasPrefix(key, prefix) { + resources = append(resources, resource) + } + } + if resources == nil { + resources = []tmpdms.Resource{} + } + return Response{Body: tmpdms.ListResourcesResponse{Resources: resources}} +} diff --git a/libs/testserver/fake_workspace.go b/libs/testserver/fake_workspace.go index b13aae069a..64faaf78e2 100644 --- a/libs/testserver/fake_workspace.go +++ b/libs/testserver/fake_workspace.go @@ -173,6 +173,8 @@ type FakeWorkspace struct { // clusterVenvs caches Python venvs per existing cluster ID, // matching cloud behavior where libraries are cached on running clusters. clusterVenvs map[string]*clusterEnv + + deploymentMetadata *deploymentMetadata } func (s *FakeWorkspace) LockUnlock() func() { @@ -297,6 +299,7 @@ func NewFakeWorkspace(url, token string) *FakeWorkspace { PostgresEndpoints: map[string]postgres.Endpoint{}, PostgresOperations: map[string]postgres.Operation{}, clusterVenvs: map[string]*clusterEnv{}, + deploymentMetadata: newDeploymentMetadata(), Alerts: map[string]sql.AlertV2{}, Experiments: map[string]ml.GetExperimentResponse{}, ModelRegistryModels: map[string]ml.Model{}, diff --git a/libs/testserver/handlers.go b/libs/testserver/handlers.go index 9e30cb5f0c..904284ed51 100644 --- a/libs/testserver/handlers.go +++ b/libs/testserver/handlers.go @@ -905,4 +905,42 @@ func AddDefaultHandlers(server *Server) { }, } }) + + // Deployment Metadata Service: + + server.Handle("POST", "/api/2.0/bundle/deployments", func(req Request) any { + return req.Workspace.DeploymentMetadataCreateDeployment(req) + }) + + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}", func(req Request) any { + return req.Workspace.DeploymentMetadataGetDeployment(req.Vars["deployment_id"]) + }) + + server.Handle("DELETE", "/api/2.0/bundle/deployments/{deployment_id}", func(req Request) any { + return req.Workspace.DeploymentMetadataDeleteDeployment(req.Vars["deployment_id"]) + }) + + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions", func(req Request) any { + return req.Workspace.DeploymentMetadataCreateVersion(req, req.Vars["deployment_id"]) + }) + + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}", func(req Request) any { + return req.Workspace.DeploymentMetadataGetVersion(req.Vars["deployment_id"], req.Vars["version_id"]) + }) + + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/heartbeat", func(req Request) any { + return req.Workspace.DeploymentMetadataHeartbeat(req, req.Vars["deployment_id"], req.Vars["version_id"]) + }) + + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/complete", func(req Request) any { + return req.Workspace.DeploymentMetadataCompleteVersion(req, req.Vars["deployment_id"], req.Vars["version_id"]) + }) + + server.Handle("POST", "/api/2.0/bundle/deployments/{deployment_id}/versions/{version_id}/operations", func(req Request) any { + return req.Workspace.DeploymentMetadataCreateOperation(req, req.Vars["deployment_id"], req.Vars["version_id"]) + }) + + server.Handle("GET", "/api/2.0/bundle/deployments/{deployment_id}/resources", func(req Request) any { + return req.Workspace.DeploymentMetadataListResources(req.Vars["deployment_id"]) + }) } diff --git a/libs/testserver/server.go b/libs/testserver/server.go index 2d7048dc8d..54d291fb7e 100644 --- a/libs/testserver/server.go +++ b/libs/testserver/server.go @@ -305,7 +305,7 @@ func (s *Server) Handle(method, path string, handler HandlerFunc) { var resp EncodedResponse - if bytes.Contains(request.Body, []byte("INJECT_ERROR")) { + if bytes.Contains(request.Body, []byte("INJECT_ERROR")) || strings.Contains(r.URL.Path, "INJECT_ERROR") { resp = EncodedResponse{ StatusCode: 500, Body: []byte("INJECTED"), diff --git a/libs/tmpdms/api.go b/libs/tmpdms/api.go new file mode 100644 index 0000000000..b39590ad3f --- /dev/null +++ b/libs/tmpdms/api.go @@ -0,0 +1,165 @@ +package tmpdms + +import ( + "context" + "errors" + "fmt" + "net/http" + + "github.com/databricks/databricks-sdk-go" + "github.com/databricks/databricks-sdk-go/apierr" + "github.com/databricks/databricks-sdk-go/client" +) + +const basePath = "/api/2.0/bundle" + +// DeploymentMetadataAPI is a client for the Deployment Metadata Service. +// +// This is a temporary implementation that will be replaced by the SDK-generated +// client once the proto definitions land in the Go SDK. The method signatures +// and types are designed to match what the SDK will generate, so migration +// should be a straightforward import path change. +type DeploymentMetadataAPI struct { + api *client.DatabricksClient +} + +func NewDeploymentMetadataAPI(w *databricks.WorkspaceClient) (*DeploymentMetadataAPI, error) { + apiClient, err := client.New(w.Config) + if err != nil { + return nil, fmt.Errorf("failed to create deployment metadata API client: %w", err) + } + return &DeploymentMetadataAPI{api: apiClient}, nil +} + +func (a *DeploymentMetadataAPI) CreateDeployment(ctx context.Context, request CreateDeploymentRequest) (*Deployment, error) { + var resp Deployment + path := basePath + "/deployments" + query := map[string]any{"deployment_id": request.DeploymentID} + err := a.api.Do(ctx, http.MethodPost, path, nil, query, request.Deployment, &resp) + if err != nil { + return nil, mapError("create deployment", err) + } + return &resp, nil +} + +func (a *DeploymentMetadataAPI) GetDeployment(ctx context.Context, request GetDeploymentRequest) (*Deployment, error) { + var resp Deployment + path := fmt.Sprintf("%s/deployments/%s", basePath, request.DeploymentID) + err := a.api.Do(ctx, http.MethodGet, path, nil, nil, nil, &resp) + if err != nil { + return nil, mapError("get deployment", err) + } + return &resp, nil +} + +func (a *DeploymentMetadataAPI) DeleteDeployment(ctx context.Context, request DeleteDeploymentRequest) (*Deployment, error) { + var resp Deployment + path := fmt.Sprintf("%s/deployments/%s", basePath, request.DeploymentID) + err := a.api.Do(ctx, http.MethodDelete, path, nil, nil, nil, &resp) + if err != nil { + return nil, mapError("delete deployment", err) + } + return &resp, nil +} + +func (a *DeploymentMetadataAPI) CreateVersion(ctx context.Context, request CreateVersionRequest) (*Version, error) { + var resp Version + path := fmt.Sprintf("%s/deployments/%s/versions", basePath, request.DeploymentID) + query := map[string]any{"version_id": request.VersionID} + err := a.api.Do(ctx, http.MethodPost, path, nil, query, request.Version, &resp) + if err != nil { + return nil, mapError("create version", err) + } + return &resp, nil +} + +func (a *DeploymentMetadataAPI) GetVersion(ctx context.Context, request GetVersionRequest) (*Version, error) { + var resp Version + path := fmt.Sprintf("%s/deployments/%s/versions/%s", basePath, request.DeploymentID, request.VersionID) + err := a.api.Do(ctx, http.MethodGet, path, nil, nil, nil, &resp) + if err != nil { + return nil, mapError("get version", err) + } + return &resp, nil +} + +func (a *DeploymentMetadataAPI) Heartbeat(ctx context.Context, request HeartbeatRequest) (*HeartbeatResponse, error) { + var resp HeartbeatResponse + path := fmt.Sprintf("%s/deployments/%s/versions/%s/heartbeat", basePath, request.DeploymentID, request.VersionID) + err := a.api.Do(ctx, http.MethodPost, path, nil, nil, struct{}{}, &resp) + if err != nil { + return nil, mapError("heartbeat", err) + } + return &resp, nil +} + +func (a *DeploymentMetadataAPI) CompleteVersion(ctx context.Context, request CompleteVersionRequest) (*Version, error) { + var resp Version + path := fmt.Sprintf("%s/deployments/%s/versions/%s/complete", basePath, request.DeploymentID, request.VersionID) + err := a.api.Do(ctx, http.MethodPost, path, nil, nil, request, &resp) + if err != nil { + return nil, mapError("complete version", err) + } + return &resp, nil +} + +func (a *DeploymentMetadataAPI) CreateOperation(ctx context.Context, request CreateOperationRequest) (*Operation, error) { + var resp Operation + path := fmt.Sprintf("%s/deployments/%s/versions/%s/operations", basePath, request.DeploymentID, request.VersionID) + query := map[string]any{"resource_key": request.ResourceKey} + err := a.api.Do(ctx, http.MethodPost, path, nil, query, request.Operation, &resp) + if err != nil { + return nil, mapError("create operation", err) + } + return &resp, nil +} + +func (a *DeploymentMetadataAPI) ListResources(ctx context.Context, request ListResourcesRequest) ([]Resource, error) { + var allResources []Resource + pageToken := "" + + for { + var resp ListResourcesResponse + path := fmt.Sprintf("%s/deployments/%s/resources", basePath, request.DeploymentID) + + q := map[string]any{ + "page_size": 1000, + } + if pageToken != "" { + q["page_token"] = pageToken + } + + err := a.api.Do(ctx, http.MethodGet, path, nil, q, nil, &resp) + if err != nil { + return nil, mapError("list resources", err) + } + + allResources = append(allResources, resp.Resources...) + if resp.NextPageToken == "" { + break + } + pageToken = resp.NextPageToken + } + + return allResources, nil +} + +// mapError translates API errors into user-friendly messages. +func mapError(operation string, err error) error { + var apiErr *apierr.APIError + if !errors.As(err, &apiErr) { + return fmt.Errorf("%s: %w", operation, err) + } + + switch apiErr.StatusCode { + case http.StatusConflict: + return fmt.Errorf("%s: deployment is locked by another active deployment. "+ + "If the prior deployment failed, the lock will expire automatically after 5 minutes: %w", operation, err) + case http.StatusNotFound: + return fmt.Errorf("%s: resource not found: %w", operation, err) + case http.StatusBadRequest: + return fmt.Errorf("%s: bad request: %w", operation, err) + default: + return fmt.Errorf("%s: %w", operation, err) + } +} diff --git a/libs/tmpdms/types.go b/libs/tmpdms/types.go new file mode 100644 index 0000000000..8dd6cdbfb6 --- /dev/null +++ b/libs/tmpdms/types.go @@ -0,0 +1,226 @@ +// Package tmpdms is a temporary client library for the Deployment Metadata Service. +// It mirrors the structure that the Databricks Go SDK will eventually generate from +// the service's proto definitions. When the protos land in the SDK, migration should +// be a straightforward import path change. +package tmpdms + +import "time" + +// Enum types matching the proto definitions. +// Values are the proto enum name strings, which is how proto-over-HTTP serializes enums. + +type ( + DeploymentStatus string + VersionStatus string + VersionComplete string + VersionType string + OperationStatus string + OperationActionType string + DeploymentResourceType string +) + +const ( + DeploymentStatusUnspecified DeploymentStatus = "DEPLOYMENT_STATUS_UNSPECIFIED" + DeploymentStatusActive DeploymentStatus = "DEPLOYMENT_STATUS_ACTIVE" + DeploymentStatusFailed DeploymentStatus = "DEPLOYMENT_STATUS_FAILED" + DeploymentStatusInProgress DeploymentStatus = "DEPLOYMENT_STATUS_IN_PROGRESS" + DeploymentStatusDeleted DeploymentStatus = "DEPLOYMENT_STATUS_DELETED" +) + +const ( + VersionStatusUnspecified VersionStatus = "VERSION_STATUS_UNSPECIFIED" + VersionStatusInProgress VersionStatus = "VERSION_STATUS_IN_PROGRESS" + VersionStatusCompleted VersionStatus = "VERSION_STATUS_COMPLETED" +) + +const ( + VersionCompleteUnspecified VersionComplete = "VERSION_COMPLETE_UNSPECIFIED" + VersionCompleteSuccess VersionComplete = "VERSION_COMPLETE_SUCCESS" + VersionCompleteFailure VersionComplete = "VERSION_COMPLETE_FAILURE" + VersionCompleteForceAbort VersionComplete = "VERSION_COMPLETE_FORCE_ABORT" + VersionCompleteLeaseExpired VersionComplete = "VERSION_COMPLETE_LEASE_EXPIRED" +) + +const ( + VersionTypeUnspecified VersionType = "VERSION_TYPE_UNSPECIFIED" + VersionTypeDeploy VersionType = "VERSION_TYPE_DEPLOY" + VersionTypeDestroy VersionType = "VERSION_TYPE_DESTROY" +) + +const ( + OperationStatusUnspecified OperationStatus = "OPERATION_STATUS_UNSPECIFIED" + OperationStatusSucceeded OperationStatus = "OPERATION_STATUS_SUCCEEDED" + OperationStatusFailed OperationStatus = "OPERATION_STATUS_FAILED" +) + +const ( + OperationActionTypeUnspecified OperationActionType = "OPERATION_ACTION_TYPE_UNSPECIFIED" + OperationActionTypeResize OperationActionType = "OPERATION_ACTION_TYPE_RESIZE" + OperationActionTypeUpdate OperationActionType = "OPERATION_ACTION_TYPE_UPDATE" + OperationActionTypeUpdateWithID OperationActionType = "OPERATION_ACTION_TYPE_UPDATE_WITH_ID" + OperationActionTypeCreate OperationActionType = "OPERATION_ACTION_TYPE_CREATE" + OperationActionTypeRecreate OperationActionType = "OPERATION_ACTION_TYPE_RECREATE" + OperationActionTypeDelete OperationActionType = "OPERATION_ACTION_TYPE_DELETE" + OperationActionTypeBind OperationActionType = "OPERATION_ACTION_TYPE_BIND" + OperationActionTypeBindAndUpdate OperationActionType = "OPERATION_ACTION_TYPE_BIND_AND_UPDATE" + OperationActionTypeInitRegister OperationActionType = "OPERATION_ACTION_TYPE_INITIAL_REGISTER" +) + +const ( + ResourceTypeUnspecified DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_UNSPECIFIED" + ResourceTypeJob DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_JOB" + ResourceTypePipeline DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_PIPELINE" + ResourceTypeModel DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_MODEL" + ResourceTypeRegisteredModel DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_REGISTERED_MODEL" + ResourceTypeExperiment DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_EXPERIMENT" + ResourceTypeServingEndpoint DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_MODEL_SERVING_ENDPOINT" + ResourceTypeQualityMonitor DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_QUALITY_MONITOR" + ResourceTypeSchema DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_SCHEMA" + ResourceTypeVolume DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_VOLUME" + ResourceTypeCluster DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_CLUSTER" + ResourceTypeDashboard DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_DASHBOARD" + ResourceTypeApp DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_APP" + ResourceTypeCatalog DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_CATALOG" + ResourceTypeExternalLocation DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_EXTERNAL_LOCATION" + ResourceTypeSecretScope DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_SECRET_SCOPE" + ResourceTypeAlert DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_ALERT" + ResourceTypeSQLWarehouse DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_SQL_WAREHOUSE" + ResourceTypeDatabaseInstance DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_DATABASE_INSTANCE" + ResourceTypeDatabaseCatalog DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_DATABASE_CATALOG" + ResourceTypeSyncedDBTable DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_SYNCED_DATABASE_TABLE" + ResourceTypePostgresProject DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_POSTGRES_PROJECT" + ResourceTypePostgresBranch DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_POSTGRES_BRANCH" + ResourceTypePostgresEndpoint DeploymentResourceType = "DEPLOYMENT_RESOURCE_TYPE_POSTGRES_ENDPOINT" +) + +// Resource types (proto message equivalents). + +type Deployment struct { + Name string `json:"name,omitempty"` + DisplayName string `json:"display_name,omitempty"` + TargetName string `json:"target_name,omitempty"` + Status DeploymentStatus `json:"status,omitempty"` + LastVersionID string `json:"last_version_id,omitempty"` + CreatedBy string `json:"created_by,omitempty"` + CreateTime *time.Time `json:"create_time,omitempty"` + UpdateTime *time.Time `json:"update_time,omitempty"` + DestroyTime *time.Time `json:"destroy_time,omitempty"` + DestroyedBy string `json:"destroyed_by,omitempty"` +} + +type Version struct { + Name string `json:"name,omitempty"` + VersionID string `json:"version_id,omitempty"` + CreatedBy string `json:"created_by,omitempty"` + CreateTime *time.Time `json:"create_time,omitempty"` + CompleteTime *time.Time `json:"complete_time,omitempty"` + CliVersion string `json:"cli_version,omitempty"` + Status VersionStatus `json:"status,omitempty"` + VersionType VersionType `json:"version_type,omitempty"` + CompletionReason VersionComplete `json:"completion_reason,omitempty"` + CompletedBy string `json:"completed_by,omitempty"` + DisplayName string `json:"display_name,omitempty"` + TargetName string `json:"target_name,omitempty"` +} + +type Operation struct { + Name string `json:"name,omitempty"` + ResourceKey string `json:"resource_key,omitempty"` + ActionType OperationActionType `json:"action_type,omitempty"` + State any `json:"state,omitempty"` + ResourceID string `json:"resource_id,omitempty"` + CreateTime *time.Time `json:"create_time,omitempty"` + Status OperationStatus `json:"status,omitempty"` + ErrorMessage string `json:"error_message,omitempty"` +} + +type Resource struct { + Name string `json:"name,omitempty"` + ResourceKey string `json:"resource_key,omitempty"` + State any `json:"state,omitempty"` + ResourceID string `json:"resource_id,omitempty"` + LastActionType OperationActionType `json:"last_action_type,omitempty"` + LastVersionID string `json:"last_version_id,omitempty"` + ResourceType DeploymentResourceType `json:"resource_type,omitempty"` +} + +// Request types. + +type CreateDeploymentRequest struct { + DeploymentID string `json:"deployment_id"` + Deployment *Deployment `json:"deployment"` +} + +type GetDeploymentRequest struct { + DeploymentID string `json:"-"` +} + +type DeleteDeploymentRequest struct { + DeploymentID string `json:"-"` +} + +type CreateVersionRequest struct { + DeploymentID string `json:"-"` + Parent string `json:"parent"` + Version *Version `json:"version"` + VersionID string `json:"version_id"` +} + +type GetVersionRequest struct { + DeploymentID string `json:"-"` + VersionID string `json:"-"` +} + +type HeartbeatRequest struct { + DeploymentID string `json:"-"` + VersionID string `json:"-"` +} + +type CompleteVersionRequest struct { + DeploymentID string `json:"-"` + VersionID string `json:"-"` + Name string `json:"name"` + CompletionReason VersionComplete `json:"completion_reason"` + Force bool `json:"force,omitempty"` +} + +type CreateOperationRequest struct { + DeploymentID string `json:"-"` + VersionID string `json:"-"` + Parent string `json:"parent"` + ResourceKey string `json:"resource_key"` + Operation *Operation `json:"operation"` +} + +type ListResourcesRequest struct { + DeploymentID string `json:"-"` + Parent string `json:"parent"` + PageSize int `json:"page_size,omitempty"` + PageToken string `json:"page_token,omitempty"` +} + +// Response types. + +type HeartbeatResponse struct { + ExpireTime *time.Time `json:"expire_time,omitempty"` +} + +type ListDeploymentsResponse struct { + Deployments []Deployment `json:"deployments"` + NextPageToken string `json:"next_page_token,omitempty"` +} + +type ListVersionsResponse struct { + Versions []Version `json:"versions"` + NextPageToken string `json:"next_page_token,omitempty"` +} + +type ListOperationsResponse struct { + Operations []Operation `json:"operations"` + NextPageToken string `json:"next_page_token,omitempty"` +} + +type ListResourcesResponse struct { + Resources []Resource `json:"resources"` + NextPageToken string `json:"next_page_token,omitempty"` +}