@@ -25,7 +25,7 @@ StreamPipe::StreamPipe(StreamBase* source,
25
25
source->PushStreamListener (&readable_listener_);
26
26
sink->PushStreamListener (&writable_listener_);
27
27
28
- CHECK ( sink->HasWantsWrite () );
28
+ uses_wants_write_ = sink->HasWantsWrite ();
29
29
30
30
// Set up links between this object and the source/sink objects.
31
31
// In particular, this makes sure that they are garbage collected as a group,
@@ -66,7 +66,8 @@ void StreamPipe::Unpipe(bool is_in_deletion) {
66
66
is_closed_ = true ;
67
67
is_reading_ = false ;
68
68
source ()->RemoveStreamListener (&readable_listener_);
69
- sink ()->RemoveStreamListener (&writable_listener_);
69
+ if (pending_writes_ == 0 )
70
+ sink ()->RemoveStreamListener (&writable_listener_);
70
71
71
72
if (is_in_deletion) return ;
72
73
@@ -126,13 +127,16 @@ void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
126
127
// EOF or error; stop reading and pass the error to the previous listener
127
128
// (which might end up in JS).
128
129
pipe ->is_eof_ = true ;
130
+ // Cache `sink()` here because the previous listener might do things
131
+ // that eventually lead to an `Unpipe()` call.
132
+ StreamBase* sink = pipe ->sink ();
129
133
stream ()->ReadStop ();
130
134
CHECK_NOT_NULL (previous_listener_);
131
135
previous_listener_->OnStreamRead (nread, uv_buf_init (nullptr , 0 ));
132
136
// If we’re not writing, close now. Otherwise, we’ll do that in
133
137
// `OnStreamAfterWrite()`.
134
- if (! pipe ->is_writing_ ) {
135
- pipe -> ShutdownWritable ();
138
+ if (pipe ->pending_writes_ == 0 ) {
139
+ sink-> Shutdown ();
136
140
pipe ->Unpipe ();
137
141
}
138
142
return ;
@@ -142,32 +146,40 @@ void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
142
146
}
143
147
144
148
void StreamPipe::ProcessData (size_t nread, AllocatedBuffer&& buf) {
149
+ CHECK (uses_wants_write_ || pending_writes_ == 0 );
145
150
uv_buf_t buffer = uv_buf_init (buf.data (), nread);
146
151
StreamWriteResult res = sink ()->Write (&buffer, 1 );
152
+ pending_writes_++;
147
153
if (!res.async ) {
148
154
writable_listener_.OnStreamAfterWrite (nullptr , res.err );
149
155
} else {
150
- is_writing_ = true ;
151
156
is_reading_ = false ;
152
157
res.wrap ->SetAllocatedStorage (std::move (buf));
153
158
if (source () != nullptr )
154
159
source ()->ReadStop ();
155
160
}
156
161
}
157
162
158
- void StreamPipe::ShutdownWritable () {
159
- sink ()->Shutdown ();
160
- }
161
-
162
163
void StreamPipe::WritableListener::OnStreamAfterWrite (WriteWrap* w,
163
164
int status) {
164
165
StreamPipe* pipe = ContainerOf (&StreamPipe::writable_listener_, this );
165
- pipe ->is_writing_ = false ;
166
+ pipe ->pending_writes_ --;
167
+ if (pipe ->is_closed_ ) {
168
+ if (pipe ->pending_writes_ == 0 ) {
169
+ Environment* env = pipe ->env ();
170
+ HandleScope handle_scope (env->isolate ());
171
+ Context::Scope context_scope (env->context ());
172
+ pipe ->MakeCallback (env->oncomplete_string (), 0 , nullptr ).ToLocalChecked ();
173
+ stream ()->RemoveStreamListener (this );
174
+ }
175
+ return ;
176
+ }
177
+
166
178
if (pipe ->is_eof_ ) {
167
179
HandleScope handle_scope (pipe ->env ()->isolate ());
168
180
InternalCallbackScope callback_scope (pipe ,
169
181
InternalCallbackScope::kSkipTaskQueues );
170
- pipe ->ShutdownWritable ();
182
+ pipe ->sink ()-> Shutdown ();
171
183
pipe ->Unpipe ();
172
184
return ;
173
185
}
@@ -179,6 +191,10 @@ void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
179
191
prev->OnStreamAfterWrite (w, status);
180
192
return ;
181
193
}
194
+
195
+ if (!pipe ->uses_wants_write_ ) {
196
+ OnStreamWantsWrite (65536 );
197
+ }
182
198
}
183
199
184
200
void StreamPipe::WritableListener::OnStreamAfterShutdown (ShutdownWrap* w,
@@ -202,6 +218,7 @@ void StreamPipe::WritableListener::OnStreamDestroy() {
202
218
StreamPipe* pipe = ContainerOf (&StreamPipe::writable_listener_, this );
203
219
pipe ->sink_destroyed_ = true ;
204
220
pipe ->is_eof_ = true ;
221
+ pipe ->pending_writes_ = 0 ;
205
222
pipe ->Unpipe ();
206
223
}
207
224
@@ -242,8 +259,7 @@ void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
242
259
StreamPipe* pipe ;
243
260
ASSIGN_OR_RETURN_UNWRAP (&pipe , args.Holder ());
244
261
pipe ->is_closed_ = false ;
245
- if (pipe ->wanted_data_ > 0 )
246
- pipe ->writable_listener_ .OnStreamWantsWrite (pipe ->wanted_data_ );
262
+ pipe ->writable_listener_ .OnStreamWantsWrite (65536 );
247
263
}
248
264
249
265
void StreamPipe::Unpipe (const FunctionCallbackInfo<Value>& args) {
@@ -252,6 +268,18 @@ void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
252
268
pipe ->Unpipe ();
253
269
}
254
270
271
+ void StreamPipe::IsClosed (const FunctionCallbackInfo<Value>& args) {
272
+ StreamPipe* pipe ;
273
+ ASSIGN_OR_RETURN_UNWRAP (&pipe , args.Holder ());
274
+ args.GetReturnValue ().Set (pipe ->is_closed_ );
275
+ }
276
+
277
+ void StreamPipe::PendingWrites (const FunctionCallbackInfo<Value>& args) {
278
+ StreamPipe* pipe ;
279
+ ASSIGN_OR_RETURN_UNWRAP (&pipe , args.Holder ());
280
+ args.GetReturnValue ().Set (pipe ->pending_writes_ );
281
+ }
282
+
255
283
namespace {
256
284
257
285
void InitializeStreamPipe (Local<Object> target,
@@ -266,6 +294,8 @@ void InitializeStreamPipe(Local<Object> target,
266
294
FIXED_ONE_BYTE_STRING (env->isolate (), " StreamPipe" );
267
295
env->SetProtoMethod (pipe , " unpipe" , StreamPipe::Unpipe);
268
296
env->SetProtoMethod (pipe , " start" , StreamPipe::Start);
297
+ env->SetProtoMethod (pipe , " isClosed" , StreamPipe::IsClosed);
298
+ env->SetProtoMethod (pipe , " pendingWrites" , StreamPipe::PendingWrites);
269
299
pipe ->Inherit (AsyncWrap::GetConstructorTemplate (env));
270
300
pipe ->SetClassName (stream_pipe_string);
271
301
pipe ->InstanceTemplate ()->SetInternalFieldCount (1 );
0 commit comments