From 4d447717c2a40a9e933bf95ab60921fa72e88366 Mon Sep 17 00:00:00 2001 From: "Michael N. Lipp" Date: Wed, 13 Nov 2024 23:45:54 +0100 Subject: [PATCH] Improve tracking. --- .../runner/qemu/ConsoleTracker.java | 159 +++++++++++++++ .../vmoperator/runner/qemu/StatusUpdater.java | 183 ++---------------- .../vmoperator/runner/qemu/VmDefUpdater.java | 141 ++++++++++++++ .../runner/qemu/events/MonitorEvent.java | 15 +- .../runner/qemu/events/SpiceEvent.java | 9 + .../qemu/events/SpiceInitializedEvent.java | 46 +++++ 6 files changed, 375 insertions(+), 178 deletions(-) create mode 100644 org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/ConsoleTracker.java create mode 100644 org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/VmDefUpdater.java create mode 100644 org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/events/SpiceInitializedEvent.java diff --git a/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/ConsoleTracker.java b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/ConsoleTracker.java new file mode 100644 index 0000000..7d54235 --- /dev/null +++ b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/ConsoleTracker.java @@ -0,0 +1,159 @@ +/* + * VM-Operator + * Copyright (C) 2024 Michael N. Lipp + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.jdrupes.vmoperator.runner.qemu; + +import com.google.gson.JsonObject; +import io.kubernetes.client.apimachinery.GroupVersionKind; +import io.kubernetes.client.openapi.ApiException; +import io.kubernetes.client.openapi.models.EventsV1Event; +import java.io.IOException; +import java.util.logging.Level; +import static org.jdrupes.vmoperator.common.Constants.APP_NAME; +import static org.jdrupes.vmoperator.common.Constants.VM_OP_GROUP; +import static org.jdrupes.vmoperator.common.Constants.VM_OP_KIND_VM; +import org.jdrupes.vmoperator.common.K8s; +import org.jdrupes.vmoperator.common.K8sClient; +import org.jdrupes.vmoperator.common.VmDefinitionStub; +import org.jdrupes.vmoperator.runner.qemu.events.Exit; +import org.jdrupes.vmoperator.runner.qemu.events.SpiceDisconnectedEvent; +import org.jdrupes.vmoperator.runner.qemu.events.SpiceInitializedEvent; +import org.jgrapes.core.Channel; +import org.jgrapes.core.annotation.Handler; +import org.jgrapes.core.events.Start; + +/** + * A (sub)component that updates the console status in the CR status. + * Created as child of {@link StatusUpdater}. + */ +@SuppressWarnings("PMD.DataflowAnomalyAnalysis") +public class ConsoleTracker extends VmDefUpdater { + + private final K8sClient apiClient; + private VmDefinitionStub vmStub; + private String mainChannelClientHost; + private long mainChannelClientPort; + + /** + * Instantiates a new status updater. + * + * @param componentChannel the component channel + */ + @SuppressWarnings("PMD.ConstructorCallsOverridableMethod") + public ConsoleTracker(Channel componentChannel) { + super(componentChannel); + apiClient = (K8sClient) io.kubernetes.client.openapi.Configuration + .getDefaultApiClient(); + } + + /** + * Handle the start event. + * + * @param event the event + * @throws IOException + * @throws ApiException + */ + @Handler + public void onStart(Start event) { + if (namespace == null) { + return; + } + try { + vmStub = VmDefinitionStub.get(apiClient, + new GroupVersionKind(VM_OP_GROUP, "", VM_OP_KIND_VM), + namespace, vmName); + } catch (ApiException e) { + logger.log(Level.SEVERE, e, + () -> "Cannot access VM object, terminating."); + event.cancel(true); + fire(new Exit(1)); + } + } + + /** + * On spice connected. + * + * @param event the event + * @throws ApiException the api exception + */ + @Handler + @SuppressWarnings({ "PMD.AvoidLiteralsInIfCondition", + "PMD.AvoidDuplicateLiterals" }) + public void onSpiceInitialized(SpiceInitializedEvent event) + throws ApiException { + if (vmStub == null) { + return; + } + + // Only process connections using main channel. + if (event.channelType() != 1) { + return; + } + mainChannelClientHost = event.clientHost(); + mainChannelClientPort = event.clientPort(); + vmStub.updateStatus(from -> { + JsonObject status = from.status(); + status.addProperty("consoleClient", event.clientHost()); + updateCondition(apiClient, from, status, "ConsoleConnected", + true, "Connection from " + event.clientHost(), null); + return status; + }); + + // Log event + var evt = new EventsV1Event() + .reportingController(VM_OP_GROUP + "/" + APP_NAME) + .action("ConsoleConnectionUpdate") + .reason("Connection from " + event.clientHost()); + K8s.createEvent(apiClient, vmStub.model().get(), evt); + } + + /** + * On spice disconnected. + * + * @param event the event + * @throws ApiException the api exception + */ + @Handler + @SuppressWarnings("PMD.AvoidDuplicateLiterals") + public void onSpiceDisconnected(SpiceDisconnectedEvent event) + throws ApiException { + if (vmStub == null) { + return; + } + + // Only process disconnects from main channel. + if (!event.clientHost().equals(mainChannelClientHost) + || event.clientPort() != mainChannelClientPort) { + return; + } + vmStub.updateStatus(from -> { + JsonObject status = from.status(); + status.addProperty("consoleClient", ""); + updateCondition(apiClient, from, status, "ConsoleConnected", + false, event.clientHost() + " has disconnected", null); + return status; + }); + + // Log event + var evt = new EventsV1Event() + .reportingController(VM_OP_GROUP + "/" + APP_NAME) + .action("ConsoleConnectionUpdate") + .reason("Disconnected from " + event.clientHost()); + K8s.createEvent(apiClient, vmStub.model().get(), evt); + } +} diff --git a/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/StatusUpdater.java b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/StatusUpdater.java index f6814b3..ca5d46a 100644 --- a/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/StatusUpdater.java +++ b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/StatusUpdater.java @@ -27,22 +27,13 @@ import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.models.EventsV1Event; import java.io.IOException; import java.math.BigDecimal; -import java.nio.file.Files; -import java.nio.file.Path; -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.logging.Level; -import java.util.stream.Collectors; import static org.jdrupes.vmoperator.common.Constants.APP_NAME; import static org.jdrupes.vmoperator.common.Constants.VM_OP_GROUP; import static org.jdrupes.vmoperator.common.Constants.VM_OP_KIND_VM; import org.jdrupes.vmoperator.common.K8s; import org.jdrupes.vmoperator.common.K8sClient; -import org.jdrupes.vmoperator.common.K8sDynamicModel; import org.jdrupes.vmoperator.common.VmDefinitionModel; import org.jdrupes.vmoperator.common.VmDefinitionStub; import org.jdrupes.vmoperator.runner.qemu.events.BalloonChangeEvent; @@ -53,28 +44,21 @@ import org.jdrupes.vmoperator.runner.qemu.events.HotpluggableCpuStatus; import org.jdrupes.vmoperator.runner.qemu.events.RunnerStateChange; import org.jdrupes.vmoperator.runner.qemu.events.RunnerStateChange.RunState; import org.jdrupes.vmoperator.runner.qemu.events.ShutdownEvent; -import org.jdrupes.vmoperator.runner.qemu.events.SpiceConnectedEvent; -import org.jdrupes.vmoperator.runner.qemu.events.SpiceDisconnectedEvent; import org.jdrupes.vmoperator.util.GsonPtr; import org.jgrapes.core.Channel; -import org.jgrapes.core.Component; import org.jgrapes.core.annotation.Handler; import org.jgrapes.core.events.HandlingError; import org.jgrapes.core.events.Start; -import org.jgrapes.util.events.ConfigurationUpdate; -import org.jgrapes.util.events.InitialConfiguration; /** * Updates the CR status. */ @SuppressWarnings("PMD.DataflowAnomalyAnalysis") -public class StatusUpdater extends Component { +public class StatusUpdater extends VmDefUpdater { private static final Set RUNNING_STATES = Set.of(RunState.RUNNING, RunState.TERMINATING); - private String namespace; - private String vmName; private K8sClient apiClient; private long observedGeneration; private boolean guestShutdownStops; @@ -98,6 +82,7 @@ public class StatusUpdater extends Component { () -> "Cannot access events API, terminating."); fire(new Exit(1)); } + attach(new ConsoleTracker(componentChannel)); } /** @@ -114,43 +99,6 @@ public class StatusUpdater extends Component { } } - /** - * On configuration update. - * - * @param event the event - */ - @Handler - @SuppressWarnings("unchecked") - public void onConfigurationUpdate(ConfigurationUpdate event) { - event.structured("/Runner").ifPresent(c -> { - if (event instanceof InitialConfiguration) { - namespace = (String) c.get("namespace"); - updateNamespace(); - vmName = Optional.ofNullable((Map) c.get("vm")) - .map(vm -> vm.get("name")).orElse(null); - } - }); - } - - private void updateNamespace() { - if (namespace == null) { - var path = Path - .of("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); - if (Files.isReadable(path)) { - try { - namespace = Files.lines(path).findFirst().orElse(null); - } catch (IOException e) { - logger.log(Level.WARNING, e, - () -> "Cannot read namespace."); - } - } - } - if (namespace == null) { - logger.warning(() -> "Namespace is unknown, some functions" - + " won't be available."); - } - } - /** * Handle the start event. * @@ -238,13 +186,9 @@ public class StatusUpdater extends Component { } vmStub.updateStatus(vmDef, from -> { JsonObject status = from.status(); - status.getAsJsonArray("conditions").asList().stream() - .map(cond -> (JsonObject) cond) - .forEach(cond -> { - if ("Running".equals(cond.get("type").getAsString())) { - updateRunningCondition(event, from, cond); - } - }); + boolean running = RUNNING_STATES.contains(event.runState()); + updateCondition(apiClient, vmDef, vmDef.status(), "Running", + running, event.reason(), event.message()); if (event.runState() == RunState.STARTING) { status.addProperty("ram", GsonPtr.to(from.data()) .getAsString("spec", "vm", "maximumRam").orElse("0")); @@ -253,6 +197,13 @@ public class StatusUpdater extends Component { status.addProperty("ram", "0"); status.addProperty("cpus", 0); } + + // In case console connection was still present + if (!running) { + status.addProperty("consoleClient", ""); + updateCondition(apiClient, from, status, "ConsoleConnected", + false, "VM has stopped", null); + } return status; }); @@ -278,29 +229,6 @@ public class StatusUpdater extends Component { K8s.createEvent(apiClient, vmDef, evt); } - private void updateRunningCondition(RunnerStateChange event, - K8sDynamicModel from, JsonObject cond) { - @SuppressWarnings("PMD.AvoidDuplicateLiterals") - boolean reportedRunning - = "True".equals(cond.get("status").getAsString()); - if (RUNNING_STATES.contains(event.runState()) - && !reportedRunning) { - cond.addProperty("status", "True"); - cond.addProperty("lastTransitionTime", - Instant.now().toString()); - } - if (!RUNNING_STATES.contains(event.runState()) - && reportedRunning) { - cond.addProperty("status", "False"); - cond.addProperty("lastTransitionTime", - Instant.now().toString()); - } - cond.addProperty("reason", event.reason()); - cond.addProperty("message", event.message()); - cond.addProperty("observedGeneration", - from.getMetadata().getGeneration()); - } - /** * On ballon change. * @@ -369,91 +297,4 @@ public class StatusUpdater extends Component { public void onShutdown(ShutdownEvent event) throws ApiException { shutdownByGuest = event.byGuest(); } - - /** - * On spice connected. - * - * @param event the event - * @throws ApiException the api exception - */ - @Handler - public void onSpiceConnected(SpiceConnectedEvent event) - throws ApiException { - if (vmStub == null) { - return; - } - vmStub.updateStatus(from -> { - JsonObject status = from.status(); - status.addProperty("consoleClient", event.clientHost()); - updateConsoleConnectedCondition(from, status, true); - return status; - }); - - // Log event - var evt = new EventsV1Event() - .reportingController(VM_OP_GROUP + "/" + APP_NAME) - .action("ConsoleConnectionUpdate") - .reason("Connection from " + event.clientHost()); - K8s.createEvent(apiClient, vmStub.model().get(), evt); - } - - /** - * On spice disconnected. - * - * @param event the event - * @throws ApiException the api exception - */ - @Handler - public void onSpiceDisconnected(SpiceDisconnectedEvent event) - throws ApiException { - if (vmStub == null) { - return; - } - vmStub.updateStatus(from -> { - JsonObject status = from.status(); - status.addProperty("consoleClient", ""); - updateConsoleConnectedCondition(from, status, false); - return status; - }); - - // Log event - var evt = new EventsV1Event() - .reportingController(VM_OP_GROUP + "/" + APP_NAME) - .action("ConsoleConnectionUpdate") - .reason("Disconnected from " + event.clientHost()); - K8s.createEvent(apiClient, vmStub.model().get(), evt); - } - - private void updateConsoleConnectedCondition(VmDefinitionModel from, - JsonObject status, boolean connected) { - // Optimize, as we can get this several times - var current = status.getAsJsonArray("conditions").asList().stream() - .map(cond -> (JsonObject) cond) - .filter(cond -> "ConsoleConnected" - .equals(cond.get("type").getAsString())) - .findFirst() - .map(cond -> "True".equals(cond.get("status").getAsString())); - if (current.isPresent() && current.get() == connected) { - return; - } - - // Do update - final var condition = Map.of("type", "ConsoleConnected", - "status", connected ? "True" : "False", - "observedGeneration", from.getMetadata().getGeneration(), - "reason", connected ? "Connected" : "Disconnected", - "lastTransitionTime", Instant.now().toString()); - List toReplace = new ArrayList<>(List.of(condition)); - List newConds - = status.getAsJsonArray("conditions").asList().stream() - .map(cond -> (JsonObject) cond) - .map(cond -> "ConsoleConnected" - .equals(cond.get("type").getAsString()) - ? toReplace.remove(0) - : cond) - .collect(Collectors.toCollection(() -> new ArrayList<>())); - newConds.addAll(toReplace); - status.add("conditions", - apiClient.getJSON().getGson().toJsonTree(newConds)); - } } diff --git a/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/VmDefUpdater.java b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/VmDefUpdater.java new file mode 100644 index 0000000..893fc61 --- /dev/null +++ b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/VmDefUpdater.java @@ -0,0 +1,141 @@ +/* + * VM-Operator + * Copyright (C) 2024 Michael N. Lipp + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.jdrupes.vmoperator.runner.qemu; + +import com.google.gson.JsonObject; +import io.kubernetes.client.openapi.ApiClient; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.logging.Level; +import java.util.stream.Collectors; +import org.jdrupes.vmoperator.common.VmDefinitionModel; +import org.jgrapes.core.Channel; +import org.jgrapes.core.Component; +import org.jgrapes.core.annotation.Handler; +import org.jgrapes.util.events.ConfigurationUpdate; +import org.jgrapes.util.events.InitialConfiguration; + +/** + * Updates the CR status. + */ +@SuppressWarnings("PMD.DataflowAnomalyAnalysis") +public class VmDefUpdater extends Component { + + protected String namespace; + protected String vmName; + + /** + * Instantiates a new status updater. + * + * @param componentChannel the component channel + */ + @SuppressWarnings("PMD.ConstructorCallsOverridableMethod") + public VmDefUpdater(Channel componentChannel) { + super(componentChannel); + } + + /** + * On configuration update. + * + * @param event the event + */ + @Handler + @SuppressWarnings("unchecked") + public void onConfigurationUpdate(ConfigurationUpdate event) { + event.structured("/Runner").ifPresent(c -> { + if (event instanceof InitialConfiguration) { + namespace = (String) c.get("namespace"); + updateNamespace(); + vmName = Optional.ofNullable((Map) c.get("vm")) + .map(vm -> vm.get("name")).orElse(null); + } + }); + } + + private void updateNamespace() { + if (namespace == null) { + var path = Path + .of("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); + if (Files.isReadable(path)) { + try { + namespace = Files.lines(path).findFirst().orElse(null); + } catch (IOException e) { + logger.log(Level.WARNING, e, + () -> "Cannot read namespace."); + } + } + } + if (namespace == null) { + logger.warning(() -> "Namespace is unknown, some functions" + + " won't be available."); + } + } + + /** + * Update condition. + * + * @param apiClient the api client + * @param from the vM definition + * @param status the current status + * @param type the condition type + * @param state the new state + * @param reason the reason for the change + */ + protected void updateCondition(ApiClient apiClient, VmDefinitionModel from, + JsonObject status, String type, boolean state, String reason, + String message) { + // Optimize, as we can get this several times + var current = status.getAsJsonArray("conditions").asList().stream() + .map(cond -> (JsonObject) cond) + .filter(cond -> type.equals(cond.get("type").getAsString())) + .findFirst() + .map(cond -> "True".equals(cond.get("status").getAsString())); + if (current.isPresent() && current.get() == state) { + return; + } + + // Do update + final var condition = new HashMap<>(Map.of("type", type, + "status", state ? "True" : "False", + "observedGeneration", from.getMetadata().getGeneration(), + "reason", reason, + "lastTransitionTime", Instant.now().toString())); + if (message != null) { + condition.put("message", message); + } + List toReplace = new ArrayList<>(List.of(condition)); + List newConds + = status.getAsJsonArray("conditions").asList().stream() + .map(cond -> (JsonObject) cond) + .map(cond -> type.equals(cond.get("type").getAsString()) + ? toReplace.remove(0) + : cond) + .collect(Collectors.toCollection(() -> new ArrayList<>())); + newConds.addAll(toReplace); + status.add("conditions", + apiClient.getJSON().getGson().toJsonTree(newConds)); + } +} diff --git a/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/events/MonitorEvent.java b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/events/MonitorEvent.java index 2cc0f33..df981c8 100644 --- a/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/events/MonitorEvent.java +++ b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/events/MonitorEvent.java @@ -35,7 +35,7 @@ public class MonitorEvent extends Event { */ public enum Kind { READY, POWERDOWN, DEVICE_TRAY_MOVED, BALLOON_CHANGE, SHUTDOWN, - SPICE_CONNECTED, SPICE_DISCONNECTED + SPICE_CONNECTED, SPICE_INITIALIZED, SPICE_DISCONNECTED } private final Kind kind; @@ -64,13 +64,14 @@ public class MonitorEvent extends Event { return Optional .of(new ShutdownEvent(kind, response.get(EVENT_DATA))); case SPICE_CONNECTED: - return Optional - .of(new SpiceConnectedEvent(kind, - response.get(EVENT_DATA))); + return Optional.of(new SpiceConnectedEvent(kind, + response.get(EVENT_DATA))); + case SPICE_INITIALIZED: + return Optional.of(new SpiceInitializedEvent(kind, + response.get(EVENT_DATA))); case SPICE_DISCONNECTED: - return Optional - .of(new SpiceDisconnectedEvent(kind, - response.get(EVENT_DATA))); + return Optional.of(new SpiceDisconnectedEvent(kind, + response.get(EVENT_DATA))); default: return Optional .of(new MonitorEvent(kind, response.get(EVENT_DATA))); diff --git a/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/events/SpiceEvent.java b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/events/SpiceEvent.java index 6706f0c..4ce27e2 100644 --- a/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/events/SpiceEvent.java +++ b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/events/SpiceEvent.java @@ -43,4 +43,13 @@ public class SpiceEvent extends MonitorEvent { public String clientHost() { return data().get("client").get("host").asText(); } + + /** + * Returns the client's port. + * + * @return the client's port number + */ + public long clientPort() { + return data().get("client").get("port").asLong(); + } } diff --git a/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/events/SpiceInitializedEvent.java b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/events/SpiceInitializedEvent.java new file mode 100644 index 0000000..7bb84b7 --- /dev/null +++ b/org.jdrupes.vmoperator.runner.qemu/src/org/jdrupes/vmoperator/runner/qemu/events/SpiceInitializedEvent.java @@ -0,0 +1,46 @@ +/* + * VM-Operator + * Copyright (C) 2023 Michael N. Lipp + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package org.jdrupes.vmoperator.runner.qemu.events; + +import com.fasterxml.jackson.databind.JsonNode; + +/** + * Signals a connection from a client. + */ +public class SpiceInitializedEvent extends SpiceEvent { + + /** + * Instantiates a new spice connected event. + * + * @param kind the kind + * @param data the data + */ + public SpiceInitializedEvent(Kind kind, JsonNode data) { + super(kind, data); + } + + /** + * Returns the channel type. + * + * @return the channel type + */ + public int channelType() { + return data().get("client").get("channel-type").asInt(); + } +}