SafePath Training — Phase 2B: Standalone Automation
Overview
This document covers the automation features that work in standalone SafePath (no ChemIQ dependency). These features are available at Standard and Pro tiers and include:
- Auto-Assignment Rules Engine — configurable rules to automatically assign training based on triggers (new hire, role change, site transfer, cert expiring)
- Notification & Reminder Automation — email and in-app notifications for training events (assigned, upcoming due, overdue, completed, cert expiring)
- Certification Expiration Alerts — proactive alerts when certifications are approaching expiration, with optional auto-assign refresher
Prerequisites:
- Phase 1 (1A–1F) complete — all core SafePath tables, services, and UI exist
SafePathAutoAssignmentRulemodel already exists (created in Phase 1A)SafePathAssignment.auto_rule_idFK already exists- Notification framework (email service) — needs building as part of this phase
Tier gating:
- Auto-assignment rules: Standard + Pro (
ASSIGNMENT_AUTO_RULES) - Certification expiration alerts: Standard + Pro (
CERTIFICATION_EXPIRATION_ALERTS) - Webhook notifications: Pro only
- Multi-channel + Manager alerts: Pro only
Files to create/modify:
| Action | File |
|---|---|
| Create | app/services/safepath/auto_assignment_engine.py |
| Create | app/services/safepath/notification_service.py |
| Create | app/schemas/safepath/automation.py |
| Create | app/api/v1/safepath/automation.py |
| Modify | app/api/v1/safepath/__init__.py (register new router) |
| Create | app/services/safepath/scheduler.py (cron-like task runner) |
| Create | src/pages/safepath/automation/index.tsx |
| Create | src/pages/safepath/automation/rules.tsx |
| Create | src/pages/safepath/automation/notifications.tsx |
| Modify | src/services/api/safepath.api.ts (add API methods) |
| Modify | src/types/safepath.ts (add TypeScript interfaces) |
1. Architecture
1.1 Auto-Assignment Rules Engine
┌──────────────────────────────────────────────────────────────────┐
│ Auto-Assignment Rules Engine │
│ │
│ Triggers (Events) Rules (Config) │
│ ───────────────── ────────────── │
│ • new_hire SafePathAutoAssignmentRule │
│ • role_change ├── trigger_type │
│ • site_transfer ├── trigger_config (JSONB) │
│ • cert_expiring ├── course_id │
│ • (chemical_added → Phase 2A) ├── due_date_offset_days │
│ └── is_active │
│ │
│ Engine evaluates triggers against active rules and creates │
│ SafePathAssignment records with auto_rule_id set. │
└──────────────────────────────────────────────────────────────────┘
│
│ creates
▼
┌──────────────────────────────────────────────────────────────────┐
│ SafePathAssignment │
│ ├── auto_rule_id → (which rule created this) │
│ ├── notes: "Auto-assigned: {trigger description}" │
│ └── status: "assigned" │
└──────────────────────────────────────────────────────────────────┘
│
│ triggers
▼
┌──────────────────────────────────────────────────────────────────┐
│ Notification Service │
│ ├── Email (Mailgun) — all tiers │
│ ├── In-app notification — Standard + Pro │
│ └── Webhook — Pro only │
└──────────────────────────────────────────────────────────────────┘
1.2 Notification Flow
Event occurs (assignment, due date, completion, cert expiry)
│
▼
NotificationService.send(event_type, recipient, data)
│
├── Determine notification channels based on plan tier
│ ├── Starter: Email only
│ ├── Standard: Email + In-app
│ └── Pro: Email + In-app + Webhook + Manager alert
│
├── Render email template
│ └── Send via Mailgun API (existing config)
│
├── Create in-app notification record
│ └── safepath_notifications table (new)
│
└── Fire webhook (Pro only)
└── POST to configured webhook URL
1.3 Scheduled Tasks
Scheduler (runs periodically via background service or cron)
│
├── Every 6 hours: Check for overdue assignments → send overdue alerts
├── Daily at 8am: Check certifications expiring within alert window
├── Daily at 8am: Send training reminders (X days before due date)
└── Weekly: Generate compliance summary for managers (Pro only)
2. Database Changes
2.1 New Table: safepath_notifications
CREATE TABLE safepath_notifications (
notification_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
company_id UUID NOT NULL REFERENCES core_data_companies(company_id) ON DELETE CASCADE,
user_id UUID NOT NULL REFERENCES core_data_users(user_id) ON DELETE CASCADE,
notification_type VARCHAR(50) NOT NULL,
-- training_assigned, training_reminder, training_overdue, training_completed,
-- cert_expiring, cert_expired, cert_renewed
title VARCHAR(255) NOT NULL,
message TEXT NOT NULL,
entity_type VARCHAR(30), -- assignment, certification, course
entity_id UUID,
is_read BOOLEAN NOT NULL DEFAULT FALSE,
read_at TIMESTAMP,
channels_sent JSONB DEFAULT '[]'::jsonb, -- ["email", "in_app", "webhook"]
email_sent_at TIMESTAMP,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
CONSTRAINT ck_safepath_notif_type CHECK (
notification_type IN (
'training_assigned', 'training_reminder', 'training_overdue',
'training_completed', 'cert_expiring', 'cert_expired', 'cert_renewed'
)
)
);
CREATE INDEX ix_safepath_notif_user_unread ON safepath_notifications(user_id, is_read)
WHERE is_read = FALSE;
CREATE INDEX ix_safepath_notif_company ON safepath_notifications(company_id, created_at DESC);
2.2 New Table: safepath_notification_preferences
CREATE TABLE safepath_notification_preferences (
preference_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
company_id UUID NOT NULL REFERENCES core_data_companies(company_id) ON DELETE CASCADE,
user_id UUID REFERENCES core_data_users(user_id) ON DELETE CASCADE,
-- NULL user_id = company-wide default
notification_type VARCHAR(50) NOT NULL,
email_enabled BOOLEAN NOT NULL DEFAULT TRUE,
in_app_enabled BOOLEAN NOT NULL DEFAULT TRUE,
reminder_days_before_due INTEGER DEFAULT 7,
cert_alert_days_before_expiry INTEGER DEFAULT 60,
manager_alert_enabled BOOLEAN DEFAULT FALSE, -- Pro only
webhook_url TEXT, -- Pro only
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
updated_at TIMESTAMP NOT NULL DEFAULT NOW(),
CONSTRAINT uq_safepath_notif_pref UNIQUE (company_id, user_id, notification_type)
);
2.3 Alembic Migration
File: alembic/versions/XXXX_add_safepath_automation_tables.py
"""Add SafePath automation tables (notifications, preferences)
Revision ID: auto-generated
Revises: <previous_migration>
Create Date: auto-generated
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import UUID, JSONB
def upgrade() -> None:
# safepath_notifications
op.create_table(
'safepath_notifications',
sa.Column('notification_id', UUID(as_uuid=True), primary_key=True,
server_default=sa.text('gen_random_uuid()')),
sa.Column('company_id', UUID(as_uuid=True),
sa.ForeignKey('core_data_companies.company_id', ondelete='CASCADE'),
nullable=False),
sa.Column('user_id', UUID(as_uuid=True),
sa.ForeignKey('core_data_users.user_id', ondelete='CASCADE'),
nullable=False),
sa.Column('notification_type', sa.String(50), nullable=False),
sa.Column('title', sa.String(255), nullable=False),
sa.Column('message', sa.Text, nullable=False),
sa.Column('entity_type', sa.String(30), nullable=True),
sa.Column('entity_id', UUID(as_uuid=True), nullable=True),
sa.Column('is_read', sa.Boolean, nullable=False, server_default='false'),
sa.Column('read_at', sa.DateTime, nullable=True),
sa.Column('channels_sent', JSONB, server_default='[]'),
sa.Column('email_sent_at', sa.DateTime, nullable=True),
sa.Column('created_at', sa.DateTime, nullable=False, server_default=sa.text('NOW()')),
)
op.create_index('ix_safepath_notif_user_unread', 'safepath_notifications',
['user_id', 'is_read'], postgresql_where=sa.text('is_read = FALSE'))
op.create_index('ix_safepath_notif_company', 'safepath_notifications',
['company_id', sa.text('created_at DESC')])
# safepath_notification_preferences
op.create_table(
'safepath_notification_preferences',
sa.Column('preference_id', UUID(as_uuid=True), primary_key=True,
server_default=sa.text('gen_random_uuid()')),
sa.Column('company_id', UUID(as_uuid=True),
sa.ForeignKey('core_data_companies.company_id', ondelete='CASCADE'),
nullable=False),
sa.Column('user_id', UUID(as_uuid=True),
sa.ForeignKey('core_data_users.user_id', ondelete='CASCADE'),
nullable=True),
sa.Column('notification_type', sa.String(50), nullable=False),
sa.Column('email_enabled', sa.Boolean, nullable=False, server_default='true'),
sa.Column('in_app_enabled', sa.Boolean, nullable=False, server_default='true'),
sa.Column('reminder_days_before_due', sa.Integer, server_default='7'),
sa.Column('cert_alert_days_before_expiry', sa.Integer, server_default='60'),
sa.Column('manager_alert_enabled', sa.Boolean, server_default='false'),
sa.Column('webhook_url', sa.Text, nullable=True),
sa.Column('created_at', sa.DateTime, nullable=False, server_default=sa.text('NOW()')),
sa.Column('updated_at', sa.DateTime, nullable=False, server_default=sa.text('NOW()')),
sa.UniqueConstraint('company_id', 'user_id', 'notification_type',
name='uq_safepath_notif_pref'),
)
def downgrade() -> None:
op.drop_table('safepath_notification_preferences')
op.drop_index('ix_safepath_notif_company', 'safepath_notifications')
op.drop_index('ix_safepath_notif_user_unread', 'safepath_notifications')
op.drop_table('safepath_notifications')
2.4 SQLAlchemy Models
File: app/db/models/safepath.py (add to existing file)
# ---------------------------------------------------------------------------
# Notification Tables (Phase 2B)
# ---------------------------------------------------------------------------
class SafePathNotification(Base):
"""In-app notification for SafePath events"""
__tablename__ = "safepath_notifications"
__table_args__ = (
CheckConstraint(
"notification_type IN ('training_assigned', 'training_reminder', 'training_overdue', "
"'training_completed', 'cert_expiring', 'cert_expired', 'cert_renewed')",
name="ck_safepath_notif_type"
),
Index("ix_safepath_notif_user_unread", "user_id", "is_read",
postgresql_where=text("is_read = FALSE")),
Index("ix_safepath_notif_company", "company_id", text("created_at DESC")),
)
notification_id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
company_id = Column(UUID(as_uuid=True), ForeignKey("core_data_companies.company_id", ondelete="CASCADE"), nullable=False)
user_id = Column(UUID(as_uuid=True), ForeignKey("core_data_users.user_id", ondelete="CASCADE"), nullable=False)
notification_type = Column(String(50), nullable=False)
title = Column(String(255), nullable=False)
message = Column(Text, nullable=False)
entity_type = Column(String(30), nullable=True)
entity_id = Column(UUID(as_uuid=True), nullable=True)
is_read = Column(Boolean, default=False, nullable=False)
read_at = Column(DateTime, nullable=True)
channels_sent = Column(JSONB, default=list, nullable=True)
email_sent_at = Column(DateTime, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
# Relationships
user = relationship("User", foreign_keys=[user_id])
def __repr__(self):
return f"<SafePathNotification(type={self.notification_type}, read={self.is_read})>"
class SafePathNotificationPreference(Base):
"""Notification preferences per user or company-wide default"""
__tablename__ = "safepath_notification_preferences"
__table_args__ = (
UniqueConstraint("company_id", "user_id", "notification_type",
name="uq_safepath_notif_pref"),
)
preference_id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
company_id = Column(UUID(as_uuid=True), ForeignKey("core_data_companies.company_id", ondelete="CASCADE"), nullable=False)
user_id = Column(UUID(as_uuid=True), ForeignKey("core_data_users.user_id", ondelete="CASCADE"), nullable=True)
notification_type = Column(String(50), nullable=False)
email_enabled = Column(Boolean, default=True, nullable=False)
in_app_enabled = Column(Boolean, default=True, nullable=False)
reminder_days_before_due = Column(Integer, default=7)
cert_alert_days_before_expiry = Column(Integer, default=60)
manager_alert_enabled = Column(Boolean, default=False)
webhook_url = Column(Text, nullable=True)
created_at = Column(DateTime, default=datetime.utcnow, nullable=False)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Relationships
user = relationship("User", foreign_keys=[user_id])
def __repr__(self):
return f"<SafePathNotificationPreference(type={self.notification_type})>"
3. Pydantic Schemas
File: app/schemas/safepath/automation.py
"""SafePath Automation Schemas
Schemas for auto-assignment rules, notifications, and certification alerts.
"""
from datetime import datetime, date
from typing import Optional, List
from uuid import UUID
from pydantic import BaseModel, Field, ConfigDict
# ============================================================================
# Auto-Assignment Rule Schemas
# ============================================================================
class AutoAssignmentRuleCreate(BaseModel):
"""Create an auto-assignment rule."""
course_id: UUID
trigger_type: str = Field(..., pattern="^(new_hire|role_change|site_transfer|cert_expiring|chemical_added)$")
trigger_config: dict = Field(default_factory=dict, description="Trigger-specific configuration")
due_date_offset_days: int = Field(default=30, ge=1, le=365)
is_active: bool = Field(default=True)
class AutoAssignmentRuleUpdate(BaseModel):
"""Update an auto-assignment rule."""
trigger_config: Optional[dict] = None
due_date_offset_days: Optional[int] = Field(None, ge=1, le=365)
is_active: Optional[bool] = None
class AutoAssignmentRuleResponse(BaseModel):
"""Response schema for an auto-assignment rule."""
model_config = ConfigDict(from_attributes=True)
rule_id: UUID
company_id: UUID
course_id: UUID
course_title: Optional[str] = None
trigger_type: str
trigger_config: dict
due_date_offset_days: int
is_active: bool
created_by: Optional[UUID] = None
created_by_name: Optional[str] = None
created_at: datetime
updated_at: Optional[datetime] = None
# Stats
assignments_created_count: int = 0
class AutoAssignmentRuleListResponse(BaseModel):
"""Paginated list of auto-assignment rules."""
items: List[AutoAssignmentRuleResponse]
total: int
page: int
page_size: int
total_pages: int
class TriggerConfigNewHire(BaseModel):
"""Config for new_hire trigger: assign when employee joins matching criteria."""
role_ids: List[UUID] = Field(default_factory=list, description="Only assign if user has one of these roles")
site_ids: List[UUID] = Field(default_factory=list, description="Only assign if user is at one of these sites")
class TriggerConfigRoleChange(BaseModel):
"""Config for role_change trigger: assign when role changes to matching role."""
target_role_ids: List[UUID] = Field(..., min_length=1, description="Roles that trigger assignment")
class TriggerConfigSiteTransfer(BaseModel):
"""Config for site_transfer trigger: assign when transferred to matching site."""
target_site_ids: List[UUID] = Field(..., min_length=1, description="Sites that trigger assignment")
class TriggerConfigCertExpiring(BaseModel):
"""Config for cert_expiring trigger: assign refresher when cert expires soon."""
certification_type_id: UUID = Field(..., description="Which certification type to watch")
alert_days_before: int = Field(default=60, ge=7, le=365)
# ============================================================================
# Auto-Assignment Engine Schemas
# ============================================================================
class RuleEvaluationRequest(BaseModel):
"""Request to evaluate rules for a specific trigger event."""
trigger_type: str
user_id: UUID
company_id: UUID
trigger_data: dict = Field(default_factory=dict, description="Trigger-specific context data")
class RuleEvaluationResult(BaseModel):
"""Result of evaluating auto-assignment rules."""
rules_evaluated: int
rules_matched: int
assignments_created: int
assignments: List[dict] # [{rule_id, course_id, course_title, due_date}]
skipped: List[dict] # [{rule_id, reason}]
# ============================================================================
# Notification Schemas
# ============================================================================
class NotificationResponse(BaseModel):
"""Response schema for a notification."""
model_config = ConfigDict(from_attributes=True)
notification_id: UUID
notification_type: str
title: str
message: str
entity_type: Optional[str] = None
entity_id: Optional[UUID] = None
is_read: bool
read_at: Optional[datetime] = None
channels_sent: List[str] = []
created_at: datetime
class NotificationListResponse(BaseModel):
"""Paginated notification list."""
items: List[NotificationResponse]
total: int
unread_count: int
page: int
page_size: int
total_pages: int
class NotificationMarkReadRequest(BaseModel):
"""Mark one or more notifications as read."""
notification_ids: List[UUID] = Field(..., min_length=1)
class NotificationPreferenceResponse(BaseModel):
"""Response schema for notification preference."""
model_config = ConfigDict(from_attributes=True)
preference_id: UUID
notification_type: str
email_enabled: bool
in_app_enabled: bool
reminder_days_before_due: int
cert_alert_days_before_expiry: int
manager_alert_enabled: bool
webhook_url: Optional[str] = None
class NotificationPreferenceUpdate(BaseModel):
"""Update notification preferences."""
email_enabled: Optional[bool] = None
in_app_enabled: Optional[bool] = None
reminder_days_before_due: Optional[int] = Field(None, ge=1, le=30)
cert_alert_days_before_expiry: Optional[int] = Field(None, ge=7, le=365)
manager_alert_enabled: Optional[bool] = None
webhook_url: Optional[str] = None
# ============================================================================
# Certification Alert Schemas
# ============================================================================
class CertExpirationAlert(BaseModel):
"""A certification that is expiring or has expired."""
certification_id: UUID
user_id: UUID
user_name: str
user_email: str
certification_type_name: str
expiration_date: date
days_until_expiry: int # Negative = already expired
status: str # expiring_soon, expired
auto_refresher_assigned: bool = False
refresher_course_id: Optional[UUID] = None
class CertExpirationSummary(BaseModel):
"""Summary of certification expirations for a company."""
expiring_within_30_days: int
expiring_within_60_days: int
expiring_within_90_days: int
already_expired: int
alerts: List[CertExpirationAlert]
4. Auto-Assignment Engine Service
File: app/services/safepath/auto_assignment_engine.py
"""SafePath Auto-Assignment Engine
Evaluates auto-assignment rules against trigger events and creates
training assignments automatically.
"""
import uuid
from datetime import datetime, date, timedelta
from typing import Optional, List, Tuple
from uuid import UUID
from sqlalchemy import and_, func
from sqlalchemy.orm import Session, joinedload
from app.db.models.safepath import (
SafePathAutoAssignmentRule, SafePathAssignment, SafePathCourse,
SafePathCertification, SafePathCertificationType, SafePathAuditLog,
)
from app.db.models.user import User, CompanyUser
from app.db.models.company import CompanySite
from app.schemas.safepath.automation import (
AutoAssignmentRuleCreate, AutoAssignmentRuleUpdate,
AutoAssignmentRuleResponse, AutoAssignmentRuleListResponse,
RuleEvaluationRequest, RuleEvaluationResult,
)
class AutoAssignmentEngine:
"""Engine for managing and evaluating auto-assignment rules."""
def __init__(self, db: Session):
self.db = db
# ========================================================================
# Rule CRUD
# ========================================================================
def create_rule(
self,
request: AutoAssignmentRuleCreate,
company_id: UUID,
user_id: UUID,
) -> SafePathAutoAssignmentRule:
"""Create a new auto-assignment rule."""
# Verify course exists and belongs to company
course = self.db.query(SafePathCourse).filter(
SafePathCourse.course_id == request.course_id,
SafePathCourse.company_id == company_id,
).first()
if not course:
raise ValueError("Course not found or doesn't belong to this company.")
rule = SafePathAutoAssignmentRule(
rule_id=uuid.uuid4(),
company_id=company_id,
course_id=request.course_id,
trigger_type=request.trigger_type,
trigger_config=request.trigger_config,
due_date_offset_days=request.due_date_offset_days,
is_active=request.is_active,
created_by=user_id,
)
self.db.add(rule)
self._log_event(company_id, user_id, "auto_rule_created", "rule", rule.rule_id, {
"trigger_type": request.trigger_type,
"course_id": str(request.course_id),
})
self.db.commit()
self.db.refresh(rule)
return rule
def update_rule(
self,
rule_id: UUID,
request: AutoAssignmentRuleUpdate,
company_id: UUID,
user_id: UUID,
) -> Optional[SafePathAutoAssignmentRule]:
"""Update an existing auto-assignment rule."""
rule = self.db.query(SafePathAutoAssignmentRule).filter(
SafePathAutoAssignmentRule.rule_id == rule_id,
SafePathAutoAssignmentRule.company_id == company_id,
).first()
if not rule:
return None
if request.trigger_config is not None:
rule.trigger_config = request.trigger_config
if request.due_date_offset_days is not None:
rule.due_date_offset_days = request.due_date_offset_days
if request.is_active is not None:
rule.is_active = request.is_active
self._log_event(company_id, user_id, "auto_rule_updated", "rule", rule.rule_id, {
"changes": request.model_dump(exclude_none=True),
})
self.db.commit()
self.db.refresh(rule)
return rule
def delete_rule(
self, rule_id: UUID, company_id: UUID, user_id: UUID
) -> bool:
"""Delete an auto-assignment rule."""
rule = self.db.query(SafePathAutoAssignmentRule).filter(
SafePathAutoAssignmentRule.rule_id == rule_id,
SafePathAutoAssignmentRule.company_id == company_id,
).first()
if not rule:
return False
self._log_event(company_id, user_id, "auto_rule_deleted", "rule", rule.rule_id, {
"trigger_type": rule.trigger_type,
})
self.db.delete(rule)
self.db.commit()
return True
def list_rules(
self,
company_id: UUID,
page: int = 1,
page_size: int = 20,
trigger_type: Optional[str] = None,
is_active: Optional[bool] = None,
) -> AutoAssignmentRuleListResponse:
"""List auto-assignment rules with filtering."""
query = self.db.query(SafePathAutoAssignmentRule).filter(
SafePathAutoAssignmentRule.company_id == company_id,
).options(
joinedload(SafePathAutoAssignmentRule.course),
joinedload(SafePathAutoAssignmentRule.creator),
)
if trigger_type:
query = query.filter(SafePathAutoAssignmentRule.trigger_type == trigger_type)
if is_active is not None:
query = query.filter(SafePathAutoAssignmentRule.is_active == is_active)
total = query.count()
rules = query.order_by(
SafePathAutoAssignmentRule.created_at.desc()
).offset((page - 1) * page_size).limit(page_size).all()
items = []
for rule in rules:
# Count assignments created by this rule
assignment_count = self.db.query(func.count(SafePathAssignment.assignment_id)).filter(
SafePathAssignment.auto_rule_id == rule.rule_id
).scalar() or 0
items.append(AutoAssignmentRuleResponse(
rule_id=rule.rule_id,
company_id=rule.company_id,
course_id=rule.course_id,
course_title=rule.course.title if rule.course else None,
trigger_type=rule.trigger_type,
trigger_config=rule.trigger_config,
due_date_offset_days=rule.due_date_offset_days,
is_active=rule.is_active,
created_by=rule.created_by,
created_by_name=rule.creator.full_name if rule.creator else None,
created_at=rule.created_at,
updated_at=rule.updated_at,
assignments_created_count=assignment_count,
))
return AutoAssignmentRuleListResponse(
items=items,
total=total,
page=page,
page_size=page_size,
total_pages=(total + page_size - 1) // page_size,
)
# ========================================================================
# Rule Evaluation
# ========================================================================
def evaluate_trigger(
self,
request: RuleEvaluationRequest,
) -> RuleEvaluationResult:
"""Evaluate all active rules for a specific trigger event.
Called when an event occurs (new hire, role change, site transfer, cert expiring).
Finds matching rules and creates assignments.
"""
rules = self.db.query(SafePathAutoAssignmentRule).filter(
SafePathAutoAssignmentRule.company_id == request.company_id,
SafePathAutoAssignmentRule.trigger_type == request.trigger_type,
SafePathAutoAssignmentRule.is_active == True,
).options(
joinedload(SafePathAutoAssignmentRule.course)
).all()
result = RuleEvaluationResult(
rules_evaluated=len(rules),
rules_matched=0,
assignments_created=0,
assignments=[],
skipped=[],
)
for rule in rules:
matched, reason = self._evaluate_rule(rule, request)
if not matched:
result.skipped.append({
"rule_id": str(rule.rule_id),
"reason": reason,
})
continue
# Check if course is published
if not rule.course or rule.course.status != "published":
result.skipped.append({
"rule_id": str(rule.rule_id),
"reason": "Course is not published",
})
continue
# Check if user already has an active assignment
existing = self.db.query(SafePathAssignment).filter(
SafePathAssignment.course_id == rule.course_id,
SafePathAssignment.assigned_to == request.user_id,
SafePathAssignment.status.in_(["assigned", "in_progress"]),
).first()
if existing:
result.skipped.append({
"rule_id": str(rule.rule_id),
"reason": "User already has an active assignment for this course",
})
continue
# Create assignment
due_date = date.today() + timedelta(days=rule.due_date_offset_days)
assignment = SafePathAssignment(
assignment_id=uuid.uuid4(),
company_id=request.company_id,
course_id=rule.course_id,
assigned_to=request.user_id,
due_date=due_date,
priority="normal",
status="assigned",
auto_rule_id=rule.rule_id,
notes=f"Auto-assigned: {self._trigger_description(request.trigger_type, request.trigger_data)}",
)
self.db.add(assignment)
result.rules_matched += 1
result.assignments_created += 1
result.assignments.append({
"rule_id": str(rule.rule_id),
"course_id": str(rule.course_id),
"course_title": rule.course.title,
"due_date": due_date.isoformat(),
})
if result.assignments_created > 0:
self.db.commit()
return result
def check_expiring_certifications(
self,
company_id: UUID,
alert_days: int = 60,
) -> List[dict]:
"""Check for certifications expiring within the alert window.
Called by the scheduler. Returns list of expiring certs and optionally
triggers auto-assignment of refresher courses.
"""
cutoff_date = date.today() + timedelta(days=alert_days)
expiring = self.db.query(SafePathCertification).filter(
SafePathCertification.company_id == company_id,
SafePathCertification.status == "active",
SafePathCertification.expiration_date <= cutoff_date,
SafePathCertification.expiration_date >= date.today(),
).options(
joinedload(SafePathCertification.user),
joinedload(SafePathCertification.certification_type),
).all()
alerts = []
for cert in expiring:
days_until = (cert.expiration_date - date.today()).days
# Check for cert_expiring auto-rules
rules = self.db.query(SafePathAutoAssignmentRule).filter(
SafePathAutoAssignmentRule.company_id == company_id,
SafePathAutoAssignmentRule.trigger_type == "cert_expiring",
SafePathAutoAssignmentRule.is_active == True,
).all()
auto_assigned = False
for rule in rules:
config = rule.trigger_config or {}
target_type_id = config.get("certification_type_id")
alert_threshold = config.get("alert_days_before", 60)
if target_type_id and str(cert.certification_type_id) != target_type_id:
continue
if days_until > alert_threshold:
continue
# Trigger auto-assignment
eval_request = RuleEvaluationRequest(
trigger_type="cert_expiring",
user_id=cert.user_id,
company_id=company_id,
trigger_data={
"certification_id": str(cert.certification_id),
"certification_type_id": str(cert.certification_type_id),
"expiration_date": cert.expiration_date.isoformat(),
},
)
eval_result = self.evaluate_trigger(eval_request)
if eval_result.assignments_created > 0:
auto_assigned = True
alerts.append({
"certification_id": str(cert.certification_id),
"user_id": str(cert.user_id),
"user_name": cert.user.full_name if cert.user else "Unknown",
"certification_type": cert.certification_type.name if cert.certification_type else "Unknown",
"expiration_date": cert.expiration_date.isoformat(),
"days_until_expiry": days_until,
"auto_refresher_assigned": auto_assigned,
})
return alerts
# ========================================================================
# Private Helpers
# ========================================================================
def _evaluate_rule(
self,
rule: SafePathAutoAssignmentRule,
request: RuleEvaluationRequest,
) -> Tuple[bool, str]:
"""Evaluate whether a specific rule matches the trigger event."""
config = rule.trigger_config or {}
if rule.trigger_type == "new_hire":
# Check role and site filters
role_ids = config.get("role_ids", [])
site_ids = config.get("site_ids", [])
if role_ids:
user_role_id = request.trigger_data.get("role_id", "")
if user_role_id and user_role_id not in role_ids:
return False, "User role doesn't match rule filter"
if site_ids:
user_site_id = request.trigger_data.get("site_id", "")
if user_site_id and user_site_id not in site_ids:
return False, "User site doesn't match rule filter"
return True, ""
elif rule.trigger_type == "role_change":
target_role_ids = config.get("target_role_ids", [])
new_role_id = request.trigger_data.get("new_role_id", "")
if target_role_ids and new_role_id not in target_role_ids:
return False, "New role doesn't match target roles"
return True, ""
elif rule.trigger_type == "site_transfer":
target_site_ids = config.get("target_site_ids", [])
new_site_id = request.trigger_data.get("new_site_id", "")
if target_site_ids and new_site_id not in target_site_ids:
return False, "New site doesn't match target sites"
return True, ""
elif rule.trigger_type == "cert_expiring":
target_type_id = config.get("certification_type_id")
cert_type_id = request.trigger_data.get("certification_type_id", "")
if target_type_id and cert_type_id != target_type_id:
return False, "Certification type doesn't match"
return True, ""
elif rule.trigger_type == "chemical_added":
# Handled by Phase 2A HazComIntegrationService
return True, ""
return False, f"Unknown trigger type: {rule.trigger_type}"
def _trigger_description(self, trigger_type: str, trigger_data: dict) -> str:
"""Generate a human-readable description of the trigger."""
descriptions = {
"new_hire": "New employee onboarded",
"role_change": f"Role changed to {trigger_data.get('new_role_name', 'new role')}",
"site_transfer": f"Transferred to {trigger_data.get('new_site_name', 'new site')}",
"cert_expiring": f"Certification expiring on {trigger_data.get('expiration_date', 'soon')}",
"chemical_added": f"New chemical '{trigger_data.get('product_name', 'unknown')}' added",
}
return descriptions.get(trigger_type, f"Trigger: {trigger_type}")
def _log_event(
self, company_id: UUID, user_id: UUID, event_type: str,
entity_type: str, entity_id: UUID, details: dict
):
"""Write to SafePath audit log."""
log = SafePathAuditLog(
log_id=uuid.uuid4(),
company_id=company_id,
event_type=event_type,
entity_type=entity_type,
entity_id=entity_id,
user_id=user_id,
details=details,
)
self.db.add(log)
5. Notification Service
File: app/services/safepath/notification_service.py
"""SafePath Notification Service
Handles email, in-app, and webhook notifications for SafePath events.
"""
import uuid
import logging
from datetime import datetime
from typing import Optional, List
from uuid import UUID
from sqlalchemy import and_, func
from sqlalchemy.orm import Session
from app.db.models.safepath import SafePathNotification, SafePathNotificationPreference
from app.db.models.user import User
from app.schemas.safepath.automation import (
NotificationResponse, NotificationListResponse,
NotificationPreferenceResponse, NotificationPreferenceUpdate,
)
from app.core.config import settings
logger = logging.getLogger(__name__)
# Email templates
EMAIL_TEMPLATES = {
"training_assigned": {
"subject": "New Training Assignment: {course_title}",
"body": """
Hello {user_name},
You have been assigned a new training course: **{course_title}**
Due date: {due_date}
Priority: {priority}
Please log in to Tellus EHS to start your training.
{app_url}/safepath/learner
Best regards,
Tellus EHS Training
""",
},
"training_reminder": {
"subject": "Training Reminder: {course_title} due in {days_remaining} days",
"body": """
Hello {user_name},
This is a reminder that your training assignment is due soon:
Course: {course_title}
Due date: {due_date}
Days remaining: {days_remaining}
Please complete your training before the due date.
{app_url}/safepath/learner
Best regards,
Tellus EHS Training
""",
},
"training_overdue": {
"subject": "OVERDUE: Training Assignment - {course_title}",
"body": """
Hello {user_name},
Your training assignment is now overdue:
Course: {course_title}
Due date: {due_date}
Days overdue: {days_overdue}
Please complete this training as soon as possible.
{app_url}/safepath/learner
Best regards,
Tellus EHS Training
""",
},
"training_completed": {
"subject": "Training Completed: {course_title}",
"body": """
Hello {user_name},
Congratulations! You have successfully completed:
Course: {course_title}
Score: {score}%
Completed on: {completed_at}
Your training record has been updated.
Best regards,
Tellus EHS Training
""",
},
"cert_expiring": {
"subject": "Certification Expiring: {cert_type_name} - {days_until} days remaining",
"body": """
Hello {user_name},
Your certification is expiring soon:
Certification: {cert_type_name}
Expiration date: {expiration_date}
Days remaining: {days_until}
Please take action to renew your certification.
Best regards,
Tellus EHS Training
""",
},
"cert_expired": {
"subject": "EXPIRED: Certification - {cert_type_name}",
"body": """
Hello {user_name},
Your certification has expired:
Certification: {cert_type_name}
Expired on: {expiration_date}
Please contact your training coordinator to renew.
Best regards,
Tellus EHS Training
""",
},
}
class NotificationService:
"""Service for sending and managing SafePath notifications."""
def __init__(self, db: Session):
self.db = db
# ========================================================================
# Send Notifications
# ========================================================================
def send_notification(
self,
company_id: UUID,
user_id: UUID,
notification_type: str,
title: str,
message: str,
entity_type: Optional[str] = None,
entity_id: Optional[UUID] = None,
template_data: Optional[dict] = None,
plan_tier: str = "STARTER",
) -> SafePathNotification:
"""Send a notification via appropriate channels based on plan tier.
Channels:
- STARTER: Email only
- STANDARD: Email + In-app
- PRO: Email + In-app + Webhook
"""
# Get user preferences
prefs = self._get_preferences(company_id, user_id, notification_type)
channels_sent = []
# 1. Email (all tiers)
if prefs.get("email_enabled", True):
email_sent = self._send_email(user_id, notification_type, template_data or {})
if email_sent:
channels_sent.append("email")
# 2. In-app notification (Standard + Pro)
if plan_tier in ("STANDARD", "PRO") and prefs.get("in_app_enabled", True):
notification = SafePathNotification(
notification_id=uuid.uuid4(),
company_id=company_id,
user_id=user_id,
notification_type=notification_type,
title=title,
message=message,
entity_type=entity_type,
entity_id=entity_id,
channels_sent=channels_sent + ["in_app"],
email_sent_at=datetime.utcnow() if "email" in channels_sent else None,
)
self.db.add(notification)
channels_sent.append("in_app")
else:
# Still create a record for tracking, but don't show in-app
notification = SafePathNotification(
notification_id=uuid.uuid4(),
company_id=company_id,
user_id=user_id,
notification_type=notification_type,
title=title,
message=message,
entity_type=entity_type,
entity_id=entity_id,
is_read=True, # Mark as read since it's email-only
channels_sent=channels_sent,
email_sent_at=datetime.utcnow() if "email" in channels_sent else None,
)
self.db.add(notification)
# 3. Webhook (Pro only)
if plan_tier == "PRO":
webhook_url = prefs.get("webhook_url")
if webhook_url:
self._fire_webhook(webhook_url, {
"event": notification_type,
"title": title,
"message": message,
"entity_type": entity_type,
"entity_id": str(entity_id) if entity_id else None,
"timestamp": datetime.utcnow().isoformat(),
})
channels_sent.append("webhook")
# 4. Manager alert (Pro only)
if plan_tier == "PRO" and prefs.get("manager_alert_enabled", False):
if notification_type in ("training_overdue", "cert_expired"):
self._send_manager_alert(company_id, user_id, notification_type, title, message)
notification.channels_sent = channels_sent
self.db.commit()
self.db.refresh(notification)
return notification
# ========================================================================
# Notification CRUD
# ========================================================================
def list_notifications(
self,
company_id: UUID,
user_id: UUID,
page: int = 1,
page_size: int = 20,
unread_only: bool = False,
) -> NotificationListResponse:
"""List notifications for a user."""
query = self.db.query(SafePathNotification).filter(
SafePathNotification.company_id == company_id,
SafePathNotification.user_id == user_id,
)
if unread_only:
query = query.filter(SafePathNotification.is_read == False)
total = query.count()
unread_count = self.db.query(func.count(SafePathNotification.notification_id)).filter(
SafePathNotification.company_id == company_id,
SafePathNotification.user_id == user_id,
SafePathNotification.is_read == False,
).scalar() or 0
notifications = query.order_by(
SafePathNotification.created_at.desc()
).offset((page - 1) * page_size).limit(page_size).all()
items = [
NotificationResponse(
notification_id=n.notification_id,
notification_type=n.notification_type,
title=n.title,
message=n.message,
entity_type=n.entity_type,
entity_id=n.entity_id,
is_read=n.is_read,
read_at=n.read_at,
channels_sent=n.channels_sent or [],
created_at=n.created_at,
)
for n in notifications
]
return NotificationListResponse(
items=items,
total=total,
unread_count=unread_count,
page=page,
page_size=page_size,
total_pages=(total + page_size - 1) // page_size,
)
def mark_read(
self,
notification_ids: List[UUID],
user_id: UUID,
) -> int:
"""Mark notifications as read. Returns count of updated."""
count = self.db.query(SafePathNotification).filter(
SafePathNotification.notification_id.in_(notification_ids),
SafePathNotification.user_id == user_id,
SafePathNotification.is_read == False,
).update(
{"is_read": True, "read_at": datetime.utcnow()},
synchronize_session="fetch",
)
self.db.commit()
return count
def mark_all_read(self, company_id: UUID, user_id: UUID) -> int:
"""Mark all notifications as read for a user."""
count = self.db.query(SafePathNotification).filter(
SafePathNotification.company_id == company_id,
SafePathNotification.user_id == user_id,
SafePathNotification.is_read == False,
).update(
{"is_read": True, "read_at": datetime.utcnow()},
synchronize_session="fetch",
)
self.db.commit()
return count
# ========================================================================
# Notification Preferences
# ========================================================================
def get_preferences(
self,
company_id: UUID,
user_id: UUID,
) -> List[NotificationPreferenceResponse]:
"""Get notification preferences for a user."""
# Get user-specific preferences
prefs = self.db.query(SafePathNotificationPreference).filter(
SafePathNotificationPreference.company_id == company_id,
SafePathNotificationPreference.user_id == user_id,
).all()
# If no user prefs, get company defaults
if not prefs:
prefs = self.db.query(SafePathNotificationPreference).filter(
SafePathNotificationPreference.company_id == company_id,
SafePathNotificationPreference.user_id == None,
).all()
return [
NotificationPreferenceResponse(
preference_id=p.preference_id,
notification_type=p.notification_type,
email_enabled=p.email_enabled,
in_app_enabled=p.in_app_enabled,
reminder_days_before_due=p.reminder_days_before_due,
cert_alert_days_before_expiry=p.cert_alert_days_before_expiry,
manager_alert_enabled=p.manager_alert_enabled,
webhook_url=p.webhook_url,
)
for p in prefs
]
def update_preference(
self,
company_id: UUID,
user_id: UUID,
notification_type: str,
request: NotificationPreferenceUpdate,
) -> SafePathNotificationPreference:
"""Update or create notification preference."""
pref = self.db.query(SafePathNotificationPreference).filter(
SafePathNotificationPreference.company_id == company_id,
SafePathNotificationPreference.user_id == user_id,
SafePathNotificationPreference.notification_type == notification_type,
).first()
if not pref:
pref = SafePathNotificationPreference(
preference_id=uuid.uuid4(),
company_id=company_id,
user_id=user_id,
notification_type=notification_type,
)
self.db.add(pref)
if request.email_enabled is not None:
pref.email_enabled = request.email_enabled
if request.in_app_enabled is not None:
pref.in_app_enabled = request.in_app_enabled
if request.reminder_days_before_due is not None:
pref.reminder_days_before_due = request.reminder_days_before_due
if request.cert_alert_days_before_expiry is not None:
pref.cert_alert_days_before_expiry = request.cert_alert_days_before_expiry
if request.manager_alert_enabled is not None:
pref.manager_alert_enabled = request.manager_alert_enabled
if request.webhook_url is not None:
pref.webhook_url = request.webhook_url
self.db.commit()
self.db.refresh(pref)
return pref
# ========================================================================
# Private Helpers
# ========================================================================
def _get_preferences(
self, company_id: UUID, user_id: UUID, notification_type: str
) -> dict:
"""Get effective preferences for a user + notification type."""
# User-specific first
pref = self.db.query(SafePathNotificationPreference).filter(
SafePathNotificationPreference.company_id == company_id,
SafePathNotificationPreference.user_id == user_id,
SafePathNotificationPreference.notification_type == notification_type,
).first()
if not pref:
# Fall back to company default
pref = self.db.query(SafePathNotificationPreference).filter(
SafePathNotificationPreference.company_id == company_id,
SafePathNotificationPreference.user_id == None,
SafePathNotificationPreference.notification_type == notification_type,
).first()
if pref:
return {
"email_enabled": pref.email_enabled,
"in_app_enabled": pref.in_app_enabled,
"reminder_days_before_due": pref.reminder_days_before_due,
"cert_alert_days_before_expiry": pref.cert_alert_days_before_expiry,
"manager_alert_enabled": pref.manager_alert_enabled,
"webhook_url": pref.webhook_url,
}
# Default: all enabled
return {
"email_enabled": True,
"in_app_enabled": True,
"reminder_days_before_due": 7,
"cert_alert_days_before_expiry": 60,
"manager_alert_enabled": False,
"webhook_url": None,
}
def _send_email(
self, user_id: UUID, notification_type: str, template_data: dict
) -> bool:
"""Send email notification via Mailgun."""
user = self.db.query(User).filter(User.user_id == user_id).first()
if not user or not user.email:
return False
template = EMAIL_TEMPLATES.get(notification_type)
if not template:
logger.warning(f"No email template for notification type: {notification_type}")
return False
template_data["user_name"] = user.full_name
template_data["app_url"] = settings.ALLOWED_ORIGINS.split(",")[0] if settings.ALLOWED_ORIGINS else "https://app.tellusehs.com"
try:
import requests
subject = template["subject"].format(**template_data)
body = template["body"].format(**template_data)
response = requests.post(
f"https://api.mailgun.net/v3/{settings.MAILGUN_DOMAIN}/messages",
auth=("api", settings.MAILGUN_API_KEY),
data={
"from": f"{settings.MAILGUN_FROM_NAME} <{settings.MAILGUN_FROM_EMAIL}>",
"to": [user.email],
"subject": subject,
"text": body,
},
)
return response.status_code == 200
except Exception as e:
logger.error(f"Failed to send email notification: {e}")
return False
def _fire_webhook(self, webhook_url: str, payload: dict) -> bool:
"""Fire a webhook notification (Pro tier only)."""
try:
import requests
response = requests.post(
webhook_url,
json=payload,
timeout=10,
headers={"Content-Type": "application/json", "X-Source": "tellus-safepath"},
)
return response.status_code < 400
except Exception as e:
logger.error(f"Failed to fire webhook: {e}")
return False
def _send_manager_alert(
self,
company_id: UUID,
user_id: UUID,
notification_type: str,
title: str,
message: str,
):
"""Send alert to user's manager (Pro tier only)."""
# Find user's manager (via company_user relationship or role hierarchy)
# For now, send to all users with admin/coordinator role
from app.db.models.user import CompanyUser
admins = self.db.query(User).join(
CompanyUser, CompanyUser.user_id == User.user_id
).filter(
CompanyUser.company_id == company_id,
CompanyUser.status == "active",
# Filter for admin/coordinator roles would go here
).limit(5).all()
user = self.db.query(User).filter(User.user_id == user_id).first()
user_name = user.full_name if user else "Unknown"
for admin in admins:
if admin.user_id == user_id:
continue # Don't alert the user themselves
manager_notification = SafePathNotification(
notification_id=uuid.uuid4(),
company_id=company_id,
user_id=admin.user_id,
notification_type=notification_type,
title=f"[Manager Alert] {title}",
message=f"Employee {user_name}: {message}",
is_read=False,
channels_sent=["in_app"],
)
self.db.add(manager_notification)
6. Scheduler Service
File: app/services/safepath/scheduler.py
"""SafePath Scheduler
Scheduled tasks for reminders, overdue checks, and certification alerts.
Designed to be called by a cron job or background service.
"""
import logging
from datetime import datetime, date, timedelta
from uuid import UUID
from sqlalchemy import and_, func
from sqlalchemy.orm import Session, joinedload
from app.db.models.safepath import (
SafePathAssignment, SafePathCertification,
SafePathNotification, SafePathNotificationPreference,
)
from app.db.models.user import User
from app.db.models.company import Company
from app.db.models.module import CompanyEnabledModule
from app.services.safepath.notification_service import NotificationService
from app.services.safepath.auto_assignment_engine import AutoAssignmentEngine
logger = logging.getLogger(__name__)
class SafePathScheduler:
"""Runs scheduled SafePath tasks."""
def __init__(self, db: Session):
self.db = db
self.notification_service = NotificationService(db)
self.auto_assignment_engine = AutoAssignmentEngine(db)
def run_all_scheduled_tasks(self):
"""Run all scheduled tasks. Called by cron or background service."""
logger.info("SafePath Scheduler: Starting scheduled tasks")
# Get all companies with SafePath enabled
companies = self.db.query(CompanyEnabledModule.company_id).filter(
CompanyEnabledModule.module_code == "SAFEPATH",
CompanyEnabledModule.is_active == True,
).distinct().all()
for (company_id,) in companies:
try:
self.check_overdue_assignments(company_id)
self.send_training_reminders(company_id)
self.check_expiring_certifications(company_id)
except Exception as e:
logger.error(f"SafePath Scheduler error for company {company_id}: {e}")
logger.info("SafePath Scheduler: Completed scheduled tasks")
def check_overdue_assignments(self, company_id: UUID):
"""Find assignments past due date and send overdue notifications."""
overdue = self.db.query(SafePathAssignment).filter(
SafePathAssignment.company_id == company_id,
SafePathAssignment.status.in_(["assigned", "in_progress"]),
SafePathAssignment.due_date < date.today(),
).options(
joinedload(SafePathAssignment.course),
).all()
for assignment in overdue:
# Update status to overdue if not already
if assignment.status != "overdue":
assignment.status = "overdue"
# Check if we already sent an overdue notification today
existing_notif = self.db.query(SafePathNotification).filter(
SafePathNotification.user_id == assignment.assigned_to,
SafePathNotification.notification_type == "training_overdue",
SafePathNotification.entity_id == assignment.assignment_id,
SafePathNotification.created_at >= datetime.combine(date.today(), datetime.min.time()),
).first()
if existing_notif:
continue # Already notified today
days_overdue = (date.today() - assignment.due_date).days
course_title = assignment.course.title if assignment.course else "Unknown Course"
self.notification_service.send_notification(
company_id=company_id,
user_id=assignment.assigned_to,
notification_type="training_overdue",
title=f"Training Overdue: {course_title}",
message=f"Your training '{course_title}' is {days_overdue} days overdue.",
entity_type="assignment",
entity_id=assignment.assignment_id,
template_data={
"course_title": course_title,
"due_date": assignment.due_date.isoformat(),
"days_overdue": str(days_overdue),
},
)
self.db.commit()
logger.info(f"Company {company_id}: {len(overdue)} overdue assignments processed")
def send_training_reminders(self, company_id: UUID):
"""Send reminders for assignments due soon."""
# Get company preference for reminder days
pref = self.db.query(SafePathNotificationPreference).filter(
SafePathNotificationPreference.company_id == company_id,
SafePathNotificationPreference.user_id == None,
SafePathNotificationPreference.notification_type == "training_reminder",
).first()
reminder_days = pref.reminder_days_before_due if pref else 7
reminder_date = date.today() + timedelta(days=reminder_days)
upcoming = self.db.query(SafePathAssignment).filter(
SafePathAssignment.company_id == company_id,
SafePathAssignment.status.in_(["assigned", "in_progress"]),
SafePathAssignment.due_date <= reminder_date,
SafePathAssignment.due_date >= date.today(),
).options(
joinedload(SafePathAssignment.course),
).all()
reminders_sent = 0
for assignment in upcoming:
# Check if we already sent a reminder for this assignment
existing_notif = self.db.query(SafePathNotification).filter(
SafePathNotification.user_id == assignment.assigned_to,
SafePathNotification.notification_type == "training_reminder",
SafePathNotification.entity_id == assignment.assignment_id,
SafePathNotification.created_at >= datetime.combine(
date.today() - timedelta(days=3), datetime.min.time()
), # Don't resend within 3 days
).first()
if existing_notif:
continue
days_remaining = (assignment.due_date - date.today()).days
course_title = assignment.course.title if assignment.course else "Unknown Course"
self.notification_service.send_notification(
company_id=company_id,
user_id=assignment.assigned_to,
notification_type="training_reminder",
title=f"Training Due Soon: {course_title}",
message=f"Your training '{course_title}' is due in {days_remaining} days.",
entity_type="assignment",
entity_id=assignment.assignment_id,
template_data={
"course_title": course_title,
"due_date": assignment.due_date.isoformat(),
"days_remaining": str(days_remaining),
},
)
reminders_sent += 1
logger.info(f"Company {company_id}: {reminders_sent} training reminders sent")
def check_expiring_certifications(self, company_id: UUID):
"""Check for expiring certifications and send alerts."""
# Get company preference for cert alert days
pref = self.db.query(SafePathNotificationPreference).filter(
SafePathNotificationPreference.company_id == company_id,
SafePathNotificationPreference.user_id == None,
SafePathNotificationPreference.notification_type == "cert_expiring",
).first()
alert_days = pref.cert_alert_days_before_expiry if pref else 60
# Also run auto-assignment engine for cert_expiring rules
self.auto_assignment_engine.check_expiring_certifications(
company_id, alert_days
)
# Send notifications for expiring certs
cutoff_date = date.today() + timedelta(days=alert_days)
expiring = self.db.query(SafePathCertification).filter(
SafePathCertification.company_id == company_id,
SafePathCertification.status == "active",
SafePathCertification.expiration_date <= cutoff_date,
SafePathCertification.expiration_date >= date.today(),
).options(
joinedload(SafePathCertification.certification_type),
).all()
alerts_sent = 0
for cert in expiring:
# Don't re-alert within 7 days
existing_notif = self.db.query(SafePathNotification).filter(
SafePathNotification.user_id == cert.user_id,
SafePathNotification.notification_type == "cert_expiring",
SafePathNotification.entity_id == cert.certification_id,
SafePathNotification.created_at >= datetime.combine(
date.today() - timedelta(days=7), datetime.min.time()
),
).first()
if existing_notif:
continue
days_until = (cert.expiration_date - date.today()).days
cert_type_name = cert.certification_type.name if cert.certification_type else "Unknown"
self.notification_service.send_notification(
company_id=company_id,
user_id=cert.user_id,
notification_type="cert_expiring",
title=f"Certification Expiring: {cert_type_name}",
message=f"Your {cert_type_name} certification expires in {days_until} days.",
entity_type="certification",
entity_id=cert.certification_id,
template_data={
"cert_type_name": cert_type_name,
"expiration_date": cert.expiration_date.isoformat(),
"days_until": str(days_until),
},
)
alerts_sent += 1
logger.info(f"Company {company_id}: {alerts_sent} certification alerts sent")
7. API Endpoints
File: app/api/v1/safepath/automation.py
"""SafePath Automation API
Endpoints for auto-assignment rules, notifications, and certification alerts.
"""
from uuid import UUID
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.api.dependencies.permissions import get_user_context
from app.services.safepath.auto_assignment_engine import AutoAssignmentEngine
from app.services.safepath.notification_service import NotificationService
from app.schemas.safepath.automation import (
AutoAssignmentRuleCreate,
AutoAssignmentRuleUpdate,
AutoAssignmentRuleResponse,
AutoAssignmentRuleListResponse,
RuleEvaluationRequest,
RuleEvaluationResult,
NotificationListResponse,
NotificationMarkReadRequest,
NotificationPreferenceResponse,
NotificationPreferenceUpdate,
CertExpirationSummary,
CertExpirationAlert,
)
router = APIRouter(prefix="/safepath/automation", tags=["SafePath Automation"])
# ============================================================================
# Auto-Assignment Rules
# ============================================================================
@router.get(
"/rules",
response_model=AutoAssignmentRuleListResponse,
summary="List auto-assignment rules",
)
async def list_rules(
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
trigger_type: Optional[str] = Query(None),
is_active: Optional[bool] = Query(None),
db: Session = Depends(get_db),
user_ctx=Depends(get_user_context),
):
"""List auto-assignment rules with optional filtering."""
engine = AutoAssignmentEngine(db)
return engine.list_rules(
user_ctx.company_id, page, page_size, trigger_type, is_active
)
@router.post(
"/rules",
response_model=AutoAssignmentRuleResponse,
status_code=status.HTTP_201_CREATED,
summary="Create auto-assignment rule",
)
async def create_rule(
request: AutoAssignmentRuleCreate,
db: Session = Depends(get_db),
user_ctx=Depends(get_user_context),
):
"""Create a new auto-assignment rule.
Requires: Standard or Pro tier (ASSIGNMENT_AUTO_RULES capability).
"""
engine = AutoAssignmentEngine(db)
try:
rule = engine.create_rule(request, user_ctx.company_id, user_ctx.user_id)
return AutoAssignmentRuleResponse(
rule_id=rule.rule_id,
company_id=rule.company_id,
course_id=rule.course_id,
trigger_type=rule.trigger_type,
trigger_config=rule.trigger_config,
due_date_offset_days=rule.due_date_offset_days,
is_active=rule.is_active,
created_by=rule.created_by,
created_at=rule.created_at,
)
except ValueError as e:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=str(e))
@router.put(
"/rules/{rule_id}",
response_model=AutoAssignmentRuleResponse,
summary="Update auto-assignment rule",
)
async def update_rule(
rule_id: UUID,
request: AutoAssignmentRuleUpdate,
db: Session = Depends(get_db),
user_ctx=Depends(get_user_context),
):
"""Update an existing auto-assignment rule."""
engine = AutoAssignmentEngine(db)
rule = engine.update_rule(rule_id, request, user_ctx.company_id, user_ctx.user_id)
if not rule:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Rule not found.")
return AutoAssignmentRuleResponse(
rule_id=rule.rule_id,
company_id=rule.company_id,
course_id=rule.course_id,
trigger_type=rule.trigger_type,
trigger_config=rule.trigger_config,
due_date_offset_days=rule.due_date_offset_days,
is_active=rule.is_active,
created_by=rule.created_by,
created_at=rule.created_at,
updated_at=rule.updated_at,
)
@router.delete(
"/rules/{rule_id}",
summary="Delete auto-assignment rule",
)
async def delete_rule(
rule_id: UUID,
db: Session = Depends(get_db),
user_ctx=Depends(get_user_context),
):
"""Delete an auto-assignment rule."""
engine = AutoAssignmentEngine(db)
if not engine.delete_rule(rule_id, user_ctx.company_id, user_ctx.user_id):
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Rule not found.")
return {"success": True, "message": "Rule deleted."}
@router.post(
"/rules/evaluate",
response_model=RuleEvaluationResult,
summary="Evaluate rules for a trigger event",
)
async def evaluate_rules(
request: RuleEvaluationRequest,
db: Session = Depends(get_db),
user_ctx=Depends(get_user_context),
):
"""Evaluate all active rules for a specific trigger event.
Called internally when events occur (new hire, role change, etc.).
"""
engine = AutoAssignmentEngine(db)
return engine.evaluate_trigger(request)
# ============================================================================
# Notifications
# ============================================================================
@router.get(
"/notifications",
response_model=NotificationListResponse,
summary="List notifications for current user",
)
async def list_notifications(
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
unread_only: bool = Query(False),
db: Session = Depends(get_db),
user_ctx=Depends(get_user_context),
):
"""List SafePath notifications for the current user."""
service = NotificationService(db)
return service.list_notifications(
user_ctx.company_id, user_ctx.user_id, page, page_size, unread_only
)
@router.post(
"/notifications/mark-read",
summary="Mark notifications as read",
)
async def mark_notifications_read(
request: NotificationMarkReadRequest,
db: Session = Depends(get_db),
user_ctx=Depends(get_user_context),
):
"""Mark one or more notifications as read."""
service = NotificationService(db)
count = service.mark_read(request.notification_ids, user_ctx.user_id)
return {"success": True, "marked_read": count}
@router.post(
"/notifications/mark-all-read",
summary="Mark all notifications as read",
)
async def mark_all_notifications_read(
db: Session = Depends(get_db),
user_ctx=Depends(get_user_context),
):
"""Mark all notifications as read for the current user."""
service = NotificationService(db)
count = service.mark_all_read(user_ctx.company_id, user_ctx.user_id)
return {"success": True, "marked_read": count}
# ============================================================================
# Notification Preferences
# ============================================================================
@router.get(
"/notification-preferences",
response_model=list[NotificationPreferenceResponse],
summary="Get notification preferences",
)
async def get_notification_preferences(
db: Session = Depends(get_db),
user_ctx=Depends(get_user_context),
):
"""Get notification preferences for the current user."""
service = NotificationService(db)
return service.get_preferences(user_ctx.company_id, user_ctx.user_id)
@router.put(
"/notification-preferences/{notification_type}",
summary="Update notification preference",
)
async def update_notification_preference(
notification_type: str,
request: NotificationPreferenceUpdate,
db: Session = Depends(get_db),
user_ctx=Depends(get_user_context),
):
"""Update notification preference for a specific type."""
service = NotificationService(db)
pref = service.update_preference(
user_ctx.company_id, user_ctx.user_id, notification_type, request
)
return {"success": True, "message": "Preference updated."}
# ============================================================================
# Certification Expiration Alerts
# ============================================================================
@router.get(
"/cert-expiration-summary",
response_model=CertExpirationSummary,
summary="Get certification expiration summary",
)
async def get_cert_expiration_summary(
db: Session = Depends(get_db),
user_ctx=Depends(get_user_context),
):
"""Get a summary of certification expirations for the company.
Shows counts of certs expiring within 30, 60, 90 days and already expired.
"""
from datetime import date, timedelta
from app.db.models.safepath import SafePathCertification
from sqlalchemy import func
company_id = user_ctx.company_id
def count_expiring(days_from_now: int) -> int:
cutoff = date.today() + timedelta(days=days_from_now)
return db.query(func.count(SafePathCertification.certification_id)).filter(
SafePathCertification.company_id == company_id,
SafePathCertification.status == "active",
SafePathCertification.expiration_date <= cutoff,
SafePathCertification.expiration_date >= date.today(),
).scalar() or 0
expired_count = db.query(func.count(SafePathCertification.certification_id)).filter(
SafePathCertification.company_id == company_id,
SafePathCertification.status == "active",
SafePathCertification.expiration_date < date.today(),
).scalar() or 0
# Get detailed alerts for expiring within 90 days
engine = AutoAssignmentEngine(db)
alerts_data = engine.check_expiring_certifications(company_id, alert_days=90)
alerts = [
CertExpirationAlert(
certification_id=a["certification_id"],
user_id=a["user_id"],
user_name=a["user_name"],
user_email="",
certification_type_name=a["certification_type"],
expiration_date=a["expiration_date"],
days_until_expiry=a["days_until_expiry"],
status="expiring_soon" if a["days_until_expiry"] > 0 else "expired",
auto_refresher_assigned=a.get("auto_refresher_assigned", False),
)
for a in alerts_data
]
return CertExpirationSummary(
expiring_within_30_days=count_expiring(30),
expiring_within_60_days=count_expiring(60),
expiring_within_90_days=count_expiring(90),
already_expired=expired_count,
alerts=alerts,
)
8. Frontend Changes
8.1 TypeScript Interfaces
File: src/types/safepath.ts (add to existing file)
// ============================================================================
// Automation Types
// ============================================================================
export interface AutoAssignmentRule {
rule_id: string;
company_id: string;
course_id: string;
course_title?: string;
trigger_type: 'new_hire' | 'role_change' | 'site_transfer' | 'cert_expiring' | 'chemical_added';
trigger_config: Record<string, any>;
due_date_offset_days: number;
is_active: boolean;
created_by?: string;
created_by_name?: string;
created_at: string;
updated_at?: string;
assignments_created_count: number;
}
export interface AutoAssignmentRuleCreate {
course_id: string;
trigger_type: string;
trigger_config: Record<string, any>;
due_date_offset_days: number;
is_active: boolean;
}
export interface SafePathNotification {
notification_id: string;
notification_type: string;
title: string;
message: string;
entity_type?: string;
entity_id?: string;
is_read: boolean;
read_at?: string;
channels_sent: string[];
created_at: string;
}
export interface NotificationListResponse {
items: SafePathNotification[];
total: number;
unread_count: number;
page: number;
page_size: number;
total_pages: number;
}
export interface NotificationPreference {
preference_id: string;
notification_type: string;
email_enabled: boolean;
in_app_enabled: boolean;
reminder_days_before_due: number;
cert_alert_days_before_expiry: number;
manager_alert_enabled: boolean;
webhook_url?: string;
}
export interface CertExpirationAlert {
certification_id: string;
user_id: string;
user_name: string;
user_email: string;
certification_type_name: string;
expiration_date: string;
days_until_expiry: number;
status: 'expiring_soon' | 'expired';
auto_refresher_assigned: boolean;
refresher_course_id?: string;
}
export interface CertExpirationSummary {
expiring_within_30_days: number;
expiring_within_60_days: number;
expiring_within_90_days: number;
already_expired: number;
alerts: CertExpirationAlert[];
}
8.2 API Methods
File: src/services/api/safepath.api.ts (add to existing file)
// ============================================================================
// Automation API
// ============================================================================
// Auto-Assignment Rules
export const listAutoAssignmentRules = async (
params: { page?: number; page_size?: number; trigger_type?: string; is_active?: boolean },
token: string, userId: string, companyId: string,
) => {
const qs = new URLSearchParams();
if (params.page) qs.set('page', String(params.page));
if (params.page_size) qs.set('page_size', String(params.page_size));
if (params.trigger_type) qs.set('trigger_type', params.trigger_type);
if (params.is_active !== undefined) qs.set('is_active', String(params.is_active));
const res = await fetch(`${API_URL}/api/v1/safepath/automation/rules?${qs}`, {
headers: authHeaders(token, userId, companyId),
});
if (!res.ok) throw new Error('Failed to list rules');
return res.json();
};
export const createAutoAssignmentRule = async (
request: AutoAssignmentRuleCreate, token: string, userId: string, companyId: string,
) => {
const res = await fetch(`${API_URL}/api/v1/safepath/automation/rules`, {
method: 'POST',
headers: authHeaders(token, userId, companyId),
body: JSON.stringify(request),
});
if (!res.ok) throw new Error('Failed to create rule');
return res.json();
};
export const updateAutoAssignmentRule = async (
ruleId: string, request: Partial<AutoAssignmentRuleCreate>,
token: string, userId: string, companyId: string,
) => {
const res = await fetch(`${API_URL}/api/v1/safepath/automation/rules/${ruleId}`, {
method: 'PUT',
headers: authHeaders(token, userId, companyId),
body: JSON.stringify(request),
});
if (!res.ok) throw new Error('Failed to update rule');
return res.json();
};
export const deleteAutoAssignmentRule = async (
ruleId: string, token: string, userId: string, companyId: string,
) => {
const res = await fetch(`${API_URL}/api/v1/safepath/automation/rules/${ruleId}`, {
method: 'DELETE',
headers: authHeaders(token, userId, companyId),
});
if (!res.ok) throw new Error('Failed to delete rule');
return res.json();
};
// Notifications
export const listNotifications = async (
params: { page?: number; unread_only?: boolean },
token: string, userId: string, companyId: string,
) => {
const qs = new URLSearchParams();
if (params.page) qs.set('page', String(params.page));
if (params.unread_only) qs.set('unread_only', 'true');
const res = await fetch(`${API_URL}/api/v1/safepath/automation/notifications?${qs}`, {
headers: authHeaders(token, userId, companyId),
});
if (!res.ok) throw new Error('Failed to list notifications');
return res.json();
};
export const markNotificationsRead = async (
notificationIds: string[], token: string, userId: string, companyId: string,
) => {
const res = await fetch(`${API_URL}/api/v1/safepath/automation/notifications/mark-read`, {
method: 'POST',
headers: authHeaders(token, userId, companyId),
body: JSON.stringify({ notification_ids: notificationIds }),
});
if (!res.ok) throw new Error('Failed to mark read');
return res.json();
};
export const markAllNotificationsRead = async (
token: string, userId: string, companyId: string,
) => {
const res = await fetch(`${API_URL}/api/v1/safepath/automation/notifications/mark-all-read`, {
method: 'POST',
headers: authHeaders(token, userId, companyId),
});
if (!res.ok) throw new Error('Failed to mark all read');
return res.json();
};
// Notification Preferences
export const getNotificationPreferences = async (
token: string, userId: string, companyId: string,
) => {
const res = await fetch(`${API_URL}/api/v1/safepath/automation/notification-preferences`, {
headers: authHeaders(token, userId, companyId),
});
if (!res.ok) throw new Error('Failed to get preferences');
return res.json();
};
export const updateNotificationPreference = async (
notificationType: string, request: Partial<NotificationPreference>,
token: string, userId: string, companyId: string,
) => {
const res = await fetch(`${API_URL}/api/v1/safepath/automation/notification-preferences/${notificationType}`, {
method: 'PUT',
headers: authHeaders(token, userId, companyId),
body: JSON.stringify(request),
});
if (!res.ok) throw new Error('Failed to update preference');
return res.json();
};
// Certification Alerts
export const getCertExpirationSummary = async (
token: string, userId: string, companyId: string,
) => {
const res = await fetch(`${API_URL}/api/v1/safepath/automation/cert-expiration-summary`, {
headers: authHeaders(token, userId, companyId),
});
if (!res.ok) throw new Error('Failed to get cert summary');
return res.json();
};
8.3 Frontend Pages
Automation Hub — src/pages/safepath/automation/index.tsx
Dashboard page with three sections:
- Auto-Assignment Rules — card showing active rules count, link to rules page
- Notifications — card showing unread count, recent notifications
- Certification Alerts — card with expiring/expired cert counts, color-coded (green/yellow/red)
Rules Manager — src/pages/safepath/automation/rules.tsx
Full CRUD interface for auto-assignment rules:
- Table listing all rules with trigger type, course, status, assignments created count
- "Create Rule" button → modal with:
- Course dropdown (published courses only)
- Trigger type dropdown (new_hire, role_change, site_transfer, cert_expiring)
- Trigger config fields (dynamic based on trigger type)
- Due date offset (days)
- Active toggle
- Edit inline / delete with confirmation
- Filter by trigger type, active/inactive
Notification Preferences — src/pages/safepath/automation/notifications.tsx
Settings page for notification preferences:
- Toggle email/in-app per notification type
- Set reminder days before due (1-30)
- Set cert alert days before expiry (7-365)
- Webhook URL input (Pro only, grayed out for other tiers)
- Manager alert toggle (Pro only)
9. Integration Points
9.1 User Onboarding Hook
When a new user is added to a company (user invitation accepted), evaluate new_hire rules:
File: app/services/user_service.py (or invitation acceptance handler)
# After user accepts invitation and is added to company:
try:
from app.services.safepath.auto_assignment_engine import AutoAssignmentEngine
from app.schemas.safepath.automation import RuleEvaluationRequest
engine = AutoAssignmentEngine(db)
engine.evaluate_trigger(RuleEvaluationRequest(
trigger_type="new_hire",
user_id=user.user_id,
company_id=company_id,
trigger_data={
"role_id": str(role_id) if role_id else "",
"site_id": str(site_id) if site_id else "",
},
))
except Exception as e:
logger.warning(f"SafePath auto-assignment hook failed: {e}")
9.2 Role Change Hook
When a user's role changes:
try:
engine = AutoAssignmentEngine(db)
engine.evaluate_trigger(RuleEvaluationRequest(
trigger_type="role_change",
user_id=user_id,
company_id=company_id,
trigger_data={
"previous_role_id": str(old_role_id),
"new_role_id": str(new_role_id),
"new_role_name": new_role_name,
},
))
except Exception as e:
logger.warning(f"SafePath role-change hook failed: {e}")
9.3 Background Service / Cron
The scheduler should be called periodically. Options:
- SQS-based (existing pattern): Add a scheduled SQS message that triggers
SafePathScheduler.run_all_scheduled_tasks() - Cron job:
python -m app.tasks.safepath_schedulercalled by OS cron - FastAPI startup: Background task with
asyncio.create_task(not recommended for production)
Recommended: Add to existing SQS worker pattern:
# In SQS worker message handler
if message_type == "safepath_scheduled_tasks":
from app.services.safepath.scheduler import SafePathScheduler
scheduler = SafePathScheduler(db)
scheduler.run_all_scheduled_tasks()
10. Verification Checklist
Backend
-
SafePathNotificationandSafePathNotificationPreferencemodels create without errors - Alembic migration applies and rolls back cleanly
-
POST /safepath/automation/rulescreates a rule -
GET /safepath/automation/ruleslists rules with filtering -
PUT /safepath/automation/rules/{id}updates a rule -
DELETE /safepath/automation/rules/{id}deletes a rule -
POST /safepath/automation/rules/evaluatecreates assignments for matching rules -
POST /safepath/automation/rules/evaluateskips already-assigned users -
GET /safepath/automation/notificationsreturns paginated notifications -
POST /safepath/automation/notifications/mark-readmarks notifications as read -
GET /safepath/automation/notification-preferencesreturns preferences -
PUT /safepath/automation/notification-preferences/{type}updates preferences -
GET /safepath/automation/cert-expiration-summaryreturns correct counts - Scheduler
check_overdue_assignmentssends notifications and doesn't duplicate - Scheduler
send_training_reminderssends reminders with deduplication - Scheduler
check_expiring_certificationstriggers auto-rules and sends alerts - Email notifications send via Mailgun without errors
- New hire hook evaluates rules when user is added to company
Frontend
- Automation hub page loads with three summary cards
- Rules manager lists all rules with correct data
- Create rule modal works with dynamic trigger config fields
- Edit/delete rules work correctly
- Notification preferences page shows all notification types
- Toggle switches update preferences via API
- Certification alert cards show correct color coding (green/yellow/red)
11. Future Enhancements (Phase 3)
- Recurring assignments — auto-reassign completed courses on a schedule (annual refresher)
- Escalation chains — multi-level manager alerts for overdue training
- SMS notifications — text message alerts via Twilio
- Slack/Teams integration — send notifications to workspace channels
- Compliance dashboards — real-time compliance scorecards per site/role
- Bulk rule templates — pre-built rule sets for common industries (construction, manufacturing, etc.)