Adjust grace period to powerdown timeout.

This commit is contained in:
Michael Lipp 2023-08-08 19:00:15 +02:00
parent 093c6cf1d0
commit 65306f27b3
10 changed files with 112 additions and 48 deletions

View file

@ -58,7 +58,7 @@ import org.jdrupes.vmoperator.manager.VmDefChanged.Type;
* @throws ApiException the api exception * @throws ApiException the api exception
*/ */
public DynamicKubernetesObject reconcile(VmDefChanged event, public DynamicKubernetesObject reconcile(VmDefChanged event,
Map<String, Object> model, WatchChannel channel) Map<String, Object> model, VmChannel channel)
throws IOException, TemplateException, ApiException { throws IOException, TemplateException, ApiException {
// Get API and check if exists // Get API and check if exists
DynamicKubernetesApi cmApi = new DynamicKubernetesApi("", "v1", DynamicKubernetesApi cmApi = new DynamicKubernetesApi("", "v1",

View file

@ -55,7 +55,7 @@ import java.util.Map;
* @throws IOException Signals that an I/O exception has occurred. * @throws IOException Signals that an I/O exception has occurred.
*/ */
@SuppressWarnings("PMD.ConfusingTernary") @SuppressWarnings("PMD.ConfusingTernary")
public void reconcile(Map<String, Object> model, WatchChannel channel) public void reconcile(Map<String, Object> model, VmChannel channel)
throws TemplateException, ApiException, IOException { throws TemplateException, ApiException, IOException {
// Combine template and data and parse result // Combine template and data and parse result
var fmTemplate = fmConfig.getTemplate("runnerDataPvc.ftl.yaml"); var fmTemplate = fmConfig.getTemplate("runnerDataPvc.ftl.yaml");

View file

@ -42,10 +42,10 @@ import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME;
* @param channel the channel * @param channel the channel
* @throws ApiException the api exception * @throws ApiException the api exception
*/ */
public void reconcile(DynamicKubernetesObject vmDef, public void reconcile(JsonObject vmDef,
WatchChannel channel) throws ApiException { VmChannel channel) throws ApiException {
@SuppressWarnings("PMD.AvoidDuplicateLiterals") @SuppressWarnings("PMD.AvoidDuplicateLiterals")
var disks = GsonPtr.to(vmDef.getRaw()) var disks = GsonPtr.to(vmDef)
.get(JsonArray.class, "spec", "vm", "disks") .get(JsonArray.class, "spec", "vm", "disks")
.map(JsonArray::asList).orElse(Collections.emptyList()); .map(JsonArray::asList).orElse(Collections.emptyList());
int index = 0; int index = 0;
@ -55,15 +55,15 @@ import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME;
} }
@SuppressWarnings({ "PMD.AvoidDuplicateLiterals", "PMD.ConfusingTernary" }) @SuppressWarnings({ "PMD.AvoidDuplicateLiterals", "PMD.ConfusingTernary" })
private void reconcileDisk(DynamicKubernetesObject vmDefinition, private void reconcileDisk(JsonObject vmDefinition,
int index, JsonObject diskDef, WatchChannel channel) int index, JsonObject diskDef, VmChannel channel)
throws ApiException { throws ApiException {
if (!diskDef.has("volumeClaimTemplate")) { if (!diskDef.has("volumeClaimTemplate")) {
return; return;
} }
var pvcObject = new DynamicKubernetesObject(); var pvcObject = new DynamicKubernetesObject();
var pvcRaw = GsonPtr.to(pvcObject.getRaw()); var pvcRaw = GsonPtr.to(pvcObject.getRaw());
var vmRaw = GsonPtr.to(vmDefinition.getRaw()); var vmRaw = GsonPtr.to(vmDefinition);
var pvcTpl = GsonPtr.to(diskDef).to("volumeClaimTemplate"); var pvcTpl = GsonPtr.to(diskDef).to("volumeClaimTemplate");
// Copy base and metadata from template and add missing/additional data. // Copy base and metadata from template and add missing/additional data.
@ -110,7 +110,7 @@ import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME;
* @param channel the channel * @param channel the channel
* @throws ApiException the api exception * @throws ApiException the api exception
*/ */
public void deleteDisks(VmDefChanged event, WatchChannel channel) public void deleteDisks(VmDefChanged event, VmChannel channel)
throws ApiException { throws ApiException {
// Get API and check and list related // Get API and check and list related
var pvcApi = K8s.pvcApi(channel.client()); var pvcApi = K8s.pvcApi(channel.client());

View file

@ -164,6 +164,28 @@ public class GsonPtr {
.map(JsonPrimitive::getAsString); .map(JsonPrimitive::getAsString);
} }
/**
* Returns the Integer value of the selected {@link JsonPrimitive}.
*
* @param selectors the selectors
* @return the as string
*/
public Optional<Integer> getAsInt(Object... selectors) {
return get(JsonPrimitive.class, selectors)
.map(JsonPrimitive::getAsInt);
}
/**
* Returns the Long value of the selected {@link JsonPrimitive}.
*
* @param selectors the selectors
* @return the as string
*/
public Optional<Long> getAsLong(Object... selectors) {
return get(JsonPrimitive.class, selectors)
.map(JsonPrimitive::getAsLong);
}
/** /**
* Sets the selected value. This pointer must point to a * Sets the selected value. This pointer must point to a
* {@link JsonObject} or {@link JsonArray}. The selector must * {@link JsonObject} or {@link JsonArray}. The selector must

View file

@ -31,6 +31,7 @@ import io.kubernetes.client.openapi.models.V1PersistentVolumeClaimList;
import io.kubernetes.client.openapi.models.V1Pod; import io.kubernetes.client.openapi.models.V1Pod;
import io.kubernetes.client.openapi.models.V1PodList; import io.kubernetes.client.openapi.models.V1PodList;
import io.kubernetes.client.util.generic.GenericKubernetesApi; import io.kubernetes.client.util.generic.GenericKubernetesApi;
import io.kubernetes.client.util.generic.options.DeleteOptions;
import io.kubernetes.client.util.generic.options.PatchOptions; import io.kubernetes.client.util.generic.options.PatchOptions;
import java.util.Optional; import java.util.Optional;
@ -97,8 +98,8 @@ public class K8s {
* @return the object * @return the object
*/ */
public static <T extends KubernetesObject, LT extends KubernetesListObject> public static <T extends KubernetesObject, LT extends KubernetesListObject>
Optional<T> get(GenericKubernetesApi<T, LT> api, V1ObjectMeta meta) Optional<T>
throws ApiException { get(GenericKubernetesApi<T, LT> api, V1ObjectMeta meta) {
var response = api.get(meta.getNamespace(), meta.getName()); var response = api.get(meta.getNamespace(), meta.getName());
if (response.isSuccess()) { if (response.isSuccess()) {
return Optional.of(response.getObject()); return Optional.of(response.getObject());
@ -121,6 +122,21 @@ public class K8s {
object.getMetadata().getName()).throwsApiException(); object.getMetadata().getName()).throwsApiException();
} }
/**
* Delete an object.
*
* @param <T> the generic type
* @param <LT> the generic type
* @param api the api
* @param object the object
*/
public static <T extends KubernetesObject, LT extends KubernetesListObject>
void delete(GenericKubernetesApi<T, LT> api, T object,
DeleteOptions options) throws ApiException {
api.delete(object.getMetadata().getNamespace(),
object.getMetadata().getName(), options).throwsApiException();
}
/** /**
* Apply the given patch data. * Apply the given patch data.
* *

View file

@ -25,6 +25,7 @@ import io.kubernetes.client.custom.V1Patch;
import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesApi; import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesApi;
import io.kubernetes.client.util.generic.dynamic.Dynamics; import io.kubernetes.client.util.generic.dynamic.Dynamics;
import io.kubernetes.client.util.generic.options.DeleteOptions;
import java.io.IOException; import java.io.IOException;
import java.io.StringWriter; import java.io.StringWriter;
import java.util.Map; import java.util.Map;
@ -59,15 +60,14 @@ import org.jdrupes.vmoperator.manager.VmDefChanged.Type;
* @throws ApiException the api exception * @throws ApiException the api exception
*/ */
public void reconcile(VmDefChanged event, Map<String, Object> model, public void reconcile(VmDefChanged event, Map<String, Object> model,
WatchChannel channel) VmChannel channel)
throws IOException, TemplateException, ApiException { throws IOException, TemplateException, ApiException {
// Check if exists // Check if exists
DynamicKubernetesApi podApi = new DynamicKubernetesApi("", "v1", DynamicKubernetesApi podApi = new DynamicKubernetesApi("", "v1",
"pods", channel.client()); "pods", channel.client());
var existing = K8s.get(podApi, event.object().getMetadata()); var existing = K8s.get(podApi, event.object().getMetadata());
// Get state. Note that model is only available if event type // Get desired state.
// is not DELETED.
var delete = event.type() == Type.DELETED var delete = event.type() == Type.DELETED
|| GsonPtr.to((JsonObject) model.get("cr")).to("spec", "vm") || GsonPtr.to((JsonObject) model.get("cr")).to("spec", "vm")
.getAsString("state").orElse("").equals(STATE_STOPPED); .getAsString("state").orElse("").equals(STATE_STOPPED);
@ -75,7 +75,11 @@ import org.jdrupes.vmoperator.manager.VmDefChanged.Type;
// If deleted or stopped, delete // If deleted or stopped, delete
if (delete) { if (delete) {
if (existing.isPresent()) { if (existing.isPresent()) {
K8s.delete(podApi, existing.get()); var opts = new DeleteOptions();
opts.setGracePeriodSeconds(
GsonPtr.to((JsonObject) model.get("cr")).to("spec", "vm")
.getAsLong("powerdownTimeout").get() + 1);
K8s.delete(podApi, existing.get(), opts);
} }
return; return;
} }

View file

@ -30,7 +30,6 @@ import freemarker.template.TemplateHashModel;
import freemarker.template.TemplateNotFoundException; import freemarker.template.TemplateNotFoundException;
import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesApi; import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesApi;
import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesObject;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
@ -94,7 +93,7 @@ public class Reconciler extends Component {
*/ */
@Handler @Handler
@SuppressWarnings("PMD.ConfusingTernary") @SuppressWarnings("PMD.ConfusingTernary")
public void onVmDefChanged(VmDefChanged event, WatchChannel channel) public void onVmDefChanged(VmDefChanged event, VmChannel channel)
throws ApiException, TemplateException, IOException { throws ApiException, TemplateException, IOException {
// Get complete VM (CR) definition // Get complete VM (CR) definition
var apiVersion = K8s.version(event.object().getApiVersion()); var apiVersion = K8s.version(event.object().getApiVersion());
@ -102,22 +101,23 @@ public class Reconciler extends Component {
apiVersion, event.crd().getName(), channel.client()); apiVersion, event.crd().getName(), channel.client());
var defMeta = event.object().getMetadata(); var defMeta = event.object().getMetadata();
// Get common data for all reconciles // Update state
DynamicKubernetesObject vmDef = null;
Map<String, Object> model = null;
if (event.type() != Type.DELETED) { if (event.type() != Type.DELETED) {
vmDef = K8s.get(vmCrApi, defMeta).get(); channel.setState(
patchCr(K8s.get(vmCrApi, defMeta).get().getRaw().deepCopy()));
// Prepare Freemarker model
model = new HashMap<>();
model.put("cr", patchCr(vmDef.getRaw().deepCopy()));
model.put("constants",
(TemplateHashModel) new DefaultObjectWrapperBuilder(
Configuration.VERSION_2_3_32)
.build().getStaticModels()
.get(Constants.class.getName()));
} }
// Get common data for all reconciles
JsonObject vmDef = channel.state();
@SuppressWarnings("PMD.UseConcurrentHashMap")
Map<String, Object> model = new HashMap<>();
model.put("cr", vmDef);
model.put("constants",
(TemplateHashModel) new DefaultObjectWrapperBuilder(
Configuration.VERSION_2_3_32)
.build().getStaticModels()
.get(Constants.class.getName()));
// Reconcile // Reconcile
if (event.type() != Type.DELETED) { if (event.type() != Type.DELETED) {
dataReconciler.reconcile(model, channel); dataReconciler.reconcile(model, channel);
@ -132,7 +132,7 @@ public class Reconciler extends Component {
} }
} }
private Object patchCr(JsonObject vmDef) { private JsonObject patchCr(JsonObject vmDef) {
// Adjust cdromImage path // Adjust cdromImage path
var disks var disks
= GsonPtr.to(vmDef).to("spec", "vm", "disks").get(JsonArray.class); = GsonPtr.to(vmDef).to("spec", "vm", "disks").get(JsonArray.class);

View file

@ -18,34 +18,56 @@
package org.jdrupes.vmoperator.manager; package org.jdrupes.vmoperator.manager;
import com.google.gson.JsonObject;
import io.kubernetes.client.openapi.ApiClient; import io.kubernetes.client.openapi.ApiClient;
import org.jgrapes.core.Channel; import org.jgrapes.core.Channel;
import org.jgrapes.core.EventPipeline; import org.jgrapes.core.EventPipeline;
import org.jgrapes.core.Subchannel.DefaultSubchannel; import org.jgrapes.core.Subchannel.DefaultSubchannel;
/** /**
* A subchannel used to send the events related to a specific * A subchannel used to send the events related to a specific VM.
* VM.
*/ */
public class WatchChannel extends DefaultSubchannel { public class VmChannel extends DefaultSubchannel {
private final EventPipeline pipeline; private final EventPipeline pipeline;
private final ApiClient client; private final ApiClient client;
private JsonObject state;
/** /**
* Instantiates a new watch channel. * Instantiates a new watch channel.
* *
* @param mainChannel the main channel * @param mainChannel the main channel
* @param pipeline the pipeline * @param pipeline the pipeline
* @param client * @param client the client
*/ */
public WatchChannel(Channel mainChannel, EventPipeline pipeline, public VmChannel(Channel mainChannel, EventPipeline pipeline,
ApiClient client) { ApiClient client) {
super(mainChannel); super(mainChannel);
this.pipeline = pipeline; this.pipeline = pipeline;
this.client = client; this.client = client;
} }
/**
* Sets the last known state of the resource.
*
* @param state the state
* @return the watch channel
*/
@SuppressWarnings("PMD.LinguisticNaming")
public VmChannel setState(JsonObject state) {
this.state = state;
return this;
}
/**
* Returns the last known state of the resource.
*
* @return the json object
*/
public JsonObject state() {
return state;
}
/** /**
* Returns the pipeline. * Returns the pipeline.
* *

View file

@ -83,8 +83,8 @@ public class VmDefChanged extends Event<Void> {
@Override @Override
public String toString() { public String toString() {
StringBuilder builder = new StringBuilder(); StringBuilder builder = new StringBuilder();
builder.append(Components.objectName(this)).append(" [").append(type) builder.append(Components.objectName(this)).append(" [")
.append(' ').append(object.getMetadata().getName()); .append(object.getMetadata().getName()).append(' ').append(type);
if (channels() != null) { if (channels() != null) {
builder.append(", channels="); builder.append(", channels=");
builder.append(Channel.toString(channels())); builder.append(Channel.toString(channels()));

View file

@ -39,7 +39,6 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level; import java.util.logging.Level;
import okhttp3.Call;
import static org.jdrupes.vmoperator.manager.Constants.VM_OP_GROUP; import static org.jdrupes.vmoperator.manager.Constants.VM_OP_GROUP;
import static org.jdrupes.vmoperator.manager.Constants.VM_OP_KIND_VM; import static org.jdrupes.vmoperator.manager.Constants.VM_OP_KIND_VM;
import org.jdrupes.vmoperator.manager.VmDefChanged.Type; import org.jdrupes.vmoperator.manager.VmDefChanged.Type;
@ -57,7 +56,7 @@ public class VmWatcher extends Component {
private ApiClient client; private ApiClient client;
private String managedNamespace = "qemu-vms"; private String managedNamespace = "qemu-vms";
private final Map<String, WatchChannel> channels private final Map<String, VmChannel> channels
= new ConcurrentHashMap<>(); = new ConcurrentHashMap<>();
/** /**
@ -90,13 +89,14 @@ public class VmWatcher extends Component {
var coa = new CustomObjectsApi(client); var coa = new CustomObjectsApi(client);
purge(coa, vmOpApiVersions); purge(coa, vmOpApiVersions);
// Start a watcher for each existing CRD version. // Start a watcher thread for each existing CRD version.
// The watcher will send us an "ADDED" for each existing VM.
for (var version : vmOpApiVersions) { for (var version : vmOpApiVersions) {
coa.getAPIResources(VM_OP_GROUP, version) coa.getAPIResources(VM_OP_GROUP, version)
.getResources().stream() .getResources().stream()
.filter(r -> Constants.VM_OP_KIND_VM.equals(r.getKind())) .filter(r -> Constants.VM_OP_KIND_VM.equals(r.getKind()))
.findFirst() .findFirst()
.ifPresent(crd -> serveCrVersion(coa, crd, version)); .ifPresent(crd -> watchVmDefs(coa, crd, version));
} }
} }
@ -150,7 +150,7 @@ public class VmWatcher extends Component {
return result; return result;
} }
private void serveCrVersion(CustomObjectsApi coa, V1APIResource crd, private void watchVmDefs(CustomObjectsApi coa, V1APIResource crd,
String version) { String version) {
@SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
var watcher = new Thread(() -> { var watcher = new Thread(() -> {
@ -165,7 +165,7 @@ public class VmWatcher extends Component {
new TypeToken<Watch.Response<V1Namespace>>() { new TypeToken<Watch.Response<V1Namespace>>() {
}.getType())) { }.getType())) {
for (Watch.Response<V1Namespace> item : watch) { for (Watch.Response<V1Namespace> item : watch) {
handleVmDefinitionEvent(crd, item); handleVmDefinitionChange(crd, item);
} }
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
logger.log(Level.FINE, e, () -> "Probem watching: " logger.log(Level.FINE, e, () -> "Probem watching: "
@ -182,11 +182,11 @@ public class VmWatcher extends Component {
watcher.start(); watcher.start();
} }
private void handleVmDefinitionEvent(V1APIResource vmsCrd, private void handleVmDefinitionChange(V1APIResource vmsCrd,
Watch.Response<V1Namespace> item) { Watch.Response<V1Namespace> item) {
V1ObjectMeta metadata = item.object.getMetadata(); V1ObjectMeta metadata = item.object.getMetadata();
WatchChannel channel = channels.computeIfAbsent(metadata.getName(), VmChannel channel = channels.computeIfAbsent(metadata.getName(),
k -> new WatchChannel(channel(), newEventPipeline(), client)); k -> new VmChannel(channel(), newEventPipeline(), client));
channel.pipeline().fire(new VmDefChanged(VmDefChanged.Type channel.pipeline().fire(new VmDefChanged(VmDefChanged.Type
.valueOf(item.type), vmsCrd, item.object), channel); .valueOf(item.type), vmsCrd, item.object), channel);
} }
@ -198,7 +198,7 @@ public class VmWatcher extends Component {
* @param channel the channel * @param channel the channel
*/ */
@Handler(priority = -10_000) @Handler(priority = -10_000)
public void onVmDefChanged(VmDefChanged event, WatchChannel channel) { public void onVmDefChanged(VmDefChanged event, VmChannel channel) {
if (event.type() == Type.DELETED) { if (event.type() == Type.DELETED) {
channels.remove(event.object().getMetadata().getName()); channels.remove(event.object().getMetadata().getName());
} }