Reorganize handlers.

This commit is contained in:
Michael Lipp 2025-03-20 18:29:45 +01:00
parent 359b1fdb84
commit fe1d56517b
2 changed files with 166 additions and 158 deletions

View file

@ -20,29 +20,33 @@ package org.jdrupes.vmoperator.manager;
import com.google.gson.JsonObject; import com.google.gson.JsonObject;
import io.kubernetes.client.apimachinery.GroupVersionKind; import io.kubernetes.client.apimachinery.GroupVersionKind;
import io.kubernetes.client.custom.V1Patch;
import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.Configuration; import io.kubernetes.client.openapi.Configuration;
import java.io.IOException; import java.io.IOException;
import java.net.HttpURLConnection;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.time.Instant; import java.time.Instant;
import java.util.Comparator;
import java.util.Optional;
import java.util.logging.Level; import java.util.logging.Level;
import org.jdrupes.vmoperator.common.Constants.Crd; import org.jdrupes.vmoperator.common.Constants.Crd;
import org.jdrupes.vmoperator.common.Constants.Status; import org.jdrupes.vmoperator.common.Constants.Status;
import org.jdrupes.vmoperator.common.K8sClient; import org.jdrupes.vmoperator.common.K8sClient;
import org.jdrupes.vmoperator.common.K8sDynamicStub;
import org.jdrupes.vmoperator.common.K8sObserver.ResponseType; import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
import org.jdrupes.vmoperator.common.VmDefinition.Assignment;
import org.jdrupes.vmoperator.common.VmDefinitionStub; import org.jdrupes.vmoperator.common.VmDefinitionStub;
import org.jdrupes.vmoperator.common.VmPool;
import org.jdrupes.vmoperator.manager.events.AssignVm;
import org.jdrupes.vmoperator.manager.events.ChannelManager; import org.jdrupes.vmoperator.manager.events.ChannelManager;
import org.jdrupes.vmoperator.manager.events.Exit; import org.jdrupes.vmoperator.manager.events.Exit;
import org.jdrupes.vmoperator.manager.events.GetPools;
import org.jdrupes.vmoperator.manager.events.GetVms;
import org.jdrupes.vmoperator.manager.events.GetVms.VmData;
import org.jdrupes.vmoperator.manager.events.ModifyVm; import org.jdrupes.vmoperator.manager.events.ModifyVm;
import org.jdrupes.vmoperator.manager.events.PodChanged; import org.jdrupes.vmoperator.manager.events.PodChanged;
import org.jdrupes.vmoperator.manager.events.UpdateAssignment; import org.jdrupes.vmoperator.manager.events.UpdateAssignment;
import org.jdrupes.vmoperator.manager.events.VmChannel; import org.jdrupes.vmoperator.manager.events.VmChannel;
import org.jdrupes.vmoperator.manager.events.VmDefChanged; import org.jdrupes.vmoperator.manager.events.VmDefChanged;
import org.jdrupes.vmoperator.util.GsonPtr;
import org.jgrapes.core.Channel; import org.jgrapes.core.Channel;
import org.jgrapes.core.Component; import org.jgrapes.core.Component;
import org.jgrapes.core.annotation.Handler; import org.jgrapes.core.annotation.Handler;
@ -89,6 +93,7 @@ import org.jgrapes.util.events.ConfigurationUpdate;
public class Controller extends Component { public class Controller extends Component {
private String namespace; private String namespace;
private final ChannelManager<String, VmChannel, ?> chanMgr;
/** /**
* Creates a new instance. * Creates a new instance.
@ -97,8 +102,7 @@ public class Controller extends Component {
public Controller(Channel componentChannel) { public Controller(Channel componentChannel) {
super(componentChannel); super(componentChannel);
// Prepare component tree // Prepare component tree
ChannelManager<String, VmChannel, ?> chanMgr chanMgr = new ChannelManager<>(name -> {
= new ChannelManager<>(name -> {
try { try {
return new VmChannel(channel(), newEventPipeline(), return new VmChannel(channel(), newEventPipeline(),
new K8sClient()); new K8sClient());
@ -181,76 +185,102 @@ public class Controller extends Component {
} }
/** /**
* On modify vm. * Returns the VM data.
* *
* @param event the event * @param event the event
* @throws ApiException the api exception
* @throws IOException Signals that an I/O exception has occurred.
*/ */
@Handler @Handler
public void onModifyVm(ModifyVm event, VmChannel channel) public void onGetVms(GetVms event) {
throws ApiException, IOException { event.setResult(chanMgr.channels().stream()
patchVmDef(channel.client(), event.name(), "spec/vm/" + event.path(), .filter(c -> event.name().isEmpty()
event.value()); || c.vmDefinition().name().equals(event.name().get()))
} .filter(c -> event.user().isEmpty() && event.roles().isEmpty()
|| !c.vmDefinition().permissionsFor(event.user().orElse(null),
private void patchVmDef(K8sClient client, String name, String path, event.roles()).isEmpty())
Object value) throws ApiException, IOException { .filter(c -> event.fromPool().isEmpty()
var vmStub = K8sDynamicStub.get(client, || c.vmDefinition().assignment().map(Assignment::pool)
new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM), namespace, .map(p -> p.equals(event.fromPool().get())).orElse(false))
name); .filter(c -> event.toUser().isEmpty()
|| c.vmDefinition().assignment().map(Assignment::user)
// Patch running .map(u -> u.equals(event.toUser().get())).orElse(false))
String valueAsText = value instanceof String .map(c -> new VmData(c.vmDefinition(), c))
? "\"" + value + "\"" .toList());
: value.toString();
var res = vmStub.patch(V1Patch.PATCH_FORMAT_JSON_PATCH,
new V1Patch("[{\"op\": \"replace\", \"path\": \"/"
+ path + "\", \"value\": " + valueAsText + "}]"),
client.defaultPatchOptions());
if (!res.isPresent()) {
logger.warning(
() -> "Cannot patch definition for Vm " + vmStub.name());
}
} }
/** /**
* Attempt to Update the assignment information in the status of the * Assign a VM if not already assigned.
* VM CR. Returns true if successful. The handler does not attempt
* retries, because in case of failure it will be necessary to
* re-evaluate the chosen VM.
* *
* @param event the event * @param event the event
* @param channel the channel
* @throws ApiException the api exception * @throws ApiException the api exception
* @throws InterruptedException
*/ */
@Handler @Handler
public void onUpdatedAssignment(UpdateAssignment event, VmChannel channel) @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
throws ApiException { public void onAssignVm(AssignVm event)
try { throws ApiException, InterruptedException {
var vmDef = channel.vmDefinition(); while (true) {
var vmStub = VmDefinitionStub.get(channel.client(), // Search for existing assignment.
new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM), var vmQuery = chanMgr.channels().stream()
vmDef.namespace(), vmDef.name()); .filter(c -> c.vmDefinition().assignment().map(Assignment::pool)
if (vmStub.updateStatus(vmDef, from -> { .map(p -> p.equals(event.fromPool())).orElse(false))
JsonObject status = from.statusJson(); .filter(c -> c.vmDefinition().assignment().map(Assignment::user)
var assignment = GsonPtr.to(status).to(Status.ASSIGNMENT); .map(u -> u.equals(event.toUser())).orElse(false))
assignment.set("pool", event.fromPool().name()); .findFirst();
assignment.set("user", event.toUser()); if (vmQuery.isPresent()) {
assignment.set("lastUsed", Instant.now().toString()); var vmDef = vmQuery.get().vmDefinition();
return status; event.setResult(new VmData(vmDef, vmQuery.get()));
}).isPresent()) { return;
event.setResult(true);
} }
} catch (ApiException e) {
// Log exceptions except for conflict, which can be expected // Get the pool definition for checking possible assignment
if (HttpURLConnection.HTTP_CONFLICT != e.getCode()) { VmPool vmPool = newEventPipeline().fire(new GetPools()
throw e; .withName(event.fromPool())).get().stream().findFirst()
.orElse(null);
if (vmPool == null) {
return;
}
// Find available VM.
vmQuery = chanMgr.channels().stream()
.filter(c -> vmPool.isAssignable(c.vmDefinition()))
.sorted(Comparator.comparing((VmChannel c) -> c.vmDefinition()
.assignment().map(Assignment::lastUsed)
.orElse(Instant.ofEpochSecond(0)))
.thenComparing(preferRunning))
.findFirst();
// None found
if (vmQuery.isEmpty()) {
return;
}
// Assign to user
var chosenVm = vmQuery.get();
if (Optional.ofNullable(chosenVm.fire(new UpdateAssignment(
vmPool, event.toUser())).get()).orElse(false)) {
var vmDef = chosenVm.vmDefinition();
event.setResult(new VmData(vmDef, chosenVm));
// Make sure that a newly assigned VM is running.
chosenVm.fire(new ModifyVm(vmDef.name(), "state", "Running"));
return;
} }
} }
event.setResult(false);
} }
private static Comparator<VmChannel> preferRunning
= new Comparator<>() {
@Override
public int compare(VmChannel ch1, VmChannel ch2) {
if (ch1.vmDefinition().conditionStatus("Running").orElse(false)
&& !ch2.vmDefinition().conditionStatus("Running")
.orElse(false)) {
return -1;
}
return 0;
}
};
/** /**
* Remove runner version from status when pod is deleted * Remove runner version from status when pod is deleted
* *

View file

@ -18,21 +18,25 @@
package org.jdrupes.vmoperator.manager; package org.jdrupes.vmoperator.manager;
import com.google.gson.JsonObject;
import io.kubernetes.client.apimachinery.GroupVersionKind;
import io.kubernetes.client.custom.V1Patch;
import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.V1ObjectMeta; import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.util.Watch; import io.kubernetes.client.util.Watch;
import io.kubernetes.client.util.generic.options.ListOptions; import io.kubernetes.client.util.generic.options.ListOptions;
import java.io.IOException; import java.io.IOException;
import java.net.HttpURLConnection;
import java.time.Instant; import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Comparator;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import org.jdrupes.vmoperator.common.Constants.Crd; import org.jdrupes.vmoperator.common.Constants.Crd;
import org.jdrupes.vmoperator.common.Constants.Status;
import org.jdrupes.vmoperator.common.K8s; import org.jdrupes.vmoperator.common.K8s;
import org.jdrupes.vmoperator.common.K8sClient; import org.jdrupes.vmoperator.common.K8sClient;
import org.jdrupes.vmoperator.common.K8sDynamicStub; import org.jdrupes.vmoperator.common.K8sDynamicStub;
@ -40,23 +44,18 @@ import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
import org.jdrupes.vmoperator.common.K8sV1ConfigMapStub; import org.jdrupes.vmoperator.common.K8sV1ConfigMapStub;
import org.jdrupes.vmoperator.common.K8sV1StatefulSetStub; import org.jdrupes.vmoperator.common.K8sV1StatefulSetStub;
import org.jdrupes.vmoperator.common.VmDefinition; import org.jdrupes.vmoperator.common.VmDefinition;
import org.jdrupes.vmoperator.common.VmDefinition.Assignment;
import org.jdrupes.vmoperator.common.VmDefinitionStub; import org.jdrupes.vmoperator.common.VmDefinitionStub;
import org.jdrupes.vmoperator.common.VmDefinitions; import org.jdrupes.vmoperator.common.VmDefinitions;
import org.jdrupes.vmoperator.common.VmExtraData; import org.jdrupes.vmoperator.common.VmExtraData;
import org.jdrupes.vmoperator.common.VmPool;
import static org.jdrupes.vmoperator.manager.Constants.APP_NAME; import static org.jdrupes.vmoperator.manager.Constants.APP_NAME;
import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME; import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME;
import org.jdrupes.vmoperator.manager.events.AssignVm;
import org.jdrupes.vmoperator.manager.events.ChannelManager; import org.jdrupes.vmoperator.manager.events.ChannelManager;
import org.jdrupes.vmoperator.manager.events.GetPools;
import org.jdrupes.vmoperator.manager.events.GetVms;
import org.jdrupes.vmoperator.manager.events.GetVms.VmData;
import org.jdrupes.vmoperator.manager.events.ModifyVm; import org.jdrupes.vmoperator.manager.events.ModifyVm;
import org.jdrupes.vmoperator.manager.events.PodChanged; import org.jdrupes.vmoperator.manager.events.PodChanged;
import org.jdrupes.vmoperator.manager.events.UpdateAssignment; import org.jdrupes.vmoperator.manager.events.UpdateAssignment;
import org.jdrupes.vmoperator.manager.events.VmChannel; import org.jdrupes.vmoperator.manager.events.VmChannel;
import org.jdrupes.vmoperator.manager.events.VmDefChanged; import org.jdrupes.vmoperator.manager.events.VmDefChanged;
import org.jdrupes.vmoperator.util.GsonPtr;
import org.jgrapes.core.Channel; import org.jgrapes.core.Channel;
import org.jgrapes.core.Event; import org.jgrapes.core.Event;
import org.jgrapes.core.annotation.Handler; import org.jgrapes.core.annotation.Handler;
@ -233,100 +232,79 @@ public class VmMonitor extends
} }
/** /**
* Returns the VM data. * On modify vm.
*
* @param event the event
*/
@Handler
public void onGetVms(GetVms event) {
event.setResult(channelManager.channels().stream()
.filter(c -> event.name().isEmpty()
|| c.vmDefinition().name().equals(event.name().get()))
.filter(c -> event.user().isEmpty() && event.roles().isEmpty()
|| !c.vmDefinition().permissionsFor(event.user().orElse(null),
event.roles()).isEmpty())
.filter(c -> event.fromPool().isEmpty()
|| c.vmDefinition().assignment().map(Assignment::pool)
.map(p -> p.equals(event.fromPool().get())).orElse(false))
.filter(c -> event.toUser().isEmpty()
|| c.vmDefinition().assignment().map(Assignment::user)
.map(u -> u.equals(event.toUser().get())).orElse(false))
.map(c -> new VmData(c.vmDefinition(), c))
.toList());
}
/**
* Assign a VM if not already assigned.
* *
* @param event the event * @param event the event
* @throws ApiException the api exception * @throws ApiException the api exception
* @throws InterruptedException * @throws IOException Signals that an I/O exception has occurred.
*/ */
@Handler @Handler
@SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") public void onModifyVm(ModifyVm event, VmChannel channel)
public void onAssignVm(AssignVm event) throws ApiException, IOException {
throws ApiException, InterruptedException { patchVmDef(channel.client(), event.name(), "spec/vm/" + event.path(),
while (true) { event.value());
// Search for existing assignment.
var vmQuery = channelManager.channels().stream()
.filter(c -> c.vmDefinition().assignment().map(Assignment::pool)
.map(p -> p.equals(event.fromPool())).orElse(false))
.filter(c -> c.vmDefinition().assignment().map(Assignment::user)
.map(u -> u.equals(event.toUser())).orElse(false))
.findFirst();
if (vmQuery.isPresent()) {
var vmDef = vmQuery.get().vmDefinition();
event.setResult(new VmData(vmDef, vmQuery.get()));
return;
} }
// Get the pool definition for checking possible assignment private void patchVmDef(K8sClient client, String name, String path,
VmPool vmPool = newEventPipeline().fire(new GetPools() Object value) throws ApiException, IOException {
.withName(event.fromPool())).get().stream().findFirst() var vmStub = K8sDynamicStub.get(client,
.orElse(null); new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM), namespace(),
if (vmPool == null) { name);
return;
}
// Find available VM. // Patch running
vmQuery = channelManager.channels().stream() String valueAsText = value instanceof String
.filter(c -> vmPool.isAssignable(c.vmDefinition())) ? "\"" + value + "\""
.sorted(Comparator.comparing((VmChannel c) -> c.vmDefinition() : value.toString();
.assignment().map(Assignment::lastUsed) var res = vmStub.patch(V1Patch.PATCH_FORMAT_JSON_PATCH,
.orElse(Instant.ofEpochSecond(0))) new V1Patch("[{\"op\": \"replace\", \"path\": \"/"
.thenComparing(preferRunning)) + path + "\", \"value\": " + valueAsText + "}]"),
.findFirst(); client.defaultPatchOptions());
if (!res.isPresent()) {
// None found logger.warning(
if (vmQuery.isEmpty()) { () -> "Cannot patch definition for Vm " + vmStub.name());
return;
}
// Assign to user
var chosenVm = vmQuery.get();
if (Optional.ofNullable(chosenVm.fire(new UpdateAssignment(
vmPool, event.toUser())).get()).orElse(false)) {
var vmDef = chosenVm.vmDefinition();
event.setResult(new VmData(vmDef, chosenVm));
// Make sure that a newly assigned VM is running.
chosenVm.fire(new ModifyVm(vmDef.name(), "state", "Running"));
return;
}
} }
} }
private static Comparator<VmChannel> preferRunning /**
= new Comparator<>() { * Attempt to Update the assignment information in the status of the
@Override * VM CR. Returns true if successful. The handler does not attempt
public int compare(VmChannel ch1, VmChannel ch2) { * retries, because in case of failure it will be necessary to
if (ch1.vmDefinition().conditionStatus("Running").orElse(false) * re-evaluate the chosen VM.
&& !ch2.vmDefinition().conditionStatus("Running") *
.orElse(false)) { * @param event the event
return -1; * @param channel the channel
* @throws ApiException the api exception
*/
@Handler
public void onUpdatedAssignment(UpdateAssignment event, VmChannel channel)
throws ApiException {
try {
var vmDef = channel.vmDefinition();
var vmStub = VmDefinitionStub.get(channel.client(),
new GroupVersionKind(Crd.GROUP, "", Crd.KIND_VM),
vmDef.namespace(), vmDef.name());
if (vmStub.updateStatus(vmDef, from -> {
JsonObject status = from.statusJson();
if (event.toUser() == null) {
((JsonObject) GsonPtr.to(status).get())
.remove(Status.ASSIGNMENT);
} else {
var assignment = GsonPtr.to(status).to(Status.ASSIGNMENT);
assignment.set("pool", event.fromPool().name());
assignment.set("user", event.toUser());
assignment.set("lastUsed", Instant.now().toString());
} }
return 0; return status;
}).isPresent()) {
event.setResult(true);
}
} catch (ApiException e) {
// Log exceptions except for conflict, which can be expected
if (HttpURLConnection.HTTP_CONFLICT != e.getCode()) {
throw e;
}
}
event.setResult(false);
} }
};
} }