Thread Safety in Mosaic
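Thread safety is about making sure your app works correctly when multiple things are happening at the same time. In Dart this usually means asynchronous operations whose steps interleave at `await` points, rather than operating-system threads running the same code in parallel. Think of it like organizing a kitchen where multiple chefs are cooking - you need rules to prevent chaos.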
Why Thread Safety Matters
Modern apps do many things simultaneously:
- Loading data from the internet
- Updating the user interface
- Processing user input
- Saving data to storage
Without proper coordination, these operations can interfere with each other and cause bugs like:
- Data getting corrupted
- App crashes
- Inconsistent state
- Race conditions (two operations fighting over the same resource)
Understanding the Problem
Here’s an example of what can go wrong:
// ❌ DANGEROUS - Not thread safe
/// A counter with no synchronization around its read-modify-write update.
class Counter {
int _value = 0;
/// Reads [_value], adds one, and writes the result back.
void increment() {
_value = _value + 1; // This is NOT atomic!
}
/// The current count, read directly from the field.
int get value => _value;
}
// If two operations call increment() at the same time:
// Operation A reads _value (0)
// Operation B reads _value (0) <- Still 0!
// Operation A sets _value to 1
// Operation B sets _value to 1 <- Should be 2!
// Result: We lost one increment!
// NOTE(review): inside a single Dart isolate a synchronous method such as
// increment() runs to completion before other code gets a turn, so the
// interleaving above needs an `await` between the read and the write (or
// state shared with interleaved async code) - confirm the intended scenario.
Mutex - The Digital Lock
A Mutex (Mutual Exclusion) is like a lock on a bathroom door. Only one person can use the bathroom at a time. In programming, only one operation can access the protected data at a time.
Basic Mutex Usage
import 'package:mosaic/mosaic.dart';
// ✅ SAFE - Using Mutex
/// A counter whose value lives inside a [Mutex], so every read and every
/// update goes through the lock.
class SafeCounter {
  // The mutex owns the count. There is deliberately no separate plain `int`
  // field, which could be read or written without holding the lock.
  final Mutex<int> _mutex = Mutex(0); // Protect the value with a mutex

  /// Adds one to the protected count.
  Future<void> increment() async {
    await _mutex.use((currentValue) async {
      // Only one operation can be here at a time
      final newValue = currentValue + 1;
      return newValue; // This becomes the new protected value
    });
  }

  /// The current count, read through the mutex.
  Future<int> get value async {
    return await _mutex.get(); // Safe read
  }
}
Real-World Example: User Preferences
/// Key/value user settings kept behind a [Mutex].
class UserPreferences {
  final Mutex<Map<String, dynamic>> _preferences = Mutex({});

  /// Stores [value] under [key] and saves the whole map via [saveToStorage]
  /// while the mutex callback is still running, then logs the update.
  Future<void> setPreference(String key, dynamic value) async {
    Future<Map<String, dynamic>> applyAndPersist(
      Map<String, dynamic> current,
    ) async {
      current[key] = value;
      await saveToStorage(current); // Save to device storage
      return current;
    }

    await _preferences.use(applyAndPersist);
    logger.info('Preference updated: $key = $value', ['user', 'preferences']);
  }

  /// Returns the value stored under [key] cast to `T?`.
  Future<T?> getPreference<T>(String key) async {
    final stored = (await _preferences.get())[key];
    return stored as T?;
  }

  /// Saves [themeName] as the `theme` preference and announces the change.
  Future<void> updateTheme(String themeName) async {
    await setPreference('theme', themeName);
    // Notify other modules about the theme change
    events.emit<String>('preferences/theme_changed', themeName);
  }

  /// Placeholder persistence step: waits 100 ms and ignores [prefs].
  Future<void> saveToStorage(Map<String, dynamic> prefs) async {
    // Simulate saving to device storage
    const simulatedLatency = Duration(milliseconds: 100);
    await Future.delayed(simulatedLatency);
  }
}
Shopping Cart Example
/// A cart whose line items are kept behind a [Mutex].
class ShoppingCart {
  final Mutex<List<CartItem>> _items = Mutex([]);

  /// Adds [quantity] of [product]: bumps the quantity of an existing line with
  /// the same product id, or appends a new line. Logs and emits
  /// `cart/item_added` afterwards.
  Future<void> addItem(Product product, int quantity) async {
    await _items.use((cartLines) async {
      final position =
          cartLines.indexWhere((line) => line.productId == product.id);

      if (position < 0) {
        // No line for this product yet: append one.
        cartLines.add(CartItem(
          productId: product.id,
          productName: product.name,
          price: product.price,
          quantity: quantity,
        ));
        return cartLines;
      }

      // Line already present: replace it with a copy carrying the new total.
      final current = cartLines[position];
      cartLines[position] =
          current.copyWith(quantity: current.quantity + quantity);
      return cartLines;
    });

    logger.info('Added $quantity x ${product.name} to cart', ['cart', 'user']);
    events.emit<Map<String, dynamic>>('cart/item_added', {
      'product_id': product.id,
      'quantity': quantity,
    });
  }

  /// Drops every line whose product id equals [productId], then logs and
  /// emits `cart/item_removed`.
  Future<void> removeItem(String productId) async {
    await _items.use((cartLines) async {
      cartLines.removeWhere((line) => line.productId == productId);
      return cartLines;
    });

    logger.info('Removed item from cart: $productId', ['cart', 'user']);
    events.emit<String>('cart/item_removed', productId);
  }

  /// Sum of `price * quantity` over all lines.
  Future<double> getTotalPrice() async {
    var total = 0.0;
    for (final line in await _items.get()) {
      total += line.price * line.quantity;
    }
    return total;
  }

  /// Sum of the quantities over all lines.
  Future<int> getItemCount() async {
    var count = 0;
    for (final line in await _items.get()) {
      count += line.quantity;
    }
    return count;
  }
}
/// One line of a shopping cart: product details plus a quantity.
class CartItem {
  CartItem({
    required this.productId,
    required this.productName,
    required this.price,
    required this.quantity,
  });

  final String productId;
  final String productName;
  final double price;
  final int quantity;

  /// Returns a new [CartItem] with the same product fields; [quantity]
  /// replaces the current quantity when provided, otherwise it is kept.
  CartItem copyWith({int? quantity}) => CartItem(
        productId: productId,
        productName: productName,
        price: price,
        quantity: quantity ?? this.quantity,
      );
}
Semaphore - Managing Limited Resources
A Semaphore is like a parking lot with a limited number of spaces. It allows a specific number of operations to happen simultaneously, but no more.
Basic Semaphore Usage
/// Funnels API calls through a [Semaphore].
class NetworkManager {
  final Semaphore _semaphore = Semaphore(); // Default allows 1 operation at a time

  /// Waits for the semaphore, performs the (simulated) request for [url], and
  /// releases the semaphore whether or not the request succeeds.
  Future<String> makeApiCall(String url) async {
    await _semaphore.lock(); // Wait for permission to proceed
    try {
      // `await` here keeps the request inside the try, so `finally` runs
      // only after it has finished.
      return await _performRequest(url);
    } finally {
      _semaphore.release(); // Always release the semaphore
    }
  }

  /// Logs, waits two seconds in place of a real request, logs again, and
  /// returns a canned response string.
  Future<String> _performRequest(String url) async {
    logger.info('Making API call to: $url', ['network']);
    // Simulate network request
    const simulatedLatency = Duration(seconds: 2);
    await Future.delayed(simulatedLatency);
    logger.info('API call completed: $url', ['network']);
    return 'Response from $url';
  }
}
Download Manager Example
/// Runs file downloads behind a [Semaphore] and tracks the active ones in a
/// [Mutex]-protected list.
class DownloadManager {
  // Caps how many downloads run at once.
  // NOTE(review): the stated intent is up to 3 simultaneous downloads, but
  // `Semaphore()` is built with its defaults here, which this guide describes
  // as one operation at a time. Pass the permit count if the constructor
  // accepts one - TODO confirm against the Mosaic API.
  final Semaphore _downloadSemaphore = Semaphore();

  // File names of downloads currently in progress.
  final Mutex<List<String>> _downloadsMutex = Mutex([]);

  /// Downloads [url] as [fileName].
  ///
  /// Waits for a semaphore slot, records [fileName] as active, emits
  /// `download/started` and `download/completed`, and on any error logs it
  /// and emits `download/failed` instead of rethrowing. The active entry is
  /// always removed and the semaphore always released.
  Future<void> downloadFile(String url, String fileName) async {
    // Wait for a download slot
    await _downloadSemaphore.lock();
    try {
      // Add to active downloads list (thread-safe)
      await _downloadsMutex.use((downloads) async {
        downloads.add(fileName);
        return downloads;
      });
      logger.info('Starting download: $fileName', ['downloads']);
      events.emit<String>('download/started', fileName);
      // Simulate file download
      await Future.delayed(const Duration(seconds: 5));
      logger.info('Download completed: $fileName', ['downloads']);
      events.emit<String>('download/completed', fileName);
    } catch (error) {
      logger.error(
        'Download failed: $fileName - $error',
        ['downloads', 'errors'],
      );
      events.emit<Map<String, dynamic>>('download/failed', {
        'fileName': fileName,
        'error': error.toString(),
      });
    } finally {
      try {
        // Remove from active downloads
        await _downloadsMutex.use((downloads) async {
          downloads.remove(fileName);
          return downloads;
        });
      } finally {
        // Release the same semaphore that was locked above, even if the
        // bookkeeping update itself fails.
        _downloadSemaphore.release();
      }
    }
  }

  /// Returns the list of active download file names held by the mutex.
  Future<List<String>> getActiveDownloads() async {
    return await _downloadsMutex.get();
  }

  /// Starts a download for every entry of [urls] and waits for all of them.
  ///
  /// The file name is the text after the last `/` of each URL.
  Future<void> downloadMultipleFiles(List<String> urls) async {
    // Start all downloads - semaphore will limit concurrent downloads
    final futures = urls.map((url) {
      final fileName = url.split('/').last;
      return downloadFile(url, fileName);
    });
    await Future.wait(futures);
    logger.info('All downloads completed', ['downloads']);
  }
}
Thread-Safe Modules
Here’s how to make your modules thread-safe:
/// A data module that keeps a keyed cache behind a [Mutex] and funnels
/// fetches through a [Semaphore].
class ThreadSafeDataModule extends Module with Loggable {
  ThreadSafeDataModule() : super(name: 'data');

  @override
  List<String> get loggerTags => ['data'];

  final Mutex<Map<String, dynamic>> _cache = Mutex({});
  final Semaphore _apiSemaphore = Semaphore(); // Limit API calls

  /// Subscribes to `data/request` and `data/clear_cache` events.
  @override
  Future<void> onInit() async {
    info('Data module initializing with thread safety');
    // Listen for data requests from other modules
    events.on<String>('data/request', (context) async {
      final dataKey = context.data;
      await _handleDataRequest(dataKey);
    });
    // Listen for cache clear requests
    events.on<void>('data/clear_cache', (context) async {
      await _clearCache();
    });
  }

  /// Resolves [key] via [getData] and emits `data/response`, or `data/error`
  /// with the stringified error when the lookup throws.
  Future<void> _handleDataRequest(String key) async {
    try {
      final data = await getData(key);
      events.emit<Map<String, dynamic>>('data/response', {
        'key': key,
        'data': data,
        'timestamp': DateTime.now().toIso8601String(),
      });
    } catch (error) {
      events.emit<Map<String, dynamic>>('data/error', {
        'key': key,
        'error': error.toString(),
      });
    }
  }

  /// Returns the cached value for [key], fetching and caching it on a miss.
  Future<dynamic> getData(String key) async {
    // First check cache (thread-safe)
    final cachedData = await _readCache(key);
    if (cachedData != null) {
      debug('Cache hit for key: $key');
      return cachedData;
    }

    // Not in cache, fetch from API (with concurrency limit)
    await _apiSemaphore.lock();
    try {
      // Another caller may have fetched and cached this key while we were
      // waiting for the semaphore; check again before fetching a second time.
      final cachedWhileWaiting = await _readCache(key);
      if (cachedWhileWaiting != null) {
        debug('Cache hit for key: $key');
        return cachedWhileWaiting;
      }

      debug('Fetching data from API for key: $key');
      // Simulate API call
      await Future.delayed(const Duration(milliseconds: 500));
      final data = {
        'key': key,
        'value': 'data_for_$key',
        'fetched_at': DateTime.now().toIso8601String(),
      };
      // Store in cache (thread-safe)
      await _cache.use((cache) async {
        cache[key] = data;
        return cache;
      });
      info('Data fetched and cached for key: $key');
      return data;
    } finally {
      _apiSemaphore.release();
    }
  }

  /// Looks up [key] in the cache without modifying it.
  ///
  /// Uses `get()` rather than `use()`: the value returned from a `use`
  /// callback becomes the new protected value, so returning a single entry
  /// from it would replace the whole cache map.
  Future<dynamic> _readCache(String key) async {
    final cache = await _cache.get();
    return cache[key];
  }

  /// Empties the cache, logs how many entries were removed, and emits
  /// `data/cache_cleared` with the cache size read after clearing.
  Future<void> _clearCache() async {
    await _cache.use((cache) async {
      final keysCleared = cache.length;
      cache.clear();
      info('Cache cleared, removed $keysCleared items');
      return cache;
    });
    events.emit<int>('data/cache_cleared', await _getCacheSize());
  }

  /// Number of entries currently in the cache.
  Future<int> _getCacheSize() async {
    final cache = await _cache.get();
    return cache.length;
  }

  /// Requests every key in [keys] through [getData] and waits for all of
  /// them to finish.
  Future<void> preloadData(List<String> keys) async {
    info('Preloading ${keys.length} data items');
    // Load multiple items concurrently (semaphore will limit concurrency)
    final futures = keys.map((key) => getData(key));
    await Future.wait(futures);
    info('Preloading completed');
  }
}
Best Practices
1. Always Use try-finally with Semaphores
// ✅ Good - Always release semaphore
/// Runs `doSomething()` while holding `semaphore`, releasing it in `finally`.
Future<void> safeOperation() async {
// Acquire before entering `try`, so `finally` only runs after the lock call
// has completed.
await semaphore.lock();
try {
// Your operation here
await doSomething();
} finally {
semaphore.release(); // Always release, even if operation fails
}
}
// ❌ Bad - Semaphore might not be released
/// Counter-example: the release is an ordinary statement after the work, with
/// no `try`/`finally` around it.
Future<void> unsafeOperation() async {
await semaphore.lock();
await doSomething(); // If this throws, semaphore is never released!
// Only reached when `doSomething()` completes without throwing.
semaphore.release();
}
2. Keep Critical Sections Short
// ✅ Good - Short critical section
/// Prepares the new data first, then enters the mutex only to store it.
Future<void> quickUpdate() async {
final newData = await prepareData(); // Do expensive work outside mutex
await _mutex.use((data) async {
data['updated'] = newData; // Quick update inside mutex
// The returned map is handed back to the mutex.
return data;
});
}
// ❌ Bad - Long critical section
/// Counter-example: the expensive call is awaited inside the mutex callback,
/// so the callback does not finish until that call does.
Future<void> slowUpdate() async {
await _mutex.use((data) async {
final newData = await expensiveApiCall(); // This blocks other operations!
data['updated'] = newData;
return data;
});
}
3. Avoid Nested Locks
// ✅ Good - Single lock
/// Updates both fields inside one `_mutex.use` callback.
Future<void> safeMethod() async {
await _mutex.use((data) async {
// Do all work here
data['field1'] = 'value1';
data['field2'] = 'value2';
return data;
});
}
// ❌ Bad - Nested locks can cause deadlocks
/// Counter-example: enters `_mutex2` from inside the `_mutex1` callback, so
/// the outer callback cannot finish until the inner one has.
Future<void> dangerousMethod() async {
await _mutex1.use((data1) async {
await _mutex2.use((data2) async { // Dangerous nesting!
// Complex operation
return data2;
});
return data1;
});
}
4. Use Meaningful Names
// ✅ Good - Clear purpose
// Each name states what the primitive guards.
final Mutex<UserProfile> _userProfileMutex = Mutex(UserProfile.empty());
final Semaphore _databaseConnectionSemaphore = Semaphore();
// ❌ Bad - Unclear purpose
// Neither the names nor the `dynamic` type argument say what is protected.
final Mutex<dynamic> _mutex1 = Mutex(null);
final Semaphore _sem = Semaphore();
Testing Thread Safety
Test your thread-safe code:
/// Concurrency tests for [SafeCounter] and [ShoppingCart].
void main() {
  group('Thread Safety Tests', () {
    test('should handle concurrent increments correctly', () async {
      final counter = SafeCounter();

      // Kick off 100 increments before awaiting any of them.
      final pending = <Future<void>>[
        for (var i = 0; i < 100; i++) counter.increment(),
      ];
      await Future.wait(pending);

      // Every increment must be reflected: exactly 100, none lost.
      expect(await counter.value, equals(100));
    });

    test('should handle concurrent cart operations', () async {
      final cart = ShoppingCart();
      final product = Product(id: '1', name: 'Test Product', price: 10.0);

      // Ten overlapping adds of the same product, one unit each.
      final pending = <Future<void>>[
        for (var i = 0; i < 10; i++) cart.addItem(product, 1),
      ];
      await Future.wait(pending);

      // Ten units in total...
      final itemCount = await cart.getItemCount();
      expect(itemCount, equals(10));

      // ...at $10 each.
      final totalPrice = await cart.getTotalPrice();
      expect(totalPrice, equals(100.0));
    });
  });
}
Debugging Thread Safety Issues
Add Logging to Critical Sections
/// Wraps a [Mutex] and writes debug log lines around each [use] call,
/// prefixed with the name given at construction.
class DebugMutex<T> {
final Mutex<T> _mutex;
final String _name;
/// Creates the wrapped mutex around [initialValue]; [_name] labels the logs.
DebugMutex(T initialValue, this._name) : _mutex = Mutex(initialValue);
/// Runs [operation] through the wrapped mutex, logging before waiting, after
/// the callback starts, after it finishes, and after `_mutex.use` returns.
// NOTE(review): `R` is not tied to `operation` or to `T`; whether `result`
// is assignable to `R` depends on what `Mutex.use` returns, which is not
// visible here - confirm the intended return type.
Future<R> use<R>(Future<T> Function(T) operation) async {
logger.debug('$_name: Waiting for lock', ['thread_safety']);
final result = await _mutex.use((value) async {
logger.debug('$_name: Lock acquired', ['thread_safety']);
final newValue = await operation(value);
logger.debug('$_name: Operation completed', ['thread_safety']);
return newValue;
});
// Success path only: if the awaited `_mutex.use` call throws, this line is
// skipped.
logger.debug('$_name: Lock released', ['thread_safety']);
return result;
}
}
Monitor Semaphore Usage
/// Wraps a [Semaphore] and keeps running counts of callers that are waiting
/// for it and callers that currently hold it, logging each change.
class MonitoredSemaphore {
  MonitoredSemaphore(this._name);

  final Semaphore _semaphore = Semaphore();
  final String _name;

  // Callers that have entered [lock] but not yet acquired the semaphore.
  int _waitingCount = 0;

  // Callers that have acquired the semaphore and not yet called [release].
  int _activeCount = 0;

  /// Acquires the wrapped semaphore, updating and logging the counters on
  /// either side of the wait.
  Future<void> lock() async {
    _waitingCount += 1;
    _log('$_name: $_waitingCount operations waiting');

    await _semaphore.lock();

    _waitingCount -= 1;
    _activeCount += 1;
    _log('$_name: $_activeCount operations active');
  }

  /// Decrements the active count, releases the wrapped semaphore, and logs
  /// the new active count.
  void release() {
    _activeCount -= 1;
    _semaphore.release();
    _log('$_name: $_activeCount operations active');
  }

  // Single exit point for this class's debug output.
  void _log(String message) => logger.debug(message, ['thread_safety']);
}
Thread safety might seem complex, but it’s essential for building reliable apps. Start with simple Mutex usage for protecting shared data, then gradually add Semaphores when you need to limit concurrent operations. Remember: it’s better to be overly cautious with thread safety than to debug mysterious race conditions later!