This repository was archived by the owner on Oct 27, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathMessageSender.java
More file actions
79 lines (64 loc) · 2.86 KB
/
MessageSender.java
File metadata and controls
79 lines (64 loc) · 2.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package kr.jadekim.rxjava.websocket.processor;
import io.reactivex.rxjava3.core.Completable;
import io.reactivex.rxjava3.core.CompletableEmitter;
import io.reactivex.rxjava3.core.CompletableOnSubscribe;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.PublishSubject;
import kr.jadekim.rxjava.websocket.connection.Connection;
import kr.jadekim.rxjava.websocket.listener.WebSocketEventListener;
import kr.jadekim.rxjava.websocket.outbound.OutboundSerializer;
import java.io.IOException;
import java.util.Map;
class MessageSender {
private Connection connection;
private OutboundSerializer serializer;
private WebSocketEventListener listener;
private PublishSubject<MessageQueueItem> subject = PublishSubject.create();
MessageSender(Connection connection, OutboundSerializer serializer, WebSocketEventListener listener) {
this.connection = connection;
this.serializer = serializer;
this.listener = listener;
//noinspection ResultOfMethodCallIgnored
subject
.subscribeOn(Schedulers.newThread())
.subscribe(new Consumer<MessageQueueItem>() {
@Override
public void accept(MessageQueueItem messageQueueItem) throws Exception {
messageQueueItem.run();
}
});
}
Completable sendMessage(String messageType, Map<String, Object> parameterMap) {
return Completable.create(new MessageQueueItem(messageType, parameterMap));
}
private class MessageQueueItem implements CompletableOnSubscribe {
private String messageType;
private Map<String, Object> parameterMap;
private CompletableEmitter emitter;
public MessageQueueItem(String messageType, Map<String, Object> parameterMap) {
this.messageType = messageType;
this.parameterMap = parameterMap;
}
@Override
public void subscribe(CompletableEmitter emitter) throws Exception {
this.emitter = emitter;
listener.onAddMessageQueue(messageType, parameterMap);
subject.onNext(this);
}
public void run() {
if (emitter == null || emitter.isDisposed()) {
return;
}
String message = serializer.serialize(messageType, parameterMap);
listener.onSendMessage(messageType, parameterMap, message);
if (connection.sendMessage(message)) {
listener.onCompleteSendMessage(messageType, parameterMap, message);
emitter.onComplete();
} else {
listener.onErrorSendMessage(messageType, parameterMap, message);
emitter.onError(new IOException("Fail to send message : " + message));
}
}
}
}