Skip to content

Commit 5dbdde5

Browse files
committed
Fixed the missing time column with non-default name & INF without '' when explicitly defined in show create && the bug that the db ttl is non-default for replacing tree views && Pipe: NPE of Deletion Sync & failed logic for compressing progressReportEvent (#17457)
1 parent e96ce95 commit 5dbdde5

8 files changed

Lines changed: 3082 additions & 2 deletions

File tree

integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBDatabaseIT.java

Lines changed: 909 additions & 0 deletions
Large diffs are not rendered by default.

integration-test/src/test/java/org/apache/iotdb/relational/it/schema/IoTDBTableIT.java

Lines changed: 1117 additions & 0 deletions
Large diffs are not rendered by default.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.confignode.procedure.impl.schema.table.view;
21+
22+
import org.apache.iotdb.common.rpc.thrift.TSStatus;
23+
import org.apache.iotdb.commons.exception.IoTDBException;
24+
import org.apache.iotdb.commons.exception.MetadataException;
25+
import org.apache.iotdb.commons.schema.table.TableNodeStatus;
26+
import org.apache.iotdb.commons.schema.table.TreeViewSchema;
27+
import org.apache.iotdb.commons.schema.table.TsTable;
28+
import org.apache.iotdb.confignode.consensus.request.write.table.view.PreCreateTableViewPlan;
29+
import org.apache.iotdb.confignode.exception.DatabaseNotExistsException;
30+
import org.apache.iotdb.confignode.persistence.schema.TreeDeviceViewFieldDetector;
31+
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
32+
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
33+
import org.apache.iotdb.confignode.procedure.impl.schema.SchemaUtils;
34+
import org.apache.iotdb.confignode.procedure.impl.schema.table.CreateTableProcedure;
35+
import org.apache.iotdb.confignode.procedure.state.schema.CreateTableState;
36+
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
37+
import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
38+
import org.apache.iotdb.rpc.TSStatusCode;
39+
40+
import org.apache.tsfile.utils.Pair;
41+
import org.apache.tsfile.utils.ReadWriteIOUtils;
42+
import org.slf4j.Logger;
43+
import org.slf4j.LoggerFactory;
44+
45+
import java.io.DataOutputStream;
46+
import java.io.IOException;
47+
import java.nio.ByteBuffer;
48+
import java.util.Objects;
49+
import java.util.Optional;
50+
51+
import static org.apache.iotdb.rpc.TSStatusCode.TABLE_ALREADY_EXISTS;
52+
53+
public class CreateTableViewProcedure extends CreateTableProcedure {
54+
private static final Logger LOGGER = LoggerFactory.getLogger(CreateTableViewProcedure.class);
55+
private boolean replace;
56+
private TsTable oldView;
57+
private TableNodeStatus oldStatus;
58+
59+
public CreateTableViewProcedure(final boolean isGeneratedByPipe) {
60+
super(isGeneratedByPipe);
61+
}
62+
63+
public CreateTableViewProcedure(
64+
final String database,
65+
final TsTable table,
66+
final boolean replace,
67+
final boolean isGeneratedByPipe) {
68+
super(database, table, isGeneratedByPipe);
69+
this.replace = replace;
70+
}
71+
72+
@Override
73+
protected void checkTableExistence(final ConfigNodeProcedureEnv env) {
74+
if (!replace) {
75+
super.checkTableExistence(env);
76+
} else {
77+
try {
78+
final Optional<Pair<TsTable, TableNodeStatus>> oldTableAndStatus =
79+
env.getConfigManager()
80+
.getClusterSchemaManager()
81+
.getTableAndStatusIfExists(database, table.getTableName());
82+
if (oldTableAndStatus.isPresent()) {
83+
if (!TreeViewSchema.isTreeViewTable(oldTableAndStatus.get().getLeft())) {
84+
setFailure(
85+
new ProcedureException(
86+
new IoTDBException(
87+
String.format(
88+
"Table '%s.%s' already exists.", database, table.getTableName()),
89+
TABLE_ALREADY_EXISTS.getStatusCode())));
90+
return;
91+
} else {
92+
oldView = oldTableAndStatus.get().getLeft();
93+
oldStatus = oldTableAndStatus.get().getRight();
94+
}
95+
}
96+
final TDatabaseSchema schema =
97+
env.getConfigManager().getClusterSchemaManager().getDatabaseSchemaByName(database);
98+
if (!table.getPropValue(TsTable.TTL_PROPERTY).isPresent()
99+
&& schema.isSetTTL()
100+
&& schema.getTTL() != Long.MAX_VALUE) {
101+
table.addProp(TsTable.TTL_PROPERTY, String.valueOf(schema.getTTL()));
102+
}
103+
setNextState(CreateTableState.PRE_CREATE);
104+
} catch (final MetadataException | DatabaseNotExistsException e) {
105+
setFailure(new ProcedureException(e));
106+
}
107+
}
108+
final TSStatus status =
109+
new TreeDeviceViewFieldDetector(env.getConfigManager(), table, null)
110+
.detectMissingFieldTypes();
111+
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
112+
setFailure(new ProcedureException(new IoTDBException(status)));
113+
}
114+
}
115+
116+
@Override
117+
protected void preCreateTable(final ConfigNodeProcedureEnv env) {
118+
final TSStatus status =
119+
SchemaUtils.executeInConsensusLayer(
120+
new PreCreateTableViewPlan(database, table, TableNodeStatus.PRE_CREATE), env, LOGGER);
121+
if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
122+
setNextState(CreateTableState.PRE_RELEASE);
123+
} else {
124+
setFailure(new ProcedureException(new IoTDBException(status)));
125+
}
126+
}
127+
128+
@Override
129+
protected void rollbackCreate(final ConfigNodeProcedureEnv env) {
130+
if (Objects.isNull(oldView)) {
131+
super.rollbackCreate(env);
132+
return;
133+
}
134+
final TSStatus status =
135+
SchemaUtils.executeInConsensusLayer(
136+
new PreCreateTableViewPlan(database, oldView, oldStatus), env, LOGGER);
137+
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
138+
LOGGER.warn("Failed to rollback table creation {}.{}", database, table.getTableName());
139+
setFailure(new ProcedureException(new IoTDBException(status)));
140+
}
141+
}
142+
143+
@Override
144+
public void serialize(final DataOutputStream stream) throws IOException {
145+
stream.writeShort(
146+
isGeneratedByPipe
147+
? ProcedureType.PIPE_ENRICHED_CREATE_TABLE_VIEW_PROCEDURE.getTypeCode()
148+
: ProcedureType.CREATE_TABLE_VIEW_PROCEDURE.getTypeCode());
149+
innerSerialize(stream);
150+
ReadWriteIOUtils.write(replace, stream);
151+
152+
ReadWriteIOUtils.write(Objects.nonNull(oldView), stream);
153+
if (Objects.nonNull(oldView)) {
154+
oldView.serialize(stream);
155+
}
156+
157+
ReadWriteIOUtils.write(Objects.nonNull(oldStatus), stream);
158+
if (Objects.nonNull(oldStatus)) {
159+
oldStatus.serialize(stream);
160+
}
161+
}
162+
163+
@Override
164+
public void deserialize(final ByteBuffer byteBuffer) {
165+
super.deserialize(byteBuffer);
166+
replace = ReadWriteIOUtils.readBool(byteBuffer);
167+
168+
if (ReadWriteIOUtils.readBool(byteBuffer)) {
169+
this.oldView = TsTable.deserialize(byteBuffer);
170+
}
171+
172+
if (ReadWriteIOUtils.readBool(byteBuffer)) {
173+
this.oldStatus = TableNodeStatus.deserialize(byteBuffer);
174+
}
175+
}
176+
177+
@Override
178+
public boolean equals(final Object o) {
179+
return super.equals(o)
180+
&& replace == ((CreateTableViewProcedure) o).replace
181+
&& Objects.equals(oldView, ((CreateTableViewProcedure) o).oldView)
182+
&& Objects.equals(oldStatus, ((CreateTableViewProcedure) o).oldStatus);
183+
}
184+
185+
@Override
186+
public int hashCode() {
187+
return Objects.hash(super.hashCode(), replace, oldView, oldStatus);
188+
}
189+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -382,8 +382,11 @@ protected void extractProgressReportEvent(final PipeRealtimeEvent event) {
382382
}
383383
pendingQueue.pollLast();
384384
}
385-
if (pendingQueue.peekLast() instanceof ProgressReportEvent) {
386-
final ProgressReportEvent oldEvent = (ProgressReportEvent) pendingQueue.peekLast();
385+
final Event last = pendingQueue.peekLast();
386+
if (last instanceof PipeRealtimeEvent
387+
&& ((PipeRealtimeEvent) last).getEvent() instanceof ProgressReportEvent) {
388+
final ProgressReportEvent oldEvent =
389+
(ProgressReportEvent) ((PipeRealtimeEvent) last).getEvent();
387390
oldEvent.bindProgressIndex(
388391
oldEvent
389392
.getProgressIndex()
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.db.queryengine.plan.execution.config.metadata.relational;
21+
22+
import org.apache.iotdb.commons.schema.column.ColumnHeader;
23+
import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
24+
import org.apache.iotdb.commons.schema.table.TreeViewSchema;
25+
import org.apache.iotdb.commons.schema.table.TsTable;
26+
import org.apache.iotdb.commons.schema.table.column.TsTableColumnSchema;
27+
import org.apache.iotdb.db.queryengine.common.header.DatasetHeader;
28+
import org.apache.iotdb.db.queryengine.common.header.DatasetHeaderFactory;
29+
import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
30+
import org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
31+
import org.apache.iotdb.rpc.TSStatusCode;
32+
33+
import com.google.common.util.concurrent.ListenableFuture;
34+
import com.google.common.util.concurrent.SettableFuture;
35+
import org.apache.tsfile.common.conf.TSFileConfig;
36+
import org.apache.tsfile.enums.TSDataType;
37+
import org.apache.tsfile.read.common.block.TsBlockBuilder;
38+
import org.apache.tsfile.utils.Binary;
39+
40+
import javax.annotation.Nonnull;
41+
42+
import java.util.List;
43+
import java.util.Objects;
44+
import java.util.stream.Collectors;
45+
46+
import static org.apache.iotdb.commons.conf.IoTDBConstant.TTL_INFINITE;
47+
48+
public class ShowCreateTableTask extends AbstractTableTask {
49+
public ShowCreateTableTask(final String database, final String tableName) {
50+
super(database, tableName);
51+
}
52+
53+
@Override
54+
public ListenableFuture<ConfigTaskResult> execute(final IConfigTaskExecutor configTaskExecutor)
55+
throws InterruptedException {
56+
return configTaskExecutor.describeTable(database, tableName, false, false);
57+
}
58+
59+
public static void buildTsBlock(
60+
final TsTable table, final SettableFuture<ConfigTaskResult> future) {
61+
if (TreeViewSchema.isTreeViewTable(table)) {
62+
ShowCreateViewTask.buildTsBlock(table, future);
63+
return;
64+
}
65+
final List<TSDataType> outputDataTypes =
66+
ColumnHeaderConstant.showCreateTableColumnHeaders.stream()
67+
.map(ColumnHeader::getColumnType)
68+
.collect(Collectors.toList());
69+
70+
final TsBlockBuilder builder = new TsBlockBuilder(outputDataTypes);
71+
builder.getTimeColumnBuilder().writeLong(0L);
72+
builder
73+
.getColumnBuilder(0)
74+
.writeBinary(new Binary(table.getTableName(), TSFileConfig.STRING_CHARSET));
75+
builder
76+
.getColumnBuilder(1)
77+
.writeBinary(new Binary(getShowCreateTableSQL(table), TSFileConfig.STRING_CHARSET));
78+
builder.declarePosition();
79+
80+
final DatasetHeader datasetHeader = DatasetHeaderFactory.getShowCreateTableColumnHeader();
81+
future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS, builder.build(), datasetHeader));
82+
}
83+
84+
private static String getShowCreateTableSQL(final TsTable table) {
85+
final StringBuilder builder =
86+
new StringBuilder("CREATE TABLE ").append(getIdentifier(table.getTableName())).append(" (");
87+
88+
for (final TsTableColumnSchema schema : table.getColumnList()) {
89+
switch (schema.getColumnCategory()) {
90+
case TAG:
91+
builder
92+
.append(getIdentifier(schema.getColumnName()))
93+
.append(" ")
94+
.append(schema.getDataType())
95+
.append(" ")
96+
.append("TAG");
97+
break;
98+
case TIME:
99+
builder
100+
.append(getIdentifier(schema.getColumnName()))
101+
.append(" ")
102+
.append(schema.getDataType())
103+
.append(" ")
104+
.append("TIME");
105+
break;
106+
case FIELD:
107+
builder
108+
.append(getIdentifier(schema.getColumnName()))
109+
.append(" ")
110+
.append(schema.getDataType())
111+
.append(" ")
112+
.append("FIELD");
113+
break;
114+
case ATTRIBUTE:
115+
builder
116+
.append(getIdentifier(schema.getColumnName()))
117+
.append(" ")
118+
.append(schema.getDataType())
119+
.append(" ")
120+
.append("ATTRIBUTE");
121+
break;
122+
default:
123+
throw new UnsupportedOperationException(
124+
"Unsupported column type: " + schema.getColumnCategory());
125+
}
126+
if (Objects.nonNull(schema.getProps().get(TsTable.COMMENT_KEY))) {
127+
builder.append(" COMMENT ").append(getString(schema.getProps().get(TsTable.COMMENT_KEY)));
128+
}
129+
builder.append(",");
130+
}
131+
132+
if (!table.getColumnList().isEmpty()) {
133+
builder.deleteCharAt(builder.length() - 1);
134+
}
135+
136+
builder.append(")");
137+
if (table.getPropValue(TsTable.COMMENT_KEY).isPresent()) {
138+
builder.append(" COMMENT ").append(getString(table.getPropValue(TsTable.COMMENT_KEY).get()));
139+
}
140+
141+
String ttlString = table.getPropValue(TsTable.TTL_PROPERTY).orElse(TTL_INFINITE);
142+
if (ttlString.equals(TTL_INFINITE)) {
143+
ttlString = "'" + ttlString + "'";
144+
}
145+
builder.append(" WITH (ttl=").append(ttlString).append(")");
146+
147+
return builder.toString();
148+
}
149+
150+
public static String getIdentifier(@Nonnull final String identifier) {
151+
return "\"" + identifier.replace("\"", "\"\"") + "\"";
152+
}
153+
154+
public static String getString(@Nonnull final String string) {
155+
return "'" + string.replace("'", "''") + "'";
156+
}
157+
}

0 commit comments

Comments
 (0)