From 3d67de36b51c861abba5f7813ca677ad1831e8ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Sun, 7 Jun 2026 16:51:30 +0200 Subject: [PATCH 01/12] improve: filter only own updates for read-after-write-conistency MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../controller/ControllerEventSource.java | 2 +- .../source/informer/EventFilterDetails.java | 11 ++++++ .../source/informer/InformerEventSource.java | 4 +-- .../informer/TemporaryResourceCache.java | 36 ++++++++++++++----- 4 files changed, 41 insertions(+), 12 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java index 07d59e039a..89fd2425d8 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java @@ -141,7 +141,7 @@ private void handleOnAddOrUpdate( ResourceAction action, T oldCustomResource, T newCustomResource) { var handling = temporaryResourceCache.onAddOrUpdateEvent(action, newCustomResource, oldCustomResource); - if (handling == EventHandling.NEW) { + if (handling == EventHandling.NEW || handling == EventHandling.IN_BETWEEN) { handleEvent(action, newCustomResource, oldCustomResource, null); } else if (log.isDebugEnabled()) { log.debug("{} event propagation for action: {}", handling, action); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java index b747c69dff..00b3c02931 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java @@ -15,7 +15,9 @@ */ package io.javaoperatorsdk.operator.processing.event.source.informer; +import java.util.HashSet; import java.util.Optional; +import java.util.Set; import java.util.function.UnaryOperator; import io.fabric8.kubernetes.api.model.HasMetadata; @@ -27,6 +29,7 @@ class EventFilterDetails { private int activeUpdates = 0; private ResourceEvent lastEvent; private String lastOwnUpdatedResourceVersion; + private Set allOwnResourceVersions = new HashSet<>(); public void increaseActiveUpdates() { activeUpdates = activeUpdates + 1; @@ -69,4 +72,12 @@ public Optional getLatestEventAfterLastUpdateEvent() { public int getActiveUpdates() { return activeUpdates; } + + void addToOwnResourceVersions(String updateVersion) { + allOwnResourceVersions.add(updateVersion); + } + + public boolean isOwnResourceVersions(String resourceVersion) { + return allOwnResourceVersions.contains(resourceVersion); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 93d3eb5e80..f3550470fb 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -154,12 +154,12 @@ private synchronized void onAddOrUpdate(ResourceAction action, R newObject, R ol var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(action, newObject, oldObject); - if (eventHandling != EventHandling.NEW) { + if (eventHandling != EventHandling.NEW && eventHandling != EventHandling.IN_BETWEEN) { log.debug( "{} event propagation", eventHandling == EventHandling.DEFER ? "Deferring" : "Skipping"); } else if (eventAcceptedByFilter(action, newObject, oldObject)) { log.debug( - "Propagating event for {}, resource with same version not result of a reconciliation.", + "Propagating event for {}, resource with same version not result of a our update.", action); propagateEvent(newObject); } else { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 405f52cc8d..feaa5cc04a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -65,6 +65,7 @@ public class TemporaryResourceCache { public enum EventHandling { DEFER, OBSOLETE, + IN_BETWEEN, NEW } @@ -145,17 +146,30 @@ private synchronized EventHandling onEvent( // additional event result = comp == 0 ? EventHandling.OBSOLETE : EventHandling.NEW; } else { - result = EventHandling.OBSOLETE; + // in this case we received and event that might be in some edge case that was + // already used in reconciler or after that, but before our updated resource version. + // That would be hard to distinguish, so for those we are propagating the event further. + log.debug("Received in between event."); + result = EventHandling.IN_BETWEEN; } } - var ed = activeUpdates.get(resourceId); - if (ed != null && result != EventHandling.OBSOLETE) { - log.debug("Setting last event for id: {} delete: {}", resourceId, delete); - ed.setLastEvent( - delete - ? new ResourceDeleteEvent(ResourceAction.DELETED, resourceId, resource, unknownState) - : new ExtendedResourceEvent(action, resourceId, resource, prevResourceVersion)); - return EventHandling.DEFER; + var au = activeUpdates.get(resourceId); + if (au != null) { + if (result == EventHandling.IN_BETWEEN) { + return au.isOwnResourceVersions(resource.getMetadata().getResourceVersion()) + ? EventHandling.DEFER + : EventHandling.IN_BETWEEN; + } + if (result == EventHandling.NEW) { + log.debug("Setting last event for id: {} delete: {}", resourceId, delete); + au.setLastEvent( + delete + ? new ResourceDeleteEvent( + ResourceAction.DELETED, resourceId, resource, unknownState) + : new ExtendedResourceEvent(action, resourceId, resource, prevResourceVersion)); + return EventHandling.DEFER; + } + return result; } else { return result; } @@ -216,6 +230,10 @@ public synchronized void putResource(T newResource) { newResource.getMetadata().getResourceVersion(), resourceId); cache.put(resourceId, newResource); + var au = activeUpdates.get(resourceId); + if (au != null) { + au.addToOwnResourceVersions(newResource.getMetadata().getResourceVersion()); + } } } From 6018bec5c7723aeb09e33ef425eadfe3cdc2b812 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Mon, 8 Jun 2026 11:00:58 +0200 Subject: [PATCH 02/12] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../controller/ControllerEventSource.java | 2 +- .../source/informer/InformerEventSource.java | 2 +- .../informer/TemporaryResourceCache.java | 10 +-- .../informer/InformerEventSourceTest.java | 59 ++++++++++++++++++ .../informer/TemporaryResourceCacheTest.java | 62 +++++++++++++++++++ 5 files changed, 128 insertions(+), 7 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java index 89fd2425d8..23b00499ba 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java @@ -141,7 +141,7 @@ private void handleOnAddOrUpdate( ResourceAction action, T oldCustomResource, T newCustomResource) { var handling = temporaryResourceCache.onAddOrUpdateEvent(action, newCustomResource, oldCustomResource); - if (handling == EventHandling.NEW || handling == EventHandling.IN_BETWEEN) { + if (handling == EventHandling.NEW || handling == EventHandling.INTERMEDIATE) { handleEvent(action, newCustomResource, oldCustomResource, null); } else if (log.isDebugEnabled()) { log.debug("{} event propagation for action: {}", handling, action); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index f3550470fb..eaf9ee8821 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -154,7 +154,7 @@ private synchronized void onAddOrUpdate(ResourceAction action, R newObject, R ol var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(action, newObject, oldObject); - if (eventHandling != EventHandling.NEW && eventHandling != EventHandling.IN_BETWEEN) { + if (eventHandling != EventHandling.NEW && eventHandling != EventHandling.INTERMEDIATE) { log.debug( "{} event propagation", eventHandling == EventHandling.DEFER ? "Deferring" : "Skipping"); } else if (eventAcceptedByFilter(action, newObject, oldObject)) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index feaa5cc04a..3593f797d8 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -65,7 +65,7 @@ public class TemporaryResourceCache { public enum EventHandling { DEFER, OBSOLETE, - IN_BETWEEN, + INTERMEDIATE, NEW } @@ -149,16 +149,16 @@ private synchronized EventHandling onEvent( // in this case we received and event that might be in some edge case that was // already used in reconciler or after that, but before our updated resource version. // That would be hard to distinguish, so for those we are propagating the event further. - log.debug("Received in between event."); - result = EventHandling.IN_BETWEEN; + log.debug("Received intermediate event."); + result = EventHandling.INTERMEDIATE; } } var au = activeUpdates.get(resourceId); if (au != null) { - if (result == EventHandling.IN_BETWEEN) { + if (result == EventHandling.INTERMEDIATE) { return au.isOwnResourceVersions(resource.getMetadata().getResourceVersion()) ? EventHandling.DEFER - : EventHandling.IN_BETWEEN; + : EventHandling.INTERMEDIATE; } if (result == EventHandling.NEW) { log.debug("Setting last event for id: {} delete: {}", resourceId, delete); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index fe78bd3147..f52dd2e292 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -149,6 +149,15 @@ void processEventPropagationWithIncorrectAnnotation() { verify(eventHandlerMock, times(1)).handleEvent(any()); } + @Test + void propagatesIntermediateEventHandling() { + when(temporaryResourceCache.onAddOrUpdateEvent(any(), any(), any())) + .thenReturn(EventHandling.INTERMEDIATE); + informerEventSource.onUpdate(testDeployment(), testDeployment()); + + verify(eventHandlerMock, times(1)).handleEvent(any()); + } + @Test void propagateEventAndRemoveResourceFromTempCacheIfResourceVersionMismatch() { withRealTemporaryResourceCache(); @@ -439,6 +448,56 @@ void filteringUpdateAndGhostCheckWithNamespaceChange() { assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty(); } + @Test + void propagatesIntermediateEventForExternalUpdateDuringFiltering() { + // Causal-dependency fix: another controller updated the resource between our read + // and our write. The informer delivers that update during our active filter; since + // its resource version is NOT one of our own writes, it must be propagated. + var realCache = realCacheWithWatchedNamespace(); + var resourceId = ResourceID.fromResource(testDeployment()); + + realCache.startEventFilteringModify(resourceId); + realCache.putResource(deploymentWithResourceVersion(4)); + + informerEventSource.onUpdate( + deploymentWithResourceVersion(2), deploymentWithResourceVersion(3)); + + verify(eventHandlerMock, times(1)).handleEvent(any()); + + realCache.doneEventFilterModify(resourceId, "4"); + } + + @Test + void doesNotPropagateIntermediateEventForOurOwnIntermediateUpdate() { + // Two consecutive own writes within a single filter window: the older one's event + // arrives after the newer one has been cached. Because the version is recorded as + // our own, the event must be deferred (not propagated). + var realCache = realCacheWithWatchedNamespace(); + var resourceId = ResourceID.fromResource(testDeployment()); + + realCache.startEventFilteringModify(resourceId); + realCache.putResource(deploymentWithResourceVersion(3)); + realCache.putResource(deploymentWithResourceVersion(4)); + + informerEventSource.onUpdate( + deploymentWithResourceVersion(2), deploymentWithResourceVersion(3)); + + verify(eventHandlerMock, never()).handleEvent(any()); + + realCache.doneEventFilterModify(resourceId, "4"); + } + + private TemporaryResourceCache realCacheWithWatchedNamespace() { + var mes = mock(ManagedInformerEventSource.class); + var mim = mock(InformerManager.class); + when(mes.manager()).thenReturn(mim); + when(mim.isWatchingNamespace(any())).thenReturn(true); + when(mim.lastSyncResourceVersion(any())).thenReturn("1"); + temporaryResourceCache = spy(new TemporaryResourceCache<>(true, mes)); + informerEventSource.setTemporalResourceCache(temporaryResourceCache); + return temporaryResourceCache; + } + private void assertNoEventProduced() { await() .pollDelay(Duration.ofMillis(50)) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java index 9a58b83f88..6d0c4b88d4 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java @@ -294,6 +294,68 @@ void putAfterEventWithEventFilteringWithPost() { assertTrue(postEvent.isPresent()); } + @Test + void intermediateEventPropagatedWhenNoActiveUpdate() { + // Cache holds a newer version from a prior own write; no active filter is in progress. + // An older event arriving used to be OBSOLETE; now it must be propagated as INTERMEDIATE + // so callers can react to changes that happened between read and write. + var olderEvent = testResource(); + var newer = testResource(); + newer.getMetadata().setResourceVersion("3"); + + temporaryResourceCache.putResource(newer); + assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(olderEvent))) + .isPresent(); + + var result = + temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, olderEvent, null); + + assertThat(result).isEqualTo(EventHandling.INTERMEDIATE); + } + + @Test + void intermediateEventPropagatedWhenNotOurOwnUpdate() { + // Causal-dependency scenario: a third party updated the resource between our read and + // our write. Its version arrives as an event but is NOT in our own resource versions, + // so it must be propagated (INTERMEDIATE), not deferred. + var external = testResource(); // rv=2 — written by another controller + var resourceId = ResourceID.fromResource(external); + + temporaryResourceCache.startEventFilteringModify(resourceId); + + var ourUpdate = testResource(); + ourUpdate.getMetadata().setResourceVersion("3"); + temporaryResourceCache.putResource(ourUpdate); + + var result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, external, null); + + assertThat(result).isEqualTo(EventHandling.INTERMEDIATE); + } + + @Test + void intermediateEventDeferredWhenItIsOurOwnIntermediateUpdate() { + // Two consecutive own writes within the same filter window: the older one's event + // arrives after the newer one is cached. Because the version is recorded as our own, + // the event must be DEFERred rather than propagated. + var testResource = testResource(); + var resourceId = ResourceID.fromResource(testResource); + + temporaryResourceCache.startEventFilteringModify(resourceId); + + var ourFirst = testResource(); // rv=2 + temporaryResourceCache.putResource(ourFirst); + + var ourSecond = testResource(); + ourSecond.getMetadata().setResourceVersion("3"); + + temporaryResourceCache.startEventFilteringModify(resourceId); + temporaryResourceCache.putResource(ourSecond); + + var result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, ourFirst, null); + + assertThat(result).isEqualTo(EventHandling.DEFER); + } + @Test void rapidDeletion() { var testResource = testResource(); From 4fad7566cfffc47844f212f100e1cbb8f5a773c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Mon, 8 Jun 2026 14:00:47 +0200 Subject: [PATCH 03/12] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../controller/ControllerEventSourceTest.java | 81 +++++++++++++++++++ .../informer/InformerEventSourceTest.java | 58 ++++++++----- 2 files changed, 118 insertions(+), 21 deletions(-) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java index 4528fa8a83..72ea7df27f 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java @@ -35,12 +35,14 @@ import io.javaoperatorsdk.operator.processing.Controller; import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.processing.event.EventSourceManager; +import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSourceTestBase; import io.javaoperatorsdk.operator.processing.event.source.EventFilterTestUtils; import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.filter.GenericFilter; import io.javaoperatorsdk.operator.processing.event.source.filter.OnAddFilter; import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter; +import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import static io.javaoperatorsdk.operator.processing.event.source.EventFilterTestUtils.withResourceVersion; @@ -227,6 +229,74 @@ void eventFilteringExceptionDuringUpdate() { expectHandleEvent(2, 1); } + @Test + void propagatesIntermediateEventForExternalUpdateDuringFiltering() { + // Causal-dependency scenario: a third party updated the resource between our read and + // our write. The informer delivers that update during our active filter; since its + // resource version is NOT one of our own writes, it must be propagated. + var src = new TestableControllerEventSource(new TestController(null, null, null)); + setUpSource(src, true, controllerConfig); + + var resourceId = ResourceID.fromResource(TestUtils.testCustomResource1()); + + // first filter writes rv 4 (our own); a second concurrent filter keeps the + // active-updates window open while the event below is processed + var latch1 = sendForEventFilteringUpdate(4); + var latch2 = sendForEventFilteringUpdate(testResourceWithVersion(4), 5); + + latch1.countDown(); + awaitCachedResourceVersion(src.tempCache(), resourceId, "4"); + + // external update with rv 3 (older than our cached rv 4) — must propagate + source.onUpdate(testResourceWithVersion(2), testResourceWithVersion(3)); + + verify(eventHandler, times(1)).handleEvent(any()); + + latch2.countDown(); + } + + @Test + void doesNotPropagateIntermediateEventForOurOwnIntermediateUpdate() { + // Two consecutive own writes (rv 3 then rv 4) within an open filter window: an event + // for the older own version must be deferred since it's recognized as our own. A + // third concurrent filter keeps the active-updates window open while the event below + // is processed. + var src = new TestableControllerEventSource(new TestController(null, null, null)); + setUpSource(src, true, controllerConfig); + + var resourceId = ResourceID.fromResource(TestUtils.testCustomResource1()); + + var latch1 = sendForEventFilteringUpdate(3); + var latch2 = sendForEventFilteringUpdate(testResourceWithVersion(3), 4); + var latch3 = sendForEventFilteringUpdate(testResourceWithVersion(4), 5); + + latch1.countDown(); + awaitCachedResourceVersion(src.tempCache(), resourceId, "3"); + latch2.countDown(); + awaitCachedResourceVersion(src.tempCache(), resourceId, "4"); + + // event for our own rv 3 (older than cached rv 4) — must be deferred + source.onUpdate(testResourceWithVersion(2), testResourceWithVersion(3)); + + verify(eventHandler, never()).handleEvent(any()); + + latch3.countDown(); + } + + private void awaitCachedResourceVersion( + TemporaryResourceCache cache, + ResourceID resourceId, + String resourceVersion) { + await() + .untilAsserted( + () -> + assertThat( + cache + .getResourceFromCache(resourceId) + .map(r -> r.getMetadata().getResourceVersion())) + .hasValue(resourceVersion)); + } + private void expectHandleEvent(int newResourceVersion, int oldResourceVersion) { await() .untilAsserted( @@ -330,4 +400,15 @@ public TestConfiguration( false); } } + + private static class TestableControllerEventSource + extends ControllerEventSource { + TestableControllerEventSource(Controller controller) { + super(controller); + } + + TemporaryResourceCache tempCache() { + return temporaryResourceCache; + } + } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index f52dd2e292..b82d280397 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -453,49 +453,64 @@ void propagatesIntermediateEventForExternalUpdateDuringFiltering() { // Causal-dependency fix: another controller updated the resource between our read // and our write. The informer delivers that update during our active filter; since // its resource version is NOT one of our own writes, it must be propagated. - var realCache = realCacheWithWatchedNamespace(); + withRealTemporaryResourceCache(); + var resourceId = ResourceID.fromResource(testDeployment()); - realCache.startEventFilteringModify(resourceId); - realCache.putResource(deploymentWithResourceVersion(4)); + // first filter writes rv 4 (our own); a second concurrent filter keeps the + // active-updates window open so the event below hits the active path + var latch1 = sendForEventFilteringUpdate(4); + var latch2 = sendForEventFilteringUpdate(deploymentWithResourceVersion(4), 5); + + latch1.countDown(); + awaitCachedResourceVersion(resourceId, "4"); + // external update with rv 3 (older than our cached rv 4) — must propagate informerEventSource.onUpdate( deploymentWithResourceVersion(2), deploymentWithResourceVersion(3)); verify(eventHandlerMock, times(1)).handleEvent(any()); - realCache.doneEventFilterModify(resourceId, "4"); + latch2.countDown(); } @Test void doesNotPropagateIntermediateEventForOurOwnIntermediateUpdate() { - // Two consecutive own writes within a single filter window: the older one's event - // arrives after the newer one has been cached. Because the version is recorded as - // our own, the event must be deferred (not propagated). - var realCache = realCacheWithWatchedNamespace(); + // Two consecutive own writes (rv 3 then rv 4) within an open filter window: an + // event for the older own version must be deferred since it's recognized as our own. + // A third concurrent filter keeps the active-updates window open while the event + // below is processed. + withRealTemporaryResourceCache(); + var resourceId = ResourceID.fromResource(testDeployment()); - realCache.startEventFilteringModify(resourceId); - realCache.putResource(deploymentWithResourceVersion(3)); - realCache.putResource(deploymentWithResourceVersion(4)); + var latch1 = sendForEventFilteringUpdate(3); + var latch2 = sendForEventFilteringUpdate(deploymentWithResourceVersion(3), 4); + var latch3 = sendForEventFilteringUpdate(deploymentWithResourceVersion(4), 5); + + latch1.countDown(); + awaitCachedResourceVersion(resourceId, "3"); + latch2.countDown(); + awaitCachedResourceVersion(resourceId, "4"); + // event for our own rv 3 (older than cached rv 4) — must be deferred informerEventSource.onUpdate( deploymentWithResourceVersion(2), deploymentWithResourceVersion(3)); verify(eventHandlerMock, never()).handleEvent(any()); - realCache.doneEventFilterModify(resourceId, "4"); + latch3.countDown(); } - private TemporaryResourceCache realCacheWithWatchedNamespace() { - var mes = mock(ManagedInformerEventSource.class); - var mim = mock(InformerManager.class); - when(mes.manager()).thenReturn(mim); - when(mim.isWatchingNamespace(any())).thenReturn(true); - when(mim.lastSyncResourceVersion(any())).thenReturn("1"); - temporaryResourceCache = spy(new TemporaryResourceCache<>(true, mes)); - informerEventSource.setTemporalResourceCache(temporaryResourceCache); - return temporaryResourceCache; + private void awaitCachedResourceVersion(ResourceID resourceId, String resourceVersion) { + await() + .untilAsserted( + () -> + assertThat( + temporaryResourceCache + .getResourceFromCache(resourceId) + .map(d -> d.getMetadata().getResourceVersion())) + .hasValue(resourceVersion)); } private void assertNoEventProduced() { @@ -542,6 +557,7 @@ private void withRealTemporaryResourceCache() { var mes = mock(ManagedInformerEventSource.class); var mim = mock(InformerManager.class); when(mes.manager()).thenReturn(mim); + when(mim.isWatchingNamespace(any())).thenReturn(true); when(mim.lastSyncResourceVersion(any())).thenReturn("1"); temporaryResourceCache = spy(new TemporaryResourceCache<>(true, mes)); From a067b8c5be90787aaf4f6a222bc8153f0f1efc52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Mon, 8 Jun 2026 16:45:21 +0200 Subject: [PATCH 04/12] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../controller/ControllerEventSource.java | 2 +- .../source/informer/EventFilterDetails.java | 32 ++++-- .../informer/ManagedInformerEventSource.java | 22 +++- .../informer/TemporaryResourceCache.java | 18 ++- ...etionDuringStatusUpdateCustomResource.java | 28 +++++ .../DeletionDuringStatusUpdateIT.java | 107 ++++++++++++++++++ .../DeletionDuringStatusUpdateReconciler.java | 80 +++++++++++++ .../DeletionDuringStatusUpdateStatus.java | 29 +++++ 8 files changed, 298 insertions(+), 20 deletions(-) create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/deletionduringstatusupdate/DeletionDuringStatusUpdateCustomResource.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/deletionduringstatusupdate/DeletionDuringStatusUpdateIT.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/deletionduringstatusupdate/DeletionDuringStatusUpdateReconciler.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/deletionduringstatusupdate/DeletionDuringStatusUpdateStatus.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java index 23b00499ba..3e0bc1617b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java @@ -84,7 +84,7 @@ protected synchronized void handleEvent( try { if (log.isDebugEnabled()) { log.debug("Event received with action: {}", action); - log.trace("Event Old resource: {},\n new resource: {}", oldResource, resource); + log.debug("Event Old resource: {},\n new resource: {}", oldResource, resource); } MDCUtils.addResourceInfo(resource); controller.getEventSourceManager().broadcastOnResourceEvent(action, resource, oldResource); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java index 00b3c02931..4e1bd4ac47 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java @@ -27,9 +27,10 @@ class EventFilterDetails { private int activeUpdates = 0; - private ResourceEvent lastEvent; + private ResourceEvent lastRelevantEvent; private String lastOwnUpdatedResourceVersion; private Set allOwnResourceVersions = new HashSet<>(); + private Set uncertainEvents = new HashSet<>(); public void increaseActiveUpdates() { activeUpdates = activeUpdates + 1; @@ -53,18 +54,23 @@ public boolean decreaseActiveUpdates(String updatedResourceVersion) { return activeUpdates == 0; } - public void setLastEvent(ResourceEvent event) { - lastEvent = event; + public void setLastRelevantEvent(ResourceEvent event) { + lastRelevantEvent = event; } - public Optional getLatestEventAfterLastUpdateEvent() { - if (lastEvent != null - && (lastOwnUpdatedResourceVersion == null - || ReconcilerUtilsInternal.compareResourceVersions( - lastEvent.getResource().orElseThrow().getMetadata().getResourceVersion(), - lastOwnUpdatedResourceVersion) - > 0)) { - return Optional.of(lastEvent); + public Optional getRelevantEventToPropagate() { + if (lastRelevantEvent != null + && (lastOwnUpdatedResourceVersion == null + || ReconcilerUtilsInternal.compareResourceVersions( + lastRelevantEvent + .getResource() + .orElseThrow() + .getMetadata() + .getResourceVersion(), + lastOwnUpdatedResourceVersion) + > 0) + || allOwnResourceVersions.containsAll(uncertainEvents)) { + return Optional.of(lastRelevantEvent); } return Optional.empty(); } @@ -80,4 +86,8 @@ void addToOwnResourceVersions(String updateVersion) { public boolean isOwnResourceVersions(String resourceVersion) { return allOwnResourceVersions.contains(resourceVersion); } + + public void addUncertainResourceVersion(String resourceVersion) { + uncertainEvents.add(resourceVersion); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index f021101229..07009f7db8 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -112,7 +112,6 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator< var updatedForLambda = updatedResource; res.ifPresentOrElse( r -> { - R latestResource = (R) r.getResource().orElseThrow(); // as previous resource version we use the one from successful update, since // we process new event here only if that is more recent then the event from our update. // Note that this is equivalent with the scenario when an informer watch connection @@ -123,8 +122,25 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator< (r instanceof ExtendedResourceEvent) ? (R) ((ExtendedResourceEvent) r).getPreviousResource().orElse(null) : null; - R prevVersionOfResource = - updatedForLambda != null ? updatedForLambda : extendedResourcePrevVersion; + R prevVersionOfResource = null; + R latestResource = null; + if (updatedForLambda != null) { + var updatedNewerThanRelated = + ReconcilerUtilsInternal.compareResourceVersions( + updatedForLambda, r.getResource().orElseThrow()) + > 0; + prevVersionOfResource = + updatedNewerThanRelated + ? (extendedResourcePrevVersion != null + ? extendedResourcePrevVersion + : prevVersionOfResource) + : updatedForLambda; + latestResource = updatedForLambda; + } else { + prevVersionOfResource = extendedResourcePrevVersion; + latestResource = (R) r.getResource().orElseThrow(); + } + if (log.isDebugEnabled()) { log.debug( "Previous resource version: {} resource from update present: {}" diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 3593f797d8..a1517d86b0 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -98,7 +98,7 @@ public synchronized Optional doneEventFilterModify( return Optional.empty(); } activeUpdates.remove(resourceID); - var res = ed.getLatestEventAfterLastUpdateEvent(); + var res = ed.getRelevantEventToPropagate(); log.debug( "Zero active updates for resource id: {}; event after update event: {}; updated resource" + " version: {}", @@ -156,13 +156,21 @@ private synchronized EventHandling onEvent( var au = activeUpdates.get(resourceId); if (au != null) { if (result == EventHandling.INTERMEDIATE) { - return au.isOwnResourceVersions(resource.getMetadata().getResourceVersion()) - ? EventHandling.DEFER - : EventHandling.INTERMEDIATE; + var ownResourceVersion = + au.isOwnResourceVersions(resource.getMetadata().getResourceVersion()); + log.debug("Handling intermediate event. Own resource version: {}", ownResourceVersion); + return ownResourceVersion ? EventHandling.DEFER : EventHandling.INTERMEDIATE; } if (result == EventHandling.NEW) { + if (cached == null) { + // this is for the case when temp cache is null, we receive an event + // there is ongoing filtering-caching update; at this point we cannot tell + // if that event is from our update + log.debug("Setting uncertain resource version."); + au.addUncertainResourceVersion(resource.getMetadata().getResourceVersion()); + } log.debug("Setting last event for id: {} delete: {}", resourceId, delete); - au.setLastEvent( + au.setLastRelevantEvent( delete ? new ResourceDeleteEvent( ResourceAction.DELETED, resourceId, resource, unknownState) diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/deletionduringstatusupdate/DeletionDuringStatusUpdateCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/deletionduringstatusupdate/DeletionDuringStatusUpdateCustomResource.java new file mode 100644 index 0000000000..5cb1170c34 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/deletionduringstatusupdate/DeletionDuringStatusUpdateCustomResource.java @@ -0,0 +1,28 @@ +/* + * Copyright Java Operator SDK Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.javaoperatorsdk.operator.baseapi.deletionduringstatusupdate; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("ddsu") +public class DeletionDuringStatusUpdateCustomResource + extends CustomResource implements Namespaced {} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/deletionduringstatusupdate/DeletionDuringStatusUpdateIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/deletionduringstatusupdate/DeletionDuringStatusUpdateIT.java new file mode 100644 index 0000000000..7574dd07b4 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/deletionduringstatusupdate/DeletionDuringStatusUpdateIT.java @@ -0,0 +1,107 @@ +/* + * Copyright Java Operator SDK Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.javaoperatorsdk.operator.baseapi.deletionduringstatusupdate; + +import java.time.Duration; +import java.util.Collections; +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +/** + * Regression test for: deletion event dropped when resource is deleted concurrently with a status + * update. + */ +class DeletionDuringStatusUpdateIT { + + static final String RESOURCE_NAME = "test-resource"; + + @RegisterExtension + LocallyRunOperatorExtension extension = + LocallyRunOperatorExtension.builder() + .withReconciler(new DeletionDuringStatusUpdateReconciler()) + .build(); + + @AfterEach + void forceCleanup() { + // If the test failed, remove the finalizer so the resource can be deleted + var res = extension.get(DeletionDuringStatusUpdateCustomResource.class, RESOURCE_NAME); + if (res != null) { + res.getMetadata().setFinalizers(Collections.emptyList()); + extension.replace(res); + extension.delete(res); + } + } + + @Test + void deletionDuringStatusUpdateTriggersCleanup() throws InterruptedException { + var reconciler = extension.getReconcilerOfType(DeletionDuringStatusUpdateReconciler.class); + + extension.create(testResource()); + + // Wait until the reconciler is inside the update operation (active-update window is open) + assertThat(reconciler.patchStartedLatch.await(30, TimeUnit.SECONDS)) + .as("reconciler should enter the patch update operation") + .isTrue(); + + // Issue delete — K8s sets deletionTimestamp while the active-update window is open + extension.delete(testResource()); + + // Wait for deletionTimestamp to be confirmed on the resource in K8s + await() + .atMost(Duration.ofSeconds(30)) + .until( + () -> { + var res = + extension.get(DeletionDuringStatusUpdateCustomResource.class, RESOURCE_NAME); + return res != null && res.isMarkedForDeletion(); + }); + + // Signal the reconciler to proceed with the actual PATCH. K8s will merge deletionTimestamp + // into the response - the deletion event (lower RV) is now deferred and will be dropped + // without the fix. + reconciler.deleteConfirmedLatch.countDown(); + + // cleanup() must be called — the deletion must not be silently lost + assertThat(reconciler.cleanupCalledLatch.await(30, TimeUnit.SECONDS)) + .as("cleanup() must be called after the status update that races with the delete") + .isTrue(); + + // Resource must eventually disappear (finalizer removed) + await() + .atMost(Duration.ofSeconds(30)) + .untilAsserted( + () -> + assertThat( + extension.get( + DeletionDuringStatusUpdateCustomResource.class, RESOURCE_NAME)) + .isNull()); + } + + DeletionDuringStatusUpdateCustomResource testResource() { + var resource = new DeletionDuringStatusUpdateCustomResource(); + resource.setMetadata(new ObjectMetaBuilder().withName(RESOURCE_NAME).build()); + return resource; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/deletionduringstatusupdate/DeletionDuringStatusUpdateReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/deletionduringstatusupdate/DeletionDuringStatusUpdateReconciler.java new file mode 100644 index 0000000000..2c8943a977 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/deletionduringstatusupdate/DeletionDuringStatusUpdateReconciler.java @@ -0,0 +1,80 @@ +/* + * Copyright Java Operator SDK Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.javaoperatorsdk.operator.baseapi.deletionduringstatusupdate; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import io.javaoperatorsdk.operator.api.reconciler.Cleaner; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; + +@ControllerConfiguration +public class DeletionDuringStatusUpdateReconciler + implements Reconciler, + Cleaner { + + final CountDownLatch patchStartedLatch = new CountDownLatch(1); + final CountDownLatch deleteConfirmedLatch = new CountDownLatch(1); + final CountDownLatch cleanupCalledLatch = new CountDownLatch(1); + + @Override + public UpdateControl reconcile( + DeletionDuringStatusUpdateCustomResource resource, + Context context) + throws InterruptedException { + if (resource.isMarkedForDeletion()) { + return UpdateControl.noUpdate(); + } + + var status = new DeletionDuringStatusUpdateStatus(); + status.setReady(true); + resource.setStatus(status); + + context + .resourceOperations() + .resourcePatch( + resource, + r -> { + patchStartedLatch.countDown(); + try { + if (!deleteConfirmedLatch.await(30, TimeUnit.SECONDS)) { + throw new RuntimeException("Timed out waiting for delete confirmation"); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + r.getMetadata().setResourceVersion(null); + return context.getClient().resource(r).patchStatus(); + }, + context.eventSourceRetriever().getControllerEventSource()); + + return UpdateControl.noUpdate(); + } + + @Override + public DeleteControl cleanup( + DeletionDuringStatusUpdateCustomResource resource, + Context context) { + System.out.println("DeletionDuringStatusUpdateReconciler.cleanup"); + cleanupCalledLatch.countDown(); + return DeleteControl.defaultDelete(); + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/deletionduringstatusupdate/DeletionDuringStatusUpdateStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/deletionduringstatusupdate/DeletionDuringStatusUpdateStatus.java new file mode 100644 index 0000000000..52da516d00 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/deletionduringstatusupdate/DeletionDuringStatusUpdateStatus.java @@ -0,0 +1,29 @@ +/* + * Copyright Java Operator SDK Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.javaoperatorsdk.operator.baseapi.deletionduringstatusupdate; + +public class DeletionDuringStatusUpdateStatus { + + private boolean ready; + + public boolean isReady() { + return ready; + } + + public void setReady(boolean ready) { + this.ready = ready; + } +} From 317d8dfc1efdfc0ecee37f696ee473c6e8f4e40c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Mon, 8 Jun 2026 20:39:04 +0200 Subject: [PATCH 05/12] Event filtering with recording MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../source/informer/EventFilterDetails.java | 75 +++++++++---------- ...ceEvent.java => GenericResourceEvent.java} | 20 +++-- .../informer/ManagedInformerEventSource.java | 59 ++------------- .../informer/TemporaryResourceCache.java | 53 +++---------- .../controller/ControllerEventSourceTest.java | 7 +- .../informer/InformerEventSourceTest.java | 50 +++++++++---- .../informer/TemporaryResourceCacheTest.java | 18 ++--- .../DeletionDuringStatusUpdateReconciler.java | 1 - 8 files changed, 113 insertions(+), 170 deletions(-) rename operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/{ExtendedResourceEvent.java => GenericResourceEvent.java} (81%) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java index 4e1bd4ac47..fccc7b479c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java @@ -15,22 +15,22 @@ */ package io.javaoperatorsdk.operator.processing.event.source.informer; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; import java.util.Optional; import java.util.Set; import java.util.function.UnaryOperator; +import java.util.stream.Collectors; import io.fabric8.kubernetes.api.model.HasMetadata; -import io.javaoperatorsdk.operator.ReconcilerUtilsInternal; -import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; +import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; class EventFilterDetails { private int activeUpdates = 0; - private ResourceEvent lastRelevantEvent; - private String lastOwnUpdatedResourceVersion; + private List relatedEvents = new ArrayList<>(); private Set allOwnResourceVersions = new HashSet<>(); - private Set uncertainEvents = new HashSet<>(); public void increaseActiveUpdates() { activeUpdates = activeUpdates + 1; @@ -41,40 +41,11 @@ public void increaseActiveUpdates() { * controller to prevent race condition and send event from {@link * ManagedInformerEventSource#eventFilteringUpdateAndCacheResource(HasMetadata, UnaryOperator)} */ - public boolean decreaseActiveUpdates(String updatedResourceVersion) { - if (updatedResourceVersion != null - && (lastOwnUpdatedResourceVersion == null - || ReconcilerUtilsInternal.compareResourceVersions( - updatedResourceVersion, lastOwnUpdatedResourceVersion) - > 0)) { - lastOwnUpdatedResourceVersion = updatedResourceVersion; - } - + public boolean decreaseActiveUpdates() { activeUpdates = activeUpdates - 1; return activeUpdates == 0; } - public void setLastRelevantEvent(ResourceEvent event) { - lastRelevantEvent = event; - } - - public Optional getRelevantEventToPropagate() { - if (lastRelevantEvent != null - && (lastOwnUpdatedResourceVersion == null - || ReconcilerUtilsInternal.compareResourceVersions( - lastRelevantEvent - .getResource() - .orElseThrow() - .getMetadata() - .getResourceVersion(), - lastOwnUpdatedResourceVersion) - > 0) - || allOwnResourceVersions.containsAll(uncertainEvents)) { - return Optional.of(lastRelevantEvent); - } - return Optional.empty(); - } - public int getActiveUpdates() { return activeUpdates; } @@ -83,11 +54,37 @@ void addToOwnResourceVersions(String updateVersion) { allOwnResourceVersions.add(updateVersion); } - public boolean isOwnResourceVersions(String resourceVersion) { - return allOwnResourceVersions.contains(resourceVersion); + public void addRelatedEvent(GenericResourceEvent event) { + relatedEvents.add(event); } - public void addUncertainResourceVersion(String resourceVersion) { - uncertainEvents.add(resourceVersion); + public Optional prepareSummaryEventIfNotOwnEventsPresent() { + if (relatedEvents.isEmpty()) { + return Optional.empty(); + } + if (allOwnResourceVersions.containsAll( + relatedEvents.stream() + .map(e -> e.getResource().orElseThrow().getMetadata().getResourceVersion()) + .collect(Collectors.toSet()))) { + return Optional.empty(); + } + var deleteEvent = + relatedEvents.stream().filter(e -> e.getAction() == ResourceAction.DELETED).findFirst(); + if (deleteEvent.isPresent()) { + return deleteEvent; + } + if (relatedEvents.size() == 1) { + return Optional.of(relatedEvents.get(0)); + } + var firstEvent = relatedEvents.get(0); + var firstResource = + firstEvent.getPreviousResource().orElseGet(() -> firstEvent.getResource().orElseThrow()); + + return Optional.of( + new GenericResourceEvent( + ResourceAction.UPDATED, + relatedEvents.get(relatedEvents.size() - 1).getResource().orElseThrow(), + firstResource, + null)); } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ExtendedResourceEvent.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/GenericResourceEvent.java similarity index 81% rename from operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ExtendedResourceEvent.java rename to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/GenericResourceEvent.java index 5d30d1b0e1..c6911f48cc 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ExtendedResourceEvent.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/GenericResourceEvent.java @@ -24,26 +24,32 @@ import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; /** Used only for resource event filtering. */ -public class ExtendedResourceEvent extends ResourceEvent { +public class GenericResourceEvent extends ResourceEvent { private final HasMetadata previousResource; + private final Boolean lastStateUnknow; - public ExtendedResourceEvent( + public GenericResourceEvent( ResourceAction action, - ResourceID resourceID, HasMetadata latestResource, - HasMetadata previousResource) { - super(action, resourceID, latestResource); + HasMetadata previousResource, + Boolean lastStateUnknow) { + super(action, ResourceID.fromResource(latestResource), latestResource); this.previousResource = previousResource; + this.lastStateUnknow = lastStateUnknow; } public Optional getPreviousResource() { return Optional.ofNullable(previousResource); } + public Boolean getLastStateUnknow() { + return lastStateUnknow; + } + @Override public String toString() { - return "ExtendedResourceEvent{" + return "GenericResourceEvent{" + getPreviousResource() .map(r -> "previousResourceVersion=" + r.getMetadata().getResourceVersion()) .orElse("") @@ -61,7 +67,7 @@ public String toString() { public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; if (!super.equals(o)) return false; - ExtendedResourceEvent that = (ExtendedResourceEvent) o; + GenericResourceEvent that = (GenericResourceEvent) o; return Objects.equals(previousResource, that.previousResource); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 07009f7db8..52fd296773 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -46,7 +46,6 @@ import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.*; import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; -import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent; @SuppressWarnings("rawtypes") public abstract class ManagedInformerEventSource< @@ -105,58 +104,14 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator< handleRecentResourceUpdate(id, updatedResource, resourceToUpdate); return updatedResource; } finally { - var res = - temporaryResourceCache.doneEventFilterModify( - id, - updatedResource == null ? null : updatedResource.getMetadata().getResourceVersion()); - var updatedForLambda = updatedResource; + var res = temporaryResourceCache.doneEventFilterModify(id); res.ifPresentOrElse( - r -> { - // as previous resource version we use the one from successful update, since - // we process new event here only if that is more recent then the event from our update. - // Note that this is equivalent with the scenario when an informer watch connection - // would reconnect and loose some events in between. - // If that update was not successful we still record the previous version from the - // actual event in the ExtendedResourceEvent. - R extendedResourcePrevVersion = - (r instanceof ExtendedResourceEvent) - ? (R) ((ExtendedResourceEvent) r).getPreviousResource().orElse(null) - : null; - R prevVersionOfResource = null; - R latestResource = null; - if (updatedForLambda != null) { - var updatedNewerThanRelated = - ReconcilerUtilsInternal.compareResourceVersions( - updatedForLambda, r.getResource().orElseThrow()) - > 0; - prevVersionOfResource = - updatedNewerThanRelated - ? (extendedResourcePrevVersion != null - ? extendedResourcePrevVersion - : prevVersionOfResource) - : updatedForLambda; - latestResource = updatedForLambda; - } else { - prevVersionOfResource = extendedResourcePrevVersion; - latestResource = (R) r.getResource().orElseThrow(); - } - - if (log.isDebugEnabled()) { - log.debug( - "Previous resource version: {} resource from update present: {}" - + " extendedPrevResource present: {}", - prevVersionOfResource.getMetadata().getResourceVersion(), - updatedForLambda != null, - extendedResourcePrevVersion != null); - } - handleEvent( - r.getAction(), - latestResource, - prevVersionOfResource, - (r instanceof ResourceDeleteEvent) - ? ((ResourceDeleteEvent) r).isDeletedFinalStateUnknown() - : null); - }, + r -> + handleEvent( + r.getAction(), + (R) r.getResource().orElseThrow(), + (R) r.getPreviousResource().orElse(null), + r.getLastStateUnknow()), () -> log.debug("No new event present after the filtering update")); } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index a1517d86b0..8ee3f44b4d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -29,8 +29,6 @@ import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; -import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceDeleteEvent; -import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; /** * Temporal cache is used to solve the problem for {@link KubernetesDependentResource} that is, when @@ -84,13 +82,12 @@ public synchronized void startEventFilteringModify(ResourceID resourceID) { ed.increaseActiveUpdates(); } - public synchronized Optional doneEventFilterModify( - ResourceID resourceID, String updatedResourceVersion) { + public synchronized Optional doneEventFilterModify(ResourceID resourceID) { if (!comparableResourceVersions) { return Optional.empty(); } var ed = activeUpdates.get(resourceID); - if (ed == null || !ed.decreaseActiveUpdates(updatedResourceVersion)) { + if (ed == null || !ed.decreaseActiveUpdates()) { log.debug( "Active updates {} for resource id: {}", ed != null ? ed.getActiveUpdates() : 0, @@ -98,31 +95,20 @@ public synchronized Optional doneEventFilterModify( return Optional.empty(); } activeUpdates.remove(resourceID); - var res = ed.getRelevantEventToPropagate(); - log.debug( - "Zero active updates for resource id: {}; event after update event: {}; updated resource" - + " version: {}", - resourceID, - res.isPresent(), - updatedResourceVersion); - return res; + return ed.prepareSummaryEventIfNotOwnEventsPresent(); } public void onDeleteEvent(T resource, boolean unknownState) { - onEvent(ResourceAction.DELETED, resource, null, unknownState, true); + onEvent(ResourceAction.DELETED, resource, null, unknownState); } public EventHandling onAddOrUpdateEvent( ResourceAction action, T resource, T prevResourceVersion) { - return onEvent(action, resource, prevResourceVersion, false, false); + return onEvent(action, resource, prevResourceVersion, false); } private synchronized EventHandling onEvent( - ResourceAction action, - T resource, - T prevResourceVersion, - boolean unknownState, - boolean delete) { + ResourceAction action, T resource, T prevResourceVersion, boolean unknownState) { if (!comparableResourceVersions) { return EventHandling.NEW; } @@ -155,29 +141,10 @@ private synchronized EventHandling onEvent( } var au = activeUpdates.get(resourceId); if (au != null) { - if (result == EventHandling.INTERMEDIATE) { - var ownResourceVersion = - au.isOwnResourceVersions(resource.getMetadata().getResourceVersion()); - log.debug("Handling intermediate event. Own resource version: {}", ownResourceVersion); - return ownResourceVersion ? EventHandling.DEFER : EventHandling.INTERMEDIATE; - } - if (result == EventHandling.NEW) { - if (cached == null) { - // this is for the case when temp cache is null, we receive an event - // there is ongoing filtering-caching update; at this point we cannot tell - // if that event is from our update - log.debug("Setting uncertain resource version."); - au.addUncertainResourceVersion(resource.getMetadata().getResourceVersion()); - } - log.debug("Setting last event for id: {} delete: {}", resourceId, delete); - au.setLastRelevantEvent( - delete - ? new ResourceDeleteEvent( - ResourceAction.DELETED, resourceId, resource, unknownState) - : new ExtendedResourceEvent(action, resourceId, resource, prevResourceVersion)); - return EventHandling.DEFER; - } - return result; + log.debug("Recording relevant event"); + au.addRelatedEvent( + new GenericResourceEvent(action, resource, prevResourceVersion, unknownState)); + return EventHandling.DEFER; } else { return result; } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java index 72ea7df27f..b84b7992b7 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java @@ -249,10 +249,9 @@ void propagatesIntermediateEventForExternalUpdateDuringFiltering() { // external update with rv 3 (older than our cached rv 4) — must propagate source.onUpdate(testResourceWithVersion(2), testResourceWithVersion(3)); - - verify(eventHandler, times(1)).handleEvent(any()); - latch2.countDown(); + + await().untilAsserted(() -> verify(eventHandler, times(1)).handleEvent(any())); } @Test @@ -317,7 +316,7 @@ private void expectHandleEvent(int newResourceVersion, int oldResourceVersion) { .isEqualTo("" + oldResourceVersion); return true; }), - isNull()); + any()); }); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index b82d280397..847556870c 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -71,7 +71,7 @@ class InformerEventSourceTest { private static final String PREV_RESOURCE_VERSION = "0"; - private static final String DEFAULT_RESOURCE_VERSION = "1"; + private static final String DEFAULT_RESOURCE_VERSION = "2"; private InformerEventSource informerEventSource; private final KubernetesClient clientMock = MockKubernetesClient.client(Deployment.class); @@ -218,12 +218,12 @@ void filtersOnDeleteEvents() { void handlesPrevResourceVersionForUpdate() { withRealTemporaryResourceCache(); - CountDownLatch latch = sendForEventFilteringUpdate(2); + CountDownLatch latch = sendForEventFilteringUpdate(3); informerEventSource.onUpdate( - deploymentWithResourceVersion(2), deploymentWithResourceVersion(3)); + deploymentWithResourceVersion(1), deploymentWithResourceVersion(2)); latch.countDown(); - expectHandleEvent(3, 2); + expectHandleAddEvent(2, 1); } @Test @@ -241,7 +241,7 @@ void handlesPrevResourceVersionForUpdateInCaseOfException() { deploymentWithResourceVersion(1), deploymentWithResourceVersion(2)); latch.countDown(); - expectHandleEvent(2, 1); + expectHandleAddEvent(2, 1); } @Test @@ -256,7 +256,7 @@ void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() { withResourceVersion(testDeployment(), 3), withResourceVersion(testDeployment(), 4)); latch.countDown(); - expectHandleEvent(4, 2); + expectHandleAddEvent(4, 2); } @Test @@ -275,11 +275,11 @@ void doesNotPropagateEventIfReceivedBeforeUpdate() { void filterAddEventBeforeUpdate() { withRealTemporaryResourceCache(); - CountDownLatch latch = sendForEventFilteringUpdate(2); - informerEventSource.onAdd(deploymentWithResourceVersion(1)); + CountDownLatch latch = sendForEventFilteringUpdate(3); + informerEventSource.onAdd(deploymentWithResourceVersion(2)); latch.countDown(); - assertNoEventProduced(); + expectHandleAddEvent(2); } @Test @@ -379,7 +379,7 @@ void ghostCheckRemovesCachedResourceDuringFilteringUpdate() { assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty(); // complete the filtering update - the resource should not reappear - temporaryResourceCache.doneEventFilterModify(resourceId, "2"); + temporaryResourceCache.doneEventFilterModify(resourceId); assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty(); } @@ -439,7 +439,7 @@ void filteringUpdateAndGhostCheckWithNamespaceChange() { assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty(); // complete the filtering update - var doneResult = temporaryResourceCache.doneEventFilterModify(resourceId, "2"); + var doneResult = temporaryResourceCache.doneEventFilterModify(resourceId); // resource was already cleaned by ghost check, so no deferred event assertThat(doneResult).isEmpty(); @@ -469,9 +469,9 @@ void propagatesIntermediateEventForExternalUpdateDuringFiltering() { informerEventSource.onUpdate( deploymentWithResourceVersion(2), deploymentWithResourceVersion(3)); - verify(eventHandlerMock, times(1)).handleEvent(any()); - latch2.countDown(); + + expectHandleAddEvent(3, 2); } @Test @@ -521,8 +521,28 @@ private void assertNoEventProduced() { () -> verify(informerEventSource, never()).handleEvent(any(), any(), any(), any())); } - private void expectHandleEvent(int newResourceVersion, int oldResourceVersion) { + private void expectHandleAddEvent(int newResourceVersion) { + await() + .atMost(Duration.ofSeconds(1)) + .untilAsserted( + () -> { + verify(informerEventSource, times(1)) + .handleEvent( + eq(ResourceAction.ADDED), + argThat( + newResource -> { + assertThat(newResource.getMetadata().getResourceVersion()) + .isEqualTo("" + newResourceVersion); + return true; + }), + isNull(), + any()); + }); + } + + private void expectHandleAddEvent(int newResourceVersion, int oldResourceVersion) { await() + .atMost(Duration.ofSeconds(1)) .untilAsserted( () -> { verify(informerEventSource, times(1)) @@ -540,7 +560,7 @@ private void expectHandleEvent(int newResourceVersion, int oldResourceVersion) { .isEqualTo("" + oldResourceVersion); return true; }), - isNull()); + any()); }); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java index 6d0c4b88d4..2917367333 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java @@ -155,7 +155,7 @@ void eventReceivedDuringFiltering() { .isEmpty(); var doneRes = - temporaryResourceCache.doneEventFilterModify(ResourceID.fromResource(testResource), "2"); + temporaryResourceCache.doneEventFilterModify(ResourceID.fromResource(testResource)); assertThat(doneRes).isEmpty(); assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) @@ -179,7 +179,7 @@ void newerEventDuringFiltering() { .isEmpty(); var doneRes = - temporaryResourceCache.doneEventFilterModify(ResourceID.fromResource(testResource), "2"); + temporaryResourceCache.doneEventFilterModify(ResourceID.fromResource(testResource)); assertThat(doneRes).isPresent(); assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) @@ -197,7 +197,7 @@ void eventAfterFiltering() { .isPresent(); var doneRes = - temporaryResourceCache.doneEventFilterModify(ResourceID.fromResource(testResource), "2"); + temporaryResourceCache.doneEventFilterModify(ResourceID.fromResource(testResource)); assertThat(doneRes).isEmpty(); assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(testResource))) @@ -241,7 +241,7 @@ void putBeforeEventWithEventFiltering() { temporaryResourceCache.startEventFilteringModify(resourceId); temporaryResourceCache.putResource(nextResource); - temporaryResourceCache.doneEventFilterModify(resourceId, "3"); + temporaryResourceCache.doneEventFilterModify(resourceId); latestSyncVersion = "3"; result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, nextResource, null); @@ -268,7 +268,7 @@ void putAfterEventWithEventFilteringNoPost() { // the result is deferred assertThat(result).isEqualTo(EventHandling.DEFER); temporaryResourceCache.putResource(nextResource); - var postEvent = temporaryResourceCache.doneEventFilterModify(resourceId, "3"); + var postEvent = temporaryResourceCache.doneEventFilterModify(resourceId); // there is no post event because the done call claimed responsibility for rv 3 assertTrue(postEvent.isEmpty()); @@ -280,7 +280,7 @@ void putAfterEventWithEventFilteringWithPost() { var resourceId = ResourceID.fromResource(testResource); temporaryResourceCache.startEventFilteringModify(resourceId); - // this should be a corner case - watch had a hard reset since the start of the + // this should be a corner case - watch had a hard reset since the start // of the update operation, such that 4 rv event is seen prior to the update // completing with the 3 rv. var nextResource = testResource(); @@ -289,7 +289,7 @@ void putAfterEventWithEventFilteringWithPost() { temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, nextResource, null); assertThat(result).isEqualTo(EventHandling.DEFER); - var postEvent = temporaryResourceCache.doneEventFilterModify(resourceId, "3"); + var postEvent = temporaryResourceCache.doneEventFilterModify(resourceId); assertTrue(postEvent.isPresent()); } @@ -314,7 +314,7 @@ void intermediateEventPropagatedWhenNoActiveUpdate() { } @Test - void intermediateEventPropagatedWhenNotOurOwnUpdate() { + void intermediateEventRecorded() { // Causal-dependency scenario: a third party updated the resource between our read and // our write. Its version arrives as an event but is NOT in our own resource versions, // so it must be propagated (INTERMEDIATE), not deferred. @@ -329,7 +329,7 @@ void intermediateEventPropagatedWhenNotOurOwnUpdate() { var result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, external, null); - assertThat(result).isEqualTo(EventHandling.INTERMEDIATE); + assertThat(result).isEqualTo(EventHandling.DEFER); } @Test diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/deletionduringstatusupdate/DeletionDuringStatusUpdateReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/deletionduringstatusupdate/DeletionDuringStatusUpdateReconciler.java index 2c8943a977..db05321ee7 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/deletionduringstatusupdate/DeletionDuringStatusUpdateReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/deletionduringstatusupdate/DeletionDuringStatusUpdateReconciler.java @@ -73,7 +73,6 @@ public UpdateControl reconcile( public DeleteControl cleanup( DeletionDuringStatusUpdateCustomResource resource, Context context) { - System.out.println("DeletionDuringStatusUpdateReconciler.cleanup"); cleanupCalledLatch.countDown(); return DeleteControl.defaultDelete(); } From 9273843fbf7bc4d4f2d0274517af7bd99a4443e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Mon, 8 Jun 2026 20:43:15 +0200 Subject: [PATCH 06/12] test fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../event/source/informer/InformerEventSourceTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 847556870c..25ae10c321 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -339,14 +339,14 @@ void multipleCachingFilteringUpdates_variant3() { void multipleCachingFilteringUpdates_variant4() { withRealTemporaryResourceCache(); - CountDownLatch latch = sendForEventFilteringUpdate(2); + CountDownLatch latch = sendForEventFilteringUpdate(3); CountDownLatch latch2 = - sendForEventFilteringUpdate(withResourceVersion(testDeployment(), 2), 3); + sendForEventFilteringUpdate(withResourceVersion(testDeployment(), 3), 4); - informerEventSource.onUpdate( - deploymentWithResourceVersion(1), deploymentWithResourceVersion(2)); informerEventSource.onUpdate( deploymentWithResourceVersion(2), deploymentWithResourceVersion(3)); + informerEventSource.onUpdate( + deploymentWithResourceVersion(3), deploymentWithResourceVersion(4)); latch.countDown(); latch2.countDown(); From d8682cc5d06a997ee6ca6dfa5149183c6b3b25ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 9 Jun 2026 10:44:04 +0200 Subject: [PATCH 07/12] Simplified EventHandling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../controller/ControllerEventSource.java | 2 +- .../source/informer/InformerEventSource.java | 4 ++-- .../informer/TemporaryResourceCache.java | 15 ++++++-------- .../informer/InformerEventSourceTest.java | 8 ++++---- .../informer/TemporaryResourceCacheTest.java | 20 +++++++++---------- 5 files changed, 23 insertions(+), 26 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java index 3e0bc1617b..1ce8ce0620 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java @@ -141,7 +141,7 @@ private void handleOnAddOrUpdate( ResourceAction action, T oldCustomResource, T newCustomResource) { var handling = temporaryResourceCache.onAddOrUpdateEvent(action, newCustomResource, oldCustomResource); - if (handling == EventHandling.NEW || handling == EventHandling.INTERMEDIATE) { + if (handling == EventHandling.PROPAGATE) { handleEvent(action, newCustomResource, oldCustomResource, null); } else if (log.isDebugEnabled()) { log.debug("{} event propagation for action: {}", handling, action); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index eaf9ee8821..afbf0a33ab 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -154,9 +154,9 @@ private synchronized void onAddOrUpdate(ResourceAction action, R newObject, R ol var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(action, newObject, oldObject); - if (eventHandling != EventHandling.NEW && eventHandling != EventHandling.INTERMEDIATE) { + if (eventHandling != EventHandling.PROPAGATE) { log.debug( - "{} event propagation", eventHandling == EventHandling.DEFER ? "Deferring" : "Skipping"); + "{} event propagation", eventHandling == EventHandling.IGNORE ? "Deferring" : "Skipping"); } else if (eventAcceptedByFilter(action, newObject, oldObject)) { log.debug( "Propagating event for {}, resource with same version not result of a our update.", diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 8ee3f44b4d..b9f50c5ac9 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -61,10 +61,8 @@ public class TemporaryResourceCache { private final ManagedInformerEventSource managedInformerEventSource; public enum EventHandling { - DEFER, - OBSOLETE, - INTERMEDIATE, - NEW + IGNORE, + PROPAGATE } public TemporaryResourceCache( @@ -110,7 +108,7 @@ public EventHandling onAddOrUpdateEvent( private synchronized EventHandling onEvent( ResourceAction action, T resource, T prevResourceVersion, boolean unknownState) { if (!comparableResourceVersions) { - return EventHandling.NEW; + return EventHandling.PROPAGATE; } var resourceId = ResourceID.fromResource(resource); @@ -118,7 +116,7 @@ private synchronized EventHandling onEvent( log.debug("Processing event"); } var cached = cache.get(resourceId); - EventHandling result = EventHandling.NEW; + EventHandling result = EventHandling.PROPAGATE; if (cached != null) { int comp = ReconcilerUtilsInternal.compareResourceVersions(resource, cached); if (comp >= 0 || unknownState) { @@ -130,13 +128,12 @@ private synchronized EventHandling onEvent( // we propagate event only for our update or newer other can be discarded since we know we // will receive // additional event - result = comp == 0 ? EventHandling.OBSOLETE : EventHandling.NEW; + result = comp == 0 ? EventHandling.IGNORE : EventHandling.PROPAGATE; } else { // in this case we received and event that might be in some edge case that was // already used in reconciler or after that, but before our updated resource version. // That would be hard to distinguish, so for those we are propagating the event further. log.debug("Received intermediate event."); - result = EventHandling.INTERMEDIATE; } } var au = activeUpdates.get(resourceId); @@ -144,7 +141,7 @@ private synchronized EventHandling onEvent( log.debug("Recording relevant event"); au.addRelatedEvent( new GenericResourceEvent(action, resource, prevResourceVersion, unknownState)); - return EventHandling.DEFER; + return EventHandling.IGNORE; } else { return result; } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 25ae10c321..4e3e9dacf2 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -118,7 +118,7 @@ void skipsEventPropagation() { .thenReturn(Optional.of(testDeployment())); when(temporaryResourceCache.onAddOrUpdateEvent(any(), any(), any())) - .thenReturn(EventHandling.OBSOLETE); + .thenReturn(EventHandling.IGNORE); informerEventSource.onAdd(testDeployment()); informerEventSource.onUpdate(testDeployment(), testDeployment()); @@ -129,7 +129,7 @@ void skipsEventPropagation() { @Test void processEventPropagationWithoutAnnotation() { when(temporaryResourceCache.onAddOrUpdateEvent(any(), any(), any())) - .thenReturn(EventHandling.NEW); + .thenReturn(EventHandling.PROPAGATE); informerEventSource.onUpdate(testDeployment(), testDeployment()); verify(eventHandlerMock, times(1)).handleEvent(any()); @@ -138,7 +138,7 @@ void processEventPropagationWithoutAnnotation() { @Test void processEventPropagationWithIncorrectAnnotation() { when(temporaryResourceCache.onAddOrUpdateEvent(any(), any(), any())) - .thenReturn(EventHandling.NEW); + .thenReturn(EventHandling.PROPAGATE); informerEventSource.onAdd( new DeploymentBuilder(testDeployment()) .editMetadata() @@ -152,7 +152,7 @@ void processEventPropagationWithIncorrectAnnotation() { @Test void propagatesIntermediateEventHandling() { when(temporaryResourceCache.onAddOrUpdateEvent(any(), any(), any())) - .thenReturn(EventHandling.INTERMEDIATE); + .thenReturn(EventHandling.PROPAGATE); informerEventSource.onUpdate(testDeployment(), testDeployment()); verify(eventHandlerMock, times(1)).handleEvent(any()); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java index 2917367333..84530066e1 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java @@ -215,14 +215,14 @@ void putBeforeEvent() { // first ensure an event is not known var result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, testResource, null); - assertThat(result).isEqualTo(EventHandling.NEW); + assertThat(result).isEqualTo(EventHandling.PROPAGATE); var nextResource = testResource(); nextResource.getMetadata().setResourceVersion("3"); temporaryResourceCache.putResource(nextResource); result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, nextResource, null); - assertThat(result).isEqualTo(EventHandling.OBSOLETE); + assertThat(result).isEqualTo(EventHandling.IGNORE); } @Test @@ -232,7 +232,7 @@ void putBeforeEventWithEventFiltering() { // first ensure an event is not known var result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, testResource, null); - assertThat(result).isEqualTo(EventHandling.NEW); + assertThat(result).isEqualTo(EventHandling.PROPAGATE); latestSyncVersion = RESOURCE_VERSION; var nextResource = testResource(); @@ -245,7 +245,7 @@ void putBeforeEventWithEventFiltering() { latestSyncVersion = "3"; result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, nextResource, null); - assertThat(result).isEqualTo(EventHandling.OBSOLETE); + assertThat(result).isEqualTo(EventHandling.IGNORE); } @Test @@ -255,7 +255,7 @@ void putAfterEventWithEventFilteringNoPost() { // first ensure an event is not known var result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, testResource, null); - assertThat(result).isEqualTo(EventHandling.NEW); + assertThat(result).isEqualTo(EventHandling.PROPAGATE); var nextResource = testResource(); nextResource.getMetadata().setResourceVersion("3"); @@ -266,7 +266,7 @@ void putAfterEventWithEventFilteringNoPost() { temporaryResourceCache.onAddOrUpdateEvent( ResourceAction.UPDATED, nextResource, testResource); // the result is deferred - assertThat(result).isEqualTo(EventHandling.DEFER); + assertThat(result).isEqualTo(EventHandling.IGNORE); temporaryResourceCache.putResource(nextResource); var postEvent = temporaryResourceCache.doneEventFilterModify(resourceId); @@ -287,7 +287,7 @@ void putAfterEventWithEventFilteringWithPost() { nextResource.getMetadata().setResourceVersion("4"); var result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, nextResource, null); - assertThat(result).isEqualTo(EventHandling.DEFER); + assertThat(result).isEqualTo(EventHandling.IGNORE); var postEvent = temporaryResourceCache.doneEventFilterModify(resourceId); @@ -310,7 +310,7 @@ void intermediateEventPropagatedWhenNoActiveUpdate() { var result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, olderEvent, null); - assertThat(result).isEqualTo(EventHandling.INTERMEDIATE); + assertThat(result).isEqualTo(EventHandling.PROPAGATE); } @Test @@ -329,7 +329,7 @@ void intermediateEventRecorded() { var result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, external, null); - assertThat(result).isEqualTo(EventHandling.DEFER); + assertThat(result).isEqualTo(EventHandling.IGNORE); } @Test @@ -353,7 +353,7 @@ void intermediateEventDeferredWhenItIsOurOwnIntermediateUpdate() { var result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, ourFirst, null); - assertThat(result).isEqualTo(EventHandling.DEFER); + assertThat(result).isEqualTo(EventHandling.IGNORE); } @Test From 41f6fcb937447b370ffd63e2c36099495c7fbfb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 9 Jun 2026 11:16:19 +0200 Subject: [PATCH 08/12] unit tests fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../informer/InformerEventSourceTest.java | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 4e3e9dacf2..a7d5423fb4 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -285,16 +285,16 @@ void filterAddEventBeforeUpdate() { @Test void multipleCachingFilteringUpdates() { withRealTemporaryResourceCache(); - CountDownLatch latch = sendForEventFilteringUpdate(2); + CountDownLatch latch = sendForEventFilteringUpdate(3); CountDownLatch latch2 = - sendForEventFilteringUpdate(withResourceVersion(testDeployment(), 2), 3); + sendForEventFilteringUpdate(withResourceVersion(testDeployment(), 3), 4); informerEventSource.onUpdate( - deploymentWithResourceVersion(1), deploymentWithResourceVersion(2)); + deploymentWithResourceVersion(2), deploymentWithResourceVersion(3)); latch.countDown(); latch2.countDown(); informerEventSource.onUpdate( - deploymentWithResourceVersion(2), deploymentWithResourceVersion(3)); + deploymentWithResourceVersion(3), deploymentWithResourceVersion(4)); assertNoEventProduced(); } @@ -303,15 +303,15 @@ void multipleCachingFilteringUpdates() { void multipleCachingFilteringUpdates_variant2() { withRealTemporaryResourceCache(); - CountDownLatch latch = sendForEventFilteringUpdate(2); + CountDownLatch latch = sendForEventFilteringUpdate(3); CountDownLatch latch2 = - sendForEventFilteringUpdate(withResourceVersion(testDeployment(), 2), 3); + sendForEventFilteringUpdate(withResourceVersion(testDeployment(), 3), 4); informerEventSource.onUpdate( - deploymentWithResourceVersion(1), deploymentWithResourceVersion(2)); + deploymentWithResourceVersion(2), deploymentWithResourceVersion(3)); latch.countDown(); informerEventSource.onUpdate( - deploymentWithResourceVersion(2), deploymentWithResourceVersion(3)); + deploymentWithResourceVersion(3), deploymentWithResourceVersion(4)); latch2.countDown(); assertNoEventProduced(); @@ -321,15 +321,15 @@ void multipleCachingFilteringUpdates_variant2() { void multipleCachingFilteringUpdates_variant3() { withRealTemporaryResourceCache(); - CountDownLatch latch = sendForEventFilteringUpdate(2); + CountDownLatch latch = sendForEventFilteringUpdate(3); CountDownLatch latch2 = - sendForEventFilteringUpdate(withResourceVersion(testDeployment(), 2), 3); + sendForEventFilteringUpdate(withResourceVersion(testDeployment(), 3), 4); latch.countDown(); - informerEventSource.onUpdate( - deploymentWithResourceVersion(1), deploymentWithResourceVersion(2)); informerEventSource.onUpdate( deploymentWithResourceVersion(2), deploymentWithResourceVersion(3)); + informerEventSource.onUpdate( + deploymentWithResourceVersion(4), deploymentWithResourceVersion(4)); latch2.countDown(); assertNoEventProduced(); From 723e03c0b0349548ea77cbd656e934a0c017ba13 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 9 Jun 2026 12:57:12 +0200 Subject: [PATCH 09/12] small fix, test repeats MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../informer/ManagedInformerEventSource.java | 16 +++++----- .../informer/TemporaryResourceCache.java | 8 ++--- .../informer/InformerEventSourceTest.java | 30 ++++++++++--------- 3 files changed, 29 insertions(+), 25 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 52fd296773..9dc487215a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -100,18 +100,20 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator< try { temporaryResourceCache.startEventFilteringModify(id); updatedResource = updateMethod.apply(resourceToUpdate); - log.debug("Resource update successful"); handleRecentResourceUpdate(id, updatedResource, resourceToUpdate); + log.debug("Caching resource update successful"); return updatedResource; } finally { var res = temporaryResourceCache.doneEventFilterModify(id); res.ifPresentOrElse( - r -> - handleEvent( - r.getAction(), - (R) r.getResource().orElseThrow(), - (R) r.getPreviousResource().orElse(null), - r.getLastStateUnknow()), + r -> { + log.debug("Propagating not own event"); + handleEvent( + r.getAction(), + (R) r.getResource().orElseThrow(), + (R) r.getPreviousResource().orElse(null), + r.getLastStateUnknow()); + }, () -> log.debug("No new event present after the filtering update")); } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index b9f50c5ac9..b98837a48b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -143,6 +143,7 @@ private synchronized EventHandling onEvent( new GenericResourceEvent(action, resource, prevResourceVersion, unknownState)); return EventHandling.IGNORE; } else { + log.debug("No active recornding, event handling: {}", result); return result; } } @@ -194,6 +195,9 @@ public synchronized void putResource(T newResource) { // also make sure that we're later than the existing temporary entry var cachedResource = getResourceFromCache(resourceId).orElse(null); + Optional.ofNullable(activeUpdates.get(resourceId)) + .ifPresent( + au -> au.addToOwnResourceVersions(newResource.getMetadata().getResourceVersion())); if (cachedResource == null || ReconcilerUtilsInternal.compareResourceVersions(newResource, cachedResource) > 0) { @@ -202,10 +206,6 @@ public synchronized void putResource(T newResource) { newResource.getMetadata().getResourceVersion(), resourceId); cache.put(resourceId, newResource); - var au = activeUpdates.get(resourceId); - if (au != null) { - au.addToOwnResourceVersions(newResource.getMetadata().getResourceVersion()); - } } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index a7d5423fb4..f02082d7cd 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -25,6 +25,7 @@ import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.RepeatedTest; import org.junit.jupiter.api.Test; import io.fabric8.kubernetes.api.model.ObjectMeta; @@ -72,6 +73,7 @@ class InformerEventSourceTest { private static final String PREV_RESOURCE_VERSION = "0"; private static final String DEFAULT_RESOURCE_VERSION = "2"; + public static final int REPEAT_COUNT = 10; private InformerEventSource informerEventSource; private final KubernetesClient clientMock = MockKubernetesClient.client(Deployment.class); @@ -214,7 +216,7 @@ void filtersOnDeleteEvents() { verify(eventHandlerMock, never()).handleEvent(any()); } - @Test + @RepeatedTest(REPEAT_COUNT) void handlesPrevResourceVersionForUpdate() { withRealTemporaryResourceCache(); @@ -226,7 +228,7 @@ void handlesPrevResourceVersionForUpdate() { expectHandleAddEvent(2, 1); } - @Test + @RepeatedTest(REPEAT_COUNT) void handlesPrevResourceVersionForUpdateInCaseOfException() { withRealTemporaryResourceCache(); @@ -244,7 +246,7 @@ void handlesPrevResourceVersionForUpdateInCaseOfException() { expectHandleAddEvent(2, 1); } - @Test + @RepeatedTest(REPEAT_COUNT) void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() { withRealTemporaryResourceCache(); @@ -259,7 +261,7 @@ void handlesPrevResourceVersionForUpdateInCaseOfMultipleUpdates() { expectHandleAddEvent(4, 2); } - @Test + @RepeatedTest(REPEAT_COUNT) void doesNotPropagateEventIfReceivedBeforeUpdate() { withRealTemporaryResourceCache(); @@ -271,7 +273,7 @@ void doesNotPropagateEventIfReceivedBeforeUpdate() { assertNoEventProduced(); } - @Test + @RepeatedTest(REPEAT_COUNT) void filterAddEventBeforeUpdate() { withRealTemporaryResourceCache(); @@ -282,7 +284,7 @@ void filterAddEventBeforeUpdate() { expectHandleAddEvent(2); } - @Test + @RepeatedTest(REPEAT_COUNT) void multipleCachingFilteringUpdates() { withRealTemporaryResourceCache(); CountDownLatch latch = sendForEventFilteringUpdate(3); @@ -299,7 +301,7 @@ void multipleCachingFilteringUpdates() { assertNoEventProduced(); } - @Test + @RepeatedTest(REPEAT_COUNT) void multipleCachingFilteringUpdates_variant2() { withRealTemporaryResourceCache(); @@ -317,7 +319,7 @@ void multipleCachingFilteringUpdates_variant2() { assertNoEventProduced(); } - @Test + @RepeatedTest(REPEAT_COUNT) void multipleCachingFilteringUpdates_variant3() { withRealTemporaryResourceCache(); @@ -335,7 +337,7 @@ void multipleCachingFilteringUpdates_variant3() { assertNoEventProduced(); } - @Test + @RepeatedTest(REPEAT_COUNT) void multipleCachingFilteringUpdates_variant4() { withRealTemporaryResourceCache(); @@ -353,7 +355,7 @@ void multipleCachingFilteringUpdates_variant4() { assertNoEventProduced(); } - @Test + @RepeatedTest(REPEAT_COUNT) void ghostCheckRemovesCachedResourceDuringFilteringUpdate() { var mes = mock(ManagedInformerEventSource.class); var mim = mock(InformerManager.class); @@ -383,7 +385,7 @@ void ghostCheckRemovesCachedResourceDuringFilteringUpdate() { assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty(); } - @Test + @RepeatedTest(REPEAT_COUNT) void ghostCheckRunsConcurrentlyWithPutResource() { var mes = mock(ManagedInformerEventSource.class); var mim = mock(InformerManager.class); @@ -414,7 +416,7 @@ void ghostCheckRunsConcurrentlyWithPutResource() { .isPresent(); } - @Test + @RepeatedTest(REPEAT_COUNT) void filteringUpdateAndGhostCheckWithNamespaceChange() { var mes = mock(ManagedInformerEventSource.class); var mim = mock(InformerManager.class); @@ -448,7 +450,7 @@ void filteringUpdateAndGhostCheckWithNamespaceChange() { assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty(); } - @Test + @RepeatedTest(REPEAT_COUNT) void propagatesIntermediateEventForExternalUpdateDuringFiltering() { // Causal-dependency fix: another controller updated the resource between our read // and our write. The informer delivers that update during our active filter; since @@ -474,7 +476,7 @@ void propagatesIntermediateEventForExternalUpdateDuringFiltering() { expectHandleAddEvent(3, 2); } - @Test + @RepeatedTest(REPEAT_COUNT) void doesNotPropagateIntermediateEventForOurOwnIntermediateUpdate() { // Two consecutive own writes (rv 3 then rv 4) within an open filter window: an // event for the older own version must be deferred since it's recognized as our own. From d0f5eef34edaaa79b58adf06b6ec95a95b3fe4d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 9 Jun 2026 14:31:29 +0200 Subject: [PATCH 10/12] improvements and releated unit tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../controller/ControllerEventSource.java | 27 ++++-- .../source/informer/EventFilterDetails.java | 33 +++++-- .../source/informer/InformerEventSource.java | 15 +-- .../informer/TemporaryResourceCache.java | 56 +++++++---- .../controller/ControllerEventSourceTest.java | 1 + .../informer/InformerEventSourceTest.java | 93 ++++++------------- .../informer/TemporaryResourceCacheTest.java | 36 ++++--- 7 files changed, 143 insertions(+), 118 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java index 1ce8ce0620..7afb62ea64 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSource.java @@ -31,8 +31,8 @@ import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter; import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter; +import io.javaoperatorsdk.operator.processing.event.source.informer.GenericResourceEvent; import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource; -import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling; import static io.javaoperatorsdk.operator.ReconcilerUtilsInternal.handleKubernetesClientException; import static io.javaoperatorsdk.operator.processing.event.source.controller.InternalEventFilters.*; @@ -141,11 +141,22 @@ private void handleOnAddOrUpdate( ResourceAction action, T oldCustomResource, T newCustomResource) { var handling = temporaryResourceCache.onAddOrUpdateEvent(action, newCustomResource, oldCustomResource); - if (handling == EventHandling.PROPAGATE) { - handleEvent(action, newCustomResource, oldCustomResource, null); - } else if (log.isDebugEnabled()) { - log.debug("{} event propagation for action: {}", handling, action); - } + handling.ifPresentOrElse( + this::handleEvent, + () -> { + if (log.isDebugEnabled()) { + log.debug("{} event propagation for action: {}", handling, action); + } + }); + } + + @SuppressWarnings("unchecked") + private void handleEvent(GenericResourceEvent r) { + handleEvent( + r.getAction(), + (T) r.getResource().orElseThrow(), + (T) r.getPreviousResource().orElse(null), + r.getLastStateUnknow()); } @Override @@ -154,10 +165,10 @@ public synchronized void onDelete(T resource, boolean deletedFinalStateUnknown) resource, ResourceAction.DELETED, () -> { - temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown); + var res = temporaryResourceCache.onDeleteEvent(resource, deletedFinalStateUnknown); // delete event is quite special here, that requires special care, since we clean up // caches on delete event. - handleEvent(ResourceAction.DELETED, resource, null, deletedFinalStateUnknown); + res.ifPresent(this::handleEvent); }); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java index fccc7b479c..b9d12f9f10 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/EventFilterDetails.java @@ -24,13 +24,14 @@ import java.util.stream.Collectors; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.ReconcilerUtilsInternal; import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; class EventFilterDetails { private int activeUpdates = 0; - private List relatedEvents = new ArrayList<>(); - private Set allOwnResourceVersions = new HashSet<>(); + private final List relatedEvents = new ArrayList<>(5); + private final Set allOwnResourceVersions = new HashSet<>(5); public void increaseActiveUpdates() { activeUpdates = activeUpdates + 1; @@ -50,6 +51,10 @@ public int getActiveUpdates() { return activeUpdates; } + public boolean isNoActiveUpdate() { + return activeUpdates == 0; + } + void addToOwnResourceVersions(String updateVersion) { allOwnResourceVersions.add(updateVersion); } @@ -62,10 +67,7 @@ public Optional prepareSummaryEventIfNotOwnEventsPresent() if (relatedEvents.isEmpty()) { return Optional.empty(); } - if (allOwnResourceVersions.containsAll( - relatedEvents.stream() - .map(e -> e.getResource().orElseThrow().getMetadata().getResourceVersion()) - .collect(Collectors.toSet()))) { + if (allOwnResourceVersions.containsAll(relatedEventResourceVersions())) { return Optional.empty(); } var deleteEvent = @@ -87,4 +89,23 @@ public Optional prepareSummaryEventIfNotOwnEventsPresent() firstResource, null)); } + + private Set relatedEventResourceVersions() { + return relatedEvents.stream() + .map(e -> e.getResource().orElseThrow().getMetadata().getResourceVersion()) + .collect(Collectors.toSet()); + } + + public boolean newerOrEqualEventReceivedForOwnLastUpdate() { + if (allOwnResourceVersions.isEmpty()) { + return true; + } + String lastOwn = + allOwnResourceVersions.stream() + .reduce((a, b) -> ReconcilerUtilsInternal.compareResourceVersions(a, b) >= 0 ? a : b) + .orElseThrow(); + return relatedEvents.stream() + .map(e -> e.getResource().orElseThrow().getMetadata().getResourceVersion()) + .anyMatch(rv -> ReconcilerUtilsInternal.compareResourceVersions(rv, lastOwn) >= 0); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index afbf0a33ab..d0cec2e112 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -33,7 +33,6 @@ import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper; import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; -import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling; /** * Wraps informer(s) so they are connected to the eventing system of the framework. Note that since @@ -154,20 +153,24 @@ private synchronized void onAddOrUpdate(ResourceAction action, R newObject, R ol var eventHandling = temporaryResourceCache.onAddOrUpdateEvent(action, newObject, oldObject); - if (eventHandling != EventHandling.PROPAGATE) { - log.debug( - "{} event propagation", eventHandling == EventHandling.IGNORE ? "Deferring" : "Skipping"); + if (eventHandling.isEmpty()) { + log.debug("Deferring event propagation"); } else if (eventAcceptedByFilter(action, newObject, oldObject)) { log.debug( "Propagating event for {}, resource with same version not result of a our update.", action); - propagateEvent(newObject); + var event = eventHandling.get(); + handleEvent( + event.getAction(), + (R) event.getResource().orElseThrow(), + (R) event.getPreviousResource().orElse(null), + event.getLastStateUnknow()); } else { log.debug("Event filtered out for operation: {}, resourceID: {}", action, resourceID); } } - private void propagateEvent(R object) { + protected void propagateEvent(R object) { var primaryResourceIdSet = configuration().getSecondaryToPrimaryMapper().toPrimaryResourceIDs(object); if (primaryResourceIdSet.isEmpty()) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index b98837a48b..7eadbfb67e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -85,41 +85,44 @@ public synchronized Optional doneEventFilterModify(Resourc return Optional.empty(); } var ed = activeUpdates.get(resourceID); - if (ed == null || !ed.decreaseActiveUpdates()) { - log.debug( - "Active updates {} for resource id: {}", - ed != null ? ed.getActiveUpdates() : 0, - resourceID); + if (!ed.decreaseActiveUpdates()) { + log.debug("Active updates {} for resource id: {}", ed.getActiveUpdates(), resourceID); + return Optional.empty(); + } + + if (ed.newerOrEqualEventReceivedForOwnLastUpdate()) { + activeUpdates.remove(resourceID); + return ed.prepareSummaryEventIfNotOwnEventsPresent(); + } else { return Optional.empty(); } - activeUpdates.remove(resourceID); - return ed.prepareSummaryEventIfNotOwnEventsPresent(); } - public void onDeleteEvent(T resource, boolean unknownState) { - onEvent(ResourceAction.DELETED, resource, null, unknownState); + public Optional onDeleteEvent(T resource, boolean unknownState) { + return onEvent(ResourceAction.DELETED, resource, null, unknownState); } - public EventHandling onAddOrUpdateEvent( + public Optional onAddOrUpdateEvent( ResourceAction action, T resource, T prevResourceVersion) { - return onEvent(action, resource, prevResourceVersion, false); + return onEvent(action, resource, prevResourceVersion, null); } - private synchronized EventHandling onEvent( - ResourceAction action, T resource, T prevResourceVersion, boolean unknownState) { + private synchronized Optional onEvent( + ResourceAction action, T resource, T prevResourceVersion, Boolean unknownState) { + GenericResourceEvent actualEvent = + toGenericResourceEvent(action, resource, prevResourceVersion, unknownState); if (!comparableResourceVersions) { - return EventHandling.PROPAGATE; + return Optional.of(actualEvent); } - var resourceId = ResourceID.fromResource(resource); if (log.isDebugEnabled()) { log.debug("Processing event"); } var cached = cache.get(resourceId); - EventHandling result = EventHandling.PROPAGATE; + Optional result = Optional.of(actualEvent); if (cached != null) { int comp = ReconcilerUtilsInternal.compareResourceVersions(resource, cached); - if (comp >= 0 || unknownState) { + if (comp >= 0 || Boolean.TRUE.equals(unknownState)) { log.debug( "Removing resource from temp cache. comparison: {} unknown state: {}", comp, @@ -128,7 +131,9 @@ private synchronized EventHandling onEvent( // we propagate event only for our update or newer other can be discarded since we know we // will receive // additional event - result = comp == 0 ? EventHandling.IGNORE : EventHandling.PROPAGATE; + if (comp == 0) { + result = Optional.empty(); + } } else { // in this case we received and event that might be in some edge case that was // already used in reconciler or after that, but before our updated resource version. @@ -141,13 +146,24 @@ private synchronized EventHandling onEvent( log.debug("Recording relevant event"); au.addRelatedEvent( new GenericResourceEvent(action, resource, prevResourceVersion, unknownState)); - return EventHandling.IGNORE; + // this is to cover the situation when we finished the filtering and caching update but + // did not receive events for our own updates yet. + if (au.isNoActiveUpdate() && au.newerOrEqualEventReceivedForOwnLastUpdate()) { + activeUpdates.remove(resourceId); + return au.prepareSummaryEventIfNotOwnEventsPresent(); + } + return Optional.empty(); } else { - log.debug("No active recornding, event handling: {}", result); + log.debug("No active recording, event handling: {}", result); return result; } } + static GenericResourceEvent toGenericResourceEvent( + ResourceAction action, T resource, T prevResourceVersion, Boolean unknownState) { + return new GenericResourceEvent(action, resource, prevResourceVersion, unknownState); + } + /** put the item into the cache if it's for a later state than what has already been observed. */ public synchronized void putResource(T newResource) { if (!comparableResourceVersions) { diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java index b84b7992b7..a7765da4fa 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerEventSourceTest.java @@ -250,6 +250,7 @@ void propagatesIntermediateEventForExternalUpdateDuringFiltering() { // external update with rv 3 (older than our cached rv 4) — must propagate source.onUpdate(testResourceWithVersion(2), testResourceWithVersion(3)); latch2.countDown(); + source.onUpdate(testResourceWithVersion(3), testResourceWithVersion(5)); await().untilAsserted(() -> verify(eventHandler, times(1)).handleEvent(any())); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index f02082d7cd..65bb3f0fea 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -30,7 +30,6 @@ import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.api.model.apps.Deployment; -import io.fabric8.kubernetes.api.model.apps.DeploymentBuilder; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientException; import io.javaoperatorsdk.operator.MockKubernetesClient; @@ -47,7 +46,6 @@ import io.javaoperatorsdk.operator.processing.event.source.EventFilterTestUtils; import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper; -import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET; @@ -114,52 +112,6 @@ public synchronized void start() {} informerEventSource.setTemporalResourceCache(temporaryResourceCache); } - @Test - void skipsEventPropagation() { - when(temporaryResourceCache.getResourceFromCache(any())) - .thenReturn(Optional.of(testDeployment())); - - when(temporaryResourceCache.onAddOrUpdateEvent(any(), any(), any())) - .thenReturn(EventHandling.IGNORE); - - informerEventSource.onAdd(testDeployment()); - informerEventSource.onUpdate(testDeployment(), testDeployment()); - - verify(eventHandlerMock, never()).handleEvent(any()); - } - - @Test - void processEventPropagationWithoutAnnotation() { - when(temporaryResourceCache.onAddOrUpdateEvent(any(), any(), any())) - .thenReturn(EventHandling.PROPAGATE); - informerEventSource.onUpdate(testDeployment(), testDeployment()); - - verify(eventHandlerMock, times(1)).handleEvent(any()); - } - - @Test - void processEventPropagationWithIncorrectAnnotation() { - when(temporaryResourceCache.onAddOrUpdateEvent(any(), any(), any())) - .thenReturn(EventHandling.PROPAGATE); - informerEventSource.onAdd( - new DeploymentBuilder(testDeployment()) - .editMetadata() - .addToAnnotations(InformerEventSource.PREVIOUS_ANNOTATION_KEY, "invalid") - .endMetadata() - .build()); - - verify(eventHandlerMock, times(1)).handleEvent(any()); - } - - @Test - void propagatesIntermediateEventHandling() { - when(temporaryResourceCache.onAddOrUpdateEvent(any(), any(), any())) - .thenReturn(EventHandling.PROPAGATE); - informerEventSource.onUpdate(testDeployment(), testDeployment()); - - verify(eventHandlerMock, times(1)).handleEvent(any()); - } - @Test void propagateEventAndRemoveResourceFromTempCacheIfResourceVersionMismatch() { withRealTemporaryResourceCache(); @@ -224,8 +176,10 @@ void handlesPrevResourceVersionForUpdate() { informerEventSource.onUpdate( deploymentWithResourceVersion(1), deploymentWithResourceVersion(2)); latch.countDown(); + informerEventSource.onUpdate( + deploymentWithResourceVersion(2), deploymentWithResourceVersion(3)); - expectHandleAddEvent(2, 1); + expectHandleAddEvent(3, 1); } @RepeatedTest(REPEAT_COUNT) @@ -273,17 +227,6 @@ void doesNotPropagateEventIfReceivedBeforeUpdate() { assertNoEventProduced(); } - @RepeatedTest(REPEAT_COUNT) - void filterAddEventBeforeUpdate() { - withRealTemporaryResourceCache(); - - CountDownLatch latch = sendForEventFilteringUpdate(3); - informerEventSource.onAdd(deploymentWithResourceVersion(2)); - latch.countDown(); - - expectHandleAddEvent(2); - } - @RepeatedTest(REPEAT_COUNT) void multipleCachingFilteringUpdates() { withRealTemporaryResourceCache(); @@ -355,6 +298,24 @@ void multipleCachingFilteringUpdates_variant4() { assertNoEventProduced(); } + @RepeatedTest(REPEAT_COUNT) + void multipleCachingFilteringUpdates_variant5() { + withRealTemporaryResourceCache(); + + CountDownLatch latch = sendForEventFilteringUpdate(3); + CountDownLatch latch2 = + sendForEventFilteringUpdate(withResourceVersion(testDeployment(), 3), 4); + latch.countDown(); + latch2.countDown(); + + informerEventSource.onUpdate( + deploymentWithResourceVersion(2), deploymentWithResourceVersion(3)); + informerEventSource.onUpdate( + deploymentWithResourceVersion(3), deploymentWithResourceVersion(4)); + + assertNoEventProduced(); + } + @RepeatedTest(REPEAT_COUNT) void ghostCheckRemovesCachedResourceDuringFilteringUpdate() { var mes = mock(ManagedInformerEventSource.class); @@ -470,10 +431,11 @@ void propagatesIntermediateEventForExternalUpdateDuringFiltering() { // external update with rv 3 (older than our cached rv 4) — must propagate informerEventSource.onUpdate( deploymentWithResourceVersion(2), deploymentWithResourceVersion(3)); - latch2.countDown(); + informerEventSource.onUpdate( + deploymentWithResourceVersion(4), deploymentWithResourceVersion(5)); - expectHandleAddEvent(3, 2); + expectHandleAddEvent(5, 2); } @RepeatedTest(REPEAT_COUNT) @@ -517,10 +479,9 @@ private void awaitCachedResourceVersion(ResourceID resourceId, String resourceVe private void assertNoEventProduced() { await() - .pollDelay(Duration.ofMillis(50)) - .timeout(Duration.ofMillis(51)) - .untilAsserted( - () -> verify(informerEventSource, never()).handleEvent(any(), any(), any(), any())); + .pollDelay(Duration.ofMillis(70)) + .timeout(Duration.ofMillis(71)) + .untilAsserted(() -> verify(informerEventSource, never()).propagateEvent(any())); } private void expectHandleAddEvent(int newResourceVersion) { diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java index 84530066e1..edae142770 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java @@ -25,7 +25,6 @@ import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; -import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache.EventHandling; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -215,14 +214,14 @@ void putBeforeEvent() { // first ensure an event is not known var result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, testResource, null); - assertThat(result).isEqualTo(EventHandling.PROPAGATE); + assertThat(result).isPresent(); var nextResource = testResource(); nextResource.getMetadata().setResourceVersion("3"); temporaryResourceCache.putResource(nextResource); result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, nextResource, null); - assertThat(result).isEqualTo(EventHandling.IGNORE); + assertThat(result).isEmpty(); } @Test @@ -232,7 +231,7 @@ void putBeforeEventWithEventFiltering() { // first ensure an event is not known var result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, testResource, null); - assertThat(result).isEqualTo(EventHandling.PROPAGATE); + assertThat(result).isPresent(); latestSyncVersion = RESOURCE_VERSION; var nextResource = testResource(); @@ -245,7 +244,7 @@ void putBeforeEventWithEventFiltering() { latestSyncVersion = "3"; result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, nextResource, null); - assertThat(result).isEqualTo(EventHandling.IGNORE); + assertThat(result).isEmpty(); } @Test @@ -255,7 +254,14 @@ void putAfterEventWithEventFilteringNoPost() { // first ensure an event is not known var result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, testResource, null); - assertThat(result).isEqualTo(EventHandling.PROPAGATE); + assertThat(result) + .hasValueSatisfying( + v -> { + assertThat(v.getAction()).isEqualTo(ResourceAction.ADDED); + assertThat(v.getPreviousResource()).isEmpty(); + assertThat(v.getResource()).contains(testResource); + assertThat(v.getLastStateUnknow()).isNull(); + }); var nextResource = testResource(); nextResource.getMetadata().setResourceVersion("3"); @@ -265,8 +271,8 @@ void putAfterEventWithEventFilteringNoPost() { result = temporaryResourceCache.onAddOrUpdateEvent( ResourceAction.UPDATED, nextResource, testResource); - // the result is deferred - assertThat(result).isEqualTo(EventHandling.IGNORE); + assertThat(result).isEmpty(); + temporaryResourceCache.putResource(nextResource); var postEvent = temporaryResourceCache.doneEventFilterModify(resourceId); @@ -287,7 +293,7 @@ void putAfterEventWithEventFilteringWithPost() { nextResource.getMetadata().setResourceVersion("4"); var result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.ADDED, nextResource, null); - assertThat(result).isEqualTo(EventHandling.IGNORE); + assertThat(result).isEmpty(); var postEvent = temporaryResourceCache.doneEventFilterModify(resourceId); @@ -310,7 +316,13 @@ void intermediateEventPropagatedWhenNoActiveUpdate() { var result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, olderEvent, null); - assertThat(result).isEqualTo(EventHandling.PROPAGATE); + assertThat(result) + .hasValueSatisfying( + e -> { + assertThat(e.getResource().orElseThrow()).isEqualTo(olderEvent); + assertThat(e.getPreviousResource()).isNotPresent(); + assertThat(e.getAction()).isEqualTo(ResourceAction.UPDATED); + }); } @Test @@ -329,7 +341,7 @@ void intermediateEventRecorded() { var result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, external, null); - assertThat(result).isEqualTo(EventHandling.IGNORE); + assertThat(result).isEmpty(); } @Test @@ -353,7 +365,7 @@ void intermediateEventDeferredWhenItIsOurOwnIntermediateUpdate() { var result = temporaryResourceCache.onAddOrUpdateEvent(ResourceAction.UPDATED, ourFirst, null); - assertThat(result).isEqualTo(EventHandling.IGNORE); + assertThat(result).isEmpty(); } @Test From 5ea96dc22916d64dc30b95e61b22faec555ac303 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 9 Jun 2026 14:58:39 +0200 Subject: [PATCH 11/12] cleanup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../event/source/informer/TemporaryResourceCache.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 7eadbfb67e..51ca7516ba 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -60,11 +60,6 @@ public class TemporaryResourceCache { private final ManagedInformerEventSource managedInformerEventSource; - public enum EventHandling { - IGNORE, - PROPAGATE - } - public TemporaryResourceCache( boolean comparableResourceVersions, ManagedInformerEventSource managedInformerEventSource) { From 3e986ab65695e29f8f173b7b45956b52cbbcebec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 9 Jun 2026 21:48:25 +0200 Subject: [PATCH 12/12] improvements on edge cases MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../informer/TemporaryResourceCache.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 51ca7516ba..c7c301c94a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -80,8 +80,11 @@ public synchronized Optional doneEventFilterModify(Resourc return Optional.empty(); } var ed = activeUpdates.get(resourceID); - if (!ed.decreaseActiveUpdates()) { - log.debug("Active updates {} for resource id: {}", ed.getActiveUpdates(), resourceID); + if (ed == null || !ed.decreaseActiveUpdates()) { + log.debug( + "Active updates {} for resource id: {}", + ed == null ? null : ed.getActiveUpdates(), + resourceID); return Optional.empty(); } @@ -232,7 +235,7 @@ private String getLastSyncResourceVersion(String namespace) { * explicitly add resources to this cache. Those are cleaned up by this check, which is triggered * by the informer's onList callback. */ - public void checkGhostResources() { + public synchronized void checkGhostResources() { log.debug("Checking for ghost resources."); var iterator = cache.entrySet().iterator(); while (iterator.hasNext()) { @@ -247,19 +250,18 @@ public void checkGhostResources() { e.getKey(), ns); iterator.remove(); + activeUpdates.remove(e.getKey()); continue; } if ((ReconcilerUtilsInternal.compareResourceVersions( e.getValue().getMetadata().getResourceVersion(), getLastSyncResourceVersion(ns)) < 0) // making sure we have the situation where resource is missing from the cache - && managedInformerEventSource - .manager() - .get(ResourceID.fromResource(e.getValue())) - .isEmpty()) { + && managedInformerEventSource.manager().get(e.getKey()).isEmpty()) { + log.debug("Removing ghost resource with ID: {}", e.getKey()); iterator.remove(); + activeUpdates.remove(e.getKey()); managedInformerEventSource.handleEvent(ResourceAction.DELETED, e.getValue(), null, true); - log.debug("Removing ghost resource with ID: {}", e.getKey()); } } }