@@ -157,7 +157,7 @@ def test_decode_jwt(self):
157
157
'user_claims' : {'foo' : 'bar' },
158
158
}
159
159
encoded_token = jwt .encode (token_data , 'secret' , 'HS256' ).decode ('utf-8' )
160
- data = decode_jwt (encoded_token , 'secret' , 'HS256' , csrf = False )
160
+ data = decode_jwt (encoded_token , 'secret' , 'HS256' , csrf = False , identity_claim = 'identity' )
161
161
self .assertIn ('exp' , data )
162
162
self .assertIn ('iat' , data )
163
163
self .assertIn ('nbf' , data )
@@ -188,7 +188,7 @@ def test_decode_jwt(self):
188
188
'type' : 'refresh' ,
189
189
}
190
190
encoded_token = jwt .encode (token_data , 'secret' , 'HS256' ).decode ('utf-8' )
191
- data = decode_jwt (encoded_token , 'secret' , 'HS256' , csrf = False )
191
+ data = decode_jwt (encoded_token , 'secret' , 'HS256' , csrf = False , identity_claim = 'identity' )
192
192
self .assertIn ('exp' , data )
193
193
self .assertIn ('iat' , data )
194
194
self .assertIn ('nbf' , data )
@@ -210,7 +210,7 @@ def test_decode_invalid_jwt(self):
210
210
'exp' : datetime .utcnow () - timedelta (minutes = 5 ),
211
211
}
212
212
encoded_token = jwt .encode (token_data , 'secret' , 'HS256' ).decode ('utf-8' )
213
- decode_jwt (encoded_token , 'secret' , 'HS256' , csrf = False )
213
+ decode_jwt (encoded_token , 'secret' , 'HS256' , csrf = False , identity_claim = 'identity' )
214
214
215
215
# Missing jti
216
216
with self .assertRaises (JWTDecodeError ):
@@ -220,7 +220,7 @@ def test_decode_invalid_jwt(self):
220
220
'type' : 'refresh'
221
221
}
222
222
encoded_token = jwt .encode (token_data , 'secret' , 'HS256' ).decode ('utf-8' )
223
- decode_jwt (encoded_token , 'secret' , 'HS256' , csrf = False )
223
+ decode_jwt (encoded_token , 'secret' , 'HS256' , csrf = False , identity_claim = 'identity' )
224
224
225
225
# Missing identity
226
226
with self .assertRaises (JWTDecodeError ):
@@ -230,7 +230,17 @@ def test_decode_invalid_jwt(self):
230
230
'type' : 'refresh'
231
231
}
232
232
encoded_token = jwt .encode (token_data , 'secret' , 'HS256' ).decode ('utf-8' )
233
- decode_jwt (encoded_token , 'secret' , 'HS256' , csrf = False )
233
+ decode_jwt (encoded_token , 'secret' , 'HS256' , csrf = False , identity_claim = 'identity' )
234
+
235
+ # Non-matching identity claim
236
+ with self .assertRaises (JWTDecodeError ):
237
+ token_data = {
238
+ 'exp' : datetime .utcnow () + timedelta (minutes = 5 ),
239
+ 'identity' : 'banana' ,
240
+ 'type' : 'refresh'
241
+ }
242
+ encoded_token = jwt .encode (token_data , 'secret' , 'HS256' ).decode ('utf-8' )
243
+ decode_jwt (encoded_token , 'secret' , 'HS256' , csrf = False , identity_claim = 'sub' )
234
244
235
245
# Missing type
236
246
with self .assertRaises (JWTDecodeError ):
@@ -240,7 +250,7 @@ def test_decode_invalid_jwt(self):
240
250
'exp' : datetime .utcnow () + timedelta (minutes = 5 ),
241
251
}
242
252
encoded_token = jwt .encode (token_data , 'secret' , 'HS256' ).decode ('utf-8' )
243
- decode_jwt (encoded_token , 'secret' , 'HS256' , csrf = False )
253
+ decode_jwt (encoded_token , 'secret' , 'HS256' , csrf = False , identity_claim = 'identity' )
244
254
245
255
# Missing fresh in access token
246
256
with self .assertRaises (JWTDecodeError ):
@@ -252,7 +262,7 @@ def test_decode_invalid_jwt(self):
252
262
'user_claims' : {}
253
263
}
254
264
encoded_token = jwt .encode (token_data , 'secret' , 'HS256' ).decode ('utf-8' )
255
- decode_jwt (encoded_token , 'secret' , 'HS256' , csrf = False )
265
+ decode_jwt (encoded_token , 'secret' , 'HS256' , csrf = False , identity_claim = 'identity' )
256
266
257
267
# Missing user claims in access token
258
268
with self .assertRaises (JWTDecodeError ):
@@ -264,7 +274,7 @@ def test_decode_invalid_jwt(self):
264
274
'fresh' : True
265
275
}
266
276
encoded_token = jwt .encode (token_data , 'secret' , 'HS256' ).decode ('utf-8' )
267
- decode_jwt (encoded_token , 'secret' , 'HS256' , csrf = False )
277
+ decode_jwt (encoded_token , 'secret' , 'HS256' , csrf = False , identity_claim = 'identity' )
268
278
269
279
# Bad token type
270
280
with self .assertRaises (JWTDecodeError ):
@@ -277,7 +287,7 @@ def test_decode_invalid_jwt(self):
277
287
'user_claims' : 'banana'
278
288
}
279
289
encoded_token = jwt .encode (token_data , 'secret' , 'HS256' ).decode ('utf-8' )
280
- decode_jwt (encoded_token , 'secret' , 'HS256' , csrf = False )
290
+ decode_jwt (encoded_token , 'secret' , 'HS256' , csrf = False , identity_claim = 'identity' )
281
291
282
292
# Missing csrf in csrf enabled token
283
293
with self .assertRaises (JWTDecodeError ):
@@ -290,7 +300,7 @@ def test_decode_invalid_jwt(self):
290
300
'user_claims' : 'banana'
291
301
}
292
302
encoded_token = jwt .encode (token_data , 'secret' , 'HS256' ).decode ('utf-8' )
293
- decode_jwt (encoded_token , 'secret' , 'HS256' , csrf = True )
303
+ decode_jwt (encoded_token , 'secret' , 'HS256' , csrf = True , identity_claim = 'identity' )
294
304
295
305
def test_create_jwt_with_object (self ):
296
306
# Complex object to test building a JWT from. Normally if you are using
@@ -322,12 +332,15 @@ def user_identity_lookup(user):
322
332
user = TestUser (username = 'foo' , roles = ['bar' , 'baz' ])
323
333
access_token = create_access_token (identity = user )
324
334
refresh_token = create_refresh_token (identity = user )
335
+ identity = 'identity'
325
336
326
337
# Decode the tokens and make sure the values are set properly
327
338
access_token_data = decode_jwt (access_token , app .secret_key ,
328
- app .config ['JWT_ALGORITHM' ], csrf = False )
339
+ app .config ['JWT_ALGORITHM' ], csrf = False ,
340
+ identity_claim = identity )
329
341
refresh_token_data = decode_jwt (refresh_token , app .secret_key ,
330
- app .config ['JWT_ALGORITHM' ], csrf = False )
331
- self .assertEqual (access_token_data ['identity' ], 'foo' )
342
+ app .config ['JWT_ALGORITHM' ], csrf = False ,
343
+ identity_claim = identity )
344
+ self .assertEqual (access_token_data [identity ], 'foo' )
332
345
self .assertEqual (access_token_data ['user_claims' ]['roles' ], ['bar' , 'baz' ])
333
- self .assertEqual (refresh_token_data [' identity' ], 'foo' )
346
+ self .assertEqual (refresh_token_data [identity ], 'foo' )
0 commit comments