Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Copyright 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage;

import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.StatusCode;
import java.util.concurrent.TimeUnit;

final class AcoSpan implements Span {
private final Span delegate;
private final String bucketName;
private final OtelStorageDecorator parent;

AcoSpan(Span delegate, String bucketName, OtelStorageDecorator parent) {
this.delegate = delegate;
this.bucketName = bucketName;
this.parent = parent;
}

private void applyCacheAttributes() {
if (bucketName != null && parent != null) {
BucketMetadataCache.BucketMetadata md = parent.bucketMetadataCache.get(bucketName);
if (md != null && !md.fetchPending) {
delegate.setAttribute("gcp.resource.destination.id", md.resource);
delegate.setAttribute("gcp.resource.destination.location", md.location);
}
}
}

@Override
public void end() {
applyCacheAttributes();
delegate.end();
}

@Override
public void end(long timestamp, TimeUnit unit) {
applyCacheAttributes();
delegate.end(timestamp, unit);
}

@Override
public Span recordException(Throwable exception) {
delegate.recordException(exception);
if (exception instanceof StorageException && parent != null) {
StorageException se = (StorageException) exception;
if (se.getCode() == 404 && se.getMessage() != null) {
String msg = se.getMessage().toLowerCase(java.util.Locale.US);
if (msg.contains("bucket not found") || msg.contains("bucket does not exist")) {
parent.bucketMetadataCache.remove(bucketName);
}
}
}
return this;
}

@Override
public Span recordException(Throwable exception, Attributes attributes) {
delegate.recordException(exception, attributes);
if (exception instanceof StorageException && parent != null) {
StorageException se = (StorageException) exception;
if (se.getCode() == 404 && se.getMessage() != null) {
String msg = se.getMessage().toLowerCase(java.util.Locale.US);
if (msg.contains("bucket not found") || msg.contains("bucket does not exist")) {
parent.bucketMetadataCache.remove(bucketName);
}
}
}
return this;
}

@Override
public Span setAttribute(String k, String v) {
delegate.setAttribute(k, v);
return this;
}

@Override
public Span setAttribute(String k, long v) {
delegate.setAttribute(k, v);
return this;
}

@Override
public Span setAttribute(String k, double v) {
delegate.setAttribute(k, v);
return this;
}

@Override
public Span setAttribute(String k, boolean v) {
delegate.setAttribute(k, v);
return this;
}

@Override
public <T> Span setAttribute(AttributeKey<T> k, T v) {
delegate.setAttribute(k, v);
return this;
}

@Override
public Span addEvent(String n) {
delegate.addEvent(n);
return this;
}

@Override
public Span addEvent(String n, Attributes a) {
delegate.addEvent(n, a);
return this;
}

@Override
public Span addEvent(String n, long t, TimeUnit u) {
delegate.addEvent(n, t, u);
return this;
}

@Override
public Span addEvent(String n, Attributes a, long t, TimeUnit u) {
delegate.addEvent(n, a, t, u);
return this;
}

@Override
public Span setStatus(StatusCode c) {
delegate.setStatus(c);
return this;
}

@Override
public Span setStatus(StatusCode c, String d) {
delegate.setStatus(c, d);
return this;
}

@Override
public Span updateName(String name) {
delegate.updateName(name);
return this;
}

@Override
public SpanContext getSpanContext() {
return delegate.getSpanContext();
}

@Override
public boolean isRecording() {
return delegate.isRecording();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
/*
* Copyright 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.storage;

import com.google.cloud.Tuple;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Context;
import java.util.Locale;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

final class AcoSpanBuilder implements SpanBuilder {

private static final Logger LOGGER = Logger.getLogger(AcoSpanBuilder.class.getName());

private final SpanBuilder delegate;
private final OtelStorageDecorator parent;
private String bucketName;

AcoSpanBuilder(SpanBuilder delegate, OtelStorageDecorator parent) {
this.delegate = delegate;
this.parent = parent;
}

@Override
public SpanBuilder setAttribute(String key, String value) {
delegate.setAttribute(key, value);
if ("gsutil.uri".equals(key) && value != null) {
String name = extractBucketName(value);
if (name != null && !name.isEmpty()) {
this.bucketName = name;
}
}
return this;
}

@Override
public <T> SpanBuilder setAttribute(AttributeKey<T> key, T value) {
delegate.setAttribute(key, value);
if ("gsutil.uri".equals(key.getKey()) && value instanceof String) {
String name = extractBucketName((String) value);
if (name != null && !name.isEmpty()) {
this.bucketName = name;
}
}
return this;
}
Comment thread
nidhiii-27 marked this conversation as resolved.

@Override
public Span startSpan() {
if (bucketName != null && parent != null) {
checkCacheAndTriggerFetch(
parent.delegate, parent.bucketMetadataCache, parent.cacheExecutor, bucketName);
return new AcoSpan(delegate.startSpan(), bucketName, parent);
}
return delegate.startSpan();
}

@Override
public SpanBuilder setNoParent() {
delegate.setNoParent();
return this;
}

@Override
public SpanBuilder setAttribute(String key, boolean value) {
delegate.setAttribute(key, value);
return this;
}

@Override
public SpanBuilder setAttribute(String key, double value) {
delegate.setAttribute(key, value);
return this;
}

@Override
public SpanBuilder setAttribute(String key, long value) {
delegate.setAttribute(key, value);
return this;
}

@Override
public SpanBuilder setSpanKind(SpanKind k) {
delegate.setSpanKind(k);
return this;
}

@Override
public SpanBuilder setStartTimestamp(long t, TimeUnit u) {
delegate.setStartTimestamp(t, u);
return this;
}

@Override
public SpanBuilder setParent(Context c) {
delegate.setParent(c);
return this;
}

@Override
public SpanBuilder addLink(SpanContext c) {
delegate.addLink(c);
return this;
}

@Override
public SpanBuilder addLink(SpanContext c, Attributes a) {
delegate.addLink(c, a);
return this;
}

static ExecutorService newCacheExecutor() {
return Executors.newFixedThreadPool(
4,
r -> {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("gcs-aco-metadata-cache-pool");
return t;
});
}

static String extractBucketName(String uri) {
if (uri == null || !uri.startsWith("gs://")) {
return null;
}
String remainder = uri.substring(5);
int firstSlash = remainder.indexOf('/');
if (firstSlash == -1) {
return remainder;
}
return remainder.substring(0, firstSlash);
}

static Tuple<String, String> fetch(Storage delegate, String bucketName) {
Bucket bucket = delegate.get(bucketName);
if (bucket == null) {
return null;
}

String projectId = bucket.getProject() != null ? bucket.getProject().toString() : null;
String resource;
if (projectId != null && !projectId.isEmpty()) {
resource = "projects/" + projectId + "/buckets/" + bucketName;
} else {
resource = "projects/_/buckets/" + bucketName;
}

String location =
bucket.getLocation() != null ? bucket.getLocation().toLowerCase(Locale.US) : "global";
String locationType =
bucket.getLocationType() != null
? bucket.getLocationType().toLowerCase(Locale.US)
: "region";

if ("multi-region".equals(locationType) || "dual-region".equals(locationType)) {
location = "global";
}

return Tuple.of(resource, location);
}

static void checkCacheAndTriggerFetch(
Storage delegate,
BucketMetadataCache bucketMetadataCache,
ExecutorService cacheExecutor,
String bucketName) {
if (bucketMetadataCache.containsKey(bucketName)) {
return;
}

// Put fetchPending placeholder synchronously to block concurrent stampedes
bucketMetadataCache.put(bucketName, "projects/_/buckets/" + bucketName, "global", true);

cacheExecutor.submit(
() -> {
try {
Tuple<String, String> layout = fetch(delegate, bucketName);
if (layout != null) {
bucketMetadataCache.put(bucketName, layout, false);
} else {
// Bucket does not exist (fetch returned null) -> Evict cache entry
bucketMetadataCache.remove(bucketName);
}
} catch (StorageException e) {
if (e.getCode() == 404) {
// Bucket not found -> Evict cache entry
bucketMetadataCache.remove(bucketName);
} else if (e.getCode() == 403) {
// Permission Denied -> Retain fallback values with fetchPending=false (Do Not Retry)
bucketMetadataCache.put(
bucketName, "projects/_/buckets/" + bucketName, "global", false);
} else {
LOGGER.log(Level.WARNING, "Background GetBucket failed", e);
}
} catch (Exception e) {
LOGGER.log(Level.WARNING, "Background GetBucket failed", e);
}
});
}
}
Loading
Loading