Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/calcite/util/BuiltInMethod.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@ public enum BuiltInMethod {
Integer.class, int.class, int.class, BigDecimal.class, RoundingMode.class),
INTO(ExtendedEnumerable.class, "into", Collection.class),
REMOVE_ALL(ExtendedEnumerable.class, "removeAll", Collection.class),
UPDATE(ExtendedEnumerable.class, "update", List.class, Function1.class,
Function1.class, Function1.class),
SCHEMA_GET_SUB_SCHEMA(Schema.class, "getSubSchema", String.class),
SCHEMA_GET_TABLE(Schema.class, "getTable", String.class),
SCHEMA_PLUS_ADD_TABLE(SchemaPlus.class, "add", String.class, Table.class),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to you under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.calcite.adapter.enumerable;

import org.apache.calcite.linq4j.Linq4j;

import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;

/** Tests for {@link EnumerableTableModify} row-consumption semantics. */
class EnumerableTableModifyTest {

@Test void testApplyUpdateOneToOneUpdatesOnlyFirstNMatchingRows() {
final List<Object[]> sink = new ArrayList<>();
sink.add(new Object[] {1, 10});
sink.add(new Object[] {1, 10});
sink.add(new Object[] {1, 10});
sink.add(new Object[] {2, 20});

// Source row layout: [original_i, original_j, new_j].
final List<Object[]> source =
Arrays.asList(new Object[] {1, 10, 100}, new Object[] {1, 10, 200});

final long count =
EnumerableTableModify.applyUpdateOneToOne(Linq4j.asEnumerable(source), sink, 2,
new int[] {1});

assertThat(count, is(2L));
assertThat(toValueRows(sink),
is(
Arrays.asList(
Arrays.asList(1, 100),
Arrays.asList(1, 200),
Arrays.asList(1, 10),
Arrays.asList(2, 20))));
}

@Test void testApplyDeleteDoesNotSkipRowsWhenSourceBackedBySink() {
final List<Object> sink = new ArrayList<>(Arrays.asList(1, 1, 1));
final List<Object[]> source =
Arrays.asList(new Object[] {1}, new Object[] {1}, new Object[] {1});

EnumerableTableModify.applyDeleteRowsByKey(
Linq4j.asEnumerable(source), sink, row -> new Object[] {row});

assertThat(sink, is(Collections.emptyList()));
}

private static List<List<Object>> toValueRows(List<Object[]> rows) {
final List<List<Object>> valueRows = new ArrayList<>();
for (Object[] row : rows) {
valueRows.add(Arrays.asList(row));
}
return valueRows;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -278,14 +278,14 @@ public class JdbcFrontLinqBackTest {
final List<Employee> employees = new ArrayList<>();
CalciteAssert.AssertThat with = mutable(employees);
with.query("select * from \"foo\".\"bar\"")
.returnsUnordered(
"empid=0; deptno=0; name=first; salary=0.0; commission=null");
.returnsUnordered("empid=0; deptno=0; name=first; salary=0.0; commission=null");
with.query("insert into \"foo\".\"bar\" select * from \"hr\".\"emps\"")
.updates(4);
with.query("select count(*) as c from \"foo\".\"bar\"")
.returnsUnordered("C=5");
final String deleteSql = "delete from \"foo\".\"bar\" "
+ "where \"deptno\" = 10";
with.query("select count(*) as c from \"foo\".\"bar\" where \"deptno\" = 10")
.returnsUnordered("C=3");
final String deleteSql = "delete from \"foo\".\"bar\" where \"deptno\" = 10";
with.query(deleteSql)
.updates(3);
final String sql = "select \"name\", count(*) as c\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,15 @@
return EnumerableDefaults.remove(getThis(), sink);
}

@Override public <TKey> long update(

Check warning on line 380 in linq4j/src/main/java/org/apache/calcite/linq4j/DefaultEnumerable.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename this generic name to match the regular expression '^[A-Z][0-9]?$'.

See more on https://sonarcloud.io/project/issues?id=apache_calcite&issues=AZ4Nea_IbgnSuG7_8QQM&open=AZ4Nea_IbgnSuG7_8QQM&pullRequest=4922
List<T> sink,
Function1<T, TKey> sinkKeySelector,
Function1<T, TKey> sourceKeySelector,
Function1<T, T> transform) {
return EnumerableDefaults.update(getThis(), sink, sinkKeySelector,
sourceKeySelector, transform);
}

@Override public <TInner, TKey, TResult> Enumerable<TResult> hashJoin(
Enumerable<TInner> inner, Function1<T, TKey> outerKeySelector,
Function1<TInner, TKey> innerKeySelector,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
Expand Down Expand Up @@ -4399,6 +4400,40 @@
return sink;
}

/**
* Default implementation of
* {@link ExtendedEnumerable#update(List, Function1, Function1, Function1)}.
*
* <p>Builds a map from source-row keys to replacement rows in a single pass
* over the source, then performs a single pass over the sink, replacing
* matched rows in place.
*/
public static <T, TKey> long update(

Check warning on line 4411 in linq4j/src/main/java/org/apache/calcite/linq4j/EnumerableDefaults.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename this generic name to match the regular expression '^[A-Z][0-9]?$'.

See more on https://sonarcloud.io/project/issues?id=apache_calcite&issues=AZ4Nea-ibgnSuG7_8QQL&open=AZ4Nea-ibgnSuG7_8QQL&pullRequest=4922
Enumerable<T> source,
List<T> sink,
Function1<T, TKey> sinkKeySelector,
Function1<T, TKey> sourceKeySelector,
Function1<T, T> sourceTransform) {
final Map<TKey, T> updateMap = new HashMap<>();
try (Enumerator<T> e = source.enumerator()) {
while (e.moveNext()) {
final T row = e.current();
updateMap.put(sourceKeySelector.apply(row), sourceTransform.apply(row));
}
}
long updateCount = 0;
final ListIterator<T> it = sink.listIterator();
while (it.hasNext()) {
final T current = it.next();
final T newRow = updateMap.get(sinkKeySelector.apply(current));
if (newRow != null) {
it.set(newRow);
updateCount++;
}
}
return updateCount;
}

/**
* Hash table with null-safe key set.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,32 @@
*/
<C extends Collection<? super TSource>> C removeAll(C sink);

/**
* Updates rows of {@code sink} based on the contents of this sequence.
*
* <p>For each element {@code x} of this sequence, {@code sourceKeySelector}
* computes a key, and {@code sourceTransform} computes a replacement row.
* Then for each element {@code y} of {@code sink}, {@code sinkKeySelector}
* computes a key; if it matches a key produced from this sequence, {@code y}
* is replaced (in place) with the corresponding replacement row.
*
* <p>The sink is a {@link List} so that elements can be replaced
* in place while preserving order.
*
* @param sink List to be updated in place
* @param sinkKeySelector Function that extracts a key from a sink row
* @param sourceKeySelector Function that extracts a key from a source row
* @param transform Function that produces the replacement row from a
* source row
* @param <TKey> Key type
* @return Number of rows replaced
*/
<TKey> long update(

Check warning on line 570 in linq4j/src/main/java/org/apache/calcite/linq4j/ExtendedEnumerable.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Rename this generic name to match the regular expression '^[A-Z][0-9]?$'.

See more on https://sonarcloud.io/project/issues?id=apache_calcite&issues=AZ4Nea9JbgnSuG7_8QQK&open=AZ4Nea9JbgnSuG7_8QQK&pullRequest=4922
List<TSource> sink,
Function1<TSource, TKey> sinkKeySelector,
Function1<TSource, TKey> sourceKeySelector,
Function1<TSource, TSource> transform);

/**
* Correlates the elements of two sequences based on
* matching keys. The default equality comparer is used to compare
Expand Down
153 changes: 153 additions & 0 deletions server/src/test/java/org/apache/calcite/test/ServerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,159 @@ static Connection connect() throws SQLException {
executor.execute((SqlTruncateTable) o, context);
}

@Test void testUpdate() throws Exception {
try (Connection c = connect();
Statement s = c.createStatement()) {
s.execute("create table t (i int not null, j int not null)");
s.executeUpdate("insert into t values (1, 10)");
s.executeUpdate("insert into t values (2, 20)");
s.executeUpdate("insert into t values (3, 30)");

// Update one row
int count = s.executeUpdate("update t set j = 99 where i = 2");
assertThat(count, is(1));

try (ResultSet r = s.executeQuery("select i, j from t order by i")) {
assertThat(r.next(), is(true));
assertThat(r.getInt(1), is(1));
assertThat(r.getInt(2), is(10));
assertThat(r.next(), is(true));
assertThat(r.getInt(1), is(2));
assertThat(r.getInt(2), is(99));
assertThat(r.next(), is(true));
assertThat(r.getInt(1), is(3));
assertThat(r.getInt(2), is(30));
assertThat(r.next(), is(false));
}

// Update multiple rows
count = s.executeUpdate("update t set j = 0 where i > 1");
assertThat(count, is(2));

try (ResultSet r = s.executeQuery("select sum(j) from t")) {
assertThat(r.next(), is(true));
assertThat(r.getInt(1), is(10));
assertThat(r.next(), is(false));
}

// Update zero rows (no predicate match)
count = s.executeUpdate("update t set j = 100 where i = 99");
assertThat(count, is(0));
}
}

@Test void testUpdateDuplicateRows() throws Exception {
try (Connection c = connect();
Statement s = c.createStatement()) {
s.execute("create table t (i int not null, j int not null)");
s.executeUpdate("insert into t values (1, 10)");
s.executeUpdate("insert into t values (1, 10)");
s.executeUpdate("insert into t values (1, 10)");
s.executeUpdate("insert into t values (2, 20)");

final int count = s.executeUpdate("update t set j = 99 where i = 1 and j = 10");
assertThat(count, is(3));

try (ResultSet r =
s.executeQuery("select "
+ "sum(case when i = 1 and j = 99 then 1 else 0 end), "
+ "sum(case when i = 1 and j = 10 then 1 else 0 end), "
+ "sum(case when i = 2 and j = 20 then 1 else 0 end) "
+ "from t")) {
assertThat(r.next(), is(true));
assertThat(r.getInt(1), is(3));
assertThat(r.getInt(2), is(0));
assertThat(r.getInt(3), is(1));
assertThat(r.next(), is(false));
}
}
}

/** Tests that INSERT ... SELECT returns the correct row count when
* 0, 1, or multiple rows are produced by the source query.
* Exercises {@link org.apache.calcite.server.MutableArrayTable} via the
* enumerable INSERT path. */
@Test void testInsertSelectRowCount() throws Exception {
try (Connection c = connect();
Statement s = c.createStatement()) {
s.execute("create table src (i int not null, j int not null)");
s.executeUpdate("insert into src values (1, 10)");
s.executeUpdate("insert into src values (2, 20)");
s.execute("create table dst (i int not null, j int not null)");

// Insert 0 rows (source query returns nothing)
int count = s.executeUpdate("insert into dst select * from src where 1 = 0");
assertThat(count, is(0));

// Insert 1 row
count = s.executeUpdate("insert into dst select * from src where i = 1");
assertThat(count, is(1));

// Insert multiple rows
count = s.executeUpdate("insert into dst select * from src");
assertThat(count, is(2));
}
}

/** Tests that DELETE returns the correct row count when
* 0, 1, or multiple rows match the predicate.
* Exercises {@link org.apache.calcite.server.MutableArrayTable} via the
* enumerable DELETE path. */
@Test void testDelete() throws Exception {
try (Connection c = connect();
Statement s = c.createStatement()) {
s.execute("create table t (i int not null, j int not null)");
s.executeUpdate("insert into t values (1, 10)");
s.executeUpdate("insert into t values (2, 20)");
s.executeUpdate("insert into t values (3, 30)");

// Delete 0 rows (no predicate match)
int count = s.executeUpdate("delete from t where i = 99");
assertThat(count, is(0));

// Verify all 3 rows are still present
try (ResultSet r = s.executeQuery("select count(*) from t")) {
assertThat(r.next(), is(true));
assertThat(r.getInt(1), is(3));
}

// Delete 1 row
count = s.executeUpdate("delete from t where i = 2");
assertThat(count, is(1));

// Delete multiple rows (both remaining rows: i=1 and i=3)
count = s.executeUpdate("delete from t where i > 0");
assertThat(count, is(2));

// Verify table is empty
try (ResultSet r = s.executeQuery("select count(*) from t")) {
assertThat(r.next(), is(true));
assertThat(r.getInt(1), is(0));
}
}
}

@Test void testDeleteDuplicateRows() throws Exception {
try (Connection c = connect();
Statement s = c.createStatement()) {
s.execute("create table t (i int not null, j int not null)");
s.executeUpdate("insert into t values (1, 10)");
s.executeUpdate("insert into t values (1, 10)");
s.executeUpdate("insert into t values (1, 10)");
s.executeUpdate("insert into t values (2, 20)");

final int count = s.executeUpdate("delete from t where i = 1 and j = 10");
assertThat(count, is(3));

try (ResultSet r = s.executeQuery("select i, j from t")) {
assertThat(r.next(), is(true));
assertThat(r.getInt(1), is(2));
assertThat(r.getInt(2), is(20));
assertThat(r.next(), is(false));
}
}
}

@Test void testStatement() throws Exception {
try (Connection c = connect();
Statement s = c.createStatement();
Expand Down
Loading