Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ public void testStreamWriteFrequentCompactWithOverlappingKeys() throws Exception
}
write.compact(BinaryRow.EMPTY_ROW, 0, false);
write.compact(BinaryRow.EMPTY_ROW, 1, false);
commit.commit(commitId, write.prepareCommit(false, commitId));
commit.commit(commitId, write.prepareCommit(true, commitId));
commitId++;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.paimon.format.vortex;

import org.apache.paimon.arrow.ArrowBundleRecords;
import org.apache.paimon.arrow.ArrowUtils;
import org.apache.paimon.arrow.vector.ArrowCStruct;
import org.apache.paimon.arrow.vector.ArrowFormatWriter;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.BundleFormatWriter;
Expand All @@ -28,15 +30,16 @@

import dev.vortex.api.DType;
import dev.vortex.api.VortexWriter;
import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;

import static org.apache.paimon.arrow.ArrowUtils.serializeToIpc;

/** Vortex records writer. */
public class VortexRecordsWriter implements BundleFormatWriter {

Expand All @@ -45,7 +48,9 @@ public class VortexRecordsWriter implements BundleFormatWriter {
private final ArrowFormatWriter arrowFormatWriter;
private final VortexWriter nativeWriter;
private final String path;
private long bytesWritten = 0;
private final ArrowArray arrowArray;
private final ArrowSchema arrowSchema;
private final BufferAllocator allocator;
private long jniCost = 0;

public VortexRecordsWriter(
Expand All @@ -56,6 +61,9 @@ public VortexRecordsWriter(
throws IOException {
this.arrowFormatWriter = arrowFormatWriter;
this.path = path.toUri().toString();
this.allocator = arrowFormatWriter.getAllocator();
this.arrowArray = ArrowArray.allocateNew(allocator);
this.arrowSchema = ArrowSchema.allocateNew(allocator);

DType dtype = VortexTypeUtils.toDType(rowType);
this.nativeWriter = VortexWriter.create(this.path, dtype, storageOptions);
Expand Down Expand Up @@ -84,9 +92,9 @@ public void writeBundle(BundleRecords bundleRecords) throws IOException {

@Override
public boolean reachTargetSize(boolean suggestedCheck, long targetSize) {
// Note: bytesWritten tracks Arrow IPC serialized bytes, not the actual Vortex file size
// (which may differ due to Vortex's own compression/encoding).
return suggestedCheck && (bytesWritten > targetSize);
// Vortex applies its own compression/encoding, so in-memory Arrow size is much larger
// than the actual file size on disk. Always return false to avoid rolling into small files.
return false;
}

@Override
Expand All @@ -95,6 +103,8 @@ public void close() throws IOException {
LOG.info("Jni cost: {}ms for file: {}", jniCost, path);
long t1 = System.currentTimeMillis();
nativeWriter.close();
arrowArray.close();
arrowSchema.close();
arrowFormatWriter.close();
long closeCost = (System.currentTimeMillis() - t1);
LOG.info("Close cost: {}ms for file: {}", closeCost, path);
Expand All @@ -109,10 +119,10 @@ private void flush() throws IOException {
}

private void writeVsr(VectorSchemaRoot vsr) throws IOException {
byte[] arrowData = serializeToIpc(vsr);
bytesWritten += arrowData.length;
ArrowCStruct cStruct =
ArrowUtils.serializeToCStruct(vsr, arrowArray, arrowSchema, allocator);
long t1 = System.currentTimeMillis();
nativeWriter.writeBatch(arrowData);
nativeWriter.writeBatchFfi(cStruct.arrayAddress(), cStruct.schemaAddress());
jniCost += (System.currentTimeMillis() - t1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ static VortexWriter create(String uri, DType dtype, Map<String, String> options)

void writeBatch(byte[] arrowData) throws IOException;

void writeBatchFfi(long arrowArrayAddr, long arrowSchemaAddr) throws IOException;

@Override
void close() throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ public void writeBatch(byte[] arrowData) throws IOException {
}
}

@Override
public void writeBatchFfi(long arrowArrayAddr, long arrowSchemaAddr) throws IOException {
logger.trace("Writing batch via FFI with arrayAddr={}, schemaAddr={}", arrowArrayAddr, arrowSchemaAddr);

boolean success = NativeWriterMethods.writeBatchFfi(ptr.getAsLong(), arrowArrayAddr, arrowSchemaAddr);
if (!success) {
logger.error("Failed to write batch via FFI to Vortex file");
throw new IOException("Failed to write batch via FFI to Vortex file");
}
}

@Override
public void close() {
if (!this.ptr.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,8 @@ private NativeWriterMethods() {}

public static native boolean writeBatch(long writerPtr, byte[] arrowData);

public static native boolean writeBatchFfi(
long writerPtr, long arrowArrayAddr, long arrowSchemaAddr);

public static native void close(long writerPtr);
}