This is page 53 of 74. Use http://codebase.md/apache/opendal?lines=true&page={x} to view the full context.
# Directory Structure
```
├── .asf.yaml
├── .config
│ └── nextest.toml
├── .devcontainer
│ ├── devcontainer.json
│ └── post_create.sh
├── .editorconfig
├── .env.example
├── .gitattributes
├── .github
│ ├── actions
│ │ ├── fuzz_test
│ │ │ └── action.yaml
│ │ ├── setup
│ │ │ └── action.yaml
│ │ ├── setup-hadoop
│ │ │ └── action.yaml
│ │ ├── setup-ocaml
│ │ │ └── action.yaml
│ │ ├── test_behavior_binding_c
│ │ │ └── action.yaml
│ │ ├── test_behavior_binding_cpp
│ │ │ └── action.yaml
│ │ ├── test_behavior_binding_go
│ │ │ └── action.yaml
│ │ ├── test_behavior_binding_java
│ │ │ └── action.yaml
│ │ ├── test_behavior_binding_nodejs
│ │ │ └── action.yaml
│ │ ├── test_behavior_binding_python
│ │ │ └── action.yaml
│ │ ├── test_behavior_core
│ │ │ └── action.yaml
│ │ └── test_behavior_integration_object_store
│ │ └── action.yml
│ ├── CODEOWNERS
│ ├── dependabot.yml
│ ├── ISSUE_TEMPLATE
│ │ ├── 1-bug-report.yml
│ │ ├── 2-feature-request.yml
│ │ ├── 3-new-release.md
│ │ └── config.yml
│ ├── pull_request_template.md
│ ├── release.yml
│ ├── scripts
│ │ ├── test_behavior
│ │ │ ├── __init__.py
│ │ │ ├── plan.py
│ │ │ └── test_plan.py
│ │ ├── test_go_binding
│ │ │ ├── generate_test_scheme.py
│ │ │ └── matrix.yaml
│ │ └── weekly_update
│ │ ├── .gitignore
│ │ ├── .python-version
│ │ ├── main.py
│ │ ├── pyproject.toml
│ │ ├── README.md
│ │ └── uv.lock
│ ├── services
│ │ ├── aliyun_drive
│ │ │ └── aliyun_drive
│ │ │ └── disable_action.yml
│ │ ├── alluxio
│ │ │ └── alluxio
│ │ │ └── action.yml
│ │ ├── azblob
│ │ │ ├── azure_azblob
│ │ │ │ └── action.yml
│ │ │ └── azurite_azblob
│ │ │ └── action.yml
│ │ ├── azdls
│ │ │ └── azdls
│ │ │ └── action.yml
│ │ ├── azfile
│ │ │ └── azfile
│ │ │ └── action.yml
│ │ ├── b2
│ │ │ └── b2
│ │ │ └── action.yml
│ │ ├── cacache
│ │ │ └── cacache
│ │ │ └── action.yml
│ │ ├── compfs
│ │ │ └── compfs
│ │ │ └── action.yml
│ │ ├── cos
│ │ │ └── cos
│ │ │ └── action.yml
│ │ ├── dashmap
│ │ │ └── dashmap
│ │ │ └── action.yml
│ │ ├── dropbox
│ │ │ └── dropbox
│ │ │ └── disable_action.yml
│ │ ├── etcd
│ │ │ ├── etcd
│ │ │ │ └── action.yml
│ │ │ ├── etcd-cluster
│ │ │ │ └── action.yml
│ │ │ └── etcd-tls
│ │ │ └── action.yml
│ │ ├── fs
│ │ │ └── local_fs
│ │ │ └── action.yml
│ │ ├── ftp
│ │ │ └── vsftpd
│ │ │ └── disable_action.yml
│ │ ├── gcs
│ │ │ ├── gcs
│ │ │ │ └── action.yml
│ │ │ └── gcs_with_default_storage_class
│ │ │ └── action.yml
│ │ ├── gdrive
│ │ │ └── gdrive
│ │ │ └── action.yml
│ │ ├── gridfs
│ │ │ ├── gridfs
│ │ │ │ └── action.yml
│ │ │ └── gridfs_with_basic_auth
│ │ │ └── action.yml
│ │ ├── hdfs
│ │ │ ├── hdfs_cluster
│ │ │ │ └── action.yml
│ │ │ ├── hdfs_cluster_with_atomic_write_dir
│ │ │ │ └── action.yml
│ │ │ ├── hdfs_default
│ │ │ │ └── action.yml
│ │ │ ├── hdfs_default_gcs
│ │ │ │ └── action.yml
│ │ │ ├── hdfs_default_on_azurite_azblob
│ │ │ │ └── action.yml
│ │ │ ├── hdfs_default_on_minio_s3
│ │ │ │ └── action.yml
│ │ │ └── hdfs_default_with_atomic_write_dir
│ │ │ └── action.yml
│ │ ├── hdfs_native
│ │ │ └── hdfs_native_cluster
│ │ │ └── action.yml
│ │ ├── http
│ │ │ ├── caddy
│ │ │ │ └── action.yml
│ │ │ └── nginx
│ │ │ └── action.yml
│ │ ├── huggingface
│ │ │ └── huggingface
│ │ │ └── action.yml
│ │ ├── koofr
│ │ │ └── koofr
│ │ │ └── disable_action.yml
│ │ ├── memcached
│ │ │ ├── memcached
│ │ │ │ └── action.yml
│ │ │ └── memcached_with_auth
│ │ │ └── action.yml
│ │ ├── memory
│ │ │ └── memory
│ │ │ └── action.yml
│ │ ├── mini_moka
│ │ │ └── mini_moka
│ │ │ └── action.yml
│ │ ├── moka
│ │ │ └── moka
│ │ │ └── action.yml
│ │ ├── mongodb
│ │ │ ├── mongodb_with_basic_auth
│ │ │ │ └── action.yml
│ │ │ └── mongodb_with_no_auth
│ │ │ └── action.yml
│ │ ├── monoiofs
│ │ │ └── monoiofs
│ │ │ └── action.yml
│ │ ├── mysql
│ │ │ └── mysql
│ │ │ └── action.yml
│ │ ├── oss
│ │ │ ├── oss
│ │ │ │ └── action.yml
│ │ │ └── oss_with_versioning
│ │ │ └── action.yml
│ │ ├── persy
│ │ │ └── persy
│ │ │ └── action.yml
│ │ ├── postgresql
│ │ │ └── postgresql
│ │ │ └── action.yml
│ │ ├── redb
│ │ │ └── redb
│ │ │ └── action.yml
│ │ ├── redis
│ │ │ ├── dragonfly
│ │ │ │ └── action.yml
│ │ │ ├── kvrocks
│ │ │ │ └── action.yml
│ │ │ ├── redis
│ │ │ │ └── action.yml
│ │ │ ├── redis_tls
│ │ │ │ └── action.yml
│ │ │ ├── redis_with_cluster
│ │ │ │ └── action.yml
│ │ │ └── redis_with_cluster_tls
│ │ │ └── action.yml
│ │ ├── rocksdb
│ │ │ └── rocksdb
│ │ │ └── action.yml
│ │ ├── s3
│ │ │ ├── 0_minio_s3
│ │ │ │ └── action.yml
│ │ │ ├── aws_s3
│ │ │ │ └── action.yml
│ │ │ ├── aws_s3_with_list_objects_v1
│ │ │ │ └── action.yml
│ │ │ ├── aws_s3_with_sse_c
│ │ │ │ └── action.yml
│ │ │ ├── aws_s3_with_versioning
│ │ │ │ └── action.yml
│ │ │ ├── aws_s3_with_virtual_host
│ │ │ │ └── action.yml
│ │ │ ├── ceph_radios_s3_with_versioning
│ │ │ │ └── disable_action.yml
│ │ │ ├── ceph_rados_s3
│ │ │ │ └── disable_action.yml
│ │ │ ├── minio_s3_with_anonymous
│ │ │ │ └── action.yml
│ │ │ ├── minio_s3_with_list_objects_v1
│ │ │ │ └── action.yml
│ │ │ ├── minio_s3_with_versioning
│ │ │ │ └── action.yml
│ │ │ └── r2
│ │ │ └── disabled_action.yml
│ │ ├── seafile
│ │ │ └── seafile
│ │ │ └── action.yml
│ │ ├── sftp
│ │ │ ├── sftp
│ │ │ │ └── action.yml
│ │ │ └── sftp_with_default_root
│ │ │ └── action.yml
│ │ ├── sled
│ │ │ ├── sled
│ │ │ │ └── action.yml
│ │ │ └── sled_with_tree
│ │ │ └── action.yml
│ │ ├── sqlite
│ │ │ └── sqlite
│ │ │ └── action.yml
│ │ ├── swift
│ │ │ ├── ceph_rados_swift
│ │ │ │ └── action.yml
│ │ │ └── swift
│ │ │ └── action.yml
│ │ ├── tikv
│ │ │ └── tikv
│ │ │ └── disable_action.yml
│ │ ├── webdav
│ │ │ ├── 0_nginx
│ │ │ │ └── action.yml
│ │ │ ├── jfrog
│ │ │ │ └── disabled_action.yml
│ │ │ ├── nextcloud
│ │ │ │ └── action.yml
│ │ │ ├── nginx_with_empty_password
│ │ │ │ └── action.yml
│ │ │ ├── nginx_with_password
│ │ │ │ └── action.yml
│ │ │ ├── nginx_with_redirect
│ │ │ │ └── action.yml
│ │ │ └── owncloud
│ │ │ └── action.yml
│ │ └── webhdfs
│ │ ├── webhdfs
│ │ │ └── action.yml
│ │ ├── webhdfs_with_list_batch_disabled
│ │ │ └── action.yml
│ │ └── webhdfs_with_user_name
│ │ └── action.yml
│ └── workflows
│ ├── ci_bindings_c.yml
│ ├── ci_bindings_cpp.yml
│ ├── ci_bindings_d.yml
│ ├── ci_bindings_dart.yml
│ ├── ci_bindings_dotnet.yml
│ ├── ci_bindings_go.yml
│ ├── ci_bindings_haskell.yml
│ ├── ci_bindings_java.yml
│ ├── ci_bindings_lua.yml
│ ├── ci_bindings_nodejs.yml
│ ├── ci_bindings_ocaml.yml
│ ├── ci_bindings_php.yml
│ ├── ci_bindings_python.yml
│ ├── ci_bindings_ruby.yml
│ ├── ci_bindings_swift.yml
│ ├── ci_bindings_zig.yml
│ ├── ci_check.yml
│ ├── ci_core.yml
│ ├── ci_integration_dav_server.yml
│ ├── ci_integration_object_store.yml
│ ├── ci_integration_parquet.yml
│ ├── ci_integration_spring.yml
│ ├── ci_integration_unftp_sbe.yml
│ ├── ci_odev.yml
│ ├── ci_weekly_update.yml
│ ├── discussion-thread-link.yml
│ ├── docs.yml
│ ├── full-ci-promote.yml
│ ├── release_dart.yml
│ ├── release_java.yml
│ ├── release_nodejs.yml
│ ├── release_python.yml
│ ├── release_ruby.yml
│ ├── release_rust.yml
│ ├── service_test_ghac.yml
│ ├── test_behavior_binding_c.yml
│ ├── test_behavior_binding_cpp.yml
│ ├── test_behavior_binding_go.yml
│ ├── test_behavior_binding_java.yml
│ ├── test_behavior_binding_nodejs.yml
│ ├── test_behavior_binding_python.yml
│ ├── test_behavior_core.yml
│ ├── test_behavior_integration_object_store.yml
│ ├── test_behavior.yml
│ ├── test_edge.yml
│ └── test_fuzz.yml
├── .gitignore
├── .taplo.toml
├── .typos.toml
├── .vscode
│ └── settings.json
├── .yamlfmt
├── AGENTS.md
├── bindings
│ ├── java
│ │ ├── .cargo
│ │ │ └── config.toml
│ │ ├── .gitignore
│ │ ├── .mvn
│ │ │ └── wrapper
│ │ │ └── maven-wrapper.properties
│ │ ├── Cargo.toml
│ │ ├── DEPENDENCIES.md
│ │ ├── DEPENDENCIES.rust.tsv
│ │ ├── mvnw
│ │ ├── mvnw.cmd
│ │ ├── pom.xml
│ │ ├── README.md
│ │ ├── src
│ │ │ ├── async_operator.rs
│ │ │ ├── convert.rs
│ │ │ ├── error.rs
│ │ │ ├── executor.rs
│ │ │ ├── layer.rs
│ │ │ ├── lib.rs
│ │ │ ├── main
│ │ │ │ ├── java
│ │ │ │ │ └── org
│ │ │ │ │ └── apache
│ │ │ │ │ └── opendal
│ │ │ │ │ ├── AsyncExecutor.java
│ │ │ │ │ ├── AsyncOperator.java
│ │ │ │ │ ├── Capability.java
│ │ │ │ │ ├── Entry.java
│ │ │ │ │ ├── Environment.java
│ │ │ │ │ ├── layer
│ │ │ │ │ │ ├── ConcurrentLimitLayer.java
│ │ │ │ │ │ ├── package-info.java
│ │ │ │ │ │ └── RetryLayer.java
│ │ │ │ │ ├── Layer.java
│ │ │ │ │ ├── ListOptions.java
│ │ │ │ │ ├── Metadata.java
│ │ │ │ │ ├── NativeLibrary.java
│ │ │ │ │ ├── NativeObject.java
│ │ │ │ │ ├── OpenDAL.java
│ │ │ │ │ ├── OpenDALException.java
│ │ │ │ │ ├── Operator.java
│ │ │ │ │ ├── OperatorInfo.java
│ │ │ │ │ ├── OperatorInputStream.java
│ │ │ │ │ ├── OperatorOutputStream.java
│ │ │ │ │ ├── package-info.java
│ │ │ │ │ ├── PresignedRequest.java
│ │ │ │ │ ├── ReadOptions.java
│ │ │ │ │ ├── ServiceConfig.java
│ │ │ │ │ ├── StatOptions.java
│ │ │ │ │ └── WriteOptions.java
│ │ │ │ └── resources
│ │ │ │ ├── bindings.properties
│ │ │ │ └── META-INF
│ │ │ │ └── NOTICE
│ │ │ ├── operator_input_stream.rs
│ │ │ ├── operator_output_stream.rs
│ │ │ ├── operator.rs
│ │ │ ├── test
│ │ │ │ └── java
│ │ │ │ └── org
│ │ │ │ └── apache
│ │ │ │ └── opendal
│ │ │ │ └── test
│ │ │ │ ├── AsyncExecutorTest.java
│ │ │ │ ├── behavior
│ │ │ │ │ ├── AsyncCopyTest.java
│ │ │ │ │ ├── AsyncCreateDirTest.java
│ │ │ │ │ ├── AsyncListTest.java
│ │ │ │ │ ├── AsyncPresignTest.java
│ │ │ │ │ ├── AsyncReadOnlyTest.java
│ │ │ │ │ ├── AsyncRenameTest.java
│ │ │ │ │ ├── AsyncStatOptionsTest.java
│ │ │ │ │ ├── AsyncWriteOptionsTest.java
│ │ │ │ │ ├── AsyncWriteTest.java
│ │ │ │ │ ├── BehaviorExtension.java
│ │ │ │ │ ├── BehaviorTestBase.java
│ │ │ │ │ ├── BlockingCopyTest.java
│ │ │ │ │ ├── BlockingCreateDirTest.java
│ │ │ │ │ ├── BlockingListTest.java
│ │ │ │ │ ├── BlockingReadOnlyTest.java
│ │ │ │ │ ├── BlockingRenameTest.java
│ │ │ │ │ ├── BlockingStatOptionsTest.java
│ │ │ │ │ ├── BlockingWriteOptionTest.java
│ │ │ │ │ ├── BlockingWriteTest.java
│ │ │ │ │ └── RegressionTest.java
│ │ │ │ ├── condition
│ │ │ │ │ └── OpenDALExceptionCondition.java
│ │ │ │ ├── LayerTest.java
│ │ │ │ ├── MetadataTest.java
│ │ │ │ ├── OperatorDuplicateTest.java
│ │ │ │ ├── OperatorInfoTest.java
│ │ │ │ ├── OperatorInputOutputStreamTest.java
│ │ │ │ ├── OperatorUtf8DecodeTest.java
│ │ │ │ └── UtilityTest.java
│ │ │ └── utility.rs
│ │ ├── tools
│ │ │ └── build.py
│ │ ├── upgrade.md
│ │ └── users.md
│ ├── nodejs
│ │ ├── .cargo
│ │ │ └── config.toml
│ │ ├── .gitignore
│ │ ├── .node-version
│ │ ├── .npmignore
│ │ ├── .npmrc
│ │ ├── .prettierignore
│ │ ├── benchmark
│ │ │ ├── deno.ts
│ │ │ ├── node.js
│ │ │ └── README.md
│ │ ├── build.rs
│ │ ├── Cargo.toml
│ │ ├── CONTRIBUTING.md
│ │ ├── DEPENDENCIES.md
│ │ ├── DEPENDENCIES.rust.tsv
│ │ ├── devbox.json
│ │ ├── devbox.lock
│ │ ├── generated.d.ts
│ │ ├── generated.js
│ │ ├── index.cjs
│ │ ├── index.d.ts
│ │ ├── index.mjs
│ │ ├── npm
│ │ │ ├── darwin-arm64
│ │ │ │ ├── package.json
│ │ │ │ └── README.md
│ │ │ ├── darwin-x64
│ │ │ │ ├── package.json
│ │ │ │ └── README.md
│ │ │ ├── linux-arm64-gnu
│ │ │ │ ├── package.json
│ │ │ │ └── README.md
│ │ │ ├── linux-arm64-musl
│ │ │ │ ├── package.json
│ │ │ │ └── README.md
│ │ │ ├── linux-x64-gnu
│ │ │ │ ├── package.json
│ │ │ │ └── README.md
│ │ │ ├── linux-x64-musl
│ │ │ │ ├── package.json
│ │ │ │ └── README.md
│ │ │ ├── win32-arm64-msvc
│ │ │ │ ├── package.json
│ │ │ │ └── README.md
│ │ │ └── win32-x64-msvc
│ │ │ ├── package.json
│ │ │ └── README.md
│ │ ├── package.json
│ │ ├── pnpm-lock.yaml
│ │ ├── README.md
│ │ ├── scripts
│ │ │ └── header.mjs
│ │ ├── src
│ │ │ ├── capability.rs
│ │ │ ├── layer.rs
│ │ │ ├── lib.rs
│ │ │ └── options.rs
│ │ ├── tests
│ │ │ ├── service.test.mjs
│ │ │ ├── suites
│ │ │ │ ├── async.suite.mjs
│ │ │ │ ├── asyncDeleteOptions.suite.mjs
│ │ │ │ ├── asyncLister.suite.mjs
│ │ │ │ ├── asyncListOptions.suite.mjs
│ │ │ │ ├── asyncReadOptions.suite.mjs
│ │ │ │ ├── asyncStatOptions.suite.mjs
│ │ │ │ ├── asyncWriteOptions.suite.mjs
│ │ │ │ ├── index.mjs
│ │ │ │ ├── layer.suite.mjs
│ │ │ │ ├── services.suite.mjs
│ │ │ │ ├── sync.suite.mjs
│ │ │ │ ├── syncDeleteOptions.suite.mjs
│ │ │ │ ├── syncLister.suite.mjs
│ │ │ │ ├── syncListOptions.suite.mjs
│ │ │ │ ├── syncReadOptions.suite.mjs
│ │ │ │ ├── syncStatOptions.suite.mjs
│ │ │ │ └── syncWriteOptions.suite.mjs
│ │ │ └── utils.mjs
│ │ ├── theme
│ │ │ ├── index.tsx
│ │ │ └── package.json
│ │ ├── tsconfig.json
│ │ ├── tsconfig.theme.json
│ │ ├── typedoc.json
│ │ ├── upgrade.md
│ │ └── vitest.config.mjs
│ ├── python
│ │ ├── .gitignore
│ │ ├── benchmark
│ │ │ ├── async_opendal_benchmark.py
│ │ │ ├── async_origin_s3_benchmark_with_gevent.py
│ │ │ └── README.md
│ │ ├── Cargo.toml
│ │ ├── CONTRIBUTING.md
│ │ ├── DEPENDENCIES.md
│ │ ├── DEPENDENCIES.rust.tsv
│ │ ├── docs
│ │ │ ├── api
│ │ │ │ ├── async_file.md
│ │ │ │ ├── async_operator.md
│ │ │ │ ├── capability.md
│ │ │ │ ├── exceptions.md
│ │ │ │ ├── file.md
│ │ │ │ ├── layers.md
│ │ │ │ ├── operator.md
│ │ │ │ └── types.md
│ │ │ └── index.md
│ │ ├── justfile
│ │ ├── mkdocs.yml
│ │ ├── pyproject.toml
│ │ ├── pyrightconfig.json
│ │ ├── python
│ │ │ └── opendal
│ │ │ ├── __init__.py
│ │ │ ├── capability.pyi
│ │ │ ├── exceptions.pyi
│ │ │ ├── file.pyi
│ │ │ ├── layers.pyi
│ │ │ ├── operator.pyi
│ │ │ ├── py.typed
│ │ │ ├── services.pyi
│ │ │ └── types.pyi
│ │ ├── README.md
│ │ ├── ruff.toml
│ │ ├── src
│ │ │ ├── capability.rs
│ │ │ ├── errors.rs
│ │ │ ├── file.rs
│ │ │ ├── layers.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ ├── metadata.rs
│ │ │ ├── operator.rs
│ │ │ ├── options.rs
│ │ │ ├── services.rs
│ │ │ └── utils.rs
│ │ ├── template
│ │ │ └── module.html.jinja2
│ │ ├── tests
│ │ │ ├── conftest.py
│ │ │ ├── test_async_check.py
│ │ │ ├── test_async_copy.py
│ │ │ ├── test_async_delete.py
│ │ │ ├── test_async_exists.py
│ │ │ ├── test_async_list.py
│ │ │ ├── test_async_pickle_types.py
│ │ │ ├── test_async_rename.py
│ │ │ ├── test_capability.py
│ │ │ ├── test_exceptions.py
│ │ │ ├── test_pickle_rw.py
│ │ │ ├── test_read.py
│ │ │ ├── test_sync_check.py
│ │ │ ├── test_sync_copy.py
│ │ │ ├── test_sync_delete.py
│ │ │ ├── test_sync_exists.py
│ │ │ ├── test_sync_list.py
│ │ │ ├── test_sync_pickle_types.py
│ │ │ ├── test_sync_rename.py
│ │ │ └── test_write.py
│ │ ├── upgrade.md
│ │ ├── users.md
│ │ └── uv.lock
│ └── README.md
├── CHANGELOG.md
├── CITATION.cff
├── CLAUDE.md
├── CONTRIBUTING.md
├── core
│ ├── benches
│ │ ├── ops
│ │ │ ├── main.rs
│ │ │ ├── read.rs
│ │ │ ├── README.md
│ │ │ ├── utils.rs
│ │ │ └── write.rs
│ │ ├── README.md
│ │ ├── types
│ │ │ ├── buffer.rs
│ │ │ ├── main.rs
│ │ │ ├── README.md
│ │ │ └── tasks.rs
│ │ ├── vs_fs
│ │ │ ├── Cargo.toml
│ │ │ ├── README.md
│ │ │ └── src
│ │ │ └── main.rs
│ │ └── vs_s3
│ │ ├── Cargo.toml
│ │ ├── README.md
│ │ └── src
│ │ └── main.rs
│ ├── Cargo.lock
│ ├── Cargo.toml
│ ├── CHANGELOG.md
│ ├── CONTRIBUTING.md
│ ├── core
│ │ ├── Cargo.toml
│ │ └── src
│ │ ├── blocking
│ │ │ ├── delete.rs
│ │ │ ├── list.rs
│ │ │ ├── mod.rs
│ │ │ ├── operator.rs
│ │ │ ├── read
│ │ │ │ ├── buffer_iterator.rs
│ │ │ │ ├── mod.rs
│ │ │ │ ├── reader.rs
│ │ │ │ ├── std_bytes_iterator.rs
│ │ │ │ └── std_reader.rs
│ │ │ └── write
│ │ │ ├── mod.rs
│ │ │ ├── std_writer.rs
│ │ │ └── writer.rs
│ │ ├── docs
│ │ │ ├── comparisons
│ │ │ │ ├── mod.rs
│ │ │ │ └── vs_object_store.md
│ │ │ ├── concepts.rs
│ │ │ ├── internals
│ │ │ │ ├── accessor.rs
│ │ │ │ ├── layer.rs
│ │ │ │ └── mod.rs
│ │ │ ├── mod.rs
│ │ │ ├── performance
│ │ │ │ ├── concurrent_write.md
│ │ │ │ ├── http_optimization.md
│ │ │ │ └── mod.rs
│ │ │ ├── rfcs
│ │ │ │ ├── 0000_example.md
│ │ │ │ ├── 0041_object_native_api.md
│ │ │ │ ├── 0044_error_handle.md
│ │ │ │ ├── 0057_auto_region.md
│ │ │ │ ├── 0069_object_stream.md
│ │ │ │ ├── 0090_limited_reader.md
│ │ │ │ ├── 0112_path_normalization.md
│ │ │ │ ├── 0191_async_streaming_io.md
│ │ │ │ ├── 0203_remove_credential.md
│ │ │ │ ├── 0221_create_dir.md
│ │ │ │ ├── 0247_retryable_error.md
│ │ │ │ ├── 0293_object_id.md
│ │ │ │ ├── 0337_dir_entry.md
│ │ │ │ ├── 0409_accessor_capabilities.md
│ │ │ │ ├── 0413_presign.md
│ │ │ │ ├── 0423_command_line_interface.md
│ │ │ │ ├── 0429_init_from_iter.md
│ │ │ │ ├── 0438_multipart.md
│ │ │ │ ├── 0443_gateway.md
│ │ │ │ ├── 0501_new_builder.md
│ │ │ │ ├── 0554_write_refactor.md
│ │ │ │ ├── 0561_list_metadata_reuse.md
│ │ │ │ ├── 0599_blocking_api.md
│ │ │ │ ├── 0623_redis_service.md
│ │ │ │ ├── 0627_split_capabilities.md
│ │ │ │ ├── 0661_path_in_accessor.md
│ │ │ │ ├── 0793_generic_kv_services.md
│ │ │ │ ├── 0926_object_reader.md
│ │ │ │ ├── 0977_refactor_error.md
│ │ │ │ ├── 1085_object_handler.md
│ │ │ │ ├── 1391_object_metadataer.md
│ │ │ │ ├── 1398_query_based_metadata.md
│ │ │ │ ├── 1420_object_writer.md
│ │ │ │ ├── 1477_remove_object_concept.md
│ │ │ │ ├── 1735_operation_extension.md
│ │ │ │ ├── 2083_writer_sink_api.md
│ │ │ │ ├── 2133_append_api.md
│ │ │ │ ├── 2299_chain_based_operator_api.md
│ │ │ │ ├── 2602_object_versioning.md
│ │ │ │ ├── 2758_merge_append_into_write.md
│ │ │ │ ├── 2774_lister_api.md
│ │ │ │ ├── 2779_list_with_metakey.md
│ │ │ │ ├── 2852_native_capability.md
│ │ │ │ ├── 2884_merge_range_read_into_read.md
│ │ │ │ ├── 3017_remove_write_copy_from.md
│ │ │ │ ├── 3197_config.md
│ │ │ │ ├── 3232_align_list_api.md
│ │ │ │ ├── 3243_list_prefix.md
│ │ │ │ ├── 3356_lazy_reader.md
│ │ │ │ ├── 3526_list_recursive.md
│ │ │ │ ├── 3574_concurrent_stat_in_list.md
│ │ │ │ ├── 3734_buffered_reader.md
│ │ │ │ ├── 3898_concurrent_writer.md
│ │ │ │ ├── 3911_deleter_api.md
│ │ │ │ ├── 4382_range_based_read.md
│ │ │ │ ├── 4638_executor.md
│ │ │ │ ├── 5314_remove_metakey.md
│ │ │ │ ├── 5444_operator_from_uri.md
│ │ │ │ ├── 5479_context.md
│ │ │ │ ├── 5485_conditional_reader.md
│ │ │ │ ├── 5495_list_with_deleted.md
│ │ │ │ ├── 5556_write_returns_metadata.md
│ │ │ │ ├── 5871_read_returns_metadata.md
│ │ │ │ ├── 6189_remove_native_blocking.md
│ │ │ │ ├── 6209_glob_support.md
│ │ │ │ ├── 6213_options_api.md
│ │ │ │ ├── 6370_foyer_integration.md
│ │ │ │ ├── 6678_simulate_layer.md
│ │ │ │ ├── 6707_capability_override_layer.md
│ │ │ │ ├── 6817_checksum.md
│ │ │ │ ├── 6828_core.md
│ │ │ │ ├── 7130_route_layer.md
│ │ │ │ ├── mod.rs
│ │ │ │ └── README.md
│ │ │ └── upgrade.md
│ │ ├── layers
│ │ │ ├── complete.rs
│ │ │ ├── correctness_check.rs
│ │ │ ├── error_context.rs
│ │ │ ├── http_client.rs
│ │ │ ├── mod.rs
│ │ │ ├── simulate.rs
│ │ │ └── type_eraser.rs
│ │ ├── lib.rs
│ │ ├── raw
│ │ │ ├── accessor.rs
│ │ │ ├── atomic_util.rs
│ │ │ ├── enum_utils.rs
│ │ │ ├── futures_util.rs
│ │ │ ├── http_util
│ │ │ │ ├── body.rs
│ │ │ │ ├── bytes_content_range.rs
│ │ │ │ ├── bytes_range.rs
│ │ │ │ ├── client.rs
│ │ │ │ ├── error.rs
│ │ │ │ ├── header.rs
│ │ │ │ ├── mod.rs
│ │ │ │ ├── multipart.rs
│ │ │ │ └── uri.rs
│ │ │ ├── layer.rs
│ │ │ ├── mod.rs
│ │ │ ├── oio
│ │ │ │ ├── buf
│ │ │ │ │ ├── flex_buf.rs
│ │ │ │ │ ├── mod.rs
│ │ │ │ │ ├── pooled_buf.rs
│ │ │ │ │ └── queue_buf.rs
│ │ │ │ ├── delete
│ │ │ │ │ ├── api.rs
│ │ │ │ │ ├── batch_delete.rs
│ │ │ │ │ ├── mod.rs
│ │ │ │ │ └── one_shot_delete.rs
│ │ │ │ ├── entry.rs
│ │ │ │ ├── list
│ │ │ │ │ ├── api.rs
│ │ │ │ │ ├── flat_list.rs
│ │ │ │ │ ├── hierarchy_list.rs
│ │ │ │ │ ├── mod.rs
│ │ │ │ │ ├── page_list.rs
│ │ │ │ │ └── prefix_list.rs
│ │ │ │ ├── mod.rs
│ │ │ │ ├── read
│ │ │ │ │ ├── api.rs
│ │ │ │ │ └── mod.rs
│ │ │ │ └── write
│ │ │ │ ├── api.rs
│ │ │ │ ├── append_write.rs
│ │ │ │ ├── block_write.rs
│ │ │ │ ├── mod.rs
│ │ │ │ ├── multipart_write.rs
│ │ │ │ ├── one_shot_write.rs
│ │ │ │ └── position_write.rs
│ │ │ ├── operation.rs
│ │ │ ├── ops.rs
│ │ │ ├── path_cache.rs
│ │ │ ├── path.rs
│ │ │ ├── rps.rs
│ │ │ ├── serde_util.rs
│ │ │ ├── std_io_util.rs
│ │ │ ├── time.rs
│ │ │ ├── tokio_util.rs
│ │ │ └── version.rs
│ │ ├── services
│ │ │ ├── memory
│ │ │ │ ├── backend.rs
│ │ │ │ ├── config.rs
│ │ │ │ ├── core.rs
│ │ │ │ ├── deleter.rs
│ │ │ │ ├── docs.md
│ │ │ │ ├── lister.rs
│ │ │ │ ├── mod.rs
│ │ │ │ └── writer.rs
│ │ │ └── mod.rs
│ │ └── types
│ │ ├── buffer.rs
│ │ ├── builder.rs
│ │ ├── capability.rs
│ │ ├── context
│ │ │ ├── mod.rs
│ │ │ ├── read.rs
│ │ │ └── write.rs
│ │ ├── delete
│ │ │ ├── deleter.rs
│ │ │ ├── futures_delete_sink.rs
│ │ │ ├── input.rs
│ │ │ └── mod.rs
│ │ ├── entry.rs
│ │ ├── error.rs
│ │ ├── execute
│ │ │ ├── api.rs
│ │ │ ├── executor.rs
│ │ │ ├── executors
│ │ │ │ ├── mod.rs
│ │ │ │ └── tokio_executor.rs
│ │ │ └── mod.rs
│ │ ├── list.rs
│ │ ├── metadata.rs
│ │ ├── mod.rs
│ │ ├── mode.rs
│ │ ├── operator
│ │ │ ├── builder.rs
│ │ │ ├── info.rs
│ │ │ ├── mod.rs
│ │ │ ├── operator_futures.rs
│ │ │ ├── operator.rs
│ │ │ ├── registry.rs
│ │ │ └── uri.rs
│ │ ├── options.rs
│ │ ├── read
│ │ │ ├── buffer_stream.rs
│ │ │ ├── futures_async_reader.rs
│ │ │ ├── futures_bytes_stream.rs
│ │ │ ├── mod.rs
│ │ │ └── reader.rs
│ │ └── write
│ │ ├── buffer_sink.rs
│ │ ├── futures_async_writer.rs
│ │ ├── futures_bytes_sink.rs
│ │ ├── mod.rs
│ │ └── writer.rs
│ ├── DEPENDENCIES.md
│ ├── DEPENDENCIES.rust.tsv
│ ├── edge
│ │ ├── file_write_on_full_disk
│ │ │ ├── Cargo.toml
│ │ │ ├── README.md
│ │ │ └── src
│ │ │ └── main.rs
│ │ ├── README.md
│ │ ├── s3_aws_assume_role_with_web_identity
│ │ │ ├── Cargo.toml
│ │ │ ├── README.md
│ │ │ └── src
│ │ │ └── main.rs
│ │ └── s3_read_on_wasm
│ │ ├── .gitignore
│ │ ├── Cargo.toml
│ │ ├── README.md
│ │ ├── src
│ │ │ └── lib.rs
│ │ └── webdriver.json
│ ├── fuzz
│ │ ├── .gitignore
│ │ ├── Cargo.toml
│ │ ├── fuzz_reader.rs
│ │ ├── fuzz_writer.rs
│ │ └── README.md
│ ├── layers
│ │ ├── async-backtrace
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ └── lib.rs
│ │ ├── await-tree
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ └── lib.rs
│ │ ├── capability-check
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ └── lib.rs
│ │ ├── chaos
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ └── lib.rs
│ │ ├── concurrent-limit
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ └── lib.rs
│ │ ├── dtrace
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ └── lib.rs
│ │ ├── fastmetrics
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ └── lib.rs
│ │ ├── fastrace
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ └── lib.rs
│ │ ├── foyer
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ └── lib.rs
│ │ ├── hotpath
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ └── lib.rs
│ │ ├── immutable-index
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ └── lib.rs
│ │ ├── logging
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ └── lib.rs
│ │ ├── metrics
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ └── lib.rs
│ │ ├── mime-guess
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ └── lib.rs
│ │ ├── observe-metrics-common
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ └── lib.rs
│ │ ├── otelmetrics
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ └── lib.rs
│ │ ├── oteltrace
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ └── lib.rs
│ │ ├── prometheus
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ └── lib.rs
│ │ ├── prometheus-client
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ └── lib.rs
│ │ ├── retry
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ └── lib.rs
│ │ ├── route
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ └── lib.rs
│ │ ├── tail-cut
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ └── lib.rs
│ │ ├── throttle
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ └── lib.rs
│ │ ├── timeout
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ └── lib.rs
│ │ └── tracing
│ │ ├── Cargo.toml
│ │ └── src
│ │ └── lib.rs
│ ├── LICENSE
│ ├── README.md
│ ├── services
│ │ ├── aliyun-drive
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── alluxio
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── azblob
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── azdls
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── azfile
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── azure-common
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ └── lib.rs
│ │ ├── b2
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── cacache
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── lib.rs
│ │ │ └── writer.rs
│ │ ├── cloudflare-kv
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ ├── model.rs
│ │ │ └── writer.rs
│ │ ├── compfs
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ ├── reader.rs
│ │ │ └── writer.rs
│ │ ├── cos
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── d1
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── model.rs
│ │ │ └── writer.rs
│ │ ├── dashmap
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── dbfs
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── dropbox
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── builder.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── etcd
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── foundationdb
│ │ │ ├── build.rs
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── lib.rs
│ │ │ └── writer.rs
│ │ ├── fs
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ ├── reader.rs
│ │ │ └── writer.rs
│ │ ├── ftp
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── err.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ ├── reader.rs
│ │ │ └── writer.rs
│ │ ├── gcs
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ ├── uri.rs
│ │ │ └── writer.rs
│ │ ├── gdrive
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── builder.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── ghac
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ └── writer.rs
│ │ ├── github
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ ├── mod.rs
│ │ │ └── writer.rs
│ │ ├── gridfs
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── lib.rs
│ │ │ └── writer.rs
│ │ ├── hdfs
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ ├── reader.rs
│ │ │ └── writer.rs
│ │ ├── hdfs-native
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ ├── reader.rs
│ │ │ └── writer.rs
│ │ ├── http
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ └── lib.rs
│ │ ├── huggingface
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ └── lister.rs
│ │ ├── ipfs
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── ipld.rs
│ │ │ └── lib.rs
│ │ ├── ipmfs
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── builder.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── koofr
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── lakefs
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── memcached
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── binary.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── lib.rs
│ │ │ └── writer.rs
│ │ ├── mini_moka
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── moka
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── mongodb
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── lib.rs
│ │ │ └── writer.rs
│ │ ├── monoiofs
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── lib.rs
│ │ │ ├── reader.rs
│ │ │ └── writer.rs
│ │ ├── mysql
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── lib.rs
│ │ │ └── writer.rs
│ │ ├── obs
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── onedrive
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── builder.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── graph_model.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── opfs
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ └── utils.rs
│ │ ├── oss
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── pcloud
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── persy
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── lib.rs
│ │ │ └── writer.rs
│ │ ├── postgresql
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── lib.rs
│ │ │ └── writer.rs
│ │ ├── redb
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── lib.rs
│ │ │ └── writer.rs
│ │ ├── redis
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── delete.rs
│ │ │ ├── docs.md
│ │ │ ├── lib.rs
│ │ │ └── writer.rs
│ │ ├── rocksdb
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── s3
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── compatible_services.md
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ ├── mod.rs
│ │ │ └── writer.rs
│ │ ├── seafile
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── sftp
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ ├── reader.rs
│ │ │ ├── utils.rs
│ │ │ └── writer.rs
│ │ ├── sled
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── sqlite
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── lib.rs
│ │ │ └── writer.rs
│ │ ├── surrealdb
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── lib.rs
│ │ │ └── writer.rs
│ │ ├── swift
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── compatible_services.md
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── tikv
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── lib.rs
│ │ │ └── writer.rs
│ │ ├── upyun
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── vercel-artifacts
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── builder.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ └── writer.rs
│ │ ├── vercel-blob
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── webdav
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ └── writer.rs
│ │ ├── webhdfs
│ │ │ ├── Cargo.toml
│ │ │ └── src
│ │ │ ├── backend.rs
│ │ │ ├── config.rs
│ │ │ ├── core.rs
│ │ │ ├── deleter.rs
│ │ │ ├── docs.md
│ │ │ ├── error.rs
│ │ │ ├── lib.rs
│ │ │ ├── lister.rs
│ │ │ ├── message.rs
│ │ │ └── writer.rs
│ │ └── yandex-disk
│ │ ├── Cargo.toml
│ │ └── src
│ │ ├── backend.rs
│ │ ├── config.rs
│ │ ├── core.rs
│ │ ├── deleter.rs
│ │ ├── docs.md
│ │ ├── error.rs
│ │ ├── lib.rs
│ │ ├── lister.rs
│ │ └── writer.rs
│ ├── src
│ │ └── lib.rs
│ ├── testkit
│ │ ├── Cargo.toml
│ │ └── src
│ │ ├── lib.rs
│ │ ├── read.rs
│ │ ├── utils.rs
│ │ └── write.rs
│ ├── tests
│ │ ├── behavior
│ │ │ ├── async_copy.rs
│ │ │ ├── async_create_dir.rs
│ │ │ ├── async_delete.rs
│ │ │ ├── async_list.rs
│ │ │ ├── async_presign.rs
│ │ │ ├── async_read.rs
│ │ │ ├── async_rename.rs
│ │ │ ├── async_stat.rs
│ │ │ ├── async_write.rs
│ │ │ ├── main.rs
│ │ │ ├── README.md
│ │ │ └── utils.rs
│ │ └── data
│ │ ├── normal_dir
│ │ │ └── .gitkeep
│ │ ├── normal_file.txt
│ │ ├── special_dir !@#$%^&()_+-=;',
│ │ │ └── .gitkeep
│ │ └── special_file !@#$%^&()_+-=;',.txt
│ ├── upgrade.md
│ └── users.md
├── deny.toml
├── DEPENDENCIES.md
├── dev
│ ├── Cargo.lock
│ ├── Cargo.toml
│ ├── README.md
│ └── src
│ ├── generate
│ │ ├── java.j2
│ │ ├── java.rs
│ │ ├── mod.rs
│ │ ├── parser.rs
│ │ ├── python.j2
│ │ └── python.rs
│ ├── main.rs
│ └── release
│ ├── mod.rs
│ └── package.rs
├── doap.rdf
├── fixtures
│ ├── alluxio
│ │ └── docker-compose-alluxio.yml
│ ├── azblob
│ │ └── docker-compose-azurite.yml
│ ├── data
│ │ ├── normal_dir
│ │ │ └── .gitkeep
│ │ ├── normal_file.txt
│ │ ├── special_dir !@#$%^&()_+-=;',
│ │ │ └── .gitkeep
│ │ └── special_file !@#$%^&()_+-=;',.txt
│ ├── etcd
│ │ ├── ca-key.pem
│ │ ├── ca.pem
│ │ ├── client-key.pem
│ │ ├── client.pem
│ │ ├── docker-compose-cluster.yml
│ │ ├── docker-compose-standalone-tls.yml
│ │ ├── docker-compose-standalone.yml
│ │ ├── server-key.pem
│ │ └── server.pem
│ ├── ftp
│ │ └── docker-compose-vsftpd.yml
│ ├── hdfs
│ │ ├── azurite-azblob-core-site.xml
│ │ ├── docker-compose-hdfs-cluster.yml
│ │ ├── gcs-core-site.xml
│ │ ├── hdfs-site.xml
│ │ └── minio-s3-core-site.xml
│ ├── http
│ │ ├── Caddyfile
│ │ ├── docker-compose-caddy.yml
│ │ ├── docker-compose-nginx.yml
│ │ └── nginx.conf
│ ├── libsql
│ │ ├── docker-compose-auth.yml
│ │ └── docker-compose.yml
│ ├── memcached
│ │ ├── docker-compose-memcached-with-auth.yml
│ │ └── docker-compose-memcached.yml
│ ├── mongodb
│ │ ├── docker-compose-basic-auth.yml
│ │ └── docker-compose-no-auth.yml
│ ├── mysql
│ │ ├── docker-compose.yml
│ │ └── init.sql
│ ├── postgresql
│ │ ├── docker-compose.yml
│ │ └── init.sql
│ ├── redis
│ │ ├── docker-compose-dragonfly.yml
│ │ ├── docker-compose-kvrocks.yml
│ │ ├── docker-compose-redis-cluster-tls.yml
│ │ ├── docker-compose-redis-cluster.yml
│ │ ├── docker-compose-redis-tls.yml
│ │ ├── docker-compose-redis.yml
│ │ └── ssl
│ │ ├── .gitignore
│ │ ├── ca.crt
│ │ ├── ca.key
│ │ ├── ca.srl
│ │ ├── README.md
│ │ ├── redis.crt
│ │ ├── redis.key
│ │ └── req.conf
│ ├── s3
│ │ ├── docker-compose-ceph-rados.yml
│ │ └── docker-compose-minio.yml
│ ├── seafile
│ │ └── docker-compose-seafile.yml
│ ├── sftp
│ │ ├── change_root_dir.sh
│ │ ├── docker-compose-sftp-with-default-root.yml
│ │ ├── docker-compose-sftp.yml
│ │ ├── health-check.sh
│ │ ├── test_ssh_key
│ │ └── test_ssh_key.pub
│ ├── sqlite
│ │ └── data.sql
│ ├── swift
│ │ ├── docker-compose-ceph-rados.yml
│ │ └── docker-compose-swift.yml
│ ├── tikv
│ │ ├── gen_cert.sh
│ │ ├── pd-tls.toml
│ │ ├── pd.toml
│ │ ├── ssl
│ │ │ ├── ca-key.pem
│ │ │ ├── ca.pem
│ │ │ ├── client-key.pem
│ │ │ ├── client.pem
│ │ │ ├── pd-server-key.pem
│ │ │ ├── pd-server.pem
│ │ │ ├── tikv-server-key.pem
│ │ │ └── tikv-server.pem
│ │ ├── tikv-tls.toml
│ │ └── tikv.toml
│ ├── webdav
│ │ ├── config
│ │ │ └── nginx
│ │ │ └── http.conf
│ │ ├── docker-compose-webdav-jfrog.yml
│ │ ├── docker-compose-webdav-nextcloud.yml
│ │ ├── docker-compose-webdav-owncloud.yml
│ │ ├── docker-compose-webdav-with-auth.yml
│ │ ├── docker-compose-webdav-with-empty-passwd.yml
│ │ ├── docker-compose-webdav.yml
│ │ └── health-check-nextcloud.sh
│ └── webhdfs
│ └── docker-compose-webhdfs.yml
├── justfile
├── LICENSE
├── licenserc.toml
├── NOTICE
├── README.md
├── rust-toolchain.toml
├── rustfmt.toml
└── scripts
├── constants.py
├── dependencies.py
├── merge_local_staging.py
├── README.md
├── verify.py
└── workspace.py
```
# Files
--------------------------------------------------------------------------------
/core/layers/foyer/src/lib.rs:
--------------------------------------------------------------------------------
```rust
1 | // Licensed to the Apache Software Foundation (ASF) under one
2 | // or more contributor license agreements. See the NOTICE file
3 | // distributed with this work for additional information
4 | // regarding copyright ownership. The ASF licenses this file
5 | // to you under the Apache License, Version 2.0 (the
6 | // "License"); you may not use this file except in compliance
7 | // with the License. You may obtain a copy of the License at
8 | //
9 | // http://www.apache.org/licenses/LICENSE-2.0
10 | //
11 | // Unless required by applicable law or agreed to in writing,
12 | // software distributed under the License is distributed on an
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 | // KIND, either express or implied. See the License for the
15 | // specific language governing permissions and limitations
16 | // under the License.
17 |
18 | use std::{
19 | future::Future,
20 | ops::{Bound, Deref, Range, RangeBounds},
21 | sync::Arc,
22 | };
23 |
24 | use foyer::{Code, CodeError, Error as FoyerError, HybridCache};
25 |
26 | use opendal_core::raw::oio::*;
27 | use opendal_core::raw::*;
28 | use opendal_core::*;
29 |
/// Marker error raised by the cache-fill path when an object's size falls
/// outside the configured size limit, telling the caller to bypass the cache.
#[derive(Debug)]
struct FetchSizeTooLarge;

impl std::error::Error for FetchSizeTooLarge {}

impl std::fmt::Display for FetchSizeTooLarge {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("fetched data size exceeds size limit")
    }
}
41 |
42 | fn extract_err(e: FoyerError) -> Error {
43 | let e = match e.downcast::<Error>() {
44 | Ok(e) => return e,
45 | Err(e) => e,
46 | };
47 | Error::new(ErrorKind::Unexpected, e.to_string())
48 | }
49 |
50 | /// [`FoyerKey`] is a key for the foyer cache. It's encoded via bincode, which is
51 | /// backed by foyer's "serde" feature.
52 | ///
53 | /// It's possible to specify a version in the [`OpRead`] args:
54 | ///
55 | /// - If a version is given, the object is cached under that versioned key.
56 | /// - If version is not supplied, the object is cached exactly as returned by the backend,
57 | /// We do NOT interpret `None` as "latest" and we do not promote it to any other version.
58 | #[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Serialize, serde::Deserialize)]
59 | pub struct FoyerKey {
60 | pub path: String,
61 | pub version: Option<String>,
62 | }
63 |
64 | /// [`FoyerValue`] is a wrapper around `Buffer` that implements the `Code` trait.
65 | #[derive(Debug)]
66 | pub struct FoyerValue(pub Buffer);
67 |
68 | impl Deref for FoyerValue {
69 | type Target = Buffer;
70 |
71 | fn deref(&self) -> &Self::Target {
72 | &self.0
73 | }
74 | }
75 |
76 | impl Code for FoyerValue {
77 | fn encode(&self, writer: &mut impl std::io::Write) -> std::result::Result<(), CodeError> {
78 | let len = self.0.len() as u64;
79 | writer.write_all(&len.to_le_bytes())?;
80 | std::io::copy(&mut self.0.clone(), writer)?;
81 | Ok(())
82 | }
83 |
84 | fn decode(reader: &mut impl std::io::Read) -> std::result::Result<Self, CodeError>
85 | where
86 | Self: Sized,
87 | {
88 | let mut len_bytes = [0u8; 8];
89 | reader.read_exact(&mut len_bytes)?;
90 | let len = u64::from_le_bytes(len_bytes) as usize;
91 | let mut buffer = vec![0u8; len];
92 | reader.read_exact(&mut buffer[..len])?;
93 | Ok(FoyerValue(buffer.into()))
94 | }
95 |
96 | fn estimated_size(&self) -> usize {
97 | 8 + self.0.len()
98 | }
99 | }
100 |
101 | /// Hybrid cache layer for OpenDAL that uses [foyer](https://github.com/foyer-rs/foyer) for caching.
102 | ///
103 | /// # Operation Behavior
104 | /// - `write`: [`FoyerLayer`] will write to the foyer hybrid cache after the service's write operation is completed.
105 | /// - `read`: [`FoyerLayer`] will first check the foyer hybrid cache for the data. If the data is not found, it will perform the read operation on the service and cache the result.
106 | /// - `delete`: [`FoyerLayer`] will remove the data from the foyer hybrid cache regardless of whether the service's delete operation is successful.
107 | /// - Other operations: [`FoyerLayer`] will not cache the results of other operations, such as `list`, `copy`, `rename`, etc. They will be passed through to the underlying accessor without caching.
108 | ///
109 | /// # Examples
110 | ///
111 | /// ```no_run
112 | /// use opendal_core::{Operator, services::Memory};
113 | /// use opendal_layer_foyer::FoyerLayer;
114 | /// use foyer::{HybridCacheBuilder, Engine};
115 | ///
116 | /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
117 | /// let cache = HybridCacheBuilder::new()
118 | /// .memory(64 * 1024 * 1024) // 64MB memory cache
119 | /// .with_shards(4)
120 | /// .storage(Engine::Large(Default::default()))
121 | /// .build()
122 | /// .await?;
123 | ///
124 | /// let op = Operator::new(Memory::default())?
125 | /// .layer(FoyerLayer::new(cache))
126 | /// .finish();
127 | /// # Ok(())
128 | /// # }
129 | /// ```
130 | ///
131 | /// # Note
132 | ///
133 | /// If the object version is enabled, the foyer cache layer will treat the objects with same path but different versions as different objects.
134 | #[derive(Debug)]
135 | pub struct FoyerLayer {
136 | cache: HybridCache<FoyerKey, FoyerValue>,
137 | size_limit: Range<usize>,
138 | }
139 |
140 | impl FoyerLayer {
141 | /// Creates a new `FoyerLayer` with the given foyer hybrid cache.
142 | pub fn new(cache: HybridCache<FoyerKey, FoyerValue>) -> Self {
143 | FoyerLayer {
144 | cache,
145 | size_limit: 0..usize::MAX,
146 | }
147 | }
148 |
149 | /// Sets the size limit for caching.
150 | ///
151 | /// It is recommended to set a size limit to avoid caching large files that may not be suitable for caching.
152 | pub fn with_size_limit<R: RangeBounds<usize>>(mut self, size_limit: R) -> Self {
153 | let start = match size_limit.start_bound() {
154 | Bound::Included(v) => *v,
155 | Bound::Excluded(v) => *v + 1,
156 | Bound::Unbounded => 0,
157 | };
158 | let end = match size_limit.end_bound() {
159 | Bound::Included(v) => *v + 1,
160 | Bound::Excluded(v) => *v,
161 | Bound::Unbounded => usize::MAX,
162 | };
163 | self.size_limit = start..end;
164 | self
165 | }
166 | }
167 |
168 | impl<A: Access> Layer<A> for FoyerLayer {
169 | type LayeredAccess = FoyerAccessor<A>;
170 |
171 | fn layer(&self, accessor: A) -> Self::LayeredAccess {
172 | let cache = self.cache.clone();
173 | FoyerAccessor {
174 | inner: Arc::new(Inner {
175 | accessor,
176 | cache,
177 | size_limit: self.size_limit.clone(),
178 | }),
179 | }
180 | }
181 | }
182 |
183 | #[derive(Debug)]
184 | struct Inner<A: Access> {
185 | accessor: A,
186 | cache: HybridCache<FoyerKey, FoyerValue>,
187 | size_limit: Range<usize>,
188 | }
189 |
190 | #[derive(Debug)]
191 | pub struct FoyerAccessor<A: Access> {
192 | inner: Arc<Inner<A>>,
193 | }
194 |
195 | impl<A: Access> LayeredAccess for FoyerAccessor<A> {
196 | type Inner = A;
197 | type Reader = Buffer;
198 | type Writer = Writer<A>;
199 | type Lister = A::Lister;
200 | type Deleter = Deleter<A>;
201 |
202 | fn inner(&self) -> &Self::Inner {
203 | &self.inner.accessor
204 | }
205 |
206 | fn info(&self) -> Arc<AccessorInfo> {
207 | self.inner.accessor.info()
208 | }
209 |
210 | async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
211 | let path_str = path.to_string();
212 | let version = args.version().map(|v| v.to_string());
213 | let original_args = args.clone();
214 |
215 | // Extract range bounds before async block to avoid lifetime issues
216 | let (range_start, range_end) = {
217 | let r = args.range();
218 | let start = r.offset();
219 | let end = r.size().map(|size| start + size);
220 | (start, end)
221 | };
222 |
223 | // Use fetch to read data from cache or fallback to remote. fetch() can automatically
224 | // handle the thundering herd problem by ensuring only one request is made for a given
225 | // key.
226 | //
227 | // Please note that we only cache the object if it's smaller than size_limit. And we'll
228 | // fetch the ENTIRE object from remote to put it into cache, then slice it to the requested
229 | // range.
230 | let result = self
231 | .inner
232 | .cache
233 | .fetch(
234 | FoyerKey {
235 | path: path_str.clone(),
236 | version: version.clone(),
237 | },
238 | || {
239 | let inner = self.inner.clone();
240 | let path_clone = path_str.clone();
241 | async move {
242 | // read the metadata first, if it's too large, do not cache
243 | let metadata = inner
244 | .accessor
245 | .stat(&path_clone, OpStat::default())
246 | .await
247 | .map_err(FoyerError::other)?
248 | .into_metadata();
249 |
250 | let size = metadata.content_length() as usize;
251 | if !inner.size_limit.contains(&size) {
252 | return Err(FoyerError::other(FetchSizeTooLarge));
253 | }
254 |
255 | // fetch the ENTIRE object from remote.
256 | let (_, mut reader) = inner
257 | .accessor
258 | .read(
259 | &path_clone,
260 | OpRead::default().with_range(BytesRange::new(0, None)),
261 | )
262 | .await
263 | .map_err(FoyerError::other)?;
264 | let buffer = reader.read_all().await.map_err(FoyerError::other)?;
265 |
266 | Ok(FoyerValue(buffer))
267 | }
268 | },
269 | )
270 | .await;
271 |
272 | // If got entry from cache, slice it to the requested range. If it's larger than size_limit,
273 | // we'll simply forward the request to the underlying accessor with user's given range.
274 | match result {
275 | Ok(entry) => {
276 | let end = range_end.unwrap_or(entry.len() as u64);
277 | let range = BytesContentRange::default()
278 | .with_range(range_start, end - 1)
279 | .with_size(entry.len() as _);
280 | let buffer = entry.slice(range_start as usize..end as usize);
281 | let rp = RpRead::new()
282 | .with_size(Some(buffer.len() as _))
283 | .with_range(Some(range));
284 | Ok((rp, buffer))
285 | }
286 | Err(e) => match e.downcast::<FetchSizeTooLarge>() {
287 | Ok(_) => {
288 | let (rp, mut reader) = self.inner.accessor.read(path, original_args).await?;
289 | let buffer = reader.read_all().await?;
290 | Ok((rp, buffer))
291 | }
292 | Err(e) => Err(extract_err(e)),
293 | },
294 | }
295 | }
296 |
297 | fn write(
298 | &self,
299 | path: &str,
300 | args: OpWrite,
301 | ) -> impl Future<Output = Result<(RpWrite, Self::Writer)>> + MaybeSend {
302 | let inner = self.inner.clone();
303 | async move {
304 | let (rp, w) = self.inner.accessor.write(path, args).await?;
305 | Ok((
306 | rp,
307 | Writer {
308 | w,
309 | buf: QueueBuf::new(),
310 | path: path.to_string(),
311 | inner,
312 | skip_cache: false,
313 | },
314 | ))
315 | }
316 | }
317 |
318 | fn delete(&self) -> impl Future<Output = Result<(RpDelete, Self::Deleter)>> + MaybeSend {
319 | let inner = self.inner.clone();
320 | async move {
321 | let (rp, d) = inner.accessor.delete().await?;
322 | Ok((
323 | rp,
324 | Deleter {
325 | deleter: d,
326 | keys: vec![],
327 | inner,
328 | },
329 | ))
330 | }
331 | }
332 |
333 | async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
334 | self.inner.accessor.list(path, args).await
335 | }
336 |
337 | // TODO(MrCroxx): Implement copy, rename with foyer cache.
338 | }
339 |
340 | pub struct Writer<A: Access> {
341 | w: A::Writer,
342 | buf: QueueBuf,
343 | path: String,
344 | inner: Arc<Inner<A>>,
345 | skip_cache: bool,
346 | }
347 |
348 | impl<A: Access> oio::Write for Writer<A> {
349 | async fn write(&mut self, bs: Buffer) -> Result<()> {
350 | if self.inner.size_limit.contains(&(self.buf.len() + bs.len())) {
351 | self.buf.push(bs.clone());
352 | self.skip_cache = false;
353 | } else {
354 | self.buf.clear();
355 | self.skip_cache = true;
356 | }
357 | self.w.write(bs).await
358 | }
359 |
360 | async fn close(&mut self) -> Result<Metadata> {
361 | let buffer = self.buf.clone().collect();
362 | let metadata = self.w.close().await?;
363 | if !self.skip_cache {
364 | self.inner.cache.insert(
365 | FoyerKey {
366 | path: self.path.clone(),
367 | version: metadata.version().map(|v| v.to_string()),
368 | },
369 | FoyerValue(buffer),
370 | );
371 | }
372 | Ok(metadata)
373 | }
374 |
375 | async fn abort(&mut self) -> Result<()> {
376 | self.buf.clear();
377 | self.w.abort().await
378 | }
379 | }
380 |
381 | pub struct Deleter<A: Access> {
382 | deleter: A::Deleter,
383 | keys: Vec<FoyerKey>,
384 | inner: Arc<Inner<A>>,
385 | }
386 |
387 | impl<A: Access> oio::Delete for Deleter<A> {
388 | async fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
389 | self.deleter.delete(path, args.clone()).await?;
390 | self.keys.push(FoyerKey {
391 | path: path.to_string(),
392 | version: args.version().map(|v| v.to_string()),
393 | });
394 | Ok(())
395 | }
396 |
397 | async fn close(&mut self) -> Result<()> {
398 | for key in &self.keys {
399 | self.inner.cache.remove(key);
400 | }
401 | self.deleter.close().await
402 | }
403 | }
404 |
#[cfg(test)]
mod tests {
    use foyer::{
        DirectFsDeviceOptions, Engine, HybridCacheBuilder, LargeEngineOptions, RecoverMode,
    };
    use opendal_core::{Operator, services::Memory};
    use size::consts::MiB;
    use std::io::Cursor;

    use super::*;

    /// Number of objects used by the round-trip test.
    const OBJECTS: u8 = 64;

    fn key(i: u8) -> String {
        format!("obj-{i}")
    }

    fn value(i: u8) -> Vec<u8> {
        // ~ 64KiB with metadata
        vec![i; 63 * 1024]
    }

    /// Builds a single-shard hybrid cache with `memory` capacity and a
    /// 16 MiB disk tier stored under `dir`.
    async fn build_cache(
        dir: &std::path::Path,
        memory: usize,
    ) -> HybridCache<FoyerKey, FoyerValue> {
        let device = DirectFsDeviceOptions::new(dir)
            .with_capacity(16 * MiB as usize)
            .with_file_size(MiB as usize);

        HybridCacheBuilder::new()
            .memory(memory)
            .with_shards(1)
            .storage(Engine::Large(LargeEngineOptions::new()))
            .with_device_options(device)
            .with_recover_mode(RecoverMode::None)
            .build()
            .await
            .unwrap()
    }

    #[tokio::test]
    async fn test() {
        let dir = tempfile::tempdir().unwrap();
        let cache = build_cache(dir.path(), 10).await;

        let op = Operator::new(Memory::default())
            .unwrap()
            .layer(FoyerLayer::new(cache.clone()))
            .finish();

        assert!(op.list("/").await.unwrap().is_empty());

        // Populate the service through the layer.
        for i in 0..OBJECTS {
            op.write(&key(i), value(i)).await.unwrap();
        }
        assert_eq!(op.list("/").await.unwrap().len(), usize::from(OBJECTS));

        // First pass: read everything back.
        for i in 0..OBJECTS {
            let buf = op.read(&key(i)).await.unwrap();
            assert_eq!(buf.to_vec(), value(i));
        }

        // Second pass: after clearing the cache, reads must still succeed.
        cache.clear().await.unwrap();
        for i in 0..OBJECTS {
            let buf = op.read(&key(i)).await.unwrap();
            assert_eq!(buf.to_vec(), value(i));
        }

        // Delete everything and make sure nothing is readable afterwards.
        for i in 0..OBJECTS {
            op.delete(&key(i)).await.unwrap();
        }
        assert!(op.list("/").await.unwrap().is_empty());

        for i in 0..OBJECTS {
            let res = op.read(&key(i)).await;
            assert!(res.is_err(), "should fail to read deleted file");
        }
    }

    #[tokio::test]
    async fn test_size_limit() {
        let dir = tempfile::tempdir().unwrap();
        let cache = build_cache(dir.path(), 1024 * 1024).await;

        // Set size limit: only cache files between 1KB and 10KB
        let layer = FoyerLayer::new(cache.clone()).with_size_limit(1024..10 * 1024);
        let op = Operator::new(Memory::default())
            .unwrap()
            .layer(layer)
            .finish();

        let small_data = vec![1u8; 5 * 1024]; // 5KB - within the size limit
        let large_data = vec![2u8; 20 * 1024]; // 20KB - above the size limit
        let tiny_data = vec![3u8; 512]; // 512B - below the size limit

        // Write all files
        op.write("small.txt", small_data.clone()).await.unwrap();
        op.write("large.txt", large_data.clone()).await.unwrap();
        op.write("tiny.txt", tiny_data.clone()).await.unwrap();

        // All should be readable
        let read_small = op.read("small.txt").await.unwrap();
        assert_eq!(read_small.to_vec(), small_data);

        let read_large = op.read("large.txt").await.unwrap();
        assert_eq!(read_large.to_vec(), large_data);

        let read_tiny = op.read("tiny.txt").await.unwrap();
        assert_eq!(read_tiny.to_vec(), tiny_data);

        // Clear the cache to test read-through behavior
        cache.clear().await.unwrap();

        // All files should still be readable from underlying storage
        let read_small = op.read("small.txt").await.unwrap();
        assert_eq!(read_small.to_vec(), small_data);

        let read_large = op.read("large.txt").await.unwrap();
        assert_eq!(read_large.to_vec(), large_data);

        let read_tiny = op.read("tiny.txt").await.unwrap();
        assert_eq!(read_tiny.to_vec(), tiny_data);

        // A ranged read of the small file must return exactly the requested slice.
        let read_small_range = op.read_with("small.txt").range(0..1024).await.unwrap();
        assert_eq!(read_small_range.len(), 1024);
        assert_eq!(read_small_range.to_vec(), small_data[0..1024]);
    }

    #[test]
    fn test_error() {
        let wrapped = FoyerError::other(Error::new(ErrorKind::NotFound, "not found"));
        let oe = extract_err(wrapped);
        assert_eq!(oe.kind(), ErrorKind::NotFound);
    }

    #[test]
    fn test_foyer_key_version_none_vs_empty() {
        let path = "test/path".to_string();
        let key_none = FoyerKey {
            path: path.clone(),
            version: None,
        };
        let key_empty = FoyerKey {
            path,
            version: Some("".to_string()),
        };

        let mut buf_none = Vec::new();
        key_none.encode(&mut buf_none).unwrap();

        let mut buf_empty = Vec::new();
        key_empty.encode(&mut buf_empty).unwrap();

        assert_ne!(
            buf_none, buf_empty,
            "Serialization of version=None and version=\"\" should be different"
        );

        let decoded_none = FoyerKey::decode(&mut Cursor::new(&buf_none)).unwrap();
        assert_eq!(decoded_none, key_none);
        let decoded_empty = FoyerKey::decode(&mut Cursor::new(&buf_empty)).unwrap();
        assert_eq!(decoded_empty, key_empty);
    }

    #[test]
    fn test_foyer_key_serde() {
        // (path, version) pairs covering plain, nested, versioned, empty,
        // unicode and long keys.
        let test_cases = vec![
            FoyerKey {
                path: "simple".to_string(),
                version: None,
            },
            FoyerKey {
                path: "with/slash/path".to_string(),
                version: None,
            },
            FoyerKey {
                path: "versioned".to_string(),
                version: Some("v1.0.0".to_string()),
            },
            FoyerKey {
                path: "empty-version".to_string(),
                version: Some("".to_string()),
            },
            FoyerKey {
                path: "".to_string(),
                version: None,
            },
            FoyerKey {
                path: "unicode/路径/🚀".to_string(),
                version: Some("版本-1".to_string()),
            },
            FoyerKey {
                path: "long/".to_string().repeat(100),
                version: Some("long-version-".to_string().repeat(50)),
            },
        ];

        for original in test_cases {
            let mut encoded = Vec::new();
            original
                .encode(&mut encoded)
                .expect("encoding should succeed");

            let decoded =
                FoyerKey::decode(&mut Cursor::new(&encoded)).expect("decoding should succeed");

            assert_eq!(
                decoded, original,
                "decode(encode(key)) should equal original key"
            );
        }
    }
}
631 |
```
--------------------------------------------------------------------------------
/core/services/aliyun-drive/src/core.rs:
--------------------------------------------------------------------------------
```rust
1 | // Licensed to the Apache Software Foundation (ASF) under one
2 | // or more contributor license agreements. See the NOTICE file
3 | // distributed with this work for additional information
4 | // regarding copyright ownership. The ASF licenses this file
5 | // to you under the Apache License, Version 2.0 (the
6 | // "License"); you may not use this file except in compliance
7 | // with the License. You may obtain a copy of the License at
8 | //
9 | // http://www.apache.org/licenses/LICENSE-2.0
10 | //
11 | // Unless required by applicable law or agreed to in writing,
12 | // software distributed under the License is distributed on an
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 | // KIND, either express or implied. See the License for the
15 | // specific language governing permissions and limitations
16 | // under the License.
17 |
18 | use std::fmt::Debug;
19 | use std::sync::Arc;
20 |
21 | use bytes::Buf;
22 | use http::Method;
23 | use http::Request;
24 | use http::Response;
25 | use http::header;
26 | use http::header::HeaderValue;
27 | use mea::mutex::Mutex;
28 | use serde::Deserialize;
29 | use serde::Serialize;
30 |
31 | use super::error::parse_error;
32 | use opendal_core::raw::*;
33 | use opendal_core::*;
34 |
35 | /// Available Aliyun Drive Type.
36 | #[derive(Debug, Deserialize, Default, Clone)]
37 | pub enum DriveType {
38 | /// Use the default type of Aliyun Drive.
39 | #[default]
40 | Default,
41 | /// Use the backup type of Aliyun Drive.
42 | ///
43 | /// Fallback to the default type if no backup drive is found.
44 | Backup,
45 | /// Use the resource type of Aliyun Drive.
46 | ///
47 | /// Fallback to the default type if no resource drive is found.
48 | Resource,
49 | }
50 |
/// Credential modes supported by the Aliyun Drive signer.
pub enum AliyunDriveSign {
    /// Refresh-token flow. The fields are, in order: client id, client secret,
    /// refresh token, the currently cached access token (if any) and the
    /// expiry of that access token as a timestamp in seconds.
    Refresh(String, String, String, Option<String>, i64),
    /// A fixed access token that is used as-is.
    Access(String),
}
56 |
57 | pub struct AliyunDriveSigner {
58 | pub drive_id: Option<String>,
59 | pub sign: AliyunDriveSign,
60 | }
61 |
62 | pub struct AliyunDriveCore {
63 | pub info: Arc<AccessorInfo>,
64 |
65 | pub endpoint: String,
66 | pub root: String,
67 | pub drive_type: DriveType,
68 |
69 | pub signer: Arc<Mutex<AliyunDriveSigner>>,
70 | pub dir_lock: Arc<Mutex<()>>,
71 | }
72 |
73 | impl Debug for AliyunDriveCore {
74 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75 | f.debug_struct("AliyunDriveCore")
76 | .field("root", &self.root)
77 | .field("drive_type", &self.drive_type)
78 | .finish_non_exhaustive()
79 | }
80 | }
81 |
82 | impl AliyunDriveCore {
83 | async fn send(&self, mut req: Request<Buffer>, token: Option<&str>) -> Result<Buffer> {
84 | // AliyunDrive raise NullPointerException if you haven't set a user-agent.
85 | req.headers_mut().insert(
86 | header::USER_AGENT,
87 | HeaderValue::from_str(&format!("opendal/{VERSION}"))
88 | .expect("user agent must be valid header value"),
89 | );
90 | if req.method() == Method::POST {
91 | req.headers_mut().insert(
92 | header::CONTENT_TYPE,
93 | HeaderValue::from_static("application/json;charset=UTF-8"),
94 | );
95 | }
96 | if let Some(token) = token {
97 | req.headers_mut().insert(
98 | header::AUTHORIZATION,
99 | HeaderValue::from_str(&format_authorization_by_bearer(token)?)
100 | .expect("access token must be valid header value"),
101 | );
102 | }
103 | let res = self.info.http_client().send(req).await?;
104 | if !res.status().is_success() {
105 | return Err(parse_error(res));
106 | }
107 | Ok(res.into_body())
108 | }
109 |
110 | async fn get_access_token(
111 | &self,
112 | client_id: &str,
113 | client_secret: &str,
114 | refresh_token: &str,
115 | ) -> Result<Buffer> {
116 | let body = serde_json::to_vec(&AccessTokenRequest {
117 | refresh_token,
118 | grant_type: "refresh_token",
119 | client_id,
120 | client_secret,
121 | })
122 | .map_err(new_json_serialize_error)?;
123 | let req = Request::post(format!("{}/oauth/access_token", self.endpoint))
124 | .body(Buffer::from(body))
125 | .map_err(new_request_build_error)?;
126 | self.send(req, None).await
127 | }
128 |
129 | async fn get_drive_id(&self, token: Option<&str>) -> Result<Buffer> {
130 | let req = Request::post(format!("{}/adrive/v1.0/user/getDriveInfo", self.endpoint))
131 | .body(Buffer::new())
132 | .map_err(new_request_build_error)?;
133 | self.send(req, token).await
134 | }
135 |
/// Returns the access token (if any) and the drive id to use for a request.
///
/// The signer lock is held for the whole call, so token refresh and drive id
/// lookup are performed by one caller at a time.
///
/// # Errors
///
/// Fails when the token refresh or drive info request fails, or when either
/// response cannot be deserialized.
pub async fn get_token_and_drive(&self) -> Result<(Option<String>, String)> {
    let mut signer = self.signer.lock().await;
    let token = match &mut signer.sign {
        // A fixed access token is handed back unchanged.
        AliyunDriveSign::Access(access_token) => Some(access_token.clone()),
        AliyunDriveSign::Refresh(
            client_id,
            client_secret,
            refresh_token,
            access_token,
            expire_at,
        ) => {
            // Refresh when the cached token has expired or was never fetched.
            if *expire_at < Timestamp::now().into_inner().as_second() || access_token.is_none()
            {
                let res = self
                    .get_access_token(client_id, client_secret, refresh_token)
                    .await?;
                let output: RefreshTokenResponse = serde_json::from_reader(res.reader())
                    .map_err(new_json_deserialize_error)?;
                *access_token = Some(output.access_token);
                // `expires_in` is relative, so it is added to the current time.
                *expire_at = output.expires_in + Timestamp::now().into_inner().as_second();
                // The response carries a new refresh token; keep it for the next refresh.
                *refresh_token = output.refresh_token;
            }
            access_token.clone()
        }
    };
    // Resolve and cache the drive id the first time it is needed.
    let Some(drive_id) = &signer.drive_id else {
        let res = self.get_drive_id(token.as_deref()).await?;
        let output: DriveInfoResponse =
            serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
        // Backup and resource drives fall back to the default drive when absent.
        let drive_id = match self.drive_type {
            DriveType::Default => output.default_drive_id,
            DriveType::Backup => output.backup_drive_id.unwrap_or(output.default_drive_id),
            DriveType::Resource => output.resource_drive_id.unwrap_or(output.default_drive_id),
        };
        signer.drive_id = Some(drive_id.clone());
        return Ok((token, drive_id));
    };
    Ok((token, drive_id.clone()))
}
175 |
176 | pub fn build_path(&self, path: &str, rooted: bool) -> String {
177 | let file_path = if rooted {
178 | build_rooted_abs_path(&self.root, path)
179 | } else {
180 | build_abs_path(&self.root, path)
181 | };
182 | let file_path = file_path.strip_suffix('/').unwrap_or(&file_path);
183 | if file_path.is_empty() {
184 | return "/".to_string();
185 | }
186 | file_path.to_string()
187 | }
188 |
189 | pub async fn get_by_path(&self, path: &str) -> Result<Buffer> {
190 | let file_path = self.build_path(path, true);
191 | let req = Request::post(format!(
192 | "{}/adrive/v1.0/openFile/get_by_path",
193 | self.endpoint
194 | ));
195 | let (token, drive_id) = self.get_token_and_drive().await?;
196 | let body = serde_json::to_vec(&GetByPathRequest {
197 | drive_id: &drive_id,
198 | file_path: &file_path,
199 | })
200 | .map_err(new_json_serialize_error)?;
201 | let req = req
202 | .extension(Operation::Read)
203 | .body(Buffer::from(body))
204 | .map_err(new_request_build_error)?;
205 | self.send(req, token.as_deref()).await
206 | }
207 |
208 | pub async fn ensure_dir_exists(&self, path: &str) -> Result<String> {
209 | let file_path = self.build_path(path, false);
210 | if file_path == "/" {
211 | return Ok("root".to_string());
212 | }
213 | let file_path = file_path.strip_suffix('/').unwrap_or(&file_path);
214 | let paths = file_path.split('/').collect::<Vec<&str>>();
215 | let mut parent: Option<String> = None;
216 | for path in paths {
217 | let _guard = self.dir_lock.lock().await;
218 | let res = self
219 | .create(
220 | parent.as_deref(),
221 | path,
222 | CreateType::Folder,
223 | CheckNameMode::Refuse,
224 | )
225 | .await?;
226 | let output: CreateResponse =
227 | serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
228 | parent = Some(output.file_id);
229 | }
230 | Ok(parent.expect("ensure_dir_exists must succeed"))
231 | }
232 |
233 | pub async fn create_with_rapid_upload(
234 | &self,
235 | parent_file_id: Option<&str>,
236 | name: &str,
237 | typ: CreateType,
238 | check_name_mode: CheckNameMode,
239 | size: Option<u64>,
240 | rapid_upload: Option<RapidUpload>,
241 | ) -> Result<Buffer> {
242 | let mut content_hash = None;
243 | let mut proof_code = None;
244 | let mut pre_hash = None;
245 | if let Some(rapid_upload) = rapid_upload {
246 | content_hash = rapid_upload.content_hash;
247 | proof_code = rapid_upload.proof_code;
248 | pre_hash = rapid_upload.pre_hash;
249 | }
250 |
251 | let (token, drive_id) = self.get_token_and_drive().await?;
252 | let body = serde_json::to_vec(&CreateRequest {
253 | drive_id: &drive_id,
254 | parent_file_id: parent_file_id.unwrap_or("root"),
255 | name,
256 | typ,
257 | check_name_mode,
258 | size,
259 | pre_hash: pre_hash.as_deref(),
260 | content_hash: content_hash.as_deref(),
261 | content_hash_name: content_hash.is_some().then_some("sha1"),
262 | proof_code: proof_code.as_deref(),
263 | proof_version: proof_code.is_some().then_some("v1"),
264 | })
265 | .map_err(new_json_serialize_error)?;
266 | let req = Request::post(format!("{}/adrive/v1.0/openFile/create", self.endpoint))
267 | .extension(Operation::Write)
268 | .body(Buffer::from(body))
269 | .map_err(new_request_build_error)?;
270 | self.send(req, token.as_deref()).await
271 | }
272 |
273 | pub async fn create(
274 | &self,
275 | parent_file_id: Option<&str>,
276 | name: &str,
277 | typ: CreateType,
278 | check_name_mode: CheckNameMode,
279 | ) -> Result<Buffer> {
280 | self.create_with_rapid_upload(parent_file_id, name, typ, check_name_mode, None, None)
281 | .await
282 | }
283 |
284 | async fn get_download_url(&self, file_id: &str) -> Result<String> {
285 | let (token, drive_id) = self.get_token_and_drive().await?;
286 | let body = serde_json::to_vec(&FileRequest {
287 | drive_id: &drive_id,
288 | file_id,
289 | })
290 | .map_err(new_json_serialize_error)?;
291 |
292 | let req = Request::post(format!(
293 | "{}/adrive/v1.0/openFile/getDownloadUrl",
294 | self.endpoint
295 | ))
296 | .extension(Operation::Read)
297 | .body(Buffer::from(body))
298 | .map_err(new_request_build_error)?;
299 |
300 | let res = self.send(req, token.as_deref()).await?;
301 |
302 | let output: GetDownloadUrlResponse =
303 | serde_json::from_reader(res.reader()).map_err(new_json_serialize_error)?;
304 |
305 | Ok(output.url)
306 | }
307 |
308 | pub async fn download(&self, file_id: &str, range: BytesRange) -> Result<Response<HttpBody>> {
309 | let download_url = self.get_download_url(file_id).await?;
310 | let req = Request::get(download_url)
311 | .extension(Operation::Read)
312 | .header(header::RANGE, range.to_header())
313 | .body(Buffer::new())
314 | .map_err(new_request_build_error)?;
315 | self.info.http_client().fetch(req).await
316 | }
317 |
318 | pub async fn move_path(&self, file_id: &str, to_parent_file_id: &str) -> Result<()> {
319 | let (token, drive_id) = self.get_token_and_drive().await?;
320 | let body = serde_json::to_vec(&MovePathRequest {
321 | drive_id: &drive_id,
322 | file_id,
323 | to_parent_file_id,
324 | check_name_mode: CheckNameMode::AutoRename,
325 | })
326 | .map_err(new_json_serialize_error)?;
327 | let req = Request::post(format!("{}/adrive/v1.0/openFile/move", self.endpoint))
328 | .extension(Operation::Write)
329 | .body(Buffer::from(body))
330 | .map_err(new_request_build_error)?;
331 | self.send(req, token.as_deref()).await?;
332 | Ok(())
333 | }
334 |
335 | pub async fn update_path(&self, file_id: &str, name: &str) -> Result<()> {
336 | let (token, drive_id) = self.get_token_and_drive().await?;
337 | let body = serde_json::to_vec(&UpdatePathRequest {
338 | drive_id: &drive_id,
339 | file_id,
340 | name,
341 | check_name_mode: CheckNameMode::Refuse,
342 | })
343 | .map_err(new_json_serialize_error)?;
344 | let req = Request::post(format!("{}/adrive/v1.0/openFile/update", self.endpoint))
345 | .extension(Operation::Write)
346 | .body(Buffer::from(body))
347 | .map_err(new_request_build_error)?;
348 | self.send(req, token.as_deref()).await?;
349 | Ok(())
350 | }
351 |
352 | pub async fn copy_path(
353 | &self,
354 | file_id: &str,
355 | to_parent_file_id: &str,
356 | auto_rename: bool,
357 | ) -> Result<Buffer> {
358 | let (token, drive_id) = self.get_token_and_drive().await?;
359 | let body = serde_json::to_vec(&CopyPathRequest {
360 | drive_id: &drive_id,
361 | file_id,
362 | to_parent_file_id,
363 | auto_rename,
364 | })
365 | .map_err(new_json_serialize_error)?;
366 | let req = Request::post(format!("{}/adrive/v1.0/openFile/copy", self.endpoint))
367 | .extension(Operation::Copy)
368 | .body(Buffer::from(body))
369 | .map_err(new_request_build_error)?;
370 | self.send(req, token.as_deref()).await
371 | }
372 |
373 | pub async fn delete_path(&self, file_id: &str) -> Result<()> {
374 | let (token, drive_id) = self.get_token_and_drive().await?;
375 | let body = serde_json::to_vec(&FileRequest {
376 | drive_id: &drive_id,
377 | file_id,
378 | })
379 | .map_err(new_json_serialize_error)?;
380 | let req = Request::post(format!("{}/adrive/v1.0/openFile/delete", self.endpoint))
381 | .extension(Operation::Delete)
382 | .body(Buffer::from(body))
383 | .map_err(new_request_build_error)?;
384 | self.send(req, token.as_deref()).await?;
385 | Ok(())
386 | }
387 |
388 | pub async fn list(
389 | &self,
390 | parent_file_id: &str,
391 | limit: Option<usize>,
392 | marker: Option<String>,
393 | ) -> Result<Buffer> {
394 | let (token, drive_id) = self.get_token_and_drive().await?;
395 | let body = serde_json::to_vec(&ListRequest {
396 | drive_id: &drive_id,
397 | parent_file_id,
398 | limit,
399 | marker: marker.as_deref(),
400 | })
401 | .map_err(new_json_serialize_error)?;
402 | let req = Request::post(format!("{}/adrive/v1.0/openFile/list", self.endpoint))
403 | .extension(Operation::List)
404 | .body(Buffer::from(body))
405 | .map_err(new_request_build_error)?;
406 | self.send(req, token.as_deref()).await
407 | }
408 |
409 | pub async fn complete(&self, file_id: &str, upload_id: &str) -> Result<Buffer> {
410 | let (token, drive_id) = self.get_token_and_drive().await?;
411 | let body = serde_json::to_vec(&CompleteRequest {
412 | drive_id: &drive_id,
413 | file_id,
414 | upload_id,
415 | })
416 | .map_err(new_json_serialize_error)?;
417 | let req = Request::post(format!("{}/adrive/v1.0/openFile/complete", self.endpoint))
418 | .extension(Operation::Write)
419 | .body(Buffer::from(body))
420 | .map_err(new_request_build_error)?;
421 | self.send(req, token.as_deref()).await
422 | }
423 |
424 | async fn get_upload_url(
425 | &self,
426 | file_id: &str,
427 | upload_id: &str,
428 | part_number: usize,
429 | ) -> Result<String> {
430 | let (token, drive_id) = self.get_token_and_drive().await?;
431 | let part_info_list = vec![PartInfoItem {
432 | part_number: Some(part_number),
433 | }];
434 | let body = serde_json::to_vec(&GetUploadRequest {
435 | drive_id: &drive_id,
436 | file_id,
437 | upload_id,
438 | part_info_list: Some(part_info_list),
439 | })
440 | .map_err(new_json_serialize_error)?;
441 |
442 | let req = Request::post(format!(
443 | "{}/adrive/v1.0/openFile/getUploadUrl",
444 | self.endpoint
445 | ))
446 | .extension(Operation::Write)
447 | .body(Buffer::from(body))
448 | .map_err(new_request_build_error)?;
449 |
450 | let res = self.send(req, token.as_deref()).await?;
451 |
452 | let mut output: UploadUrlResponse =
453 | serde_json::from_reader(res.reader()).map_err(new_json_deserialize_error)?;
454 |
455 | let Some(upload_url) = output
456 | .part_info_list
457 | .take()
458 | .map(|mut list| list.swap_remove(0))
459 | .map(|part_info| part_info.upload_url)
460 | else {
461 | return Err(Error::new(ErrorKind::Unexpected, "cannot find upload_url"));
462 | };
463 |
464 | Ok(upload_url)
465 | }
466 | pub async fn upload(
467 | &self,
468 | file_id: &str,
469 | upload_id: &str,
470 | part_number: usize,
471 | body: Buffer,
472 | ) -> Result<Buffer> {
473 | let upload_url = self.get_upload_url(file_id, upload_id, part_number).await?;
474 | let req = Request::put(upload_url)
475 | .extension(Operation::Write)
476 | .body(body)
477 | .map_err(new_request_build_error)?;
478 | self.send(req, None).await
479 | }
480 | }
481 |
/// Optional hashes forwarded to the create request by `create_with_rapid_upload`.
pub struct RapidUpload {
    /// Sent as `pre_hash` when present.
    pub pre_hash: Option<String>,
    /// Sent as `content_hash`; when present the hash name is sent as `"sha1"`.
    pub content_hash: Option<String>,
    /// Sent as `proof_code`; when present the proof version is sent as `"v1"`.
    pub proof_code: Option<String>,
}
487 |
/// Response body of the access token request.
#[derive(Debug, Deserialize)]
pub struct RefreshTokenResponse {
    /// The new access token.
    pub access_token: String,
    /// Token lifetime; added to the current unix time in seconds to get the expiry.
    pub expires_in: i64,
    /// Refresh token that replaces the stored one.
    pub refresh_token: String,
}
494 |
/// Response body of the drive info request.
#[derive(Debug, Deserialize)]
pub struct DriveInfoResponse {
    /// Id of the default drive; also the fallback when another drive is absent.
    pub default_drive_id: String,
    /// Id of the resource drive, if present in the response.
    pub resource_drive_id: Option<String>,
    /// Id of the backup drive, if present in the response.
    pub backup_drive_id: Option<String>,
}
501 |
/// Kind of entry to create; serialized in snake_case (`file`, `folder`).
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CreateType {
    /// Create a file.
    File,
    /// Create a folder.
    Folder,
}
508 |
/// Name-check mode sent with create/move/update requests; serialized in
/// snake_case (`refuse`, `auto_rename`).
// NOTE(review): the exact server-side handling of each mode is not visible
// here — confirm against the Aliyun Drive API documentation.
#[derive(Debug, Serialize)]
#[serde(rename_all = "snake_case")]
pub enum CheckNameMode {
    /// Serialized as `refuse`.
    Refuse,
    /// Serialized as `auto_rename`.
    AutoRename,
}
515 |
/// Response body of the upload URL request.
#[derive(Deserialize)]
pub struct UploadUrlResponse {
    /// Part entries; the first one carries the upload URL that is used.
    pub part_info_list: Option<Vec<PartInfo>>,
}
520 |
/// Response body of the create request.
#[derive(Deserialize)]
pub struct CreateResponse {
    /// Id of the file or folder named in the response.
    pub file_id: String,
    /// Upload id, if the response contains one.
    pub upload_id: Option<String>,
    /// Presumably set when the entry already existed — TODO confirm against the API.
    pub exist: Option<bool>,
}
527 |
/// One upload part as described by the service.
#[derive(Serialize, Deserialize)]
pub struct PartInfo {
    /// ETag of the part, if present.
    pub etag: Option<String>,
    /// Number of the part.
    pub part_number: usize,
    /// Size of the part, if present.
    pub part_size: Option<u64>,
    /// URL the part's data is uploaded to.
    pub upload_url: String,
    /// Content type of the part, if present.
    pub content_type: Option<String>,
}
536 |
/// One page of a listing response.
#[derive(Deserialize)]
pub struct AliyunDriveFileList {
    /// Entries in this page.
    pub items: Vec<AliyunDriveFile>,
    /// Marker returned with the page, if any.
    pub next_marker: Option<String>,
}
542 |
/// Response body of the copy request.
#[derive(Deserialize)]
pub struct CopyResponse {
    /// File id returned by the copy request.
    pub file_id: String,
}
547 |
/// A file or folder entry as returned by the service.
#[derive(Deserialize)]
pub struct AliyunDriveFile {
    /// Id of this entry.
    pub file_id: String,
    /// Id of the parent folder.
    pub parent_file_id: String,
    /// Name of the entry.
    pub name: String,
    /// Size, if present in the response.
    pub size: Option<u64>,
    /// Content type, if present in the response.
    pub content_type: Option<String>,
    /// Entry type; read from the JSON field `type`.
    #[serde(rename = "type")]
    pub path_type: String,
    /// Last update time, kept as the string the service returned.
    pub updated_at: String,
}
559 |
/// Response body of the download URL request.
#[derive(Deserialize)]
pub struct GetDownloadUrlResponse {
    /// URL the file content is downloaded from.
    pub url: String,
}
564 |
/// JSON body of the `oauth/access_token` request.
#[derive(Serialize)]
pub struct AccessTokenRequest<'a> {
    /// Refresh token being exchanged.
    refresh_token: &'a str,
    /// Grant type; the caller in this file sends `"refresh_token"`.
    grant_type: &'a str,
    /// Client id.
    client_id: &'a str,
    /// Client secret.
    client_secret: &'a str,
}
572 |
/// JSON body of the `openFile/get_by_path` request.
#[derive(Serialize)]
pub struct GetByPathRequest<'a> {
    /// Drive the request is directed at.
    drive_id: &'a str,
    /// Path of the entry to look up.
    file_path: &'a str,
}
578 |
/// JSON body of the `openFile/create` request.
#[derive(Serialize)]
pub struct CreateRequest<'a> {
    /// Drive the request is directed at.
    drive_id: &'a str,
    /// Parent folder id; the caller in this file defaults it to `"root"`.
    parent_file_id: &'a str,
    /// Name of the entry to create.
    name: &'a str,
    /// Kind of entry; written to the JSON field `type`.
    #[serde(rename = "type")]
    typ: CreateType,
    /// Name-check mode for the request.
    check_name_mode: CheckNameMode,
    /// Size, if given by the caller.
    size: Option<u64>,
    /// Rapid-upload pre-hash, if any.
    pre_hash: Option<&'a str>,
    /// Rapid-upload content hash, if any.
    content_hash: Option<&'a str>,
    /// Name of the content hash; the caller sends `"sha1"` with a content hash.
    content_hash_name: Option<&'a str>,
    /// Rapid-upload proof code, if any.
    proof_code: Option<&'a str>,
    /// Version of the proof code; the caller sends `"v1"` with a proof code.
    proof_version: Option<&'a str>,
}
594 |
/// JSON body naming a single file; used by the download URL and delete requests.
#[derive(Serialize)]
pub struct FileRequest<'a> {
    /// Drive the request is directed at.
    drive_id: &'a str,
    /// Id of the file.
    file_id: &'a str,
}
600 |
/// JSON body of the `openFile/move` request.
#[derive(Serialize)]
pub struct MovePathRequest<'a> {
    /// Drive the request is directed at.
    drive_id: &'a str,
    /// Id of the entry to move.
    file_id: &'a str,
    /// Id of the destination folder.
    to_parent_file_id: &'a str,
    /// Name-check mode for the request.
    check_name_mode: CheckNameMode,
}
608 |
/// JSON body of the `openFile/update` request.
#[derive(Serialize)]
pub struct UpdatePathRequest<'a> {
    /// Drive the request is directed at.
    drive_id: &'a str,
    /// Id of the entry to update.
    file_id: &'a str,
    /// New name of the entry.
    name: &'a str,
    /// Name-check mode for the request.
    check_name_mode: CheckNameMode,
}
616 |
/// JSON body of the `openFile/copy` request.
#[derive(Serialize)]
pub struct CopyPathRequest<'a> {
    /// Drive the request is directed at.
    drive_id: &'a str,
    /// Id of the entry to copy.
    file_id: &'a str,
    /// Id of the destination folder.
    to_parent_file_id: &'a str,
    /// Auto-rename flag forwarded from the caller.
    auto_rename: bool,
}
624 |
/// JSON body of the `openFile/list` request.
#[derive(Serialize)]
pub struct ListRequest<'a> {
    /// Drive the request is directed at.
    drive_id: &'a str,
    /// Id of the folder to list.
    parent_file_id: &'a str,
    /// Limit forwarded from the caller, if any.
    limit: Option<usize>,
    /// Marker forwarded from the caller, if any.
    marker: Option<&'a str>,
}
632 |
/// JSON body of the `openFile/complete` request.
#[derive(Serialize)]
pub struct CompleteRequest<'a> {
    /// Drive the request is directed at.
    drive_id: &'a str,
    /// Id of the file being uploaded.
    file_id: &'a str,
    /// Id of the upload to complete.
    upload_id: &'a str,
}
639 |
/// JSON body of the `openFile/getUploadUrl` request.
#[derive(Serialize)]
pub struct GetUploadRequest<'a> {
    /// Drive the request is directed at.
    drive_id: &'a str,
    /// Id of the file being uploaded.
    file_id: &'a str,
    /// Id of the upload.
    upload_id: &'a str,
    /// Parts for which upload URLs are requested.
    part_info_list: Option<Vec<PartInfoItem>>,
}
647 |
/// One entry of `GetUploadRequest::part_info_list`.
#[derive(Serialize)]
pub struct PartInfoItem {
    /// Number of the part an upload URL is requested for.
    part_number: Option<usize>,
}
652 |
```
--------------------------------------------------------------------------------
/core/layers/fastmetrics/src/lib.rs:
--------------------------------------------------------------------------------
```rust
1 | // Licensed to the Apache Software Foundation (ASF) under one
2 | // or more contributor license agreements. See the NOTICE file
3 | // distributed with this work for additional information
4 | // regarding copyright ownership. The ASF licenses this file
5 | // to you under the Apache License, Version 2.0 (the
6 | // "License"); you may not use this file except in compliance
7 | // with the License. You may obtain a copy of the License at
8 | //
9 | // http://www.apache.org/licenses/LICENSE-2.0
10 | //
11 | // Unless required by applicable law or agreed to in writing,
12 | // software distributed under the License is distributed on an
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 | // KIND, either express or implied. See the License for the
15 | // specific language governing permissions and limitations
16 | // under the License.
17 |
18 | //! Metrics layer (using the [fastmetrics](https://docs.rs/fastmetrics/) crate) implementation for Apache OpenDAL.
19 |
20 | #![cfg_attr(docsrs, feature(doc_cfg))]
21 | #![deny(missing_docs)]
22 |
23 | use fastmetrics::encoder::EncodeLabelSet;
24 | use fastmetrics::encoder::LabelSetEncoder;
25 | use fastmetrics::metrics::counter::Counter;
26 | use fastmetrics::metrics::family::Family;
27 | use fastmetrics::metrics::family::MetricFactory;
28 | use fastmetrics::metrics::gauge::Gauge;
29 | use fastmetrics::metrics::histogram::Histogram;
30 | use fastmetrics::raw::LabelSetSchema;
31 | use fastmetrics::registry::Register;
32 | use fastmetrics::registry::Registry;
33 | use fastmetrics::registry::with_global_registry_mut;
34 | use opendal_core::raw::*;
35 | use opendal_core::*;
36 | use opendal_layer_observe_metrics_common as observe;
37 |
38 | /// Add [fastmetrics](https://docs.rs/fastmetrics/) for every operation.
39 | ///
40 | /// # Examples
41 | ///
42 | /// ## Basic Usage
43 | ///
44 | /// ```no_run
45 | /// # use fastmetrics::format::text;
46 | /// # use log::info;
47 | /// # use opendal_core::services;
48 | /// # use opendal_core::Operator;
49 | /// # use opendal_core::Result;
50 | /// # use opendal_layer_fastmetrics::FastmetricsLayer;
51 | /// #
52 | /// # #[tokio::main]
53 | /// # async fn main() -> Result<()> {
54 | /// let mut registry = fastmetrics::registry::Registry::default();
55 | /// let op = Operator::new(services::Memory::default())?
56 | /// .layer(FastmetricsLayer::builder().register(&mut registry)?)
57 | /// .finish();
58 | ///
59 | /// // Write data into object test.
60 | /// op.write("test", "Hello, World!").await?;
61 | ///
62 | /// // Read data from the object.
63 | /// let bs = op.read("test").await?;
64 | /// info!("content: {}", String::from_utf8_lossy(&bs.to_bytes()));
65 | ///
66 | /// // Get object metadata.
67 | /// let meta = op.stat("test").await?;
68 | /// info!("meta: {:?}", meta);
69 | ///
70 | /// // Export prometheus metrics.
71 | /// let mut output = String::new();
72 | /// text::encode(&mut output, &registry).unwrap();
73 | /// println!("{}", output);
74 | /// # Ok(())
75 | /// # }
76 | /// ```
77 | /// ## Global Instance
78 | ///
79 | /// `FastmetricsLayer` needs to be registered before instantiation.
80 | ///
81 | /// If there are multiple operators in an application that need the `FastmetricsLayer`, we could
82 | /// instantiate it once and pass it to multiple operators. But we cannot directly call
83 | /// `.layer(FastmetricsLayer::builder().register(&mut registry)?)` for different services, because
84 | /// registering the same metrics multiple times to the same registry will cause register errors.
85 | /// Therefore, we can provide a global instance for the `FastmetricsLayer`.
86 | ///
87 | /// ```no_run
88 | /// # use std::sync::OnceLock;
89 | /// #
90 | /// # use fastmetrics::format::text;
91 | /// # use fastmetrics::registry::with_global_registry;
92 | /// # use log::info;
93 | /// # use opendal_core::services;
94 | /// # use opendal_core::Operator;
95 | /// # use opendal_core::Result;
96 | /// # use opendal_layer_fastmetrics::FastmetricsLayer;
97 | /// #
98 | /// fn global_fastmetrics_layer() -> &'static FastmetricsLayer {
99 | /// static GLOBAL: OnceLock<FastmetricsLayer> = OnceLock::new();
100 | /// GLOBAL.get_or_init(|| {
101 | /// FastmetricsLayer::builder()
102 | /// .register_global()
103 | /// .expect("Failed to register with the global registry")
104 | /// })
105 | /// }
106 | ///
107 | /// # #[tokio::main]
108 | /// # async fn main() -> Result<()> {
109 | /// let op = Operator::new(services::Memory::default())?
110 | /// .layer(global_fastmetrics_layer().clone())
111 | /// .finish();
112 | ///
113 | /// // Write data into object test.
114 | /// op.write("test", "Hello, World!").await?;
115 | ///
116 | /// // Read data from the object.
117 | /// let bs = op.read("test").await?;
118 | /// info!("content: {}", String::from_utf8_lossy(&bs.to_bytes()));
119 | ///
120 | /// // Get object metadata.
121 | /// let meta = op.stat("test").await?;
122 | /// info!("meta: {:?}", meta);
123 | ///
124 | /// // Export prometheus metrics.
125 | /// let mut output = String::new();
126 | /// with_global_registry(|registry| text::encode(&mut output, &registry).unwrap());
127 | /// println!("{}", output);
128 | /// # Ok(())
129 | /// # }
/// ```
#[derive(Clone)]
pub struct FastmetricsLayer {
    // Interceptor holding the metric families; cloned into every layered accessor.
    interceptor: FastmetricsInterceptor,
}
134 |
135 | impl FastmetricsLayer {
136 | /// Create a [`FastmetricsLayerBuilder`] to set the configuration of metrics.
137 | pub fn builder() -> FastmetricsLayerBuilder {
138 | FastmetricsLayerBuilder::default()
139 | }
140 | }
141 |
142 | impl<A: Access> Layer<A> for FastmetricsLayer {
143 | type LayeredAccess = observe::MetricsAccessor<A, FastmetricsInterceptor>;
144 |
145 | fn layer(&self, inner: A) -> Self::LayeredAccess {
146 | observe::MetricsLayer::new(self.interceptor.clone()).layer(inner)
147 | }
148 | }
149 |
/// [`FastmetricsLayerBuilder`] is a config builder to build a [`FastmetricsLayer`].
///
/// Each bucket list configures the histograms of one kind of measurement.
pub struct FastmetricsLayerBuilder {
    // Buckets for the operation, HTTP request and HTTP response byte histograms.
    bytes_buckets: Vec<f64>,
    // Buckets for the operation, HTTP request and HTTP response byte-rate histograms.
    bytes_rate_buckets: Vec<f64>,
    // Buckets for the operation entries histogram.
    entries_buckets: Vec<f64>,
    // Buckets for the operation entries-rate histogram.
    entries_rate_buckets: Vec<f64>,
    // Buckets for the operation, HTTP request and HTTP response duration histograms.
    duration_seconds_buckets: Vec<f64>,
    // Buckets for the operation TTFB histogram.
    ttfb_buckets: Vec<f64>,
    // Copied into the interceptor and from there into every label set.
    disable_label_root: bool,
}
160 |
161 | impl Default for FastmetricsLayerBuilder {
162 | fn default() -> Self {
163 | Self {
164 | bytes_buckets: observe::DEFAULT_BYTES_BUCKETS.to_vec(),
165 | bytes_rate_buckets: observe::DEFAULT_BYTES_RATE_BUCKETS.to_vec(),
166 | entries_buckets: observe::DEFAULT_ENTRIES_BUCKETS.to_vec(),
167 | entries_rate_buckets: observe::DEFAULT_ENTRIES_RATE_BUCKETS.to_vec(),
168 | duration_seconds_buckets: observe::DEFAULT_DURATION_SECONDS_BUCKETS.to_vec(),
169 | ttfb_buckets: observe::DEFAULT_TTFB_BUCKETS.to_vec(),
170 | disable_label_root: false,
171 | }
172 | }
173 | }
174 |
175 | impl FastmetricsLayerBuilder {
176 | /// Set buckets for bytes related histogram like `operation_bytes`.
177 | pub fn bytes_buckets(mut self, buckets: Vec<f64>) -> Self {
178 | if !buckets.is_empty() {
179 | self.bytes_buckets = buckets;
180 | }
181 | self
182 | }
183 |
184 | /// Set buckets for bytes rate related histogram like `operation_bytes_rate`.
185 | pub fn bytes_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
186 | if !buckets.is_empty() {
187 | self.bytes_rate_buckets = buckets;
188 | }
189 | self
190 | }
191 |
192 | /// Set buckets for entries related histogram like `operation_entries`.
193 | pub fn entries_buckets(mut self, buckets: Vec<f64>) -> Self {
194 | if !buckets.is_empty() {
195 | self.entries_buckets = buckets;
196 | }
197 | self
198 | }
199 |
200 | /// Set buckets for entries rate related histogram like `operation_entries_rate`.
201 | pub fn entries_rate_buckets(mut self, buckets: Vec<f64>) -> Self {
202 | if !buckets.is_empty() {
203 | self.entries_rate_buckets = buckets;
204 | }
205 | self
206 | }
207 |
208 | /// Set buckets for duration seconds related histogram like `operation_duration_seconds`.
209 | pub fn duration_seconds_buckets(mut self, buckets: Vec<f64>) -> Self {
210 | if !buckets.is_empty() {
211 | self.duration_seconds_buckets = buckets;
212 | }
213 | self
214 | }
215 |
216 | /// Set buckets for ttfb related histogram like `operation_ttfb_seconds`.
217 | pub fn ttfb_buckets(mut self, buckets: Vec<f64>) -> Self {
218 | if !buckets.is_empty() {
219 | self.ttfb_buckets = buckets;
220 | }
221 | self
222 | }
223 |
224 | /// The 'root' label might have risks of being high cardinality; users can choose to disable it
225 | /// when they found it's not useful for their metrics.
226 | pub fn disable_label_root(mut self, disable: bool) -> Self {
227 | self.disable_label_root = disable;
228 | self
229 | }
230 |
231 | /// Register the metrics into the registry and return a [`FastmetricsLayer`].
232 | ///
233 | /// # Example
234 | ///
235 | /// ```no_run
236 | /// # use opendal_core::services;
237 | /// # use opendal_core::Operator;
238 | /// # use opendal_core::Result;
239 | /// # use opendal_layer_fastmetrics::FastmetricsLayer;
240 | /// #
241 | /// # fn main() -> Result<()> {
242 | /// let mut registry = fastmetrics::registry::Registry::default();
243 | ///
244 | /// // Pick a builder and configure it.
245 | /// let builder = services::Memory::default();
246 | /// let _ = Operator::new(builder)?
247 | /// .layer(FastmetricsLayer::builder().register(&mut registry)?)
248 | /// .finish();
249 | /// # Ok(())
250 | /// # }
251 | /// ```
252 | pub fn register(self, registry: &mut Registry) -> Result<FastmetricsLayer> {
253 | let operation_bytes = Family::new(HistogramFactory {
254 | buckets: self.bytes_buckets.clone(),
255 | });
256 | let operation_bytes_rate = Family::new(HistogramFactory {
257 | buckets: self.bytes_rate_buckets.clone(),
258 | });
259 | let operation_entries = Family::new(HistogramFactory {
260 | buckets: self.entries_buckets.clone(),
261 | });
262 | let operation_entries_rate = Family::new(HistogramFactory {
263 | buckets: self.entries_rate_buckets.clone(),
264 | });
265 | let operation_duration_seconds = Family::new(HistogramFactory {
266 | buckets: self.duration_seconds_buckets.clone(),
267 | });
268 | let operation_errors_total = Family::default();
269 | let operation_executing = Family::default();
270 | let operation_ttfb_seconds = Family::new(HistogramFactory {
271 | buckets: self.ttfb_buckets.clone(),
272 | });
273 |
274 | let http_executing = Family::default();
275 | let http_request_bytes = Family::new(HistogramFactory {
276 | buckets: self.bytes_buckets.clone(),
277 | });
278 | let http_request_bytes_rate = Family::new(HistogramFactory {
279 | buckets: self.bytes_rate_buckets.clone(),
280 | });
281 | let http_request_duration_seconds = Family::new(HistogramFactory {
282 | buckets: self.duration_seconds_buckets.clone(),
283 | });
284 | let http_response_bytes = Family::new(HistogramFactory {
285 | buckets: self.bytes_buckets.clone(),
286 | });
287 | let http_response_bytes_rate = Family::new(HistogramFactory {
288 | buckets: self.bytes_rate_buckets.clone(),
289 | });
290 | let http_response_duration_seconds = Family::new(HistogramFactory {
291 | buckets: self.duration_seconds_buckets.clone(),
292 | });
293 | let http_connection_errors_total = Family::default();
294 | let http_status_errors_total = Family::default();
295 |
296 | let interceptor = FastmetricsInterceptor {
297 | operation_bytes,
298 | operation_bytes_rate,
299 | operation_entries,
300 | operation_entries_rate,
301 | operation_duration_seconds,
302 | operation_errors_total,
303 | operation_executing,
304 | operation_ttfb_seconds,
305 |
306 | http_executing,
307 | http_request_bytes,
308 | http_request_bytes_rate,
309 | http_request_duration_seconds,
310 | http_response_bytes,
311 | http_response_bytes_rate,
312 | http_response_duration_seconds,
313 | http_connection_errors_total,
314 | http_status_errors_total,
315 |
316 | disable_label_root: self.disable_label_root,
317 | };
318 | interceptor
319 | .register(registry)
320 | .map_err(|err| Error::new(ErrorKind::Unexpected, err.to_string()).set_source(err))?;
321 |
322 | Ok(FastmetricsLayer { interceptor })
323 | }
324 |
325 | /// Register the metrics into the global registry and return a [`FastmetricsLayer`].
326 | ///
327 | /// # Example
328 | ///
329 | /// ```no_run
330 | /// # use opendal_core::services;
331 | /// # use opendal_core::Operator;
332 | /// # use opendal_core::Result;
333 | /// # use opendal_layer_fastmetrics::FastmetricsLayer;
334 | /// #
335 | /// # fn main() -> Result<()> {
336 | /// // Pick a builder and configure it.
337 | /// let builder = services::Memory::default();
338 | /// let _ = Operator::new(builder)?
339 | /// .layer(FastmetricsLayer::builder().register_global()?)
340 | /// .finish();
341 | /// # Ok(())
342 | /// # }
343 | /// ```
344 | pub fn register_global(self) -> Result<FastmetricsLayer> {
345 | with_global_registry_mut(|registry| self.register(registry))
346 | }
347 | }
348 |
// Factory that builds histograms with a fixed list of bucket bounds.
#[derive(Clone)]
struct HistogramFactory {
    // Bucket bounds passed to every histogram this factory creates.
    buckets: Vec<f64>,
}
353 |
354 | impl MetricFactory<Histogram> for HistogramFactory {
355 | fn new_metric(&self) -> Histogram {
356 | Histogram::new(self.buckets.iter().cloned())
357 | }
358 | }
359 |
// Holds one metric family per observed value, all keyed by `OperationLabels`.
// The first group covers operations, the second covers HTTP requests.
#[doc(hidden)]
#[derive(Clone, Debug)]
pub struct FastmetricsInterceptor {
    // Operation metrics.
    operation_bytes: Family<OperationLabels, Histogram, HistogramFactory>,
    operation_bytes_rate: Family<OperationLabels, Histogram, HistogramFactory>,
    operation_entries: Family<OperationLabels, Histogram, HistogramFactory>,
    operation_entries_rate: Family<OperationLabels, Histogram, HistogramFactory>,
    operation_duration_seconds: Family<OperationLabels, Histogram, HistogramFactory>,
    operation_errors_total: Family<OperationLabels, Counter>,
    operation_executing: Family<OperationLabels, Gauge>,
    operation_ttfb_seconds: Family<OperationLabels, Histogram, HistogramFactory>,

    // HTTP metrics.
    http_executing: Family<OperationLabels, Gauge>,
    http_request_bytes: Family<OperationLabels, Histogram, HistogramFactory>,
    http_request_bytes_rate: Family<OperationLabels, Histogram, HistogramFactory>,
    http_request_duration_seconds: Family<OperationLabels, Histogram, HistogramFactory>,
    http_response_bytes: Family<OperationLabels, Histogram, HistogramFactory>,
    http_response_bytes_rate: Family<OperationLabels, Histogram, HistogramFactory>,
    http_response_duration_seconds: Family<OperationLabels, Histogram, HistogramFactory>,
    http_connection_errors_total: Family<OperationLabels, Counter>,
    http_status_errors_total: Family<OperationLabels, Counter>,

    // Copied into every `OperationLabels` built when a value is observed.
    disable_label_root: bool,
}
384 |
// Registers every metric family of the interceptor with `registry`.
impl Register for FastmetricsInterceptor {
    fn register(&self, registry: &mut Registry) -> fastmetrics::error::Result<()> {
        // For each `field => value` pair: take the metric's name, unit and
        // help text from `value` and register a clone of `self.field` under
        // them. The first failing registration is returned to the caller.
        macro_rules! register_metrics {
            ($($field:ident => $value:expr),* $(,)?) => {
                $(
                    {
                        let ((name, unit), help) = ($value.name_with_unit(), $value.help());
                        registry.register_metric(name, help, unit, self.$field.clone())?;
                    }
                )*
            };
        }

        // The payloads (`0`, `0.0`, `Duration::default()`) are placeholders;
        // only the name, unit and help of each value are read above.
        register_metrics! {
            // Operation metrics
            operation_bytes => observe::MetricValue::OperationBytes(0),
            operation_bytes_rate => observe::MetricValue::OperationBytesRate(0.0),
            operation_entries => observe::MetricValue::OperationEntries(0),
            operation_entries_rate => observe::MetricValue::OperationEntriesRate(0.0),
            operation_duration_seconds => observe::MetricValue::OperationDurationSeconds(Duration::default()),
            operation_errors_total => observe::MetricValue::OperationErrorsTotal,
            operation_executing => observe::MetricValue::OperationExecuting(0),
            operation_ttfb_seconds => observe::MetricValue::OperationTtfbSeconds(Duration::default()),

            // HTTP metrics
            http_executing => observe::MetricValue::HttpExecuting(0),
            http_request_bytes => observe::MetricValue::HttpRequestBytes(0),
            http_request_bytes_rate => observe::MetricValue::HttpRequestBytesRate(0.0),
            http_request_duration_seconds => observe::MetricValue::HttpRequestDurationSeconds(Duration::default()),
            http_response_bytes => observe::MetricValue::HttpResponseBytes(0),
            http_response_bytes_rate => observe::MetricValue::HttpResponseBytesRate(0.0),
            http_response_duration_seconds => observe::MetricValue::HttpResponseDurationSeconds(Duration::default()),
            http_connection_errors_total => observe::MetricValue::HttpConnectionErrorsTotal,
            http_status_errors_total => observe::MetricValue::HttpStatusErrorsTotal,
        }

        Ok(())
    }
}
424 |
425 | impl observe::MetricsIntercept for FastmetricsInterceptor {
426 | fn observe(&self, labels: observe::MetricLabels, value: observe::MetricValue) {
427 | let labels = OperationLabels {
428 | labels,
429 | disable_label_root: self.disable_label_root,
430 | };
431 | match value {
432 | observe::MetricValue::OperationBytes(v) => {
433 | self.operation_bytes
434 | .with_or_new(&labels, |hist| hist.observe(v as f64));
435 | }
436 | observe::MetricValue::OperationBytesRate(v) => {
437 | self.operation_bytes_rate
438 | .with_or_new(&labels, |hist| hist.observe(v));
439 | }
440 | observe::MetricValue::OperationEntries(v) => {
441 | self.operation_entries
442 | .with_or_new(&labels, |hist| hist.observe(v as f64));
443 | }
444 | observe::MetricValue::OperationEntriesRate(v) => {
445 | self.operation_entries_rate
446 | .with_or_new(&labels, |hist| hist.observe(v));
447 | }
448 | observe::MetricValue::OperationDurationSeconds(v) => {
449 | self.operation_duration_seconds
450 | .with_or_new(&labels, |hist| hist.observe(v.as_secs_f64()));
451 | }
452 | observe::MetricValue::OperationErrorsTotal => {
453 | self.operation_errors_total
454 | .with_or_new(&labels, |counter| counter.inc());
455 | }
456 | observe::MetricValue::OperationExecuting(v) => {
457 | self.operation_executing
458 | .with_or_new(&labels, |gauge| gauge.inc_by(v as i64));
459 | }
460 | observe::MetricValue::OperationTtfbSeconds(v) => {
461 | self.operation_ttfb_seconds
462 | .with_or_new(&labels, |hist| hist.observe(v.as_secs_f64()));
463 | }
464 |
465 | observe::MetricValue::HttpExecuting(v) => {
466 | self.http_executing
467 | .with_or_new(&labels, |gauge| gauge.inc_by(v as i64));
468 | }
469 | observe::MetricValue::HttpRequestBytes(v) => {
470 | self.http_request_bytes
471 | .with_or_new(&labels, |hist| hist.observe(v as f64));
472 | }
473 | observe::MetricValue::HttpRequestBytesRate(v) => {
474 | self.http_request_bytes_rate
475 | .with_or_new(&labels, |hist| hist.observe(v));
476 | }
477 | observe::MetricValue::HttpRequestDurationSeconds(v) => {
478 | self.http_request_duration_seconds
479 | .with_or_new(&labels, |hist| hist.observe(v.as_secs_f64()));
480 | }
481 | observe::MetricValue::HttpResponseBytes(v) => {
482 | self.http_response_bytes
483 | .with_or_new(&labels, |hist| hist.observe(v as f64));
484 | }
485 | observe::MetricValue::HttpResponseBytesRate(v) => {
486 | self.http_response_bytes_rate
487 | .with_or_new(&labels, |hist| hist.observe(v));
488 | }
489 | observe::MetricValue::HttpResponseDurationSeconds(v) => {
490 | self.http_response_duration_seconds
491 | .with_or_new(&labels, |hist| hist.observe(v.as_secs_f64()));
492 | }
493 | observe::MetricValue::HttpConnectionErrorsTotal => {
494 | self.http_connection_errors_total
495 | .with_or_new(&labels, |counter| counter.inc());
496 | }
497 | observe::MetricValue::HttpStatusErrorsTotal => {
498 | self.http_status_errors_total
499 | .with_or_new(&labels, |counter| counter.inc());
500 | }
501 | _ => {}
502 | };
503 | }
504 | }
505 |
// Label set used as the key of every metric family in this file.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct OperationLabels {
    // Labels handed to `observe` by the caller.
    labels: observe::MetricLabels,
    // When true, the root label is skipped while encoding. The derived
    // `Hash`/`Eq` still take `labels` as a whole into account.
    disable_label_root: bool,
}
511 |
512 | impl LabelSetSchema for OperationLabels {
513 | fn names() -> Option<&'static [&'static str]> {
514 | static NAMES: &[&str] = &[
515 | observe::LABEL_SCHEME,
516 | observe::LABEL_NAMESPACE,
517 | observe::LABEL_ROOT,
518 | observe::LABEL_OPERATION,
519 | observe::LABEL_ERROR,
520 | observe::LABEL_STATUS_CODE,
521 | ];
522 | Some(NAMES)
523 | }
524 | }
525 |
526 | impl EncodeLabelSet for OperationLabels {
527 | fn encode(&self, encoder: &mut dyn LabelSetEncoder) -> fastmetrics::error::Result<()> {
528 | encoder.encode(&(observe::LABEL_SCHEME, self.labels.scheme))?;
529 | encoder.encode(&(observe::LABEL_NAMESPACE, self.labels.namespace.as_ref()))?;
530 | if !self.disable_label_root {
531 | encoder.encode(&(observe::LABEL_ROOT, self.labels.root.as_ref()))?;
532 | }
533 | encoder.encode(&(observe::LABEL_OPERATION, self.labels.operation))?;
534 | if let Some(error) = &self.labels.error {
535 | encoder.encode(&(observe::LABEL_ERROR, error.into_static()))?;
536 | }
537 | if let Some(code) = &self.labels.status_code {
538 | encoder.encode(&(observe::LABEL_STATUS_CODE, code.as_str()))?;
539 | }
540 | Ok(())
541 | }
542 | }
543 |
```
--------------------------------------------------------------------------------
/core/services/azure-common/src/lib.rs:
--------------------------------------------------------------------------------
```rust
1 | // Licensed to the Apache Software Foundation (ASF) under one
2 | // or more contributor license agreements. See the NOTICE file
3 | // distributed with this work for additional information
4 | // regarding copyright ownership. The ASF licenses this file
5 | // to you under the Apache License, Version 2.0 (the
6 | // "License"); you may not use this file except in compliance
7 | // with the License. You may obtain a copy of the License at
8 | //
9 | // http://www.apache.org/licenses/LICENSE-2.0
10 | //
11 | // Unless required by applicable law or agreed to in writing,
12 | // software distributed under the License is distributed on an
13 | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 | // KIND, either express or implied. See the License for the
15 | // specific language governing permissions and limitations
16 | // under the License.
17 |
18 | #![cfg_attr(docsrs, feature(doc_cfg))]
19 | #![deny(missing_docs)]
20 | //! Azure Storage helpers.
21 | //!
22 | //! This module provides utilities and shared abstractions for services built
23 | //! on Azure Storage, such as Azure Blob Storage (`services-azblob`) or
24 | //! Azure Data Lake Storage (`services-azdls`).
25 |
26 | use std::collections::HashMap;
27 |
28 | use http::Uri;
29 | use http::response::Parts;
30 | use opendal_core::{Error, ErrorKind, Result};
31 | use reqsign::{AzureStorageConfig, AzureStorageCredential};
32 |
33 | /// Parses an [Azure connection string][1] into a configuration object.
34 | ///
35 | /// The connection string doesn't have to specify all required parameters
36 | /// because the user is still allowed to set them later directly on the object.
37 | ///
38 | /// The function takes an AzureStorageService parameter because it determines
39 | /// the fields used to parse the endpoint.
40 | ///
41 | /// [1]: https://learn.microsoft.com/en-us/azure/storage/common/storage-configure-connection-string
42 | pub fn azure_config_from_connection_string(
43 | conn_str: &str,
44 | storage: AzureStorageService,
45 | ) -> Result<AzureStorageConfig> {
46 | let key_values = parse_connection_string(conn_str)?;
47 |
48 | if storage == AzureStorageService::Blob {
49 | // Try to read development storage configuration.
50 | if let Some(development_config) = collect_blob_development_config(&key_values, &storage) {
51 | return Ok(AzureStorageConfig {
52 | account_name: Some(development_config.account_name),
53 | account_key: Some(development_config.account_key),
54 | endpoint: Some(development_config.endpoint),
55 | ..Default::default()
56 | });
57 | }
58 | }
59 |
60 | let mut config = AzureStorageConfig {
61 | account_name: key_values.get("AccountName").cloned(),
62 | endpoint: collect_endpoint(&key_values, &storage)?,
63 | ..Default::default()
64 | };
65 |
66 | if let Some(creds) = collect_credentials(&key_values) {
67 | set_credentials(&mut config, creds);
68 | };
69 |
70 | Ok(config)
71 | }
72 |
/// The service that a connection string refers to. The type influences
/// interpretation of endpoint-related fields during parsing.
///
/// The enum is a plain, field-less tag, so it is cheap to copy and compare
/// and can be printed with `{:?}` in logs and assertion messages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AzureStorageService {
    /// Azure Blob Storage.
    Blob,

    /// Azure File Storage.
    File,

    /// Azure Data Lake Storage Gen2.
    /// Backed by Blob Storage but exposed through a different endpoint (`dfs`).
    Adls,
}
87 |
/// Try to extract the storage account name from an Azure endpoint.
///
/// The endpoint may carry an `http://` or `https://` scheme, a numeric port
/// and a path, query or fragment; only the host part is inspected. The host
/// must have the form `<account>.<service>.<suffix>` where `<suffix>` is one
/// of the known Azure Storage endpoint suffixes.
///
/// Returns `None` if the endpoint doesn't match known Azure Storage suffixes
/// or if the account name part is empty.
pub fn azure_account_name_from_endpoint(endpoint: &str) -> Option<String> {
    /// Known Azure Storage endpoint suffixes.
    const KNOWN_ENDPOINT_SUFFIXES: &[&str] = &[
        "core.windows.net",       // Azure public cloud
        "core.usgovcloudapi.net", // Azure US Government
        "core.chinacloudapi.cn",  // Azure China
    ];

    let without_scheme = endpoint
        .strip_prefix("http://")
        .or_else(|| endpoint.strip_prefix("https://"))
        .unwrap_or(endpoint);

    // Anything after the host (path, query, fragment) says nothing about the
    // account, so drop it instead of letting it break the suffix match.
    let host = without_scheme
        .split(|c| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or(without_scheme);

    // Drop an explicit numeric port such as `:443`.
    let host = match host.rsplit_once(':') {
        Some((name, port)) if !port.is_empty() && port.bytes().all(|b| b.is_ascii_digit()) => {
            name
        }
        _ => host,
    };

    let (account_name, service_endpoint) = host.split_once('.')?;
    let (_storage_service, endpoint_suffix) = service_endpoint.split_once('.')?;

    // An empty label in front of the service is not an account name.
    if account_name.is_empty() || !KNOWN_ENDPOINT_SUFFIXES.contains(&endpoint_suffix) {
        return None;
    }

    Some(account_name.to_string())
}
113 |
114 | /// Takes a semicolon-delimited Azure Storage connection string and returns
115 | /// key-value pairs split from it.
116 | fn parse_connection_string(conn_str: &str) -> Result<HashMap<String, String>> {
117 | conn_str
118 | .trim()
119 | .replace("\n", "")
120 | .split(';')
121 | .filter(|&field| !field.is_empty())
122 | .map(|field| {
123 | let (key, value) = field.trim().split_once('=').ok_or(Error::new(
124 | ErrorKind::ConfigInvalid,
125 | format!("Invalid connection string, expected '=' in field: {field}"),
126 | ))?;
127 | Ok((key.to_string(), value.to_string()))
128 | })
129 | .collect()
130 | }
131 |
132 | fn collect_blob_development_config(
133 | key_values: &HashMap<String, String>,
134 | storage: &AzureStorageService,
135 | ) -> Option<DevelopmentStorageConfig> {
136 | debug_assert!(
137 | storage == &AzureStorageService::Blob,
138 | "Azurite Development Storage only supports Blob Storage"
139 | );
140 |
141 | // Azurite defaults.
142 | const AZURITE_DEFAULT_STORAGE_ACCOUNT_NAME: &str = "devstoreaccount1";
143 | const AZURITE_DEFAULT_STORAGE_ACCOUNT_KEY: &str =
144 | "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
145 |
146 | const AZURITE_DEFAULT_BLOB_URI: &str = "http://127.0.0.1:10000";
147 |
148 | if key_values.get("UseDevelopmentStorage") != Some(&"true".to_string()) {
149 | return None; // Not using development storage
150 | }
151 |
152 | let account_name = key_values
153 | .get("AccountName")
154 | .cloned()
155 | .unwrap_or(AZURITE_DEFAULT_STORAGE_ACCOUNT_NAME.to_string());
156 | let account_key = key_values
157 | .get("AccountKey")
158 | .cloned()
159 | .unwrap_or(AZURITE_DEFAULT_STORAGE_ACCOUNT_KEY.to_string());
160 | let development_proxy_uri = key_values
161 | .get("DevelopmentStorageProxyUri")
162 | .cloned()
163 | .unwrap_or(AZURITE_DEFAULT_BLOB_URI.to_string());
164 |
165 | Some(DevelopmentStorageConfig {
166 | endpoint: format!("{development_proxy_uri}/{account_name}"),
167 | account_name,
168 | account_key,
169 | })
170 | }
171 |
/// Helper struct to hold development storage aka Azurite configuration.
struct DevelopmentStorageConfig {
    /// Account name from the connection string, or the Azurite default.
    account_name: String,
    /// Account key from the connection string, or the Azurite default.
    account_key: String,
    /// Proxy URI joined with the account name.
    endpoint: String,
}
178 |
179 | /// Parses an endpoint from the key-value pairs if possible.
180 | ///
181 | /// Users are still able to later supplement configuration with an endpoint,
182 | /// so endpoint-related fields aren't enforced.
183 | fn collect_endpoint(
184 | key_values: &HashMap<String, String>,
185 | storage: &AzureStorageService,
186 | ) -> Result<Option<String>> {
187 | match storage {
188 | AzureStorageService::Blob => collect_or_build_endpoint(key_values, "BlobEndpoint", "blob"),
189 | AzureStorageService::File => collect_or_build_endpoint(key_values, "FileEndpoint", "file"),
190 | AzureStorageService::Adls => {
191 | // ADLS doesn't have a dedicated endpoint field and we can only
192 | // build it from parts.
193 | if let Some(dfs_endpoint) = collect_endpoint_from_parts(key_values, "dfs")? {
194 | Ok(Some(dfs_endpoint.clone()))
195 | } else {
196 | Ok(None)
197 | }
198 | }
199 | }
200 | }
201 |
202 | fn collect_credentials(key_values: &HashMap<String, String>) -> Option<AzureStorageCredential> {
203 | if let Some(sas_token) = key_values.get("SharedAccessSignature") {
204 | Some(AzureStorageCredential::SharedAccessSignature(
205 | sas_token.clone(),
206 | ))
207 | } else if let (Some(account_name), Some(account_key)) =
208 | (key_values.get("AccountName"), key_values.get("AccountKey"))
209 | {
210 | Some(AzureStorageCredential::SharedKey(
211 | account_name.clone(),
212 | account_key.clone(),
213 | ))
214 | } else {
215 | // We default to no authentication. This is not an error because e.g.
216 | // Azure Active Directory configuration is typically not passed via
217 | // connection strings.
218 | // Users may also set credentials manually on the configuration.
219 | None
220 | }
221 | }
222 |
223 | fn set_credentials(config: &mut AzureStorageConfig, creds: AzureStorageCredential) {
224 | match creds {
225 | AzureStorageCredential::SharedAccessSignature(sas_token) => {
226 | config.sas_token = Some(sas_token);
227 | }
228 | AzureStorageCredential::SharedKey(account_name, account_key) => {
229 | config.account_name = Some(account_name);
230 | config.account_key = Some(account_key);
231 | }
232 | AzureStorageCredential::BearerToken(_, _) => {
233 | // Bearer tokens shouldn't be passed via connection strings.
234 | }
235 | }
236 | }
237 |
238 | fn collect_or_build_endpoint(
239 | key_values: &HashMap<String, String>,
240 | endpoint_key: &str,
241 | service_name: &str,
242 | ) -> Result<Option<String>> {
243 | if let Some(endpoint) = key_values.get(endpoint_key) {
244 | Ok(Some(endpoint.clone()))
245 | } else if let Some(built_endpoint) = collect_endpoint_from_parts(key_values, service_name)? {
246 | Ok(Some(built_endpoint.clone()))
247 | } else {
248 | Ok(None)
249 | }
250 | }
251 |
252 | fn collect_endpoint_from_parts(
253 | key_values: &HashMap<String, String>,
254 | storage_endpoint_name: &str,
255 | ) -> Result<Option<String>> {
256 | let (account_name, endpoint_suffix) = match (
257 | key_values.get("AccountName"),
258 | key_values.get("EndpointSuffix"),
259 | ) {
260 | (Some(name), Some(suffix)) => (name, suffix),
261 | _ => return Ok(None), // Can't build an endpoint if one of them is missing
262 | };
263 |
264 | let protocol = key_values
265 | .get("DefaultEndpointsProtocol")
266 | .map(String::as_str)
267 | .unwrap_or("https"); // Default to HTTPS if not specified
268 | if protocol != "http" && protocol != "https" {
269 | return Err(Error::new(
270 | ErrorKind::ConfigInvalid,
271 | format!("Invalid DefaultEndpointsProtocol: {protocol}"),
272 | ));
273 | }
274 |
275 | Ok(Some(format!(
276 | "{protocol}://{account_name}.{storage_endpoint_name}.{endpoint_suffix}"
277 | )))
278 | }
279 |
280 | /// Add response context to error.
281 | ///
282 | /// This helper function will:
283 | ///
284 | /// - remove sensitive or useless headers from parts.
285 | /// - fetch uri if parts extensions contains `Uri`.
286 | /// - censor sensitive SAS URI query parameters
287 | pub fn with_azure_error_response_context(mut err: Error, mut parts: Parts) -> Error {
288 | if let Some(uri) = parts.extensions.get::<Uri>() {
289 | err = err.with_context("uri", censor_sas_uri(uri));
290 | }
291 |
292 | // The following headers may contains sensitive information.
293 | parts.headers.remove("Set-Cookie");
294 | parts.headers.remove("WWW-Authenticate");
295 | parts.headers.remove("Proxy-Authenticate");
296 |
297 | err = err.with_context("response", format!("{parts:?}"));
298 |
299 | err
300 | }
301 |
302 | fn censor_sas_uri(uri: &Uri) -> String {
303 | if let Some(query) = uri.query() {
304 | // There is a large set of query parameters specified for SAS URIs.
305 | // Some of them may be useful to an attacker, but the most important part is the signature.
306 | // Without a signature, an attacker will not be able to replay the request.
307 | // For now, just remove the signature.
308 | //
309 | // https://learn.microsoft.com/en-us/rest/api/storageservices/create-account-sas
310 | // https://learn.microsoft.com/en-us/rest/api/storageservices/create-service-sas
311 | // https://learn.microsoft.com/en-us/rest/api/storageservices/create-user-delegation-sas
312 | //
313 | let path = uri.path();
314 | let new_query: String = query
315 | .split("&")
316 | .filter(|p| !p.starts_with("sig="))
317 | .collect::<Vec<_>>()
318 | .join("&");
319 | let mut parts = uri.clone().into_parts();
320 | parts.path_and_query = Some(format!("{path}?{new_query}").try_into().unwrap());
321 | Uri::from_parts(parts).unwrap().to_string()
322 | } else {
323 | uri.to_string()
324 | }
325 | }
326 |
327 | #[cfg(test)]
328 | mod tests {
329 | use http::Uri;
330 | use reqsign::AzureStorageConfig;
331 |
332 | use super::censor_sas_uri;
333 |
334 | use super::{
335 | AzureStorageService, azure_account_name_from_endpoint, azure_config_from_connection_string,
336 | };
337 |
338 | #[test]
339 | fn test_azure_config_from_connection_string() {
340 | #[allow(unused_mut)]
341 | let mut test_cases = vec![
342 | ("minimal fields",
343 | (AzureStorageService::Blob, "BlobEndpoint=https://testaccount.blob.core.windows.net/"),
344 | Some(AzureStorageConfig{
345 | endpoint: Some("https://testaccount.blob.core.windows.net/".to_string()),
346 | ..Default::default()
347 | }),
348 | ),
349 | ("basic creds and blob endpoint",
350 | (AzureStorageService::Blob, "AccountName=testaccount;AccountKey=testkey;BlobEndpoint=https://testaccount.blob.core.windows.net/"),
351 | Some(AzureStorageConfig{
352 | account_name: Some("testaccount".to_string()),
353 | account_key: Some("testkey".to_string()),
354 | endpoint: Some("https://testaccount.blob.core.windows.net/".to_string()),
355 | ..Default::default()
356 | }),
357 | ),
358 | ("SAS token",
359 | (AzureStorageService::Blob, "SharedAccessSignature=blablabla"),
360 | Some(AzureStorageConfig{
361 | sas_token: Some("blablabla".to_string()),
362 | ..Default::default()
363 | }),
364 | ),
365 | ("endpoint from parts",
366 | (AzureStorageService::Blob, "AccountName=testaccount;EndpointSuffix=core.windows.net;DefaultEndpointsProtocol=https"),
367 | Some(AzureStorageConfig{
368 | endpoint: Some("https://testaccount.blob.core.windows.net".to_string()),
369 | account_name: Some("testaccount".to_string()),
370 | ..Default::default()
371 | }),
372 | ),
373 | ("endpoint from parts and no protocol",
374 | (AzureStorageService::Blob, "AccountName=testaccount;EndpointSuffix=core.windows.net"),
375 | Some(AzureStorageConfig{
376 | // Defaults to https
377 | endpoint: Some("https://testaccount.blob.core.windows.net".to_string()),
378 | account_name: Some("testaccount".to_string()),
379 | ..Default::default()
380 | }),
381 | ),
382 | ("prefers sas over key",
383 | (AzureStorageService::Blob, "AccountName=testaccount;AccountKey=testkey;SharedAccessSignature=sas_token"),
384 | Some(AzureStorageConfig{
385 | sas_token: Some("sas_token".to_string()),
386 | account_name: Some("testaccount".to_string()),
387 | ..Default::default()
388 | }),
389 | ),
390 | ("development storage",
391 | (AzureStorageService::Blob, "UseDevelopmentStorage=true",),
392 | Some(AzureStorageConfig{
393 | account_name: Some("devstoreaccount1".to_string()),
394 | account_key: Some("Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==".to_string()),
395 | endpoint: Some("http://127.0.0.1:10000/devstoreaccount1".to_string()),
396 | ..Default::default()
397 | }),
398 | ),
399 | ("development storage with custom account values",
400 | (AzureStorageService::Blob, "UseDevelopmentStorage=true;AccountName=myAccount;AccountKey=myKey"),
401 | Some(AzureStorageConfig {
402 | endpoint: Some("http://127.0.0.1:10000/myAccount".to_string()),
403 | account_name: Some("myAccount".to_string()),
404 | account_key: Some("myKey".to_string()),
405 | ..Default::default()
406 | }),
407 | ),
408 | ("development storage with custom uri",
409 | (AzureStorageService::Blob, "UseDevelopmentStorage=true;DevelopmentStorageProxyUri=http://127.0.0.1:12345"),
410 | Some(AzureStorageConfig {
411 | endpoint: Some("http://127.0.0.1:12345/devstoreaccount1".to_string()),
412 | account_name: Some("devstoreaccount1".to_string()),
413 | account_key: Some("Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==".to_string()),
414 | ..Default::default()
415 | }),
416 | ),
417 | ("unknown key is ignored",
418 | (AzureStorageService::Blob, "SomeUnknownKey=123;BlobEndpoint=https://testaccount.blob.core.windows.net/"),
419 | Some(AzureStorageConfig{
420 | endpoint: Some("https://testaccount.blob.core.windows.net/".to_string()),
421 | ..Default::default()
422 | }),
423 | ),
424 | ("leading and trailing `;`",
425 | (AzureStorageService::Blob, ";AccountName=testaccount;"),
426 | Some(AzureStorageConfig {
427 | account_name: Some("testaccount".to_string()),
428 | ..Default::default()
429 | }),
430 | ),
431 | ("line breaks",
432 | (AzureStorageService::Blob, r#"
433 | AccountName=testaccount;
434 | AccountKey=testkey;
435 | EndpointSuffix=core.windows.net;
436 | DefaultEndpointsProtocol=https"#),
437 | Some(AzureStorageConfig {
438 | account_name: Some("testaccount".to_string()),
439 | account_key: Some("testkey".to_string()),
440 | endpoint: Some("https://testaccount.blob.core.windows.net".to_string()),
441 | ..Default::default()
442 | }),
443 | ),
444 | ("missing equals",
445 | (AzureStorageService::Blob, "AccountNameexample;AccountKey=example;EndpointSuffix=core.windows.net;DefaultEndpointsProtocol=https",),
446 | None, // This should fail due to missing '='
447 | ),
448 | ("with invalid protocol",
449 | (AzureStorageService::Blob, "DefaultEndpointsProtocol=ftp;AccountName=example;EndpointSuffix=core.windows.net",),
450 | None, // This should fail due to invalid protocol
451 | ),
452 | ];
453 |
454 | test_cases.push(
455 | ("adls endpoint from parts",
456 | (AzureStorageService::Adls, "AccountName=testaccount;EndpointSuffix=core.windows.net;DefaultEndpointsProtocol=https"),
457 | Some(AzureStorageConfig{
458 | account_name: Some("testaccount".to_string()),
459 | endpoint: Some("https://testaccount.dfs.core.windows.net".to_string()),
460 | ..Default::default()
461 | }),
462 | )
463 | );
464 |
465 | test_cases.extend(vec![
466 | (
467 | "file endpoint from field",
468 | (
469 | AzureStorageService::File,
470 | "FileEndpoint=https://testaccount.file.core.windows.net",
471 | ),
472 | Some(AzureStorageConfig {
473 | endpoint: Some("https://testaccount.file.core.windows.net".to_string()),
474 | ..Default::default()
475 | }),
476 | ),
477 | (
478 | "file endpoint from parts",
479 | (
480 | AzureStorageService::File,
481 | "AccountName=testaccount;EndpointSuffix=core.windows.net",
482 | ),
483 | Some(AzureStorageConfig {
484 | account_name: Some("testaccount".to_string()),
485 | endpoint: Some("https://testaccount.file.core.windows.net".to_string()),
486 | ..Default::default()
487 | }),
488 | ),
489 | ]);
490 |
491 | test_cases.push((
492 | "azdls development storage",
493 | (AzureStorageService::Adls, "UseDevelopmentStorage=true"),
494 | Some(AzureStorageConfig::default()), // Azurite doesn't support ADLSv2, so we ignore this case
495 | ));
496 |
497 | for (name, (storage, conn_str), expected) in test_cases {
498 | let actual = azure_config_from_connection_string(conn_str, storage);
499 |
500 | if let Some(expected) = expected {
501 | assert_azure_storage_config_eq(&actual.expect(name), &expected, name);
502 | } else {
503 | assert!(actual.is_err(), "Expected error for case: {name}");
504 | }
505 | }
506 | }
507 |
508 | #[test]
509 | fn test_azure_account_name_from_endpoint() {
510 | let test_cases = vec![
511 | ("https://account.blob.core.windows.net", Some("account")),
512 | (
513 | "https://account.blob.core.usgovcloudapi.net",
514 | Some("account"),
515 | ),
516 | (
517 | "https://account.blob.core.chinacloudapi.cn",
518 | Some("account"),
519 | ),
520 | ("https://account.dfs.core.windows.net", Some("account")),
521 | ("https://account.blob.core.windows.net/", Some("account")),
522 | ("https://account.blob.unknown.suffix.com", None),
523 | ("http://blob.core.windows.net", None),
524 | ];
525 | for (endpoint, expected_account_name) in test_cases {
526 | let account_name = azure_account_name_from_endpoint(endpoint);
527 | assert_eq!(
528 | account_name,
529 | expected_account_name.map(|s| s.to_string()),
530 | "Endpoint: {endpoint}"
531 | );
532 | }
533 | }
534 |
535 | #[test]
536 | fn test_azure_uri_context_removes_sig() {
537 | let uri: Uri = "https://foo.bar/path?foo=foo&sig=SENSITIVE&bar=bar"
538 | .parse()
539 | .unwrap();
540 | let expected = "https://foo.bar/path?foo=foo&bar=bar";
541 | assert_eq!(censor_sas_uri(&uri), expected);
542 | }
543 |
544 | /// Helper function to compare AzureStorageConfig fields manually.
545 | fn assert_azure_storage_config_eq(
546 | actual: &AzureStorageConfig,
547 | expected: &AzureStorageConfig,
548 | name: &str,
549 | ) {
550 | assert_eq!(
551 | actual.account_name, expected.account_name,
552 | "account_name mismatch: {name}"
553 | );
554 | assert_eq!(
555 | actual.account_key, expected.account_key,
556 | "account_key mismatch: {name}"
557 | );
558 | assert_eq!(
559 | actual.endpoint, expected.endpoint,
560 | "endpoint mismatch: {name}"
561 | );
562 | assert_eq!(
563 | actual.sas_token, expected.sas_token,
564 | "sas_token mismatch: {name}"
565 | );
566 | }
567 | }
568 |
```