Merge branch 'fix/race-condition'

This commit is contained in:
Michael Lipp 2025-03-12 17:45:07 +01:00
commit fae75dafa9
8 changed files with 129 additions and 104 deletions

View file

@ -8,9 +8,9 @@ metadata:
spec: spec:
image: image:
repository: ghcr.io # source: ghcr.io/mnlipp/org.jdrupes.vmoperator.runner.qemu-arch:3.3.1
path: mnlipp/org.jdrupes.vmoperator.runner.qemu-alpine # source: registry.mnl.de/org/jdrupes/vm-operator/org.jdrupes.vmoperator.runner.qemu-arch:testing
version: latest source: docker-registry.lan.mnl.de/vmoperator/org.jdrupes.vmoperator.runner.qemu-arch:feature-pools
pullPolicy: Always pullPolicy: Always
permissions: permissions:
@ -31,8 +31,8 @@ spec:
bootMenu: true bootMenu: true
maximumCpus: 4 maximumCpus: 4
currentCpus: 2 currentCpus: 2
maximumRam: 4Gi maximumRam: 6Gi
currentRam: 3Gi currentRam: 4Gi
networks: networks:
# No bridge on TC1 # No bridge on TC1

View file

@ -1,5 +1,8 @@
#!/usr/bin/bash #!/usr/bin/bash
# Note that this script requires "jq" to be installed and a version
# of loginctl that accepts the "-j" option.
while [ "$#" -gt 0 ]; do while [ "$#" -gt 0 ]; do
case "$1" in case "$1" in
--path) shift; ttyPath="$1";; --path) shift; ttyPath="$1";;

View file

@ -20,10 +20,10 @@ package org.jdrupes.vmoperator.runner.qemu;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.List;
import org.jdrupes.vmoperator.runner.qemu.events.VserportChangeEvent; import org.jdrupes.vmoperator.runner.qemu.events.VserportChangeEvent;
import org.jgrapes.core.Channel; import org.jgrapes.core.Channel;
import org.jgrapes.core.annotation.Handler; import org.jgrapes.core.annotation.Handler;
import org.jgrapes.util.events.ConfigurationUpdate;
/** /**
* A component that handles the communication with an agent * A component that handles the communication with an agent
@ -47,19 +47,41 @@ public abstract class AgentConnector extends QemuConnector {
} }
/** /**
* As the initial configuration of this component depends on the * Extracts the channel id and the socket path from the QEMU
* configuration of the {@link Runner}, it doesn't have a handler * command line.
* for the {@link ConfigurationUpdate} event. The values are
* forwarded from the {@link Runner} instead.
* *
* @param channelId the channel id * @param command the command
* @param socketPath the socket path * @param chardev the chardev
*/ */
/* default */ void configure(String channelId, Path socketPath) { @SuppressWarnings("PMD.CognitiveComplexity")
super.configure(socketPath); protected void configureConnection(List<String> command, String chardev) {
this.channelId = channelId; Path socketPath = null;
for (var arg : command) {
if (arg.startsWith("virtserialport,")
&& arg.contains("chardev=" + chardev)) {
for (var prop : arg.split(",")) {
if (prop.startsWith("id=")) {
channelId = prop.substring(3);
}
}
}
if (arg.startsWith("socket,")
&& arg.contains("id=" + chardev)) {
for (var prop : arg.split(",")) {
if (prop.startsWith("path=")) {
socketPath = Path.of(prop.substring(5));
}
}
}
}
if (channelId == null || socketPath == null) {
logger.warning(() -> "Definition of chardev " + chardev
+ " missing in runner template.");
return;
}
logger.fine(() -> getClass().getSimpleName() + " configured with" logger.fine(() -> getClass().getSimpleName() + " configured with"
+ " channelId=" + channelId); + " channelId=" + channelId);
super.configure(socketPath);
} }
/** /**

View file

@ -49,6 +49,7 @@ public class DisplayController extends Component {
private String currentPassword; private String currentPassword;
private String protocol; private String protocol;
private final Path configDir; private final Path configDir;
private boolean canBeUpdated;
private boolean vmopAgentConnected; private boolean vmopAgentConnected;
private String loggedInUser; private String loggedInUser;
@ -83,6 +84,7 @@ public class DisplayController extends Component {
if (event.runState() == RunState.STARTING) { if (event.runState() == RunState.STARTING) {
configurePassword(); configurePassword();
} }
canBeUpdated = true;
} }
/** /**
@ -114,7 +116,8 @@ public class DisplayController extends Component {
@Handler @Handler
@SuppressWarnings("PMD.EmptyCatchBlock") @SuppressWarnings("PMD.EmptyCatchBlock")
public void onFileChanged(FileChanged event) { public void onFileChanged(FileChanged event) {
if (event.path().equals(configDir.resolve(DisplaySecret.PASSWORD))) { if (event.path().equals(configDir.resolve(DisplaySecret.PASSWORD))
&& canBeUpdated) {
configurePassword(); configurePassword();
} }
} }

View file

@ -56,7 +56,7 @@ public class GuestAgentClient extends AgentConnector {
*/ */
@Override @Override
protected void agentConnected() { protected void agentConnected() {
fire(new GuestAgentCommand(new QmpGuestGetOsinfo())); rep().fire(new GuestAgentCommand(new QmpGuestGetOsinfo()));
} }
/** /**

View file

@ -121,7 +121,7 @@ public abstract class QemuConnector extends Component {
// qemu running, open socket // qemu running, open socket
fire(new OpenSocketConnection( fire(new OpenSocketConnection(
UnixDomainSocketAddress.of(socketPath)) UnixDomainSocketAddress.of(socketPath))
.setAssociated(getClass(), this)); .setAssociated(this, this));
} }
} }
@ -137,21 +137,21 @@ public abstract class QemuConnector extends Component {
@Handler @Handler
public void onClientConnected(ClientConnected event, public void onClientConnected(ClientConnected event,
SocketIOChannel channel) { SocketIOChannel channel) {
event.openEvent().associated(getClass()).ifPresent(qm -> { event.openEvent().associated(this, getClass()).ifPresent(qc -> {
qemuChannel = channel; qemuChannel = channel;
channel.setAssociated(getClass(), this); channel.setAssociated(this, this);
channel.setAssociated(Writer.class, new ByteBufferWriter( channel.setAssociated(Writer.class, new ByteBufferWriter(
channel).nativeCharset()); channel).nativeCharset());
channel.setAssociated(LineCollector.class, channel.setAssociated(LineCollector.class,
new LineCollector() new LineCollector()
.consumer(line -> { .consumer(line -> {
try { try {
processInput(line); qc.processInput(line);
} catch (IOException e) { } catch (IOException e) {
throw new UndeclaredThrowableException(e); throw new UndeclaredThrowableException(e);
} }
})); }));
socketConnected(); qc.socketConnected();
}); });
} }
@ -202,11 +202,10 @@ public abstract class QemuConnector extends Component {
* Called when a connection attempt fails. * Called when a connection attempt fails.
* *
* @param event the event * @param event the event
* @param channel the channel
*/ */
@Handler @Handler
public void onConnectError(ConnectError event, SocketIOChannel channel) { public void onConnectError(ConnectError event) {
event.event().associated(getClass()).ifPresent(qm -> { event.event().associated(this, getClass()).ifPresent(qc -> {
rep.fire(new Stop()); rep.fire(new Stop());
}); });
} }
@ -219,7 +218,7 @@ public abstract class QemuConnector extends Component {
*/ */
@Handler @Handler
public void onInput(Input<?> event, SocketIOChannel channel) { public void onInput(Input<?> event, SocketIOChannel channel) {
if (channel.associated(getClass()).isEmpty()) { if (channel.associated(this, getClass()).isEmpty()) {
return; return;
} }
channel.associated(LineCollector.class).ifPresent(collector -> { channel.associated(LineCollector.class).ifPresent(collector -> {
@ -243,7 +242,7 @@ public abstract class QemuConnector extends Component {
*/ */
@Handler @Handler
public void onClosed(Closed<?> event, SocketIOChannel channel) { public void onClosed(Closed<?> event, SocketIOChannel channel) {
channel.associated(getClass()).ifPresent(qm -> { channel.associated(this, getClass()).ifPresent(qm -> {
qemuChannel = null; qemuChannel = null;
}); });
} }

View file

@ -61,6 +61,7 @@ public class QemuMonitor extends QemuConnector {
private Stop suspendedStop; private Stop suspendedStop;
private Timer powerdownTimer; private Timer powerdownTimer;
private boolean powerdownConfirmed; private boolean powerdownConfirmed;
private boolean monitorReady;
/** /**
* Instantiates a new QEMU monitor. * Instantiates a new QEMU monitor.
@ -99,7 +100,7 @@ public class QemuMonitor extends QemuConnector {
*/ */
@Override @Override
protected void socketConnected() { protected void socketConnected() {
fire(new MonitorCommand(new QmpCapabilities())); rep().fire(new MonitorCommand(new QmpCapabilities()));
} }
@Override @Override
@ -109,6 +110,7 @@ public class QemuMonitor extends QemuConnector {
try { try {
var response = mapper.readValue(line, ObjectNode.class); var response = mapper.readValue(line, ObjectNode.class);
if (response.has("QMP")) { if (response.has("QMP")) {
monitorReady = true;
rep().fire(new MonitorReady()); rep().fire(new MonitorReady());
return; return;
} }
@ -137,8 +139,9 @@ public class QemuMonitor extends QemuConnector {
@SuppressWarnings({ "PMD.AvoidSynchronizedStatement", @SuppressWarnings({ "PMD.AvoidSynchronizedStatement",
"PMD.AvoidDuplicateLiterals" }) "PMD.AvoidDuplicateLiterals" })
public void onClosed(Closed<?> event, SocketIOChannel channel) { public void onClosed(Closed<?> event, SocketIOChannel channel) {
logger.finer(() -> "Closing QMP socket.");
super.onClosed(event, channel); super.onClosed(event, channel);
channel.associated(QemuMonitor.class).ifPresent(qm -> { channel.associated(this, getClass()).ifPresent(qm -> {
synchronized (this) { synchronized (this) {
if (powerdownTimer != null) { if (powerdownTimer != null) {
powerdownTimer.cancel(); powerdownTimer.cancel();
@ -149,6 +152,7 @@ public class QemuMonitor extends QemuConnector {
} }
} }
}); });
logger.finer(() -> "QMP socket closed.");
} }
/** /**
@ -158,9 +162,17 @@ public class QemuMonitor extends QemuConnector {
* @throws IOException * @throws IOException
*/ */
@Handler @Handler
@SuppressWarnings({ "PMD.AvoidLiteralsInIfCondition", @SuppressWarnings("PMD.AvoidSynchronizedStatement")
"PMD.AvoidSynchronizedStatement" }) public void onMonitorCommand(MonitorCommand event) throws IOException {
public void onExecQmpCommand(MonitorCommand event) throws IOException { // Check prerequisites
if (!monitorReady && !(event.command() instanceof QmpCapabilities)) {
logger.severe(() -> "Premature monitor command (not ready): "
+ event.command());
rep().fire(new Stop());
return;
}
// Send the command
var command = event.command(); var command = event.command();
logger.fine(() -> "monitor(out): " + command.toString()); logger.fine(() -> "monitor(out): " + command.toString());
String asText; String asText;

View file

@ -214,12 +214,14 @@ public class Runner extends Component {
@SuppressWarnings("PMD.UseConcurrentHashMap") @SuppressWarnings("PMD.UseConcurrentHashMap")
private final File configFile; private final File configFile;
private final Path configDir; private final Path configDir;
private Configuration config = new Configuration(); private Configuration initialConfig;
private Configuration pendingConfig;
private final freemarker.template.Configuration fmConfig; private final freemarker.template.Configuration fmConfig;
private CommandDefinition swtpmDefinition; private CommandDefinition swtpmDefinition;
private CommandDefinition cloudInitImgDefinition; private CommandDefinition cloudInitImgDefinition;
private CommandDefinition qemuDefinition; private CommandDefinition qemuDefinition;
private final QemuMonitor qemuMonitor; private final QemuMonitor qemuMonitor;
private boolean qmpConfigured;
private final GuestAgentClient guestAgentClient; private final GuestAgentClient guestAgentClient;
private final VmopAgentClient vmopAgentClient; private final VmopAgentClient vmopAgentClient;
private Integer resetCounter; private Integer resetCounter;
@ -301,7 +303,10 @@ public class Runner extends Component {
} }
/** /**
* On configuration update. * Process the initial configuration. The initial configuration
* and any subsequent updates will be forwarded to other components
* only when the QMP connection is ready
* (see @link #onQmpConfigured(QmpConfigured)).
* *
* @param event the event * @param event the event
*/ */
@ -318,27 +323,33 @@ public class Runner extends Component {
// Special actions for initial configuration (startup) // Special actions for initial configuration (startup)
if (event instanceof InitialConfiguration) { if (event instanceof InitialConfiguration) {
processInitialConfiguration(newConf); processInitialConfiguration(newConf);
return;
} }
logger.fine(() -> "Updating configuration");
// Check if to be sent immediately or later
if (qmpConfigured) {
rep.fire(new ConfigureQemu(newConf, state)); rep.fire(new ConfigureQemu(newConf, state));
} else {
pendingConfig = newConf;
}
}); });
} }
@SuppressWarnings("PMD.LambdaCanBeMethodReference") @SuppressWarnings("PMD.LambdaCanBeMethodReference")
private void processInitialConfiguration(Configuration newConfig) { private void processInitialConfiguration(Configuration newConfig) {
try { try {
config = newConfig; if (!newConfig.check()) {
if (!config.check()) {
// Invalid configuration, not used, problems already logged. // Invalid configuration, not used, problems already logged.
config = null; return;
} }
// Prepare firmware files and add to config // Prepare firmware files and add to config
setFirmwarePaths(); setFirmwarePaths(newConfig);
// Obtain more context data from template // Obtain more context data from template
var tplData = dataFromTemplate(); var tplData = dataFromTemplate(newConfig);
initialConfig = newConfig;
// Configure
swtpmDefinition = Optional.ofNullable(tplData.get(SWTPM)) swtpmDefinition = Optional.ofNullable(tplData.get(SWTPM))
.map(d -> new CommandDefinition(SWTPM, d)).orElse(null); .map(d -> new CommandDefinition(SWTPM, d)).orElse(null);
logger.finest(() -> swtpmDefinition.toString()); logger.finest(() -> swtpmDefinition.toString());
@ -352,21 +363,21 @@ public class Runner extends Component {
logger.finest(() -> cloudInitImgDefinition.toString()); logger.finest(() -> cloudInitImgDefinition.toString());
// Forward some values to child components // Forward some values to child components
qemuMonitor.configure(config.monitorSocket, qemuMonitor.configure(initialConfig.monitorSocket,
config.vm.powerdownTimeout); initialConfig.vm.powerdownTimeout);
configureAgentClient(guestAgentClient, "guest-agent-socket"); guestAgentClient.configureConnection(qemuDefinition.command,
configureAgentClient(vmopAgentClient, "vmop-agent-socket"); "guest-agent-socket");
vmopAgentClient.configureConnection(qemuDefinition.command,
"vmop-agent-socket");
} catch (IllegalArgumentException | IOException | TemplateException e) { } catch (IllegalArgumentException | IOException | TemplateException e) {
logger.log(Level.SEVERE, e, () -> "Invalid configuration: " logger.log(Level.SEVERE, e, () -> "Invalid configuration: "
+ e.getMessage()); + e.getMessage());
// Don't use default configuration
config = null;
} }
} }
@SuppressWarnings({ "PMD.CognitiveComplexity", @SuppressWarnings({ "PMD.CognitiveComplexity",
"PMD.DataflowAnomalyAnalysis" }) "PMD.DataflowAnomalyAnalysis" })
private void setFirmwarePaths() throws IOException { private void setFirmwarePaths(Configuration config) throws IOException {
JsonNode firmware = defaults.path("firmware").path(config.vm.firmware); JsonNode firmware = defaults.path("firmware").path(config.vm.firmware);
// Get file for firmware ROM // Get file for firmware ROM
JsonNode codePaths = firmware.path("rom"); JsonNode codePaths = firmware.path("rom");
@ -396,7 +407,7 @@ public class Runner extends Component {
} }
} }
private JsonNode dataFromTemplate() private JsonNode dataFromTemplate(Configuration config)
throws IOException, TemplateNotFoundException, throws IOException, TemplateNotFoundException,
MalformedTemplateNameException, ParseException, TemplateException, MalformedTemplateNameException, ParseException, TemplateException,
JsonProcessingException, JsonMappingException { JsonProcessingException, JsonMappingException {
@ -435,6 +446,21 @@ public class Runner extends Component {
return yamlMapper.readValue(out.toString(), JsonNode.class); return yamlMapper.readValue(out.toString(), JsonNode.class);
} }
/**
* Note ready state and send a {@link ConfigureQemu} event for
* any pending configuration (initial or change).
*
* @param event the event
*/
@Handler
public void onQmpConfigured(QmpConfigured event) {
qmpConfigured = true;
if (pendingConfig != null) {
rep.fire(new ConfigureQemu(pendingConfig, state));
pendingConfig = null;
}
}
/** /**
* Handle the start event. * Handle the start event.
* *
@ -442,7 +468,7 @@ public class Runner extends Component {
*/ */
@Handler(priority = 100) @Handler(priority = 100)
public void onStart(Start event) { public void onStart(Start event) {
if (config == null) { if (initialConfig == null) {
// Missing configuration, fail // Missing configuration, fail
event.cancel(true); event.cancel(true);
fire(new Stop()); fire(new Stop());
@ -458,19 +484,19 @@ public class Runner extends Component {
try { try {
// Store process id // Store process id
try (var pidFile = Files.newBufferedWriter( try (var pidFile = Files.newBufferedWriter(
config.runtimeDir.resolve("runner.pid"))) { initialConfig.runtimeDir.resolve("runner.pid"))) {
pidFile.write(ProcessHandle.current().pid() + "\n"); pidFile.write(ProcessHandle.current().pid() + "\n");
} }
// Files to watch for // Files to watch for
Files.deleteIfExists(config.swtpmSocket); Files.deleteIfExists(initialConfig.swtpmSocket);
fire(new WatchFile(config.swtpmSocket)); fire(new WatchFile(initialConfig.swtpmSocket));
// Helper files // Helper files
var ticket = Optional.ofNullable(config.vm.display) var ticket = Optional.ofNullable(initialConfig.vm.display)
.map(d -> d.spice).map(s -> s.ticket); .map(d -> d.spice).map(s -> s.ticket);
if (ticket.isPresent()) { if (ticket.isPresent()) {
Files.write(config.runtimeDir.resolve("ticket.txt"), Files.write(initialConfig.runtimeDir.resolve("ticket.txt"),
ticket.get().getBytes()); ticket.get().getBytes());
} }
} catch (IOException e) { } catch (IOException e) {
@ -480,36 +506,6 @@ public class Runner extends Component {
} }
} }
@SuppressWarnings("PMD.CognitiveComplexity")
private void configureAgentClient(AgentConnector client, String chardev) {
String id = null;
Path path = null;
for (var arg : qemuDefinition.command) {
if (arg.startsWith("virtserialport,")
&& arg.contains("chardev=" + chardev)) {
for (var prop : arg.split(",")) {
if (prop.startsWith("id=")) {
id = prop.substring(3);
}
}
}
if (arg.startsWith("socket,")
&& arg.contains("id=" + chardev)) {
for (var prop : arg.split(",")) {
if (prop.startsWith("path=")) {
path = Path.of(prop.substring(5));
}
}
}
}
if (id == null || path == null) {
logger.warning(() -> "Definition of chardev " + chardev
+ " missing in runner template.");
return;
}
client.configure(id, path);
}
/** /**
* Handle the started event. * Handle the started event.
* *
@ -522,12 +518,12 @@ public class Runner extends Component {
"Runner has been started")); "Runner has been started"));
// Start first process(es) // Start first process(es)
qemuLatch.add(QemuPreps.Config); qemuLatch.add(QemuPreps.Config);
if (config.vm.useTpm && swtpmDefinition != null) { if (initialConfig.vm.useTpm && swtpmDefinition != null) {
startProcess(swtpmDefinition); startProcess(swtpmDefinition);
qemuLatch.add(QemuPreps.Tpm); qemuLatch.add(QemuPreps.Tpm);
} }
if (config.cloudInit != null) { if (initialConfig.cloudInit != null) {
generateCloudInitImg(); generateCloudInitImg(initialConfig);
qemuLatch.add(QemuPreps.CloudInit); qemuLatch.add(QemuPreps.CloudInit);
} }
mayBeStartQemu(QemuPreps.Config); mayBeStartQemu(QemuPreps.Config);
@ -546,7 +542,7 @@ public class Runner extends Component {
} }
} }
private void generateCloudInitImg() { private void generateCloudInitImg(Configuration config) {
try { try {
var cloudInitDir = config.dataDir.resolve("cloud-init"); var cloudInitDir = config.dataDir.resolve("cloud-init");
cloudInitDir.toFile().mkdir(); cloudInitDir.toFile().mkdir();
@ -583,7 +579,7 @@ public class Runner extends Component {
private boolean startProcess(CommandDefinition toStart) { private boolean startProcess(CommandDefinition toStart) {
logger.info( logger.info(
() -> "Starting process: " + String.join(" ", toStart.command)); () -> "Starting process: " + String.join(" ", toStart.command));
fire(new StartProcess(toStart.command) rep.fire(new StartProcess(toStart.command)
.setAssociated(CommandDefinition.class, toStart)); .setAssociated(CommandDefinition.class, toStart));
return true; return true;
} }
@ -597,7 +593,7 @@ public class Runner extends Component {
@Handler @Handler
public void onFileChanged(FileChanged event) { public void onFileChanged(FileChanged event) {
if (event.change() == Kind.CREATED if (event.change() == Kind.CREATED
&& event.path().equals(config.swtpmSocket)) { && event.path().equals(initialConfig.swtpmSocket)) {
// swtpm running, maybe start qemu // swtpm running, maybe start qemu
mayBeStartQemu(QemuPreps.Tpm); mayBeStartQemu(QemuPreps.Tpm);
} }
@ -620,7 +616,7 @@ public class Runner extends Component {
.ifPresent(procDef -> { .ifPresent(procDef -> {
channel.setAssociated(CommandDefinition.class, procDef); channel.setAssociated(CommandDefinition.class, procDef);
try (var pidFile = Files.newBufferedWriter( try (var pidFile = Files.newBufferedWriter(
config.runtimeDir.resolve(procDef.name + ".pid"))) { initialConfig.runtimeDir.resolve(procDef.name + ".pid"))) {
pidFile.write(channel.process().toHandle().pid() + "\n"); pidFile.write(channel.process().toHandle().pid() + "\n");
} catch (IOException e) { } catch (IOException e) {
throw new UndeclaredThrowableException(e); throw new UndeclaredThrowableException(e);
@ -652,16 +648,6 @@ public class Runner extends Component {
.ifPresent(lc -> lc.feed(event))); .ifPresent(lc -> lc.feed(event)));
} }
/**
* When the monitor is ready, send QEMU its initial configuration.
*
* @param event the event
*/
@Handler
public void onQmpConfigured(QmpConfigured event) {
rep.fire(new ConfigureQemu(config, state));
}
/** /**
* Whenever a new QEMU configuration is available, check if it * Whenever a new QEMU configuration is available, check if it
* is supposed to trigger a reset. * is supposed to trigger a reset.
@ -791,7 +777,7 @@ public class Runner extends Component {
logger.log(Level.WARNING, e, () -> "Proper shutdown failed."); logger.log(Level.WARNING, e, () -> "Proper shutdown failed.");
} }
Optional.ofNullable(config).map(c -> c.runtimeDir) Optional.ofNullable(initialConfig).map(c -> c.runtimeDir)
.ifPresent(runtimeDir -> { .ifPresent(runtimeDir -> {
try { try {
Files.walk(runtimeDir).sorted(Comparator.reverseOrder()) Files.walk(runtimeDir).sorted(Comparator.reverseOrder())