diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java index 5f399ed0c46fd..ba71761964fd3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java @@ -19,6 +19,8 @@ import org.apache.ignite.internal.codegen.InetAddressMessageSerializer; import org.apache.ignite.internal.codegen.InetSocketAddressMessageSerializer; +import org.apache.ignite.internal.codegen.SchemaFinishDiscoveryMessageSerializer; +import org.apache.ignite.internal.codegen.SchemaProposeDiscoveryMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryAuthFailedMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryClientMetricsUpdateMessageSerializer; @@ -34,6 +36,8 @@ import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer; import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer; +import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage; +import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage; import org.apache.ignite.plugin.extensions.communication.MessageFactory; import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider; import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage; @@ -62,6 +66,7 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)-101, InetSocketAddressMessage::new, new InetSocketAddressMessageSerializer()); factory.register((short)-100, InetAddressMessage::new, new InetAddressMessageSerializer()); + // TcpDiscoveryAbstractMessage factory.register((short)0, TcpDiscoveryCheckFailedMessage::new, new TcpDiscoveryCheckFailedMessageSerializer()); factory.register((short)1, TcpDiscoveryPingRequest::new, new TcpDiscoveryPingRequestSerializer()); factory.register((short)2, TcpDiscoveryPingResponse::new, new TcpDiscoveryPingResponseSerializer()); @@ -76,5 +81,9 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider { factory.register((short)11, TcpDiscoveryAuthFailedMessage::new, new TcpDiscoveryAuthFailedMessageSerializer()); factory.register((short)12, TcpDiscoveryDuplicateIdMessage::new, new TcpDiscoveryDuplicateIdMessageSerializer()); factory.register((short)13, TcpDiscoveryClientMetricsUpdateMessage::new, new TcpDiscoveryClientMetricsUpdateMessageSerializer()); + + // DiscoveryCustomMessage + factory.register((short)500, SchemaProposeDiscoveryMessage::new, new SchemaProposeDiscoveryMessageSerializer()); + factory.register((short)501, SchemaFinishDiscoveryMessage::new, new SchemaFinishDiscoveryMessageSerializer()); } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java index 894382807e1de..e09aa64f3cf0e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java @@ -909,9 +909,6 @@ private void onSchemaFinishDiscovery(SchemaFinishDiscoveryMessage msg) { } } - // Propose message will be used from exchange thread to - msg.proposeMessage(proposeMsg); - if (exchangeReady) { SchemaOperation op = schemaOps.get(proposeMsg.schemaName()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java index f55eae0922f46..aafcff1dcf99b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java @@ -17,29 +17,62 @@ package org.apache.ignite.internal.processors.query.schema.message; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoCache; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation; import org.apache.ignite.internal.util.tostring.GridToStringInclude; import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.marshaller.Marshallers; +import org.apache.ignite.plugin.extensions.communication.Message; import org.jetbrains.annotations.Nullable; /** * Abstract discovery message for schema operations. */ -public abstract class SchemaAbstractDiscoveryMessage implements DiscoveryCustomMessage { +public abstract class SchemaAbstractDiscoveryMessage implements DiscoveryCustomMessage, Message { /** */ private static final long serialVersionUID = 0L; /** ID */ - private final IgniteUuid id = IgniteUuid.randomUuid(); + @Order(0) + private IgniteUuid id; /** Operation. */ @GridToStringInclude - protected final SchemaAbstractOperation op; + protected SchemaAbstractOperation op; + + /** + * Operation bytes. Serialized reprezentation of schema operation. + * TODO Should be removed in IGNITE-27559 + */ + @Order(value = 1, method = "operationBytes") + private byte[] opBytes; + + /** Error message. */ + @Order(value = 2, method = "errorMessage") + private String errMsg; + + /** Error code. */ + @Order(value = 3, method = "errorCode") + private int errCode; + + /** Error. */ + protected SchemaOperationException err; + + /** + * Constructor. + */ + protected SchemaAbstractDiscoveryMessage() { + // No-op. + } /** * Constructor. @@ -47,6 +80,9 @@ public abstract class SchemaAbstractDiscoveryMessage implements DiscoveryCustomM * @param op Operation. */ protected SchemaAbstractDiscoveryMessage(SchemaAbstractOperation op) { + id = IgniteUuid.randomUuid(); + errCode = -1; + this.op = op; } @@ -55,17 +91,99 @@ protected SchemaAbstractDiscoveryMessage(SchemaAbstractOperation op) { return id; } - /** {@inheritDoc} */ - @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, - AffinityTopologyVersion topVer, DiscoCache discoCache) { - throw new UnsupportedOperationException(); + /** + * @param id New iD + */ + public void id(IgniteUuid id) { + this.id = id; } /** * @return Operation. */ public SchemaAbstractOperation operation() { - return op; + try { + return op != null ? op : U.unmarshal(Marshallers.jdk(), opBytes, null); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to unmarshal schema operation", e); + } + } + + /** + * @return Operation bytes. + */ + public byte[] operationBytes() { + try { + return opBytes != null ? opBytes : U.marshal(Marshallers.jdk(), op); + } + catch (IgniteCheckedException e) { + throw new IgniteException("Failed to marshal schema operation", e); + } + } + + /** + * @param opBytes Operation bytes. + */ + public void operationBytes(byte[] opBytes) { + this.opBytes = opBytes; + } + + /** + * @return Error message. + */ + public String errorMessage() { + return errMsg; + } + + /** + * @param errMsg Error message. + */ + public void errorMessage(String errMsg) { + this.errMsg = errMsg; + } + + /** + * @return Error code. + */ + public int errorCode() { + return errCode; + } + + /** + * @param errCode Error code. + */ + public void errorCode(int errCode) { + this.errCode = errCode; + } + + /** + * Set error. + * + * @param err Error. + */ + public void onError(SchemaOperationException err) { + if (!hasError()) { + this.err = err; + + errMsg = err.getMessage(); + errCode = err.code(); + } + } + + /** + * @return {@code True} if error was reported during init. + */ + public boolean hasError() { + return err != null || errMsg != null || errCode > -1; + } + + /** + * @return Error message (if any). + */ + @Nullable public SchemaOperationException error() { + return !hasError() ? null : + err != null ? err : new SchemaOperationException(errMsg, errCode); } /** @@ -73,6 +191,12 @@ public SchemaAbstractOperation operation() { */ public abstract boolean exchange(); + /** {@inheritDoc} */ + @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr, + AffinityTopologyVersion topVer, DiscoCache discoCache) { + throw new UnsupportedOperationException(); + } + /** {@inheritDoc} */ @Override public String toString() { return S.toString(SchemaAbstractDiscoveryMessage.class, this); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java index 3ae253cb1d715..4b68e456bf0e7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.processors.query.schema.message; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation; @@ -30,14 +31,16 @@ public class SchemaFinishDiscoveryMessage extends SchemaAbstractDiscoveryMessage /** */ private static final long serialVersionUID = 0L; - /** Error. */ - private final SchemaOperationException err; - - /** Original propose message. */ - private transient SchemaProposeDiscoveryMessage proposeMsg; - /** No-op flag. */ - private final boolean nop; + @Order(4) + private boolean nop; + + /** + * Constructor. + */ + public SchemaFinishDiscoveryMessage() { + // No-op. + } /** * Constructor. @@ -69,42 +72,26 @@ public SchemaFinishDiscoveryMessage(SchemaAbstractOperation op, SchemaOperationE } /** - * @return {@code True} if error was reported during init. - */ - public boolean hasError() { - return err != null; - } - - /** - * @return Error message (if any). - */ - @Nullable public SchemaOperationException error() { - return err; - } - - /** - * @return Propose message. - */ - public SchemaProposeDiscoveryMessage proposeMessage() { - return proposeMsg; - } - - /** - * @param proposeMsg Propose message. + * @return True if message in no-op. */ - public void proposeMessage(SchemaProposeDiscoveryMessage proposeMsg) { - this.proposeMsg = proposeMsg; + public boolean nop() { + return nop; } /** - * @return True if message in no-op. + * @param nop No-op flag. */ - public boolean nop() { - return nop; + public void nop(boolean nop) { + this.nop = nop; } /** {@inheritDoc} */ @Override public String toString() { return S.toString(SchemaFinishDiscoveryMessage.class, this, "parent", super.toString()); } + + /** {@inheritDoc} */ + @Override public short directType() { + return 501; + } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java index 0e1270b17b623..b3485055476e5 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java @@ -17,8 +17,8 @@ package org.apache.ignite.internal.processors.query.schema.message; +import org.apache.ignite.internal.Order; import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage; -import org.apache.ignite.internal.processors.query.schema.SchemaOperationException; import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.lang.IgniteUuid; @@ -32,13 +32,18 @@ public class SchemaProposeDiscoveryMessage extends SchemaAbstractDiscoveryMessag private static final long serialVersionUID = 0L; /** Cache deployment ID. */ + @Order(value = 4, method = "deploymentId") private IgniteUuid depId; - /** Error. */ - private SchemaOperationException err; - /** Whether to perform exchange. */ - private transient boolean exchange; + private boolean exchange; + + /** + * Constructor. + */ + public SchemaProposeDiscoveryMessage() { + // No-op. + } /** * Constructor. @@ -93,30 +98,6 @@ public boolean initialized() { return deploymentId() != null || hasError(); } - /** - * Set error. - * - * @param err Error. - */ - public void onError(SchemaOperationException err) { - if (!hasError()) - this.err = err; - } - - /** - * @return {@code True} if error was reported during init. - */ - public boolean hasError() { - return err != null; - } - - /** - * @return Error message (if any). - */ - @Nullable public SchemaOperationException error() { - return err; - } - /** * @return Schema name. */ @@ -128,4 +109,9 @@ public String schemaName() { @Override public String toString() { return S.toString(SchemaProposeDiscoveryMessage.class, this, "parent", super.toString()); } + + /** {@inheritDoc} */ + @Override public short directType() { + return 500; + } } diff --git a/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java b/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java index 046124622ca99..f19b39ea6bc5d 100644 --- a/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java +++ b/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java @@ -128,6 +128,10 @@ static class ExchangeWorkerFailureTask extends SchemaExchangeWorkerTask { @Override public boolean isMutable() { return false; } + + @Override public short directType() { + return 0; + } }); }