This repository was archived by the owner on Oct 27, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathOkHttpConnection.java
More file actions
137 lines (107 loc) · 3.59 KB
/
OkHttpConnection.java
File metadata and controls
137 lines (107 loc) · 3.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package kr.jadekim.rxjava.websocket.connection.okhttp;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableEmitter;
import io.reactivex.rxjava3.core.ObservableOnSubscribe;
import kr.jadekim.rxjava.websocket.connection.Connection;
import kr.jadekim.rxjava.websocket.listener.WebSocketEventListener;
import okhttp3.*;
import okio.ByteString;
/**
 * {@link Connection} implementation backed by an OkHttp {@link WebSocket}.
 *
 * <p>The instance registers itself as the {@link WebSocketListener} of the socket it opens
 * and forwards incoming text frames to the emitter of the most recent subscriber of
 * {@link #getInboundStream()}. Lifecycle transitions are reported to the supplied
 * {@link WebSocketEventListener}.
 *
 * <p>Only one emitter is tracked at a time: every new subscription replaces the previous
 * emitter, so earlier subscribers stop receiving messages.
 */
public class OkHttpConnection extends WebSocketListener implements Connection, ObservableOnSubscribe<String> {

    private final String url;
    private final OkHttpClient okHttpClient;
    private final Request request;
    private final boolean isErrorPropagation;
    private final WebSocketEventListener listener;

    // These fields are written by callers (connect/disconnect/subscribe) and read by the
    // WebSocketListener callbacks, and vice versa. The callbacks are expected to arrive on
    // OkHttp-managed threads, so the fields are volatile to make updates visible across threads.
    private volatile WebSocket webSocket;
    private volatile ObservableEmitter<String> emitter;
    private volatile boolean isOpened = false;

    /**
     * Creates a connection that is not yet connected; call {@link #connect()} to open the socket.
     *
     * @param url                the URL reported by {@link #getUrl()}
     * @param okHttpClient       client used to create the web socket
     * @param request            request passed to {@link OkHttpClient#newWebSocket}
     * @param isErrorPropagation when {@code true}, socket failures are signalled to the stream
     *                           subscriber as an error; when {@code false}, a failure triggers
     *                           the disconnect callbacks instead and the stream is left open
     * @param listener           receiver of the connection lifecycle callbacks
     */
    public OkHttpConnection(String url, OkHttpClient okHttpClient, Request request, boolean isErrorPropagation, WebSocketEventListener listener) {
        this.url = url;
        this.okHttpClient = okHttpClient;
        this.request = request;
        this.isErrorPropagation = isErrorPropagation;
        this.listener = listener;
    }

    /** Returns the URL this connection was created with. */
    @Override
    public String getUrl() {
        return url;
    }

    /**
     * Returns a stream of the text messages received on the socket.
     *
     * <p>Each subscription replaces the emitter used by this connection, so only the most
     * recent subscriber receives messages.
     */
    @Override
    public Observable<String> getInboundStream() {
        return Observable.create(this);
    }

    /**
     * Sends a text message over the current socket.
     *
     * @param message the text to send
     * @return {@code false} when no socket has been created (or it was disconnected),
     *         otherwise the result of {@link WebSocket#send(String)}
     */
    @Override
    public boolean sendMessage(String message) {
        // Snapshot the field: a concurrent disconnect() may null it between check and use.
        WebSocket socket = webSocket;
        if (socket == null) {
            return false;
        }

        return socket.send(message);
    }

    /**
     * Closes the current socket with status code 1000 and forgets it.
     * Does nothing when there is no socket.
     */
    @Override
    public void disconnect() {
        // Snapshot the field so the null check and the close act on the same instance.
        WebSocket socket = webSocket;
        if (socket == null) {
            return;
        }

        socket.close(1000, null);
        webSocket = null;
    }

    /** Stores the emitter of the newest subscriber; it becomes the target of inbound messages. */
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        this.emitter = emitter;
    }

    /** Marks the connection as opened and notifies the listener. */
    @Override
    public void onOpen(WebSocket webSocket, Response response) {
        this.isOpened = true;
        listener.onConnectedSocket(this);
    }

    /** Forwards a received text frame to the current subscriber, if any. */
    @Override
    public void onMessage(WebSocket webSocket, String text) {
        ObservableEmitter<String> target = emitter;
        if (target != null) {
            target.onNext(text);
        }
    }

    @Override
    public void onMessage(WebSocket webSocket, ByteString bytes) {
        //do nothing (not support)
    }

    /** Notifies the listener that the socket is about to close. */
    @Override
    public void onClosing(WebSocket webSocket, int code, String reason) {
        listener.onDisconnectSocket(this);
    }

    /** Marks the connection as closed, completes the stream and notifies the listener. */
    @Override
    public void onClosed(WebSocket webSocket, int code, String reason) {
        this.isOpened = false;

        ObservableEmitter<String> target = emitter;
        if (target != null) {
            target.onComplete();
        }

        listener.onDisconnectedSocket(this);
    }

    /**
     * Handles a socket failure.
     *
     * <p>With error propagation enabled the failure is delivered to the stream subscriber
     * wrapped in an {@link OkHttpWebSocketException}. The listener is always told about the
     * error. With error propagation disabled the socket is additionally disconnected and the
     * disconnect callbacks are fired.
     */
    @Override
    public void onFailure(WebSocket webSocket, Throwable t, Response response) {
        this.isOpened = false;

        ObservableEmitter<String> target = emitter;
        if (isErrorPropagation && target != null) {
            // tryOnError instead of onError: if the subscriber has already disposed or the
            // emitter has terminated, onError would route the exception to the global
            // RxJavaPlugins error handler as an undeliverable exception. The failure is
            // still reported through listener.onErrorSocket below, so nothing is lost.
            target.tryOnError(new OkHttpWebSocketException(t, response));
        }

        listener.onErrorSocket(t);

        if (!isErrorPropagation) {
            listener.onDisconnectSocket(this);
            disconnect();
            listener.onDisconnectedSocket(this);
        }
    }

    /**
     * Opens a new web socket for the configured request, with this instance as its listener.
     *
     * @return this connection
     * @throws IllegalStateException if a socket already exists and is currently open
     */
    public OkHttpConnection connect() {
        if (webSocket != null && isOpened) {
            throw new IllegalStateException("Another socket is connected");
        }

        listener.onConnectSocket();

        this.webSocket = okHttpClient.newWebSocket(request, this);

        return this;
    }

    /** Failure delivered to stream subscribers; carries the HTTP response passed to {@code onFailure}, which may be {@code null}. */
    public static class OkHttpWebSocketException extends RuntimeException {

        public final Response response;

        OkHttpWebSocketException(Throwable cause, Response response) {
            super(cause);
            this.response = response;
        }
    }
}