diff --git a/sync_client_test/src/main.rs b/sync_client_test/src/main.rs
index 7253b27..1b69c95 100644
--- a/sync_client_test/src/main.rs
+++ b/sync_client_test/src/main.rs
@@ -1,8 +1,11 @@
 // Mock sync client for testing the Arkendro sync server
 // This implements the binary protocol specified in PROTOCOL.md
-use std::io::{Read, Write, Result, Error, ErrorKind};
+use std::io::{Read, Write, Result, Error, ErrorKind, BufReader, Seek};
 use std::net::TcpStream;
+use std::fs::{self, File};
+use std::path::{Path, PathBuf};
+use std::collections::HashMap;
 
 /// Command codes from the protocol
 #[derive(Debug, Clone, Copy)]
@@ -335,6 +338,221 @@ impl SnapshotObj {
     }
 }
 
+const CHUNK_SIZE: usize = 4 * 1024 * 1024 - 36; // 4 MiB minus protocol overhead (32 bytes hash + 4 bytes length)
+
+/// Chunk reference for memory-efficient storage
+#[derive(Debug, Clone)]
+struct ChunkRef {
+    hash: [u8; 32],
+    file_path: PathBuf,
+    offset: u64,
+    size: usize,
+}
+
+/// File system scanner for recursive directory processing
+struct FileSystemScanner {
+    root_path: PathBuf,
+    chunk_refs: Vec<ChunkRef>,                            // All chunks with file references
+    file_metadata: HashMap<PathBuf, ([u8; 32], FileObj)>, // path -> (meta_hash, metadata)
+    dir_metadata: HashMap<PathBuf, ([u8; 32], DirObj)>,   // path -> (meta_hash, metadata)
+}
+
+impl FileSystemScanner {
+    fn new<P: AsRef<Path>>(root_path: P) -> Result<Self> {
+        let root_path = root_path.as_ref().to_path_buf();
+        if !root_path.exists() {
+            return Err(Error::new(ErrorKind::NotFound,
+                format!("Root directory does not exist: {}", root_path.display())));
+        }
+        if !root_path.is_dir() {
+            return Err(Error::new(ErrorKind::InvalidInput,
+                format!("Root path is not a directory: {}", root_path.display())));
+        }
+
+        Ok(Self {
+            root_path,
+            chunk_refs: Vec::new(),
+            file_metadata: HashMap::new(),
+            dir_metadata: HashMap::new(),
+        })
+    }
+
+    fn scan(&mut self) -> Result<()> {
+        println!("šŸ“ Scanning directory: {}", self.root_path.display());
+        self.scan_directory(&self.root_path.clone())?;
+        println!("āœ“ Found {} files and {} directories", self.file_metadata.len(), self.dir_metadata.len());
+        Ok(())
+    }
+
+    fn scan_directory(&mut self, dir_path: &Path) -> Result<[u8; 32]> {
+        let mut entries = Vec::new();
+
+        // Read directory contents
+        let dir_entries = fs::read_dir(dir_path)?;
+        let mut entry_list: Vec<_> = dir_entries.collect::<std::result::Result<Vec<_>, _>>()?;
+        entry_list.sort_by_key(|entry| entry.file_name());
+
+        for entry in entry_list {
+            let entry_path = entry.path();
+            let entry_name = entry.file_name().to_string_lossy().to_string();
+
+            // Skip hidden files and common system files
+            if entry_name.starts_with('.') || entry_name == "Thumbs.db" || entry_name == "desktop.ini" {
+                continue;
+            }
+
+            let metadata = entry.metadata()?;
+
+            if metadata.is_file() {
+                let target_meta_hash = self.scan_file(&entry_path)?;
+                entries.push(DirEntry {
+                    entry_type: EntryType::File,
+                    name: entry_name,
+                    target_meta_hash,
+                });
+            } else if metadata.is_dir() {
+                let target_meta_hash = self.scan_directory(&entry_path)?;
+                entries.push(DirEntry {
+                    entry_type: EntryType::Dir,
+                    name: entry_name,
+                    target_meta_hash,
+                });
+            }
+            // Note: We skip symlinks for now as they require special handling
+        }
+
+        // Create directory metadata
+        let dir_obj = DirObj::new(entries);
+        let dir_data = dir_obj.serialize();
+        let dir_hash = blake3_hash(&dir_data);
+
+        let relative_path = if dir_path == &self.root_path {
+            PathBuf::from("/")
+        } else {
+            dir_path.strip_prefix(&self.root_path)
+                .unwrap_or(dir_path)
+                .to_path_buf()
+        };
+
+        self.dir_metadata.insert(relative_path, (dir_hash, dir_obj));
+        Ok(dir_hash)
+    }
+
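+    /// Chunk and hash a single file: each CHUNK_SIZE slice is recorded as a
+    /// ChunkRef, and the blake3 hash of the serialized FileObj (size plus
+    /// ordered chunk hashes) becomes the file's metadata hash.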
+    fn scan_file(&mut self, file_path: &Path) -> Result<[u8; 32]> {
+        let metadata = fs::metadata(file_path)?;
+        let total_size = metadata.len();
+
+        let relative_path = file_path.strip_prefix(&self.root_path)
+            .unwrap_or(file_path)
+            .to_path_buf();
+
+        // Use unified streaming method for all files - efficient for any size
+        println!("  Processing file ({} bytes): {}", total_size, file_path.display());
+        let chunk_hashes = self.hash_file_chunks(file_path, &relative_path)?;
+
+        // Create file metadata
+        let file_obj = FileObj::new(total_size, chunk_hashes);
+        let file_data = file_obj.serialize();
+        let file_meta_hash = blake3_hash(&file_data);
+
+        self.file_metadata.insert(relative_path, (file_meta_hash, file_obj));
+
+        Ok(file_meta_hash)
+    }
+
+    fn get_all_chunk_hashes(&self) -> Vec<[u8; 32]> {
+        let mut all_hashes = Vec::new();
+
+        // Add chunks from chunk references (all files use this now)
+        for chunk_ref in &self.chunk_refs {
+            all_hashes.push(chunk_ref.hash);
+        }
+
+        // Remove duplicates
+        all_hashes.sort();
+        all_hashes.dedup();
+
+        all_hashes
+    }
+
+    /// Hash file chunks using efficient streaming I/O (works for any file size)
+    fn hash_file_chunks(&mut self, file_path: &Path, _relative_path: &Path) -> Result<Vec<[u8; 32]>> {
+        let file = File::open(file_path)?;
+        let mut reader = BufReader::with_capacity(CHUNK_SIZE * 2, file);
+        let mut chunk_hashes = Vec::new();
+        let mut chunk_idx = 0;
+        let mut offset: u64 = 0;
+
+        loop {
+            // Fill the buffer completely: read() may legally return short
+            // counts, and the recorded offsets are only valid if each chunk
+            // (except the last) is exactly CHUNK_SIZE bytes
+            let mut buffer = vec![0u8; CHUNK_SIZE];
+            let mut bytes_read = 0;
+            while bytes_read < CHUNK_SIZE {
+                let n = reader.read(&mut buffer[bytes_read..])?;
+                if n == 0 {
+                    break;
+                }
+                bytes_read += n;
+            }
+
+            if bytes_read == 0 {
+                break;
+            }
+
+            buffer.truncate(bytes_read);
+
+            // Hash chunk
+            let chunk_hash = blake3_hash(&buffer);
+
+            // Store chunk reference
+            self.chunk_refs.push(ChunkRef {
+                hash: chunk_hash,
+                file_path: file_path.to_path_buf(),
+                offset,
+                size: bytes_read,
+            });
+
+            chunk_hashes.push(chunk_hash);
+            chunk_idx += 1;
+            offset += bytes_read as u64;
+
+            if chunk_idx % 100 == 0 {
+                println!("    Processed {} chunks...", chunk_idx);
+            }
+        }
+
+        println!("    Completed {} chunks", chunk_idx);
+        Ok(chunk_hashes)
+    }
+
+    /// Read chunk data on demand from file
+    fn read_chunk_data(&self, chunk_ref: &ChunkRef) -> Result<Vec<u8>> {
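+        // Chunk bytes are re-read from disk only when the server reports the
+        // chunk missing, so peak memory stays at one chunk regardless of tree size.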
successfully"); Ok(()) } Command::ChunkFail => { @@ -543,9 +757,6 @@ impl SyncClient { } fn send_metadata(&mut self, meta_type: MetaType, meta_hash: &[u8; 32], body: &[u8]) -> Result<()> { - println!("Sending {:?} metadata {} bytes...", meta_type, body.len()); - println!("Metadata hash: {}", hex::encode(meta_hash)); - // Verify hash matches body let computed_hash = blake3_hash(body); if computed_hash != *meta_hash { @@ -563,7 +774,6 @@ impl SyncClient { let (cmd, payload) = self.receive_message()?; match cmd { Command::MetaOk => { - println!("āœ“ Metadata uploaded successfully"); Ok(()) } Command::MetaFail => { @@ -579,9 +789,6 @@ impl SyncClient { } fn send_snapshot(&mut self, snapshot_hash: &[u8; 32], snapshot_data: &[u8]) -> Result<()> { - println!("Sending snapshot {} bytes...", snapshot_data.len()); - println!("Snapshot hash: {}", hex::encode(snapshot_hash)); - // Verify hash matches data let computed_hash = blake3_hash(snapshot_data); if computed_hash != *snapshot_hash { @@ -610,14 +817,9 @@ impl SyncClient { let missing_chunks_count = u32::from_le_bytes([payload[0], payload[1], payload[2], payload[3]]) as usize; let missing_metas_count = u32::from_le_bytes([payload[4], payload[5], payload[6], payload[7]]) as usize; - let mut error_msg = format!("Server rejected snapshot: {} missing chunks, {} missing metadata items", + let error_msg = format!("Server rejected snapshot: {} missing chunks, {} missing metadata items", missing_chunks_count, missing_metas_count); - // Optionally parse the actual missing items for more detailed error - if missing_chunks_count > 0 || missing_metas_count > 0 { - error_msg.push_str(" (run with chunk/metadata verification to see details)"); - } - Err(Error::new(ErrorKind::Other, error_msg)) } _ => Err(Error::new(ErrorKind::InvalidData, "Expected SnapshotOk or SnapshotFail")), @@ -635,40 +837,46 @@ fn blake3_hash(data: &[u8]) -> [u8; 32] { blake3::hash(data).into() } -/// Generate some mock data for testing -fn generate_mock_data() -> Vec<(Vec, [u8; 32])> { - let mut data_chunks = Vec::new(); - - // Some test data chunks - let chunks = [ - b"Hello, Arkendro sync server! This is test chunk data.".to_vec(), - b"Another test chunk with different content for variety.".to_vec(), - b"Binary data test: \x00\x01\x02\x03\xFF\xFE\xFD\xFC".to_vec(), - ]; - - for chunk in chunks { - let hash = blake3_hash(&chunk); - data_chunks.push((chunk, hash)); - } - - data_chunks -} - fn main() -> Result<()> { - println!("šŸš€ Arkendro Sync Client Extended Test"); - println!("====================================\n"); + println!("šŸš€ Arkendro Sync Client - High Performance Recursive Upload"); + println!("=========================================================="); + + // Check for root directory argument + let root_dir = std::env::args().nth(1).unwrap_or_else(|| { + println!("Usage: {} ", std::env::args().next().unwrap()); + println!("Example: {} ./test_data", std::env::args().next().unwrap()); + println!("Using default: ./root"); + "./root".to_string() + }); + + println!("šŸ“ Root directory: {}", root_dir); + + // Create root directory if it doesn't exist (for testing) + if !Path::new(&root_dir).exists() { + println!("Creating test directory structure at {}", root_dir); + create_test_directory_structure(&root_dir)?; + } + + // Scan the filesystem + let mut scanner = FileSystemScanner::new(&root_dir)?; + scanner.scan()?; + + if scanner.file_metadata.is_empty() { + println!("⚠ No files found in directory. 
Nothing to upload."); + return Ok(()); + } // Connect to server let mut client = SyncClient::connect("127.0.0.1:8380")?; - println!("Connected to sync server\n"); + println!("Connected to sync server"); // Test protocol flow client.hello()?; - // Try to authenticate with hardcoded machine ID (you'll need to create a machine first via the web interface) + // Try to authenticate with hardcoded machine ID let machine_id = 1; // Hardcoded machine ID for testing match client.authenticate("admin", "password123", machine_id) { - Ok(()) => println!("Authentication successful!\n"), + Ok(()) => println!("Authentication successful!"), Err(e) => { println!("Authentication failed: {}", e); println!("Make sure you have:"); @@ -679,145 +887,94 @@ fn main() -> Result<()> { } } - println!("šŸ“ Creating test filesystem hierarchy...\n"); - - // Step 1: Create test file data chunks - let file1_data = b"Hello, this is the content of file1.txt in our test filesystem!"; - let file2_data = b"This is file2.log with some different content for testing purposes."; - let file3_data = b"Binary data file: \x00\x01\x02\x03\xFF\xFE\xFD\xFC and some text after."; - - let file1_hash = blake3_hash(file1_data); - let file2_hash = blake3_hash(file2_data); - let file3_hash = blake3_hash(file3_data); - - // Upload chunks if needed println!("šŸ”— Uploading file chunks..."); - let chunk_hashes = vec![file1_hash, file2_hash, file3_hash]; + + // Upload chunks if needed + let chunk_hashes = scanner.get_all_chunk_hashes(); let missing_chunks = client.check_chunks(&chunk_hashes)?; if !missing_chunks.is_empty() { - for &missing_hash in &missing_chunks { - if missing_hash == file1_hash { - client.send_chunk(&file1_hash, file1_data)?; - } else if missing_hash == file2_hash { - client.send_chunk(&file2_hash, file2_data)?; - } else if missing_hash == file3_hash { - client.send_chunk(&file3_hash, file3_data)?; + println!(" Uploading {} missing chunks...", missing_chunks.len()); + + for (i, &missing_hash) in missing_chunks.iter().enumerate() { + // Find the chunk data for this hash + let mut found = false; + + // Check chunk references (streaming chunks) + if let Some(chunk_ref) = scanner.find_chunk_ref(&missing_hash) { + match scanner.read_chunk_data(chunk_ref) { + Ok(chunk_data) => { + println!(" [{}/{}] Uploading chunk: {} bytes from {}", + i + 1, missing_chunks.len(), chunk_data.len(), + chunk_ref.file_path.file_name().unwrap_or_default().to_string_lossy()); + client.send_chunk(&missing_hash, &chunk_data)?; + found = true; + } + Err(e) => { + println!(" Error reading chunk: {}", e); + continue; + } + } + } + + if !found { + return Err(Error::new(ErrorKind::Other, + format!("Could not find chunk data for hash: {}", hex::encode(missing_hash)))); } } } else { println!("āœ“ All chunks already exist on server"); } - // Step 2: Create file metadata objects - println!("\nšŸ“„ Creating file metadata objects..."); - let file1_obj = FileObj::new(file1_data.len() as u64, vec![file1_hash]); - let file2_obj = FileObj::new(file2_data.len() as u64, vec![file2_hash]); - let file3_obj = FileObj::new(file3_data.len() as u64, vec![file3_hash]); - - let file1_meta_data = file1_obj.serialize(); - let file2_meta_data = file2_obj.serialize(); - let file3_meta_data = file3_obj.serialize(); - - let file1_meta_hash = blake3_hash(&file1_meta_data); - let file2_meta_hash = blake3_hash(&file2_meta_data); - let file3_meta_hash = blake3_hash(&file3_meta_data); - - // Upload file metadata - client.send_metadata(MetaType::File, &file1_meta_hash, &file1_meta_data)?; - 
 
     if !missing_chunks.is_empty() {
-        for &missing_hash in &missing_chunks {
-            if missing_hash == file1_hash {
-                client.send_chunk(&file1_hash, file1_data)?;
-            } else if missing_hash == file2_hash {
-                client.send_chunk(&file2_hash, file2_data)?;
-            } else if missing_hash == file3_hash {
-                client.send_chunk(&file3_hash, file3_data)?;
-            }
+        println!("  Uploading {} missing chunks...", missing_chunks.len());
+
+        for (i, &missing_hash) in missing_chunks.iter().enumerate() {
+            // Re-read the chunk from its source file and upload it; a hash
+            // with no matching ChunkRef means the scan and the upload pass
+            // disagree, which is fatal for this run
+            let chunk_ref = scanner.find_chunk_ref(&missing_hash)
+                .ok_or_else(|| Error::new(ErrorKind::Other,
+                    format!("Could not find chunk data for hash: {}", hex::encode(missing_hash))))?;
+            let chunk_data = scanner.read_chunk_data(chunk_ref)?;
+
+            println!("  [{}/{}] Uploading chunk: {} bytes from {}",
+                i + 1, missing_chunks.len(), chunk_data.len(),
+                chunk_ref.file_path.file_name().unwrap_or_default().to_string_lossy());
+            client.send_chunk(&missing_hash, &chunk_data)?;
         }
     } else {
         println!("āœ“ All chunks already exist on server");
     }
 
-    // Step 2: Create file metadata objects
-    println!("\nšŸ“„ Creating file metadata objects...");
-    let file1_obj = FileObj::new(file1_data.len() as u64, vec![file1_hash]);
-    let file2_obj = FileObj::new(file2_data.len() as u64, vec![file2_hash]);
-    let file3_obj = FileObj::new(file3_data.len() as u64, vec![file3_hash]);
-
-    let file1_meta_data = file1_obj.serialize();
-    let file2_meta_data = file2_obj.serialize();
-    let file3_meta_data = file3_obj.serialize();
-
-    let file1_meta_hash = blake3_hash(&file1_meta_data);
-    let file2_meta_hash = blake3_hash(&file2_meta_data);
-    let file3_meta_hash = blake3_hash(&file3_meta_data);
-
-    // Upload file metadata
-    client.send_metadata(MetaType::File, &file1_meta_hash, &file1_meta_data)?;
-    client.send_metadata(MetaType::File, &file2_meta_hash, &file2_meta_data)?;
-    client.send_metadata(MetaType::File, &file3_meta_hash, &file3_meta_data)?;
-
-    // Step 3: Create directory structures
-    println!("\nšŸ“ Creating directory structures...");
+    println!("šŸ“„ Uploading file metadata...");
 
-    // Create /logs subdirectory with file2
-    let logs_dir_entries = vec![
-        DirEntry {
-            entry_type: EntryType::File,
-            name: "app.log".to_string(),
-            target_meta_hash: file2_meta_hash,
-        },
-    ];
-    let logs_dir_obj = DirObj::new(logs_dir_entries);
-    let logs_dir_data = logs_dir_obj.serialize();
-    let logs_dir_hash = blake3_hash(&logs_dir_data);
-    client.send_metadata(MetaType::Dir, &logs_dir_hash, &logs_dir_data)?;
+    // Upload file metadata
+    for (_path, (meta_hash, file_obj)) in &scanner.file_metadata {
+        let file_data = file_obj.serialize();
+        client.send_metadata(MetaType::File, meta_hash, &file_data)?;
+    }
 
-    // Create /data subdirectory with file3
-    let data_dir_entries = vec![
-        DirEntry {
-            entry_type: EntryType::File,
-            name: "binary.dat".to_string(),
-            target_meta_hash: file3_meta_hash,
-        },
-    ];
-    let data_dir_obj = DirObj::new(data_dir_entries);
-    let data_dir_data = data_dir_obj.serialize();
-    let data_dir_hash = blake3_hash(&data_dir_data);
-    client.send_metadata(MetaType::Dir, &data_dir_hash, &data_dir_data)?;
+    println!("šŸ“ Uploading directory metadata...");
+
+    // Upload directory metadata (deepest paths first, so children are stored before their parents)
+    let mut dir_entries: Vec<_> = scanner.dir_metadata.iter().collect();
+    dir_entries.sort_by_key(|(path, _)| std::cmp::Reverse(path.components().count()));
+
+    for (_path, (meta_hash, dir_obj)) in dir_entries {
+        let dir_data = dir_obj.serialize();
+        client.send_metadata(MetaType::Dir, meta_hash, &dir_data)?;
+    }
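+
+    // Every directory object the root references is now on the server, so the
+    // partition/disk/snapshot chain can safely anchor the root hash.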
println!("\nšŸ“ø Creating snapshot..."); + println!("šŸ“ø Creating snapshot..."); let snapshot_obj = SnapshotObj::new(vec![disk_hash]); let snapshot_data = snapshot_obj.serialize(); let snapshot_hash = blake3_hash(&snapshot_data); - // Upload snapshot using SendSnapshot command (not SendMeta) client.send_snapshot(&snapshot_hash, &snapshot_data)?; - // Step 7: Verify everything is stored - println!("\nšŸ” Verifying stored objects..."); + println!("šŸ” Verifying stored objects..."); // Check all metadata objects - let all_metadata = vec![ - (MetaType::File, file1_meta_hash), - (MetaType::File, file2_meta_hash), - (MetaType::File, file3_meta_hash), - (MetaType::Dir, logs_dir_hash), - (MetaType::Dir, data_dir_hash), - (MetaType::Dir, root_dir_hash), - (MetaType::Partition, partition_hash), - (MetaType::Disk, disk_hash), - (MetaType::Snapshot, snapshot_hash), - ]; + let mut all_metadata = scanner.get_all_metadata_items(); + all_metadata.push((MetaType::Partition, partition_hash)); + all_metadata.push((MetaType::Disk, disk_hash)); + all_metadata.push((MetaType::Snapshot, snapshot_hash)); let missing_metadata = client.check_metadata(&all_metadata)?; if missing_metadata.is_empty() { @@ -830,27 +987,65 @@ fn main() -> Result<()> { } // Check all chunks - let all_chunks = vec![file1_hash, file2_hash, file3_hash]; - let missing_chunks_final = client.check_chunks(&all_chunks)?; + let missing_chunks_final = client.check_chunks(&chunk_hashes)?; if missing_chunks_final.is_empty() { println!("āœ“ All data chunks verified as stored"); } else { println!("⚠ Warning: {} chunks still missing", missing_chunks_final.len()); } - println!("\nšŸŽ‰ Complete filesystem hierarchy created!"); + println!("šŸŽ‰ Complete filesystem hierarchy uploaded!"); println!("šŸ“Š Summary:"); - println!(" • 3 files (readme.txt, logs/app.log, data/binary.dat)"); - println!(" • 3 directories (/, /logs, /data)"); - println!(" • 1 partition (test-partition)"); - println!(" • 1 disk (test-disk-001)"); + + // Count total chunks + let total_chunks = scanner.chunk_refs.len(); + + println!(" • {} files uploaded", scanner.file_metadata.len()); + println!(" • {} data chunks uploaded", total_chunks); + println!(" • {} directories processed", scanner.dir_metadata.len()); + println!(" • 1 partition (uploaded-data)"); + println!(" • 1 disk (uploaded-disk)"); println!(" • 1 snapshot"); println!(" • Snapshot hash: {}", hex::encode(snapshot_hash)); - println!("\nāœ… All tests completed successfully!"); + println!("āœ… Upload completed successfully!"); // Close connection client.close()?; Ok(()) } + +/// Create a test directory structure for demonstration purposes +fn create_test_directory_structure(root_dir: &str) -> Result<()> { + let root_path = Path::new(root_dir); + fs::create_dir_all(root_path)?; + + // Create some test files + fs::write(root_path.join("readme.txt"), + "Welcome to the Arkendro sync test!\n\nThis directory contains test files for upload.")?; + + fs::write(root_path.join("config.json"), + r#"{"name": "test", "version": "1.0", "settings": {"debug": true}}"#)?; + + // Create subdirectories + let logs_dir = root_path.join("logs"); + fs::create_dir_all(&logs_dir)?; + fs::write(logs_dir.join("app.log"), + "2024-01-01 10:00:00 INFO Application started\n2024-01-01 10:01:00 INFO Processing data\n")?; + fs::write(logs_dir.join("error.log"), + "2024-01-01 10:05:00 ERROR Connection timeout\n")?; + + let data_dir = root_path.join("data"); + fs::create_dir_all(&data_dir)?; + fs::write(data_dir.join("sample.csv"), + 
"id,name,value\n1,test,100\n2,demo,200\n")?; + + let nested_dir = data_dir.join("nested"); + fs::create_dir_all(&nested_dir)?; + fs::write(nested_dir.join("deep_file.txt"), + "This file is nested deeper in the directory structure.")?; + + println!("āœ“ Created test directory structure with sample files"); + Ok(()) +} \ No newline at end of file