diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
index a5ef8bc42..2a2f60cc0 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala
@@ -47,6 +47,11 @@ import scala.collection.JavaConversions
import scala.collection.JavaConversions._
import scala.reflect._
+import org.apache.iceberg.Schema
+import org.apache.iceberg.FileFormat
+import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
+
+
/**
* Represents an intermediate result/data flow edge in a [[WayangPlan]].
*
@@ -1013,7 +1018,6 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I
this.planBuilder.buildAndExplain(toJson)
}
-
/**
* Write the data quanta in this instance to a text file. Triggers execution.
*
@@ -1027,6 +1031,44 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I
writeTextFileJava(url, toSerializableFunction(formatterUdf), udfLoad)
}
+ /**
+   * Write the data quanta in this instance to an Iceberg table. Triggers execution.
+ *
+ * @param catalog Iceberg Catalog
+ * @param schema Iceberg Schema of the table to create
+ * @param tableIdentifier Iceberg Table Identifier of the table to create
+ * @param outputFileFormat File format of the output data files
+ */
+
+  def writeIcebergTable(catalog: Catalog,
+                        schema: Schema,
+                        tableIdentifier: TableIdentifier,
+                        outputFileFormat: FileFormat): Unit =
+    // Scala-facing entry point: simply forwards to the Java-friendly variant.
+    this.writeIcebergTableJava(catalog, schema, tableIdentifier, outputFileFormat)
+
+ /**
+   * Write the data quanta in this instance to an Iceberg table. Triggers execution.
+ *
+ * @param catalog Iceberg Catalog
+ * @param schema Iceberg Schema of the table to create
+ * @param tableIdentifier Iceberg Table Identifier of the table to create
+ * @param outputFileFormat File format of the output data files
+ */
+  def writeIcebergTableJava(catalog: Catalog,
+                            schema: Schema,
+                            tableIdentifier: TableIdentifier,
+                            outputFileFormat: FileFormat): Unit = {
+    // Create the logical sink and label it.
+    val icebergSink = new ApacheIcebergSink(catalog, schema, tableIdentifier, outputFileFormat)
+    icebergSink.setName("*#-> Write to Iceberg Table Sink ")
+    // Wire this data quanta into input 0 of the sink.
+    this.connectTo(icebergSink, 0)
+    // Register the sink, execute the plan, and clear the registered sinks afterwards.
+    this.planBuilder.sinks += icebergSink
+    this.planBuilder.buildAndExecute()
+    this.planBuilder.sinks.clear()
+  }
def writeParquet(url: String,
overwrite: Boolean = false,
preferDataset: Boolean = false)(implicit ev: Out =:= Record): Unit =
@@ -1095,6 +1137,7 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I
this.planBuilder.sinks.clear()
}
+
private def writeParquetJava(url: String, overwrite: Boolean, preferDataset: Boolean)(implicit ev: Out =:= Record): Unit = {
val _ = ev
val sink = new ParquetSink(url, overwrite, preferDataset)
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala
index dd8b32bbb..dad054a2f 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala
@@ -40,6 +40,10 @@ import org.apache.wayang.core.types.DataSetType
import org.apache.wayang.core.util.{Logging, ReflectionUtils, WayangCollections, Tuple => WayangTuple}
import org.apache.wayang.core.plan.wayangplan.OutputSlot
+import org.apache.iceberg.Schema
+import org.apache.iceberg.FileFormat
+import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
+
import scala.collection.mutable.ListBuffer
@@ -499,6 +503,27 @@ trait DataQuantaBuilder[+This <: DataQuantaBuilder[_, Out], Out] extends Logging
this.dataQuanta().writeTextFileJava(url, formatterUdf, udfLoadProfileEstimator)
}
+ /**
+   * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.ApacheIcebergSink]]. This triggers
+ * execution of the constructed [[WayangPlan]].
+ *
+ * @param catalog Iceberg Catalog
+ * @param schema Iceberg Schema of the table to create
+ * @param tableIdentifier Iceberg Table Identifier of the table to create
+ * @param outputFileFormat File format of the output data files
+   * @param jobName name that is set on the job before it is executed
+ */
+
+  def writeIcebergTable(catalog: Catalog,
+                        schema: Schema,
+                        tableIdentifier: TableIdentifier,
+                        outputFileFormat: FileFormat,
+                        jobName: String): Unit = {
+    // Set the job name before building the data quanta and triggering the write.
+    javaPlanBuilder.withJobName(jobName)
+    dataQuanta().writeIcebergTableJava(catalog, schema, tableIdentifier, outputFileFormat)
+  }
+
/**
* Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.KafkaTopicSink]]. This triggers
* execution of the constructed [[WayangPlan]].
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
index 604f637ad..5e464c76b 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala
@@ -25,13 +25,15 @@ import java.util.{Collection => JavaCollection}
import org.apache.commons.lang3.Validate
import org.apache.wayang.api.util.DataQuantaBuilderCache
import org.apache.wayang.basic.data.Record
-import org.apache.wayang.basic.operators.{AmazonS3Source, AzureBlobStorageSource, GoogleCloudStorageSource, KafkaTopicSource, ParquetSource, TableSource, TextFileSource}
+import org.apache.wayang.basic.operators.{AmazonS3Source, AzureBlobStorageSource, GoogleCloudStorageSource, KafkaTopicSource, ParquetSource, TableSource, TextFileSource, ApacheIcebergSource}
import org.apache.wayang.commons.util.profiledb.model.Experiment
import org.apache.wayang.core.api.WayangContext
import org.apache.wayang.core.plan.wayangplan._
import org.apache.wayang.core.types.DataSetType
import scala.reflect.ClassTag
+import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
+import org.apache.iceberg.expressions.Expression
/**
* Utility to build and execute [[WayangPlan]]s.
@@ -79,6 +81,22 @@ class JavaPlanBuilder(wayangCtx: WayangContext, jobName: String) {
preferDataset: Boolean = false): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] =
createSourceBuilder(ParquetSource.create(url, projection).preferDatasetOutput(preferDataset))(ClassTag(classOf[Record]))
+ /**
+ * Read an Apache Iceberg table and provide it as a dataset of [[Record]]s.
+ *
+ * @param catalog the Iceberg catalog containing the table
+ * @param tableIdentifier the identifier of the Iceberg table to read
+ * @param filterExpressions optional array of filter expressions to apply during the read
+ * @param projectionColumns optional array of column names to project (select specific columns)
+ * @return [[DataQuantaBuilder]] for the Iceberg table
+ */
+  def readApacheIcebergTable(catalog: Catalog, tableIdentifier: TableIdentifier,
+                             filterExpressions: Array[Expression] = null,
+                             projectionColumns: Array[String] = null): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] = {
+    val icebergSource = ApacheIcebergSource.create(catalog, tableIdentifier, filterExpressions, projectionColumns)
+    createSourceBuilder(icebergSource)(ClassTag(classOf[Record]))
+  }
+
/**
* Read a text file from a Google Cloud Storage bucket and provide it as a dataset of [[String]]s, one per line.
*
diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
index 5e3635e68..d49deded1 100644
--- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
+++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala
@@ -24,7 +24,7 @@ package org.apache.wayang.api
import org.apache.commons.lang3.Validate
import org.apache.wayang.api
import org.apache.wayang.basic.data.Record
-import org.apache.wayang.basic.operators.{AmazonS3Source, AzureBlobStorageSource, CollectionSource, GoogleCloudStorageSource, ObjectFileSource, ParquetSource, TableSource, TextFileSource}
+import org.apache.wayang.basic.operators.{AmazonS3Source, AzureBlobStorageSource, CollectionSource, GoogleCloudStorageSource, ObjectFileSource, ParquetSource, TableSource, TextFileSource, ApacheIcebergSource}
import org.apache.wayang.commons.util.profiledb.model.Experiment
import org.apache.wayang.core.api.WayangContext
import org.apache.wayang.core.plan.wayangplan._
@@ -34,6 +34,8 @@ import scala.collection.JavaConversions
import scala.collection.mutable.ListBuffer
import scala.language.implicitConversions
import scala.reflect._
+import org.apache.iceberg.catalog.{Catalog, TableIdentifier}
+import org.apache.iceberg.expressions.Expression
/**
* Utility to build [[WayangPlan]]s.
@@ -144,6 +146,21 @@ class PlanBuilder(private[api] val wayangContext: WayangContext, private var job
preferDataset: Boolean = false): DataQuanta[Record] =
load(ParquetSource.create(url, projection).preferDatasetOutput(preferDataset))
+ /**
+ * Read an Apache Iceberg table and provide it as a dataset of [[Record]]s.
+ *
+ * @param catalog the Iceberg catalog containing the table
+ * @param tableIdentifier the identifier of the Iceberg table to read
+ * @param filterExpressions optional array of filter expressions to apply during the read
+ * @param projectionColumns optional array of column names to project (select specific columns)
+ * @return [[DataQuanta]] of [[Record]] for the Iceberg table
+ */
+  def readApacheIcebergTable(catalog: Catalog,
+                             tableIdentifier: TableIdentifier,
+                             filterExpressions: Array[Expression] = null,
+                             projectionColumns: Array[String] = null): DataQuanta[Record] =
+    load(ApacheIcebergSource.create(catalog, tableIdentifier, filterExpressions, projectionColumns))
+
/**
* Read a text file from a Google Cloud Storage bucket and provide it as a dataset of [[String]]s, one per line.
*
diff --git a/wayang-commons/wayang-basic/pom.xml b/wayang-commons/wayang-basic/pom.xml
index 1d1b460ae..049e2f7a6 100644
--- a/wayang-commons/wayang-basic/pom.xml
+++ b/wayang-commons/wayang-basic/pom.xml
@@ -34,6 +34,11 @@
This modules represents the base Wayang package with the default operators and functions.
+
+ 1.6.0
+ 3.3.6
+
+
@@ -132,6 +137,45 @@
com.azure
azure-identity
+
+
+
+
+
+ org.apache.iceberg
+ iceberg-core
+ ${iceberg.version}
+
+
+ org.apache.iceberg
+ iceberg-api
+ ${iceberg.version}
+
+
+ org.apache.iceberg
+ iceberg-parquet
+ ${iceberg.version}
+
+
+ org.apache.iceberg
+ iceberg-data
+ ${iceberg.version}
+
+
+ org.apache.hadoop
+ hadoop-common
+ ${hadoop.version}
+
+
+ org.apache.hadoop
+ hadoop-client
+ ${hadoop.version}
+
+
+ org.slf4j
+ slf4j-simple
+ 2.0.16
+
diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSink.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSink.java
new file mode 100644
index 000000000..e9a2b3626
--- /dev/null
+++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSink.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wayang.basic.operators;
+
+import org.apache.wayang.core.plan.wayangplan.UnarySink;
+import org.apache.wayang.core.types.DataSetType;
+
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.FileFormat;
+import org.apache.wayang.basic.data.Record;
+
+/**
+ * This {@link UnarySink} writes all incoming data quanta to an iceberg table.
+ * If the table does not exist yet it is created; otherwise the data is appended.
+ *
+ * @param Data Type if the incoming Data Quanta
+ */
+public class ApacheIcebergSink extends UnarySink {
+
+    // Write target and output format; assigned once by the constructors below.
+    protected final Catalog catalog;
+    protected final Schema schema;
+    protected final TableIdentifier tableIdentifier;
+    protected final FileFormat outputFileFormat;
+
+    /**
+     * Creates a sink whose input type is the default data set type of {@link Record}.
+     *
+     * @param catalog          Iceberg catalog used to resolve the target table
+     * @param schema           Iceberg write schema
+     * @param tableIdentifier  identifier of the target table
+     * @param outputFileFormat {@link FileFormat} of the output data files
+     */
+    public ApacheIcebergSink(Catalog catalog, Schema schema, TableIdentifier tableIdentifier, FileFormat outputFileFormat) {
+        super(DataSetType.createDefault(Record.class));
+        this.outputFileFormat = outputFileFormat;
+        this.tableIdentifier = tableIdentifier;
+        this.schema = schema;
+        this.catalog = catalog;
+    }
+
+    /**
+     * Copy constructor: takes over catalog, schema, identifier and file format of {@code that}.
+     *
+     * @param that the sink to copy
+     */
+    public ApacheIcebergSink(ApacheIcebergSink that) {
+        super(that);
+        this.outputFileFormat = that.outputFileFormat;
+        this.tableIdentifier = that.tableIdentifier;
+        this.schema = that.schema;
+        this.catalog = that.catalog;
+    }
+
+    /** @return the {@link FileFormat} this sink was configured with */
+    public FileFormat getOutputFileFormat() {
+        return outputFileFormat;
+    }
+}
diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java
new file mode 100644
index 000000000..3f188f418
--- /dev/null
+++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.basic.operators;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import org.apache.wayang.basic.data.Record;
+import org.apache.wayang.basic.types.RecordType;
+import org.apache.wayang.commons.util.profiledb.model.measurement.TimeMeasurement;
+import org.apache.wayang.core.api.Configuration;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate;
+import org.apache.wayang.core.plan.wayangplan.UnarySource;
+import org.apache.wayang.core.types.DataSetType;
+
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.IcebergGenerics;
+import org.apache.iceberg.data.IcebergGenerics.ScanBuilder;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator;
+
+
+/**
+ * This source reads an Iceberg table and outputs its rows as
+ * {@link org.apache.wayang.basic.data.Record}
+ * units.
+ */
+public class ApacheIcebergSource extends UnarySource {
+
+    private final Logger logger = LogManager.getLogger(this.getClass());
+
+    private final Catalog catalog;
+
+    private final TableIdentifier tableIdentifier;
+
+    private final Collection whereExpressions;
+    private final Collection columns;
+
+    // Loaded on first use by getTable() and reused afterwards.
+    private org.apache.iceberg.Table cachedTable = null;
+
+    // Assumed fraction of rows that passes a single filter expression (estimation only).
+    private static final double defaultSelectivityValue = 0.10;
+
+    /**
+     * Creates a new Iceberg source instance.
+     *
+     * @param catalog          Iceberg catalog used to load the table
+     * @param tableIdentifier  identifier of the target table
+     * @param whereExpressions array of {@link org.apache.iceberg.expressions.Expression}
+     *                         filters; {@code null} or empty for none
+     * @param columns          array of column names to project; {@code null} or empty
+     *                         for all columns
+     * @return a new {@link ApacheIcebergSource} instance
+     */
+    public static ApacheIcebergSource create(Catalog catalog, TableIdentifier tableIdentifier,
+            Expression[] whereExpressions,
+            String[] columns) {
+        // Normalize null arrays to empty lists before delegating to the constructor.
+        List whereList =
+                (whereExpressions == null) ? Collections.emptyList() : Arrays.asList(whereExpressions);
+
+        List columnList =
+                (columns == null) ? Collections.emptyList() : Arrays.asList(columns);
+
+        return new ApacheIcebergSource(catalog, tableIdentifier, whereList, columnList);
+    }
+
+    /**
+     * Creates a new Iceberg source instance.
+     *
+     * @param catalog          Iceberg catalog used to load the table
+     * @param tableIdentifier  identifier of the target table
+     * @param whereExpressions filter expressions; {@code null} or empty for none
+     * @param columns          column names to project; {@code null} or empty for all columns
+     */
+    public ApacheIcebergSource(Catalog catalog, TableIdentifier tableIdentifier,
+            Collection whereExpressions, Collection columns) {
+        super(createOutputDataSetType(columns));
+        this.catalog = catalog;
+        this.tableIdentifier = tableIdentifier;
+        this.whereExpressions = whereExpressions;
+        this.columns = columns;
+    }
+
+    /**
+     * Copies an instance (exclusive of broadcasts). The cached table is not copied.
+     *
+     * @param that that should be copied
+     */
+    public ApacheIcebergSource(ApacheIcebergSource that) {
+        super(that);
+        this.catalog = that.getCatalog();
+        this.columns = that.getColumns();
+        this.tableIdentifier = that.getTableIdentifier();
+        this.whereExpressions = that.getWhereExpressions();
+    }
+
+    @Override
+    public Optional createCardinalityEstimator(
+            final int outputIndex,
+            final Configuration configuration) {
+        Validate.inclusiveBetween(0, this.getNumOutputs() - 1, outputIndex);
+        return Optional.of(new ApacheIcebergSource.CardinalityEstimator());
+    }
+
+    /** Estimates the output cardinality from the table's planned file scan tasks. */
+    protected class CardinalityEstimator implements org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator {
+
+        public final CardinalityEstimate FALLBACK_ESTIMATE = new CardinalityEstimate(1000L, 100000000L, 0.7);
+
+        @Override
+        public CardinalityEstimate estimate(OptimizationContext optimizationContext,
+                CardinalityEstimate... inputEstimates) {
+            Validate.isTrue(ApacheIcebergSource.this.getNumInputs() == inputEstimates.length);
+
+            // see Job for StopWatch measurements
+            final TimeMeasurement timeMeasurement = optimizationContext.getJob().getStopWatch().start(
+                    "Optimization", "Cardinality&Load Estimation", "Push Estimation", "Estimate source cardinalities");
+
+            // Query the job cache first to see if there is already an estimate. The key uses the
+            // full table identifier so that equally named tables in different namespaces differ.
+            String jobCacheKey = String.format("%s.estimate(%s)", this.getClass().getCanonicalName(),
+                    ApacheIcebergSource.this.getTableIdentifier());
+
+            CardinalityEstimate cardinalityEstimate = optimizationContext.queryJobCache(jobCacheKey,
+                    CardinalityEstimate.class);
+            if (cardinalityEstimate != null) {
+                // Stop the measurement on this path as well; it was started above.
+                timeMeasurement.stop();
+                return cardinalityEstimate;
+            }
+
+            // Otherwise calculate the cardinality.
+            // First, inspect the total size of the table's data files.
+            OptionalLong fileSize = getFileSize();
+            if (fileSize.isEmpty()) {
+                ApacheIcebergSource.this.logger.warn("Could not determine size of {}... deliver fallback estimate.",
+                        ApacheIcebergSource.this.getIcebergTableName());
+                timeMeasurement.stop();
+                return this.FALLBACK_ESTIMATE;
+            }
+            if (fileSize.getAsLong() == 0L) {
+                timeMeasurement.stop();
+                return new CardinalityEstimate(0L, 0L, 1d);
+            }
+
+            OptionalLong numberRows = ApacheIcebergSource.this.extractNumberOfRows();
+            if (numberRows.isEmpty()) {
+                ApacheIcebergSource.this.logger
+                        .warn("Could not determine the cardinality of {}... deliver fallback estimate.",
+                                ApacheIcebergSource.this.getIcebergTableName());
+                timeMeasurement.stop();
+                return this.FALLBACK_ESTIMATE;
+            }
+
+            long rowCount = numberRows.getAsLong();
+            cardinalityEstimate = new CardinalityEstimate(rowCount, rowCount, 1d);
+
+            // Cache the result, so that it will not be recalculated again.
+            optimizationContext.putIntoJobCache(jobCacheKey, cardinalityEstimate);
+
+            timeMeasurement.stop();
+            return cardinalityEstimate;
+        }
+    }
+
+    /**
+     * Creates a {@link DataSetType} for the output records based on the given
+     * column names.
+     *
+     * @param columnNames collection of column names to include; {@code null} or empty
+     *                    for the default record type
+     * @return a {@link DataSetType} describing the output record structure
+     */
+    private static DataSetType createOutputDataSetType(Collection columnNames) {
+        if (columnNames == null) {
+            columnNames = new ArrayList();
+        }
+        String[] columnNamesAsArray = columnNames.toArray(new String[0]);
+        return columnNamesAsArray.length == 0 ? DataSetType.createDefault(Record.class)
+                : DataSetType.createDefault(new RecordType(columnNamesAsArray));
+    }
+
+    public Catalog getCatalog() {
+        return catalog;
+    }
+
+    public TableIdentifier getTableIdentifier() {
+        return tableIdentifier;
+    }
+
+    public Collection getWhereExpressions() {
+        return whereExpressions;
+    }
+
+    public Collection getColumns() {
+        return columns;
+    }
+
+    private void setCachedTable(Table table) {
+        this.cachedTable = table;
+    }
+
+    /**
+     * Returns the Iceberg table name.
+     *
+     * @return the table name from the
+     *         {@link org.apache.iceberg.catalog.TableIdentifier}
+     */
+    public String getIcebergTableName() {
+        return tableIdentifier.name();
+    }
+
+    /**
+     * Loads and returns the Iceberg {@link org.apache.iceberg.Table}.
+     * Uses a cached instance if available.
+     *
+     * @return the loaded Iceberg table
+     */
+    private Table getTable() {
+        if (this.cachedTable != null) {
+            return this.cachedTable;
+        }
+
+        Table table = this.catalog.loadTable(this.tableIdentifier);
+        setCachedTable(table);
+        return table;
+    }
+
+    /**
+     * Builds a {@link ScanBuilder} for the current table.
+     * Applies selected columns and filter expressions if provided.
+     *
+     * @return configured {@link ScanBuilder}
+     */
+    public ScanBuilder getScanBuilder() {
+        ScanBuilder scanBuilder = IcebergGenerics.read(getTable());
+
+        if (this.columns != null && !this.columns.isEmpty()) {
+            scanBuilder = scanBuilder.select(columns);
+        }
+
+        if (this.whereExpressions != null) {
+            for (Expression whereExpression : this.whereExpressions) {
+                scanBuilder = scanBuilder.where(whereExpression);
+            }
+        }
+
+        return scanBuilder;
+    }
+
+    /**
+     * Estimates the number of rows in the table.
+     * Applies a selectivity adjustment if filter expressions are present.
+     *
+     * @return estimated number of rows, or {@link OptionalLong#empty()} if
+     *         unavailable
+     */
+    private OptionalLong extractNumberOfRows() {
+        try {
+            long rowCount = 0L;
+            Table table = getTable();
+
+            try (CloseableIterable tasks = table.newScan().planFiles()) {
+                for (FileScanTask fileScanTask : tasks) {
+                    rowCount += fileScanTask.estimatedRowsCount();
+                }
+            }
+
+            if (rowCount == 0) {
+                return OptionalLong.empty();
+            }
+
+            // Every filter (including a single one) scales the estimate by the default selectivity.
+            if (this.whereExpressions != null && !this.whereExpressions.isEmpty()) {
+                double updatedRowCount = rowCount * Math.pow(defaultSelectivityValue, this.whereExpressions.size());
+                return OptionalLong.of((long) updatedRowCount);
+            }
+
+            return OptionalLong.of(rowCount);
+        } catch (Exception e) {
+            this.logger.warn("Could not extract the number of rows. Returning empty. Got error: " + e);
+            return OptionalLong.empty();
+        }
+    }
+
+    /**
+     * Calculates the total file size in bytes of all table files.
+     *
+     * @return total file size in bytes, or {@link OptionalLong#empty()} if
+     *         unavailable
+     */
+    private OptionalLong getFileSize() {
+        try {
+            long fileSizeCount = 0L;
+            try (CloseableIterable tasks = getTable().newScan().planFiles()) {
+                for (FileScanTask t : tasks) {
+                    fileSizeCount += t.file().fileSizeInBytes();
+                }
+            }
+            return OptionalLong.of(fileSizeCount);
+        } catch (Exception e) {
+            this.logger.warn("Could not get file size. Returning empty. Got error: " + e);
+            return OptionalLong.empty();
+        }
+    }
+}
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ApacheIcebergSinkMapping.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ApacheIcebergSinkMapping.java
new file mode 100644
index 000000000..18fc7862f
--- /dev/null
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ApacheIcebergSinkMapping.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wayang.java.mapping;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.wayang.basic.operators.ApacheIcebergSink;
+import org.apache.wayang.basic.operators.TextFileSink;
+import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.mapping.Mapping;
+import org.apache.wayang.core.mapping.OperatorPattern;
+import org.apache.wayang.core.mapping.PlanTransformation;
+import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
+import org.apache.wayang.core.mapping.SubplanPattern;
+import org.apache.wayang.core.plan.wayangplan.OperatorBase;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.java.operators.JavaApacheIcebergSink;
+import org.apache.wayang.java.operators.JavaTextFileSink;
+import org.apache.wayang.java.platform.JavaPlatform;
+
+/**
+ * Mapping from {@link ApacheIcebergSink} to
+ * {@link JavaApacheIcebergSink}.
+ */
+public class ApacheIcebergSinkMapping implements Mapping {
+
+    @Override
+    public Collection getTransformations() {
+        // One transformation: match the logical sink, replace it with the Java operator.
+        final PlanTransformation transformation = new PlanTransformation(
+                this.createSubplanPattern(), this.createReplacementSubplanFactory(), JavaPlatform.getInstance());
+        return Collections.singleton(transformation);
+    }
+
+    /** Pattern for a single {@link ApacheIcebergSink}; the template is built with null arguments. */
+    private SubplanPattern createSubplanPattern() {
+        final ApacheIcebergSink template = new ApacheIcebergSink(null, null, null, null);
+        return SubplanPattern.createSingleton(new OperatorPattern<>("sink", template, false));
+    }
+
+    /** Factory that wraps the matched sink in a {@link JavaApacheIcebergSink} at the given epoch. */
+    private ReplacementSubplanFactory createReplacementSubplanFactory() {
+        return new ReplacementSubplanFactory.OfSingleOperators(
+                (logicalSink, epoch) -> new JavaApacheIcebergSink(logicalSink).at(epoch));
+    }
+}
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ApacheIcebergSourceMapping.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ApacheIcebergSourceMapping.java
new file mode 100644
index 000000000..996540492
--- /dev/null
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ApacheIcebergSourceMapping.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wayang.java.mapping;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.wayang.basic.operators.ApacheIcebergSource;
+import org.apache.wayang.basic.operators.TextFileSource;
+import org.apache.wayang.core.mapping.Mapping;
+import org.apache.wayang.core.mapping.OperatorPattern;
+import org.apache.wayang.core.mapping.PlanTransformation;
+import org.apache.wayang.core.mapping.ReplacementSubplanFactory;
+import org.apache.wayang.core.mapping.SubplanPattern;
+import org.apache.wayang.java.operators.JavaApacheIcebergSource;
+import org.apache.wayang.java.platform.JavaPlatform;
+import org.apache.iceberg.expressions.Expression;
+
+
+/**
+ * Mapping from {@link ApacheIcebergSource} to {@link JavaApacheIcebergSource}.
+ */
+public class ApacheIcebergSourceMapping implements Mapping {
+
+    @Override
+    public Collection getTransformations() {
+        // One transformation: match the logical source, replace it with the Java operator.
+        final PlanTransformation transformation = new PlanTransformation(
+                this.createSubplanPattern(),
+                this.createReplacementSubplanFactory(),
+                JavaPlatform.getInstance());
+        return Collections.singleton(transformation);
+    }
+
+    /** Pattern for a single {@link ApacheIcebergSource}. */
+    private SubplanPattern createSubplanPattern() {
+        // Template built with null arguments, then copied for use in the pattern.
+        final ApacheIcebergSource template = new ApacheIcebergSource(
+                (Catalog) null, (TableIdentifier) null, (Collection) null, (Collection) null);
+        final OperatorPattern sourcePattern = new OperatorPattern(
+                "source", new ApacheIcebergSource(template), false);
+        return SubplanPattern.createSingleton(sourcePattern);
+    }
+
+    /** Factory that wraps the matched source in a {@link JavaApacheIcebergSource} at the given epoch. */
+    private ReplacementSubplanFactory createReplacementSubplanFactory() {
+        return new ReplacementSubplanFactory.OfSingleOperators(
+                (logicalSource, epoch) -> new JavaApacheIcebergSource(logicalSource).at(epoch));
+    }
+}
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java
index dfdeca43e..a4418c18a 100644
--- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java
@@ -62,7 +62,9 @@ public class Mappings {
new KafkaTopicSinkMapping(),
new AmazonS3SourceMapping(),
new GoogleCloudStorageSourceMapping(),
- new AzureBlobStorageSourceMapping()
+ new AzureBlobStorageSourceMapping(),
+ new ApacheIcebergSourceMapping(),
+ new ApacheIcebergSinkMapping()
);
public static Collection GRAPH_MAPPINGS = Arrays.asList(
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSink.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSink.java
new file mode 100644
index 000000000..f2042f61f
--- /dev/null
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSink.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.wayang.java.operators;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.encryption.EncryptedOutputFile;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.io.OutputFileFactory;
+import org.apache.iceberg.data.Record;
+import org.apache.wayang.basic.channels.FileChannel;
+import org.apache.wayang.basic.operators.ApacheIcebergSink;
+import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.plan.wayangplan.ExecutionOperator;
+import org.apache.wayang.core.plan.wayangplan.Operator;
+import org.apache.wayang.core.platform.ChannelDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
+import org.apache.wayang.core.types.DataSetType;
+import org.apache.wayang.core.util.Tuple;
+import org.apache.wayang.java.channels.CollectionChannel;
+import org.apache.wayang.java.channels.JavaChannelInstance;
+import org.apache.wayang.java.channels.StreamChannel;
+import org.apache.wayang.java.execution.JavaExecutor;
+import org.apache.wayang.java.platform.JavaPlatform;
+
+/**
+ * {@link Operator} for the {@link JavaPlatform} that writes its input to an
+ * Iceberg table. If the table does not exist it is created first; otherwise
+ * the data is appended to the existing table.
+ * Every incoming data quantum must be a
+ * {@link org.apache.wayang.basic.data.Record} (or subclass).
+ */
+public class JavaApacheIcebergSink extends ApacheIcebergSink
+        implements JavaExecutionOperator {
+
+    // Ids handed to the OutputFileFactory for every data file this sink writes.
+    private static final int DEFAULT_PARTITION_ID = 1;
+    private static final int DEFAULT_TASK_ID = 1;
+
+    /**
+     * Creates a new sink for the Java Platform.
+     *
+     * @param catalog          Iceberg catalog used to resolve the target table; must not be {@code null}
+     * @param schema           Iceberg write schema; must be compatible with the target table
+     * @param tableIdentifier  fully qualified identifier of the target table
+     * @param outputFileFormat file format used for writing (e.g., Parquet, Avro)
+     */
+    public JavaApacheIcebergSink(Catalog catalog, Schema schema,
+            TableIdentifier tableIdentifier, FileFormat outputFileFormat) {
+        super(catalog, schema, tableIdentifier, outputFileFormat);
+    }
+
+    /**
+     * Copy constructor.
+     *
+     * @param that sink instance to copy
+     */
+    public JavaApacheIcebergSink(ApacheIcebergSink that) {
+        super(that);
+    }
+
+    /**
+     * Writes every input record into one new data file of the target table and
+     * commits that file as an append, creating the table beforehand if needed.
+     *
+     * @param inputs          exactly one channel providing Wayang records
+     * @param outputs         output channels of this operator
+     * @param javaExecutor    executor running this operator (not used here)
+     * @param operatorContext optimization context used to model the execution
+     * @return the result of modelling this operator as eagerly executed
+     * @throws WayangException if a data quantum is not a Wayang record or writing fails
+     */
+    @Override
+    public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
+            ChannelInstance[] inputs,
+            ChannelInstance[] outputs,
+            JavaExecutor javaExecutor,
+            OptimizationContext.OperatorContext operatorContext) {
+        assert inputs.length == this.getNumInputs();
+        assert outputs.length == this.getNumOutputs();
+
+        try {
+            FileFormat outputFileFormat = getOutputFileFormat();
+            JavaChannelInstance input = (JavaChannelInstance) inputs[0];
+
+            if (!tableExists()) {
+                catalog.createTable(tableIdentifier, schema);
+            }
+            Table table = catalog.loadTable(tableIdentifier);
+
+            // Validate and convert each quantum lazily, while it is being written.
+            Stream<Record> records = input.provideStream().map(quantum -> {
+                if (!(quantum instanceof org.apache.wayang.basic.data.Record)) {
+                    throw new WayangException("Expected Wayang Record but got " + quantum.getClass());
+                }
+                return wayangRecordToIcebergRecord((org.apache.wayang.basic.data.Record) quantum);
+            });
+
+            OutputFileFactory outputFileFactory = OutputFileFactory
+                    .builderFor(table, DEFAULT_PARTITION_ID, DEFAULT_TASK_ID)
+                    .format(outputFileFormat)
+                    .build();
+            EncryptedOutputFile outputFile = outputFileFactory.newOutputFile();
+            FileAppenderFactory<Record> appenderFactory =
+                    new org.apache.iceberg.data.GenericAppenderFactory(this.schema);
+
+            DataWriter<Record> writer = appenderFactory.newDataWriter(
+                    outputFile, outputFileFormat, /* Partition */ null);
+            try (DataWriter<Record> openWriter = writer) {
+                records.forEach(record -> openWriter.write(record));
+            }
+
+            // The data file only becomes part of the table once it is committed;
+            // its metadata is available only after the writer has been closed.
+            table.newAppend().appendFile(writer.toDataFile()).commit();
+        } catch (Exception e) {
+            throw new WayangException("Could not write stream to iceberg location.", e);
+        }
+
+        return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext);
+    }
+
+    @Override
+    protected ExecutionOperator createCopy() {
+        return new JavaApacheIcebergSink(this.catalog, this.schema, this.tableIdentifier, getOutputFileFormat());
+    }
+
+    @Override
+    public String getLoadProfileEstimatorConfigurationKey() {
+        return "wayang.java.apacheicebergsink.load";
+    }
+
+    @Override
+    public List<ChannelDescriptor> getSupportedInputChannels(int index) {
+        return Arrays.asList(CollectionChannel.DESCRIPTOR, StreamChannel.DESCRIPTOR);
+    }
+
+    @Override
+    public List<ChannelDescriptor> getSupportedOutputChannels(int index) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Copies the leading fields of a Wayang record, by position, into a new
+     * Iceberg record of this sink's schema.
+     *
+     * @param wayangRecord record providing one value per schema column
+     * @return the populated Iceberg record
+     */
+    private Record wayangRecordToIcebergRecord(org.apache.wayang.basic.data.Record wayangRecord) {
+        // A freshly created record is not shared with anything else, so no copy is needed.
+        Record icebergRecord = GenericRecord.create(this.schema);
+        int columnCount = this.schema.columns().size();
+        for (int i = 0; i < columnCount; i++) {
+            icebergRecord.set(i, wayangRecord.getField(i));
+        }
+        return icebergRecord;
+    }
+
+    /** Tells whether the catalog reports the target table as existing. */
+    private boolean tableExists() {
+        return catalog.tableExists(tableIdentifier);
+    }
+}
diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSource.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSource.java
new file mode 100644
index 000000000..1b10eee91
--- /dev/null
+++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSource.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.wayang.java.operators;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.IcebergGenerics.ScanBuilder;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.wayang.basic.operators.ApacheIcebergSource;
+import org.apache.wayang.core.api.exception.WayangException;
+import org.apache.wayang.core.optimizer.OptimizationContext;
+import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators;
+import org.apache.wayang.core.platform.ChannelDescriptor;
+import org.apache.wayang.core.platform.ChannelInstance;
+import org.apache.wayang.core.platform.lineage.ExecutionLineageNode;
+import org.apache.wayang.core.util.Tuple;
+import org.apache.wayang.java.channels.StreamChannel;
+import org.apache.wayang.java.execution.JavaExecutor;
+
+/**
+ * Execution operator that implements the {@link ApacheIcebergSource} on the Java platform.
+ */
+public class JavaApacheIcebergSource
+        extends ApacheIcebergSource implements JavaExecutionOperator {
+
+    /**
+     * Creates a new Java Iceberg source instance.
+     *
+     * @param catalog          {@link org.apache.iceberg.catalog.Catalog} used to load the table
+     * @param tableIdentifier  {@link org.apache.iceberg.catalog.TableIdentifier} of the target table
+     * @param whereExpressions {@link org.apache.iceberg.expressions.Expression} filters; empty for none
+     * @param columns          names of the columns to project; empty for all columns
+     */
+    public JavaApacheIcebergSource(Catalog catalog, TableIdentifier tableIdentifier,
+            Expression[] whereExpressions, String[] columns) {
+        super(ApacheIcebergSource.create(catalog, tableIdentifier, whereExpressions, columns));
+    }
+
+    /**
+     * Copies an instance (exclusive of broadcasts).
+     *
+     * @param that that should be copied
+     */
+    public JavaApacheIcebergSource(ApacheIcebergSource that) {
+        super(that);
+    }
+
+    /**
+     * Creates a {@link Stream} of {@link org.apache.wayang.basic.data.Record}
+     * objects from the given Iceberg {@link ScanBuilder}.
+     *
+     * @param scanBuilder the configured {@link ScanBuilder} used to read table data
+     * @return a sequential {@link Stream} of Wayang records; closing it closes the scan
+     */
+    private static Stream<org.apache.wayang.basic.data.Record> getStreamFromIcebergTable(ScanBuilder scanBuilder) {
+        org.apache.iceberg.io.CloseableIterable<org.apache.iceberg.data.Record> scan = scanBuilder.build();
+        return StreamSupport.stream(scan.spliterator(), false)
+                .map(JavaApacheIcebergSource::getWayangRecord)
+                .onClose(() -> {
+                    try {
+                        scan.close();
+                    } catch (java.io.IOException e) {
+                        throw new java.io.UncheckedIOException(e);
+                    }
+                });
+    }
+
+    /**
+     * Converts an Iceberg {@link org.apache.iceberg.data.Record} to a Wayang
+     * {@link org.apache.wayang.basic.data.Record}.
+     *
+     * @param icebergRecord the Iceberg record to convert
+     * @return a new Wayang record holding the same field values in the same order
+     */
+    private static org.apache.wayang.basic.data.Record getWayangRecord(org.apache.iceberg.data.Record icebergRecord) {
+        Object[] values = new Object[icebergRecord.size()];
+        for (int i = 0; i < values.length; i++) {
+            values[i] = icebergRecord.get(i);
+        }
+        return new org.apache.wayang.basic.data.Record(values);
+    }
+
+    /**
+     * Hands a stream over the table scan to the first output channel and
+     * registers the load profile estimators of this operator.
+     *
+     * @param inputs          input channels; a source has none
+     * @param outputs         output channels; the first one receives the records
+     * @param javaExecutor    executor supplying the configuration
+     * @param operatorContext optimization context used for the lineage nodes
+     * @return the lineage collected from the prepare node
+     * @throws WayangException if setting up the scan or the lineage fails
+     */
+    @Override
+    public Tuple<Collection<ExecutionLineageNode>, Collection<ChannelInstance>> evaluate(
+            ChannelInstance[] inputs,
+            ChannelInstance[] outputs,
+            JavaExecutor javaExecutor,
+            OptimizationContext.OperatorContext operatorContext) {
+        assert inputs.length == this.getNumInputs();
+        assert outputs.length == this.getNumOutputs();
+
+        String tableName = this.getIcebergTableName();
+        try {
+            ScanBuilder scanBuilder = this.getScanBuilder();
+            ((StreamChannel.Instance) outputs[0]).accept(getStreamFromIcebergTable(scanBuilder));
+
+            ExecutionLineageNode prepareLineageNode = new ExecutionLineageNode(operatorContext);
+            prepareLineageNode.add(LoadProfileEstimators.createFromSpecification(
+                    "wayang.java.apacheicebergsource.load.prepare", javaExecutor.getConfiguration()));
+            ExecutionLineageNode mainLineageNode = new ExecutionLineageNode(operatorContext);
+            mainLineageNode.add(LoadProfileEstimators.createFromSpecification(
+                    "wayang.java.apacheicebergsource.load.main", javaExecutor.getConfiguration()));
+
+            // Only the main node becomes a predecessor of the output's lineage.
+            outputs[0].getLineage().addPredecessor(mainLineageNode);
+            return prepareLineageNode.collectAndMark();
+        } catch (Exception e) {
+            throw new WayangException(String.format("Reading from Apache Iceberg Source table %s failed.", tableName),
+                    e);
+        }
+    }
+
+    @Override
+    public Collection<String> getLoadProfileEstimatorConfigurationKeys() {
+        return Arrays.asList("wayang.java.apacheicebergsource.load.prepare",
+                "wayang.java.apacheicebergsource.load.main");
+    }
+
+    @Override
+    public List<ChannelDescriptor> getSupportedInputChannels(int index) {
+        throw new UnsupportedOperationException(String.format("%s does not have input channels.", this));
+    }
+
+    @Override
+    public List<ChannelDescriptor> getSupportedOutputChannels(int index) {
+        assert index <= this.getNumOutputs() || (index == 0 && this.getNumOutputs() == 0);
+        return Collections.singletonList(StreamChannel.DESCRIPTOR);
+    }
+}