SafePath Training — Phase 1C: Assignment & Training Delivery
Overview
This document covers the assignment engine and training delivery pipeline:
- Assignment creation (manual and bulk)
- Training delivery (lesson progress tracking, play-and-lock)
- Quiz submission and server-side scoring
- Completion records with e-signature
- PDF certificate generation
- Classroom/in-person training records
Prerequisite: Phase 1B (course management) must be complete.
Files to create:
tellus-ehs-hazcom-service/app/schemas/safepath/assignment.py
tellus-ehs-hazcom-service/app/schemas/safepath/result.py
tellus-ehs-hazcom-service/app/schemas/safepath/classroom.py
tellus-ehs-hazcom-service/app/services/safepath/assignment_service.py
tellus-ehs-hazcom-service/app/services/safepath/delivery_service.py
tellus-ehs-hazcom-service/app/services/safepath/quiz_service.py
tellus-ehs-hazcom-service/app/services/safepath/certificate_generator.py
tellus-ehs-hazcom-service/app/services/safepath/classroom_service.py
tellus-ehs-hazcom-service/app/api/v1/safepath/assignments.py
tellus-ehs-hazcom-service/app/api/v1/safepath/results.py
tellus-ehs-hazcom-service/app/api/v1/safepath/classroom.py
Pydantic Schemas
Assignment Schemas
File: tellus-ehs-hazcom-service/app/schemas/safepath/assignment.py
"""SafePath Assignment Schemas"""
from datetime import date, datetime
from typing import Optional, List
from uuid import UUID
from pydantic import BaseModel, Field, ConfigDict
class AssignmentCreate(BaseModel):
"""Create one or more assignments."""
course_id: UUID
assigned_to_user_ids: List[UUID] = Field(..., min_length=1, description="One or more user IDs")
due_date: date
priority: str = Field(default="normal", pattern="^(normal|urgent)$")
site_id: Optional[UUID] = None
notes: Optional[str] = None
class AssignmentUpdate(BaseModel):
"""Update an assignment (e.g., change due date or priority)."""
due_date: Optional[date] = None
priority: Optional[str] = Field(None, pattern="^(normal|urgent)$")
notes: Optional[str] = None
class AssignmentListItem(BaseModel):
"""Assignment list item with user and course info."""
model_config = ConfigDict(from_attributes=True)
assignment_id: UUID
course_id: UUID
course_title: str
course_version: int
assigned_to: UUID
assigned_to_name: str
assigned_to_email: str
assigned_by_name: Optional[str] = None
site_name: Optional[str] = None
due_date: date
priority: str
status: str
notes: Optional[str] = None
latest_attempt_number: int = 0
latest_score_percent: Optional[float] = None
completed_at: Optional[datetime] = None
created_at: datetime
class AssignmentListResponse(BaseModel):
"""Paginated assignment list."""
items: List[AssignmentListItem]
total: int
page: int
page_size: int
total_pages: int
class AssignmentDetailResponse(BaseModel):
"""Full assignment detail for learner view."""
model_config = ConfigDict(from_attributes=True)
assignment_id: UUID
course_id: UUID
course_title: str
course_description: Optional[str] = None
course_version: int
passing_score_percent: int
max_retakes: int
due_date: date
priority: str
status: str
lessons: List[dict] # Lesson data with progress overlay
quiz: Optional[dict] = None # Quiz data without correct answers
attempts: List["AttemptSummary"] = []
can_retake: bool = True
class AttemptSummary(BaseModel):
"""Summary of a single training attempt."""
attempt_number: int
score_percent: Optional[float] = None
passed: Optional[bool] = None
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
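For illustration, a request body that satisfies AssignmentCreate might look like the sketch below. It uses only the standard library: the UUIDs, date, and notes are made-up sample values, and `is_valid_priority` is a hypothetical helper that mirrors the schema's `^(normal|urgent)$` pattern rather than calling Pydantic.

```python
import json
import re
import uuid
from datetime import date

# Same pattern as the `priority` field on AssignmentCreate / AssignmentUpdate.
PRIORITY_PATTERN = re.compile(r"^(normal|urgent)$")


def is_valid_priority(value: str) -> bool:
    """Hypothetical helper mirroring the schema's priority constraint."""
    return PRIORITY_PATTERN.fullmatch(value) is not None


# Sample payload; every identifier here is invented for the example.
payload = {
    "course_id": str(uuid.UUID(int=1)),
    "assigned_to_user_ids": [str(uuid.UUID(int=2)), str(uuid.UUID(int=3))],
    "due_date": date(2030, 1, 31).isoformat(),  # dates travel as ISO strings
    "priority": "urgent",
    "site_id": None,
    "notes": "Annual refresher",
}

# JSON text a client could POST to /training/assignments.
body = json.dumps(payload)
```

A value such as "high" would be rejected by the pattern, and an empty `assigned_to_user_ids` list would fail the schema's `min_length=1` constraint.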
Result Schemas
File: tellus-ehs-hazcom-service/app/schemas/safepath/result.py
"""SafePath Result Schemas"""
from datetime import datetime
from typing import Optional, List
from uuid import UUID
from pydantic import BaseModel, Field, ConfigDict
class LessonProgressUpdate(BaseModel):
"""Update progress for a single lesson."""
lesson_id: UUID
percent_complete: int = Field(..., ge=0, le=100)
last_position: Optional[int] = None # For video: seconds watched
class QuizSubmission(BaseModel):
"""Submit quiz answers for scoring."""
answers: List[dict] = Field(
...,
description='Array of {"question_id": "uuid", "selected": ["a"]} objects',
)
class TrainingCompletion(BaseModel):
"""Complete training with e-signature acknowledgment."""
signature_text: str = Field(..., min_length=1, description="Typed name as e-signature")
class ResultResponse(BaseModel):
"""Training result response."""
model_config = ConfigDict(from_attributes=True)
result_id: UUID
assignment_id: UUID
user_id: UUID
course_id: UUID
attempt_number: int
score_percent: Optional[float] = None
passed: Optional[bool] = None
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
duration_seconds: Optional[int] = None
delivery_method: str
instructor_name: Optional[str] = None
acknowledgment_at: Optional[datetime] = None
created_at: datetime
class QuizScoreResponse(BaseModel):
"""Response after quiz submission."""
score_percent: float
passed: bool
correct_count: int
total_questions: int
can_retake: bool
attempts_remaining: int
feedback: Optional[List[dict]] = None # Per-question feedback if enabled
Classroom Schemas
File: tellus-ehs-hazcom-service/app/schemas/safepath/classroom.py
"""SafePath Classroom Session Schemas"""
from datetime import date, time, datetime
from typing import Optional, List
from uuid import UUID
from pydantic import BaseModel, Field, ConfigDict
class ClassroomSessionCreate(BaseModel):
"""Create an in-person training session."""
course_id: UUID
instructor_name: str = Field(..., min_length=1, max_length=255)
session_date: date
start_time: Optional[time] = None
end_time: Optional[time] = None
location: Optional[str] = Field(None, max_length=255)
site_id: Optional[UUID] = None
max_attendees: Optional[int] = Field(None, ge=1)
notes: Optional[str] = None
class AttendanceRecord(BaseModel):
"""Single attendee record for a classroom session."""
user_id: UUID
passed: bool = True
score_percent: Optional[float] = Field(None, ge=0, le=100)
notes: Optional[str] = None
class ClassroomAttendanceSubmit(BaseModel):
"""Submit attendance for a classroom session."""
attendees: List[AttendanceRecord] = Field(..., min_length=1)
class ClassroomSessionResponse(BaseModel):
"""Response for a classroom session."""
model_config = ConfigDict(from_attributes=True)
session_id: UUID
course_id: UUID
course_title: str
instructor_name: str
session_date: date
start_time: Optional[time] = None
end_time: Optional[time] = None
location: Optional[str] = None
site_name: Optional[str] = None
max_attendees: Optional[int] = None
attendee_count: int = 0
notes: Optional[str] = None
created_at: datetime
class ClassroomSessionListResponse(BaseModel):
"""Paginated classroom session list."""
items: List[ClassroomSessionResponse]
total: int
page: int
page_size: int
total_pages: int
Assignment Service
File: tellus-ehs-hazcom-service/app/services/safepath/assignment_service.py
"""SafePath Assignment Service
Handles assignment creation, status management, and bulk operations.
"""
from typing import Optional, List, Tuple
from uuid import UUID
from datetime import datetime, date
from sqlalchemy import func, and_
from sqlalchemy.orm import Session, joinedload
from app.db.models.safepath import (
Assignment,
Course,
Result,
SafePathAuditLog,
)
from app.db.models.user import User
from app.db.models.company import CompanySite
from app.schemas.safepath.assignment import AssignmentCreate, AssignmentUpdate, AssignmentListItem
class AssignmentService:
"""Service for managing training assignments."""
def __init__(self, db: Session):
self.db = db
def create_assignments(
self, company_id: UUID, assigned_by: UUID, data: AssignmentCreate
) -> List[Assignment]:
"""Create assignments for one or more users.
Validates:
- Course exists and is published
- Users belong to the company
- No duplicate active assignment for same user+course
"""
# Validate course
course = (
self.db.query(Course)
.filter(Course.course_id == data.course_id, Course.company_id == company_id)
.first()
)
if not course:
raise ValueError("Course not found")
if course.status != "published":
raise ValueError("Can only assign published courses")
created = []
for user_id in data.assigned_to_user_ids:
# Check for existing active assignment
existing = (
self.db.query(Assignment)
.filter(
Assignment.course_id == data.course_id,
Assignment.assigned_to == user_id,
Assignment.company_id == company_id,
Assignment.status.in_(["pending", "in_progress"]),
)
.first()
)
if existing:
continue # Skip duplicate
assignment = Assignment(
company_id=company_id,
course_id=data.course_id,
course_version=course.version_number,
assigned_to=user_id,
assigned_by=assigned_by,
site_id=data.site_id,
due_date=data.due_date,
priority=data.priority,
status="pending",
notes=data.notes,
)
self.db.add(assignment)
self.db.flush()
created.append(assignment)
# Audit log
self._log_event(
company_id=company_id,
event_type="assignment.created",
entity_type="assignment",
entity_id=assignment.assignment_id,
user_id=assigned_by,
details={
"course_id": str(data.course_id),
"assigned_to": str(user_id),
"due_date": str(data.due_date),
},
)
return created
def list_assignments(
self,
company_id: UUID,
page: int = 1,
page_size: int = 25,
status: Optional[str] = None,
user_id: Optional[UUID] = None,
course_id: Optional[UUID] = None,
site_id: Optional[UUID] = None,
overdue_only: bool = False,
) -> Tuple[List[AssignmentListItem], int]:
"""List assignments with filtering and pagination."""
query = self.db.query(Assignment).filter(Assignment.company_id == company_id)
if status:
query = query.filter(Assignment.status == status)
if user_id:
query = query.filter(Assignment.assigned_to == user_id)
if course_id:
query = query.filter(Assignment.course_id == course_id)
if site_id:
query = query.filter(Assignment.site_id == site_id)
if overdue_only:
query = query.filter(
Assignment.status.in_(["pending", "in_progress"]),
Assignment.due_date < date.today(),
)
total = query.count()
assignments = (
query
.options(joinedload(Assignment.course), joinedload(Assignment.results))
.order_by(Assignment.due_date.asc())
.offset((page - 1) * page_size)
.limit(page_size)
.all()
)
items = []
for a in assignments:
# Get user info
user = self.db.query(User).filter(User.user_id == a.assigned_to).first()
assigner = (
self.db.query(User).filter(User.user_id == a.assigned_by).first()
if a.assigned_by else None
)
site = (
self.db.query(CompanySite).filter(CompanySite.site_id == a.site_id).first()
if a.site_id else None
)
# Latest attempt info
latest_result = (
self.db.query(Result)
.filter(Result.assignment_id == a.assignment_id)
.order_by(Result.attempt_number.desc())
.first()
)
items.append(AssignmentListItem(
assignment_id=a.assignment_id,
course_id=a.course_id,
course_title=a.course.title if a.course else "Unknown",
course_version=a.course_version,
assigned_to=a.assigned_to,
assigned_to_name=f"{user.first_name} {user.last_name}" if user else "Unknown",
assigned_to_email=user.email if user else "",
assigned_by_name=(
f"{assigner.first_name} {assigner.last_name}" if assigner else None
),
site_name=site.site_name if site else None,
due_date=a.due_date,
priority=a.priority,
status=a.status,
notes=a.notes,
latest_attempt_number=latest_result.attempt_number if latest_result else 0,
latest_score_percent=(
float(latest_result.score_percent) if latest_result and latest_result.score_percent is not None else None
),
completed_at=latest_result.completed_at if latest_result and latest_result.passed else None,
created_at=a.created_at,
))
return items, total
def update_assignment(
self, company_id: UUID, assignment_id: UUID, data: AssignmentUpdate
) -> Optional[Assignment]:
"""Update assignment details (due date, priority, notes)."""
assignment = (
self.db.query(Assignment)
.filter(
Assignment.assignment_id == assignment_id,
Assignment.company_id == company_id,
)
.first()
)
if not assignment:
return None
update_data = data.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(assignment, field, value)
assignment.updated_at = datetime.utcnow()
self.db.flush()
return assignment
def mark_overdue(self, company_id: UUID) -> int:
"""Batch update: mark past-due pending/in_progress assignments as overdue.
Called by a background job daily.
Returns count of updated assignments.
"""
count = (
self.db.query(Assignment)
.filter(
Assignment.company_id == company_id,
Assignment.status.in_(["pending", "in_progress"]),
Assignment.due_date < date.today(),
)
.update({"status": "overdue", "updated_at": datetime.utcnow()}, synchronize_session=False)
)
return count
def _log_event(self, **kwargs):
log = SafePathAuditLog(**kwargs)
self.db.add(log)
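The duplicate check and the overdue rule used by the service can be sketched without a database. This is a minimal illustration only: `AssignmentRow`, `users_to_assign`, and the standalone `mark_overdue` are hypothetical stand-ins for the ORM model and queries above, and the sample data is invented.

```python
from dataclasses import dataclass
from datetime import date
from typing import Iterable, List

# Same status set the service filters on for both rules.
ACTIVE_STATUSES = {"pending", "in_progress"}


@dataclass
class AssignmentRow:
    """Hypothetical stand-in for the Assignment ORM model."""
    user_id: str
    course_id: str
    status: str
    due_date: date


def users_to_assign(
    existing: Iterable[AssignmentRow], course_id: str, user_ids: List[str]
) -> List[str]:
    """Drop users who already hold an active assignment for the course."""
    active = {
        a.user_id
        for a in existing
        if a.course_id == course_id and a.status in ACTIVE_STATUSES
    }
    return [u for u in user_ids if u not in active]


def mark_overdue(rows: Iterable[AssignmentRow], today: date) -> int:
    """Flip active, past-due rows to 'overdue' and return how many changed."""
    count = 0
    for row in rows:
        if row.status in ACTIVE_STATUSES and row.due_date < today:
            row.status = "overdue"
            count += 1
    return count
```

One consequence worth checking against the intended behavior: once a row is marked "overdue" it no longer counts as active under this rule, so the same user could be assigned the same course again.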
Quiz Scoring Service
File: tellus-ehs-hazcom-service/app/services/safepath/quiz_service.py
"""SafePath Quiz Scoring Service
Server-side quiz scoring. Answers are validated against the stored
correct options. Score is computed as (correct / total) * 100.
"""
from typing import List, Tuple, Optional
from uuid import UUID
from sqlalchemy.orm import Session
from app.db.models.safepath import Quiz, QuizQuestion
class QuizService:
"""Server-side quiz scoring."""
def __init__(self, db: Session):
self.db = db
def score_quiz(
self, course_id: UUID, answers: List[dict]
) -> Tuple[float, bool, int, int, List[dict]]:
"""Score a quiz submission.
Args:
course_id: The course containing the quiz
answers: List of {"question_id": "uuid", "selected": ["a", "b"]}
Returns:
(score_percent, passed, correct_count, total_questions, feedback)
"""
# Get the quiz and questions for this course
quiz = (
self.db.query(Quiz)
.filter(Quiz.course_id == course_id)
.first()
)
if not quiz:
return 0.0, False, 0, 0, []
questions = (
self.db.query(QuizQuestion)
.filter(QuizQuestion.quiz_id == quiz.quiz_id)
.all()
)
if not questions:
return 100.0, True, 0, 0, []
question_map = {str(q.question_id): q for q in questions}
answer_map = {a["question_id"]: a.get("selected", []) for a in answers}
correct_count = 0
feedback = []
for q in questions:
qid = str(q.question_id)
selected = answer_map.get(qid, [])
is_correct = self._check_answer(q, selected)
if is_correct:
correct_count += 1
feedback.append({
"question_id": qid,
"correct": is_correct,
"explanation": q.explanation,
})
total = len(questions)
score_percent = round((correct_count / total) * 100, 2) if total > 0 else 0.0
# Get passing score from course
from app.db.models.safepath import Course
course = self.db.query(Course).filter(Course.course_id == course_id).first()
passing_score = course.passing_score_percent if course else 80
passed = score_percent >= passing_score
return score_percent, passed, correct_count, total, feedback
def _check_answer(self, question: QuizQuestion, selected: List[str]) -> bool:
"""Check if selected answer(s) are correct for a question."""
options = question.options
if not options or not selected:
return False
if question.question_type in ("mcq_single", "true_false"):
# Single correct answer
correct_ids = [o["id"] for o in options if o.get("is_correct")]
return len(selected) == 1 and selected[0] in correct_ids
elif question.question_type == "mcq_multi":
# All correct must be selected, no extras
correct_ids = set(o["id"] for o in options if o.get("is_correct"))
return set(selected) == correct_ids
elif question.question_type == "matching":
# All pairs must be correctly matched
correct_pairs = {o["id"]: o["right"] for o in options if o.get("is_correct", True)}
selected_pairs = {s["id"]: s.get("right", "") for s in selected} if isinstance(selected[0], dict) else {}
return selected_pairs == correct_pairs
return False
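To make the scoring rule concrete, here is a small database-free sketch of the same arithmetic for single-select and multi-select questions. The functions `check_answer` and `score` and the sample questions are assumptions made for illustration; matching questions and the service's empty-quiz shortcuts are left out.

```python
from typing import Dict, List, Tuple


def check_answer(question: Dict, selected: List[str]) -> bool:
    """Exact-match check for single- and multi-select questions."""
    correct = {o["id"] for o in question["options"] if o.get("is_correct")}
    if not selected:
        return False
    if question["question_type"] in ("mcq_single", "true_false"):
        return len(selected) == 1 and selected[0] in correct
    if question["question_type"] == "mcq_multi":
        return set(selected) == correct  # all correct options, no extras
    return False


def score(
    questions: List[Dict], answers: List[Dict], passing_score: int = 80
) -> Tuple[float, bool, int, int]:
    """Return (score_percent, passed, correct_count, total)."""
    answer_map = {a["question_id"]: a.get("selected", []) for a in answers}
    correct_count = sum(
        check_answer(q, answer_map.get(q["question_id"], [])) for q in questions
    )
    total = len(questions)
    score_percent = round(correct_count / total * 100, 2) if total else 0.0
    return score_percent, score_percent >= passing_score, correct_count, total


# Sample data, invented for the example.
questions = [
    {"question_id": "q1", "question_type": "mcq_single",
     "options": [{"id": "a", "is_correct": True}, {"id": "b"}]},
    {"question_id": "q2", "question_type": "mcq_multi",
     "options": [{"id": "a", "is_correct": True},
                 {"id": "b", "is_correct": True}, {"id": "c"}]},
    {"question_id": "q3", "question_type": "true_false",
     "options": [{"id": "true"}, {"id": "false", "is_correct": True}]},
]
answers = [
    {"question_id": "q1", "selected": ["a"]},
    {"question_id": "q2", "selected": ["a"]},  # partial selection counts as wrong
    {"question_id": "q3", "selected": ["false"]},
]
result = score(questions, answers)
```

With two of three questions correct, the learner falls below the 80% fallback threshold. There is no partial credit: a multi-select answer must match the correct set exactly.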
Training Delivery Service
File: tellus-ehs-hazcom-service/app/services/safepath/delivery_service.py
"""SafePath Training Delivery Service
Manages the learner experience: starting training, tracking lesson progress,
submitting quizzes, recording completion with e-signature.
"""
import hashlib
from typing import Optional
from uuid import UUID
from datetime import datetime
from sqlalchemy.orm import Session, joinedload
from app.db.models.safepath import (
Assignment,
Course,
Lesson,
Result,
SafePathAuditLog,
)
from app.services.safepath.quiz_service import QuizService
class DeliveryService:
"""Manages the learner training delivery flow."""
def __init__(self, db: Session):
self.db = db
self.quiz_service = QuizService(db)
def start_training(
self, assignment_id: UUID, user_id: UUID
) -> Optional[Result]:
"""Start a training attempt.
Creates a new Result record and updates assignment status to in_progress.
"""
assignment = (
self.db.query(Assignment)
.filter(
Assignment.assignment_id == assignment_id,
Assignment.assigned_to == user_id,
)
.first()
)
if not assignment:
return None
if assignment.status not in ("pending", "in_progress", "overdue"):
raise ValueError(f"Cannot start training for assignment with status '{assignment.status}'")
# Determine attempt number
existing_attempts = (
self.db.query(Result)
.filter(Result.assignment_id == assignment_id)
.count()
)
# Check retake limit
course = self.db.query(Course).filter(Course.course_id == assignment.course_id).first()
if existing_attempts >= (course.max_retakes + 1): # +1 because first attempt is not a "retake"
raise ValueError("Maximum retake attempts exceeded")
result = Result(
assignment_id=assignment_id,
user_id=user_id,
course_id=assignment.course_id,
attempt_number=existing_attempts + 1,
started_at=datetime.utcnow(),
delivery_method="online",
lesson_progress={},
)
self.db.add(result)
# Update assignment status
if assignment.status in ("pending", "overdue"):
assignment.status = "in_progress"
assignment.updated_at = datetime.utcnow()
self.db.flush()
self._log_event(
company_id=assignment.company_id,
event_type="assignment.started",
entity_type="assignment",
entity_id=assignment_id,
user_id=user_id,
details={"attempt_number": result.attempt_number},
)
return result
def update_lesson_progress(
self, result_id: UUID, user_id: UUID, lesson_id: UUID, percent_complete: int, last_position: Optional[int] = None
) -> Optional[Result]:
"""Update progress for a lesson within an active training attempt."""
result = (
self.db.query(Result)
.filter(Result.result_id == result_id, Result.user_id == user_id)
.first()
)
if not result or result.completed_at:
return None
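progress = dict(result.lesson_progress or {}) # copy so the reassignment below is seen as a change on a plain JSON column
lid = str(lesson_id)
progress[lid] = {
"completed": percent_complete >= 80, # fixed 80% flag; check_quiz_unlocked uses each lesson's completion_threshold_percent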
"percent": percent_complete,
"last_position": last_position,
}
result.lesson_progress = progress
self.db.flush()
return result
def check_quiz_unlocked(self, result_id: UUID) -> bool:
"""Check if all lessons meet their completion threshold (play-and-lock)."""
result = self.db.query(Result).filter(Result.result_id == result_id).first()
if not result:
return False
lessons = (
self.db.query(Lesson)
.filter(Lesson.course_id == result.course_id)
.all()
)
progress = result.lesson_progress or {}
for lesson in lessons:
lid = str(lesson.lesson_id)
lesson_progress = progress.get(lid, {})
if lesson_progress.get("percent", 0) < lesson.completion_threshold_percent:
return False
return True
def submit_quiz(
self, result_id: UUID, user_id: UUID, answers: list
) -> dict:
"""Submit quiz answers and compute score.
Returns scoring result.
"""
result = (
self.db.query(Result)
.filter(Result.result_id == result_id, Result.user_id == user_id)
.first()
)
if not result:
raise ValueError("Result not found")
if result.completed_at:
raise ValueError("This attempt is already completed")
# Check play-and-lock
if not self.check_quiz_unlocked(result_id):
raise ValueError("Complete all lessons before taking the quiz")
# Score the quiz
score_percent, passed, correct_count, total_questions, feedback = (
self.quiz_service.score_quiz(result.course_id, answers)
)
# Update result
result.quiz_answers = answers
result.score_percent = score_percent
result.passed = passed
# Get course for retake info
course = self.db.query(Course).filter(Course.course_id == result.course_id).first()
assignment = self.db.query(Assignment).filter(
Assignment.assignment_id == result.assignment_id
).first()
total_attempts = (
self.db.query(Result)
.filter(Result.assignment_id == result.assignment_id)
.count()
)
attempts_remaining = max(0, (course.max_retakes + 1) - total_attempts)
can_retake = not passed and attempts_remaining > 0
self.db.flush()
# Audit
event = "quiz.passed" if passed else "quiz.failed"
self._log_event(
company_id=assignment.company_id,
event_type=event,
entity_type="result",
entity_id=result.result_id,
user_id=user_id,
details={"score": score_percent, "passed": passed, "attempt": result.attempt_number},
)
return {
"score_percent": score_percent,
"passed": passed,
"correct_count": correct_count,
"total_questions": total_questions,
"can_retake": can_retake,
"attempts_remaining": attempts_remaining,
"feedback": feedback,
}
def complete_training(
self, result_id: UUID, user_id: UUID, signature_text: str, ip_address: Optional[str] = None
) -> Optional[Result]:
"""Complete training with e-signature acknowledgment.
Only allowed if quiz was passed (or no quiz exists).
"""
result = (
self.db.query(Result)
.filter(Result.result_id == result_id, Result.user_id == user_id)
.first()
)
if not result:
return None
if result.completed_at:
raise ValueError("Training already completed")
# Check quiz passed (if quiz exists)
from app.db.models.safepath import Quiz
has_quiz = self.db.query(Quiz).filter(Quiz.course_id == result.course_id).count() > 0
if has_quiz and not result.passed:
raise ValueError("Must pass the quiz before completing training")
# Record completion
now = datetime.utcnow()
result.completed_at = now
result.acknowledgment_signature = signature_text
result.acknowledgment_at = now
if ip_address:
result.acknowledgment_ip_hash = hashlib.sha256(ip_address.encode()).hexdigest()
if result.started_at:
result.duration_seconds = int((now - result.started_at).total_seconds())
# Update assignment status
assignment = self.db.query(Assignment).filter(
Assignment.assignment_id == result.assignment_id
).first()
if assignment:
assignment.status = "completed"
assignment.updated_at = now
self.db.flush()
self._log_event(
company_id=assignment.company_id if assignment else None,
event_type="assignment.completed",
entity_type="assignment",
entity_id=result.assignment_id,
user_id=user_id,
details={
"result_id": str(result.result_id),
"score": float(result.score_percent) if result.score_percent is not None else None,
"duration_seconds": result.duration_seconds,
},
)
return result
def _log_event(self, **kwargs):
if kwargs.get("company_id"):
log = SafePathAuditLog(**kwargs)
self.db.add(log)
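Three small rules carry most of the delivery logic: the play-and-lock check, the retake budget, and the IP hash stored with the e-signature. The sketch below restates them as pure functions so they can be reasoned about in isolation. The function names, lesson IDs, and thresholds are hypothetical and chosen for the example.

```python
import hashlib
from typing import Dict


def quiz_unlocked(progress: Dict[str, Dict], thresholds: Dict[str, int]) -> bool:
    """True only if every lesson's recorded percent meets its own threshold."""
    return all(
        progress.get(lesson_id, {}).get("percent", 0) >= required
        for lesson_id, required in thresholds.items()
    )


def attempts_remaining(max_retakes: int, total_attempts: int) -> int:
    """The first attempt is not a retake, so the cap is max_retakes + 1."""
    return max(0, (max_retakes + 1) - total_attempts)


def hash_ip(ip_address: str) -> str:
    """SHA-256 hex digest, the form stored in acknowledgment_ip_hash."""
    return hashlib.sha256(ip_address.encode()).hexdigest()


# Hypothetical per-lesson thresholds and a learner's recorded progress.
thresholds = {"lesson-1": 80, "lesson-2": 100}
progress = {"lesson-1": {"percent": 85}, "lesson-2": {"percent": 90}}

locked = not quiz_unlocked(progress, thresholds)  # lesson-2 is still short
```

A lesson with no progress entry counts as 0%, so the quiz stays locked until every lesson in the course has been reported at or above its threshold.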
Certificate Generator
File: tellus-ehs-hazcom-service/app/services/safepath/certificate_generator.py
"""SafePath Certificate Generator
Generates PDF certificates for completed training using ReportLab.
Dependencies to add to requirements.txt:
reportlab>=4.0.0
"""
import io
from datetime import datetime
from typing import Optional
from reportlab.lib.pagesizes import LETTER, landscape
from reportlab.lib.units import inch
from reportlab.lib.colors import HexColor
from reportlab.pdfgen import canvas
class CertificateGenerator:
"""Generates training completion certificates as PDF."""
# Tellus brand colors
PRIMARY = HexColor("#1a56db")
DARK = HexColor("#1e293b")
LIGHT_BG = HexColor("#f8fafc")
BORDER = HexColor("#3b82f6")
def generate(
self,
employee_name: str,
course_title: str,
completion_date: datetime,
score_percent: Optional[float] = None,
instructor_name: Optional[str] = None,
company_name: Optional[str] = None,
osha_ref: Optional[str] = None,
) -> bytes:
"""Generate a PDF certificate and return as bytes.
Args:
employee_name: Full name of the employee
course_title: Title of the completed course
completion_date: Date/time of completion
score_percent: Quiz score (if applicable)
instructor_name: Trainer/instructor name
company_name: Company name for branding
osha_ref: OSHA standard reference (e.g., "1910.134")
Returns:
PDF file as bytes
"""
buffer = io.BytesIO()
c = canvas.Canvas(buffer, pagesize=landscape(LETTER))
width, height = landscape(LETTER)
# Background
c.setFillColor(self.LIGHT_BG)
c.rect(0, 0, width, height, fill=1, stroke=0)
# Border
c.setStrokeColor(self.BORDER)
c.setLineWidth(3)
c.rect(0.5 * inch, 0.5 * inch, width - inch, height - inch, fill=0, stroke=1)
# Inner border
c.setLineWidth(1)
c.rect(0.6 * inch, 0.6 * inch, width - 1.2 * inch, height - 1.2 * inch, fill=0, stroke=1)
# Header
c.setFillColor(self.PRIMARY)
c.setFont("Helvetica-Bold", 14)
c.drawCentredString(width / 2, height - 1.2 * inch, "CERTIFICATE OF COMPLETION")
# Company name
if company_name:
c.setFillColor(self.DARK)
c.setFont("Helvetica", 11)
c.drawCentredString(width / 2, height - 1.6 * inch, company_name)
# "This certifies that"
c.setFont("Helvetica", 12)
c.drawCentredString(width / 2, height - 2.2 * inch, "This certifies that")
# Employee name
c.setFont("Helvetica-Bold", 24)
c.setFillColor(self.DARK)
c.drawCentredString(width / 2, height - 2.8 * inch, employee_name)
# "has successfully completed"
c.setFont("Helvetica", 12)
c.drawCentredString(width / 2, height - 3.3 * inch, "has successfully completed")
# Course title
c.setFont("Helvetica-Bold", 18)
c.setFillColor(self.PRIMARY)
c.drawCentredString(width / 2, height - 3.9 * inch, course_title)
# OSHA reference
if osha_ref:
c.setFont("Helvetica", 10)
c.setFillColor(self.DARK)
c.drawCentredString(width / 2, height - 4.3 * inch, f"OSHA Standard: {osha_ref}")
# Score and date
y_bottom = 1.5 * inch
c.setFont("Helvetica", 11)
c.setFillColor(self.DARK)
date_str = completion_date.strftime("%B %d, %Y")
c.drawCentredString(width / 2, y_bottom + 0.6 * inch, f"Date: {date_str}")
if score_percent is not None:
c.drawCentredString(width / 2, y_bottom + 0.3 * inch, f"Score: {score_percent:.0f}%")
# Instructor signature line
if instructor_name:
sig_y = y_bottom - 0.1 * inch
c.line(width / 2 - 1.5 * inch, sig_y, width / 2 + 1.5 * inch, sig_y)
c.setFont("Helvetica", 10)
c.drawCentredString(width / 2, sig_y - 0.2 * inch, instructor_name)
c.setFont("Helvetica", 8)
c.drawCentredString(width / 2, sig_y - 0.4 * inch, "Instructor / Safety Coordinator")
# Footer
c.setFont("Helvetica", 8)
c.setFillColor(HexColor("#94a3b8"))
c.drawCentredString(width / 2, 0.8 * inch, "Generated by Tellus EHS - SafePath Training")
c.save()
buffer.seek(0)
return buffer.read()
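The date and score captions on the certificate come from plain string formatting, which can be checked without ReportLab. The helper `certificate_lines` below is a hypothetical extraction of that formatting, not part of the generator itself. The month name assumes the default C/English locale.

```python
from datetime import datetime
from typing import List, Optional


def certificate_lines(
    completion_date: datetime, score_percent: Optional[float]
) -> List[str]:
    """Build the date and score captions drawn near the bottom of the page."""
    lines = [f"Date: {completion_date.strftime('%B %d, %Y')}"]
    # Compare against None so a legitimate score of 0 is still printed.
    if score_percent is not None:
        lines.append(f"Score: {score_percent:.0f}%")
    return lines


captions = certificate_lines(datetime(2025, 3, 5, 14, 30), 91.67)
```

Note that `%d` zero-pads the day, so March 5 renders as "March 05, 2025", and `:.0f` rounds the score to a whole percent.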
API Endpoints
File: tellus-ehs-hazcom-service/app/api/v1/safepath/assignments.py
"""SafePath Assignment & Delivery API Endpoints"""
import io
import math
from typing import Optional
from uuid import UUID
from fastapi import APIRouter, Depends, HTTPException, Query, Request
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.api.v1.adminhq.auth import get_user_context, UserContext
from app.services.safepath.assignment_service import AssignmentService
from app.services.safepath.delivery_service import DeliveryService
from app.services.safepath.certificate_generator import CertificateGenerator
from app.schemas.safepath.assignment import (
AssignmentCreate,
AssignmentUpdate,
AssignmentListResponse,
AssignmentDetailResponse,
)
from app.schemas.safepath.result import (
LessonProgressUpdate,
QuizSubmission,
TrainingCompletion,
ResultResponse,
QuizScoreResponse,
)
router = APIRouter(prefix="/training", tags=["SafePath Training"])
# ============================================================================
# Assignment Management
# ============================================================================
@router.post("/assignments", status_code=201)
def create_assignments(
data: AssignmentCreate,
ctx: UserContext = Depends(get_user_context),
db: Session = Depends(get_db),
):
"""Create training assignments for one or more users."""
service = AssignmentService(db)
try:
assignments = service.create_assignments(ctx.company_id, ctx.user_id, data)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
db.commit()
return {"success": True, "created_count": len(assignments)}
@router.get("/assignments", response_model=AssignmentListResponse)
def list_assignments(
page: int = Query(1, ge=1),
page_size: int = Query(25, ge=1, le=100),
status: Optional[str] = Query(None),
user_id: Optional[UUID] = Query(None),
course_id: Optional[UUID] = Query(None),
site_id: Optional[UUID] = Query(None),
overdue_only: bool = Query(False),
ctx: UserContext = Depends(get_user_context),
db: Session = Depends(get_db),
):
"""List training assignments with filtering."""
service = AssignmentService(db)
items, total = service.list_assignments(
company_id=ctx.company_id,
page=page,
page_size=page_size,
status=status,
user_id=user_id,
course_id=course_id,
site_id=site_id,
overdue_only=overdue_only,
)
return AssignmentListResponse(
items=items,
total=total,
page=page,
page_size=page_size,
total_pages=math.ceil(total / page_size) if total > 0 else 0,
)
@router.put("/assignments/{assignment_id}")
def update_assignment(
assignment_id: UUID,
data: AssignmentUpdate,
ctx: UserContext = Depends(get_user_context),
db: Session = Depends(get_db),
):
"""Update assignment details."""
service = AssignmentService(db)
assignment = service.update_assignment(ctx.company_id, assignment_id, data)
if not assignment:
raise HTTPException(status_code=404, detail="Assignment not found")
db.commit()
return {"success": True}
# ============================================================================
# Training Delivery (Learner Flow)
# ============================================================================
@router.post("/assignments/{assignment_id}/start", response_model=ResultResponse)
def start_training(
assignment_id: UUID,
ctx: UserContext = Depends(get_user_context),
db: Session = Depends(get_db),
):
"""Start a training attempt for an assignment."""
service = DeliveryService(db)
try:
result = service.start_training(assignment_id, ctx.user_id)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
if not result:
raise HTTPException(status_code=404, detail="Assignment not found")
db.commit()
return ResultResponse.model_validate(result)
@router.post("/results/{result_id}/lesson-progress")
def update_lesson_progress(
result_id: UUID,
data: LessonProgressUpdate,
ctx: UserContext = Depends(get_user_context),
db: Session = Depends(get_db),
):
"""Update lesson progress during training."""
service = DeliveryService(db)
result = service.update_lesson_progress(
result_id, ctx.user_id, data.lesson_id, data.percent_complete, data.last_position
)
if not result:
raise HTTPException(status_code=404, detail="Active result not found")
db.commit()
return {"success": True, "lesson_progress": result.lesson_progress}
@router.post("/results/{result_id}/submit-quiz", response_model=QuizScoreResponse)
def submit_quiz(
result_id: UUID,
data: QuizSubmission,
ctx: UserContext = Depends(get_user_context),
db: Session = Depends(get_db),
):
"""Submit quiz answers for server-side scoring."""
service = DeliveryService(db)
try:
score_data = service.submit_quiz(result_id, ctx.user_id, data.answers)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
db.commit()
return QuizScoreResponse(**score_data)
@router.post("/results/{result_id}/complete", response_model=ResultResponse)
def complete_training(
result_id: UUID,
data: TrainingCompletion,
request: Request,
ctx: UserContext = Depends(get_user_context),
db: Session = Depends(get_db),
):
"""Complete training with e-signature acknowledgment."""
ip_address = request.client.host if request.client else None
service = DeliveryService(db)
try:
result = service.complete_training(result_id, ctx.user_id, data.signature_text, ip_address)
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
if not result:
raise HTTPException(status_code=404, detail="Result not found")
db.commit()
return ResultResponse.model_validate(result)
@router.get("/results/{result_id}/certificate")
def download_certificate(
result_id: UUID,
ctx: UserContext = Depends(get_user_context),
db: Session = Depends(get_db),
):
"""Download PDF certificate for a completed training."""
from fastapi.responses import StreamingResponse
from app.db.models.safepath import Result, Course
from app.db.models.user import User
from app.db.models.company import Company
result = db.query(Result).filter(Result.result_id == result_id, Result.user_id == ctx.user_id).first()
# passed is None when the course has no quiz; only an explicit failure blocks the certificate
if not result or not result.completed_at or result.passed is False:
raise HTTPException(status_code=404, detail="Completed training result not found")
course = db.query(Course).filter(Course.course_id == result.course_id).first()
user = db.query(User).filter(User.user_id == result.user_id).first()
company = db.query(Company).filter(Company.company_id == ctx.company_id).first()
generator = CertificateGenerator()
pdf_bytes = generator.generate(
employee_name=f"{user.first_name} {user.last_name}" if user else "Unknown",
course_title=course.title if course else "Unknown Course",
completion_date=result.completed_at,
score_percent=float(result.score_percent) if result.score_percent is not None else None,
instructor_name=result.instructor_name,
company_name=company.company_name if company else None,
osha_ref=course.osha_standard_ref if course else None,
)
return StreamingResponse(
io.BytesIO(pdf_bytes),
media_type="application/pdf",
headers={"Content-Disposition": f"attachment; filename=certificate_{result_id}.pdf"},
)
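The pagination arithmetic shared by the list endpoint and the service query is small enough to verify by hand. The helpers `total_pages` and `page_offset` below are hypothetical names for the two expressions used above.

```python
import math


def total_pages(total: int, page_size: int) -> int:
    """Number of pages needed for `total` rows; zero rows means zero pages."""
    return math.ceil(total / page_size) if total > 0 else 0


def page_offset(page: int, page_size: int) -> int:
    """Rows to skip for a 1-based page number."""
    return (page - 1) * page_size
```

With the default page size of 25, a result set of 26 rows spans two pages and page 3 starts at offset 50.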
Learner Flow Sequence
1. GET /training/assignments?user_id={user_id}&status=pending
→ List my pending assignments
2. POST /training/assignments/{id}/start
→ Creates Result record (attempt #1), returns result_id
→ Assignment status: pending → in_progress
3. For each lesson:
POST /training/results/{result_id}/lesson-progress
→ Update: {"lesson_id": "...", "percent_complete": 45}
→ Tracks video position, PDF pages read, etc.
4. When all lessons completed (play-and-lock check passes):
POST /training/results/{result_id}/submit-quiz
→ Submits answers, returns score + pass/fail + feedback
→ If failed and attempts remain (can_retake=true): go back to step 2
5. If passed (or no quiz):
POST /training/results/{result_id}/complete
→ Records e-signature, timestamps, duration
→ Assignment status: in_progress → completed
→ Certificate available for download
6. GET /training/results/{result_id}/certificate
→ Download PDF certificate
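The status changes in this flow can be summarized as a small transition table. The sketch below is one possible reading of the services in this phase, not code from them: `TRANSITIONS` and `next_status` are hypothetical names, and restricting "complete" to in_progress or overdue assignments is an assumption (the service itself does not check the current status before marking completion).

```python
from typing import Dict, Tuple

# (current status, event) -> new status
TRANSITIONS: Dict[Tuple[str, str], str] = {
    ("pending", "start"): "in_progress",
    ("overdue", "start"): "in_progress",
    ("in_progress", "start"): "in_progress",  # retake: new attempt, same status
    ("in_progress", "complete"): "completed",
    ("overdue", "complete"): "completed",  # daily job may flag an attempt under way
    ("pending", "mark_overdue"): "overdue",
    ("in_progress", "mark_overdue"): "overdue",
}


def next_status(status: str, event: str) -> str:
    """Return the status after `event`, or raise ValueError if not allowed."""
    try:
        return TRANSITIONS[(status, event)]
    except KeyError:
        raise ValueError(
            f"Cannot apply '{event}' to assignment with status '{status}'"
        ) from None


# Happy path from the sequence above: start, then complete.
status = next_status("pending", "start")
status = next_status(status, "complete")
```

A completed assignment has no outgoing transitions here, which matches start_training rejecting any status other than pending, in_progress, or overdue.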
Verification Checklist
After implementing this phase:
Assignment flow:
- Create assignment for a user on a published course
- Duplicate assignment for same user+course is skipped
- Assignment appears in user's list with status "pending"
Training delivery:
- Start training creates a result with attempt_number=1
- Lesson progress updates are tracked per-lesson
- Quiz is locked until all lessons meet threshold
- Quiz submission returns server-computed score
- Failed quiz allows retake (within max_retakes limit)
- Completion requires e-signature; records IP hash
Certificate:
- PDF certificate downloads for completed+passed results
- Certificate includes employee name, course title, date, score, company name
Classroom:
- Create in-person session with instructor, date, location
- Submit attendance creates assignments + results for each attendee