diff --git a/org.jdrupes.vmoperator.manager.events/src/org/jdrupes/vmoperator/manager/events/ChannelDictionary.java b/org.jdrupes.vmoperator.manager.events/src/org/jdrupes/vmoperator/manager/events/ChannelDictionary.java
new file mode 100644
index 0000000..05a079e
--- /dev/null
+++ b/org.jdrupes.vmoperator.manager.events/src/org/jdrupes/vmoperator/manager/events/ChannelDictionary.java
@@ -0,0 +1,113 @@
+/*
+ * VM-Operator
+ * Copyright (C) 2024 Michael N. Lipp
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation, either version 3 of the
+ * License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+package org.jdrupes.vmoperator.manager.events;
+
+import java.util.Collection;
+import java.util.Optional;
+import java.util.Set;
+import org.jgrapes.core.Channel;
+
+/**
+ * Supports the lookup of a channel by a name (an id). As a convenience,
+ * it is possible to additionally associate arbitrary data with the entry
+ * (and thus with the channel). Note that this interface defines a
+ * read-only view of the dictionary.
+ *
+ * @param the key type
+ * @param the channel type
+ * @param the type of the associated data
+ */
+public interface ChannelDictionary {
+
+ /**
+ * Combines the channel and the associated data.
+ *
+ * @param the channel type
+ * @param the type of the associated data
+ * @param channel the channel
+ * @param associated the associated
+ */
+ @SuppressWarnings("PMD.ShortClassName")
+ public record Value(C channel, A associated) {
+ }
+
+ /**
+ * Returns all known keys.
+ *
+ * @return the keys
+ */
+ Set keys();
+
+ /**
+ * Return all known values.
+ *
+ * @return the collection
+ */
+ Collection> values();
+
+ /**
+ * Returns the channel and associates data registered for the key
+ * or an empty optional if no entry exists.
+ *
+ * @param key the key
+ * @return the result
+ */
+ Optional> value(K key);
+
+ /**
+ * Return all known channels.
+ *
+ * @return the collection
+ */
+ default Collection channels() {
+ return values().stream().map(v -> v.channel).toList();
+ }
+
+ /**
+ * Returns the channel registered for the key or an empty optional
+ * if no mapping exists.
+ *
+ * @param key the key
+ * @return the optional
+ */
+ default Optional channel(K key) {
+ return value(key).map(b -> b.channel);
+ }
+
+ /**
+ * Returns all known associated data.
+ *
+ * @return the collection
+ */
+ default Collection associated() {
+ return values().stream()
+ .filter(v -> v.associated() != null)
+ .map(v -> v.associated).toList();
+ }
+
+ /**
+ * Return the data associated with the entry for the channel.
+ *
+ * @param key the key
+ * @return the data
+ */
+ default Optional associated(K key) {
+ return value(key).map(b -> b.associated);
+ }
+}
diff --git a/org.jdrupes.vmoperator.manager.events/src/org/jdrupes/vmoperator/manager/events/ChannelManager.java b/org.jdrupes.vmoperator.manager.events/src/org/jdrupes/vmoperator/manager/events/ChannelManager.java
index eb27ea0..2cf7a85 100644
--- a/org.jdrupes.vmoperator.manager.events/src/org/jdrupes/vmoperator/manager/events/ChannelManager.java
+++ b/org.jdrupes.vmoperator.manager.events/src/org/jdrupes/vmoperator/manager/events/ChannelManager.java
@@ -27,53 +27,24 @@ import java.util.function.Function;
import org.jgrapes.core.Channel;
/**
- * A channel manager that maintains mappings from a key to a channel.
- * As a convenience, it is possible to additionally associate arbitrary
- * data with the entry (and thus with the channel).
+ * Provides an actively managed implementation of the {@link ChannelDictionary}.
*
- * The manager should be used by a component that defines channels for
- * housekeeping. It can be shared between this component and another
- * component, preferably using the {@link #fixed()} view for the
- * second component. Alternatively, the second component can use a
- * {@link ChannelCache} to track the mappings using events.
+ * The {@link ChannelManager} can be used for housekeeping by any component
+ * that creates channels. It can be shared between this component and
+ * some other component, preferably passing it as {@link ChannelDictionary}
+ * (the read-only view) to the second component. Alternatively, the other
+ * component can use a {@link ChannelTracker} to track the mappings using
+ * events.
*
* @param the key type
* @param the channel type
* @param the type of the associated data
*/
-public class ChannelManager {
+public class ChannelManager
+ implements ChannelDictionary {
- private final Map> channels = new ConcurrentHashMap<>();
+ private final Map> entries = new ConcurrentHashMap<>();
private final Function supplier;
- private ChannelManager readOnly;
-
- /**
- * Combines the channel and the associated data.
- *
- * @param the generic type
- * @param the generic type
- */
- @SuppressWarnings("PMD.ShortClassName")
- public static class Both {
-
- /** The channel. */
- public C channel;
-
- /** The associated. */
- public A associated;
-
- /**
- * Instantiates a new both.
- *
- * @param channel the channel
- * @param associated the associated
- */
- public Both(C channel, A associated) {
- super();
- this.channel = channel;
- this.associated = associated;
- }
- }
/**
* Instantiates a new channel manager.
@@ -91,6 +62,21 @@ public class ChannelManager {
this(k -> null);
}
+ @Override
+ public Set keys() {
+ return entries.keySet();
+ }
+
+ /**
+ * Return all known values.
+ *
+ * @return the collection
+ */
+ @Override
+ public Collection> values() {
+ return entries.values();
+ }
+
/**
* Returns the channel and associates data registered for the key
* or an empty optional if no mapping exists.
@@ -98,10 +84,8 @@ public class ChannelManager {
* @param key the key
* @return the result
*/
- public Optional> both(K key) {
- synchronized (channels) {
- return Optional.ofNullable(channels.get(key));
- }
+ public Optional> value(K key) {
+ return Optional.ofNullable(entries.get(key));
}
/**
@@ -113,7 +97,7 @@ public class ChannelManager {
* @return the channel manager
*/
public ChannelManager put(K key, C channel, A associated) {
- channels.put(key, new Both<>(channel, associated));
+ entries.put(key, new Value<>(channel, associated));
return this;
}
@@ -129,17 +113,6 @@ public class ChannelManager {
return this;
}
- /**
- * Returns the channel registered for the key or an empty optional
- * if no mapping exists.
- *
- * @param key the key
- * @return the optional
- */
- public Optional channel(K key) {
- return both(key).map(b -> b.channel);
- }
-
/**
* Returns the {@link Channel} for the given name, creating it using
* the supplier passed to the constructor if it doesn't exist yet.
@@ -147,8 +120,8 @@ public class ChannelManager {
* @param key the key
* @return the channel
*/
- public Optional getChannel(K key) {
- return getChannel(key, supplier);
+ public C channelGet(K key) {
+ return computeIfAbsent(key, supplier);
}
/**
@@ -161,17 +134,9 @@ public class ChannelManager {
*/
@SuppressWarnings({ "PMD.AssignmentInOperand",
"PMD.DataflowAnomalyAnalysis" })
- public Optional getChannel(K key, Function supplier) {
- synchronized (channels) {
- return Optional
- .of(Optional.ofNullable(channels.get(key))
- .map(v -> v.channel)
- .orElseGet(() -> {
- var channel = supplier.apply(key);
- channels.put(key, new Both<>(channel, null));
- return channel;
- }));
- }
+ public C computeIfAbsent(K key, Function supplier) {
+ return entries.computeIfAbsent(key,
+ k -> new Value<>(supplier.apply(k), null)).channel();
}
/**
@@ -183,121 +148,17 @@ public class ChannelManager {
* @return the channel manager
*/
public ChannelManager associate(K key, A data) {
- synchronized (channels) {
- Optional.ofNullable(channels.get(key))
- .ifPresent(v -> v.associated = data);
- }
+ Optional.ofNullable(entries.computeIfPresent(key,
+ (k, existing) -> new Value<>(existing.channel(), data)));
return this;
}
- /**
- * Return the data associated with the entry for the channel.
- *
- * @param key the key
- * @return the data
- */
- public Optional associated(K key) {
- return both(key).map(b -> b.associated);
- }
-
- /**
- * Returns all associated data.
- *
- * @return the collection
- */
- public Collection associated() {
- synchronized (channels) {
- return channels.values().stream()
- .filter(v -> v.associated != null)
- .map(v -> v.associated).toList();
- }
- }
-
/**
* Removes the channel with the given name.
*
* @param name the name
*/
public void remove(String name) {
- synchronized (channels) {
- channels.remove(name);
- }
- }
-
- /**
- * Returns all known keys.
- *
- * @return the sets the
- */
- public Set keys() {
- return channels.keySet();
- }
-
- /**
- * Returns a read only view of this channel manager. The methods
- * that usually create a new entry refrain from doing so. The
- * methods that change the value of channel and {@link #remove(String)}
- * do nothing. The associated data, however, can still be changed.
- *
- * @return the channel manager
- */
- public ChannelManager fixed() {
- if (readOnly == null) {
- readOnly = new ChannelManager<>(supplier) {
-
- @Override
- public Optional> both(K key) {
- return ChannelManager.this.both(key);
- }
-
- @Override
- public ChannelManager put(K key, C channel,
- A associated) {
- return associate(key, associated);
- }
-
- @Override
- public Optional getChannel(K key) {
- return ChannelManager.this.channel(key);
- }
-
- @Override
- public Optional getChannel(K key, Function supplier) {
- return ChannelManager.this.channel(key);
- }
-
- @Override
- public ChannelManager associate(K key, A data) {
- return ChannelManager.this.associate(key, data);
- }
-
- @Override
- public Optional associated(K key) {
- return ChannelManager.this.associated(key);
- }
-
- @Override
- public Collection associated() {
- return ChannelManager.this.associated();
- }
-
- @Override
- public void remove(String name) {
- // Do nothing
- }
-
- @Override
- public Set keys() {
- return ChannelManager.this.keys();
- }
-
- @Override
- public ChannelManager fixed() {
- return ChannelManager.this.fixed();
- }
-
- };
- }
- return readOnly;
+ entries.remove(name);
}
}
diff --git a/org.jdrupes.vmoperator.manager.events/src/org/jdrupes/vmoperator/manager/events/ChannelCache.java b/org.jdrupes.vmoperator.manager.events/src/org/jdrupes/vmoperator/manager/events/ChannelTracker.java
similarity index 52%
rename from org.jdrupes.vmoperator.manager.events/src/org/jdrupes/vmoperator/manager/events/ChannelCache.java
rename to org.jdrupes.vmoperator.manager.events/src/org/jdrupes/vmoperator/manager/events/ChannelTracker.java
index 1e6d031..8a41908 100644
--- a/org.jdrupes.vmoperator.manager.events/src/org/jdrupes/vmoperator/manager/events/ChannelCache.java
+++ b/org.jdrupes.vmoperator.manager.events/src/org/jdrupes/vmoperator/manager/events/ChannelTracker.java
@@ -19,6 +19,7 @@
package org.jdrupes.vmoperator.manager.events;
import java.lang.ref.WeakReference;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
@@ -27,20 +28,30 @@ import java.util.concurrent.ConcurrentHashMap;
import org.jgrapes.core.Channel;
/**
- * A channel manager that tracks mappings from a key to a channel using
- * "add/remove" (or "open/close") events and the channels on which they
- * are delivered.
+ * Used to track mapping from a key to a channel. Entries must
+ * be maintained by handlers for "add/remove" (or "open/close")
+ * events delivered on the channels that are to be
+ * made available by the tracker.
+ *
+ * The channels are stored in the dictionary using {@link WeakReference}s.
+ * Removing entries is therefore best practice but not an absolute necessity
+ * as entries for cleared references are removed when one of the methods
+ * {@link #values()}, {@link #channels()} or {@link #associated()} is called.
*
* @param the key type
* @param the channel type
* @param the type of the associated data
*/
-public class ChannelCache {
+public class ChannelTracker
+ implements ChannelDictionary {
- private final Map> channels = new ConcurrentHashMap<>();
+ private final Map> entries = new ConcurrentHashMap<>();
/**
- * Helper
+ * Combines the channel and associated data.
+ *
+ * @param the generic type
+ * @param the generic type
*/
@SuppressWarnings("PMD.ShortClassName")
private static class Data {
@@ -57,32 +68,24 @@ public class ChannelCache {
}
}
- /**
- * Combines the channel and the associated data.
- *
- * @param the generic type
- * @param the generic type
- */
- @SuppressWarnings("PMD.ShortClassName")
- public static class Both {
+ @Override
+ public Set keys() {
+ return entries.keySet();
+ }
- /** The channel. */
- public C channel;
-
- /** The associated. */
- public A associated;
-
- /**
- * Instantiates a new both.
- *
- * @param channel the channel
- * @param associated the associated
- */
- public Both(C channel, A associated) {
- super();
- this.channel = channel;
- this.associated = associated;
+ @Override
+ public Collection> values() {
+ var result = new ArrayList>();
+ for (var itr = entries.entrySet().iterator(); itr.hasNext();) {
+ var value = itr.next().getValue();
+ var channel = value.channel.get();
+ if (channel == null) {
+ itr.remove();
+ continue;
+ }
+ result.add(new Value<>(channel, value.associated));
}
+ return result;
}
/**
@@ -92,20 +95,18 @@ public class ChannelCache {
* @param key the key
* @return the result
*/
- public Optional> both(K key) {
- synchronized (channels) {
- var value = channels.get(key);
- if (value == null) {
- return Optional.empty();
- }
- var channel = value.channel.get();
- if (channel == null) {
- // Cleanup old reference
- channels.remove(key);
- return Optional.empty();
- }
- return Optional.of(new Both<>(channel, value.associated));
+ public Optional> value(K key) {
+ var value = entries.get(key);
+ if (value == null) {
+ return Optional.empty();
}
+ var channel = value.channel.get();
+ if (channel == null) {
+ // Cleanup old reference
+ entries.remove(key);
+ return Optional.empty();
+ }
+ return Optional.of(new Value<>(channel, value.associated));
}
/**
@@ -116,10 +117,10 @@ public class ChannelCache {
* @param associated the associated
* @return the channel manager
*/
- public ChannelCache put(K key, C channel, A associated) {
+ public ChannelTracker put(K key, C channel, A associated) {
Data data = new Data<>(channel);
data.associated = associated;
- channels.put(key, data);
+ entries.put(key, data);
return this;
}
@@ -130,22 +131,11 @@ public class ChannelCache {
* @param channel the channel
* @return the channel manager
*/
- public ChannelCache put(K key, C channel) {
+ public ChannelTracker put(K key, C channel) {
put(key, channel, null);
return this;
}
- /**
- * Returns the channel registered for the key or an empty optional
- * if no mapping exists.
- *
- * @param key the key
- * @return the optional
- */
- public Optional channel(K key) {
- return both(key).map(b -> b.channel);
- }
-
/**
* Associate the entry for the channel with the given data. The entry
* for the channel must already exist.
@@ -154,54 +144,18 @@ public class ChannelCache {
* @param data the data
* @return the channel manager
*/
- public ChannelCache associate(K key, A data) {
- synchronized (channels) {
- Optional.ofNullable(channels.get(key))
- .ifPresent(v -> v.associated = data);
- }
+ public ChannelTracker associate(K key, A data) {
+ Optional.ofNullable(entries.get(key))
+ .ifPresent(v -> v.associated = data);
return this;
}
- /**
- * Return the data associated with the entry for the channel.
- *
- * @param key the key
- * @return the data
- */
- public Optional associated(K key) {
- return both(key).map(b -> b.associated);
- }
-
- /**
- * Returns all associated data.
- *
- * @return the collection
- */
- public Collection associated() {
- synchronized (channels) {
- return channels.values().stream()
- .filter(v -> v.channel.get() != null && v.associated != null)
- .map(v -> v.associated).toList();
- }
- }
-
/**
* Removes the channel with the given name.
*
* @param name the name
*/
public void remove(String name) {
- synchronized (channels) {
- channels.remove(name);
- }
- }
-
- /**
- * Returns all known keys.
- *
- * @return the sets the
- */
- public Set keys() {
- return channels.keySet();
+ entries.remove(name);
}
}
diff --git a/org.jdrupes.vmoperator.manager.events/src/org/jdrupes/vmoperator/manager/events/ServiceChanged.java b/org.jdrupes.vmoperator.manager.events/src/org/jdrupes/vmoperator/manager/events/ServiceChanged.java
deleted file mode 100644
index a8008e0..0000000
--- a/org.jdrupes.vmoperator.manager.events/src/org/jdrupes/vmoperator/manager/events/ServiceChanged.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * VM-Operator
- * Copyright (C) 2024 Michael N. Lipp
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-package org.jdrupes.vmoperator.manager.events;
-
-import io.kubernetes.client.openapi.models.V1Service;
-import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
-import org.jgrapes.core.Channel;
-import org.jgrapes.core.Components;
-import org.jgrapes.core.Event;
-
-/**
- * Indicates that a service has changed.
- */
-@SuppressWarnings("PMD.DataClass")
-public class ServiceChanged extends Event {
-
- private final ResponseType type;
- private final V1Service service;
-
- /**
- * Initializes a new service changed event.
- *
- * @param type the type
- * @param service the service
- */
- public ServiceChanged(ResponseType type, V1Service service) {
- this.type = type;
- this.service = service;
- }
-
- /**
- * Returns the type.
- *
- * @return the type
- */
- public ResponseType type() {
- return type;
- }
-
- /**
- * Gets the service.
- *
- * @return the service
- */
- public V1Service service() {
- return service;
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append(Components.objectName(this)).append(" [")
- .append(service.getMetadata().getName()).append(' ').append(type);
- if (channels() != null) {
- builder.append(", channels=").append(Channel.toString(channels()));
- }
- builder.append(']');
- return builder.toString();
- }
-}
diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/AbstractMonitor.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/AbstractMonitor.java
index 0c9b08c..43f4287 100644
--- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/AbstractMonitor.java
+++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/AbstractMonitor.java
@@ -27,14 +27,11 @@ import io.kubernetes.client.util.generic.options.ListOptions;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import org.jdrupes.vmoperator.common.K8s;
import org.jdrupes.vmoperator.common.K8sClient;
import org.jdrupes.vmoperator.common.K8sObserver;
-import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
-import org.jdrupes.vmoperator.manager.events.ChannelManager;
import org.jdrupes.vmoperator.manager.events.Exit;
import org.jgrapes.core.Channel;
import org.jgrapes.core.Component;
@@ -45,7 +42,11 @@ import org.jgrapes.core.events.Stop;
import org.jgrapes.util.events.ConfigurationUpdate;
/**
- * A base class for monitoring VM related resources.
+ * A base class for monitoring VM related resources. When started,
+ * it creates observers for all versions of the the {@link APIResource}
+ * configured by {@link #context(APIResource)}. The APIResource is not
+ * passed to the constructor because in some cases it has to be
+ * evaluated lazily.
*
* @param the object type for the context
* @param the object list type for the context
@@ -61,15 +62,17 @@ public abstract class AbstractMonitor channelManager;
/**
* Initializes the instance.
*
* @param componentChannel the component channel
+ * @param objectClass the class of the Kubernetes object to watch
+ * @param objectListClass the class of the list of Kubernetes objects
+ * to watch
*/
- protected AbstractMonitor(Channel componentChannel, Class objectClass,
- Class objectListClass) {
+ protected AbstractMonitor(Channel componentChannel,
+ Class objectClass, Class objectListClass) {
super(componentChannel);
this.objectClass = objectClass;
this.objectListClass = objectListClass;
@@ -155,27 +158,6 @@ public abstract class AbstractMonitor channelManager() {
- return channelManager;
- }
-
- /**
- * Sets the channel manager.
- *
- * @param channelManager the channel manager
- * @return the abstract monitor
- */
- public AbstractMonitor
- channelManager(ChannelManager channelManager) {
- this.channelManager = channelManager;
- return this;
- }
-
/**
* Looks for a key "namespace" in the configuration and, if found,
* sets the namespace to its value.
@@ -193,7 +175,7 @@ public abstract class AbstractMonitor {
handleChange(c, r);
- if (ResponseType.valueOf(r.type) == ResponseType.DELETED) {
- channelManager.remove(r.object.getMetadata().getName());
- }
}).onTerminated((o, t) -> {
if (observerCounter.decrementAndGet() == 0) {
unregisterAsGenerator();
@@ -255,7 +234,8 @@ public abstract class AbstractMonitor change);
-
- /**
- * Returns the {@link Channel} for the given name.
- *
- * @param name the name
- * @return the channel used for events related to the specified object
- */
- protected Optional channel(String name) {
- return channelManager.getChannel(name);
- }
}
diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Controller.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Controller.java
index 86e3751..effc938 100644
--- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Controller.java
+++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Controller.java
@@ -100,9 +100,8 @@ public class Controller extends Component {
return null;
}
});
- attach(new VmMonitor(channel()).channelManager(chanMgr));
- attach(new DisplaySecretMonitor(channel())
- .channelManager(chanMgr.fixed()));
+ attach(new VmMonitor(channel(), chanMgr));
+ attach(new DisplaySecretMonitor(channel(), chanMgr));
// Currently, we don't use the IP assigned by the load balancer
// to access the VM's console. Might change in the future.
// attach(new ServiceMonitor(channel()).channelManager(chanMgr));
diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/DisplaySecretMonitor.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/DisplaySecretMonitor.java
index c113253..fa0bbf0 100644
--- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/DisplaySecretMonitor.java
+++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/DisplaySecretMonitor.java
@@ -44,6 +44,7 @@ import org.jdrupes.vmoperator.common.K8sV1SecretStub;
import static org.jdrupes.vmoperator.manager.Constants.COMP_DISPLAY_SECRET;
import static org.jdrupes.vmoperator.manager.Constants.DATA_DISPLAY_PASSWORD;
import static org.jdrupes.vmoperator.manager.Constants.DATA_PASSWORD_EXPIRY;
+import org.jdrupes.vmoperator.manager.events.ChannelDictionary;
import org.jdrupes.vmoperator.manager.events.GetDisplayPassword;
import org.jdrupes.vmoperator.manager.events.VmChannel;
import org.jdrupes.vmoperator.manager.events.VmDefChanged;
@@ -68,14 +69,18 @@ public class DisplaySecretMonitor
private int passwordValidity = 10;
private final List pendingGets
= Collections.synchronizedList(new LinkedList<>());
+ private final ChannelDictionary channelDictionary;
/**
* Instantiates a new display secrets monitor.
*
* @param componentChannel the component channel
+ * @param channelDictionary the channel dictionary
*/
- public DisplaySecretMonitor(Channel componentChannel) {
+ public DisplaySecretMonitor(Channel componentChannel,
+ ChannelDictionary channelDictionary) {
super(componentChannel, V1Secret.class, V1SecretList.class);
+ this.channelDictionary = channelDictionary;
context(K8sV1SecretStub.CONTEXT);
ListOptions options = new ListOptions();
options.setLabelSelector("app.kubernetes.io/name=" + APP_NAME + ","
@@ -116,7 +121,7 @@ public class DisplaySecretMonitor
if (vmName == null) {
return;
}
- var channel = channel(vmName).orElse(null);
+ var channel = channelDictionary.channel(vmName).orElse(null);
if (channel == null || channel.vmDefinition() == null) {
return;
}
@@ -248,6 +253,7 @@ public class DisplaySecretMonitor
* @param channel the channel
*/
@Handler
+ @SuppressWarnings("PMD.AvoidSynchronizedStatement")
public void onVmDefChanged(VmDefChanged event, Channel channel) {
synchronized (pendingGets) {
String vmName = event.vmDefinition().metadata().getName();
diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/ServiceMonitor.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/ServiceMonitor.java
deleted file mode 100644
index bd5635e..0000000
--- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/ServiceMonitor.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * VM-Operator
- * Copyright (C) 2024 Michael N. Lipp
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-package org.jdrupes.vmoperator.manager;
-
-import io.kubernetes.client.openapi.ApiException;
-import io.kubernetes.client.openapi.models.V1Service;
-import io.kubernetes.client.openapi.models.V1ServiceList;
-import io.kubernetes.client.util.Watch.Response;
-import io.kubernetes.client.util.generic.options.ListOptions;
-import java.io.IOException;
-import static org.jdrupes.vmoperator.common.Constants.APP_NAME;
-import org.jdrupes.vmoperator.common.K8sClient;
-import org.jdrupes.vmoperator.common.K8sObserver.ResponseType;
-import org.jdrupes.vmoperator.common.K8sV1ServiceStub;
-import org.jdrupes.vmoperator.manager.events.ServiceChanged;
-import org.jdrupes.vmoperator.manager.events.VmChannel;
-import org.jgrapes.core.Channel;
-
-/**
- * Watches for changes of services.
- */
-@SuppressWarnings("PMD.DataflowAnomalyAnalysis")
-public class ServiceMonitor
- extends AbstractMonitor {
-
- /**
- * Instantiates a new display secrets monitor.
- *
- * @param componentChannel the component channel
- */
- public ServiceMonitor(Channel componentChannel) {
- super(componentChannel, V1Service.class, V1ServiceList.class);
- context(K8sV1ServiceStub.CONTEXT);
- ListOptions options = new ListOptions();
- options.setLabelSelector("app.kubernetes.io/name=" + APP_NAME);
- options(options);
- }
-
- @Override
- protected void prepareMonitoring() throws IOException, ApiException {
- client(new K8sClient());
- }
-
- @Override
- protected void handleChange(K8sClient client, Response change) {
- String vmName = change.object.getMetadata().getLabels()
- .get("app.kubernetes.io/instance");
- if (vmName == null) {
- return;
- }
- var channel = channel(vmName).orElse(null);
- if (channel == null || channel.vmDefinition() == null) {
- return;
- }
- channel.pipeline().fire(new ServiceChanged(
- ResponseType.valueOf(change.type), change.object), channel);
- }
-}
diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmMonitor.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmMonitor.java
index e049b17..0ad5017 100644
--- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmMonitor.java
+++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmMonitor.java
@@ -43,10 +43,12 @@ import org.jdrupes.vmoperator.common.VmDefinitionStub;
import static org.jdrupes.vmoperator.manager.Constants.APP_NAME;
import static org.jdrupes.vmoperator.manager.Constants.VM_OP_KIND_VM;
import static org.jdrupes.vmoperator.manager.Constants.VM_OP_NAME;
+import org.jdrupes.vmoperator.manager.events.ChannelManager;
import org.jdrupes.vmoperator.manager.events.VmChannel;
import org.jdrupes.vmoperator.manager.events.VmDefChanged;
import org.jdrupes.vmoperator.util.GsonPtr;
import org.jgrapes.core.Channel;
+import org.jgrapes.core.Event;
/**
* Watches for changes of VM definitions.
@@ -55,14 +57,19 @@ import org.jgrapes.core.Channel;
public class VmMonitor extends
AbstractMonitor {
+ private final ChannelManager channelManager;
+
/**
* Instantiates a new VM definition watcher.
*
* @param componentChannel the component channel
+ * @param channelManager the channel manager
*/
- public VmMonitor(Channel componentChannel) {
+ public VmMonitor(Channel componentChannel,
+ ChannelManager channelManager) {
super(componentChannel, VmDefinitionModel.class,
VmDefinitionModels.class);
+ this.channelManager = channelManager;
}
@Override
@@ -107,10 +114,7 @@ public class VmMonitor extends
protected void handleChange(K8sClient client,
Watch.Response response) {
V1ObjectMeta metadata = response.object.getMetadata();
- VmChannel channel = channel(metadata.getName()).orElse(null);
- if (channel == null) {
- return;
- }
+ VmChannel channel = channelManager.channelGet(metadata.getName());
// Get full definition and associate with channel as backup
var vmDef = response.object;
@@ -132,13 +136,24 @@ public class VmMonitor extends
() -> "Cannot get model for " + response.object.getMetadata());
return;
}
+ if (ResponseType.valueOf(response.type) == ResponseType.DELETED) {
+ channelManager.remove(metadata.getName());
+ }
- // Create and fire event
+ // Create and fire changed event. Remove channel from channel
+ // manager on completion.
channel.pipeline()
- .fire(new VmDefChanged(ResponseType.valueOf(response.type),
- channel.setGeneration(
- response.object.getMetadata().getGeneration()),
- vmDef), channel);
+ .fire(Event.onCompletion(
+ new VmDefChanged(ResponseType.valueOf(response.type),
+ channel.setGeneration(
+ response.object.getMetadata().getGeneration()),
+ vmDef),
+ e -> {
+ if (e.type() == ResponseType.DELETED) {
+ channelManager
+ .remove(e.vmDefinition().metadata().getName());
+ }
+ }), channel);
}
private VmDefinitionModel getModel(K8sClient client,
diff --git a/org.jdrupes.vmoperator.vmconlet/src/org/jdrupes/vmoperator/vmconlet/TimeSeries.java b/org.jdrupes.vmoperator.vmconlet/src/org/jdrupes/vmoperator/vmconlet/TimeSeries.java
index ee1667c..62bdf55 100644
--- a/org.jdrupes.vmoperator.vmconlet/src/org/jdrupes/vmoperator/vmconlet/TimeSeries.java
+++ b/org.jdrupes.vmoperator.vmconlet/src/org/jdrupes/vmoperator/vmconlet/TimeSeries.java
@@ -51,7 +51,8 @@ public class TimeSeries {
* @param numbers the numbers
* @return the time series
*/
- @SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
+ @SuppressWarnings({ "PMD.AvoidLiteralsInIfCondition",
+ "PMD.AvoidSynchronizedStatement" })
public TimeSeries add(Instant time, Number... numbers) {
var newEntry = new Entry(time, numbers);
boolean nothingNew = false;
@@ -83,6 +84,7 @@ public class TimeSeries {
*
* @return the list
*/
+ @SuppressWarnings("PMD.AvoidSynchronizedStatement")
public List entries() {
synchronized (data) {
return new ArrayList<>(data);
diff --git a/org.jdrupes.vmoperator.vmconlet/src/org/jdrupes/vmoperator/vmconlet/VmConlet.java b/org.jdrupes.vmoperator.vmconlet/src/org/jdrupes/vmoperator/vmconlet/VmConlet.java
index a8bb1ae..b8ff79f 100644
--- a/org.jdrupes.vmoperator.vmconlet/src/org/jdrupes/vmoperator/vmconlet/VmConlet.java
+++ b/org.jdrupes.vmoperator.vmconlet/src/org/jdrupes/vmoperator/vmconlet/VmConlet.java
@@ -30,14 +30,14 @@ import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Duration;
import java.time.Instant;
-import java.util.HashSet;
+import java.util.EnumSet;
import java.util.Optional;
import java.util.Set;
import org.jdrupes.json.JsonBeanDecoder;
import org.jdrupes.json.JsonDecodeException;
import org.jdrupes.vmoperator.common.K8sObserver;
import org.jdrupes.vmoperator.common.VmDefinitionModel;
-import org.jdrupes.vmoperator.manager.events.ChannelCache;
+import org.jdrupes.vmoperator.manager.events.ChannelTracker;
import org.jdrupes.vmoperator.manager.events.ModifyVm;
import org.jdrupes.vmoperator.manager.events.VmChannel;
import org.jdrupes.vmoperator.manager.events.VmDefChanged;
@@ -68,8 +68,8 @@ public class VmConlet extends FreeMarkerConlet {
private static final Set MODES = RenderMode.asSet(
RenderMode.Preview, RenderMode.View);
- private final ChannelCache channelManager = new ChannelCache<>();
+ private final ChannelTracker channelTracker = new ChannelTracker<>();
private final TimeSeries summarySeries = new TimeSeries(Duration.ofDays(1));
private Summary cachedSummary;
@@ -128,7 +128,7 @@ public class VmConlet extends FreeMarkerConlet {
protected Set doRenderConlet(RenderConletRequestBase> event,
ConsoleConnection channel, String conletId, VmsModel conletState)
throws Exception {
- Set renderedAs = new HashSet<>();
+ Set renderedAs = EnumSet.noneOf(RenderMode.class);
boolean sendVmInfos = false;
if (event.renderAs().contains(RenderMode.Preview)) {
Template tpl
@@ -160,7 +160,7 @@ public class VmConlet extends FreeMarkerConlet {
sendVmInfos = true;
}
if (sendVmInfos) {
- for (var vmDef : channelManager.associated()) {
+ for (var vmDef : channelTracker.associated()) {
var def
= JsonBeanDecoder.create(vmDef.data().toString())
.readObject();
@@ -188,7 +188,7 @@ public class VmConlet extends FreeMarkerConlet {
throws JsonDecodeException, IOException {
var vmName = event.vmDefinition().getMetadata().getName();
if (event.type() == K8sObserver.ResponseType.DELETED) {
- channelManager.remove(vmName);
+ channelTracker.remove(vmName);
for (var entry : conletIdsByConsoleConnection().entrySet()) {
for (String conletId : entry.getValue()) {
entry.getKey().respond(new NotifyConletView(type(),
@@ -198,7 +198,7 @@ public class VmConlet extends FreeMarkerConlet {
} else {
var vmDef = new VmDefinitionModel(channel.client().getJSON()
.getGson(), cleanup(event.vmDefinition().data()));
- channelManager.put(vmName, channel, vmDef);
+ channelTracker.put(vmName, channel, vmDef);
var def = JsonBeanDecoder.create(vmDef.data().toString())
.readObject();
for (var entry : conletIdsByConsoleConnection().entrySet()) {
@@ -321,7 +321,7 @@ public class VmConlet extends FreeMarkerConlet {
return cachedSummary;
}
Summary summary = new Summary();
- for (var vmDef : channelManager.associated()) {
+ for (var vmDef : channelTracker.associated()) {
summary.totalVms += 1;
var status = GsonPtr.to(vmDef.data()).to("status");
summary.usedCpus += status.getAsInt("cpus").orElse(0);
@@ -347,7 +347,7 @@ public class VmConlet extends FreeMarkerConlet {
throws Exception {
event.stop();
var vmName = event.params().asString(0);
- var vmChannel = channelManager.channel(vmName).orElse(null);
+ var vmChannel = channelTracker.channel(vmName).orElse(null);
if (vmChannel == null) {
return;
}
diff --git a/org.jdrupes.vmoperator.vmviewer/src/org/jdrupes/vmoperator/vmviewer/VmViewer.java b/org.jdrupes.vmoperator.vmviewer/src/org/jdrupes/vmoperator/vmviewer/VmViewer.java
index e1e5c6d..db89b81 100644
--- a/org.jdrupes.vmoperator.vmviewer/src/org/jdrupes/vmoperator/vmviewer/VmViewer.java
+++ b/org.jdrupes.vmoperator.vmviewer/src/org/jdrupes/vmoperator/vmviewer/VmViewer.java
@@ -53,7 +53,7 @@ import org.jdrupes.vmoperator.common.K8sDynamicModel;
import org.jdrupes.vmoperator.common.K8sObserver;
import org.jdrupes.vmoperator.common.VmDefinitionModel;
import org.jdrupes.vmoperator.common.VmDefinitionModel.Permission;
-import org.jdrupes.vmoperator.manager.events.ChannelCache;
+import org.jdrupes.vmoperator.manager.events.ChannelTracker;
import org.jdrupes.vmoperator.manager.events.GetDisplayPassword;
import org.jdrupes.vmoperator.manager.events.ModifyVm;
import org.jdrupes.vmoperator.manager.events.ResetVm;
@@ -122,8 +122,8 @@ public class VmViewer extends FreeMarkerConlet {
RenderMode.Preview, RenderMode.Edit);
private static final Set MODES_FOR_GENERATED = RenderMode.asSet(
RenderMode.Preview, RenderMode.StickyPreview);
- private final ChannelCache channelManager = new ChannelCache<>();
+ private final ChannelTracker channelTracker = new ChannelTracker<>();
private static ObjectMapper objectMapper
= new ObjectMapper().registerModule(new JavaTimeModule());
private Class> preferredIpVersion = Inet4Address.class;
@@ -349,7 +349,7 @@ public class VmViewer extends FreeMarkerConlet {
// Remove conlet if definition has been removed
if (model.vmName() != null
- && !channelManager.associated(model.vmName()).isPresent()) {
+ && !channelTracker.associated(model.vmName()).isPresent()) {
channel.respond(
new DeleteConlet(conletId, Collections.emptySet()));
return Collections.emptySet();
@@ -357,7 +357,7 @@ public class VmViewer extends FreeMarkerConlet {
// Don't render if user has not at least one permission
if (model.vmName() != null
- && channelManager.associated(model.vmName())
+ && channelTracker.associated(model.vmName())
.map(d -> permissions(d, channel.session()).isEmpty())
.orElse(true)) {
return Collections.emptySet();
@@ -395,7 +395,7 @@ public class VmViewer extends FreeMarkerConlet {
}
private List accessibleVms(ConsoleConnection channel) {
- return channelManager.associated().stream()
+ return channelTracker.associated().stream()
.filter(d -> !permissions(d, channel.session()).isEmpty())
.map(d -> d.getMetadata().getName()).sorted().toList();
}
@@ -419,7 +419,7 @@ public class VmViewer extends FreeMarkerConlet {
if (Strings.isNullOrEmpty(model.vmName())) {
return;
}
- channelManager.associated(model.vmName()).ifPresent(vmDef -> {
+ channelTracker.associated(model.vmName()).ifPresent(vmDef -> {
try {
var def = JsonBeanDecoder.create(vmDef.data().toString())
.readObject();
@@ -465,9 +465,9 @@ public class VmViewer extends FreeMarkerConlet {
.remove("managedFields");
var vmName = vmDef.getMetadata().getName();
if (event.type() == K8sObserver.ResponseType.DELETED) {
- channelManager.remove(vmName);
+ channelTracker.remove(vmName);
} else {
- channelManager.put(vmName, channel, vmDef);
+ channelTracker.put(vmName, channel, vmDef);
}
for (var entry : conletIdsByConsoleConnection().entrySet()) {
var connection = entry.getKey();
@@ -502,12 +502,12 @@ public class VmViewer extends FreeMarkerConlet {
// Handle command for selected VM
var both = Optional.ofNullable(model.vmName())
- .flatMap(vm -> channelManager.both(vm));
+ .flatMap(vm -> channelTracker.value(vm));
if (both.isEmpty()) {
return;
}
- var vmChannel = both.get().channel;
- var vmDef = both.get().associated;
+ var vmChannel = both.get().channel();
+ var vmDef = both.get().associated();
var vmName = vmDef.metadata().getName();
var perms = permissions(vmDef, channel.session());
var resourceBundle = resourceBundle(channel.locale());
@@ -556,7 +556,7 @@ public class VmViewer extends FreeMarkerConlet {
private void openConsole(String vmName, ConsoleConnection connection,
ViewerModel model, String password) {
- var vmDef = channelManager.associated(vmName).orElse(null);
+ var vmDef = channelTracker.associated(vmName).orElse(null);
if (vmDef == null) {
return;
}