From a8bf75a977173ecd381aa820610aac9ce1ef40db Mon Sep 17 00:00:00 2001 From: kraftwerk28 Date: Tue, 29 Jun 2021 22:40:23 +0300 Subject: [PATCH] Close connection in job cancellation --- build.gradle.kts | 2 +- .../spigot_tg_bridge/ApiService.kt | 5 +- .../kraftwerk28/spigot_tg_bridge/Plugin.kt | 16 +++-- .../org/kraftwerk28/spigot_tg_bridge/TgBot.kt | 67 ++++++++++--------- 4 files changed, 49 insertions(+), 41 deletions(-) diff --git a/build.gradle.kts b/build.gradle.kts index 3be5592..d9cd66b 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -37,7 +37,7 @@ repositories { val tgBotVersion = "6.0.4" val retrofitVersion = "2.7.1" -val plugDir = "MinecraftServers/spigot_1.17/plugins/" +val plugDir = "MinecraftServers/spigot_1.16.5/plugins/" val homeDir = System.getProperty("user.home") tasks { diff --git a/src/main/kotlin/org/kraftwerk28/spigot_tg_bridge/ApiService.kt b/src/main/kotlin/org/kraftwerk28/spigot_tg_bridge/ApiService.kt index abda980..19e1b52 100644 --- a/src/main/kotlin/org/kraftwerk28/spigot_tg_bridge/ApiService.kt +++ b/src/main/kotlin/org/kraftwerk28/spigot_tg_bridge/ApiService.kt @@ -1,6 +1,7 @@ package org.kraftwerk28.spigot_tg_bridge import com.google.gson.annotations.SerializedName as Name +import retrofit2.Call import retrofit2.http.* interface TgApiService { @@ -56,11 +57,11 @@ interface TgApiService { ): TgResponse @GET("getUpdates") - suspend fun getUpdates( + fun getUpdates( @Query("offset") offset: Long, @Query("limit") limit: Int = 100, @Query("timeout") timeout: Int = 0, - ): TgResponse> + ): Call>> @GET("getMe") suspend fun getMe(): TgResponse diff --git a/src/main/kotlin/org/kraftwerk28/spigot_tg_bridge/Plugin.kt b/src/main/kotlin/org/kraftwerk28/spigot_tg_bridge/Plugin.kt index 1c298f3..7c13e3f 100644 --- a/src/main/kotlin/org/kraftwerk28/spigot_tg_bridge/Plugin.kt +++ b/src/main/kotlin/org/kraftwerk28/spigot_tg_bridge/Plugin.kt @@ -22,10 +22,12 @@ class Plugin : JavaPlugin() { return val cmdHandler = CommandHandler(this) - tgBot = TgBot(this, config) + loadBot() + tgBot?.let { bot -> + val eventHandler = EventHandler(bot, config) + server.pluginManager.registerEvents(eventHandler, this) + } getCommand(C.COMMANDS.PLUGIN_RELOAD)?.setExecutor(cmdHandler) - val eventHandler = EventHandler(tgBot!!, config) - server.pluginManager.registerEvents(eventHandler, this) // Notify Telegram groups about server start config.serverStartMessage?.let { message -> @@ -34,6 +36,11 @@ class Plugin : JavaPlugin() { logger.info("Plugin started.") } + fun loadBot() { + tgBot?.let { it.stop() } + tgBot = TgBot(this, config) + } + override fun onDisable() { if (!config.isEnabled) return config.serverStopMessage?.let { @@ -55,8 +62,7 @@ class Plugin : JavaPlugin() { fun reload() { logger.info(C.INFO.reloading) config.reload(this) - tgBot?.stop() - tgBot = TgBot(this, config) + loadBot() logger.info(C.INFO.reloadComplete) } } diff --git a/src/main/kotlin/org/kraftwerk28/spigot_tg_bridge/TgBot.kt b/src/main/kotlin/org/kraftwerk28/spigot_tg_bridge/TgBot.kt index cf41360..397254a 100644 --- a/src/main/kotlin/org/kraftwerk28/spigot_tg_bridge/TgBot.kt +++ b/src/main/kotlin/org/kraftwerk28/spigot_tg_bridge/TgBot.kt @@ -6,6 +6,7 @@ import kotlinx.coroutines.channels.* import okhttp3.OkHttpClient import okhttp3.logging.HttpLoggingInterceptor import retrofit2.Retrofit +import retrofit2.Call import retrofit2.converter.gson.GsonConverterFactory import java.time.Duration @@ -32,13 +33,8 @@ class TgBot( } init { - val interceptor = HttpLoggingInterceptor().apply { - level = HttpLoggingInterceptor.Level.NONE; - } - client = OkHttpClient .Builder() - .addInterceptor(interceptor) .readTimeout(Duration.ZERO) .build(); @@ -65,40 +61,45 @@ class TgBot( api.setMyCommands(commands) } - pollJob = scope.launch { - try { - while (true) { - try { - pollUpdates() - } catch (e: Exception) { - e.printStackTrace() + pollJob = initPolling() + handlerJob = initHandler() + } + + private fun initPolling() = scope.launch { + var request: + Call>>? = null + try { + while (true) { + try { + request = api.getUpdates( + offset = currentOffset, + timeout = pollTimeout, + ) + val response = request.execute().body() + response?.result?.let { updates -> + if (!updates.isEmpty()) { + updates.forEach { updateChan.send(it) } + currentOffset = updates.last().updateId + 1 + } } + } catch (e: Exception) { + e.printStackTrace() } - } catch (e: CancellationException) {} - } - - handlerJob = scope.launch { - try { - while (true) { - handleUpdate() - } - } catch (e: CancellationException) {} - } - } - - private suspend fun pollUpdates() { - val updatesResponse = api - .getUpdates(offset = currentOffset, timeout = pollTimeout) - updatesResponse.result?.let { updates -> - if (!updates.isEmpty()) { - updates.forEach { updateChan.send(it) } - currentOffset = updates.last().updateId + 1 } + } catch (e: CancellationException) { + request?.cancel() } } - private suspend fun handleUpdate() { - val update = updateChan.receive() + private fun initHandler() = scope.launch { + try { + while (true) { + handleUpdate(updateChan.receive()) + } + } catch (e: CancellationException) {} + } + + suspend fun handleUpdate(update: TgApiService.Update) { update.message?.text?.let { commandRegex.matchEntire(it)?.groupValues?.let { commandMap[it[1]]?.let { it(update) }