Deliver events on dedicated pipeline.

This commit is contained in:
Michael Lipp 2024-11-23 12:54:29 +01:00
parent eabb2d9cf0
commit 5efef2a083

View file

@ -43,10 +43,14 @@ import org.jdrupes.vmoperator.manager.events.VmDefChanged;
import org.jdrupes.vmoperator.manager.events.VmPoolChanged;
import org.jdrupes.vmoperator.util.GsonPtr;
import org.jgrapes.core.Channel;
import org.jgrapes.core.EventPipeline;
import org.jgrapes.core.annotation.Handler;
import org.jgrapes.core.events.Attached;
/**
* Watches for changes of VM pools.
* Watches for changes of VM pools. Reports the changes using
* {@link VmPoolChanged} events fired on a special pipeline to
* avoid concurrent change informations.
*/
@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports" })
public class PoolManager extends
@ -55,6 +59,7 @@ public class PoolManager extends
private final ReentrantLock pendingLock = new ReentrantLock();
private final Map<String, Set<String>> pending = new ConcurrentHashMap<>();
private final Map<String, VmPool> pools = new ConcurrentHashMap<>();
private EventPipeline poolPipeline;
/**
* Instantiates a new VM pool manager.
@ -67,6 +72,19 @@ public class PoolManager extends
K8sDynamicModels.class);
}
/**
* On attached.
*
* @param event the event
*/
@Handler
@SuppressWarnings("PMD.CompareObjectsWithEquals")
public void onAttached(Attached event) {
if (event.node() == this) {
poolPipeline = newEventPipeline();
}
}
@Override
protected void prepareMonitoring() throws IOException, ApiException {
client(new K8sClient());
@ -96,7 +114,7 @@ public class PoolManager extends
pending.computeIfAbsent(poolName, k -> Collections
.synchronizedSet(new HashSet<>())).addAll(p.vms());
pools.remove(poolName);
fire(new VmPoolChanged(p, true));
poolPipeline.fire(new VmPoolChanged(p, true));
});
} finally {
pendingLock.unlock();
@ -138,7 +156,7 @@ public class PoolManager extends
});
pending.remove(poolName);
pools.put(poolName, vmPool);
fire(new VmPoolChanged(vmPool));
poolPipeline.fire(new VmPoolChanged(vmPool));
} finally {
pendingLock.unlock();
}
@ -164,7 +182,7 @@ public class PoolManager extends
pending.computeIfAbsent(p, k -> Collections
.synchronizedSet(new HashSet<>())).add(vmName);
}
fire(new VmPoolChanged(pools.get(p)));
poolPipeline.fire(new VmPoolChanged(pools.get(p)));
});
} finally {
pendingLock.unlock();
@ -175,7 +193,7 @@ public class PoolManager extends
pendingLock.lock();
pools.values().stream().forEach(p -> {
if (p.vms().remove(vmName)) {
fire(new VmPoolChanged(p));
poolPipeline.fire(new VmPoolChanged(p));
}
});
// Should not be necessary, but just in case