Skip to content

Commit baf7b58

Browse files
committed
System Touch
1 parent 8bcc707 commit baf7b58

4 files changed

Lines changed: 190 additions & 65 deletions

File tree

2.61 KB
Binary file not shown.

source/connections/ConnectionPoller.java

Lines changed: 161 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99
import telnet.TelnetMessageQueue;
1010

1111
import java.io.BufferedReader;
12+
import java.io.BufferedWriter;
1213
import java.io.InputStreamReader;
14+
import java.io.OutputStreamWriter;
1315
import java.net.ServerSocket;
1416
import java.net.SocketTimeoutException;
1517
import java.util.Date;
@@ -32,6 +34,9 @@ public class ConnectionPoller extends Thread
3234

3335
protected static final Integer READ_WRITE_STANDARD_SOCKET_TIMEOUT = 60*2*1000;
3436

37+
/** Timeout (ms) for reading client request and telnet backend response per session. */
38+
protected static final int PROXY_READ_TIMEOUT_MS = 5000;
39+
3540
public ConnectionPoller(BaseServer BASESERVER, String HOST, Integer PORT)
3641
{
3742
this.BASESERVER = BASESERVER;
@@ -67,9 +72,10 @@ public void run()
6772
{
6873
CURRENT_CONNECTIONS = this.BASESERVER.CURRENT_CONNECTIONS;
6974

70-
for(int i=0; i<CURRENT_CONNECTIONS.size(); i++)
75+
for(int i = 0; i < CURRENT_CONNECTIONS.size(); i++)
7176
{
72-
if(this.WEBEXPRESS==null || this.WEBEXPRESS.CURRENT_CONNECTIONS==null) throw new SecurityException("//bodi/exceptions");
77+
if(this.WEBEXPRESS == null || this.WEBEXPRESS.CURRENT_CONNECTIONS == null)
78+
throw new SecurityException("//bodi/exceptions");
7379

7480
CurrentConnections CONNECTIONS = this.WEBEXPRESS.CURRENT_CONNECTIONS;
7581

@@ -79,91 +85,187 @@ public void run()
7985
{
8086
EnglishArithemeter arithemeter = new EnglishArithemeter(CONNECTIONS.size());
8187

82-
CommonRails.printSystemComponent(this, this.hashCode(), "WebExpress ConnectionPoller >> new CONNECTION from ["+CONNECTION.SOCKET.getRemoteSocketAddress()+"] total CONNECTION count ["+arithemeter.result.arithemetic+" : "+arithemeter.result.numeral+"].");
88+
CommonRails.printSystemComponent(this, this.hashCode(),
89+
"WebExpress ConnectionPoller >> new CONNECTION from ["
90+
+ CONNECTION.SOCKET.getRemoteSocketAddress()
91+
+ "] total CONNECTION count ["
92+
+ arithemeter.result.arithemetic + " : " + arithemeter.result.numeral + "].");
8393

8494
TelnetMessageQueue.Message TELNET_MESSAGE = new TelnetMessageQueue.Message();
8595

86-
TELNET_MESSAGE.PORT = Integer.parseInt(WebExpress.REMOTE_PORT);
87-
88-
TELNET_MESSAGE.SOCKET = this.WEBEXPRESS.TELNET_COMMUNICATION_PROXY.socket;
89-
96+
TELNET_MESSAGE.PORT = Integer.parseInt(WebExpress.REMOTE_PORT);
97+
TELNET_MESSAGE.SOCKET = this.WEBEXPRESS.TELNET_COMMUNICATION_PROXY.socket;
9098
TELNET_MESSAGE.TIMESTAMP = new Date();
91-
9299
TELNET_MESSAGE.MESSAGE_BUFFER = new StringBuffer("US6");
93100

94101
this.WEBEXPRESS.TELNET_COMMUNICATION_PROXY.OUTPUT_BUILDER.TELNET_MESSAGE_QUEUE.add(TELNET_MESSAGE);
95102

96103
CONNECTION.IS_TELNET_EXCELSIOR_CONNECTED = Boolean.TRUE;
97104
}
98105

99-
if(CommonRails.SocketUtils.isSocketConnected(MESSAGE.SOCKET))
106+
// Assign MESSAGE fields from current CONNECTION before any guard
107+
MESSAGE.CONNECTION = CONNECTION;
108+
MESSAGE.SOCKET = CONNECTION.SOCKET;
109+
MESSAGE.INTERNET_ADDRESS = CONNECTION.SOCKET.getInetAddress();
110+
MESSAGE.TIME_STAMP = new Date(System.currentTimeMillis());
111+
112+
if(CommonRails.SocketUtils.isSocketConnected(CONNECTION.SOCKET))
100113
{
101-
MESSAGE.CONNECTION = CONNECTION;
114+
MESSAGE.MESSAGE_BUFFER = new StringBuffer("US22.09");
102115

103-
MESSAGE.SOCKET = CONNECTION.SOCKET;
116+
StringBuilder BUFFER = new StringBuilder();
104117

105-
MESSAGE.INTERNET_ADDRESS = CONNECTION.SOCKET.getInetAddress();
118+
// ── 1. Read the client request with a bounded timeout ──────────────
119+
try
120+
{
121+
CONNECTION.SOCKET.setSoTimeout(PROXY_READ_TIMEOUT_MS);
106122

107-
MESSAGE.TIME_STAMP = new Date(System.currentTimeMillis());
123+
BufferedReader READER = new BufferedReader(
124+
new InputStreamReader(CONNECTION.SOCKET.getInputStream()));
108125

109-
MESSAGE.MESSAGE_BUFFER = new StringBuffer("US22.09");
126+
String LINE;
110127

111-
BufferedReader READER = new BufferedReader(new InputStreamReader(CONNECTION.SOCKET.getInputStream()));
128+
if((LINE = READER.readLine()) != null)
129+
{
130+
BUFFER.append(LINE).append(LINE_FEED);
112131

113-
StringBuilder BUFFER = new StringBuilder();
132+
while((LINE = READER.readLine()) != null)
133+
{
134+
CommonRails.printSystemComponent(this, this.hashCode(),
135+
"WebExpress ConnectionPoller >> reading input ["
136+
+ MESSAGE.SOCKET + "] line [" + LINE + "].");
114137

115-
String LINE = null;
138+
BUFFER.append(LINE).append(LINE_FEED);
139+
}
140+
}
141+
}
142+
catch(SocketTimeoutException clientReadDone)
143+
{
144+
// read window expired — proceed with what arrived
145+
}
146+
finally
147+
{
148+
CONNECTION.SOCKET.setSoTimeout(READ_WRITE_STANDARD_SOCKET_TIMEOUT);
149+
}
116150

151+
MESSAGE.MESSAGE_BUFFER = new StringBuffer(BUFFER);
152+
153+
// ── 2. Forward request and write response directly back to client ──
117154
try
118155
{
119-
if ((LINE=READER.readLine())!=null)
156+
if(BUFFER.length() > 0)
120157
{
121-
String LOCAL_TEMP = LINE;
158+
// Spawn a dedicated telnet subprocess per session so concurrent
159+
// clients each get their own independent backend connection.
160+
ProcessBuilder PER_PB = new ProcessBuilder(WebExpress.TELNET_PROXY_SERVER_ARGS);
122161

123-
BUFFER.append(LOCAL_TEMP);
162+
Process PER_PROC = PER_PB.start();
124163

125-
for(LINE=null;(LINE=READER.readLine())!=null;)
164+
try
126165
{
127-
CommonRails.printSystemComponent(this, this.hashCode(), "WebExpress ConnectionPoller >> reading in input ["+MESSAGE.SOCKET +"] for Telnet Proxy message ["+LINE+"].");
128-
129-
MESSAGE.SOCKET.setSoTimeout(ConnectionPoller.READ_WRITE_STANDARD_SOCKET_TIMEOUT);
130-
131-
BUFFER.append(LINE).append(LINE_FEED);
166+
// Send request bytes to the per-session telnet process
167+
java.io.OutputStream PER_OUT = PER_PROC.getOutputStream();
168+
byte[] REQUEST_BYTES = BUFFER.toString().getBytes();
169+
PER_OUT.write(REQUEST_BYTES);
170+
PER_OUT.write('\n');
171+
PER_OUT.flush();
172+
173+
CommonRails.printSystemComponent(this, this.hashCode(),
174+
"WebExpress ConnectionPoller >> forwarded ["
175+
+ REQUEST_BYTES.length + " bytes] to per-session telnet backend.");
176+
177+
// Blocking byte-copy with wall-clock timeout using a reader thread.
178+
// available() can't be used here — the telnet process needs time
179+
// to resolve DNS and establish the remote connection.
180+
java.io.InputStream PER_IN = PER_PROC.getInputStream();
181+
java.io.OutputStream CLIENT_OUT = CONNECTION.SOCKET.getOutputStream();
182+
183+
java.util.concurrent.atomic.AtomicLong LAST_READ =
184+
new java.util.concurrent.atomic.AtomicLong(System.currentTimeMillis());
185+
186+
java.util.concurrent.ExecutorService READER_EXEC =
187+
java.util.concurrent.Executors.newSingleThreadExecutor();
188+
189+
java.util.concurrent.Future<?> READER_FUTURE = READER_EXEC.submit(() ->
190+
{
191+
byte[] CHUNK = new byte[4096];
192+
int READ;
193+
try
194+
{
195+
while ((READ = PER_IN.read(CHUNK)) != -1)
196+
{
197+
CLIENT_OUT.write(CHUNK, 0, READ);
198+
CLIENT_OUT.flush();
199+
LAST_READ.set(System.currentTimeMillis());
200+
201+
CommonRails.printSystemComponent(this, this.hashCode(),
202+
"WebExpress ConnectionPoller >> proxied ["
203+
+ READ + " bytes] to client.");
204+
}
205+
}
206+
catch (Exception ignored) {}
207+
});
208+
209+
// Wait up to PROXY_READ_TIMEOUT_MS of idle silence after last read
210+
long WALL = System.currentTimeMillis() + 15_000L;
211+
212+
while (System.currentTimeMillis() < WALL)
213+
{
214+
if (READER_FUTURE.isDone()) break;
215+
216+
if (System.currentTimeMillis() - LAST_READ.get() > PROXY_READ_TIMEOUT_MS)
217+
break;
218+
219+
Thread.sleep(100);
220+
}
221+
222+
READER_FUTURE.cancel(true);
223+
READER_EXEC.shutdownNow();
224+
225+
CommonRails.printSystemComponent(this, this.hashCode(),
226+
"WebExpress ConnectionPoller >> per-session proxy read window closed.");
227+
}
228+
finally
229+
{
230+
try { PER_PROC.destroyForcibly(); } catch(Exception ignored) {}
132231
}
133232
}
134233

135-
MESSAGE.MESSAGE_BUFFER = new StringBuffer(BUFFER);
136-
137234
this.WEBEXPRESS.MESSAGE_QUEUE.add(MESSAGE);
138235
}
139-
catch (SocketTimeoutException ste)
236+
catch(SocketTimeoutException ste)
140237
{
141-
MESSAGE.MESSAGE_BUFFER = new StringBuffer(BUFFER);
142-
143238
this.WEBEXPRESS.MESSAGE_QUEUE.add(MESSAGE);
144239

145240
CONNECTIONS.remove(CONNECTION);
146241

147-
CommonRails.printSystemComponent(this, this.hashCode(), "WebExpress ConnectionPoller >> graceful disconnect ["+MESSAGE.SOCKET.getRemoteSocketAddress()+"] ["+ste.getMessage()+"] total CONNECTION count ["+CONNECTIONS.size()+"].");
242+
CommonRails.printSystemComponent(this, this.hashCode(),
243+
"WebExpress ConnectionPoller >> graceful disconnect ["
244+
+ MESSAGE.SOCKET.getRemoteSocketAddress() + "] ["
245+
+ ste.getMessage() + "] total count [" + CONNECTIONS.size() + "].");
148246
}
149-
catch (Exception e)
247+
catch(Exception e)
150248
{
151249
ExceptionHandler.dispatch(e);
250+
152251
CONNECTIONS.remove(CONNECTION);
153252

154-
CommonRails.printSystemComponent(this, this.hashCode(), "WebExpress ConnectionPoller >> socket exception ["+e.getMessage()+"].");
253+
CommonRails.printSystemComponent(this, this.hashCode(),
254+
"WebExpress ConnectionPoller >> socket exception [" + e.getMessage() + "].");
155255
}
156256
finally
157257
{
158-
for(int k=0; k<CURRENT_CONNECTIONS.size(); k++)
258+
for(int k = 0; k < CURRENT_CONNECTIONS.size(); k++)
159259
{
160260
Connection LATENT = CURRENT_CONNECTIONS.CURRENT_CONNECTION.get(k);
161261

162262
if(CommonRails.SocketUtils.isSocketClosed(LATENT.SOCKET))
163263
{
164264
CURRENT_CONNECTIONS.remove(LATENT);
165265

166-
CommonRails.printSystemComponent(this, this.hashCode(), "WebExpress ConnectionPoller >> closed a sleeping turtle ["+LATENT.SOCKET +"].");
266+
CommonRails.printSystemComponent(this, this.hashCode(),
267+
"WebExpress ConnectionPoller >> closed sleeping turtle ["
268+
+ LATENT.SOCKET + "].");
167269
}
168270
}
169271

@@ -177,48 +279,55 @@ public void run()
177279
{
178280
try
179281
{
180-
if(CONNECTION.SOCKET !=null)
282+
if(CONNECTION.SOCKET != null)
181283
{
182284
CONNECTION.SOCKET.close();
183285
}
184286
}
185-
catch (Exception e)
287+
catch(Exception e)
186288
{
187289
ExceptionHandler.dispatch(e);
188-
CommonRails.printSystemComponent(this, this.hashCode(), "WebExpress ConnectionPoller >> closed CONNECTION close.");
290+
291+
CommonRails.printSystemComponent(this, this.hashCode(),
292+
"WebExpress ConnectionPoller >> closed CONNECTION close.");
189293
}
190294
}
191295
}
192296
}
193-
catch (SocketTimeoutException ste)
297+
catch(SocketTimeoutException ste)
194298
{
195-
CommonRails.printSystemComponent(this, this.hashCode(), "WebExpress ConnectionPoller >> closing socket due to timeout ["+MESSAGE.SOCKET +"].");
299+
CommonRails.printSystemComponent(this, this.hashCode(),
300+
"WebExpress ConnectionPoller >> closing socket due to timeout [" + MESSAGE.SOCKET + "].");
196301

197302
CURRENT_CONNECTIONS.remove(CONNECTION);
198303

199-
if(MESSAGE.MESSAGE_BUFFER.length()>0)
304+
if(MESSAGE.MESSAGE_BUFFER.length() > 0)
200305
{
201306
this.WEBEXPRESS.MESSAGE_QUEUE.add(MESSAGE);
202307
}
203308

204-
CommonRails.printSystemComponent(this, this.hashCode(), "WebExpress ConnectionPoller >> new CONNECTION count ["+CURRENT_CONNECTIONS.size()+"].");
309+
CommonRails.printSystemComponent(this, this.hashCode(),
310+
"WebExpress ConnectionPoller >> new CONNECTION count [" + CURRENT_CONNECTIONS.size() + "].");
205311

206312
try
207313
{
208-
if(CONNECTION!=null && CONNECTION.SOCKET !=null)
314+
if(CONNECTION != null && CONNECTION.SOCKET != null)
209315
{
210316
CONNECTION.SOCKET.close();
211317
}
212318
}
213-
catch (Exception e)
319+
catch(Exception e)
214320
{
215321
ExceptionHandler.dispatch(e);
216-
CommonRails.printSystemComponent(this, this.hashCode(), "WebExpress ConnectionPoller >> closed CONNECTION close.");
322+
323+
CommonRails.printSystemComponent(this, this.hashCode(),
324+
"WebExpress ConnectionPoller >> closed CONNECTION close.");
217325
}
218326
}
219-
catch (Exception e)
327+
catch(Exception e)
220328
{
221329
ExceptionHandler.dispatch(e);
330+
222331
e.printStackTrace(System.err);
223332
}
224333
finally
@@ -227,14 +336,14 @@ public void run()
227336
{
228337
Thread.sleep(1500);
229338
}
230-
catch (Exception e)
339+
catch(Exception e)
231340
{
232341
ExceptionHandler.dispatch(e);
233-
CommonRails.printSystemComponent(this, this.hashCode(), "WebExpress ConnectionPoller >> closed CONNECTION on main polling thread sleep.");
342+
343+
CommonRails.printSystemComponent(this, this.hashCode(),
344+
"WebExpress ConnectionPoller >> closed CONNECTION on main polling thread sleep.");
234345
}
235346
}
236-
237-
238347
}
239348
}
240-
}
349+
}
163 Bytes
Binary file not shown.

0 commit comments

Comments
 (0)