Skip to main content

SafePath Training — Phase 1D: Certification Tracking

Overview

This document covers the certification and credential tracking subsystem — a core standalone feature that works independently of course delivery. EHS trainers and safety managers use it to track all employee certifications, both internal (earned through SafePath courses) and external (issued by third-party providers).

Prerequisite: Phase 1A (database schema). Phase 1C (results) is required only for auto-creating internal certifications; the core tracking described here works without it.

Files to create:

  • tellus-ehs-hazcom-service/app/schemas/safepath/certification.py
  • tellus-ehs-hazcom-service/app/services/safepath/certification_service.py
  • tellus-ehs-hazcom-service/app/api/v1/safepath/certifications.py

Pydantic Schemas

File: tellus-ehs-hazcom-service/app/schemas/safepath/certification.py

"""SafePath Certification Schemas"""

from datetime import date, datetime
from typing import Optional, List
from uuid import UUID
from pydantic import BaseModel, Field, ConfigDict


# ============================================================================
# Certification Type Schemas
# ============================================================================

class CertificationTypeResponse(BaseModel):
    """Response for a certification type."""

    # Lets the model be validated directly from attribute-bearing objects
    # (the API layer calls model_validate on ORM rows).
    model_config = ConfigDict(from_attributes=True)

    # Identity and display fields.
    type_id: UUID
    name: str
    description: Optional[str] = None

    # Validity / retraining windows expressed in months; None when unset.
    default_validity_months: Optional[int] = None
    retraining_frequency_months: Optional[int] = None
    osha_standard_ref: Optional[str] = None

    # Distinguishes system-provided types from company-created ones.
    is_system: bool


class CertificationTypeCreate(BaseModel):
    """Create a custom certification type."""

    # Required display name, 1-150 characters.
    name: str = Field(..., min_length=1, max_length=150)
    description: Optional[str] = None

    # Month counts are optional but must be at least 1 when supplied.
    default_validity_months: Optional[int] = Field(default=None, ge=1)
    retraining_frequency_months: Optional[int] = Field(default=None, ge=1)

    # Optional reference string, capped at 50 characters.
    osha_standard_ref: Optional[str] = Field(default=None, max_length=50)


# ============================================================================
# Certification Record Schemas
# ============================================================================

class CertificationCreate(BaseModel):
    """Log a certification (internal or external)."""

    # Who holds the certification and which certification type it is.
    user_id: UUID
    certification_type_id: UUID

    # Optional issuer details, length-capped free text.
    issuing_authority: Optional[str] = Field(default=None, max_length=255)
    certification_number: Optional[str] = Field(default=None, max_length=100)

    # issue_date is mandatory; expiration_date may be left out.
    issue_date: date
    expiration_date: Optional[date] = None

    # Must be exactly "internal" or "external"; defaults to "external".
    source: str = Field(default="external", pattern="^(internal|external)$")

    # NOTE(review): presumably the SafePath result that produced an internal
    # certification — confirm against the results subsystem.
    result_id: Optional[UUID] = None
    notes: Optional[str] = None


class CertificationUpdate(BaseModel):
    """Update a certification record."""

    # Every field is optional so a caller can send a partial update.
    issuing_authority: Optional[str] = Field(default=None, max_length=255)
    certification_number: Optional[str] = Field(default=None, max_length=100)
    expiration_date: Optional[date] = None
    notes: Optional[str] = None

    # Only "active" or "revoked" can be requested through this schema.
    status: Optional[str] = Field(default=None, pattern="^(active|revoked)$")


class CertificationListItem(BaseModel):
    """Certification list item with employee and type info."""

    model_config = ConfigDict(from_attributes=True)

    # Record and holder identity.
    certification_id: UUID
    user_id: UUID
    employee_name: str
    employee_email: str

    # Fields copied from the certification type.
    certification_type_name: str
    osha_standard_ref: Optional[str] = None

    # Fields stored on the certification record itself.
    issuing_authority: Optional[str] = None
    certification_number: Optional[str] = None
    issue_date: date
    expiration_date: Optional[date] = None
    status: str
    source: str

    # Derived values filled in by the caller: whole days until expiration
    # (None when there is no expiration date) and whether evidence exists.
    days_until_expiry: Optional[int] = None
    has_evidence: bool = False

    created_at: datetime


class CertificationListResponse(BaseModel):
    """Paginated certification list."""

    # The rows for the requested page.
    items: List[CertificationListItem]

    # Pagination bookkeeping: overall match count, the page that was
    # requested, its size, and how many pages exist in total.
    total: int
    page: int
    page_size: int
    total_pages: int


class CertificationDetailResponse(BaseModel):
    """Full certification detail."""

    model_config = ConfigDict(from_attributes=True)

    # Record and holder identity.
    certification_id: UUID
    user_id: UUID
    employee_name: str

    # Certification type details.
    certification_type_id: UUID
    certification_type_name: str
    osha_standard_ref: Optional[str] = None
    default_validity_months: Optional[int] = None
    retraining_frequency_months: Optional[int] = None

    # Certification record details.
    issuing_authority: Optional[str] = None
    certification_number: Optional[str] = None
    issue_date: date
    expiration_date: Optional[date] = None
    status: str
    source: str
    result_id: Optional[UUID] = None

    # NOTE(review): presumably a link for fetching the uploaded evidence
    # file — nothing in this document populates it; confirm with the caller.
    evidence_download_url: Optional[str] = None
    notes: Optional[str] = None

    created_at: datetime
    updated_at: Optional[datetime] = None


# ============================================================================
# Expiration Summary Schemas
# ============================================================================

class ExpirationSummary(BaseModel):
    """Summary of expiring certifications for dashboard."""

    # Counts of certifications expiring within each window (in days).
    expiring_within_30_days: int
    expiring_within_60_days: int
    expiring_within_90_days: int

    # Count of certifications whose expiration date has already passed.
    already_expired: int

    # Count of certifications considered active.
    total_active: int


class ExpiringCertificationItem(BaseModel):
    """A single expiring certification."""

    certification_id: UUID

    # Holder and certification type, flattened for display.
    employee_name: str
    employee_email: str
    certification_type_name: str

    # Unlike the list/detail schemas, the expiration date is required here.
    expiration_date: date
    days_until_expiry: int
    status: str

Certification Service

File: tellus-ehs-hazcom-service/app/services/safepath/certification_service.py

"""SafePath Certification Service

Manages certification records, expiration tracking, and status updates.
"""

from typing import Optional, List, Tuple
from uuid import UUID
from datetime import date, datetime, timedelta

from sqlalchemy import func, and_, case
from sqlalchemy.orm import Session

from app.db.models.safepath import (
Certification,
CertificationType,
SafePathAuditLog,
)
from app.db.models.user import User
from app.schemas.safepath.certification import (
CertificationCreate,
CertificationUpdate,
CertificationListItem,
ExpirationSummary,
ExpiringCertificationItem,
)


class CertificationService:
"""Service for managing employee certifications."""

def __init__(self, db: Session):
    """Keep a reference to the session that every method queries through."""
    self.db = db

# ================================================================
# Certification Types
# ================================================================

def list_certification_types(self, company_id: UUID) -> List[CertificationType]:
    """Return the certification types visible to a company, sorted by name.

    A type is visible when it is flagged ``is_system`` or when its
    ``company_id`` equals the given ``company_id``.
    """
    is_builtin = CertificationType.is_system == True  # noqa: E712
    owned_by_company = CertificationType.company_id == company_id

    visible_types = self.db.query(CertificationType).filter(is_builtin | owned_by_company)
    return visible_types.order_by(CertificationType.name).all()

def create_certification_type(
    self, company_id: UUID, name: str, description: Optional[str] = None,
    default_validity_months: Optional[int] = None,
    retraining_frequency_months: Optional[int] = None,
    osha_standard_ref: Optional[str] = None,
) -> CertificationType:
    """Add a company-owned (non-system) certification type.

    The new row is added to the session and flushed, but not committed;
    committing is left to the caller.
    """
    attributes = {
        "name": name,
        "description": description,
        "default_validity_months": default_validity_months,
        "retraining_frequency_months": retraining_frequency_months,
        "osha_standard_ref": osha_standard_ref,
        # Types created through this method are never system types.
        "is_system": False,
        "company_id": company_id,
    }
    new_type = CertificationType(**attributes)

    self.db.add(new_type)
    self.db.flush()
    return new_type

# ================================================================
# Certification CRUD
# ================================================================

def create_certification(
    self, company_id: UUID, created_by: UUID, data: CertificationCreate
) -> Certification:
    """Record a new certification and write an audit-log entry for it.

    When ``data`` carries no expiration date and the referenced
    certification type defines ``default_validity_months``, the expiration
    is derived as ``issue_date`` plus that many months. The initial status
    is "expired" for a past expiration, "expiring_soon" for one at most
    60 days away, and "active" otherwise (including no expiration at all).

    The record is flushed but not committed.
    """
    expires_on = data.expiration_date
    if not expires_on:
        # NOTE(review): this lookup is not restricted to system types or
        # to company_id, so a type owned by another company would also be
        # used for the default validity — confirm whether that is intended.
        type_row = (
            self.db.query(CertificationType)
            .filter(CertificationType.type_id == data.certification_type_id)
            .first()
        )
        validity_months = type_row.default_validity_months if type_row else None
        if validity_months:
            from dateutil.relativedelta import relativedelta
            expires_on = data.issue_date + relativedelta(months=validity_months)

    initial_status = "active"
    if expires_on:
        days_left = (expires_on - date.today()).days
        if days_left < 0:
            initial_status = "expired"
        elif days_left <= 60:
            initial_status = "expiring_soon"

    record = Certification(
        company_id=company_id,
        user_id=data.user_id,
        certification_type_id=data.certification_type_id,
        issuing_authority=data.issuing_authority,
        certification_number=data.certification_number,
        issue_date=data.issue_date,
        expiration_date=expires_on,
        status=initial_status,
        source=data.source,
        result_id=data.result_id,
        notes=data.notes,
        created_by=created_by,
    )
    self.db.add(record)
    # Flushed before logging because the audit entry reads
    # record.certification_id.
    self.db.flush()

    audit_details = {
        "certification_type_id": str(data.certification_type_id),
        "employee_user_id": str(data.user_id),
        "source": data.source,
    }
    self._log_event(
        company_id=company_id,
        event_type="certification.logged",
        entity_type="certification",
        entity_id=record.certification_id,
        user_id=created_by,
        details=audit_details,
    )
    return record

def list_certifications(
    self,
    company_id: UUID,
    page: int = 1,
    page_size: int = 25,
    user_id: Optional[UUID] = None,
    certification_type_id: Optional[UUID] = None,
    status: Optional[str] = None,
    expiring_within_days: Optional[int] = None,
) -> Tuple[List[CertificationListItem], int]:
    """List a company's certifications with optional filters and paging.

    Args:
        company_id: Only certifications of this company are returned.
        page: 1-based page number.
        page_size: Number of rows per page.
        user_id: Restrict to one employee.
        certification_type_id: Restrict to one certification type.
        status: Restrict to rows whose stored status equals this value.
        expiring_within_days: Restrict to rows whose expiration date lies
            between today and today + N days (inclusive). Falsy values
            (None or 0) disable the filter.

    Returns:
        ``(items, total)`` where ``items`` is the requested page and
        ``total`` is the number of rows matching the filters.
    """
    today = date.today()
    query = self.db.query(Certification).filter(Certification.company_id == company_id)

    if user_id:
        query = query.filter(Certification.user_id == user_id)
    if certification_type_id:
        query = query.filter(Certification.certification_type_id == certification_type_id)
    if status:
        query = query.filter(Certification.status == status)
    if expiring_within_days:
        cutoff = today + timedelta(days=expiring_within_days)
        query = query.filter(
            Certification.expiration_date.isnot(None),
            Certification.expiration_date <= cutoff,
            Certification.expiration_date >= today,
        )

    total = query.count()

    certs = (
        query
        .order_by(
            Certification.expiration_date.asc().nullslast(),
            # Unique tie-breaker: many rows share an expiration date (or
            # have none), and without a total order OFFSET/LIMIT pages
            # can repeat or skip rows.
            Certification.certification_id.asc(),
        )
        .offset((page - 1) * page_size)
        .limit(page_size)
        .all()
    )

    # Resolve users and certification types with one query each for the
    # whole page instead of two queries per row.
    user_ids = list({c.user_id for c in certs})
    type_ids = list({c.certification_type_id for c in certs})
    users_by_id = {}
    if user_ids:
        users_by_id = {
            u.user_id: u
            for u in self.db.query(User).filter(User.user_id.in_(user_ids)).all()
        }
    types_by_id = {}
    if type_ids:
        types_by_id = {
            t.type_id: t
            for t in self.db.query(CertificationType)
            .filter(CertificationType.type_id.in_(type_ids))
            .all()
        }

    items = []
    for c in certs:
        user = users_by_id.get(c.user_id)
        cert_type = types_by_id.get(c.certification_type_id)

        days_until = None
        if c.expiration_date:
            days_until = (c.expiration_date - today).days

        items.append(CertificationListItem(
            certification_id=c.certification_id,
            user_id=c.user_id,
            # Missing user / type rows fall back to placeholders rather
            # than failing the whole listing.
            employee_name=f"{user.first_name} {user.last_name}" if user else "Unknown",
            employee_email=user.email if user else "",
            certification_type_name=cert_type.name if cert_type else "Unknown",
            osha_standard_ref=cert_type.osha_standard_ref if cert_type else None,
            issuing_authority=c.issuing_authority,
            certification_number=c.certification_number,
            issue_date=c.issue_date,
            expiration_date=c.expiration_date,
            status=c.status,
            source=c.source,
            days_until_expiry=days_until,
            has_evidence=bool(c.evidence_s3_key),
            created_at=c.created_at,
        ))

    return items, total

def update_certification(
    self, company_id: UUID, certification_id: UUID, data: CertificationUpdate
) -> Optional[Certification]:
    """Apply a partial update to one of a company's certifications.

    Only fields explicitly present in ``data`` are written. Whenever the
    expiration date or the status is part of the update, the stored status
    is re-derived from the expiration date — unless the record is (or has
    just been set to) "revoked", which is never overwritten by the date
    logic:

    * no expiration date       -> "active"
    * expiration in the past   -> "expired"
    * expiration within 60 days -> "expiring_soon"
    * otherwise                -> "active"

    Returns:
        The updated record (flushed, not committed), or None when no
        certification with this id exists for the company.
    """
    cert = (
        self.db.query(Certification)
        .filter(
            Certification.certification_id == certification_id,
            Certification.company_id == company_id,
        )
        .first()
    )
    if not cert:
        return None

    update_data = data.model_dump(exclude_unset=True)

    # An explicit ``"status": null`` is not a status request; writing it
    # would blank the column, so it is ignored.
    if "status" in update_data and update_data["status"] is None:
        del update_data["status"]

    for field, value in update_data.items():
        setattr(cert, field, value)

    status_inputs_changed = "expiration_date" in update_data or "status" in update_data
    if status_inputs_changed and cert.status != "revoked":
        if cert.expiration_date is None:
            # Clearing the expiration must also clear a stale
            # "expired" / "expiring_soon" status.
            cert.status = "active"
        else:
            today = date.today()
            if cert.expiration_date < today:
                cert.status = "expired"
            elif cert.expiration_date <= today + timedelta(days=60):
                cert.status = "expiring_soon"
            else:
                cert.status = "active"

    cert.updated_at = datetime.utcnow()
    self.db.flush()
    return cert

# ================================================================
# Expiration Tracking
# ================================================================

def get_expiration_summary(self, company_id: UUID) -> ExpirationSummary:
    """Count a company's non-revoked certifications by expiration bucket.

    The 30/60/90-day buckets each start at today, so they are cumulative.
    "Active" counts rows with no expiration date or one that is today or
    later; "expired" counts rows whose expiration date is before today.
    """
    today = date.today()
    expiry = Certification.expiration_date
    has_expiry = expiry.isnot(None)

    not_revoked = self.db.query(Certification).filter(
        Certification.company_id == company_id,
        Certification.status != "revoked",
    )

    def count_expiring_by(days: int) -> int:
        # Rows expiring between today and today + days, both inclusive.
        window_end = today + timedelta(days=days)
        return not_revoked.filter(has_expiry, expiry >= today, expiry <= window_end).count()

    expired_count = not_revoked.filter(has_expiry, expiry < today).count()
    count_30 = count_expiring_by(30)
    count_60 = count_expiring_by(60)
    count_90 = count_expiring_by(90)
    active_count = not_revoked.filter((expiry.is_(None)) | (expiry >= today)).count()

    return ExpirationSummary(
        expiring_within_30_days=count_30,
        expiring_within_60_days=count_60,
        expiring_within_90_days=count_90,
        already_expired=expired_count,
        total_active=active_count,
    )

def get_expiring_certifications(
    self, company_id: UUID, within_days: int = 90
) -> List[ExpiringCertificationItem]:
    """Return a company's non-revoked certifications expiring within N days.

    Args:
        company_id: Only certifications of this company are considered.
        within_days: Size of the window; rows whose expiration date lies
            between today and today + ``within_days`` (inclusive) match.

    Returns:
        Matching certifications ordered by nearest expiration first.
    """
    today = date.today()
    cutoff = today + timedelta(days=within_days)

    certs = (
        self.db.query(Certification)
        .filter(
            Certification.company_id == company_id,
            Certification.expiration_date.isnot(None),
            Certification.expiration_date >= today,
            Certification.expiration_date <= cutoff,
            Certification.status != "revoked",
        )
        .order_by(Certification.expiration_date.asc())
        .all()
    )

    # Resolve users and certification types with one query each for the
    # whole result instead of two queries per row.
    user_ids = list({c.user_id for c in certs})
    type_ids = list({c.certification_type_id for c in certs})
    users_by_id = {}
    if user_ids:
        users_by_id = {
            u.user_id: u
            for u in self.db.query(User).filter(User.user_id.in_(user_ids)).all()
        }
    types_by_id = {}
    if type_ids:
        types_by_id = {
            t.type_id: t
            for t in self.db.query(CertificationType)
            .filter(CertificationType.type_id.in_(type_ids))
            .all()
        }

    items = []
    for c in certs:
        user = users_by_id.get(c.user_id)
        cert_type = types_by_id.get(c.certification_type_id)

        items.append(ExpiringCertificationItem(
            certification_id=c.certification_id,
            # Missing user / type rows fall back to placeholders rather
            # than failing the whole listing.
            employee_name=f"{user.first_name} {user.last_name}" if user else "Unknown",
            employee_email=user.email if user else "",
            certification_type_name=cert_type.name if cert_type else "Unknown",
            expiration_date=c.expiration_date,
            days_until_expiry=(c.expiration_date - today).days,
            status=c.status,
        ))

    return items

def refresh_expiration_statuses(self, company_id: UUID) -> int:
    """Bring stored statuses for one company in line with today's date.

    Two bulk UPDATEs are issued: "active"/"expiring_soon" rows whose
    expiration date has passed become "expired", then "active" rows
    expiring within the next 60 days become "expiring_soon". Rows with
    other statuses or without an expiration date are untouched.

    Returns:
        Total number of rows changed by both statements. Nothing is
        committed here.
    """
    today = date.today()
    soon_cutoff = today + timedelta(days=60)

    in_company = Certification.company_id == company_id
    has_expiry = Certification.expiration_date.isnot(None)

    # Step 1: anything still marked current whose date is already behind us.
    newly_expired = (
        self.db.query(Certification)
        .filter(
            in_company,
            Certification.status.in_(["active", "expiring_soon"]),
            has_expiry,
            Certification.expiration_date < today,
        )
        .update(
            {"status": "expired", "updated_at": datetime.utcnow()},
            synchronize_session=False,
        )
    )

    # Step 2: active rows that have entered the 60-day warning window.
    newly_expiring = (
        self.db.query(Certification)
        .filter(
            in_company,
            Certification.status == "active",
            has_expiry,
            Certification.expiration_date >= today,
            Certification.expiration_date <= soon_cutoff,
        )
        .update(
            {"status": "expiring_soon", "updated_at": datetime.utcnow()},
            synchronize_session=False,
        )
    )

    return newly_expired + newly_expiring

def _log_event(self, **kwargs):
    """Add a SafePathAuditLog row built from ``kwargs`` to the session.

    The row is only added; flushing and committing are left to the caller.
    """
    self.db.add(SafePathAuditLog(**kwargs))

API Endpoints

File: tellus-ehs-hazcom-service/app/api/v1/safepath/certifications.py

"""SafePath Certification API Endpoints"""

import math
from typing import Optional
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, Query, UploadFile, File
from sqlalchemy.orm import Session

from app.db.session import get_db
from app.api.v1.adminhq.auth import get_user_context, UserContext
from app.services.safepath.certification_service import CertificationService
from app.schemas.safepath.certification import (
CertificationTypeResponse,
CertificationTypeCreate,
CertificationCreate,
CertificationUpdate,
CertificationListResponse,
CertificationDetailResponse,
ExpirationSummary,
ExpiringCertificationItem,
)

router = APIRouter(prefix="/training", tags=["SafePath Certifications"])


# ============================================================================
# Certification Types
# ============================================================================

@router.get("/certification-types", response_model=list[CertificationTypeResponse])
def list_certification_types(
    ctx: UserContext = Depends(get_user_context),
    db: Session = Depends(get_db),
):
    """List all certification types (system + custom)."""
    # The company comes from the authenticated context, never from the client.
    rows = CertificationService(db).list_certification_types(ctx.company_id)
    # Convert each ORM row into its response schema.
    return [CertificationTypeResponse.model_validate(row) for row in rows]


@router.post("/certification-types", response_model=CertificationTypeResponse, status_code=201)
def create_certification_type(
    data: CertificationTypeCreate,
    ctx: UserContext = Depends(get_user_context),
    db: Session = Depends(get_db),
):
    """Create a custom certification type."""
    # Spell out the payload fields that the service accepts as keywords.
    type_fields = {
        "name": data.name,
        "description": data.description,
        "default_validity_months": data.default_validity_months,
        "retraining_frequency_months": data.retraining_frequency_months,
        "osha_standard_ref": data.osha_standard_ref,
    }
    created_type = CertificationService(db).create_certification_type(
        company_id=ctx.company_id, **type_fields
    )
    # The service only flushes; the endpoint owns the commit.
    db.commit()
    return CertificationTypeResponse.model_validate(created_type)


# ============================================================================
# Certification Records
# ============================================================================

@router.post("/certifications", status_code=201)
def create_certification(
    data: CertificationCreate,
    ctx: UserContext = Depends(get_user_context),
    db: Session = Depends(get_db),
):
    """Log a new certification (internal or external)."""
    # The authenticated user is recorded as the creator of the record.
    created = CertificationService(db).create_certification(
        ctx.company_id, ctx.user_id, data
    )
    # The service only flushes; the endpoint owns the commit.
    db.commit()

    new_id = str(created.certification_id)
    return {"success": True, "certification_id": new_id}


@router.get("/certifications", response_model=CertificationListResponse)
def list_certifications(
    page: int = Query(1, ge=1),
    page_size: int = Query(25, ge=1, le=100),
    user_id: Optional[UUID] = Query(None),
    certification_type_id: Optional[UUID] = Query(None),
    status: Optional[str] = Query(None, description="active, expiring_soon, expired, revoked"),
    expiring_within_days: Optional[int] = Query(None, ge=1, description="Filter certs expiring within N days"),
    ctx: UserContext = Depends(get_user_context),
    db: Session = Depends(get_db),
):
    """List certifications with filtering."""
    # Optional filters are forwarded untouched; None means "no filter".
    filters = {
        "user_id": user_id,
        "certification_type_id": certification_type_id,
        "status": status,
        "expiring_within_days": expiring_within_days,
    }
    items, total = CertificationService(db).list_certifications(
        company_id=ctx.company_id,
        page=page,
        page_size=page_size,
        **filters,
    )

    # An empty result set is reported as zero pages.
    page_count = 0
    if total > 0:
        page_count = math.ceil(total / page_size)

    return CertificationListResponse(
        items=items,
        total=total,
        page=page,
        page_size=page_size,
        total_pages=page_count,
    )


@router.put("/certifications/{certification_id}")
def update_certification(
    certification_id: UUID,
    data: CertificationUpdate,
    ctx: UserContext = Depends(get_user_context),
    db: Session = Depends(get_db),
):
    """Update a certification record."""
    updated = CertificationService(db).update_certification(
        ctx.company_id, certification_id, data
    )
    # The service returns nothing when the id does not belong to this company.
    if not updated:
        raise HTTPException(status_code=404, detail="Certification not found")

    # The service only flushes; the endpoint owns the commit.
    db.commit()
    return {"success": True}


@router.post("/certifications/{certification_id}/evidence")
async def upload_evidence(
    certification_id: UUID,
    file: UploadFile = File(..., description="Certificate scan (PDF or image)"),
    ctx: UserContext = Depends(get_user_context),
    db: Session = Depends(get_db),
):
    """Upload evidence document for a certification.

    Accepts PDF, PNG, JPG files up to 10MB.
    Stores in S3 under: safepath/certifications/{company_id}/{certification_id}/
    """
    # datetime is imported here because this module's top-level imports do
    # not provide it; without it the updated_at assignment below raises
    # NameError after the object has already been written to S3.
    from datetime import datetime

    from app.db.models.safepath import Certification

    # Validate file: declared content type first, then size.
    allowed_types = ["application/pdf", "image/png", "image/jpeg"]
    if file.content_type not in allowed_types:
        raise HTTPException(status_code=400, detail="File must be PDF, PNG, or JPG")

    max_bytes = 10 * 1024 * 1024  # 10MB
    content = await file.read()
    if len(content) > max_bytes:
        raise HTTPException(status_code=400, detail="File too large (max 10MB)")

    # Scoping by company_id prevents attaching evidence to another
    # company's certification.
    cert = db.query(Certification).filter(
        Certification.certification_id == certification_id,
        Certification.company_id == ctx.company_id,
    ).first()
    if not cert:
        raise HTTPException(status_code=404, detail="Certification not found")

    # The filename is client-controlled: keep only its final path component
    # (handling both "/" and "\" separators) so it cannot add extra key
    # segments, and fall back to a fixed name when it is missing or empty.
    raw_name = (file.filename or "").replace("\\", "/")
    safe_name = raw_name.rsplit("/", 1)[-1].strip() or "evidence"

    # Upload to S3 (follow existing SDS upload pattern)
    import boto3
    from app.core.config import settings

    s3_client = boto3.client("s3")
    s3_key = f"safepath/certifications/{ctx.company_id}/{certification_id}/{safe_name}"
    s3_bucket = settings.S3_BUCKET_NAME

    # NOTE(review): put_object is a blocking call inside an async endpoint;
    # consider running it in a thread pool if uploads stall the event loop.
    s3_client.put_object(
        Bucket=s3_bucket,
        Key=s3_key,
        Body=content,
        ContentType=file.content_type,
    )

    cert.evidence_s3_bucket = s3_bucket
    cert.evidence_s3_key = s3_key
    cert.updated_at = datetime.utcnow()
    db.commit()

    return {"success": True, "filename": safe_name}


# ============================================================================
# Expiration Tracking
# ============================================================================

@router.get("/certifications/expiration-summary", response_model=ExpirationSummary)
def get_expiration_summary(
    ctx: UserContext = Depends(get_user_context),
    db: Session = Depends(get_db),
):
    """Get summary of certification expirations for the dashboard."""
    # Read-only: the counts are computed by the service for the caller's company.
    summary = CertificationService(db).get_expiration_summary(ctx.company_id)
    return summary


@router.get("/certifications/expiring", response_model=list[ExpiringCertificationItem])
def get_expiring_certifications(
    within_days: int = Query(90, ge=1, le=365),
    ctx: UserContext = Depends(get_user_context),
    db: Session = Depends(get_db),
):
    """Get list of certifications expiring within N days."""
    # Read-only: the window (1-365 days, default 90) is passed straight through.
    expiring = CertificationService(db).get_expiring_certifications(
        ctx.company_id, within_days
    )
    return expiring

Background Job: Expiration Status Refresh

Add to the background service job definitions:

# In tellus-ehs-background-service/app/jobs/definitions.py

async def refresh_certification_statuses():
    """Daily job: Update certification statuses based on expiration dates.

    Runs once per day. For each company that has certifications:
    1. Mark expired certifications (expiration_date < today)
    2. Mark expiring_soon (expiration_date within 60 days)

    All companies are processed in one session and committed together at
    the end.
    """
    from app.db.models.safepath import Certification
    from app.services.safepath.certification_service import CertificationService

    def _refresh_company(sync_session, company_id):
        # CertificationService is synchronous (it calls session.query), so
        # it must be handed the synchronous Session that run_sync provides,
        # not the async session itself.
        return CertificationService(sync_session).refresh_expiration_statuses(company_id)

    # NOTE(review): assumes get_async_session() yields a SQLAlchemy
    # AsyncSession (the awaited execute/commit calls suggest so) — confirm.
    async with get_async_session() as db:
        # Get all distinct company_ids with certifications, materialised
        # up front so the result is fully read before the UPDATEs run.
        result = await db.execute(
            select(distinct(Certification.company_id))
        )
        company_ids = result.scalars().all()

        for company_id in company_ids:
            updated = await db.run_sync(_refresh_company, company_id)
            if updated > 0:
                logger.info(
                    "Updated %s certification statuses for company %s",
                    updated,
                    company_id,
                )
        await db.commit()

Schedule: Run daily at 2:00 AM UTC.


Verification Checklist

  1. Certification types: GET returns 20 system types + any custom types
  2. Create certification: POST with user_id, type, issue_date creates record; auto-calculates expiration from type defaults
  3. Expiration status: Certifications are automatically set to "expired" (expiration date in the past) or "expiring_soon" (expiration date within the next 60 days) based on their dates
  4. Evidence upload: PDF/image uploads to S3 and links to certification record
  5. Expiration summary: Returns counts for 30/60/90 day windows + expired + active
  6. Expiring list: Returns certifications sorted by nearest expiration
  7. Background job: Daily refresh correctly updates statuses