Fixed multithread chunk IO

This commit is contained in:
OLEGSHA 2021-08-28 21:14:35 +03:00
parent 98250cd524
commit a222ea8f67
Signed by: OLEGSHA
GPG Key ID: E57A4B08D64AFF7A
7 changed files with 191 additions and 104 deletions

View File

@ -25,8 +25,9 @@ import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.Objects; import java.util.Objects;
import java.util.Set;
import glm.vec._3.i.Vec3i; import glm.vec._3.i.Vec3i;
import ru.windcorp.progressia.common.world.block.BlockData; import ru.windcorp.progressia.common.world.block.BlockData;
@ -49,7 +50,7 @@ public class DefaultChunkData implements ChunkData {
public boolean isEmpty = false; public boolean isEmpty = false;
public boolean isOpaque = false; public boolean isOpaque = false;
public static HashSet<BlockData> transparent; public static Set<BlockData> transparent = Collections.emptySet();
private final Vec3i position = new Vec3i(); private final Vec3i position = new Vec3i();
private final DefaultWorldData world; private final DefaultWorldData world;
@ -205,12 +206,9 @@ public class DefaultChunkData implements ChunkData {
this.generationHint = generationHint; this.generationHint = generationHint;
} }
public void computeOpaque() public void computeOpaque() {
{ for (int xyz = 0; xyz < BLOCKS_PER_CHUNK * BLOCKS_PER_CHUNK * BLOCKS_PER_CHUNK; xyz++) {
for (int xyz=0;xyz<BLOCKS_PER_CHUNK*BLOCKS_PER_CHUNK*BLOCKS_PER_CHUNK;xyz++) if (transparent.contains(blocks[xyz])) {
{
if (transparent.contains( blocks[xyz]))
{
isOpaque = false; isOpaque = false;
return; return;
} }
@ -218,18 +216,14 @@ public class DefaultChunkData implements ChunkData {
isOpaque = true; isOpaque = true;
} }
public boolean isOpaque() public boolean isOpaque() {
{
return isOpaque; return isOpaque;
} }
public void computeEmpty() public void computeEmpty() {
{
BlockData air = new BlockData("Test:Air"); BlockData air = new BlockData("Test:Air");
for (int xyz=0;xyz<BLOCKS_PER_CHUNK*BLOCKS_PER_CHUNK*BLOCKS_PER_CHUNK;xyz++) for (int xyz = 0; xyz < BLOCKS_PER_CHUNK * BLOCKS_PER_CHUNK * BLOCKS_PER_CHUNK; xyz++) {
{ if (blocks[xyz] != air) {
if (blocks[xyz] != air)
{
isEmpty = false; isEmpty = false;
return; return;
} }
@ -237,12 +231,10 @@ public class DefaultChunkData implements ChunkData {
isEmpty = true; isEmpty = true;
} }
public boolean isEmpty() public boolean isEmpty() {
{
return isEmpty; return isEmpty;
} }
/** /**
* Implementation of {@link TileDataStack} used internally by * Implementation of {@link TileDataStack} used internally by
* {@link DefaultChunkData} to * {@link DefaultChunkData} to

View File

@ -72,6 +72,9 @@ public class Server {
private final PlayerManager playerManager; private final PlayerManager playerManager;
private final LoadManager loadManager; private final LoadManager loadManager;
private final ChunkRequestDaemon chunkRequestDaemon;
private final EntityRequestDaemon entityRequestDaemon;
private final TaskQueue taskQueue = new TaskQueue(this::isServerThread); private final TaskQueue taskQueue = new TaskQueue(this::isServerThread);
private final EventBus eventBus = ReportingEventBus.create("ServerEvents"); private final EventBus eventBus = ReportingEventBus.create("ServerEvents");
@ -91,9 +94,12 @@ public class Server {
this.playerManager = new PlayerManager(this); this.playerManager = new PlayerManager(this);
this.loadManager = new LoadManager(this); this.loadManager = new LoadManager(this);
this.chunkRequestDaemon = new ChunkRequestDaemon(loadManager.getChunkManager());
this.entityRequestDaemon = new EntityRequestDaemon(loadManager.getEntityManager());
schedule(getClientManager()::processPackets); schedule(getClientManager()::processPackets);
schedule(new ChunkRequestDaemon(loadManager.getChunkManager())::tick); schedule(this.chunkRequestDaemon::tick);
schedule(new EntityRequestDaemon(loadManager.getEntityManager())::tick); schedule(this.entityRequestDaemon::tick);
// Must run after request daemons so it only schedules chunks that // Must run after request daemons so it only schedules chunks that
// hadn't unloaded // hadn't unloaded
@ -329,6 +335,7 @@ public class Server {
* operation or fails to start. * operation or fails to start.
*/ */
public void start() { public void start() {
this.chunkRequestDaemon.start();
this.serverThread.start(); this.serverThread.start();
} }
@ -349,6 +356,7 @@ public class Server {
public void shutdown(String message) { public void shutdown(String message) {
LogManager.getLogger().warn("Server.shutdown() is not yet implemented"); LogManager.getLogger().warn("Server.shutdown() is not yet implemented");
serverThread.stop(); serverThread.stop();
chunkRequestDaemon.stop();
} }
private void scheduleWorldTicks(Server server) { private void scheduleWorldTicks(Server server) {

View File

@ -34,7 +34,7 @@ public class ServerState {
} }
public static void startServer() { public static void startServer() {
Server server = new Server(new DefaultWorldData(), TestGenerationConfig.createGenerator()); Server server = new Server(new DefaultWorldData(), new TestGenerationConfig().getGenerator());
setInstance(server); setInstance(server);
server.start(); server.start();
} }

View File

@ -20,8 +20,9 @@ package ru.windcorp.progressia.server.management.load;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.Executors; import java.util.function.Consumer;
import glm.vec._3.i.Vec3i; import glm.vec._3.i.Vec3i;
import ru.windcorp.progressia.common.Units; import ru.windcorp.progressia.common.Units;
import ru.windcorp.progressia.common.world.generic.ChunkMap; import ru.windcorp.progressia.common.world.generic.ChunkMap;
@ -31,7 +32,8 @@ import ru.windcorp.progressia.common.world.generic.ChunkSets;
import ru.windcorp.progressia.server.Server; import ru.windcorp.progressia.server.Server;
/** /**
* Chunk request daemon gathers chunk requests from players (via {@link VisionManager}) and loads or unloads chunks appropriately. * Chunk request daemon gathers chunk requests from players (via
* {@link VisionManager}) and loads or unloads chunks appropriately.
*/ */
public class ChunkRequestDaemon { public class ChunkRequestDaemon {
@ -45,8 +47,6 @@ public class ChunkRequestDaemon {
private final ChunkSet toGenerate = ChunkSets.newHashSet(); private final ChunkSet toGenerate = ChunkSets.newHashSet();
private final ChunkSet toRequestUnload = ChunkSets.newHashSet(); private final ChunkSet toRequestUnload = ChunkSets.newHashSet();
private final ExecutorService executor = Executors.newSingleThreadExecutor();
private final Collection<Vec3i> buffer = new ArrayList<>(); private final Collection<Vec3i> buffer = new ArrayList<>();
private static class ChunkUnloadRequest { private static class ChunkUnloadRequest {
@ -75,16 +75,39 @@ public class ChunkRequestDaemon {
private final ChunkMap<ChunkUnloadRequest> unloadSchedule = ChunkMaps.newHashMap(); private final ChunkMap<ChunkUnloadRequest> unloadSchedule = ChunkMaps.newHashMap();
private Thread thread = null;
private final AtomicBoolean shouldRun = new AtomicBoolean(false);
public ChunkRequestDaemon(ChunkManager chunkManager) { public ChunkRequestDaemon(ChunkManager chunkManager) {
this.chunkManager = chunkManager; this.chunkManager = chunkManager;
this.loaded = getServer().getWorld().getData().getLoadedChunks(); this.loaded = getServer().getWorld().getData().getLoadedChunks();
} }
public void tick() { public synchronized void start() {
if (this.thread != null) {
throw new IllegalStateException("Already running");
}
this.thread = new Thread(this::runOffthread, getClass().getSimpleName());
this.shouldRun.set(true);
this.thread.start();
}
public synchronized void stop() {
this.shouldRun.set(false);
synchronized (this) {
notify();
}
}
public synchronized void tick() {
synchronized (getServer().getWorld().getData()) { synchronized (getServer().getWorld().getData()) {
synchronized (getServer().getPlayerManager().getMutex()) { synchronized (getServer().getPlayerManager().getMutex()) {
loadAndUnloadChunks(); loadAndUnloadChunks();
sendAndRevokeChunks(); sendAndRevokeChunks();
notify();
} }
} }
} }
@ -116,16 +139,11 @@ public class ChunkRequestDaemon {
} }
private void processLoadQueues() { private void processLoadQueues() {
toRequestUnload.forEach((pos) -> executor.submit(() -> scheduleUnload(pos))); toGenerate.forEach(getChunkManager()::loadOrGenerateChunk);
toRequestUnload.clear();
toLoad.forEach((pos) -> executor.submit(() -> getChunkManager().loadOrGenerateChunk(pos)));
toLoad.clear();
toGenerate.forEach((pos) -> executor.submit(() -> getChunkManager().loadOrGenerateChunk(pos)));
toGenerate.clear(); toGenerate.clear();
executor.submit(() -> unloadScheduledChunks()); toRequestUnload.forEach(this::scheduleUnload);
toRequestUnload.clear();
} }
private void scheduleUnload(Vec3i chunkPos) { private void scheduleUnload(Vec3i chunkPos) {
@ -140,25 +158,6 @@ public class ChunkRequestDaemon {
unloadSchedule.put(chunkPosCopy, new ChunkUnloadRequest(chunkPosCopy, unloadAt)); unloadSchedule.put(chunkPosCopy, new ChunkUnloadRequest(chunkPosCopy, unloadAt));
} }
private void unloadScheduledChunks() {
long now = System.currentTimeMillis();
for (Iterator<ChunkUnloadRequest> it = unloadSchedule.values().iterator(); it.hasNext();) {
ChunkUnloadRequest request = it.next();
if (request.getUnloadAt() < now) {
it.remove();
if (requested.contains(request.getChunkPos())) {
continue; // do not unload chunks that became requested
}
getChunkManager().unloadChunk(request.getChunkPos());
}
}
}
private void sendAndRevokeChunks() { private void sendAndRevokeChunks() {
getChunkManager().getLoadManager().getVisionManager().forEachVision(vision -> { getChunkManager().getLoadManager().getVisionManager().forEachVision(vision -> {
revokeChunks(vision); revokeChunks(vision);
@ -180,7 +179,8 @@ public class ChunkRequestDaemon {
buffer.add(chunk.getPosition()); buffer.add(chunk.getPosition());
}); });
if (buffer.isEmpty()) return; if (buffer.isEmpty())
return;
for (Vec3i chunkPos : buffer) { for (Vec3i chunkPos : buffer) {
getChunkManager().sendChunk(vision.getPlayer(), chunkPos); getChunkManager().sendChunk(vision.getPlayer(), chunkPos);
} }
@ -195,7 +195,8 @@ public class ChunkRequestDaemon {
buffer.add(new Vec3i(chunkPos)); buffer.add(new Vec3i(chunkPos));
}); });
if (buffer.isEmpty()) return; if (buffer.isEmpty())
return;
for (Vec3i chunkPos : buffer) { for (Vec3i chunkPos : buffer) {
getChunkManager().revokeChunk(vision.getPlayer(), chunkPos); getChunkManager().revokeChunk(vision.getPlayer(), chunkPos);
} }
@ -203,6 +204,82 @@ public class ChunkRequestDaemon {
buffer.clear(); buffer.clear();
} }
/*
* Off-thread activity
*/
private void runOffthread() {
while (true) {
processLoadQueue();
processUnloadQueue();
synchronized (this) {
try {
// We're not afraid of spurious wakeups
wait();
} catch (InterruptedException e) {
// Pretend nothing happened
}
if (!shouldRun.get()) {
return;
}
}
}
}
private void processQueueOffthread(ChunkSet queue, Consumer<Vec3i> action) {
while (true) {
synchronized (this) {
Iterator<Vec3i> iterator = toLoad.iterator();
if (!iterator.hasNext()) {
return;
}
Vec3i position = iterator.next();
iterator.remove();
action.accept(position);
}
}
}
private void processLoadQueue() {
processQueueOffthread(toLoad, getChunkManager()::loadOrGenerateChunk);
}
private void processUnloadQueue() {
long now = System.currentTimeMillis();
Collection<Vec3i> toUnload = null;
synchronized (this) {
for (Iterator<ChunkUnloadRequest> it = unloadSchedule.values().iterator(); it.hasNext();) {
ChunkUnloadRequest request = it.next();
if (request.getUnloadAt() < now) {
it.remove();
if (requested.contains(request.getChunkPos())) {
continue; // do not unload chunks that became requested
}
if (toUnload == null) {
toUnload = new ArrayList<>();
}
toUnload.add(request.getChunkPos());
}
}
}
if (toUnload == null) {
return;
}
toUnload.forEach(getChunkManager()::unloadChunk);
}
/** /**
* @return the minimum amount of time a chunk will spend in the unload queue * @return the minimum amount of time a chunk will spend in the unload queue
*/ */

View File

@ -34,7 +34,7 @@ import ru.windcorp.progressia.common.world.rels.RelFace;
import ru.windcorp.progressia.common.world.TileDataStack; import ru.windcorp.progressia.common.world.TileDataStack;
import ru.windcorp.progressia.common.world.generic.GenericChunks; import ru.windcorp.progressia.common.world.generic.GenericChunks;
import ru.windcorp.progressia.common.world.TileDataReference; import ru.windcorp.progressia.common.world.TileDataReference;
import ru.windcorp.progressia.server.Server; import ru.windcorp.progressia.server.ServerState;
import ru.windcorp.progressia.server.world.block.BlockLogic; import ru.windcorp.progressia.server.world.block.BlockLogic;
import ru.windcorp.progressia.server.world.block.BlockLogicRegistry; import ru.windcorp.progressia.server.world.block.BlockLogicRegistry;
import ru.windcorp.progressia.server.world.block.TickableBlock; import ru.windcorp.progressia.server.world.block.TickableBlock;
@ -226,7 +226,7 @@ public class DefaultChunkLogic implements ChunkLogic {
} }
private void tmp_generateTickLists() { private void tmp_generateTickLists() {
ServerWorldContextRO context = Server.getCurrentServer().createContext(getUp()); ServerWorldContextRO context = ServerState.getInstance().createContext(getUp());
GenericChunks.forEachBiC(blockInChunk -> { GenericChunks.forEachBiC(blockInChunk -> {

View File

@ -19,6 +19,7 @@
package ru.windcorp.progressia.server.world; package ru.windcorp.progressia.server.world;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -43,7 +44,7 @@ public class DefaultWorldLogic implements WorldLogic {
private final WorldGenerator generator; private final WorldGenerator generator;
private final Map<DefaultChunkData, DefaultChunkLogic> chunks = new HashMap<>(); private final Map<DefaultChunkData, DefaultChunkLogic> chunks = Collections.synchronizedMap(new HashMap<>());
private final Evaluation tickEntitiesTask = new TickEntitiesTask(); private final Evaluation tickEntitiesTask = new TickEntitiesTask();

View File

@ -49,9 +49,18 @@ public class TestGenerationConfig {
private static final float CURVATURE = Units.get("100 m"); private static final float CURVATURE = Units.get("100 m");
private static final float INNER_RADIUS = Units.get("200 m"); private static final float INNER_RADIUS = Units.get("200 m");
private static final Fields FIELDS = new Fields(SEED); private final Fields fields = new Fields(SEED);
private final Function<Server, WorldGenerator> generator;
public static Function<Server, WorldGenerator> createGenerator() { public TestGenerationConfig() {
this.generator = createGenerator();
}
public Function<Server, WorldGenerator> getGenerator() {
return generator;
}
private Function<Server, WorldGenerator> createGenerator() {
Planet planet = new Planet( Planet planet = new Planet(
((int) PLANET_RADIUS) / Coordinates.CHUNK_SIZE, ((int) PLANET_RADIUS) / Coordinates.CHUNK_SIZE,
@ -60,7 +69,7 @@ public class TestGenerationConfig {
INNER_RADIUS INNER_RADIUS
); );
TestHeightMap heightMap = new TestHeightMap(planet, planet.getRadius() / 4, FIELDS); TestHeightMap heightMap = new TestHeightMap(planet, planet.getRadius() / 4, fields);
FloatRangeMap<TerrainLayer> layers = new ArrayFloatRangeMap<>(); FloatRangeMap<TerrainLayer> layers = new ArrayFloatRangeMap<>();
registerTerrainLayers(layers); registerTerrainLayers(layers);
@ -72,11 +81,11 @@ public class TestGenerationConfig {
} }
private static void registerTerrainLayers(FloatRangeMap<TerrainLayer> layers) { private void registerTerrainLayers(FloatRangeMap<TerrainLayer> layers) {
BlockData dirt = BlockDataRegistry.getInstance().get("Test:Dirt"); BlockData dirt = BlockDataRegistry.getInstance().get("Test:Dirt");
BlockData air = BlockDataRegistry.getInstance().get("Test:Air"); BlockData air = BlockDataRegistry.getInstance().get("Test:Air");
SurfaceFloatField cliffs = FIELDS.get("Test:Cliff"); SurfaceFloatField cliffs = fields.get("Test:Cliff");
WorleyProceduralNoise.Builder<TerrainLayer> builder = WorleyProceduralNoise.builder(); WorleyProceduralNoise.Builder<TerrainLayer> builder = WorleyProceduralNoise.builder();
TestContent.ROCKS.getRocks().forEach(rock -> { TestContent.ROCKS.getRocks().forEach(rock -> {
@ -88,9 +97,9 @@ public class TestGenerationConfig {
} }
}, 1); }, 1);
}); });
SurfaceFloatField rockDepthOffsets = FIELDS.register( SurfaceFloatField rockDepthOffsets = fields.register(
"Test:RockDepthOffsets", "Test:RockDepthOffsets",
() -> tweak(FIELDS.primitive(), 40, 5) () -> tweak(fields.primitive(), 40, 5)
); );
RockLayer rockLayer = new RockLayer(builder.build(SEED), rockDepthOffsets); RockLayer rockLayer = new RockLayer(builder.build(SEED), rockDepthOffsets);
@ -105,28 +114,28 @@ public class TestGenerationConfig {
layers.put(4, Float.POSITIVE_INFINITY, rockLayer); layers.put(4, Float.POSITIVE_INFINITY, rockLayer);
} }
private static void registerFeatures(List<SurfaceFeature> features) { private void registerFeatures(List<SurfaceFeature> features) {
SurfaceFloatField forestiness = FIELDS.register( SurfaceFloatField forestiness = fields.register(
"Test:Forest", "Test:Forest",
() -> squash(scale(FIELDS.primitive(), 200), 5) () -> squash(scale(fields.primitive(), 200), 5)
); );
SurfaceFloatField grassiness = FIELDS.register( SurfaceFloatField grassiness = fields.register(
"Test:Grass", "Test:Grass",
f -> multiply( f -> multiply(
tweak(octaves(FIELDS.primitive(), 2, 2), 40, 0.5, 1.2), tweak(octaves(fields.primitive(), 2, 2), 40, 0.5, 1.2),
squash(tweak(FIELDS.get("Test:Forest", f), 1, -1, 1), 10), squash(tweak(fields.get("Test:Forest", f), 1, -1, 1), 10),
anti(squash(FIELDS.get("Test:Cliff", f), 10)) anti(squash(fields.get("Test:Cliff", f), 10))
) )
); );
Function<String, SurfaceFloatField> floweriness = flowerName -> FIELDS.register( Function<String, SurfaceFloatField> floweriness = flowerName -> fields.register(
"Test:Flower" + flowerName, "Test:Flower" + flowerName,
f -> multiply( f -> multiply(
selectPositive(squash(scale(octaves(FIELDS.primitive(), 2, 3), 100), 2), 1, 0.5), selectPositive(squash(scale(octaves(fields.primitive(), 2, 3), 100), 2), 1, 0.5),
tweak(FIELDS.get("Test:Forest", f), 1, -1, 1.1), tweak(fields.get("Test:Forest", f), 1, -1, 1.1),
anti(squash(FIELDS.get("Test:Cliff", f), 10)) anti(squash(fields.get("Test:Cliff", f), 10))
) )
); );