17 commits
6ca23cd
add support for ApacheIcebergSource in Wayang Basic
ChristofferKristensen Oct 9, 2025
3c2f21b
Add support for a Basic ApacheIcebergSource
ChristofferKristensen Oct 11, 2025
b1d311f
Add implementation for a JavaApacheIcebergSource that allows to read fr…
ChristofferKristensen Oct 11, 2025
cee5f69
Add support for a basic ApacheIcebergSink that allows to write data t…
ChristofferKristensen Oct 11, 2025
736f4cd
Add implementation for a JavaApacheIcebergSink that allows to write W…
ChristofferKristensen Oct 11, 2025
6fd5874
refactor whereExpression to be of type Collection and not List
ChristofferKristensen Oct 11, 2025
9597d52
Add mappings for iceberg source and sinks
ChristofferKristensen Oct 11, 2025
f720435
Support for source Iceberg tables
ChristofferKristensen Oct 23, 2025
3c5efc1
add methods to read from and write to iceberg tables
ChristofferKristensen Jan 13, 2026
2e5c885
add source operators for Apache Iceberg for the Java Platform
ChristofferKristensen Jan 13, 2026
f552d28
add sink operators for Apache Iceberg for the Java Platform
ChristofferKristensen Jan 13, 2026
7fd48d6
Merge branch 'main' into main
ChristofferEmilKristensen Jan 14, 2026
0e67d6a
update imports for wayang-api-scala-java files
ChristofferEmilKristensen Jan 14, 2026
813e050
update imports so all class names are not fully qualified with namespa…
ChristofferEmilKristensen Jan 14, 2026
a0df79d
change references to be apache iceberg source
ChristofferKristensen Jan 15, 2026
8f99f1a
fix ambiguous reference with CardinalityEstimators
ChristofferKristensen Jan 15, 2026
d2fc0d5
add space between tableIdentifier and Class object.
ChristofferKristensen Jan 15, 2026
DataQuanta.scala
@@ -47,6 +47,11 @@ import scala.collection.JavaConversions
import scala.collection.JavaConversions._
import scala.reflect._

import org.apache.iceberg.Schema
import org.apache.iceberg.FileFormat
import org.apache.iceberg.catalog.{Catalog, TableIdentifier}


/**
* Represents an intermediate result/data flow edge in a [[WayangPlan]].
*
@@ -1013,7 +1018,6 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I
this.planBuilder.buildAndExplain(toJson)
}


/**
* Write the data quanta in this instance to a text file. Triggers execution.
*
@@ -1027,6 +1031,44 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I
writeTextFileJava(url, toSerializableFunction(formatterUdf), udfLoad)
}

/**
 * Write the data quanta in this instance to an Iceberg table. Triggers execution.
 *
 * @param catalog Iceberg catalog
 * @param schema Iceberg schema of the table to create
 * @param tableIdentifier Iceberg table identifier of the table to create
 * @param outputFileFormat file format of the output data files
 */
def writeIcebergTable(catalog: Catalog,
                      schema: Schema,
                      tableIdentifier: TableIdentifier,
                      outputFileFormat: FileFormat): Unit = {
  writeIcebergTableJava(catalog, schema, tableIdentifier, outputFileFormat)
}

/**
 * Write the data quanta in this instance to an Iceberg table. Triggers execution.
 *
 * @param catalog Iceberg catalog
 * @param schema Iceberg schema of the table to create
 * @param tableIdentifier Iceberg table identifier of the table to create
 * @param outputFileFormat file format of the output data files
 */
def writeIcebergTableJava(catalog: Catalog,
                          schema: Schema,
                          tableIdentifier: TableIdentifier,
                          outputFileFormat: FileFormat): Unit = {
  val sink = new ApacheIcebergSink(catalog, schema, tableIdentifier, outputFileFormat)
  sink.setName(s"Write to Iceberg table $tableIdentifier")
  this.connectTo(sink, 0)
  this.planBuilder.sinks += sink
  this.planBuilder.buildAndExecute()
  this.planBuilder.sinks.clear()
}

def writeParquet(url: String,
overwrite: Boolean = false,
preferDataset: Boolean = false)(implicit ev: Out =:= Record): Unit =
@@ -1095,6 +1137,7 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I
this.planBuilder.sinks.clear()
}


private def writeParquetJava(url: String, overwrite: Boolean, preferDataset: Boolean)(implicit ev: Out =:= Record): Unit = {
val _ = ev
val sink = new ParquetSink(url, overwrite, preferDataset)
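For orientation, here is a minimal end-to-end sketch of the new write path. It is an illustration only: the `HadoopCatalog`, warehouse path, schema, table name, and job name are assumptions, not part of this diff.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hadoop.HadoopCatalog
import org.apache.iceberg.types.Types
import org.apache.iceberg.{FileFormat, Schema}
import org.apache.wayang.api.PlanBuilder
import org.apache.wayang.basic.data.Record
import org.apache.wayang.core.api.WayangContext
import org.apache.wayang.java.Java

// Illustrative catalog rooted in a local warehouse directory.
val catalog = new HadoopCatalog(new Configuration(), "/tmp/iceberg-warehouse")
val schema = new Schema(
  Types.NestedField.required(1, "id", Types.LongType.get()),
  Types.NestedField.optional(2, "name", Types.StringType.get()))
val tableId = TableIdentifier.of("db", "people")

val wayangContext = new WayangContext().withPlugin(Java.basicPlugin())
val planBuilder = new PlanBuilder(wayangContext).withJobName("iceberg-write-example")

// The sink creates the table if it is missing, otherwise it appends.
planBuilder
  .loadCollection(Seq(new Record(Long.box(1L), "alice"), new Record(Long.box(2L), "bob")))
  .writeIcebergTable(catalog, schema, tableId, FileFormat.PARQUET)
```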
DataQuantaBuilder.scala
@@ -40,6 +40,10 @@ import org.apache.wayang.core.types.DataSetType
import org.apache.wayang.core.util.{Logging, ReflectionUtils, WayangCollections, Tuple => WayangTuple}
import org.apache.wayang.core.plan.wayangplan.OutputSlot

import org.apache.iceberg.Schema
import org.apache.iceberg.FileFormat
import org.apache.iceberg.catalog.{Catalog, TableIdentifier}



import scala.collection.mutable.ListBuffer
@@ -499,6 +503,27 @@ trait DataQuantaBuilder[+This <: DataQuantaBuilder[_, Out], Out] extends Logging
this.dataQuanta().writeTextFileJava(url, formatterUdf, udfLoadProfileEstimator)
}

/**
 * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.ApacheIcebergSink]]. This triggers
 * execution of the constructed [[WayangPlan]].
 *
 * @param catalog Iceberg catalog
 * @param schema Iceberg schema of the table to create
 * @param tableIdentifier Iceberg table identifier of the table to create
 * @param outputFileFormat file format of the output data files
 * @param jobName name of the Wayang job
 */
def writeIcebergTable(catalog: Catalog,
                      schema: Schema,
                      tableIdentifier: TableIdentifier,
                      outputFileFormat: FileFormat,
                      jobName: String): Unit = {
  this.javaPlanBuilder.withJobName(jobName)
  this.dataQuanta().writeIcebergTableJava(catalog, schema, tableIdentifier, outputFileFormat)
}

/**
* Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.KafkaTopicSink]]. This triggers
* execution of the constructed [[WayangPlan]].
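A corresponding sketch for this builder-facing variant, reusing the hypothetical `wayangContext`, `catalog`, and `schema` from the sketch above; the table names are again made up.

```scala
import org.apache.iceberg.FileFormat
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.wayang.api.JavaPlanBuilder

// Copy one (hypothetical) Iceberg table into another via the builder API.
val builder = new JavaPlanBuilder(wayangContext, "iceberg-copy-example")
builder
  .readApacheIcebergTable(catalog, TableIdentifier.of("db", "people"))
  .writeIcebergTable(catalog, schema, TableIdentifier.of("db", "people_copy"),
    FileFormat.PARQUET, "iceberg-copy-example")
```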
JavaPlanBuilder.scala
@@ -25,13 +25,15 @@ import java.util.{Collection => JavaCollection}
import org.apache.commons.lang3.Validate
import org.apache.wayang.api.util.DataQuantaBuilderCache
import org.apache.wayang.basic.data.Record
import org.apache.wayang.basic.operators.{AmazonS3Source, AzureBlobStorageSource, GoogleCloudStorageSource, KafkaTopicSource, ParquetSource, TableSource, TextFileSource}
import org.apache.wayang.basic.operators.{AmazonS3Source, ApacheIcebergSource, AzureBlobStorageSource, GoogleCloudStorageSource, KafkaTopicSource, ParquetSource, TableSource, TextFileSource}
import org.apache.wayang.commons.util.profiledb.model.Experiment
import org.apache.wayang.core.api.WayangContext
import org.apache.wayang.core.plan.wayangplan._
import org.apache.wayang.core.types.DataSetType

import scala.reflect.ClassTag
import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
import org.apache.iceberg.expressions.Expression

/**
* Utility to build and execute [[WayangPlan]]s.
@@ -79,6 +81,22 @@ class JavaPlanBuilder(wayangCtx: WayangContext, jobName: String) {
preferDataset: Boolean = false): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] =
createSourceBuilder(ParquetSource.create(url, projection).preferDatasetOutput(preferDataset))(ClassTag(classOf[Record]))

/**
* Read an Apache Iceberg table and provide it as a dataset of [[Record]]s.
*
* @param catalog the Iceberg catalog containing the table
* @param tableIdentifier the identifier of the Iceberg table to read
* @param filterExpressions optional array of filter expressions to apply during the read
* @param projectionColumns optional array of column names to project (select specific columns)
* @return [[DataQuantaBuilder]] for the Iceberg table
*/
def readApacheIcebergTable(
catalog: Catalog,
tableIdentifier: TableIdentifier,
filterExpressions: Array[Expression] = null,
projectionColumns: Array[String] = null): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] =
createSourceBuilder(ApacheIcebergSource.create(catalog, tableIdentifier, filterExpressions, projectionColumns))(ClassTag(classOf[Record]))

/**
* Read a text file from a Google Cloud Storage bucket and provide it as a dataset of [[String]]s, one per line.
*
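To illustrate the filter and projection hooks, a sketch using Iceberg's `Expressions` factory; the column names `id` and `name` are assumptions, and `builder` and `catalog` come from the sketch above.

```scala
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.expressions.{Expression, Expressions}

// Hand a row filter and a column projection to the Iceberg read.
val filters: Array[Expression] = Array(Expressions.greaterThanOrEqual("id", Long.box(100L)))
val columns: Array[String] = Array("id", "name")

val records = builder
  .readApacheIcebergTable(catalog, TableIdentifier.of("db", "people"), filters, columns)
  .collect()
```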
PlanBuilder.scala
@@ -24,7 +24,7 @@ package org.apache.wayang.api
import org.apache.commons.lang3.Validate
import org.apache.wayang.api
import org.apache.wayang.basic.data.Record
import org.apache.wayang.basic.operators.{AmazonS3Source, AzureBlobStorageSource, CollectionSource, GoogleCloudStorageSource, ObjectFileSource, ParquetSource, TableSource, TextFileSource}
import org.apache.wayang.basic.operators.{AmazonS3Source, ApacheIcebergSource, AzureBlobStorageSource, CollectionSource, GoogleCloudStorageSource, ObjectFileSource, ParquetSource, TableSource, TextFileSource}
import org.apache.wayang.commons.util.profiledb.model.Experiment
import org.apache.wayang.core.api.WayangContext
import org.apache.wayang.core.plan.wayangplan._
@@ -34,6 +34,8 @@ import scala.collection.JavaConversions
import scala.collection.mutable.ListBuffer
import scala.language.implicitConversions
import scala.reflect._
import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
import org.apache.iceberg.expressions.Expression

/**
* Utility to build [[WayangPlan]]s.
@@ -144,6 +146,21 @@ class PlanBuilder(private[api] val wayangContext: WayangContext, private var job
preferDataset: Boolean = false): DataQuanta[Record] =
load(ParquetSource.create(url, projection).preferDatasetOutput(preferDataset))

/**
* Read an Apache Iceberg table and provide it as a dataset of [[Record]]s.
*
* @param catalog the Iceberg catalog containing the table
* @param tableIdentifier the identifier of the Iceberg table to read
* @param filterExpressions optional array of filter expressions to apply during the read
* @param projectionColumns optional array of column names to project (select specific columns)
* @return [[DataQuanta]] of [[Record]] for the Iceberg table
*/
def readApacheIcebergTable(
    catalog: Catalog,
    tableIdentifier: TableIdentifier,
    filterExpressions: Array[Expression] = null,
    projectionColumns: Array[String] = null): DataQuanta[Record] =
  load(ApacheIcebergSource.create(catalog, tableIdentifier, filterExpressions, projectionColumns))

/**
* Read a text file from a Google Cloud Storage bucket and provide it as a dataset of [[String]]s, one per line.
*
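Unlike the builder variant, `readApacheIcebergTable` on `PlanBuilder` yields `DataQuanta[Record]` directly, so it composes with the usual operators. A sketch reusing the assumed `wayangContext`, `catalog`, and `tableId` from earlier; `getField(1)` assumes `name` is the second column of the `Record` layout.

```scala
import org.apache.wayang.api.PlanBuilder

// Read the (hypothetical) table and extract one field per record.
val names = new PlanBuilder(wayangContext).withJobName("iceberg-read-example")
  .readApacheIcebergTable(catalog, tableId)
  .map(record => record.getField(1).toString)
  .collect()
```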
44 changes: 44 additions & 0 deletions wayang-commons/wayang-basic/pom.xml
@@ -34,6 +34,11 @@
This modules represents the base Wayang package with the default operators and functions.
</description>

<properties>
<iceberg.version>1.6.0</iceberg.version>
<hadoop.version>3.3.6</hadoop.version>
</properties>

<dependencyManagement>
<dependencies>
<!-- Google Cloud Libraries BOM -->
@@ -132,6 +137,45 @@
<groupId>com.azure</groupId>
<artifactId>azure-identity</artifactId>
</dependency>

<!-- Apache Iceberg dependencies -->

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-core</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-api</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-parquet</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-data</artifactId>
<version>${iceberg.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>2.0.16</version>
</dependency>
</dependencies>
76 changes: 76 additions & 0 deletions wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSink.java
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.wayang.basic.operators;

import org.apache.wayang.core.plan.wayangplan.UnarySink;
import org.apache.wayang.core.types.DataSetType;

import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.FileFormat;
import org.apache.wayang.basic.data.Record;

/**
 * This {@link UnarySink} writes all incoming data quanta to an Iceberg table.
 * If the target table does not exist, it is created; otherwise the data is appended.
 */
public class ApacheIcebergSink extends UnarySink<Record> {

protected final Catalog catalog;
protected final Schema schema;
protected final TableIdentifier tableIdentifier;
protected final FileFormat outputFileFormat;

/**
 * @param catalog Iceberg catalog used to resolve the target table; must
 *                not be {@code null}
 * @param schema Iceberg write schema; must be compatible with the
 *               target table
 * @param tableIdentifier fully qualified identifier of the target table
 * @param outputFileFormat the {@link FileFormat} of the output data files
 */
public ApacheIcebergSink(Catalog catalog, Schema schema, TableIdentifier tableIdentifier, FileFormat outputFileFormat) {
super(DataSetType.createDefault(Record.class));
this.catalog = catalog;
this.schema = schema;
this.tableIdentifier = tableIdentifier;
this.outputFileFormat = outputFileFormat;
}

/**
* Creates a copied instance.
*
* @param that should be copied
*/
public ApacheIcebergSink(ApacheIcebergSink that) {
super(that);
this.catalog = that.catalog;
this.schema = that.schema;
this.tableIdentifier = that.tableIdentifier;
this.outputFileFormat = that.outputFileFormat;
}

public FileFormat getOutputFileFormat() {
return this.outputFileFormat;
}
}
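For completeness, the sink can also be wired into a `WayangPlan` directly, without the Scala API sugar. A sketch under the same assumptions as before (`catalog`, `schema`, `tableId`, and `wayangContext` are the hypothetical values from earlier).

```scala
import java.util.Arrays

import org.apache.iceberg.FileFormat
import org.apache.wayang.basic.data.Record
import org.apache.wayang.basic.operators.{ApacheIcebergSink, CollectionSource}
import org.apache.wayang.core.plan.wayangplan.WayangPlan
import org.apache.wayang.core.types.DataSetType

// A source feeding two sample records into the new sink.
val source = new CollectionSource[Record](
  Arrays.asList(new Record(Long.box(1L), "alice"), new Record(Long.box(2L), "bob")),
  DataSetType.createDefault(classOf[Record]))
val sink = new ApacheIcebergSink(catalog, schema, tableId, FileFormat.PARQUET)

source.connectTo(0, sink, 0)
wayangContext.execute(new WayangPlan(sink))
```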