Support for display secrets (#21)
Some checks failed
Java CI with Gradle / build (push) Has been cancelled

This commit is contained in:
Michael N. Lipp 2024-03-20 11:03:09 +01:00 committed by GitHub
parent 85b0a160f3
commit 3103452170
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
38 changed files with 2081 additions and 658 deletions

View file

@ -126,8 +126,14 @@ spec:
# hostPath:
# path: /sys/fs/cgroup
- name: config
configMap:
name: ${ cr.metadata.name.asString }
projected:
sources:
- configMap:
name: ${ cr.metadata.name.asString }
<#if displaySecret??>
- secret:
name: ${ displaySecret }
</#if>
- name: vmop-image-repository
persistentVolumeClaim:
claimName: vmop-image-repository

View file

@ -0,0 +1,287 @@
/*
* VM-Operator
* Copyright (C) 2024 Michael N. Lipp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.jdrupes.vmoperator.manager;
import io.kubernetes.client.Discovery.APIResource;
import io.kubernetes.client.common.KubernetesListObject;
import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.util.Watch.Response;
import io.kubernetes.client.util.generic.options.ListOptions;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import org.jdrupes.vmoperator.common.K8s;
import org.jdrupes.vmoperator.common.K8sClient;
import org.jdrupes.vmoperator.common.K8sObserver;
import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
import org.jdrupes.vmoperator.manager.events.ChannelManager;
import org.jdrupes.vmoperator.manager.events.Exit;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Component;
import org.jgrapes.core.Components;
import org.jgrapes.core.annotation.Handler;
import org.jgrapes.core.events.Start;
import org.jgrapes.core.events.Stop;
import org.jgrapes.util.events.ConfigurationUpdate;
/**
* A base class for monitoring VM related resources.
*
* @param <O> the object type for the context
* @param <L> the object list type for the context
*/
@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis" })
public abstract class AbstractMonitor<O extends KubernetesObject,
L extends KubernetesListObject, C extends Channel> extends Component {
private final Class<O> objectClass;
private final Class<L> objectListClass;
private K8sClient client;
private APIResource context;
private String namespace;
private ListOptions options = new ListOptions();
private final AtomicInteger observerCounter = new AtomicInteger(0);
private ChannelManager<String, C, ?> channelManager;
private boolean channelManagerMaster;
/**
* Initializes the instance.
*
* @param componentChannel the component channel
*/
protected AbstractMonitor(Channel componentChannel, Class<O> objectClass,
Class<L> objectListClass) {
super(componentChannel);
this.objectClass = objectClass;
this.objectListClass = objectListClass;
}
/**
* Return the client.
*
* @return the client
*/
public K8sClient client() {
return client;
}
/**
* Sets the client to be used.
*
* @param client the client
* @return the abstract monitor
*/
public AbstractMonitor<O, L, C> client(K8sClient client) {
this.client = client;
return this;
}
/**
* Return the observed namespace.
*
* @return the namespace
*/
public String namespace() {
return namespace;
}
/**
* Sets the namespace to be observed.
*
* @param namespace the namespaceToWatch to set
* @return the abstract monitor
*/
public AbstractMonitor<O, L, C> namespace(String namespace) {
this.namespace = namespace;
return this;
}
/**
* Returns the options for selecting the objects to observe.
*
* @return the options
*/
public ListOptions options() {
return options;
}
/**
* Sets the options for selecting the objects to observe.
*
* @param options the options to set
* @return the abstract monitor
*/
public AbstractMonitor<O, L, C> options(ListOptions options) {
this.options = options;
return this;
}
/**
* Returns the observed context.
*
* @return the context
*/
public APIResource context() {
return context;
}
/**
* Sets the context to observe.
*
* @param context the context
* @return the abstract monitor
*/
public AbstractMonitor<O, L, C> context(APIResource context) {
this.context = context;
return this;
}
/**
* Returns the channel manager.
*
* @return the context
*/
public ChannelManager<String, C, ?> channelManager() {
return channelManager;
}
/**
* Sets the channel manager.
*
* @param channelManager the channel manager
* @return the abstract monitor
*/
public AbstractMonitor<O, L, C>
channelManager(ChannelManager<String, C, ?> channelManager) {
this.channelManager = channelManager;
return this;
}
/**
* Looks for a key "namespace" in the configuration and, if found,
* sets the namespace to its value.
*
* @param event the event
*/
@Handler
public void onConfigurationUpdate(ConfigurationUpdate event) {
event.structured(Components.manager(parent()).componentPath())
.ifPresent(c -> {
if (c.containsKey("namespace")) {
namespace = (String) c.get("namespace");
}
});
}
/**
* Handle the start event. Configures the namespace invokes
* {@link #prepareMonitoring()} and starts the observers.
*
* @param event the event
*/
@Handler(priority = 10)
@SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
public void onStart(Start event) {
try {
// Get namespace
if (namespace == null) {
var path = Path
.of("/var/run/secrets/kubernetes.io/serviceaccount/namespace");
if (Files.isReadable(path)) {
namespace
= Files.lines(path).findFirst().orElse(null);
}
}
// Additional preparations by derived class
prepareMonitoring();
assert client != null;
assert context != null;
assert namespace != null;
logger.fine(() -> "Observing " + K8s.toString(context)
+ " objects in " + namespace);
// Monitor all versions
for (var version : context.getVersions()) {
createObserver(version);
}
registerAsGenerator();
} catch (IOException | ApiException e) {
logger.log(Level.SEVERE, e,
() -> "Cannot watch VMs, terminating.");
event.cancel(true);
fire(new Exit(1));
}
}
private void createObserver(String version) {
observerCounter.incrementAndGet();
new K8sObserver<>(objectClass, objectListClass, client,
K8s.preferred(context, version), namespace, options)
.handler((c, r) -> {
handleChange(c, r);
if (ResponseType.valueOf(r.type) == ResponseType.DELETED
&& channelManagerMaster) {
channelManager.remove(r.object.getMetadata().getName());
}
}).onTerminated((o, t) -> {
if (observerCounter.decrementAndGet() == 0) {
unregisterAsGenerator();
}
// Exception has been logged already
if (t != null) {
fire(new Stop());
}
}).start();
}
/**
* Invoked by {@link #onStart(Start)} after the namespace has
* been configured and before starting the observer.
*
* @throws IOException Signals that an I/O exception has occurred.
* @throws ApiException the api exception
*/
@SuppressWarnings("PMD.EmptyMethodInAbstractClassShouldBeAbstract")
protected void prepareMonitoring() throws IOException, ApiException {
// To be overridden by derived class.
}
/**
* Handle an observed change.
*
* @param client the client
* @param change the change
*/
protected abstract void handleChange(K8sClient client, Response<O> change);
/**
* Returns the {@link Channel} for the given name.
*
* @param name the name
* @return the channel used for events related to the specified object
*/
protected Optional<C> channel(String name) {
return channelManager.getChannel(name);
}
}

View file

@ -23,6 +23,9 @@ package org.jdrupes.vmoperator.manager;
*/
public class Constants extends org.jdrupes.vmoperator.common.Constants {
/** The Constant COMP_DISPLAY_SECRET. */
public static final String COMP_DISPLAY_SECRET = "display-secret";
/** The Constant STATE_RUNNING. */
public static final String STATE_RUNNING = "Running";

View file

@ -30,6 +30,7 @@ import static org.jdrupes.vmoperator.common.Constants.VM_OP_GROUP;
import static org.jdrupes.vmoperator.common.Constants.VM_OP_KIND_VM;
import org.jdrupes.vmoperator.common.K8sClient;
import org.jdrupes.vmoperator.common.K8sDynamicStub;
import org.jdrupes.vmoperator.manager.events.ChannelManager;
import org.jdrupes.vmoperator.manager.events.Exit;
import org.jdrupes.vmoperator.manager.events.ModifyVm;
import org.jdrupes.vmoperator.manager.events.VmChannel;
@ -46,7 +47,7 @@ import org.jgrapes.util.events.ConfigurationUpdate;
* [Operator Whitepaper](https://github.com/cncf/tag-app-delivery/blob/eece8f7307f2970f46f100f51932db106db46968/operator-wg/whitepaper/Operator-WhitePaper_v1-0.md#operator-components-in-kubernetes).
*
* The implementation splits the controller in two components. The
* {@link VmWatcher} and the {@link Reconciler}. The former watches
* {@link VmMonitor} and the {@link Reconciler}. The former watches
* the VM definitions (CRs) and generates {@link VmDefChanged} events
* when they change. The latter handles the changes and reconciles the
* resources in the cluster.
@ -87,7 +88,20 @@ public class Controller extends Component {
public Controller(Channel componentChannel) {
super(componentChannel);
// Prepare component tree
attach(new VmWatcher(channel()));
ChannelManager<String, VmChannel, ?> chanMgr
= new ChannelManager<>(name -> {
try {
return new VmChannel(channel(), newEventPipeline(),
new K8sClient());
} catch (IOException e) {
logger.log(Level.SEVERE, e, () -> "Failed to create client"
+ " for handling changes: " + e.getMessage());
return null;
}
});
attach(new VmMonitor(channel()).channelManager(chanMgr));
attach(new DisplaySecretsMonitor(channel())
.channelManager(chanMgr.fixed()));
attach(new Reconciler(channel()));
}

View file

@ -0,0 +1,77 @@
/*
* VM-Operator
* Copyright (C) 2024 Michael N. Lipp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.jdrupes.vmoperator.manager;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.V1Secret;
import io.kubernetes.client.openapi.models.V1SecretList;
import io.kubernetes.client.util.Watch.Response;
import io.kubernetes.client.util.generic.options.ListOptions;
import java.io.IOException;
import static org.jdrupes.vmoperator.common.Constants.APP_NAME;
import org.jdrupes.vmoperator.common.K8sClient;
import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
import org.jdrupes.vmoperator.common.K8sV1SecretStub;
import static org.jdrupes.vmoperator.manager.Constants.COMP_DISPLAY_SECRET;
import org.jdrupes.vmoperator.manager.events.DisplaySecretChanged;
import org.jdrupes.vmoperator.manager.events.VmChannel;
import org.jgrapes.core.Channel;
/**
* Watches for changes of display secrets.
*/
@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
public class DisplaySecretsMonitor
extends AbstractMonitor<V1Secret, V1SecretList, VmChannel> {
/**
* Instantiates a new display secrets monitor.
*
* @param componentChannel the component channel
*/
public DisplaySecretsMonitor(Channel componentChannel) {
super(componentChannel, V1Secret.class, V1SecretList.class);
context(K8sV1SecretStub.CONTEXT);
ListOptions options = new ListOptions();
options.setLabelSelector("app.kubernetes.io/name=" + APP_NAME + ","
+ "app.kubernetes.io/component=" + COMP_DISPLAY_SECRET);
options(options);
}
@Override
protected void prepareMonitoring() throws IOException, ApiException {
client(new K8sClient());
}
@Override
protected void handleChange(K8sClient client, Response<V1Secret> change) {
String vmName = change.object.getMetadata().getLabels()
.get("app.kubernetes.io/instance");
if (vmName == null) {
return;
}
var channel = channel(vmName).orElse(null);
if (channel == null || channel.vmDefinition() == null) {
return;
}
channel.pipeline().fire(new DisplaySecretChanged(
ResponseType.valueOf(change.type), change.object), channel);
}
}

View file

@ -45,9 +45,9 @@ import java.util.Map;
import java.util.Optional;
import org.jdrupes.vmoperator.common.Convertions;
import org.jdrupes.vmoperator.common.K8sDynamicModel;
import org.jdrupes.vmoperator.common.K8sObserver;
import org.jdrupes.vmoperator.manager.events.VmChannel;
import org.jdrupes.vmoperator.manager.events.VmDefChanged;
import org.jdrupes.vmoperator.manager.events.VmDefChanged.Type;
import org.jdrupes.vmoperator.util.ExtendedObjectWrapper;
import org.jdrupes.vmoperator.util.GsonPtr;
import org.jgrapes.core.Channel;
@ -194,7 +194,7 @@ public class Reconciler extends Component {
// Ownership relationships takes care of deletions
var defMeta = event.vmDefinition().getMetadata();
if (event.type() == Type.DELETED) {
if (event.type() == K8sObserver.ResponseType.DELETED) {
logger.fine(() -> "VM \"" + defMeta.getName() + "\" deleted");
return;
}

View file

@ -28,6 +28,7 @@ import java.io.IOException;
import java.io.StringWriter;
import java.util.Map;
import java.util.logging.Logger;
import org.jdrupes.vmoperator.common.K8sV1SecretStub;
import org.jdrupes.vmoperator.common.K8sV1StatefulSetStub;
import org.jdrupes.vmoperator.manager.events.VmChannel;
import org.jdrupes.vmoperator.manager.events.VmDefChanged;
@ -69,6 +70,13 @@ import org.yaml.snakeyaml.constructor.SafeConstructor;
throws IOException, TemplateException, ApiException {
var metadata = event.vmDefinition().getMetadata();
// Check if we have a display secret
var dsStub = K8sV1SecretStub.get(channel.client(),
metadata.getNamespace(), metadata.getName() + "-display-secret");
dsStub.model().ifPresent(m -> {
model.put("displaySecret", m.getMetadata().getName());
});
// Combine template and data and parse result
var fmTemplate = fmConfig.getTemplate("runnerSts.ftl.yaml");
StringWriter out = new StringWriter();

View file

@ -0,0 +1,183 @@
/*
* VM-Operator
* Copyright (C) 2023,2024 Michael N. Lipp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.jdrupes.vmoperator.manager;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.util.Watch;
import io.kubernetes.client.util.generic.options.ListOptions;
import java.io.IOException;
import java.util.Set;
import java.util.logging.Level;
import java.util.stream.Collectors;
import static org.jdrupes.vmoperator.common.Constants.VM_OP_GROUP;
import org.jdrupes.vmoperator.common.K8s;
import org.jdrupes.vmoperator.common.K8sClient;
import org.jdrupes.vmoperator.common.K8sDynamicModel;
import org.jdrupes.vmoperator.common.K8sDynamicModels;
import org.jdrupes.vmoperator.common.K8sDynamicStub;
import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
import org.jdrupes.vmoperator.common.K8sV1ConfigMapStub;
import org.jdrupes.vmoperator.common.K8sV1PodStub;
import org.jdrupes.vmoperator.common.K8sV1StatefulSetStub;
import static org.jdrupes.vmoperator.manager.Constants.APP_NAME;
import static org.jdrupes.vmoperator.manager.Constants.VM_OP_KIND_VM;
import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME;
import org.jdrupes.vmoperator.manager.events.VmChannel;
import org.jdrupes.vmoperator.manager.events.VmDefChanged;
import org.jdrupes.vmoperator.util.GsonPtr;
import org.jgrapes.core.Channel;
/**
* Watches for changes of VM definitions.
*/
@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports" })
public class VmMonitor
extends AbstractMonitor<K8sDynamicModel, K8sDynamicModels, VmChannel> {
/**
* Instantiates a new VM definition watcher.
*
* @param componentChannel the component channel
*/
public VmMonitor(Channel componentChannel) {
super(componentChannel, K8sDynamicModel.class, K8sDynamicModels.class);
}
@Override
protected void prepareMonitoring() throws IOException, ApiException {
client(new K8sClient());
// Get all our API versions
var ctx = K8s.context(client(), VM_OP_GROUP, "", VM_OP_KIND_VM);
if (ctx.isEmpty()) {
logger.severe(() -> "Cannot get CRD context.");
return;
}
context(ctx.get());
// Remove left over resources
purge();
}
@SuppressWarnings("PMD.CognitiveComplexity")
private void purge() throws ApiException {
// Get existing CRs (VMs)
var known = K8sDynamicStub.list(client(), context(), namespace())
.stream().map(stub -> stub.name()).collect(Collectors.toSet());
ListOptions opts = new ListOptions();
opts.setLabelSelector(
"app.kubernetes.io/managed-by=" + VM_OP_NAME + ","
+ "app.kubernetes.io/name=" + APP_NAME);
for (var context : Set.of(K8sV1StatefulSetStub.CONTEXT,
K8sV1ConfigMapStub.CONTEXT)) {
for (var resStub : K8sDynamicStub.list(client(), context,
namespace(), opts)) {
String instance = resStub.model()
.map(m -> m.metadata().getName()).orElse("(unknown)");
if (!known.contains(instance)) {
resStub.delete();
}
}
}
}
@Override
protected void handleChange(K8sClient client,
Watch.Response<K8sDynamicModel> response) {
V1ObjectMeta metadata = response.object.getMetadata();
VmChannel channel = channel(metadata.getName()).orElse(null);
if (channel == null) {
return;
}
// Get full definition and associate with channel as backup
var vmDef = response.object;
if (vmDef.data() == null) {
// ADDED event does not provide data, see
// https://github.com/kubernetes-client/java/issues/3215
vmDef = getModel(client, vmDef);
}
if (vmDef.data() != null) {
// New data, augment and save
addDynamicData(channel.client(), vmDef);
channel.setVmDefinition(vmDef);
} else {
// Reuse cached
vmDef = channel.vmDefinition();
}
if (vmDef == null) {
logger.warning(
() -> "Cannot get model for " + response.object.getMetadata());
return;
}
// Create and fire event
channel.pipeline()
.fire(new VmDefChanged(ResponseType.valueOf(response.type),
channel.setGeneration(
response.object.getMetadata().getGeneration()),
vmDef), channel);
}
private K8sDynamicModel getModel(K8sClient client, K8sDynamicModel vmDef) {
try {
return K8sDynamicStub.get(client, context(), namespace(),
vmDef.metadata().getName()).model().orElse(null);
} catch (ApiException e) {
return null;
}
}
private void addDynamicData(K8sClient client, K8sDynamicModel vmState) {
var rootNode = GsonPtr.to(vmState.data()).get(JsonObject.class);
rootNode.addProperty("nodeName", "");
// VM definition status changes before the pod terminates.
// This results in pod information being shown for a stopped
// VM which is irritating. So check condition first.
var isRunning = GsonPtr.to(rootNode).to("status", "conditions")
.get(JsonArray.class)
.asList().stream().filter(el -> "Running"
.equals(((JsonObject) el).get("type").getAsString()))
.findFirst().map(el -> "True"
.equals(((JsonObject) el).get("status").getAsString()))
.orElse(false);
if (!isRunning) {
return;
}
var podSearch = new ListOptions();
podSearch.setLabelSelector("app.kubernetes.io/name=" + APP_NAME
+ ",app.kubernetes.io/component=" + APP_NAME
+ ",app.kubernetes.io/instance=" + vmState.getMetadata().getName());
try {
var podList
= K8sV1PodStub.list(client, namespace(), podSearch);
for (var podStub : podList) {
rootNode.addProperty("nodeName",
podStub.model().get().getSpec().getNodeName());
}
} catch (ApiException e) {
logger.log(Level.WARNING, e,
() -> "Cannot access node information: " + e.getMessage());
}
}
}

View file

@ -1,360 +0,0 @@
/*
* VM-Operator
* Copyright (C) 2023,2024 Michael N. Lipp
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package org.jdrupes.vmoperator.manager;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;
import io.kubernetes.client.apimachinery.GroupVersion;
import io.kubernetes.client.apimachinery.GroupVersionKind;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.apis.ApisApi;
import io.kubernetes.client.openapi.apis.CustomObjectsApi;
import io.kubernetes.client.openapi.models.V1APIGroup;
import io.kubernetes.client.openapi.models.V1APIResource;
import io.kubernetes.client.openapi.models.V1GroupVersionForDiscovery;
import io.kubernetes.client.openapi.models.V1Namespace;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.util.Config;
import io.kubernetes.client.util.Watch;
import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesApi;
import io.kubernetes.client.util.generic.options.ListOptions;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Level;
import static org.jdrupes.vmoperator.common.Constants.VM_OP_GROUP;
import org.jdrupes.vmoperator.common.K8sClient;
import org.jdrupes.vmoperator.common.K8sDynamicModel;
import org.jdrupes.vmoperator.common.K8sDynamicStub;
import org.jdrupes.vmoperator.common.K8sV1PodStub;
import static org.jdrupes.vmoperator.manager.Constants.APP_NAME;
import static org.jdrupes.vmoperator.manager.Constants.VM_OP_KIND_VM;
import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME;
import org.jdrupes.vmoperator.manager.events.Exit;
import org.jdrupes.vmoperator.manager.events.VmChannel;
import org.jdrupes.vmoperator.manager.events.VmDefChanged;
import org.jdrupes.vmoperator.manager.events.VmDefChanged.Type;
import org.jdrupes.vmoperator.util.GsonPtr;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Component;
import org.jgrapes.core.Components;
import org.jgrapes.core.annotation.Handler;
import org.jgrapes.core.events.Start;
import org.jgrapes.core.events.Stop;
import org.jgrapes.util.events.ConfigurationUpdate;
/**
* Watches for changes of VM definitions.
*/
@SuppressWarnings({ "PMD.DataflowAnomalyAnalysis", "PMD.ExcessiveImports" })
public class VmWatcher extends Component {
private String namespaceToWatch;
private final Map<String, VmChannel> channels = new ConcurrentHashMap<>();
/**
* Instantiates a new VM definition watcher.
*
* @param componentChannel the component channel
*/
public VmWatcher(Channel componentChannel) {
super(componentChannel);
}
/**
* Configure the component.
*
* @param event the event
*/
@Handler
public void onConfigurationUpdate(ConfigurationUpdate event) {
event.structured(Components.manager(parent()).componentPath())
.ifPresent(c -> {
if (c.containsKey("namespace")) {
namespaceToWatch = (String) c.get("namespace");
}
});
}
/**
* Handle the start event.
*
* @param event the event
* @throws IOException
* @throws ApiException
*/
@Handler(priority = 10)
public void onStart(Start event) {
try {
startWatching();
} catch (IOException | ApiException e) {
logger.log(Level.SEVERE, e,
() -> "Cannot watch VMs, terminating.");
event.cancel(true);
fire(new Exit(1));
}
}
private void startWatching() throws IOException, ApiException {
// Get namespace
if (namespaceToWatch == null) {
var path = Path
.of("/var/run/secrets/kubernetes.io/serviceaccount/namespace");
if (Files.isReadable(path)) {
namespaceToWatch = Files.lines(path).findFirst().orElse(null);
}
}
// Availability already checked by Controller.onStart
logger.fine(() -> "Watching namespace \"" + namespaceToWatch + "\".");
// Get all our API versions
var client = Config.defaultClient();
var apis = new ApisApi(client).getAPIVersions();
var vmOpApiVersions = apis.getGroups().stream()
.filter(g -> g.getName().equals(VM_OP_GROUP)).findFirst()
.map(V1APIGroup::getVersions).stream().flatMap(l -> l.stream())
.map(V1GroupVersionForDiscovery::getVersion).toList();
// Remove left overs
var coa = new CustomObjectsApi(client);
purge(client, coa, vmOpApiVersions);
// Start a watcher thread for each existing CRD version.
// The watcher will send us an "ADDED" for each existing VM.
for (var version : vmOpApiVersions) {
coa.getAPIResources(VM_OP_GROUP, version)
.getResources().stream()
.filter(r -> VM_OP_KIND_VM.equals(r.getKind()))
.findFirst()
.ifPresent(crd -> watchVmDefs(crd, version));
}
}
@SuppressWarnings("PMD.CognitiveComplexity")
private void purge(ApiClient client, CustomObjectsApi coa,
List<String> vmOpApiVersions) throws ApiException {
// Get existing CRs (VMs)
Set<String> known = new HashSet<>();
for (var version : vmOpApiVersions) {
// Get all known CR instances.
coa.getAPIResources(VM_OP_GROUP, version)
.getResources().stream()
.filter(r -> VM_OP_KIND_VM.equals(r.getKind()))
.findFirst()
.ifPresent(crd -> known.addAll(getKnown(client, crd, version)));
}
ListOptions opts = new ListOptions();
opts.setLabelSelector(
"app.kubernetes.io/managed-by=" + VM_OP_NAME + ","
+ "app.kubernetes.io/name=" + APP_NAME);
for (String resource : List.of("apps/v1/statefulsets",
"v1/configmaps", "v1/secrets")) {
@SuppressWarnings({ "PMD.AvoidInstantiatingObjectsInLoops",
"PMD.AvoidDuplicateLiterals" })
var resParts = new LinkedList<>(List.of(resource.split("/")));
var group = resParts.size() == 3 ? resParts.poll() : "";
var version = resParts.poll();
var plural = resParts.poll();
// Get resources, selected by label
@SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops")
var api = new DynamicKubernetesApi(group, version, plural, client);
var listObj = api.list(namespaceToWatch, opts).getObject();
if (listObj == null) {
continue;
}
for (var obj : listObj.getItems()) {
String instance = obj.getMetadata().getLabels()
.get("app.kubernetes.io/instance");
if (!known.contains(instance)) {
var resName = obj.getMetadata().getName();
var result = api.delete(namespaceToWatch, resName);
if (!result.isSuccess()) {
logger.warning(() -> "Cannot cleanup resource \""
+ resName + "\": " + result.toString());
}
}
}
}
}
private Set<String> getKnown(ApiClient client, V1APIResource crd,
String version) {
Set<String> result = new HashSet<>();
var api = new DynamicKubernetesApi(VM_OP_GROUP, version,
crd.getName(), client);
for (var item : api.list(namespaceToWatch).getObject().getItems()) {
if (!VM_OP_KIND_VM.equals(item.getKind())) {
continue;
}
result.add(item.getMetadata().getName());
}
return result;
}
private void watchVmDefs(V1APIResource crd, String version) {
@SuppressWarnings({ "PMD.AvoidInstantiatingObjectsInLoops",
"PMD.AvoidCatchingThrowable", "PMD.AvoidCatchingGenericException" })
var watcher = new Thread(() -> {
try {
logger.info(() -> "Watching objects created from "
+ crd.getName() + "." + VM_OP_GROUP + "/" + version);
// Watch sometimes terminates without apparent reason.
while (true) {
Instant startedAt = Instant.now();
var client = Config.defaultClient();
var coa = new CustomObjectsApi(client);
var call = coa.listNamespacedCustomObjectCall(VM_OP_GROUP,
version, namespaceToWatch, crd.getName(), null, false,
null, null, null, null, null, null, null, true, null);
try (Watch<V1Namespace> watch
= Watch.createWatch(client, call,
new TypeToken<Watch.Response<V1Namespace>>() {
}.getType())) {
for (Watch.Response<V1Namespace> item : watch) {
handleVmDefinitionChange(crd, item);
}
} catch (IOException | ApiException | RuntimeException e) {
logger.log(Level.FINE, e, () -> "Problem watching \""
+ crd.getName() + "\" (will retry): "
+ e.getMessage());
delayRestart(startedAt);
}
}
} catch (Throwable e) {
logger.log(Level.SEVERE, e, () -> "Probem watching: "
+ e.getMessage());
}
fire(new Stop());
});
watcher.setDaemon(true);
watcher.start();
}
@SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
private void delayRestart(Instant started) {
var runningFor = Duration
.between(started, Instant.now()).toMillis();
if (runningFor < 5000) {
logger.log(Level.FINE, () -> "Waiting... ");
try {
Thread.sleep(5000 - runningFor);
} catch (InterruptedException e1) { // NOPMD
// Retry
}
logger.log(Level.FINE, () -> "Retrying");
}
}
private void handleVmDefinitionChange(V1APIResource vmsCrd,
Watch.Response<V1Namespace> vmDefRef) throws ApiException {
V1ObjectMeta metadata = vmDefRef.object.getMetadata();
VmChannel channel = channels.computeIfAbsent(metadata.getName(),
k -> {
try {
return new VmChannel(channel(), newEventPipeline(),
new K8sClient());
} catch (IOException e) {
logger.log(Level.SEVERE, e, () -> "Failed to create client"
+ " for handling changes: " + e.getMessage());
return null;
}
});
if (channel == null) {
return;
}
// Get full definition and associate with channel as backup
@SuppressWarnings("PMD.ShortVariable")
var gv = GroupVersion.parse(vmDefRef.object.getApiVersion());
var vmStub = K8sDynamicStub.get(channel.client(),
new GroupVersionKind(gv.getGroup(), gv.getVersion(), VM_OP_KIND_VM),
metadata.getNamespace(), metadata.getName());
vmStub.model().ifPresent(vmDef -> {
addDynamicData(channel.client(), vmDef);
channel.setVmDefinition(vmDef);
// Create and fire event
channel.pipeline().fire(new VmDefChanged(VmDefChanged.Type
.valueOf(vmDefRef.type),
channel
.setGeneration(
vmDefRef.object.getMetadata().getGeneration()),
vmsCrd, vmDef), channel);
});
}
private void addDynamicData(K8sClient client, K8sDynamicModel vmState) {
var rootNode = GsonPtr.to(vmState.data()).get(JsonObject.class);
rootNode.addProperty("nodeName", "");
// VM definition status changes before the pod terminates.
// This results in pod information being shown for a stopped
// VM which is irritating. So check condition first.
var isRunning = GsonPtr.to(rootNode).to("status", "conditions")
.get(JsonArray.class)
.asList().stream().filter(el -> "Running"
.equals(((JsonObject) el).get("type").getAsString()))
.findFirst().map(el -> "True"
.equals(((JsonObject) el).get("status").getAsString()))
.orElse(false);
if (!isRunning) {
return;
}
var podSearch = new ListOptions();
podSearch.setLabelSelector("app.kubernetes.io/name=" + APP_NAME
+ ",app.kubernetes.io/component=" + APP_NAME
+ ",app.kubernetes.io/instance=" + vmState.getMetadata().getName());
try {
var podList
= K8sV1PodStub.list(client, namespaceToWatch, podSearch);
for (var podStub : podList) {
rootNode.addProperty("nodeName",
podStub.model().get().getSpec().getNodeName());
}
} catch (ApiException e) {
logger.log(Level.WARNING, e,
() -> "Cannot access node information: " + e.getMessage());
}
}
/**
* Remove VM channel when VM is deleted.
*
* @param event the event
* @param channel the channel
*/
@Handler(priority = -10_000)
public void onVmDefChanged(VmDefChanged event, VmChannel channel) {
if (event.type() == Type.DELETED) {
channels.remove(event.vmDefinition().getMetadata().getName());
}
}
}