-
Notifications
You must be signed in to change notification settings - Fork 372
Nested token Friday night PoC #721
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,82 @@ | ||
| # frozen_string_literal: true | ||
|
|
||
| module JWT | ||
| # Represents an encoded Nested JWT for verification, as defined in RFC 7519 Section 5.2. | ||
| # | ||
| # Unwraps all nesting levels and provides an Enumerable interface over the token layers | ||
| # (outermost to innermost). | ||
| # | ||
| # @example Verifying a Nested JWT | ||
| # nested = JWT::EncodedNestedToken.new(nested_jwt_string) | ||
| # nested.verify!( | ||
| # keys: [ | ||
| # { algorithm: 'RS256', key: rsa_public }, | ||
| # { algorithm: 'HS256', key: 'inner_secret' } | ||
| # ] | ||
| # ) | ||
| # nested.last.payload # => { 'user_id' => 123 } | ||
| # | ||
| # @example Inspecting layers | ||
| # nested = JWT::EncodedNestedToken.new(nested_jwt_string) | ||
| # nested.count # => 2 | ||
| # nested.map(&:header) # => [outer_header, inner_header] | ||
| # | ||
| # @see https://datatracker.ietf.org/doc/html/rfc7519#section-5.2 RFC 7519 Section 5.2 | ||
| class EncodedNestedToken | ||
| include Enumerable | ||
|
|
||
| MAX_DEPTH = 10 | ||
|
|
||
| def initialize(jwt) | ||
| raise ArgumentError, 'Provided JWT must be a String' unless jwt.is_a?(String) | ||
|
|
||
| @tokens = unwrap(jwt) | ||
| end | ||
|
|
||
| def each(&block) | ||
| @tokens.each(&block) | ||
| end | ||
|
|
||
| def last | ||
| @tokens.last | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We can access last before the chain has been verified. Maybe we need to protect against that? |
||
| end | ||
|
|
||
| # Verifies signatures at each nesting level and claims on the innermost token. | ||
| # | ||
| # @param keys [Array<Hash>] key configurations ordered outermost to innermost. | ||
| # Each hash should contain :algorithm and :key (or :key_finder). | ||
| # @param claims [Array<Symbol>, Hash, nil] claim verification options for the innermost token. | ||
| # @return [self] | ||
| # @raise [JWT::DecodeError] if key count doesn't match nesting depth. | ||
| # @raise [JWT::VerificationError] if any signature verification fails. | ||
| def verify!(keys:, claims: nil) | ||
| raise JWT::DecodeError, "Expected #{count} key configurations, got #{keys.length}" unless keys.length == count | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we really care, the same key could be used to validate multiple tokens |
||
|
|
||
| each_with_index do |token, index| | ||
| token.verify_signature!(algorithm: keys[index][:algorithm], key: keys[index][:key]) | ||
| end | ||
|
|
||
| last.verify_claims!(*Array(claims).compact) | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do we only verify the last tokens claims? |
||
| self | ||
| end | ||
|
|
||
| private | ||
|
|
||
| def unwrap(jwt) | ||
| tokens = [] | ||
| current = jwt | ||
|
|
||
| loop do | ||
| raise JWT::DecodeError, "Nested JWT exceeds maximum depth of #{MAX_DEPTH}" if tokens.length >= MAX_DEPTH | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Max depth could be an instance variable, so it can be changed if needed |
||
|
|
||
| token = EncodedToken.new(current) | ||
| tokens << token | ||
| break unless token.header['cty']&.upcase == 'JWT' | ||
|
|
||
| current = ::JWT::Base64.url_decode(token.encoded_payload) | ||
| end | ||
|
|
||
| tokens | ||
| end | ||
| end | ||
| end | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| # frozen_string_literal: true | ||
|
|
||
| module JWT | ||
| # Represents a Nested JWT for creation, as defined in RFC 7519 Section 5.2. | ||
| # | ||
| # A Nested JWT wraps an existing JWT string as the payload of another signed JWT. | ||
| # The payload is base64url-encoded directly (not JSON-encoded). | ||
| # | ||
| # @example Creating a Nested JWT | ||
| # inner_jwt = JWT.encode({ user_id: 123 }, 'inner_secret', 'HS256') | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Lets use the class based generations for consistency |
||
| # nested = JWT::NestedToken.new(inner_jwt) | ||
| # nested.sign!(algorithm: 'RS256', key: rsa_private_key) | ||
| # nested.jwt | ||
| # | ||
| # @example Multi-level nesting | ||
| # deeper = JWT::NestedToken.new(nested.jwt) | ||
| # deeper.sign!(algorithm: 'HS384', key: another_key) | ||
| # deeper.jwt | ||
| # | ||
| # @see https://datatracker.ietf.org/doc/html/rfc7519#section-5.2 RFC 7519 Section 5.2 | ||
| class NestedToken < Token | ||
| def initialize(inner_jwt) | ||
| super(payload: inner_jwt, header: { 'cty' => 'JWT' }) | ||
| end | ||
|
|
||
| # Override to skip JSON encoding — payload is already a raw JWT string. | ||
| def encoded_payload | ||
| @encoded_payload ||= ::JWT::Base64.url_encode(payload) | ||
| end | ||
| end | ||
| end | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,207 @@ | ||
| # frozen_string_literal: true | ||
|
|
||
| RSpec.describe JWT::EncodedNestedToken do | ||
| let(:inner_secret) { 'inner_secret_key' } | ||
| let(:outer_secret) { 'outer_secret_key' } | ||
| let(:inner_payload) { { 'user_id' => 123, 'role' => 'admin' } } | ||
|
|
||
| def create_signed_jwt(payload: inner_payload, algorithm: 'HS256', key: inner_secret) | ||
| token = JWT::Token.new(payload: payload) | ||
| token.sign!(algorithm: algorithm, key: key) | ||
| token.jwt | ||
| end | ||
|
|
||
| def create_nested(inner, algorithm:, key:) | ||
| JWT::NestedToken.new(inner).tap { |n| n.sign!(algorithm: algorithm, key: key) }.jwt | ||
| end | ||
|
|
||
| let(:inner_jwt) { create_signed_jwt } | ||
|
|
||
| describe 'Enumerable interface' do | ||
| let(:nested_jwt) { create_nested(inner_jwt, algorithm: 'HS256', key: outer_secret) } | ||
|
|
||
| it 'has the correct number of tokens' do | ||
| nested = described_class.new(nested_jwt) | ||
| expect(nested.count).to eq(2) | ||
| end | ||
|
|
||
| it 'orders tokens from outermost to innermost' do | ||
| nested = described_class.new(nested_jwt) | ||
| headers = nested.map(&:header) | ||
|
|
||
| expect(headers.first['cty']).to eq('JWT') | ||
| expect(headers.last).not_to have_key('cty') | ||
| end | ||
|
|
||
| it 'returns a single token for a non-nested JWT' do | ||
| nested = described_class.new(inner_jwt) | ||
| expect(nested.count).to eq(1) | ||
| end | ||
|
|
||
| it 'supports three nesting levels' do | ||
| level2 = create_nested(inner_jwt, algorithm: 'HS256', key: 'key2') | ||
| level3 = create_nested(level2, algorithm: 'HS384', key: 'key3') | ||
|
|
||
| nested = described_class.new(level3) | ||
| expect(nested.count).to eq(3) | ||
|
|
||
| algorithms = nested.map { |t| t.header['alg'] } | ||
| expect(algorithms).to eq(%w[HS384 HS256 HS256]) | ||
| end | ||
| end | ||
|
|
||
| describe '#last' do | ||
| it 'returns the innermost token' do | ||
| nested_jwt = create_nested(inner_jwt, algorithm: 'HS256', key: outer_secret) | ||
| nested = described_class.new(nested_jwt) | ||
|
|
||
| expect(nested.last.unverified_payload).to eq(inner_payload) | ||
| end | ||
| end | ||
|
|
||
| describe '#verify!' do | ||
| let(:nested_jwt) { create_nested(inner_jwt, algorithm: 'HS256', key: outer_secret) } | ||
|
|
||
| it 'verifies signatures and returns self' do | ||
| nested = described_class.new(nested_jwt) | ||
| result = nested.verify!( | ||
| keys: [ | ||
| { algorithm: 'HS256', key: outer_secret }, | ||
| { algorithm: 'HS256', key: inner_secret } | ||
| ] | ||
| ) | ||
|
|
||
| expect(result).to eq(nested) | ||
| end | ||
|
|
||
| it 'allows accessing innermost payload after verification' do | ||
| nested = described_class.new(nested_jwt) | ||
| nested.verify!( | ||
| keys: [ | ||
| { algorithm: 'HS256', key: outer_secret }, | ||
| { algorithm: 'HS256', key: inner_secret } | ||
| ] | ||
| ) | ||
|
|
||
| expect(nested.last.payload).to eq(inner_payload) | ||
| end | ||
|
|
||
| it 'raises VerificationError for invalid outer signature' do | ||
| nested = described_class.new(nested_jwt) | ||
|
|
||
| expect do | ||
| nested.verify!( | ||
| keys: [ | ||
| { algorithm: 'HS256', key: 'wrong_key' }, | ||
| { algorithm: 'HS256', key: inner_secret } | ||
| ] | ||
| ) | ||
| end.to raise_error(JWT::VerificationError, 'Signature verification failed') | ||
| end | ||
|
|
||
| it 'raises VerificationError for invalid inner signature' do | ||
| nested = described_class.new(nested_jwt) | ||
|
|
||
| expect do | ||
| nested.verify!( | ||
| keys: [ | ||
| { algorithm: 'HS256', key: outer_secret }, | ||
| { algorithm: 'HS256', key: 'wrong_key' } | ||
| ] | ||
| ) | ||
| end.to raise_error(JWT::VerificationError, 'Signature verification failed') | ||
| end | ||
|
|
||
| it 'raises DecodeError when key count does not match nesting depth' do | ||
| nested = described_class.new(nested_jwt) | ||
|
|
||
| expect do | ||
| nested.verify!(keys: [{ algorithm: 'HS256', key: outer_secret }]) | ||
| end.to raise_error(JWT::DecodeError, 'Expected 2 key configurations, got 1') | ||
| end | ||
|
|
||
| it 'handles case-insensitive cty header' do | ||
| signer = JWT::JWA.create_signer(algorithm: 'HS256', key: outer_secret) | ||
| header = { 'cty' => 'jwt' }.merge(signer.jwa.header) { |_k, old, _new| old } | ||
| encoded_header = JWT::Base64.url_encode(JWT::JSON.generate(header)) | ||
| encoded_payload = JWT::Base64.url_encode(inner_jwt) | ||
| signature = signer.sign(data: "#{encoded_header}.#{encoded_payload}") | ||
| lowercase_nested = "#{encoded_header}.#{encoded_payload}.#{JWT::Base64.url_encode(signature)}" | ||
|
|
||
| nested = described_class.new(lowercase_nested) | ||
| nested.verify!( | ||
| keys: [ | ||
| { algorithm: 'HS256', key: outer_secret }, | ||
| { algorithm: 'HS256', key: inner_secret } | ||
| ] | ||
| ) | ||
|
|
||
| expect(nested.last.payload).to eq(inner_payload) | ||
| end | ||
|
|
||
| context 'with different algorithms at each level' do | ||
| let(:rsa_private) { test_pkey('rsa-2048-private.pem') } | ||
| let(:rsa_public) { rsa_private.public_key } | ||
|
|
||
| it 'supports HS256 inner with RS256 outer' do | ||
| nested_jwt = create_nested(inner_jwt, algorithm: 'RS256', key: rsa_private) | ||
| nested = described_class.new(nested_jwt) | ||
|
|
||
| nested.verify!( | ||
| keys: [ | ||
| { algorithm: 'RS256', key: rsa_public }, | ||
| { algorithm: 'HS256', key: inner_secret } | ||
| ] | ||
| ) | ||
|
|
||
| expect(nested.last.payload).to eq(inner_payload) | ||
| end | ||
|
|
||
| it 'supports RS256 inner with HS256 outer' do | ||
| rsa_inner_jwt = create_signed_jwt(algorithm: 'RS256', key: rsa_private) | ||
| nested_jwt = create_nested(rsa_inner_jwt, algorithm: 'HS256', key: outer_secret) | ||
| nested = described_class.new(nested_jwt) | ||
|
|
||
| nested.verify!( | ||
| keys: [ | ||
| { algorithm: 'HS256', key: outer_secret }, | ||
| { algorithm: 'RS256', key: rsa_public } | ||
| ] | ||
| ) | ||
|
|
||
| expect(nested.last.payload).to eq(inner_payload) | ||
| end | ||
| end | ||
|
|
||
| context 'with multiple nesting levels' do | ||
| it 'verifies all levels' do | ||
| level2 = create_nested(inner_jwt, algorithm: 'HS384', key: 'key2') | ||
| level3 = create_nested(level2, algorithm: 'HS512', key: 'key3') | ||
|
|
||
| nested = described_class.new(level3) | ||
| nested.verify!( | ||
| keys: [ | ||
| { algorithm: 'HS512', key: 'key3' }, | ||
| { algorithm: 'HS384', key: 'key2' }, | ||
| { algorithm: 'HS256', key: inner_secret } | ||
| ] | ||
| ) | ||
|
|
||
| expect(nested.last.payload).to eq(inner_payload) | ||
| end | ||
| end | ||
| end | ||
|
|
||
| describe 'max depth protection' do | ||
| it 'raises DecodeError when nesting exceeds MAX_DEPTH' do | ||
| current = inner_jwt | ||
| (described_class::MAX_DEPTH + 1).times do |i| | ||
| current = create_nested(current, algorithm: 'HS256', key: "key_#{i}") | ||
| end | ||
|
|
||
| expect do | ||
| described_class.new(current) | ||
| end.to raise_error(JWT::DecodeError, /exceeds maximum depth/) | ||
| end | ||
| end | ||
| end |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we make it unwrap lazy when tokens are acessed?