25
25
loop_cls = asyncio .ProactorEventLoop # type: ignore[attr-defined,misc]
26
26
27
27
28
+ # pylint: disable=logging-fstring-interpolation
29
+
28
30
class AsyncioEventLoop (BaseEventLoop , asyncio .Protocol ,
29
31
asyncio .SubprocessProtocol ):
30
32
"""`BaseEventLoop` subclass that uses `asyncio` as a backend."""
@@ -42,6 +44,7 @@ def connection_made(self, transport):
42
44
43
45
def connection_lost (self , exc ):
44
46
"""Used to signal `asyncio.Protocol` of a lost connection."""
47
+ debug (f"connection_lost: exc = { exc } " )
45
48
self ._on_error (exc .args [0 ] if exc else 'EOF' )
46
49
47
50
def data_received (self , data : bytes ) -> None :
@@ -71,6 +74,7 @@ def pipe_data_received(self, fd, data):
71
74
72
75
def process_exited (self ) -> None :
73
76
"""Used to signal `asyncio.SubprocessProtocol` when the child exits."""
77
+ debug ("process_exited" )
74
78
self ._on_error ('EOF' )
75
79
76
80
def _init (self ) -> None :
@@ -81,50 +85,61 @@ def _init(self) -> None:
81
85
self ._child_watcher = None
82
86
83
87
def _connect_tcp (self , address : str , port : int ) -> None :
84
- coroutine = self ._loop .create_connection (self ._fact , address , port )
85
- self ._loop .run_until_complete (coroutine )
88
+ async def connect_tcp ():
89
+ await self ._loop .create_connection (self ._fact , address , port )
90
+ debug (f"tcp connection successful: { address } :{ port } " )
91
+
92
+ self ._loop .run_until_complete (connect_tcp ())
86
93
87
94
def _connect_socket (self , path : str ) -> None :
88
- if os .name == 'nt' :
89
- coroutine = self ._loop .create_pipe_connection ( # type: ignore[attr-defined]
90
- self ._fact , path
91
- )
92
- else :
93
- coroutine = self ._loop .create_unix_connection (self ._fact , path )
94
- self ._loop .run_until_complete (coroutine )
95
+ async def connect_socket ():
96
+ if os .name == 'nt' :
97
+ transport , _ = await self ._loop .create_pipe_connection (self ._fact , path )
98
+ else :
99
+ transport , _ = await self ._loop .create_unix_connection (self ._fact , path )
100
+ debug ("socket connection successful: %s" , transport )
101
+
102
+ self ._loop .run_until_complete (connect_socket ())
95
103
96
104
def _connect_stdio (self ) -> None :
97
- if os .name == 'nt' :
98
- pipe : Any = PipeHandle (
99
- msvcrt .get_osfhandle (sys .stdin .fileno ()) # type: ignore[attr-defined]
100
- )
101
- else :
102
- pipe = sys .stdin
103
- coroutine = self ._loop .connect_read_pipe (self ._fact , pipe )
104
- self ._loop .run_until_complete (coroutine )
105
- debug ("native stdin connection successful" )
105
+ async def connect_stdin ():
106
+ if os .name == 'nt' :
107
+ pipe = PipeHandle (msvcrt .get_osfhandle (sys .stdin .fileno ()))
108
+ else :
109
+ pipe = sys .stdin
110
+ await self ._loop .connect_read_pipe (self ._fact , pipe )
111
+ debug ("native stdin connection successful" )
112
+ self ._loop .run_until_complete (connect_stdin ())
106
113
107
114
# Make sure subprocesses don't clobber stdout,
108
115
# send the output to stderr instead.
109
116
rename_stdout = os .dup (sys .stdout .fileno ())
110
117
os .dup2 (sys .stderr .fileno (), sys .stdout .fileno ())
111
118
112
- if os .name == 'nt' :
113
- pipe = PipeHandle (
114
- msvcrt .get_osfhandle (rename_stdout ) # type: ignore[attr-defined]
115
- )
116
- else :
117
- pipe = os .fdopen (rename_stdout , 'wb' )
118
- coroutine = self ._loop .connect_write_pipe (self ._fact , pipe ) # type: ignore[assignment]
119
- self ._loop .run_until_complete (coroutine )
120
- debug ("native stdout connection successful" )
119
+ async def connect_stdout ():
120
+ if os .name == 'nt' :
121
+ pipe = PipeHandle (msvcrt .get_osfhandle (rename_stdout ))
122
+ else :
123
+ pipe = os .fdopen (rename_stdout , 'wb' )
124
+
125
+ await self ._loop .connect_write_pipe (self ._fact , pipe )
126
+ debug ("native stdout connection successful" )
127
+
128
+ self ._loop .run_until_complete (connect_stdout ())
121
129
122
130
def _connect_child (self , argv : List [str ]) -> None :
123
131
if os .name != 'nt' :
124
- self ._child_watcher = asyncio .get_child_watcher ()
125
- self ._child_watcher .attach_loop (self ._loop )
126
- coroutine = self ._loop .subprocess_exec (self ._fact , * argv )
127
- self ._loop .run_until_complete (coroutine )
132
+ # see #238, #241
133
+ _child_watcher = asyncio .get_child_watcher ()
134
+ _child_watcher .attach_loop (self ._loop )
135
+
136
+ async def create_subprocess ():
137
+ transport : asyncio .SubprocessTransport
138
+ transport , protocol = await self ._loop .subprocess_exec (self ._fact , * argv )
139
+ pid = transport .get_pid ()
140
+ debug ("child subprocess_exec successful, PID = %s" , pid )
141
+
142
+ self ._loop .run_until_complete (create_subprocess ())
128
143
129
144
def _start_reading (self ) -> None :
130
145
pass
0 commit comments