Support more than one CRD version.

This commit is contained in:
Michael Lipp 2023-08-03 15:58:59 +02:00
parent fc60c3edf1
commit 2f101f2acc
9 changed files with 91 additions and 93 deletions

View file

@ -63,7 +63,7 @@ import org.jdrupes.vmoperator.manager.VmDefChanged.Type;
// Get API and check if exists // Get API and check if exists
DynamicKubernetesApi cmApi = new DynamicKubernetesApi("", "v1", DynamicKubernetesApi cmApi = new DynamicKubernetesApi("", "v1",
"configmaps", channel.client()); "configmaps", channel.client());
var existing = K8s.get(cmApi, event.metadata()); var existing = K8s.get(cmApi, event.object().getMetadata());
// If deleted, delete // If deleted, delete
if (event.type() == Type.DELETED) { if (event.type() == Type.DELETED) {

View file

@ -29,9 +29,6 @@ public class Constants {
/** The Constant VM_OP_GROUP. */ /** The Constant VM_OP_GROUP. */
public static final String VM_OP_GROUP = "vmoperator.jdrupes.org"; public static final String VM_OP_GROUP = "vmoperator.jdrupes.org";
/** The Constant VM_OP_VERSION. */
public static final String VM_OP_VERSION = "v1";
/** The Constant VM_OP_KIND_VM. */ /** The Constant VM_OP_KIND_VM. */
public static final String VM_OP_KIND_VM = "VirtualMachine"; public static final String VM_OP_KIND_VM = "VirtualMachine";

View file

@ -18,21 +18,20 @@
package org.jdrupes.vmoperator.manager; package org.jdrupes.vmoperator.manager;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.util.Config;
import java.io.IOException; import java.io.IOException;
import java.util.Map;
import org.jgrapes.core.Channel; import org.jgrapes.core.Channel;
import org.jgrapes.core.Component; import org.jgrapes.core.Component;
import org.jgrapes.core.annotation.Handler; import org.jgrapes.core.annotation.Handler;
import org.jgrapes.util.events.ConfigurationUpdate; import org.jgrapes.core.events.Start;
import org.jgrapes.util.events.InitialConfiguration;
/** /**
* The application class. * The application class.
*/ */
public class Controller extends Component { public class Controller extends Component {
// private final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
/** /**
* Instantiates a new manager. * Instantiates a new manager.
* *
@ -46,47 +45,16 @@ public class Controller extends Component {
} }
/** /**
* On configuration update. * Handle the start event. Has higher priority because it configures
* the default Kubernetes client.
* *
* @param event the event * @param event the event
* @throws IOException
* @throws ApiException
*/ */
@Handler @Handler(priority = 100)
public void onConfigurationUpdate(ConfigurationUpdate event) { public void onStart(Start event) throws IOException, ApiException {
event.structured(componentPath()).ifPresent(c -> { var client = Config.defaultClient();
if (event instanceof InitialConfiguration) { Configuration.setDefaultApiClient(client);
processInitialConfiguration(c);
}
});
}
private void processInitialConfiguration(
Map<String, Object> runnerConfiguration) {
// try {
// config = mapper.convertValue(runnerConfiguration,
// Configuration.class);
// if (!config.check()) {
// // Invalid configuration, not used, problems already logged.
// config = null;
// }
//
// // Prepare firmware files and add to config
// setFirmwarePaths();
//
// // Obtain more context data from template
// var tplData = dataFromTemplate();
// swtpmDefinition = Optional.ofNullable(tplData.get("swtpm"))
// .map(d -> new CommandDefinition("swtpm", d)).orElse(null);
// qemuDefinition = Optional.ofNullable(tplData.get("qemu"))
// .map(d -> new CommandDefinition("qemu", d)).orElse(null);
//
// // Forward some values to child components
// qemuMonitor.configure(config.monitorSocket,
// config.vm.powerdownTimeout);
// } catch (IllegalArgumentException | IOException | TemplateException e) {
// logger.log(Level.SEVERE, e, () -> "Invalid configuration: "
// + e.getMessage());
// // Don't use default configuration
// config = null;
// }
} }
} }

View file

@ -111,12 +111,12 @@ import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME;
throws ApiException { throws ApiException {
// Get API and check and list related // Get API and check and list related
var pvcApi = K8s.pvcApi(channel.client()); var pvcApi = K8s.pvcApi(channel.client());
var pvcs = pvcApi.list(event.metadata().getNamespace(), var pvcs = pvcApi.list(event.object().getMetadata().getNamespace(),
new ListOptions().labelSelector( new ListOptions().labelSelector(
"app.kubernetes.io/managed-by=" + VM_OP_NAME "app.kubernetes.io/managed-by=" + VM_OP_NAME
+ ",app.kubernetes.io/name=" + APP_NAME + ",app.kubernetes.io/name=" + APP_NAME
+ ",app.kubernetes.io/instance=" + ",app.kubernetes.io/instance="
+ event.metadata().getName())); + event.object().getMetadata().getName()));
for (var pvc : pvcs.getObject().getItems()) { for (var pvc : pvcs.getObject().getItems()) {
K8s.delete(pvcApi, pvc); K8s.delete(pvcApi, pvc);
} }

View file

@ -40,6 +40,16 @@ import java.util.Optional;
@SuppressWarnings({ "PMD.ShortClassName", "PMD.UseUtilityClass" }) @SuppressWarnings({ "PMD.ShortClassName", "PMD.UseUtilityClass" })
public class K8s { public class K8s {
/**
* Given a groupVersion, returns only the version.
*
* @param groupVersion the group version
* @return the string
*/
public static String version(String groupVersion) {
return groupVersion.substring(groupVersion.lastIndexOf('/') + 1);
}
/** /**
* Get PVC API. * Get PVC API.
* *

View file

@ -64,7 +64,7 @@ import org.jdrupes.vmoperator.manager.VmDefChanged.Type;
// Check if exists // Check if exists
DynamicKubernetesApi podApi = new DynamicKubernetesApi("", "v1", DynamicKubernetesApi podApi = new DynamicKubernetesApi("", "v1",
"pods", channel.client()); "pods", channel.client());
var existing = K8s.get(podApi, event.metadata()); var existing = K8s.get(podApi, event.object().getMetadata());
// Get state // Get state
var state = GsonPtr.to((JsonObject) model.get("cr")).to("spec", "vm") var state = GsonPtr.to((JsonObject) model.get("cr")).to("spec", "vm")

View file

@ -33,7 +33,6 @@ import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import static org.jdrupes.vmoperator.manager.Constants.VM_OP_GROUP; import static org.jdrupes.vmoperator.manager.Constants.VM_OP_GROUP;
import static org.jdrupes.vmoperator.manager.Constants.VM_OP_VERSION;
import org.jdrupes.vmoperator.manager.VmDefChanged.Type; import org.jdrupes.vmoperator.manager.VmDefChanged.Type;
import org.jdrupes.vmoperator.util.ExtendedObjectWrapper; import org.jdrupes.vmoperator.util.ExtendedObjectWrapper;
import org.jgrapes.core.Channel; import org.jgrapes.core.Channel;
@ -96,9 +95,10 @@ public class Reconciler extends Component {
public void onVmDefChanged(VmDefChanged event, WatchChannel channel) public void onVmDefChanged(VmDefChanged event, WatchChannel channel)
throws ApiException, TemplateException, IOException { throws ApiException, TemplateException, IOException {
// Get complete VM (CR) definition // Get complete VM (CR) definition
var apiVersion = K8s.version(event.object().getApiVersion());
DynamicKubernetesApi vmCrApi = new DynamicKubernetesApi(VM_OP_GROUP, DynamicKubernetesApi vmCrApi = new DynamicKubernetesApi(VM_OP_GROUP,
VM_OP_VERSION, event.crd().getName(), channel.client()); apiVersion, event.crd().getName(), channel.client());
var defMeta = event.metadata(); var defMeta = event.object().getMetadata();
// Get common data for all reconciles // Get common data for all reconciles
DynamicKubernetesObject vmDef = null; DynamicKubernetesObject vmDef = null;

View file

@ -19,7 +19,7 @@
package org.jdrupes.vmoperator.manager; package org.jdrupes.vmoperator.manager;
import io.kubernetes.client.openapi.models.V1APIResource; import io.kubernetes.client.openapi.models.V1APIResource;
import io.kubernetes.client.openapi.models.V1ObjectMeta; import io.kubernetes.client.openapi.models.V1Namespace;
import org.jgrapes.core.Channel; import org.jgrapes.core.Channel;
import org.jgrapes.core.Components; import org.jgrapes.core.Components;
import org.jgrapes.core.Event; import org.jgrapes.core.Event;
@ -38,7 +38,7 @@ public class VmDefChanged extends Event<Void> {
private final Type type; private final Type type;
private final V1APIResource crd; private final V1APIResource crd;
private final V1ObjectMeta metadata; private final V1Namespace object;
/** /**
* Instantiates a new VM changed event. * Instantiates a new VM changed event.
@ -46,10 +46,10 @@ public class VmDefChanged extends Event<Void> {
* @param type the type * @param type the type
* @param metadata the metadata * @param metadata the metadata
*/ */
public VmDefChanged(Type type, V1APIResource crd, V1ObjectMeta metadata) { public VmDefChanged(Type type, V1APIResource crd, V1Namespace object) {
this.type = type; this.type = type;
this.crd = crd; this.crd = crd;
this.metadata = metadata; this.object = object;
} }
/** /**
@ -71,19 +71,19 @@ public class VmDefChanged extends Event<Void> {
} }
/** /**
* Returns the metadata. * Returns the object.
* *
* @return the metadata * @return the object.
*/ */
public V1ObjectMeta metadata() { public V1Namespace object() {
return metadata; return object;
} }
@Override @Override
public String toString() { public String toString() {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
builder.append(Components.objectName(this)).append(" [").append(type) builder.append(Components.objectName(this)).append(" [").append(type)
.append(' ').append(metadata.getName()); .append(' ').append(object.getMetadata().getName());
if (channels() != null) { if (channels() != null) {
builder.append(", channels="); builder.append(", channels=");
builder.append(Channel.toString(channels())); builder.append(Channel.toString(channels()));

View file

@ -22,11 +22,13 @@ import com.google.gson.reflect.TypeToken;
import io.kubernetes.client.openapi.ApiClient; import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.Configuration; import io.kubernetes.client.openapi.Configuration;
import io.kubernetes.client.openapi.apis.ApisApi;
import io.kubernetes.client.openapi.apis.CustomObjectsApi; import io.kubernetes.client.openapi.apis.CustomObjectsApi;
import io.kubernetes.client.openapi.models.V1APIGroup;
import io.kubernetes.client.openapi.models.V1APIResource; import io.kubernetes.client.openapi.models.V1APIResource;
import io.kubernetes.client.openapi.models.V1GroupVersionForDiscovery;
import io.kubernetes.client.openapi.models.V1Namespace; import io.kubernetes.client.openapi.models.V1Namespace;
import io.kubernetes.client.openapi.models.V1ObjectMeta; import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.Watch; import io.kubernetes.client.util.Watch;
import java.io.IOException; import java.io.IOException;
import java.util.Map; import java.util.Map;
@ -34,7 +36,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level; import java.util.logging.Level;
import okhttp3.Call; import okhttp3.Call;
import static org.jdrupes.vmoperator.manager.Constants.VM_OP_GROUP; import static org.jdrupes.vmoperator.manager.Constants.VM_OP_GROUP;
import static org.jdrupes.vmoperator.manager.Constants.VM_OP_VERSION;
import org.jdrupes.vmoperator.manager.VmDefChanged.Type; import org.jdrupes.vmoperator.manager.VmDefChanged.Type;
import org.jgrapes.core.Channel; import org.jgrapes.core.Channel;
import org.jgrapes.core.Component; import org.jgrapes.core.Component;
@ -49,7 +50,6 @@ import org.jgrapes.core.events.Stop;
public class VmWatcher extends Component { public class VmWatcher extends Component {
private ApiClient client; private ApiClient client;
private V1APIResource vmsCrd;
private String managedNamespace = "qemu-vms"; private String managedNamespace = "qemu-vms";
private final Map<String, WatchChannel> channels private final Map<String, WatchChannel> channels
= new ConcurrentHashMap<>(); = new ConcurrentHashMap<>();
@ -72,44 +72,67 @@ public class VmWatcher extends Component {
*/ */
@Handler @Handler
public void onStart(Start event) throws IOException, ApiException { public void onStart(Start event) throws IOException, ApiException {
client = Config.defaultClient(); client = Configuration.getDefaultApiClient();
Configuration.setDefaultApiClient(client); // Get all our API versions
var apis = new ApisApi(client).getAPIVersions();
// Get access to API var vmOpApiVersions = apis.getGroups().stream()
.filter(g -> g.getName().equals(VM_OP_GROUP)).findFirst()
.map(V1APIGroup::getVersions).stream().flatMap(l -> l.stream())
.map(V1GroupVersionForDiscovery::getVersion).toList();
var coa = new CustomObjectsApi(client); var coa = new CustomObjectsApi(client);
for (var version : vmOpApiVersions) {
// Start a watcher for each existing CRD version.
coa.getAPIResources(VM_OP_GROUP, version)
.getResources().stream()
.filter(r -> Constants.VM_OP_KIND_VM.equals(r.getKind()))
.findFirst()
.ifPresent(crd -> serveCrVersion(coa, crd, version));
}
}
// Derive all information from the CRD private void serveCrVersion(CustomObjectsApi coa, V1APIResource crd,
var resources String version) {
= coa.getAPIResources(VM_OP_GROUP, VM_OP_VERSION); Call call;
vmsCrd = resources.getResources().stream() try {
.filter(r -> Constants.VM_OP_KIND_VM.equals(r.getKind())) call = coa.listNamespacedCustomObjectCall(VM_OP_GROUP, version,
.findFirst().get(); managedNamespace, crd.getName(), null, false, null, null, null,
null, null, null, null, true, null);
// Watch the resources (vm definitions) } catch (ApiException e) {
Call call = coa.listNamespacedCustomObjectCall( logger.log(Level.FINE, e,
VM_OP_GROUP, VM_OP_VERSION, managedNamespace, vmsCrd.getName(), () -> "Probem watching: " + e.getMessage());
null, false, null, null, null, null, null, null, null, true, null); return;
new Thread(() -> { }
try (Watch<V1Namespace> watch = Watch.createWatch(client, call, @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
new TypeToken<Watch.Response<V1Namespace>>() { var watcher = new Thread(() -> {
}.getType())) { try {
for (Watch.Response<V1Namespace> item : watch) { // Watch sometimes terminates without apparent reason.
handleVmDefinitionEvent(item); while (true) {
try (Watch<V1Namespace> watch
= Watch.createWatch(client, call,
new TypeToken<Watch.Response<V1Namespace>>() {
}.getType())) {
for (Watch.Response<V1Namespace> item : watch) {
handleVmDefinitionEvent(crd, item);
}
}
} }
} catch (IOException | ApiException e) { } catch (IOException | ApiException e) {
logger.log(Level.FINE, e, () -> "Probem while watching: " logger.log(Level.FINE, e, () -> "Probem watching: "
+ e.getMessage()); + e.getMessage());
} }
fire(new Stop()); fire(new Stop());
}).start(); });
watcher.setDaemon(true);
watcher.start();
} }
private void handleVmDefinitionEvent(Watch.Response<V1Namespace> item) { private void handleVmDefinitionEvent(V1APIResource vmsCrd,
Watch.Response<V1Namespace> item) {
V1ObjectMeta metadata = item.object.getMetadata(); V1ObjectMeta metadata = item.object.getMetadata();
WatchChannel channel = channels.computeIfAbsent(metadata.getName(), WatchChannel channel = channels.computeIfAbsent(metadata.getName(),
k -> new WatchChannel(channel(), newEventPipeline(), client)); k -> new WatchChannel(channel(), newEventPipeline(), client));
channel.pipeline().fire(new VmDefChanged( channel.pipeline().fire(new VmDefChanged(VmDefChanged.Type
VmDefChanged.Type.valueOf(item.type), vmsCrd, metadata), channel); .valueOf(item.type), vmsCrd, item.object), channel);
} }
/** /**
@ -121,7 +144,7 @@ public class VmWatcher extends Component {
@Handler(priority = -10_000) @Handler(priority = -10_000)
public void onVmDefChanged(VmDefChanged event, WatchChannel channel) { public void onVmDefChanged(VmDefChanged event, WatchChannel channel) {
if (event.type() == Type.DELETED) { if (event.type() == Type.DELETED) {
channels.remove(event.metadata().getName()); channels.remove(event.object().getMetadata().getName());
} }
} }