From 10f3028f06d58d81f6972d5c29fcf8f1dfc698a7 Mon Sep 17 00:00:00 2001 From: "Michael N. Lipp" Date: Wed, 30 Apr 2025 16:27:15 +0200 Subject: [PATCH 1/6] Increase concurrency and avoid race condition. --- .../vmoperator/manager/Controller.java | 3 +- .../jdrupes/vmoperator/manager/VmMonitor.java | 58 +++++++++++++------ 2 files changed, 42 insertions(+), 19 deletions(-) diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Controller.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Controller.java index c15acc5..ce14488 100644 --- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Controller.java +++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Controller.java @@ -50,6 +50,7 @@ import org.jdrupes.vmoperator.manager.events.VmPoolChanged; import org.jdrupes.vmoperator.manager.events.VmResourceChanged; import org.jgrapes.core.Channel; import org.jgrapes.core.Component; +import org.jgrapes.core.EventPipeline; import org.jgrapes.core.annotation.Handler; import org.jgrapes.core.events.HandlingError; import org.jgrapes.core.events.Start; @@ -94,7 +95,7 @@ import org.jgrapes.util.events.ConfigurationUpdate; public class Controller extends Component { private String namespace; - private final ChannelManager chanMgr; + private final ChannelManager chanMgr; /** * Creates a new instance. diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmMonitor.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmMonitor.java index 09bade8..b667aa6 100644 --- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmMonitor.java +++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmMonitor.java @@ -22,7 +22,6 @@ import com.google.gson.JsonObject; import io.kubernetes.client.apimachinery.GroupVersionKind; import io.kubernetes.client.custom.V1Patch; import io.kubernetes.client.openapi.ApiException; -import io.kubernetes.client.openapi.models.V1ObjectMeta; import io.kubernetes.client.util.Watch; import io.kubernetes.client.util.generic.options.ListOptions; import java.io.IOException; @@ -32,7 +31,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Optional; import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import org.jdrupes.vmoperator.common.Constants.Crd; import org.jdrupes.vmoperator.common.Constants.Status; @@ -57,16 +55,28 @@ import org.jdrupes.vmoperator.manager.events.VmResourceChanged; import org.jdrupes.vmoperator.util.GsonPtr; import org.jgrapes.core.Channel; import org.jgrapes.core.Event; +import org.jgrapes.core.EventPipeline; import org.jgrapes.core.annotation.Handler; /** - * Watches for changes of VM definitions. + * Watches for changes of VM definitions. When a VM definition (CR) + * becomes known, is is registered with a {@link ChannelManager} and thus + * gets an associated {@link VmChannel} and an associated + * {@link EventPipeline}. + * + * The {@link EventPipeline} is used for submitting an action that processes + * the change data from kubernetes, eventually transforming it to a + * {@link VmResourceChanged} event that is handled by another + * {@link EventPipeline} associated with the {@link VmChannel}. This + * event pipeline should be used for all events related to changes of + * a particular VM. */ @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports" }) public class VmMonitor extends AbstractMonitor { - private final ChannelManager channelManager; + private final ChannelManager channelManager; /** * Instantiates a new VM definition watcher. @@ -75,7 +85,7 @@ public class VmMonitor extends * @param channelManager the channel manager */ public VmMonitor(Channel componentChannel, - ChannelManager channelManager) { + ChannelManager channelManager) { super(componentChannel, VmDefinition.class, VmDefinitions.class); this.channelManager = channelManager; @@ -122,14 +132,18 @@ public class VmMonitor extends @Override protected void handleChange(K8sClient client, Watch.Response response) { - V1ObjectMeta metadata = response.object.getMetadata(); - AtomicBoolean toBeAdded = new AtomicBoolean(false); - VmChannel channel = channelManager.channel(metadata.getName()) - .orElseGet(() -> { - toBeAdded.set(true); - return channelManager.createChannel(metadata.getName()); - }); + var name = response.object.getMetadata().getName(); + // Process the response data on a VM specific pipeline to + // increase concurrency when e.g. starting many VMs. + var preparing = channelManager.associated(name) + .orElseGet(() -> newEventPipeline()); + preparing.submit("VmChange[" + name + "]", + () -> processChange(client, response, preparing)); + } + + private void processChange(K8sClient client, + Watch.Response response, EventPipeline preparing) { // Get full definition and associate with channel as backup var vmDef = response.object; if (vmDef.data() == null) { @@ -137,6 +151,9 @@ public class VmMonitor extends // https://github.com/kubernetes-client/java/issues/3215 vmDef = getModel(client, vmDef); } + var name = response.object.getMetadata().getName(); + var channel = channelManager.channel(name) + .orElseGet(() -> channelManager.createChannel(name)); if (vmDef.data() != null) { // New data, augment and save addExtraData(vmDef, channel.vmDefinition()); @@ -150,9 +167,7 @@ public class VmMonitor extends + response.object.getMetadata()); return; } - if (toBeAdded.get()) { - channelManager.put(vmDef.name(), channel); - } + channelManager.put(name, channel, preparing); // Create and fire changed event. Remove channel from channel // manager on completion. @@ -199,9 +214,16 @@ public class VmMonitor extends @Handler public void onPodChanged(PodChanged event, VmChannel channel) { var vmDef = channel.vmDefinition(); - updateNodeInfo(event, vmDef); - channel - .fire(new VmResourceChanged(ResponseType.MODIFIED, vmDef, false, true)); + + // Make sure that this is properly sync'd with VM CR changes. + channelManager.associated(vmDef.name()) + .orElseGet(() -> activeEventPipeline()) + .submit("NodeInfo[" + vmDef.name() + "]", + () -> { + updateNodeInfo(event, vmDef); + channel.fire(new VmResourceChanged(ResponseType.MODIFIED, + vmDef, false, true)); + }); } private void updateNodeInfo(PodChanged event, VmDefinition vmDef) { From a5433c869bb5d25601060200cba2335bf250788a Mon Sep 17 00:00:00 2001 From: "Michael N. Lipp" Date: Sat, 3 May 2025 22:29:42 +0200 Subject: [PATCH 2/6] Upgrade webconsole base library. --- org.jdrupes.vmoperator.manager/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/org.jdrupes.vmoperator.manager/build.gradle b/org.jdrupes.vmoperator.manager/build.gradle index eda5ce0..4ce4ed0 100644 --- a/org.jdrupes.vmoperator.manager/build.gradle +++ b/org.jdrupes.vmoperator.manager/build.gradle @@ -17,7 +17,7 @@ dependencies { implementation 'org.jgrapes:org.jgrapes.io:[2.12.1,3)' implementation 'org.jgrapes:org.jgrapes.http:[3.5.0,4)' - implementation 'org.jgrapes:org.jgrapes.webconsole.base:[2.2.0,3)' + implementation 'org.jgrapes:org.jgrapes.webconsole.base:[2.3.0,3)' implementation 'org.jgrapes:org.jgrapes.webconsole.vuejs:[1.8.0,2)' implementation 'org.jgrapes:org.jgrapes.webconsole.rbac:[1.4.0,2)' implementation 'org.jgrapes:org.jgrapes.webconlet.oidclogin:[1.7.0,2)' From 76b579c404f4ea691bf5a00cde47ddb8d35b1342 Mon Sep 17 00:00:00 2001 From: "Michael N. Lipp" Date: Sun, 4 May 2025 11:36:55 +0200 Subject: [PATCH 3/6] Add key, allowing vue to optimize. --- .../org/jdrupes/vmoperator/vmmgmt/VmMgmt-view.ftl.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/org.jdrupes.vmoperator.vmmgmt/resources/org/jdrupes/vmoperator/vmmgmt/VmMgmt-view.ftl.html b/org.jdrupes.vmoperator.vmmgmt/resources/org/jdrupes/vmoperator/vmmgmt/VmMgmt-view.ftl.html index 5a28cb8..3197440 100644 --- a/org.jdrupes.vmoperator.vmmgmt/resources/org/jdrupes/vmoperator/vmmgmt/VmMgmt-view.ftl.html +++ b/org.jdrupes.vmoperator.vmmgmt/resources/org/jdrupes/vmoperator/vmmgmt/VmMgmt-view.ftl.html @@ -30,7 +30,7 @@ -