Skip to main content

Entitlement Architecture & Implementation Plan

Table of Contents

  1. Overview
  2. Architecture Principles
  3. Data Model
  4. Three-Layer Permission Model
  5. Implementation Plan
  6. Code Examples
  7. Testing Strategy
  8. Rollout Plan

Overview

This document describes the comprehensive entitlement system for the Tellus EHS HazCom platform, which controls access to modules, features, and capabilities based on subscription plans (Starter, Standard, Pro) and enforces usage limits.

Key Concepts

  • Module: Top-level functional area (e.g., CHEMIQ, PLAN, LABELS)
  • Feature: Specific functionality within a module (e.g., SDS_BINDER, INVENTORY)
  • Capability: Individual function within a feature (e.g., BULK_UPLOAD, AI_EXTRACTION)
  • Entitlement: Permission or limit defined at plan level
  • Limit: Usage constraint (e.g., max 100 SDS uploads, 5 users)

Current State

The existing data model has:

  • modules table with categories (REQUIRED, USER_OPTION, UNDERLYING)
  • plan_entitlements table linking plans to modules
  • entitlement_definitions with types: feature and limit
  • company_entitlement_overrides for per-company exceptions

Gap Identified

The docs/tabular.csv file reveals feature-level and capability-level granularity that isn't currently tracked in the database. We need to extend the data model to support this.


Architecture Principles

  1. Centralized Control: All entitlement checks go through EntitlementService
  2. Explicit Over Implicit: Clear decorators make permissions visible in code
  3. Fail Secure: Default to denying access when entitlement is unclear
  4. User-Friendly: Provide clear upgrade paths in error messages
  5. Performance: Cache entitlement checks, minimize DB queries
  6. Overridable: Support company-specific overrides for special cases
  7. Auditable: Log all entitlement checks and limit violations

Data Model

Current Tables

-- Existing: modules table
CREATE TABLE modules (
module_id UUID PRIMARY KEY,
code VARCHAR(50) UNIQUE NOT NULL,
name VARCHAR(255) NOT NULL,
description TEXT,
category VARCHAR(20) NOT NULL, -- REQUIRED, USER_OPTION, UNDERLYING
parent_module_id UUID REFERENCES modules(module_id),
is_visible BOOLEAN DEFAULT TRUE,
created_at TIMESTAMP DEFAULT NOW()
);

-- Existing: entitlement_definitions table
CREATE TABLE entitlement_definitions (
entitlement_id UUID PRIMARY KEY,
code VARCHAR(100) UNIQUE NOT NULL,
name VARCHAR(255) NOT NULL,
type VARCHAR(20) NOT NULL, -- 'feature' or 'limit'
unit VARCHAR(50), -- 'count', 'per_month', 'per_minute', etc.
description TEXT,
created_at TIMESTAMP DEFAULT NOW()
);

-- Existing: plan_entitlements table
CREATE TABLE plan_entitlements (
plan_entitlement_id UUID PRIMARY KEY,
plan_version_id UUID NOT NULL REFERENCES plan_versions(plan_version_id),
entitlement_id UUID NOT NULL REFERENCES entitlement_definitions(entitlement_id),
feature_enabled BOOLEAN,
limit_value BIGINT, -- NULL = unlimited
module_id UUID REFERENCES modules(module_id),
is_selectable BOOLEAN DEFAULT TRUE,
feature_level TEXT, -- 'basic', 'standard', 'advanced'
created_at TIMESTAMP DEFAULT NOW()
);

-- Existing: company_entitlement_overrides table
CREATE TABLE company_entitlement_overrides (
override_id UUID PRIMARY KEY,
company_id UUID NOT NULL REFERENCES companies(company_id),
entitlement_id UUID NOT NULL REFERENCES entitlement_definitions(entitlement_id),
feature_enabled BOOLEAN,
limit_value BIGINT,
reason TEXT,
created_at TIMESTAMP DEFAULT NOW()
);

New Tables (Phase 2 - Optional Enhancement)

-- New: module_features table (optional - for feature-level granularity)
CREATE TABLE module_features (
feature_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
module_id UUID NOT NULL REFERENCES modules(module_id),
feature_code VARCHAR(100) NOT NULL, -- e.g., 'SDS_BINDER', 'INVENTORY'
feature_name VARCHAR(255) NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE(module_id, feature_code)
);

-- New: feature_capabilities table (optional - for capability-level granularity)
CREATE TABLE feature_capabilities (
capability_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
feature_id UUID NOT NULL REFERENCES module_features(feature_id),
capability_code VARCHAR(100) NOT NULL, -- e.g., 'BULK_UPLOAD', 'AI_EXTRACTION'
capability_name VARCHAR(255) NOT NULL,
description TEXT,
min_plan_level VARCHAR(20) NOT NULL, -- 'starter', 'standard', 'pro'
created_at TIMESTAMP DEFAULT NOW(),
UNIQUE(feature_id, capability_code)
);

Note: The new tables are optional for Phase 1. We can start with the existing schema and use naming conventions in entitlement_definitions.code to encode module/feature/capability hierarchy (e.g., CHEMIQ_SDS_BINDER_BULK_UPLOAD).


Three-Layer Permission Model

Layer 1: Module-Level Access

Question: Does the company have this module enabled in their plan?

Examples:

  • CHEMIQ (Chemical Inventory & SDS Binder)
  • PLAN (HazCom Plan Builder)
  • LABELS (GHS Labeling)

Implementation:

@require_module("CHEMIQ")
async def list_sds(...):
# Only accessible if company has CHEMIQ module
pass

Layer 2: Feature-Level Access

Question: Does the company have access to this feature within the module?

Examples:

  • CHEMIQ → SDS_BINDER
  • CHEMIQ → INVENTORY
  • CHEMIQ → CLASSIFICATION

Implementation:

# For most cases, if module is enabled, all features are accessible
# Feature-level checks are rare and usually handled implicitly
if not entitlement_service.check_feature_access(company_id, "CHEMIQ", "SDS_BINDER"):
raise HTTPException(403, "Feature not available")

Layer 3: Capability-Level Access (Plan Tier)

Question: Does the company's plan tier (Starter/Standard/Pro) support this capability?

Examples:

  • Starter: Upload single SDS
  • Standard: Bulk upload SDS
  • Pro: AI metadata extraction from SDS

Implementation:

@require_capability("CHEMIQ", "SDS_BINDER", "BULK_UPLOAD")
async def bulk_upload_sds(...):
# Only accessible to Standard or Pro plans
pass

Layer 4: Usage Limits

Question: Has the company exceeded their usage limits?

Examples:

  • Starter: 100 total SDS, 5 users, 1 site
  • Standard: 500 SDS, 25 users, 10 sites
  • Pro: Unlimited

Implementation:

@enforce_limit("LIMIT_SDS_UPLOADS")
async def upload_sds(...):
# Blocks if limit exceeded
pass

Implementation Plan

Phase 1: Core Infrastructure (Week 1-2)

1.1 Create EntitlementService

File: app/services/entitlement_service.py

Tasks:

  • Create EntitlementService class
  • Implement check_module_access(company_id, module_code)
  • Implement check_feature_access(company_id, module_code, feature_code)
  • Implement check_capability_access(company_id, module_code, feature_code, capability_code)
  • Implement get_company_plan_level(company_id)
  • Implement get_feature_level(company_id, module_code)

Key Methods:

class EntitlementService:
def check_module_access(self, company_id: UUID, module_code: str) -> bool
def check_capability_access(self, company_id: UUID, module_code: str,
feature_code: str, capability_code: str) -> bool
def get_company_plan_level(self, company_id: UUID) -> Optional[PlanLevel]
def get_feature_level(self, company_id: UUID, module_code: str) -> Optional[str]

1.2 Create Permission Decorators

File: app/core/permissions.py

Tasks:

  • Create @require_module(module_code) decorator
  • Create @require_capability(module_code, feature_code, capability_code) decorator
  • Add proper error handling with upgrade messages

Example:

@require_module("CHEMIQ")
async def list_sds(...):
pass

@require_capability("CHEMIQ", "SDS_BINDER", "BULK_UPLOAD")
async def bulk_upload_sds(...):
pass

1.3 Update Authentication Context

File: app/core/auth.py

Tasks:

  • Ensure company_id is available in request context
  • Add get_current_company_id() dependency
  • Handle multi-company users (consultants)

Phase 2: Limit Enforcement (Week 2-3)

2.1 Extend EntitlementService for Limits

Tasks:

  • Implement get_limit_value(company_id, entitlement_code)
  • Implement check_limit(company_id, entitlement_code, current_usage)
  • Implement get_usage_count(company_id, entitlement_code, time_period)
  • Add usage calculator methods:
    • _get_sds_upload_count()
    • _get_user_count()
    • _get_site_count()
    • _get_storage_usage_gb()
    • _get_api_call_count()
    • _get_chemical_count()

Example:

def check_limit(self, company_id: UUID, entitlement_code: str,
current_usage: int) -> Dict[str, any]:
return {
"allowed": bool,
"limit": int,
"current": int,
"remaining": int,
"unit": str
}

2.2 Create Limit Decorators

File: app/core/permissions.py

Tasks:

  • Create @enforce_limit(entitlement_code) decorator
  • Create @check_limit_only(entitlement_code) decorator
  • Add limit info to response headers (X-Limit, X-Current, X-Remaining)

Example:

@enforce_limit("LIMIT_SDS_UPLOADS")  # Blocks if limit exceeded
async def upload_sds(...):
pass

@check_limit_only("LIMIT_SDS_UPLOADS") # Just adds headers
async def list_sds(...):
pass

2.3 Database Migration for Limits

File: alembic/versions/YYYY_MM_DD_HHMM_add_limit_entitlements.py

Tasks:

  • Add limit entitlement definitions:
    • LIMIT_SDS_UPLOADS
    • LIMIT_USERS
    • LIMIT_SITES
    • LIMIT_API_CALLS
    • LIMIT_STORAGE_GB
    • LIMIT_CHEMICALS
    • LIMIT_INCIDENTS
    • LIMIT_TRAINING_COURSES
  • Seed plan_entitlements with tier-specific limits:
    • Starter: Lower limits
    • Standard: Medium limits
    • Pro: NULL (unlimited)

Example Limits:

EntitlementStarterStandardPro
SDS Uploads100500
Users525
Sites110
API Calls (per month)1001,00010,000
Storage (GB)550500

Phase 3: API Integration (Week 3-4)

3.1 Update Existing Routes

Tasks:

  • Identify all API routes that need protection
  • Add @require_module() decorators
  • Add @require_capability() decorators where appropriate
  • Add @enforce_limit() decorators to creation endpoints

Priority Routes:

CHEMIQ Module (app/api/v1/chemiq.py):

  • POST /sds/upload - Add @enforce_limit("LIMIT_SDS_UPLOADS")
  • POST /sds/bulk-upload - Add @require_capability() + @enforce_limit()
  • POST /sds/ai-extract - Add @require_capability()
  • GET /sds - Add @check_limit_only()
  • GET /inventory - Add plan-aware response

ADMINHQ Module (app/api/v1/users.py):

  • POST /users/invite - Add @enforce_limit("LIMIT_USERS")
  • POST /sites - Add @enforce_limit("LIMIT_SITES")

API Endpoints (app/api/v1/external.py):

  • All API routes - Add @enforce_limit("LIMIT_API_CALLS")

3.2 Create Entitlement API Endpoints

File: app/api/v1/entitlements.py

Tasks:

  • GET /entitlements/check-module/{module_code}
  • GET /entitlements/check-capability/{module}/{feature}/{capability}
  • GET /entitlements/feature-level/{module_code}
  • GET /entitlements/limit/{entitlement_code}
  • GET /entitlements/my-plan - Return current plan with all entitlements

Example:

@router.get("/check-module/{module_code}")
async def check_module_access(
module_code: str,
company_id: UUID = Depends(get_current_company_id),
db: Session = Depends(get_db)
):
service = EntitlementService(db)
has_access = service.check_module_access(company_id, module_code)
return {"has_access": has_access}

Phase 4: Frontend Integration (Week 4-5)

4.1 Create Frontend Entitlement Service

File: src/services/entitlements.ts

Tasks:

  • Create entitlementService.checkModule(moduleCode)
  • Create entitlementService.checkCapability(module, feature, capability)
  • Create entitlementService.getFeatureLevel(moduleCode)
  • Create entitlementService.checkLimit(entitlementCode)
  • Create entitlementService.parseLimitHeaders(headers)

Example:

export const entitlementService = {
async checkModule(moduleCode: string): Promise<boolean>,
async checkCapability(module: string, feature: string, capability: string): Promise<boolean>,
async checkLimit(entitlementCode: string): Promise<LimitInfo>,
parseLimitHeaders(headers: Headers): LimitInfo | null
};

4.2 Create React Hooks

File: src/hooks/useEntitlements.ts

Tasks:

  • Create useModuleAccess(moduleCode) hook
  • Create useCapabilityAccess(module, feature, capability) hook
  • Create useLimitInfo(entitlementCode) hook
  • Create usePlanLevel() hook

Example:

export function useModuleAccess(moduleCode: string) {
const [hasAccess, setHasAccess] = useState(false);
// ... implementation
return hasAccess;
}

export function useLimitInfo(entitlementCode: string) {
const [limitInfo, setLimitInfo] = useState<LimitInfo | null>(null);
// ... implementation
return limitInfo;
}

4.3 Update UI Components

Tasks:

  • Show/hide features based on plan tier
  • Display usage limits and current usage
  • Show upgrade prompts for locked features
  • Add plan tier badges to features

Example Component:

export function SDSUploadButton() {
const canBulkUpload = useCapabilityAccess('CHEMIQ', 'SDS_BINDER', 'BULK_UPLOAD');
const canUseAI = useCapabilityAccess('CHEMIQ', 'SDS_BINDER', 'AI_EXTRACTION');
const limitInfo = useLimitInfo('LIMIT_SDS_UPLOADS');

return (
<div>
<Button>Upload Single SDS</Button>

{canBulkUpload ? (
<Button>Bulk Upload</Button>
) : (
<Tooltip content="Upgrade to Standard for bulk upload">
<Button disabled>Bulk Upload (Standard+)</Button>
</Tooltip>
)}

{canUseAI && <Badge>AI Extraction Available</Badge>}

{limitInfo && (
<div className="text-sm">
{limitInfo.current} / {limitInfo.limit || '∞'} uploads
{limitInfo.remaining && limitInfo.remaining < 10 && (
<Badge variant="warning">Near limit - Upgrade recommended</Badge>
)}
</div>
)}
</div>
);
}

Phase 5: Data Migration (Week 5)

5.1 Seed Capability Data from tabular.csv

File: alembic/versions/YYYY_MM_DD_HHMM_seed_capabilities_from_csv.py

Tasks:

  • Parse docs/tabular.csv
  • Create entitlement definitions for each capability
  • Link capabilities to appropriate plan tiers
  • Verify data integrity

Example:

def upgrade():
import csv
from pathlib import Path

csv_path = Path(__file__).parent.parent.parent / "docs" / "tabular.csv"

with open(csv_path, 'r') as f:
reader = csv.DictReader(f)
for row in reader:
module = row['Module']
feature = row['Feature Code']
capability = row['Feature Name']

# Determine min plan level based on which column has content
if row['Starter']:
min_level = 'starter'
elif row['Standard']:
min_level = 'standard'
elif row['Pro']:
min_level = 'pro'
else:
continue

# Create entitlement definition
code = f"{module}_{feature}_{capability.upper().replace(' ', '_')}"
# INSERT INTO entitlement_definitions ...

5.2 Create Company Override Admin UI

File: src/pages/admin/CompanyEntitlementOverrides.tsx

Tasks:

  • List all companies with their current plans
  • Allow admins to create/edit/delete overrides
  • Show override history and reasons
  • Add audit trail

Phase 6: Testing (Week 6)

6.1 Unit Tests

Tasks:

  • Test EntitlementService methods
  • Test permission decorators
  • Test limit enforcement logic
  • Test usage calculators

File: tests/services/test_entitlement_service.py

6.2 Integration Tests

Tasks:

  • Test API routes with different plan tiers
  • Test limit enforcement on creation endpoints
  • Test company overrides
  • Test multi-company scenarios (consultants)

File: tests/api/test_entitlements.py

6.3 Frontend Tests

Tasks:

  • Test entitlement service functions
  • Test React hooks
  • Test UI component behavior based on entitlements

File: src/services/__tests__/entitlements.test.ts


Phase 7: Documentation & Rollout (Week 6-7)

7.1 Internal Documentation

Tasks:

  • Document entitlement architecture (this file)
  • Create developer guide for adding new entitlements
  • Document limit types and usage
  • Create troubleshooting guide

7.2 User-Facing Documentation

Tasks:

  • Update plan comparison page
  • Document feature availability by tier
  • Create upgrade guide
  • Add FAQ about limits

7.3 Rollout Plan

Tasks:

  • Deploy to development environment
  • Test with real customer data (anonymized)
  • Deploy to staging
  • Run smoke tests
  • Deploy to production (gradual rollout)
  • Monitor for errors/issues
  • Gather user feedback

Code Examples

1. EntitlementService Implementation

File: app/services/entitlement_service.py

"""Entitlement checking service with limit support"""

from sqlalchemy.orm import Session
from typing import Optional, Dict
from uuid import UUID
from datetime import datetime, timedelta
from enum import Enum

from app.db.models.plan import PlanVersion, PlanEntitlement
from app.db.models.module import Module
from app.db.models.entitlement import EntitlementDefinition, CompanyEntitlementOverride
from app.db.models.company import Company


class EntitlementType(str, Enum):
FEATURE = "feature"
LIMIT = "limit"


class PlanLevel(str, Enum):
STARTER = "starter"
STANDARD = "standard"
PRO = "pro"


class LimitUnit(str, Enum):
COUNT = "count"
PER_MONTH = "per_month"
PER_DAY = "per_day"
PER_MINUTE = "per_minute"
CONCURRENT = "concurrent"


class EntitlementService:
"""Service for checking entitlements and enforcing limits"""

def __init__(self, db: Session):
self.db = db

def get_company_plan_level(self, company_id: UUID) -> Optional[PlanLevel]:
"""Get the active plan level for a company"""
company = self.db.query(Company).filter(Company.company_id == company_id).first()
if not company or not company.subscription:
return None

plan_version = (
self.db.query(PlanVersion)
.filter(
PlanVersion.plan_version_id == company.subscription.plan_version_id,
PlanVersion.is_active == True
)
.first()
)

if not plan_version or not plan_version.plan:
return None

plan_code = plan_version.plan.code.upper()
if plan_code == "STARTER":
return PlanLevel.STARTER
elif plan_code == "STANDARD":
return PlanLevel.STANDARD
elif plan_code == "PRO":
return PlanLevel.PRO

return None

def check_module_access(self, company_id: UUID, module_code: str) -> bool:
"""Check if company has access to a module"""
company = self.db.query(Company).filter(Company.company_id == company_id).first()
if not company:
return False

module = self.db.query(Module).filter(Module.code == module_code).first()
if not module:
return False

# REQUIRED modules are always accessible
if module.category == "REQUIRED":
return True

# Check plan entitlements
if not company.subscription:
return False

plan_version = (
self.db.query(PlanVersion)
.filter(
PlanVersion.plan_version_id == company.subscription.plan_version_id,
PlanVersion.is_active == True
)
.first()
)

if not plan_version:
return False

has_entitlement = (
self.db.query(PlanEntitlement)
.filter(
PlanEntitlement.plan_version_id == plan_version.plan_version_id,
PlanEntitlement.module_id == module.module_id,
PlanEntitlement.feature_enabled == True
)
.first() is not None
)

return has_entitlement

def check_capability_access(
self,
company_id: UUID,
module_code: str,
feature_code: str,
capability_code: str
) -> bool:
"""Check if company's plan level allows access to a specific capability"""
# First check module access
if not self.check_module_access(company_id, module_code):
return False

# Get company's plan level
plan_level = self.get_company_plan_level(company_id)
if not plan_level:
return False

# Build entitlement code from components
# e.g., "CHEMIQ_SDS_BINDER_BULK_UPLOAD"
entitlement_code = f"{module_code}_{feature_code}_{capability_code}"

# Get the capability's minimum required plan level
entitlement = (
self.db.query(EntitlementDefinition)
.filter(EntitlementDefinition.code == entitlement_code)
.first()
)

if not entitlement:
# If capability not found, allow by default (backward compatibility)
return True

# Check if capability is in plan entitlements
if not company.subscription:
return False

plan_version = (
self.db.query(PlanVersion)
.filter(
PlanVersion.plan_version_id == company.subscription.plan_version_id,
PlanVersion.is_active == True
)
.first()
)

if not plan_version:
return False

plan_entitlement = (
self.db.query(PlanEntitlement)
.filter(
PlanEntitlement.plan_version_id == plan_version.plan_version_id,
PlanEntitlement.entitlement_id == entitlement.entitlement_id,
PlanEntitlement.feature_enabled == True
)
.first()
)

return plan_entitlement is not None

def get_limit_value(self, company_id: UUID, entitlement_code: str) -> Optional[int]:
"""
Get the limit value for a specific entitlement.
Checks company overrides first, then plan entitlements.
Returns None for unlimited.
"""
# First check for company-specific override
override = (
self.db.query(CompanyEntitlementOverride)
.join(EntitlementDefinition)
.filter(
CompanyEntitlementOverride.company_id == company_id,
EntitlementDefinition.code == entitlement_code
)
.first()
)

if override and override.limit_value is not None:
return override.limit_value

# Otherwise get from plan
company = self.db.query(Company).filter(Company.company_id == company_id).first()
if not company or not company.subscription:
return None

plan_entitlement = (
self.db.query(PlanEntitlement)
.join(EntitlementDefinition)
.filter(
PlanEntitlement.plan_version_id == company.subscription.plan_version_id,
EntitlementDefinition.code == entitlement_code
)
.first()
)

return plan_entitlement.limit_value if plan_entitlement else None

def check_limit(
self,
company_id: UUID,
entitlement_code: str,
current_usage: int
) -> Dict[str, any]:
"""
Check if current usage is within limits.

Returns:
{
"allowed": bool,
"limit": int | None,
"current": int,
"remaining": int | None,
"unit": str | None
}
"""
entitlement = (
self.db.query(EntitlementDefinition)
.filter(EntitlementDefinition.code == entitlement_code)
.first()
)

if not entitlement or entitlement.type != EntitlementType.LIMIT:
# If not a limit entitlement, allow unlimited
return {
"allowed": True,
"limit": None,
"current": current_usage,
"remaining": None,
"unit": None
}

limit = self.get_limit_value(company_id, entitlement_code)

if limit is None:
# No limit set = unlimited
return {
"allowed": True,
"limit": None,
"current": current_usage,
"remaining": None,
"unit": entitlement.unit
}

allowed = current_usage < limit
remaining = max(0, limit - current_usage)

return {
"allowed": allowed,
"limit": limit,
"current": current_usage,
"remaining": remaining,
"unit": entitlement.unit
}

def get_usage_count(
self,
company_id: UUID,
entitlement_code: str,
time_period: Optional[timedelta] = None
) -> int:
"""
Get current usage count for a specific entitlement.
Delegates to specific usage calculators.
"""
usage_calculators = {
"LIMIT_SDS_UPLOADS": self._get_sds_upload_count,
"LIMIT_USERS": self._get_user_count,
"LIMIT_SITES": self._get_site_count,
"LIMIT_STORAGE_GB": self._get_storage_usage_gb,
"LIMIT_API_CALLS": self._get_api_call_count,
"LIMIT_CHEMICALS": self._get_chemical_count,
}

calculator = usage_calculators.get(entitlement_code)
if not calculator:
return 0

return calculator(company_id, time_period)

# Usage calculation methods

def _get_sds_upload_count(self, company_id: UUID, time_period: Optional[timedelta] = None) -> int:
"""Count SDS uploads, optionally within a time period"""
# TODO: Implement based on your SDS model
return 0

def _get_user_count(self, company_id: UUID, time_period: Optional[timedelta] = None) -> int:
"""Count active users in company"""
# TODO: Implement based on your User model
return 0

def _get_site_count(self, company_id: UUID, time_period: Optional[timedelta] = None) -> int:
"""Count sites in company"""
# TODO: Implement based on your Site model
return 0

def _get_storage_usage_gb(self, company_id: UUID, time_period: Optional[timedelta] = None) -> int:
"""Calculate total storage usage in GB"""
# TODO: Implement based on file storage
return 0

def _get_api_call_count(self, company_id: UUID, time_period: Optional[timedelta] = None) -> int:
"""Count API calls, typically within last month or minute"""
# TODO: Implement based on API logging
return 0

def _get_chemical_count(self, company_id: UUID, time_period: Optional[timedelta] = None) -> int:
"""Count chemicals in inventory"""
# TODO: Implement based on Chemical model
return 0

2. Permission Decorators

File: app/core/permissions.py

"""Permission decorators for API routes"""

from functools import wraps
from fastapi import HTTPException, status, Depends
from uuid import UUID
from datetime import timedelta

from app.services.entitlement_service import EntitlementService, LimitUnit, EntitlementDefinition
from app.db.session import get_db


def require_module(module_code: str):
"""Decorator to require module access"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
db = kwargs.get('db') or next(get_db())
company_id = kwargs.get('company_id')

if not company_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Company context not found"
)

entitlement_service = EntitlementService(db)

if not entitlement_service.check_module_access(company_id, module_code):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Access to {module_code} module not allowed for your plan. "
f"Please upgrade to access this feature."
)

return await func(*args, **kwargs)
return wrapper
return decorator


def require_capability(module_code: str, feature_code: str, capability_code: str):
"""Decorator to require specific capability access"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
db = kwargs.get('db') or next(get_db())
company_id = kwargs.get('company_id')

if not company_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Company context not found"
)

entitlement_service = EntitlementService(db)

if not entitlement_service.check_capability_access(
company_id, module_code, feature_code, capability_code
):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"This feature requires a higher plan tier. "
f"Please upgrade to access {capability_code}."
)

return await func(*args, **kwargs)
return wrapper
return decorator


def enforce_limit(entitlement_code: str):
"""
Decorator to enforce usage limits before allowing action.
Use this on creation endpoints (POST).
"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
db = kwargs.get('db') or next(get_db())
company_id = kwargs.get('company_id')

if not company_id:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Company context not found"
)

entitlement_service = EntitlementService(db)

# Determine time period based on unit
entitlement = db.query(EntitlementDefinition).filter(
EntitlementDefinition.code == entitlement_code
).first()

time_period = None
if entitlement and entitlement.unit:
if entitlement.unit == LimitUnit.PER_MONTH:
time_period = timedelta(days=30)
elif entitlement.unit == LimitUnit.PER_DAY:
time_period = timedelta(days=1)
elif entitlement.unit == LimitUnit.PER_MINUTE:
time_period = timedelta(minutes=1)

# Get current usage
current_usage = entitlement_service.get_usage_count(
company_id,
entitlement_code,
time_period
)

# Check limit
limit_check = entitlement_service.check_limit(
company_id,
entitlement_code,
current_usage
)

if not limit_check["allowed"]:
unit_text = f" per {limit_check['unit']}" if limit_check['unit'] else ""
raise HTTPException(
status_code=status.HTTP_402_PAYMENT_REQUIRED,
detail=f"Usage limit exceeded. Your plan allows {limit_check['limit']}{unit_text}. "
f"Current usage: {limit_check['current']}. "
f"Please upgrade your plan for higher limits.",
headers={
"X-Limit": str(limit_check['limit']),
"X-Current": str(limit_check['current']),
"X-Remaining": str(limit_check['remaining'])
}
)

# Add limit info to kwargs for the route handler
kwargs['_limit_info'] = limit_check

return await func(*args, **kwargs)
return wrapper
return decorator


def check_limit_only(entitlement_code: str):
"""
Decorator to check limits but not enforce.
Adds limit info to response headers.
Use this on GET endpoints to inform users of their usage.
"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
db = kwargs.get('db') or next(get_db())
company_id = kwargs.get('company_id')

if company_id:
entitlement_service = EntitlementService(db)

current_usage = entitlement_service.get_usage_count(
company_id,
entitlement_code
)

limit_check = entitlement_service.check_limit(
company_id,
entitlement_code,
current_usage
)

kwargs['_limit_info'] = limit_check

response = await func(*args, **kwargs)

# Add limit headers if response supports it
if hasattr(response, 'headers') and '_limit_info' in kwargs:
info = kwargs['_limit_info']
if info['limit'] is not None:
response.headers['X-Limit'] = str(info['limit'])
response.headers['X-Current'] = str(info['current'])
response.headers['X-Remaining'] = str(info['remaining'])

return response
return wrapper
return decorator

3. API Route Examples

File: app/api/v1/chemiq.py

from fastapi import APIRouter, Depends, HTTPException, Response, UploadFile
from sqlalchemy.orm import Session
from uuid import UUID
from typing import List

from app.db.session import get_db
from app.core.permissions import require_module, require_capability, enforce_limit, check_limit_only
from app.core.auth import get_current_company_id
from app.services.entitlement_service import EntitlementService


router = APIRouter(prefix="/chemiq", tags=["chemiq"])


@router.post("/sds/upload")
@require_module("CHEMIQ")
@enforce_limit("LIMIT_SDS_UPLOADS")
async def upload_sds(
file: UploadFile,
company_id: UUID = Depends(get_current_company_id),
db: Session = Depends(get_db),
_limit_info: dict = None
):
"""
Upload SDS file.
Limits:
- Starter: 100 total SDS
- Standard: 500 total SDS
- Pro: Unlimited
"""
# Upload logic here
# The decorator has already verified we're within limits

return {
"message": "SDS uploaded successfully",
"limit_info": _limit_info
}


@router.post("/sds/bulk-upload")
@require_module("CHEMIQ")
@require_capability("CHEMIQ", "SDS_BINDER", "BULK_UPLOAD")
@enforce_limit("LIMIT_SDS_UPLOADS")
async def bulk_upload_sds(
files: List[UploadFile],
company_id: UUID = Depends(get_current_company_id),
db: Session = Depends(get_db)
):
"""
Bulk upload SDS files.
Requires: Standard or Pro plan
"""
# Bulk upload logic
return {"message": f"Uploaded {len(files)} SDS files"}


@router.post("/sds/ai-extract")
@require_module("CHEMIQ")
@require_capability("CHEMIQ", "SDS_BINDER", "AI_EXTRACTION")
async def ai_extract_sds(
file_id: UUID,
company_id: UUID = Depends(get_current_company_id),
db: Session = Depends(get_db)
):
"""
AI extraction from SDS.
Requires: Pro plan
"""
# AI extraction logic
return {"message": "AI extraction completed"}


@router.get("/sds")
@require_module("CHEMIQ")
@check_limit_only("LIMIT_SDS_UPLOADS")
async def list_sds(
response: Response,
company_id: UUID = Depends(get_current_company_id),
db: Session = Depends(get_db),
_limit_info: dict = None
):
"""
List all SDS files.
Response includes usage limit headers.
"""
# List SDS files
sds_files = [] # Query your SDS

return {
"items": sds_files,
"count": len(sds_files),
"limit_info": _limit_info
}


@router.get("/inventory")
@require_module("CHEMIQ")
async def get_inventory(
company_id: UUID = Depends(get_current_company_id),
db: Session = Depends(get_db)
):
"""Get inventory with plan-aware features"""
entitlement_service = EntitlementService(db)

# Get feature level
feature_level = entitlement_service.get_feature_level(company_id, "CHEMIQ")

# Build response based on plan level
response = {
"items": [], # Basic inventory
"feature_level": feature_level
}

# Add plan-specific features
if entitlement_service.check_capability_access(
company_id, "CHEMIQ", "INVENTORY", "BARCODE_SCAN"
):
response["barcode_scan_enabled"] = True

if entitlement_service.check_capability_access(
company_id, "CHEMIQ", "INVENTORY", "USAGE_ANALYTICS"
):
response["analytics"] = {
# Analytics data
}

return response

4. Frontend Service

File: src/services/entitlements.ts

import { api } from './api';

export interface LimitInfo {
allowed: boolean;
limit: number | null;
current: number;
remaining: number | null;
unit: string | null;
}

export const entitlementService = {
async checkModule(moduleCode: string): Promise<boolean> {
try {
const response = await api.get(`/entitlements/check-module/${moduleCode}`);
return response.data.has_access;
} catch {
return false;
}
},

async checkCapability(
moduleCode: string,
featureCode: string,
capabilityCode: string
): Promise<boolean> {
try {
const response = await api.get(
`/entitlements/check-capability/${moduleCode}/${featureCode}/${capabilityCode}`
);
return response.data.has_access;
} catch {
return false;
}
},

async getFeatureLevel(moduleCode: string): Promise<string | null> {
try {
const response = await api.get(`/entitlements/feature-level/${moduleCode}`);
return response.data.feature_level;
} catch {
return null;
}
},

async checkLimit(entitlementCode: string): Promise<LimitInfo> {
const response = await api.get(`/entitlements/limit/${entitlementCode}`);
return response.data;
},

parseLimitHeaders(headers: Headers): LimitInfo | null {
const limit = headers.get('X-Limit');
const current = headers.get('X-Current');
const remaining = headers.get('X-Remaining');

if (!limit) return null;

return {
allowed: parseInt(remaining || '0') > 0,
limit: parseInt(limit),
current: parseInt(current || '0'),
remaining: parseInt(remaining || '0'),
unit: null
};
}
};

5. React Component Example

File: src/components/SDSUploadButton.tsx

import { useState, useEffect } from 'react';
import { entitlementService, LimitInfo } from '@/services/entitlements';
import { Button } from '@/components/ui/button';
import { Badge } from '@/components/ui/badge';
import { Tooltip } from '@/components/ui/tooltip';

export function SDSUploadButton() {
const [canBulkUpload, setCanBulkUpload] = useState(false);
const [canUseAI, setCanUseAI] = useState(false);
const [limitInfo, setLimitInfo] = useState<LimitInfo | null>(null);

useEffect(() => {
async function checkEntitlements() {
const [bulkAccess, aiAccess, limits] = await Promise.all([
entitlementService.checkCapability('CHEMIQ', 'SDS_BINDER', 'BULK_UPLOAD'),
entitlementService.checkCapability('CHEMIQ', 'SDS_BINDER', 'AI_EXTRACTION'),
entitlementService.checkLimit('LIMIT_SDS_UPLOADS')
]);

setCanBulkUpload(bulkAccess);
setCanUseAI(aiAccess);
setLimitInfo(limits);
}

checkEntitlements();
}, []);

const isNearLimit = limitInfo && limitInfo.limit &&
limitInfo.current >= limitInfo.limit * 0.9;

return (
<div className="space-y-4">
{/* Upload Buttons */}
<div className="flex gap-2">
<Button disabled={!limitInfo?.allowed}>
Upload Single SDS
</Button>

{canBulkUpload ? (
<Button variant="secondary">
Bulk Upload
</Button>
) : (
<Tooltip content="Upgrade to Standard for bulk upload">
<Button variant="secondary" disabled>
Bulk Upload <Badge variant="outline" className="ml-2">Standard+</Badge>
</Button>
</Tooltip>
)}
</div>

{/* AI Badge */}
{canUseAI && (
<Badge variant="success">
AI Extraction Available
</Badge>
)}

{/* Limit Info */}
{limitInfo && (
<div className="text-sm border-l-2 border-gray-300 pl-3">
{limitInfo.limit ? (
<div>
<span className={isNearLimit ? 'text-orange-600 font-medium' : 'text-gray-600'}>
{limitInfo.current} / {limitInfo.limit} SDS documents
</span>
{isNearLimit && (
<div className="mt-1">
<Badge variant="warning">
Near limit - Upgrade recommended
</Badge>
</div>
)}
</div>
) : (
<span className="text-gray-600">Unlimited SDS uploads</span>
)}
</div>
)}
</div>
);
}

Testing Strategy

Unit Tests

Test Coverage:

  • EntitlementService methods
  • Permission decorators
  • Usage calculators
  • Limit checks

Example Test (tests/services/test_entitlement_service.py):

import pytest
from uuid import uuid4
from app.services.entitlement_service import EntitlementService, PlanLevel


def test_check_module_access_required_module(db, test_company):
"""REQUIRED modules should always be accessible"""
service = EntitlementService(db)
assert service.check_module_access(test_company.company_id, "ADMINHQ") == True


def test_check_module_access_plan_based(db, starter_company, standard_company):
"""USER_OPTION modules depend on plan"""
service = EntitlementService(db)

# Starter has CHEMIQ
assert service.check_module_access(starter_company.company_id, "CHEMIQ") == True

# Starter does NOT have INCIDENTIQ
assert service.check_module_access(starter_company.company_id, "INCIDENTIQ") == False

# Standard has INCIDENTIQ
assert service.check_module_access(standard_company.company_id, "INCIDENTIQ") == True


def test_check_limit_within_limit(db, test_company):
"""Should allow action when within limit"""
service = EntitlementService(db)
result = service.check_limit(test_company.company_id, "LIMIT_SDS_UPLOADS", 50)

assert result["allowed"] == True
assert result["limit"] == 100 # Starter limit
assert result["current"] == 50
assert result["remaining"] == 50


def test_check_limit_exceeded(db, test_company):
"""Should deny action when limit exceeded"""
service = EntitlementService(db)
result = service.check_limit(test_company.company_id, "LIMIT_SDS_UPLOADS", 101)

assert result["allowed"] == False
assert result["limit"] == 100
assert result["current"] == 101
assert result["remaining"] == 0

Integration Tests

Test Coverage:

  • API routes with different plans
  • Decorator behavior
  • Company overrides
  • Multi-company scenarios

Example Test (tests/api/test_entitlements.py):

import pytest
from fastapi.testclient import TestClient


def test_upload_sds_within_limit(client: TestClient, starter_auth_headers):
"""Should allow upload when within limit"""
response = client.post(
"/api/v1/chemiq/sds/upload",
files={"file": ("test.pdf", b"fake pdf content")},
headers=starter_auth_headers
)

assert response.status_code == 200
assert "limit_info" in response.json()


def test_upload_sds_exceeds_limit(client: TestClient, starter_auth_headers):
"""Should deny upload when limit exceeded"""
# Mock current usage to be at limit
# ... setup code ...

response = client.post(
"/api/v1/chemiq/sds/upload",
files={"file": ("test.pdf", b"fake pdf content")},
headers=starter_auth_headers
)

assert response.status_code == 402 # Payment Required
assert "limit exceeded" in response.json()["detail"].lower()


def test_bulk_upload_requires_standard(client: TestClient, starter_auth_headers, standard_auth_headers):
"""Bulk upload should require Standard or Pro plan"""
# Starter should be denied
response = client.post(
"/api/v1/chemiq/sds/bulk-upload",
files=[("files", ("test1.pdf", b"content"))],
headers=starter_auth_headers
)
assert response.status_code == 403

# Standard should succeed
response = client.post(
"/api/v1/chemiq/sds/bulk-upload",
files=[("files", ("test1.pdf", b"content"))],
headers=standard_auth_headers
)
assert response.status_code == 200

Rollout Plan

Week 1-2: Development

  • Implement EntitlementService
  • Create permission decorators
  • Add unit tests

Week 3: Integration

  • Update API routes with decorators
  • Create entitlement API endpoints
  • Add integration tests

Week 4: Frontend

  • Create frontend entitlement service
  • Build React hooks
  • Update UI components

Week 5: Data Migration

  • Seed limit entitlements
  • Verify data integrity
  • Create admin override UI

Week 6: Testing

  • QA testing on staging
  • Performance testing
  • Security review

Week 7: Production Rollout

  • Deploy to production (gradual rollout)
  • Monitor errors and usage
  • Gather user feedback
  • Iterate based on feedback

Success Metrics

Technical Metrics

  • 100% of API routes have appropriate decorators
  • Less than 100ms average latency for entitlement checks
  • 0 entitlement bypass vulnerabilities
  • >95% test coverage for entitlement code

Business Metrics

  • Reduction in support tickets about feature access
  • Increase in plan upgrade conversions
  • Clear usage metrics per company
  • Reduced revenue leakage from unlimited usage

Appendix

Entitlement Code Naming Convention

Use the following pattern for entitlement codes:

Feature Entitlements:

  • Format: {MODULE}_{FEATURE}_{CAPABILITY}
  • Example: CHEMIQ_SDS_BINDER_BULK_UPLOAD
  • Example: CHEMIQ_INVENTORY_BARCODE_SCAN

Limit Entitlements:

  • Format: LIMIT_{RESOURCE}
  • Example: LIMIT_SDS_UPLOADS
  • Example: LIMIT_USERS
  • Example: LIMIT_API_CALLS

Common Entitlement Codes

CodeTypeDescription
LIMIT_SDS_UPLOADSlimitMax SDS documents
LIMIT_USERSlimitMax active users
LIMIT_SITESlimitMax sites/locations
LIMIT_API_CALLSlimitAPI rate limit
LIMIT_STORAGE_GBlimitStorage quota
CHEMIQ_SDS_BINDER_BULK_UPLOADfeatureBulk SDS upload
CHEMIQ_SDS_BINDER_AI_EXTRACTIONfeatureAI metadata extraction
CHEMIQ_INVENTORY_BARCODE_SCANfeatureBarcode scanning
CHEMIQ_INVENTORY_ANALYTICSfeatureUsage analytics

Contact & Support

For questions or issues during implementation:

  • Architecture Questions: Review this document or check docs/data_model/
  • Code Issues: Check existing implementations in app/services/
  • Testing: See tests/ directory for examples

Document Version: 1.0 Last Updated: 2025-01-07 Author: Claude (Anthropic) Status: Draft - Ready for Review