11package mds .connection ;
22
33import java .io .*;
4- import java .net .*;
5- import java .util .*;
6- public class MdsConnection
4+ import java .net .Socket ;
5+ import java .net .UnknownHostException ;
6+ import java .util .Enumeration ;
7+ import java .util .Hashtable ;
8+ import java .util .NoSuchElementException ;
9+ import java .util .Vector ;
10+ import java .util .concurrent .ExecutorService ;
11+ import java .util .concurrent .Executors ;
12+ import java .util .concurrent .TimeUnit ;
13+
14+ public class MdsConnection implements AutoCloseable
715{
816 public static final int DEFAULT_PORT = 8000 ;
917 public static final String DEFAULT_USER = "JAVA_USER" ;
@@ -17,7 +25,7 @@ public class MdsConnection
1725 protected DataOutputStream dos ;
1826 public String error ;
1927 protected MRT receiveThread ;
20- protected boolean connected ;
28+ protected volatile boolean connected ;
2129 private int pending_count = 0 ;
2230 private final Vector <ConnectionListener > connection_listener = new Vector <ConnectionListener >();
2331 private final boolean event_flags [] = new boolean [MAX_NUM_EVENTS ];
@@ -37,6 +45,11 @@ public final boolean isConnected()
3745 public String getProvider ()
3846 { return provider ; }
3947
48+ /**
49+ * Use {@link #closeQuietly(AutoCloseable)} instead to avoid expansive
50+ * reflection.
51+ */
52+ @ Deprecated
4053 public static final void tryClose (final Object obj )
4154 {
4255 if (obj != null )
@@ -48,6 +61,34 @@ public static final void tryClose(final Object obj)
4861 {}
4962 }
5063
64+ @ Override
65+ public void close () throws Exception
66+ {
67+ connected = false ;
68+ // Close socket first to unblock any blocking IO
69+ closeQuietly (sock );
70+ if (receiveThread != null )
71+ {
72+ try
73+ {
74+ receiveThread .interrupt ();
75+ }
76+ catch (Exception ignore )
77+ {
78+ // Ignore interrupt failures - thread might already be dead or corrupted
79+ }
80+ try
81+ {
82+ receiveThread .join (1_200L ); // TODO: need to be customizable?
83+ }
84+ catch (InterruptedException e )
85+ {
86+ Thread .currentThread ().interrupt (); // Restore interrupt status
87+ }
88+ }
89+ QuitFromMds ();
90+ }
91+
5192 static class EventItem
5293 {
5394 String name ;
@@ -68,15 +109,33 @@ public String toString()
68109 }
69110 }
70111
71- class PMET extends Thread // Process Mds Event Thread
112+ /**
113+ * Visible for test
114+ */
115+ class PMET implements Runnable // Process Mds Event Thread
72116 {
73- int eventId = -1 ;
74- String eventName ;
117+ private final int eventId ;
118+ private final String eventName ;
119+ private final String threadName ;
120+
121+ public PMET (int id )
122+ {
123+ eventId = id ;
124+ eventName = null ;
125+ threadName = "Process Mds Event Thread - " + eventId ;
126+ }
127+
128+ public PMET (String name )
129+ {
130+ eventId = -1 ;
131+ eventName = name ;
132+ threadName = "Process Mds Event Thread - " + name ;
133+ }
75134
76135 @ Override
77136 public void run ()
78137 {
79- setName ("Process Mds Event Thread" );
138+ Thread . currentThread (). setName (threadName );
80139 if (MdsConnection .this .busy )
81140 return ;
82141 if (eventName != null )
@@ -90,27 +149,21 @@ else if (eventId != -1)
90149 dispatchUpdateEvent (eventId );
91150 }
92151 }
93-
94- public void SetEventid (int id )
95- {
96- // System.out.println("Received Event ID " + id);
97- eventId = id ;
98- eventName = null ;
99- }
100-
101- public void SetEventName (String name )
102- {
103- // System.out.println("Received Event Name " + name);
104- eventId = -1 ;
105- eventName = name ;
106- }
107152 }// end PMET class
108153
154+ /**
155+ * Visible for test
156+ * TODO: consider migration to Runnable with FixedThreadPool or VirtualThread
157+ */
109158 class MRT extends Thread // Mds Receive Thread
110159 {
111- MdsMessage message ;
112- boolean pending = false ;
113- boolean killed = false ;
160+ private MdsMessage message ;
161+ private volatile boolean killed = false ;
162+ /**
163+ * Thread pool for PMET, aims to shut down graceful and succinct
164+ * TODO:newFixedThreadPool or VirtualThread
165+ */
166+ private final ExecutorService executorService = Executors .newCachedThreadPool ();
114167
115168 @ Override
116169 public void run ()
@@ -119,15 +172,13 @@ public void run()
119172 MdsMessage curr_message ;
120173 try
121174 {
122- while (true )
175+ while (! killed )
123176 {
124177 curr_message = new MdsMessage ("" , MdsConnection .this .connection_listener );
125178 curr_message .Receive (dis );
126179 if (curr_message .dtype == Descriptor .DTYPE_EVENT )
127180 {
128- final PMET PMdsEvent = new PMET ();
129- PMdsEvent .SetEventid (curr_message .body [12 ]);
130- PMdsEvent .start ();
181+ executorService .submit (createPMET (curr_message .body [12 ]));
131182 }
132183 else
133184 {
@@ -147,6 +198,17 @@ public void run()
147198 synchronized (this )
148199 {
149200 killed = true ;
201+ try
202+ {
203+ executorService .shutdown ();
204+ executorService .awaitTermination (1_000 , TimeUnit .MILLISECONDS );
205+ }
206+ catch (Exception ignore )
207+ {}
208+ finally
209+ {
210+ executorService .shutdownNow ();
211+ }
150212 notifyAll ();
151213 }
152214 if (connected )
@@ -394,15 +456,22 @@ public void sendArg(byte descr_idx, byte dtype, byte nargs, int dims[], byte bod
394456 public int DisconnectFromMds ()
395457 {
396458 connection_listener .removeAllElements ();
459+ // TODO should we keep the EventItem even after Disconnect?
460+ hashEventName .clear ();
461+ hashEventId .clear ();
397462 connected = false ;
398463 return 1 ;
399464 }
400465
466+ /**
467+ * Use {@link #close()} instead
468+ */
401469 public void QuitFromMds ()
402470 {
403471 DisconnectFromMds ();
404- tryClose (dos );
405- tryClose (dis );
472+ closeQuietly (sock );
473+ closeQuietly (dos );
474+ closeQuietly (dis );
406475 }
407476
408477 public void connectToServer () throws IOException
@@ -600,4 +669,26 @@ public void dispatchConnectionEvent(ConnectionEvent e)
600669 }
601670 }
602671 }
672+
673+ /**
674+ * Factory method for {@link PMET} as a workaround for inner classes.
675+ */
676+ protected PMET createPMET (int id )
677+ {
678+ return new PMET (id );
679+ }
680+
681+ /**
682+ * Close the given object quietly. <br/>
683+ * Eqivalent to {@link #tryClose(Object)} without expansive reflection.
684+ */
685+ public static void closeQuietly (AutoCloseable obj )
686+ {
687+ try
688+ {
689+ obj .close ();
690+ }
691+ catch (Exception ignore )
692+ {}
693+ }
603694}
0 commit comments