diff --git a/buildSrc/src/main/kotlin/CrateSet.kt b/buildSrc/src/main/kotlin/CrateSet.kt index 1924badc68..1bb86b41fa 100644 --- a/buildSrc/src/main/kotlin/CrateSet.kt +++ b/buildSrc/src/main/kotlin/CrateSet.kt @@ -63,12 +63,14 @@ object CrateSet { "aws-smithy-compression", "aws-smithy-client", "aws-smithy-eventstream", + "aws-smithy-experimental", "aws-smithy-http", "aws-smithy-http-auth", "aws-smithy-http-tower", "aws-smithy-json", "aws-smithy-mocks-experimental", - "aws-smithy-experimental", + "aws-smithy-observability", + "aws-smithy-observability-otel", "aws-smithy-protocol-test", "aws-smithy-query", "aws-smithy-runtime", diff --git a/rust-runtime/Cargo.lock b/rust-runtime/Cargo.lock index c87622af04..dd99058fb6 100644 --- a/rust-runtime/Cargo.lock +++ b/rust-runtime/Cargo.lock @@ -121,10 +121,144 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" dependencies = [ "concurrent-queue", - "event-listener", + "event-listener 2.5.3", "futures-core", ] +[[package]] +name = "async-channel" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89b47800b0be77592da0afd425cc03468052844aff33b84e33cc696f64e77b6a" +dependencies = [ + "concurrent-queue", + "event-listener-strategy", + "futures-core", + "pin-project-lite", +] + +[[package]] +name = "async-executor" +version = "1.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30ca9a001c1e8ba5149f91a74362376cc6bc5b919d92d988668657bd570bdcec" +dependencies = [ + "async-task", + "concurrent-queue", + "fastrand", + "futures-lite", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05b1b633a2115cd122d73b955eadd9916c18c8f510ec9cd1686404c60ad1c29c" +dependencies = [ + "async-channel 2.3.1", + "async-executor", + "async-io", + "async-lock", + "blocking", + "futures-lite", + "once_cell", +] + +[[package]] +name = "async-io" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a2b323ccce0a1d90b449fd71f2a06ca7faa7c54c2751f06c9bd851fc061059" +dependencies = [ + "async-lock", + "cfg-if", + "concurrent-queue", + "futures-io", + "futures-lite", + "parking", + "polling", + "rustix", + "slab", + "tracing", + "windows-sys 0.59.0", +] + +[[package]] +name = "async-lock" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" +dependencies = [ + "event-listener 5.4.0", + "event-listener-strategy", + "pin-project-lite", +] + +[[package]] +name = "async-process" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63255f1dc2381611000436537bbedfe83183faa303a5a0edaf191edef06526bb" +dependencies = [ + "async-channel 2.3.1", + "async-io", + "async-lock", + "async-signal", + "async-task", + "blocking", + "cfg-if", + "event-listener 5.4.0", + "futures-lite", + "rustix", + "tracing", +] + +[[package]] +name = "async-signal" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "637e00349800c0bdf8bfc21ebbc0b6524abea702b0da4168ac00d070d0c0b9f3" +dependencies = [ + "async-io", + "async-lock", + "atomic-waker", + "cfg-if", + "futures-core", + "futures-io", + "rustix", + "signal-hook-registry", + "slab", + "windows-sys 0.59.0", +] + +[[package]] +name = "async-std" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c634475f29802fde2b8f0b505b1bd00dfe4df7d4a000f0b36f7671197d5c3615" +dependencies = [ + "async-channel 1.9.0", + "async-global-executor", + "async-io", + "async-lock", + "async-process", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + [[package]] name = "async-stream" version = "0.3.6" @@ -147,6 +281,23 @@ dependencies = [ "syn 2.0.96", ] +[[package]] +name = "async-task" +version = "4.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de" + +[[package]] +name = "async-trait" +version = "0.1.85" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f934833b4b7233644e5848f235df3f57ed8c80f1528a26c3dfa13d2147fa056" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.96", +] + [[package]] name = "atomic-waker" version = "1.1.2" @@ -337,7 +488,7 @@ dependencies = [ [[package]] name = "aws-smithy-cbor" -version = "0.60.8" +version = "0.61.0" dependencies = [ "aws-smithy-types 1.2.12", "criterion", @@ -609,6 +760,30 @@ dependencies = [ "tokio", ] +[[package]] +name = "aws-smithy-observability" +version = "0.1.0" +dependencies = [ + "aws-smithy-runtime-api 1.7.3", + "once_cell", + "serial_test", +] + +[[package]] +name = "aws-smithy-observability-otel" +version = "0.1.0" +dependencies = [ + "async-global-executor", + "async-task", + "aws-smithy-observability", + "criterion", + "opentelemetry", + "opentelemetry_sdk", + "stats_alloc", + "tokio", + "value-bag", +] + [[package]] name = "aws-smithy-protocol-test" version = "0.63.0" @@ -1000,6 +1175,19 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "703f41c54fc768e63e091340b424302bb1c29ef4aa0c7f10fe849dfb114d29ea" +dependencies = [ + "async-channel 2.3.1", + "async-task", + "futures-io", + "futures-lite", + "piper", +] + [[package]] name = "bs58" version = "0.5.1" @@ -1327,6 +1515,7 @@ dependencies = [ "ciborium", "clap 4.5.26", "criterion-plot", + "futures", "is-terminal", "itertools 0.10.5", "num-traits", @@ -1339,6 +1528,7 @@ dependencies = [ "serde_derive", "serde_json", "tinytemplate", + "tokio", "walkdir", ] @@ -1563,6 +1753,27 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +[[package]] +name = "event-listener" +version = "5.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3492acde4c3fc54c845eaab3eed8bd00c7a7d881f78bfc801e43a93dec1331ae" +dependencies = [ + "concurrent-queue", + "parking", + "pin-project-lite", +] + +[[package]] +name = "event-listener-strategy" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c3e4e0dd3673c1139bf041f3008816d9cf2946bbfac2945c09e523b8d7b05b2" +dependencies = [ + "event-listener 5.4.0", + "pin-project-lite", +] + [[package]] name = "extend" version = "0.1.2" @@ -1676,6 +1887,19 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" +[[package]] +name = "futures-lite" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5edaec856126859abb19ed65f39e90fea3a9574b9707f13539acf4abf7eb532" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "parking", + "pin-project-lite", +] + [[package]] name = "futures-macro" version = "0.3.31" @@ -1750,6 +1974,18 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" +[[package]] +name = "gloo-timers" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "group" version = "0.12.1" @@ -2321,6 +2557,15 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + [[package]] name = "lambda_http" version = "0.8.3" @@ -2436,6 +2681,9 @@ name = "log" version = "0.4.25" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f" +dependencies = [ + "value-bag", +] [[package]] name = "lru" @@ -2488,9 +2736,9 @@ checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" [[package]] name = "minicbor" -version = "0.24.2" +version = "0.24.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f8e213c36148d828083ae01948eed271d03f95f7e72571fa242d78184029af2" +checksum = "29be4f60e41fde478b36998b88821946aafac540e53591e76db53921a0cc225b" dependencies = [ "half", "minicbor-derive", @@ -2635,6 +2883,42 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "opentelemetry" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "570074cc999d1a58184080966e5bd3bf3a9a4af650c3b05047c2621e7405cd17" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2c627d9f4c9cdc1f21a29ee4bfbd6028fcb8bcf2a857b43f3abdf72c9c862f3" +dependencies = [ + "async-std", + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "once_cell", + "opentelemetry", + "percent-encoding", + "rand", + "serde_json", + "thiserror", + "tokio", + "tokio-stream", +] + [[package]] name = "os_str_bytes" version = "6.6.1" @@ -2643,9 +2927,9 @@ checksum = "e2355d85b9a3786f481747ced0e0ff2ba35213a1f9bd406ed906554d7af805a1" [[package]] name = "outref" -version = "0.5.1" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4030760ffd992bef45b0ae3f10ce1aba99e33464c90d14dd7c039884963ddc7a" +checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" [[package]] name = "overload" @@ -2664,6 +2948,12 @@ dependencies = [ "sha2", ] +[[package]] +name = "parking" +version = "2.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f38d5652c16fde515bb1ecef450ab0f6a219d619a7274976324d5e377f7dceba" + [[package]] name = "parking_lot" version = "0.12.3" @@ -2740,6 +3030,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96c8c490f422ef9a4efd2cb5b42b76c8613d7e7dfc1caf667b8a3350a5acc066" +dependencies = [ + "atomic-waker", + "fastrand", + "futures-io", +] + [[package]] name = "pkcs8" version = "0.9.0" @@ -2778,6 +3079,21 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "polling" +version = "3.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a604568c3202727d1507653cb121dbd627a58684eb09a820fd746bee38b4442f" +dependencies = [ + "cfg-if", + "concurrent-queue", + "hermit-abi 0.4.0", + "pin-project-lite", + "rustix", + "tracing", + "windows-sys 0.59.0", +] + [[package]] name = "powerfmt" version = "0.2.0" @@ -2889,7 +3205,7 @@ version = "0.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3564762e37035cfc486228e10b0528460fa026d681b5763873c693aa0d5c260" dependencies = [ - "async-channel", + "async-channel 1.9.0", "clap 3.2.25", "futures", "inventory", @@ -3315,6 +3631,15 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "scc" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28e1c91382686d21b5ac7959341fcb9780fa7c03773646995a87c950fa7be640" +dependencies = [ + "sdd", +] + [[package]] name = "schannel" version = "0.1.27" @@ -3340,6 +3665,12 @@ dependencies = [ "untrusted 0.9.0", ] +[[package]] +name = "sdd" +version = "3.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "478f121bb72bbf63c52c93011ea1791dca40140dfe13f8336c4c5ac952c33aa9" + [[package]] name = "sec1" version = "0.3.0" @@ -3466,6 +3797,31 @@ dependencies = [ "serde", ] +[[package]] +name = "serial_test" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b258109f244e1d6891bf1053a55d63a5cd4f8f4c30cf9a1280989f80e7a1fa9" +dependencies = [ + "futures", + "log", + "once_cell", + "parking_lot", + "scc", + "serial_test_derive", +] + +[[package]] +name = "serial_test_derive" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d69265a08751de7844521fd15003ae0a888e035773ba05695c5c759a6f89eef" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.96", +] + [[package]] name = "sha1" version = "0.10.6" @@ -3586,6 +3942,12 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "stats_alloc" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c0e04424e733e69714ca1bbb9204c1a57f09f5493439520f9f68c132ad25eec" + [[package]] name = "strsim" version = "0.10.0" @@ -4162,6 +4524,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "value-bag" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ef4c4aa54d5d05a279399bfa921ec387b7aba77caf7a682ae8d86785b8fdad2" + [[package]] name = "version_check" version = "0.9.5" @@ -4243,6 +4611,19 @@ dependencies = [ "wasm-bindgen-shared", ] +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.50" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "555d470ec0bc3bb57890405e5d4322cc9ea83cebb085523ced7be4144dac1e61" +dependencies = [ + "cfg-if", + "js-sys", + "once_cell", + "wasm-bindgen", + "web-sys", +] + [[package]] name = "wasm-bindgen-macro" version = "0.2.100" diff --git a/rust-runtime/Cargo.toml b/rust-runtime/Cargo.toml index d7853b0099..4c8b6ffed3 100644 --- a/rust-runtime/Cargo.toml +++ b/rust-runtime/Cargo.toml @@ -24,4 +24,6 @@ members = [ "aws-smithy-mocks-experimental", "aws-smithy-experimental", "aws-smithy-xml", + "aws-smithy-observability", + "aws-smithy-observability-otel", ] diff --git a/rust-runtime/aws-smithy-observability-otel/Cargo.toml b/rust-runtime/aws-smithy-observability-otel/Cargo.toml new file mode 100644 index 0000000000..f7b11ba27f --- /dev/null +++ b/rust-runtime/aws-smithy-observability-otel/Cargo.toml @@ -0,0 +1,44 @@ +[package] +name = "aws-smithy-observability-otel" +version = "0.1.0" +authors = [ + "AWS Rust SDK Team ", +] +description = "Smithy OpenTelemetry observability implementation." +edition = "2021" +license = "Apache-2.0" +repository = "https://github.com/awslabs/smithy-rs" + +[dependencies] +aws-smithy-observability = { path = "../aws-smithy-observability" } +opentelemetry = {version = "0.26.0", features = ["metrics"]} + +# The below dependencies are transitive, but must be pinned until the MSRV of +# OTel catches up with ours +value-bag = "1.10.0" +async-global-executor = "2.4.1" +async-task = "=4.7.1" + +# This crate cannot be used on powerpc +[target.'cfg(not(target_arch = "powerpc"))'.dependencies] +opentelemetry_sdk = {version = "0.26.0", features = ["metrics", "testing"]} + +[dev-dependencies] +tokio = { version = "1.23.1" } +criterion = {version = "0.5.1", features = ["async_tokio"]} +stats_alloc = "0.1.10" + +[package.metadata.docs.rs] +all-features = true +targets = ["x86_64-unknown-linux-gnu"] +cargo-args = ["-Zunstable-options", "-Zrustdoc-scrape-examples"] +rustdoc-args = ["--cfg", "docsrs"] +# End of docs.rs metadata + +[[bench]] +name = "sync_instruments" +harness = false + +[[bench]] +name = "async_instruments" +harness = false diff --git a/rust-runtime/aws-smithy-observability-otel/LICENSE b/rust-runtime/aws-smithy-observability-otel/LICENSE new file mode 100644 index 0000000000..67db858821 --- /dev/null +++ b/rust-runtime/aws-smithy-observability-otel/LICENSE @@ -0,0 +1,175 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. diff --git a/rust-runtime/aws-smithy-observability-otel/README.md b/rust-runtime/aws-smithy-observability-otel/README.md new file mode 100644 index 0000000000..8b12a3f044 --- /dev/null +++ b/rust-runtime/aws-smithy-observability-otel/README.md @@ -0,0 +1,7 @@ +# aws-smithy-observability-otel + +This crate contains OpenTelemetry based implementations of the metrics traits from the `aws-smithy-observability` crate. + + +This crate is part of the [AWS SDK for Rust](https://awslabs.github.io/aws-sdk-rust/) and the [smithy-rs](https://github.com/smithy-lang/smithy-rs) code generator. In most cases, it should not be used directly. + diff --git a/rust-runtime/aws-smithy-observability-otel/benches/async_instruments.rs b/rust-runtime/aws-smithy-observability-otel/benches/async_instruments.rs new file mode 100644 index 0000000000..014394f5cc --- /dev/null +++ b/rust-runtime/aws-smithy-observability-otel/benches/async_instruments.rs @@ -0,0 +1,104 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +use aws_smithy_observability::attributes::{AttributeValue, Attributes}; +use aws_smithy_observability::meter::{AsyncMeasure, Meter, ProvideMeter}; +use aws_smithy_observability::provider::TelemetryProvider; +use aws_smithy_observability_otel::meter::{ + AsyncInstrumentWrap, AwsSdkOtelMeterProvider, MeterWrap, +}; +use criterion::{criterion_group, criterion_main, Criterion}; +use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider}; +use opentelemetry_sdk::runtime::Tokio; +use opentelemetry_sdk::testing::metrics::InMemoryMetricsExporter; +use std::sync::Arc; + +use stats_alloc::{Region, StatsAlloc, INSTRUMENTED_SYSTEM}; +use std::alloc::System; + +async fn record_async_instruments(dyn_sdk_meter: Arc) { + //Create all async instruments and record some data + let gauge = dyn_sdk_meter.create_gauge( + "TestGauge".to_string(), + // Callback function records another value with different attributes so it is deduped + |measurement: &AsyncInstrumentWrap<'_, f64>| { + let mut attrs = Attributes::new(); + attrs.set( + "TestGaugeAttr", + AttributeValue::String("TestGaugeAttr".into()), + ); + measurement.record(6.789, Some(&attrs), None); + }, + None, + None, + ); + gauge.record(1.234, None, None); + + let async_ud_counter = dyn_sdk_meter.create_async_up_down_counter( + "TestAsyncUpDownCounter".to_string(), + |measurement: &AsyncInstrumentWrap<'_, i64>| { + let mut attrs = Attributes::new(); + attrs.set( + "TestAsyncUpDownCounterAttr", + AttributeValue::String("TestAsyncUpDownCounterAttr".into()), + ); + measurement.record(12, Some(&attrs), None); + }, + None, + None, + ); + async_ud_counter.record(-6, None, None); + + let async_mono_counter = dyn_sdk_meter.create_async_monotonic_counter( + "TestAsyncMonoCounter".to_string(), + |measurement: &AsyncInstrumentWrap<'_, u64>| { + let mut attrs = Attributes::new(); + attrs.set( + "TestAsyncMonoCounterAttr", + AttributeValue::String("TestAsyncMonoCounterAttr".into()), + ); + measurement.record(123, Some(&attrs), None); + }, + None, + None, + ); + async_mono_counter.record(4, None, None); +} + +fn async_instruments_benchmark(c: &mut Criterion) { + #[global_allocator] + static GLOBAL: &StatsAlloc = &INSTRUMENTED_SYSTEM; + let reg = Region::new(&GLOBAL); + + // Setup the Otel MeterProvider (which needs to be done inside an async runtime) + // The runtime is reused later for running the bench function + let runtime = tokio::runtime::Runtime::new().unwrap(); + let otel_mp = runtime.block_on(async { + let exporter = InMemoryMetricsExporter::default(); + let reader = PeriodicReader::builder(exporter.clone(), Tokio).build(); + SdkMeterProvider::builder().with_reader(reader).build() + }); + + // Create the SDK metrics types from the OTel objects + let sdk_mp = AwsSdkOtelMeterProvider::new(otel_mp); + let sdk_tp = TelemetryProvider::builder() + .meter_provider(sdk_mp) + .build() + .unwrap(); + + // Get the dyn versions of the SDK metrics objects + let dyn_sdk_mp = sdk_tp.meter_provider(); + let dyn_sdk_meter = dyn_sdk_mp.get_meter("TestMeter", None); + + c.bench_function("async_instruments", |b| { + b.to_async(&runtime) + .iter(|| async { record_async_instruments(dyn_sdk_meter.clone()) }); + }); + println!("FIINISHING"); + println!("Stats at end: {:#?}", reg.change()); +} + +criterion_group!(benches, async_instruments_benchmark); +criterion_main!(benches); diff --git a/rust-runtime/aws-smithy-observability-otel/benches/sync_instruments.rs b/rust-runtime/aws-smithy-observability-otel/benches/sync_instruments.rs new file mode 100644 index 0000000000..e9226ae1ae --- /dev/null +++ b/rust-runtime/aws-smithy-observability-otel/benches/sync_instruments.rs @@ -0,0 +1,67 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +use aws_smithy_observability::meter::{ + Histogram, Meter, MonotonicCounter, ProvideMeter, UpDownCounter, +}; +use aws_smithy_observability::provider::TelemetryProvider; +use aws_smithy_observability_otel::meter::{AwsSdkOtelMeterProvider, MeterWrap}; +use criterion::{criterion_group, criterion_main, Criterion}; +use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider}; +use opentelemetry_sdk::runtime::Tokio; +use opentelemetry_sdk::testing::metrics::InMemoryMetricsExporter; +use std::sync::Arc; + +use stats_alloc::{Region, StatsAlloc, INSTRUMENTED_SYSTEM}; +use std::alloc::System; + +async fn record_sync_instruments(sdk_meter: Arc) { + //Create all 3 sync instruments and record some data for each + let mono_counter = + sdk_meter.create_monotonic_counter("TestMonoCounter".to_string(), None, None); + mono_counter.add(4, None, None); + + let ud_counter = sdk_meter.create_up_down_counter("TestUpDownCounter".to_string(), None, None); + ud_counter.add(-6, None, None); + + let histogram = sdk_meter.create_histogram("TestHistogram".to_string(), None, None); + histogram.record(1.234, None, None); +} + +fn sync_instruments_benchmark(c: &mut Criterion) { + #[global_allocator] + static GLOBAL: &StatsAlloc = &INSTRUMENTED_SYSTEM; + let reg = Region::new(&GLOBAL); + + // Setup the Otel MeterProvider (which needs to be done inside an async runtime) + // The runtime is reused later for running the bench function + let runtime = tokio::runtime::Runtime::new().unwrap(); + let otel_mp = runtime.block_on(async { + let exporter = InMemoryMetricsExporter::default(); + let reader = PeriodicReader::builder(exporter.clone(), Tokio).build(); + SdkMeterProvider::builder().with_reader(reader).build() + }); + + // Create the SDK metrics types from the OTel objects + let sdk_mp = AwsSdkOtelMeterProvider::new(otel_mp); + let sdk_tp = TelemetryProvider::builder() + .meter_provider(sdk_mp) + .build() + .unwrap(); + + // Get the dyn versions of the SDK metrics objects + let dyn_sdk_mp = sdk_tp.meter_provider(); + let sdk_meter = dyn_sdk_mp.get_meter("TestMeter", None); + + c.bench_function("sync_instruments", |b| { + b.to_async(&runtime) + .iter(|| async { record_sync_instruments(sdk_meter.clone()) }); + }); + println!("FIINISHING"); + println!("Stats at end: {:#?}", reg.change()); +} + +criterion_group!(benches, sync_instruments_benchmark); +criterion_main!(benches); diff --git a/rust-runtime/aws-smithy-observability-otel/external-types.toml b/rust-runtime/aws-smithy-observability-otel/external-types.toml new file mode 100644 index 0000000000..717c67d5c2 --- /dev/null +++ b/rust-runtime/aws-smithy-observability-otel/external-types.toml @@ -0,0 +1,14 @@ +allowed_external_types = [ + "aws_smithy_observability::attributes::Context", + "aws_smithy_observability::attributes::Scope", + "aws_smithy_observability::error::ObservabilityError", + "aws_smithy_observability::meter::AsyncMeasure", + "aws_smithy_observability::meter::Histogram", + "aws_smithy_observability::meter::Meter", + "aws_smithy_observability::meter::MonotonicCounter", + "aws_smithy_observability::meter::ProvideMeter", + "aws_smithy_observability::meter::UpDownCounter", + "aws_smithy_observability::provider::TelemetryProvider", + "opentelemetry::metrics::meter::Meter", + "opentelemetry_sdk::metrics::meter_provider::SdkMeterProvider", +] diff --git a/rust-runtime/aws-smithy-observability-otel/src/attributes.rs b/rust-runtime/aws-smithy-observability-otel/src/attributes.rs new file mode 100644 index 0000000000..ef735b7a30 --- /dev/null +++ b/rust-runtime/aws-smithy-observability-otel/src/attributes.rs @@ -0,0 +1,140 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +//! Utilities to transform back and forth from Smithy Observability [Attributes] to +//! OTel [KeyValue]s. + +use std::ops::Deref; + +use aws_smithy_observability::attributes::{AttributeValue, Attributes}; +use opentelemetry::{KeyValue, Value}; + +pub(crate) struct AttributesWrap(Attributes); +impl AttributesWrap { + pub(crate) fn new(inner: Attributes) -> Self { + Self(inner) + } +} +impl Deref for AttributesWrap { + type Target = Attributes; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +pub(crate) fn kv_from_option_attr(input: Option<&Attributes>) -> Vec { + input + .map(|attr| AttributesWrap::new(attr.clone())) + .unwrap_or(AttributesWrap::new(Attributes::new())) + .into() +} + +#[allow(dead_code)] +pub(crate) fn option_attr_from_kv(input: &[KeyValue]) -> Option { + if input.is_empty() { + return None; + } + + Some(AttributesWrap::from(input).0) +} + +impl From for Vec { + fn from(value: AttributesWrap) -> Self { + value + .0 + .into_attributes() + .map(|(k, v)| { + KeyValue::new( + k, + match v { + AttributeValue::I64(val) => Value::I64(val), + AttributeValue::F64(val) => Value::F64(val), + AttributeValue::String(val) => Value::String(val.into()), + AttributeValue::Bool(val) => Value::Bool(val), + unknown => Value::String( + format!("UNSUPPORTED ATTRIBUTE VALUE TYPE: {unknown:?}").into(), + ), + }, + ) + }) + .collect::>() + } +} + +impl From<&[KeyValue]> for AttributesWrap { + fn from(value: &[KeyValue]) -> Self { + let mut attrs = Attributes::new(); + + value.iter().for_each(|kv| { + attrs.set( + kv.key.clone(), + match &kv.value { + Value::Bool(val) => AttributeValue::Bool(*val), + Value::I64(val) => AttributeValue::I64(*val), + Value::F64(val) => AttributeValue::F64(*val), + Value::String(val) => AttributeValue::String(val.clone().into()), + Value::Array(val) => AttributeValue::String(format!( + "UNSUPPORTED ATTRIBUTE VALUE TYPE ARRAY: {val}" + )), + }, + ) + }); + + AttributesWrap(attrs) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashMap; + + use super::*; + use aws_smithy_observability::attributes::{AttributeValue, Attributes}; + use opentelemetry::Value; + + #[test] + fn attr_to_kv() { + let mut attrs = Attributes::new(); + attrs.set("I64", AttributeValue::I64(64)); + attrs.set("F64", AttributeValue::F64(64.0)); + attrs.set("String", AttributeValue::String("I AM A STRING".into())); + attrs.set("Bool", AttributeValue::Bool(true)); + + let kv = kv_from_option_attr(Some(&attrs)); + + let kv_map: HashMap = kv + .into_iter() + .map(|kv| (kv.key.to_string(), kv.value)) + .collect(); + + assert_eq!(kv_map.get("I64").unwrap(), &Value::I64(64)); + assert_eq!(kv_map.get("F64").unwrap(), &Value::F64(64.0)); + assert_eq!( + kv_map.get("String").unwrap(), + &Value::String("I AM A STRING".into()) + ); + assert_eq!(kv_map.get("Bool").unwrap(), &Value::Bool(true)); + } + + #[test] + fn kv_to_attr() { + let kvs: Vec = vec![ + KeyValue::new("Bool", Value::Bool(true)), + KeyValue::new("String", Value::String("I AM A STRING".into())), + KeyValue::new("I64", Value::I64(64)), + KeyValue::new("F64", Value::F64(64.0)), + ]; + + let attrs = option_attr_from_kv(&kvs).unwrap(); + assert_eq!(attrs.get("Bool").unwrap(), &AttributeValue::Bool(true)); + assert_eq!( + attrs.get("String").unwrap(), + &AttributeValue::String("I AM A STRING".into()) + ); + assert_eq!(attrs.get("I64").unwrap(), &AttributeValue::I64(64)); + assert_eq!(attrs.get("F64").unwrap(), &AttributeValue::F64(64.0)); + } +} diff --git a/rust-runtime/aws-smithy-observability-otel/src/lib.rs b/rust-runtime/aws-smithy-observability-otel/src/lib.rs new file mode 100644 index 0000000000..0fb3d8c0cf --- /dev/null +++ b/rust-runtime/aws-smithy-observability-otel/src/lib.rs @@ -0,0 +1,81 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +/* Automatically managed default lints */ +#![cfg_attr(docsrs, feature(doc_auto_cfg))] +/* End of automatically managed default lints */ +#![warn( + missing_docs, + rustdoc::missing_crate_level_docs, + unreachable_pub, + rust_2018_idioms +)] +// The `opentelemetry_sdk` crate uses std::sync::atomic::{AtomicI64, AtomicU64} which are not available on powerpc +#![cfg(not(target_arch = "powerpc"))] + +//! Smithy Observability OpenTelemetry +//TODO(smithyobservability): once we have finalized everything and integrated metrics with our runtime +// libraries update this with detailed usage docs and examples + +pub mod attributes; +pub mod meter; + +#[cfg(test)] +mod tests { + + use crate::meter::AwsSdkOtelMeterProvider; + use aws_smithy_observability::global::{get_telemetry_provider, set_telemetry_provider}; + use aws_smithy_observability::meter::{Meter, MonotonicCounter, ProvideMeter}; + use aws_smithy_observability::provider::TelemetryProvider; + use opentelemetry_sdk::metrics::{data::Sum, PeriodicReader, SdkMeterProvider}; + use opentelemetry_sdk::runtime::Tokio; + use opentelemetry_sdk::testing::metrics::InMemoryMetricsExporter; + + // Without these tokio settings this test just stalls forever on flushing the metrics pipeline + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn can_construct_set_and_use_otel_as_global_telemetry_provider() { + // Create the OTel metrics objects + let exporter = InMemoryMetricsExporter::default(); + let reader = PeriodicReader::builder(exporter.clone(), Tokio).build(); + let otel_mp = SdkMeterProvider::builder().with_reader(reader).build(); + + // Create the SDK metrics types from the OTel objects + let sdk_mp = AwsSdkOtelMeterProvider::new(otel_mp); + let sdk_tp = TelemetryProvider::builder() + .meter_provider(sdk_mp) + .build() + .unwrap(); + + // Set the global TelemetryProvider and then get it back out + let _ = set_telemetry_provider(sdk_tp); + let global_tp = get_telemetry_provider::().unwrap(); + + // Create an instrument and record a value + let global_meter = global_tp + .meter_provider() + .get_meter("TestGlobalMeter", None); + + let mono_counter = + global_meter.create_monotonic_counter("TestMonoCounter".into(), None, None); + mono_counter.add(4, None, None); + + // Flush metric pipeline and extract metrics from exporter + global_tp.meter_provider().shutdown().unwrap(); + let finished_metrics = exporter.get_finished_metrics().unwrap(); + + let extracted_mono_counter_data = &finished_metrics[0].scope_metrics[0].metrics[0] + .data + .as_any() + .downcast_ref::>() + .unwrap() + .data_points[0] + .value; + assert_eq!(extracted_mono_counter_data, &4); + + // Get the OTel TP out and shut it down + let foo = global_tp.meter_provider(); + foo.shutdown().unwrap(); + } +} diff --git a/rust-runtime/aws-smithy-observability-otel/src/meter.rs b/rust-runtime/aws-smithy-observability-otel/src/meter.rs new file mode 100644 index 0000000000..083cc9c72f --- /dev/null +++ b/rust-runtime/aws-smithy-observability-otel/src/meter.rs @@ -0,0 +1,579 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +//! OpenTelemetry based implementations of the Smithy Observability Meter traits. + +use std::fmt::Debug; +use std::ops::Deref; +use std::sync::Arc; + +use crate::attributes::kv_from_option_attr; +use aws_smithy_observability::attributes::{Attributes, Context, Scope}; +use aws_smithy_observability::error::{ErrorKind, ObservabilityError}; +pub use aws_smithy_observability::meter::{ + AsyncMeasure, Histogram, Meter, MonotonicCounter, ProvideMeter, UpDownCounter, +}; +pub use aws_smithy_observability::provider::TelemetryProvider; +use opentelemetry::metrics::{ + AsyncInstrument as OtelAsyncInstrument, Counter as OtelCounter, Histogram as OtelHistogram, + Meter as OtelMeter, MeterProvider as OtelMeterProvider, + ObservableCounter as OtelObservableCounter, ObservableGauge as OtelObservableGauge, + ObservableUpDownCounter as OtelObservableUpDownCounter, UpDownCounter as OtelUpDownCounter, +}; +use opentelemetry_sdk::metrics::SdkMeterProvider as OtelSdkMeterProvider; + +#[derive(Debug)] +#[non_exhaustive] +/// A struct holding a noop implementation of the [Scope] trait. +pub struct ScopeWrap; + +impl Scope for ScopeWrap { + fn end(&self) {} +} + +#[derive(Debug)] +#[non_exhaustive] +/// A struct holding a noop implementation of the [Context] trait. +pub struct ContextWrap; +impl Context for ContextWrap { + type Scope = ScopeWrap; + + fn make_current(&self) -> &Self::Scope { + todo!() + } +} + +#[derive(Debug)] +#[non_exhaustive] +/// A wrapper for the [`OtelUpDownCounter`] type. +pub struct UpDownCounterWrap(OtelUpDownCounter); +impl UpDownCounter for UpDownCounterWrap { + type Context = ContextWrap; + + fn add(&self, value: i64, attributes: Option<&Attributes>, _context: Option<&ContextWrap>) { + self.0.add(value, &kv_from_option_attr(attributes)); + } +} + +#[derive(Debug)] +#[non_exhaustive] +/// A wrapper for the [`OtelHistogram`] type. +pub struct HistogramWrap(OtelHistogram); +impl Histogram for HistogramWrap { + type Context = ContextWrap; + + fn record(&self, value: f64, attributes: Option<&Attributes>, _context: Option<&ContextWrap>) { + self.0.record(value, &kv_from_option_attr(attributes)); + } +} + +#[derive(Debug)] +#[non_exhaustive] +/// A wrapper for the [`OtelCounter`] type. +pub struct MonotonicCounterWrap(OtelCounter); +impl MonotonicCounter for MonotonicCounterWrap { + type Context = ContextWrap; + + fn add(&self, value: u64, attributes: Option<&Attributes>, _context: Option<&ContextWrap>) { + self.0.add(value, &kv_from_option_attr(attributes)); + } +} + +#[derive(Debug)] +#[non_exhaustive] +/// A wrapper for the [`OtelObservableGauge`] type. +pub struct GaugeWrap(OtelObservableGauge); +impl AsyncMeasure for GaugeWrap { + type Value = f64; + type Context = ContextWrap; + + fn record( + &self, + value: Self::Value, + attributes: Option<&Attributes>, + _context: Option<&ContextWrap>, + ) { + self.0.observe(value, &kv_from_option_attr(attributes)); + } + + // OTel rust does not currently support unregistering callbacks + // https://github.com/open-telemetry/opentelemetry-rust/issues/2245 + fn stop(&self) {} +} + +#[derive(Debug)] +#[non_exhaustive] +/// A wrapper for the [`OtelObservableUpDownCounter`] type. +pub struct AsyncUpDownCounterWrap(OtelObservableUpDownCounter); +impl AsyncMeasure for AsyncUpDownCounterWrap { + type Value = i64; + type Context = ContextWrap; + + fn record( + &self, + value: Self::Value, + attributes: Option<&Attributes>, + _context: Option<&ContextWrap>, + ) { + self.0.observe(value, &kv_from_option_attr(attributes)); + } + + // OTel rust does not currently support unregistering callbacks + // https://github.com/open-telemetry/opentelemetry-rust/issues/2245 + fn stop(&self) {} +} + +#[derive(Debug)] +#[non_exhaustive] +/// A wrapper for the [`OtelObservableCounter`] type. +pub struct AsyncMonotonicCounterWrap(OtelObservableCounter); +impl AsyncMeasure for AsyncMonotonicCounterWrap { + type Value = u64; + type Context = ContextWrap; + + fn record( + &self, + value: Self::Value, + attributes: Option<&Attributes>, + _context: Option<&ContextWrap>, + ) { + self.0.observe(value, &kv_from_option_attr(attributes)); + } + + // OTel rust does not currently support unregistering callbacks + // https://github.com/open-telemetry/opentelemetry-rust/issues/2245 + fn stop(&self) {} +} + +#[non_exhaustive] +/// A wrapper for the [OtelAsyncInstrument] type. +pub struct AsyncInstrumentWrap<'a, T>(&'a (dyn OtelAsyncInstrument + Send + Sync)); +impl AsyncMeasure for AsyncInstrumentWrap<'_, T> { + type Value = T; + type Context = ContextWrap; + + fn record( + &self, + value: Self::Value, + attributes: Option<&Attributes>, + _context: Option<&ContextWrap>, + ) { + self.0.observe(value, &kv_from_option_attr(attributes)); + } + + // OTel rust does not currently support unregistering callbacks + // https://github.com/open-telemetry/opentelemetry-rust/issues/2245 + fn stop(&self) {} +} + +// The OtelAsyncInstrument trait does not have Debug as a supertrait, so we impl a minimal version +// for our wrapper struct +impl Debug for AsyncInstrumentWrap<'_, T> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_tuple("AsyncInstrumentWrap").finish() + } +} + +#[derive(Debug)] +#[non_exhaustive] +/// A wrapper for the [OtelMeter] type. +pub struct MeterWrap(OtelMeter); +impl Deref for MeterWrap { + type Target = OtelMeter; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Meter for MeterWrap { + type Context = ContextWrap; + type Gauge = GaugeWrap; + type UpDownCounter = UpDownCounterWrap; + type AsyncUDC = AsyncUpDownCounterWrap; + type MonotonicCounter = MonotonicCounterWrap; + type AsyncMC = AsyncMonotonicCounterWrap; + type Histogram = HistogramWrap; + type GaugeCallback<'a> = fn(&Self::GaugeCallbackInput<'_>); + type GaugeCallbackInput<'a> = AsyncInstrumentWrap<'a, f64>; + type AsyncUDCCallback<'a> = fn(&Self::AsyncUDCCallbackInput<'_>); + type AsyncUDCCallbackInput<'a> = AsyncInstrumentWrap<'a, i64>; + type AsyncMCCallback<'a> = fn(&Self::AsyncMCCallbackInput<'_>); + type AsyncMCCallbackInput<'a> = AsyncInstrumentWrap<'a, u64>; + + fn create_gauge( + &self, + name: String, + callback: fn(&AsyncInstrumentWrap<'_, f64>), + units: Option, + description: Option, + ) -> Arc { + let mut builder = self.f64_observable_gauge(name).with_callback( + move |input: &dyn OtelAsyncInstrument| { + callback(&AsyncInstrumentWrap(input)); + }, + ); + + if let Some(desc) = description { + builder = builder.with_description(desc); + } + + if let Some(u) = units { + builder = builder.with_unit(u); + } + + Arc::new(GaugeWrap(builder.init())) + } + + fn create_up_down_counter( + &self, + name: String, + units: Option, + description: Option, + ) -> Arc { + let mut builder = self.i64_up_down_counter(name); + if let Some(desc) = description { + builder = builder.with_description(desc); + } + + if let Some(u) = units { + builder = builder.with_unit(u); + } + + Arc::new(UpDownCounterWrap(builder.init())) + } + + fn create_async_up_down_counter( + &self, + name: String, + callback: fn(&AsyncInstrumentWrap<'_, i64>), + units: Option, + description: Option, + ) -> Arc { + let mut builder = self.i64_observable_up_down_counter(name).with_callback( + move |input: &dyn OtelAsyncInstrument| { + callback(&AsyncInstrumentWrap(input)); + }, + ); + + if let Some(desc) = description { + builder = builder.with_description(desc); + } + + if let Some(u) = units { + builder = builder.with_unit(u); + } + + Arc::new(AsyncUpDownCounterWrap(builder.init())) + } + + fn create_monotonic_counter( + &self, + name: String, + units: Option, + description: Option, + ) -> Arc { + let mut builder = self.u64_counter(name); + if let Some(desc) = description { + builder = builder.with_description(desc); + } + + if let Some(u) = units { + builder = builder.with_unit(u); + } + + Arc::new(MonotonicCounterWrap(builder.init())) + } + + fn create_async_monotonic_counter( + &self, + name: String, + callback: fn(&AsyncInstrumentWrap<'_, u64>), + units: Option, + description: Option, + ) -> Arc { + let mut builder = self.u64_observable_counter(name).with_callback( + move |input: &dyn OtelAsyncInstrument| { + callback(&AsyncInstrumentWrap(input)); + }, + ); + + if let Some(desc) = description { + builder = builder.with_description(desc); + } + + if let Some(u) = units { + builder = builder.with_unit(u); + } + + Arc::new(AsyncMonotonicCounterWrap(builder.init())) + } + + fn create_histogram( + &self, + name: String, + units: Option, + description: Option, + ) -> Arc { + let mut builder = self.f64_histogram(name); + if let Some(desc) = description { + builder = builder.with_description(desc); + } + + if let Some(u) = units { + builder = builder.with_unit(u); + } + + Arc::new(HistogramWrap(builder.init())) + } +} + +/// An OpenTelemetry based implementation of the AWS SDK's [ProvideMeter] trait +#[non_exhaustive] +#[derive(Debug)] +pub struct AwsSdkOtelMeterProvider { + meter_provider: OtelSdkMeterProvider, +} + +impl AwsSdkOtelMeterProvider { + /// Create a new [AwsSdkOtelMeterProvider] from an [OtelSdkMeterProvider]. + pub fn new(otel_meter_provider: OtelSdkMeterProvider) -> Self { + Self { + meter_provider: otel_meter_provider, + } + } + + /// Flush the metric pipeline. + pub fn flush(&self) -> Result<(), ObservabilityError> { + match self.meter_provider.force_flush() { + Ok(_) => Ok(()), + Err(err) => Err(ObservabilityError::new(ErrorKind::MetricsFlush, err)), + } + } + + /// Gracefully shutdown the metric pipeline. + pub fn shutdown(&self) -> Result<(), ObservabilityError> { + match self.meter_provider.force_flush() { + Ok(_) => Ok(()), + Err(err) => Err(ObservabilityError::new(ErrorKind::MetricsShutdown, err)), + } + } +} + +impl ProvideMeter for AwsSdkOtelMeterProvider { + type Meter = MeterWrap; + + fn get_meter(&self, scope: &'static str, _attributes: Option<&Attributes>) -> Arc { + Arc::new(MeterWrap(self.meter_provider.meter(scope))) + } +} + +#[cfg(test)] +mod tests { + + use super::*; + use aws_smithy_observability::attributes::{AttributeValue, Attributes}; + use aws_smithy_observability::meter::{ + AsyncMeasure, Histogram, Meter, MonotonicCounter, ProvideMeter, UpDownCounter, + }; + use aws_smithy_observability::provider::TelemetryProvider; + use opentelemetry_sdk::metrics::{ + data::{Gauge, Histogram as OtelHistogram, Sum}, + PeriodicReader, SdkMeterProvider, + }; + use opentelemetry_sdk::runtime::Tokio; + use opentelemetry_sdk::testing::metrics::InMemoryMetricsExporter; + + // Without these tokio settings this test just stalls forever on flushing the metrics pipeline + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn sync_instruments_work() { + // Create the OTel metrics objects + let exporter = InMemoryMetricsExporter::default(); + let reader = PeriodicReader::builder(exporter.clone(), Tokio).build(); + let otel_mp = SdkMeterProvider::builder().with_reader(reader).build(); + + // Create the SDK metrics types from the OTel objects + let sdk_mp = AwsSdkOtelMeterProvider::new(otel_mp); + let sdk_tp = TelemetryProvider::builder() + .meter_provider(sdk_mp) + .build() + .unwrap(); + + // Get the dyn versions of the SDK metrics objects + let dyn_sdk_mp = sdk_tp.meter_provider(); + let dyn_sdk_meter = dyn_sdk_mp.get_meter("TestMeter", None); + + //Create all 3 sync instruments and record some data for each + let mono_counter = + dyn_sdk_meter.create_monotonic_counter("TestMonoCounter".to_string(), None, None); + mono_counter.add(4, None, None); + let ud_counter = + dyn_sdk_meter.create_up_down_counter("TestUpDownCounter".to_string(), None, None); + ud_counter.add(-6, None, None); + let histogram = dyn_sdk_meter.create_histogram("TestHistogram".to_string(), None, None); + histogram.record(1.234, None, None); + + // Gracefully shutdown the metrics provider so all metrics are flushed through the pipeline + dyn_sdk_mp.shutdown().unwrap(); + + // Extract the metrics from the exporter and assert that they are what we expect + let finished_metrics = exporter.get_finished_metrics().unwrap(); + let extracted_mono_counter_data = &finished_metrics[0].scope_metrics[0].metrics[0] + .data + .as_any() + .downcast_ref::>() + .unwrap() + .data_points[0] + .value; + assert_eq!(extracted_mono_counter_data, &4); + + let extracted_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1] + .data + .as_any() + .downcast_ref::>() + .unwrap() + .data_points[0] + .value; + assert_eq!(extracted_ud_counter_data, &-6); + + let extracted_histogram_data = &finished_metrics[0].scope_metrics[0].metrics[2] + .data + .as_any() + .downcast_ref::>() + .unwrap() + .data_points[0] + .sum; + assert_eq!(extracted_histogram_data, &1.234); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + async fn async_instrument_work() { + // Create the OTel metrics objects + let exporter = InMemoryMetricsExporter::default(); + let reader = PeriodicReader::builder(exporter.clone(), Tokio).build(); + let otel_mp = SdkMeterProvider::builder().with_reader(reader).build(); + + // Create the SDK metrics types from the OTel objects + let sdk_mp = AwsSdkOtelMeterProvider::new(otel_mp); + let sdk_tp = TelemetryProvider::builder() + .meter_provider(sdk_mp) + .build() + .unwrap(); + + // Get the dyn versions of the SDK metrics objects + let dyn_sdk_mp = sdk_tp.meter_provider(); + let dyn_sdk_meter = dyn_sdk_mp.get_meter("TestMeter", None); + + //Create all async instruments and record some data + let gauge = dyn_sdk_meter.create_gauge( + "TestGauge".to_string(), + // Callback function records another value with different attributes so it is deduped + |measurement: &AsyncInstrumentWrap<'_, f64>| { + let mut attrs = Attributes::new(); + attrs.set( + "TestGaugeAttr", + AttributeValue::String("TestGaugeAttr".into()), + ); + measurement.record(6.789, Some(&attrs), None); + }, + None, + None, + ); + gauge.record(1.234, None, None); + + let async_ud_counter = dyn_sdk_meter.create_async_up_down_counter( + "TestAsyncUpDownCounter".to_string(), + |measurement: &AsyncInstrumentWrap<'_, i64>| { + let mut attrs = Attributes::new(); + attrs.set( + "TestAsyncUpDownCounterAttr", + AttributeValue::String("TestAsyncUpDownCounterAttr".into()), + ); + measurement.record(12, Some(&attrs), None); + }, + None, + None, + ); + async_ud_counter.record(-6, None, None); + + let async_mono_counter = dyn_sdk_meter.create_async_monotonic_counter( + "TestAsyncMonoCounter".to_string(), + |measurement: &AsyncInstrumentWrap<'_, u64>| { + let mut attrs = Attributes::new(); + attrs.set( + "TestAsyncMonoCounterAttr", + AttributeValue::String("TestAsyncMonoCounterAttr".into()), + ); + measurement.record(123, Some(&attrs), None); + }, + None, + None, + ); + async_mono_counter.record(4, None, None); + + // Gracefully shutdown the metrics provider so all metrics are flushed through the pipeline + dyn_sdk_mp.shutdown().unwrap(); + + // Extract the metrics from the exporter + let finished_metrics = exporter.get_finished_metrics().unwrap(); + + // Assert that the reported metrics are what we expect + let extracted_gauge_data = &finished_metrics[0].scope_metrics[0].metrics[0] + .data + .as_any() + .downcast_ref::>() + .unwrap() + .data_points[0] + .value; + assert_eq!(extracted_gauge_data, &1.234); + + let extracted_async_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1] + .data + .as_any() + .downcast_ref::>() + .unwrap() + .data_points[0] + .value; + assert_eq!(extracted_async_ud_counter_data, &-6); + + let extracted_async_mono_data = &finished_metrics[0].scope_metrics[0].metrics[2] + .data + .as_any() + .downcast_ref::>() + .unwrap() + .data_points[0] + .value; + assert_eq!(extracted_async_mono_data, &4); + + // Assert that the async callbacks ran + let finished_metrics = exporter.get_finished_metrics().unwrap(); + let extracted_gauge_data = &finished_metrics[0].scope_metrics[0].metrics[0] + .data + .as_any() + .downcast_ref::>() + .unwrap() + .data_points[1] + .value; + assert_eq!(extracted_gauge_data, &6.789); + + let extracted_async_ud_counter_data = &finished_metrics[0].scope_metrics[0].metrics[1] + .data + .as_any() + .downcast_ref::>() + .unwrap() + .data_points[1] + .value; + assert_eq!(extracted_async_ud_counter_data, &12); + + let extracted_async_mono_data = &finished_metrics[0].scope_metrics[0].metrics[2] + .data + .as_any() + .downcast_ref::>() + .unwrap() + .data_points[1] + .value; + assert_eq!(extracted_async_mono_data, &123); + } +} diff --git a/rust-runtime/aws-smithy-observability/Cargo.toml b/rust-runtime/aws-smithy-observability/Cargo.toml new file mode 100644 index 0000000000..76bb6d8fe4 --- /dev/null +++ b/rust-runtime/aws-smithy-observability/Cargo.toml @@ -0,0 +1,25 @@ +[package] +name = "aws-smithy-observability" +version = "0.1.0" +authors = [ + "AWS Rust SDK Team ", +] +description = "Smithy observability implementation." +edition = "2021" +license = "Apache-2.0" +repository = "https://github.com/awslabs/smithy-rs" + +[dependencies] +aws-smithy-runtime-api = { path = "../aws-smithy-runtime-api" } +once_cell = "1.19.0" + +[dev-dependencies] +serial_test = "3.1.1" + + +[package.metadata.docs.rs] +all-features = true +targets = ["x86_64-unknown-linux-gnu"] +cargo-args = ["-Zunstable-options", "-Zrustdoc-scrape-examples"] +rustdoc-args = ["--cfg", "docsrs"] +# End of docs.rs metadata diff --git a/rust-runtime/aws-smithy-observability/LICENSE b/rust-runtime/aws-smithy-observability/LICENSE new file mode 100644 index 0000000000..67db858821 --- /dev/null +++ b/rust-runtime/aws-smithy-observability/LICENSE @@ -0,0 +1,175 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. diff --git a/rust-runtime/aws-smithy-observability/README.md b/rust-runtime/aws-smithy-observability/README.md new file mode 100644 index 0000000000..a3b5b3a74a --- /dev/null +++ b/rust-runtime/aws-smithy-observability/README.md @@ -0,0 +1,7 @@ +# aws-smithy-observability + +This crate contains traits allowing for the implementation of `TelemetryProvider`s for the AWS SDK for Rust. It also contains a `global` module for setting and interacting with the current `GlobalTelemetryProvider`. + + +This crate is part of the [AWS SDK for Rust](https://awslabs.github.io/aws-sdk-rust/) and the [smithy-rs](https://github.com/smithy-lang/smithy-rs) code generator. In most cases, it should not be used directly. + diff --git a/rust-runtime/aws-smithy-observability/external-types.toml b/rust-runtime/aws-smithy-observability/external-types.toml new file mode 100644 index 0000000000..bae0c9f843 --- /dev/null +++ b/rust-runtime/aws-smithy-observability/external-types.toml @@ -0,0 +1,3 @@ +allowed_external_types = [ + "aws_smithy_runtime_api::box_error::BoxError", +] diff --git a/rust-runtime/aws-smithy-observability/src/attributes.rs b/rust-runtime/aws-smithy-observability/src/attributes.rs new file mode 100644 index 0000000000..ee9e5b599b --- /dev/null +++ b/rust-runtime/aws-smithy-observability/src/attributes.rs @@ -0,0 +1,85 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +//! Attributes (also referred to as tags or annotations in other telemetry systems) are structured +//! key-value pairs that annotate a span or event. Structured data allows observability backends +//! to index and process telemetry data in ways that simple log messages lack. + +use std::collections::HashMap; +use std::fmt::Debug; + +/// The valid types of values accepted by [Attributes]. +#[non_exhaustive] +#[derive(Clone, Debug, PartialEq)] +pub enum AttributeValue { + /// Holds an [i64] + I64(i64), + /// Holds an [f64] + F64(f64), + /// Holds a [String] + String(String), + /// Holds a [bool] + Bool(bool), +} + +/// Structured telemetry metadata. +#[non_exhaustive] +#[derive(Clone, Default)] +pub struct Attributes { + attrs: HashMap, +} + +impl Attributes { + /// Create a new empty instance of [Attributes]. + pub fn new() -> Self { + Self::default() + } + + /// Set an attribute. + pub fn set(&mut self, key: impl Into, value: impl Into) { + self.attrs.insert(key.into(), value.into()); + } + + /// Get an attribute. + pub fn get(&self, key: impl Into) -> Option<&AttributeValue> { + self.attrs.get(&key.into()) + } + + /// Get all of the attribute key value pairs. + pub fn attributes(&self) -> &HashMap { + &self.attrs + } + + /// Get an owned [Iterator] of ([String], [AttributeValue]). + pub fn into_attributes(self) -> impl Iterator { + self.attrs.into_iter() + } +} + +/// Delineates a logical scope that has some beginning and end +/// (e.g. a function or block of code). +pub trait Scope: Send + Sync + Debug { + /// invoke when the scope has ended. + fn end(&self); +} + +/// A cross cutting concern for carrying execution-scoped values across API +/// boundaries (both in-process and distributed). +pub trait Context: Send + Sync + Debug { + /// A type implementing the [Scope] trait. + type Scope: Scope; + /// Make this context the currently active context. + /// The returned handle is used to return the previous + /// context (if one existed) as active. + fn make_current(&self) -> &Self::Scope; +} + +/// Keeps track of the current [Context]. +pub trait ContextManager: Send + Sync + Debug { + /// A type implementing the [Context] trait. + type Context: Context; + ///Get the currently active context. + fn current(&self) -> &Self::Context; +} diff --git a/rust-runtime/aws-smithy-observability/src/error.rs b/rust-runtime/aws-smithy-observability/src/error.rs new file mode 100644 index 0000000000..7b93cf1dad --- /dev/null +++ b/rust-runtime/aws-smithy-observability/src/error.rs @@ -0,0 +1,96 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +//! Observability Errors + +use std::fmt; + +use aws_smithy_runtime_api::box_error::BoxError; + +/// An error in the SDKs Observability providers +#[non_exhaustive] +#[derive(Debug)] +pub struct ObservabilityError { + kind: ErrorKind, + source: BoxError, +} + +/// The types of errors associated with [ObservabilityError] +#[non_exhaustive] +#[derive(Debug)] +pub enum ErrorKind { + /// An error setting the global [crate::TelemetryProvider] + SettingGlobalProvider, + /// An error getting the global [crate::TelemetryProvider] + GettingGlobalProvider, + /// Error flushing metrics pipeline + MetricsFlush, + /// Error gracefully shutting down Metrics Provider + MetricsShutdown, + /// A custom error that does not fall under any other error kind + Other, +} + +impl ObservabilityError { + /// Create a new [`ObservabilityError`] from an [ErrorKind] and a [BoxError] + pub fn new(kind: ErrorKind, err: E) -> Self + where + E: Into, + { + Self { + kind, + source: err.into(), + } + } + + /// Returns the corresponding [`ErrorKind`] for this error. + pub fn kind(&self) -> &ErrorKind { + &self.kind + } +} + +impl fmt::Display for ObservabilityError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match &self.kind { + ErrorKind::Other => write!(f, "unclassified error"), + ErrorKind::SettingGlobalProvider => { + write!(f, "failed to set global TelemetryProvider") + } + ErrorKind::GettingGlobalProvider => { + write!(f, "failed to get global TelemetryProvider") + } + ErrorKind::MetricsFlush => write!(f, "failed to flush metrics pipeline"), + ErrorKind::MetricsShutdown => write!(f, "failed to shutdown metrics provider"), + } + } +} + +impl std::error::Error for ObservabilityError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + Some(self.source.as_ref()) + } +} + +/// An simple error to represent issues with the global [crate::TelemetryProvider]. +#[derive(Debug)] +#[non_exhaustive] +pub struct GlobalTelemetryProviderError { + reason: &'static str, +} + +impl GlobalTelemetryProviderError { + /// Create a new [GlobalTelemetryProviderError] with a reason for the error. + pub fn new(reason: &'static str) -> Self { + Self { reason } + } +} + +impl std::error::Error for GlobalTelemetryProviderError {} + +impl fmt::Display for GlobalTelemetryProviderError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "GlobalTelemetryProviderError: {}", self.reason) + } +} diff --git a/rust-runtime/aws-smithy-observability/src/global.rs b/rust-runtime/aws-smithy-observability/src/global.rs new file mode 100644 index 0000000000..98687a2886 --- /dev/null +++ b/rust-runtime/aws-smithy-observability/src/global.rs @@ -0,0 +1,107 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +//! Utilities for interacting with the currently set `GlobalTelemetryProvider` + +use once_cell::sync::Lazy; +use std::{ + any::Any, + mem, + sync::{Arc, RwLock}, +}; + +use crate::{ + error::{ErrorKind, GlobalTelemetryProviderError}, + meter::ProvideMeter, + provider::{GlobalTelemetryProvider, TelemetryProvider}, + ObservabilityError, +}; + +// Statically store the global provider +static GLOBAL_TELEMETRY_PROVIDER: Lazy>> = Lazy::new(|| { + RwLock::new(Box::new(GlobalTelemetryProvider::new( + TelemetryProvider::default(), + ))) +}); + +/// Set the current global [TelemetryProvider]. +/// +/// This is meant to be run once at the beginning of an application. Will return an [Err] if the +/// [RwLock] holding the global [TelemetryProvider] is locked or poisoned. +pub fn set_telemetry_provider( + new_provider: TelemetryProvider, +) -> Result<(), ObservabilityError> { + if let Ok(mut old_provider) = GLOBAL_TELEMETRY_PROVIDER.try_write() { + let new_global_provider = Box::new(GlobalTelemetryProvider::new(new_provider)); + + let _ = mem::replace(&mut *old_provider, new_global_provider); + + Ok(()) + } else { + Err(ObservabilityError::new( + ErrorKind::SettingGlobalProvider, + GlobalTelemetryProviderError::new( + "Failed to set TelemetryProvider, likely because the RwLock holding it is locked.", + ), + )) + } +} + +/// Get an [Arc] reference to the current global [TelemetryProvider]. Will return an [Err] if the +/// [RwLock] holding the global [TelemetryProvider] is locked or poisoned. +pub fn get_telemetry_provider( +) -> Result>, ObservabilityError> { + if let Ok(tp) = GLOBAL_TELEMETRY_PROVIDER.try_read() { + if let Some(typed_tp) = tp.downcast_ref::>() { + Ok(typed_tp.telemetry_provider().clone()) + } else { + Err(ObservabilityError::new( + ErrorKind::GettingGlobalProvider, + GlobalTelemetryProviderError::new( + "Failed to downcast TelemetryProvider to the provided type.", + ), + )) + } + } else { + Err(ObservabilityError::new( + ErrorKind::GettingGlobalProvider, + GlobalTelemetryProviderError::new("Failed to get the TelemetryProvider, likely because the RwLock containing it is locked."), + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + meter::{Meter, MonotonicCounter}, + noop::NoopMeterProvider, + provider::TelemetryProvider, + }; + use serial_test::serial; + + // Note: the tests in this module are run serially to prevent them from stepping on each other and poisoning the + // RwLock holding the GlobalTelemetryProvider + #[test] + #[serial] + fn can_set_global_telemetry_provider() { + let my_provider = TelemetryProvider::default(); + + // Set the new counter and get a reference to the old one + set_telemetry_provider(my_provider).unwrap(); + } + + #[test] + #[serial] + fn can_get_global_telemetry_provider() { + let curr_provider = get_telemetry_provider::().unwrap(); + + // Use the global provider to create an instrument and record a value with it + let curr_mp = curr_provider.meter_provider(); + let curr_meter = curr_mp.get_meter("TestMeter", None); + let instrument = curr_meter.create_monotonic_counter("TestMonoCounter".into(), None, None); + instrument.add(4, None, None); + } +} diff --git a/rust-runtime/aws-smithy-observability/src/lib.rs b/rust-runtime/aws-smithy-observability/src/lib.rs new file mode 100644 index 0000000000..06261214e3 --- /dev/null +++ b/rust-runtime/aws-smithy-observability/src/lib.rs @@ -0,0 +1,28 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +/* Automatically managed default lints */ +#![cfg_attr(docsrs, feature(doc_auto_cfg))] +/* End of automatically managed default lints */ +#![warn( + missing_docs, + rustdoc::missing_crate_level_docs, + unreachable_pub, + rust_2018_idioms +)] + +//! Smithy Observability +// TODO(smithyobservability): once we have finalized everything and integrated metrics with our runtime +// libraries update this with detailed usage docs and examples + +pub mod attributes; +pub use attributes::{AttributeValue, Attributes}; +pub mod error; +pub use error::{ErrorKind, ObservabilityError}; +pub mod global; +pub mod meter; +mod noop; +pub mod provider; +pub use provider::{TelemetryProvider, TelemetryProviderBuilder}; diff --git a/rust-runtime/aws-smithy-observability/src/meter.rs b/rust-runtime/aws-smithy-observability/src/meter.rs new file mode 100644 index 0000000000..817a512c59 --- /dev/null +++ b/rust-runtime/aws-smithy-observability/src/meter.rs @@ -0,0 +1,152 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +//! Metrics are used to gain insight into the operational performance and health of a system in +//! real time. + +use crate::attributes::{Attributes, Context}; +use std::{fmt::Debug, sync::Arc}; + +/// Provides named instances of [Meter]. +pub trait ProvideMeter: Send + Sync + Debug { + /// A type implementing the [Meter] trait. + type Meter: Meter; + /// Get or create a named [Meter]. + fn get_meter(&self, scope: &'static str, attributes: Option<&Attributes>) -> Arc; +} + +/// Collects a set of events with an event count and sum for all events. +pub trait Histogram: Send + Sync + Debug { + /// A type implementing [crate::attributes::Context] + type Context: Context; + /// Record a value. + fn record(&self, value: f64, attributes: Option<&Attributes>, context: Option<&Self::Context>); +} + +/// A counter that monotonically increases. +pub trait MonotonicCounter: Send + Sync + Debug { + /// A type implementing [crate::attributes::Context] + type Context: Context; + /// Increment a counter by a fixed amount. + fn add(&self, value: u64, attributes: Option<&Attributes>, context: Option<&Self::Context>); +} + +/// A counter that can increase or decrease. +pub trait UpDownCounter: Send + Sync + Debug { + /// A type implementing [crate::attributes::Context] + type Context: Context; + /// Increment or decrement a counter by a fixed amount. + fn add(&self, value: i64, attributes: Option<&Attributes>, context: Option<&Self::Context>); +} + +/// Foo +pub trait AsyncMeasure: Send + Sync + Debug { + /// A type implementing [crate::attributes::Context] + type Context: Context; + + /// The type of the value recorded by this instrument + type Value; + + /// Record a value + fn record( + &self, + value: Self::Value, + attributes: Option<&Attributes>, + context: Option<&Self::Context>, + ); + + /// Stop recording, unregister callback. + fn stop(&self); +} + +/// The entry point to creating instruments. A grouping of related metrics. +pub trait Meter: Send + Sync + Debug { + /// A type implementing [crate::attributes::Context] + type Context: Context; + + /// A type implementing [AsyncMeasure] for [f64] + type Gauge: AsyncMeasure; + /// The type of the callback function passed when creating a [Meter::Gauge] + type GaugeCallback<'a>: Fn(&Self::GaugeCallbackInput<'a>) + Send + Sync; + /// The type of the input to [Meter::GaugeCallback] + type GaugeCallbackInput<'a>: AsyncMeasure; + + /// A type implementing [UpDownCounter] + type UpDownCounter: UpDownCounter; + + /// A type implementing [AsyncMeasure] for [i64] + type AsyncUDC: AsyncMeasure; + /// The type of the callback function passed when creating a [Meter::AsyncUDC] + type AsyncUDCCallback<'a>: Fn(&Self::AsyncUDCCallbackInput<'a>) + Send + Sync; + /// The type of the input to [Meter::AsyncUDCCallback] + type AsyncUDCCallbackInput<'a>: AsyncMeasure; + + /// A type implementing [MonotonicCounter] + type MonotonicCounter: MonotonicCounter; + + /// A type implementing [AsyncMeasure] for [u64] + type AsyncMC: AsyncMeasure; + /// The type of the callback function passed when creating a [Meter::AsyncMC] + type AsyncMCCallback<'a>: Fn(&Self::AsyncMCCallbackInput<'a>) + Send + Sync; + /// The type of the input to [Meter::AsyncMCCallback] + type AsyncMCCallbackInput<'a>: AsyncMeasure; + + /// A type implementing [Histogram] + type Histogram: Histogram; + + /// Create a new Gauge. + #[allow(clippy::type_complexity)] + fn create_gauge( + &self, + name: String, + callback: Self::GaugeCallback<'_>, + units: Option, + description: Option, + ) -> Arc; + + /// Create a new [UpDownCounter]. + fn create_up_down_counter( + &self, + name: String, + units: Option, + description: Option, + ) -> Arc; + + /// Create a new AsyncUpDownCounter. + #[allow(clippy::type_complexity)] + fn create_async_up_down_counter( + &self, + name: String, + callback: Self::AsyncUDCCallback<'_>, + units: Option, + description: Option, + ) -> Arc; + + /// Create a new [MonotonicCounter]. + fn create_monotonic_counter( + &self, + name: String, + units: Option, + description: Option, + ) -> Arc; + + /// Create a new AsyncMonotonicCounter. + #[allow(clippy::type_complexity)] + fn create_async_monotonic_counter( + &self, + name: String, + callback: Self::AsyncMCCallback<'_>, + units: Option, + description: Option, + ) -> Arc; + + /// Create a new [Histogram]. + fn create_histogram( + &self, + name: String, + units: Option, + description: Option, + ) -> Arc; +} diff --git a/rust-runtime/aws-smithy-observability/src/noop.rs b/rust-runtime/aws-smithy-observability/src/noop.rs new file mode 100644 index 0000000000..8ea79cabdc --- /dev/null +++ b/rust-runtime/aws-smithy-observability/src/noop.rs @@ -0,0 +1,179 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +//! An noop implementation of the Meter traits + +use std::marker::PhantomData; +use std::{fmt::Debug, sync::Arc}; + +use crate::{ + attributes::{Attributes, Context, Scope}, + meter::{AsyncMeasure, Histogram, Meter, MonotonicCounter, ProvideMeter, UpDownCounter}, +}; + +#[derive(Debug)] +#[non_exhaustive] +pub struct NoopMeterProvider; +impl ProvideMeter for NoopMeterProvider { + type Meter = NoopMeter; + + fn get_meter( + &self, + _scope: &'static str, + _attributes: Option<&Attributes>, + ) -> Arc { + Arc::new(NoopMeter) + } +} + +#[derive(Debug)] +#[non_exhaustive] +pub struct NoopMeter; +impl Meter for NoopMeter { + type Context = NoopContext; + + type Gauge = NoopAsyncMeasurement; + + type GaugeCallback<'a> = fn(&NoopAsyncMeasurement); + + type GaugeCallbackInput<'a> = NoopAsyncMeasurement; + + type UpDownCounter = NoopUpDownCounter; + + type AsyncUDC = NoopAsyncMeasurement; + + type AsyncUDCCallback<'a> = fn(&NoopAsyncMeasurement); + + type AsyncUDCCallbackInput<'a> = NoopAsyncMeasurement; + + type MonotonicCounter = NoopMonotonicCounter; + + type AsyncMC = NoopAsyncMeasurement; + + type AsyncMCCallback<'a> = fn(&NoopAsyncMeasurement); + + type AsyncMCCallbackInput<'a> = NoopAsyncMeasurement; + + type Histogram = NoopHistogram; + + fn create_gauge( + &self, + _name: String, + _callback: Self::GaugeCallback<'_>, + _units: Option, + _description: Option, + ) -> Arc> { + Arc::new(NoopAsyncMeasurement(PhantomData::)) + } + + fn create_up_down_counter( + &self, + _name: String, + _units: Option, + _description: Option, + ) -> Arc { + Arc::new(NoopUpDownCounter) + } + + fn create_async_up_down_counter( + &self, + _name: String, + _callback: Self::AsyncUDCCallback<'_>, + _units: Option, + _description: Option, + ) -> Arc> { + Arc::new(NoopAsyncMeasurement(PhantomData::)) + } + + fn create_monotonic_counter( + &self, + _name: String, + _units: Option, + _description: Option, + ) -> Arc { + Arc::new(NoopMonotonicCounter) + } + + fn create_async_monotonic_counter( + &self, + _name: String, + _callback: Self::AsyncMCCallback<'_>, + _units: Option, + _description: Option, + ) -> Arc> { + Arc::new(NoopAsyncMeasurement(PhantomData::)) + } + + fn create_histogram( + &self, + _name: String, + _units: Option, + _description: Option, + ) -> Arc { + Arc::new(NoopHistogram) + } +} + +#[derive(Debug)] +#[non_exhaustive] +pub struct NoopAsyncMeasurement(PhantomData); +impl AsyncMeasure for NoopAsyncMeasurement { + type Value = T; + type Context = NoopContext; + + fn record(&self, _value: T, _attributes: Option<&Attributes>, _context: Option<&NoopContext>) {} + + fn stop(&self) {} +} + +#[derive(Debug)] +#[non_exhaustive] +pub struct NoopUpDownCounter; +impl UpDownCounter for NoopUpDownCounter { + type Context = NoopContext; + + fn add(&self, _value: i64, _attributes: Option<&Attributes>, _context: Option<&NoopContext>) {} +} + +#[derive(Debug)] +#[non_exhaustive] +pub struct NoopMonotonicCounter; +impl MonotonicCounter for NoopMonotonicCounter { + type Context = NoopContext; + + fn add(&self, _value: u64, _attributes: Option<&Attributes>, _context: Option<&NoopContext>) {} +} + +#[derive(Debug)] +#[non_exhaustive] +pub struct NoopHistogram; +impl Histogram for NoopHistogram { + type Context = NoopContext; + + fn record( + &self, + _value: f64, + _attributes: Option<&Attributes>, + _context: Option<&NoopContext>, + ) { + } +} + +#[derive(Debug)] +#[non_exhaustive] +pub struct NoopContext; +impl Context for NoopContext { + type Scope = NoopScope; + fn make_current(&self) -> &NoopScope { + &NoopScope + } +} + +#[derive(Debug)] +#[non_exhaustive] +pub struct NoopScope; +impl Scope for NoopScope { + fn end(&self) {} +} diff --git a/rust-runtime/aws-smithy-observability/src/provider.rs b/rust-runtime/aws-smithy-observability/src/provider.rs new file mode 100644 index 0000000000..643f6d8391 --- /dev/null +++ b/rust-runtime/aws-smithy-observability/src/provider.rs @@ -0,0 +1,92 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0 + */ + +//! Definitions of high level Telemetry Providers. + +use std::sync::Arc; + +use crate::{meter::ProvideMeter, noop::NoopMeterProvider}; + +/// A struct to hold the various types of telemetry providers. +#[non_exhaustive] +pub struct TelemetryProvider { + meter_provider: PM, +} + +impl TelemetryProvider { + /// Returns a builder struct for [TelemetryProvider] + pub fn builder() -> TelemetryProviderBuilder { + TelemetryProviderBuilder { + meter_provider: None, + } + } + + /// Returns a [TelemetryProvider] with a noop `meter_provider` + pub fn noop() -> TelemetryProvider { + TelemetryProvider { + meter_provider: NoopMeterProvider, + } + } + + /// Get a reference to the set [ProvideMeter] + pub fn meter_provider(&self) -> &PM { + &self.meter_provider + } +} + +// If we choose to expand our Telemetry provider and make Logging and Tracing +// configurable at some point in the future we can do that by adding default +// logger_provider and tracer_providers based on `tracing` to maintain backwards +// compatibilty with what we have today. +impl Default for TelemetryProvider { + fn default() -> Self { + Self { + meter_provider: NoopMeterProvider, + } + } +} + +/// A builder for [TelemetryProvider]. +// Implementation note: The meter_provider is wrapped in an Option to allow us to maintain +// the generic nature of the struct when first constructing it from TelemetryProvider::builder. +// Without the Option we would have to provide a properly typed instance of the meter_provider +// when calling the builder function. +#[non_exhaustive] +pub struct TelemetryProviderBuilder { + meter_provider: Option, +} + +impl TelemetryProviderBuilder { + /// Set the [ProvideMeter]. + pub fn meter_provider(mut self, meter_provider: PM) -> Self { + self.meter_provider = Some(meter_provider); + self + } + + /// Build the [TelemetryProvider]. Returns [None] if no `meter_provider` has been set. + pub fn build(self) -> Option> { + self.meter_provider + .map(|mp| TelemetryProvider { meter_provider: mp }) + } +} + +/// Wrapper type to hold a implementer of TelemetryProvider in an Arc so that +/// it can be safely used across threads. +#[non_exhaustive] +pub(crate) struct GlobalTelemetryProvider { + pub(crate) telemetry_provider: Arc>, +} + +impl GlobalTelemetryProvider { + pub(crate) fn new(telemetry_provider: TelemetryProvider) -> Self { + Self { + telemetry_provider: Arc::new(telemetry_provider), + } + } + + pub(crate) fn telemetry_provider(&self) -> &Arc> { + &self.telemetry_provider + } +}