diff --git a/.typos.toml b/.typos.toml index bf6e17ec9b..f310651c39 100644 --- a/.typos.toml +++ b/.typos.toml @@ -142,4 +142,6 @@ extend-exclude = [ "go.work", # Benchmark dashboard frontend build output "core/bench/dashboard/frontend/dist", + # WireMock test fixtures with base64-encoded JWT data + "**/wiremock/__files/*.json", ] diff --git a/Cargo.lock b/Cargo.lock index cb11aa7ffb..c7416b0664 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -436,9 +436,9 @@ checksum = "c3d036a3c4ab069c7b410a2ce876bd74808d2d0888a82667669f8e783a898bf1" [[package]] name = "arc-swap" -version = "1.9.0" +version = "1.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a07d1f37ff60921c83bdfc7407723bdefe89b44b98a9b772f225c8f9d67141a6" +checksum = "6a3a1fd6f75306b68087b831f025c712524bcb19aad54e557b1129cfa0a2b207" dependencies = [ "rustversion", ] @@ -589,7 +589,7 @@ dependencies = [ "arrow-schema", "chrono", "half", - "indexmap 2.13.1", + "indexmap 2.14.0", "itoa", "lexical-core", "memchr", @@ -704,6 +704,16 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "assert-json-diff" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e4f2b81832e72834d7518d8487a0396a28cc408186a2e8854c0f98011faf12" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "assert_cmd" version = "2.2.0" @@ -1554,7 +1564,7 @@ dependencies = [ "getrandom 0.2.17", "getrandom 0.3.4", "hex", - "indexmap 2.13.1", + "indexmap 2.14.0", "js-sys", "once_cell", "rand 0.9.2", @@ -2148,9 +2158,9 @@ dependencies = [ [[package]] name = "compio-tls" -version = "0.9.0" +version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4e462f3f836226cc293795c87d8e7df783ca7f88811e433ee79a9a2eace0b253" +checksum = "3a7056da226af42cda4c83b00a021cce3e1ee5f4cffc8a0ff8801381e618cf1c" dependencies = [ "compio-buf", "compio-io", @@ -2161,9 +2171,9 @@ dependencies = [ [[package]] name = "compio-ws" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32b0174d0a3da33ac73efddbe62a3fb046a9bc3a58124b2f8c1d2e0354e54222" +checksum = "99d45f47c6e64babcaa6b8df1dffced56012e60e58401255e679f428ddbe9fb6" dependencies = [ "compio-buf", "compio-io", @@ -2927,6 +2937,24 @@ dependencies = [ "zeroize", ] +[[package]] +name = "deadpool" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0be2b1d1d6ec8d846f05e137292d0b89133caf95ef33695424c09568bdd39b1b" +dependencies = [ + "deadpool-runtime", + "lazy_static", + "num_cpus", + "tokio", +] + +[[package]] +name = "deadpool-runtime" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "092966b41edc516079bdf31ec78a2e0588d1d0c08f78b91d8307215928642b2b" + [[package]] name = "debugid" version = "0.8.0" @@ -2957,7 +2985,7 @@ dependencies = [ "deno_path_util", "deno_unsync", "futures", - "indexmap 2.13.1", + "indexmap 2.14.0", "libc", "parking_lot", "percent-encoding", @@ -3012,7 +3040,7 @@ version = "0.227.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1bab1eaf578a8cc0ae6fb933e91dc3388b41df22e5974d5891c17ba66b3a0bbb" dependencies = [ - "indexmap 2.13.1", + "indexmap 2.14.0", "proc-macro-rules", "proc-macro2", "quote", @@ -3715,11 +3743,11 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.3.0" +version = "2.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" +checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" dependencies = [ - "getrandom 0.2.17", + "getrandom 0.3.4", ] [[package]] @@ -4248,9 +4276,9 @@ dependencies = [ [[package]] name = "gif" -version = "0.14.1" +version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f5df2ba84018d80c213569363bdcd0c64e6933c67fe4c1d60ecf822971a3c35e" +checksum = "ee8cfcc411d9adbbaba82fb72661cc1bcca13e8bba98b364e62b2dba8f960159" dependencies = [ "color_quant", "weezl", @@ -4719,7 +4747,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap 2.13.1", + "indexmap 2.14.0", "slab", "tokio", "tokio-util", @@ -4738,7 +4766,7 @@ dependencies = [ "futures-core", "futures-sink", "http 1.4.0", - "indexmap 2.13.1", + "indexmap 2.14.0", "slab", "tokio", "tokio-util", @@ -4844,6 +4872,12 @@ dependencies = [ "foldhash 0.2.0", ] +[[package]] +name = "hashbrown" +version = "0.17.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f467dd6dccf739c208452f8014c75c18bb8301b050ad1cfb27153803edb0f51" + [[package]] name = "hashlink" version = "0.10.0" @@ -5434,7 +5468,7 @@ checksum = "cd62e6b5e86ea8eeeb8db1de02880a6abc01a397b2ebb64b5d74ac255318f5cb" [[package]] name = "iggy" -version = "0.10.0" +version = "0.10.1" dependencies = [ "async-broadcast", "async-dropper", @@ -5517,7 +5551,7 @@ dependencies = [ "tracing-subscriber", "uuid", "walkdir", - "zip 8.5.0", + "zip 8.5.1", ] [[package]] @@ -6006,7 +6040,7 @@ dependencies = [ "byteorder-lite", "color_quant", "exr", - "gif 0.14.1", + "gif 0.14.2", "image-webp", "moxcms", "num-traits", @@ -6055,7 +6089,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1689b939ee35e3a075b0834b5672efd43aec8a6e81a1c6002b76a5ca2f211ae0" dependencies = [ "implicit-clone-derive", - "indexmap 2.13.1", + "indexmap 2.14.0", ] [[package]] @@ -6081,12 +6115,12 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.13.1" +version = "2.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "45a8a2b9cb3e0b0c1803dbb0758ffac5de2f425b23c28f518faabd9d805342ff" +checksum = "d466e9454f08e4a911e14806c24e16fba1b4c121d1ea474396f396069cf949d9" dependencies = [ "equivalent", - "hashbrown 0.16.1", + "hashbrown 0.17.0", "serde", "serde_core", ] @@ -6160,6 +6194,7 @@ dependencies = [ "iggy_binary_protocol", "iggy_common", "iggy_connector_sdk", + "jsonwebtoken", "keyring", "lazy_static", "libc", @@ -6189,7 +6224,8 @@ dependencies = [ "tracing-subscriber", "twox-hash", "uuid", - "zip 8.5.0", + "wiremock", + "zip 8.5.1", ] [[package]] @@ -6690,9 +6726,9 @@ dependencies = [ [[package]] name = "liblzma-sys" -version = "0.4.5" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9f2db66f3268487b5033077f266da6777d057949b8f93c8ad82e441df25e6186" +checksum = "1a60851d15cd8c5346eca4ab8babff585be2ae4bc8097c067291d3ffe2add3b6" dependencies = [ "cc", "libc", @@ -6717,14 +6753,14 @@ dependencies = [ [[package]] name = "libredox" -version = "0.1.15" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ddbf48fd451246b1f8c2610bd3b4ac0cc6e149d89832867093ab69a17194f08" +checksum = "e02f3bb43d335493c96bf3fd3a321600bf6bd07ed34bc64118e9293bdffea46c" dependencies = [ "bitflags 2.11.0", "libc", "plain", - "redox_syscall 0.7.3", + "redox_syscall 0.7.4", ] [[package]] @@ -6739,9 +6775,9 @@ dependencies = [ [[package]] name = "libz-sys" -version = "1.1.25" +version = "1.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d52f4c29e2a68ac30c9087e1b772dc9f44a2b66ed44edf2266cf2be9b03dafc1" +checksum = "fc3a226e576f50782b3305c5ccf458698f92798987f551c6a02efe8276721e22" dependencies = [ "cc", "libc", @@ -7708,9 +7744,9 @@ checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" [[package]] name = "openssl-src" -version = "300.5.5+3.5.5" +version = "300.6.0+3.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3f1787d533e03597a7934fd0a765f0d28e94ecc5fb7789f8053b1e699a56f709" +checksum = "a8e8cbfd3a4a8c8f089147fd7aaa33cf8c7450c4d09f8f80698a0cf093abeff4" dependencies = [ "cc", ] @@ -8980,9 +9016,9 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.7.3" +version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ce70a74e890531977d37e532c34d45e9055d2409ed08ddba14529471ed0be16" +checksum = "f450ad9c3b1da563fb6948a8e0fb0fb9269711c9c73d9ea1de5058c79c8d643a" dependencies = [ "bitflags 2.11.0", ] @@ -9840,9 +9876,9 @@ dependencies = [ [[package]] name = "semver" -version = "1.0.27" +version = "1.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d767eb0aabc880b29956c35734170f26ed551a859dbd361d140cdbeca61ab1e2" +checksum = "8a7852d02fc848982e0c167ef163aaff9cd91dc640ba85e263cb1ce46fae51cd" dependencies = [ "serde", "serde_core", @@ -9940,7 +9976,7 @@ version = "1.0.149" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" dependencies = [ - "indexmap 2.13.1", + "indexmap 2.14.0", "itoa", "memchr", "serde", @@ -10024,7 +10060,7 @@ dependencies = [ "chrono", "hex", "indexmap 1.9.3", - "indexmap 2.13.1", + "indexmap 2.14.0", "schemars 0.9.0", "schemars 1.2.1", "serde_core", @@ -10051,7 +10087,7 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b4db627b98b36d4203a7b458cf3573730f2bb591b28871d916dfa9efabfd41f" dependencies = [ - "indexmap 2.13.1", + "indexmap 2.14.0", "itoa", "ryu", "serde", @@ -10139,6 +10175,7 @@ dependencies = [ "secrecy", "send_wrapper", "serde", + "serde_json", "slab", "socket2 0.6.3", "strum 0.28.0", @@ -10564,7 +10601,7 @@ dependencies = [ "futures-util", "hashbrown 0.15.5", "hashlink", - "indexmap 2.13.1", + "indexmap 2.14.0", "log", "memchr", "once_cell", @@ -11333,9 +11370,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.51.0" +version = "1.51.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2bd1c4c0fc4a7ab90fc15ef6daaa3ec3b893f004f915f2392557ed23237820cd" +checksum = "f66bf9585cda4b724d3e78ab34b73fb2bbaba9011b9bfdf69dc836382ea13b8c" dependencies = [ "bytes", "libc", @@ -11445,7 +11482,7 @@ version = "1.1.2+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81f3d15e84cbcd896376e6730314d59fb5a87f31e4b038454184435cd57defee" dependencies = [ - "indexmap 2.13.1", + "indexmap 2.14.0", "serde_core", "serde_spanned 1.1.1", "toml_datetime 1.1.1+spec-1.1.0", @@ -11478,7 +11515,7 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.13.1", + "indexmap 2.14.0", "toml_datetime 0.6.11", "winnow 0.5.40", ] @@ -11489,7 +11526,7 @@ version = "0.22.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" dependencies = [ - "indexmap 2.13.1", + "indexmap 2.14.0", "serde", "serde_spanned 0.6.9", "toml_datetime 0.6.11", @@ -11579,7 +11616,7 @@ checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4" dependencies = [ "futures-core", "futures-util", - "indexmap 2.13.1", + "indexmap 2.14.0", "pin-project-lite", "slab", "sync_wrapper", @@ -12397,7 +12434,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" dependencies = [ "anyhow", - "indexmap 2.13.1", + "indexmap 2.14.0", "wasm-encoder", "wasmparser", ] @@ -12446,7 +12483,7 @@ checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" dependencies = [ "bitflags 2.11.0", "hashbrown 0.15.5", - "indexmap 2.13.1", + "indexmap 2.14.0", "semver", ] @@ -13114,6 +13151,29 @@ version = "0.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d135d17ab770252ad95e9a872d365cf3090e3be864a34ab46f48555993efc904" +[[package]] +name = "wiremock" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08db1edfb05d9b3c1542e521aea074442088292f00b5f28e435c714a98f85031" +dependencies = [ + "assert-json-diff", + "base64 0.22.1", + "deadpool", + "futures", + "http 1.4.0", + "http-body-util", + "hyper", + "hyper-util", + "log", + "once_cell", + "regex", + "serde", + "serde_json", + "tokio", + "url", +] + [[package]] name = "wit-bindgen" version = "0.51.0" @@ -13142,7 +13202,7 @@ checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" dependencies = [ "anyhow", "heck", - "indexmap 2.13.1", + "indexmap 2.14.0", "prettyplease", "syn 2.0.117", "wasm-metadata", @@ -13173,7 +13233,7 @@ checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" dependencies = [ "anyhow", "bitflags 2.11.0", - "indexmap 2.13.1", + "indexmap 2.14.0", "log", "serde", "serde_derive", @@ -13192,7 +13252,7 @@ checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" dependencies = [ "anyhow", "id-arena", - "indexmap 2.13.1", + "indexmap 2.14.0", "log", "semver", "serde", @@ -13282,7 +13342,7 @@ dependencies = [ "futures", "gloo 0.11.0", "implicit-clone", - "indexmap 2.13.1", + "indexmap 2.14.0", "js-sys", "rustversion", "serde", @@ -13472,13 +13532,13 @@ dependencies = [ [[package]] name = "zip" -version = "8.5.0" +version = "8.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2726508a48f38dceb22b35ecbbd2430efe34ff05c62bd3285f965d7911b33464" +checksum = "dcab981e19633ebcf0b001ddd37dd802996098bc1864f90b7c5d970ce76c1d59" dependencies = [ "crc32fast", "flate2", - "indexmap 2.13.1", + "indexmap 2.14.0", "memchr", "typed-path", "zopfli", diff --git a/Cargo.toml b/Cargo.toml index 6539453b29..320c827eb4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -166,7 +166,7 @@ hwlocality = "1.0.0-alpha.12" iceberg = "0.9.0" iceberg-catalog-rest = "0.9.0" iceberg-storage-opendal = "0.9.0" -iggy = { path = "core/sdk", version = "0.10.0" } +iggy = { path = "core/sdk", version = "0.10.1" } iggy-cli = { path = "core/cli", version = "0.13.0" } iggy_binary_protocol = { path = "core/binary_protocol", version = "0.10.0" } iggy_common = { path = "core/common", version = "0.10.0" } @@ -293,6 +293,7 @@ trait-variant = "0.1.2" tungstenite = "0.29.0" twox-hash = { version = "2.1.2", features = ["xxhash32"] } ulid = "1.2.1" +ureq = "2.10" uuid = { version = "1.23.0", features = ["v4", "v7", "fast-rng", "serde", "zerocopy"] } vergen-git2 = { version = "9.1.0", features = ["build", "cargo", "rustc", "si"] } walkdir = "2.5.0" diff --git a/DEPENDENCIES.md b/DEPENDENCIES.md index 5c5b78edf7..739266563e 100644 --- a/DEPENDENCIES.md +++ b/DEPENDENCIES.md @@ -33,7 +33,7 @@ anstyle-wincon: 3.0.11, "Apache-2.0 OR MIT", anyhow: 1.0.102, "Apache-2.0 OR MIT", apache-avro: 0.21.0, "Apache-2.0", arbitrary: 1.4.2, "Apache-2.0 OR MIT", -arc-swap: 1.9.0, "Apache-2.0 OR MIT", +arc-swap: 1.9.1, "Apache-2.0 OR MIT", arg_enum_proc_macro: 0.3.4, "MIT", argon2: 0.5.3, "Apache-2.0 OR MIT", array-init: 2.1.0, "Apache-2.0 OR MIT", @@ -55,6 +55,7 @@ as-slice: 0.2.1, "Apache-2.0 OR MIT", asn1-rs: 0.7.1, "Apache-2.0 OR MIT", asn1-rs-derive: 0.6.0, "Apache-2.0 OR MIT", asn1-rs-impl: 0.2.0, "Apache-2.0 OR MIT", +assert-json-diff: 2.0.2, "MIT", assert_cmd: 2.2.0, "Apache-2.0 OR MIT", astral-tokio-tar: 0.6.0, "Apache-2.0 OR MIT", async-broadcast: 0.7.2, "Apache-2.0 OR MIT", @@ -180,8 +181,8 @@ compio-macros: 0.1.2, "MIT", compio-net: 0.11.1, "MIT", compio-quic: 0.7.2, "MIT", compio-runtime: 0.11.0, "MIT", -compio-tls: 0.9.0, "MIT", -compio-ws: 0.3.0, "MIT", +compio-tls: 0.9.1, "MIT", +compio-ws: 0.3.1, "MIT", compression-codecs: 0.4.37, "Apache-2.0 OR MIT", compression-core: 0.4.31, "Apache-2.0 OR MIT", concurrent-queue: 2.5.0, "Apache-2.0 OR MIT", @@ -252,6 +253,8 @@ data-encoding: 2.10.0, "MIT", data-url: 0.3.2, "Apache-2.0 OR MIT", dbus: 0.9.10, "Apache-2.0 OR MIT", dbus-secret-service: 4.1.0, "Apache-2.0 OR MIT", +deadpool: 0.12.3, "Apache-2.0 OR MIT", +deadpool-runtime: 0.1.4, "Apache-2.0 OR MIT", debugid: 0.8.0, "Apache-2.0", deno_core: 0.351.0, "MIT", deno_core_icudata: 0.74.0, "MIT", @@ -325,7 +328,7 @@ ext-trait-proc_macros: 1.0.1, "Apache-2.0 OR MIT OR Zlib", extension-traits: 1.0.1, "Apache-2.0 OR MIT OR Zlib", fastbloom: 0.14.1, "Apache-2.0 OR MIT", fastnum: 0.7.4, "Apache-2.0 OR MIT", -fastrand: 2.3.0, "Apache-2.0 OR MIT", +fastrand: 2.4.1, "Apache-2.0 OR MIT", fax: 0.2.6, "MIT", fax_derive: 0.2.0, "MIT", fdeflate: 0.3.7, "Apache-2.0 OR MIT", @@ -379,7 +382,7 @@ getrandom: 0.4.2, "Apache-2.0 OR MIT", ghash: 0.5.1, "Apache-2.0 OR MIT", gherkin: 0.15.0, "Apache-2.0 OR MIT", gif: 0.13.3, "Apache-2.0 OR MIT", -gif: 0.14.1, "Apache-2.0 OR MIT", +gif: 0.14.2, "Apache-2.0 OR MIT", git2: 0.20.4, "Apache-2.0 OR MIT", glob: 0.3.3, "Apache-2.0 OR MIT", globset: 0.4.18, "MIT OR Unlicense", @@ -425,6 +428,7 @@ hashbrown: 0.12.3, "Apache-2.0 OR MIT", hashbrown: 0.14.5, "Apache-2.0 OR MIT", hashbrown: 0.15.5, "Apache-2.0 OR MIT", hashbrown: 0.16.1, "Apache-2.0 OR MIT", +hashbrown: 0.17.0, "Apache-2.0 OR MIT", hashlink: 0.10.0, "Apache-2.0 OR MIT", heapless: 0.7.17, "Apache-2.0 OR MIT", heck: 0.5.0, "Apache-2.0 OR MIT", @@ -471,7 +475,7 @@ ident_case: 1.0.1, "Apache-2.0 OR MIT", idna: 1.1.0, "Apache-2.0 OR MIT", idna_adapter: 1.2.1, "Apache-2.0 OR MIT", if_chain: 1.0.3, "Apache-2.0 OR MIT", -iggy: 0.10.0, "Apache-2.0", +iggy: 0.10.1, "Apache-2.0", iggy-bench: 0.5.0, "Apache-2.0", iggy-bench-dashboard-server: 0.7.0, "Apache-2.0", iggy-cli: 0.13.0, "Apache-2.0", @@ -502,7 +506,7 @@ impl-more: 0.1.9, "Apache-2.0 OR MIT", implicit-clone: 0.6.0, "Apache-2.0 OR MIT", implicit-clone-derive: 0.1.2, "Apache-2.0 OR MIT", indexmap: 1.9.3, "Apache-2.0 OR MIT", -indexmap: 2.13.1, "Apache-2.0 OR MIT", +indexmap: 2.14.0, "Apache-2.0 OR MIT", inflections: 1.1.1, "MIT", inlinable_string: 0.1.15, "Apache-2.0 OR MIT", inotify: 0.11.1, "ISC", @@ -560,12 +564,12 @@ libfuzzer-sys: 0.4.12, "(Apache-2.0 OR MIT) AND NCSA", libgit2-sys: 0.18.3+1.9.2, "Apache-2.0 OR MIT", libloading: 0.8.9, "ISC", liblzma: 0.4.6, "Apache-2.0 OR MIT", -liblzma-sys: 0.4.5, "Apache-2.0 OR MIT", +liblzma-sys: 0.4.6, "Apache-2.0 OR MIT", libm: 0.2.16, "MIT", libmimalloc-sys: 0.1.44, "MIT", -libredox: 0.1.15, "MIT", +libredox: 0.1.16, "MIT", libsqlite3-sys: 0.30.1, "MIT", -libz-sys: 1.1.25, "Apache-2.0 OR MIT", +libz-sys: 1.1.28, "Apache-2.0 OR MIT", linked-hash-map: 0.5.6, "Apache-2.0 OR MIT", linux-raw-sys: 0.4.15, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT", linux-raw-sys: 0.12.1, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT", @@ -657,7 +661,7 @@ opendal: 0.55.0, "Apache-2.0", openssl: 0.10.76, "Apache-2.0", openssl-macros: 0.1.1, "Apache-2.0 OR MIT", openssl-probe: 0.2.1, "Apache-2.0 OR MIT", -openssl-src: 300.5.5+3.5.5, "Apache-2.0 OR MIT", +openssl-src: 300.6.0+3.6.2, "Apache-2.0 OR MIT", openssl-sys: 0.9.112, "MIT", opentelemetry: 0.31.0, "Apache-2.0", opentelemetry-appender-tracing: 0.31.1, "Apache-2.0", @@ -776,7 +780,7 @@ rayon: 1.11.0, "Apache-2.0 OR MIT", rayon-core: 1.13.0, "Apache-2.0 OR MIT", rcgen: 0.14.7, "Apache-2.0 OR MIT", redox_syscall: 0.5.18, "MIT", -redox_syscall: 0.7.3, "MIT", +redox_syscall: 0.7.4, "MIT", redox_users: 0.5.2, "MIT", ref-cast: 1.0.25, "Apache-2.0 OR MIT", ref-cast-impl: 1.0.25, "Apache-2.0 OR MIT", @@ -846,7 +850,7 @@ secrecy: 0.10.3, "Apache-2.0 OR MIT", security-framework: 3.7.0, "Apache-2.0 OR MIT", security-framework-sys: 2.17.0, "Apache-2.0 OR MIT", seize: 0.5.1, "MIT", -semver: 1.0.27, "Apache-2.0 OR MIT", +semver: 1.0.28, "Apache-2.0 OR MIT", send_wrapper: 0.6.0, "Apache-2.0 OR MIT", seq-macro: 0.3.6, "Apache-2.0 OR MIT", serde: 1.0.228, "Apache-2.0 OR MIT", @@ -965,7 +969,7 @@ tiny-skia-path: 0.11.4, "BSD-3-Clause", tinystr: 0.8.3, "Unicode-3.0", tinyvec: 1.11.0, "Apache-2.0 OR MIT OR Zlib", tinyvec_macros: 0.1.1, "Apache-2.0 OR MIT OR Zlib", -tokio: 1.51.0, "MIT", +tokio: 1.51.1, "MIT", tokio-macros: 2.7.0, "MIT", tokio-rustls: 0.26.4, "Apache-2.0 OR MIT", tokio-stream: 0.1.18, "MIT", @@ -1153,6 +1157,7 @@ winnow: 0.5.40, "MIT", winnow: 0.7.15, "MIT", winnow: 1.0.1, "MIT", winsafe: 0.0.19, "MIT", +wiremock: 0.6.5, "Apache-2.0 OR MIT", wit-bindgen: 0.51.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT", wit-bindgen-core: 0.51.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT", wit-bindgen-rust: 0.51.0, "Apache-2.0 OR Apache-2.0 WITH LLVM-exception OR MIT", @@ -1183,7 +1188,7 @@ zerotrie: 0.2.4, "Unicode-3.0", zerovec: 0.11.6, "Unicode-3.0", zerovec-derive: 0.11.3, "Unicode-3.0", zip: 0.6.6, "MIT", -zip: 8.5.0, "MIT", +zip: 8.5.1, "MIT", zlib-rs: 0.6.3, "Zlib", zmij: 1.0.21, "MIT", zopfli: 0.8.3, "Apache-2.0", diff --git a/core/common/src/error/iggy_error.rs b/core/common/src/error/iggy_error.rs index fca1ed1e8f..79e1c81231 100644 --- a/core/common/src/error/iggy_error.rs +++ b/core/common/src/error/iggy_error.rs @@ -148,6 +148,8 @@ pub enum IggyError { AccessTokenMissing = 77, #[error("Invalid access token")] InvalidAccessToken = 78, + #[error("Cannot fetch JWKS from URL: {0}")] + CannotFetchJwks(String) = 79, #[error("Invalid size bytes")] InvalidSizeBytes = 80, #[error("Invalid UTF-8")] diff --git a/core/common/src/types/configuration/http_config/http_client_config.rs b/core/common/src/types/configuration/http_config/http_client_config.rs index 06ec4182f5..f182e1a917 100644 --- a/core/common/src/types/configuration/http_config/http_client_config.rs +++ b/core/common/src/types/configuration/http_config/http_client_config.rs @@ -25,6 +25,8 @@ pub struct HttpClientConfig { pub api_url: String, /// The number of retries to perform on transient errors. pub retries: u32, + /// The JWT for A2A authentication. + pub jwt: Option, } impl Default for HttpClientConfig { @@ -32,6 +34,7 @@ impl Default for HttpClientConfig { HttpClientConfig { api_url: "http://127.0.0.1:3000".to_string(), retries: 3, + jwt: None, } } } @@ -41,6 +44,7 @@ impl From> for HttpClientConfig { HttpClientConfig { api_url: format!("http://{}", connection_string.server_address()), retries: connection_string.options().retries().unwrap(), + jwt: None, } } } diff --git a/core/common/src/types/configuration/http_config/http_client_config_builder.rs b/core/common/src/types/configuration/http_config/http_client_config_builder.rs index 097df0da55..029eaf6b81 100644 --- a/core/common/src/types/configuration/http_config/http_client_config_builder.rs +++ b/core/common/src/types/configuration/http_config/http_client_config_builder.rs @@ -45,6 +45,12 @@ impl HttpClientConfigBuilder { self } + /// Sets the JWT for A2A authentication. + pub fn with_jwt(mut self, token: String) -> Self { + self.config.jwt = Some(token); + self + } + /// Builds the `HttpClientConfig` instance. pub fn build(self) -> HttpClientConfig { self.config diff --git a/core/configs/src/server_config/defaults.rs b/core/configs/src/server_config/defaults.rs index 27eae580df..7403196df8 100644 --- a/core/configs/src/server_config/defaults.rs +++ b/core/configs/src/server_config/defaults.rs @@ -264,6 +264,7 @@ impl Default for HttpJwtConfig { encoding_secret: SERVER_CONFIG.http.jwt.encoding_secret.parse().unwrap(), decoding_secret: SERVER_CONFIG.http.jwt.decoding_secret.parse().unwrap(), use_base64_secret: SERVER_CONFIG.http.jwt.use_base_64_secret, + trusted_issuers: None, } } } diff --git a/core/configs/src/server_config/http.rs b/core/configs/src/server_config/http.rs index b19b53caec..2e3f440a88 100644 --- a/core/configs/src/server_config/http.rs +++ b/core/configs/src/server_config/http.rs @@ -26,6 +26,15 @@ use serde::{Deserialize, Serialize}; use serde_with::DisplayFromStr; use serde_with::serde_as; +#[derive(Debug, Deserialize, Serialize, Clone, ConfigEnv)] +pub struct TrustedIssuerConfig { + pub issuer: String, + pub audience: String, + pub jwks_url: String, + #[serde(default)] + pub user_id: u32, +} + #[derive(Debug, Deserialize, Serialize, Clone, ConfigEnv)] pub struct HttpConfig { pub enabled: bool, @@ -72,6 +81,8 @@ pub struct HttpJwtConfig { #[config_env(secret)] pub decoding_secret: String, pub use_base64_secret: bool, + #[serde(default)] + pub trusted_issuers: Option>, } #[derive(Debug, Deserialize, Serialize, Clone, ConfigEnv)] diff --git a/core/harness_derive/src/attrs.rs b/core/harness_derive/src/attrs.rs index 2e4389faa7..7d8e68f7d8 100644 --- a/core/harness_derive/src/attrs.rs +++ b/core/harness_derive/src/attrs.rs @@ -57,6 +57,14 @@ pub struct IggyTestAttrs { pub server: ServerAttrs, pub seed_fn: Option, pub cluster_nodes: ClusterNodesValue, + pub jwks_server: Option, +} + +/// JWKS server attributes. +#[derive(Debug, Default, Clone)] +pub struct JwksAttrs { + pub enabled: bool, + pub store_path: Option, } /// MCP configuration attributes. @@ -82,6 +90,7 @@ impl IggyTestAttrs { server: ServerAttrs::default(), seed_fn: None, cluster_nodes: ClusterNodesValue::None, + jwks_server: None, } } } @@ -173,6 +182,9 @@ pub struct ServerAttrs { /// Dynamic config overrides using dot-notation paths. pub config_overrides: Vec, + /// Path to a TOML config file for the server. + pub config_path: Option, + /// Special cases requiring custom codegen. pub mcp: Option, pub connectors_runtime: Option, @@ -248,6 +260,9 @@ impl Parse for IggyTestAttrs { AttrItem::ClusterNodes(cluster) => { attrs.cluster_nodes = cluster; } + AttrItem::JwksServer(jwks) => { + attrs.jwks_server = Some(jwks); + } } } @@ -264,6 +279,7 @@ enum AttrItem { Server(Box), Seed(syn::Path), ClusterNodes(ClusterNodesValue), + JwksServer(JwksAttrs), } impl Parse for AttrItem { @@ -293,6 +309,12 @@ impl Parse for AttrItem { let path: syn::Path = input.parse()?; Ok(AttrItem::Seed(path)) } + "jwks_server" => { + let content; + parenthesized!(content in input); + let jwks = parse_jwks_attrs(&content)?; + Ok(AttrItem::JwksServer(jwks)) + } _ => Err(syn::Error::new( ident.span(), format!("unknown attribute: {ident_str}"), @@ -413,6 +435,11 @@ fn parse_server_attrs(input: ParseStream) -> syn::Result { input.parse::()?; server.websocket_tls = Some(parse_tls_value(input, span)?); } + "config_path" => { + input.parse::()?; + let lit: LitStr = input.parse()?; + server.config_path = Some(lit.value()); + } _ => { input.parse::()?; let value = parse_config_value(input)?; @@ -538,6 +565,34 @@ impl ArrayLiteral { } } +fn parse_jwks_attrs(input: ParseStream) -> syn::Result { + let mut attrs = JwksAttrs { + enabled: true, + ..Default::default() + }; + + let items: Punctuated = Punctuated::parse_terminated(input)?; + + for item in items { + match item.key.as_str() { + "enabled" => { + attrs.enabled = item.value.parse().map_err(|_| { + syn::Error::new(Span::call_site(), "enabled must be true or false") + })?; + } + "store_path" => attrs.store_path = Some(item.value), + other => { + return Err(syn::Error::new( + Span::call_site(), + format!("unknown jwks_server attribute: {other}"), + )); + } + } + } + + Ok(attrs) +} + #[cfg(test)] mod tests { use super::*; diff --git a/core/harness_derive/src/codegen.rs b/core/harness_derive/src/codegen.rs index 2abd543683..7f6e23a5e4 100644 --- a/core/harness_derive/src/codegen.rs +++ b/core/harness_derive/src/codegen.rs @@ -392,12 +392,12 @@ fn generate_harness_setup( quote! { let __config_overrides: ::std::collections::HashMap = [#(#config_entries),*].into_iter().collect(); - let __extra_envs = ::integration::harness::resolve_config_paths(&__config_overrides) + let mut __extra_envs = ::integration::harness::resolve_config_paths(&__config_overrides) .unwrap_or_else(|e| panic!("invalid config path in #[iggy_harness]:\n{}", e)); } } else { quote! { - let __extra_envs = ::std::collections::HashMap::::new(); + let mut __extra_envs = ::std::collections::HashMap::::new(); } }; @@ -424,6 +424,15 @@ fn generate_harness_setup( // Always add extra_envs (may be empty) server_builder_calls.push(quote!(.extra_envs(__extra_envs))); + // If a config_path is specified, inject IGGY_CONFIG_PATH into extra_envs + let config_path_setup = if let Some(ref config_path) = attrs.server.config_path { + quote! { + __extra_envs.insert("IGGY_CONFIG_PATH".to_string(), #config_path.to_string()); + } + } else { + quote!() + }; + let server_config = quote! { ::integration::harness::TestServerConfig::builder() #(#server_builder_calls)* @@ -491,14 +500,32 @@ fn generate_harness_setup( quote!() }; + let jwks_builder_call = if let Some(ref jwks) = attrs.jwks_server { + let enabled = jwks.enabled; + let store_path_call = match &jwks.store_path { + Some(p) => quote!(.store_path(#p)), + None => quote!(), + }; + quote! { + .jwks(::integration::harness::JwksConfig::builder() + .enabled(#enabled) + #store_path_call + .build()) + } + } else { + quote!() + }; + quote! { #config_resolution + #config_path_setup let mut __harness = ::integration::harness::TestHarness::builder() .server(#server_config) .primary_client(#client_config) #mcp_builder_call #connectors_runtime_builder_call #cluster_builder_call + #jwks_builder_call .build() .unwrap_or_else(|e| panic!("failed to build test harness: {e}")); let _ = ::integration::__macro_support::TransportProtocol::#transport; @@ -795,6 +822,7 @@ mod tests { }, seed_fn: None, cluster_nodes: crate::attrs::ClusterNodesValue::None, + jwks_server: None, }; let variants = generate_variants(&attrs); // 2 transports * 2 segment sizes * 2 cache modes = 8 variants diff --git a/core/integration/Cargo.toml b/core/integration/Cargo.toml index c8830b19fd..9b5a993dfe 100644 --- a/core/integration/Cargo.toml +++ b/core/integration/Cargo.toml @@ -45,6 +45,7 @@ iggy-cli = { workspace = true } iggy_binary_protocol = { workspace = true } iggy_common = { workspace = true } iggy_connector_sdk = { workspace = true, features = ["api"] } +jsonwebtoken = { workspace = true } keyring = { workspace = true } lazy_static = { workspace = true } libc = { workspace = true } @@ -79,4 +80,5 @@ tracing = { workspace = true } tracing-subscriber = { workspace = true } twox-hash = { workspace = true } uuid = { workspace = true } +wiremock = "0.6" zip = { workspace = true } diff --git a/core/integration/src/harness/config/jwks.rs b/core/integration/src/harness/config/jwks.rs new file mode 100644 index 0000000000..6632132772 --- /dev/null +++ b/core/integration/src/harness/config/jwks.rs @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use bon::Builder; + +#[derive(Debug, Clone, Builder, Default)] +pub struct JwksConfig { + pub enabled: bool, + pub issuer_url: Option, + #[builder(into)] + pub store_path: Option, +} diff --git a/core/integration/src/harness/config/mod.rs b/core/integration/src/harness/config/mod.rs index cc6c0ba6a3..c12b410296 100644 --- a/core/integration/src/harness/config/mod.rs +++ b/core/integration/src/harness/config/mod.rs @@ -20,6 +20,7 @@ mod client; mod common; mod connectors_runtime; +mod jwks; mod mcp; mod resolve; mod server; @@ -27,6 +28,7 @@ mod server; pub use client::{AutoLoginConfig, ClientConfig}; pub use common::{EncryptionConfig, IpAddrKind, TlsConfig}; pub use connectors_runtime::ConnectorsRuntimeConfig; +pub use jwks::JwksConfig; pub use mcp::McpConfig; pub use resolve::resolve_config_paths; pub use server::TestServerConfig; diff --git a/core/integration/src/harness/handle/server.rs b/core/integration/src/harness/handle/server.rs index 2c64ac5c45..e39612800e 100644 --- a/core/integration/src/harness/handle/server.rs +++ b/core/integration/src/harness/handle/server.rs @@ -216,7 +216,7 @@ impl ServerHandle { for (key, value) in std::env::vars() { if key.starts_with("IGGY_") && !PROTECTED_PREFIXES.iter().any(|p| key.starts_with(p)) { - self.envs.insert(key, value); + self.envs.entry(key).or_insert(value); } } diff --git a/core/integration/src/harness/mod.rs b/core/integration/src/harness/mod.rs index 13793ab05b..81d846728d 100644 --- a/core/integration/src/harness/mod.rs +++ b/core/integration/src/harness/mod.rs @@ -55,7 +55,7 @@ mod traits; pub use config::{ AutoLoginConfig, ClientConfig, ConnectorsRuntimeConfig, EncryptionConfig, IpAddrKind, - McpConfig, TestServerConfig, TlsConfig, resolve_config_paths, + JwksConfig, McpConfig, TestServerConfig, TlsConfig, resolve_config_paths, }; pub use context::{TestContext, get_test_directory}; diff --git a/core/integration/src/harness/orchestrator/builder.rs b/core/integration/src/harness/orchestrator/builder.rs index 46afddd3b0..3126fde63a 100644 --- a/core/integration/src/harness/orchestrator/builder.rs +++ b/core/integration/src/harness/orchestrator/builder.rs @@ -19,8 +19,9 @@ use super::harness::TestHarness; use crate::harness::config::{ - ClientConfig, ConnectorsRuntimeConfig, IpAddrKind, McpConfig, TestServerConfig, + ClientConfig, ConnectorsRuntimeConfig, IpAddrKind, JwksConfig, McpConfig, TestServerConfig, }; + use crate::harness::context::TestContext; use crate::harness::error::TestBinaryError; use crate::harness::handle::ServerHandle; @@ -36,6 +37,7 @@ pub struct TestHarnessBuilder { server_config: Option, mcp_config: Option, connectors_runtime_config: Option, + jwks_config: Option, primary_transport: Option, primary_client_config: Option, clients: Vec, @@ -49,6 +51,7 @@ impl Default for TestHarnessBuilder { test_name: None, server_config: None, mcp_config: None, + jwks_config: None, connectors_runtime_config: None, primary_transport: None, primary_client_config: None, @@ -102,6 +105,12 @@ impl TestHarnessBuilder { self } + /// Configure the JWKS server. + pub fn jwks(mut self, config: JwksConfig) -> Self { + self.jwks_config = Some(config); + self + } + /// Add a TCP client. pub fn tcp_client(mut self) -> Self { self.clients.push(ClientConfig::tcp()); @@ -225,6 +234,8 @@ impl TestHarnessBuilder { client_configs: self.clients, primary_transport, primary_client_config: self.primary_client_config, + jwks_config: self.jwks_config, + jwks_server: None, started: false, }) } diff --git a/core/integration/src/harness/orchestrator/harness.rs b/core/integration/src/harness/orchestrator/harness.rs index 6897b31b4e..3018cb39fd 100644 --- a/core/integration/src/harness/orchestrator/harness.rs +++ b/core/integration/src/harness/orchestrator/harness.rs @@ -18,7 +18,7 @@ */ use super::builder::TestHarnessBuilder; -use crate::harness::config::ClientConfig; +use crate::harness::config::{ClientConfig, JwksConfig}; use crate::harness::context::TestContext; use crate::harness::error::TestBinaryError; use crate::harness::handle::{ @@ -31,6 +31,8 @@ use iggy::prelude::{ClientWrapper, IggyClient}; use iggy_common::TransportProtocol; use std::path::Path; use std::sync::Arc; +use wiremock::matchers::{method, path}; +use wiremock::{Mock, MockServer, ResponseTemplate}; /// Collected logs from all binaries in the harness. #[derive(Debug)] @@ -46,6 +48,8 @@ pub struct TestHarness { pub(super) client_configs: Vec, pub(super) primary_transport: Option, pub(super) primary_client_config: Option, + pub(super) jwks_config: Option, + pub(super) jwks_server: Option, pub(super) started: bool, } @@ -106,6 +110,44 @@ impl TestHarness { return Err(TestBinaryError::AlreadyStarted); } + if let Some(jwks_config) = &self.jwks_config + && jwks_config.enabled + { + let mock_server = MockServer::start().await; + + if let Some(store_path) = &jwks_config.store_path { + let content = std::fs::read_to_string(store_path).map_err(|e| { + TestBinaryError::InvalidState { + message: format!("Failed to read JWKS file at {}: {}", store_path, e), + } + })?; + + Mock::given(method("GET")) + .and(path("/.well-known/jwks.json")) + .respond_with(ResponseTemplate::new(200).set_body_string(content)) + .mount(&mock_server) + .await; + } + + let jwks_url = format!("{}/.well-known/jwks.json", mock_server.uri()); + let issuer = jwks_config + .issuer_url + .as_deref() + .unwrap_or("https://test-issuer.com"); + + for server in &mut self.servers { + server.add_env("IGGY_HTTP_JWT_TRUSTED_ISSUERS_0_ISSUER", issuer); + server.add_env( + "IGGY_HTTP_JWT_TRUSTED_ISSUERS_0_JWKS_URL", + jwks_url.as_str(), + ); + server.add_env("IGGY_HTTP_JWT_TRUSTED_ISSUERS_0_AUDIENCE", "iggy"); + server.add_env("IGGY_HTTP_JWT_TRUSTED_ISSUERS_0_USER_ID", "1"); + } + + self.jwks_server = Some(mock_server); + } + for server in &mut self.servers { server.start()?; } diff --git a/core/integration/tests/server/a2a_jwt/config.toml b/core/integration/tests/server/a2a_jwt/config.toml new file mode 100644 index 0000000000..5c08041ec9 --- /dev/null +++ b/core/integration/tests/server/a2a_jwt/config.toml @@ -0,0 +1,66 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[http] +enabled = true +address = "127.0.0.1:0" + +[http.jwt] +enabled = true +issuer = "iggy" +audience = "iggy" +access_token_expiry = "1h" +refresh_token_expiry = "30d" +clock_skew = "5s" + +# Configure trusted A2A issuer +[[http.jwt.trusted_issuers]] +issuer = "https://test-issuer.com" +audience = "iggy" +jwks_url = "http://localhost:8080/.well-known/jwks.json" +user_id = 1 + +[system] +path = "local_data" + +[system.database] +type = "file" +path = "local_data/db" + +[system.message_bus] +type = "in_memory" + +[system.streams] +path = "local_data/streams" + +[system.topics] +path = "local_data/topics" + +[system.partitions] +path = "local_data/partitions" + +[system.segments] +path = "local_data/segments" + +[system.users] +path = "local_data/users" + +[system.personal_access_tokens] +path = "local_data/pats" + +[system.consumer_groups] +path = "local_data/consumer_groups" diff --git a/core/integration/tests/server/a2a_jwt/jwt_tests.rs b/core/integration/tests/server/a2a_jwt/jwt_tests.rs new file mode 100644 index 0000000000..0e152e9a7f --- /dev/null +++ b/core/integration/tests/server/a2a_jwt/jwt_tests.rs @@ -0,0 +1,331 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use iggy::prelude::{GlobalPermissions, IggyClientBuilder, Permissions, UserStatus}; +use iggy_common::{StreamClient, UserClient}; +use integration::iggy_harness; +use jsonwebtoken::{Algorithm, EncodingKey, Header, encode}; +use serde::{Deserialize, Serialize}; +use server::http::jwt::json_web_token::Audience; + +const TEST_ISSUER: &str = "https://test-issuer.com"; +const TEST_AUDIENCE: &str = "iggy"; +const TEST_KEY_ID: &str = "iggy-jwt-key-1"; +const TEST_PRIVATE_KEY: &[u8] = include_bytes!("../../../../certs/iggy_key.pem"); + +/// Seed function to create the A2A user with proper permissions +async fn seed_a2a_user( + client: &iggy::prelude::IggyClient, +) -> Result<(), Box> { + // Create a user that will be used for A2A JWT authentication. + // The first user created after root will have user_id = 1. + // Grant read_streams permission so the user can call get_streams(). + let permissions = Permissions { + global: GlobalPermissions { + read_streams: true, + ..GlobalPermissions::default() + }, + streams: None, + }; + + match client + .create_user( + "a2a-test-user", + "a2a-test-password", + UserStatus::Active, + Some(permissions), + ) + .await + { + Ok(user) => { + println!("A2A user created successfully with ID: {}", user.id); + } + Err(e) => { + println!( + "Note: Could not create A2A user (may already exist): {:?}", + e + ); + } + } + Ok(()) +} + +/// Test claims structure for JWT tokens +/// Supports both single string and array audience per RFC 7519 +#[derive(Debug, Serialize, Deserialize)] +struct TestClaims { + jti: String, + iss: String, + aud: Audience, + sub: String, + exp: u64, + iat: u64, + nbf: u64, +} + +/// Get current timestamp in seconds since Unix epoch +fn now_timestamp() -> u64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() +} + +/// Creates a valid JWT token with specified expiration time +fn create_valid_jwt(exp_seconds: u64) -> String { + let now = now_timestamp(); + let claims = TestClaims { + jti: uuid::Uuid::now_v7().to_string(), + iss: TEST_ISSUER.to_string(), + aud: Audience::from(TEST_AUDIENCE), + sub: "external-a2a-user-123".to_string(), + exp: now + exp_seconds, + iat: now, + nbf: now, + }; + + let mut header = Header::new(Algorithm::RS256); + header.kid = Some(TEST_KEY_ID.to_string()); + let encoding_key = EncodingKey::from_rsa_pem(TEST_PRIVATE_KEY).unwrap(); + + encode(&header, &claims, &encoding_key).unwrap() +} + +/// Creates a valid JWT token with audience as array +fn create_valid_jwt_with_array_aud(exp_seconds: u64) -> String { + let now = now_timestamp(); + let claims = TestClaims { + jti: uuid::Uuid::now_v7().to_string(), + iss: TEST_ISSUER.to_string(), + aud: Audience::from(vec![ + "some-other-service".to_string(), + TEST_AUDIENCE.to_string(), + "another-service".to_string(), + ]), + sub: "external-a2a-user-123".to_string(), + exp: now + exp_seconds, + iat: now, + nbf: now, + }; + + let mut header = Header::new(Algorithm::RS256); + header.kid = Some(TEST_KEY_ID.to_string()); + let encoding_key = EncodingKey::from_rsa_pem(TEST_PRIVATE_KEY).unwrap(); + + encode(&header, &claims, &encoding_key).unwrap() +} + +/// Creates an expired JWT token (expired 1 hour ago) +fn create_expired_jwt() -> String { + let now = now_timestamp(); + let claims = TestClaims { + jti: uuid::Uuid::now_v7().to_string(), + iss: TEST_ISSUER.to_string(), + aud: Audience::from(TEST_AUDIENCE), + sub: "external-a2a-user-123".to_string(), + exp: now.saturating_sub(3600), + iat: now.saturating_sub(7200), + nbf: now.saturating_sub(7200), + }; + + let mut header = Header::new(Algorithm::RS256); + header.kid = Some(TEST_KEY_ID.to_string()); + let encoding_key = EncodingKey::from_rsa_pem(TEST_PRIVATE_KEY).unwrap(); + + encode(&header, &claims, &encoding_key).unwrap() +} + +/// Creates a JWT token with unknown issuer +fn create_unknown_issuer_jwt() -> String { + let now = now_timestamp(); + let claims = TestClaims { + jti: uuid::Uuid::now_v7().to_string(), + iss: "https://unknown-issuer.com".to_string(), + aud: Audience::from(TEST_AUDIENCE), + sub: "external-a2a-user-123".to_string(), + exp: now + 3600, + iat: now, + nbf: now, + }; + + let mut header = Header::new(Algorithm::RS256); + header.kid = Some(TEST_KEY_ID.to_string()); + let encoding_key = EncodingKey::from_rsa_pem(TEST_PRIVATE_KEY).unwrap(); + + encode(&header, &claims, &encoding_key).unwrap() +} + +/// Create an IggyClient with the provided JWT token +async fn create_client_with_jwt(http_addr: &str, token: String) -> iggy::prelude::IggyClient { + IggyClientBuilder::new() + .with_http() + .with_api_url(format!("http://{}", http_addr)) + .with_jwt(token) + .build() + .expect("failed to build client") +} + +/// Test that valid A2A JWT token allows access to API +#[iggy_harness( + server(config_path = "tests/server/a2a_jwt/config.toml"), + jwks_server(store_path = "tests/server/a2a_jwt/wiremock/__files/jwks.json"), + seed = seed_a2a_user +)] +async fn test_a2a_jwt_valid_token(harness: &TestHarness) { + let server = harness + .all_servers() + .first() + .expect("server should be available"); + let http_addr = server + .http_addr() + .expect("http address should be available"); + + let token = create_valid_jwt(3600); + let client = create_client_with_jwt(&http_addr.to_string(), token).await; + + // get_streams() should succeed with valid JWT token + let result = client.get_streams().await; + assert!(result.is_ok(), "Expected Ok, got {:?}", result); +} + +/// Test that valid A2A JWT token with array audience allows access to API +#[iggy_harness( + server(config_path = "tests/server/a2a_jwt/config.toml"), + jwks_server(store_path = "tests/server/a2a_jwt/wiremock/__files/jwks.json"), + seed = seed_a2a_user +)] +async fn test_a2a_jwt_array_audience(harness: &TestHarness) { + let server = harness + .all_servers() + .first() + .expect("server should be available"); + let http_addr = server + .http_addr() + .expect("http address should be available"); + + let token = create_valid_jwt_with_array_aud(3600); + let client = create_client_with_jwt(&http_addr.to_string(), token).await; + + // get_streams() should succeed with valid JWT token + let result = client.get_streams().await; + assert!(result.is_ok(), "Expected Ok, got {:?}", result); +} + +/// Test that expired A2A JWT token is rejected +#[iggy_harness( + server(config_path = "tests/server/a2a_jwt/config.toml"), + jwks_server(store_path = "tests/server/a2a_jwt/wiremock/__files/jwks.json") +)] +async fn test_a2a_jwt_expired_token(harness: &TestHarness) { + let server = harness + .all_servers() + .first() + .expect("server should be available"); + let http_addr = server + .http_addr() + .expect("http address should be available"); + + let token = create_expired_jwt(); + let client = create_client_with_jwt(&http_addr.to_string(), token).await; + + // get_streams() should fail with Unauthenticated error + let result = client.get_streams().await; + assert!( + result.is_err(), + "Expected Unauthenticated error, got {:?}", + result + ); + let err = result.unwrap_err(); + assert_eq!( + err.as_code(), + iggy::prelude::IggyError::Unauthenticated.as_code(), + "Expected Unauthenticated error, got {:?}", + err + ); +} + +/// Test that JWT token with unknown issuer is rejected +#[iggy_harness( + server(config_path = "tests/server/a2a_jwt/config.toml"), + jwks_server(store_path = "tests/server/a2a_jwt/wiremock/__files/jwks.json") +)] +async fn test_a2a_jwt_unknown_issuer(harness: &TestHarness) { + let server = harness + .all_servers() + .first() + .expect("server should be available"); + let http_addr = server + .http_addr() + .expect("http address should be available"); + + let token = create_unknown_issuer_jwt(); + let client = create_client_with_jwt(&http_addr.to_string(), token).await; + + // get_streams() should fail with Unauthenticated error + let result = client.get_streams().await; + assert!( + result.is_err(), + "Expected Unauthenticated error, got {:?}", + result + ); + let err = result.unwrap_err(); + assert_eq!( + err.as_code(), + iggy::prelude::IggyError::Unauthenticated.as_code(), + "Expected Unauthenticated error, got {:?}", + err + ); +} + +/// Test that missing JWT token results in authentication failure +#[iggy_harness( + server(config_path = "tests/server/a2a_jwt/config.toml"), + jwks_server(store_path = "tests/server/a2a_jwt/wiremock/__files/jwks.json") +)] +async fn test_a2a_jwt_missing_token(harness: &TestHarness) { + let server = harness + .all_servers() + .first() + .expect("server should be available"); + let http_addr = server + .http_addr() + .expect("http address should be available"); + + // Create client without JWT token + let client = IggyClientBuilder::new() + .with_http() + .with_api_url(format!("http://{}", http_addr)) + .build() + .expect("failed to build client"); + + // get_streams() should fail with Unauthenticated error + let result = client.get_streams().await; + assert!( + result.is_err(), + "Expected Unauthenticated error, got {:?}", + result + ); + let err = result.unwrap_err(); + assert_eq!( + err.as_code(), + iggy::prelude::IggyError::Unauthenticated.as_code(), + "Expected Unauthenticated error, got {:?}", + err + ); +} diff --git a/core/integration/tests/server/a2a_jwt/mod.rs b/core/integration/tests/server/a2a_jwt/mod.rs new file mode 100644 index 0000000000..46392e47eb --- /dev/null +++ b/core/integration/tests/server/a2a_jwt/mod.rs @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +mod jwt_tests; diff --git a/core/integration/tests/server/a2a_jwt/wiremock/__files/jwks.json b/core/integration/tests/server/a2a_jwt/wiremock/__files/jwks.json new file mode 100644 index 0000000000..ede4eba57a --- /dev/null +++ b/core/integration/tests/server/a2a_jwt/wiremock/__files/jwks.json @@ -0,0 +1,10 @@ +{ + "keys": [ + { + "kty": "RSA", + "kid": "iggy-jwt-key-1", + "n": "2cuC8EotFJdBYkwg78EzQNIejpKcI_5EB2yNMskZK37KWQmyXtYBUS0NeSg3G7IxraFOq1RqibWzN7SF5GBwaReWefHqE2zbfLLJD_fZIiHzVE8fREzGLtAYIj1U8sP5pEWJYMhEKK35ARth8iRLuTbGQiKkmyiqkEsnIUwzbPr4oRrMltTQCPHHDzWds5IVIZIwPt0hWCeh34bKkwEqHOWx6-fkxNwQnVU9YCfBIuwVR5nUytQE6LZuvCsPvJjv1gDtyBm_K1MDf1-7BrCmTsDw0ACB2CoSwPZM4A1jHZo4unWdfsuyph5FzzCK4Lf5jdsxQD6W8bqQbATibKBqsw", + "e": "AQAB" + } + ] +} diff --git a/core/integration/tests/server/a2a_jwt/wiremock/mappings/jwks.json b/core/integration/tests/server/a2a_jwt/wiremock/mappings/jwks.json new file mode 100644 index 0000000000..c6009b9e31 --- /dev/null +++ b/core/integration/tests/server/a2a_jwt/wiremock/mappings/jwks.json @@ -0,0 +1,13 @@ +{ + "request": { + "method": "GET", + "url": "/.well-known/jwks.json" + }, + "response": { + "status": 200, + "headers": { + "Content-Type": "application/json" + }, + "bodyFileName": "jwks.json" + } +} diff --git a/core/integration/tests/server/mod.rs b/core/integration/tests/server/mod.rs index 6ea1338af5..546f95f158 100644 --- a/core/integration/tests/server/mod.rs +++ b/core/integration/tests/server/mod.rs @@ -16,6 +16,7 @@ * under the License. */ +mod a2a_jwt; mod cg; mod concurrent_addition; mod general; diff --git a/core/sdk/Cargo.toml b/core/sdk/Cargo.toml index d7ad2c2871..bb89dd70a1 100644 --- a/core/sdk/Cargo.toml +++ b/core/sdk/Cargo.toml @@ -17,7 +17,7 @@ [package] name = "iggy" -version = "0.10.0" +version = "0.10.1" description = "Iggy is the persistent message streaming platform written in Rust, supporting QUIC, TCP and HTTP transport protocols, capable of processing millions of messages per second." edition = "2024" license = "Apache-2.0" diff --git a/core/sdk/src/client_provider.rs b/core/sdk/src/client_provider.rs index 6943518159..87257363c0 100644 --- a/core/sdk/src/client_provider.rs +++ b/core/sdk/src/client_provider.rs @@ -127,6 +127,7 @@ impl ClientProviderConfig { config.http = Some(Arc::new(HttpClientConfig { api_url: args.http_api_url, retries: args.http_retries, + jwt: None, })); } TransportProtocol::Tcp => { diff --git a/core/sdk/src/clients/client_builder.rs b/core/sdk/src/clients/client_builder.rs index 9600f6b387..4f07c75abd 100644 --- a/core/sdk/src/clients/client_builder.rs +++ b/core/sdk/src/clients/client_builder.rs @@ -308,6 +308,12 @@ impl HttpClientBuilder { self } + /// Sets the JWT for A2A (Agent-to-Agent) authentication. + pub fn with_jwt(mut self, token: String) -> Self { + self.config = self.config.with_jwt(token); + self + } + /// Builds the parent `IggyClient` with HTTP configuration. pub fn build(self) -> Result { let client = HttpClient::create(Arc::new(self.config.build()))?; diff --git a/core/sdk/src/clients/consumer.rs b/core/sdk/src/clients/consumer.rs index 17d2c425ba..2b9177a5e0 100644 --- a/core/sdk/src/clients/consumer.rs +++ b/core/sdk/src/clients/consumer.rs @@ -569,6 +569,9 @@ impl IggyConsumer { DiagnosticEvent::Connected => { trace!("Connected to the server"); joined_consumer_group.store(false, ORDERING); + if !is_consumer_group { + can_poll.store(true, ORDERING); + } if disconnected { reconnected = true; disconnected = false; diff --git a/core/sdk/src/http/http_client.rs b/core/sdk/src/http/http_client.rs index a6ddcaa650..bdc093dcfe 100644 --- a/core/sdk/src/http/http_client.rs +++ b/core/sdk/src/http/http_client.rs @@ -277,11 +277,13 @@ impl HttpClient { .with(RetryTransientMiddleware::new_with_policy(retry_policy)) .build(); + let access_token = config.jwt.clone().unwrap_or_default(); + Ok(Self { api_url, client, heartbeat_interval: IggyDuration::from_str("5s").unwrap(), - access_token: IggyRwLock::new("".to_string()), + access_token: IggyRwLock::new(access_token), events: broadcast(1000), }) } diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 4254c5788e..3125c43b3f 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -88,6 +88,7 @@ rustls-pemfile = { workspace = true } secrecy = { workspace = true } send_wrapper = { workspace = true } serde = { workspace = true } +serde_json = { workspace = true } slab = { workspace = true } socket2 = { workspace = true } strum = { workspace = true } diff --git a/core/server/config.toml b/core/server/config.toml index b3be68e580..367d5f995a 100644 --- a/core/server/config.toml +++ b/core/server/config.toml @@ -126,6 +126,12 @@ decoding_secret = "" # `false` means the secret is in plain text. use_base64_secret = false +# Trusted issuers for A2A (Application-to-Application) authentication +[[http.jwt.trusted_issuers]] +issuer = "test-issuer" +jwks_url = "http://127.0.0.1:8081/.well-known/jwks.json" +audience = "iggy.apache.org" + # Metrics configuration for HTTP. [http.metrics] # Enable or disable the metrics endpoint. diff --git a/core/server/src/http/jwt/json_web_token.rs b/core/server/src/http/jwt/json_web_token.rs index 93862c77e3..e0a64674e3 100644 --- a/core/server/src/http/jwt/json_web_token.rs +++ b/core/server/src/http/jwt/json_web_token.rs @@ -17,8 +17,9 @@ */ use iggy_common::UserId; -use serde::{Deserialize, Serialize}; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::net::SocketAddr; +use std::{fmt, fmt::Display}; #[derive(Debug, Clone)] pub struct Identity { @@ -28,12 +29,114 @@ pub struct Identity { pub ip_address: SocketAddr, } +#[derive(Debug, Clone)] +pub enum Audience { + Single(String), + Multiple(Vec), +} + +impl Audience { + pub fn contains(&self, audience: &str) -> bool { + match self { + Audience::Single(aud) => aud == audience, + Audience::Multiple(auds) => auds.iter().any(|a| a == audience), + } + } +} + +impl Display for Audience { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Audience::Single(aud) => f.write_str(aud), + Audience::Multiple(auds) => f.write_str(&auds.join(",")), + } + } +} + +impl From for Audience { + fn from(aud: String) -> Self { + Audience::Single(aud) + } +} + +impl From<&str> for Audience { + fn from(aud: &str) -> Self { + Audience::Single(aud.to_string()) + } +} + +impl From> for Audience { + fn from(auds: Vec) -> Self { + if auds.len() == 1 { + Audience::Single(auds.into_iter().next().unwrap()) + } else { + Audience::Multiple(auds) + } + } +} + +impl Serialize for Audience { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + match self { + Audience::Single(aud) => serializer.serialize_str(aud), + Audience::Multiple(auds) => auds.serialize(serializer), + } + } +} + +impl<'de> Deserialize<'de> for Audience { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct AudienceVisitor; + + impl<'de> serde::de::Visitor<'de> for AudienceVisitor { + type Value = Audience; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result { + formatter.write_str("a string or an array of strings") + } + + fn visit_str(self, value: &str) -> Result + where + E: serde::de::Error, + { + Ok(Audience::Single(value.to_string())) + } + + fn visit_string(self, value: String) -> Result + where + E: serde::de::Error, + { + Ok(Audience::Single(value)) + } + + fn visit_seq(self, mut seq: A) -> Result + where + A: serde::de::SeqAccess<'de>, + { + let mut auds = Vec::new(); + while let Some(aud) = seq.next_element::()? { + auds.push(aud); + } + Ok(Audience::Multiple(auds)) + } + } + + deserializer.deserialize_any(AudienceVisitor) + } +} + #[derive(Debug, Serialize, Deserialize, Clone)] pub struct JwtClaims { pub jti: String, pub iss: String, - pub aud: String, - pub sub: u32, + pub aud: Audience, + pub sub: String, pub iat: u64, pub exp: u64, pub nbf: u64, diff --git a/core/server/src/http/jwt/jwks.rs b/core/server/src/http/jwt/jwks.rs new file mode 100644 index 0000000000..db7a9a46aa --- /dev/null +++ b/core/server/src/http/jwt/jwks.rs @@ -0,0 +1,316 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +use dashmap::DashMap; +use iggy_common::IggyError; +use jsonwebtoken::DecodingKey; +use serde::Deserialize; +use std::hash::Hash; +use std::sync::OnceLock; +use strum::{Display, EnumString}; + +static HTTP_CLIENT: OnceLock = OnceLock::new(); + +fn get_http_client() -> &'static cyper::Client { + HTTP_CLIENT.get_or_init(cyper::Client::new) +} + +/// JWK key type enumeration +#[derive(Debug, Clone, Copy, Display, EnumString, Deserialize, PartialEq, Eq)] +#[strum(serialize_all = "UPPERCASE")] +#[serde(rename_all = "UPPERCASE")] +enum JwkKeyType { + /// RSA key type + #[strum(serialize = "RSA")] + Rsa, + /// EC (Elliptic Curve) key type + #[strum(serialize = "EC")] + Ec, +} + +/// EC curve type enumeration +#[derive(Debug, Clone, Copy, Display, EnumString, Deserialize, PartialEq, Eq)] +#[strum(serialize_all = "UPPERCASE")] +#[serde(rename_all = "UPPERCASE")] +enum EcCurve { + /// P-256 curve + #[strum(serialize = "P-256")] + P256, + /// P-384 curve + #[strum(serialize = "P-384")] + P384, + /// P-521 curve + #[strum(serialize = "P-521")] + P521, +} + +#[derive(Debug, Deserialize)] +struct Jwk { + kty: JwkKeyType, + kid: Option, + n: Option, + e: Option, + x: Option, + y: Option, + crv: Option, +} + +#[derive(Debug, Deserialize)] +struct JwkSet { + keys: Vec, +} + +#[derive(Debug, Clone, Hash, Eq, PartialEq)] +struct CacheKey { + issuer: String, + kid: String, +} + +#[derive(Debug, Clone)] +pub struct JwksClient { + cache: DashMap, +} + +impl Default for JwksClient { + fn default() -> Self { + Self { + cache: DashMap::new(), + } + } +} + +impl JwksClient { + pub async fn get_key(&self, issuer: &str, jwks_url: &str, kid: &str) -> Option { + let cache_key = CacheKey { + issuer: issuer.to_string(), + kid: kid.to_string(), + }; + + // try to get from cache first + if let Some(key) = self.cache.get(&cache_key) { + return Some(key.clone()); + } + + // fetch and cache if not found + if let Ok(key) = self.fetch_and_cache_key(issuer, jwks_url, kid).await { + return Some(key); + } + + None + } + + async fn fetch_and_cache_key( + &self, + issuer: &str, + jwks_url: &str, + kid: &str, + ) -> Result { + if let Err(e) = self.refresh_keys(issuer, jwks_url).await { + return Err(IggyError::CannotFetchJwks(format!( + "Failed to refresh keys: {}", + e + ))); + } + + let cache_key = CacheKey { + issuer: issuer.to_string(), + kid: kid.to_string(), + }; + + self.cache + .get(&cache_key) + .map(|entry| entry.clone()) + .ok_or(IggyError::InvalidAccessToken) + } + + async fn refresh_keys(&self, issuer: &str, jwks_url: &str) -> Result<(), IggyError> { + let client = get_http_client(); + let request = client + .get(jwks_url) + .map_err(|e| IggyError::CannotFetchJwks(format!("Failed to build request: {}", e)))? + .build(); + let response = client + .execute(request) + .await + .map_err(|e| IggyError::CannotFetchJwks(format!("HTTP request failed: {}", e)))?; + + let body = response.text().await.map_err(|e| { + IggyError::CannotFetchJwks(format!("Failed to read response body: {}", e)) + })?; + + let jwks: JwkSet = serde_json::from_str(&body) + .map_err(|e| IggyError::CannotFetchJwks(format!("Failed to parse JWKS: {}", e)))?; + + // Collect all current kids from the JWKS response + let current_kids: std::collections::HashSet = + jwks.keys.iter().filter_map(|key| key.kid.clone()).collect(); + + // Remove cached keys for this issuer that are no longer in the JWKS response + // Security fix: Clean up revoked/rotated keys to prevent accepting tokens signed with old keys + let keys_to_remove: Vec = self + .cache + .iter() + .filter(|entry| { + entry.key().issuer == issuer && !current_kids.contains(&entry.key().kid) + }) + .map(|entry| entry.key().clone()) + .collect(); + + for key in keys_to_remove { + self.cache.remove(&key); + } + + for key in jwks.keys { + if let Some(kid) = key.kid { + let decoding_key: DecodingKey = match key.kty { + JwkKeyType::Rsa => { + if let (Some(n), Some(e)) = (key.n.as_deref(), key.e.as_deref()) { + DecodingKey::from_rsa_components(n, e).map_err(|e| { + IggyError::CannotFetchJwks(format!("Invalid RSA key: {}", e)) + })? + } else { + continue; + } + } + JwkKeyType::Ec => { + if let (Some(x), Some(y), Some(crv_str)) = + (key.x.as_deref(), key.y.as_deref(), key.crv.as_deref()) + { + if let Ok(_curve) = crv_str.parse::() { + DecodingKey::from_ec_components(x, y).map_err(|e| { + IggyError::CannotFetchJwks(format!("Invalid EC key: {}", e)) + })? + } else { + continue; + } + } else { + continue; + } + } + }; + + let cache_key = CacheKey { + issuer: issuer.to_string(), + kid, + }; + self.cache.insert(cache_key, decoding_key); + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use jsonwebtoken::DecodingKey; + + const TEST_ISSUER: &str = "https://test-issuer.com"; + const TEST_KID: &str = "test-key"; + + fn create_test_decoding_key() -> DecodingKey { + // Use HMAC secret to create a simple test DecodingKey + // Note: This is only for testing cache logic, not a real RSA/EC key + DecodingKey::from_secret(b"test-secret-key-for-cache-testing-only") + } + + #[test] + fn test_cache_key_equality() { + let key1 = CacheKey { + issuer: TEST_ISSUER.to_string(), + kid: TEST_KID.to_string(), + }; + let key2 = CacheKey { + issuer: TEST_ISSUER.to_string(), + kid: TEST_KID.to_string(), + }; + assert_eq!(key1, key2); + } + + #[test] + fn test_cache_key_different_issuer() { + let key1 = CacheKey { + issuer: "issuer1".to_string(), + kid: TEST_KID.to_string(), + }; + let key2 = CacheKey { + issuer: "issuer2".to_string(), + kid: TEST_KID.to_string(), + }; + assert_ne!(key1, key2); + } + + #[test] + fn test_cache_key_different_kid() { + let key1 = CacheKey { + issuer: TEST_ISSUER.to_string(), + kid: "kid1".to_string(), + }; + let key2 = CacheKey { + issuer: TEST_ISSUER.to_string(), + kid: "kid2".to_string(), + }; + assert_ne!(key1, key2); + } + + #[test] + fn test_jwks_client_default() { + let client = JwksClient::default(); + assert!(client.cache.is_empty()); + } + + #[test] + fn test_cache_insert_and_get() { + let client = JwksClient::default(); + let cache_key = CacheKey { + issuer: TEST_ISSUER.to_string(), + kid: TEST_KID.to_string(), + }; + let decoding_key = create_test_decoding_key(); + + client.cache.insert(cache_key.clone(), decoding_key.clone()); + + let cached = client.cache.get(&cache_key); + assert!(cached.is_some()); + } + + #[test] + fn test_cache_multiple_keys() { + let client = JwksClient::default(); + + let key1 = CacheKey { + issuer: "issuer1".to_string(), + kid: "kid1".to_string(), + }; + let key2 = CacheKey { + issuer: "issuer2".to_string(), + kid: "kid2".to_string(), + }; + + let decoding_key1 = create_test_decoding_key(); + let decoding_key2 = create_test_decoding_key(); + + client.cache.insert(key1.clone(), decoding_key1); + client.cache.insert(key2.clone(), decoding_key2); + + assert_eq!(client.cache.len(), 2); + assert!(client.cache.get(&key1).is_some()); + assert!(client.cache.get(&key2).is_some()); + } +} diff --git a/core/server/src/http/jwt/jwt_manager.rs b/core/server/src/http/jwt/jwt_manager.rs index 8980a1389b..77a1813099 100644 --- a/core/server/src/http/jwt/jwt_manager.rs +++ b/core/server/src/http/jwt/jwt_manager.rs @@ -16,9 +16,10 @@ * under the License. */ -use crate::configs::http::HttpJwtConfig; +use crate::configs::http::{HttpJwtConfig, TrustedIssuerConfig}; use crate::http::jwt::COMPONENT; -use crate::http::jwt::json_web_token::{GeneratedToken, JwtClaims, RevokedAccessToken}; +use crate::http::jwt::json_web_token::{Audience, GeneratedToken, JwtClaims, RevokedAccessToken}; +use crate::http::jwt::jwks::JwksClient; use crate::http::jwt::storage::TokenStorage; use crate::streaming::persistence::persister::PersisterKind; use ahash::AHashMap; @@ -31,6 +32,7 @@ use iggy_common::UserId; use iggy_common::locking::IggyRwLock; use iggy_common::locking::IggyRwLockFn; use jsonwebtoken::{Algorithm, DecodingKey, EncodingKey, Header, TokenData, Validation, encode}; +use std::collections::HashMap; use std::sync::Arc; use tracing::{debug, error, info}; @@ -56,6 +58,8 @@ pub struct JwtManager { tokens_storage: TokenStorage, revoked_tokens: IggyRwLock>, validations: AHashMap, + jwks_client: JwksClient, + trusted_issuer: HashMap, } impl JwtManager { @@ -78,6 +82,8 @@ impl JwtManager { validator, tokens_storage: TokenStorage::new(persister, path), revoked_tokens: IggyRwLock::new(AHashMap::new()), + jwks_client: JwksClient::default(), + trusted_issuer: HashMap::new(), }) } @@ -105,7 +111,18 @@ impl JwtManager { format!("{COMPONENT} (error: {e}) - failed to get decoding key") })?, }; - JwtManager::new(persister, path, issuer, validator) + let mut manager = JwtManager::new(persister, path, issuer, validator)?; + + if let Some(trusted_issuers) = config.trusted_issuers.as_ref() { + for issuer_config in trusted_issuers { + let normalized_issuer = normalize_issuer_url(&issuer_config.issuer); + manager + .trusted_issuer + .insert(normalized_issuer, issuer_config.clone()); + } + } + + Ok(manager) } fn create_validation( @@ -181,8 +198,8 @@ impl JwtManager { let nbf = iat + self.issuer.not_before.as_secs() as u64; let claims = JwtClaims { jti: uuid::Uuid::now_v7().to_string(), - sub: user_id, - aud: self.issuer.audience.to_string(), + sub: user_id.to_string(), + aud: Audience::from(self.issuer.audience.clone()), iss: self.issuer.issuer.to_string(), iat, exp, @@ -210,7 +227,19 @@ impl JwtManager { let token_header = jsonwebtoken::decode_header(token).map_err(|_| IggyError::InvalidAccessToken)?; - let jwt_claims = self.decode(token, token_header.alg)?; + let jwt_claims = self.decode(token, token_header.alg).await?; + + // Security fix: Reject A2A tokens from external trusted issuers + // A2A tokens should not be refreshable - they have their own lifecycle + let normalized_iss = normalize_issuer_url(&jwt_claims.claims.iss); + if self.trusted_issuer.contains_key(&normalized_iss) { + error!( + "Cannot refresh A2A token from external issuer: {}", + jwt_claims.claims.iss + ); + return Err(IggyError::InvalidAccessToken); + } + let id = jwt_claims.claims.jti; let expiry = jwt_claims.claims.exp; if self @@ -232,26 +261,95 @@ impl JwtManager { .error(|e: &IggyError| { format!("{COMPONENT} (error: {e}) - failed to save revoked access token: {id}") })?; - self.generate(jwt_claims.claims.sub) + let user_id = jwt_claims + .claims + .sub + .parse::() + .map_err(|_| IggyError::InvalidAccessToken)?; + self.generate(user_id) } - pub fn decode( + pub async fn decode( &self, token: &str, algorithm: Algorithm, ) -> Result, IggyError> { let validation = self.validations.get(&algorithm); - if validation.is_none() { - return Err(IggyError::InvalidJwtAlgorithm( - Self::map_algorithm_to_string(algorithm), - )); - } + let kid = jsonwebtoken::decode_header(token).ok().and_then(|h| h.kid); + + // try to decode using JWKS if it's a trusted issuer + let insecure = match jsonwebtoken::dangerous::insecure_decode::(token) { + Ok(claims) => claims, + Err(_) => { + error!("Failed to decode JWT insecurely"); + return self.decode_with_fallback(token, validation, algorithm); + } + }; + + let normalized_iss = normalize_issuer_url(&insecure.claims.iss); + let config = match self.trusted_issuer.get(&normalized_iss) { + Some(config) => config, + None => { + debug!("No trusted issuer found for: {}", insecure.claims.iss); + return self.decode_with_fallback(token, validation, algorithm); + } + }; - let validation = validation.unwrap(); - match jsonwebtoken::decode::(token, &self.validator.key, validation) { - Ok(claims) => Ok(claims), - _ => Err(IggyError::Unauthenticated), + if config.user_id == 0 { + error!( + "A2A token cannot map to root user (user_id = 0) for issuer: {}", + config.issuer + ); + return Err(IggyError::Unauthenticated); } + + let kid_str = match kid.as_deref() { + Some(kid) => kid, + None => { + error!("No kid found in JWT header"); + return self.decode_with_fallback(token, validation, algorithm); + } + }; + + let decoding_key = match self + .jwks_client + .get_key(&config.issuer, &config.jwks_url, kid_str) + .await + { + Some(key) => key, + None => { + error!("Failed to get decoding key from JWKS for kid: {}", kid_str); + return self.decode_with_fallback(token, validation, algorithm); + } + }; + let mut validation = Validation::new(algorithm); + validation.set_issuer(std::slice::from_ref(&config.issuer)); + validation.set_audience(std::slice::from_ref(&config.audience)); + + let mut result = jsonwebtoken::decode::(token, &decoding_key, &validation) + .map_err(|e| { + error!("Failed to decode JWT: {}", e); + IggyError::Unauthenticated + })?; + + result.claims.sub = config.user_id.to_string(); + + Ok(result) + } + + /// fallback to standard JWT validation if JWKS validation fails + fn decode_with_fallback( + &self, + token: &str, + validation: Option<&Validation>, + algorithm: Algorithm, + ) -> Result, IggyError> { + let validation = validation.ok_or_else(|| { + IggyError::InvalidJwtAlgorithm(Self::map_algorithm_to_string(algorithm)) + })?; + + jsonwebtoken::decode::(token, &self.validator.key, validation) + .map_err(|_| IggyError::Unauthenticated) } fn map_algorithm_to_string(algorithm: Algorithm) -> String { @@ -290,3 +388,71 @@ impl JwtManager { revoked_tokens.contains_key(token_id) } } + +/// Normalize issuer URL by lowercasing scheme and host, preserving path case +/// +/// Example: "HTTPS://Example.COM/PATH" -> "https://example.com/PATH" +fn normalize_issuer_url(url: &str) -> String { + match url.split_once("://") { + Some((scheme, rest)) => { + let scheme = scheme.to_lowercase(); + // Find end of host (first '/' or end of string) + let (host, path) = match rest.find('/') { + Some(idx) => rest.split_at(idx), + None => (rest, ""), + }; + format!("{}://{}{}", scheme, host.to_lowercase(), path) + } + None => url.trim_end_matches('/').to_lowercase(), + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_normalize_issuer_url_basic() { + assert_eq!( + normalize_issuer_url("HTTPS://Example.COM/PATH"), + "https://example.com/PATH" + ); + } + + #[test] + fn test_normalize_issuer_url_no_path() { + assert_eq!( + normalize_issuer_url("HTTPS://Example.COM"), + "https://example.com" + ); + } + + #[test] + fn test_normalize_issuer_url_no_scheme() { + assert_eq!(normalize_issuer_url("Example.COM"), "example.com"); + } + + #[test] + fn test_normalize_issuer_url_trailing_slash() { + assert_eq!( + normalize_issuer_url("HTTPS://Example.COM/"), + "https://example.com/" + ); + } + + #[test] + fn test_normalize_issuer_url_preserves_path_case() { + assert_eq!( + normalize_issuer_url("https://EXAMPLE.com/MyPath/SubPath"), + "https://example.com/MyPath/SubPath" + ); + } + + #[test] + fn test_normalize_issuer_url_already_normalized() { + assert_eq!( + normalize_issuer_url("https://example.com/path"), + "https://example.com/path" + ); + } +} diff --git a/core/server/src/http/jwt/middleware.rs b/core/server/src/http/jwt/middleware.rs index 910e02ccb7..dfef00b038 100644 --- a/core/server/src/http/jwt/middleware.rs +++ b/core/server/src/http/jwt/middleware.rs @@ -26,7 +26,6 @@ use axum::{ response::Response, }; use err_trail::ErrContext; -use iggy_common::IggyError; use std::sync::Arc; const COMPONENT: &str = "JWT_MIDDLEWARE"; @@ -79,9 +78,7 @@ pub async fn jwt_auth( let jwt_claims = state .jwt_manager .decode(jwt_token, token_header.alg) - .error(|e: &IggyError| { - format!("{COMPONENT} (error: {e}) - failed to decode JWT with provided algorithm") - }) + .await .map_err(|_| UNAUTHORIZED)?; if state .jwt_manager @@ -92,10 +89,15 @@ pub async fn jwt_auth( } let request_details = request.extensions().get::().unwrap(); + let user_id = jwt_claims + .claims + .sub + .parse::() + .map_err(|_| UNAUTHORIZED)?; let identity = Identity { token_id: jwt_claims.claims.jti, token_expiry: jwt_claims.claims.exp, - user_id: jwt_claims.claims.sub, + user_id, ip_address: request_details.ip_address, }; request.extensions_mut().insert(identity); diff --git a/core/server/src/http/jwt/mod.rs b/core/server/src/http/jwt/mod.rs index 5481f110c4..d5e413cb4d 100644 --- a/core/server/src/http/jwt/mod.rs +++ b/core/server/src/http/jwt/mod.rs @@ -17,6 +17,7 @@ */ pub mod json_web_token; +pub mod jwks; pub mod jwt_manager; pub mod middleware; pub mod storage;