This repository was archived by the owner on Oct 27, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathStreamRouter.java
More file actions
72 lines (61 loc) · 2.69 KB
/
StreamRouter.java
File metadata and controls
72 lines (61 loc) · 2.69 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package kr.jadekim.rxjava.websocket.processor;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.functions.Action;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Function;
import kr.jadekim.rxjava.websocket.inbound.Inbound;
import kr.jadekim.rxjava.websocket.inbound.InboundParser;
import kr.jadekim.rxjava.websocket.listener.WebSocketEventListener;
import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.Map;
class StreamRouter {
private InboundParser<Object> parser;
private Observable<Inbound<Object>> mainStream;
private Map<Integer, ChannelDistributor> distributorMap;
private WebSocketEventListener listener;
StreamRouter(final InboundParser<Object> parser, Observable<String> inboundStream, final WebSocketEventListener listener) {
this.parser = parser;
this.mainStream = inboundStream
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
listener.onStartInboundStream();
}
})
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
listener.onErrorInboundStream(throwable);
}
})
.doFinally(new Action() {
@Override
public void run() throws Exception {
listener.onStopInboundStream();
}
})
.map(new Function<String, Inbound<Object>>() {
@Override
public Inbound<Object> apply(String s) throws Exception {
return parser.parse(s);
}
})
.share();
this.distributorMap = new HashMap<Integer, ChannelDistributor>();
this.listener = listener;
}
<Model> ChannelStream<Model> getStream(Subscription subscription) {
final String channel = subscription.getChannel();
final Type modelType = subscription.getResponseType();
final int key = subscription.getChannelId();
//noinspection unchecked
ChannelDistributor<Model> distributor = distributorMap.get(key);
if (distributor == null) {
distributor = new ChannelDistributor<Model>(channel, mainStream, parser, modelType, listener);
distributorMap.put(key, distributor);
}
return distributor.getStream(subscription);
}
}