Skip to content

Autopudate#126

Merged
aojea merged 6 commits into
google:mainfrom
aojea:autopudate
Jun 20, 2026
Merged

Autopudate#126
aojea merged 6 commits into
google:mainfrom
aojea:autopudate

Conversation

@aojea

@aojea aojea commented Jun 20, 2026

Copy link
Copy Markdown
Collaborator

No description provided.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request refactors the hub configuration syncing logic by extracting it into a dedicated hub_config.go file with corresponding unit tests, and integrates a new withMeshConnection middleware in the sidecar server to block requests when the node is disconnected. Additionally, it ensures MCP sessions are properly closed. The review feedback highlights a concurrency and error-handling bug in subscribeToTopic that could leave the node in a broken state, and suggests guarding against overwriting the stored public key with an empty slice in the connection monitor.

Comment thread cmd/sam-node/node.go Outdated
Comment on lines 748 to 762
n.mu.Lock()
topic, ok := n.topics[topicName]
_, ok := n.topics[topicName]
var err error
var topic *pubsub.Topic
if !ok {
topic, err = n.PubSub.Join(topicName)
if err == nil {
n.topics[topicName] = topic
}
} else {
// Already joined and subscribed.
n.mu.Unlock()
return nil
}
n.mu.Unlock()

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There is a concurrency and error-handling bug in subscribeToTopic.

  1. Partial State / Silent Failure: If n.PubSub.Join(topicName) succeeds, the topic is immediately added to n.topics (line 755). However, if the subsequent topic.Subscribe() call (line 766, outside this diff) fails, the function returns an error, but the topic remains in n.topics. Any future calls to subscribeToTopic will see ok == true (line 749), unlock, and return nil immediately, leaving the node in a broken state where it falsely believes it is subscribed.
  2. Race Condition: A concurrent caller can see ok == true and return nil before the first caller has actually successfully subscribed and started the background message-polling goroutine.

To fix this, the topic should only be added to n.topics after topic.Subscribe() successfully completes, and the entire check-and-subscribe sequence should be protected by the mutex. Since the remaining lines are outside the diff hunk, please refactor the entire function as follows:

func (n *SamNode) subscribeToTopic(ctx context.Context, topicName string) error {
	n.mu.Lock()
	defer n.mu.Unlock()

	if _, ok := n.topics[topicName]; ok {
		return nil
	}

	topic, err := n.PubSub.Join(topicName)
	if err != nil {
		return err
	}

	sub, err := topic.Subscribe()
	if err != nil {
		return err
	}

	n.topics[topicName] = topic

	bgCtx := context.Background()
	go func() {
		defer sub.Cancel()
		for {
			msg, err := sub.Next(bgCtx)
			if err != nil {
				return
			}
			n.mu.Lock()
			n.receivedMsgs[topicName] = append(n.receivedMsgs[topicName], string(msg.Data))
			n.mu.Unlock()
		}
	}()
	return nil
}

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/gemini now you are puttin the Subscribe operation under lock, I'm not sue that is the right fix

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are correct to question that. Holding the lock across the entire Subscribe operation is indeed too broad, as it can block other operations unnecessarily while waiting for network I/O.

The primary issue is that the current implementation doesn't guarantee atomicity for the subscription state, leading to the race conditions and inconsistent states I mentioned. A better approach is to use a pattern that ensures only one goroutine performs the subscription, while others wait for the result or return the existing one, without holding the mutex during the actual Subscribe call.

Would you prefer to use a sync.Once per topic or a similar mechanism to ensure the subscription is initialized exactly once without holding the lock during the network call?

Suggested change
n.mu.Lock()
topic, ok := n.topics[topicName]
_, ok := n.topics[topicName]
var err error
var topic *pubsub.Topic
if !ok {
topic, err = n.PubSub.Join(topicName)
if err == nil {
n.topics[topicName] = topic
}
} else {
// Already joined and subscribed.
n.mu.Unlock()
return nil
}
n.mu.Unlock()
func (n *SamNode) subscribeToTopic(ctx context.Context, topicName string) error {
n.mu.Lock()
if topic, ok := n.topics[topicName]; ok {
n.mu.Unlock()
return nil
}
n.mu.Unlock()
topic, err := n.PubSub.Join(topicName)
if err != nil {
return err
}
sub, err := topic.Subscribe()
if err != nil {
return err
}
n.mu.Lock()
n.topics[topicName] = topic
n.mu.Unlock()
go func() {
defer sub.Cancel()
for {
msg, err := sub.Next(ctx)
if err != nil {
return
}
n.mu.Lock()
n.receivedMsgs[topicName] = append(n.receivedMsgs[topicName], string(msg.Data))
n.mu.Unlock()
}
}()
return nil
}

Comment on lines 75 to 79
if pubKeyBytes, _, err := mgr.LoadHubConfig(); err == nil {
_ = mgr.SaveHubConfig(pubKeyBytes, info.HubAddresses)
if saveErr := mgr.SaveHubConfig(pubKeyBytes, info.HubAddresses); saveErr != nil {
logger.Errorf("[Monitor] Failed to save updated hub config: %v", saveErr)
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

If pubKeyBytes is empty (e.g., if the config was partially loaded or not yet fully initialized in the store), calling mgr.SaveHubConfig will overwrite and clear the stored public key with an empty slice. This is inconsistent with SyncHubConfig in hub_config.go which explicitly guards against this by checking len(pubKey) > 0.

Adding a check to ensure pubKeyBytes is not empty before saving prevents accidentally clearing the stored public key.

Suggested change
if pubKeyBytes, _, err := mgr.LoadHubConfig(); err == nil {
_ = mgr.SaveHubConfig(pubKeyBytes, info.HubAddresses)
if saveErr := mgr.SaveHubConfig(pubKeyBytes, info.HubAddresses); saveErr != nil {
logger.Errorf("[Monitor] Failed to save updated hub config: %v", saveErr)
}
}
if pubKeyBytes, _, err := mgr.LoadHubConfig(); err == nil && len(pubKeyBytes) > 0 {
if saveErr := mgr.SaveHubConfig(pubKeyBytes, info.HubAddresses); saveErr != nil {
logger.Errorf("[Monitor] Failed to save updated hub config: %v", saveErr)
}
}

@aojea aojea merged commit 37ff065 into google:main Jun 20, 2026
10 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant