diff --git a/deploy/crds/vms-crd.yaml b/deploy/crds/vms-crd.yaml index 0b24bd6..713b785 100644 --- a/deploy/crds/vms-crd.yaml +++ b/deploy/crds/vms-crd.yaml @@ -9,6 +9,8 @@ spec: - name: v1 served: true storage: true + subresources: + status: {} schema: openAPIV3Schema: type: object @@ -1372,10 +1374,28 @@ spec: - vm status: type: object - properties: + default: {} + properties: + cpus: + description: >- + Number of CPUs currently in use. + type: integer + default: 0 + ram: + description: >- + Amount of memory in use. + type: string + default: "0" conditions: description: >- List of component conditions observed + default: + - type: Running + status: "False" + observedGeneration: 1 + lastTransitionTime: "1970-01-01T00:00:00Z" + reason: Creation + message: "Creation of CR" type: array items: type: object @@ -1383,6 +1403,12 @@ spec: Information about the condition of a component. See https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#spec-and-status and https://github.com/kubernetes/apimachinery/blob/release-1.23/pkg/apis/meta/v1/types.go#L1432-L1492 + required: + - type + - status + - lastTransitionTime + - reason + - message properties: type: type: string @@ -1428,12 +1454,6 @@ spec: Message is a human readable message indicating details about the transition. This may be an empty string. default: "" - required: - - type - - status - - lastTransitionTime - - reason - - message # either Namespaced or Cluster scope: Namespaced names: diff --git a/org.jdrupes.vmoperator.runner.qemu/build.gradle b/org.jdrupes.vmoperator.runner.qemu/build.gradle index 9f48a58..3e693bb 100644 --- a/org.jdrupes.vmoperator.runner.qemu/build.gradle +++ b/org.jdrupes.vmoperator.runner.qemu/build.gradle @@ -18,6 +18,8 @@ dependencies { implementation 'commons-cli:commons-cli:1.5.0' implementation 'org.freemarker:freemarker:[2.3.32,2.4)' implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:[2.15.1,3]' + + runtimeOnly 'org.slf4j:slf4j-jdk14:[2.0.7,3)' } application { diff --git a/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/Runner.java b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/Runner.java index 1370e18..2e8216b 100644 --- a/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/Runner.java +++ b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/Runner.java @@ -66,6 +66,7 @@ import org.jgrapes.core.annotation.Handler; import org.jgrapes.core.events.Start; import org.jgrapes.core.events.Started; import org.jgrapes.core.events.Stop; +import org.jgrapes.core.internal.EventProcessor; import org.jgrapes.io.NioDispatcher; import org.jgrapes.io.events.Input; import org.jgrapes.io.events.ProcessExited; @@ -90,6 +91,13 @@ import org.jgrapes.util.events.WatchFile; * * ![Runner state diagram](RunnerStates.svg) * + * The {@link Runner} associates an {@link EventProcessor} with the + * {@link Start} event. This "runner event processor" must be used + * for all events related to the application level function. Components + * that handle events from other sources (and thus event processors) + * must fire any resulting events on the runner event processor in order + * to maintain synchronization. + * * @startuml RunnerStates.svg * [*] --> Initializing * Initializing -> Initializing: InitialConfiguration/configure Runner @@ -149,8 +157,13 @@ import org.jgrapes.util.events.WatchFile; * error --> terminate * StartingProcess --> terminate: ProcessExited * + * state Stopped { + * state stopped <> * - * terminated --> [*] + * stopped --> [*] + * } + * + * terminated --> stopped * * @enduml * @@ -211,6 +224,7 @@ public class Runner extends Component { attach(new ProcessManager(channel())); attach(new SocketConnector(channel())); attach(qemuMonitor = new QemuMonitor(channel())); + attach(new StatusUpdater(channel())); // Configuration store with file in /etc/opt (default) File config = new File(cmdLine.getOptionValue('c', @@ -352,6 +366,11 @@ public class Runner extends Component { return; } + // Make sure to use thread specific client + // https://github.com/kubernetes-client/java/issues/100 + io.kubernetes.client.openapi.Configuration.setDefaultApiClient(null); + + // Prepare specific event pipeline to avoid concurrency. rep = newEventPipeline(); event.setAssociated(EventPipeline.class, rep); try { @@ -387,7 +406,8 @@ public class Runner extends Component { @Handler public void onStarted(Started event) { state = State.STARTING; - fire(new RunnerStateChange(state)); + rep.fire(new RunnerStateChange(state, "RunnerStarted", + "Runner has been started")); // Start first process if (config.vm.useTpm && swtpmDefinition != null) { startProcess(swtpmDefinition); @@ -476,7 +496,7 @@ public class Runner extends Component { */ @Handler public void onMonitorReady(MonitorReady event) { - fire(new RunnerConfigurationUpdate(config, state)); + rep.fire(new RunnerConfigurationUpdate(config, state)); } /** @@ -489,7 +509,8 @@ public class Runner extends Component { if (state == State.STARTING) { fire(new MonitorCommand(new QmpCont())); state = State.RUNNING; - fire(new RunnerStateChange(state)); + rep.fire(new RunnerStateChange(state, "VmStarted", + "Qemu has been configured and is continuing")); } } @@ -524,9 +545,22 @@ public class Runner extends Component { * @param event the event */ @Handler(priority = 10_000) - public void onStop(Stop event) { + public void onStopFirst(Stop event) { state = State.TERMINATING; - fire(new RunnerStateChange(state)); + rep.fire(new RunnerStateChange(state, "VmTerminating", + "The VM is being shut down")); + } + + /** + * On stop. + * + * @param event the event + */ + @Handler(priority = -10_000) + public void onStopLast(Stop event) { + state = State.STOPPED; + rep.fire(new RunnerStateChange(state, "VmStopped", + "The VM has been shut down")); } private void shutdown() { diff --git a/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/StatusUpdater.java b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/StatusUpdater.java new file mode 100644 index 0000000..e45b5e6 --- /dev/null +++ b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/StatusUpdater.java @@ -0,0 +1,244 @@ +/* + * VM-Operator + * Copyright (C) 2023 Michael N. Lipp + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.jdrupes.vmoperator.runner.qemu; + +import com.google.gson.JsonObject; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.apis.ApiextensionsV1Api; +import io.kubernetes.client.openapi.models.V1CustomResourceDefinitionVersion; +import io.kubernetes.client.util.Config; +import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesApi; +import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesObject; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Instant; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.logging.Level; +import org.jdrupes.vmoperator.runner.qemu.events.RunnerConfigurationUpdate; +import org.jdrupes.vmoperator.runner.qemu.events.RunnerStateChange; +import org.jdrupes.vmoperator.runner.qemu.events.RunnerStateChange.State; +import static org.jdrupes.vmoperator.util.Constants.VM_OP_CRD_NAME; +import static org.jdrupes.vmoperator.util.Constants.VM_OP_GROUP; +import org.jgrapes.core.Channel; +import org.jgrapes.core.Component; +import org.jgrapes.core.annotation.Handler; +import org.jgrapes.core.events.Start; +import org.jgrapes.util.events.ConfigurationUpdate; +import org.jgrapes.util.events.InitialConfiguration; + +/** + * Updates the CR status. + */ +public class StatusUpdater extends Component { + + private static final Set RUNNING_STATES + = Set.of(State.RUNNING, State.TERMINATING); + + private String namespace; + private String vmName; + private DynamicKubernetesApi vmCrApi; + private long observedGeneration; + + /** + * Instantiates a new status updater. + * + * @param componentChannel the component channel + */ + public StatusUpdater(Channel componentChannel) { + super(componentChannel); + } + + /** + * On configuration update. + * + * @param event the event + */ + @Handler + @SuppressWarnings("unchecked") + public void onConfigurationUpdate(ConfigurationUpdate event) { + event.structured("/Runner").ifPresent(c -> { + if (event instanceof InitialConfiguration) { + namespace = (String) c.get("namespace"); + updateNamespace(); + vmName = Optional.ofNullable((Map) c.get("vm")) + .map(vm -> vm.get("name")).orElse(null); + } + }); + } + + private void updateNamespace() { + if (namespace == null) { + var path = Path + .of("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); + if (Files.isReadable(path)) { + try { + namespace = Files.lines(path).findFirst().orElse(null); + } catch (IOException e) { + logger.log(Level.WARNING, e, + () -> "Cannot read namespace."); + } + } + } + if (namespace == null) { + logger.warning(() -> "Namespace is unknown, some functions" + + " won't be available."); + } + } + + /** + * Handle the start event. + * + * @param event the event + */ + @Handler + @SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", + "PMD.AvoidInstantiatingObjectsInLoops" }) + public void onStart(Start event) { + try { + var client = Config.defaultClient(); + var extsApi = new ApiextensionsV1Api(client); + var crds = extsApi.listCustomResourceDefinition(null, null, null, + "metadata.name=" + VM_OP_CRD_NAME, null, null, null, + null, null, null); + if (crds.getItems().isEmpty()) { + logger.warning(() -> "CRD is unknown, status will not" + + " be updated."); + return; + } + var crd = crds.getItems().get(0); + if (crd.getSpec().getVersions().stream() + .filter(v -> v.getSubresources() == null).findAny() + .isPresent()) { + logger.warning(() -> "You are using an old version of the CRD," + + " status will not be updated."); + return; + } + var crdPlural = crd.getSpec().getNames().getPlural(); + var vmOpApiVersions = crd.getSpec().getVersions().stream() + .map(V1CustomResourceDefinitionVersion::getName).toList(); + for (var apiVer : vmOpApiVersions) { + var api = new DynamicKubernetesApi(VM_OP_GROUP, apiVer, + crdPlural, client); + var res = api.get(namespace, vmName); + if (res.isSuccess()) { + vmCrApi = api; + observedGeneration + = res.getObject().getMetadata().getGeneration(); + break; + } + } + if (vmCrApi == null) { + logger.warning(() -> "VM's CR is unknown, status will not" + + " be updated."); + } + } catch (IOException | ApiException e) { + logger.log(Level.WARNING, e, () -> "Cannot access kubernetes: " + + e.getMessage()); + } + } + + @SuppressWarnings("PMD.AvoidDuplicateLiterals") + private JsonObject currentStatus(DynamicKubernetesObject vmCr) { + return vmCr.getRaw().getAsJsonObject("status").deepCopy(); + } + + /** + * On runner state changed. + * + * @param event the event + 8 * @throws ApiException the api exception + */ + @Handler + public void onRunnerStateChanged(RunnerStateChange event) + throws ApiException { + if (vmCrApi == null) { + return; + } + var vmCr = vmCrApi.get(namespace, vmName) + .throwsApiException().getObject(); + vmCrApi.updateStatus(vmCr, from -> { + JsonObject status = currentStatus(from); + status.getAsJsonArray("conditions").asList().stream() + .map(cond -> (JsonObject) cond) + .forEach(cond -> { + if ("Running".equals(cond.get("type").getAsString())) { + updateRunningCondition(event, from, cond); + } + }); + return status; + }); + } + + private void updateRunningCondition(RunnerStateChange event, + DynamicKubernetesObject from, JsonObject cond) { + boolean reportedRunning + = "True".equals(cond.get("status").getAsString()); + if (RUNNING_STATES.contains(event.state()) + && !reportedRunning) { + cond.addProperty("status", "True"); + cond.addProperty("lastTransitionTime", + Instant.now().toString()); + } + if (!RUNNING_STATES.contains(event.state()) + && reportedRunning) { + cond.addProperty("status", "False"); + cond.addProperty("lastTransitionTime", + Instant.now().toString()); + } + cond.addProperty("reason", event.reason()); + cond.addProperty("message", event.message()); + cond.addProperty("observedGeneration", + from.getMetadata().getGeneration()); + } + + /** + * On runner configuration update. + * + * @param event the event + * @throws ApiException + */ + @Handler + public void onRunnerConfigurationUpdate(RunnerConfigurationUpdate event) + throws ApiException { + if (vmCrApi == null) { + return; + } + // A change of the runner configuration is typically caused + // by a new version of the CR. So we observe the new CR. + var vmCr = vmCrApi.get(namespace, vmName).throwsApiException() + .getObject(); + if (vmCr.getMetadata().getGeneration() == observedGeneration) { + return; + } + vmCrApi.updateStatus(vmCr, from -> { + JsonObject status = currentStatus(from); + status.getAsJsonArray("conditions").asList().stream() + .map(cond -> (JsonObject) cond) + .filter( + cond -> "Running".equals(cond.get("type").getAsString())) + .forEach(cond -> cond.addProperty("observedGeneration", + from.getMetadata().getGeneration())); + return status; + }); + } + +} diff --git a/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/events/RunnerStateChange.java b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/events/RunnerStateChange.java index a827fa4..46fa1f8 100644 --- a/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/events/RunnerStateChange.java +++ b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/events/RunnerStateChange.java @@ -19,6 +19,7 @@ package org.jdrupes.vmoperator.runner.qemu.events; import org.jgrapes.core.Channel; +import org.jgrapes.core.Components; import org.jgrapes.core.Event; /** @@ -30,19 +31,24 @@ public class RunnerStateChange extends Event { * The state. */ public enum State { - INITIALIZING, STARTING, RUNNING, TERMINATING + INITIALIZING, STARTING, RUNNING, TERMINATING, STOPPED } private final State state; + private final String reason; + private final String message; /** * Instantiates a new runner state change. * * @param channels the channels */ - public RunnerStateChange(State state, Channel... channels) { + public RunnerStateChange(State state, String reason, String message, + Channel... channels) { super(channels); this.state = state; + this.reason = reason; + this.message = message; } /** @@ -53,4 +59,36 @@ public class RunnerStateChange extends Event { public State state() { return state; } + + /** + * Gets the reason. + * + * @return the reason + */ + public String reason() { + return reason; + } + + /** + * Gets the message. + * + * @return the message + */ + public String message() { + return message; + } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder(); + builder.append(Components.objectName(this)) + .append(" [").append(state).append(": ").append(reason); + if (channels() != null) { + builder.append(", channels="); + builder.append(Channel.toString(channels())); + } + builder.append(']'); + return builder.toString(); + } + }