From f56fe228aac8010198ff939e911adb3b3953fdde Mon Sep 17 00:00:00 2001 From: "Michael N. Lipp" Date: Tue, 15 Aug 2023 15:54:40 +0200 Subject: [PATCH] The client library is so not thread safe, adapt usage. See https://github.com/kubernetes-client/java/issues/100 --- .../vmoperator/manager/CmReconciler.java | 6 +++- .../vmoperator/manager/Controller.java | 6 ++-- .../vmoperator/manager/ServiceReconciler.java | 6 +++- .../vmoperator/manager/StsReconciler.java | 6 +++- .../jdrupes/vmoperator/manager/VmWatcher.java | 34 ++++++++++++------- 5 files changed, 40 insertions(+), 18 deletions(-) diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/CmReconciler.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/CmReconciler.java index aa1b69b..7b30213 100644 --- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/CmReconciler.java +++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/CmReconciler.java @@ -33,6 +33,9 @@ import java.io.StringWriter; import java.util.Map; import java.util.logging.Logger; import org.jdrupes.vmoperator.manager.VmDefChanged.Type; +import org.yaml.snakeyaml.LoaderOptions; +import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.constructor.SafeConstructor; /** * Delegee for reconciling the config map @@ -85,7 +88,8 @@ import org.jdrupes.vmoperator.manager.VmDefChanged.Type; fmTemplate.process(model, out); // Avoid Yaml.load due to // https://github.com/kubernetes-client/java/issues/2741 - var mapDef = Dynamics.newFromYaml(out.toString()); + var mapDef = Dynamics.newFromYaml( + new Yaml(new SafeConstructor(new LoaderOptions())), out.toString()); // Apply and maybe force pod update var newState = K8s.apply(cmApi, mapDef, out.toString()); diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Controller.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Controller.java index 582cb3c..fe668cb 100644 --- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Controller.java +++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/Controller.java @@ -20,7 +20,6 @@ package org.jdrupes.vmoperator.manager; import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.Configuration; -import io.kubernetes.client.util.Config; import java.io.IOException; import org.jgrapes.core.Channel; import org.jgrapes.core.Component; @@ -53,7 +52,8 @@ public class Controller extends Component { */ @Handler(priority = 100) public void onStart(Start event) throws IOException, ApiException { - var client = Config.defaultClient(); - Configuration.setDefaultApiClient(client); + // Make sure to use thread specific client + // https://github.com/kubernetes-client/java/issues/100 + Configuration.setDefaultApiClient(null); } } diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/ServiceReconciler.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/ServiceReconciler.java index aef4898..0a1c6f9 100644 --- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/ServiceReconciler.java +++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/ServiceReconciler.java @@ -28,6 +28,9 @@ import java.io.StringWriter; import java.util.Map; import java.util.logging.Logger; import org.jdrupes.vmoperator.manager.VmDefChanged.Type; +import org.yaml.snakeyaml.LoaderOptions; +import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.constructor.SafeConstructor; /** * Delegee for reconciling the service @@ -79,7 +82,8 @@ import org.jdrupes.vmoperator.manager.VmDefChanged.Type; fmTemplate.process(model, out); // Avoid Yaml.load due to // https://github.com/kubernetes-client/java/issues/2741 - var mapDef = Dynamics.newFromYaml(out.toString()); + var mapDef = Dynamics.newFromYaml( + new Yaml(new SafeConstructor(new LoaderOptions())), out.toString()); // Apply K8s.apply(svcApi, mapDef, out.toString()); diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/StsReconciler.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/StsReconciler.java index e8fd27c..a2fb7f1 100644 --- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/StsReconciler.java +++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/StsReconciler.java @@ -30,6 +30,9 @@ import java.io.StringWriter; import java.util.Map; import java.util.logging.Logger; import org.jdrupes.vmoperator.manager.VmDefChanged.Type; +import org.yaml.snakeyaml.LoaderOptions; +import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.constructor.SafeConstructor; /** * Delegee for reconciling the stateful set (effectively the pod). @@ -88,7 +91,8 @@ import org.jdrupes.vmoperator.manager.VmDefChanged.Type; fmTemplate.process(model, out); // Avoid Yaml.load due to // https://github.com/kubernetes-client/java/issues/2741 - var stsDef = Dynamics.newFromYaml(out.toString()); + var stsDef = Dynamics.newFromYaml( + new Yaml(new SafeConstructor(new LoaderOptions())), out.toString()); // If exists apply changes only when transitioning state // or not running. diff --git a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmWatcher.java b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmWatcher.java index 7256e35..f20c52e 100644 --- a/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmWatcher.java +++ b/org.jdrupes.vmoperator.manager/src/org/jdrupes/vmoperator/manager/VmWatcher.java @@ -21,7 +21,6 @@ package org.jdrupes.vmoperator.manager; import com.google.gson.reflect.TypeToken; import io.kubernetes.client.openapi.ApiClient; import io.kubernetes.client.openapi.ApiException; -import io.kubernetes.client.openapi.Configuration; import io.kubernetes.client.openapi.apis.ApisApi; import io.kubernetes.client.openapi.apis.CustomObjectsApi; import io.kubernetes.client.openapi.models.V1APIGroup; @@ -29,6 +28,7 @@ import io.kubernetes.client.openapi.models.V1APIResource; import io.kubernetes.client.openapi.models.V1GroupVersionForDiscovery; import io.kubernetes.client.openapi.models.V1Namespace; import io.kubernetes.client.openapi.models.V1ObjectMeta; +import io.kubernetes.client.util.Config; import io.kubernetes.client.util.Watch; import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesApi; import io.kubernetes.client.util.generic.options.ListOptions; @@ -59,7 +59,6 @@ import org.jgrapes.util.events.ConfigurationUpdate; @SuppressWarnings("PMD.DataflowAnomalyAnalysis") public class VmWatcher extends Component { - private ApiClient client; private String namespaceToWatch; private final Map channels = new ConcurrentHashMap<>(); @@ -115,7 +114,7 @@ public class VmWatcher extends Component { .fine(() -> "Controlling namespace \"" + namespaceToWatch + "\"."); // Get all our API versions - client = Configuration.getDefaultApiClient(); + var client = Config.defaultClient(); var apis = new ApisApi(client).getAPIVersions(); var vmOpApiVersions = apis.getGroups().stream() .filter(g -> g.getName().equals(VM_OP_GROUP)).findFirst() @@ -124,7 +123,7 @@ public class VmWatcher extends Component { // Remove left overs var coa = new CustomObjectsApi(client); - purge(coa, vmOpApiVersions); + purge(client, coa, vmOpApiVersions); // Start a watcher thread for each existing CRD version. // The watcher will send us an "ADDED" for each existing VM. @@ -133,14 +132,14 @@ public class VmWatcher extends Component { .getResources().stream() .filter(r -> Constants.VM_OP_KIND_VM.equals(r.getKind())) .findFirst() - .ifPresent(crd -> watchVmDefs(coa, crd, version)); + .ifPresent(crd -> watchVmDefs(crd, version)); } } @SuppressWarnings({ "PMD.AvoidInstantiatingObjectsInLoops", "PMD.CognitiveComplexity" }) - private void purge(CustomObjectsApi coa, List vmOpApiVersions) - throws ApiException { + private void purge(ApiClient client, CustomObjectsApi coa, + List vmOpApiVersions) throws ApiException { // Get existing CRs (VMs) Set known = new HashSet<>(); for (var version : vmOpApiVersions) { @@ -149,7 +148,7 @@ public class VmWatcher extends Component { .getResources().stream() .filter(r -> Constants.VM_OP_KIND_VM.equals(r.getKind())) .findFirst() - .ifPresent(crd -> known.addAll(getKnown(crd, version))); + .ifPresent(crd -> known.addAll(getKnown(client, crd, version))); } ListOptions opts = new ListOptions(); @@ -184,7 +183,8 @@ public class VmWatcher extends Component { } } - private Set getKnown(V1APIResource crd, String version) { + private Set getKnown(ApiClient client, V1APIResource crd, + String version) { Set result = new HashSet<>(); var api = new DynamicKubernetesApi(VM_OP_GROUP, version, crd.getName(), client); @@ -197,13 +197,14 @@ public class VmWatcher extends Component { return result; } - private void watchVmDefs(CustomObjectsApi coa, V1APIResource crd, - String version) { + private void watchVmDefs(V1APIResource crd, String version) { @SuppressWarnings("PMD.AvoidInstantiatingObjectsInLoops") var watcher = new Thread(() -> { try { // Watch sometimes terminates without apparent reason. while (true) { + var client = Config.defaultClient(); + var coa = new CustomObjectsApi(client); var call = coa.listNamespacedCustomObjectCall(VM_OP_GROUP, version, namespaceToWatch, crd.getName(), null, false, null, null, null, null, null, null, null, true, null); @@ -233,7 +234,16 @@ public class VmWatcher extends Component { Watch.Response item) { V1ObjectMeta metadata = item.object.getMetadata(); VmChannel channel = channels.computeIfAbsent(metadata.getName(), - k -> new VmChannel(channel(), newEventPipeline(), client)); + k -> { + try { + return new VmChannel(channel(), newEventPipeline(), + Config.defaultClient()); + } catch (IOException e) { + logger.log(Level.SEVERE, e, () -> "Failed to create client" + + " for handling changes: " + e.getMessage()); + return null; + } + }); channel.pipeline().fire(new VmDefChanged(VmDefChanged.Type .valueOf(item.type), vmsCrd, item.object), channel); }