PhysicalConnectivity/src/main/kotlin/net/shadowfacts/phycon/component/NetworkStackDispatcher.kt

103 lines
3.8 KiB
Kotlin

package net.shadowfacts.phycon.component
import alexiil.mc.lib.attributes.item.ItemStackUtil
import net.minecraft.block.entity.BlockEntity
import net.minecraft.item.ItemStack
import net.shadowfacts.phycon.api.util.IPAddress
import net.shadowfacts.phycon.packet.CapacityPacket
import net.shadowfacts.phycon.packet.CheckCapacityPacket
import net.shadowfacts.phycon.packet.ItemStackPacket
import net.shadowfacts.phycon.util.copyWithCount
import kotlin.math.min
/**
* @author shadowfacts
*/
interface NetworkStackDispatcher<Insertion: NetworkStackDispatcher.PendingInsertion<Insertion>>: ItemStackPacketHandler {
val counter: Long
val dispatchStackTimeout: Long
val pendingInsertions: MutableList<Insertion>
fun createPendingInsertion(stack: ItemStack): Insertion
fun dispatchItemStack(stack: ItemStack, modifyInsertion: ((Insertion) -> Unit)? = null) {
val insertion = createPendingInsertion(stack)
modifyInsertion?.invoke(insertion)
pendingInsertions.add(insertion)
sendPacket(CheckCapacityPacket(insertion.stack, ipAddress, IPAddress.BROADCAST))
}
fun handleCapacity(packet: CapacityPacket) {
pendingInsertions.firstOrNull { insertion ->
ItemStackUtil.areEqualIgnoreAmounts(packet.stack, insertion.stack) &&
insertion.results.none { it.second.ipAddress == packet.source }
}?.also { insertion ->
insertion.results.add(packet.capacity to packet.stackReceiver)
if (insertion.isFinishable(this)) {
val remaining = finishInsertion(insertion)
if (!remaining.isEmpty) {
pendingInsertions.add(createPendingInsertion(remaining))
}
}
}
}
fun finishInsertion(insertion: Insertion): ItemStack {
pendingInsertions.remove(insertion)
val sortedResults = insertion.results.toMutableList()//.sortedBy { it.first }.toMutableList()
sortedResults.sortWith { a, b ->
// sort results first by receiver priority, and then by capacity
if (a.second.receiverPriority == b.second.receiverPriority) {
b.first - a.first
} else {
b.second.receiverPriority - a.second.receiverPriority
}
}
// copy the insertion stack so subclasses that override this method can still see the originally dispatched stack after the super call
val remaining = insertion.stack.copy()
while (!remaining.isEmpty && sortedResults.isNotEmpty()) {
val (capacity, receiver) = sortedResults.removeFirst()
if (capacity <= 0) continue
val sentCount = min(capacity, remaining.count)
val copy = remaining.copyWithCount(sentCount)
sendPacket(ItemStackPacket(copy, ipAddress, receiver.ipAddress))
// todo: the destination should confirm how much was actually inserted, in case of race condition
remaining.count -= sentCount
}
return remaining
}
open class PendingInsertion<Self: PendingInsertion<Self>>(
var stack: ItemStack,
val timestamp: Long
) {
val results = mutableSetOf<Pair<Int, NetworkStackReceiver>>()
val totalCapacity: Int
get() = results.fold(0) { acc, (amount, _) -> acc + amount }
fun isFinishable(owner: NetworkStackDispatcher<Self>): Boolean {
// can't check totalCapacity >= stack.count because we need to wait for all responses to correctly sort by priority
return owner.counter - timestamp >= owner.dispatchStackTimeout
}
}
}
fun <Self, Insertion: NetworkStackDispatcher.PendingInsertion<Insertion>> Self.finishTimedOutPendingInsertions() where Self: BlockEntity, Self: NetworkStackDispatcher<Insertion> {
if (world!!.isClient) return
if (pendingInsertions.isEmpty()) return
val finishable = pendingInsertions.filter { it.isFinishable(this) }
// finishInsertion removes the object from pendingInsertions
for (insertion in finishable) {
val remaining = finishInsertion(insertion)
if (!remaining.isEmpty) {
pendingInsertions.add(createPendingInsertion(remaining))
}
}
// todo: if a timed-out insertion can't be finished, we should probably retry after some time (exponential backoff?)
}