Skip to content

Commit 77cf5e8

Browse files
authored
Add binary message support (#7)
* passing more Java version's tests * support binary message and ack * add ws binary test * fix test * fix broadcast case
1 parent ecfd74c commit 77cf5e8

File tree

33 files changed

+1766
-1306
lines changed

33 files changed

+1766
-1306
lines changed

README.md

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ IO.socket("http://localhost:3000", IO.Options()) { socket ->
4848
socket.on(Socket.EVENT_CONNECT) { args ->
4949
println("on connect ${args.joinToString()}")
5050

51-
socket.emit("echo", 1, "2", GMTDate())
51+
val bin = UnsafeByteStringOperations.wrapUnsafe(byteArrayOf(0x1, 0x3, 0x1, 0x4))
52+
socket.emit("echo", 1, "2", bin, GMTDate())
5253
}.on("echoBack") { args ->
5354
println("on echoBack ${args.joinToString()}")
5455
}
@@ -57,6 +58,29 @@ IO.socket("http://localhost:3000", IO.Options()) { socket ->
5758
}
5859
```
5960

61+
Most of the APIs are the same as socket.io-client-java, here are some differences:
62+
63+
- Create socket is asynchronous, to make it's easier to guarantee thread safety.
64+
- Binary messages can't be nested, because `emit` only accepts String/Boolean/Number/JsonElement/ByteString, other types will be converted to String using `toString()`, so there is no way to put ByteString in JsonElement.
65+
66+
### Set logging callback
67+
68+
```kotlin
69+
Logger.setLogger(object : LoggerInterface {
70+
override fun debug(tag: String, log: String) {
71+
TODO("Not yet implemented")
72+
}
73+
74+
override fun info(tag: String, log: String) {
75+
TODO("Not yet implemented")
76+
}
77+
78+
override fun error(tag: String, log: String) {
79+
TODO("Not yet implemented")
80+
}
81+
})
82+
```
83+
6084
## Example
6185

6286
### Android

build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ plugins {
2020
alias(libs.plugins.versionUpdate)
2121
}
2222

23+
// ./gradlew versionCatalogUpdate
2324
versionCatalogUpdate {
2425
sortByKey = false
2526
keep {

example/shared/src/commonMain/kotlin/com/piasy/kmp/socketio/example/Greeting.kt

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,37 @@
11
package com.piasy.kmp.socketio.example
22

3+
import com.piasy.kmp.socketio.logging.Logger
4+
import com.piasy.kmp.socketio.logging.LoggerInterface
35
import com.piasy.kmp.socketio.socketio.IO
46
import com.piasy.kmp.socketio.socketio.Socket
57
import io.ktor.util.date.GMTDate
8+
import kotlinx.io.bytestring.ByteString
9+
import kotlinx.io.bytestring.unsafe.UnsafeByteStringApi
10+
import kotlinx.io.bytestring.unsafe.UnsafeByteStringOperations
611

712
class Greeting {
13+
@OptIn(UnsafeByteStringApi::class)
814
fun greet() {
15+
Logger.setLogger(object : LoggerInterface {
16+
override fun debug(tag: String, log: String) {
17+
TODO("Not yet implemented")
18+
}
19+
20+
override fun info(tag: String, log: String) {
21+
TODO("Not yet implemented")
22+
}
23+
24+
override fun error(tag: String, log: String) {
25+
TODO("Not yet implemented")
26+
}
27+
})
28+
929
IO.socket("http://172.16.11.25:3000", IO.Options()) { socket ->
1030
socket.on(Socket.EVENT_CONNECT) { args ->
1131
println("on connect ${args.joinToString()}")
1232

13-
socket.emit("echo", 1, "2", GMTDate())
33+
val bin = UnsafeByteStringOperations.wrapUnsafe(byteArrayOf(0x1, 0x3, 0x1, 0x4))
34+
socket.emit("echo", 1, "2", bin, GMTDate())
1435
}.on("echoBack") { args ->
1536
println("on echoBack ${args.joinToString()}")
1637
}

gradle/libs.versions.toml

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,21 @@ iosDeploymentTarget = "14.0"
44
compileSdk = "35"
55
minSdk = "21"
66
targetSdk = "35"
7-
agp = "8.7.2"
7+
agp = "8.8.0"
88
kotlin = "2.1.0"
9-
mockk = "1.13.13"
9+
mockk = "1.13.16"
1010
ktor = "3.0.3"
1111
coroutine = "1.10.1"
1212
compose = "1.7.5"
1313

1414
[libraries]
1515
junit = "junit:junit:4.13.2"
16-
hamcrest = "org.hamcrest:hamcrest-library:1.3"
16+
hamcrest = "org.hamcrest:hamcrest-library:3.0"
1717
kotlin-test = { module = "org.jetbrains.kotlin:kotlin-test", version.ref = "kotlin" }
1818
mockk = { module = "io.mockk:mockk", version.ref = "mockk" }
1919
kotlinx-coroutines-test = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-test", version.ref = "coroutine" }
20-
json = "org.json:json:20241224"
21-
22-
socketioParser = "org.hildan.socketio:socketio-kotlin:1.6.0"
20+
json = "org.json:json:20250107"
21+
socketioParser = "org.hildan.socketio:socketio-kotlin:2.0.0"
2322
kotlinx-coroutines-core = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version.ref = "coroutine" }
2423
ktor-client-core = { module = "io.ktor:ktor-client-core", version.ref = "ktor" }
2524
ktor-client-websockets = { module = "io.ktor:ktor-client-websockets", version.ref = "ktor" }
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#Wed Nov 16 22:27:43 CST 2022
22
distributionBase=GRADLE_USER_HOME
3-
distributionUrl=https\://services.gradle.org/distributions/gradle-8.9-all.zip
3+
distributionUrl=https\://services.gradle.org/distributions/gradle-8.12-all.zip
44
distributionPath=wrapper/dists
55
zipStorePath=wrapper/dists
66
zipStoreBase=GRADLE_USER_HOME

kmp-socketio/src/commonMain/kotlin/com/piasy/kmp/socketio/engineio/EngineSocket.kt

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ class EngineSocket(
1919
@JvmField internal val opt: Options,
2020
private val scope: CoroutineScope,
2121
private val factory: TransportFactory = DefaultTransportFactory,
22+
private val rawMessage: Boolean = false,
2223
) : Emitter() {
2324
open class Options : Transport.Options() {
2425
/**
@@ -42,7 +43,7 @@ class EngineSocket(
4243

4344
internal var disablePingTimeout = false // to help unit test
4445
private var state = State.INIT
45-
private var id = ""
46+
internal var id = ""
4647
private var upgrades = emptyList<String>()
4748
private var pingInterval = 0
4849
private var pingTimeout = 0
@@ -134,7 +135,12 @@ class EngineSocket(
134135
*/
135136
@WorkThread
136137
fun send(packet: EngineIOPacket<*>) {
137-
sendPacket(packet)
138+
sendPackets(listOf(packet))
139+
}
140+
141+
@WorkThread
142+
fun send(packets: List<EngineIOPacket<*>>) {
143+
sendPackets(packets)
138144
}
139145

140146
/**
@@ -212,7 +218,7 @@ class EngineSocket(
212218
opts.timestampParam = options?.timestampParam ?: opt.timestampParam
213219
opts.extraHeaders = opt.extraHeaders
214220

215-
val transport = factory.create(name, opts, scope)
221+
val transport = factory.create(name, opts, scope, rawMessage)
216222
emit(EVENT_TRANSPORT, transport)
217223
return transport
218224
}
@@ -283,15 +289,15 @@ class EngineSocket(
283289
}
284290

285291
@WorkThread
286-
private fun sendPacket(packet: EngineIOPacket<*>) {
287-
Logger.debug(TAG, "sendPacket: state $state, pkt $packet")
292+
private fun sendPackets(packets: List<EngineIOPacket<*>>) {
293+
Logger.debug(TAG, "sendPackets: state $state, $packets")
288294
if (state != State.OPENING && state != State.OPEN) {
289-
Logger.error(TAG, "sendPacket at wrong state: $state")
295+
Logger.error(TAG, "sendPackets at wrong state: $state")
290296
return
291297
}
292298

293-
emit(EVENT_PACKET_CREATE, packet)
294-
writeBuffer.addLast(packet)
299+
emit(EVENT_PACKET_CREATE, packets.size)
300+
writeBuffer.addAll(packets)
295301
flush()
296302
}
297303

@@ -310,14 +316,14 @@ class EngineSocket(
310316
is EngineIOPacket.Open -> onHandshake(packet)
311317
is EngineIOPacket.Ping -> {
312318
emit(EVENT_PING)
313-
sendPacket(EngineIOPacket.Pong(null))
319+
sendPackets(listOf(EngineIOPacket.Pong(null)))
314320
}
315321

322+
is EngineIOPacket.BinaryData -> emit(EVENT_DATA, packet.payload)
316323
is EngineIOPacket.Message<*> -> {
317324
val data = packet.payload
318325
if (data != null) {
319326
emit(EVENT_DATA, data)
320-
emit(EVENT_MESSAGE, data)
321327
}
322328
}
323329

@@ -357,7 +363,7 @@ class EngineSocket(
357363
emit(EVENT_OPEN)
358364
flush()
359365

360-
if (opt.upgrade && transport?.name == PollingXHR.NAME) {
366+
if (opt.upgrade && transport?.name == PollingXHR.NAME && upgrades.isNotEmpty()) {
361367
Logger.info(TAG, "starting upgrade probes")
362368
for (upgrade in upgrades) {
363369
probe(upgrade)
@@ -591,7 +597,7 @@ class EngineSocket(
591597
/**
592598
* Called when data is received from the server.
593599
*/
594-
const val EVENT_MESSAGE = "message"
600+
const val EVENT_DATA = "data"
595601

596602
/**
597603
* Called when an error occurs.
@@ -618,7 +624,6 @@ class EngineSocket(
618624
const val EVENT_PACKET = "packet"
619625
const val EVENT_PACKET_CREATE = "packetCreate"
620626
const val EVENT_HEARTBEAT = "heartbeat"
621-
const val EVENT_DATA = "data"
622627
const val EVENT_PING = "ping"
623628

624629
/**

kmp-socketio/src/commonMain/kotlin/com/piasy/kmp/socketio/engineio/Transport.kt

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,14 @@ import com.piasy.kmp.socketio.logging.Logger
55
import com.piasy.kmp.socketio.parseqs.ParseQS
66
import io.ktor.util.date.*
77
import kotlinx.coroutines.CoroutineScope
8-
import org.hildan.socketio.EngineIO
98
import org.hildan.socketio.EngineIOPacket
109
import kotlin.jvm.JvmField
1110

1211
abstract class Transport(
1312
internal val opt: Options,
1413
protected val scope: CoroutineScope,
1514
val name: String,
15+
protected val rawMessage: Boolean,
1616
) : Emitter() {
1717
open class Options {
1818
// @JvmField for jvm test code
@@ -60,7 +60,7 @@ abstract class Transport(
6060
if (state == State.OPEN) {
6161
doSend(packets)
6262
} else {
63-
throw RuntimeException("Transport not open")
63+
onError("Transport not open")
6464
}
6565
}
6666

@@ -88,21 +88,6 @@ abstract class Transport(
8888
}
8989
}
9090

91-
@WorkThread
92-
protected fun onWsData(data: String) {
93-
logD("onData: `$data`")
94-
if (stringMessagePayloadForTesting) {
95-
onPacket(EngineIO.decodeWsFrame(data, deserializePayload = { it }))
96-
} else {
97-
onPacket(EngineIO.decodeSocketIO(data))
98-
}
99-
}
100-
101-
@WorkThread
102-
protected fun onWsData(data: ByteArray) {
103-
// TODO: binary
104-
}
105-
10691
@WorkThread
10792
protected fun onPacket(packet: EngineIOPacket<*>) {
10893
logD("onPacket $packet")
@@ -184,7 +169,6 @@ abstract class Transport(
184169
const val EVENT_REQUEST_HEADERS: String = "requestHeaders"
185170
const val EVENT_RESPONSE_HEADERS: String = "responseHeaders"
186171

187-
internal var stringMessagePayloadForTesting = false
188172
private const val TAG = "Transport"
189173
}
190174
}

kmp-socketio/src/commonMain/kotlin/com/piasy/kmp/socketio/engineio/transports/PollingXHR.kt

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,15 @@ import io.ktor.util.*
1111
import kotlinx.coroutines.CoroutineScope
1212
import kotlinx.coroutines.Dispatchers
1313
import kotlinx.coroutines.launch
14-
import org.hildan.socketio.EngineIO
15-
import org.hildan.socketio.EngineIOPacket
16-
import org.hildan.socketio.SocketIO
17-
import org.hildan.socketio.SocketIOPacket
14+
import org.hildan.socketio.*
1815

1916
open class PollingXHR(
2017
opt: Options,
2118
scope: CoroutineScope,
2219
private val ioScope: CoroutineScope = CoroutineScope(Dispatchers.Default),
2320
private val factory: HttpClientFactory = DefaultHttpClientFactory,
24-
) : Transport(opt, scope, NAME) {
21+
rawMessage: Boolean,
22+
) : Transport(opt, scope, NAME, rawMessage) {
2523
private var polling = false
2624

2725
@WorkThread
@@ -138,10 +136,17 @@ open class PollingXHR(
138136
@WorkThread
139137
private fun onPollComplete(data: String) {
140138
logD("onPollComplete: state $state, `$data`")
141-
val packets = if (stringMessagePayloadForTesting) {
142-
EngineIO.decodeHttpBatch(data, deserializeTextPayload = { it })
143-
} else {
144-
EngineIO.decodeHttpBatch(data, SocketIO::decode)
139+
val packets = try {
140+
if (rawMessage) {
141+
EngineIO.decodeHttpBatch(data, deserializePayload = { it })
142+
} else {
143+
EngineIO.decodeHttpBatch(data, SocketIO::decode)
144+
}
145+
} catch (e: InvalidSocketIOPacketException) {
146+
val log = "onPollComplete decode error: ${e.message}"
147+
logE(log)
148+
onError(log)
149+
return
145150
}
146151
for (pkt in packets) {
147152
if ((state == State.OPENING || state == State.CLOSING) && pkt is EngineIOPacket.Open) {
@@ -171,17 +176,10 @@ open class PollingXHR(
171176
@WorkThread
172177
override fun doSend(packets: List<EngineIOPacket<*>>) {
173178
writable = false
174-
@Suppress("UNCHECKED_CAST")
175-
val data = if (stringMessagePayloadForTesting) {
176-
EngineIO.encodeHttpBatch(
177-
packets as List<EngineIOPacket<String>>,
178-
serializePayload = { it }
179-
)
179+
val data = if (rawMessage) {
180+
EngineIO.encodeHttpBatch(packets, serializePayload = { it.toString() })
180181
} else {
181-
EngineIO.encodeHttpBatch(
182-
packets as List<EngineIOPacket<SocketIOPacket>>,
183-
SocketIO::encode
184-
)
182+
EngineIO.encodeHttpBatch(packets, serializePayload = { SocketIO.encode(it as SocketIOPacket) })
185183
}
186184

187185
val method = HttpMethod.Post

0 commit comments

Comments
 (0)