Notification System Architecture

Comprehensive architecture document for the Tellus EHS notification system.

Table of Contents

  1. System Overview
  2. Unified Notification System
  3. Delivery Channels
  4. Data Flow
  5. Database Schema
  6. Backend Services
  7. Background Service Integration
  8. Frontend Architecture
  9. Preferences & Settings
  10. Multi-Tenancy & Security

System Overview

The Tellus EHS platform has a unified notification system with 34 types across 7 categories. It delivers notifications in-app (SSE + polling) and by email (Mailgun); a webhook channel (Pro tier) is planned.

Updated 2026-02-24: The system was unified in Phase 1. The previously separate SafePath notification tables (safepath_notifications, safepath_notification_preferences) were dropped and all training notifications now flow through the single notifications table.

                      TELLUS NOTIFICATION SYSTEM
+---------------------------------------------------------------------+
|                                                                     |
|  +--------------------------------------------------------------+   |
|  |                 Unified Notifications Table                  |   |
|  |                  (34 types / 7 categories)                   |   |
|  |                                                              |   |
|  |    workflow (9) | inventory (6) | compliance (4) | ai (4)    |   |
|  |    export (2) | training (7) | labels (2)                    |   |
|  +----------------------------+---------------------------------+   |
|                               |                                     |
|             +-----------------+-----------------+                   |
|             |                                   |                   |
|     +-------v-------+                  +--------v--------+          |
|     | SSE Manager   | (real-time)      | Email (Mailgun) |          |
|     | + REST API    | (CRUD)           | via SQS async   |          |
|     +-------+-------+                  +-----------------+          |
|             |                                                       |
|     +-------v----------+                                            |
|     | NotificationBell | (frontend)                                 |
|     | + 60s polling    |                                            |
|     +------------------+                                            |
+---------------------------------------------------------------------+

Unified Notification System

Location: app/db/models/notification.py, app/services/notification_service.py

Handles all notifications across all modules via a single notifications table:

Categories & Types:

| Category | Types | Example |
| --- | --- | --- |
| workflow | plan_created, plan_published, plan_archived, plan_shared, section_completed, plan_approved, plan_rejected, plan_comment, plan_revision_requested | "HazCom Plan 'Q3 Update' was published" |
| inventory | chemical_added, chemical_updated, chemical_removed, inventory_import_complete, sds_attached, sds_expiring | "New chemical 'Acetone' added to Site A" |
| compliance | compliance_gap_detected, compliance_status_changed, audit_scheduled, quantity_threshold_exceeded | "Compliance gap: Site B missing 3 SDS documents" |
| ai | ai_generation_started, ai_generation_completed, ai_generation_failed, ai_prefill_completed | "AI content generation completed for Plan 'Q3'" |
| export | export_ready, export_failed | "Your HazCom Plan PDF is ready for download" |
| training | training_assigned, training_reminder, training_overdue, training_completed, cert_expiring, cert_expired, cert_renewed | "Training 'GHS Overview' assigned to you" |
| labels | label_found, label_not_found | "EPA label found for product 'Roundup Pro'" |

Key characteristics:

  • Per-user global email opt-out via email_notifications_enabled (Phase 2)
  • 90-day TTL with automatic cleanup
  • Supports action_url for deep-linking
  • metadata JSONB field for extra context (plan_id, chemical_name, etc.)
  • Tracks actor_user_id and actor_name for "who did it"
  • Deduplication via has_recent_notification() (same entity + type within N days)
  • Email delivery for 15 email-worthy types via background service (Mailgun)
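
The characteristics above can be illustrated with a minimal sketch of how a notification record might be assembled with its 90-day expiry. The NotificationDraft class, its field defaults, and the is_expired() helper are hypothetical names used only for illustration; the real model lives in app/db/models/notification.py and may differ.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Any, Optional
from uuid import UUID, uuid4

NOTIFICATION_TTL = timedelta(days=90)  # 90-day TTL stated in the list above


@dataclass
class NotificationDraft:
    """Hypothetical in-memory shape of one notification row (illustration only)."""
    company_id: UUID
    recipient_user_id: UUID
    notification_type: str
    category: str
    title: str
    message: str
    action_url: Optional[str] = None                       # deep link target
    metadata: dict[str, Any] = field(default_factory=dict)  # extra context
    actor_user_id: Optional[UUID] = None                   # "who did it"
    actor_name: Optional[str] = None
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
    expires_at: Optional[datetime] = None

    def __post_init__(self) -> None:
        # Default the expiry to creation time plus the TTL.
        if self.expires_at is None:
            self.expires_at = self.created_at + NOTIFICATION_TTL


def is_expired(draft: NotificationDraft, now: datetime) -> bool:
    """True once the record is eligible for automatic cleanup."""
    return draft.expires_at is not None and draft.expires_at <= now


draft = NotificationDraft(
    company_id=uuid4(),
    recipient_user_id=uuid4(),
    notification_type="plan_published",
    category="workflow",
    title="Plan published",
    message="HazCom Plan 'Q3 Update' was published",
    metadata={"plan_name": "Q3 Update"},
)
```

Storing expires_at on the row, rather than computing it at cleanup time, keeps the cleanup query a simple comparison against the current timestamp.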

Delivery Channels

Channel 1: In-App (Real-time via SSE)

Status: Implemented

Backend API --> DB Insert --> SSE Manager --> EventSource (Browser)
                                                   |
                                      NotificationBell component

  • SSEManager (app/services/sse_manager.py): In-memory singleton using asyncio.Queue per connection
  • SSE Endpoint: GET /api/v1/notifications/stream?token=<jwt>
  • Heartbeat: Every 30 seconds to keep connection alive
  • Reconnect: EventSource auto-reconnects; REST polling (60s) catches missed events
  • Multi-tab: Each tab opens its own SSE connection; all receive push
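
As a rough sketch of the pattern described above (one asyncio.Queue per connection, a push that fans out to every open tab, and a periodic heartbeat), the following may help. SSEManagerSketch, connect(), disconnect(), and event_stream() are hypothetical names; only push_to_users and the 30-second heartbeat come from this document, and the real implementation in app/services/sse_manager.py may be structured differently.

```python
import asyncio
import json
from collections import defaultdict
from typing import AsyncIterator


class SSEManagerSketch:
    """Hypothetical in-memory manager: one asyncio.Queue per open connection."""

    def __init__(self) -> None:
        # user_id -> queues, one per connection (for example, one per browser tab)
        self._connections: dict[str, set[asyncio.Queue]] = defaultdict(set)

    def connect(self, user_id: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._connections[user_id].add(queue)
        return queue

    def disconnect(self, user_id: str, queue: asyncio.Queue) -> None:
        self._connections[user_id].discard(queue)
        if not self._connections[user_id]:
            del self._connections[user_id]

    def push_to_users(self, user_ids: list[str], event: dict) -> int:
        """Enqueue the event on every open connection; return the delivery count."""
        delivered = 0
        for user_id in user_ids:
            for queue in self._connections.get(user_id, ()):
                queue.put_nowait(event)
                delivered += 1
        return delivered


async def event_stream(queue: asyncio.Queue, heartbeat_seconds: float = 30.0) -> AsyncIterator[str]:
    """Yield SSE frames; emit a comment line as heartbeat when the queue is idle."""
    while True:
        try:
            event = await asyncio.wait_for(queue.get(), timeout=heartbeat_seconds)
            yield f"data: {json.dumps(event)}\n\n"
        except asyncio.TimeoutError:
            yield ": heartbeat\n\n"  # SSE comment line, ignored by EventSource
```

Because the queues live in process memory, a push only reaches connections held by the same process, which is one reason the 60-second REST poll remains useful as a fallback.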

Channel 2: Email (Mailgun via Background Service)

Status: Implemented (Phase 2, completed 2026-02-24)

Main Service (API)                 SQS Queue              Background Service
+--------------------+                                    +--------------------+
| NotificationSvc    |                                    |                    |
| .notify_user()     |--creates in-app notification--> DB |                    |
|                    |                                    |                    |
| if email-worthy:   |--dispatch_email()-->[EMAIL_SEND]-->| handle_email_send  |
|                    |                                    |  -> check opt-out  |
|                    |                                    |  -> render HTML    |
|                    |                                    |  -> Mailgun API    |
+--------------------+                                    +--------------------+

  • 15 email-worthy types defined in EMAIL_WORTHY_TYPES constant
  • Mailgun HTTP API client in background service (app/services/email/mailgun_client.py)
  • HTML + plaintext templates with urgency-based styling (red/amber/blue)
  • User opt-out: email_notifications_enabled boolean on core_data_users table
  • Two dispatch paths: (1) Main service -> SQS -> background for API-originated, (2) Background handlers send directly via helper for background-originated
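
The two stages of the email path (the main service decides whether to queue, the background handler checks the opt-out and renders) could be sketched as below. This is illustrative only: the members of EMAIL_WORTHY_TYPES_SKETCH are placeholders because the 15 real types are not listed here, the urgency-to-color mapping is assumed, and the message keys and send() callable are hypothetical stand-ins for the Mailgun client.

```python
from html import escape
from typing import Callable

# Placeholder subset for illustration only. The document says EMAIL_WORTHY_TYPES
# holds 15 types but does not list them, so these members are assumptions.
EMAIL_WORTHY_TYPES_SKETCH = frozenset({"training_overdue", "cert_expiring", "export_ready"})

# Assumed mapping from an urgency level to the red/amber/blue styling.
URGENCY_COLORS = {"high": "red", "medium": "amber", "low": "blue"}


def should_dispatch_email(notification_type: str) -> bool:
    """Main-service side: only email-worthy types are queued as EMAIL_SEND."""
    return notification_type in EMAIL_WORTHY_TYPES_SKETCH


def handle_email_send_sketch(
    message: dict,
    email_notifications_enabled: bool,
    send: Callable[[str, str, str], None],
) -> bool:
    """Background side: check opt-out, render HTML, hand off to the mail client.

    The message keys (to, title, urgency) and send(to, subject, html) are
    hypothetical; the real client is app/services/email/mailgun_client.py.
    """
    if not email_notifications_enabled:
        return False  # user opted out, so nothing is sent
    color = URGENCY_COLORS.get(message.get("urgency", "low"), "blue")
    body = f'<div style="border-left: 4px solid {color}">{escape(message["title"])}</div>'
    send(message["to"], message["title"], body)
    return True
```

Checking the opt-out in the background handler rather than at dispatch time means a user who disables email while a message is still queued will not receive it.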

Channel 3: Webhook (Pro Tier)

Status: Schema only (not implemented)

  • Intended for enterprise integrations (Slack, Teams, custom)

Data Flow

Flow A: Synchronous In-App Notification (General)

Used for events triggered directly by API calls (plan publish, chemical add, etc.):

1. User action (e.g., publish plan)
2. API endpoint calls NotificationService.notify_plan_event()
3. Service creates Notification records in DB (one per recipient)
4. Service calls sse_manager.push_to_users(recipient_ids, event)
5. If email-worthy: dispatches EMAIL_SEND to SQS for each recipient
6. SSE pushes event to connected browsers
7. NotificationBell shows badge + dropdown entry
8. Background service processes EMAIL_SEND, checks opt-out, sends via Mailgun
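
Steps 2 through 5 above can be sketched as a single orchestration function. This is a simplified illustration, not the service's actual code: notify_plan_event_sketch and its injected callables (insert_records, push_to_users, enqueue_email, is_email_worthy) are hypothetical, and the record fields are reduced to a minimum.

```python
from typing import Callable, Iterable


def notify_plan_event_sketch(
    company_user_ids: Iterable[str],
    actor_user_id: str,
    notification_type: str,
    title: str,
    *,
    insert_records: Callable[[list[dict]], None],
    push_to_users: Callable[[list[str], dict], None],
    enqueue_email: Callable[[str, dict], None],
    is_email_worthy: Callable[[str], bool],
) -> list[str]:
    """Create records, push over SSE, and queue emails; return the recipient ids."""
    # Step 3: one record per recipient, excluding the user who triggered the event.
    recipients = [uid for uid in company_user_ids if uid != actor_user_id]
    records = [
        {"recipient_user_id": uid, "notification_type": notification_type, "title": title}
        for uid in recipients
    ]
    insert_records(records)

    # Step 4: real-time push to any connected browsers.
    event = {"notification_type": notification_type, "title": title}
    push_to_users(recipients, event)

    # Step 5: queue one EMAIL_SEND per recipient for email-worthy types only.
    if is_email_worthy(notification_type):
        for uid in recipients:
            enqueue_email(uid, event)
    return recipients
```

Passing the side effects in as callables keeps the ordering of the flow visible and makes it straightforward to exercise with in-memory fakes.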

Flow B: Asynchronous Notification (Background Service)

Used for events originating from background jobs (AI generation, imports):

1. Background job completes (e.g., AI generation)
2. Handler dispatches NOTIFICATION_SEND message to SQS
3. SQS consumer picks up message
4. handle_notification_send() creates DB records via raw SQL
5. If email-worthy: sends email directly via helper function (no re-queue)
6. Frontend catches up on next poll (60s)

Flow C: Scheduled Notifications (Training)

Used for time-based events (overdue, reminders, cert expiry):

1. Cron/scheduler triggers SafePathScheduler.run_all_scheduled_tasks()
2. Scheduler queries for overdue assignments, upcoming due dates, expiring certs
3. For each match, checks deduplication (was user notified recently?)
4. Creates notification record via NotificationService + UnitOfWork
5. Email sent if type is email-worthy and user has email enabled
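
The deduplication check in step 3 might look like the sketch below, which works on an in-memory list instead of the database. The window values come from the SafePathScheduler section of this document; the mapping from each scheduled check to a notification type, and the helper names, are assumptions made for illustration.

```python
from datetime import datetime, timedelta, timezone

# Dedup windows from the SafePathScheduler section; pairing each window
# with a notification type is an assumption.
DEDUP_WINDOWS = {
    "training_overdue": timedelta(hours=24),
    "training_reminder": timedelta(hours=72),
    "cert_expiring": timedelta(days=7),
}


def has_recent_notification_sketch(
    history: list[dict],
    recipient_user_id: str,
    entity_id: str,
    notification_type: str,
    now: datetime,
) -> bool:
    """True if the same user, entity, and type were notified inside the window."""
    cutoff = now - DEDUP_WINDOWS[notification_type]
    return any(
        rec["recipient_user_id"] == recipient_user_id
        and rec["entity_id"] == entity_id
        and rec["notification_type"] == notification_type
        and rec["created_at"] > cutoff
        for rec in history
    )


def notify_if_not_recent(
    history: list[dict],
    recipient_user_id: str,
    entity_id: str,
    notification_type: str,
    now: datetime,
) -> bool:
    """Record a notification unless a duplicate exists; return whether one was created."""
    if has_recent_notification_sketch(history, recipient_user_id, entity_id, notification_type, now):
        return False
    history.append(
        {
            "recipient_user_id": recipient_user_id,
            "entity_id": entity_id,
            "notification_type": notification_type,
            "created_at": now,
        }
    )
    return True
```

With a daily schedule and a 24-hour window, an assignment that stays overdue produces at most one overdue notification per day rather than one per scheduler run.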

Flow D: Auto-Assignment Notifications

Used when auto-assignment rules create new training assignments:

1. Trigger event occurs (chemical added, new hire, etc.)
2. API dispatches SAFEPATH_RULE_EVALUATE to SQS
3. Background handler evaluates rules, creates assignments
4. Creates training_assigned notification in notifications table
5. Sends email if user has email enabled

Database Schema

notifications Table

CREATE TABLE notifications (
    notification_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    company_id UUID NOT NULL REFERENCES companies(company_id),
    recipient_user_id UUID NOT NULL REFERENCES users(user_id),
    notification_type VARCHAR(50) NOT NULL,
    category VARCHAR(30) NOT NULL,
    title VARCHAR(255) NOT NULL,
    message TEXT NOT NULL,
    entity_type VARCHAR(50),        -- 'plan', 'chemical', 'sds', etc.
    entity_id UUID,                 -- ID of the related entity
    action_url VARCHAR(500),        -- Deep link to relevant page
    metadata JSONB DEFAULT '{}',    -- Extra context (plan_name, chemical_name, etc.)
    actor_user_id UUID,             -- Who triggered the event
    actor_name VARCHAR(255),        -- Cached name of the actor
    is_read BOOLEAN DEFAULT FALSE,
    read_at TIMESTAMP WITH TIME ZONE,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    expires_at TIMESTAMP WITH TIME ZONE  -- 90 days from creation
);

-- Indexes
CREATE INDEX ix_notifications_user_unread
    ON notifications (recipient_user_id, is_read)
    WHERE is_read = FALSE;
CREATE INDEX ix_notifications_company_created
    ON notifications (company_id, created_at DESC);
CREATE INDEX ix_notifications_expires
    ON notifications (expires_at)
    WHERE expires_at IS NOT NULL;

core_data_users — Email Preference Column

ALTER TABLE core_data_users ADD COLUMN email_notifications_enabled BOOLEAN NOT NULL DEFAULT TRUE;

Backend Services

NotificationService

File: app/services/notification_service.py

| Method | Purpose | Used By |
| --- | --- | --- |
| notify_plan_event() | Plan lifecycle events -> all company users (minus actor) | HazCom Plan API |
| notify_inventory_event() | Chemical inventory events -> site users | ChemIQ Inventory API |
| notify_user() | Single-user notification | Various |
| notify_company_users() | Broadcast to all company users | Various |
| get_notifications() | Paginated list for user (supports category filter) | Notification API |
| get_unread_count() | Badge count (supports category filter) | Notification API |
| mark_as_read() | Mark single as read | Notification API |
| mark_all_as_read() | Mark all as read | Notification API |
| _dispatch_emails_for_recipients() | Fire-and-forget email dispatch for email-worthy types | Internal |

NotificationRepository

File: app/db/repositories/notification_repository.py

| Method | Purpose |
| --- | --- |
| create() | Insert single notification |
| create_bulk() | Batch insert for multi-recipient |
| get_for_user() | Paginated query with is_read + category filter |
| count_unread() | Lightweight count for badge |
| mark_read() | Set is_read=True + read_at |
| mark_all_read() | Bulk update for user |
| delete_expired() | Cleanup notifications past expires_at |
| has_recent_notification() | Deduplication check |

SafePathScheduler

File: app/services/safepath/scheduler.py

| Method | Trigger | Frequency | Dedup Window |
| --- | --- | --- | --- |
| check_overdue_assignments() | Assignment.due_date < today | Daily (intended) | 24 hours |
| send_training_reminders() | Assignment due within N days | Daily (intended) | 72 hours |
| check_expiring_certifications() | Cert expires within N days | Daily (intended) | 7 days |

Background Service Integration

| MessageType | Handler | Purpose |
| --- | --- | --- |
| NOTIFICATION_SEND | handle_notification_send() | Create notification DB records + send email |
| EMAIL_SEND | handle_email_send() | Send email via Mailgun (dispatched from main service) |
| SAFEPATH_RULE_EVALUATE | handle_safepath_rule_evaluate() | Evaluate auto-assignment rules + create assignments + send email |
| SAFEPATH_PLAN_RETRAINING | handle_safepath_plan_retraining() | Create retraining assignments + send email |

Email Infrastructure (Background Service)

| File | Purpose |
| --- | --- |
| app/services/email/mailgun_client.py | Async Mailgun HTTP client (httpx) |
| app/services/email/templates.py | Subject lines + HTML/text rendering with urgency styling |
| app/services/email/send_notification_email.py | Helper for background handlers to send emails directly |

Frontend Architecture

NotificationBell Component

File: src/components/NotificationBell.tsx

  • Renders bell icon with red badge (unread count, max "99+")
  • Dropdown shows recent notifications grouped by category with icons
  • Category icon mapping: workflow (FileText/blue), inventory (FlaskConical/emerald), compliance (ShieldAlert/amber), ai (Sparkles/purple), export (Download/gray), training (GraduationCap/indigo), labels (Tag/teal)
  • Clicking a notification navigates to action_url and marks as read
  • "Mark all as read" button

useNotifications Hook

File: src/hooks/useNotifications.ts

  • Polls GET /api/v1/notifications every 60 seconds for unread count
  • Fetches full notification list on dropdown open
  • Optimistic UI: immediately updates local state on mark-read actions
  • Returns: { notifications, unreadCount, loading, markAsRead, markAllAsRead, refetch }

API Client Functions

File: src/services/api/notification.api.ts

  • listNotifications(token, userId, companyId, params) — supports category filter
  • markNotificationRead(token, userId, companyId, notificationId)
  • markAllNotificationsRead(token, userId, companyId)
  • getEmailPreference(token, userId, companyId) — global email toggle
  • updateEmailPreference(token, userId, companyId, enabled) — toggle email on/off

Preferences & Settings

Current (Phase 2 — Global Toggle)

  • email_notifications_enabled boolean on core_data_users (default: true)
  • Controls whether ANY email notifications are sent
  • API: GET/PUT /api/v1/notifications/email-preference

Planned (Tier 2 — Per-Category Preferences)

See Notification Preferences Plan for the detailed implementation plan:

  • Per-category preferences (7 categories) with 3-tier resolution (user -> company -> system)
  • Three frequency options: immediate, daily_digest, off
  • Always-immediate override for 5 safety-critical types
  • Daily digest batching via pending_digest_emails staging table
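
One way the planned 3-tier resolution could work is sketched below, under stated assumptions. The function name, the dictionary shapes, and the system default of "immediate" are hypothetical, and ALWAYS_IMMEDIATE_SKETCH holds a single placeholder because the five safety-critical types are not named in this document.

```python
from typing import Mapping

FREQUENCIES = ("immediate", "daily_digest", "off")
SYSTEM_DEFAULT = "immediate"  # assumed system-level default

# Placeholder: the plan mentions 5 always-immediate safety-critical types
# but does not name them here, so this single member is an assumption.
ALWAYS_IMMEDIATE_SKETCH = frozenset({"quantity_threshold_exceeded"})


def resolve_frequency(
    notification_type: str,
    category: str,
    user_prefs: Mapping[str, str],
    company_prefs: Mapping[str, str],
) -> str:
    """Resolve email frequency: override first, then user -> company -> system."""
    if notification_type in ALWAYS_IMMEDIATE_SKETCH:
        return "immediate"  # safety-critical types ignore preferences
    for prefs in (user_prefs, company_prefs):
        choice = prefs.get(category)
        if choice is not None:
            if choice not in FREQUENCIES:
                raise ValueError(f"unknown frequency: {choice!r}")
            return choice
    return SYSTEM_DEFAULT


# A user-level setting wins over the company default for the same category.
print(resolve_frequency("plan_comment", "workflow", {"workflow": "off"}, {"workflow": "daily_digest"}))
# prints "off"
```

Evaluating the override before any preference lookup guarantees that an "off" setting at either level cannot suppress a safety-critical email.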

Multi-Tenancy & Security

  • All notification queries are scoped by company_id
  • Users can only see their own notifications (recipient_user_id = current_user)
  • SSE stream requires JWT authentication via a query parameter, because the browser EventSource API cannot set custom request headers
  • SSE endpoint verifies JWT and extracts user_id for connection registration
  • mark_read verifies notification ownership before updating
  • Admin users cannot read other users' notifications (privacy)
  • Notification metadata may contain entity names but never sensitive data
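
The ownership and tenant checks above can be folded into the update statement itself, as in this minimal sketch. It uses an in-memory SQLite table as a stand-in for PostgreSQL, with a reduced column set; mark_read_sketch and the sample ids are hypothetical and do not reflect the repository's actual code.

```python
import sqlite3


def mark_read_sketch(
    conn: sqlite3.Connection,
    notification_id: str,
    current_user_id: str,
    company_id: str,
) -> bool:
    """Mark a notification read only if it belongs to this user and company."""
    cur = conn.execute(
        "UPDATE notifications SET is_read = 1, read_at = CURRENT_TIMESTAMP "
        "WHERE notification_id = ? AND recipient_user_id = ? AND company_id = ?",
        (notification_id, current_user_id, company_id),
    )
    conn.commit()
    return cur.rowcount == 1  # zero rows means not found or not owned


# SQLite stand-in with a reduced column set (illustration only).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE notifications ("
    "notification_id TEXT PRIMARY KEY, company_id TEXT NOT NULL, "
    "recipient_user_id TEXT NOT NULL, is_read INTEGER DEFAULT 0, read_at TEXT)"
)
conn.execute(
    "INSERT INTO notifications (notification_id, company_id, recipient_user_id) "
    "VALUES ('n1', 'c1', 'alice')"
)

print(mark_read_sketch(conn, "n1", "bob", "c1"))    # False: bob does not own n1
print(mark_read_sketch(conn, "n1", "alice", "c1"))  # True: owner in the right company
```

Putting recipient_user_id and company_id in the WHERE clause means an unauthorized request and a missing notification look identical to the caller, which avoids leaking whether a given id exists.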