refactor: unify db refresh flows

This commit is contained in:
Codex
2026-05-14 20:45:24 +03:00
parent f24eae8209
commit f6c6ed59f3
32 changed files with 882 additions and 408 deletions
@@ -8,6 +8,9 @@ import kotlinx.coroutines.flow.Flow
interface ConvoUseCase : BaseUseCase {
suspend fun storeConvos(convos: List<VkConvo>)
suspend fun getLocalConvos(): List<VkConvo>
suspend fun getLocalConvoById(peerId: Long): VkConvo?
suspend fun deleteLocalConvo(peerId: Long)
fun getConvos(
count: Int? = null,
@@ -19,6 +19,18 @@ class ConvoUseCaseImpl(
repository.storeConvos(convos)
}
override suspend fun getLocalConvos(): List<VkConvo> = withContext(Dispatchers.IO) {
repository.getLocalConvos()
}
override suspend fun getLocalConvoById(peerId: Long): VkConvo? = withContext(Dispatchers.IO) {
repository.getLocalConvoById(peerId)
}
override suspend fun deleteLocalConvo(peerId: Long) = withContext(Dispatchers.IO) {
repository.deleteLocalConvo(peerId)
}
override fun getConvos(
count: Int?,
offset: Int?,
@@ -66,6 +66,14 @@ class LongPollUpdatesParser(
eventDispatcher.registerListener(LongPollEvent.MESSAGE_RESTORED, assembleEventCallback(block))
}
fun onMessageUpdated(block: (LongPollParsedEvent.MessageUpdated) -> Unit) {
eventDispatcher.registerListener(LongPollEvent.MESSAGE_UPDATED, assembleEventCallback(block))
}
fun onMessageCacheClear(block: (LongPollParsedEvent.MessageCacheClear) -> Unit) {
eventDispatcher.registerListener(LongPollEvent.MESSAGE_CACHE_CLEAR, assembleEventCallback(block))
}
fun onNewMessage(block: (LongPollParsedEvent.NewMessage) -> Unit) {
eventDispatcher.registerListener(LongPollEvent.MESSAGE_NEW, assembleEventCallback(block))
}
@@ -0,0 +1,213 @@
package dev.meloda.fast.domain
import android.util.Log
import dev.meloda.fast.model.LongPollParsedEvent
import dev.meloda.fast.model.api.domain.VkMessage
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.launch
class LongPollUpdatesReducer(
updatesParser: LongPollUpdatesParser,
private val messagesUseCase: MessagesUseCase,
private val convoUseCase: ConvoUseCase
) {
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
private val _events = MutableSharedFlow<LongPollParsedEvent>(extraBufferCapacity = 256)
val events: SharedFlow<LongPollParsedEvent> = _events.asSharedFlow()
val newMessages = events.filterIsInstance<LongPollParsedEvent.NewMessage>()
val messageEdited = events.filterIsInstance<LongPollParsedEvent.MessageEdited>()
val messageIncomingRead = events.filterIsInstance<LongPollParsedEvent.IncomingMessageRead>()
val messageOutgoingRead = events.filterIsInstance<LongPollParsedEvent.OutgoingMessageRead>()
val messageDeleted = events.filterIsInstance<LongPollParsedEvent.MessageDeleted>()
val messageRestored = events.filterIsInstance<LongPollParsedEvent.MessageRestored>()
val messageMarkedAsImportant = events.filterIsInstance<LongPollParsedEvent.MessageMarkedAsImportant>()
val messageMarkedAsSpam = events.filterIsInstance<LongPollParsedEvent.MessageMarkedAsSpam>()
val messageMarkedAsNotSpam = events.filterIsInstance<LongPollParsedEvent.MessageMarkedAsNotSpam>()
val interactions = events.filterIsInstance<LongPollParsedEvent.Interaction>()
val chatMajorChanged = events.filterIsInstance<LongPollParsedEvent.ChatMajorChanged>()
val chatMinorChanged = events.filterIsInstance<LongPollParsedEvent.ChatMinorChanged>()
val chatCleared = events.filterIsInstance<LongPollParsedEvent.ChatCleared>()
val chatArchived = events.filterIsInstance<LongPollParsedEvent.ChatArchived>()
val messageUpdated = events.filterIsInstance<LongPollParsedEvent.MessageUpdated>()
val messageCacheClear = events.filterIsInstance<LongPollParsedEvent.MessageCacheClear>()
init {
updatesParser.onNewMessage { publish(it) }
updatesParser.onMessageEdited { publish(it) }
updatesParser.onMessageIncomingRead { publish(it) }
updatesParser.onMessageOutgoingRead { publish(it) }
updatesParser.onMessageDeleted { publish(it) }
updatesParser.onMessageRestored { publish(it) }
updatesParser.onMessageUpdated { publish(it) }
updatesParser.onMessageCacheClear { publish(it) }
updatesParser.onMessageMarkedAsImportant { publish(it) }
updatesParser.onMessageMarkedAsSpam { publish(it) }
updatesParser.onMessageMarkedAsNotSpam { publish(it) }
updatesParser.onInteractions { publish(it) }
updatesParser.onChatMajorChanged { publish(it) }
updatesParser.onChatMinorChanged { publish(it) }
updatesParser.onChatCleared { publish(it) }
updatesParser.onChatArchived { publish(it) }
}
private fun publish(event: LongPollParsedEvent) {
scope.launch {
runCatching { applyCommon(event) }
.onFailure { throwable ->
Log.e("LongPollUpdatesReducer", "applyCommon failed: $event", throwable)
}
_events.emit(event)
}
}
private suspend fun applyCommon(event: LongPollParsedEvent) {
when (event) {
is LongPollParsedEvent.NewMessage -> {
messagesUseCase.storeMessages(listOf(event.message))
updateConvoForMessage(event.message, unreadIncrement = if (event.message.isOut) 0 else 1)
}
is LongPollParsedEvent.MessageEdited -> {
messagesUseCase.storeMessage(event.message)
}
is LongPollParsedEvent.MessageUpdated -> {
messagesUseCase.storeMessage(event.message)
}
is LongPollParsedEvent.MessageCacheClear -> {
messagesUseCase.storeMessage(event.message)
}
is LongPollParsedEvent.IncomingMessageRead -> {
updateConvoReadState(
peerId = event.peerId,
inReadCmId = event.cmId,
unreadCount = event.unreadCount
)
}
is LongPollParsedEvent.OutgoingMessageRead -> {
updateConvoReadState(
peerId = event.peerId,
outReadCmId = event.cmId,
unreadCount = event.unreadCount
)
}
is LongPollParsedEvent.MessageDeleted -> {
val message = messagesUseCase.getLocalMessageByConvoMessageId(
convoId = event.peerId,
cmId = event.cmId
)
if (message != null) {
messagesUseCase.deleteLocalMessages(listOf(message.id))
}
}
is LongPollParsedEvent.MessageRestored -> {
messagesUseCase.storeMessage(event.message)
}
is LongPollParsedEvent.MessageMarkedAsImportant -> {
val message = messagesUseCase.getLocalMessageByConvoMessageId(
convoId = event.peerId,
cmId = event.cmId
) ?: return
messagesUseCase.storeMessage(message.copy(isImportant = event.marked))
}
is LongPollParsedEvent.MessageMarkedAsSpam -> {
val message = messagesUseCase.getLocalMessageByConvoMessageId(
convoId = event.peerId,
cmId = event.cmId
)
if (message != null) {
messagesUseCase.deleteLocalMessages(listOf(message.id))
}
}
is LongPollParsedEvent.MessageMarkedAsNotSpam -> {
messagesUseCase.storeMessage(event.message)
}
is LongPollParsedEvent.ChatMajorChanged -> {
updateConvoSortState(event.peerId, majorId = event.majorId)
}
is LongPollParsedEvent.ChatMinorChanged -> {
updateConvoSortState(event.peerId, minorId = event.minorId)
}
is LongPollParsedEvent.ChatCleared -> {
convoUseCase.deleteLocalConvo(event.peerId)
}
is LongPollParsedEvent.ChatArchived -> {
event.convo.lastMessage?.let(messagesUseCase::storeMessage)
convoUseCase.storeConvos(listOf(event.convo.copy(isArchived = event.archived)))
}
is LongPollParsedEvent.Interaction -> Unit
}
}
private suspend fun updateConvoReadState(
peerId: Long,
inReadCmId: Long? = null,
outReadCmId: Long? = null,
unreadCount: Int
) {
val convo = convoUseCase.getLocalConvoById(peerId) ?: return
convoUseCase.storeConvos(
listOf(
convo.copy(
inReadCmId = inReadCmId ?: convo.inReadCmId,
outReadCmId = outReadCmId ?: convo.outReadCmId,
unreadCount = unreadCount
)
)
)
}
private suspend fun updateConvoSortState(
peerId: Long,
majorId: Int? = null,
minorId: Int? = null
) {
val convo = convoUseCase.getLocalConvoById(peerId) ?: return
convoUseCase.storeConvos(
listOf(
convo.copy(
majorId = majorId ?: convo.majorId,
minorId = minorId ?: convo.minorId
)
)
)
}
private suspend fun updateConvoForMessage(
message: VkMessage,
unreadIncrement: Int
) {
val convo = convoUseCase.getLocalConvoById(message.peerId) ?: return
convoUseCase.storeConvos(
listOf(
convo.copy(
lastMessageId = message.id,
lastCmId = message.cmId,
unreadCount = convo.unreadCount + unreadIncrement
)
)
)
}
}
@@ -1,6 +1,7 @@
package dev.meloda.fast.domain
import dev.meloda.fast.data.State
import dev.meloda.fast.model.api.data.LongPollHistoryResponse
import dev.meloda.fast.model.api.data.LongPollUpdates
import dev.meloda.fast.model.api.data.VkLongPollData
import kotlinx.coroutines.flow.Flow
@@ -21,4 +22,14 @@ interface LongPollUseCase {
mode: Int,
version: Int
): Flow<State<LongPollUpdates>>
fun getLongPollHistory(
ts: Int,
pts: Int,
lpVersion: Int,
lastN: Int? = null,
maxMsgId: Long? = null,
eventsLimit: Int? = null,
msgsLimit: Int? = null
): Flow<State<LongPollHistoryResponse>>
}
@@ -3,6 +3,7 @@ package dev.meloda.fast.domain
import dev.meloda.fast.data.State
import dev.meloda.fast.data.api.longpoll.LongPollRepository
import dev.meloda.fast.data.mapToState
import dev.meloda.fast.model.api.data.LongPollHistoryResponse
import dev.meloda.fast.model.api.data.LongPollUpdates
import dev.meloda.fast.model.api.data.VkLongPollData
import kotlinx.coroutines.flow.Flow
@@ -48,4 +49,27 @@ class LongPollUseCaseImpl(
).mapToState()
emit(newState)
}
override fun getLongPollHistory(
ts: Int,
pts: Int,
lpVersion: Int,
lastN: Int?,
maxMsgId: Long?,
eventsLimit: Int?,
msgsLimit: Int?
): Flow<State<LongPollHistoryResponse>> = flow {
emit(State.Loading)
val newState = repository.getLongPollHistory(
ts = ts,
pts = pts,
lpVersion = lpVersion,
lastN = lastN,
maxMsgId = maxMsgId,
eventsLimit = eventsLimit,
msgsLimit = msgsLimit
).mapToState()
emit(newState)
}
}
@@ -12,6 +12,11 @@ interface MessagesUseCase : BaseUseCase {
suspend fun storeMessage(message: VkMessage)
suspend fun storeMessages(messages: List<VkMessage>)
suspend fun getLocalMessages(convoId: Long): List<VkMessage>
suspend fun getLocalMessageById(messageId: Long): VkMessage?
suspend fun getLocalMessageByConvoMessageId(convoId: Long, cmId: Long): VkMessage?
suspend fun getLocalMaxMessageId(): Long?
suspend fun deleteLocalMessages(messageIds: List<Long>)
fun getMessagesHistory(
convoId: Long,
@@ -22,6 +22,26 @@ class MessagesUseCaseImpl(
repository.storeMessages(messages)
}
override suspend fun getLocalMessages(convoId: Long): List<VkMessage> {
return repository.getLocalMessages(convoId)
}
override suspend fun getLocalMessageById(messageId: Long): VkMessage? {
return repository.getLocalMessageById(messageId)
}
override suspend fun getLocalMessageByConvoMessageId(convoId: Long, cmId: Long): VkMessage? {
return repository.getLocalMessageByConvoMessageId(convoId, cmId)
}
override suspend fun getLocalMaxMessageId(): Long? {
return repository.getLocalMaxMessageId()
}
override suspend fun deleteLocalMessages(messageIds: List<Long>) {
repository.deleteLocalMessages(messageIds)
}
override fun getMessagesHistory(
convoId: Long,
count: Int?,