From e3b5f5a04dcd92cfbf1b34af605cf6f77534c085 Mon Sep 17 00:00:00 2001 From: "Michael N. Lipp" Date: Mon, 24 Feb 2025 11:58:13 +0100 Subject: [PATCH] Refactor QEMU socket connection handling and start vmop agent. --- dev-example/test-vm.tpl.yaml | 2 +- .../runner/qemu/AgentConnector.java | 86 +++++++ .../vmoperator/runner/qemu/Configuration.java | 6 +- .../runner/qemu/GuestAgentClient.java | 200 ++------------- .../vmoperator/runner/qemu/QemuConnector.java | 234 ++++++++++++++++++ .../vmoperator/runner/qemu/QemuMonitor.java | 134 ++-------- .../vmoperator/runner/qemu/Runner.java | 42 +++- .../runner/qemu/VmopAgentClient.java | 48 ++++ .../templates/Standard-VM-latest.ftl.yaml | 11 +- webpages/vm-operator/upgrading.md | 7 + 10 files changed, 451 insertions(+), 319 deletions(-) create mode 100644 org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/AgentConnector.java create mode 100644 org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/QemuConnector.java create mode 100644 org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/VmopAgentClient.java diff --git a/dev-example/test-vm.tpl.yaml b/dev-example/test-vm.tpl.yaml index 50031bb..260341e 100644 --- a/dev-example/test-vm.tpl.yaml +++ b/dev-example/test-vm.tpl.yaml @@ -14,7 +14,7 @@ spec: # repository: ghcr.io # path: mnlipp/org.jdrupes.vmoperator.runner.qemu-alpine # version: "3.0.0" - source: registry.mnl.de/org/jdrupes/vm-operator/org.jdrupes.vmoperator.runner.qemu-arch:testing + source: registry.mnl.de/org/jdrupes/vm-operator/org.jdrupes.vmoperator.runner.qemu-arch:feature-auto-login pullPolicy: Always permissions: diff --git a/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/AgentConnector.java b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/AgentConnector.java new file mode 100644 index 0000000..40db84a --- /dev/null +++ b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/AgentConnector.java @@ -0,0 +1,86 @@ +/* + * VM-Operator + * Copyright (C) 2025 Michael N. Lipp + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.jdrupes.vmoperator.runner.qemu; + +import java.io.IOException; +import java.nio.file.Path; +import org.jdrupes.vmoperator.runner.qemu.events.VserportChangeEvent; +import org.jgrapes.core.Channel; +import org.jgrapes.core.annotation.Handler; +import org.jgrapes.util.events.ConfigurationUpdate; + +/** + * A component that handles the communication with an agent + * running in the VM. + * + * If the log level for this class is set to fine, the messages + * exchanged on the socket are logged. + */ +public abstract class AgentConnector extends QemuConnector { + + protected String channelId; + + /** + * Instantiates a new agent connector. + * + * @param componentChannel the component channel + * @throws IOException Signals that an I/O exception has occurred. + */ + public AgentConnector(Channel componentChannel) throws IOException { + super(componentChannel); + } + + /** + * As the initial configuration of this component depends on the + * configuration of the {@link Runner}, it doesn't have a handler + * for the {@link ConfigurationUpdate} event. The values are + * forwarded from the {@link Runner} instead. + * + * @param channelId the channel id + * @param socketPath the socket path + */ + /* default */ void configure(String channelId, Path socketPath) { + super.configure(socketPath); + this.channelId = channelId; + logger.fine(() -> getClass().getSimpleName() + " configured with" + + " channelId=" + channelId); + } + + /** + * When the virtual serial port with the configured channel id has + * been opened call {@link #agentConnected()}. + * + * @param event the event + */ + @Handler + public void onVserportChanged(VserportChangeEvent event) { + if (event.id().equals(channelId) && event.isOpen()) { + agentConnected(); + } + } + + /** + * Called when the agent in the VM opens the connection. The + * default implementation does nothing. + */ + @SuppressWarnings("PMD.EmptyMethodInAbstractClassShouldBeAbstract") + protected void agentConnected() { + // Default is to do nothing. + } +} diff --git a/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/Configuration.java b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/Configuration.java index 086f085..20d4c66 100644 --- a/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/Configuration.java +++ b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/Configuration.java @@ -39,7 +39,7 @@ import org.jdrupes.vmoperator.util.FsdUtils; /** * The configuration information from the configuration file. */ -@SuppressWarnings("PMD.ExcessivePublicCount") +@SuppressWarnings({ "PMD.ExcessivePublicCount", "PMD.TooManyFields" }) public class Configuration implements Dto { private static final String CI_INSTANCE_ID = "instance-id"; @@ -67,9 +67,6 @@ public class Configuration implements Dto { /** The monitor socket. */ public Path monitorSocket; - /** The guest agent socket socket. */ - public Path guestAgentSocket; - /** The firmware rom. */ public Path firmwareRom; @@ -344,7 +341,6 @@ public class Configuration implements Dto { runtimeDir.toFile().mkdir(); swtpmSocket = runtimeDir.resolve("swtpm-sock"); monitorSocket = runtimeDir.resolve("monitor.sock"); - guestAgentSocket = runtimeDir.resolve("org.qemu.guest_agent.0"); } if (!Files.isDirectory(runtimeDir) || !Files.isWritable(runtimeDir)) { logger.severe(() -> String.format( diff --git a/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/GuestAgentClient.java b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/GuestAgentClient.java index fba975e..2e5e059 100644 --- a/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/GuestAgentClient.java +++ b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/GuestAgentClient.java @@ -19,58 +19,26 @@ package org.jdrupes.vmoperator.runner.qemu; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import java.io.IOException; -import java.io.Writer; -import java.lang.reflect.UndeclaredThrowableException; -import java.net.UnixDomainSocketAddress; -import java.nio.file.Files; -import java.nio.file.Path; import java.util.LinkedList; -import java.util.List; -import java.util.Map; import java.util.Queue; import java.util.logging.Level; import org.jdrupes.vmoperator.runner.qemu.commands.QmpCommand; import org.jdrupes.vmoperator.runner.qemu.commands.QmpGuestGetOsinfo; import org.jdrupes.vmoperator.runner.qemu.events.GuestAgentCommand; import org.jdrupes.vmoperator.runner.qemu.events.OsinfoEvent; -import org.jdrupes.vmoperator.runner.qemu.events.VserportChangeEvent; import org.jgrapes.core.Channel; -import org.jgrapes.core.Component; -import org.jgrapes.core.EventPipeline; import org.jgrapes.core.annotation.Handler; -import org.jgrapes.core.events.Start; -import org.jgrapes.core.events.Stop; -import org.jgrapes.io.events.Closed; -import org.jgrapes.io.events.ConnectError; -import org.jgrapes.io.events.Input; -import org.jgrapes.io.events.OpenSocketConnection; -import org.jgrapes.io.util.ByteBufferWriter; -import org.jgrapes.io.util.LineCollector; -import org.jgrapes.net.SocketIOChannel; -import org.jgrapes.net.events.ClientConnected; -import org.jgrapes.util.events.ConfigurationUpdate; /** - * A component that handles the communication over the guest agent - * socket. + * A component that handles the communication with the guest agent. * * If the log level for this class is set to fine, the messages * exchanged on the monitor socket are logged. */ -@SuppressWarnings("PMD.DataflowAnomalyAnalysis") -public class GuestAgentClient extends Component { +public class GuestAgentClient extends AgentConnector { - private static ObjectMapper mapper = new ObjectMapper(); - - private EventPipeline rep; - private Path socketPath; - private List> guestAgentCmds; - private String guestAgentCmd; - private SocketIOChannel gaChannel; private final Queue executing = new LinkedList<>(); /** @@ -79,135 +47,36 @@ public class GuestAgentClient extends Component { * @param componentChannel the component channel * @throws IOException Signals that an I/O exception has occurred. */ - @SuppressWarnings({ "PMD.AssignmentToNonFinalStatic", - "PMD.ConstructorCallsOverridableMethod" }) public GuestAgentClient(Channel componentChannel) throws IOException { super(componentChannel); } /** - * As the initial configuration of this component depends on the - * configuration of the {@link Runner}, it doesn't have a handler - * for the {@link ConfigurationUpdate} event. The values are - * forwarded from the {@link Runner} instead. - * - * @param socketPath the socket path - * @param guestAgentCmds the guest agent cmds + * When the agent has connected, request the OS information. */ - @SuppressWarnings("PMD.EmptyCatchBlock") - /* default */ void configure(Path socketPath, ArrayNode guestAgentCmds) { - this.socketPath = socketPath; - try { - this.guestAgentCmds = mapper.convertValue(guestAgentCmds, - mapper.constructType(getClass() - .getDeclaredField("guestAgentCmds").getGenericType())); - } catch (IllegalArgumentException | NoSuchFieldException - | SecurityException e) { - // Cannot happen - } + @Override + protected void agentConnected() { + fire(new GuestAgentCommand(new QmpGuestGetOsinfo())); } /** - * Handle the start event. + * Process agent input. * - * @param event the event + * @param line the line * @throws IOException Signals that an I/O exception has occurred. */ - @Handler - public void onStart(Start event) throws IOException { - rep = event.associated(EventPipeline.class).get(); - if (socketPath == null) { - return; - } - Files.deleteIfExists(socketPath); - } - - /** - * When the virtual serial port "channel0" has been opened, - * establish the connection by opening the socket. - * - * @param event the event - */ - @Handler - public void onVserportChanged(VserportChangeEvent event) { - if ("channel0".equals(event.id()) && event.isOpen()) { - fire(new OpenSocketConnection( - UnixDomainSocketAddress.of(socketPath)) - .setAssociated(GuestAgentClient.class, this)); - } - } - - /** - * Check if this is from opening the monitor socket and if true, - * save the socket in the context and associate the channel with - * the context. Then send the initial message to the socket. - * - * @param event the event - * @param channel the channel - */ - @SuppressWarnings("resource") - @Handler - public void onClientConnected(ClientConnected event, - SocketIOChannel channel) { - event.openEvent().associated(GuestAgentClient.class).ifPresent(qm -> { - gaChannel = channel; - channel.setAssociated(GuestAgentClient.class, this); - channel.setAssociated(Writer.class, new ByteBufferWriter( - channel).nativeCharset()); - channel.setAssociated(LineCollector.class, - new LineCollector() - .consumer(line -> { - try { - processGuestAgentInput(line); - } catch (IOException e) { - throw new UndeclaredThrowableException(e); - } - })); - fire(new GuestAgentCommand(new QmpGuestGetOsinfo())); - }); - } - - /** - * Called when a connection attempt fails. - * - * @param event the event - * @param channel the channel - */ - @Handler - public void onConnectError(ConnectError event, SocketIOChannel channel) { - event.event().associated(GuestAgentClient.class).ifPresent(qm -> { - rep.fire(new Stop()); - }); - } - - /** - * Handle data from qemu monitor connection. - * - * @param event the event - * @param channel the channel - */ - @Handler - public void onInput(Input event, SocketIOChannel channel) { - if (channel.associated(GuestAgentClient.class).isEmpty()) { - return; - } - channel.associated(LineCollector.class).ifPresent(collector -> { - collector.feed(event); - }); - } - - private void processGuestAgentInput(String line) - throws IOException { + @Override + protected void processInput(String line) throws IOException { logger.fine(() -> "guest agent(in): " + line); try { var response = mapper.readValue(line, ObjectNode.class); if (response.has("return") || response.has("error")) { QmpCommand executed = executing.poll(); - logger.fine( - () -> String.format("(Previous \"guest agent(in)\" is " - + "result from executing %s)", executed)); + logger.fine(() -> String.format("(Previous \"guest agent(in)\"" + + " is result from executing %s)", executed)); if (executed instanceof QmpGuestGetOsinfo) { - processOsInfo(response); + var osInfo = new OsinfoEvent(response.get("return")); + rep().fire(osInfo); } } } catch (JsonProcessingException e) { @@ -215,48 +84,17 @@ public class GuestAgentClient extends Component { } } - private void processOsInfo(ObjectNode response) { - var osInfo = new OsinfoEvent(response.get("return")); - var osId = osInfo.osinfo().get("id").asText(); - for (var cmdDef : guestAgentCmds) { - if (osId.equals(cmdDef.get("osId")) - || "*".equals(cmdDef.get("osId"))) { - guestAgentCmd = cmdDef.get("executable"); - break; - } - } - if (guestAgentCmd == null) { - logger.warning(() -> "No guest agent command for OS " + osId); - } else { - logger.fine(() -> "Guest agent command for OS " + osId - + " is " + guestAgentCmd); - } - rep.fire(osInfo); - } - - /** - * On closed. - * - * @param event the event - */ - @Handler - @SuppressWarnings({ "PMD.AvoidSynchronizedStatement", - "PMD.AvoidDuplicateLiterals" }) - public void onClosed(Closed event, SocketIOChannel channel) { - channel.associated(QemuMonitor.class).ifPresent(qm -> { - gaChannel = null; - }); - } - /** * On guest agent command. * * @param event the event */ @Handler - @SuppressWarnings({ "PMD.AvoidLiteralsInIfCondition", - "PMD.AvoidSynchronizedStatement" }) + @SuppressWarnings("PMD.AvoidSynchronizedStatement") public void onGuestAgentCommand(GuestAgentCommand event) { + if (qemuChannel() == null) { + return; + } var command = event.command(); logger.fine(() -> "guest agent(out): " + command.toString()); String asText; @@ -268,7 +106,7 @@ public class GuestAgentClient extends Component { return; } synchronized (executing) { - gaChannel.associated(Writer.class).ifPresent(writer -> { + writer().ifPresent(writer -> { try { executing.add(command); writer.append(asText).append('\n').flush(); diff --git a/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/QemuConnector.java b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/QemuConnector.java new file mode 100644 index 0000000..143cfc2 --- /dev/null +++ b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/QemuConnector.java @@ -0,0 +1,234 @@ +/* + * VM-Operator + * Copyright (C) 2025 Michael N. Lipp + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.jdrupes.vmoperator.runner.qemu; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.io.Writer; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.UnixDomainSocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Optional; +import org.jgrapes.core.Channel; +import org.jgrapes.core.Component; +import org.jgrapes.core.EventPipeline; +import org.jgrapes.core.annotation.Handler; +import org.jgrapes.core.events.Start; +import org.jgrapes.core.events.Stop; +import org.jgrapes.io.events.Closed; +import org.jgrapes.io.events.ConnectError; +import org.jgrapes.io.events.Input; +import org.jgrapes.io.events.OpenSocketConnection; +import org.jgrapes.io.util.ByteBufferWriter; +import org.jgrapes.io.util.LineCollector; +import org.jgrapes.net.SocketIOChannel; +import org.jgrapes.net.events.ClientConnected; +import org.jgrapes.util.events.ConfigurationUpdate; +import org.jgrapes.util.events.FileChanged; +import org.jgrapes.util.events.WatchFile; + +/** + * A component that handles the communication with QEMU over a socket. + * + * If the log level for this class is set to fine, the messages + * exchanged on the socket are logged. + */ +public abstract class QemuConnector extends Component { + + @SuppressWarnings("PMD.FieldNamingConventions") + protected static final ObjectMapper mapper = new ObjectMapper(); + + private EventPipeline rep; + private Path socketPath; + private SocketIOChannel qemuChannel; + + /** + * Instantiates a new QEMU connector. + * + * @param componentChannel the component channel + * @throws IOException Signals that an I/O exception has occurred. + */ + public QemuConnector(Channel componentChannel) throws IOException { + super(componentChannel); + } + + /** + * As the initial configuration of this component depends on the + * configuration of the {@link Runner}, it doesn't have a handler + * for the {@link ConfigurationUpdate} event. The values are + * forwarded from the {@link Runner} instead. + * + * @param socketPath the socket path + */ + /* default */ void configure(Path socketPath) { + this.socketPath = socketPath; + logger.fine(() -> getClass().getSimpleName() + + " configured with socketPath=" + socketPath); + } + + /** + * Note the runner's event processor and delete the socket. + * + * @param event the event + * @throws IOException Signals that an I/O exception has occurred. + */ + @Handler + public void onStart(Start event) throws IOException { + rep = event.associated(EventPipeline.class).get(); + if (socketPath == null) { + return; + } + Files.deleteIfExists(socketPath); + fire(new WatchFile(socketPath)); + } + + /** + * Return the runner's event pipeline. + * + * @return the event pipeline + */ + protected EventPipeline rep() { + return rep; + } + + /** + * Watch for the creation of the swtpm socket and start the + * qemu process if it has been created. + * + * @param event the event + */ + @Handler + public void onFileChanged(FileChanged event) { + if (event.change() == FileChanged.Kind.CREATED + && event.path().equals(socketPath)) { + // qemu running, open socket + fire(new OpenSocketConnection( + UnixDomainSocketAddress.of(socketPath)) + .setAssociated(getClass(), this)); + } + } + + /** + * Check if this is from opening the agent socket and if true, + * save the socket in the context and associate the channel with + * the context. + * + * @param event the event + * @param channel the channel + */ + @SuppressWarnings("resource") + @Handler + public void onClientConnected(ClientConnected event, + SocketIOChannel channel) { + event.openEvent().associated(getClass()).ifPresent(qm -> { + qemuChannel = channel; + channel.setAssociated(getClass(), this); + channel.setAssociated(Writer.class, new ByteBufferWriter( + channel).nativeCharset()); + channel.setAssociated(LineCollector.class, + new LineCollector() + .consumer(line -> { + try { + processInput(line); + } catch (IOException e) { + throw new UndeclaredThrowableException(e); + } + })); + socketConnected(); + }); + } + + /** + * Return the QEMU channel if the connection has been established. + * + * @return the socket IO channel + */ + protected Optional qemuChannel() { + return Optional.ofNullable(qemuChannel); + } + + /** + * Return the {@link Writer} for the connection if the connection + * has been established. + * + * @return the optional + */ + protected Optional writer() { + return qemuChannel().flatMap(c -> c.associated(Writer.class)); + } + + /** + * Called when the connector has been connected to the socket. + */ + @SuppressWarnings("PMD.EmptyMethodInAbstractClassShouldBeAbstract") + protected void socketConnected() { + // Default is to do nothing. + } + + /** + * Called when a connection attempt fails. + * + * @param event the event + * @param channel the channel + */ + @Handler + public void onConnectError(ConnectError event, SocketIOChannel channel) { + event.event().associated(getClass()).ifPresent(qm -> { + rep.fire(new Stop()); + }); + } + + /** + * Handle data from the socket connection. + * + * @param event the event + * @param channel the channel + */ + @Handler + public void onInput(Input event, SocketIOChannel channel) { + if (channel.associated(getClass()).isEmpty()) { + return; + } + channel.associated(LineCollector.class).ifPresent(collector -> { + collector.feed(event); + }); + } + + /** + * Process agent input. + * + * @param line the line + * @throws IOException Signals that an I/O exception has occurred. + */ + protected abstract void processInput(String line) throws IOException; + + /** + * On closed. + * + * @param event the event + * @param channel the channel + */ + @Handler + public void onClosed(Closed event, SocketIOChannel channel) { + channel.associated(getClass()).ifPresent(qm -> { + qemuChannel = null; + }); + } +} diff --git a/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/QemuMonitor.java b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/QemuMonitor.java index 7cac734..000a3bf 100644 --- a/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/QemuMonitor.java +++ b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/QemuMonitor.java @@ -19,13 +19,8 @@ package org.jdrupes.vmoperator.runner.qemu; import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import java.io.IOException; -import java.io.Writer; -import java.lang.reflect.UndeclaredThrowableException; -import java.net.UnixDomainSocketAddress; -import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; import java.time.Instant; @@ -42,24 +37,13 @@ import org.jdrupes.vmoperator.runner.qemu.events.MonitorReady; import org.jdrupes.vmoperator.runner.qemu.events.MonitorResult; import org.jdrupes.vmoperator.runner.qemu.events.PowerdownEvent; import org.jgrapes.core.Channel; -import org.jgrapes.core.Component; import org.jgrapes.core.Components; import org.jgrapes.core.Components.Timer; -import org.jgrapes.core.EventPipeline; import org.jgrapes.core.annotation.Handler; -import org.jgrapes.core.events.Start; import org.jgrapes.core.events.Stop; import org.jgrapes.io.events.Closed; -import org.jgrapes.io.events.ConnectError; -import org.jgrapes.io.events.Input; -import org.jgrapes.io.events.OpenSocketConnection; -import org.jgrapes.io.util.ByteBufferWriter; -import org.jgrapes.io.util.LineCollector; import org.jgrapes.net.SocketIOChannel; -import org.jgrapes.net.events.ClientConnected; import org.jgrapes.util.events.ConfigurationUpdate; -import org.jgrapes.util.events.FileChanged; -import org.jgrapes.util.events.WatchFile; /** * A component that handles the communication over the Qemu monitor @@ -69,14 +53,9 @@ import org.jgrapes.util.events.WatchFile; * exchanged on the monitor socket are logged. */ @SuppressWarnings("PMD.DataflowAnomalyAnalysis") -public class QemuMonitor extends Component { +public class QemuMonitor extends QemuConnector { - private static ObjectMapper mapper = new ObjectMapper(); - - private EventPipeline rep; - private Path socketPath; private int powerdownTimeout; - private SocketIOChannel monitorChannel; private final Queue executing = new LinkedList<>(); private Instant powerdownStartedAt; private Stop suspendedStop; @@ -84,7 +63,7 @@ public class QemuMonitor extends Component { private boolean powerdownConfirmed; /** - * Instantiates a new qemu monitor. + * Instantiates a new QEMU monitor. * * @param componentChannel the component channel * @param configDir the config dir @@ -111,109 +90,26 @@ public class QemuMonitor extends Component { * @param powerdownTimeout */ /* default */ void configure(Path socketPath, int powerdownTimeout) { - this.socketPath = socketPath; + super.configure(socketPath); this.powerdownTimeout = powerdownTimeout; } /** - * Handle the start event. - * - * @param event the event - * @throws IOException Signals that an I/O exception has occurred. + * When the socket is connected, send the capabilities command. */ - @Handler - public void onStart(Start event) throws IOException { - rep = event.associated(EventPipeline.class).get(); - if (socketPath == null) { - return; - } - Files.deleteIfExists(socketPath); - fire(new WatchFile(socketPath)); + @Override + protected void socketConnected() { + fire(new MonitorCommand(new QmpCapabilities())); } - /** - * Watch for the creation of the swtpm socket and start the - * qemu process if it has been created. - * - * @param event the event - */ - @Handler - public void onFileChanged(FileChanged event) { - if (event.change() == FileChanged.Kind.CREATED - && event.path().equals(socketPath)) { - // qemu running, open socket - fire(new OpenSocketConnection( - UnixDomainSocketAddress.of(socketPath)) - .setAssociated(QemuMonitor.class, this)); - } - } - - /** - * Check if this is from opening the monitor socket and if true, - * save the socket in the context and associate the channel with - * the context. Then send the initial message to the socket. - * - * @param event the event - * @param channel the channel - */ - @SuppressWarnings("resource") - @Handler - public void onClientConnected(ClientConnected event, - SocketIOChannel channel) { - event.openEvent().associated(QemuMonitor.class).ifPresent(qm -> { - monitorChannel = channel; - channel.setAssociated(QemuMonitor.class, this); - channel.setAssociated(Writer.class, new ByteBufferWriter( - channel).nativeCharset()); - channel.setAssociated(LineCollector.class, - new LineCollector() - .consumer(line -> { - try { - processMonitorInput(line); - } catch (IOException e) { - throw new UndeclaredThrowableException(e); - } - })); - fire(new MonitorCommand(new QmpCapabilities())); - }); - } - - /** - * Called when a connection attempt fails. - * - * @param event the event - * @param channel the channel - */ - @Handler - public void onConnectError(ConnectError event, SocketIOChannel channel) { - event.event().associated(QemuMonitor.class).ifPresent(qm -> { - rep.fire(new Stop()); - }); - } - - /** - * Handle data from qemu monitor connection. - * - * @param event the event - * @param channel the channel - */ - @Handler - public void onInput(Input event, SocketIOChannel channel) { - if (channel.associated(QemuMonitor.class).isEmpty()) { - return; - } - channel.associated(LineCollector.class).ifPresent(collector -> { - collector.feed(event); - }); - } - - private void processMonitorInput(String line) + @Override + protected void processInput(String line) throws IOException { logger.fine(() -> "monitor(in): " + line); try { var response = mapper.readValue(line, ObjectNode.class); if (response.has("QMP")) { - rep.fire(new MonitorReady()); + rep().fire(new MonitorReady()); return; } if (response.has("return") || response.has("error")) { @@ -221,11 +117,11 @@ public class QemuMonitor extends Component { logger.fine( () -> String.format("(Previous \"monitor(in)\" is result " + "from executing %s)", executed)); - rep.fire(MonitorResult.from(executed, response)); + rep().fire(MonitorResult.from(executed, response)); return; } if (response.has("event")) { - MonitorEvent.from(response).ifPresent(rep::fire); + MonitorEvent.from(response).ifPresent(rep()::fire); } } catch (JsonProcessingException e) { throw new IOException(e); @@ -241,8 +137,8 @@ public class QemuMonitor extends Component { @SuppressWarnings({ "PMD.AvoidSynchronizedStatement", "PMD.AvoidDuplicateLiterals" }) public void onClosed(Closed event, SocketIOChannel channel) { + super.onClosed(event, channel); channel.associated(QemuMonitor.class).ifPresent(qm -> { - monitorChannel = null; synchronized (this) { if (powerdownTimer != null) { powerdownTimer.cancel(); @@ -275,7 +171,7 @@ public class QemuMonitor extends Component { return; } synchronized (executing) { - monitorChannel.associated(Writer.class).ifPresent(writer -> { + writer().ifPresent(writer -> { try { executing.add(command); writer.append(asText).append('\n').flush(); @@ -295,7 +191,7 @@ public class QemuMonitor extends Component { @Handler(priority = 100) @SuppressWarnings("PMD.AvoidSynchronizedStatement") public void onStop(Stop event) { - if (monitorChannel != null) { + if (qemuChannel() != null) { // We have a connection to Qemu, attempt ACPI shutdown. event.suspendHandling(); suspendedStop = event; diff --git a/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/Runner.java b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/Runner.java index e0cd837..0eaabe9 100644 --- a/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/Runner.java +++ b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/Runner.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JsonMappingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; import com.fasterxml.jackson.dataformat.yaml.YAMLGenerator; import freemarker.core.ParseException; @@ -198,7 +197,6 @@ public class Runner extends Component { private static final String QEMU = "qemu"; private static final String SWTPM = "swtpm"; private static final String CLOUD_INIT_IMG = "cloudInitImg"; - private static final String GUEST_AGENT_CMDS = "guestAgentCmds"; private static final String TEMPLATE_DIR = "/opt/" + APP_NAME.replace("-", "") + "/templates"; private static final String DEFAULT_TEMPLATE @@ -222,6 +220,7 @@ public class Runner extends Component { private CommandDefinition qemuDefinition; private final QemuMonitor qemuMonitor; private final GuestAgentClient guestAgentClient; + private final VmopAgentClient vmopAgentClient; private Integer resetCounter; private RunState state = RunState.INITIALIZING; @@ -280,6 +279,7 @@ public class Runner extends Component { attach(new SocketConnector(channel())); attach(qemuMonitor = new QemuMonitor(channel(), configDir)); attach(guestAgentClient = new GuestAgentClient(channel())); + attach(vmopAgentClient = new VmopAgentClient(channel())); attach(new StatusUpdater(channel())); attach(new YamlConfigurationStore(channel(), configFile, false)); fire(new WatchFile(configFile.toPath())); @@ -350,16 +350,12 @@ public class Runner extends Component { .map(d -> new CommandDefinition(CLOUD_INIT_IMG, d)) .orElse(null); logger.finest(() -> cloudInitImgDefinition.toString()); - var guestAgentCmds = (ArrayNode) tplData.get(GUEST_AGENT_CMDS); - if (guestAgentCmds != null) { - logger.finest( - () -> "GuestAgentCmds: " + guestAgentCmds.toString()); - } // Forward some values to child components qemuMonitor.configure(config.monitorSocket, config.vm.powerdownTimeout); - guestAgentClient.configure(config.guestAgentSocket, guestAgentCmds); + configureAgentClient(guestAgentClient, "guest-agent-socket"); + configureAgentClient(vmopAgentClient, "vmop-agent-socket"); } catch (IllegalArgumentException | IOException | TemplateException e) { logger.log(Level.SEVERE, e, () -> "Invalid configuration: " + e.getMessage()); @@ -484,6 +480,36 @@ public class Runner extends Component { } } + @SuppressWarnings("PMD.CognitiveComplexity") + private void configureAgentClient(AgentConnector client, String chardev) { + String id = null; + Path path = null; + for (var arg : qemuDefinition.command) { + if (arg.startsWith("virtserialport,") + && arg.contains("chardev=" + chardev)) { + for (var prop : arg.split(",")) { + if (prop.startsWith("id=")) { + id = prop.substring(3); + } + } + } + if (arg.startsWith("socket,") + && arg.contains("id=" + chardev)) { + for (var prop : arg.split(",")) { + if (prop.startsWith("path=")) { + path = Path.of(prop.substring(5)); + } + } + } + } + if (id == null || path == null) { + logger.warning(() -> "Definition of chardev " + chardev + + " missing in runner template."); + return; + } + client.configure(id, path); + } + /** * Handle the started event. * diff --git a/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/VmopAgentClient.java b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/VmopAgentClient.java new file mode 100644 index 0000000..a74432b --- /dev/null +++ b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/VmopAgentClient.java @@ -0,0 +1,48 @@ +/* + * VM-Operator + * Copyright (C) 2025 Michael N. Lipp + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.jdrupes.vmoperator.runner.qemu; + +import java.io.IOException; +import org.jgrapes.core.Channel; + +/** + * A component that handles the communication over the vmop agent + * socket. + * + * If the log level for this class is set to fine, the messages + * exchanged on the socket are logged. + */ +public class VmopAgentClient extends AgentConnector { + + /** + * Instantiates a new VM operator agent client. + * + * @param componentChannel the component channel + * @throws IOException Signals that an I/O exception has occurred. + */ + public VmopAgentClient(Channel componentChannel) throws IOException { + super(componentChannel); + } + + @Override + protected void processInput(String line) throws IOException { + // TODO Auto-generated method stub + } + +} diff --git a/org.jdrupes.vmoperator.runner.qemu/templates/Standard-VM-latest.ftl.yaml b/org.jdrupes.vmoperator.runner.qemu/templates/Standard-VM-latest.ftl.yaml index 3eacfa3..c5c0252 100644 --- a/org.jdrupes.vmoperator.runner.qemu/templates/Standard-VM-latest.ftl.yaml +++ b/org.jdrupes.vmoperator.runner.qemu/templates/Standard-VM-latest.ftl.yaml @@ -122,11 +122,16 @@ # Best explanation found: # https://fedoraproject.org/wiki/Features/VirtioSerial - [ "-device", "virtio-serial-pci,id=virtio-serial0" ] - # - Guest agent serial connection. MUST have id "channel0"! + # - Guest agent serial connection. - [ "-device", "virtserialport,id=channel0,name=org.qemu.guest_agent.0,\ chardev=guest-agent-socket" ] - [ "-chardev","socket,id=guest-agent-socket,\ path=${ runtimeDir }/org.qemu.guest_agent.0,server=on,wait=off" ] + # - VM operator agent serial connection. + - [ "-device", "virtserialport,id=channel1,name=org.jdrupes.vmop_agent.0,\ + chardev=vmop-agent-socket" ] + - [ "-chardev","socket,id=vmop-agent-socket,\ + path=${ runtimeDir }/org.jdrupes.vmop_agent.0,server=on,wait=off" ] # * USB Hub and devices (more in SPICE configuration below) # https://qemu-project.gitlab.io/qemu/system/devices/usb.html # https://github.com/qemu/qemu/blob/master/hw/usb/hcd-xhci.c @@ -233,7 +238,3 @@ - -"guestAgentCmds": - - "osId": "*" - "executable": "/usr/local/libexec/vm-operator-cmd" diff --git a/webpages/vm-operator/upgrading.md b/webpages/vm-operator/upgrading.md index 2c4253e..422c32d 100644 --- a/webpages/vm-operator/upgrading.md +++ b/webpages/vm-operator/upgrading.md @@ -26,6 +26,13 @@ layout: vm-operator still accepted for backward compatibility until the next major version, but should be updated. + * The standard [template](./runner.html#stand-alone-configuration) used + to generate the QEMU command has been updated. Unless you have enabled + automatic updates of the template in the VM definition, you have to + update the template manually. If you're using your own template, you + have to add a virtual serial port (see the git history of the standard + template for the required addition). + ## To version 3.4.0 Starting with this version, the VM-Operator no longer uses a stateful set