Notification System Architecture
Comprehensive architecture document for the Tellus EHS notification system.
Table of Contents
- System Overview
- Unified Notification System
- Delivery Channels
- Data Flow
- Database Schema
- Backend Services
- Background Service Integration
- Frontend Architecture
- Preferences & Settings
- Multi-Tenancy & Security
System Overview
The Tellus EHS platform has a unified notification system with 34 types across 7 categories, delivering notifications in-app (SSE + polling), by email (Mailgun), and by webhook (Pro tier, planned).
Updated 2026-02-24: The system was unified in Phase 1. The previously separate SafePath notification tables (safepath_notifications, safepath_notification_preferences) were dropped, and all training notifications now flow through the single notifications table.
                      TELLUS NOTIFICATION SYSTEM
+---------------------------------------------------------------------+
|                                                                     |
|  +--------------------------------------------------------------+   |
|  |                 Unified Notifications Table                  |   |
|  |                  (34 types / 7 categories)                   |   |
|  |                                                              |   |
|  |  workflow (9) | inventory (6) | compliance (4) | ai (4)      |   |
|  |  export (2) | training (7) | labels (2)                      |   |
|  +----------------------------+---------------------------------+   |
|                               |                                     |
|             +-----------------+-----------------+                   |
|             |                                   |                   |
|     +-------v-------+                  +--------v--------+          |
|     |  SSE Manager  | (real-time)      | Email (Mailgun) |          |
|     |  + REST API   | (CRUD)           |  via SQS async  |          |
|     +-------+-------+                  +-----------------+          |
|             |                                                       |
|     +-------v---------+                                             |
|     | NotificationBell| (frontend)                                  |
|     | + 60s polling   |                                             |
|     +-----------------+                                             |
+---------------------------------------------------------------------+
Unified Notification System
Location: app/db/models/notification.py, app/services/notification_service.py
Handles all notifications across all modules via a single notifications table:
Categories & Types:
| Category | Types | Example |
|---|---|---|
| workflow | plan_created, plan_published, plan_archived, plan_shared, section_completed, plan_approved, plan_rejected, plan_comment, plan_revision_requested | "HazCom Plan 'Q3 Update' was published" |
| inventory | chemical_added, chemical_updated, chemical_removed, inventory_import_complete, sds_attached, sds_expiring | "New chemical 'Acetone' added to Site A" |
| compliance | compliance_gap_detected, compliance_status_changed, audit_scheduled, quantity_threshold_exceeded | "Compliance gap: Site B missing 3 SDS documents" |
| ai | ai_generation_started, ai_generation_completed, ai_generation_failed, ai_prefill_completed | "AI content generation completed for Plan 'Q3'" |
| export | export_ready, export_failed | "Your HazCom Plan PDF is ready for download" |
| training | training_assigned, training_reminder, training_overdue, training_completed, cert_expiring, cert_expired, cert_renewed | "Training 'GHS Overview' assigned to you" |
| labels | label_found, label_not_found | "EPA label found for product 'Roundup Pro'" |
Key characteristics:
- Global email_notifications_enabled per-user opt-out (Phase 2)
- 90-day TTL with automatic cleanup
- Supports action_url for deep-linking
- metadata JSONB field for extra context (plan_id, chemical_name, etc.)
- Tracks actor_user_id and actor_name for "who did it"
- Deduplication via has_recent_notification() (same entity + type within N days)
- Email delivery for 15 email-worthy types via background service (Mailgun)
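The TTL and deduplication rules can be illustrated with a small in-memory sketch. This is not the repository code: the `Notification` dataclass, the `TTL` constant, and the list-based signature of `has_recent_notification` are assumptions made for the example. Only the 90-day TTL and the "same entity + type within N days" rule come from this document.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone

TTL = timedelta(days=90)  # 90-day TTL stated in this document


@dataclass
class Notification:
    # Hypothetical in-memory stand-in for a row in the notifications table.
    recipient_user_id: str
    notification_type: str
    entity_id: str
    created_at: datetime

    @property
    def expires_at(self) -> datetime:
        # Assumed: expiry is derived from the creation time plus the TTL.
        return self.created_at + TTL


def has_recent_notification(existing, user_id, notification_type, entity_id, days, now):
    """Return True if the same user was already notified about the same
    entity + type within the last `days` days (assumed semantics)."""
    cutoff = now - timedelta(days=days)
    return any(
        n.recipient_user_id == user_id
        and n.notification_type == notification_type
        and n.entity_id == entity_id
        and n.created_at >= cutoff
        for n in existing
    )
```

A caller would skip creating a new notification whenever the check returns True, which keeps repeated scheduler runs from notifying the same user about the same entity again.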
Delivery Channels
Channel 1: In-App (Real-time via SSE)
Status: Implemented
Backend API --> DB Insert --> SSE Manager --> EventSource (Browser)
                                                   |
                                      NotificationBell component
- SSEManager (app/services/sse_manager.py): In-memory singleton using asyncio.Queue per connection
- SSE Endpoint: GET /api/v1/notifications/stream?token=<jwt>
- Heartbeat: Every 30 seconds to keep connection alive
- Reconnect: EventSource auto-reconnects; REST polling (60s) catches missed events
- Multi-tab: Each tab opens its own SSE connection; all receive push
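The per-connection queue design can be sketched as follows. This is a minimal illustration under assumptions, not the contents of app/services/sse_manager.py: the method names `connect` and `disconnect`, the return value of `push_to_users`, the `format_sse` helper, and the heartbeat payload are all hypothetical. Only the idea of one asyncio.Queue per connection, fan-out to every tab, and the `push_to_users` name come from this document.

```python
import asyncio
import json
from collections import defaultdict


class SSEManager:
    """Illustrative in-memory registry: one asyncio.Queue per open connection."""

    def __init__(self) -> None:
        # user_id -> set of queues, one per browser tab / connection
        self._queues = defaultdict(set)

    def connect(self, user_id: str) -> asyncio.Queue:
        queue = asyncio.Queue()
        self._queues[user_id].add(queue)
        return queue

    def disconnect(self, user_id: str, queue: asyncio.Queue) -> None:
        self._queues[user_id].discard(queue)
        if not self._queues[user_id]:
            del self._queues[user_id]

    def push_to_users(self, user_ids, event: dict) -> int:
        """Queue the event on every open connection; return the delivery count."""
        delivered = 0
        for user_id in user_ids:
            for queue in self._queues.get(user_id, ()):
                queue.put_nowait(event)
                delivered += 1
        return delivered


def format_sse(event: dict) -> str:
    # SSE wire format: a "data:" line terminated by a blank line.
    return f"data: {json.dumps(event)}\n\n"


# Assumed heartbeat payload: an SSE comment line, which EventSource ignores.
HEARTBEAT = ": heartbeat\n\n"
```

Because the registry lives in process memory, users with no open connection simply receive nothing from the push; the 60-second REST poll described above is what covers that case.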
Channel 2: Email (Mailgun via Background Service)
Status: Implemented (Phase 2, completed 2026-02-24)
Main Service (API)                          SQS Queue      Background Service
+--------------------+                                     +--------------------+
| NotificationSvc    |                                     |                    |
| .notify_user()     |--creates in-app notification-->DB   |                    |
|                    |                                     |                    |
| if email-worthy:   |--dispatch_email()-->[EMAIL_SEND]--> | handle_email_send  |
|                    |                                     |  -> check opt-out  |
|                    |                                     |  -> render HTML    |
|                    |                                     |  -> Mailgun API    |
+--------------------+                                     +--------------------+
- 15 email-worthy types defined in EMAIL_WORTHY_TYPES constant
- Mailgun HTTP API client in background service (app/services/email/mailgun_client.py)
- HTML + plaintext templates with urgency-based styling (red/amber/blue)
- User opt-out: email_notifications_enabled boolean on core_data_users table
- Two dispatch paths: (1) Main service -> SQS -> background for API-originated, (2) Background handlers send directly via helper for background-originated
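The split between the two checks (email-worthiness in the main service, opt-out in the background service) might look roughly like this. The three types placed in `EMAIL_WORTHY_TYPES` are placeholders, since this document does not enumerate the 15 real ones. The message field names, the `email_enabled_by_user` mapping, and the `send` callable are likewise assumptions standing in for the SQS payload, the email_notifications_enabled column, and the Mailgun client.

```python
# Placeholder subset: the real EMAIL_WORTHY_TYPES constant holds 15 types
# that this document does not list.
EMAIL_WORTHY_TYPES = frozenset({"training_assigned", "training_overdue", "export_ready"})


def should_dispatch_email(notification_type: str) -> bool:
    """Main-service side: only email-worthy types are queued as EMAIL_SEND."""
    return notification_type in EMAIL_WORTHY_TYPES


def handle_email_send(message: dict, email_enabled_by_user: dict, send) -> bool:
    """Background side: honor the per-user opt-out, then hand off to the sender.

    `email_enabled_by_user` stands in for the email_notifications_enabled
    column; `send` stands in for the Mailgun client call. Both are hypothetical.
    """
    user_id = message["recipient_user_id"]
    if not email_enabled_by_user.get(user_id, True):  # column defaults to TRUE
        return False
    send(user_id, message["title"])
    return True
```

Checking the opt-out at send time rather than at dispatch time means a user who disables email while a message is still queued is not emailed.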
Channel 3: Webhook (Pro Tier)
Status: Schema only (not implemented)
- Intended for enterprise integrations (Slack, Teams, custom)
Data Flow
Flow A: Synchronous In-App Notification (General)
Used for events triggered directly by API calls (plan publish, chemical add, etc.):
1. User action (e.g., publish plan)
2. API endpoint calls NotificationService.notify_plan_event()
3. Service creates Notification records in DB (one per recipient)
4. Service calls sse_manager.push_to_users(recipient_ids, event)
5. If email-worthy: dispatches EMAIL_SEND to SQS for each recipient
6. SSE pushes event to connected browsers
7. NotificationBell shows badge + dropdown entry
8. Background service processes EMAIL_SEND, checks opt-out, sends via Mailgun
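Step 3 (one record per recipient) can be sketched as a simple fan-out. The function name and record fields below are illustrative assumptions; excluding the actor follows the notify_plan_event() description later in this document.

```python
def build_recipient_records(company_user_ids, actor_user_id, notification_type, title):
    """Return one record per recipient, excluding the user who triggered the event."""
    return [
        {
            "recipient_user_id": user_id,
            "notification_type": notification_type,
            "title": title,
            "actor_user_id": actor_user_id,
        }
        for user_id in company_user_ids
        if user_id != actor_user_id
    ]


records = build_recipient_records(
    ["u1", "u2", "u3"], "u2", "plan_published", "Plan published"
)
# The same recipient list would feed the SSE push in step 4.
recipient_ids = [r["recipient_user_id"] for r in records]
```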
Flow B: Asynchronous Notification (Background Service)
Used for events originating from background jobs (AI generation, imports):
1. Background job completes (e.g., AI generation)
2. Handler dispatches NOTIFICATION_SEND message to SQS
3. SQS consumer picks up message
4. handle_notification_send() creates DB records via raw SQL
5. If email-worthy: sends email directly via helper function (no re-queue)
6. Frontend catches up on next poll (60s)
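Steps 2 to 4 amount to serializing a message and expanding it into rows on the consumer side. The envelope below is a hypothetical shape: the `message_type` and `payload` keys and every payload field name are assumptions, as this document does not specify the SQS message format.

```python
import json
import uuid


def build_notification_send(company_id, recipient_user_ids, notification_type, title, message):
    """Serialize a NOTIFICATION_SEND envelope (field names are illustrative)."""
    return json.dumps(
        {
            "message_type": "NOTIFICATION_SEND",
            "payload": {
                "company_id": company_id,
                "recipient_user_ids": list(recipient_user_ids),
                "notification_type": notification_type,
                "title": title,
                "message": message,
            },
        }
    )


def rows_from_message(body: str) -> list:
    """Expand the envelope into one row dict per recipient, as a consumer
    might do before inserting into the notifications table."""
    envelope = json.loads(body)
    if envelope["message_type"] != "NOTIFICATION_SEND":
        raise ValueError(f"unexpected message type: {envelope['message_type']}")
    payload = envelope["payload"]
    return [
        {
            "notification_id": str(uuid.uuid4()),
            "company_id": payload["company_id"],
            "recipient_user_id": user_id,
            "notification_type": payload["notification_type"],
            "title": payload["title"],
            "message": payload["message"],
        }
        for user_id in payload["recipient_user_ids"]
    ]
```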
Flow C: Scheduled Notifications (Training)
Used for time-based events (overdue, reminders, cert expiry):
1. Cron/scheduler triggers SafePathScheduler.run_all_scheduled_tasks()
2. Scheduler queries for overdue assignments, upcoming due dates, expiring certs
3. For each match, checks deduplication (was user notified recently?)
4. Creates notification record via NotificationService + UnitOfWork
5. Email sent if type is email-worthy and user has email enabled
Flow D: Auto-Assignment Notifications
Used when auto-assignment rules create new training assignments:
1. Trigger event occurs (chemical added, new hire, etc.)
2. API dispatches SAFEPATH_RULE_EVALUATE to SQS
3. Background handler evaluates rules, creates assignments
4. Creates training_assigned notification in notifications table
5. Sends email if user has email enabled
Database Schema
notifications Table
CREATE TABLE notifications (
    notification_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    company_id UUID NOT NULL REFERENCES companies(company_id),
    recipient_user_id UUID NOT NULL REFERENCES users(user_id),
    notification_type VARCHAR(50) NOT NULL,
    category VARCHAR(30) NOT NULL,
    title VARCHAR(255) NOT NULL,
    message TEXT NOT NULL,
    entity_type VARCHAR(50), -- 'plan', 'chemical', 'sds', etc.
    entity_id UUID, -- ID of the related entity
    action_url VARCHAR(500), -- Deep link to relevant page
    metadata JSONB DEFAULT '{}', -- Extra context (plan_name, chemical_name, etc.)
    actor_user_id UUID, -- Who triggered the event
    actor_name VARCHAR(255), -- Cached name of the actor
    is_read BOOLEAN DEFAULT FALSE,
    read_at TIMESTAMP WITH TIME ZONE,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    expires_at TIMESTAMP WITH TIME ZONE -- 90 days from creation
);
-- Indexes
CREATE INDEX ix_notifications_user_unread ON notifications (recipient_user_id, is_read)
    WHERE is_read = FALSE;
CREATE INDEX ix_notifications_company_created ON notifications (company_id, created_at DESC);
CREATE INDEX ix_notifications_expires ON notifications (expires_at)
    WHERE expires_at IS NOT NULL;
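The two queries these indexes are meant to serve (unread badge count and expiry cleanup) can be tried out with a simplified stand-in. The sketch below uses SQLite in memory rather than PostgreSQL, keeps only four columns, and stores timestamps as ISO-8601 text; the function names mirror the repository's count_unread() and delete_expired(), but their bodies are assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE notifications (
        notification_id   TEXT PRIMARY KEY,
        recipient_user_id TEXT NOT NULL,
        is_read           INTEGER NOT NULL DEFAULT 0,
        expires_at        TEXT
    )
    """
)
# Partial index over unread rows only, in the spirit of ix_notifications_user_unread.
conn.execute(
    "CREATE INDEX ix_notifications_user_unread "
    "ON notifications (recipient_user_id) WHERE is_read = 0"
)
conn.executemany(
    "INSERT INTO notifications VALUES (?, ?, ?, ?)",
    [
        ("n1", "u1", 0, "2026-05-01T00:00:00+00:00"),
        ("n2", "u1", 1, "2026-05-01T00:00:00+00:00"),
        ("n3", "u1", 0, "2025-12-01T00:00:00+00:00"),
    ],
)


def count_unread(user_id: str) -> int:
    """Badge count: unread rows for one recipient."""
    row = conn.execute(
        "SELECT COUNT(*) FROM notifications "
        "WHERE recipient_user_id = ? AND is_read = 0",
        (user_id,),
    ).fetchone()
    return row[0]


def delete_expired(now_iso: str) -> int:
    """TTL cleanup: remove rows whose expires_at is in the past."""
    cur = conn.execute(
        "DELETE FROM notifications "
        "WHERE expires_at IS NOT NULL AND expires_at < ?",
        (now_iso,),
    )
    return cur.rowcount
```

Comparing timestamps as strings only works here because every value uses the same ISO-8601 format and UTC offset; the PostgreSQL schema compares real timestamp values.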
core_data_users — Email Preference Column
ALTER TABLE core_data_users ADD COLUMN email_notifications_enabled BOOLEAN NOT NULL DEFAULT TRUE;
Backend Services
NotificationService
File: app/services/notification_service.py
| Method | Purpose | Used By |
|---|---|---|
| notify_plan_event() | Plan lifecycle events -> all company users (minus actor) | HazCom Plan API |
| notify_inventory_event() | Chemical inventory events -> site users | ChemIQ Inventory API |
| notify_user() | Single-user notification | Various |
| notify_company_users() | Broadcast to all company users | Various |
| get_notifications() | Paginated list for user (supports category filter) | Notification API |
| get_unread_count() | Badge count (supports category filter) | Notification API |
| mark_as_read() | Mark single as read | Notification API |
| mark_all_as_read() | Mark all as read | Notification API |
| _dispatch_emails_for_recipients() | Fire-and-forget email dispatch for email-worthy types | Internal |
NotificationRepository
File: app/db/repositories/notification_repository.py
| Method | Purpose |
|---|---|
| create() | Insert single notification |
| create_bulk() | Batch insert for multi-recipient |
| get_for_user() | Paginated query with is_read + category filter |
| count_unread() | Lightweight count for badge |
| mark_read() | Set is_read=True + read_at |
| mark_all_read() | Bulk update for user |
| delete_expired() | Cleanup notifications past expires_at |
| has_recent_notification() | Deduplication check |
SafePathScheduler
File: app/services/safepath/scheduler.py
| Method | Trigger | Frequency | Dedup Window |
|---|---|---|---|
| check_overdue_assignments() | Assignment.due_date < today | Daily (intended) | 24 hours |
| send_training_reminders() | Assignment due within N days | Daily (intended) | 72 hours |
| check_expiring_certifications() | Cert expires within N days | Daily (intended) | 7 days |
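The dedup windows in this table translate into a simple elapsed-time check. In the sketch below, `DEDUP_WINDOWS` is keyed by the scheduler method names from the table, while `should_notify`, its arguments, and the choice to treat an exactly elapsed window as "notify again" are assumptions.

```python
from datetime import datetime, timedelta, timezone

# Dedup windows from the SafePathScheduler table, keyed by method name.
DEDUP_WINDOWS = {
    "check_overdue_assignments": timedelta(hours=24),
    "send_training_reminders": timedelta(hours=72),
    "check_expiring_certifications": timedelta(days=7),
}


def should_notify(task: str, last_notified_at, now: datetime) -> bool:
    """True if the user was never notified, or the dedup window has elapsed."""
    if last_notified_at is None:
        return True
    return now - last_notified_at >= DEDUP_WINDOWS[task]
```

With a daily run and a 72-hour window, for example, a user with a pending reminder would be notified on roughly every third run rather than every day.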
Background Service Integration
SQS Message Types (Notification-Related)
| MessageType | Handler | Purpose |
|---|---|---|
| NOTIFICATION_SEND | handle_notification_send() | Create notification DB records + send email |
| EMAIL_SEND | handle_email_send() | Send email via Mailgun (dispatched from main service) |
| SAFEPATH_RULE_EVALUATE | handle_safepath_rule_evaluate() | Evaluate auto-assignment rules + create assignments + send email |
| SAFEPATH_PLAN_RETRAINING | handle_safepath_plan_retraining() | Create retraining assignments + send email |
Email Infrastructure (Background Service)
| File | Purpose |
|---|---|
| app/services/email/mailgun_client.py | Async Mailgun HTTP client (httpx) |
| app/services/email/templates.py | Subject lines + HTML/text rendering with urgency styling |
| app/services/email/send_notification_email.py | Helper for background handlers to send emails directly |
Frontend Architecture
NotificationBell Component
File: src/components/NotificationBell.tsx
- Renders bell icon with red badge (unread count, max "99+")
- Dropdown shows recent notifications grouped by category with icons
- Category icon mapping: workflow (FileText/blue), inventory (FlaskConical/emerald), compliance (ShieldAlert/amber), ai (Sparkles/purple), export (Download/gray), training (GraduationCap/indigo), labels (Tag/teal)
- Clicking a notification navigates to action_url and marks as read
- "Mark all as read" button
useNotifications Hook
File: src/hooks/useNotifications.ts
- Polls GET /api/v1/notifications every 60 seconds for unread count
- Fetches full notification list on dropdown open
- Optimistic UI: immediately updates local state on mark-read actions
- Returns: { notifications, unreadCount, loading, markAsRead, markAllAsRead, refetch }
API Client Functions
File: src/services/api/notification.api.ts
- listNotifications(token, userId, companyId, params): supports category filter
- markNotificationRead(token, userId, companyId, notificationId)
- markAllNotificationsRead(token, userId, companyId)
- getEmailPreference(token, userId, companyId): global email toggle
- updateEmailPreference(token, userId, companyId, enabled): toggle email on/off
Preferences & Settings
Current (Phase 2 — Global Toggle)
- email_notifications_enabled boolean on core_data_users (default: true)
- Controls whether ANY email notifications are sent
- API: GET/PUT /api/v1/notifications/email-preference
Planned (Tier 2 — Per-Category Preferences)
See Notification Preferences Plan for the detailed implementation plan:
- Per-category preferences (7 categories) with 3-tier resolution (user -> company -> system)
- Three frequency options: immediate, daily_digest, off
- Always-immediate override for 5 safety-critical types
- Daily digest batching via pending_digest_emails staging table
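Since this tier is still planned, the following is only one possible reading of the 3-tier resolution (user -> company -> system) with the always-immediate override. The function name, the `SYSTEM_DEFAULT` value, and the `always_immediate` parameter are assumptions, and the 5 safety-critical types are passed in by the caller because this document does not name them.

```python
from typing import Optional

FREQUENCIES = ("immediate", "daily_digest", "off")
SYSTEM_DEFAULT = "immediate"  # assumed system-level fallback


def resolve_frequency(
    notification_type: str,
    user_pref: Optional[str],
    company_pref: Optional[str],
    always_immediate=frozenset(),
) -> str:
    """Resolve the effective email frequency for one notification."""
    # Safety-critical types bypass every preference.
    if notification_type in always_immediate:
        return "immediate"
    # First explicit preference wins: user, then company.
    for pref in (user_pref, company_pref):
        if pref is not None:
            if pref not in FREQUENCIES:
                raise ValueError(f"unknown frequency: {pref}")
            return pref
    return SYSTEM_DEFAULT
```

For example, a user preference of "off" wins over a company preference of "daily_digest", unless the type is in the always-immediate set.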
Multi-Tenancy & Security
- All notification queries are scoped by company_id
- Users can only see their own notifications (recipient_user_id = current_user)
- SSE stream requires JWT authentication via query parameter (EventSource limitation)
- SSE endpoint verifies JWT and extracts user_id for connection registration
- mark_read verifies notification ownership before updating
- Admin users cannot read other users' notifications (privacy)
- Notification metadata may contain entity names but never sensitive data
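The ownership rule for mark_read can be sketched as a guard that treats "not yours" the same as "not found". The dict-based `rows` store, the `NotificationNotFound` exception, and the function signature are assumptions for illustration; only the company scoping and the ownership check come from the list above.

```python
from datetime import datetime, timezone


class NotificationNotFound(Exception):
    """Raised for missing rows and for rows the caller does not own."""


def mark_read(rows: dict, notification_id: str, current_user_id: str, company_id: str) -> dict:
    row = rows.get(notification_id)
    # Same error for "missing" and "belongs to someone else", so the
    # response does not reveal whether another user's notification exists.
    if (
        row is None
        or row["company_id"] != company_id
        or row["recipient_user_id"] != current_user_id
    ):
        raise NotificationNotFound(notification_id)
    row["is_read"] = True
    row["read_at"] = datetime.now(timezone.utc)
    return row
```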