Skip to content

Commit 66581c4

Browse files
itizirhongalex
andauthored
fix(pubsub): Closes googleapis#10094 - memory leak in pubsub receive (googleapis#10153)
* fix(pubsub): make sure grpc stream gets closed * fix(pubsub): preserve cancellation error behaviour * fix(pubsub): the stream cancellation error is not necessarily a grpc error --------- Co-authored-by: Alex Hong <9397363+hongalex@users.noreply.github.com>
1 parent b371210 commit 66581c4

File tree

1 file changed

+11
-0
lines changed

1 file changed

+11
-0
lines changed

pubsub/iterator.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,9 @@ func (it *messageIterator) stop() {
163163
it.checkDrained()
164164
it.mu.Unlock()
165165
it.wg.Wait()
166+
if it.ps != nil {
167+
it.ps.cancel()
168+
}
166169
}
167170

168171
// checkDrained closes the drained channel if the iterator has been stopped and all
@@ -246,6 +249,14 @@ func (it *messageIterator) receive(maxToPull int32) ([]*Message, error) {
246249
rmsgs, err = it.pullMessages(maxToPull)
247250
} else {
248251
rmsgs, err = it.recvMessages()
252+
// If stopping the iterator results in the grpc stream getting shut down and
253+
// returning an error here, treat the same as above and return EOF.
254+
// If the cancellation comes from the underlying grpc client getting closed,
255+
// do propagate the cancellation error.
256+
// See https://github.com/googleapis/google-cloud-go/pull/10153#discussion_r1600814775
257+
if err != nil && it.ps.ctx.Err() == context.Canceled {
258+
err = io.EOF
259+
}
249260
}
250261
// Any error here is fatal.
251262
if err != nil {

0 commit comments

Comments
 (0)