diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala index a5ef8bc42..2a2f60cc0 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuanta.scala @@ -47,6 +47,11 @@ import scala.collection.JavaConversions import scala.collection.JavaConversions._ import scala.reflect._ +import org.apache.iceberg.Schema +import org.apache.iceberg.FileFormat +import org.apache.iceberg.catalog.{Catalog, TableIdentifier} + + /** * Represents an intermediate result/data flow edge in a [[WayangPlan]]. * @@ -1013,7 +1018,6 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I this.planBuilder.buildAndExplain(toJson) } - /** * Write the data quanta in this instance to a text file. Triggers execution. * @@ -1027,6 +1031,44 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I writeTextFileJava(url, toSerializableFunction(formatterUdf), udfLoad) } + /** + * Write the data quanta in this instance to a iceberg table. Triggers execution. + * + * @param catalog Iceberg Catalog + * @param schema Iceberg Schema of the table to create + * @param tableIdentifier Iceberg Table Identifier of the table to create + * @param outputFileFormat File format of the output data files + */ + + def writeIcebergTable(catalog: Catalog, + schema: Schema, + tableIdentifier: TableIdentifier, + outputFileFormat: FileFormat): Unit = { + writeIcebergTableJava(catalog, schema, tableIdentifier, outputFileFormat) + } + + /** + * Write the data quanta in this instance to a iceberg table. Triggers execution. + * + * @param catalog Iceberg Catalog + * @param schema Iceberg Schema of the table to create + * @param tableIdentifier Iceberg Table Identifier of the table to create + * @param outputFileFormat File format of the output data files + */ + def writeIcebergTableJava( + catalog: Catalog, + schema: Schema, + tableIdentifier: TableIdentifier, + outputFileFormat: FileFormat ): Unit = { + + val sink = new ApacheIcebergSink(catalog, schema, tableIdentifier, outputFileFormat) + + sink.setName(s"*#-> Write to Iceberg Table Sink ") + this.connectTo(sink, 0) + this.planBuilder.sinks += sink + this.planBuilder.buildAndExecute() + this.planBuilder.sinks.clear() + } def writeParquet(url: String, overwrite: Boolean = false, preferDataset: Boolean = false)(implicit ev: Out =:= Record): Unit = @@ -1095,6 +1137,7 @@ class DataQuanta[Out: ClassTag](val operator: ElementaryOperator, outputIndex: I this.planBuilder.sinks.clear() } + private def writeParquetJava(url: String, overwrite: Boolean, preferDataset: Boolean)(implicit ev: Out =:= Record): Unit = { val _ = ev val sink = new ParquetSink(url, overwrite, preferDataset) diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala index dd8b32bbb..dad054a2f 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/DataQuantaBuilder.scala @@ -40,6 +40,10 @@ import org.apache.wayang.core.types.DataSetType import org.apache.wayang.core.util.{Logging, ReflectionUtils, WayangCollections, Tuple => WayangTuple} import org.apache.wayang.core.plan.wayangplan.OutputSlot +import org.apache.iceberg.Schema +import org.apache.iceberg.FileFormat +import org.apache.iceberg.catalog.{Catalog, TableIdentifier} + import scala.collection.mutable.ListBuffer @@ -499,6 +503,27 @@ trait DataQuantaBuilder[+This <: DataQuantaBuilder[_, Out], Out] extends Logging this.dataQuanta().writeTextFileJava(url, formatterUdf, udfLoadProfileEstimator) } + /** + * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.IcebergTableSink]]. This triggers + * execution of the constructed [[WayangPlan]]. + * + * @param catalog Iceberg Catalog + * @param schema Iceberg Schema of the table to create + * @param tableIdentifier Iceberg Table Identifier of the table to create + * @param outputFileFormat File format of the output data files + * @return the collected data quanta + */ + + def writeIcebergTable(catalog: Catalog, + schema: Schema, + tableIdentifier: TableIdentifier, + outputFileFormat: FileFormat, + jobName: String): Unit = { + this.javaPlanBuilder.withJobName(jobName) + this.dataQuanta().writeIcebergTableJava(catalog, schema, tableIdentifier, outputFileFormat) + + } + /** * Feed the built [[DataQuanta]] into a [[org.apache.wayang.basic.operators.KafkaTopicSink]]. This triggers * execution of the constructed [[WayangPlan]]. diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala index 604f637ad..5e464c76b 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/JavaPlanBuilder.scala @@ -25,13 +25,15 @@ import java.util.{Collection => JavaCollection} import org.apache.commons.lang3.Validate import org.apache.wayang.api.util.DataQuantaBuilderCache import org.apache.wayang.basic.data.Record -import org.apache.wayang.basic.operators.{AmazonS3Source, AzureBlobStorageSource, GoogleCloudStorageSource, KafkaTopicSource, ParquetSource, TableSource, TextFileSource} +import org.apache.wayang.basic.operators.{AmazonS3Source, AzureBlobStorageSource, GoogleCloudStorageSource, KafkaTopicSource, ParquetSource, TableSource, TextFileSource, ApacheIcebergSource} import org.apache.wayang.commons.util.profiledb.model.Experiment import org.apache.wayang.core.api.WayangContext import org.apache.wayang.core.plan.wayangplan._ import org.apache.wayang.core.types.DataSetType import scala.reflect.ClassTag +import org.apache.iceberg.catalog.{Catalog, TableIdentifier} +import org.apache.iceberg.expressions.Expression /** * Utility to build and execute [[WayangPlan]]s. @@ -79,6 +81,22 @@ class JavaPlanBuilder(wayangCtx: WayangContext, jobName: String) { preferDataset: Boolean = false): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] = createSourceBuilder(ParquetSource.create(url, projection).preferDatasetOutput(preferDataset))(ClassTag(classOf[Record])) + /** + * Read an Apache Iceberg table and provide it as a dataset of [[Record]]s. + * + * @param catalog the Iceberg catalog containing the table + * @param tableIdentifier the identifier of the Iceberg table to read + * @param filterExpressions optional array of filter expressions to apply during the read + * @param projectionColumns optional array of column names to project (select specific columns) + * @return [[DataQuantaBuilder]] for the Iceberg table + */ + def readApacheIcebergTable( + catalog: Catalog, + tableIdentifier: TableIdentifier, + filterExpressions: Array[Expression] = null, + projectionColumns: Array[String] = null): UnarySourceDataQuantaBuilder[UnarySourceDataQuantaBuilder[_, Record], Record] = + createSourceBuilder(ApacheIcebergSource.create(catalog, tableIdentifier, filterExpressions, projectionColumns))(ClassTag(classOf[Record])) + /** * Read a text file from a Google Cloud Storage bucket and provide it as a dataset of [[String]]s, one per line. * diff --git a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala index 5e3635e68..d49deded1 100644 --- a/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala +++ b/wayang-api/wayang-api-scala-java/src/main/scala/org/apache/wayang/api/PlanBuilder.scala @@ -24,7 +24,7 @@ package org.apache.wayang.api import org.apache.commons.lang3.Validate import org.apache.wayang.api import org.apache.wayang.basic.data.Record -import org.apache.wayang.basic.operators.{AmazonS3Source, AzureBlobStorageSource, CollectionSource, GoogleCloudStorageSource, ObjectFileSource, ParquetSource, TableSource, TextFileSource} +import org.apache.wayang.basic.operators.{AmazonS3Source, AzureBlobStorageSource, CollectionSource, GoogleCloudStorageSource, ObjectFileSource, ParquetSource, TableSource, TextFileSource, ApacheIcebergSource} import org.apache.wayang.commons.util.profiledb.model.Experiment import org.apache.wayang.core.api.WayangContext import org.apache.wayang.core.plan.wayangplan._ @@ -34,6 +34,8 @@ import scala.collection.JavaConversions import scala.collection.mutable.ListBuffer import scala.language.implicitConversions import scala.reflect._ +import org.apache.iceberg.catalog.{Catalog, TableIdentifier} +import org.apache.iceberg.expressions.Expression /** * Utility to build [[WayangPlan]]s. @@ -144,6 +146,21 @@ class PlanBuilder(private[api] val wayangContext: WayangContext, private var job preferDataset: Boolean = false): DataQuanta[Record] = load(ParquetSource.create(url, projection).preferDatasetOutput(preferDataset)) + /** + * Read an Apache Iceberg table and provide it as a dataset of [[Record]]s. + * + * @param catalog the Iceberg catalog containing the table + * @param tableIdentifier the identifier of the Iceberg table to read + * @param filterExpressions optional array of filter expressions to apply during the read + * @param projectionColumns optional array of column names to project (select specific columns) + * @return [[DataQuanta]] of [[Record]] for the Iceberg table + */ + def readApacheIcebergTable( + catalog: Catalog, + tableIdentifier: TableIdentifier, + filterExpressions: Array[Expression] = null, + projectionColumns: Array[String] = null): DataQuanta[Record] = load(ApacheIcebergSource.create(catalog, tableIdentifier, filterExpressions, projectionColumns)) + /** * Read a text file from a Google Cloud Storage bucket and provide it as a dataset of [[String]]s, one per line. * diff --git a/wayang-commons/wayang-basic/pom.xml b/wayang-commons/wayang-basic/pom.xml index 1d1b460ae..049e2f7a6 100644 --- a/wayang-commons/wayang-basic/pom.xml +++ b/wayang-commons/wayang-basic/pom.xml @@ -34,6 +34,11 @@ This modules represents the base Wayang package with the default operators and functions. + + 1.6.0 + 3.3.6 + + @@ -132,6 +137,45 @@ com.azure azure-identity + + + + + + org.apache.iceberg + iceberg-core + ${iceberg.version} + + + org.apache.iceberg + iceberg-api + ${iceberg.version} + + + org.apache.iceberg + iceberg-parquet + ${iceberg.version} + + + org.apache.iceberg + iceberg-data + ${iceberg.version} + + + org.apache.hadoop + hadoop-common + ${hadoop.version} + + + org.apache.hadoop + hadoop-client + ${hadoop.version} + + + org.slf4j + slf4j-simple + 2.0.16 + diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSink.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSink.java new file mode 100644 index 000000000..e9a2b3626 --- /dev/null +++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSink.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.wayang.basic.operators; + +import org.apache.wayang.core.plan.wayangplan.UnarySink; +import org.apache.wayang.core.types.DataSetType; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.FileFormat; +import org.apache.wayang.basic.data.Record; + +/** + * This {@link UnarySink} writes all incoming data quanta to an iceberg table. + * Either if the table does not exists it will create new, otherwise append. + * + * @param Data Type if the incoming Data Quanta + */ +public class ApacheIcebergSink extends UnarySink { + + protected final Catalog catalog; + protected final Schema schema; + protected final TableIdentifier tableIdentifier; + protected final FileFormat outputFileFormat; + + /** + * + * @param catalog Iceberg catalog used to resolve the target table; must + * not be {@code null} + * @param schema Iceberg write schema; must be compatible with the + * target table + * @param tableIdentifier fully qualified identifier of the target table + * + * @param outputFileFormat {@link FileFormat} the format of the output data files + */ + public ApacheIcebergSink(Catalog catalog, Schema schema, TableIdentifier tableIdentifier, FileFormat outputFileFormat) { + super(DataSetType.createDefault(Record.class)); + this.catalog = catalog; + this.schema = schema; + this.tableIdentifier = tableIdentifier; + this.outputFileFormat = outputFileFormat; + } + + /** + * Creates a copied instance. + * + * @param that should be copied + */ + public ApacheIcebergSink(ApacheIcebergSink that) { + super(that); + this.catalog = that.catalog; + this.schema = that.schema; + this.tableIdentifier = that.tableIdentifier; + this.outputFileFormat = that.outputFileFormat; + } + + public FileFormat getOutputFileFormat() { + return this.outputFileFormat; + } +} diff --git a/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java new file mode 100644 index 000000000..3f188f418 --- /dev/null +++ b/wayang-commons/wayang-basic/src/main/java/org/apache/wayang/basic/operators/ApacheIcebergSource.java @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.basic.operators; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import org.apache.wayang.basic.data.Record; +import org.apache.wayang.basic.types.RecordType; +import org.apache.wayang.commons.util.profiledb.model.measurement.TimeMeasurement; +import org.apache.wayang.core.api.Configuration; +import org.apache.wayang.core.optimizer.OptimizationContext; +import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimate; +import org.apache.wayang.core.plan.wayangplan.UnarySource; +import org.apache.wayang.core.types.DataSetType; + +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.IcebergGenerics; +import org.apache.iceberg.data.IcebergGenerics.ScanBuilder; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.io.CloseableIterable; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; + +import org.apache.commons.lang3.Validate; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Table; +import org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator; + + +/** + * This source reads an Iceberg Table and outputs the lines as + * {@link org.apache.wayang.basic.data.Record} + * units. + */ +public class ApacheIcebergSource extends UnarySource { + + private final Logger logger = LogManager.getLogger(this.getClass()); + + private final Catalog catalog; + + private final TableIdentifier tableIdentifier; + + private Collection whereExpressions; + private Collection columns; + + private org.apache.iceberg.Table cachedTable = null; + + private static final Double defaultSelectivityValue = 0.10; + + /** + * Creates a new Iceberg source instance. + * + * @param catalog Iceberg catalog used to load the table + * @param tableIdentifier identifier of the target table + * @param whereExpressions list of + * {@link org.apache.iceberg.expressions.Expression} + * filters; empty list for none + * @param columns collection of column names to project; empty list for + * all columns + * @return a new {@link ApacheIcebergSource} instance + */ + public static ApacheIcebergSource create(Catalog catalog, TableIdentifier tableIdentifier, + Expression[] whereExpressions, + String[] columns) { + + List whereList = + (whereExpressions == null) ? Collections.emptyList() : Arrays.asList(whereExpressions); + + List columnList = + (columns == null) ? Collections.emptyList() : Arrays.asList(columns); + + return new ApacheIcebergSource(catalog, tableIdentifier, whereList, columnList); + } + + public ApacheIcebergSource(Catalog catalog, TableIdentifier tableIdentifier, + Collection whereExpressions, Collection columns) { + + + super(createOutputDataSetType(columns)); + this.catalog = catalog; + this.tableIdentifier = tableIdentifier; + + this.whereExpressions = whereExpressions; + this.columns = columns; + } + + /** + * Copies an instance (exclusive of broadcasts). + * + * @param that that should be copied + */ + public ApacheIcebergSource(ApacheIcebergSource that) { + super(that); + this.catalog = that.getCatalog(); + this.columns = that.getColumns(); + this.tableIdentifier = that.getTableIdentifier(); + this.whereExpressions = that.getWhereExpressions(); + } + + @Override + public Optional createCardinalityEstimator( + final int outputIndex, + final Configuration configuration) { + Validate.inclusiveBetween(0, this.getNumOutputs() - 1, outputIndex); + return Optional.of(new ApacheIcebergSource.CardinalityEstimator()); + } + + protected class CardinalityEstimator implements org.apache.wayang.core.optimizer.cardinality.CardinalityEstimator { + + public final CardinalityEstimate FALLBACK_ESTIMATE = new CardinalityEstimate(1000L, 100000000L, 0.7); + + @Override + public CardinalityEstimate estimate(OptimizationContext optimizationContext, + CardinalityEstimate... inputEstimates) { + Validate.isTrue(ApacheIcebergSource.this.getNumInputs() == inputEstimates.length); + + // see Job for StopWatch measurements + final TimeMeasurement timeMeasurement = optimizationContext.getJob().getStopWatch().start( + "Optimization", "Cardinality&Load Estimation", "Push Estimation", "Estimate source cardinalities"); + + // Query the job cache first to see if there is already an estimate. + String jobCacheKey = String.format("%s.estimate(%s)", this.getClass().getCanonicalName(), + ApacheIcebergSource.this.getIcebergTableName()); + + CardinalityEstimate cardinalityEstimate = optimizationContext.queryJobCache(jobCacheKey, + CardinalityEstimate.class); + if (cardinalityEstimate != null) + return cardinalityEstimate; + + // Otherwise calculate the cardinality. + // First, inspect the size of the file and its line sizes. + OptionalLong fileSize = getFileSize(); + if (fileSize.isEmpty()) { + ApacheIcebergSource.this.logger.warn("Could not determine size of {}... deliver fallback estimate.", + ApacheIcebergSource.this.getIcebergTableName()); + timeMeasurement.stop(); + return this.FALLBACK_ESTIMATE; + } + if (fileSize.getAsLong() == 0L) { + timeMeasurement.stop(); + return new CardinalityEstimate(0L, 0L, 1d); + } + + OptionalLong numberRows = ApacheIcebergSource.this.ExtractNumberOfRows(); + + if (numberRows.isEmpty()) { + ApacheIcebergSource.this.logger + .warn("Could not determine the cardinality of {}... deliver fallback estimate.", + ApacheIcebergSource.this.getIcebergTableName()); + timeMeasurement.stop(); + return this.FALLBACK_ESTIMATE; + } + + long rowCount = numberRows.getAsLong(); + cardinalityEstimate = new CardinalityEstimate(rowCount, rowCount, 1d); + + // Cache the result, so that it will not be recalculated again. + optimizationContext.putIntoJobCache(jobCacheKey, cardinalityEstimate); + + timeMeasurement.stop(); + return cardinalityEstimate; + } + } + + /** + * Creates a {@link DataSetType} for the output records based on the given + * column names. + * + * @param columnNames collection of column names to include; empty for default + * record type + * @return a {@link DataSetType} describing the output record structure + */ + + private static DataSetType createOutputDataSetType(Collection columnNames) { + if (columnNames == null) { + columnNames = new ArrayList(); + } + String[] columnNamesAsArray = columnNames.toArray(new String[0]); + return columnNamesAsArray.length == 0 ? DataSetType.createDefault(Record.class) + : DataSetType.createDefault(new RecordType(columnNamesAsArray)); + } + + public Catalog getCatalog() { + return catalog; + } + + public TableIdentifier getTableIdentifier() { + return tableIdentifier; + } + + public Collection getWhereExpressions() { + return whereExpressions; + } + + public Collection getColumns() { + return columns; + } + + private void setCachedTable(Table table) { + this.cachedTable = table; + } + + /** + * Returns the Iceberg table name. + * + * @return the table name from the + * {@link org.apache.iceberg.catalog.TableIdentifier} + */ + public String getIcebergTableName() { + return tableIdentifier.name(); + } + + /** + * Loads and returns the Iceberg {@link org.apache.iceberg.Table}. + * Uses a cached instance if available. + * + * @return the loaded Iceberg table + */ + private Table getTable() { + if (this.cachedTable != null) { + return this.cachedTable; + } + + Table table = this.catalog.loadTable(this.tableIdentifier); + setCachedTable(table); + return table; + } + + /** + * Builds a {@link org.apache.iceberg.data.ScanBuilder} for the current table. + * Applies selected columns and filter expressions if provided. + * + * @return configured {@link org.apache.iceberg.data.ScanBuilder} + */ + public ScanBuilder getScanBuilder() { + + ScanBuilder scanBuilder = IcebergGenerics.read(getTable()); + + if (this.columns != null && this.columns.size() > 0) { + scanBuilder = scanBuilder.select(columns); + } + + if (this.whereExpressions != null && this.whereExpressions.size() > 0) { + for (Expression whereExpression : this.whereExpressions) { + scanBuilder = scanBuilder.where(whereExpression); + } + } + + return scanBuilder; + } + + /** + * Estimates the number of rows in the table. + * Applies a selectivity adjustment if filter expressions are present. + * + * @return estimated number of rows, or {@link OptionalLong#empty()} if + * unavailable + */ + private OptionalLong ExtractNumberOfRows() { + try { + long rowCount = 0L; + + Table table = getTable(); + + try (CloseableIterable tasks = table.newScan().planFiles()) { + for (FileScanTask fileScanTask : tasks) { + rowCount += fileScanTask.estimatedRowsCount(); + } + } + + if (rowCount == 0) { + return OptionalLong.empty(); + } + + if (this.whereExpressions != null && this.whereExpressions.size() > 1) { + + Double updatedRowCount = rowCount * Math.pow(defaultSelectivityValue, this.whereExpressions.size()); + return OptionalLong.of(updatedRowCount.longValue()); + } + + return OptionalLong.of(rowCount); + + } catch (Exception e) { + this.logger.warn("Could not extract the number of rows. Returning empty. Got erro: " + e); + return OptionalLong.empty(); + } + + } + + /** + * Calculates the total file size in bytes of all table files. + * + * @return total file size in bytes, or {@link OptionalLong#empty()} if + * unavailable + */ + private OptionalLong getFileSize() { + + try { + long fileSizeCount = 0L; + try (CloseableIterable tasks = getTable().newScan().planFiles()) { + for (FileScanTask t : tasks) { + fileSizeCount += t.file().fileSizeInBytes(); + } + } + return OptionalLong.of(fileSizeCount); + + } catch (Exception e) { + this.logger.warn("Could not get file size. Returning empty. Got error: " + e); + return OptionalLong.empty(); + + } + + } + +} diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ApacheIcebergSinkMapping.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ApacheIcebergSinkMapping.java new file mode 100644 index 000000000..18fc7862f --- /dev/null +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ApacheIcebergSinkMapping.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.wayang.java.mapping; + +import java.util.Collection; +import java.util.Collections; + +import org.apache.wayang.basic.operators.ApacheIcebergSink; +import org.apache.wayang.basic.operators.TextFileSink; +import org.apache.wayang.core.api.exception.WayangException; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.core.plan.wayangplan.OperatorBase; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.java.operators.JavaApacheIcebergSink; +import org.apache.wayang.java.operators.JavaTextFileSink; +import org.apache.wayang.java.platform.JavaPlatform; + +/** + * Mapping from {@link ApacheIcebergSink} to + * {@link JavaApacheIcebergSink}. + */ +public class ApacheIcebergSinkMapping implements Mapping { + + @Override + public Collection getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + JavaPlatform.getInstance())); + } + + private SubplanPattern createSubplanPattern() { + ApacheIcebergSink icebergSink = new ApacheIcebergSink(null, null, null, null); + final OperatorPattern operatorPattern = new OperatorPattern<>( + "sink", icebergSink, false); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators( + (matchedOperator, epoch) -> new JavaApacheIcebergSink(matchedOperator).at(epoch)); + } +} diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ApacheIcebergSourceMapping.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ApacheIcebergSourceMapping.java new file mode 100644 index 000000000..996540492 --- /dev/null +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/ApacheIcebergSourceMapping.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.wayang.java.mapping; + +import java.util.Collection; +import java.util.Collections; + +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.wayang.basic.operators.ApacheIcebergSource; +import org.apache.wayang.basic.operators.TextFileSource; +import org.apache.wayang.core.mapping.Mapping; +import org.apache.wayang.core.mapping.OperatorPattern; +import org.apache.wayang.core.mapping.PlanTransformation; +import org.apache.wayang.core.mapping.ReplacementSubplanFactory; +import org.apache.wayang.core.mapping.SubplanPattern; +import org.apache.wayang.java.operators.JavaApacheIcebergSource; +import org.apache.wayang.java.platform.JavaPlatform; +import org.apache.iceberg.expressions.Expression; + + +/** + * Mapping from {@link ApahceIcebergSource} to {@link JavaApacheIcebergSource}. + */ +public class ApacheIcebergSourceMapping implements Mapping { + + + @Override + public Collection getTransformations() { + return Collections.singleton(new PlanTransformation( + this.createSubplanPattern(), + this.createReplacementSubplanFactory(), + JavaPlatform.getInstance() + )); + } + + private SubplanPattern createSubplanPattern() { + ApacheIcebergSource icebergSource = new ApacheIcebergSource((Catalog) null, (TableIdentifier) null, (Collection) null, (Collection) null); + + final OperatorPattern operatorPattern = new OperatorPattern( + "source", new ApacheIcebergSource(icebergSource), false + ); + return SubplanPattern.createSingleton(operatorPattern); + } + + private ReplacementSubplanFactory createReplacementSubplanFactory() { + return new ReplacementSubplanFactory.OfSingleOperators( + (matchedOperator, epoch) -> new JavaApacheIcebergSource(matchedOperator).at(epoch) + ); + } + +} diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java index dfdeca43e..a4418c18a 100644 --- a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/mapping/Mappings.java @@ -62,7 +62,9 @@ public class Mappings { new KafkaTopicSinkMapping(), new AmazonS3SourceMapping(), new GoogleCloudStorageSourceMapping(), - new AzureBlobStorageSourceMapping() + new AzureBlobStorageSourceMapping(), + new ApacheIcebergSourceMapping(), + new ApacheIcebergSinkMapping() ); public static Collection GRAPH_MAPPINGS = Arrays.asList( diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSink.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSink.java new file mode 100644 index 000000000..f2042f61f --- /dev/null +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSink.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.wayang.java.operators; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Stream; + +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Schema; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.encryption.EncryptedOutputFile; +import org.apache.iceberg.io.DataWriter; +import org.apache.iceberg.io.FileAppenderFactory; +import org.apache.iceberg.io.OutputFileFactory; +import org.apache.iceberg.data.Record; +import org.apache.wayang.basic.channels.FileChannel; +import org.apache.wayang.basic.operators.ApacheIcebergSink; +import org.apache.wayang.core.api.exception.WayangException; +import org.apache.wayang.core.optimizer.OptimizationContext; +import org.apache.wayang.core.plan.wayangplan.ExecutionOperator; +import org.apache.wayang.core.plan.wayangplan.Operator; +import org.apache.wayang.core.platform.ChannelDescriptor; +import org.apache.wayang.core.platform.ChannelInstance; +import org.apache.wayang.core.platform.lineage.ExecutionLineageNode; +import org.apache.wayang.core.types.DataSetType; +import org.apache.wayang.core.util.Tuple; +import org.apache.wayang.java.channels.CollectionChannel; +import org.apache.wayang.java.channels.JavaChannelInstance; +import org.apache.wayang.java.channels.StreamChannel; +import org.apache.wayang.java.execution.JavaExecutor; +import org.apache.wayang.java.platform.JavaPlatform; + +/** + * {@link Operator} for the {@link JavaPlatform} that creates an Iceberg Table. + * If the table does not exists it will create a new, otherwise it will append + * to a table + * {@code R} Is the input type of the incoming Data Stream. Must be a + * {@link org.apache.wayang.basic.data.Record} (or subclass). + */ + +public class JavaApacheIcebergSink extends ApacheIcebergSink + implements JavaExecutionOperator { + + private final int defaultPartitionId = 1; + private final int defaultTaskId = 1; + + /** + * Creates a new sink for the Java Platform. + * + * @param catalog Iceberg catalog used to resolve the target table; must + * not be {@code null} + * @param schema Iceberg write schema; must be compatible with the + * target table + * @param tableIdentifier fully qualified identifier of the target table + * + * @param fileFormat file format used for writing (e.g., Parquet, Avro) + */ + public JavaApacheIcebergSink( + Catalog catalog, + Schema schema, + TableIdentifier tableIdentifier, + FileFormat outputFileFormnat) { + super(catalog, schema, tableIdentifier, outputFileFormnat); + } + + + /** + * Copy constructor. + * + * @param that sink instance to copy + */ + + public JavaApacheIcebergSink(ApacheIcebergSink that) { + super(that); + } + + + + @Override + public Tuple, Collection> evaluate( + ChannelInstance[] inputs, + ChannelInstance[] outputs, + JavaExecutor javaExecutor, + OptimizationContext.OperatorContext operatorContext) { + + try { + + assert inputs.length == 1; + assert outputs.length == 2; + + FileFormat outputFileFormat = getOutputFileFormat(); + + JavaChannelInstance input = (JavaChannelInstance) inputs[0]; + + if (!tableExists()) { + catalog.createTable(tableIdentifier, schema); + } + + Stream inputStream = input + .provideStream() + .peek(r -> { + if (!(r instanceof org.apache.wayang.basic.data.Record)) { + throw new WayangException("Expected Wayang Record but got " + r.getClass()); + }}) + .map(r -> wayangRecordToIcebergRecord((org.apache.wayang.basic.data.Record) r)); + + Table table = catalog.loadTable(tableIdentifier); + OutputFileFactory outputFileFactory = OutputFileFactory + .builderFor(table, this.defaultPartitionId, this.defaultTaskId) + .format(outputFileFormat) + .build(); + + EncryptedOutputFile outputFile = outputFileFactory.newOutputFile(); + + FileAppenderFactory appenderFactory = new org.apache.iceberg.data.GenericAppenderFactory( + this.schema); + + try (DataWriter writer = appenderFactory.newDataWriter(outputFile, + outputFileFormat, /* Partition */null)) { + + inputStream.forEach(dataQuanta -> { + writer.write(dataQuanta); + }); + + } + + } catch (Exception e) { + throw new WayangException("Coult not write stream to iceberg location.", e); + } + + return ExecutionOperator.modelEagerExecution(inputs, outputs, operatorContext); + + } + + @Override + protected ExecutionOperator createCopy() { + return new JavaApacheIcebergSink(this.catalog, this.schema, this.tableIdentifier, getOutputFileFormat()); + } + + @Override + public String getLoadProfileEstimatorConfigurationKey() { + return "wayang.java.apacheicebergsink.load"; + } + + @Override + public List getSupportedInputChannels(int index) { + return Arrays.asList(CollectionChannel.DESCRIPTOR, StreamChannel.DESCRIPTOR); + } + + @Override + public List getSupportedOutputChannels(int index) { + throw new UnsupportedOperationException(); + } + + private org.apache.iceberg.data.Record wayangRecordToIcebergRecord(org.apache.wayang.basic.data.Record wayangRecord) { + GenericRecord template = GenericRecord.create(this.schema); + Record out = template.copy(); + + int n = this.schema.columns().size(); + for (int i = 0; i < n; i++) { + Object v = wayangRecord.getField(i); + out.set(i, v); + } + return out; + }; + + private boolean tableExists() { + return catalog.tableExists(tableIdentifier); + } + +} diff --git a/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSource.java b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSource.java new file mode 100644 index 000000000..1b10eee91 --- /dev/null +++ b/wayang-platforms/wayang-java/src/main/java/org/apache/wayang/java/operators/JavaApacheIcebergSource.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.wayang.java.operators; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; + +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.IcebergGenerics.ScanBuilder; +import org.apache.iceberg.expressions.Expression; +import org.apache.wayang.basic.operators.ApacheIcebergSource; +import org.apache.wayang.core.api.exception.WayangException; +import org.apache.wayang.core.optimizer.OptimizationContext; +import org.apache.wayang.core.optimizer.costs.LoadProfileEstimators; +import org.apache.wayang.core.platform.ChannelDescriptor; +import org.apache.wayang.core.platform.ChannelInstance; +import org.apache.wayang.core.platform.lineage.ExecutionLineageNode; +import org.apache.wayang.core.util.Tuple; +import org.apache.wayang.java.channels.StreamChannel; +import org.apache.wayang.java.execution.JavaExecutor; + +/** + * This is execution operator implements the {@link ApacheIcebergSource}. + */ +public class JavaApacheIcebergSource +extends ApacheIcebergSource implements JavaExecutionOperator { + + /** + * Creates a new Java Iceberg source instance. + * + * @param catalog {@link org.apache.iceberg.catalog.Catalog} catalog + * used to load the table + * @param tableIdentifier {@linkorg.apache.iceberg.catalog.TableIdentifier} identifier + * of the target table + * @param whereExpressions Collection of + * {@link org.apache.iceberg.expressions.Expression} + * filters; empty list for none + * @param columns collection of column names to project; empty list for + * all columns + * @return a new {@link JavaApacheIcebergSource} instance + */ + + public JavaApacheIcebergSource(Catalog catalog, TableIdentifier tableIdentifier, + Expression[] whereExpressions, String[] columns) { + super(ApacheIcebergSource.create(catalog, tableIdentifier, whereExpressions, columns)); + } + + /** + * Creates a {@link Stream} of {@link org.apache.wayang.basic.data.Record} + * objects + * from the given Iceberg {@link org.apache.iceberg.data.ScanBuilder}. + * + * @param scanBuilder the configured {@link org.apache.iceberg.data.ScanBuilder} + * used to read table data + * @return a sequential {@link Stream} of Wayang + * {@link org.apache.wayang.basic.data.Record} instances + */ + private static Stream getStreamFromIcebergTable(ScanBuilder scanBuilder) { + return StreamSupport.stream(scanBuilder.build().spliterator(), false) + .map(record -> getWayangRecord(record)); + } + + /** + * Converts an Iceberg {@link org.apache.iceberg.data.Record} to a Wayang + * {@link org.apache.wayang.basic.data.Record}. + * + * @param icebergRecord the Iceberg record to convert + * @return a new Wayang {@link org.apache.wayang.basic.data.Record} containing + * the same field values + */ + private static org.apache.wayang.basic.data.Record getWayangRecord(org.apache.iceberg.data.Record icebergRecord) { + Object[] values = new Object[icebergRecord.size()]; + for (int i = 0; i < values.length; i++) { + values[i] = icebergRecord.get(i); + } + return new org.apache.wayang.basic.data.Record(values); + } + + @Override + public Tuple, Collection> evaluate( + ChannelInstance[] inputs, + ChannelInstance[] outputs, + JavaExecutor javaExecutor, + OptimizationContext.OperatorContext operatorContext) { + + assert inputs.length == this.getNumInputs(); + assert outputs.length == this.getNumOutputs(); + + String tableName = this.getIcebergTableName(); + + try { + + ScanBuilder scanBuilder = this.getScanBuilder(); + + ((StreamChannel.Instance) outputs[0]).accept(getStreamFromIcebergTable(scanBuilder)); + + ExecutionLineageNode prepareLineageNode = new ExecutionLineageNode(operatorContext); + prepareLineageNode.add(LoadProfileEstimators.createFromSpecification( + "wayang.java.apacheicebergsource.load.prepare", javaExecutor.getConfiguration())); + ExecutionLineageNode mainLineageNode = new ExecutionLineageNode(operatorContext); + mainLineageNode.add(LoadProfileEstimators.createFromSpecification( + "wayang.java.apacheicebergsource.load.main", javaExecutor.getConfiguration())); + + outputs[0].getLineage().addPredecessor(mainLineageNode); + + return prepareLineageNode.collectAndMark(); + + } catch (Exception e) { + throw new WayangException(String.format("Reading from Apache Iceberg Source table %s failed.", tableName), + e); + + } + + } + + /** + * Copies an instance (exclusive of broadcasts). + * + * @param that that should be copied + */ + public JavaApacheIcebergSource(ApacheIcebergSource that) { + super(that); + } + + @Override + public Collection getLoadProfileEstimatorConfigurationKeys() { + return Arrays.asList("wayang.java.apacheicebergsource.load.prepare", + "wayang.java.apacheicebergsource.load.main"); + } + + @Override + public List getSupportedInputChannels(int index) { + throw new UnsupportedOperationException(String.format("%s does not have input channels.", this)); + } + + @Override + public List getSupportedOutputChannels(int index) { + assert index <= this.getNumOutputs() || (index == 0 && this.getNumOutputs() == 0); + return Collections.singletonList(StreamChannel.DESCRIPTOR); + } + +}