From 4d1146cf9972d0073e595b394d76f12345a1eec6 Mon Sep 17 00:00:00 2001 From: Marco Date: Sat, 30 Dec 2023 00:57:29 -0500 Subject: [PATCH] feat: add store and copy imap commands. improve structured concurrency. fix socket reading when it shouldnt --- .../src/commonMain/kotlin/ImapAgent.kt | 42 +++- .../src/commonMain/kotlin/ImapFolder.kt | 114 +++++----- .../src/commonMain/kotlin/ImapServer.kt | 13 +- .../kotlin/transports/ImapServerTransport.kt | 11 +- .../imap/src/commonMain/kotlin/Sequence.kt | 2 +- .../kotlin/frames/command/CopyCommand.kt | 18 ++ .../kotlin/frames/command/FetchCommand.kt | 2 +- .../kotlin/frames/command/ImapCommand.kt | 4 +- .../kotlin/frames/command/StoreCommand.kt | 44 ++++ .../kotlin/frames/command/UidCommand.kt | 2 + .../src/commonMain/kotlin/Pop3Server.kt | 11 +- .../kotlin/transports/Pop3ServerTransport.kt | 10 +- mailserver/runner/build.gradle.kts | 2 + mailserver/runner/src/main/kotlin/imap.kt | 39 ++-- mailserver/runner/src/main/kotlin/launcher.kt | 6 +- .../runner/src/main/kotlin/launcherJvm.kt | 24 +- mailserver/runner/src/main/kotlin/pop3.kt | 9 +- mailserver/runner/src/main/kotlin/sslJvm.kt | 2 +- .../kotlin/storage/filesystems/localJvm.kt | 12 + .../main/kotlin/storage/formats/maildir.kt | 2 +- .../runner/src/main/kotlin/submission.kt | 11 +- mailserver/runner/src/main/kotlin/transfer.kt | 43 ++-- mailserver/settings.gradle.kts | 7 + .../commonMain/kotlin/SmtpServerConnector.kt | 2 +- .../src/commonMain/kotlin/TransferAgentd.kt | 212 ------------------ .../commonMain/kotlin/connections/server.kt | 2 +- .../kotlin/submission/SubmissionServer.kt | 12 +- .../kotlin/transfer/OutgoingMessageQueue.kt | 3 +- .../kotlin/transfer/TransferReceiveServer.kt | 12 +- .../kotlin/transfer/TransferServer.kt | 10 +- .../transports/server/SmtpCommandPipeline.kt | 3 +- .../transports/server/SmtpServerTransport.kt | 2 +- mailserver/utils/build.gradle.kts | 5 + ...ceptionLoggingCoroutineExceptionHandler.kt | 8 + 34 files changed, 338 insertions(+), 363 deletions(-) create mode 100644 mailserver/imap/src/commonMain/kotlin/frames/command/CopyCommand.kt create mode 100644 mailserver/imap/src/commonMain/kotlin/frames/command/StoreCommand.kt delete mode 100644 mailserver/smtp-agent/src/commonMain/kotlin/TransferAgentd.kt create mode 100644 mailserver/utils/src/commonMain/kotlin/ExceptionLoggingCoroutineExceptionHandler.kt diff --git a/mailserver/imap-agent/src/commonMain/kotlin/ImapAgent.kt b/mailserver/imap-agent/src/commonMain/kotlin/ImapAgent.kt index 90010cd..895487b 100644 --- a/mailserver/imap-agent/src/commonMain/kotlin/ImapAgent.kt +++ b/mailserver/imap-agent/src/commonMain/kotlin/ImapAgent.kt @@ -2,6 +2,7 @@ package dev.sitar.kmail.imap.agent import dev.sitar.kmail.imap.Capability import dev.sitar.kmail.imap.agent.transports.ImapServerTransport +import dev.sitar.kmail.imap.frames.DataItem import dev.sitar.kmail.imap.frames.Tag import dev.sitar.kmail.imap.frames.command.* import dev.sitar.kmail.imap.frames.response.* @@ -103,6 +104,7 @@ sealed interface State { StartTlsCommand -> { agent.transport.send(tag + OkResponse(text = "Let the TLS negotiations begin.")) agent.transport.secure() + logger.debug { "secured ${agent.transport.commandPipeline}" } } is AuthenticateCommand -> { require(command.mechanism is SaslMechanism.Plain) @@ -118,6 +120,7 @@ sealed interface State { agent.transport.send(tag + OkResponse(text = "authenticated.")) agent.state = Authenticated(agent, mailbox) } else { + logger.debug { "user ${challenge.authenticationIdentity} failed to authenticate."} TODO("not authenticated") } } @@ -250,10 +253,18 @@ sealed interface State { is FetchCommand -> { fetch(context.command.tag, command) } + is StoreCommand -> { + store(context.command.tag, command) + } + is CopyCommand -> { + copy(context.command.tag, command) + } is UidCommand -> { when (val form = command.command) { is FetchCommand -> fetch(context.command.tag, form) - else -> error("shouldnt happen, it wouldnt get deserialized.") + is StoreCommand -> store(context.command.tag, form) + is CopyCommand -> copy(context.command.tag, form) + else -> throw Exception("shouldnt happen, it wont get deserialized.") } } is CheckCommand -> { @@ -270,6 +281,35 @@ sealed interface State { agent.transport.send(tag + OkResponse(text = "Here is your mail.")) } + + private suspend fun store(tag: Tag, command: StoreCommand) { + val resp = folder.store(command.sequence, command.item.flags.map { Flag.fromValue(it) }.toSet(), command.item.mode) + + if (!command.item.silent) { + resp.forEach { + agent.transport.send(Tag.Untagged + FetchResponse(it.key, setOf(DataItem.Response.Flags(it.value.map(Flag::value))))) + } + } + + agent.transport.send(tag + OkResponse(text = "Stored new flags.")) + } + + private suspend fun copy(tag: Tag, command: CopyCommand) { + val messages = folder.sequenceToMessages(command.sequence) + + val copy = authenticated.mailbox.folder(command.mailbox) + + if (copy == null) { + agent.transport.send(tag + BadResponse(text = "[TRYCREATE] dest doesn't exist.")) + return + } + + messages.forEach { + copy.save(it.flags, it.typedMessage().asText()) + } + + agent.transport.send(tag + OkResponse(text = "copy done.")) + } } class Logout(): State { override suspend fun handle(context: ImapCommandContext) { diff --git a/mailserver/imap-agent/src/commonMain/kotlin/ImapFolder.kt b/mailserver/imap-agent/src/commonMain/kotlin/ImapFolder.kt index 00a7e8c..3bf4051 100644 --- a/mailserver/imap-agent/src/commonMain/kotlin/ImapFolder.kt +++ b/mailserver/imap-agent/src/commonMain/kotlin/ImapFolder.kt @@ -3,8 +3,8 @@ package dev.sitar.kmail.imap.agent import dev.sitar.kmail.imap.PartSpecifier import dev.sitar.kmail.imap.Sequence import dev.sitar.kmail.imap.frames.DataItem +import dev.sitar.kmail.imap.frames.command.StoreMode import dev.sitar.kmail.message.Message -import java.util.Collections import kotlin.math.max import kotlin.math.min @@ -79,62 +79,18 @@ interface ImapFolder { suspend fun save(flags: Set, message: String) - suspend fun update(pos: Int, mode: Sequence.Mode, flags: Set) + suspend fun store(sequence: Sequence, flags: Set, mode: StoreMode, messagesSnapshot: List? = null): Map> suspend fun fetch(sequence: Sequence, dataItems: List): Map> { val messagesSnapshot = messages() - val start = when (sequence) { - is Sequence.Set -> with(sequence.start) { - when (this) { - is Sequence.Position.Actual -> pos - Sequence.Position.Any -> TODO() - } - } - is Sequence.Single -> with(sequence.pos) { - when (this) { - is Sequence.Position.Actual -> pos - Sequence.Position.Any -> TODO() - } - } - } + val selectedMessages = sequenceToMessages(sequence, messagesSnapshot).takeIf { it.isNotEmpty() } ?: return emptyMap() - val end = when (sequence) { - is Sequence.Set -> with(sequence.end) { - when (this) { - is Sequence.Position.Actual -> pos - Sequence.Position.Any -> messagesSnapshot.size - } - } - is Sequence.Single -> with(sequence.pos) { - when (this) { - is Sequence.Position.Actual -> pos - Sequence.Position.Any -> TODO() - } - } - } - - val exists = exists() - - if (!(start in 0..exists && end in 0..exists)) return emptyMap() - - val selectedMessages = when (sequence.mode) { - Sequence.Mode.SequenceNumber -> { - messagesSnapshot.subList(messagesSnapshot.size - end, messagesSnapshot.size + 1 - start) - } - Sequence.Mode.Uid -> { - val a = messagesSnapshot.indexOfFirst { it.uniqueIdentifier == start } - val b = messagesSnapshot.indexOfFirst { it.uniqueIdentifier == end } - - val start = min(a, b) - val end = max(a, b) - messagesSnapshot.subList(start, end + 1) - } - } + if (dataItems.any { it is DataItem.Fetch.Body }) store(sequence, setOf(Flag.Seen), StoreMode.Add, messagesSnapshot = messagesSnapshot) return selectedMessages.associate { message -> val pos = when (sequence.mode) { - Sequence.Mode.SequenceNumber -> message.sequenceNumber + Sequence.Mode.Sequence -> message.sequenceNumber Sequence.Mode.Uid -> message.uniqueIdentifier } @@ -149,8 +105,6 @@ interface ImapFolder { DataItem.Fetch.Rfc822Size -> add(DataItem.Response.Rfc822Size(message.size)) DataItem.Fetch.Uid -> add(DataItem.Response.Uid(message.uniqueIdentifier.toString())) is DataItem.Fetch.BodyType -> { - if (item is DataItem.Fetch.Body) update(pos, sequence.mode, setOf(Flag.Seen)) - val typed = message.typedMessage() if (item.parts.isEmpty()) { @@ -186,6 +140,64 @@ interface ImapFolder { } } + suspend fun sequenceToMessages( + sequence: Sequence, + messagesSnapshot: List? = null + ): List { + val messagesSnapshot = messagesSnapshot ?: messages() + + val start = when (sequence) { + is Sequence.Set -> with(sequence.start) { + when (this) { + is Sequence.Position.Actual -> pos + Sequence.Position.Any -> TODO() + } + } + + is Sequence.Single -> with(sequence.pos) { + when (this) { + is Sequence.Position.Actual -> pos + Sequence.Position.Any -> TODO() + } + } + } + + val end = when (sequence) { + is Sequence.Set -> with(sequence.end) { + when (this) { + is Sequence.Position.Actual -> pos + Sequence.Position.Any -> messagesSnapshot.size + } + } + + is Sequence.Single -> with(sequence.pos) { + when (this) { + is Sequence.Position.Actual -> pos + Sequence.Position.Any -> TODO() + } + } + } + + val exists = exists() + + if (!(start in 0..exists && end in 0..exists)) return listOf() + + return when (sequence.mode) { + Sequence.Mode.Sequence -> { + messagesSnapshot.subList(messagesSnapshot.size - end, messagesSnapshot.size + 1 - start) + } + + Sequence.Mode.Uid -> { + val a = messagesSnapshot.indexOfFirst { it.uniqueIdentifier == start } + val b = messagesSnapshot.indexOfFirst { it.uniqueIdentifier == end } + + val start = min(a, b) + val end = max(a, b) + messagesSnapshot.subList(start, end + 1) + } + } + } + companion object { const val DELIM = "/" } diff --git a/mailserver/imap-agent/src/commonMain/kotlin/ImapServer.kt b/mailserver/imap-agent/src/commonMain/kotlin/ImapServer.kt index 184f0b4..f107087 100644 --- a/mailserver/imap-agent/src/commonMain/kotlin/ImapServer.kt +++ b/mailserver/imap-agent/src/commonMain/kotlin/ImapServer.kt @@ -1,6 +1,7 @@ package dev.sitar.kmail.imap.agent import dev.sitar.kmail.imap.agent.transports.ImapServerTransport +import dev.sitar.kmail.utils.ExceptionLoggingCoroutineExceptionHandler import dev.sitar.kmail.utils.server.ServerSocket import kotlinx.coroutines.* import mu.KotlinLogging @@ -14,13 +15,21 @@ class ImapServer( val layer: ImapLayer ) { suspend fun listen() = supervisorScope { + logger.debug { "IMAP server is listening." } + while (isActive) { val transport = ImapServerTransport(socket.accept()) - launch { + launch(Dispatchers.IO) { logger.debug { "Accepted a connection from ${transport.remote}" } - ImapAgent(transport, layer).handle() + try { + ImapAgent(transport, layer).handle() + } catch (e: Exception) { + logger.error(e) { "IMAP session encountered an exception." } + } + + logger.debug { "IMAP session completed." } } } } diff --git a/mailserver/imap-agent/src/commonMain/kotlin/transports/ImapServerTransport.kt b/mailserver/imap-agent/src/commonMain/kotlin/transports/ImapServerTransport.kt index 6af412c..dacd450 100644 --- a/mailserver/imap-agent/src/commonMain/kotlin/transports/ImapServerTransport.kt +++ b/mailserver/imap-agent/src/commonMain/kotlin/transports/ImapServerTransport.kt @@ -31,9 +31,14 @@ class ImapServerTransport(var connection: Connection) { suspend fun startPipeline() = coroutineScope { while (isActive && reader.openForRead) { - val command = reader.readCommand() - val context = ImapCommandContext(command, false) - commandPipeline.process(context) + try { + val command = reader.readCommand() + val context = ImapCommandContext(command, false) + + commandPipeline.process(context) + } catch (e: Exception) { + logger.error(e) { "imap transport stream encountered exception." } + } } } diff --git a/mailserver/imap/src/commonMain/kotlin/Sequence.kt b/mailserver/imap/src/commonMain/kotlin/Sequence.kt index af85cf4..2620642 100644 --- a/mailserver/imap/src/commonMain/kotlin/Sequence.kt +++ b/mailserver/imap/src/commonMain/kotlin/Sequence.kt @@ -11,7 +11,7 @@ sealed class Sequence { data class Set(val start: Position, val end: Position, override val mode: Mode): Sequence() enum class Mode { - SequenceNumber, + Sequence, Uid } diff --git a/mailserver/imap/src/commonMain/kotlin/frames/command/CopyCommand.kt b/mailserver/imap/src/commonMain/kotlin/frames/command/CopyCommand.kt new file mode 100644 index 0000000..b2946da --- /dev/null +++ b/mailserver/imap/src/commonMain/kotlin/frames/command/CopyCommand.kt @@ -0,0 +1,18 @@ +package dev.sitar.kmail.imap.frames.command + +import dev.sitar.kio.async.readers.AsyncReader +import dev.sitar.kmail.imap.Sequence +import dev.sitar.kmail.imap.readValue + +data class CopyCommand(val sequence: Sequence, val mailbox: String): ImapCommand { + override val identifier: ImapCommand.Identifier = ImapCommand.Identifier.Copy + + companion object: ImapCommandSerializer { + suspend fun deserialize(mode: Sequence.Mode, input: AsyncReader): CopyCommand { + return CopyCommand(Sequence.deserialize(mode, input), input.readValue(isEnd = true)) + } + + override suspend fun deserialize(input: AsyncReader): CopyCommand = + deserialize(mode = Sequence.Mode.Sequence, input) + } +} \ No newline at end of file diff --git a/mailserver/imap/src/commonMain/kotlin/frames/command/FetchCommand.kt b/mailserver/imap/src/commonMain/kotlin/frames/command/FetchCommand.kt index 8eb24dd..d57c4f1 100644 --- a/mailserver/imap/src/commonMain/kotlin/frames/command/FetchCommand.kt +++ b/mailserver/imap/src/commonMain/kotlin/frames/command/FetchCommand.kt @@ -44,6 +44,6 @@ data class FetchCommand(val sequence: Sequence, val dataItems: List { + suspend fun deserialize(mode: Sequence.Mode, input: AsyncReader): StoreCommand { + val sequence = Sequence.deserialize(mode, input) + + val rawName = input.readUtf8StringUntil { it == ' ' } + val flags = input.readList() + + val parts = rawName.split('.') + + val mode: StoreMode = when (parts[0]) { + "FLAGS" -> StoreMode.Set + "+FLAGS" -> StoreMode.Add + "-FLGS" -> StoreMode.Remove + else -> throw Exception("could not parse store mode ${parts[0]}") + } + + val silent = parts.getOrNull(1)?.lowercase()?.toBooleanStrictOrNull() ?: false + + return StoreCommand(sequence, StoreDataItem(mode, silent, flags)) + } + + override suspend fun deserialize(input: AsyncReader): StoreCommand = + deserialize(mode = Sequence.Mode.Sequence, input) + } +} + +enum class StoreMode { + Set, + Add, + Remove; +} + +data class StoreDataItem(val mode: StoreMode, var silent: Boolean, val flags: List) diff --git a/mailserver/imap/src/commonMain/kotlin/frames/command/UidCommand.kt b/mailserver/imap/src/commonMain/kotlin/frames/command/UidCommand.kt index c1c5d0a..b33c328 100644 --- a/mailserver/imap/src/commonMain/kotlin/frames/command/UidCommand.kt +++ b/mailserver/imap/src/commonMain/kotlin/frames/command/UidCommand.kt @@ -25,6 +25,8 @@ data class UidCommand(val command: ImapCommand): ImapCommand { val command = when (ImapCommand.Identifier.findByIdentifier(identifier)) { ImapCommand.Identifier.Fetch -> FetchCommand.deserialize(Sequence.Mode.Uid, input) + ImapCommand.Identifier.Store -> StoreCommand.deserialize(Sequence.Mode.Uid, input) + ImapCommand.Identifier.Copy -> CopyCommand.deserialize(Sequence.Mode.Uid, input) // ImapCommand.Identifier.Search else -> TODO("got $identifier") } diff --git a/mailserver/pop3-agent/src/commonMain/kotlin/Pop3Server.kt b/mailserver/pop3-agent/src/commonMain/kotlin/Pop3Server.kt index afca0ae..94ef548 100644 --- a/mailserver/pop3-agent/src/commonMain/kotlin/Pop3Server.kt +++ b/mailserver/pop3-agent/src/commonMain/kotlin/Pop3Server.kt @@ -2,9 +2,7 @@ package dev.sitar.kmail.agents.pop3 import dev.sitar.kmail.agents.pop3.transports.Pop3ServerTransport import dev.sitar.kmail.utils.server.ServerSocket -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch -import kotlinx.coroutines.supervisorScope +import kotlinx.coroutines.* import mu.KotlinLogging private val logger = KotlinLogging.logger { } @@ -14,12 +12,13 @@ class Pop3Server( val layer: Pop3Layer, ) { suspend fun listen() = supervisorScope { + logger.debug { "POP3 server is listening." } + while (isActive) { val transport = Pop3ServerTransport(socket.accept()) + logger.debug { "Accepted a connection from ${transport.remote}." } - launch { - logger.debug { "Accepted a connection from ${transport.remote}." } - + launch(Dispatchers.IO) { val agent = Pop3Agent(transport, layer) agent.handle() } diff --git a/mailserver/pop3-agent/src/commonMain/kotlin/transports/Pop3ServerTransport.kt b/mailserver/pop3-agent/src/commonMain/kotlin/transports/Pop3ServerTransport.kt index fca33d5..17e6fdd 100644 --- a/mailserver/pop3-agent/src/commonMain/kotlin/transports/Pop3ServerTransport.kt +++ b/mailserver/pop3-agent/src/commonMain/kotlin/transports/Pop3ServerTransport.kt @@ -24,9 +24,13 @@ class Pop3ServerTransport(val connection: Connection) { suspend fun startPipeline() = coroutineScope { while (isActive && reader.openForRead) { - val command = reader.readCommand() - val context = Pop3CommandContext(command, false) - commandPipeline.process(context) + try { + val command = reader.readCommand() + val context = Pop3CommandContext(command, false) + commandPipeline.process(context) + } catch (e: Exception) { + logger.error(e) { "pop3 transport stream encountered exception." } + } } } diff --git a/mailserver/runner/build.gradle.kts b/mailserver/runner/build.gradle.kts index e110ee4..637039f 100644 --- a/mailserver/runner/build.gradle.kts +++ b/mailserver/runner/build.gradle.kts @@ -22,6 +22,8 @@ kotlin { } dependencies { + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-debug:1.4.0") + implementation("io.github.microutils:kotlin-logging:3.0.2") implementation("org.apache.logging.log4j:log4j-slf4j2-impl:2.19.0") implementation("org.apache.logging.log4j:log4j-core:2.19.0") diff --git a/mailserver/runner/src/main/kotlin/imap.kt b/mailserver/runner/src/main/kotlin/imap.kt index bba73b9..3b244b1 100644 --- a/mailserver/runner/src/main/kotlin/imap.kt +++ b/mailserver/runner/src/main/kotlin/imap.kt @@ -2,6 +2,7 @@ package dev.sitar.kmail.runner import dev.sitar.kmail.imap.Sequence import dev.sitar.kmail.imap.agent.* +import dev.sitar.kmail.imap.frames.command.StoreMode import dev.sitar.kmail.message.Message import dev.sitar.kmail.runner.storage.StorageLayer import dev.sitar.kmail.runner.storage.formats.Mailbox @@ -9,25 +10,15 @@ import dev.sitar.kmail.runner.storage.formats.MailboxFolder import dev.sitar.kmail.runner.storage.formats.MailboxMessage import dev.sitar.kmail.sasl.SaslChallenge import dev.sitar.kmail.utils.server.ServerSocketFactory -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.launch import kotlinx.datetime.Clock import mu.KotlinLogging -import kotlin.math.max -import kotlin.math.min private val logger = KotlinLogging.logger { } -suspend fun imap(socket: ServerSocketFactory, layer: ImapLayer): ImapServer = coroutineScope { +suspend fun imap(socket: ServerSocketFactory, layer: ImapLayer) { logger.info("Starting IMAP server.") - val server = ImapServer(socket.bind(IMAP_SERVER), layer) - launch { server.listen() } - - logger.info("Started IMAP server.") - - server + ImapServer(socket.bind(IMAP_SERVER), layer).listen() } // TODO: this is horrible @@ -91,15 +82,29 @@ class KmailImapFolder(val folder: MailboxFolder) : ImapFolder { // TODO: mix-matched nullability suspend fun get(pos: Int, mode: Sequence.Mode): MailboxMessage { return when (mode) { - Sequence.Mode.SequenceNumber -> folder.message(exists() - pos) + Sequence.Mode.Sequence -> folder.message(exists() - pos) Sequence.Mode.Uid -> folder.messageByUid(pos)!! } } - override suspend fun update(pos: Int, mode: Sequence.Mode, flags: Set) { - val message = get(pos, mode) + override suspend fun store(sequence: Sequence, flags: Set, mode: StoreMode, messagesSnapshot: List?) : Map> { + val messages = sequenceToMessages(sequence, messagesSnapshot ?: messages()) + + return messages.associate { + val newFlags = when (mode) { + StoreMode.Set -> flags + StoreMode.Add -> it.flags + flags + StoreMode.Remove -> it.flags - flags + } - message.updateFlags(flags) + val oldMessage = get(it.sequenceNumber, Sequence.Mode.Sequence) + oldMessage.updateFlags(newFlags) + + when (sequence.mode) { + Sequence.Mode.Sequence -> it.sequenceNumber + Sequence.Mode.Uid -> it.uniqueIdentifier + } to newFlags + } } override suspend fun onMessageStore(handler: (suspend (ImapMessage) -> Unit)?) { @@ -137,7 +142,7 @@ class KmailImapMailbox(val mailbox: Mailbox) : ImapMailbox { class KmailImapLayer(val storage: StorageLayer): ImapLayer { override suspend fun create(username: String, mailbox: String) { storage.user(username).folder(mailbox) - println("creating a mailbox called $mailbox") + logger.debug { "creating a mailbox called $mailbox" } } override suspend fun authenticate(challenge: SaslChallenge): String? { diff --git a/mailserver/runner/src/main/kotlin/launcher.kt b/mailserver/runner/src/main/kotlin/launcher.kt index 53b56e1..c4880e0 100644 --- a/mailserver/runner/src/main/kotlin/launcher.kt +++ b/mailserver/runner/src/main/kotlin/launcher.kt @@ -9,17 +9,19 @@ import dev.sitar.kmail.imap.agent.transports.ImapServerTransport import dev.sitar.kmail.runner.storage.mailbox import dev.sitar.kmail.sasl.SaslChallenge import dev.sitar.kmail.smtp.InternetMessage +import dev.sitar.kmail.utils.ExceptionLoggingCoroutineExceptionHandler import dev.sitar.kmail.utils.connection.ConnectionFactory import dev.sitar.kmail.utils.server.ServerSocketFactory import kotlinx.coroutines.* +import kotlinx.coroutines.debug.DebugProbes import kotlinx.coroutines.flow.MutableSharedFlow import mu.KotlinLogging private val logger = KotlinLogging.logger { } @OptIn(FlowPreview::class) -suspend fun run(serverSocketFactory: ServerSocketFactory, connectionFactory: ConnectionFactory) = supervisorScope { - withContext(CoroutineExceptionHandler { coroutineContext, throwable -> logger.error(throwable) { } }) { +suspend fun run(serverSocketFactory: ServerSocketFactory, connectionFactory: ConnectionFactory) = coroutineScope { + withContext(ExceptionLoggingCoroutineExceptionHandler(logger)) { println(KMAIL_ASCII) logger.info("Kmail is starting.") diff --git a/mailserver/runner/src/main/kotlin/launcherJvm.kt b/mailserver/runner/src/main/kotlin/launcherJvm.kt index 2421ee9..3907697 100644 --- a/mailserver/runner/src/main/kotlin/launcherJvm.kt +++ b/mailserver/runner/src/main/kotlin/launcherJvm.kt @@ -1,13 +1,34 @@ package dev.sitar.kmail.runner +import dev.sitar.kio.async.readers.readBytes +import dev.sitar.kio.buffers.buffer +import dev.sitar.kio.buffers.contentEquals +import dev.sitar.kio.buffers.writeBytes +import dev.sitar.kmail.utils.connection.KtorConnectionFactory import dev.sitar.kmail.utils.connection.TlsCapableConnectionFactory +import dev.sitar.kmail.utils.io.readStringUtf8 +import dev.sitar.kmail.utils.io.writeStringUtf8 import dev.sitar.kmail.utils.server.TlsCapableServerSocketFactory +import io.ktor.network.sockets.* +import io.ktor.network.tls.* +import kotlinx.coroutines.async import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.debug.DebugProbes +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch import mu.KotlinLogging +import java.util.* +import kotlin.random.Random +import kotlin.random.nextUBytes +import kotlin.system.measureTimeMillis private val logger = KotlinLogging.logger { } suspend fun main() { +// DebugProbes.enableCreationStackTraces = true +// DebugProbes.sanitizeStackTraces = true +// DebugProbes.install() + logger.info { "Using JVM version: ${Runtime.version()}" } dns() @@ -18,6 +39,7 @@ suspend fun main() { val socketFactory = TlsCapableServerSocketFactory(ssl) // TODO: i cant get ktor connection factory to work val connectionFactory = TlsCapableConnectionFactory(ssl) -// val connectionFactory = KtorConnectionFactory(TLSConfigBuilder().apply { addKeyStore(keystore, null, "private") }.build()) +// val connectionFactory = KtorConnectionFactory(TLSConfigBuilder().apply { addKeyStore(keystore, Config.security.password.toCharArray()) }.build()) + run(socketFactory, connectionFactory) } \ No newline at end of file diff --git a/mailserver/runner/src/main/kotlin/pop3.kt b/mailserver/runner/src/main/kotlin/pop3.kt index fcf791e..3381f92 100644 --- a/mailserver/runner/src/main/kotlin/pop3.kt +++ b/mailserver/runner/src/main/kotlin/pop3.kt @@ -15,15 +15,10 @@ import mu.KotlinLogging private val logger = KotlinLogging.logger { } -suspend fun pop3(socket: ServerSocketFactory, layer: Pop3Layer): Pop3Server = coroutineScope { +suspend fun pop3(socket: ServerSocketFactory, layer: Pop3Layer) { logger.info("Starting Pop3 server.") - val server = Pop3Server(socket.bind(POP3_SERVER), layer) - launch { server.listen() } - - logger.info("Started Pop3 server.") - - server + Pop3Server(socket.bind(POP3_SERVER), layer).listen() } class KmailPop3Layer(val storage: StorageLayer): Pop3Layer { diff --git a/mailserver/runner/src/main/kotlin/sslJvm.kt b/mailserver/runner/src/main/kotlin/sslJvm.kt index 677175f..fd4bb07 100644 --- a/mailserver/runner/src/main/kotlin/sslJvm.kt +++ b/mailserver/runner/src/main/kotlin/sslJvm.kt @@ -21,7 +21,7 @@ fun ssl(): Pair { keyStore.load(FileInputStream(Config.security.keystore), Config.security.password.toCharArray()) logger.debug { "Generating SSL context." } - val ssl = SSLContext.getInstance("TLS") + val ssl = SSLContext.getInstance("TLSv1.3") val keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm()) keyManagerFactory.init(keyStore, Config.security.password.toCharArray()) diff --git a/mailserver/runner/src/main/kotlin/storage/filesystems/localJvm.kt b/mailserver/runner/src/main/kotlin/storage/filesystems/localJvm.kt index 06282a0..d3898ca 100644 --- a/mailserver/runner/src/main/kotlin/storage/filesystems/localJvm.kt +++ b/mailserver/runner/src/main/kotlin/storage/filesystems/localJvm.kt @@ -2,8 +2,11 @@ package dev.sitar.kmail.runner.storage.filesystems import dev.sitar.kmail.runner.storage.Attributable import dev.sitar.kmail.runner.storage.Attributes +import mu.KotlinLogging import java.io.File +private val logger = KotlinLogging.logger { } + class LocalFileSystem(dir: String): FileSystem { val root = File(dir) @@ -46,6 +49,8 @@ class LocalFolder(val file: File): FsFolder, Attributable { } override suspend fun createFolder(name: String): FsFolder { + logger.trace { "creating folder $name in ${file.path}" } + return LocalFolder(file.resolve(name).also { it.mkdir() }) } @@ -63,10 +68,14 @@ class LocalFolder(val file: File): FsFolder, Attributable { } override suspend fun readFile(name: String): ByteArray? { + logger.trace { "reading from $name in ${file.path}" } + return file.resolve(name).takeIf { it.exists() }?.readBytes() } override suspend fun writeFile(name: String, contents: ByteArray): FsFile { + logger.trace { "writing to $name in ${file.path}" } + val file = file.resolve(name).also { it.createNewFile() } file.writeBytes(contents) return FsFile(file.name, file.length()) @@ -75,10 +84,13 @@ class LocalFolder(val file: File): FsFolder, Attributable { override suspend fun move(file: String, folder: FsFolder) { require(folder is LocalFolder) + logger.trace { "moving $file from ${this.file.path} to ${folder.file.path}" } + this.file.resolve(file).renameTo(folder.file.resolve(file)) } override suspend fun rename(from: String, to: String) { + logger.trace { "renaming $from to $to in ${file.path}" } file.resolve(from).renameTo(file.resolve(to)) } } diff --git a/mailserver/runner/src/main/kotlin/storage/formats/maildir.kt b/mailserver/runner/src/main/kotlin/storage/formats/maildir.kt index 73c19a9..5e82837 100644 --- a/mailserver/runner/src/main/kotlin/storage/formats/maildir.kt +++ b/mailserver/runner/src/main/kotlin/storage/formats/maildir.kt @@ -185,7 +185,7 @@ private val logger = KotlinLogging.logger { } class MaildirMessage(var fileName: String, override val length: Long, var folder: FsFolder, val mailbox: Maildir): MailboxMessage { override val name: String = fileName.split(':').first() - override val flags: Set = MaildirUniqueName(fileName).flags.toSet() + override val flags: Set get() = MaildirUniqueName(fileName).flags.toSet() override suspend fun updateFlags(flags: Set) { if (Flag.Seen in flags) { diff --git a/mailserver/runner/src/main/kotlin/submission.kt b/mailserver/runner/src/main/kotlin/submission.kt index d6b49e7..971aca2 100644 --- a/mailserver/runner/src/main/kotlin/submission.kt +++ b/mailserver/runner/src/main/kotlin/submission.kt @@ -19,19 +19,14 @@ import mu.KotlinLogging private val logger = KotlinLogging.logger { } -suspend fun submission(factory: ServerSocketFactory, outgoing: OutgoingMessageQueue): SubmissionServer = coroutineScope { +suspend fun submission(factory: ServerSocketFactory, outgoing: OutgoingMessageQueue) { logger.info("SMTP submission agent is starting.") - val server = SubmissionServer( + SubmissionServer( factory.bind(Config.smtp.submission.port), SubmissionConfig(Config.domains.first(), requiresEncryption = true, KmailAuthenticationManager), outgoing - ) - launch { server.listen() } - - logger.info("SMTP submission agent has started.") - - server + ).listen() } data class KmailAuthenticatedUser(val email: String) : SmtpAuthenticatedUser diff --git a/mailserver/runner/src/main/kotlin/transfer.kt b/mailserver/runner/src/main/kotlin/transfer.kt index 4684bb5..96da92b 100644 --- a/mailserver/runner/src/main/kotlin/transfer.kt +++ b/mailserver/runner/src/main/kotlin/transfer.kt @@ -14,37 +14,36 @@ import mu.KotlinLogging private val logger = KotlinLogging.logger { } -suspend fun transfer( +fun CoroutineScope.transfer( serverFactory: ServerSocketFactory, connectionFactory: ConnectionFactory, outgoingMessages: OutgoingMessageQueue, incomingMessages: MutableSharedFlow -): TransferReceiveServer = coroutineScope { +) { logger.info("SMTP transfer server is starting.") - val server = TransferServer( - TransferConfig( - Config.domains.first(), - requireEncryption = Config.smtp.transfer.encryption, - proxy = Config.proxy?.intoSmtpProxy(), - connector = DefaultTransferSessionSmtpConnector(connectionFactory) - ), outgoingMessages - ) - launch { server.handle() } + launch { + val server = TransferServer( + TransferConfig( + Config.domains.first(), + requireEncryption = Config.smtp.transfer.encryption, + proxy = Config.proxy?.intoSmtpProxy(), + connector = DefaultTransferSessionSmtpConnector(connectionFactory) + ), outgoingMessages + ) - logger.info("SMTP transfer server has started.") + server.handle() + } logger.info("SMTP receive server is starting.") - val receiveServer = TransferReceiveServer( - serverFactory.bind(Config.smtp.transfer.port), - TransferReceiveConfig(Config.domains.first(), requiresEncryption = true, Config.accounts.map { it.email }), - incomingMessages - ) + launch { + val receiveServer = TransferReceiveServer( + serverFactory.bind(Config.smtp.transfer.port), + TransferReceiveConfig(Config.domains.first(), requiresEncryption = true, Config.accounts.map { it.email }), + incomingMessages + ) - launch { receiveServer.listen() } - - logger.info("SMTP receive server has started.") - - receiveServer + receiveServer.listen() + } } \ No newline at end of file diff --git a/mailserver/settings.gradle.kts b/mailserver/settings.gradle.kts index e13713a..6b01b58 100644 --- a/mailserver/settings.gradle.kts +++ b/mailserver/settings.gradle.kts @@ -22,4 +22,11 @@ pluginManagement { if (requested.id.id == "kotlinx-atomicfu") useModule("org.jetbrains.kotlinx:atomicfu-gradle-plugin:0.18.5") } } +} + +// TODO: publish latest kio +includeBuild("../../kio") { + dependencySubstitution { + substitute(module("dev.sitar:kio:1.1.2")).using(project(":")) + } } \ No newline at end of file diff --git a/mailserver/smtp-agent/src/commonMain/kotlin/SmtpServerConnector.kt b/mailserver/smtp-agent/src/commonMain/kotlin/SmtpServerConnector.kt index f537eef..8223f14 100644 --- a/mailserver/smtp-agent/src/commonMain/kotlin/SmtpServerConnector.kt +++ b/mailserver/smtp-agent/src/commonMain/kotlin/SmtpServerConnector.kt @@ -37,7 +37,7 @@ interface SmtpServerConnector { class DefaultTransferSessionSmtpConnector( override val connectionFactory: ConnectionFactory, - override val timeout: Long = 500 + override val timeout: Long = 2000 ) : SmtpServerConnector { companion object { val STANDARD_PORTS: Set = setOf( diff --git a/mailserver/smtp-agent/src/commonMain/kotlin/TransferAgentd.kt b/mailserver/smtp-agent/src/commonMain/kotlin/TransferAgentd.kt deleted file mode 100644 index f4fbfff..0000000 --- a/mailserver/smtp-agent/src/commonMain/kotlin/TransferAgentd.kt +++ /dev/null @@ -1,212 +0,0 @@ -package dev.sitar.kmail.agents.smtp -// -//import dev.sitar.dns.dnsResolver -//import dev.sitar.dns.records.MXResourceRecord -//import dev.sitar.dns.records.ResourceType -//import dev.sitar.dns.transports.DnsServer -//import dev.sitar.kmail.smtp.* -//import dev.sitar.kmail.smtp.agent.transports.client.SmtpTransportConnection -//import dev.sitar.kmail.smtp.frames.replies.* -//import dev.sitar.kmail.smtp.frames.reply.EhloReply -//import dev.sitar.kmail.smtp.io.smtp.reader.AsyncSmtpClientReader -//import dev.sitar.kmail.smtp.io.smtp.reader.asAsyncSmtpClientReader -//import dev.sitar.kmail.smtp.io.smtp.writer.AsyncSmtpClientWriter -//import dev.sitar.kmail.smtp.io.smtp.writer.asAsyncSmtpClientWriter -//import kotlinx.coroutines.CoroutineName -//import kotlinx.coroutines.CoroutineScope -//import kotlinx.coroutines.Job -//import kotlinx.coroutines.flow.Flow -//import kotlinx.coroutines.launch -//import mu.KotlinLogging -//import kotlin.coroutines.CoroutineContext -//import kotlin.coroutines.EmptyCoroutineContext -// -//private val logger = KotlinLogging.logger { } -// -//data class TransferConfig( -// val domain: Domain, -// val requireEncryption: Boolean, -//) -// -//class TransferAgent( -// val config: TransferConfig, -// val outgoingMessages: Flow, -// val connector: SmtpServerConnector = DefaultTransferSessionSmtpConnector(), -// coroutineContext: CoroutineContext = EmptyCoroutineContext -//) { -// private companion object { -// private val GOOGLE_DNS = listOf("8.8.8.8", "8.8.4.4").map { DnsServer(it) } // Google's Public DNS -// } -// -// private val resolver = dnsResolver() -// private val coroutineScope = CoroutineScope(coroutineContext + Job() + CoroutineName("transfer-agent")) -// -// init { -// coroutineScope.launch { -// outgoingMessages.collect { -// launch { -// transfer(it) -// } -// } -// } -// } -// -// private data class TransferSession( -// val id: String, -// var exchange: String?, -// ) { -// lateinit var connection: SmtpTransportConnection -// -// lateinit var reader: AsyncSmtpClientReader -// lateinit var writer: AsyncSmtpClientWriter -// -// fun updateChannels() { -// reader = connection.reader.asAsyncSmtpClientReader() -// writer = connection.writer.asAsyncSmtpClientWriter() -// } -// -// suspend inline fun send(command: T) { -// logger.trace { "TRANSFER ($id/$exchange) >>> $command" } -// -// writer.writeCommand(command) -// } -// -// suspend inline fun , reified T: C> recv(): SmtpReply<*> { -// var reply = reader.readSmtpReply(EhloReply) -// -// logger.trace { "TRANSFER ($id/$exchange) <<< $reply" } -// -// if (reply is C) { -// reply = reply.tryAs() as SmtpReply<*> -// } -// -// return reply -// } -// -// suspend inline fun , reified T: C> recvCoerced(): StepProgression { -// var reply = reader.readSmtpReply() -// -// if (reply is C) { -// (reply.tryAs() as? SmtpReply<*>)?.let { reply = it } -// } -// -// logger.trace { "TRANSFER ($id/$exchange) <<< $reply" } -// -// return reply.coerceToStepProgression() -// } -// } -// -// private suspend fun transfer(mail: InternetMessage) { -// logger.info("Beginning transfer of the message ${mail.queueId}.") -// -// mail.envelope.recipientAddresses.forEach { -// logger.info("Beginning transfer of the message ${mail.queueId} to ${it}.") -// mail.sendTo(it) -// } -// -// logger.info("Finished transfer of the message ${mail.queueId}.") -// } -// -// private suspend fun InternetMessage.sendTo(rcpt: Path) = with(TransferSession(queueId, null)) { -// val host = when (val domain = rcpt.mailbox.domain) { -// is Domain.Actual -> domain.domain -// is Domain.AddressLiteral -> domain.networkAddress.toString() -// } -// -// logger.debug { "Attempting to resolve MX records for $host." } -// -// val mxRecords = resolver.resolveRecursively(host, GOOGLE_DNS) { -// qType = ResourceType.MX -// }?.filterIsInstance().orEmpty() -// -// logger.debug { "Resolved ${mxRecords.size} MX records.${mxRecords.joinToString(prefix = "\n", separator = "\n")}" } -// -// mxRecords -// .sortedBy { it.data.preference } -// .firstNotNullOfOrNull { connector.connect(it.data.exchange)?.run { it.data.exchange to this } } -// ?.let { (exchange, connection) -> -// this.exchange = exchange -// this.connection = connection -// updateChannels() -// } ?: TODO("could not connect to any exchange servers") -// -// var isEncrypted = false -// -// machine { -// step { -// recvCoerced() -// } -// -// step { -// send(EhloCommand(config.domain)) -// -// val ehlo = recv() -// -// if (ehlo !is EhloCompletion) return@step StepProgression.Abort("Expected an EHLO reply, got $ehlo instead.") -// -// // negotiate TLS -// if (connection.isImplicitlyEncrypted || isEncrypted) return@step StepProgression.Continue -// -// if (!ehlo.capabilities.containsKey(STARTTLS) || !connection.supportsClientTls) { -// if (config.requireEncryption) return@step StepProgression.Abort("Encryption is required however encryption could not be negotiated.") -// else logger.debug { "Continuing the transfer of $queueId without any encryption!" } -// return@step StepProgression.Continue -// } -// -// send(StartTlsCommand) -// -// when (val resp = recv()) { -// is SmtpReply.PermanentNegative -> return@step StepProgression.Abort("STARTTLS was denied by the server!") -// is SmtpReply.TransientNegative -> TODO("figure out what we should do") -// else -> {} -// } -// -// logger.debug { "Starting TLS negotiations." } -// connection.upgradeToTls() -// isEncrypted = true -// updateChannels() -// logger.debug { "Upgraded connection to TLS."} -// -// StepProgression.Retry -// } -// -// // TODO: implement pipelining -// step { -// send(MailCommand(envelope.originatorAddress)) -// recvCoerced() -// } -// -// step { -// send(RecipientCommand(rcpt)) -// recvCoerced() -// } -// -// step { -// send(DataCommand) -// recvCoerced() -// } -// -// step { -// writer.writeMessageData(message) -// recvCoerced() -// } -// -// stop { -// if (it is StopReason.Abrupt) { -// logger.warn { "The transfer of $queueId to $rcpt was abruptly stopped because of: ${it.reason}" } -// } else { -// logger.info { "The transfer of $queueId to $rcpt was successful." } -// } -// -// send(QuitCommand) -// -// try { -// recvCoerced() -// } catch (_: Throwable) { -// } -// -// connection.close() -// } -// } -// } -//} \ No newline at end of file diff --git a/mailserver/smtp-agent/src/commonMain/kotlin/connections/server.kt b/mailserver/smtp-agent/src/commonMain/kotlin/connections/server.kt index 7e193aa..8665c93 100644 --- a/mailserver/smtp-agent/src/commonMain/kotlin/connections/server.kt +++ b/mailserver/smtp-agent/src/commonMain/kotlin/connections/server.kt @@ -29,7 +29,7 @@ abstract class ServerConnection(val transport: SmtpServerTransport, private val when (this) { is SmtpCommandContext.Known -> logger.trace { "FROM ${transport.remote}: $command" } is SmtpCommandContext.Unknown -> { - logger.warn { "FROM ${transport.remote}: unknown!" } + logger.warn(cause) { "FROM ${transport.remote}: unknown!" } continuePropagation = false transport.send(SmtpReply.PermanentNegative.Default(code = 500)) close() diff --git a/mailserver/smtp-agent/src/commonMain/kotlin/submission/SubmissionServer.kt b/mailserver/smtp-agent/src/commonMain/kotlin/submission/SubmissionServer.kt index 74148d0..71541ee 100644 --- a/mailserver/smtp-agent/src/commonMain/kotlin/submission/SubmissionServer.kt +++ b/mailserver/smtp-agent/src/commonMain/kotlin/submission/SubmissionServer.kt @@ -4,10 +4,8 @@ import dev.sitar.kmail.agents.smtp.transfer.OutgoingMessageQueue import dev.sitar.kmail.agents.smtp.transports.server.SmtpServerTransport import dev.sitar.kmail.smtp.InternetMessage import dev.sitar.kmail.utils.server.ServerSocket +import kotlinx.coroutines.* import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch -import kotlinx.coroutines.supervisorScope import mu.KotlinLogging private val logger = KotlinLogging.logger { } @@ -17,9 +15,11 @@ class SubmissionServer( val config: SubmissionConfig, val outgoing: OutgoingMessageQueue ) { - suspend fun listen() { - supervisorScope { - while (isActive) { + suspend fun listen() = supervisorScope { + logger.debug { "Submission SMTP server is listening." } + + while (isActive) { + withContext(Dispatchers.IO) { val transport = SmtpServerTransport(socket.accept()) logger.debug { "Accepted a connection from ${transport.remote}." } diff --git a/mailserver/smtp-agent/src/commonMain/kotlin/transfer/OutgoingMessageQueue.kt b/mailserver/smtp-agent/src/commonMain/kotlin/transfer/OutgoingMessageQueue.kt index 62d8887..d91311b 100644 --- a/mailserver/smtp-agent/src/commonMain/kotlin/transfer/OutgoingMessageQueue.kt +++ b/mailserver/smtp-agent/src/commonMain/kotlin/transfer/OutgoingMessageQueue.kt @@ -30,8 +30,7 @@ class OutgoingMessageQueue { // TODO: we should store messages in transit (in case of failure/loss) until a new entry is received suspend fun collect(scope: CoroutineScope, block: suspend (OutgoingMessage) -> OutgoingMessage?) { - outgoing - .collect { + outgoing.collect { logger.debug { "queue selected $it" } scope.launch { diff --git a/mailserver/smtp-agent/src/commonMain/kotlin/transfer/TransferReceiveServer.kt b/mailserver/smtp-agent/src/commonMain/kotlin/transfer/TransferReceiveServer.kt index e55ce14..533d052 100644 --- a/mailserver/smtp-agent/src/commonMain/kotlin/transfer/TransferReceiveServer.kt +++ b/mailserver/smtp-agent/src/commonMain/kotlin/transfer/TransferReceiveServer.kt @@ -3,12 +3,10 @@ package dev.sitar.kmail.agents.smtp.transfer import dev.sitar.kmail.agents.smtp.transports.server.SmtpServerTransport import dev.sitar.kmail.smtp.InternetMessage import dev.sitar.kmail.utils.server.ServerSocket +import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.channels.ReceiveChannel import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.isActive -import kotlinx.coroutines.launch -import kotlinx.coroutines.supervisorScope import mu.KotlinLogging private val logger = KotlinLogging.logger { } @@ -18,9 +16,11 @@ class TransferReceiveServer( val config: TransferReceiveConfig, val incoming: MutableSharedFlow ) { - suspend fun listen() { - supervisorScope { - while (isActive) { + suspend fun listen() = supervisorScope { + logger.info("SMTP receive server is listening.") + + while (isActive) { + withContext(Dispatchers.IO) { val transport = SmtpServerTransport(socket.accept()) logger.debug { "Accepted a connection from ${transport.remote}." } diff --git a/mailserver/smtp-agent/src/commonMain/kotlin/transfer/TransferServer.kt b/mailserver/smtp-agent/src/commonMain/kotlin/transfer/TransferServer.kt index c77b02e..8a07bff 100644 --- a/mailserver/smtp-agent/src/commonMain/kotlin/transfer/TransferServer.kt +++ b/mailserver/smtp-agent/src/commonMain/kotlin/transfer/TransferServer.kt @@ -14,11 +14,11 @@ class TransferServer( val config: TransferConfig, val queue: OutgoingMessageQueue ) { - suspend fun handle() { - supervisorScope { - queue.collect(this) { - TransferSendAgent(config, it).transfer() - } + suspend fun handle() = supervisorScope { + logger.info("SMTP transfer server is listening.") + + queue.collect(this) { + TransferSendAgent(config, it).transfer() } } } \ No newline at end of file diff --git a/mailserver/smtp-agent/src/commonMain/kotlin/transports/server/SmtpCommandPipeline.kt b/mailserver/smtp-agent/src/commonMain/kotlin/transports/server/SmtpCommandPipeline.kt index dd7a4b6..3a1f47a 100644 --- a/mailserver/smtp-agent/src/commonMain/kotlin/transports/server/SmtpCommandPipeline.kt +++ b/mailserver/smtp-agent/src/commonMain/kotlin/transports/server/SmtpCommandPipeline.kt @@ -3,12 +3,13 @@ package dev.sitar.kmail.agents.smtp.transports.server import dev.sitar.keystone.Pipeline import dev.sitar.keystone.Stage import dev.sitar.kmail.smtp.SmtpCommand +import kotlin.Exception sealed interface SmtpCommandContext { var continuePropagation: Boolean data class Known(val command: SmtpCommand, override var continuePropagation: Boolean): SmtpCommandContext - class Unknown(override var continuePropagation: Boolean): SmtpCommandContext + class Unknown(val cause: Exception, override var continuePropagation: Boolean): SmtpCommandContext } class SmtpCommandPipeline: Pipeline(setOf(Logging, Global, Process), onFilter = { it.continuePropagation }) { diff --git a/mailserver/smtp-agent/src/commonMain/kotlin/transports/server/SmtpServerTransport.kt b/mailserver/smtp-agent/src/commonMain/kotlin/transports/server/SmtpServerTransport.kt index 5c14d91..a65b52e 100644 --- a/mailserver/smtp-agent/src/commonMain/kotlin/transports/server/SmtpServerTransport.kt +++ b/mailserver/smtp-agent/src/commonMain/kotlin/transports/server/SmtpServerTransport.kt @@ -48,7 +48,7 @@ class SmtpServerTransport(connection: Connection, coroutineContext: CoroutineCon val command = lock.withLock { reader.readSmtpCommand() } SmtpCommandContext.Known(command, true) } catch (e: Exception) { - SmtpCommandContext.Unknown(true) + SmtpCommandContext.Unknown(e, true) } commandPipeline.process(context) diff --git a/mailserver/utils/build.gradle.kts b/mailserver/utils/build.gradle.kts index 8ba5aa6..301b95f 100644 --- a/mailserver/utils/build.gradle.kts +++ b/mailserver/utils/build.gradle.kts @@ -17,4 +17,9 @@ kotlin { api("io.ktor:ktor-network:2.1.3") api("io.ktor:ktor-network-tls:2.1.3") } + + sourceSets["jvmMain"].dependencies { + implementation("io.netty:netty-all:4.1.101.Final") + + } } \ No newline at end of file diff --git a/mailserver/utils/src/commonMain/kotlin/ExceptionLoggingCoroutineExceptionHandler.kt b/mailserver/utils/src/commonMain/kotlin/ExceptionLoggingCoroutineExceptionHandler.kt new file mode 100644 index 0000000..e339b0f --- /dev/null +++ b/mailserver/utils/src/commonMain/kotlin/ExceptionLoggingCoroutineExceptionHandler.kt @@ -0,0 +1,8 @@ +package dev.sitar.kmail.utils + +import kotlinx.coroutines.CoroutineExceptionHandler +import mu.KLogger + +fun ExceptionLoggingCoroutineExceptionHandler(logger: KLogger): CoroutineExceptionHandler { + return CoroutineExceptionHandler { _, throwable -> logger.error(throwable) { "Kmail encountered the following exception." } } +} \ No newline at end of file