Write a Python function that processes CSV files and filters rows based on a date range.
import csv
from datetime import datetime, date
from typing import Iterator
from pathlib import Path
def filter_csv_by_date(
file_path: str | Path,
date_column: str,
start_date: date,
end_date: date,
date_format: str = "%Y-%m-%d",
) -> Iterator[dict[str, str]]:
"""
Filter rows from a CSV file based on a date range (inclusive).
Args:
file_path: Path to the CSV file
date_column: Name of the column containing date values
start_date: Start of the date range (inclusive)
end_date: End of the date range (inclusive)
date_format: strptime format string for parsing dates
Yields:
Matching rows as dictionaries
"""
with open(file_path, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
if date_column not in (reader.fieldnames or []):
raise ValueError(
f"Column '{date_column}' not found. "
f"Available columns: {reader.fieldnames}"
)
for row in reader:
try:
row_date = datetime.strptime(
row[date_column].strip(), date_format
).date()
if start_date <= row_date <= end_date:
yield row
except ValueError:
# Skip rows with unparseable dates
continue
# Usage example: count the rows of sales_data.csv whose transaction_date
# falls between 2024-01-01 and 2024-03-31 (both ends included).
if __name__ == "__main__":
    from datetime import date

    matching_rows = filter_csv_by_date(
        file_path="sales_data.csv",
        date_column="transaction_date",
        start_date=date(2024, 1, 1),
        end_date=date(2024, 3, 31),
    )
    # The filter is lazy; list() drains it so the rows can be counted.
    results = list(matching_rows)
    print(f"Found {len(results)} matching rows")
# Drain the generator into a list so every matching row is held in memory.
# NOTE(review): `...` is presumably a placeholder for the real arguments.
rows = list(filter_csv_by_date(...))
import csv
# Stream matching rows from input.csv into output.csv one at a time, so the
# full result set is never held in memory. `start` and `end` are expected to
# be defined by the surrounding code.
matches = filter_csv_by_date("input.csv", "date", start, end)
# Write UTF-8 explicitly: without `encoding`, open() uses the platform
# default, which may be unable to encode text that was read as UTF-8.
with open("output.csv", "w", newline="", encoding="utf-8") as f:
    writer = None
    for row in matches:
        if writer is None:
            # The column names are only known once the first row arrives,
            # so the writer (and header) is created lazily. If nothing
            # matches, output.csv is left empty with no header line.
            writer = csv.DictWriter(f, fieldnames=row.keys())
            writer.writeheader()
        writer.writerow(row)
Can you add error handling for when the file doesn't exist?
import csv
from datetime import datetime, date
from typing import Iterator
from pathlib import Path
class CSVProcessingError(Exception):
    """Common base class for errors raised while processing a CSV file."""
def filter_csv_by_date(
file_path: str | Path,
date_column: str,
start_date: date,
end_date: date,
date_format: str = "%Y-%m-%d",
skip_invalid_dates: bool = True,
) -> Iterator[dict[str, str]]:
"""
Filter rows from a CSV file based on a date range (inclusive).
Raises CSVProcessingError with descriptive messages on failure.
"""
path = Path(file_path)
# File existence check
if not path.exists():
raise FileNotFoundError(
f"CSV file not found: '{path.resolve()}'
"
f"Check the file path and try again."
)
# File type check
if path.suffix.lower() != ".csv":
raise CSVProcessingError(
f"Expected a .csv file, got '{path.suffix}'"
)
# Read permission check
if not path.is_file():
raise CSVProcessingError(
f"'{path}' is not a file (it may be a directory)"
)
try:
with open(path, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
if not reader.fieldnames:
raise CSVProcessingError("CSV file appears to be empty or has no header row.")
if date_column not in reader.fieldnames:
raise CSVProcessingError(
f"Column '{date_column}' not found.\n"
f"Available columns: {', '.join(reader.fieldnames)}"
)
for line_num, row in enumerate(reader, start=2):
raw_value = row.get(date_column, "").strip()
try:
row_date = datetime.strptime(raw_value, date_format).date()
if start_date <= row_date <= end_date:
yield row
except ValueError:
if not skip_invalid_dates:
raise CSVProcessingError(
f"Line {line_num}: Cannot parse date '{raw_value}' "
f"with format '{date_format}'"
)
except UnicodeDecodeError:
raise CSVProcessingError(
f"Cannot read '{path}' as UTF-8. "
f"Try re-encoding the file or specify a different encoding."
)
except PermissionError:
raise CSVProcessingError(
f"Permission denied reading '{path}'. "
f"Check that you have read access to this file."
)
# Example call site: a missing file and a CSV-level problem are reported
# separately. `start` and `end` are expected to be defined by the caller.
try:
    results = list(filter_csv_by_date("data.csv", "date", start, end))
except FileNotFoundError as missing_file:
    print(f"File error: {missing_file}")
except CSVProcessingError as csv_problem:
    print(f"CSV error: {csv_problem}")
lesanje.ai may produce inaccurate information. Use Shift+Enter for new line.