Thread Safety in Mosaic

Thread safety is about making sure your app works correctly when multiple things are happening at the same time. Think of it like organizing a kitchen where multiple chefs are cooking - you need rules to prevent chaos.

Why Thread Safety Matters

Modern apps do many things simultaneously:

  • Loading data from the internet
  • Updating the user interface
  • Processing user input
  • Saving data to storage

Without proper coordination, these operations can interfere with each other and cause bugs like:

  • Data getting corrupted
  • App crashes
  • Inconsistent state
  • Race conditions (two operations fighting over the same resource)

Understanding the Problem

Here’s an example of what can go wrong:

// ❌ DANGEROUS - Not thread safe
class Counter {
  int _value = 0;
  
  void increment() {
    _value = _value + 1;  // This is NOT atomic!
  }
  
  int get value => _value;
}

// If two operations call increment() at the same time:
// Operation A reads _value (0)
// Operation B reads _value (0) <- Still 0!
// Operation A sets _value to 1
// Operation B sets _value to 1 <- Should be 2!
// Result: We lost one increment!

Mutex - The Digital Lock

A Mutex (Mutual Exclusion) is like a lock on a bathroom door. Only one person can use the bathroom at a time. In programming, only one operation can access the protected data at a time.

Basic Mutex Usage

import 'package:mosaic/mosaic.dart';

// ✅ SAFE - Using Mutex
class SafeCounter {
  int _value = 0;
  final Mutex<int> _mutex = Mutex(0);  // Protect the value with a mutex
  
  Future<void> increment() async {
    await _mutex.use((currentValue) async {
      // Only one operation can be here at a time
      final newValue = currentValue + 1;
      return newValue;  // This becomes the new protected value
    });
  }
  
  Future<int> get value async {
    return await _mutex.get();  // Safe read
  }
}

Real-World Example: User Preferences

class UserPreferences {
  final Mutex<Map<String, dynamic>> _preferences = Mutex({});
  
  Future<void> setPreference(String key, dynamic value) async {
    await _preferences.use((prefs) async {
      prefs[key] = value;
      await saveToStorage(prefs);  // Save to device storage
      return prefs;
    });
    
    logger.info('Preference updated: $key = $value', ['user', 'preferences']);
  }
  
  Future<T?> getPreference<T>(String key) async {
    final prefs = await _preferences.get();
    return prefs[key] as T?;
  }
  
  Future<void> updateTheme(String themeName) async {
    await setPreference('theme', themeName);
    
    // Notify other modules about the theme change
    events.emit<String>('preferences/theme_changed', themeName);
  }
  
  Future<void> saveToStorage(Map<String, dynamic> prefs) async {
    // Simulate saving to device storage
    await Future.delayed(Duration(milliseconds: 100));
  }
}

Shopping Cart Example

class ShoppingCart {
  final Mutex<List<CartItem>> _items = Mutex([]);
  
  Future<void> addItem(Product product, int quantity) async {
    await _items.use((items) async {
      // Check if item already exists
      final existingIndex = items.indexWhere((item) => item.productId == product.id);
      
      if (existingIndex >= 0) {
        // Update existing item
        items[existingIndex] = items[existingIndex].copyWith(
          quantity: items[existingIndex].quantity + quantity,
        );
      } else {
        // Add new item
        items.add(CartItem(
          productId: product.id,
          productName: product.name,
          price: product.price,
          quantity: quantity,
        ));
      }
      
      return items;
    });
    
    logger.info('Added $quantity x ${product.name} to cart', ['cart', 'user']);
    events.emit<Map<String, dynamic>>('cart/item_added', {
      'product_id': product.id,
      'quantity': quantity,
    });
  }
  
  Future<void> removeItem(String productId) async {
    await _items.use((items) async {
      items.removeWhere((item) => item.productId == productId);
      return items;
    });
    
    logger.info('Removed item from cart: $productId', ['cart', 'user']);
    events.emit<String>('cart/item_removed', productId);
  }
  
  Future<double> getTotalPrice() async {
    final items = await _items.get();
    return items.fold(0.0, (total, item) => total + (item.price * item.quantity));
  }
  
  Future<int> getItemCount() async {
    final items = await _items.get();
    return items.fold(0, (total, item) => total + item.quantity);
  }
}

class CartItem {
  final String productId;
  final String productName;
  final double price;
  final int quantity;
  
  CartItem({
    required this.productId,
    required this.productName,
    required this.price,
    required this.quantity,
  });
  
  CartItem copyWith({int? quantity}) {
    return CartItem(
      productId: productId,
      productName: productName,
      price: price,
      quantity: quantity ?? this.quantity,
    );
  }
}

Semaphore - Managing Limited Resources

A Semaphore is like a parking lot with a limited number of spaces. It allows a specific number of operations to happen simultaneously, but no more.

Basic Semaphore Usage

class NetworkManager {
  final Semaphore _semaphore = Semaphore();  // Default allows 1 operation at a time
  
  Future<String> makeApiCall(String url) async {
    await _semaphore.lock();  // Wait for permission to proceed
    
    try {
      logger.info('Making API call to: $url', ['network']);
      
      // Simulate network request
      await Future.delayed(Duration(seconds: 2));
      
      logger.info('API call completed: $url', ['network']);
      return 'Response from $url';
      
    } finally {
      _semaphore.release();  // Always release the semaphore
    }
  }
}

Download Manager Example

class DownloadManager {
  // Allow up to 3 simultaneous downloads
  final Semaphore _downloadSemaphore = Semaphore();
  final List<String> _activeDownloads = [];
  final Mutex<List<String>> _downloadsMutex = Mutex([]);
  
  Future<void> downloadFile(String url, String fileName) async {
    // Wait for a download slot
    await _downloadSemaphore.lock();
    
    try {
      // Add to active downloads list (thread-safe)
      await _downloadsMutex.use((downloads) async {
        downloads.add(fileName);
        return downloads;
      });
      
      logger.info('Starting download: $fileName', ['downloads']);
      events.emit<String>('download/started', fileName);
      
      // Simulate file download
      await Future.delayed(Duration(seconds: 5));
      
      logger.info('Download completed: $fileName', ['downloads']);
      events.emit<String>('download/completed', fileName);
      
    } catch (error) {
      logger.error('Download failed: $fileName - $error', ['downloads', 'errors']);
      events.emit<Map<String, dynamic>>('download/failed', {
        'fileName': fileName,
        'error': error.toString(),
      });
    } finally {
      // Remove from active downloads and release semaphore
      await _downloadsMutex.use((downloads) async {
        downloads.remove(fileName);
        return downloads;
      });
      
      _semaphore.release();
    }
  }
  
  Future<List<String>> getActiveDownloads() async {
    return await _downloadsMutex.get();
  }
  
  Future<void> downloadMultipleFiles(List<String> urls) async {
    // Start all downloads - semaphore will limit concurrent downloads
    final futures = urls.map((url) {
      final fileName = url.split('/').last;
      return downloadFile(url, fileName);
    });
    
    await Future.wait(futures);
    logger.info('All downloads completed', ['downloads']);
  }
}

Thread-Safe Modules

Here’s how to make your modules thread-safe:

class ThreadSafeDataModule extends Module with Loggable {
  ThreadSafeDataModule() : super(name: 'data');
  
  @override
  List<String> get loggerTags => ['data'];
  
  final Mutex<Map<String, dynamic>> _cache = Mutex({});
  final Semaphore _apiSemaphore = Semaphore();  // Limit API calls
  
  @override
  Future<void> onInit() async {
    info('Data module initializing with thread safety');
    
    // Listen for data requests from other modules
    events.on<String>('data/request', (context) async {
      final dataKey = context.data;
      await _handleDataRequest(dataKey);
    });
    
    // Listen for cache clear requests
    events.on<void>('data/clear_cache', (context) async {
      await _clearCache();
    });
  }
  
  Future<void> _handleDataRequest(String key) async {
    try {
      final data = await getData(key);
      events.emit<Map<String, dynamic>>('data/response', {
        'key': key,
        'data': data,
        'timestamp': DateTime.now().toIso8601String(),
      });
    } catch (error) {
      events.emit<Map<String, dynamic>>('data/error', {
        'key': key,
        'error': error.toString(),
      });
    }
  }
  
  Future<dynamic> getData(String key) async {
    // First check cache (thread-safe)
    final cachedData = await _cache.use((cache) async {
      return cache[key];
    });
    
    if (cachedData != null) {
      debug('Cache hit for key: $key');
      return cachedData;
    }
    
    // Not in cache, fetch from API (with concurrency limit)
    await _apiSemaphore.lock();
    
    try {
      debug('Fetching data from API for key: $key');
      
      // Simulate API call
      await Future.delayed(Duration(milliseconds: 500));
      final data = {'key': key, 'value': 'data_for_$key', 'fetched_at': DateTime.now().toIso8601String()};
      
      // Store in cache (thread-safe)
      await _cache.use((cache) async {
        cache[key] = data;
        return cache;
      });
      
      info('Data fetched and cached for key: $key');
      return data;
      
    } finally {
      _apiSemaphore.release();
    }
  }
  
  Future<void> _clearCache() async {
    await _cache.use((cache) async {
      final keysCleared = cache.length;
      cache.clear();
      info('Cache cleared, removed $keysCleared items');
      return cache;
    });
    
    events.emit<int>('data/cache_cleared', await _getCacheSize());
  }
  
  Future<int> _getCacheSize() async {
    final cache = await _cache.get();
    return cache.length;
  }
  
  Future<void> preloadData(List<String> keys) async {
    info('Preloading ${keys.length} data items');
    
    // Load multiple items concurrently (semaphore will limit concurrency)
    final futures = keys.map((key) => getData(key));
    await Future.wait(futures);
    
    info('Preloading completed');
  }
}

Best Practices

1. Always Use try-finally with Semaphores

// ✅ Good - Always release semaphore
Future<void> safeOperation() async {
  await semaphore.lock();
  
  try {
    // Your operation here
    await doSomething();
  } finally {
    semaphore.release();  // Always release, even if operation fails
  }
}

// ❌ Bad - Semaphore might not be released
Future<void> unsafeOperation() async {
  await semaphore.lock();
  await doSomething();  // If this throws, semaphore is never released!
  semaphore.release();
}

2. Keep Critical Sections Short

// ✅ Good - Short critical section
Future<void> quickUpdate() async {
  final newData = await prepareData();  // Do expensive work outside mutex
  
  await _mutex.use((data) async {
    data['updated'] = newData;  // Quick update inside mutex
    return data;
  });
}

// ❌ Bad - Long critical section
Future<void> slowUpdate() async {
  await _mutex.use((data) async {
    final newData = await expensiveApiCall();  // This blocks other operations!
    data['updated'] = newData;
    return data;
  });
}

3. Avoid Nested Locks

// ✅ Good - Single lock
Future<void> safeMethod() async {
  await _mutex.use((data) async {
    // Do all work here
    data['field1'] = 'value1';
    data['field2'] = 'value2';
    return data;
  });
}

// ❌ Bad - Nested locks can cause deadlocks
Future<void> dangerousMethod() async {
  await _mutex1.use((data1) async {
    await _mutex2.use((data2) async {  // Dangerous nesting!
      // Complex operation
      return data2;
    });
    return data1;
  });
}

4. Use Meaningful Names

// ✅ Good - Clear purpose
final Mutex<UserProfile> _userProfileMutex = Mutex(UserProfile.empty());
final Semaphore _databaseConnectionSemaphore = Semaphore();

// ❌ Bad - Unclear purpose
final Mutex<dynamic> _mutex1 = Mutex(null);
final Semaphore _sem = Semaphore();

Testing Thread Safety

Test your thread-safe code:

void main() {
  group('Thread Safety Tests', () {
    test('should handle concurrent increments correctly', () async {
      final counter = SafeCounter();
      
      // Start 100 concurrent increment operations
      final futures = List.generate(100, (_) => counter.increment());
      
      // Wait for all to complete
      await Future.wait(futures);
      
      // Should be exactly 100, no lost increments
      final finalValue = await counter.value;
      expect(finalValue, equals(100));
    });
    
    test('should handle concurrent cart operations', () async {
      final cart = ShoppingCart();
      final product = Product(id: '1', name: 'Test Product', price: 10.0);
      
      // Add same product from multiple operations simultaneously
      final futures = List.generate(10, (_) => cart.addItem(product, 1));
      await Future.wait(futures);
      
      // Should have 10 items total
      final itemCount = await cart.getItemCount();
      expect(itemCount, equals(10));
      
      final totalPrice = await cart.getTotalPrice();
      expect(totalPrice, equals(100.0));  // 10 items * $10 each
    });
  });
}

Debugging Thread Safety Issues

Add Logging to Critical Sections

class DebugMutex<T> {
  final Mutex<T> _mutex;
  final String _name;
  
  DebugMutex(T initialValue, this._name) : _mutex = Mutex(initialValue);
  
  Future<R> use<R>(Future<T> Function(T) operation) async {
    logger.debug('$_name: Waiting for lock', ['thread_safety']);
    
    final result = await _mutex.use((value) async {
      logger.debug('$_name: Lock acquired', ['thread_safety']);
      final newValue = await operation(value);
      logger.debug('$_name: Operation completed', ['thread_safety']);
      return newValue;
    });
    
    logger.debug('$_name: Lock released', ['thread_safety']);
    return result;
  }
}

Monitor Semaphore Usage

class MonitoredSemaphore {
  final Semaphore _semaphore = Semaphore();
  final String _name;
  int _waitingCount = 0;
  int _activeCount = 0;
  
  MonitoredSemaphore(this._name);
  
  Future<void> lock() async {
    _waitingCount++;
    logger.debug('$_name: $_waitingCount operations waiting', ['thread_safety']);
    
    await _semaphore.lock();
    
    _waitingCount--;
    _activeCount++;
    logger.debug('$_name: $_activeCount operations active', ['thread_safety']);
  }
  
  void release() {
    _activeCount--;
    _semaphore.release();
    logger.debug('$_name: $_activeCount operations active', ['thread_safety']);
  }
}

Thread safety might seem complex, but it’s essential for building reliable apps. Start with simple Mutex usage for protecting shared data, then gradually add Semaphores when you need to limit concurrent operations. Remember: it’s better to be overly cautious with thread safety than to debug mysterious race conditions later!