Skip to content

Commit 0d247e6

Browse files
fix: set x-goog-request-params for streaming pull request (#884)
* samples: schema evolution * Add command-line commands * Fix tag for rollback * Make formatting fixes * Formatting fixes * Fix exceptions * fix: Set x-goog-request-params for streaming pull request
1 parent a6df376 commit 0d247e6

File tree

2 files changed

+10
-0
lines changed

2 files changed

+10
-0
lines changed

google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,9 @@ def __init__(
279279
self._await_callbacks_on_shutdown = await_callbacks_on_shutdown
280280
self._ack_histogram = histogram.Histogram()
281281
self._last_histogram_size = 0
282+
self._stream_metadata = [
283+
["x-goog-request-params", "subscription=" + subscription]
284+
]
282285

283286
# If max_duration_per_lease_extension is the default
284287
# we set the stream_ack_deadline to the default of 60
@@ -845,6 +848,7 @@ def open(
845848
initial_request=get_initial_request,
846849
should_recover=self._should_recover,
847850
should_terminate=self._should_terminate,
851+
metadata=self._stream_metadata,
848852
throttle_reopen=True,
849853
)
850854
self._rpc.add_done_callback(self._on_rpc_done)

tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ def test__wrap_callback_errors_error():
9191

9292

9393
def test_constructor_and_default_state():
94+
mock.sentinel.subscription = str()
9495
manager = streaming_pull_manager.StreamingPullManager(
9596
mock.sentinel.client, mock.sentinel.subscription
9697
)
@@ -113,6 +114,7 @@ def test_constructor_and_default_state():
113114

114115

115116
def test_constructor_with_default_options():
117+
mock.sentinel.subscription = str()
116118
flow_control_ = types.FlowControl()
117119
manager = streaming_pull_manager.StreamingPullManager(
118120
mock.sentinel.client,
@@ -128,6 +130,7 @@ def test_constructor_with_default_options():
128130

129131

130132
def test_constructor_with_min_and_max_duration_per_lease_extension_():
133+
mock.sentinel.subscription = str()
131134
flow_control_ = types.FlowControl(
132135
min_duration_per_lease_extension=15, max_duration_per_lease_extension=20
133136
)
@@ -142,6 +145,7 @@ def test_constructor_with_min_and_max_duration_per_lease_extension_():
142145

143146

144147
def test_constructor_with_min_duration_per_lease_extension_too_low():
148+
mock.sentinel.subscription = str()
145149
flow_control_ = types.FlowControl(
146150
min_duration_per_lease_extension=9, max_duration_per_lease_extension=9
147151
)
@@ -156,6 +160,7 @@ def test_constructor_with_min_duration_per_lease_extension_too_low():
156160

157161

158162
def test_constructor_with_max_duration_per_lease_extension_too_high():
163+
mock.sentinel.subscription = str()
159164
flow_control_ = types.FlowControl(
160165
max_duration_per_lease_extension=601, min_duration_per_lease_extension=601
161166
)
@@ -1181,6 +1186,7 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi
11811186
initial_request=mock.ANY,
11821187
should_recover=manager._should_recover,
11831188
should_terminate=manager._should_terminate,
1189+
metadata=manager._stream_metadata,
11841190
throttle_reopen=True,
11851191
)
11861192
initial_request_arg = resumable_bidi_rpc.call_args.kwargs["initial_request"]

0 commit comments

Comments
 (0)