Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import org.apache.ignite.internal.codegen.InetAddressMessageSerializer;
import org.apache.ignite.internal.codegen.InetSocketAddressMessageSerializer;
import org.apache.ignite.internal.codegen.SchemaFinishDiscoveryMessageSerializer;
import org.apache.ignite.internal.codegen.SchemaProposeDiscoveryMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryAuthFailedMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryCheckFailedMessageSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryClientMetricsUpdateMessageSerializer;
Expand All @@ -34,6 +36,8 @@
import org.apache.ignite.internal.codegen.TcpDiscoveryPingRequestSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryPingResponseSerializer;
import org.apache.ignite.internal.codegen.TcpDiscoveryRingLatencyCheckMessageSerializer;
import org.apache.ignite.internal.processors.query.schema.message.SchemaFinishDiscoveryMessage;
import org.apache.ignite.internal.processors.query.schema.message.SchemaProposeDiscoveryMessage;
import org.apache.ignite.plugin.extensions.communication.MessageFactory;
import org.apache.ignite.plugin.extensions.communication.MessageFactoryProvider;
import org.apache.ignite.spi.discovery.tcp.messages.InetAddressMessage;
Expand Down Expand Up @@ -62,6 +66,7 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
factory.register((short)-101, InetSocketAddressMessage::new, new InetSocketAddressMessageSerializer());
factory.register((short)-100, InetAddressMessage::new, new InetAddressMessageSerializer());

// TcpDiscoveryAbstractMessage
factory.register((short)0, TcpDiscoveryCheckFailedMessage::new, new TcpDiscoveryCheckFailedMessageSerializer());
factory.register((short)1, TcpDiscoveryPingRequest::new, new TcpDiscoveryPingRequestSerializer());
factory.register((short)2, TcpDiscoveryPingResponse::new, new TcpDiscoveryPingResponseSerializer());
Expand All @@ -76,5 +81,9 @@ public class DiscoveryMessageFactory implements MessageFactoryProvider {
factory.register((short)11, TcpDiscoveryAuthFailedMessage::new, new TcpDiscoveryAuthFailedMessageSerializer());
factory.register((short)12, TcpDiscoveryDuplicateIdMessage::new, new TcpDiscoveryDuplicateIdMessageSerializer());
factory.register((short)13, TcpDiscoveryClientMetricsUpdateMessage::new, new TcpDiscoveryClientMetricsUpdateMessageSerializer());

// DiscoveryCustomMessage
factory.register((short)500, SchemaProposeDiscoveryMessage::new, new SchemaProposeDiscoveryMessageSerializer());
factory.register((short)501, SchemaFinishDiscoveryMessage::new, new SchemaFinishDiscoveryMessageSerializer());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -909,9 +909,6 @@ private void onSchemaFinishDiscovery(SchemaFinishDiscoveryMessage msg) {
}
}

// Propose message will be used from exchange thread to
msg.proposeMessage(proposeMsg);

if (exchangeReady) {
SchemaOperation op = schemaOps.get(proposeMsg.schemaName());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,72 @@

package org.apache.ignite.internal.processors.query.schema.message;

import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.marshaller.Marshallers;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.jetbrains.annotations.Nullable;

/**
* Abstract discovery message for schema operations.
*/
public abstract class SchemaAbstractDiscoveryMessage implements DiscoveryCustomMessage {
public abstract class SchemaAbstractDiscoveryMessage implements DiscoveryCustomMessage, Message {
/** */
private static final long serialVersionUID = 0L;

/** ID */
private final IgniteUuid id = IgniteUuid.randomUuid();
@Order(0)
private IgniteUuid id;

/** Operation. */
@GridToStringInclude
protected final SchemaAbstractOperation op;
protected SchemaAbstractOperation op;

/**
* Operation bytes. Serialized reprezentation of schema operation.
* TODO Should be removed in IGNITE-27559
*/
@Order(value = 1, method = "operationBytes")
private byte[] opBytes;

/** Error message. */
@Order(value = 2, method = "errorMessage")
private String errMsg;

/** Error code. */
@Order(value = 3, method = "errorCode")
private int errCode;

/** Error. */
protected SchemaOperationException err;

/**
* Constructor.
*/
protected SchemaAbstractDiscoveryMessage() {
// No-op.
}

/**
* Constructor.
*
* @param op Operation.
*/
protected SchemaAbstractDiscoveryMessage(SchemaAbstractOperation op) {
id = IgniteUuid.randomUuid();
errCode = -1;

this.op = op;
}

Expand All @@ -55,24 +91,112 @@ protected SchemaAbstractDiscoveryMessage(SchemaAbstractOperation op) {
return id;
}

/** {@inheritDoc} */
@Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
AffinityTopologyVersion topVer, DiscoCache discoCache) {
throw new UnsupportedOperationException();
/**
* @param id New iD
*/
public void id(IgniteUuid id) {
this.id = id;
}

/**
* @return Operation.
*/
public SchemaAbstractOperation operation() {
return op;
try {
return op != null ? op : U.unmarshal(Marshallers.jdk(), opBytes, null);
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to unmarshal schema operation", e);
}
}

/**
* @return Operation bytes.
*/
public byte[] operationBytes() {
try {
return opBytes != null ? opBytes : U.marshal(Marshallers.jdk(), op);
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to marshal schema operation", e);
}
}

/**
* @param opBytes Operation bytes.
*/
public void operationBytes(byte[] opBytes) {
this.opBytes = opBytes;
}

/**
* @return Error message.
*/
public String errorMessage() {
return errMsg;
}

/**
* @param errMsg Error message.
*/
public void errorMessage(String errMsg) {
this.errMsg = errMsg;
}

/**
* @return Error code.
*/
public int errorCode() {
return errCode;
}

/**
* @param errCode Error code.
*/
public void errorCode(int errCode) {
this.errCode = errCode;
}

/**
* Set error.
*
* @param err Error.
*/
public void onError(SchemaOperationException err) {
if (!hasError()) {
this.err = err;

errMsg = err.getMessage();
errCode = err.code();
}
}

/**
* @return {@code True} if error was reported during init.
*/
public boolean hasError() {
return err != null || errMsg != null || errCode > -1;
}

/**
* @return Error message (if any).
*/
@Nullable public SchemaOperationException error() {
return !hasError() ? null :
err != null ? err : new SchemaOperationException(errMsg, errCode);
}

/**
* @return Whether request must be propagated to exchange thread.
*/
public abstract boolean exchange();

/** {@inheritDoc} */
@Nullable @Override public DiscoCache createDiscoCache(GridDiscoveryManager mgr,
AffinityTopologyVersion topVer, DiscoCache discoCache) {
throw new UnsupportedOperationException();
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SchemaAbstractDiscoveryMessage.class, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.processors.query.schema.message;

import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
Expand All @@ -30,14 +31,16 @@ public class SchemaFinishDiscoveryMessage extends SchemaAbstractDiscoveryMessage
/** */
private static final long serialVersionUID = 0L;

/** Error. */
private final SchemaOperationException err;

/** Original propose message. */
private transient SchemaProposeDiscoveryMessage proposeMsg;

/** No-op flag. */
private final boolean nop;
@Order(4)
private boolean nop;

/**
* Constructor.
*/
public SchemaFinishDiscoveryMessage() {
// No-op.
}

/**
* Constructor.
Expand Down Expand Up @@ -69,42 +72,26 @@ public SchemaFinishDiscoveryMessage(SchemaAbstractOperation op, SchemaOperationE
}

/**
* @return {@code True} if error was reported during init.
*/
public boolean hasError() {
return err != null;
}

/**
* @return Error message (if any).
*/
@Nullable public SchemaOperationException error() {
return err;
}

/**
* @return Propose message.
*/
public SchemaProposeDiscoveryMessage proposeMessage() {
return proposeMsg;
}

/**
* @param proposeMsg Propose message.
* @return <code>True</code> if message in no-op.
*/
public void proposeMessage(SchemaProposeDiscoveryMessage proposeMsg) {
this.proposeMsg = proposeMsg;
public boolean nop() {
return nop;
}

/**
* @return <code>True</code> if message in no-op.
* @param nop No-op flag.
*/
public boolean nop() {
return nop;
public void nop(boolean nop) {
this.nop = nop;
}

/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SchemaFinishDiscoveryMessage.class, this, "parent", super.toString());
}

/** {@inheritDoc} */
@Override public short directType() {
return 501;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

package org.apache.ignite.internal.processors.query.schema.message;

import org.apache.ignite.internal.Order;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.processors.query.schema.SchemaOperationException;
import org.apache.ignite.internal.processors.query.schema.operation.SchemaAbstractOperation;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
Expand All @@ -32,13 +32,18 @@ public class SchemaProposeDiscoveryMessage extends SchemaAbstractDiscoveryMessag
private static final long serialVersionUID = 0L;

/** Cache deployment ID. */
@Order(value = 4, method = "deploymentId")
private IgniteUuid depId;

/** Error. */
private SchemaOperationException err;

/** Whether to perform exchange. */
private transient boolean exchange;
private boolean exchange;

/**
* Constructor.
*/
public SchemaProposeDiscoveryMessage() {
// No-op.
}

/**
* Constructor.
Expand Down Expand Up @@ -93,30 +98,6 @@ public boolean initialized() {
return deploymentId() != null || hasError();
}

/**
* Set error.
*
* @param err Error.
*/
public void onError(SchemaOperationException err) {
if (!hasError())
this.err = err;
}

/**
* @return {@code True} if error was reported during init.
*/
public boolean hasError() {
return err != null;
}

/**
* @return Error message (if any).
*/
@Nullable public SchemaOperationException error() {
return err;
}

/**
* @return Schema name.
*/
Expand All @@ -128,4 +109,9 @@ public String schemaName() {
@Override public String toString() {
return S.toString(SchemaProposeDiscoveryMessage.class, this, "parent", super.toString());
}

/** {@inheritDoc} */
@Override public short directType() {
return 500;
}
}
Loading
Loading