diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
index 5f399ed0c46fd..ba71761964fd3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryMessageFactory.java
@@ -19,6 +19,8 @@
import org.apache.ignite.internal.codegen.InetAddressMessageSerializer;
import org.apache.ignite.internal.codegen.InetSocketAddressMessageSerializer;
+import org.apache.ignite.internal.codegen.SchemaFinishDiscoveryMessageSerializer;
+import org.apache.ignite.internal.codegen.SchemaProposeDiscoveryMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryAuthFailedMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryClientMetricsUpdateMessageSerializer;
@@ -34,6 +36,8 @@
import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer;
+import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
+import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage;
@@ -62,6 +66,7 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
factory.register((short)-101, InetSocketAddressMessage::new, new InetSocketAddressMessageSerializer());
factory.register((short)-100, InetAddressMessage::new, new InetAddressMessageSerializer());
+ // TcpDiscoveryAbstractMessage
factory.register((short)0, TcpDiscoveryCheckFailedMessage::new, new TcpDiscoveryCheckFailedMessageSerializer());
factory.register((short)1, TcpDiscoveryPingRequest::new, new TcpDiscoveryPingRequestSerializer());
factory.register((short)2, TcpDiscoveryPingResponse::new, new TcpDiscoveryPingResponseSerializer());
@@ -76,5 +81,9 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
factory.register((short)11, TcpDiscoveryAuthFailedMessage::new, new TcpDiscoveryAuthFailedMessageSerializer());
factory.register((short)12, TcpDiscoveryDuplicateIdMessage::new, new TcpDiscoveryDuplicateIdMessageSerializer());
factory.register((short)13, TcpDiscoveryClientMetricsUpdateMessage::new, new TcpDiscoveryClientMetricsUpdateMessageSerializer());
+
+ // DiscoveryCustomMessage
+ factory.register((short)500, SchemaProposeDiscoveryMessage::new, new SchemaProposeDiscoveryMessageSerializer());
+ factory.register((short)501, SchemaFinishDiscoveryMessage::new, new SchemaFinishDiscoveryMessageSerializer());
}
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
index 894382807e1de..e09aa64f3cf0e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/GridQueryProcessor.java
@@ -909,9 +909,6 @@ private void onSchemaFinishDiscovery(SchemaFinishDiscoveryMessage msg) {
}
}
- // Propose message will be used from exchange thread to
- msg.proposeMessage(proposeMsg);
-
if (exchangeReady) {
SchemaOperation op = schemaOps.get(proposeMsg.schemaName());
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
index f55eae0922f46..aafcff1dcf99b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaAbstractDiscoveryMessage.java
@@ -17,29 +17,62 @@
package org.apache.ignite.internal.processors.query.schema.message;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.marshaller.Marshallers;
+import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;
/**
* Abstract discovery message for schema operations.
*/
-public abstract class SchemaAbstractDiscoveryMessage implements DiscoveryCustomMessage {
+public abstract class SchemaAbstractDiscoveryMessage implements DiscoveryCustomMessage, Message {
/** */
private static final long serialVersionUID = 0L;
/** ID */
- private final IgniteUuid id = IgniteUuid.randomUuid();
+ @Order(0)
+ private IgniteUuid id;
/** Operation. */
@GridToStringInclude
- protected final SchemaAbstractOperation op;
+ protected SchemaAbstractOperation op;
+
+ /**
+ * Operation bytes. Serialized reprezentation of schema operation.
+ * TODO Should be removed in IGNITE-27559
+ */
+ @Order(value = 1, method = "operationBytes")
+ private byte[] opBytes;
+
+ /** Error message. */
+ @Order(value = 2, method = "errorMessage")
+ private String errMsg;
+
+ /** Error code. */
+ @Order(value = 3, method = "errorCode")
+ private int errCode;
+
+ /** Error. */
+ protected SchemaOperationException err;
+
+ /**
+ * Constructor.
+ */
+ protected SchemaAbstractDiscoveryMessage() {
+ // No-op.
+ }
/**
* Constructor.
@@ -47,6 +80,9 @@ public abstract class SchemaAbstractDiscoveryMessage implements DiscoveryCustomM
* @param op Operation.
*/
protected SchemaAbstractDiscoveryMessage(SchemaAbstractOperation op) {
+ id = IgniteUuid.randomUuid();
+ errCode = -1;
+
this.op = op;
}
@@ -55,17 +91,99 @@ protected SchemaAbstractDiscoveryMessage(SchemaAbstractOperation op) {
return id;
}
- /** {@inheritDoc} */
- @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
- AffinityTopologyVersion topVer, DiscoCache discoCache) {
- throw new UnsupportedOperationException();
+ /**
+ * @param id New iD
+ */
+ public void id(IgniteUuid id) {
+ this.id = id;
}
/**
* @return Operation.
*/
public SchemaAbstractOperation operation() {
- return op;
+ try {
+ return op != null ? op : U.unmarshal(Marshallers.jdk(), opBytes, null);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Failed to unmarshal schema operation", e);
+ }
+ }
+
+ /**
+ * @return Operation bytes.
+ */
+ public byte[] operationBytes() {
+ try {
+ return opBytes != null ? opBytes : U.marshal(Marshallers.jdk(), op);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Failed to marshal schema operation", e);
+ }
+ }
+
+ /**
+ * @param opBytes Operation bytes.
+ */
+ public void operationBytes(byte[] opBytes) {
+ this.opBytes = opBytes;
+ }
+
+ /**
+ * @return Error message.
+ */
+ public String errorMessage() {
+ return errMsg;
+ }
+
+ /**
+ * @param errMsg Error message.
+ */
+ public void errorMessage(String errMsg) {
+ this.errMsg = errMsg;
+ }
+
+ /**
+ * @return Error code.
+ */
+ public int errorCode() {
+ return errCode;
+ }
+
+ /**
+ * @param errCode Error code.
+ */
+ public void errorCode(int errCode) {
+ this.errCode = errCode;
+ }
+
+ /**
+ * Set error.
+ *
+ * @param err Error.
+ */
+ public void onError(SchemaOperationException err) {
+ if (!hasError()) {
+ this.err = err;
+
+ errMsg = err.getMessage();
+ errCode = err.code();
+ }
+ }
+
+ /**
+ * @return {@code True} if error was reported during init.
+ */
+ public boolean hasError() {
+ return err != null || errMsg != null || errCode > -1;
+ }
+
+ /**
+ * @return Error message (if any).
+ */
+ @Nullable public SchemaOperationException error() {
+ return !hasError() ? null :
+ err != null ? err : new SchemaOperationException(errMsg, errCode);
}
/**
@@ -73,6 +191,12 @@ public SchemaAbstractOperation operation() {
*/
public abstract boolean exchange();
+ /** {@inheritDoc} */
+ @Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
+ AffinityTopologyVersion topVer, DiscoCache discoCache) {
+ throw new UnsupportedOperationException();
+ }
+
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SchemaAbstractDiscoveryMessage.class, this);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
index 3ae253cb1d715..4b68e456bf0e7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaFinishDiscoveryMessage.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.query.schema.message;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
@@ -30,14 +31,16 @@ public class SchemaFinishDiscoveryMessage extends SchemaAbstractDiscoveryMessage
/** */
private static final long serialVersionUID = 0L;
- /** Error. */
- private final SchemaOperationException err;
-
- /** Original propose message. */
- private transient SchemaProposeDiscoveryMessage proposeMsg;
-
/** No-op flag. */
- private final boolean nop;
+ @Order(4)
+ private boolean nop;
+
+ /**
+ * Constructor.
+ */
+ public SchemaFinishDiscoveryMessage() {
+ // No-op.
+ }
/**
* Constructor.
@@ -69,42 +72,26 @@ public SchemaFinishDiscoveryMessage(SchemaAbstractOperation op, SchemaOperationE
}
/**
- * @return {@code True} if error was reported during init.
- */
- public boolean hasError() {
- return err != null;
- }
-
- /**
- * @return Error message (if any).
- */
- @Nullable public SchemaOperationException error() {
- return err;
- }
-
- /**
- * @return Propose message.
- */
- public SchemaProposeDiscoveryMessage proposeMessage() {
- return proposeMsg;
- }
-
- /**
- * @param proposeMsg Propose message.
+ * @return True if message in no-op.
*/
- public void proposeMessage(SchemaProposeDiscoveryMessage proposeMsg) {
- this.proposeMsg = proposeMsg;
+ public boolean nop() {
+ return nop;
}
/**
- * @return True if message in no-op.
+ * @param nop No-op flag.
*/
- public boolean nop() {
- return nop;
+ public void nop(boolean nop) {
+ this.nop = nop;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SchemaFinishDiscoveryMessage.class, this, "parent", super.toString());
}
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 501;
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
index 0e1270b17b623..b3485055476e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/query/schema/message/SchemaProposeDiscoveryMessage.java
@@ -17,8 +17,8 @@
package org.apache.ignite.internal.processors.query.schema.message;
+import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
-import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
@@ -32,13 +32,18 @@ public class SchemaProposeDiscoveryMessage extends SchemaAbstractDiscoveryMessag
private static final long serialVersionUID = 0L;
/** Cache deployment ID. */
+ @Order(value = 4, method = "deploymentId")
private IgniteUuid depId;
- /** Error. */
- private SchemaOperationException err;
-
/** Whether to perform exchange. */
- private transient boolean exchange;
+ private boolean exchange;
+
+ /**
+ * Constructor.
+ */
+ public SchemaProposeDiscoveryMessage() {
+ // No-op.
+ }
/**
* Constructor.
@@ -93,30 +98,6 @@ public boolean initialized() {
return deploymentId() != null || hasError();
}
- /**
- * Set error.
- *
- * @param err Error.
- */
- public void onError(SchemaOperationException err) {
- if (!hasError())
- this.err = err;
- }
-
- /**
- * @return {@code True} if error was reported during init.
- */
- public boolean hasError() {
- return err != null;
- }
-
- /**
- * @return Error message (if any).
- */
- @Nullable public SchemaOperationException error() {
- return err;
- }
-
/**
* @return Schema name.
*/
@@ -128,4 +109,9 @@ public String schemaName() {
@Override public String toString() {
return S.toString(SchemaProposeDiscoveryMessage.class, this, "parent", super.toString());
}
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 500;
+ }
}
diff --git a/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java b/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java
index 046124622ca99..f19b39ea6bc5d 100644
--- a/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/failure/FailureHandlerTriggeredTest.java
@@ -128,6 +128,10 @@ static class ExchangeWorkerFailureTask extends SchemaExchangeWorkerTask {
@Override public boolean isMutable() {
return false;
}
+
+ @Override public short directType() {
+ return 0;
+ }
});
}