"""
Professional disk-backed list implementation for large-scale data processing.
PagedList provides a memory-efficient alternative to Python lists
for handling datasets that exceed available system memory. Items are buffered
in memory and flushed to disk in fixed-size pickled chunks, so the in-memory
buffer never holds more than one chunk of items, while a list-like interface
is preserved for data processing workflows.
Features:
- Transparent disk-backed storage with automatic memory management
- Configurable chunking strategies for optimal performance
- Thread-pool parallelism for chunk-wise transformations (``map``, ``serialize``)
- Automatic removal of chunk files on exit or via the context-manager protocol
- Runtime validation that appended items are dictionaries
"""
import atexit
import concurrent.futures
import json
import logging
import os
import pickle
import uuid
import warnings
import weakref
from typing import (
    Any,
    Callable,
    Dict,
    Iterable,
    Iterator,
    List,
    Literal,
    Optional,
    Union,
)
class PagedList:
    """A disk-backed, list-like container of dictionaries.

    Items are buffered in memory; whenever the buffer reaches ``chunk_size``
    items it is pickled to ``chunk_<id>_<n>.pkl`` inside ``disk_path`` and the
    buffer is reset. Every chunk file therefore holds exactly ``chunk_size``
    items, which lets a global index be mapped to ``(chunk, offset)`` with
    ``divmod``. Single-item reads load only the chunk they touch, and
    traversals (iteration, slicing, ``index``) load each chunk once.

    Note:
        Chunk files are read back with :mod:`pickle`, which can execute
        arbitrary code while loading. ``disk_path`` must not be writable by
        untrusted users.
    """

    # Library code logs through a named logger: the module-level
    # ``logging.debug``/``logging.info`` helpers call ``basicConfig()`` on the
    # root logger when it has no handlers, reconfiguring the host application.
    _logger = logging.getLogger(__name__)

    # Class-level defaults so instances built without ``__init__`` still work.
    _version = 0  # Bumped on every chunk write/cleanup; invalidates read caches
    _finalizer = None  # weakref.finalize handle when auto_cleanup is enabled

    def __init__(
        self,
        chunk_size: int = 50_000,
        disk_path: str = "data",
        auto_cleanup: bool = True,
    ):
        """Initialize a PagedList with configurable parameters.

        Args:
            chunk_size: Maximum items per chunk before flushing to disk.
            disk_path: Directory to store chunk files (created if missing).
            auto_cleanup: Whether to delete this list's chunk files
                automatically when the object is garbage-collected or the
                interpreter exits. When False, chunk files are only removed
                by ``cleanup_chunks()``/``clear()`` or on leaving a ``with``
                block.

        Raises:
            ValueError: If ``chunk_size`` is not positive.
        """
        if chunk_size <= 0:
            raise ValueError("chunk_size must be positive")
        self.chunk_size = chunk_size  # Max items per chunk
        self.disk_path = disk_path  # Directory to store chunk files
        self.chunk_count = 0  # Number of chunk files written
        # Tail of the list that has not been flushed to disk yet.
        self._in_memory_list: List[Dict[str, Any]] = []
        # Random hex identifier so several PagedLists can share disk_path.
        self._file_identifier = uuid.uuid4().hex
        self._auto_cleanup = auto_cleanup
        self._version = 0
        self._finalizer = None
        # Ensure the storage directory exists
        os.makedirs(self.disk_path, exist_ok=True)
        if auto_cleanup:
            # weakref.finalize fires on garbage collection *or* at interpreter
            # exit and, unlike ``atexit.register(self.cleanup_chunks)``, holds
            # no strong reference that would keep ``self`` alive forever.
            self._finalizer = weakref.finalize(
                self,
                PagedList._remove_chunk_files,
                self.disk_path,
                self._file_identifier,
            )

    @property
    def is_empty(self) -> bool:
        """Return True if the list is empty."""
        return len(self) == 0

    @property
    def total_chunks(self) -> int:
        """Return the number of chunks written to disk."""
        return self.chunk_count

    @property
    def in_memory_count(self) -> int:
        """Return the number of items currently buffered in memory."""
        return len(self._in_memory_list)

    # ------------------------------------------------------------------
    # Storage helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _remove_chunk_files(disk_path: str, file_identifier: str) -> None:
        """Delete every file in ``disk_path`` whose name starts with this list's prefix.

        This is the ``weakref.finalize`` callback, so it must not reference the
        instance. Matching by name prefix also removes leftover ``.tmp`` files.
        Errors are logged at debug level and ignored, because this runs during
        garbage collection or interpreter shutdown.
        """
        prefix = f"chunk_{file_identifier}_"
        try:
            names = os.listdir(disk_path)
        except OSError as e:
            PagedList._logger.debug("Could not list %s: %s", disk_path, e)
            return
        for name in names:
            if not name.startswith(prefix):
                continue
            path = os.path.join(disk_path, name)
            try:
                os.remove(path)
                PagedList._logger.debug("Deleted %s", path)
            except OSError as e:
                PagedList._logger.debug("Could not delete %s: %s", path, e)

    def _chunk_path(self, chunk_index: int) -> str:
        """Return the file path of chunk ``chunk_index``."""
        return os.path.join(
            self.disk_path, f"chunk_{self._file_identifier}_{chunk_index}.pkl"
        )

    def _write_chunk(self, chunk_index: int, data: List[Dict[str, Any]]) -> str:
        """Pickle ``data`` as chunk ``chunk_index`` and return the file path.

        The data goes to a temporary file that is then moved over the target
        with ``os.replace``, so a failed write never leaves a truncated chunk.
        """
        chunk_file = self._chunk_path(chunk_index)
        tmp_file = chunk_file + ".tmp"
        try:
            with open(tmp_file, "wb") as f:
                pickle.dump(data, f)
            os.replace(tmp_file, chunk_file)
        except BaseException:
            try:
                os.remove(tmp_file)
            except OSError:
                pass
            raise
        self._version += 1
        return chunk_file

    def _save_chunk_to_file(self) -> None:
        """Saves the current in-memory list as the next chunk file on disk."""
        chunk_file = self._write_chunk(self.chunk_count, self._in_memory_list)
        self._logger.debug("Saved chunk %d to %s", self.chunk_count, chunk_file)
        self.chunk_count += 1

    def _load_chunk(self, chunk_index: int) -> List[Dict[str, Any]]:
        """Loads a specific chunk from disk (empty list if the file is missing).

        Each call unpickles a fresh copy, so mutating the returned records
        does not change what is stored on disk.
        """
        chunk_file = self._chunk_path(chunk_index)
        if os.path.exists(chunk_file):
            with open(chunk_file, "rb") as f:
                loaded_data: List[Dict[str, Any]] = pickle.load(f)
            return loaded_data
        return []  # Return empty if file is missing

    @staticmethod
    def _require_dict(data: Any) -> None:
        """Raise TypeError unless ``data`` is a dict (the only storable type)."""
        if not isinstance(data, dict):
            raise TypeError(f"Expected dict, got {type(data).__name__}")

    @staticmethod
    def _check_assignable(value: Any) -> None:
        """Validate a value assigned to a single position."""
        if isinstance(value, list):
            raise TypeError("Cannot assign a list to a single index")
        PagedList._require_dict(value)

    def _flush_if_full(self) -> None:
        """Move the in-memory buffer to disk once it reaches chunk_size."""
        if len(self._in_memory_list) >= self.chunk_size:
            self._save_chunk_to_file()
            self._in_memory_list = []  # Reset in-memory list

    def _get_item(
        self, index: int, cache: Optional[Dict[str, Any]] = None
    ) -> Dict[str, Any]:
        """Return the item at non-negative ``index``.

        ``cache`` is a scratch dict owned by the caller. When it is given, the
        most recently loaded chunk is kept there, so consecutive lookups in
        the same chunk read the file once. The cache key includes
        ``self._version``, so any chunk write invalidates it.

        Raises:
            IndexError: If ``index`` is past the end or its chunk is missing.
        """
        disk_items = self.chunk_count * self.chunk_size
        if index >= disk_items:
            memory_index = index - disk_items
            if memory_index < len(self._in_memory_list):
                return self._in_memory_list[memory_index]
            raise IndexError("Index out of range")
        chunk_index, inner_index = divmod(index, self.chunk_size)
        if cache is None:
            chunk_data = self._load_chunk(chunk_index)
        else:
            key = (chunk_index, self._version)
            if cache.get("key") != key:
                cache["key"] = key
                cache["data"] = self._load_chunk(chunk_index)
            chunk_data = cache["data"]
        if inner_index < len(chunk_data):
            return chunk_data[inner_index]
        raise IndexError("Index out of range")

    def _rebuild(self, items: List[Dict[str, Any]]) -> None:
        """Replace the whole contents with ``items``, re-chunking from scratch.

        Produces the same layout as appending ``items`` one by one, but skips
        the per-item type check: these items are already stored, so a stray
        non-dict cannot abort the rebuild halfway and lose data.
        """
        items = list(items)
        self.clear()
        full = len(items) - len(items) % self.chunk_size
        for begin in range(0, full, self.chunk_size):
            chunk_file = self._write_chunk(
                self.chunk_count, items[begin : begin + self.chunk_size]
            )
            self._logger.debug("Saved chunk %d to %s", self.chunk_count, chunk_file)
            self.chunk_count += 1
        self._in_memory_list = items[full:]

    # ------------------------------------------------------------------
    # List protocol
    # ------------------------------------------------------------------
    def append(self, data: Dict[str, Any]) -> None:
        """Appends an item to the list, flushing to disk if needed.

        Raises:
            TypeError: If ``data`` is not a dict.
        """
        self._require_dict(data)
        self._in_memory_list.append(data)
        self._flush_if_full()

    def extend(self, iterable: Iterable[Dict[str, Any]]) -> None:
        """Extends the list by appending every item of ``iterable``."""
        for item in iterable:
            self.append(item)  # Leverage existing append logic with type checking

    def __iter__(self) -> Iterator[Dict[str, Any]]:
        """Iterate over all items in order, loading each chunk file once.

        The length is fixed when iteration starts, so ``pl.extend(pl)``
        terminates.
        """
        if self.chunk_count > 0:
            warnings.warn(
                "Iterating over PagedList with disk-backed chunks loads all "
                "data into memory. This may be slow and memory-intensive for "
                "large lists.",
                UserWarning,
                stacklevel=2,
            )
        cache: Dict[str, Any] = {}
        for i in range(len(self)):
            yield self._get_item(i, cache)

    def __contains__(self, item: Dict[str, Any]) -> bool:
        """Check if an item is in the list."""
        return any(record == item for record in self)

    def __getitem__(
        self, index: Union[int, slice]
    ) -> Union[Dict[str, Any], List[Dict[str, Any]]]:
        """Retrieve an item or slice from the disk-backed list.

        Items from the in-memory tail are returned by reference; items from
        disk are fresh unpickled copies.

        Raises:
            IndexError: If an integer index is out of range.
            TypeError: If ``index`` is neither an int nor a slice.
        """
        if isinstance(index, int):  # Single index lookup
            if index < 0:
                index = len(self) + index
            if index < 0:
                raise IndexError("Index out of range")
            return self._get_item(index)
        if isinstance(index, slice):  # Slice lookup
            start, stop, step = index.indices(len(self))
            cache: Dict[str, Any] = {}
            return [self._get_item(i, cache) for i in range(start, stop, step)]
        raise TypeError("Invalid index type. Must be int or slice.")

    def __setitem__(
        self,
        index: Union[int, slice],
        value: Union[Dict[str, Any], List[Dict[str, Any]]],
    ) -> None:
        """Updates a value or replaces a slice in the disk-backed list.

        Args:
            index: Integer position (negative counts from the end) or a slice
                with step 1.
            value: A dict for an integer index; for a slice, a list of dicts
                with exactly as many elements as the slice selects.

        Raises:
            IndexError: If an integer index is out of range.
            TypeError: If the value (or a slice element) is not a dict, if a
                slice is assigned a non-list, or if ``index`` is invalid.
            ValueError: If the slice step is not 1 or the lengths differ.
        """
        if isinstance(index, int):  # Single index update
            self._set_single(index, value)
        elif isinstance(index, slice):  # Slice update
            self._set_slice(index, value)
        else:
            raise TypeError("Invalid index type. Must be int or slice.")

    def _set_single(self, index: int, value: Any) -> None:
        """Replace the item at integer ``index`` (negative allowed)."""
        if index < 0:
            index = len(self) + index
        if index < 0:
            raise IndexError("Index out of range.")
        disk_items = self.chunk_count * self.chunk_size
        if index >= disk_items:
            # Item is in memory
            memory_index = index - disk_items
            if memory_index >= len(self._in_memory_list):
                raise IndexError("Index out of range")
            self._check_assignable(value)
            self._in_memory_list[memory_index] = value
            return
        # Item is on disk: load, patch, write back.
        chunk_index, inner_index = divmod(index, self.chunk_size)
        chunk_data = self._load_chunk(chunk_index)
        if inner_index >= len(chunk_data):
            raise IndexError("Index out of range.")
        self._check_assignable(value)
        chunk_data[inner_index] = value
        chunk_file = self._write_chunk(chunk_index, chunk_data)
        self._logger.info("Updated index %d in %s", index, chunk_file)

    def _set_slice(self, index: slice, value: Any) -> None:
        """Replace a step-1 slice, rewriting each touched chunk once."""
        start, stop, step = index.indices(len(self))
        if step != 1:
            raise ValueError("Step slicing is not supported.")
        if not isinstance(value, list):
            raise TypeError("Value must be a list when updating a slice.")
        if len(value) != max(0, stop - start):  # Enforce exact replacement
            raise ValueError(
                "Replacement list must have the same length as the slice."
            )
        # Validate every element first so a bad one cannot leave the slice
        # half-updated.
        for item in value:
            self._check_assignable(item)
        disk_items = self.chunk_count * self.chunk_size
        offset = 0
        while offset < len(value):
            position = start + offset
            if position >= disk_items:
                # Everything that remains lives in the in-memory tail.
                memory_start = position - disk_items
                memory_end = memory_start + len(value) - offset
                self._in_memory_list[memory_start:memory_end] = value[offset:]
                break
            chunk_index, inner_index = divmod(position, self.chunk_size)
            count = min(self.chunk_size - inner_index, len(value) - offset)
            chunk_data = self._load_chunk(chunk_index)
            if inner_index + count > len(chunk_data):
                raise IndexError("Index out of range.")
            chunk_data[inner_index : inner_index + count] = value[
                offset : offset + count
            ]
            chunk_file = self._write_chunk(chunk_index, chunk_data)
            self._logger.info(
                "Updated indices %d-%d in %s",
                position,
                position + count - 1,
                chunk_file,
            )
            offset += count

    def __delitem__(self, index: Union[int, slice]) -> None:
        """Delete an item or slice from the list.

        Deleting from the in-memory tail is done in place. Any other deletion
        loads all items and re-chunks the list.

        Raises:
            IndexError: If an integer index is out of range.
            TypeError: If ``index`` is neither an int nor a slice.
        """
        if isinstance(index, int):
            if index < 0:
                index = len(self) + index
            if index < 0 or index >= len(self):
                raise IndexError("list index out of range")
            disk_items = self.chunk_count * self.chunk_size
            if index >= disk_items:
                # Chunks on disk are unaffected; drop from the buffer directly.
                del self._in_memory_list[index - disk_items]
                return
            all_items = list(self)
            del all_items[index]
            self._rebuild(all_items)
        elif isinstance(index, slice):
            all_items = list(self)
            del all_items[index]
            self._rebuild(all_items)
        else:
            raise TypeError("Invalid index type. Must be int or slice.")

    def combine_chunks(self) -> List[Dict[str, Any]]:
        """Loads and combines all chunks plus in-memory items into one list.

        Missing chunk files contribute no items.
        """
        combined_data: List[Dict[str, Any]] = []
        for chunk_index in range(self.chunk_count):
            combined_data.extend(self._load_chunk(chunk_index))
        combined_data.extend(self._in_memory_list)  # Include in-memory items
        self._logger.info(
            "Combined %d chunks into a single list.", self.chunk_count
        )
        return combined_data

    def cleanup_chunks(self) -> None:
        """Deletes all stored chunk files from disk and resets chunk_count.

        Items still buffered in memory are kept, so afterwards ``len()`` counts
        only those. Deletion errors are logged at debug level and ignored.
        """
        for chunk_index in range(self.chunk_count):
            chunk_file = self._chunk_path(chunk_index)
            try:
                if os.path.exists(chunk_file):
                    os.remove(chunk_file)
                    self._logger.debug("Deleted %s", chunk_file)
            except OSError as e:
                # Only log as debug instead of error for temporary directory cleanup
                self._logger.debug("Could not delete %s: %s", chunk_file, e)
        self.chunk_count = 0  # Reset chunk tracking
        self._version += 1

    def serialize(self, max_workers: Optional[int] = None) -> None:
        """Serializes the list of dictionaries by converting certain types to
        JSON strings and updates the underlying files using multiple threads.

        Every ``bool``, ``dict`` or ``list`` value in each record is replaced by
        its ``json.dumps`` string. Disk chunks are rewritten by worker threads;
        in-memory records are modified in place afterwards.

        Args:
            max_workers: Thread-pool size (None lets the executor decide).

        Raises:
            Exception: The first exception from a worker (for example a
                ``TypeError`` for values that are not JSON-serializable) is
                re-raised. Chunks already rewritten by other workers stay
                rewritten.
        """

        def serialize_record(record: Dict[str, Any]) -> None:
            for key, value in record.items():
                if isinstance(value, (bool, dict, list)):
                    record[key] = json.dumps(value)

        def process_chunk(chunk_index: int) -> None:
            chunk_data = self._load_chunk(chunk_index)
            for record in chunk_data:
                serialize_record(record)
            chunk_file = self._write_chunk(chunk_index, chunk_data)
            self._logger.info(
                "Serialized and updated chunk %d in %s", chunk_index, chunk_file
            )

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Consuming the results re-raises worker exceptions, which would
            # otherwise be silently discarded.
            list(executor.map(process_chunk, range(self.chunk_count)))
        # Serialize remaining in-memory items
        for record in self._in_memory_list:
            serialize_record(record)

    def map(
        self,
        func: Callable[[Dict[str, Any]], Dict[str, Any]],
        max_workers: Optional[int] = None,
    ) -> None:
        """Processes records in chunks using the provided function with multiple
        threads.

        Each disk chunk is loaded, transformed with ``func`` and written back
        by a worker thread. The in-memory records are then replaced by
        ``func`` applied to each of them.

        Args:
            func (callable): Function to apply to each record; its return
                value replaces the record.
            max_workers (int, optional): The maximum number of threads to use.
                Defaults to None, which means the number of threads will be
                determined by the system.

        Returns:
            None (Modifies the records in-place)

        Raises:
            Exception: The first exception raised by ``func`` (or by chunk I/O)
                in a worker is re-raised. Chunks already processed by other
                workers stay transformed; in-memory records are untouched.
        """

        def process_chunk(chunk_index: int) -> None:
            chunk_data = self._load_chunk(chunk_index)
            transformed_chunk = [func(record) for record in chunk_data]
            chunk_file = self._write_chunk(chunk_index, transformed_chunk)
            self._logger.info(
                "Applied function to chunk %d and updated %s",
                chunk_index,
                chunk_file,
            )

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Consuming the results re-raises worker exceptions.
            list(executor.map(process_chunk, range(self.chunk_count)))
        # Apply function to remaining in-memory items
        self._in_memory_list = [func(record) for record in self._in_memory_list]

    def __len__(self) -> int:
        """Returns the total number of items (every chunk holds chunk_size)."""
        return self.chunk_count * self.chunk_size + len(self._in_memory_list)

    def __repr__(self) -> str:
        """Return a string representation of the PagedList."""
        return f"<PagedList: {len(self)} items across {self.chunk_count} chunks>"

    def __str__(self) -> str:
        """Return a user-friendly string representation."""
        if self.is_empty:
            return "PagedList(empty)"
        return f"PagedList({len(self)} items)"

    def __eq__(self, other: object) -> bool:
        """Check element-wise equality with another PagedList or list."""
        if isinstance(other, PagedList):
            if len(self) != len(other):
                return False
            return all(a == b for a, b in zip(self, other))
        elif isinstance(other, list):
            return list(self) == other
        return False

    def __ne__(self, other: object) -> bool:
        """Check inequality."""
        return not self.__eq__(other)

    def __enter__(self) -> "PagedList":
        """Context manager entry."""
        return self

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_val: Optional[BaseException],
        exc_tb: Optional[object],
    ) -> Literal[False]:
        """Context manager exit: delete chunk files, never suppress errors."""
        self.cleanup_chunks()
        return False

    def clear(self) -> None:
        """Remove all items from the list (chunk files and memory buffer)."""
        self.cleanup_chunks()
        self._in_memory_list.clear()

    def insert(self, index: int, value: Dict[str, Any]) -> None:
        """Insert an item at a specific position (clamped like list.insert).

        Raises:
            TypeError: If ``value`` is not a dict. This is checked before any
                storage is touched.
        """
        self._require_dict(value)
        length = len(self)
        if index < 0:
            index = max(0, length + index)
        elif index > length:
            index = length
        disk_items = self.chunk_count * self.chunk_size
        if index >= disk_items:
            # Insertion point is in the in-memory tail: chunks are unaffected.
            self._in_memory_list.insert(index - disk_items, value)
            self._flush_if_full()
            return
        # Otherwise every later item shifts; re-chunk the whole list.
        all_items = list(self)
        all_items.insert(index, value)
        self._rebuild(all_items)

    def remove(self, value: Dict[str, Any]) -> None:
        """Remove first occurrence of value.

        Raises:
            ValueError: If ``value`` is not present.
        """
        for i, item in enumerate(self):
            if item == value:
                del self[i]
                return
        raise ValueError(f"{value} not in list")

    def pop(self, index: int = -1) -> Dict[str, Any]:
        """Remove and return item at index (default last).

        Raises:
            IndexError: If the list is empty or ``index`` is out of range.
        """
        if len(self) == 0:
            raise IndexError("pop from empty list")
        if index < 0:
            index = len(self) + index
        if index < 0 or index >= len(self):
            raise IndexError("pop index out of range")
        value = self._get_item(index)
        del self[index]
        return value

    def index(
        self, value: Dict[str, Any], start: int = 0, stop: Optional[int] = None
    ) -> int:
        """Return index of first occurrence of value within ``[start:stop]``.

        ``start`` and ``stop`` follow slice semantics (negative values count
        from the end), as for ``list.index``.

        Raises:
            ValueError: If ``value`` is not found in the range.
        """
        start, stop, _ = slice(start, stop).indices(len(self))
        cache: Dict[str, Any] = {}
        for i in range(start, stop):
            if self._get_item(i, cache) == value:
                return i
        raise ValueError(f"{value} not in list")

    def count(self, value: Dict[str, Any]) -> int:
        """Return number of occurrences of value."""
        return sum(1 for item in self if item == value)

    def copy(self) -> "PagedList":
        """Return a shallow copy with the same chunking and cleanup settings."""
        new_list = PagedList(
            chunk_size=self.chunk_size,
            disk_path=self.disk_path,
            auto_cleanup=self._auto_cleanup,
        )
        new_list.extend(self)
        return new_list

    def sort(
        self,
        *,
        key: Optional[Callable[[Dict[str, Any]], Any]] = None,
        reverse: bool = False,
    ) -> None:
        """Sort the list in place.

        Warning: This operation loads all data into memory and may be slow for large lists.
        Note: A key function is required when sorting dictionaries.

        Raises:
            TypeError: If ``key`` is None.
        """
        if self.chunk_count > 0:
            warnings.warn(
                "Sorting a PagedList with multiple chunks loads all data "
                "into memory. This may be slow and memory-intensive for "
                "large lists.",
                UserWarning,
                stacklevel=2,
            )
        if key is None:
            raise TypeError(
                "sort() missing required argument: 'key' (dictionaries "
                "require a key function)"
            )
        # Sort before clearing so a failing key function loses nothing.
        all_items = list(self)
        all_items.sort(key=key, reverse=reverse)
        self._rebuild(all_items)

    def reverse(self) -> None:
        """Reverse the list in place.

        Warning: This operation loads all data into memory and may be slow for large lists.
        """
        if self.chunk_count > 0:
            warnings.warn(
                "Reversing a PagedList with multiple chunks loads all data "
                "into memory. This may be slow and memory-intensive for "
                "large lists.",
                UserWarning,
                stacklevel=2,
            )
        all_items = list(self)
        all_items.reverse()
        self._rebuild(all_items)

    def __add__(self, other: Union[List[Dict[str, Any]], "PagedList"]) -> "PagedList":
        """Concatenate with another list or PagedList into a new PagedList.

        Raises:
            TypeError: If ``other`` is neither a list nor a PagedList.
        """
        if not isinstance(other, (PagedList, list)):
            raise TypeError(f"Cannot concatenate PagedList with {type(other)}")
        if self.chunk_count > 0:
            warnings.warn(
                "Adding lists with a multi-chunk PagedList loads all data into memory. "
                "Consider using extend() for better memory efficiency.",
                UserWarning,
                stacklevel=2,
            )
        new_list = self.copy()
        new_list.extend(other)
        return new_list

    def __mul__(self, other: int) -> "PagedList":
        """Repeat the list ``other`` times (negative counts give an empty list).

        Raises:
            TypeError: If ``other`` is not an int.
        """
        if not isinstance(other, int):
            raise TypeError(f"Cannot multiply PagedList by {type(other)}")
        if other < 0:
            other = 0
        if self.chunk_count > 0 and other > 1:
            warnings.warn(
                "Multiplying a multi-chunk PagedList loads all data into memory. "
                "This may be slow and memory-intensive for large lists.",
                UserWarning,
                stacklevel=2,
            )
        new_list = PagedList(
            chunk_size=self.chunk_size,
            disk_path=self.disk_path,
            auto_cleanup=self._auto_cleanup,
        )
        for _ in range(other):
            new_list.extend(self)
        return new_list

    def __rmul__(self, other: int) -> "PagedList":
        """Repeat the list (reverse multiplication)."""
        return self.__mul__(other)

    def __reversed__(self) -> Iterator[Dict[str, Any]]:
        """Return a reverse iterator over a fully materialized copy."""
        if self.chunk_count > 0:
            warnings.warn(
                "Creating a reverse iterator for a multi-chunk PagedList "
                "loads all data into memory. This may be slow and "
                "memory-intensive for large lists.",
                UserWarning,
                stacklevel=2,
            )
        all_items = list(self)
        return reversed(all_items)

    def __list__(self) -> List[Dict[str, Any]]:
        """Convert to a regular list with warning if chunked.

        Note: ``list(pl)`` does not call this method (there is no ``__list__``
        protocol); it uses ``__iter__``. Call ``pl.__list__()`` explicitly.
        """
        if self.chunk_count > 0:
            warnings.warn(
                "Converting a multi-chunk PagedList to list loads all data "
                "into memory and loses the disk-backed storage benefits. "
                "Consider iterating directly over the PagedList instead.",
                UserWarning,
                stacklevel=2,
            )
        return self.combine_chunks()

    def __tuple__(self) -> tuple:
        """Convert to tuple with warning.

        Note: ``tuple(pl)`` does not call this method; it uses ``__iter__``.
        """
        if self.chunk_count > 0:
            warnings.warn(
                "Converting a multi-chunk PagedList to tuple loads all data "
                "into memory and loses the disk-backed storage benefits.",
                UserWarning,
                stacklevel=2,
            )
        return tuple(self.combine_chunks())