diff --git a/app/src/main/AndroidManifest.xml b/app/src/main/AndroidManifest.xml index f3f70380..b1bda811 100644 --- a/app/src/main/AndroidManifest.xml +++ b/app/src/main/AndroidManifest.xml @@ -4,6 +4,7 @@ + server = null is InternalEvent.RtspServer.OnClientStats -> Unit // Intentional to trigger serverClientStats update + is InternalEvent.RtspServer.OnOnvifMessageReceived -> { + this@RtspStreamingService.onvifServerMessagesCount++ + } } } @@ -546,6 +551,7 @@ internal class RtspStreamingService( data class OnStart(override val generation: Long) : RtspServer(Priority.RECOVER_IGNORE) data class OnStop(override val generation: Long) : RtspServer(Priority.DESTROY_IGNORE) data class OnClientStats(override val generation: Long) : RtspServer(Priority.DESTROY_IGNORE) + data class OnOnvifMessageReceived(override val generation: Long) : RtspServer(Priority.DESTROY_IGNORE) } data class OnVideoFps(val fps: Int) : InternalEvent(Priority.DESTROY_IGNORE) @@ -743,6 +749,7 @@ internal class RtspStreamingService( selectedVideoEncoder = selectedVideoEncoderInfo, selectedAudioEncoder = selectedAudioEncoderInfo, serverClientStats = serverController?.statsSnapshot.orEmpty(), + onvifServerMessagesCount = onvifServerMessagesCount, error = currentError ) } diff --git a/rtsp/src/main/java/info/dvkr/screenstream/rtsp/internal/rtsp/server/OnvifServer.kt b/rtsp/src/main/java/info/dvkr/screenstream/rtsp/internal/rtsp/server/OnvifServer.kt new file mode 100644 index 00000000..52ec1e64 --- /dev/null +++ b/rtsp/src/main/java/info/dvkr/screenstream/rtsp/internal/rtsp/server/OnvifServer.kt @@ -0,0 +1,455 @@ +package info.dvkr.screenstream.rtsp.internal.rtsp.server + +import android.content.Context +import android.net.wifi.WifiManager +import com.elvishew.xlog.XLog +import info.dvkr.screenstream.common.getLog +import info.dvkr.screenstream.rtsp.internal.RtspNetInterface +import io.ktor.network.selector.SelectorManager +import io.ktor.network.sockets.aSocket +import io.ktor.network.sockets.isClosed +import io.ktor.network.sockets.openReadChannel +import io.ktor.network.sockets.openWriteChannel +import io.ktor.utils.io.ByteReadChannel +import io.ktor.utils.io.ByteWriteChannel +import io.ktor.utils.io.readUTF8Line +import io.ktor.utils.io.writeStringUtf8 +import io.ktor.utils.io.readFully +import info.dvkr.screenstream.rtsp.internal.AudioParams +import info.dvkr.screenstream.rtsp.internal.Codec +import info.dvkr.screenstream.rtsp.internal.VideoParams +import java.util.concurrent.atomic.AtomicReference +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.Job +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancelAndJoin +import kotlinx.coroutines.isActive +import kotlinx.coroutines.launch +import java.net.DatagramPacket +import java.net.Inet6Address +import java.net.InetAddress +import java.net.InetSocketAddress +import java.net.MulticastSocket +import java.net.NetworkInterface +import java.net.SocketTimeoutException +import java.util.UUID + +internal class OnvifServer( + private val context: Context, + private val rtspNetInterface: RtspNetInterface, + private val rtspPort: Int, + private val rtspPath: String, + private val onMessageReceived: () -> Unit +) { + private var scopeJob = SupervisorJob() + private var scope = CoroutineScope(scopeJob + Dispatchers.IO) + private var selectorManager: SelectorManager? = null + private var httpJob: Job? = null + private var wsDiscoveryJob: Job? = null + private var multicastLock: WifiManager.MulticastLock? = null + private var multicastSocket: MulticastSocket? = null + private var multicastSocketIPv6: MulticastSocket? = null + private var onvifHttpPort: Int = 0 + private val videoParams = AtomicReference() + private val audioParams = AtomicReference() + + internal fun setVideoData(codec: Codec.Video, sps: ByteArray, pps: ByteArray?, vps: ByteArray?) { + videoParams.set(VideoParams(codec, sps, pps, vps)) + } + + internal fun setAudioData(audioParams: AudioParams?) { + this.audioParams.set(audioParams) + } + + fun start() { + if (!scopeJob.isActive) { + scopeJob = SupervisorJob() + scope = CoroutineScope(scopeJob + Dispatchers.IO) + } + + val wifiManager = context.applicationContext.getSystemService(Context.WIFI_SERVICE) as? WifiManager + multicastLock = wifiManager?.createMulticastLock("ScreenStream:OnvifDiscovery") + multicastLock?.setReferenceCounted(true) + multicastLock?.acquire() + + selectorManager = SelectorManager(scope.coroutineContext) + + httpJob = scope.launch { + try { + val bindHost = rtspNetInterface.hostAddress.substringBefore('%') + var serverSocket = runCatching { aSocket(selectorManager!!).tcp().bind(bindHost, rtspPort + 1000) { reuseAddress = true } }.getOrNull() + if (serverSocket == null) { + serverSocket = aSocket(selectorManager!!).tcp().bind(bindHost, 0) { reuseAddress = true } + } + onvifHttpPort = (serverSocket.localAddress as io.ktor.network.sockets.InetSocketAddress).port + XLog.d(getLog("OnvifServer", "HTTP listening on $bindHost:$onvifHttpPort")) + + startWsDiscovery(bindHost) + + while (isActive) { + val socket = try { + serverSocket.accept() + } catch (e: Exception) { + if (!isActive) break + continue + } + + launch { + try { + handleHttpRequest(socket.openReadChannel(), socket.openWriteChannel(autoFlush = true), bindHost) + } catch (e: Exception) { + if (e !is CancellationException) { + XLog.w(getLog("OnvifServer", "HTTP Error: ${e.message}")) + } + } finally { + runCatching { socket.close() } + } + } + } + } catch (e: CancellationException) { + throw e + } catch (e: Exception) { + XLog.e(getLog("OnvifServer", "HTTP Server failed: ${e.message}")) + } + } + } + + suspend fun stop() { + runCatching { multicastSocket?.close() } + multicastSocket = null + + runCatching { multicastSocketIPv6?.close() } + multicastSocketIPv6 = null + + runCatching { multicastLock?.takeIf { it.isHeld }?.release() } + multicastLock = null + + runCatching { httpJob?.cancelAndJoin() } + httpJob = null + + runCatching { wsDiscoveryJob?.cancelAndJoin() } + wsDiscoveryJob = null + + runCatching { selectorManager?.close() } + selectorManager = null + + runCatching { scopeJob.cancel() } + } + + private fun startWsDiscovery(bindHost: String) { + val networkInterface = runCatching { NetworkInterface.getByInetAddress(rtspNetInterface.address) }.getOrNull() + + wsDiscoveryJob = scope.launch(Dispatchers.IO) { + try { + multicastSocket = MulticastSocket(3702).apply { + reuseAddress = true + soTimeout = 2000 + if (networkInterface != null) { + this.networkInterface = networkInterface + } + joinGroup(InetAddress.getByName("239.255.255.250")) + } + launch { runWsDiscoveryLoop(multicastSocket!!, bindHost) } + } catch (e: Exception) { + XLog.w(getLog("OnvifServer", "WS-Discovery IPv4 error: ${e.message}")) + } + + if (rtspNetInterface.address is Inet6Address) { + try { + multicastSocketIPv6 = MulticastSocket(3702).apply { + reuseAddress = true + soTimeout = 2000 + if (networkInterface != null) { + this.networkInterface = networkInterface + } + joinGroup(InetAddress.getByName("FF02::C")) + } + launch { runWsDiscoveryLoop(multicastSocketIPv6!!, bindHost) } + } catch (e: Exception) { + XLog.w(getLog("OnvifServer", "WS-Discovery IPv6 error: ${e.message}")) + } + } + } + } + + private fun CoroutineScope.runWsDiscoveryLoop(socket: MulticastSocket, bindHost: String) { + val buffer = ByteArray(4096) + while (isActive) { + val packet = DatagramPacket(buffer, buffer.size) + try { + socket.receive(packet) + } catch (e: SocketTimeoutException) { + continue + } catch (e: Exception) { + if (!isActive) break + continue + } + + val request = String(packet.data, packet.offset, packet.length) + if (request.contains("Probe", ignoreCase = true)) { + onMessageReceived() + val messageId = extractXmlTag(request, "MessageID") ?: "uuid:${UUID.randomUUID()}" + val response = buildProbeMatch(messageId, bindHost) + val responseBytes = response.toByteArray() + val responsePacket = DatagramPacket(responseBytes, responseBytes.size, packet.address, packet.port) + socket.send(responsePacket) + } + } + } + + private suspend fun handleHttpRequest(readChannel: ByteReadChannel, writeChannel: ByteWriteChannel, bindHost: String) { + val requestLine = readChannel.readUTF8Line() ?: return + val parts = requestLine.split(" ") + if (parts.size < 3) return + val method = parts[0] + val path = parts[1] + + var contentLength = 0 + while (true) { + val line = readChannel.readUTF8Line() ?: break + if (line.isEmpty()) break + val lowerLine = line.lowercase() + if (lowerLine.startsWith("content-length:")) { + contentLength = lowerLine.substringAfter(":").trim().toIntOrNull() ?: 0 + } + } + + if (method != "POST" || contentLength == 0) { + sendHttpResponse(writeChannel, 404, "Not Found", "") + return + } + + val bodyBytes = ByteArray(contentLength) + readChannel.readFully(bodyBytes, 0, contentLength) + val body = String(bodyBytes) + onMessageReceived() + + val responseXml = when { + "<.*:?GetCapabilities.*>".toRegex().containsMatchIn(body) -> buildGetCapabilitiesResponse(bindHost) + "<.*:?GetServices.*>".toRegex().containsMatchIn(body) -> buildGetServicesResponse(bindHost) + "<.*:?GetDeviceInformation.*>".toRegex().containsMatchIn(body) -> buildGetDeviceInformationResponse() + "<.*:?GetProfiles.*>".toRegex().containsMatchIn(body) -> buildGetProfilesResponse() + "<.*:?GetStreamUri.*>".toRegex().containsMatchIn(body) -> buildGetStreamUriResponse(bindHost) + "<.*:?GetVideoSources.*>".toRegex().containsMatchIn(body) -> buildGetVideoSourcesResponse() + "<.*:?GetAudioSources.*>".toRegex().containsMatchIn(body) -> buildGetAudioSourcesResponse() + else -> buildFaultResponse() + } + + sendHttpResponse(writeChannel, 200, "OK", responseXml) + } + + private suspend fun sendHttpResponse(writeChannel: ByteWriteChannel, statusCode: Int, statusText: String, body: String) { + val response = """ + HTTP/1.1 $statusCode $statusText + Content-Type: application/soap+xml; charset=utf-8 + Content-Length: ${body.toByteArray().size} + Connection: close + + $body + """.trimIndent() + + writeChannel.writeStringUtf8(response.replace("\n", "\r\n")) + } + + private fun extractXmlTag(xml: String, tag: String): String? { + val startTag = "<.*?:?$tag.*?>".toRegex() + val endTag = "".toRegex() + val startMatch = startTag.find(xml) ?: return null + val endMatch = endTag.find(xml, startMatch.range.last) ?: return null + return xml.substring(startMatch.range.last + 1, endMatch.range.first) + } + + private fun buildProbeMatch(messageId: String, bindHost: String): String { + val xAddr = "http://$bindHost:$onvifHttpPort/onvif/device_service" + return """ + + + urn:uuid:${UUID.randomUUID()} + $messageId + http://schemas.xmlsoap.org/ws/2004/08/addressing/role/anonymous + http://schemas.xmlsoap.org/ws/2005/04/discovery/ProbeMatches + + + + + + urn:uuid:12345678-1234-1234-1234-1234567890ab + + tds:Device + onvif://www.onvif.org/type/NetworkVideoTransmitter + $xAddr + 1 + + + +""" + } + + private fun buildGetCapabilitiesResponse(bindHost: String): String { + val mediaXAddr = "http://$bindHost:$onvifHttpPort/onvif/media_service" + return """ + + + + + + $mediaXAddr + + false + true + true + + + + + +""" + } + + private fun buildGetServicesResponse(bindHost: String): String { + val deviceXAddr = "http://$bindHost:$onvifHttpPort/onvif/device_service" + val mediaXAddr = "http://$bindHost:$onvifHttpPort/onvif/media_service" + return """ + + + + + http://www.onvif.org/ver10/device/wsdl + $deviceXAddr + + 2 + 0 + + + + http://www.onvif.org/ver20/media/wsdl + $mediaXAddr + + 2 + 0 + + + + +""" + } + + private fun buildGetDeviceInformationResponse(): String { + return """ + + + + ScreenStream + ScreenStream ONVIF Server + 1.0 + 1234567890 + 1.0 + + +""" + } + + private fun buildGetProfilesResponse(): String { + val codecName = when (videoParams.get()?.codec) { + Codec.Video.H264 -> "H264" + Codec.Video.H265 -> "H265" + Codec.Video.AV1 -> "AV1" + else -> "H264" + } + return """ + + + + + MainProfile + + VideoSourceConfig + 1 + VideoSource_1 + + + + AudioSourceConfig + 1 + AudioSource_1 + + + VideoEncoder + $codecName + + 1920 + 1080 + + 50 + + + + +""" + } + + private fun buildGetStreamUriResponse(bindHost: String): String { + val streamUrl = rtspNetInterface.buildUrl(rtspPort, rtspPath).replace("&", "&") + return """ + + + + + $streamUrl + false + false + PT0S + + + +""" + } + + private fun buildGetVideoSourcesResponse(): String { + return """ + + + + + 30 + + 1920 + 1080 + + + + +""" + } + + private fun buildGetAudioSourcesResponse(): String { + return """ + + + + + 1 + + + +""" + } + + private fun buildFaultResponse(): String { + return """ + + + + + env:Receiver + + + Action not supported + + + +""" + } +} diff --git a/rtsp/src/main/java/info/dvkr/screenstream/rtsp/internal/rtsp/server/RtspServer.kt b/rtsp/src/main/java/info/dvkr/screenstream/rtsp/internal/rtsp/server/RtspServer.kt index ae77106d..67d735d5 100644 --- a/rtsp/src/main/java/info/dvkr/screenstream/rtsp/internal/rtsp/server/RtspServer.kt +++ b/rtsp/src/main/java/info/dvkr/screenstream/rtsp/internal/rtsp/server/RtspServer.kt @@ -32,6 +32,7 @@ import java.net.SocketException import java.util.concurrent.atomic.AtomicReference internal class RtspServer( + private val context: android.content.Context, private val appVersion: String, private val generation: Long, private val onEvent: (RtspStreamingService.InternalEvent) -> Unit, @@ -46,6 +47,7 @@ internal class RtspServer( private val serverSockets: MutableList = mutableListOf() private val rtspServerConnections = mutableListOf() + private val onvifServers = mutableListOf() private val videoParams = AtomicReference() private val audioParams = AtomicReference() @@ -58,6 +60,8 @@ internal class RtspServer( if (videoParams.get()?.contentEquals(newParams) == true) return this.videoParams.set(newParams) + onvifServers.forEach { it.setVideoData(videoCodec, sps, pps, vps) } + val snapshot = synchronized(rtspServerConnections) { rtspServerConnections.toList() } if (snapshot.isNotEmpty()) { onRequestKeyFrame() @@ -67,6 +71,7 @@ internal class RtspServer( internal fun setAudioData(audioParams: AudioParams?) { this.audioParams.set(audioParams) + onvifServers.forEach { it.setAudioData(audioParams) } } internal fun clearMediaParams() { @@ -123,6 +128,14 @@ internal class RtspServer( onEvent(RtspStreamingService.InternalEvent.RtspServer.OnStart(generation)) bindResult.boundSockets.forEach { launchAcceptor(it, selectorManager, port, path, protocol) } + + addresses.forEach { netInterface -> + val onvifServer = OnvifServer(context, netInterface, port, path) { + onEvent(RtspStreamingService.InternalEvent.RtspServer.OnOnvifMessageReceived(generation)) + } + onvifServers.add(onvifServer) + onvifServer.start() + } } catch (e: CancellationException) { throw e } catch (e: Throwable) { @@ -226,6 +239,8 @@ internal class RtspServer( runCatching { serverJob?.cancelAndJoin() } serverJob = null runCatching { scopeJob.cancel() } + onvifServers.forEach { runCatching { it.stop() } } + onvifServers.clear() onEvent(RtspStreamingService.InternalEvent.RtspServer.OnStop(generation)) } diff --git a/rtsp/src/main/java/info/dvkr/screenstream/rtsp/ui/RtspMainScreenUI.kt b/rtsp/src/main/java/info/dvkr/screenstream/rtsp/ui/RtspMainScreenUI.kt index d89d2180..5e9feaa6 100644 --- a/rtsp/src/main/java/info/dvkr/screenstream/rtsp/ui/RtspMainScreenUI.kt +++ b/rtsp/src/main/java/info/dvkr/screenstream/rtsp/ui/RtspMainScreenUI.kt @@ -112,6 +112,7 @@ internal fun RtspMainScreenUI( isStreaming = state.isStreaming, serverBindings = state.serverBindings, clientStatus = state.clientStatus, + onvifServerMessagesCount = state.onvifServerMessagesCount, error = state.error, modifier = Modifier.padding(8.dp) ) diff --git a/rtsp/src/main/java/info/dvkr/screenstream/rtsp/ui/main/cards/ModeCard.kt b/rtsp/src/main/java/info/dvkr/screenstream/rtsp/ui/main/cards/ModeCard.kt index 5ebd29e1..ae6e4850 100644 --- a/rtsp/src/main/java/info/dvkr/screenstream/rtsp/ui/main/cards/ModeCard.kt +++ b/rtsp/src/main/java/info/dvkr/screenstream/rtsp/ui/main/cards/ModeCard.kt @@ -44,6 +44,7 @@ internal fun ModeCard( isStreaming: Boolean, serverBindings: List, clientStatus: RtspClientStatus, + onvifServerMessagesCount: Int, error: RtspError?, modifier: Modifier = Modifier, ) { @@ -78,6 +79,7 @@ internal fun ModeCard( serverBindings = serverBindings, error = error, isStreaming = isStreaming, + onvifServerMessagesCount = onvifServerMessagesCount, sendEvent = sendEvent ) diff --git a/rtsp/src/main/java/info/dvkr/screenstream/rtsp/ui/main/mode/ServerMode.kt b/rtsp/src/main/java/info/dvkr/screenstream/rtsp/ui/main/mode/ServerMode.kt index 57100b40..f9dfeb2c 100644 --- a/rtsp/src/main/java/info/dvkr/screenstream/rtsp/ui/main/mode/ServerMode.kt +++ b/rtsp/src/main/java/info/dvkr/screenstream/rtsp/ui/main/mode/ServerMode.kt @@ -57,6 +57,7 @@ internal fun ServerMode( serverBindings: List, error: RtspError?, isStreaming: Boolean, + onvifServerMessagesCount: Int, sendEvent: (RtspEvent) -> Unit, modifier: Modifier = Modifier ) { @@ -85,6 +86,16 @@ internal fun ServerMode( } } } + + if (isStreaming && onvifServerMessagesCount > 0) { + HorizontalDivider(modifier = Modifier.fillMaxWidth()) + Text( + text = stringResource(id = R.string.rtsp_onvif_requests, onvifServerMessagesCount), + modifier = Modifier.padding(12.dp), + color = MaterialTheme.colorScheme.secondary, + style = MaterialTheme.typography.bodyMedium + ) + } } } diff --git a/rtsp/src/main/java/info/dvkr/screenstream/rtsp/ui/models.kt b/rtsp/src/main/java/info/dvkr/screenstream/rtsp/ui/models.kt index 9b828a8e..f0e5e9e7 100644 --- a/rtsp/src/main/java/info/dvkr/screenstream/rtsp/ui/models.kt +++ b/rtsp/src/main/java/info/dvkr/screenstream/rtsp/ui/models.kt @@ -35,10 +35,11 @@ internal data class RtspState( val serverBindings: List = emptyList(), val serverClientStats: List = emptyList(), val clientStatus: RtspClientStatus = RtspClientStatus.IDLE, + val onvifServerMessagesCount: Int = 0, val error: RtspError? = null ) { override fun toString(): String = - "RtspState(mode=$mode busy=$isBusy wait=$waitingCastPermission start=$startAttemptId str=$isStreaming srvClients=${serverClientStats.size} client=$clientStatus err=$error)" + "RtspState(mode=$mode busy=$isBusy wait=$waitingCastPermission start=$startAttemptId str=$isStreaming srvClients=${serverClientStats.size} client=$clientStatus onvif=$onvifServerMessagesCount err=$error)" } @Immutable diff --git a/rtsp/src/main/res/values/strings.xml b/rtsp/src/main/res/values/strings.xml index 7ebd88e0..7f58c59e 100644 --- a/rtsp/src/main/res/values/strings.xml +++ b/rtsp/src/main/res/values/strings.xml @@ -117,4 +117,5 @@ Server port Set port for incoming connections Set server port for incoming connections.\nValues: 1025–65535\nDefault: 8554 + ONVIF requests received: %1$d