Chemical Database Integration Plan

Executive Summary

This document outlines the strategy for integrating external chemical databases (primarily PubChem) with the Tellus EHS platform to enhance PPE recommendations, exposure controls, and regulatory compliance features. The approach uses demand-driven caching to avoid storing unnecessary data while ensuring relevant chemical information is available when needed.

Table of Contents

  1. Problem Statement
  2. Solution Overview
  3. Data Sources
  4. Caching Strategy
  5. SDS Parsing Integration
  6. Database Schema
  7. API Design
  8. Background Job Implementation
  9. Data Flow Architecture
  10. Implementation Phases
  11. Performance Considerations
  12. References
  13. Appendix: Integration with PPE System
  14. Appendix: Calculated Exposure Controls
  15. Appendix: Regulatory Compliance Features

Problem Statement

Current Limitations

The Tellus EHS platform currently relies solely on data extracted from SDS documents for chemical safety information. While SDS data is comprehensive, it has limitations:

  1. Inconsistent Data Quality: Different manufacturers format SDS data differently
  2. Missing Regulatory Limits: Not all SDS documents include OSHA PEL, NIOSH REL, or IDLH values
  3. Limited Chemical Properties: Physical properties needed for PPE selection may be incomplete
  4. No Chemical Family Classification: Determining chemical families for glove/cartridge selection requires external data
  5. Validation Gap: No way to validate or supplement extracted SDS data

Why Not Copy the Entire Database?

PubChem contains 100+ million compounds. Copying the entire database would be:

  • Wasteful: 99.9%+ of compounds will never be encountered by users
  • Expensive: Storage and maintenance costs for unused data
  • Complex: Keeping a massive dataset synchronized with the source
  • Unnecessary: Industrial/commercial chemicals represent a small subset

Solution Overview

Demand-Driven Caching Strategy

Instead of bulk importing external databases, we implement a just-in-time caching approach:

┌─────────────────────────────────────────────────────────────────────┐
│                     Chemical Data Request Flow                      │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  SDS Upload ──► Extract CAS Numbers ──► Check Cache                 │
│      │                                       │                      │
│      │                              ┌────────┴────────┐             │
│      │                              ▼                 ▼             │
│      │                          Cache Hit        Cache Miss         │
│      │                              │                 │             │
│      │                              │            Fetch from         │
│      │                              │              PubChem          │
│      │                              │                 │             │
│      │                              │             Store in          │
│      │                              │               Cache           │
│      │                              │                 │             │
│      ▼                              └────────┬────────┘             │
│  PPE Recommendation ◄────────────────────────┘                      │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

Key Principles

  1. Fetch on First Encounter: When a CAS number is first seen, fetch and cache its data
  2. Prioritize Common Chemicals: Pre-seed cache with ~100 most common industrial chemicals
  3. Background Processing: External API calls happen in background jobs, not user requests
  4. Graceful Degradation: The system works without external data; enrichment only improves results when it is available
  5. Cache Refresh: Stale data is refreshed periodically (every 30 to 180 days, depending on cache level)
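
The principles above can be illustrated with a minimal in-memory sketch. The `DemandDrivenCache` class and its method names are hypothetical stand-ins for the cache table and fetch queue described later, not the platform's actual implementation:

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class DemandDrivenCache:
    """In-memory stand-in for the cache table plus the fetch queue (illustrative only)."""
    entries: Dict[str, dict] = field(default_factory=dict)
    pending: List[str] = field(default_factory=list)

    def lookup(self, cas_number: str) -> Optional[dict]:
        """Return cached data, or queue a background fetch and return None."""
        hit = self.entries.get(cas_number)
        if hit is not None:
            return hit
        if cas_number not in self.pending:  # avoid duplicate queue entries
            self.pending.append(cas_number)
        return None  # graceful degradation: caller falls back to SDS-only data

    def store(self, cas_number: str, data: dict) -> None:
        """Called by the background job once the external fetch succeeds."""
        self.entries[cas_number] = data
        if cas_number in self.pending:
            self.pending.remove(cas_number)


cache = DemandDrivenCache()
first = cache.lookup("67-64-1")              # miss: queued, nothing returned yet
cache.store("67-64-1", {"name": "Acetone"})  # simulated background fetch result
second = cache.lookup("67-64-1")             # hit on the next request
```

The first lookup never blocks on an external call; the user request proceeds with SDS data alone, and enrichment appears once the background job has populated the cache.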

Data Sources

Primary: PubChem

Why PubChem?

  • Free, public API with no API key required (request rates are throttled; see API Rate Limiting under Performance Considerations)
  • Comprehensive chemical property data
  • GHS classification information
  • Cross-references to other databases (EPA, NIOSH)

API Endpoints:

Base URL: https://pubchem.ncbi.nlm.nih.gov/rest/pug

# Get compound by CAS number
/compound/name/{cas_number}/JSON

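# Get specific properties (computed properties only; experimental values
# such as vapor pressure are served by PUG View, not by this property table)
/compound/cid/{cid}/property/MolecularWeight,IUPACName,MolecularFormula/JSON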

# Get GHS classification
/compound/cid/{cid}/classification/JSON
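
As a small sketch of how these endpoint paths might be assembled in code: the helper names `cid_lookup_url` and `property_url` are assumptions made for illustration, and the `/cids/JSON` form matches the lookup used by the API client later in this document.

```python
from typing import Sequence
from urllib.parse import quote

BASE_URL = "https://pubchem.ncbi.nlm.nih.gov/rest/pug"


def cid_lookup_url(cas_number: str) -> str:
    """URL that resolves a CAS number (passed as a name) to PubChem CIDs."""
    return f"{BASE_URL}/compound/name/{quote(cas_number, safe='')}/cids/JSON"


def property_url(cid: int, properties: Sequence[str]) -> str:
    """URL that requests a comma-separated set of properties for one CID."""
    return f"{BASE_URL}/compound/cid/{cid}/property/{','.join(properties)}/JSON"


lookup = cid_lookup_url("67-64-1")
props = property_url(180, ["MolecularWeight", "IUPACName"])
```

Percent-encoding the identifier keeps the path valid if a name containing spaces or slashes is ever passed instead of a plain CAS number.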

Secondary: NIOSH Pocket Guide

Why NIOSH?

  • Authoritative occupational exposure limits
  • IDLH (Immediately Dangerous to Life or Health) values
  • Recommended exposure limits (REL)
  • Respirator recommendations

Data Access:

  • Static dataset (~600 chemicals)
  • Can be pre-loaded as seed data
  • Updates annually

Tertiary: EPA CompTox

Why CompTox?

  • Additional toxicity data
  • Environmental fate information
  • Regulatory status

Usage:

  • On-demand for specific regulatory queries
  • Lower priority than PubChem/NIOSH

Caching Strategy

Cache Levels

┌─────────────────────────────────────────────────────────────────────┐
│                        Cache Priority Levels                        │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  Level 1 (Pre-seeded):     ~100 common industrial chemicals         │
│  ────────────────────      Always in cache, refreshed monthly       │
│                                                                     │
│  Level 2 (Company Common): Chemicals used by multiple companies     │
│  ────────────────────────  Cached indefinitely, refreshed quarterly │
│                                                                     │
│  Level 3 (On-Demand):      First-time encountered chemicals         │
│  ───────────────────       Cached on fetch, may expire              │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

Cache Population Triggers

| Trigger Event              | Action                           | Priority         |
|----------------------------|----------------------------------|------------------|
| SDS Document Uploaded      | Queue CAS numbers for enrichment | High             |
| Inventory Item Added       | Check composition CAS numbers    | Medium           |
| PPE Recommendation Request | Fetch missing chemical data      | High             |
| Bulk Import                | Batch process all CAS numbers    | Low (background) |
| Cache Miss on Query        | Immediate fetch and cache        | High             |

Cache Refresh Strategy

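from datetime import datetime, timezone

# Refresh intervals in days, keyed by cache level
REFRESH_INTERVALS = {
    'level_1_seed': 30,       # 30 days - common chemicals
    'level_2_company': 90,    # 90 days - company chemicals
    'level_3_ondemand': 180,  # 180 days - infrequent chemicals
}

# Stale data handling
def should_refresh(cache_entry):
    # Measure age from the latest refresh; fetched_at is only set on first fetch.
    # The columns are TIMESTAMP WITH TIME ZONE, so compare against an aware "now".
    last_update = cache_entry.last_refreshed_at or cache_entry.fetched_at
    age_days = (datetime.now(timezone.utc) - last_update).days

    if cache_entry.priority_level == 1:
        return age_days > REFRESH_INTERVALS['level_1_seed']
    elif cache_entry.priority_level == 2 or cache_entry.fetch_count > 5:
        # Company-level or frequently accessed entries
        return age_days > REFRESH_INTERVALS['level_2_company']
    else:
        return age_days > REFRESH_INTERVALS['level_3_ondemand']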

Estimated Cache Size

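| Category          | Est. Records | Data per Record | Total Size |
|-------------------|--------------|-----------------|------------|
| Seed Chemicals    | 100          | 5 KB            | 500 KB     |
| Company Chemicals | 5,000        | 5 KB            | 25 MB      |
| On-Demand         | 15,000       | 5 KB            | 75 MB      |
| Total             | 20,100       | -               | ~100 MB    |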
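
The totals in this estimate follow from simple arithmetic, reproduced here as a quick check. The record counts and the 5 KB figure come from the estimate itself; the 1 MB = 1,000 KB convention is an assumption chosen to match its rounding.

```python
RECORD_SIZE_KB = 5  # estimated size of one cached chemical record

record_counts = {
    "seed": 100,
    "company": 5_000,
    "on_demand": 15_000,
}

# Convert each category to MB (1 MB = 1,000 KB, matching the estimate's rounding)
sizes_mb = {name: count * RECORD_SIZE_KB / 1000 for name, count in record_counts.items()}

total_records = sum(record_counts.values())
total_mb = sum(sizes_mb.values())
```

Under these assumptions the cache holds 20,100 records in roughly 100.5 MB, small enough that storage cost is not a constraint on the design.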

SDS Parsing Integration

Current SDS Parsing Flow

The existing SDS background job parses uploaded documents and extracts:

  • Product identification (Section 1)
  • Hazard information (Section 2)
  • Composition (Section 3) - CAS numbers extracted here
  • First aid measures (Section 4)
  • PPE recommendations (Section 8)
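
Because every downstream lookup is keyed on the CAS numbers extracted from Section 3, it can help to validate them before queueing. The sketch below uses the standard CAS check-digit rule; the function names and the regular expression are illustrative assumptions, not part of the existing parser.

```python
import re
from typing import List

# CAS Registry Number layout: 2-7 digits, 2 digits, 1 check digit
CAS_PATTERN = re.compile(r"\b(\d{2,7})-(\d{2})-(\d)\b")


def is_valid_cas(cas_number: str) -> bool:
    """Check the format and the check digit of a CAS number."""
    match = CAS_PATTERN.fullmatch(cas_number.strip())
    if not match:
        return False
    digits = match.group(1) + match.group(2)
    # Weight digits 1, 2, 3, ... starting from the rightmost one
    checksum = sum(int(d) * weight for weight, d in enumerate(reversed(digits), start=1))
    return checksum % 10 == int(match.group(3))


def extract_cas_numbers(text: str) -> List[str]:
    """Return unique, checksum-valid CAS numbers in order of appearance."""
    found: List[str] = []
    for match in CAS_PATTERN.finditer(text):
        candidate = match.group(0)
        if is_valid_cas(candidate) and candidate not in found:
            found.append(candidate)
    return found


sample = "Acetone (CAS 67-64-1) 60-80%; Toluene 108-88-3; bogus 12-34-5"
valid = extract_cas_numbers(sample)
```

Rejecting strings that fail the checksum avoids spending external API calls on OCR or LLM extraction errors.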

Enhanced Flow with Chemical Enrichment

┌─────────────────────────────────────────────────────────────────────┐
│               SDS Parsing + Chemical Enrichment Flow                │
├─────────────────────────────────────────────────────────────────────┤
│                                                                     │
│  1. SDS Upload                                                      │
│     └──► Queue background job (existing)                            │
│                                                                     │
│  2. Parse SDS Document                                              │
│     └──► Extract all sections via LLM (existing)                    │
│     └──► Store parsed_json in chemiq_sds_documents                  │
│                                                                     │
│  3. Extract Composition [EXISTING]                                  │
│     └──► Parse Section 3 composition                                │
│     └──► Store in chemiq_sds_composition table                      │
│     └──► Extract CAS numbers                                        │
│                                                                     │
│  4. Chemical Enrichment [NEW]                                       │
│     └──► For each CAS number:                                       │
│          ├──► Check chemiq_pubchem_cache                            │
│          ├──► If miss: Queue PubChem fetch job                      │
│          └──► Store enriched data                                   │
│                                                                     │
│  5. PPE Enhancement [NEW]                                           │
│     └──► Use enriched data for PPE recommendations                  │
│     └──► Store in ppe_sds_recommendations table                     │
│                                                                     │
│  6. Mark SDS Processing Complete                                    │
│     └──► Update chemiq_sds_documents.processing_status              │
│                                                                     │
└─────────────────────────────────────────────────────────────────────┘

Integration Points

1. After Composition Extraction

# In sds_processing_service.py - after Section 3 parsing

async def process_composition_chemicals(
    db: Session,
    sds_id: UUID,
    composition: List[CompositionComponent]
) -> None:
    """
    Process extracted composition and enrich with external data.
    Called after SDS Section 3 is parsed.
    """
    cas_numbers_to_fetch = []

    for component in composition:
        if component.cas_number:
            # Check if already in cache
            cached = db.query(PubChemCache).filter(
                PubChemCache.cas_number == component.cas_number
            ).first()

            # Fetch when the entry is missing or stale
            if not cached or should_refresh(cached):
                cas_numbers_to_fetch.append(component.cas_number)

    # Queue background job for fetching
    if cas_numbers_to_fetch:
        await queue_pubchem_fetch_job(
            cas_numbers=cas_numbers_to_fetch,
            source_sds_id=sds_id,
            priority='normal'
        )

2. PPE Recommendation Enhancement

# In ppe_recommendation_service.py

async def get_enhanced_ppe_recommendation(
    db: Session,
    sds_detail: SDSDocumentDetail
) -> PPERecommendation:
    """
    Generate PPE recommendations using both SDS data and enriched chemical data.
    """
    recommendation = PPERecommendation()

    # Get composition CAS numbers
    composition = sds_detail.composition or []

    for component in composition:
        if component.cas_number:
            # Try to get enriched data
            enriched = db.query(PubChemCache).filter(
                PubChemCache.cas_number == component.cas_number
            ).first()

            if enriched:
                # Use chemical family for glove selection
                if enriched.chemical_family:
                    recommendation.gloves = enhance_glove_selection(
                        recommendation.gloves,
                        enriched.chemical_family
                    )

                # Use vapor pressure for respiratory selection
                if enriched.vapor_pressure_mmhg:
                    recommendation.respiratory = enhance_respiratory_selection(
                        recommendation.respiratory,
                        enriched.vapor_pressure_mmhg,
                        enriched.osha_pel_ppm
                    )

    return recommendation

Database Schema

PubChem Cache Table

-- Primary cache table for external chemical data
CREATE TABLE chemiq_pubchem_cache (
cache_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

-- Identifiers
cas_number VARCHAR(20) NOT NULL UNIQUE,
pubchem_cid INTEGER,

-- Chemical Identity
iupac_name VARCHAR(500),
common_name VARCHAR(255),
molecular_formula VARCHAR(100),
molecular_weight DECIMAL(10, 4),
chemical_family VARCHAR(100), -- 'ketone', 'alcohol', 'aromatic', etc.
chemical_class VARCHAR(100), -- Higher-level classification

-- Physical Properties (for PPE selection)
vapor_pressure_mmhg DECIMAL(10, 4),
boiling_point_c DECIMAL(8, 2),
flash_point_c DECIMAL(8, 2),
specific_gravity DECIMAL(6, 4),
water_solubility_mg_l DECIMAL(12, 4),

-- Regulatory Limits (for exposure control)
osha_pel_ppm DECIMAL(10, 4),
osha_pel_mg_m3 DECIMAL(10, 4),
niosh_rel_ppm DECIMAL(10, 4),
niosh_rel_mg_m3 DECIMAL(10, 4),
niosh_idlh_ppm DECIMAL(10, 4),
acgih_tlv_ppm DECIMAL(10, 4),

-- GHS Classification (as JSONB for flexibility)
ghs_classification JSONB,
-- Example: {
-- "hazard_statements": ["H315", "H319", "H335"],
-- "precautionary_statements": ["P261", "P305+P351+P338"],
-- "signal_word": "Warning",
-- "pictograms": ["GHS07"]
-- }

-- Source Tracking
source VARCHAR(50) DEFAULT 'pubchem', -- 'pubchem', 'niosh', 'epa_comptox'
source_url VARCHAR(500),

-- Cache Management
fetched_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
last_refreshed_at TIMESTAMP WITH TIME ZONE,
fetch_count INTEGER DEFAULT 1,
priority_level SMALLINT DEFAULT 3, -- 1=seed, 2=company, 3=on-demand

-- Metadata
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Indexes for common queries
CREATE INDEX idx_pubchem_cache_cas ON chemiq_pubchem_cache(cas_number);
CREATE INDEX idx_pubchem_cache_cid ON chemiq_pubchem_cache(pubchem_cid);
CREATE INDEX idx_pubchem_cache_family ON chemiq_pubchem_cache(chemical_family);
CREATE INDEX idx_pubchem_cache_priority ON chemiq_pubchem_cache(priority_level);
CREATE INDEX idx_pubchem_cache_fetched ON chemiq_pubchem_cache(fetched_at);

Chemical Fetch Queue Table

-- Queue for background chemical data fetching
CREATE TABLE chemiq_chemical_fetch_queue (
queue_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

-- Chemical to fetch
cas_number VARCHAR(20) NOT NULL,

-- Source context
source_type VARCHAR(50) NOT NULL, -- 'sds_upload', 'inventory_add', 'manual'
source_id UUID, -- Reference to source (sds_id, inventory_id)
company_id UUID REFERENCES core_companies(company_id),

-- Processing status
status VARCHAR(20) DEFAULT 'pending', -- 'pending', 'processing', 'completed', 'failed'
priority VARCHAR(20) DEFAULT 'normal', -- 'high', 'normal', 'low'

-- Attempt tracking
attempts INTEGER DEFAULT 0,
max_attempts INTEGER DEFAULT 3,
last_attempt_at TIMESTAMP WITH TIME ZONE,
error_message TEXT,

-- Timestamps
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
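processed_at TIMESTAMP WITH TIME ZONE
);

-- Prevent duplicate active queue entries.
-- PostgreSQL does not accept a WHERE clause on a table-level UNIQUE constraint,
-- so the rule is expressed as a partial unique index instead.
CREATE UNIQUE INDEX uq_fetch_queue_active_cas
ON chemiq_chemical_fetch_queue(cas_number)
WHERE status IN ('pending', 'processing');

-- Index for queue processing
CREATE INDEX idx_fetch_queue_status ON chemiq_chemical_fetch_queue(status, priority, created_at);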
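
The duplicate-prevention rule for the queue (at most one pending or processing entry per CAS number) can be tried out with a partial unique index. The sketch below uses SQLite from the Python standard library purely as a stand-in for PostgreSQL; the simplified `fetch_queue` table and the `enqueue` helper are assumptions for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE fetch_queue ("
    " cas_number TEXT NOT NULL,"
    " status TEXT NOT NULL DEFAULT 'pending')"
)
# Uniqueness applies only to rows that are still active
conn.execute(
    "CREATE UNIQUE INDEX uq_active_cas ON fetch_queue(cas_number) "
    "WHERE status IN ('pending', 'processing')"
)


def enqueue(cas_number: str) -> bool:
    """Insert a pending entry; return False if one is already active."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute("INSERT INTO fetch_queue (cas_number) VALUES (?)", (cas_number,))
        return True
    except sqlite3.IntegrityError:
        return False


first = enqueue("67-64-1")      # accepted
duplicate = enqueue("67-64-1")  # rejected while the first is still pending
with conn:
    conn.execute(
        "UPDATE fetch_queue SET status = 'completed' WHERE cas_number = ?",
        ("67-64-1",),
    )
requeued = enqueue("67-64-1")   # accepted again once the earlier entry completed
```

Enforcing the rule in the database rather than only in application code also covers the case where two workers try to queue the same chemical at the same moment.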

Cache Statistics Table

-- Track cache usage for optimization
CREATE TABLE chemiq_cache_statistics (
stat_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

-- Time period
period_start DATE NOT NULL,
period_end DATE NOT NULL,

-- Metrics
total_lookups INTEGER DEFAULT 0,
cache_hits INTEGER DEFAULT 0,
cache_misses INTEGER DEFAULT 0,
fetch_successes INTEGER DEFAULT 0,
fetch_failures INTEGER DEFAULT 0,

-- Performance
avg_fetch_time_ms DECIMAL(10, 2),

-- Top accessed chemicals (for optimization)
top_chemicals JSONB,
-- Example: [{"cas": "67-64-1", "count": 150}, ...]

created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

API Design

Internal Service APIs

Chemical Enrichment Service

# app/services/chemical_enrichment_service.py

from typing import Optional, List
from uuid import UUID
from sqlalchemy.orm import Session
from app.db.models.pubchem_cache import PubChemCache
from app.schemas.chemical import ChemicalEnrichmentData

class ChemicalEnrichmentService:
    """Service for managing chemical data enrichment from external sources."""

    def __init__(self, db: Session):
        self.db = db

    async def get_chemical_data(
        self,
        cas_number: str,
        fetch_if_missing: bool = True
    ) -> Optional[ChemicalEnrichmentData]:
        """
        Get enriched chemical data for a CAS number.

        Args:
            cas_number: The CAS number to look up
            fetch_if_missing: If True, queue fetch job when not in cache

        Returns:
            ChemicalEnrichmentData if found, None otherwise
        """
        # Check cache first
        cached = self.db.query(PubChemCache).filter(
            PubChemCache.cas_number == cas_number
        ).first()

        if cached:
            # Update fetch count for statistics
            cached.fetch_count += 1
            self.db.commit()
            return ChemicalEnrichmentData.from_orm(cached)

        if fetch_if_missing:
            await self._queue_fetch(cas_number)

        return None

    async def get_bulk_chemical_data(
        self,
        cas_numbers: List[str]
    ) -> dict[str, ChemicalEnrichmentData]:
        """
        Get enriched data for multiple CAS numbers.
        Returns dict mapping CAS number to data (missing CAS numbers omitted).
        """
        results = {}

        # Batch query cache
        cached_entries = self.db.query(PubChemCache).filter(
            PubChemCache.cas_number.in_(cas_numbers)
        ).all()

        cached_cas = set()
        for entry in cached_entries:
            results[entry.cas_number] = ChemicalEnrichmentData.from_orm(entry)
            cached_cas.add(entry.cas_number)

        # Queue missing CAS numbers for fetch
        missing = [cas for cas in cas_numbers if cas not in cached_cas]
        if missing:
            await self._queue_bulk_fetch(missing)

        return results

    async def _queue_fetch(
        self,
        cas_number: str,
        priority: str = 'normal',
        source_type: str = 'lookup',
        source_id: Optional[UUID] = None
    ) -> None:
        """Queue a single CAS number for background fetching."""
        from app.db.models.fetch_queue import ChemicalFetchQueue

        # Check if already queued
        existing = self.db.query(ChemicalFetchQueue).filter(
            ChemicalFetchQueue.cas_number == cas_number,
            ChemicalFetchQueue.status.in_(['pending', 'processing'])
        ).first()

        if not existing:
            queue_entry = ChemicalFetchQueue(
                cas_number=cas_number,
                source_type=source_type,
                source_id=source_id,
                priority=priority
            )
            self.db.add(queue_entry)
            self.db.commit()

    async def _queue_bulk_fetch(
        self,
        cas_numbers: List[str],
        priority: str = 'low'
    ) -> None:
        """Queue multiple CAS numbers for background fetching."""
        for cas in cas_numbers:
            await self._queue_fetch(cas, priority=priority, source_type='bulk_lookup')

PubChem API Client

# app/services/pubchem_client.py

import httpx
from typing import Optional, Dict, Any
from tenacity import retry, stop_after_attempt, wait_exponential

class PubChemClient:
    """Client for interacting with PubChem REST API."""

    BASE_URL = "https://pubchem.ncbi.nlm.nih.gov/rest/pug"

    def __init__(self):
        self.client = httpx.AsyncClient(timeout=30.0)

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10)
    )
    async def get_compound_by_cas(
        self,
        cas_number: str
    ) -> Optional[Dict[str, Any]]:
        """
        Fetch compound data from PubChem by CAS number.

        Returns dict with compound properties or None if not found.
        """
        try:
            # First, get the CID from CAS number
            cid_url = f"{self.BASE_URL}/compound/name/{cas_number}/cids/JSON"
            cid_response = await self.client.get(cid_url)

            if cid_response.status_code == 404:
                return None

            cid_response.raise_for_status()
            cid_data = cid_response.json()
            cid = cid_data.get('IdentifierList', {}).get('CID', [None])[0]

            if not cid:
                return None

            # Get compound properties
            props_url = (
                f"{self.BASE_URL}/compound/cid/{cid}/property/"
                "MolecularFormula,MolecularWeight,IUPACName,"
                "XLogP,TPSA,Complexity/JSON"
            )
            props_response = await self.client.get(props_url)
            props_response.raise_for_status()
            props_data = props_response.json()

            # Get GHS classification
            ghs_url = f"{self.BASE_URL}/compound/cid/{cid}/classification/JSON"
            ghs_response = await self.client.get(ghs_url)
            ghs_data = None
            if ghs_response.status_code == 200:
                ghs_data = self._parse_ghs_classification(ghs_response.json())

            # Combine results
            properties = props_data.get('PropertyTable', {}).get('Properties', [{}])[0]

            return {
                'pubchem_cid': cid,
                'cas_number': cas_number,
                'iupac_name': properties.get('IUPACName'),
                'molecular_formula': properties.get('MolecularFormula'),
                'molecular_weight': properties.get('MolecularWeight'),
                'ghs_classification': ghs_data,
                'source_url': f"https://pubchem.ncbi.nlm.nih.gov/compound/{cid}"
            }

        except httpx.HTTPStatusError as e:
            if e.response.status_code == 404:
                return None
            raise

    def _parse_ghs_classification(
        self,
        raw_data: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """Parse GHS classification from PubChem response."""
        try:
            classifications = raw_data.get('Hierarchies', {}).get('Hierarchy', [])

            for classification in classifications:
                if 'GHS' in classification.get('SourceName', ''):
                    nodes = classification.get('Node', [])

                    hazard_statements = []
                    pictograms = []
                    signal_word = None

                    for node in nodes:
                        info = node.get('Information', {})
                        name = info.get('Name', '')

                        if name.startswith('H') and name[1:].isdigit():
                            hazard_statements.append(name)
                        elif 'Pictogram' in name:
                            pictograms.append(name)
                        elif name in ['Danger', 'Warning']:
                            signal_word = name

                    return {
                        'hazard_statements': hazard_statements,
                        'pictograms': pictograms,
                        'signal_word': signal_word
                    }

            return None
        except Exception:
            return None

    async def close(self):
        """Close the HTTP client."""
        await self.client.aclose()
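
One detail of the CID lookup worth hardening: indexing `[0]` on the `CID` list raises `IndexError` if the list is present but empty. A small guard, sketched here with a hypothetical helper name `first_cid`, treats that case the same as "not found". The payload shape follows the `IdentifierList`/`CID` structure the client above already reads.

```python
from typing import Any, Dict, Optional


def first_cid(payload: Dict[str, Any]) -> Optional[int]:
    """Return the first CID in a lookup response, or None if there is none."""
    cids = payload.get("IdentifierList", {}).get("CID") or []
    return cids[0] if cids else None


found = first_cid({"IdentifierList": {"CID": [180]}})
empty = first_cid({"IdentifierList": {"CID": []}})
missing = first_cid({})
```

Returning `None` in all three "no result" shapes lets the caller keep a single not-found code path.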

Background Job Implementation

Chemical Fetch Worker

# app/workers/chemical_fetch_worker.py

import asyncio
from datetime import datetime, timezone
from sqlalchemy import case
from sqlalchemy.orm import Session
from app.db.session import get_db
from app.db.models.pubchem_cache import PubChemCache
from app.db.models.fetch_queue import ChemicalFetchQueue
from app.services.pubchem_client import PubChemClient
from app.core.logging import get_logger

logger = get_logger(__name__)

class ChemicalFetchWorker:
    """Background worker for fetching chemical data from external sources."""

    BATCH_SIZE = 10
    POLL_INTERVAL = 30  # seconds

    def __init__(self):
        self.pubchem_client = PubChemClient()
        self.running = False

    async def start(self):
        """Start the worker loop."""
        self.running = True
        logger.info("Chemical fetch worker started")

        while self.running:
            try:
                await self._process_batch()
            except Exception as e:
                logger.error(f"Error in fetch worker: {e}")

            await asyncio.sleep(self.POLL_INTERVAL)

    async def stop(self):
        """Stop the worker loop."""
        self.running = False
        await self.pubchem_client.close()
        logger.info("Chemical fetch worker stopped")

    async def _process_batch(self):
        """Process a batch of queued fetch requests."""
        db = next(get_db())

        try:
            # priority is stored as text, so sorting the raw column would be
            # alphabetical ('normal' > 'low' > 'high'); map it to a numeric rank.
            priority_rank = case(
                {'high': 0, 'normal': 1, 'low': 2},
                value=ChemicalFetchQueue.priority,
                else_=3
            )

            # Get pending items, prioritized
            items = db.query(ChemicalFetchQueue).filter(
                ChemicalFetchQueue.status == 'pending',
                ChemicalFetchQueue.attempts < ChemicalFetchQueue.max_attempts
            ).order_by(
                # High priority first
                priority_rank.asc(),
                # Older items first
                ChemicalFetchQueue.created_at.asc()
            ).limit(self.BATCH_SIZE).all()

            if not items:
                return

            logger.info(f"Processing {len(items)} chemical fetch requests")

            for item in items:
                await self._fetch_and_cache(db, item)

        finally:
            db.close()

    async def _fetch_and_cache(
        self,
        db: Session,
        queue_item: ChemicalFetchQueue
    ) -> bool:
        """
        Fetch chemical data and store in cache.

        Returns True if successful, False otherwise.
        """
        # Timestamp columns are timezone-aware, so use an aware UTC "now"
        queue_item.status = 'processing'
        queue_item.attempts += 1
        queue_item.last_attempt_at = datetime.now(timezone.utc)
        db.commit()

        try:
            # Fetch from PubChem
            data = await self.pubchem_client.get_compound_by_cas(
                queue_item.cas_number
            )

            if data:
                # Create or update cache entry
                cache_entry = db.query(PubChemCache).filter(
                    PubChemCache.cas_number == queue_item.cas_number
                ).first()

                if cache_entry:
                    # Update existing
                    now = datetime.now(timezone.utc)
                    cache_entry.pubchem_cid = data.get('pubchem_cid')
                    cache_entry.iupac_name = data.get('iupac_name')
                    cache_entry.molecular_formula = data.get('molecular_formula')
                    cache_entry.molecular_weight = data.get('molecular_weight')
                    cache_entry.ghs_classification = data.get('ghs_classification')
                    cache_entry.source_url = data.get('source_url')
                    cache_entry.last_refreshed_at = now
                    cache_entry.updated_at = now
                else:
                    # Create new entry
                    cache_entry = PubChemCache(
                        cas_number=queue_item.cas_number,
                        pubchem_cid=data.get('pubchem_cid'),
                        iupac_name=data.get('iupac_name'),
                        molecular_formula=data.get('molecular_formula'),
                        molecular_weight=data.get('molecular_weight'),
                        ghs_classification=data.get('ghs_classification'),
                        source='pubchem',
                        source_url=data.get('source_url'),
                        priority_level=3  # On-demand
                    )
                    db.add(cache_entry)

                queue_item.status = 'completed'
                queue_item.processed_at = datetime.now(timezone.utc)
                db.commit()

                logger.info(f"Cached chemical data for CAS {queue_item.cas_number}")
                return True
            else:
                # Not found in PubChem
                queue_item.status = 'completed'
                queue_item.processed_at = datetime.now(timezone.utc)
                queue_item.error_message = 'Not found in PubChem'
                db.commit()

                logger.warning(
                    f"Chemical CAS {queue_item.cas_number} not found in PubChem"
                )
                return False

        except Exception as e:
            queue_item.error_message = str(e)

            if queue_item.attempts >= queue_item.max_attempts:
                queue_item.status = 'failed'
            else:
                queue_item.status = 'pending'  # Will retry

            db.commit()

            logger.error(
                f"Error fetching CAS {queue_item.cas_number}: {e}"
            )
            return False
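
Since queue priority is stored as text ('high', 'normal', 'low'), sorting on the raw string does not yield the intended processing order. The standalone sketch below shows the difference and one way to make the ordering explicit with a rank mapping; `PRIORITY_RANK` and the sample queue rows are illustrative assumptions.

```python
# Explicit processing order for the text priority values
PRIORITY_RANK = {"high": 0, "normal": 1, "low": 2}

# Alphabetical descending order puts 'high' last, not first
alphabetical = sorted(["high", "normal", "low"], reverse=True)

# (priority, created_at sequence number, cas_number)
queue = [
    ("low", 1, "64-17-5"),
    ("normal", 2, "67-56-1"),
    ("high", 3, "67-64-1"),
    ("high", 4, "108-88-3"),
]

# Rank first, then oldest first within the same priority
ordered = sorted(queue, key=lambda item: (PRIORITY_RANK.get(item[0], 3), item[1]))
processing_order = [cas for _, _, cas in ordered]
```

The same mapping can be expressed in SQL with a `CASE` expression, or avoided entirely by storing priority as a small integer.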

SDS Processing Integration

# app/services/sds_processing_service.py (additions)

from app.services.chemical_enrichment_service import ChemicalEnrichmentService

class SDSProcessingService:
    """Enhanced SDS processing with chemical enrichment."""

    async def process_sds_document(
        self,
        db: Session,
        sds_id: UUID,
        file_path: str
    ) -> SDSDocument:
        """
        Process an SDS document with chemical enrichment.
        """
        # ... existing SDS parsing code ...

        # After parsing composition (Section 3)
        if parsed_data.composition:
            await self._enrich_composition_chemicals(
                db,
                sds_id,
                parsed_data.composition
            )

        # ... rest of processing ...

    async def _enrich_composition_chemicals(
        self,
        db: Session,
        sds_id: UUID,
        composition: List[CompositionComponent]
    ) -> None:
        """
        Queue chemical enrichment for all CAS numbers in composition.
        """
        enrichment_service = ChemicalEnrichmentService(db)

        cas_numbers = [
            comp.cas_number
            for comp in composition
            if comp.cas_number
        ]

        if cas_numbers:
            # This will check cache and queue any missing
            await enrichment_service.get_bulk_chemical_data(cas_numbers)

            logger.info(
                f"Queued enrichment for {len(cas_numbers)} chemicals "
                f"from SDS {sds_id}"
            )

Cache Refresh Job

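# app/workers/cache_refresh_worker.py

from datetime import datetime, timedelta, timezone
from sqlalchemy import and_, func, or_
from sqlalchemy.orm import Session
from app.db.models.pubchem_cache import PubChemCache
from app.db.models.fetch_queue import ChemicalFetchQueue

class CacheRefreshWorker:
    """Background worker for refreshing stale cache entries."""

    REFRESH_BATCH_SIZE = 50

    async def refresh_stale_entries(self, db: Session):
        """
        Find and refresh stale cache entries based on priority.
        """
        now = datetime.now(timezone.utc)

        # Level 1 (seed): Refresh if older than 30 days
        level_1_cutoff = now - timedelta(days=30)

        # Level 2 (company): Refresh if older than 90 days
        level_2_cutoff = now - timedelta(days=90)

        # Level 3 (on-demand): Refresh if older than 180 days and frequently accessed
        level_3_cutoff = now - timedelta(days=180)

        # Age is measured from the last refresh, falling back to the first fetch;
        # otherwise an entry would stay "stale" forever after its first refresh.
        last_update = func.coalesce(
            PubChemCache.last_refreshed_at,
            PubChemCache.fetched_at
        )

        # or_/and_ come from sqlalchemy; a Session has no such methods
        stale_entries = db.query(PubChemCache).filter(
            or_(
                # Level 1 entries older than 30 days
                and_(
                    PubChemCache.priority_level == 1,
                    last_update < level_1_cutoff
                ),
                # Level 2 entries older than 90 days
                and_(
                    PubChemCache.priority_level == 2,
                    last_update < level_2_cutoff
                ),
                # Level 3 entries older than 180 days with high fetch count
                and_(
                    PubChemCache.priority_level == 3,
                    last_update < level_3_cutoff,
                    PubChemCache.fetch_count > 5
                )
            )
        ).limit(self.REFRESH_BATCH_SIZE).all()

        for entry in stale_entries:
            await self._queue_refresh(db, entry.cas_number)

    async def _queue_refresh(
        self,
        db: Session,
        cas_number: str
    ) -> None:
        """Queue a cache entry for refresh unless a fetch is already active."""
        already_queued = db.query(ChemicalFetchQueue).filter(
            ChemicalFetchQueue.cas_number == cas_number,
            ChemicalFetchQueue.status.in_(['pending', 'processing'])
        ).first()
        if already_queued:
            return

        queue_entry = ChemicalFetchQueue(
            cas_number=cas_number,
            source_type='cache_refresh',
            priority='low'
        )
        db.add(queue_entry)
        db.commit()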

Data Flow Architecture

Complete System Data Flow

┌─────────────────────────────────────────────────────────────────────────────┐
│                       Chemical Data Flow Architecture                       │
├─────────────────────────────────────────────────────────────────────────────┤
│                                                                             │
│  ┌──────────────┐    ┌──────────────────┐    ┌────────────────────┐         │
│  │  SDS Upload  │───►│    SDS Parser    │───►│ chemiq_sds_        │         │
│  │   (UI/API)   │    │ (Background Job) │    │ documents          │         │
│  └──────────────┘    └────────┬─────────┘    └────────────────────┘         │
│                               │                                             │
│                               ▼                                             │
│                      ┌──────────────────┐    ┌────────────────────┐         │
│                      │   Composition    │───►│ chemiq_sds_        │         │
│                      │    Extractor     │    │ composition        │         │
│                      └────────┬─────────┘    └────────────────────┘         │
│                               │                                             │
│                               │ CAS Numbers                                 │
│                               ▼                                             │
│  ┌──────────────┐    ┌──────────────────┐    ┌────────────────────┐         │
│  │   PubChem    │◄───│     Chemical     │───►│ chemiq_chemical_   │         │
│  │     API      │    │    Enrichment    │    │ fetch_queue        │         │
│  └──────┬───────┘    │     Service      │    └────────────────────┘         │
│         │            └────────┬─────────┘                                   │
│         │                     │                                             │
│         │                     ▼                                             │
│         │            ┌──────────────────┐    ┌────────────────────┐         │
│         └───────────►│  Cache Manager   │───►│ chemiq_pubchem_    │         │
│                      │                  │    │ cache              │         │
│                      └────────┬─────────┘    └────────────────────┘         │
│                               │                                             │
│                               │ Enriched Data                               │
│                               ▼                                             │
│                      ┌──────────────────┐    ┌────────────────────┐         │
│                      │       PPE        │───►│ ppe_sds_           │         │
│                      │  Recommendation  │    │ recommendations    │         │
│                      │      Engine      │    └────────────────────┘         │
│                      └──────────────────┘                                   │
│                                                                             │
└─────────────────────────────────────────────────────────────────────────────┘

Component Interactions

┌────────────────────────────────────────────────────────────────────────────┐
│ Component Interaction Diagram │
├────────────────────────────────────────────────────────────────────────────┤
│ │
│ Frontend (React) │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ SDS Upload Modal ──► Inventory Detail ──► PPE Display Card │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │ │ ▲ │
│ │ POST /sds/upload │ GET /inventory/{id} │ │
│ ▼ ▼ │ │
│ Backend (FastAPI) │ │
│ ┌─────────────────────────────────────────────────────┴──────────────┐ │
│ │ SDS API ────────► Inventory API ────────► PPE Service │ │
│ │ │ │ │ │ │
│ │ ▼ │ │ │ │
│ │ Background Jobs │ │ │ │
│ │ ┌────────────────────────┴───────────────────────┐│ │ │
│ │ │ SDS Parser ──► Enrichment Service ──► PPE Calc ││ │ │
│ │ └────────────────────────────────────────────────┘│ │ │
│ │ │ │ │ │
│ │ ▼ ▼ │ │
│ │ ┌──────────────────────────────────┐ │ │
│ │ │ Chemical Fetch Worker │ │ │
│ │ │ (processes queue, calls PubChem) │ │ │
│ │ └──────────────────────────────────┘ │ │
│ └────────────────────────────────────────────────────────────────────┘ │
│ │
│ Database (PostgreSQL) │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ chemiq_sds_documents │ chemiq_sds_composition │ │
│ │ chemiq_pubchem_cache │ chemiq_chemical_fetch_queue │ │
│ │ ppe_sds_recommendations │ ppe_glove_chemical_resistance │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
│ External APIs │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ PubChem REST API │ NIOSH Data (pre-loaded) │ │
│ └─────────────────────────────────────────────────────────────────────┘ │
│ │
└────────────────────────────────────────────────────────────────────────────┘

Implementation Phases

Phase 1: Database Schema & Models (Week 1)

Deliverables:

  • Create Alembic migration for chemiq_pubchem_cache table
  • Create Alembic migration for chemiq_chemical_fetch_queue table
  • Create SQLAlchemy models for new tables
  • Create Pydantic schemas for chemical enrichment data

Files to Create/Modify:

  • tellus-ehs-hazcom-service/app/db/models/pubchem_cache.py (new)
  • tellus-ehs-hazcom-service/app/db/models/fetch_queue.py (new)
  • tellus-ehs-hazcom-service/app/schemas/chemical.py (new)
  • tellus-ehs-hazcom-service/alembic/versions/XXXX_add_pubchem_cache.py (new)

Phase 2: External API Client (Week 1-2)

Deliverables:

  • Implement PubChem REST API client
  • Add retry logic and error handling
  • Add unit tests for API client
  • Handle rate limiting gracefully

Files to Create:

  • tellus-ehs-hazcom-service/app/services/pubchem_client.py (new)
  • tellus-ehs-hazcom-service/tests/services/test_pubchem_client.py (new)

Phase 3: Enrichment Service (Week 2)

Deliverables:

  • Implement ChemicalEnrichmentService
  • Add cache lookup and population logic
  • Implement queue management
  • Add bulk lookup optimization

Files to Create:

  • tellus-ehs-hazcom-service/app/services/chemical_enrichment_service.py (new)

Phase 4: Background Worker (Week 2-3)

Deliverables:

  • Implement ChemicalFetchWorker
  • Add worker to background service
  • Implement cache refresh logic
  • Add monitoring and logging

Files to Create/Modify:

  • tellus-ehs-background-service/app/workers/chemical_fetch_worker.py (new)
  • tellus-ehs-background-service/app/workers/cache_refresh_worker.py (new)
  • tellus-ehs-background-service/app/main.py (modify)

Phase 5: SDS Integration (Week 3)

Deliverables:

  • Integrate enrichment into SDS parsing pipeline
  • Queue CAS numbers after composition extraction
  • Update SDS processing status to reflect enrichment state

Files to Modify:

  • tellus-ehs-hazcom-service/app/services/sds_processing_service.py

Phase 6: PPE Enhancement (Week 3-4)

Deliverables:

  • Integrate enriched data into PPE recommendation engine
  • Use chemical family for glove material selection
  • Use regulatory limits for respiratory selection
  • Update PPE schemas to include enrichment source

Files to Modify:

  • tellus-ehs-hazcom-service/app/services/ppe_recommendation_service.py
  • Reference: docs/features/ppe_recommendation_system.md

Phase 7: Seed Data & Testing (Week 4)

Deliverables:

  • Pre-load NIOSH Pocket Guide data (~600 chemicals)
  • Pre-load common industrial chemical seed data (~100 chemicals)
  • Integration testing with real SDS documents
  • Performance testing for cache efficiency

Files to Create:

  • tellus-ehs-hazcom-service/app/data/seed_chemicals.json (new)
  • tellus-ehs-hazcom-service/app/scripts/seed_pubchem_cache.py (new)

Performance Considerations

API Rate Limiting

PubChem allows reasonable usage but recommends:

  • Max 5 requests per second
  • Use batch endpoints when possible
  • Cache aggressively to reduce API calls

# Rate limiter implementation
import asyncio
import time


class RateLimiter:
    """Spaces calls at least 1/requests_per_second seconds apart."""

    def __init__(self, requests_per_second: int = 5):
        self.min_interval = 1.0 / requests_per_second
        self._lock = asyncio.Lock()
        self._next_allowed = 0.0

    async def acquire(self) -> None:
        """Wait until the next request slot is available."""
        async with self._lock:
            now = time.monotonic()
            wait = self._next_allowed - now
            if wait > 0:
                await asyncio.sleep(wait)
                now = time.monotonic()
            # Reserve the next slot for whichever caller comes after this one
            self._next_allowed = now + self.min_interval
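
The recommendation to use batch endpoints can be sketched as follows. PUG REST accepts a comma-separated list of CIDs in the path, so several compounds can share one property request. The helper name `batch_property_urls` and the default batch size of 100 are assumptions for illustration, not documented limits.

```python
from typing import List, Sequence

BASE_URL = "https://pubchem.ncbi.nlm.nih.gov/rest/pug"


def batch_property_urls(
    cids: Sequence[int],
    properties: Sequence[str],
    batch_size: int = 100,  # assumed chunk size, chosen to keep URLs short
) -> List[str]:
    """Build one property URL per chunk of CIDs instead of one per CID."""
    prop_list = ",".join(properties)
    urls = []
    for start in range(0, len(cids), batch_size):
        batch = cids[start:start + batch_size]
        cid_list = ",".join(str(cid) for cid in batch)
        urls.append(f"{BASE_URL}/compound/cid/{cid_list}/property/{prop_list}/JSON")
    return urls


urls = batch_property_urls([180, 702, 887], ["MolecularWeight", "IUPACName"], batch_size=2)
```

With three CIDs and a batch size of two, this produces two requests instead of three; at realistic batch sizes the reduction in API calls is much larger.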

Cache Efficiency Metrics

Target metrics for monitoring:

  • Cache hit rate: > 95% after warm-up period
  • Average fetch latency: < 2 seconds
  • Queue processing time: < 5 minutes for batch uploads
  • Cache size growth: < 10 MB/month
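
A minimal sketch of how the hit-rate target could be tracked in process. The `CacheStats` class is hypothetical; a real deployment would more likely export counters to whatever monitoring system the platform already uses.

```python
from dataclasses import dataclass


@dataclass
class CacheStats:
    """Hypothetical in-memory counter for cache lookups."""
    hits: int = 0
    misses: int = 0

    def record(self, hit: bool) -> None:
        """Count one lookup as a hit or a miss."""
        if hit:
            self.hits += 1
        else:
            self.misses += 1

    @property
    def hit_rate(self) -> float:
        """Fraction of lookups served from cache (0.0 when nothing recorded)."""
        total = self.hits + self.misses
        return self.hits / total if total else 0.0

    def meets_target(self, target: float = 0.95) -> bool:
        """True when the hit rate is above the target from the list above."""
        return self.hit_rate > target


stats = CacheStats()
for _ in range(97):
    stats.record(hit=True)
for _ in range(3):
    stats.record(hit=False)
```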

Database Optimization

-- Partial index for pending queue items
CREATE INDEX idx_fetch_queue_pending
ON chemiq_chemical_fetch_queue(created_at)
WHERE status = 'pending';

-- Covering index for cache lookups
CREATE INDEX idx_pubchem_cache_lookup
ON chemiq_pubchem_cache(cas_number)
INCLUDE (chemical_family, osha_pel_ppm, vapor_pressure_mmhg);

Seed Data: Common Industrial Chemicals

The following ~100 chemicals should be pre-loaded into the cache:

SEED_CHEMICALS = [
# Solvents
{"cas": "67-64-1", "name": "Acetone", "family": "ketone"},
{"cas": "64-17-5", "name": "Ethanol", "family": "alcohol"},
{"cas": "67-56-1", "name": "Methanol", "family": "alcohol"},
{"cas": "78-93-3", "name": "Methyl Ethyl Ketone (MEK)", "family": "ketone"},
{"cas": "108-88-3", "name": "Toluene", "family": "aromatic"},
{"cas": "1330-20-7", "name": "Xylene", "family": "aromatic"},
{"cas": "71-43-2", "name": "Benzene", "family": "aromatic"},
{"cas": "110-82-7", "name": "Cyclohexane", "family": "aliphatic"},
{"cas": "142-82-5", "name": "n-Heptane", "family": "aliphatic"},
{"cas": "110-54-3", "name": "n-Hexane", "family": "aliphatic"},
{"cas": "75-09-2", "name": "Methylene Chloride", "family": "chlorinated"},
{"cas": "79-01-6", "name": "Trichloroethylene", "family": "chlorinated"},
{"cas": "127-18-4", "name": "Perchloroethylene", "family": "chlorinated"},
{"cas": "67-63-0", "name": "Isopropyl Alcohol", "family": "alcohol"},
{"cas": "71-36-3", "name": "n-Butanol", "family": "alcohol"},

# Acids
{"cas": "7664-93-9", "name": "Sulfuric Acid", "family": "inorganic_acid"},
{"cas": "7647-01-0", "name": "Hydrochloric Acid", "family": "inorganic_acid"},
{"cas": "7697-37-2", "name": "Nitric Acid", "family": "inorganic_acid"},
{"cas": "7664-38-2", "name": "Phosphoric Acid", "family": "inorganic_acid"},
{"cas": "64-19-7", "name": "Acetic Acid", "family": "organic_acid"},
{"cas": "7664-39-3", "name": "Hydrofluoric Acid", "family": "inorganic_acid"},

# Bases
{"cas": "1310-73-2", "name": "Sodium Hydroxide", "family": "inorganic_base"},
{"cas": "1310-58-3", "name": "Potassium Hydroxide", "family": "inorganic_base"},
{"cas": "7664-41-7", "name": "Ammonia", "family": "inorganic_base"},
{"cas": "141-43-5", "name": "Ethanolamine", "family": "amine"},

# Oxidizers
{"cas": "7722-84-1", "name": "Hydrogen Peroxide", "family": "oxidizer"},
{"cas": "7681-52-9", "name": "Sodium Hypochlorite", "family": "oxidizer"},

# Fuels & Petroleum
{"cas": "8006-61-9", "name": "Gasoline", "family": "petroleum"},
{"cas": "68476-34-6", "name": "Diesel Fuel", "family": "petroleum"},
{"cas": "8008-20-6", "name": "Kerosene", "family": "petroleum"},
{"cas": "64742-47-8", "name": "Mineral Spirits", "family": "petroleum"},

# Common Industrial Chemicals
{"cas": "50-00-0", "name": "Formaldehyde", "family": "aldehyde"},
{"cas": "107-21-1", "name": "Ethylene Glycol", "family": "glycol"},
{"cas": "57-55-6", "name": "Propylene Glycol", "family": "glycol"},
{"cas": "111-76-2", "name": "2-Butoxyethanol", "family": "glycol_ether"},
{"cas": "109-86-4", "name": "2-Methoxyethanol", "family": "glycol_ether"},
{"cas": "75-56-9", "name": "Propylene Oxide", "family": "epoxide"},
{"cas": "75-21-8", "name": "Ethylene Oxide", "family": "epoxide"},

# Monomers & Polymers
{"cas": "100-42-5", "name": "Styrene", "family": "monomer"},
{"cas": "80-62-6", "name": "Methyl Methacrylate", "family": "monomer"},
{"cas": "75-01-4", "name": "Vinyl Chloride", "family": "monomer"},
{"cas": "107-13-1", "name": "Acrylonitrile", "family": "monomer"},

# Gases
{"cas": "7782-44-7", "name": "Oxygen", "family": "gas"},
{"cas": "7727-37-9", "name": "Nitrogen", "family": "gas"},
{"cas": "124-38-9", "name": "Carbon Dioxide", "family": "gas"},
{"cas": "630-08-0", "name": "Carbon Monoxide", "family": "gas"},
{"cas": "7783-06-4", "name": "Hydrogen Sulfide", "family": "gas"},
{"cas": "7782-50-5", "name": "Chlorine", "family": "gas"},

# Pesticides (common active ingredients)
{"cas": "1912-24-9", "name": "Atrazine", "family": "pesticide"},
{"cas": "1071-83-6", "name": "Glyphosate", "family": "pesticide"},
{"cas": "333-41-5", "name": "Diazinon", "family": "pesticide"},
{"cas": "2921-88-2", "name": "Chlorpyrifos", "family": "pesticide"},

# Heavy Metals (elemental CAS numbers; individual compounds have their own)
{"cas": "7439-92-1", "name": "Lead", "family": "heavy_metal"},
{"cas": "7439-97-6", "name": "Mercury", "family": "heavy_metal"},
{"cas": "7440-43-9", "name": "Cadmium", "family": "heavy_metal"},
{"cas": "7440-47-3", "name": "Chromium", "family": "heavy_metal"},
{"cas": "7440-38-2", "name": "Arsenic", "family": "heavy_metal"},

# Isocyanates
{"cas": "584-84-9", "name": "Toluene Diisocyanate (TDI)", "family": "isocyanate"},
{"cas": "101-68-8", "name": "MDI (Diphenylmethane Diisocyanate)", "family": "isocyanate"},
{"cas": "822-06-0", "name": "Hexamethylene Diisocyanate (HDI)", "family": "isocyanate"},

# Epoxies
{"cas": "1675-54-3", "name": "Bisphenol A Diglycidyl Ether", "family": "epoxy"},
{"cas": "25068-38-6", "name": "Epoxy Resin (generic)", "family": "epoxy"},

# Welding Fumes Components
{"cas": "1309-37-1", "name": "Iron Oxide", "family": "metal_oxide"},
{"cas": "1313-27-5", "name": "Molybdenum Trioxide", "family": "metal_oxide"},
{"cas": "1314-13-2", "name": "Zinc Oxide", "family": "metal_oxide"},
{"cas": "1317-38-0", "name": "Copper Oxide", "family": "metal_oxide"},

# Cleaning Agents
{"cas": "111-42-2", "name": "Diethanolamine", "family": "amine"},
{"cas": "102-71-6", "name": "Triethanolamine", "family": "amine"},
{"cas": "7173-51-5", "name": "Didecyldimethylammonium Chloride", "family": "quaternary_ammonium"},
{"cas": "68391-01-5", "name": "Benzalkonium Chloride", "family": "quaternary_ammonium"},

# Lubricants & Oils
{"cas": "8012-95-1", "name": "Mineral Oil", "family": "oil"},
{"cas": "8042-47-5", "name": "White Mineral Oil", "family": "oil"},
{"cas": "64742-54-7", "name": "Heavy Paraffinic Distillate", "family": "oil"},

# Silicones
{"cas": "63148-62-9", "name": "Polydimethylsiloxane", "family": "silicone"},
{"cas": "107-46-0", "name": "Hexamethyldisiloxane", "family": "silicone"},

# Paint Components
{"cas": "13463-67-7", "name": "Titanium Dioxide", "family": "pigment"},
{"cas": "1333-86-4", "name": "Carbon Black", "family": "pigment"},
{"cas": "7727-43-7", "name": "Barium Sulfate", "family": "filler"},
{"cas": "14807-96-6", "name": "Talc", "family": "filler"},

# Refrigerants
{"cas": "75-71-8", "name": "Dichlorodifluoromethane (R-12)", "family": "refrigerant"},
{"cas": "75-45-6", "name": "Chlorodifluoromethane (R-22)", "family": "refrigerant"},
{"cas": "811-97-2", "name": "1,1,1,2-Tetrafluoroethane (R-134a)", "family": "refrigerant"},

# Additional Common Chemicals
{"cas": "7758-29-4", "name": "Sodium Tripolyphosphate", "family": "phosphate"},
{"cas": "497-19-8", "name": "Sodium Carbonate", "family": "carbonate"},
{"cas": "144-55-8", "name": "Sodium Bicarbonate", "family": "carbonate"},
{"cas": "7647-14-5", "name": "Sodium Chloride", "family": "salt"},
{"cas": "10043-52-4", "name": "Calcium Chloride", "family": "salt"},
{"cas": "56-81-5", "name": "Glycerol", "family": "polyol"},
{"cas": "75-05-8", "name": "Acetonitrile", "family": "nitrile"},
{"cas": "110-91-8", "name": "Morpholine", "family": "amine"},
{"cas": "60-00-4", "name": "EDTA", "family": "chelating_agent"},
{"cas": "112-34-5", "name": "Diethylene Glycol Monobutyl Ether", "family": "glycol_ether"},
]
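
Because the seed list is maintained by hand, it may be worth validating each CAS number before loading. CAS Registry Numbers end in a check digit: weight the remaining digits 1, 2, 3, ... from right to left, sum the products, and take the result modulo 10. The function name below is an assumption; the checksum rule itself is the standard one.

```python
import re

# Two to seven digits, two digits, one check digit.
CAS_PATTERN = re.compile(r"^(\d{2,7})-(\d{2})-(\d)$")


def is_valid_cas(cas: str) -> bool:
    """Return True if `cas` is well formed and its check digit matches."""
    match = CAS_PATTERN.match(cas)
    if not match:
        return False
    digits = match.group(1) + match.group(2)
    check_digit = int(match.group(3))
    # Rightmost digit gets weight 1, the next gets weight 2, and so on.
    total = sum(
        int(digit) * weight
        for weight, digit in enumerate(reversed(digits), start=1)
    )
    return total % 10 == check_digit


# Acetone: 4*1 + 6*2 + 7*3 + 6*4 = 61, and 61 % 10 == 1.
acetone_ok = is_valid_cas("67-64-1")
typo_caught = not is_valid_cas("67-64-2")
```

Running this over `SEED_CHEMICALS` before seeding would catch transposed or mistyped digits early.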

References

External Resources

API Response Examples

PubChem Compound Properties:

{
  "PropertyTable": {
    "Properties": [{
      "CID": 180,
      "MolecularFormula": "C3H6O",
      "MolecularWeight": 58.08,
      "IUPACName": "propan-2-one",
      "XLogP": -0.2,
      "TPSA": 17.1
    }]
  }
}
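
A minimal sketch of flattening this response into the fields the cache needs. The function name and output keys are assumptions; the input shape follows the example above.

```python
from typing import Any, Dict, Optional


def extract_properties(payload: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Return the first compound's properties, or None if the table is empty."""
    properties = payload.get("PropertyTable", {}).get("Properties", [])
    if not properties:
        return None
    first = properties[0]
    weight = first.get("MolecularWeight")
    return {
        "cid": first.get("CID"),
        "formula": first.get("MolecularFormula"),
        # float() accepts the weight whether it arrives as a number or a string
        "molecular_weight": float(weight) if weight is not None else None,
        "iupac_name": first.get("IUPACName"),
    }


sample = {
    "PropertyTable": {
        "Properties": [{
            "CID": 180,
            "MolecularFormula": "C3H6O",
            "MolecularWeight": 58.08,
            "IUPACName": "propan-2-one",
        }]
    }
}
acetone = extract_properties(sample)
```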

PubChem GHS Classification:

{
  "Hierarchies": {
    "Hierarchy": [{
      "SourceName": "European Chemicals Agency (ECHA) GHS",
      "Node": [
        {"Information": {"Name": "H225"}},
        {"Information": {"Name": "H319"}},
        {"Information": {"Name": "H336"}},
        {"Information": {"Name": "GHS02"}},
        {"Information": {"Name": "GHS07"}},
        {"Information": {"Name": "Danger"}}
      ]
    }]
  }
}
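
Assuming the response has the shape shown above, hazard statements and pictograms can be separated by pattern. The function name and the returned keys are illustrative only.

```python
import re
from typing import Any, Dict, List

H_CODE = re.compile(r"^H\d{3}")        # hazard statements such as H225
PICTOGRAM = re.compile(r"^GHS\d{2}$")  # pictogram codes such as GHS02


def extract_ghs(payload: Dict[str, Any]) -> Dict[str, List[str]]:
    """Collect H-codes and pictogram codes from every hierarchy node."""
    hazards: List[str] = []
    pictograms: List[str] = []
    for hierarchy in payload.get("Hierarchies", {}).get("Hierarchy", []):
        for node in hierarchy.get("Node", []):
            name = node.get("Information", {}).get("Name", "")
            if H_CODE.match(name):
                hazards.append(name)
            elif PICTOGRAM.match(name):
                pictograms.append(name)
    return {"hazard_statements": hazards, "pictograms": pictograms}


sample = {
    "Hierarchies": {
        "Hierarchy": [{
            "SourceName": "European Chemicals Agency (ECHA) GHS",
            "Node": [
                {"Information": {"Name": "H225"}},
                {"Information": {"Name": "H319"}},
                {"Information": {"Name": "H336"}},
                {"Information": {"Name": "GHS02"}},
                {"Information": {"Name": "GHS07"}},
                {"Information": {"Name": "Danger"}},
            ],
        }]
    }
}
ghs = extract_ghs(sample)
```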

Appendix: Integration with PPE System

The chemical database enrichment directly supports the PPE Recommendation System documented in docs/features/ppe_recommendation_system.md.

Data Flow: Chemical Data to PPE Recommendations

┌────────────────────────────────────────────────────────────────────────┐
│  Chemical Data ──► PPE Recommendation Flow                             │
├────────────────────────────────────────────────────────────────────────┤
│                                                                        │
│  chemiq_pubchem_cache                                                  │
│  ┌──────────────────────────────────────────────────────────────────┐  │
│  │ chemical_family ───────────────► Glove Material Selection        │  │
│  │                                  (ketone → Butyl rubber)         │  │
│  │                                                                  │  │
│  │ vapor_pressure_mmhg ───────────► Respiratory APF Calculation     │  │
│  │ osha_pel_ppm                     (high VP + low PEL = higher APF)│  │
│  │                                                                  │  │
│  │ ghs_classification ────────────► Eye Protection Selection        │  │
│  │                                  (H314 → Goggles + Face Shield)  │  │
│  │                                                                  │  │
│  │ niosh_idlh_ppm ────────────────► Respiratory Urgency Level       │  │
│  │                                  (exposure ≥ IDLH → SCBA)        │  │
│  └──────────────────────────────────────────────────────────────────┘  │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

Enhancement Examples

Without Enrichment (SDS-only):

SDS Section 8 says: "Wear chemical resistant gloves"
→ Recommendation: Generic chemical-resistant gloves

With Enrichment:

SDS Section 3: Contains Acetone (67-64-1)
Cache lookup: chemical_family = "ketone"
→ Recommendation: Butyl rubber gloves (breakthrough time 480+ min for ketones)
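
The lookup step in the enriched example can be sketched as a simple mapping with a generic fallback. Only the ketone entry comes from this document; the table name, function name, and fallback wording are assumptions, and a real table would be populated from glove manufacturers' permeation data.

```python
from typing import Dict, Optional

# Only the ketone entry is taken from this plan; add families as data allows.
GLOVE_BY_FAMILY: Dict[str, str] = {
    "ketone": "Butyl rubber gloves",
}

# Matches the SDS-only recommendation shown above.
GENERIC_GLOVE = "Generic chemical-resistant gloves"


def recommend_glove(chemical_family: Optional[str]) -> str:
    """Return a family-specific glove, or the generic one when unknown."""
    if not chemical_family:
        return GENERIC_GLOVE
    return GLOVE_BY_FAMILY.get(chemical_family.lower(), GENERIC_GLOVE)


enriched = recommend_glove("ketone")  # cache hit for acetone's family
sds_only = recommend_glove(None)      # no enrichment data available
```

Falling back to the generic recommendation keeps the SDS-only behavior intact when the cache has no family for a chemical.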

Appendix: Calculated Exposure Controls

The chemical database provides authoritative exposure limit data that enables intelligent workplace exposure assessments and controls.

The Problem with SDS Exposure Limits

SDS documents often have:

  • Missing values: Many SDS don't include all exposure limits
  • Outdated values: PELs may not reflect current NIOSH RELs or ACGIH TLVs
  • Inconsistent units: mg/m³ vs ppm without conversion factors
  • No context: No indication of when limits were last updated

Authoritative Exposure Limit Data

The chemiq_pubchem_cache table stores multiple exposure limit types:

| Field | Source | Description |
|---|---|---|
| osha_pel_ppm | OSHA | Permissible Exposure Limit (legal requirement) |
| osha_pel_mg_m3 | OSHA | PEL in mg/m³ (for particulates/non-volatiles) |
| niosh_rel_ppm | NIOSH | Recommended Exposure Limit (best practice) |
| niosh_rel_mg_m3 | NIOSH | REL in mg/m³ |
| niosh_idlh_ppm | NIOSH | Immediately Dangerous to Life or Health |
| acgih_tlv_ppm | ACGIH | Threshold Limit Value (industry consensus) |
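
Since limits are stored in both ppm and mg/m³, a conversion helper addresses the inconsistent-units problem noted earlier. For gases and vapors at 25 °C and 1 atm, mg/m³ = ppm × molecular weight / 24.45, where 24.45 L/mol is the molar volume of an ideal gas under those conditions. The function names are assumptions, and the conversion does not apply to particulates.

```python
# Molar volume of an ideal gas at 25 °C and 1 atm, in litres per mole.
MOLAR_VOLUME_L_PER_MOL = 24.45


def ppm_to_mg_m3(ppm: float, molecular_weight: float) -> float:
    """Convert a vapor concentration from ppm to mg/m³ (25 °C, 1 atm)."""
    return ppm * molecular_weight / MOLAR_VOLUME_L_PER_MOL


def mg_m3_to_ppm(mg_m3: float, molecular_weight: float) -> float:
    """Convert a vapor concentration from mg/m³ to ppm (25 °C, 1 atm)."""
    return mg_m3 * MOLAR_VOLUME_L_PER_MOL / molecular_weight


# Acetone (molecular weight 58.08): a 250 ppm limit is roughly 594 mg/m³.
acetone_mg_m3 = ppm_to_mg_m3(250, 58.08)
```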

Exposure Assessment Algorithm

# app/services/exposure_assessment_service.py

from dataclasses import dataclass
from decimal import Decimal
from typing import List, Optional

from sqlalchemy.orm import Session

# ChemicalEnrichmentService, ChemicalEnrichmentData, and SDSComposition are
# project-internal; their import paths are omitted here.


@dataclass
class ExposureAssessment:
    """Result of exposure assessment for a chemical mixture."""
    # Calculated limits
    mixture_pel_ppm: Optional[Decimal]  # Lowest component limit found
    mixture_rel_ppm: Optional[Decimal]
    # Risk indicators
    exposure_index: float  # Sum of (concentration / limit) ratios
    is_overexposed: bool
    limiting_chemical: Optional[str]  # Component with the lowest limit
    # Recommendations
    ventilation_required: bool
    ppe_required: bool
    monitoring_frequency: str  # 'routine', 'frequent', 'continuous'
    warnings: List[str]

class ExposureAssessmentService:
    """
    Calculate workplace exposure controls using enriched chemical data.

    Implements the OSHA mixture calculation formula:
        Em = (C1/L1) + (C2/L2) + ... + (Cn/Ln)
    where Em <= 1 for a compliant exposure.
    """

    def __init__(self, db: Session):
        self.db = db
        self.enrichment_service = ChemicalEnrichmentService(db)

    async def assess_mixture_exposure(
        self,
        composition: List[SDSComposition],
        use_limit_type: str = 'osha_pel'  # 'osha_pel', 'niosh_rel', 'acgih_tlv'
    ) -> ExposureAssessment:
        """
        Calculate exposure index for a chemical mixture.

        Per OSHA 29 CFR 1910.1000(d)(2)(i), the equivalent exposure of a
        mixture is the sum of each component's concentration divided by its
        limit, and that sum must not exceed unity. Summing the ratios treats
        the components' effects as additive.
        """
        exposure_index = 0.0
        limiting_chemical = None
        lowest_limit = float('inf')
        warnings = []

        for component in composition:
            if not component.cas_number or not component.concentration_percent:
                continue

            # Get enriched data for this chemical
            enriched = await self.enrichment_service.get_chemical_data(
                component.cas_number,
                fetch_if_missing=True
            )

            if not enriched:
                warnings.append(
                    f"No exposure data for {component.chemical_name} "
                    f"(CAS {component.cas_number})"
                )
                continue

            # Get the appropriate limit
            limit = self._get_exposure_limit(enriched, use_limit_type)

            if limit is None or limit == 0:
                warnings.append(
                    f"No {use_limit_type} limit for {component.chemical_name}"
                )
                continue
            limit = float(limit)  # Database numerics may arrive as Decimal

            # Screening estimate of the airborne concentration from product
            # composition and volatility (actual air concentration depends
            # on many other factors)
            concentration_ppm = self._estimate_air_concentration(
                component.concentration_percent,
                enriched.vapor_pressure_mmhg
            )

            # Calculate component's contribution to exposure index
            ratio = concentration_ppm / limit
            exposure_index += ratio

            # Track the component with the lowest limit
            if limit < lowest_limit:
                lowest_limit = limit
                limiting_chemical = component.chemical_name

            # Warn when the estimate exceeds 10% of the IDLH value
            idlh = enriched.niosh_idlh_ppm
            if idlh and concentration_ppm > float(idlh) * 0.1:
                warnings.append(
                    f"WARNING: {component.chemical_name} may approach IDLH "
                    f"({idlh} ppm)"
                )

        # Determine controls based on exposure index
        is_overexposed = exposure_index > 1.0
        ventilation_required = exposure_index > 0.5
        ppe_required = exposure_index > 0.25

        monitoring_frequency = 'routine'
        if exposure_index > 0.5:
            monitoring_frequency = 'frequent'
        if exposure_index > 0.8:
            monitoring_frequency = 'continuous'

        return ExposureAssessment(
            mixture_pel_ppm=Decimal(str(lowest_limit)) if lowest_limit != float('inf') else None,
            mixture_rel_ppm=None,  # Would calculate separately for NIOSH REL
            exposure_index=round(exposure_index, 3),
            is_overexposed=is_overexposed,
            limiting_chemical=limiting_chemical,
            ventilation_required=ventilation_required,
            ppe_required=ppe_required,
            monitoring_frequency=monitoring_frequency,
            warnings=warnings
        )

    def _get_exposure_limit(
        self,
        enriched: ChemicalEnrichmentData,
        limit_type: str
    ) -> Optional[float]:
        """Get the appropriate exposure limit based on type."""
        if limit_type == 'osha_pel':
            return enriched.osha_pel_ppm
        elif limit_type == 'niosh_rel':
            return enriched.niosh_rel_ppm
        elif limit_type == 'acgih_tlv':
            return enriched.acgih_tlv_ppm
        # Unknown limit type: default to the most protective (lowest) available
        available = [
            value for value in (
                enriched.niosh_rel_ppm,
                enriched.osha_pel_ppm,
                enriched.acgih_tlv_ppm,
            ) if value
        ]
        return min(available) if available else None

    def _estimate_air_concentration(
        self,
        concentration_percent: float,
        vapor_pressure_mmhg: Optional[float]
    ) -> float:
        """
        Estimate potential air concentration based on composition and volatility.

        This is a simplified screening model. Real exposure depends on:
        - Application method (spray, brush, dip)
        - Ventilation
        - Temperature
        - Surface area exposed
        """
        if vapor_pressure_mmhg is None:
            # Conservative default for unknown volatility
            vapor_pressure_mmhg = 10.0

        # Saturation concentration at 1 atm:
        # ppm = (VP_mmHg / 760) * 1,000,000
        saturation_ppm = (float(vapor_pressure_mmhg) / 760) * 1_000_000

        # Screening assumption: air reaches 10% of saturation,
        # scaled by the component's share of the product
        estimated_ppm = saturation_ppm * 0.1 * (float(concentration_percent) / 100)

        return estimated_ppm
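
A worked instance of the mixture formula, separated from the service so the arithmetic is easy to check. The helper name is an assumption. The acetone limit (1000 ppm) is the OSHA PEL quoted in this document; the toluene limit of 200 ppm and both measured concentrations are illustrative inputs.

```python
from typing import Sequence, Tuple


def exposure_index(components: Sequence[Tuple[float, float]]) -> float:
    """Sum concentration/limit over (concentration_ppm, limit_ppm) pairs."""
    total = 0.0
    for concentration_ppm, limit_ppm in components:
        if limit_ppm <= 0:
            raise ValueError("exposure limit must be positive")
        total += concentration_ppm / limit_ppm
    return total


# Acetone at 500 ppm against 1000 ppm, toluene at 50 ppm against 200 ppm:
# Em = 500/1000 + 50/200 = 0.5 + 0.25 = 0.75
em = exposure_index([(500.0, 1000.0), (50.0, 200.0)])
is_overexposed = em > 1.0
```

With Em = 0.75 the mixture is below the compliance threshold of 1, although under the service's thresholds it would already trigger ventilation and frequent monitoring.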

Exposure Control Dashboard Display

// Frontend display of calculated exposure controls

interface ExposureControlsDisplay {
  // Calculated exposure index
  exposureIndex: number; // 0.0 - 2.0+ (>1.0 = overexposed)
  exposureStatus: 'safe' | 'caution' | 'action_required' | 'danger';

  // Per-chemical breakdown
  chemicalContributions: {
    chemicalName: string;
    casNumber: string;
    concentration: number; // % in product
    exposureLimit: number; // PEL/REL in ppm
    limitSource: 'OSHA PEL' | 'NIOSH REL' | 'ACGIH TLV';
    contributionRatio: number; // concentration / limit
  }[];

  // Recommended controls
  engineeringControls: {
    ventilationType: 'general' | 'local_exhaust' | 'enclosed';
    airChangesPerHour: number;
    required: boolean;
  };

  monitoringRequirements: {
    frequency: 'initial' | 'routine' | 'frequent' | 'continuous';
    method: string; // e.g., "Direct reading instrument", "Personal sampling"
    recordRetention: string; // e.g., "30 years" for carcinogens
  };

  // Warnings
  warnings: string[];
  idlhChemicals: string[]; // Chemicals with IDLH concerns
}
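
The interface does not say how `exposureStatus` is derived from `exposureIndex`. One possible mapping, sketched below on the backend side, reuses the thresholds from the assessment service (0.25 for PPE, 0.5 for ventilation, 1.0 for overexposure). Pairing those thresholds with these four status labels is an assumption, not something the plan specifies.

```python
def exposure_status(exposure_index: float) -> str:
    """Map an exposure index to a dashboard status label (assumed mapping)."""
    if exposure_index > 1.0:
        return "danger"           # above the compliance limit
    if exposure_index > 0.5:
        return "action_required"  # ventilation threshold exceeded
    if exposure_index > 0.25:
        return "caution"          # PPE threshold exceeded
    return "safe"


statuses = [exposure_status(value) for value in (0.1, 0.3, 0.75, 1.2)]
```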

Exposure Limit Comparison View

When SDS data and cached data differ, show comparison:

┌────────────────────────────────────────────────────────────────────────┐
│  Exposure Limits: Acetone (67-64-1)                                    │
├────────────────────────────────────────────────────────────────────────┤
│                                                                        │
│  Source            │ TWA (ppm) │ STEL (ppm) │ Notes                    │
│  ────────────────────────────────────────────────────────────────────  │
│  SDS (extracted)   │ 500       │ —          │ Manufacturer data        │
│  OSHA PEL          │ 1000      │ —          │ 29 CFR 1910.1000         │
│  NIOSH REL         │ 250       │ —          │ Current best practice    │
│  ACGIH TLV         │ 250       │ 500        │ 2024 update              │
│                                                                        │
│  ⚠️ Recommendation: Use NIOSH REL (250 ppm) - more protective          │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘
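
The recommendation line in this view could be produced by picking the lowest available TWA among the regulatory sources. This is a sketch under that assumption; the function name is hypothetical, and ties resolve to whichever source is listed first.

```python
from typing import Dict, Optional, Tuple


def most_protective(
    limits: Dict[str, Optional[float]]
) -> Optional[Tuple[str, float]]:
    """Return (source, limit) for the lowest positive limit, or None."""
    available = [
        (source, value)
        for source, value in limits.items()
        if value is not None and value > 0
    ]
    if not available:
        return None
    # min() keeps the first entry when several sources share the lowest value
    return min(available, key=lambda pair: pair[1])


# TWA values for acetone from the comparison view above.
acetone_twa = {"OSHA PEL": 1000, "NIOSH REL": 250, "ACGIH TLV": 250}
recommended = most_protective(acetone_twa)
```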

Appendix: Regulatory Compliance Features

The enriched chemical database enables automated regulatory compliance checking beyond basic SDS requirements.

Regulatory Lists Integration

-- Track regulatory list membership for cached chemicals
CREATE TABLE chemiq_regulatory_lists (
    list_id UUID PRIMARY KEY DEFAULT gen_random_uuid(),

    cas_number VARCHAR(20) NOT NULL,

    -- Regulatory programs
    is_osha_pel_listed BOOLEAN DEFAULT FALSE,
    is_niosh_rel_listed BOOLEAN DEFAULT FALSE,
    is_acgih_tlv_listed BOOLEAN DEFAULT FALSE,
    is_epa_sara_313 BOOLEAN DEFAULT FALSE,       -- TRI reporting
    is_epa_cercla BOOLEAN DEFAULT FALSE,         -- Superfund
    is_california_prop65 BOOLEAN DEFAULT FALSE,  -- CA Prop 65 carcinogen/reproductive
    is_eu_reach BOOLEAN DEFAULT FALSE,           -- EU REACH
    is_carcinogen BOOLEAN DEFAULT FALSE,         -- IARC/NTP listed
    carcinogen_classification VARCHAR(50),       -- 'Group 1', 'Group 2A', etc.

    -- Reporting thresholds
    sara_313_threshold_lbs INT,                  -- TRI reporting threshold
    cercla_rq_lbs INT,                           -- Reportable quantity
    prop65_warning_required BOOLEAN DEFAULT FALSE,

    -- Sources
    last_verified_at TIMESTAMP WITH TIME ZONE,
    source_urls JSONB,

    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),

    UNIQUE(cas_number)
);

-- UNIQUE(cas_number) already creates an index on cas_number,
-- so no separate lookup index is needed.
CREATE INDEX idx_regulatory_sara313 ON chemiq_regulatory_lists(is_epa_sara_313) WHERE is_epa_sara_313 = TRUE;
CREATE INDEX idx_regulatory_prop65 ON chemiq_regulatory_lists(is_california_prop65) WHERE is_california_prop65 = TRUE;

Compliance Check Service

# app/services/compliance_check_service.py

from dataclasses import dataclass, field
from typing import Dict, List, Optional

from sqlalchemy.orm import Session

# ChemicalEnrichmentService, ChemiqRegulatoryLists, PubChemCache, and
# SDSComposition are project-internal; their import paths are omitted here.


@dataclass
class ComplianceCheckResult:
    """Result of regulatory compliance check for a chemical or product."""

    # Overall status
    is_compliant: bool
    compliance_score: float  # 0-100

    # OSHA requirements
    osha_requirements: Dict = field(default_factory=dict)
    # Example: {
    #     'hazcom_required': True,
    #     'exposure_monitoring': 'required',
    #     'medical_surveillance': 'required',
    #     'recordkeeping': '30_years'
    # }

    # EPA requirements
    epa_requirements: Dict = field(default_factory=dict)
    # Example: {
    #     'sara_313_reporting': True,
    #     'sara_313_threshold_lbs': 10000,
    #     'cercla_reportable': True,
    #     'cercla_rq_lbs': 5000,
    #     'tier2_reporting': True
    # }

    # State requirements
    state_requirements: List[Dict] = field(default_factory=list)
    # Example: [{
    #     'state': 'CA',
    #     'program': 'Prop 65',
    #     'requirement': 'warning_label',
    #     'warning_text': 'This product contains chemicals known...'
    # }]

    # Action items
    required_actions: List[str] = field(default_factory=list)
    warnings: List[str] = field(default_factory=list)

class ComplianceCheckService:
    """
    Check regulatory compliance for chemicals and chemical inventories.
    """

    def __init__(self, db: Session):
        self.db = db
        self.enrichment_service = ChemicalEnrichmentService(db)

    async def check_product_compliance(
        self,
        composition: List[SDSComposition],
        company_state: Optional[str] = None,  # For state-specific requirements
        annual_usage_lbs: Optional[float] = None  # For threshold calculations
    ) -> ComplianceCheckResult:
        """
        Check regulatory compliance for a product based on composition.
        """
        result = ComplianceCheckResult(is_compliant=True, compliance_score=100)

        for component in composition:
            if not component.cas_number:
                continue

            # Get regulatory data
            reg_data = self.db.query(ChemiqRegulatoryLists).filter(
                ChemiqRegulatoryLists.cas_number == component.cas_number
            ).first()

            if not reg_data:
                # No regulatory record yet: flag the gap
                # (queuing the CAS number for enrichment is not shown here)
                result.warnings.append(
                    f"No regulatory data for {component.chemical_name} - "
                    "compliance status unknown"
                )
                continue

            # Check OSHA requirements
            self._check_osha_requirements(result, reg_data, component)

            # Check EPA requirements
            if annual_usage_lbs:
                self._check_epa_requirements(result, reg_data, component, annual_usage_lbs)

            # Check state requirements
            if company_state:
                self._check_state_requirements(result, reg_data, component, company_state)

            # Check carcinogen status
            if reg_data.is_carcinogen:
                result.required_actions.append(
                    f"CARCINOGEN: {component.chemical_name} is classified as "
                    f"{reg_data.carcinogen_classification}. Special controls required."
                )
                result.osha_requirements['medical_surveillance'] = 'required'
                result.osha_requirements['recordkeeping'] = '30_years'
                result.compliance_score -= 20

        # Keep the score within its documented 0-100 range
        result.compliance_score = max(result.compliance_score, 0)
        result.is_compliant = result.compliance_score >= 70
        return result

    def _check_osha_requirements(
        self,
        result: ComplianceCheckResult,
        reg_data: ChemiqRegulatoryLists,
        component: SDSComposition
    ):
        """Check OSHA-specific requirements."""

        # HazCom is always required for hazardous chemicals
        if component.is_hazardous:
            result.osha_requirements['hazcom_required'] = True
            result.required_actions.append(
                f"HazCom: Ensure {component.chemical_name} is on chemical inventory "
                "and workers are trained"
            )

        # Exposure monitoring for PEL-listed chemicals
        if reg_data.is_osha_pel_listed:
            result.osha_requirements['exposure_monitoring'] = 'recommended'

            # Get enriched data for specific limit
            enriched = self.db.query(PubChemCache).filter(
                PubChemCache.cas_number == component.cas_number
            ).first()

            if enriched and enriched.osha_pel_ppm:
                result.osha_requirements['pel_ppm'] = enriched.osha_pel_ppm
                result.required_actions.append(
                    f"EXPOSURE: Monitor exposure to {component.chemical_name} "
                    f"(PEL: {enriched.osha_pel_ppm} ppm)"
                )

    def _check_epa_requirements(
        self,
        result: ComplianceCheckResult,
        reg_data: ChemiqRegulatoryLists,
        component: SDSComposition,
        annual_usage_lbs: float
    ):
        """Check EPA reporting requirements."""

        # SARA 313 / TRI reporting
        if reg_data.is_epa_sara_313:
            threshold = reg_data.sara_313_threshold_lbs or 10000

            # Estimate component usage; a missing concentration counts as 0%
            percent = component.concentration_percent or 0
            component_usage = annual_usage_lbs * (percent / 100)

            if component_usage >= threshold:
                result.epa_requirements['sara_313_reporting'] = True
                result.epa_requirements['sara_313_chemical'] = component.chemical_name
                result.required_actions.append(
                    f"TRI REPORTING: {component.chemical_name} usage "
                    f"({component_usage:.0f} lbs) exceeds SARA 313 threshold "
                    f"({threshold} lbs). Form R required."
                )
                result.compliance_score -= 10

        # CERCLA reportable quantities
        if reg_data.is_epa_cercla and reg_data.cercla_rq_lbs:
            result.epa_requirements['cercla_rq_lbs'] = reg_data.cercla_rq_lbs
            result.required_actions.append(
                f"SPILL REPORTING: {component.chemical_name} releases >= "
                f"{reg_data.cercla_rq_lbs} lbs must be reported to NRC"
            )

    def _check_state_requirements(
        self,
        result: ComplianceCheckResult,
        reg_data: ChemiqRegulatoryLists,
        component: SDSComposition,
        company_state: str
    ):
        """Check state-specific requirements."""

        # California Prop 65
        if company_state == 'CA' and reg_data.is_california_prop65:
            result.state_requirements.append({
                'state': 'CA',
                'program': 'Proposition 65',
                'requirement': 'warning_label',
                'chemical': component.chemical_name,
                'warning_text': (
                    "WARNING: This product contains chemicals known to the "
                    "State of California to cause cancer and/or reproductive harm."
                )
            })
            result.required_actions.append(
                f"PROP 65: {component.chemical_name} requires California Prop 65 warning"
            )
            result.compliance_score -= 5
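
The TRI threshold check in the service reduces to one multiplication and one comparison, shown here in isolation. The function names are assumptions; the 10,000 lb default mirrors the fallback used in `_check_epa_requirements`, and the usage figures are illustrative.

```python
def component_usage_lbs(annual_usage_lbs: float, concentration_percent: float) -> float:
    """Pounds of one component used per year, from product usage and % content."""
    return annual_usage_lbs * concentration_percent / 100


def exceeds_tri_threshold(
    annual_usage_lbs: float,
    concentration_percent: float,
    threshold_lbs: float = 10_000,  # same fallback as the service code
) -> bool:
    """True when estimated component usage meets or exceeds the threshold."""
    usage = component_usage_lbs(annual_usage_lbs, concentration_percent)
    return usage >= threshold_lbs


# 50,000 lbs of a product containing 25% of a listed chemical:
# 50,000 * 25 / 100 = 12,500 lbs, which is above 10,000 lbs.
usage = component_usage_lbs(50_000, 25)
reportable = exceeds_tri_threshold(50_000, 25)
```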

Regulatory Compliance Dashboard

// Frontend interface for compliance dashboard

interface ComplianceDashboard {
  // Overall site compliance
  siteComplianceScore: number; // 0-100

  // Chemical inventory compliance
  totalChemicals: number;
  chemicalsWithCompleteData: number;
  chemicalsNeedingReview: number;

  // Regulatory program status
  programs: {
    name: string; // 'OSHA HazCom', 'EPA TRI', 'CA Prop 65'
    status: 'compliant' | 'action_needed' | 'non_compliant';
    nextDeadline?: Date;
    actionItems: string[];
  }[];

  // Upcoming reporting deadlines
  upcomingDeadlines: {
    program: string;
    deadline: Date;
    chemicalsAffected: string[];
    estimatedEffort: string;
  }[];

  // High-priority chemicals
  priorityChemicals: {
    chemicalName: string;
    casNumber: string;
    concerns: string[]; // ['carcinogen', 'TRI_reportable', 'prop65']
    requiredActions: string[];
  }[];
}

Compliance Report Generation

from datetime import datetime, timezone
from uuid import UUID


async def generate_compliance_report(
    db: Session,
    company_id: UUID,
    site_id: UUID,
    report_year: int
) -> ComplianceReport:
    """
    Generate annual compliance report for a site.

    Includes:
    - Chemical inventory summary
    - Regulatory program status
    - TRI reporting data (if applicable)
    - Exposure monitoring summary
    - Training records summary
    """

    # Load the site record for state-specific checks.
    # `Site` and its `site_id` column are assumed names; adjust to the actual model.
    site = db.query(Site).filter(Site.site_id == site_id).first()

    # Get all inventory items for site
    inventory = db.query(ChemIQInventory).filter(
        ChemIQInventory.site_id == site_id,
        ChemIQInventory.is_active.is_(True)
    ).all()

    report = ComplianceReport(
        company_id=company_id,
        site_id=site_id,
        report_year=report_year,
        generated_at=datetime.now(timezone.utc)
    )

    compliance_service = ComplianceCheckService(db)

    for item in inventory:
        # Get composition from SDS
        composition = db.query(SDSComposition).filter(
            SDSComposition.sds_id == item.company_product.current_sds_id
        ).all()

        if composition:
            # Check compliance for this product
            check_result = await compliance_service.check_product_compliance(
                composition=composition,
                company_state=site.state if site else None,
                annual_usage_lbs=item.annual_usage_estimate
            )

            report.add_chemical_check(item, check_result)

    # Aggregate TRI reportable chemicals
    report.calculate_tri_summary()

    # Generate Prop 65 warning requirements
    report.generate_prop65_warnings()

    return report

Data Flow: Enriched Data to Compliance

┌────────────────────────────────────────────────────────────────────────┐
│  Chemical Data ──► Regulatory Compliance Flow                          │
├────────────────────────────────────────────────────────────────────────┤
│                                                                        │
│  chemiq_pubchem_cache              chemiq_regulatory_lists             │
│  ┌─────────────────────┐           ┌────────────────────────┐          │
│  │ cas_number          │───────────│ cas_number             │          │
│  │ chemical_family     │           │ is_epa_sara_313        │──┐       │
│  │ osha_pel_ppm        │──┐        │ sara_313_threshold_lbs │  │       │
│  │ niosh_rel_ppm       │  │        │ is_california_prop65   │  │       │
│  │ niosh_idlh_ppm      │  │        │ is_carcinogen          │  │       │
│  └─────────────────────┘  │        └────────────────────────┘  │       │
│                           │                                    │       │
│                           ▼                                    ▼       │
│                    ┌──────────────────────────────────────────────┐    │
│                    │ Compliance Check Service                     │    │
│                    │ ──────────────────────────────────────────── │    │
│                    │ • Calculate exposure indices                 │    │
│                    │ • Check TRI reporting thresholds             │    │
│                    │ • Identify Prop 65 requirements              │    │
│                    │ • Flag carcinogens for special handling      │    │
│                    │ • Generate required action items             │    │
│                    └───────────────────────┬──────────────────────┘    │
│                                            │                           │
│                                            ▼                           │
│                    ┌──────────────────────────────────────────────┐    │
│                    │ Compliance Dashboard                         │    │
│                    │ • Site compliance score                      │    │
│                    │ • Upcoming deadlines                         │    │
│                    │ • Required actions                           │    │
│                    │ • Regulatory reports                         │    │
│                    └──────────────────────────────────────────────┘    │
│                                                                        │
└────────────────────────────────────────────────────────────────────────┘

Regulatory Data Sources for Enrichment

| Regulatory Program | Data Source | Update Frequency |
|---|---|---|
| OSHA PELs | OSHA Annotated PEL Tables | Annual |
| NIOSH RELs | NIOSH Pocket Guide | Annual |
| ACGIH TLVs | ACGIH (subscription) | Annual |
| EPA SARA 313 | EPA TRI Chemical List | Annual (March) |
| EPA CERCLA RQs | 40 CFR 302.4 | As amended |
| CA Prop 65 | OEHHA Chemical List | Quarterly |
| IARC Carcinogens | IARC Monographs | As published |
| NTP Carcinogens | Report on Carcinogens | Biennial |
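
The update frequencies above could drive a re-verification check against `last_verified_at`. The sketch below is one way to do that; the day counts chosen for each label are assumptions, and sources updated "as amended" or "as published" have no fixed interval, so they are left to manual review.

```python
from datetime import date, timedelta
from typing import Dict

# Assumed day counts for the frequency labels in the table above.
REFRESH_INTERVAL_DAYS: Dict[str, int] = {
    "Annual": 365,
    "Quarterly": 91,
    "Biennial": 730,
}


def refresh_due(frequency: str, last_verified: date, today: date) -> bool:
    """True when a source with a fixed interval is due for re-verification."""
    days = REFRESH_INTERVAL_DAYS.get(frequency)
    if days is None:
        # "As amended" / "As published": no fixed schedule to check against
        return False
    return today - last_verified >= timedelta(days=days)


# Prop 65 data verified on 1 January is due again by mid-April.
prop65_due = refresh_due("Quarterly", date(2024, 1, 1), date(2024, 4, 15))
pel_due = refresh_due("Annual", date(2024, 1, 1), date(2024, 6, 1))
```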

Pre-Seeding Regulatory Data

The NIOSH Pocket Guide (~600 chemicals) provides excellent seed data with:

  • Exposure limits (REL, PEL, IDLH)
  • Carcinogen classifications
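  • Respirator recommendations

# Seed NIOSH Pocket Guide data
from datetime import datetime, timezone


async def seed_niosh_pocket_guide(db: Session):
    """
    Pre-load NIOSH Pocket Guide data into cache and regulatory tables.

    Rows are matched on cas_number so the seed can be re-run safely.
    Session.merge() matches on primary key, which for
    chemiq_regulatory_lists is list_id rather than cas_number.
    """
    niosh_data = load_niosh_pocket_guide_json()  # Pre-downloaded JSON

    for chemical in niosh_data:
        cas = chemical['cas']

        # Update PubChem cache (insert if missing)
        cache_entry = db.query(PubChemCache).filter(
            PubChemCache.cas_number == cas
        ).first()
        if cache_entry is None:
            cache_entry = PubChemCache(cas_number=cas)
            db.add(cache_entry)
        cache_entry.common_name = chemical['name']
        cache_entry.niosh_rel_ppm = chemical.get('rel_ppm')
        cache_entry.niosh_rel_mg_m3 = chemical.get('rel_mg_m3')
        cache_entry.osha_pel_ppm = chemical.get('pel_ppm')
        cache_entry.osha_pel_mg_m3 = chemical.get('pel_mg_m3')
        cache_entry.niosh_idlh_ppm = chemical.get('idlh_ppm')
        cache_entry.priority_level = 1  # Seed data
        cache_entry.source = 'niosh_npg'

        # Update regulatory lists (insert if missing)
        reg_entry = db.query(ChemiqRegulatoryLists).filter(
            ChemiqRegulatoryLists.cas_number == cas
        ).first()
        if reg_entry is None:
            reg_entry = ChemiqRegulatoryLists(cas_number=cas)
            db.add(reg_entry)
        # A limit may be given in ppm or in mg/m³ (e.g. particulates)
        reg_entry.is_osha_pel_listed = (
            chemical.get('pel_ppm') is not None
            or chemical.get('pel_mg_m3') is not None
        )
        reg_entry.is_niosh_rel_listed = (
            chemical.get('rel_ppm') is not None
            or chemical.get('rel_mg_m3') is not None
        )
        reg_entry.is_carcinogen = chemical.get('carcinogen', False)
        reg_entry.carcinogen_classification = chemical.get('carcinogen_class')
        reg_entry.last_verified_at = datetime.now(timezone.utc)

    db.commit()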