Source code for app.alba_core.matching

"""Property filtering and scoring for Alba Core.

This module owns the most important safety rule in the system:

    Hard filters happen before scoring.

If a property fails a hard requirement, it cannot appear in the top-three list no
matter how many soft preferences it satisfies. That keeps Alba Core honest and
prevents a ranking score from hiding a mismatch.
"""

from __future__ import annotations

import math
from collections import Counter
from dataclasses import dataclass
from datetime import date

from app.alba_core.models import MatchResult, PropertyRecord, RentalRequirements


[docs] @dataclass(slots=True) class MatchingOutput: """Collection of successful matches and aggregate rejection reasons.""" matches: list[MatchResult] rejected_reasons: dict[str, int]
[docs] def to_dict(self) -> dict[str, object]: """Return a JSON-friendly matching response.""" return { "matches": [match.to_dict() for match in self.matches], "rejected_reasons": dict(self.rejected_reasons), }
class PropertyMatcher:
    """Hard filters first, soft scoring second.

    The most important rule is that scoring never rescues a listing that failed
    a hard requirement such as city, budget, or minimum bedrooms.
    """

    # Maps the user-facing priority label to the score bucket it boosts.
    _PRIORITY_BUCKETS = {
        "location": "location",
        "space": "space",
        "price": "budget",
        "lifestyle extras": "lifestyle",
        "move-in timing": "availability",
    }

    def __init__(self, top_n: int = 3) -> None:
        """Create a matcher that returns at most ``top_n`` results."""
        self.top_n = top_n

    def match(
        self,
        requirements: RentalRequirements,
        properties: list[PropertyRecord],
    ) -> MatchingOutput:
        """Filter, score, sort, and return the best matching properties.

        Args:
            requirements: The renter's hard requirements and soft preferences.
            properties: Candidate listings to evaluate.

        Returns:
            The ``top_n`` highest-scoring eligible listings plus a count of
            every hard-filter reason that rejected a listing.
        """
        rejected: Counter[str] = Counter()
        scored: list[MatchResult] = []
        for property_record in properties:
            hard_reasons = self._hard_filter_reasons(requirements, property_record)
            if hard_reasons:
                # A single listing may contribute several reasons to the tally.
                rejected.update(hard_reasons)
                continue
            scored.append(self._score(requirements, property_record))
        # list.sort is stable, so equal scores keep their input order.
        scored.sort(key=lambda item: item.score, reverse=True)
        return MatchingOutput(matches=scored[: self.top_n], rejected_reasons=dict(rejected))

    def _hard_filter_reasons(
        self,
        requirements: RentalRequirements,
        property_record: PropertyRecord,
    ) -> list[str]:
        """Return all hard reasons that make a property ineligible.

        An empty list means the property is eligible for scoring. When a
        requirement is set but the listing lacks the corresponding value, a
        ``missing_*`` reason is reported. Property type is the exception: a
        listing with no type is not rejected.
        """
        reasons: list[str] = []

        if requirements.city and not self._location_matches(requirements, property_record):
            reasons.append("location_mismatch")

        if requirements.budget_max is not None:
            if property_record.rent_pw is None:
                reasons.append("missing_rent")
            elif property_record.rent_pw > requirements.budget_max:
                reasons.append("over_budget")

        if requirements.bedrooms_min is not None:
            if property_record.bedrooms is None:
                reasons.append("missing_bedrooms")
            elif property_record.bedrooms < requirements.bedrooms_min:
                reasons.append("not_enough_bedrooms")

        # A minimum of zero bathrooms or parking spaces is not a constraint.
        if requirements.bathrooms_min is not None and requirements.bathrooms_min > 0:
            if property_record.bathrooms is None:
                reasons.append("missing_bathrooms")
            elif property_record.bathrooms < requirements.bathrooms_min:
                reasons.append("not_enough_bathrooms")

        if requirements.parking_min is not None and requirements.parking_min > 0:
            if property_record.parking_spaces is None:
                reasons.append("missing_parking")
            elif property_record.parking_spaces < requirements.parking_min:
                reasons.append("not_enough_parking")

        if requirements.property_type and property_record.property_type:
            requested_type = self._normalize_type(requirements.property_type)
            listed_type = self._normalize_type(property_record.property_type)
            if requested_type != listed_type:
                reasons.append("property_type_mismatch")

        return reasons

    def _score(self, requirements: RentalRequirements, property_record: PropertyRecord) -> MatchResult:
        """Score one eligible property and keep an explanation breakdown.

        The total is the sum of the per-bucket scores. When the requirements
        carry a recognised priority, that bucket is multiplied by 1.2 first.
        """
        breakdown: dict[str, float] = {}
        notes: list[str] = []
        breakdown["location"] = self._location_score(requirements, property_record, notes)
        breakdown["budget"] = self._budget_score(requirements, property_record, notes)
        breakdown["space"] = self._space_score(requirements, property_record, notes)
        breakdown["property_type"] = self._property_type_score(requirements, property_record, notes)
        breakdown["lifestyle"] = self._lifestyle_score(requirements, property_record, notes)
        breakdown["availability"] = self._availability_score(requirements, property_record, notes)

        priority = requirements.priority or ""
        boost_key = self._PRIORITY_BUCKETS.get(priority)
        if boost_key is not None:
            # Priority does not change eligibility. It only boosts the relevant
            # score bucket after the property has passed hard filters.
            breakdown[boost_key] = breakdown.get(boost_key, 0.0) * 1.2
            notes.append(f"Priority weighting applied for {priority}.")

        total = sum(breakdown.values())
        return MatchResult(
            property=property_record,
            score=total,
            match_notes=notes,
            score_breakdown=breakdown,
        )

    def _location_matches(self, requirements: RentalRequirements, property_record: PropertyRecord) -> bool:
        """Return True when city, suburb, or region matches the request.

        The comparison is an exact, case-insensitive equality test between the
        requested city and each of the listing's three location fields.
        """
        requested = (requirements.city or "").casefold()
        return requested in {
            (property_record.city or "").casefold(),
            (property_record.suburb or "").casefold(),
            (property_record.region or "").casefold(),
        }

    def _location_score(
        self,
        requirements: RentalRequirements,
        property_record: PropertyRecord,
        notes: list[str],
    ) -> float:
        """Score the requested-location match and suburb preference fit.

        Awards 25 points for matching the requested location and a further 15
        when the listing's suburb is one of the preferred suburbs. Explanatory
        notes are appended to ``notes``.
        """
        score = 0.0
        requested = (requirements.city or "").casefold()
        if requested and self._location_matches(requirements, property_record):
            score += 25.0
            notes.append("Matches the requested location.")

        suburb = (property_record.suburb or "").casefold()
        if suburb and suburb in {item.casefold() for item in requirements.suburb_preferences}:
            score += 15.0
            notes.append("Matches a preferred suburb.")
        return score

    def _budget_score(
        self,
        requirements: RentalRequirements,
        property_record: PropertyRecord,
        notes: list[str],
    ) -> float:
        """Score weekly rent fit after hard budget filtering has passed.

        Returns 0.0 when either the budget or the rent is unknown, when the
        budget is not positive, or when the rent exceeds the budget.
        """
        if requirements.budget_max is None or property_record.rent_pw is None:
            return 0.0
        if requirements.budget_max <= 0:
            # A zero budget would otherwise raise ZeroDivisionError below; a
            # non-positive budget gives no meaningful ratio to score.
            return 0.0

        ratio = property_record.rent_pw / requirements.budget_max
        if ratio > 1:
            return 0.0

        notes.append("Within the requested weekly budget.")
        if requirements.priority == "price":
            # Price-focused renters are rewarded for cheaper listings.
            return max(0.0, 24.0 * (1.0 - ratio))
        # NOTE(review): without the price priority, rent closer to the budget
        # cap scores higher -- confirm this is the intended behaviour.
        return max(0.0, 20.0 * ratio)

    def _space_score(
        self,
        requirements: RentalRequirements,
        property_record: PropertyRecord,
        notes: list[str],
    ) -> float:
        """Score bedroom, bathroom, and parking fit.

        An exact bedroom match earns 18 points; extra bedrooms earn 14 plus up
        to 2 bonus points. Bathrooms earn between 4 and 8 points. Parking earns
        4 points when the minimum is met, or 1 point when no parking is needed.
        """
        score = 0.0

        if requirements.bedrooms_min is not None and property_record.bedrooms is not None:
            if property_record.bedrooms == requirements.bedrooms_min:
                score += 18.0
                notes.append("Bedroom count is an exact fit.")
            elif property_record.bedrooms > requirements.bedrooms_min:
                score += 14.0 + min(property_record.bedrooms - requirements.bedrooms_min, 2)
                notes.append("Meets the bedroom requirement.")

        if requirements.bathrooms_min is not None and property_record.bathrooms is not None:
            surplus = max(0, property_record.bathrooms - requirements.bathrooms_min)
            score += min(8.0, 4.0 + surplus)

        if requirements.parking_min is not None and property_record.parking_spaces is not None:
            if requirements.parking_min == 0:
                score += 1.0
            elif property_record.parking_spaces >= requirements.parking_min:
                score += 4.0

        return score

    def _property_type_score(
        self,
        requirements: RentalRequirements,
        property_record: PropertyRecord,
        notes: list[str],
    ) -> float:
        """Score property type once any hard type mismatch has been cleared.

        Returns 8.0 when the normalised types agree and 0.0 otherwise,
        including when no property type was requested.
        """
        if not requirements.property_type:
            return 0.0
        requested_type = self._normalize_type(requirements.property_type)
        if requested_type == self._normalize_type(property_record.property_type):
            notes.append("Matches the requested property type.")
            return 8.0
        return 0.0

    def _lifestyle_score(
        self,
        requirements: RentalRequirements,
        property_record: PropertyRecord,
        notes: list[str],
    ) -> float:
        """Score flexible lifestyle extras from description and feature fields.

        Each requested feature found as a case-insensitive substring of the
        listing's descriptive fields earns 4 points, as do satisfied pet and
        furnishing requirements. The total is capped at 16.
        """
        score = 0.0
        text = " ".join(
            value or ""
            for value in [
                property_record.description,
                property_record.view_type,
                property_record.pool,
                property_record.spa,
                property_record.furnishing,
                property_record.pets,
            ]
        ).casefold()

        for feature in requirements.features:
            if not feature.strip():
                # A blank feature is a substring of any text and would award
                # points (and a meaningless note) to every listing.
                continue
            if feature.casefold() in text:
                score += 4.0
                notes.append(f"Includes requested feature: {feature}.")

        if requirements.pets_required is True and "yes" in (property_record.pets or "").casefold():
            score += 4.0
        if requirements.furnished_required is True and "yes" in (property_record.furnishing or "").casefold():
            score += 4.0
        return min(score, 16.0)

    def _availability_score(
        self,
        requirements: RentalRequirements,
        property_record: PropertyRecord,
        notes: list[str],
    ) -> float:
        """Score availability against simple move-in timing preferences.

        Returns 8.0 for an "asap" request against a listing marked
        "available". Otherwise the score decays logarithmically from 6.0 with
        the number of days between today and ``available_from``, never
        dropping below 1.0. Listings with no date score 1.0.
        """
        availability = (property_record.availability or "").casefold()
        if requirements.move_in_timing == "asap" and availability == "available":
            notes.append("Listed as available now.")
            return 8.0

        if property_record.available_from is not None:
            # Measure distance from today. The raw ordinal (days since year 1)
            # is so large that the log term would always floor the score at 1.0.
            days_away = abs(property_record.available_from.toordinal() - date.today().toordinal())
            return max(1.0, 6.0 - math.log1p(days_away))
        return 1.0

    def _normalize_type(self, value: str | None) -> str:
        """Map source property type wording into Alba Core's small type set.

        Checks run in order, so "townhouse"-style wording wins over the
        generic "house" match. Unrecognised wording is returned casefolded.
        """
        text = (value or "").casefold()
        if "town" in text:
            return "townhouse"
        if "apart" in text or "unit" in text or "flat" in text:
            return "apartment"
        if "house" in text or "home" in text:
            return "house"
        return text