Extract network stack dispatching logic from terminal to other interface

This commit is contained in:
Shadowfacts 2021-02-17 22:09:44 -05:00
parent c6a5602ec1
commit f9196eea56
Signed by: shadowfacts
GPG Key ID: 94A5AB95422746E5
4 changed files with 109 additions and 63 deletions

View File

@ -45,7 +45,7 @@ abstract class DeviceBlockEntity(type: BlockEntityType<*>): BlockEntity(type),
private val arpTable = mutableMapOf<IPAddress, MACAddress>() private val arpTable = mutableMapOf<IPAddress, MACAddress>()
private val packetQueue = LinkedList<PendingPacket>() private val packetQueue = LinkedList<PendingPacket>()
protected var counter: Long = 0 var counter: Long = 0
override fun getIPAddress() = ipAddress override fun getIPAddress() = ipAddress
override fun getMACAddress() = macAddress override fun getMACAddress() = macAddress

View File

@ -3,7 +3,6 @@ package net.shadowfacts.phycon.network.block.terminal
import alexiil.mc.lib.attributes.item.GroupedItemInvView import alexiil.mc.lib.attributes.item.GroupedItemInvView
import alexiil.mc.lib.attributes.item.ItemStackCollections import alexiil.mc.lib.attributes.item.ItemStackCollections
import alexiil.mc.lib.attributes.item.ItemStackUtil import alexiil.mc.lib.attributes.item.ItemStackUtil
import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap
import net.fabricmc.fabric.api.block.entity.BlockEntityClientSerializable import net.fabricmc.fabric.api.block.entity.BlockEntityClientSerializable
import net.fabricmc.fabric.api.screenhandler.v1.ExtendedScreenHandlerFactory import net.fabricmc.fabric.api.screenhandler.v1.ExtendedScreenHandlerFactory
import net.minecraft.block.BlockState import net.minecraft.block.BlockState
@ -26,23 +25,26 @@ import net.shadowfacts.phycon.api.util.IPAddress
import net.shadowfacts.phycon.init.PhyBlockEntities import net.shadowfacts.phycon.init.PhyBlockEntities
import net.shadowfacts.phycon.network.DeviceBlockEntity import net.shadowfacts.phycon.network.DeviceBlockEntity
import net.shadowfacts.phycon.network.NetworkUtil import net.shadowfacts.phycon.network.NetworkUtil
import net.shadowfacts.phycon.network.component.ItemStackPacketHandler import net.shadowfacts.phycon.network.component.*
import net.shadowfacts.phycon.network.component.NetworkStackProvider
import net.shadowfacts.phycon.network.component.NetworkStackReceiver
import net.shadowfacts.phycon.network.component.handleItemStack
import net.shadowfacts.phycon.network.packet.* import net.shadowfacts.phycon.network.packet.*
import java.lang.ref.WeakReference import java.lang.ref.WeakReference
import java.util.* import java.util.*
import kotlin.math.min import kotlin.math.min
import kotlin.properties.Delegates
/** /**
* @author shadowfacts * @author shadowfacts
*/ */
class TerminalBlockEntity: DeviceBlockEntity(PhyBlockEntities.TERMINAL), InventoryChangedListener, BlockEntityClientSerializable, Tickable, ItemStackPacketHandler { class TerminalBlockEntity: DeviceBlockEntity(PhyBlockEntities.TERMINAL),
InventoryChangedListener,
BlockEntityClientSerializable,
Tickable,
ItemStackPacketHandler,
NetworkStackDispatcher<TerminalBlockEntity.PendingInsertion> {
companion object { companion object {
val LOCATE_REQUEST_TIMEOUT = 40 // ticks val LOCATE_REQUEST_TIMEOUT: Long = 40 // ticks
val INSERTION_TIMEOUT = 40 val INSERTION_TIMEOUT: Long = 40
} }
private val inventoryCache = mutableMapOf<IPAddress, GroupedItemInvView>() private val inventoryCache = mutableMapOf<IPAddress, GroupedItemInvView>()
@ -50,7 +52,8 @@ class TerminalBlockEntity: DeviceBlockEntity(PhyBlockEntities.TERMINAL), Invento
private val locateRequestQueue = LinkedList<StackLocateRequest>() private val locateRequestQueue = LinkedList<StackLocateRequest>()
private val pendingRequests = LinkedList<StackLocateRequest>() private val pendingRequests = LinkedList<StackLocateRequest>()
private val pendingInsertions = Int2ObjectArrayMap<PendingStackInsertion>() override val pendingInsertions = mutableListOf<PendingInsertion>()
override val dispatchStackTimeout = INSERTION_TIMEOUT
private var observers = 0 private var observers = 0
val cachedNetItems = ItemStackCollections.intMap() val cachedNetItems = ItemStackCollections.intMap()
@ -117,18 +120,6 @@ class TerminalBlockEntity: DeviceBlockEntity(PhyBlockEntities.TERMINAL), Invento
return remaining return remaining
} }
private fun handleCapacity(packet: CapacityPacket) {
val insertion = pendingInsertions.values.firstOrNull {
ItemStackUtil.areEqualIgnoreAmounts(packet.stack, it.stack)
}
if (insertion != null) {
insertion.results.add(packet.capacity to packet.stackReceiver)
if (insertion.isFinishable(counter)) {
finishInsertion(insertion)
}
}
}
private fun updateNetItems() { private fun updateNetItems() {
cachedNetItems.clear() cachedNetItems.clear()
for (inventory in inventoryCache.values) { for (inventory in inventoryCache.values) {
@ -150,20 +141,12 @@ class TerminalBlockEntity: DeviceBlockEntity(PhyBlockEntities.TERMINAL), Invento
for (slot in 0 until internalBuffer.size()) { for (slot in 0 until internalBuffer.size()) {
if (internalBuffer.getMode(slot) != TerminalBufferInventory.Mode.TO_NETWORK) continue if (internalBuffer.getMode(slot) != TerminalBufferInventory.Mode.TO_NETWORK) continue
if (slot in pendingInsertions) continue if (pendingInsertions.any { it.bufferSlot == slot }) continue
val stack = internalBuffer.getStack(slot) val stack = internalBuffer.getStack(slot)
pendingInsertions[slot] = PendingStackInsertion(slot, stack, counter) dispatchItemStack(stack) { insertion ->
sendPacket(CheckCapacityPacket(stack, ipAddress, IPAddress.BROADCAST)) insertion.bufferSlot = slot
} }
} }
private fun finishPendingInsertions() {
if (world!!.isClient) return
for (insertion in pendingInsertions.values) {
if (!insertion.isFinishable(counter)) continue
finishInsertion(insertion)
}
} }
private fun sendEnqueuedLocateRequests() { private fun sendEnqueuedLocateRequests() {
@ -202,7 +185,7 @@ class TerminalBlockEntity: DeviceBlockEntity(PhyBlockEntities.TERMINAL), Invento
sendEnqueuedLocateRequests() sendEnqueuedLocateRequests()
finishPendingRequests() finishPendingRequests()
beginInsertions() beginInsertions()
finishPendingInsertions() finishTimedOutPendingInsertions()
} }
if (observers > 0) { if (observers > 0) {
@ -257,24 +240,17 @@ class TerminalBlockEntity: DeviceBlockEntity(PhyBlockEntities.TERMINAL), Invento
} }
} }
private fun finishInsertion(insertion: PendingStackInsertion) { override fun createPendingInsertion(stack: ItemStack) = PendingInsertion(stack, counter)
pendingInsertions.remove(insertion.bufferSlot)
// todo: also sort results by interface priority override fun finishInsertion(insertion: PendingInsertion): ItemStack {
val sortedResults = insertion.results.sortedBy { it.first }.toMutableList() val remaining = super.finishInsertion(insertion)
val remaining = insertion.stack
while (!remaining.isEmpty && sortedResults.isNotEmpty()) {
val (capacity, receivingInterface) = sortedResults.removeAt(0)
if (capacity <= 0) continue
sendPacket(ItemStackPacket(remaining.copy(), ipAddress, receivingInterface.ipAddress))
// todo: the interface should confirm how much was actually inserted, in case of race condition
remaining.count -= capacity
}
internalBuffer.setStack(insertion.bufferSlot, remaining) internalBuffer.setStack(insertion.bufferSlot, remaining)
// as with extracting, we "know" the new amounts and so can update instantly without actually sending out packets // as with extracting, we "know" the new amounts and so can update instantly without actually sending out packets
updateNetItems() updateNetItems()
sync() sync()
return remaining
} }
override fun onInventoryChanged(inv: Inventory) { override fun onInventoryChanged(inv: Inventory) {
@ -327,6 +303,10 @@ class TerminalBlockEntity: DeviceBlockEntity(PhyBlockEntities.TERMINAL), Invento
fun netItemsChanged() fun netItemsChanged()
} }
class PendingInsertion(stack: ItemStack, timestamp: Long): NetworkStackDispatcher.PendingInsertion<PendingInsertion>(stack, timestamp) {
var bufferSlot by Delegates.notNull<Int>()
}
} }
data class StackLocateRequest( data class StackLocateRequest(
@ -342,17 +322,3 @@ data class StackLocateRequest(
return totalResultAmount >= amount || currentTimestamp - timestamp >= TerminalBlockEntity.LOCATE_REQUEST_TIMEOUT return totalResultAmount >= amount || currentTimestamp - timestamp >= TerminalBlockEntity.LOCATE_REQUEST_TIMEOUT
} }
} }
data class PendingStackInsertion(
val bufferSlot: Int,
val stack: ItemStack,
val timestamp: Long,
val results: MutableSet<Pair<Int, NetworkStackReceiver>> = mutableSetOf(),
) {
val totalCapacity: Int
get() = results.fold(0) { acc, (amount, _) -> acc + amount }
fun isFinishable(currentTimestamp: Long): Boolean {
return totalCapacity >= stack.count || currentTimestamp - timestamp >= TerminalBlockEntity.INSERTION_TIMEOUT
}
}

View File

@ -26,6 +26,6 @@ fun <BE> BE.handleItemStack(packet: ItemStackPacket) where BE: BlockEntity, BE:
val remainder = doHandleItemStack(packet) val remainder = doHandleItemStack(packet)
// if there are any items remaining, send them back to the source with incremented bounce count // if there are any items remaining, send them back to the source with incremented bounce count
if (!remainder.isEmpty) { if (!remainder.isEmpty) {
// sendToSingle(ItemStackPacket(remainder, packet.bounceCount + 1, macAddress, packet.source)) sendPacket(ItemStackPacket(remainder, packet.bounceCount + 1, ipAddress, packet.source))
} }
} }

View File

@ -0,0 +1,80 @@
package net.shadowfacts.phycon.network.component
import alexiil.mc.lib.attributes.item.ItemStackUtil
import net.minecraft.block.entity.BlockEntity
import net.minecraft.item.ItemStack
import net.shadowfacts.phycon.api.util.IPAddress
import net.shadowfacts.phycon.network.packet.CapacityPacket
import net.shadowfacts.phycon.network.packet.CheckCapacityPacket
import net.shadowfacts.phycon.network.packet.ItemStackPacket
/**
* @author shadowfacts
*/
interface NetworkStackDispatcher<Insertion: NetworkStackDispatcher.PendingInsertion<Insertion>>: ItemStackPacketHandler {
val counter: Long
val dispatchStackTimeout: Long
val pendingInsertions: MutableList<Insertion>
fun createPendingInsertion(stack: ItemStack): Insertion
fun dispatchItemStack(stack: ItemStack, modifyInsertion: ((Insertion) -> Unit)? = null) {
val insertion = createPendingInsertion(stack)
modifyInsertion?.invoke(insertion)
pendingInsertions.add(insertion)
sendPacket(CheckCapacityPacket(insertion.stack, ipAddress, IPAddress.BROADCAST))
}
fun handleCapacity(packet: CapacityPacket) {
val insertion = pendingInsertions.firstOrNull {
ItemStackUtil.areEqualIgnoreAmounts(packet.stack, it.stack)
}
if (insertion != null) {
insertion.results.add(packet.capacity to packet.stackReceiver)
if (insertion.isFinishable(this)) {
finishInsertion(insertion)
}
}
}
fun finishInsertion(insertion: Insertion): ItemStack {
pendingInsertions.remove(insertion)
// todo: also sort results by interface priority
val sortedResults = insertion.results.sortedBy { it.first }.toMutableList()
val remaining = insertion.stack
while (!remaining.isEmpty && sortedResults.isNotEmpty()) {
val (capacity, receivingInterface) = sortedResults.removeFirst()
if (capacity <= 0) continue
sendPacket(ItemStackPacket(remaining.copy(), ipAddress, receivingInterface.ipAddress))
// todo: the destination should confirm how much was actually inserted, in case of race condition
remaining.count -= capacity
}
return remaining
}
open class PendingInsertion<Self: PendingInsertion<Self>>(
val stack: ItemStack,
val timestamp: Long
) {
val results = mutableSetOf<Pair<Int, NetworkStackReceiver>>()
val totalCapacity: Int
get() = results.fold(0) { acc, (amount, _) -> acc + amount }
fun isFinishable(owner: NetworkStackDispatcher<Self>): Boolean {
return totalCapacity >= stack.count || owner.counter - timestamp >= owner.dispatchStackTimeout
}
}
}
fun <Self, Insertion: NetworkStackDispatcher.PendingInsertion<Insertion>> Self.finishTimedOutPendingInsertions() where Self: BlockEntity, Self: NetworkStackDispatcher<Insertion> {
if (world!!.isClient) return
for (insertion in pendingInsertions) {
if (!insertion.isFinishable(this)) continue
finishInsertion(insertion)
}
}