17from collections
import defaultdict
18from typing
import Dict
23 Core implementation of the authorization server. The API is
24 inheritance-based, with entry points at do_GET() and do_POST(). See the
25 documentation for BaseHTTPRequestHandler.
28 JsonObject = Dict[str, object]
32 Switches the behavior of the provider depending on the issuer URI.
35 self.path.startswith("/alternate/")
36 or self.
path ==
"/.well-known/oauth-authorization-server/alternate"
44 if self.
path.startswith(
"/.well-known/"):
53 Checks the expected value of the Authorization header, if any.
55 secret = self._get_param("expected_secret",
None)
59 assert "Authorization" in self.headers
60 method, creds = self.headers[
"Authorization"].split()
63 raise RuntimeError(f
"client used {method} auth; expected Basic")
67 username = urllib.parse.quote_plus(self.
client_id, safe=
"~")
68 password = urllib.parse.quote_plus(secret, safe=
"~")
69 expected_creds = f
"{username}:{password}"
71 if creds.encode() != base64.b64encode(expected_creds.encode()):
73 f
"client sent '{creds}'; expected b64encode('{expected_creds}')"
80 config_path =
"/.well-known/openid-configuration"
82 config_path =
"/.well-known/oauth-authorization-server"
84 if self.
path == config_path:
87 self.send_error(404,
"Not Found")
94 Parses apart the form-urlencoded request body and returns the resulting
97 size = int(self.headers["Content-Length"])
98 form = self.rfile.
read(size)
100 assert self.headers[
"Content-Type"] ==
"application/x-www-form-urlencoded"
101 return urllib.parse.parse_qs(
102 form.decode(
"utf-8"),
104 keep_blank_values=
True,
112 Returns the client_id sent in the POST body or the Authorization header.
115 if "client_id" in self.
_params:
116 return self.
_params[
"client_id"][0]
118 if "Authorization" not in self.headers:
119 raise RuntimeError(
"client did not send any client_id")
121 _, creds = self.headers[
"Authorization"].split()
123 decoded = base64.b64decode(creds).decode(
"utf-8")
124 username, _ = decoded.split(
":", 1)
126 return urllib.parse.unquote_plus(username)
141 if self.
path ==
"/authorize":
143 elif self.
path ==
"/token":
153 Returns True if the client has requested a modification to this stage of
156 if not hasattr(self,
"_test_params"):
165 and self.
path ==
"/.well-known/openid-configuration"
167 or (stage ==
"device" and self.
path ==
"/authorize")
168 or (stage ==
"token" and self.
path ==
"/token")
173 If the client has requested a modification to this stage (see
174 _should_modify()), this method searches the provided test parameters for
175 a key of the given name, and returns it if found. Otherwise the provided
186 Returns "application/json" unless the test has requested something
189 return self.
_get_param(
"content_type",
"application/json")
194 Returns 0 unless the test has requested something different.
201 Returns "authorization_pending" unless the test has requested something
204 return self.
_get_param(
"retry_code",
"authorization_pending")
209 Returns "verification_uri" unless the test has requested something
212 return self.
_get_param(
"uri_spelling",
"verification_uri")
217 Returns a dict with any additional entries that should be folded into a
218 JSON response, as determined by test parameters provided by the client:
220 - huge_response: if set to True, the dict will contain a gigantic string
223 - nested_array: if set to nonzero, the dict will contain a deeply nested
224 array so that the top-level object has the given depth
226 - nested_object: if set to nonzero, the dict will contain a deeply
227 nested JSON object so that the top-level object has the given depth
232 ret[
"_pad_"] =
"x" * 1024 * 1024
236 ret[
"_arr_"] = functools.reduce(
lambda x, _: [x],
range(depth))
240 ret[
"_obj_"] = functools.reduce(
lambda x, _: {
"": x},
range(depth))
247 The actual Bearer token sent back to the client on success. Tests may
248 override this with the "token" test parameter.
251 if token
is not None:
262 Trims the response JSON, if necessary, and logs it for later debugging.
271 js[
"_pad_"] = pad[:64] + f
"[...truncated from {len(pad)} bytes]"
273 resp = json.dumps(js).encode(
"ascii")
274 self.log_message(
"sending JSON response: %s", resp)
278 assert len(resp) < 1024,
"_log_response must be adjusted for new JSON"
282 Sends the provided JSON dict as an application/json response.
285 resp = json.dumps(js).encode("ascii")
290 self.send_header(
"Content-Length",
str(
len(resp)))
293 self.wfile.
write(resp)
296 port = self.server.socket.getsockname()[1]
298 issuer = f
"http://127.0.0.1:{port}"
300 issuer +=
"/alternate"
306 "token_endpoint": issuer +
"/token",
307 "device_authorization_endpoint": issuer +
"/authorize",
308 "response_types_supported": [
"token"],
309 "subject_types_supported": [
"public"],
310 "id_token_signing_alg_values_supported": [
"RS256"],
311 "grant_types_supported": [
312 "authorization_code",
313 "urn:ietf:params:oauth:grant-type:device_code",
320 A cached _TokenState object for the connected client (as determined by
321 the request's client_id), or a new one if it doesn't already exist.
323 This relies on the existence of a defaultdict attached to the server;
326 return self.server.token_state[self.
client_id]
330 Removes any cached _TokenState for the current client_id. Call this
331 after the token exchange ends to get rid of unnecessary state.
333 if self.
client_id in self.server.token_state:
334 del self.server.token_state[self.
client_id]
337 uri =
"https://example.com/"
339 uri =
"https://example.org/"
342 "device_code":
"postgres",
343 "user_code":
"postgresuser",
350 if interval
is not None:
351 resp[
"interval"] = interval
358 assert self.
_params[
"scope"][0],
"empty scopes should be omitted"
367 resp = {
"error": err}
371 resp[
"error_description"] = desc
379 now = time.monotonic()
384 ), f
"client waited only {delay} seconds between token requests (expected {self._token_state.min_delay})"
401 "token_type":
"bearer",
408 Starts the authorization server on localhost. The ephemeral port in use will
409 be printed to stdout.
412 s = http.server.HTTPServer(("127.0.0.1", 0), OAuthHandler)
422 s.token_state = defaultdict(_TokenState)
426 port = s.socket.getsockname()[1]
430 stdout = sys.stdout.fileno()
437if __name__ ==
"__main__":
void print(const void *obj)
Dict[str, str] _parse_params(self)
JsonObject authorization(self)
None _send_json(self, JsonObject js)
None _log_response(self, JsonObject js)
def _remove_token_state(self)
def _get_param(self, name, default)
def _response_padding(self)
bool _should_modify(self)
static struct cvec * range(struct vars *v, chr a, chr b, int cases)