From 15cd341789dac94b5f5a9a148b62c4be1435aca0 Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Wed, 14 Jan 2026 14:37:58 +0300 Subject: [PATCH 1/4] IGNITE-27553 Use MessageSerializer for SchemaAbstractDiscoveryMessage and successors --- .../discovery/DiscoveryMessageFactory.java | 7 + .../processors/query/GridQueryProcessor.java | 3 - .../SchemaAbstractDiscoveryMessage.java | 138 +++++++++++++++++- .../message/SchemaFinishDiscoveryMessage.java | 55 +++---- .../SchemaProposeDiscoveryMessage.java | 42 ++---- .../failure/FailureHandlerTriggeredTest.java | 4 + 6 files changed, 176 insertions(+), 73 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 5f399ed0c46fdf..191b02bb5cd157 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -19,6 +19,8 @@ import org.apache.ignite.internal.codegen.InetAddressMessageSerializer; import org.apache.ignite.internal.codegen.InetSocketAddressMessageSerializer; +import org.apache.ignite.internal.codegen.SchemaFinishDiscoveryMessageSerializer; +import org.apache.ignite.internal.codegen.SchemaProposeDiscoveryMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryAuthFailedMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryClientMetricsUpdateMessageSerializer; @@ -34,6 +36,8 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer; +import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage; +import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage; @@ -76,5 +80,8 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)11, TcpDiscoveryAuthFailedMessage::new, new TcpDiscoveryAuthFailedMessageSerializer()); factory.register((short)12, TcpDiscoveryDuplicateIdMessage::new, new TcpDiscoveryDuplicateIdMessageSerializer()); factory.register((short)13, TcpDiscoveryClientMetricsUpdateMessage::new, new TcpDiscoveryClientMetricsUpdateMessageSerializer()); + + factory.register((short)500, SchemaProposeDiscoveryMessage::new, new SchemaProposeDiscoveryMessageSerializer()); + factory.register((short)501, SchemaFinishDiscoveryMessage::new, new SchemaFinishDiscoveryMessageSerializer()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 894382807e1de4..e09aa64f3cf0ef 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -909,9 +909,6 @@ private void onSchemaFinishDiscovery(SchemaFinishDiscoveryMessage msg) { } } - // Propose message will be used from exchange thread to - msg.proposeMessage(proposeMsg); - if (exchangeReady) { SchemaOperation op = schemaOps.get(proposeMsg.schemaName()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java index f55eae0922f46f..ff70889f56cab6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java @@ -17,29 +17,62 @@ package org.apache.ignite.internal.processors.query.schema.message; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.marshaller.Marshallers; +import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; /** * Abstract discovery message for schema operations. */ -public abstract class SchemaAbstractDiscoveryMessage implements DiscoveryCustomMessage { +public abstract class SchemaAbstractDiscoveryMessage implements DiscoveryCustomMessage, Message { /** */ private static final long serialVersionUID = 0L; /** ID */ - private final IgniteUuid id = IgniteUuid.randomUuid(); + @Order(0) + private IgniteUuid id; /** Operation. */ @GridToStringInclude - protected final SchemaAbstractOperation op; + protected SchemaAbstractOperation op; + + /** + * Operation bytes. Serialized reprezentation of schema operation. + * TODO Should be removed in IGNITE-27559 + */ + @Order(value = 1, method = "operationBytes") + private byte[] opBytes; + + /** Error message. */ + @Order(value = 2, method = "errorMessage") + private String errMsg; + + /** Error code. */ + @Order(value = 3, method = "errorCode") + private int errCode = -1; + + /** Error. */ + protected SchemaOperationException err; + + /** + * Constructor. + */ + protected SchemaAbstractDiscoveryMessage() { + // No-op. + } /** * Constructor. @@ -47,6 +80,7 @@ public abstract class SchemaAbstractDiscoveryMessage implements DiscoveryCustomM * @param op Operation. */ protected SchemaAbstractDiscoveryMessage(SchemaAbstractOperation op) { + id = IgniteUuid.randomUuid(); this.op = op; } @@ -55,17 +89,99 @@ protected SchemaAbstractDiscoveryMessage(SchemaAbstractOperation op) { return id; } - /** {@inheritDoc} */ - @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, - AffinityTopologyVersion topVer, DiscoCache discoCache) { - throw new UnsupportedOperationException(); + /** + * @param id New iD + */ + public void id(IgniteUuid id) { + this.id = id; } /** * @return Operation. */ public SchemaAbstractOperation operation() { - return op; + try { + return op != null ? op : U.unmarshal(Marshallers.jdk(), opBytes, null); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to unmarshal schema operation", e); + } + } + + /** + * @return Operation bytes. + */ + public byte[] operationBytes() { + try { + return opBytes != null ? opBytes : U.marshal(Marshallers.jdk(), op); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to marshal schema operation", e); + } + } + + /** + * @param opBytes Operation bytes. + */ + public void operationBytes(byte[] opBytes) { + this.opBytes = opBytes; + } + + /** + * @return Error message. + */ + public String errorMessage() { + return errMsg; + } + + /** + * @param errMsg Error message. + */ + public void errorMessage(String errMsg) { + this.errMsg = errMsg; + } + + /** + * @return Error code. + */ + public int errorCode() { + return errCode; + } + + /** + * @param errCode Error code. + */ + public void errorCode(int errCode) { + this.errCode = errCode; + } + + /** + * Set error. + * + * @param err Error. + */ + public void onError(SchemaOperationException err) { + if (!hasError()) { + this.err = err; + + errMsg = err.getMessage(); + errCode = err.code(); + } + } + + /** + * @return {@code True} if error was reported during init. + */ + public boolean hasError() { + return err != null || errMsg != null || errCode > -1; + } + + /** + * @return Error message (if any). + */ + @Nullable public SchemaOperationException error() { + return !hasError() ? null : + err != null ? err : new SchemaOperationException(errMsg, errCode); } /** @@ -73,6 +189,12 @@ public SchemaAbstractOperation operation() { */ public abstract boolean exchange(); + /** {@inheritDoc} */ + @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, + AffinityTopologyVersion topVer, DiscoCache discoCache) { + throw new UnsupportedOperationException(); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(SchemaAbstractDiscoveryMessage.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java index 3ae253cb1d7152..4b68e456bf0e79 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.query.schema.message; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation; @@ -30,14 +31,16 @@ public class SchemaFinishDiscoveryMessage extends SchemaAbstractDiscoveryMessage /** */ private static final long serialVersionUID = 0L; - /** Error. */ - private final SchemaOperationException err; - - /** Original propose message. */ - private transient SchemaProposeDiscoveryMessage proposeMsg; - /** No-op flag. */ - private final boolean nop; + @Order(4) + private boolean nop; + + /** + * Constructor. + */ + public SchemaFinishDiscoveryMessage() { + // No-op. + } /** * Constructor. @@ -69,42 +72,26 @@ public SchemaFinishDiscoveryMessage(SchemaAbstractOperation op, SchemaOperationE } /** - * @return {@code True} if error was reported during init. - */ - public boolean hasError() { - return err != null; - } - - /** - * @return Error message (if any). - */ - @Nullable public SchemaOperationException error() { - return err; - } - - /** - * @return Propose message. - */ - public SchemaProposeDiscoveryMessage proposeMessage() { - return proposeMsg; - } - - /** - * @param proposeMsg Propose message. + * @return True if message in no-op. */ - public void proposeMessage(SchemaProposeDiscoveryMessage proposeMsg) { - this.proposeMsg = proposeMsg; + public boolean nop() { + return nop; } /** - * @return True if message in no-op. + * @param nop No-op flag. */ - public boolean nop() { - return nop; + public void nop(boolean nop) { + this.nop = nop; } /** {@inheritDoc} */ @Override public String toString() { return S.toString(SchemaFinishDiscoveryMessage.class, this, "parent", super.toString()); } + + /** {@inheritDoc} */ + @Override public short directType() { + return 501; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java index 0e1270b17b623c..1ce78f361c1253 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java @@ -17,8 +17,8 @@ package org.apache.ignite.internal.processors.query.schema.message; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; -import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; @@ -32,14 +32,19 @@ public class SchemaProposeDiscoveryMessage extends SchemaAbstractDiscoveryMessag private static final long serialVersionUID = 0L; /** Cache deployment ID. */ + @Order(value = 4, method = "deploymentId") private IgniteUuid depId; - /** Error. */ - private SchemaOperationException err; - /** Whether to perform exchange. */ private transient boolean exchange; + /** + * Constructor. + */ + public SchemaProposeDiscoveryMessage() { + // No-op. + } + /** * Constructor. * @@ -93,30 +98,6 @@ public boolean initialized() { return deploymentId() != null || hasError(); } - /** - * Set error. - * - * @param err Error. - */ - public void onError(SchemaOperationException err) { - if (!hasError()) - this.err = err; - } - - /** - * @return {@code True} if error was reported during init. - */ - public boolean hasError() { - return err != null; - } - - /** - * @return Error message (if any). - */ - @Nullable public SchemaOperationException error() { - return err; - } - /** * @return Schema name. */ @@ -128,4 +109,9 @@ public String schemaName() { @Override public String toString() { return S.toString(SchemaProposeDiscoveryMessage.class, this, "parent", super.toString()); } + + /** {@inheritDoc} */ + @Override public short directType() { + return 500; + } } diff --git a/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java b/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java index 046124622ca991..f19b39ea6bc5da 100644 --- a/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java +++ b/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java @@ -128,6 +128,10 @@ static class ExchangeWorkerFailureTask extends SchemaExchangeWorkerTask { @Override public boolean isMutable() { return false; } + + @Override public short directType() { + return 0; + } }); } From 38d3e1e3a86191186d9b72c7b9bdd04685f0809c Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Wed, 14 Jan 2026 14:40:52 +0300 Subject: [PATCH 2/4] WIP --- .../internal/managers/discovery/DiscoveryMessageFactory.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 191b02bb5cd157..ba71761964fd3c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -66,6 +66,7 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)-101, InetSocketAddressMessage::new, new InetSocketAddressMessageSerializer()); factory.register((short)-100, InetAddressMessage::new, new InetAddressMessageSerializer()); + // TcpDiscoveryAbstractMessage factory.register((short)0, TcpDiscoveryCheckFailedMessage::new, new TcpDiscoveryCheckFailedMessageSerializer()); factory.register((short)1, TcpDiscoveryPingRequest::new, new TcpDiscoveryPingRequestSerializer()); factory.register((short)2, TcpDiscoveryPingResponse::new, new TcpDiscoveryPingResponseSerializer()); @@ -81,6 +82,7 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)12, TcpDiscoveryDuplicateIdMessage::new, new TcpDiscoveryDuplicateIdMessageSerializer()); factory.register((short)13, TcpDiscoveryClientMetricsUpdateMessage::new, new TcpDiscoveryClientMetricsUpdateMessageSerializer()); + // DiscoveryCustomMessage factory.register((short)500, SchemaProposeDiscoveryMessage::new, new SchemaProposeDiscoveryMessageSerializer()); factory.register((short)501, SchemaFinishDiscoveryMessage::new, new SchemaFinishDiscoveryMessageSerializer()); } From 94a1a2e68eb6f0d1751f2e1d36f6df52db31bb29 Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Wed, 14 Jan 2026 16:28:12 +0300 Subject: [PATCH 3/4] WIP --- .../query/schema/message/SchemaAbstractDiscoveryMessage.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java index ff70889f56cab6..aafcff1dcf99b6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java @@ -62,7 +62,7 @@ public abstract class SchemaAbstractDiscoveryMessage implements DiscoveryCustomM /** Error code. */ @Order(value = 3, method = "errorCode") - private int errCode = -1; + private int errCode; /** Error. */ protected SchemaOperationException err; @@ -81,6 +81,8 @@ protected SchemaAbstractDiscoveryMessage() { */ protected SchemaAbstractDiscoveryMessage(SchemaAbstractOperation op) { id = IgniteUuid.randomUuid(); + errCode = -1; + this.op = op; } From bb36791096ad1bc43ee2e3065b97203accc2a986 Mon Sep 17 00:00:00 2001 From: Ilya Shishkov Date: Wed, 14 Jan 2026 16:35:01 +0300 Subject: [PATCH 4/4] WIP --- .../query/schema/message/SchemaProposeDiscoveryMessage.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java index 1ce78f361c1253..b3485055476e55 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java @@ -36,7 +36,7 @@ public class SchemaProposeDiscoveryMessage extends SchemaAbstractDiscoveryMessag private IgniteUuid depId; /** Whether to perform exchange. */ - private transient boolean exchange; + private boolean exchange; /** * Constructor.