-
Notifications
You must be signed in to change notification settings - Fork 148
Add PaginatedKVStore support to VssStore #864
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,7 +24,7 @@ use bitcoin::Network; | |
| use lightning::impl_writeable_tlv_based_enum; | ||
| use lightning::io::{self, Error, ErrorKind}; | ||
| use lightning::sign::{EntropySource as LdkEntropySource, RandomBytes}; | ||
| use lightning::util::persist::KVStore; | ||
| use lightning::util::persist::{KVStore, PageToken, PaginatedKVStore, PaginatedListResponse}; | ||
| use lightning::util::ser::{Readable, Writeable}; | ||
| use prost::Message; | ||
| use vss_client::client::VssClient; | ||
|
|
@@ -293,6 +293,32 @@ impl KVStore for VssStore { | |
| } | ||
| } | ||
|
|
||
| impl PaginatedKVStore for VssStore { | ||
| fn list_paginated( | ||
| &self, primary_namespace: &str, secondary_namespace: &str, page_token: Option<PageToken>, | ||
| ) -> impl Future<Output = Result<PaginatedListResponse, io::Error>> + 'static + Send { | ||
| let primary_namespace = primary_namespace.to_string(); | ||
| let secondary_namespace = secondary_namespace.to_string(); | ||
| let inner = Arc::clone(&self.inner); | ||
| let runtime = self.internal_runtime(); | ||
| async move { | ||
| let task = runtime.spawn(async move { | ||
| inner | ||
| .list_paginated_internal( | ||
| &inner.async_client, | ||
| primary_namespace, | ||
| secondary_namespace, | ||
| page_token, | ||
| ) | ||
| .await | ||
| }); | ||
| task.await.map_err(|e| { | ||
| io::Error::new(io::ErrorKind::Other, format!("VSS runtime task failed: {}", e)) | ||
| })? | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl Drop for VssStore { | ||
| fn drop(&mut self) { | ||
| if let Some(runtime) = self.internal_runtime.take() { | ||
|
|
@@ -391,35 +417,34 @@ impl VssStoreInner { | |
| } | ||
| } | ||
|
|
||
| async fn list_all_keys( | ||
| async fn list_keys( | ||
| &self, client: &VssClient<CustomRetryPolicy>, primary_namespace: &str, | ||
| secondary_namespace: &str, | ||
| ) -> io::Result<Vec<String>> { | ||
| let mut page_token = None; | ||
| let mut keys = vec![]; | ||
| secondary_namespace: &str, page_token: Option<String>, page_size: Option<i32>, | ||
| ) -> io::Result<(Vec<String>, Option<String>)> { | ||
| let key_prefix = self.build_obfuscated_prefix(primary_namespace, secondary_namespace); | ||
| while page_token != Some("".to_string()) { | ||
| let request = ListKeyVersionsRequest { | ||
| store_id: self.store_id.clone(), | ||
| key_prefix: Some(key_prefix.clone()), | ||
| page_token, | ||
| page_size: None, | ||
| }; | ||
| let request = ListKeyVersionsRequest { | ||
| store_id: self.store_id.clone(), | ||
| key_prefix: Some(key_prefix), | ||
| page_token, | ||
| page_size, | ||
| }; | ||
|
|
||
| let response = client.list_key_versions(&request).await.map_err(|e| { | ||
| let msg = format!( | ||
| "Failed to list keys in {}/{}: {}", | ||
| primary_namespace, secondary_namespace, e | ||
| ); | ||
| Error::new(ErrorKind::Other, msg) | ||
| })?; | ||
| let response = client.list_key_versions(&request).await.map_err(|e| { | ||
| let msg = format!( | ||
| "Failed to list keys in {}/{}: {}", | ||
| primary_namespace, secondary_namespace, e | ||
| ); | ||
| Error::new(ErrorKind::Other, msg) | ||
| })?; | ||
|
|
||
| for kv in response.key_versions { | ||
| keys.push(self.extract_key(&kv.key)?); | ||
| } | ||
| page_token = response.next_page_token; | ||
| let mut keys = Vec::with_capacity(response.key_versions.len()); | ||
| for kv in response.key_versions { | ||
| keys.push(self.extract_key(&kv.key)?); | ||
| } | ||
| Ok(keys) | ||
|
|
||
| // VSS may return an empty string instead of None to signal the last page. | ||
| let next_page_token = response.next_page_token.filter(|t| !t.is_empty()); | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this necessary in this commit? Shouldn't this happen in the previous commit? |
||
| Ok((keys, next_page_token)) | ||
| } | ||
|
|
||
| async fn read_internal( | ||
|
|
@@ -543,20 +568,50 @@ impl VssStoreInner { | |
| ) -> io::Result<Vec<String>> { | ||
| check_namespace_key_validity(&primary_namespace, &secondary_namespace, None, "list")?; | ||
|
|
||
| let keys = self | ||
| .list_all_keys(client, &primary_namespace, &secondary_namespace) | ||
| .await | ||
| .map_err(|e| { | ||
| let msg = format!( | ||
| "Failed to retrieve keys in namespace: {}/{} : {}", | ||
| primary_namespace, secondary_namespace, e | ||
| ); | ||
| Error::new(ErrorKind::Other, msg) | ||
| })?; | ||
|
|
||
| let mut page_token: Option<String> = None; | ||
| let mut keys = vec![]; | ||
| loop { | ||
| let (page_keys, next_page_token) = self | ||
| .list_keys(client, &primary_namespace, &secondary_namespace, page_token, None) | ||
| .await?; | ||
| keys.extend(page_keys); | ||
| match next_page_token { | ||
| Some(t) => page_token = Some(t), | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here: why in this commit? |
||
| None => break, | ||
| } | ||
| } | ||
| Ok(keys) | ||
| } | ||
|
|
||
| async fn list_paginated_internal( | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems this is duplicating a lot of the logic of
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done, also fixed a potential issue where if the VSS server returned |
||
| &self, client: &VssClient<CustomRetryPolicy>, primary_namespace: String, | ||
| secondary_namespace: String, page_token: Option<PageToken>, | ||
| ) -> io::Result<PaginatedListResponse> { | ||
| check_namespace_key_validity( | ||
| &primary_namespace, | ||
| &secondary_namespace, | ||
| None, | ||
| "list_paginated", | ||
| )?; | ||
|
|
||
| const PAGE_SIZE: i32 = 50; | ||
|
|
||
| let vss_page_token = page_token.map(|t| t.to_string()); | ||
| let (keys, next_page_token) = self | ||
| .list_keys( | ||
| client, | ||
| &primary_namespace, | ||
| &secondary_namespace, | ||
| vss_page_token, | ||
| Some(PAGE_SIZE), | ||
| ) | ||
| .await?; | ||
|
|
||
| let next_page_token = next_page_token.map(PageToken::new); | ||
|
|
||
| Ok(PaginatedListResponse { keys, next_page_token }) | ||
| } | ||
|
|
||
| async fn execute_locked_write< | ||
| F: Future<Output = Result<(), lightning::io::Error>>, | ||
| FN: FnOnce() -> F, | ||
|
|
@@ -941,35 +996,109 @@ mod tests { | |
| use super::*; | ||
| use crate::io::test_utils::do_read_write_remove_list_persist; | ||
|
|
||
| #[tokio::test] | ||
| async fn vss_read_write_remove_list_persist() { | ||
| fn build_vss_store() -> VssStore { | ||
| let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap(); | ||
| let mut rng = rng(); | ||
| let rand_store_id: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect(); | ||
| let mut node_seed = [0u8; 64]; | ||
| rng.fill_bytes(&mut node_seed); | ||
| let entropy = NodeEntropy::from_seed_bytes(node_seed); | ||
| let vss_store = | ||
| VssStoreBuilder::new(entropy, vss_base_url, rand_store_id, Network::Testnet) | ||
| .build_with_sigs_auth(HashMap::new()) | ||
| .unwrap(); | ||
| VssStoreBuilder::new(entropy, vss_base_url, rand_store_id, Network::Testnet) | ||
| .build_with_sigs_auth(HashMap::new()) | ||
| .unwrap() | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn vss_read_write_remove_list_persist() { | ||
| let vss_store = build_vss_store(); | ||
| do_read_write_remove_list_persist(&vss_store).await; | ||
| } | ||
|
|
||
| #[tokio::test(flavor = "multi_thread", worker_threads = 1)] | ||
| async fn vss_read_write_remove_list_persist_in_runtime_context() { | ||
| let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap(); | ||
| let mut rng = rng(); | ||
| let rand_store_id: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect(); | ||
| let mut node_seed = [0u8; 64]; | ||
| rng.fill_bytes(&mut node_seed); | ||
| let entropy = NodeEntropy::from_seed_bytes(node_seed); | ||
| let vss_store = | ||
| VssStoreBuilder::new(entropy, vss_base_url, rand_store_id, Network::Testnet) | ||
| .build_with_sigs_auth(HashMap::new()) | ||
| .unwrap(); | ||
|
|
||
| let vss_store = build_vss_store(); | ||
| do_read_write_remove_list_persist(&vss_store).await; | ||
| drop(vss_store) | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn vss_paginated_listing() { | ||
| let store = build_vss_store(); | ||
| let ns = "test_paginated"; | ||
| let sub = "listing"; | ||
| let num_entries = 5; | ||
|
|
||
| for i in 0..num_entries { | ||
| let key = format!("key_{:04}", i); | ||
| let data = vec![i as u8; 32]; | ||
| KVStore::write(&store, ns, sub, &key, data).await.unwrap(); | ||
| } | ||
|
|
||
| let mut all_keys = Vec::new(); | ||
| let mut page_token = None; | ||
|
|
||
| loop { | ||
| let response = | ||
| PaginatedKVStore::list_paginated(&store, ns, sub, page_token).await.unwrap(); | ||
| all_keys.extend(response.keys); | ||
| match response.next_page_token { | ||
| Some(token) => page_token = Some(token), | ||
| _ => break, | ||
| } | ||
|
Comment on lines
+1044
to
+1047
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. hmm here we determine whether we are done with pagination based on whether the page token is
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Update to handle empty string |
||
| } | ||
|
|
||
| assert_eq!(all_keys.len(), num_entries); | ||
|
|
||
| // Verify no duplicates | ||
| let mut unique = all_keys.clone(); | ||
| unique.sort(); | ||
| unique.dedup(); | ||
| assert_eq!(unique.len(), num_entries); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn vss_paginated_empty_namespace() { | ||
| let store = build_vss_store(); | ||
| let response = | ||
| PaginatedKVStore::list_paginated(&store, "nonexistent", "ns", None).await.unwrap(); | ||
| assert!(response.keys.is_empty()); | ||
| assert!(response.next_page_token.is_none()); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn vss_paginated_removal() { | ||
| let store = build_vss_store(); | ||
| let ns = "test_paginated"; | ||
| let sub = "removal"; | ||
|
|
||
| KVStore::write(&store, ns, sub, "a", vec![1u8; 8]).await.unwrap(); | ||
| KVStore::write(&store, ns, sub, "b", vec![2u8; 8]).await.unwrap(); | ||
| KVStore::write(&store, ns, sub, "c", vec![3u8; 8]).await.unwrap(); | ||
|
|
||
| KVStore::remove(&store, ns, sub, "b", false).await.unwrap(); | ||
|
|
||
| let response = PaginatedKVStore::list_paginated(&store, ns, sub, None).await.unwrap(); | ||
| assert_eq!(response.keys.len(), 2); | ||
| assert!(response.keys.contains(&"a".to_string())); | ||
| assert!(!response.keys.contains(&"b".to_string())); | ||
|
benthecarman marked this conversation as resolved.
|
||
| assert!(response.keys.contains(&"c".to_string())); | ||
| } | ||
|
|
||
| #[tokio::test] | ||
| async fn vss_paginated_namespace_isolation() { | ||
| let store = build_vss_store(); | ||
|
|
||
| KVStore::write(&store, "ns_a", "sub", "key_1", vec![1u8; 8]).await.unwrap(); | ||
| KVStore::write(&store, "ns_a", "sub", "key_2", vec![2u8; 8]).await.unwrap(); | ||
| KVStore::write(&store, "ns_b", "sub", "key_3", vec![3u8; 8]).await.unwrap(); | ||
|
|
||
| let response = PaginatedKVStore::list_paginated(&store, "ns_a", "sub", None).await.unwrap(); | ||
| assert_eq!(response.keys.len(), 2); | ||
|
benthecarman marked this conversation as resolved.
|
||
| assert!(response.keys.contains(&"key_1".to_string())); | ||
| assert!(response.keys.contains(&"key_2".to_string())); | ||
|
|
||
| let response = PaginatedKVStore::list_paginated(&store, "ns_b", "sub", None).await.unwrap(); | ||
| assert_eq!(response.keys.len(), 1); | ||
| assert!(response.keys.contains(&"key_3".to_string())); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Given this version introduced new error types (namely version mismatch, but can't recall if there are others), we'll need to update our retry policy to not retry if we hit a version mismatch, as it's unrecoverable.