Skip to content

Commit bd210f3

Browse files
mearvkCopilot
andcommitted
Replace busy-wait loops with wait/notify; notify on queue adds; fix TelnetMessageQueue.remove bug
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 9a0400f commit bd210f3

7 files changed

Lines changed: 235 additions & 181 deletions

File tree

source/bitcoin/messaging/MessageOrderer.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,23 +33,40 @@ public void run()
3333
{
3434
while(true)
3535
{
36-
36+
synchronized (this)
37+
{
38+
while (this.bitcoin_messages.isEmpty())
39+
{
40+
try { this.wait(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); return; }
41+
}
42+
43+
// process all messages (no-op here; callers add/remove externally)
44+
// keep loop to empty list if any
45+
while (this.bitcoin_messages.size() > 0)
46+
{
47+
// In absence of explicit processing logic, just remove the head
48+
this.bitcoin_messages.remove(0);
49+
}
50+
}
3751
}
3852
}
3953

4054
public synchronized void add(BitcoinMessage bitcoin_message)
4155
{
4256
this.bitcoin_messages.add(bitcoin_message);
57+
this.notifyAll();
4358
}
4459

4560
public synchronized void remove(BitcoinMessage bitcoin_message)
4661
{
4762
this.bitcoin_messages.remove(bitcoin_message);
63+
this.notifyAll();
4864
}
4965

5066
public synchronized void clear(BitcoinMessage bitcoin_message)
5167
{
5268
this.bitcoin_messages.clear();
69+
this.notifyAll();
5370
}
5471

5572
public static class BitcoinMessage

source/messaging/MessageQueue.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ public synchronized void add(Message message)
6060
CommonRails.printSystemComponent(this, this.hashCode(),"MessageQueue::add >> receives ["+message.MESSAGE_BUFFER.toString()+"].");
6161

6262
this.MESSAGES.add(message);
63+
this.notifyAll();
6364
}
6465

6566
public synchronized void remove(Message message)

source/messaging/MessageQueueSorter.java

Lines changed: 75 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -29,115 +29,118 @@ public void run()
2929
{
3030
CommonRails.printSystemComponent(this, this.hashCode(), ". WebExpress::MessageQueueSorter starts .");
3131

32-
for(;;)
32+
while(true)
3333
{
3434
MessageQueue message_queue = this.web_express.MESSAGE_QUEUE;
3535

36-
for(int i = 0; i<message_queue.MESSAGES.size(); i++)
36+
try
3737
{
38-
CommonRails.printSystemComponent(this, this.hashCode(),". WebExpress::MessageQueueSorter reports message queue has size of "+message_queue.MESSAGES.size()+" .");
39-
40-
CommonRails.printSystemComponent(this, this.hashCode(),". WebExpress::MessageQueueSorter received message from connection "+message_queue.MESSAGES.get(i).socket+" "+message_queue.MESSAGES.get(i).MESSAGE_BUFFER +" .");
41-
42-
MessageQueue.Message message = message_queue.MESSAGES.remove(i);
43-
44-
try
38+
synchronized (message_queue)
4539
{
46-
if(CommonRails.SocketUtils.isSocketConnected(message.socket))
40+
while (message_queue.MESSAGES.size() == 0)
4741
{
48-
BufferedWriter writer = this.web_express.TELNET_COMMUNICATION_PROXY.writer;
42+
try { message_queue.wait(); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); return; }
43+
}
4944

50-
CommonRails.printSystemComponent(this, this.hashCode(), ". WebExpress::MessageQueueSorter sending to Telnet message Message: " + message.MESSAGE_BUFFER + " .");
45+
// process all messages currently in queue
46+
while (message_queue.MESSAGES.size() > 0)
47+
{
48+
MessageQueue.Message message = message_queue.MESSAGES.remove(0);
5149

52-
writer.write("Message: "+message.MESSAGE_BUFFER +"\n");
50+
try
51+
{
52+
if(CommonRails.SocketUtils.isSocketConnected(message.socket))
53+
{
54+
BufferedWriter writer = this.web_express.TELNET_COMMUNICATION_PROXY.writer;
5355

54-
CommonRails.printSystemComponent(this, this.hashCode(),". WebExpress::MessageQueueSorter sending to Telnet message Date: " + message.time_stamp + " .");
56+
CommonRails.printSystemComponent(this, this.hashCode(), ". WebExpress::MessageQueueSorter sending to Telnet message Message: " + message.MESSAGE_BUFFER + " .");
5557

56-
writer.write("[Date]: " + message.time_stamp+"\n");
58+
writer.write("Message: "+message.MESSAGE_BUFFER +"\n");
5759

58-
CommonRails.printSystemComponent(this, this.hashCode(), ". WebExpress::MessageQueueSorter sending to Telnet message IP Address: " + message.internet_address + " .");
60+
CommonRails.printSystemComponent(this, this.hashCode(),". WebExpress::MessageQueueSorter sending to Telnet message Date: " + message.time_stamp + " .");
5961

60-
writer.write("[IP Address]: " + message.internet_address+"\n");
62+
writer.write("[Date]: " + message.time_stamp+"\n");
6163

62-
CommonRails.printSystemComponent(this, this.hashCode(),". WebExpress::MessageQueueSorter >> sending to Telnet message Socket: " + message.socket + " .");
64+
CommonRails.printSystemComponent(this, this.hashCode(), ". WebExpress::MessageQueueSorter sending to Telnet message IP Address: " + message.internet_address + " .");
6365

64-
writer.write("[Socket]: " + message.socket.toString()+"\n");
66+
writer.write("[IP Address]: " + message.internet_address+"\n");
6567

66-
writer.flush();
68+
CommonRails.printSystemComponent(this, this.hashCode(),". WebExpress::MessageQueueSorter >> sending to Telnet message Socket: " + message.socket + " .");
6769

68-
message_queue.remove(message);
69-
}
70-
}
71-
catch (SocketTimeoutException ste)
72-
{
73-
try
74-
{
75-
message.socket.close();
76-
}
77-
catch (Exception e)
78-
{
79-
CurrentConnections connections = this.web_express.current_connections;
80-
81-
connections.remove(message.connection);
70+
writer.write("[Socket]: " + message.socket.toString()+"\n");
8271

83-
EnglishArithemeter arithemeter = new EnglishArithemeter(connections.size());
72+
writer.flush();
8473

85-
CommonRails.printSystemComponent(this, this.hashCode(), ". WebExpress::MessageQueueSorter >> dropped connection "+message.socket+" - new connection count "+arithemeter.result.arithemetic +" : "+arithemeter.result.numeral +" .");
86-
}
74+
message_queue.remove(message);
75+
}
76+
}
77+
catch (SocketTimeoutException ste)
78+
{
79+
try
80+
{
81+
message.socket.close();
82+
}
83+
catch (Exception e)
84+
{
85+
CurrentConnections connections = this.web_express.current_connections;
8786

88-
this.web_express.current_connections.remove(message.socket);
87+
connections.remove(message.connection);
8988

90-
break;
91-
}
92-
catch (IOException e)
93-
{
94-
CommonRails.printSystemComponent(this, this.hashCode(),". WebExpress::MessageQueueSorter socket connection closed Socket: " + message.internet_address + " .");
95-
}
89+
EnglishArithemeter arithemeter = new EnglishArithemeter(connections.size());
9690

97-
try
98-
{
99-
BufferedReader reader = this.web_express.TELNET_COMMUNICATION_PROXY.reader;
91+
CommonRails.printSystemComponent(this, this.hashCode(), ". WebExpress::MessageQueueSorter >> dropped connection "+message.socket+" - new connection count "+arithemeter.result.arithemetic +" : "+arithemeter.result.numeral +" .");
92+
}
10093

101-
if(CommonRails.SocketUtils.isSocketConnected(message.socket))
102-
{
103-
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(message.socket.getOutputStream()));
94+
this.web_express.current_connections.remove(message.socket);
10495

105-
String line = null;
96+
break;
97+
}
98+
catch (IOException e)
99+
{
100+
CommonRails.printSystemComponent(this, this.hashCode(),". WebExpress::MessageQueueSorter socket connection closed Socket: " + message.internet_address + " .");
101+
}
106102

107-
while((line=reader.readLine())!=null)
103+
try
108104
{
105+
BufferedReader reader = this.web_express.TELNET_COMMUNICATION_PROXY.reader;
106+
109107
if(CommonRails.SocketUtils.isSocketConnected(message.socket))
110108
{
111-
CommonRails.printSystemComponent(this, this.hashCode(),". WebExpress::MessageQueueSorter received from active Telnet session "+ WebExpress.REMOTE_SITE+":"+ WebExpress.REMOTE_PORT+" message "+line+" .");
109+
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(message.socket.getOutputStream()));
112110

113-
writer.write(line+"\n");
111+
String line = null;
114112

115-
writer.flush();
116-
}
117-
else
118-
{
119-
CurrentConnections connections = this.web_express.current_connections;
113+
while((line=reader.readLine())!=null)
114+
{
115+
if(CommonRails.SocketUtils.isSocketConnected(message.socket))
116+
{
117+
CommonRails.printSystemComponent(this, this.hashCode(),". WebExpress::MessageQueueSorter received from active Telnet session "+ WebExpress.REMOTE_SITE+":"+ WebExpress.REMOTE_PORT+" message "+line+" .");
120118

121-
connections.remove(message.connection);
119+
writer.write(line+"\n");
122120

123-
EnglishArithemeter arithemeter = new EnglishArithemeter(connections.size());
121+
writer.flush();
122+
}
123+
else
124+
{
125+
CurrentConnections connections = this.web_express.current_connections;
126+
127+
connections.remove(message.connection);
124128

125-
CommonRails.printSystemComponent(this, this.hashCode(),". WebExpress::MessageQueueSorter dropped connection "+message.socket+" - new connection count "+arithemeter.result.arithemetic+" : "+arithemeter.result.numeral+" .");
129+
EnglishArithemeter arithemeter = new EnglishArithemeter(connections.size());
126130

127-
break;
131+
CommonRails.printSystemComponent(this, this.hashCode(),". WebExpress::MessageQueueSorter dropped connection "+message.socket+" - new connection count "+arithemeter.result.arithemetic+" : "+arithemeter.result.numeral+" .");
132+
133+
break;
134+
}
135+
}
128136
}
129137
}
138+
catch (Exception e)
139+
{
140+
CommonRails.printSystemComponent(this, this.hashCode(),". WebExpress::MessageQueueSorter >> dropped connection "+message.socket+" .");
141+
}
130142
}
131143
}
132-
catch (Exception e)
133-
{
134-
CommonRails.printSystemComponent(this, this.hashCode(),". WebExpress::MessageQueueSorter >> dropped connection "+message.socket+" .");
135-
}
136-
}
137-
138-
try
139-
{
140-
Thread.sleep(1000);
141144
}
142145
catch (Exception e)
143146
{

0 commit comments

Comments
 (0)