Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions common/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,7 @@ event.subscribe = {
enable = false

native = {
# start the native ZMQ queue; if `path` is also set, the event plugin runs alongside it
useNativeQueue = true
bindport = 5555
sendqueuelength = 1000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ public class EventPluginLoader {
@Getter
private boolean useNativeQueue = false;

@Getter
private boolean useEventPlugin = false;

public static EventPluginLoader getInstance() {
if (Objects.isNull(instance)) {
synchronized (EventPluginLoader.class) {
Expand Down Expand Up @@ -234,12 +237,25 @@ public boolean start(EventPluginConfig config) {
this.triggerConfigList = config.getTriggerConfigList();

useNativeQueue = config.isUseNativeQueue();
useEventPlugin = StringUtils.hasText(config.getPluginPath());

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[SHOULD] @h45hc47, switching the "plugin enabled" condition from !useNativeQueue to "path is non-empty" introduces an upgrade regression: a node currently running with useNativeQueue = true that happens to still have a non-empty path in its config (the old docs said path is ignored when useNativeQueue = true, so such leftover config does exist) boots fine today as native-only; after the upgrade it will try to load that plugin because useEventPlugin = true, and if the plugin zip is missing/invalid, launchEventPlugin returns false -> start() returns false -> Manager.startEventSubscribing() throws TronError(EVENT_SUBSCRIBE_INIT), so the node fails to start outright.

Overall I'd recommend keeping the current behavior (native queue and plugin mutually exclusive); the existing design is fine as-is:

  • The "deliver to both" need can be solved cleanly outside the node: enable the native ZMQ queue and run a lightweight consumer that subscribes to ZMQ and persists to MongoDB. The node keeps a single sink and minimal responsibility, instead of coupling dual delivery into the core.
  • Mutual exclusivity is a safety property here, not a defect: it keeps plugin/MongoDB failures out of the node, so a failed plugin load or a MongoDB outage cannot affect the block-producing node. This PR removes that isolation (the startup failure above is the direct consequence) and also couples the plugin queue's back-pressure into the block-loading path (HistoryEventService / BlockEventLoad start throttling on the plugin queue even when the native queue is on), adding failure surface and coupling to a subsystem that is sensitive to block processing.
  • The trade-off doesn't pay off: the benefit is a niche scenario that already has a clean external alternative, while the cost is added coupling in a core subsystem, a new failure path that can prevent the node from starting, and the per-trigger overhead of an extra serialization plus dual dispatch on a hot path.

So I'd suggest not merging this for now and keeping the current mutually-exclusive semantics. If there's a strong need later, it should be redone as an explicit opt-in flag that guarantees zero behavior change for existing configs.


if (!useNativeQueue && !useEventPlugin) {
logger.error("no event output configured: native queue is off and plugin path is empty");
return false;
}

// Launch the plugin first so eventListeners are populated before any topic is
// registered. launchNativeQueue() also iterates the trigger config and (when the
// plugin is active) calls setPluginTopic(), which needs eventListeners to exist.
if (useEventPlugin && !launchEventPlugin(config)) {
return false;
}

if (config.isUseNativeQueue()) {
return launchNativeQueue(config);
if (useNativeQueue && !launchNativeQueue(config)) {
return false;
}

return launchEventPlugin(config);
return true;
}

private void setPluginConfig() {
Expand Down Expand Up @@ -271,7 +287,7 @@ private void setSingleTriggerConfig(TriggerConfig triggerConfig) {
blockLogTriggerSolidified = false;
}

if (!useNativeQueue) {
if (useEventPlugin) {
setPluginTopic(Trigger.BLOCK_TRIGGER, triggerConfig.getTopic());
}

Expand All @@ -291,7 +307,7 @@ private void setSingleTriggerConfig(TriggerConfig triggerConfig) {
transactionLogTriggerSolidified = false;
}

if (!useNativeQueue) {
if (useEventPlugin) {
setPluginTopic(Trigger.TRANSACTION_TRIGGER, triggerConfig.getTopic());
}

Expand All @@ -303,7 +319,7 @@ private void setSingleTriggerConfig(TriggerConfig triggerConfig) {
contractEventTriggerEnable = false;
}

if (!useNativeQueue) {
if (useEventPlugin) {
setPluginTopic(Trigger.CONTRACTEVENT_TRIGGER, triggerConfig.getTopic());
}

Expand All @@ -319,7 +335,7 @@ private void setSingleTriggerConfig(TriggerConfig triggerConfig) {
contractLogTriggerRedundancy = false;
}

if (!useNativeQueue) {
if (useEventPlugin) {
setPluginTopic(Trigger.CONTRACTLOG_TRIGGER, triggerConfig.getTopic());
}
} else if (EventPluginConfig.SOLIDITY_TRIGGER_NAME
Expand All @@ -329,7 +345,7 @@ private void setSingleTriggerConfig(TriggerConfig triggerConfig) {
} else {
solidityTriggerEnable = false;
}
if (!useNativeQueue) {
if (useEventPlugin) {
setPluginTopic(Trigger.SOLIDITY_TRIGGER, triggerConfig.getTopic());
}
} else if (EventPluginConfig.SOLIDITY_EVENT_NAME
Expand All @@ -340,7 +356,7 @@ private void setSingleTriggerConfig(TriggerConfig triggerConfig) {
solidityEventTriggerEnable = false;
}

if (!useNativeQueue) {
if (useEventPlugin) {
setPluginTopic(Trigger.SOLIDITY_EVENT_TRIGGER, triggerConfig.getTopic());
}
} else if (EventPluginConfig.SOLIDITY_LOG_NAME
Expand All @@ -354,7 +370,7 @@ private void setSingleTriggerConfig(TriggerConfig triggerConfig) {
solidityLogTriggerEnable = false;
solidityLogTriggerRedundancy = false;
}
if (!useNativeQueue) {
if (useEventPlugin) {
setPluginTopic(Trigger.SOLIDITY_LOG_TRIGGER, triggerConfig.getTopic());
}
}
Expand All @@ -364,7 +380,8 @@ public void postSolidityTrigger(SolidityTrigger trigger) {
if (useNativeQueue) {
NativeMessageQueue.getInstance()
.publishTrigger(toJsonString(trigger), trigger.getTriggerName());
} else {
}
if (useEventPlugin) {
eventListeners.forEach(listener ->
listener.handleSolidityTrigger(toJsonString(trigger)));
}
Expand Down Expand Up @@ -427,6 +444,9 @@ public synchronized boolean isContractLogTriggerRedundancy() {
}

private void setPluginTopic(int eventType, String topic) {
if (Objects.isNull(eventListeners)) {
return;
}
eventListeners.forEach(listener -> listener.setTopic(eventType, topic));
}

Expand Down Expand Up @@ -485,7 +505,8 @@ public void postBlockTrigger(BlockLogTrigger trigger) {
if (useNativeQueue) {
NativeMessageQueue.getInstance()
.publishTrigger(toJsonString(trigger), trigger.getTriggerName());
} else {
}
if (useEventPlugin) {
eventListeners.forEach(listener ->
listener.handleBlockEvent(toJsonString(trigger)));
}
Expand All @@ -495,7 +516,8 @@ public void postSolidityLogTrigger(ContractLogTrigger trigger) {
if (useNativeQueue) {
NativeMessageQueue.getInstance()
.publishTrigger(toJsonString(trigger), trigger.getTriggerName());
} else {
}
if (useEventPlugin) {
eventListeners.forEach(listener ->
listener.handleSolidityLogTrigger(toJsonString(trigger)));
}
Expand All @@ -505,7 +527,8 @@ public void postSolidityEventTrigger(ContractEventTrigger trigger) {
if (useNativeQueue) {
NativeMessageQueue.getInstance()
.publishTrigger(toJsonString(trigger), trigger.getTriggerName());
} else {
}
if (useEventPlugin) {
eventListeners.forEach(listener ->
listener.handleSolidityEventTrigger(toJsonString(trigger)));
}
Expand All @@ -515,7 +538,8 @@ public void postTransactionTrigger(TransactionLogTrigger trigger) {
if (useNativeQueue) {
NativeMessageQueue.getInstance()
.publishTrigger(toJsonString(trigger), trigger.getTriggerName());
} else {
}
if (useEventPlugin) {
eventListeners.forEach(listener -> listener.handleTransactionTrigger(toJsonString(trigger)));
}
}
Expand All @@ -524,7 +548,8 @@ public void postContractLogTrigger(ContractLogTrigger trigger) {
if (useNativeQueue) {
NativeMessageQueue.getInstance()
.publishTrigger(toJsonString(trigger), trigger.getTriggerName());
} else {
}
if (useEventPlugin) {
eventListeners.forEach(listener ->
listener.handleContractLogTrigger(toJsonString(trigger)));
}
Expand All @@ -534,14 +559,17 @@ public void postContractEventTrigger(ContractEventTrigger trigger) {
if (useNativeQueue) {
NativeMessageQueue.getInstance()
.publishTrigger(toJsonString(trigger), trigger.getTriggerName());
} else {
}
if (useEventPlugin) {
eventListeners.forEach(listener ->
listener.handleContractEventTrigger(toJsonString(trigger)));
}
}

public boolean isBusy() {
if (useNativeQueue) {
// Back-pressure only applies to the plugin's async queue. The native ZMQ queue
// has its own bounded send queue, so when the plugin is not active we never block.
if (!useEventPlugin) {
return false;
}
int queueSize = 0;
Expand Down
21 changes: 11 additions & 10 deletions framework/src/main/java/org/tron/core/config/args/Args.java
Original file line number Diff line number Diff line change
Expand Up @@ -372,16 +372,17 @@ private static void applyEventConfig(EventConfig ec) {
epc.setBindPort(nq.getBindport());
epc.setSendQueueLength(nq.getSendqueuelength());

if (!nq.isUseNativeQueue()) {
if (StringUtils.isNotEmpty(ec.getPath())) {
epc.setPluginPath(ec.getPath().trim());
}
if (StringUtils.isNotEmpty(ec.getServer())) {
epc.setServerAddress(ec.getServer().trim());
}
if (StringUtils.isNotEmpty(ec.getDbconfig())) {
epc.setDbConfig(ec.getDbconfig().trim());
}
// Plugin settings are always copied so the event plugin (e.g. MongoDB) can run
// alongside the native ZMQ queue. The native queue and the plugin are activated
// independently in EventPluginLoader (useNativeQueue + non-empty pluginPath).
if (StringUtils.isNotEmpty(ec.getPath())) {
epc.setPluginPath(ec.getPath().trim());
}
if (StringUtils.isNotEmpty(ec.getServer())) {
epc.setServerAddress(ec.getServer().trim());
}
if (StringUtils.isNotEmpty(ec.getDbconfig())) {
epc.setDbConfig(ec.getDbconfig().trim());
}

// topics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,16 @@ private void syncEvent() {
if (thread.isInterrupted() || isClosed) {
throw new InterruptedException();
}
if (instance.isUseNativeQueue()) {
Thread.sleep(20);
} else if (instance.isBusy()) {
// isBusy() returns false unless the event plugin is active, so native-only
// mode keeps its original 20ms pacing. In combined mode we still honor the
// plugin's back-pressure so its async queue is not flooded during sync.
if (instance.isBusy()) {
Thread.sleep(100);
continue;
}
if (instance.isUseNativeQueue()) {
Thread.sleep(20);
}
BlockEvent blockEvent = blockEventGet.getBlockEvent(tmp);
realtimeEventService.flush(blockEvent, false);
solidEventService.flush(blockEvent);
Expand Down
2 changes: 1 addition & 1 deletion framework/src/main/resources/config.conf
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,7 @@ committee = {
event.subscribe = {
enable = false // enable event subscribe, replaces deprecated CLI flag --es
native = {
useNativeQueue = true // if true, use native message queue, else use event plugin.
useNativeQueue = true // start the native ZMQ queue. If `path` below is also set, the event plugin runs alongside it (both receive every trigger).
bindport = 5555 // bind port
sendqueuelength = 1000 //max length of send queue
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@ public class EventPluginLoaderTest {
public void testIsBusy() {

EventPluginLoader eventPluginLoader = EventPluginLoader.getInstance();

// When the event plugin is not active, the node is never "busy" regardless of the
// native queue: back-pressure only throttles the plugin's async queue.
ReflectUtils.setFieldValue(eventPluginLoader, "useEventPlugin", false);
ReflectUtils.setFieldValue(eventPluginLoader, "useNativeQueue", true);
boolean flag = eventPluginLoader.isBusy();
Assert.assertFalse(flag);

ReflectUtils.setFieldValue(eventPluginLoader, "useNativeQueue", false);
// When the plugin is active (with or without the native queue), busy-ness is driven
// by the plugin queue size.
ReflectUtils.setFieldValue(eventPluginLoader, "useEventPlugin", true);

IPluginEventListener p1 = mock(IPluginEventListener.class);
List<IPluginEventListener> list = new ArrayList<>();
Expand Down
Loading