Refactor and implement shutdown.
This commit is contained in:
parent
0a6cb82516
commit
7a9ed262c1
9 changed files with 493 additions and 188 deletions
|
|
@ -10,7 +10,7 @@ plugins {
|
|||
|
||||
dependencies {
|
||||
implementation 'org.jgrapes:org.jgrapes.core:[1.19.0,2)'
|
||||
implementation 'org.jgrapes:org.jgrapes.io:[2.5.0,3)'
|
||||
implementation 'org.jgrapes:org.jgrapes.io:[2.7.0,3)'
|
||||
implementation 'org.jgrapes:org.jgrapes.http:[3.1.0,4)'
|
||||
implementation 'org.jgrapes:org.jgrapes.util:[1.28.0,2)'
|
||||
|
||||
|
|
|
|||
|
|
@ -18,7 +18,6 @@
|
|||
|
||||
package org.jdrupes.vmoperator.runner.qemu;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Files;
|
||||
|
|
@ -46,7 +45,6 @@ class Configuration implements Dto {
|
|||
public Path monitorSocket;
|
||||
public Path firmwareRom;
|
||||
public Path firmwareFlash;
|
||||
public JsonNode monitorMessages;
|
||||
@SuppressWarnings("PMD.ShortVariable")
|
||||
public Vm vm;
|
||||
|
||||
|
|
@ -126,10 +124,8 @@ class Configuration implements Dto {
|
|||
}
|
||||
}
|
||||
runtimeDir += "/vmrunner/" + vm.name;
|
||||
swtpmSocket
|
||||
= Path.of(runtimeDir, "swtpm-sock");
|
||||
monitorSocket
|
||||
= Path.of(runtimeDir, "monitor.sock");
|
||||
swtpmSocket = Path.of(runtimeDir, "swtpm-sock");
|
||||
monitorSocket = Path.of(runtimeDir, "monitor.sock");
|
||||
}
|
||||
Path runtimePath = Path.of(runtimeDir);
|
||||
if (!Files.exists(runtimePath)) {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,251 @@
|
|||
/*
|
||||
* VM-Operator
|
||||
* Copyright (C) 2023 Michael N. Lipp
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as
|
||||
* published by the Free Software Foundation, either version 3 of the
|
||||
* License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package org.jdrupes.vmoperator.runner.qemu;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import java.io.IOException;
|
||||
import java.io.Writer;
|
||||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.net.UnixDomainSocketAddress;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.Duration;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
import org.jgrapes.core.Channel;
|
||||
import org.jgrapes.core.Component;
|
||||
import org.jgrapes.core.Components;
|
||||
import org.jgrapes.core.annotation.Handler;
|
||||
import org.jgrapes.core.events.Start;
|
||||
import org.jgrapes.core.events.Stop;
|
||||
import org.jgrapes.io.events.Closed;
|
||||
import org.jgrapes.io.events.ConnectError;
|
||||
import org.jgrapes.io.events.Input;
|
||||
import org.jgrapes.io.events.OpenSocketConnection;
|
||||
import org.jgrapes.io.util.ByteBufferWriter;
|
||||
import org.jgrapes.io.util.LineCollector;
|
||||
import org.jgrapes.net.SocketIOChannel;
|
||||
import org.jgrapes.net.events.ClientConnected;
|
||||
import org.jgrapes.util.events.ConfigurationUpdate;
|
||||
import org.jgrapes.util.events.FileChanged;
|
||||
import org.jgrapes.util.events.FileChanged.Kind;
|
||||
import org.jgrapes.util.events.WatchFile;
|
||||
|
||||
/**
|
||||
* A component that handles the communication over the Qemu monitor
|
||||
* socket.
|
||||
*/
|
||||
public class QemuMonitor extends Component {
|
||||
|
||||
@SuppressWarnings({ "PMD.FieldNamingConventions",
|
||||
"PMD.VariableNamingConventions" })
|
||||
private static final Logger monitorLog
|
||||
= Logger.getLogger(QemuMonitor.class.getName());
|
||||
|
||||
@SuppressWarnings("PMD.UseConcurrentHashMap")
|
||||
private final Map<String, String> monitorMessages = new HashMap<>(Map.of(
|
||||
"connect", "{ \"execute\": \"qmp_capabilities\" }",
|
||||
"powerdown", "{ \"execute\": \"system_powerdown\" }"));
|
||||
|
||||
private Path socketPath;
|
||||
|
||||
private SocketIOChannel monitorChannel;
|
||||
|
||||
private Stop suspendedStop;
|
||||
|
||||
/**
|
||||
* Instantiates a new qemu monitor.
|
||||
*
|
||||
* @param componentChannel the component channel
|
||||
*/
|
||||
public QemuMonitor(Channel componentChannel) {
|
||||
super(componentChannel);
|
||||
}
|
||||
|
||||
/**
|
||||
* As the configuration of this component depends on the configuration
|
||||
* of the {@link Runner}, it doesn't have a handler for the
|
||||
* {@link ConfigurationUpdate} event. The values are forwarded from the
|
||||
* {@link Runner} instead.
|
||||
*
|
||||
* @param socketPath the socket path
|
||||
*/
|
||||
/* default */ void configure(Path socketPath) {
|
||||
this.socketPath = socketPath;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle the start event.
|
||||
*
|
||||
* @param event the event
|
||||
* @throws IOException Signals that an I/O exception has occurred.
|
||||
*/
|
||||
@Handler
|
||||
public void onStart(Start event) throws IOException {
|
||||
Files.deleteIfExists(socketPath);
|
||||
fire(new WatchFile(socketPath));
|
||||
}
|
||||
|
||||
/**
|
||||
* Watch for the creation of the swtpm socket and start the
|
||||
* qemu process if it has been created.
|
||||
*
|
||||
* @param event the event
|
||||
* @param context the context
|
||||
*/
|
||||
@Handler
|
||||
public void onFileChanged(FileChanged event) {
|
||||
if (event.change() == Kind.CREATED && event.path().equals(socketPath)) {
|
||||
// qemu running, open socket
|
||||
fire(new OpenSocketConnection(
|
||||
UnixDomainSocketAddress.of(socketPath))
|
||||
.setAssociated(QemuMonitor.class, this));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if this is from opening the monitor socket and if true,
|
||||
* save the socket in the context and associate the channel with
|
||||
* the context. Then send the initial message to the socket.
|
||||
*
|
||||
* @param event the event
|
||||
* @param channel the channel
|
||||
*/
|
||||
@Handler
|
||||
public void onClientConnected(ClientConnected event,
|
||||
SocketIOChannel channel) {
|
||||
event.openEvent().associated(QemuMonitor.class).ifPresent(qm -> {
|
||||
monitorChannel = channel;
|
||||
channel.setAssociated(QemuMonitor.class, this);
|
||||
channel.setAssociated(Writer.class, new ByteBufferWriter(
|
||||
channel).nativeCharset());
|
||||
channel.setAssociated(LineCollector.class,
|
||||
new LineCollector()
|
||||
.consumer(line -> {
|
||||
try {
|
||||
processMonitorInput(line);
|
||||
} catch (IOException e) {
|
||||
throw new UndeclaredThrowableException(e);
|
||||
}
|
||||
}));
|
||||
writeToMonitor(monitorMessages.get("connect"));
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when a connection attempt fails.
|
||||
*
|
||||
* @param event the event
|
||||
* @param channel the channel
|
||||
*/
|
||||
@Handler
|
||||
public void onConnectError(ConnectError event, SocketIOChannel channel) {
|
||||
event.event().associated(QemuMonitor.class).ifPresent(qm -> {
|
||||
fire(new Stop());
|
||||
});
|
||||
}
|
||||
|
||||
private void writeToMonitor(String message) {
|
||||
monitorLog.fine(() -> "monitor(out): " + message);
|
||||
monitorChannel.associated(Writer.class).ifPresent(writer -> {
|
||||
try {
|
||||
writer.append(message).append('\n').flush();
|
||||
} catch (IOException e) {
|
||||
// Cannot happen, but...
|
||||
logger.log(Level.WARNING, e, () -> e.getMessage());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle data from qemu monitor connection.
|
||||
*
|
||||
* @param event the event
|
||||
* @param channel the channel
|
||||
*/
|
||||
@Handler
|
||||
public void onInput(Input<?> event, SocketIOChannel channel) {
|
||||
if (channel.associated(QemuMonitor.class).isEmpty()) {
|
||||
return;
|
||||
}
|
||||
channel.associated(LineCollector.class).ifPresent(collector -> {
|
||||
collector.feed(event);
|
||||
});
|
||||
}
|
||||
|
||||
private void processMonitorInput(String line)
|
||||
throws IOException {
|
||||
monitorLog.fine(() -> "monitor(in): " + line);
|
||||
try {
|
||||
var response
|
||||
= ((Runner) channel()).mapper().readValue(line, JsonNode.class);
|
||||
if (response.has("QMP")) {
|
||||
fire(new QemuMonitorOpened());
|
||||
}
|
||||
} catch (JsonProcessingException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* On closed.
|
||||
*
|
||||
* @param event the event
|
||||
*/
|
||||
@Handler
|
||||
public void onClosed(Closed<?> event, SocketIOChannel channel) {
|
||||
channel.associated(QemuMonitor.class).ifPresent(qm -> {
|
||||
monitorChannel = null;
|
||||
synchronized (this) {
|
||||
if (suspendedStop != null) {
|
||||
suspendedStop.resumeHandling();
|
||||
suspendedStop = null;
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown the VM.
|
||||
*
|
||||
* @param event the event
|
||||
*/
|
||||
@Handler(priority = 100)
|
||||
public void onStop(Stop event) {
|
||||
if (monitorChannel != null) {
|
||||
// We have a connection to Qemu, attempt ACPI shutdown.
|
||||
event.suspendHandling();
|
||||
suspendedStop = event;
|
||||
writeToMonitor(monitorMessages.get("powerdown"));
|
||||
|
||||
// Schedule timer as fallback
|
||||
Components.schedule(t -> {
|
||||
synchronized (this) {
|
||||
if (suspendedStop != null) {
|
||||
suspendedStop.resumeHandling();
|
||||
suspendedStop = null;
|
||||
}
|
||||
}
|
||||
}, Duration.ofMillis(5000));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* VM-Operator
|
||||
* Copyright (C) 2023 Michael N. Lipp
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as
|
||||
* published by the Free Software Foundation, either version 3 of the
|
||||
* License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package org.jdrupes.vmoperator.runner.qemu;
|
||||
|
||||
import org.jgrapes.io.events.Opened;
|
||||
|
||||
/**
|
||||
* Signals that the connection to the Qemu monitor socket has been
|
||||
* established successfully.
|
||||
*/
|
||||
public class QemuMonitorOpened extends Opened<Void> {
|
||||
|
||||
}
|
||||
|
|
@ -33,17 +33,18 @@ import java.io.File;
|
|||
import java.io.FileDescriptor;
|
||||
import java.io.IOException;
|
||||
import java.io.StringWriter;
|
||||
import java.io.Writer;
|
||||
import java.net.UnixDomainSocketAddress;
|
||||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
import static org.jdrupes.vmoperator.runner.qemu.Configuration.BOOT_MODE_SECURE;
|
||||
import static org.jdrupes.vmoperator.runner.qemu.Configuration.BOOT_MODE_UEFI;
|
||||
import org.jdrupes.vmoperator.runner.qemu.StateController.State;
|
||||
import org.jdrupes.vmoperator.util.ExtendedObjectWrapper;
|
||||
import org.jgrapes.core.Channel;
|
||||
import org.jgrapes.core.Component;
|
||||
|
|
@ -51,36 +52,34 @@ import org.jgrapes.core.Components;
|
|||
import org.jgrapes.core.TypedIdKey;
|
||||
import org.jgrapes.core.annotation.Handler;
|
||||
import org.jgrapes.core.events.Start;
|
||||
import org.jgrapes.core.events.Started;
|
||||
import org.jgrapes.core.events.Stop;
|
||||
import org.jgrapes.io.NioDispatcher;
|
||||
import org.jgrapes.io.events.ConnectError;
|
||||
import org.jgrapes.io.events.Input;
|
||||
import org.jgrapes.io.events.OpenSocketConnection;
|
||||
import org.jgrapes.io.events.ProcessExited;
|
||||
import org.jgrapes.io.events.ProcessStarted;
|
||||
import org.jgrapes.io.events.StartProcess;
|
||||
import org.jgrapes.io.process.ProcessManager;
|
||||
import org.jgrapes.io.process.ProcessManager.ProcessChannel;
|
||||
import org.jgrapes.io.util.ByteBufferWriter;
|
||||
import org.jgrapes.io.util.LineCollector;
|
||||
import org.jgrapes.net.SocketConnector;
|
||||
import org.jgrapes.net.SocketIOChannel;
|
||||
import org.jgrapes.net.events.ClientConnected;
|
||||
import org.jgrapes.util.FileSystemWatcher;
|
||||
import org.jgrapes.util.YamlConfigurationStore;
|
||||
import org.jgrapes.util.events.ConfigurationUpdate;
|
||||
import org.jgrapes.util.events.FileChanged;
|
||||
import org.jgrapes.util.events.FileChanged.Kind;
|
||||
import org.jgrapes.util.events.InitialConfiguration;
|
||||
import org.jgrapes.util.events.WatchFile;
|
||||
|
||||
/**
|
||||
* The Runner.
|
||||
*
|
||||
* @startuml
|
||||
* [*] --> Setup
|
||||
* Setup --> Setup: InitialConfiguration/configure Runner
|
||||
* [*] --> Initializing
|
||||
* Initializing -> Initializing: InitialConfiguration/configure Runner
|
||||
* Initializing -> Initializing: Start/start Runner
|
||||
*
|
||||
* state Startup {
|
||||
* state "Starting (Processes)" as StartingProcess {
|
||||
*
|
||||
* state which <<choice>>
|
||||
* state "Start swtpm" as swtpm
|
||||
|
|
@ -105,9 +104,9 @@ import org.jgrapes.util.events.WatchFile;
|
|||
* monitor --> error: ConnectError[for monitor]
|
||||
* }
|
||||
*
|
||||
* Setup --> which: Start
|
||||
* Initializing --> which: Started
|
||||
*
|
||||
* success --> Run
|
||||
* success --> Running
|
||||
* error --> [*]
|
||||
*
|
||||
* @enduml
|
||||
|
|
@ -124,18 +123,15 @@ public class Runner extends Component {
|
|||
private static final String SAVED_TEMPLATE = "VM.ftl.yaml";
|
||||
private static final String FW_FLASH = "fw-flash.fd";
|
||||
|
||||
@SuppressWarnings({ "PMD.FieldNamingConventions",
|
||||
"PMD.VariableNamingConventions" })
|
||||
private static final Logger monitorLog
|
||||
= Logger.getLogger(Runner.class.getPackageName() + ".monitor");
|
||||
|
||||
private static Runner app;
|
||||
|
||||
private final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
|
||||
private final JsonNode defaults;
|
||||
@SuppressWarnings("PMD.UseConcurrentHashMap")
|
||||
private Configuration config = new Configuration();
|
||||
private final freemarker.template.Configuration fmConfig;
|
||||
private final StateController state;
|
||||
private CommandDefinition swtpmDefinition;
|
||||
private CommandDefinition qemuDefinition;
|
||||
private final QemuMonitor qemuMonitor;
|
||||
|
||||
/**
|
||||
* Instantiates a new runner.
|
||||
|
|
@ -143,7 +139,7 @@ public class Runner extends Component {
|
|||
* @throws IOException Signals that an I/O exception has occurred.
|
||||
*/
|
||||
public Runner() throws IOException {
|
||||
super(new Context());
|
||||
state = new StateController(this);
|
||||
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
|
||||
false);
|
||||
|
||||
|
|
@ -167,6 +163,7 @@ public class Runner extends Component {
|
|||
attach(new FileSystemWatcher(channel()));
|
||||
attach(new ProcessManager(channel()));
|
||||
attach(new SocketConnector(channel()));
|
||||
attach(qemuMonitor = new QemuMonitor(channel()));
|
||||
|
||||
// Configuration store with file in /etc (default)
|
||||
File config = new File(System.getProperty(
|
||||
|
|
@ -176,6 +173,10 @@ public class Runner extends Component {
|
|||
fire(new WatchFile(config.toPath()));
|
||||
}
|
||||
|
||||
/* default */ ObjectMapper mapper() {
|
||||
return mapper;
|
||||
}
|
||||
|
||||
/**
|
||||
* On configuration update.
|
||||
*
|
||||
|
|
@ -184,58 +185,64 @@ public class Runner extends Component {
|
|||
@Handler
|
||||
public void onConfigurationUpdate(ConfigurationUpdate event) {
|
||||
event.structured(componentPath()).ifPresent(c -> {
|
||||
try {
|
||||
config = mapper.convertValue(c, Configuration.class);
|
||||
} catch (IllegalArgumentException e) {
|
||||
logger.log(Level.SEVERE, e, () -> "Invalid configuration: "
|
||||
+ e.getMessage());
|
||||
// Don't use default configuration
|
||||
config = null;
|
||||
if (event instanceof InitialConfiguration) {
|
||||
processInitialConfiguration(c);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
private void processInitialConfiguration(
|
||||
Map<String, Object> runnerConfiguration) {
|
||||
try {
|
||||
config = mapper.convertValue(runnerConfiguration,
|
||||
Configuration.class);
|
||||
if (!config.check()) {
|
||||
// Invalid configuration, not used, problems already logged.
|
||||
config = null;
|
||||
}
|
||||
// Forward some values to child components
|
||||
qemuMonitor.configure(config.monitorSocket);
|
||||
} catch (IllegalArgumentException e) {
|
||||
logger.log(Level.SEVERE, e, () -> "Invalid configuration: "
|
||||
+ e.getMessage());
|
||||
// Don't use default configuration
|
||||
config = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle the start event.
|
||||
*
|
||||
* @param event the event
|
||||
*/
|
||||
@Handler
|
||||
@SuppressWarnings({ "PMD.SystemPrintln" })
|
||||
public void onStart(Start event) {
|
||||
try {
|
||||
if (config == null || !config.check()) {
|
||||
// Invalid configuration, fail
|
||||
if (config == null) {
|
||||
// Missing configuration, fail
|
||||
fire(new Stop());
|
||||
return;
|
||||
}
|
||||
|
||||
// Store process id
|
||||
try (var pidFile = Files.newBufferedWriter(
|
||||
Path.of(config.runtimeDir, "runner.pid"))) {
|
||||
pidFile.write(ProcessHandle.current().pid() + "\n");
|
||||
}
|
||||
|
||||
// Prepare firmware files and add to config
|
||||
setFirmwarePaths();
|
||||
|
||||
// Obtain more data from template
|
||||
// Obtain more context data from template
|
||||
var tplData = dataFromTemplate();
|
||||
|
||||
// Get process definitions etc. from processed data
|
||||
Context context = (Context) channel();
|
||||
context.swtpmDefinition = Optional.ofNullable(tplData.get("swtpm"))
|
||||
swtpmDefinition = Optional.ofNullable(tplData.get("swtpm"))
|
||||
.map(d -> new CommandDefinition("swtpm", d)).orElse(null);
|
||||
context.qemuDefinition = Optional.ofNullable(tplData.get("qemu"))
|
||||
qemuDefinition = Optional.ofNullable(tplData.get("qemu"))
|
||||
.map(d -> new CommandDefinition("qemu", d)).orElse(null);
|
||||
config.monitorMessages = tplData.get("monitorMessages");
|
||||
|
||||
// Files to watch for
|
||||
Files.deleteIfExists(config.swtpmSocket);
|
||||
fire(new WatchFile(config.swtpmSocket));
|
||||
Files.deleteIfExists(config.monitorSocket);
|
||||
fire(new WatchFile(config.monitorSocket));
|
||||
|
||||
// Start first
|
||||
if (config.vm.useTpm && context.swtpmDefinition != null) {
|
||||
startProcess(context, context.swtpmDefinition);
|
||||
return;
|
||||
}
|
||||
startProcess(context, context.qemuDefinition);
|
||||
} catch (IOException | TemplateException e) {
|
||||
logger.log(Level.SEVERE, e,
|
||||
() -> "Cannot configure runner: " + e.getMessage());
|
||||
|
|
@ -307,12 +314,27 @@ public class Runner extends Component {
|
|||
return mapper.readValue(out.toString(), JsonNode.class);
|
||||
}
|
||||
|
||||
private boolean startProcess(Context context, CommandDefinition toStart) {
|
||||
/**
|
||||
* Handle the started event.
|
||||
*
|
||||
* @param event the event
|
||||
*/
|
||||
@Handler
|
||||
public void onStarted(Started event) {
|
||||
state.set(State.STARTING);
|
||||
// Start first process
|
||||
if (config.vm.useTpm && swtpmDefinition != null) {
|
||||
startProcess(swtpmDefinition);
|
||||
return;
|
||||
}
|
||||
startProcess(qemuDefinition);
|
||||
}
|
||||
|
||||
private boolean startProcess(CommandDefinition toStart) {
|
||||
logger.fine(
|
||||
() -> "Starting process: " + String.join(" ", toStart.command));
|
||||
fire(new StartProcess(toStart.command)
|
||||
.setAssociated(Context.class, context)
|
||||
.setAssociated(CommandDefinition.class, toStart), channel());
|
||||
.setAssociated(CommandDefinition.class, toStart));
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
@ -324,22 +346,13 @@ public class Runner extends Component {
|
|||
* @param context the context
|
||||
*/
|
||||
@Handler
|
||||
public void onFileChanged(FileChanged event, Context context) {
|
||||
public void onFileChanged(FileChanged event) {
|
||||
if (event.change() == Kind.CREATED
|
||||
&& event.path()
|
||||
.equals(Path.of(config.runtimeDir, "swtpm-sock"))) {
|
||||
&& event.path().equals(config.swtpmSocket)) {
|
||||
// swtpm running, start qemu
|
||||
startProcess(context, context.qemuDefinition);
|
||||
startProcess(qemuDefinition);
|
||||
return;
|
||||
}
|
||||
var monSockPath = Path.of(config.runtimeDir, "monitor.sock");
|
||||
if (event.change() == Kind.CREATED
|
||||
&& event.path().equals(monSockPath)) {
|
||||
// qemu running, open socket
|
||||
fire(new OpenSocketConnection(
|
||||
UnixDomainSocketAddress.of(monSockPath))
|
||||
.setAssociated(Context.class, context));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -355,34 +368,27 @@ public class Runner extends Component {
|
|||
"PMD.TooFewBranchesForASwitchStatement" })
|
||||
public void onProcessStarted(ProcessStarted event, ProcessChannel channel)
|
||||
throws InterruptedException {
|
||||
event.startEvent().associated(Context.class).ifPresent(context -> {
|
||||
// Associate the process channel with the general context
|
||||
// and with its process definition (both carried over by
|
||||
// the start event).
|
||||
channel.setAssociated(Context.class, context);
|
||||
CommandDefinition procDef
|
||||
= event.startEvent().associated(CommandDefinition.class).get();
|
||||
channel.setAssociated(CommandDefinition.class, procDef);
|
||||
event.startEvent().associated(CommandDefinition.class)
|
||||
.ifPresent(procDef -> {
|
||||
channel.setAssociated(CommandDefinition.class, procDef);
|
||||
try (var pidFile = Files.newBufferedWriter(
|
||||
Path.of(config.runtimeDir, procDef.name + ".pid"))) {
|
||||
pidFile.write(channel.process().toHandle().pid() + "\n");
|
||||
} catch (IOException e) {
|
||||
throw new UndeclaredThrowableException(e);
|
||||
}
|
||||
|
||||
// Associate the channel with a line collector (one for
|
||||
// each stream) for logging the process's output.
|
||||
TypedIdKey.associate(channel, 1, new LineCollector().nativeCharset()
|
||||
.consumer(line -> logger
|
||||
.info(() -> procDef.name() + "(out): " + line)));
|
||||
TypedIdKey.associate(channel, 2, new LineCollector().nativeCharset()
|
||||
.consumer(line -> logger
|
||||
.info(() -> procDef.name() + "(err): " + line)));
|
||||
|
||||
// Register the channel in the context.
|
||||
switch (procDef.name) {
|
||||
case "swtpm":
|
||||
context.swtpmChannel = channel;
|
||||
break;
|
||||
case "qemu":
|
||||
context.qemuChannel = channel;
|
||||
break;
|
||||
}
|
||||
});
|
||||
// Associate the channel with a line collector (one for
|
||||
// each stream) for logging the process's output.
|
||||
TypedIdKey.associate(channel, 1,
|
||||
new LineCollector().nativeCharset()
|
||||
.consumer(line -> logger
|
||||
.info(() -> procDef.name() + "(out): " + line)));
|
||||
TypedIdKey.associate(channel, 2,
|
||||
new LineCollector().nativeCharset()
|
||||
.consumer(line -> logger
|
||||
.info(() -> procDef.name() + "(err): " + line)));
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -399,16 +405,14 @@ public class Runner extends Component {
|
|||
}
|
||||
|
||||
/**
|
||||
* Handle data from qemu monitor connection.
|
||||
* On qemu monitor started.
|
||||
*
|
||||
* @param event the event
|
||||
* @param channel the channel
|
||||
* @param context the context
|
||||
*/
|
||||
@Handler
|
||||
public void onInput(Input<?> event, SocketIOChannel channel) {
|
||||
channel.associated(LineCollector.class).ifPresent(collector -> {
|
||||
collector.feed(event);
|
||||
});
|
||||
public void onQemuMonitorOpened(QemuMonitorOpened event) {
|
||||
state.set(State.RUNNING);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -423,84 +427,19 @@ public class Runner extends Component {
|
|||
}
|
||||
|
||||
/**
|
||||
* Check if this is from opening the monitor socket and if true,
|
||||
* save the socket in the context and associate the channel with
|
||||
* the context. Then send the initial message to the socket.
|
||||
* On stop.
|
||||
*
|
||||
* @param event the event
|
||||
* @param channel the channel
|
||||
*/
|
||||
@Handler
|
||||
public void onClientConnected(ClientConnected event,
|
||||
SocketIOChannel channel) {
|
||||
if (event.openEvent().address() instanceof UnixDomainSocketAddress addr
|
||||
&& addr.getPath()
|
||||
.equals(Path.of(config.runtimeDir, "monitor.sock"))) {
|
||||
event.openEvent().associated(Context.class).ifPresent(context -> {
|
||||
context.monitorChannel = channel;
|
||||
channel.setAssociated(Context.class, context);
|
||||
channel.setAssociated(LineCollector.class,
|
||||
new LineCollector().consumer(line -> {
|
||||
monitorLog.fine(() -> "monitor(in): " + line);
|
||||
}));
|
||||
channel.setAssociated(Writer.class, new ByteBufferWriter(
|
||||
channel).nativeCharset());
|
||||
writeToMonitor(context,
|
||||
config.monitorMessages.get("connect").asText());
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when a connection attempt fails.
|
||||
*
|
||||
* @param event the event
|
||||
* @param channel the channel
|
||||
*/
|
||||
@Handler
|
||||
public void onConnectError(ConnectError event, SocketIOChannel channel) {
|
||||
if (event.event() instanceof OpenSocketConnection openEvent
|
||||
&& openEvent.address() instanceof UnixDomainSocketAddress addr
|
||||
&& addr.getPath()
|
||||
.equals(Path.of(config.runtimeDir, "monitor.sock"))) {
|
||||
openEvent.associated(Context.class).ifPresent(context -> {
|
||||
fire(new Stop());
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
private void writeToMonitor(Context context, String message) {
|
||||
monitorLog.fine(() -> "monitor(out): " + message);
|
||||
context.monitorChannel.associated(Writer.class)
|
||||
.ifPresent(writer -> {
|
||||
try {
|
||||
writer.append(message).append('\n').flush();
|
||||
} catch (IOException e) {
|
||||
// Cannot happen, but...
|
||||
logger.log(Level.WARNING, e, () -> e.getMessage());
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* The context.
|
||||
*/
|
||||
private static class Context implements Channel {
|
||||
public CommandDefinition swtpmDefinition;
|
||||
public CommandDefinition qemuDefinition;
|
||||
public ProcessChannel swtpmChannel;
|
||||
public ProcessChannel qemuChannel;
|
||||
public SocketIOChannel monitorChannel;
|
||||
|
||||
@Override
|
||||
public Object defaultCriterion() {
|
||||
return "ProcMgr";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ProcMgr";
|
||||
}
|
||||
public void onStop(Stop event) {
|
||||
// Context context = (Context) channel();
|
||||
// if (context.qemuChannel != null) {
|
||||
// event.suspendHandling();
|
||||
// context.suspendedStop = event;
|
||||
// writeToMonitor(context,
|
||||
// config.monitorMessages.get("powerdown").asText());
|
||||
// }
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -511,7 +450,7 @@ public class Runner extends Component {
|
|||
public static void main(String[] args) {
|
||||
// The Runner is the root component
|
||||
try {
|
||||
app = new Runner();
|
||||
var app = new Runner();
|
||||
|
||||
// Prepare Stop
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
|
||||
|
|
@ -529,6 +468,5 @@ public class Runner extends Component {
|
|||
Logger.getLogger(Runner.class.getName()).log(Level.SEVERE, e,
|
||||
() -> "Failed to start runner: " + e.getMessage());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,54 @@
|
|||
/*
|
||||
* VM-Operator
|
||||
* Copyright (C) 2023 Michael N. Lipp
|
||||
*
|
||||
* This program is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU Affero General Public License as
|
||||
* published by the Free Software Foundation, either version 3 of the
|
||||
* License, or (at your option) any later version.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU Affero General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
package org.jdrupes.vmoperator.runner.qemu;
|
||||
|
||||
/**
|
||||
* The context.
|
||||
*/
|
||||
/* default */ class StateController {
|
||||
|
||||
private Runner runner;
|
||||
|
||||
/**
|
||||
* The state.
|
||||
*/
|
||||
enum State {
|
||||
INITIALIZING, STARTING, RUNNING, TERMINATING
|
||||
}
|
||||
|
||||
private State state = State.INITIALIZING;
|
||||
|
||||
public StateController(Runner runner) {
|
||||
this.runner = runner;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the state.
|
||||
*
|
||||
* @param state the new state
|
||||
*/
|
||||
public void set(State state) {
|
||||
this.state = state;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "StateController [state=" + state + "]";
|
||||
}
|
||||
}
|
||||
|
|
@ -152,8 +152,3 @@
|
|||
- [ "-chardev", "spicevmc,id=charredir${ index },name=usbredir" ]
|
||||
- [ "-device", "usb-redir,id=redir${ index },chardev=charredir${ index }" ]
|
||||
</#list>
|
||||
|
||||
|
||||
|
||||
"monitorMessages":
|
||||
"connect": '{ "execute": "qmp_capabilities" }'
|
||||
Loading…
Add table
Add a link
Reference in a new issue