Skip to content

Commit

Permalink
⚡️ avoid fetching events again to retrieve job results
Browse files Browse the repository at this point in the history
  • Loading branch information
AbdelStark committed Jul 30, 2024
1 parent ea7374d commit a8f42e0
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 24 deletions.
42 changes: 18 additions & 24 deletions crates/core/src/dvm/customer.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::sync::Arc;
use std::time::Duration;

use log::{debug, error, info};
use nostr_sdk::prelude::*;
use thiserror::Error;
use tokio::sync::Mutex;
use tokio::time::timeout;

use crate::config::Settings;
Expand Down Expand Up @@ -138,11 +140,15 @@ impl Customer {
let job_id = job_id.to_string();
let subscription_id = subscription_id.clone();

let result = Arc::new(Mutex::new(None));
let result_clone = Arc::clone(&result);

// Handle incoming Nostr notifications
self.nostr_client
.handle_notifications(|notification| {
.handle_notifications(move |notification| {
let job_id = job_id.clone();
let subscription_id = subscription_id.clone();
let result = Arc::clone(&result_clone);
async move {
if let RelayPoolNotification::Event {
subscription_id: sub_id,
Expand All @@ -151,10 +157,12 @@ impl Customer {
} = notification
{
if sub_id == subscription_id {
if let Ok(result) =
if let Ok(job_result) =
serde_json::from_str::<GenerateZKPJobResult>(&event.content)
{
if result.job_id == job_id {
if job_result.job_id == job_id {
let mut result_guard = result.lock().await;
*result_guard = Some(event.content.clone());
return Ok(true);
}
}
Expand All @@ -166,28 +174,14 @@ impl Customer {
.await
.map_err(CustomerError::NostrClientError)?;

let filter = Filter::new()
.kind(Kind::Custom(JOB_RESULT_KIND))
.author(PublicKey::from_bech32(&self.settings.prover_agent_pk).unwrap())
.since(Timestamp::now() - Duration::from_secs(60));

// Fetch recent events to find the job result
let events = self
.nostr_client
.get_events_of(vec![filter], None)
.await
.map_err(CustomerError::NostrClientError)?;

// Find and return the matching job result
for event in events {
if let Ok(job_result) = serde_json::from_str::<GenerateZKPJobResult>(&event.content) {
if job_result.job_id == job_id {
return Ok(job_result);
}
}
// Check if we found a result
let result_guard = result.lock().await;
if let Some(job_result) = result_guard.clone() {
// Convert the string to a GenerateZKPJobResult
Ok(serde_json::from_str(&job_result).unwrap())
} else {
Err(CustomerError::Unknown("Job result not found".to_string()))
}

Err(CustomerError::Unknown("Job result not found".to_string()))
}

/// Verifies the proof in a job result
Expand Down
42 changes: 42 additions & 0 deletions scripts/utils/concatenate_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import os
import shutil
import argparse

def concatenate_files(root_dir, extension, output_file):
    """
    Concatenates all files with a given extension in a directory and its subdirectories into a single file.
    Includes the full path of each file before appending its content, prefixed with '//' as a comment.
    The output file itself is always skipped, so it can safely live under root_dir and
    even share the target extension without being concatenated into its own content.
    Args:
        root_dir (str): The root directory to search for files.
        extension (str): The file extension to look for (e.g. '.rs').
        output_file (str): The path to the output file where the content will be concatenated.
    """
    # Resolve the output path once up front: open(output_file, 'wb') creates the file
    # before os.walk lists it, so without this guard the walk could find the (empty or
    # partially written) output and append it to itself.
    output_abs = os.path.abspath(output_file)
    with open(output_file, 'wb') as outfile:
        # Walk through all directories and files in the root directory
        for dirpath, _dirnames, filenames in os.walk(root_dir):
            for filename in filenames:
                # Check if the file ends with the given extension
                if not filename.endswith(extension):
                    continue
                file_path = os.path.join(dirpath, filename)
                # Never append the output file to itself
                if os.path.abspath(file_path) == output_abs:
                    continue
                # Write the full file path as a comment before the content
                outfile.write(f"// {file_path}\n".encode())
                # Stream each file in binary mode to avoid loading it fully into memory
                with open(file_path, 'rb') as infile:
                    shutil.copyfileobj(infile, outfile)
                # Ensure there is a newline after each file's content (optional, for readability)
                outfile.write(b'\n')
                print(f"Appended {file_path} to {output_file}")

def main():
    """Command-line entry point: parse arguments and run the concatenation."""
    cli = argparse.ArgumentParser(
        description='Concatenate files with a specific extension from a directory into a single file.'
    )
    cli.add_argument('root_dir', type=str, help='The root directory to search for files')
    cli.add_argument('extension', type=str, help='The file extension to look for')
    cli.add_argument('output_file', type=str, help='The file where the content will be concatenated')
    opts = cli.parse_args()
    # Delegate the actual work to concatenate_files with the parsed positional args.
    concatenate_files(opts.root_dir, opts.extension, opts.output_file)

# Run the CLI only when executed as a script, not when imported as a module.
if __name__ == '__main__':
    main()

0 comments on commit a8f42e0

Please sign in to comment.