Entitlement Architecture & Implementation Plan
Table of Contents
- Overview
- Architecture Principles
- Data Model
- Three-Layer Permission Model
- Implementation Plan
- Code Examples
- Testing Strategy
- Rollout Plan
Overview
This document describes the comprehensive entitlement system for the Tellus EHS HazCom platform, which controls access to modules, features, and capabilities based on subscription plans (Starter, Standard, Pro) and enforces usage limits.
Key Concepts
- Module: Top-level functional area (e.g., CHEMIQ, PLAN, LABELS)
- Feature: Specific functionality within a module (e.g., SDS_BINDER, INVENTORY)
- Capability: Individual function within a feature (e.g., BULK_UPLOAD, AI_EXTRACTION)
- Entitlement: Permission or limit defined at plan level
- Limit: Usage constraint (e.g., max 100 SDS uploads, 5 users)
Current State
The existing data model has:
- modules table with categories (REQUIRED, USER_OPTION, UNDERLYING)
- plan_entitlements table linking plans to modules
- entitlement_definitions with types: feature and limit
- company_entitlement_overrides for per-company exceptions
Gap Identified
The docs/tabular.csv file reveals feature-level and capability-level granularity that isn't currently tracked in the database. We need to extend the data model to support this.
Architecture Principles
- Centralized Control: All entitlement checks go through EntitlementService
- Explicit Over Implicit: Clear decorators make permissions visible in code
- Fail Secure: Default to denying access when entitlement is unclear
- User-Friendly: Provide clear upgrade paths in error messages
- Performance: Cache entitlement checks, minimize DB queries
- Overridable: Support company-specific overrides for special cases
- Auditable: Log all entitlement checks and limit violations
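The Performance and Fail Secure principles can be combined in one small wrapper. The sketch below is illustrative only: CachedEntitlementChecker, its ttl_seconds parameter, and the injected lookup callable are hypothetical names, not part of the existing codebase.

```python
import time
from typing import Callable, Dict, Tuple


class CachedEntitlementChecker:
    """Hypothetical wrapper that caches boolean entitlement lookups for a short TTL."""

    def __init__(
        self,
        lookup: Callable[[str, str], bool],
        ttl_seconds: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._lookup = lookup  # e.g. a function that queries the database
        self._ttl = ttl_seconds
        self._clock = clock
        self._cache: Dict[Tuple[str, str], Tuple[float, bool]] = {}

    def check(self, company_id: str, code: str) -> bool:
        key = (company_id, code)
        now = self._clock()
        cached = self._cache.get(key)
        if cached is not None and now - cached[0] < self._ttl:
            return cached[1]
        try:
            allowed = bool(self._lookup(company_id, code))
        except Exception:
            # Fail secure: deny on lookup errors and do not cache the failure.
            return False
        self._cache[key] = (now, allowed)
        return allowed

    def invalidate(self, company_id: str) -> None:
        """Drop cached results for one company, e.g. after a plan change."""
        for key in [k for k in self._cache if k[0] == company_id]:
            del self._cache[key]
```

A short TTL keeps plan changes visible within about a minute. Calling invalidate() from the code path that changes a subscription or override would make them visible immediately.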
Data Model
Current Tables
-- Existing: modules table
CREATE TABLE modules (
    module_id UUID PRIMARY KEY,
    code VARCHAR(50) UNIQUE NOT NULL,
    name VARCHAR(255) NOT NULL,
    description TEXT,
    category VARCHAR(20) NOT NULL, -- REQUIRED, USER_OPTION, UNDERLYING
    parent_module_id UUID REFERENCES modules(module_id),
    is_visible BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP DEFAULT NOW()
);

-- Existing: entitlement_definitions table
CREATE TABLE entitlement_definitions (
    entitlement_id UUID PRIMARY KEY,
    code VARCHAR(100) UNIQUE NOT NULL,
    name VARCHAR(255) NOT NULL,
    type VARCHAR(20) NOT NULL, -- 'feature' or 'limit'
    unit VARCHAR(50), -- 'count', 'per_month', 'per_minute', etc.
    description TEXT,
    created_at TIMESTAMP DEFAULT NOW()
);

-- Existing: plan_entitlements table
CREATE TABLE plan_entitlements (
    plan_entitlement_id UUID PRIMARY KEY,
    plan_version_id UUID NOT NULL REFERENCES plan_versions(plan_version_id),
    entitlement_id UUID NOT NULL REFERENCES entitlement_definitions(entitlement_id),
    feature_enabled BOOLEAN,
    limit_value BIGINT, -- NULL = unlimited
    module_id UUID REFERENCES modules(module_id),
    is_selectable BOOLEAN DEFAULT TRUE,
    feature_level TEXT, -- 'basic', 'standard', 'advanced'
    created_at TIMESTAMP DEFAULT NOW()
);

-- Existing: company_entitlement_overrides table
CREATE TABLE company_entitlement_overrides (
    override_id UUID PRIMARY KEY,
    company_id UUID NOT NULL REFERENCES companies(company_id),
    entitlement_id UUID NOT NULL REFERENCES entitlement_definitions(entitlement_id),
    feature_enabled BOOLEAN,
    limit_value BIGINT,
    reason TEXT,
    created_at TIMESTAMP DEFAULT NOW()
);
New Tables (Phase 2 - Optional Enhancement)
-- Note: uuid_generate_v4() requires the uuid-ossp extension in PostgreSQL

-- New: module_features table (optional - for feature-level granularity)
CREATE TABLE module_features (
    feature_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    module_id UUID NOT NULL REFERENCES modules(module_id),
    feature_code VARCHAR(100) NOT NULL, -- e.g., 'SDS_BINDER', 'INVENTORY'
    feature_name VARCHAR(255) NOT NULL,
    description TEXT,
    created_at TIMESTAMP DEFAULT NOW(),
    UNIQUE(module_id, feature_code)
);

-- New: feature_capabilities table (optional - for capability-level granularity)
CREATE TABLE feature_capabilities (
    capability_id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
    feature_id UUID NOT NULL REFERENCES module_features(feature_id),
    capability_code VARCHAR(100) NOT NULL, -- e.g., 'BULK_UPLOAD', 'AI_EXTRACTION'
    capability_name VARCHAR(255) NOT NULL,
    description TEXT,
    min_plan_level VARCHAR(20) NOT NULL, -- 'starter', 'standard', 'pro'
    created_at TIMESTAMP DEFAULT NOW(),
    UNIQUE(feature_id, capability_code)
);
Note: The new tables are optional for Phase 1. We can start with the existing schema and use naming conventions in entitlement_definitions.code to encode module/feature/capability hierarchy (e.g., CHEMIQ_SDS_BINDER_BULK_UPLOAD).
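The naming convention has one catch: feature and capability names contain underscores themselves, so a code such as CHEMIQ_SDS_BINDER_BULK_UPLOAD cannot be split back into its parts on "_" alone. The sketch below shows one way to build and parse such codes. Both function names and the known_features registry are hypothetical and assume the list of modules and features is available from somewhere (for example the modules table).

```python
from typing import Dict, Optional, Set, Tuple


def build_entitlement_code(module: str, feature: str, capability: str) -> str:
    """Join the three levels into one code, e.g. CHEMIQ_SDS_BINDER_BULK_UPLOAD."""
    return "_".join(part.strip().upper() for part in (module, feature, capability))


def parse_entitlement_code(
    code: str, known_features: Dict[str, Set[str]]
) -> Optional[Tuple[str, str, str]]:
    """Split a code back into (module, feature, capability).

    known_features maps each module code to its feature codes (hypothetical
    registry). Returns None when no known module/feature prefix matches.
    """
    for module, features in known_features.items():
        module_prefix = module + "_"
        if not code.startswith(module_prefix):
            continue
        rest = code[len(module_prefix):]
        # Prefer the longest matching feature name to avoid partial matches.
        for feature in sorted(features, key=len, reverse=True):
            feature_prefix = feature + "_"
            if rest.startswith(feature_prefix) and len(rest) > len(feature_prefix):
                return module, feature, rest[len(feature_prefix):]
    return None
```

If parsing codes turns out to be a frequent need, that is a signal to move to the dedicated module_features and feature_capabilities tables instead.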
Three-Layer Permission Model
Layer 1: Module-Level Access
Question: Does the company have this module enabled in their plan?
Examples:
- CHEMIQ (Chemical Inventory & SDS Binder)
- PLAN (HazCom Plan Builder)
- LABELS (GHS Labeling)
Implementation:
@require_module("CHEMIQ")
async def list_sds(...):
    # Only accessible if company has CHEMIQ module
    pass
Layer 2: Feature-Level Access
Question: Does the company have access to this feature within the module?
Examples:
- CHEMIQ → SDS_BINDER
- CHEMIQ → INVENTORY
- CHEMIQ → CLASSIFICATION
Implementation:
# For most cases, if module is enabled, all features are accessible
# Feature-level checks are rare and usually handled implicitly
if not entitlement_service.check_feature_access(company_id, "CHEMIQ", "SDS_BINDER"):
    raise HTTPException(403, "Feature not available")
Layer 3: Capability-Level Access (Plan Tier)
Question: Does the company's plan tier (Starter/Standard/Pro) support this capability?
Examples:
- Starter: Upload single SDS
- Standard: Bulk upload SDS
- Pro: AI metadata extraction from SDS
Implementation:
@require_capability("CHEMIQ", "SDS_BINDER", "BULK_UPLOAD")
async def bulk_upload_sds(...):
    # Only accessible to Standard or Pro plans
    pass
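Where a capability carries a minimum tier (as in the optional min_plan_level column), the tier comparison itself is a simple ordering check. The sketch below is an assumption-laden illustration: PlanTier and tier_allows are hypothetical names, and the service described later in this document checks plan_entitlements rows rather than comparing tiers directly.

```python
from enum import IntEnum


class PlanTier(IntEnum):
    """Hypothetical ordered tiers; a higher value includes everything below it."""

    STARTER = 1
    STANDARD = 2
    PRO = 3


def tier_allows(company_tier: str, min_plan_level: str) -> bool:
    """Return True if company_tier is at or above min_plan_level.

    Unknown tier names are denied (fail secure).
    """
    try:
        current = PlanTier[company_tier.strip().upper()]
        required = PlanTier[min_plan_level.strip().upper()]
    except KeyError:
        return False
    return current >= required
```

With this ordering, a Standard company passes a check for a 'standard' capability such as BULK_UPLOAD but not for a 'pro' capability such as AI_EXTRACTION.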
Usage Limits (enforced on top of the three access layers)
Question: Has the company exceeded their usage limits?
Examples:
- Starter: 100 total SDS, 5 users, 1 site
- Standard: 500 SDS, 25 users, 10 sites
- Pro: Unlimited
Implementation:
@enforce_limit("LIMIT_SDS_UPLOADS")
async def upload_sds(...):
    # Blocks if limit exceeded
    pass
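Because NULL means unlimited in plan_entitlements.limit_value, a NULL in an override row is ambiguous: it could mean "no override" or "override to unlimited". The service shown later in this document treats it as "no override". If overrides ever need to grant unlimited usage, one option is to distinguish "no row" from "row with NULL", as in this sketch (resolve_limit and the UNSET sentinel are hypothetical):

```python
from typing import Optional, Union


class _Unset:
    """Sentinel type meaning 'no override row exists'."""


UNSET = _Unset()


def resolve_limit(
    plan_limit: Optional[int],
    override_limit: Union[Optional[int], _Unset] = UNSET,
) -> Optional[int]:
    """Return the effective limit; None means unlimited.

    An override row, when present, wins even if its value is None (unlimited).
    """
    if isinstance(override_limit, _Unset):
        return plan_limit
    return override_limit
```

This is a design alternative rather than a description of current behavior; adopting it would require the override lookup to report whether a row was found.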
Implementation Plan
Phase 1: Core Infrastructure (Week 1-2)
1.1 Create EntitlementService
File: app/services/entitlement_service.py
Tasks:
- Create EntitlementService class
- Implement check_module_access(company_id, module_code)
- Implement check_feature_access(company_id, module_code, feature_code)
- Implement check_capability_access(company_id, module_code, feature_code, capability_code)
- Implement get_company_plan_level(company_id)
- Implement get_feature_level(company_id, module_code)
Key Methods:
class EntitlementService:
    def check_module_access(self, company_id: UUID, module_code: str) -> bool: ...
    def check_feature_access(self, company_id: UUID, module_code: str,
                             feature_code: str) -> bool: ...
    def check_capability_access(self, company_id: UUID, module_code: str,
                                feature_code: str, capability_code: str) -> bool: ...
    def get_company_plan_level(self, company_id: UUID) -> Optional[PlanLevel]: ...
    def get_feature_level(self, company_id: UUID, module_code: str) -> Optional[str]: ...
1.2 Create Permission Decorators
File: app/core/permissions.py
Tasks:
- Create @require_module(module_code) decorator
- Create @require_capability(module_code, feature_code, capability_code) decorator
- Add proper error handling with upgrade messages
Example:
@require_module("CHEMIQ")
async def list_sds(...):
    pass

@require_capability("CHEMIQ", "SDS_BINDER", "BULK_UPLOAD")
async def bulk_upload_sds(...):
    pass
1.3 Update Authentication Context
File: app/core/auth.py
Tasks:
- Ensure company_id is available in request context
- Add get_current_company_id() dependency
- Handle multi-company users (consultants)
Phase 2: Limit Enforcement (Week 2-3)
2.1 Extend EntitlementService for Limits
Tasks:
- Implement get_limit_value(company_id, entitlement_code)
- Implement check_limit(company_id, entitlement_code, current_usage)
- Implement get_usage_count(company_id, entitlement_code, time_period)
- Add usage calculator methods:
  - _get_sds_upload_count()
  - _get_user_count()
  - _get_site_count()
  - _get_storage_usage_gb()
  - _get_api_call_count()
  - _get_chemical_count()
Example:
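def check_limit(self, company_id: UUID, entitlement_code: str,
                current_usage: int) -> Dict[str, Any]:
    # Shape of the returned dict (the values shown are types, not data):
    # {
    #     "allowed": bool,
    #     "limit": int | None,      # None = unlimited
    #     "current": int,
    #     "remaining": int | None,
    #     "unit": str | None
    # }
    ...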
2.2 Create Limit Decorators
File: app/core/permissions.py
Tasks:
- Create @enforce_limit(entitlement_code) decorator
- Create @check_limit_only(entitlement_code) decorator
- Add limit info to response headers (X-Limit, X-Current, X-Remaining)
Example:
@enforce_limit("LIMIT_SDS_UPLOADS")  # Blocks if limit exceeded
async def upload_sds(...):
    pass

@check_limit_only("LIMIT_SDS_UPLOADS")  # Just adds headers
async def list_sds(...):
    pass
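The header contract can be kept in one place so both decorators emit the same names. A minimal sketch, assuming a limit-check result shaped like the dict returned by check_limit; build_limit_headers is a hypothetical helper name:

```python
from typing import Dict, Mapping, Optional


def build_limit_headers(limit_info: Mapping[str, Optional[int]]) -> Dict[str, str]:
    """Translate a limit-check result into X-Limit / X-Current / X-Remaining headers.

    Returns an empty dict for unlimited entitlements, so clients can treat a
    missing X-Limit header as "no limit" (an assumption of this sketch).
    """
    if limit_info.get("limit") is None:
        return {}
    return {
        "X-Limit": str(limit_info["limit"]),
        "X-Current": str(limit_info["current"]),
        "X-Remaining": str(limit_info["remaining"]),
    }
```

Omitting the headers for unlimited plans avoids sending the string "None" to clients that parse the values as integers.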
2.3 Database Migration for Limits
File: alembic/versions/YYYY_MM_DD_HHMM_add_limit_entitlements.py
Tasks:
- Add limit entitlement definitions:
  - LIMIT_SDS_UPLOADS
  - LIMIT_USERS
  - LIMIT_SITES
  - LIMIT_API_CALLS
  - LIMIT_STORAGE_GB
  - LIMIT_CHEMICALS
  - LIMIT_INCIDENTS
  - LIMIT_TRAINING_COURSES
- Seed plan_entitlements with tier-specific limits:
  - Starter: Lower limits
  - Standard: Medium limits
  - Pro: NULL (unlimited) for SDS uploads, users, and sites; finite values for API calls and storage, as in the table below
Example Limits:
| Entitlement | Starter | Standard | Pro |
|---|---|---|---|
| SDS Uploads | 100 | 500 | ∞ |
| Users | 5 | 25 | ∞ |
| Sites | 1 | 10 | ∞ |
| API Calls (per month) | 100 | 1,000 | 10,000 |
| Storage (GB) | 5 | 50 | 500 |
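To make the table concrete, the sketch below encodes the example values as data and applies the same rule the service uses (an action is allowed while current usage is strictly below the limit). EXAMPLE_LIMITS and evaluate_limit are hypothetical names; in the real system the values come from plan_entitlements.

```python
from typing import Dict, Optional, TypedDict


class LimitCheck(TypedDict):
    allowed: bool
    limit: Optional[int]
    current: int
    remaining: Optional[int]


# Values copied from the example table; None means unlimited.
EXAMPLE_LIMITS: Dict[str, Dict[str, Optional[int]]] = {
    "starter": {"LIMIT_SDS_UPLOADS": 100, "LIMIT_USERS": 5, "LIMIT_SITES": 1,
                "LIMIT_API_CALLS": 100, "LIMIT_STORAGE_GB": 5},
    "standard": {"LIMIT_SDS_UPLOADS": 500, "LIMIT_USERS": 25, "LIMIT_SITES": 10,
                 "LIMIT_API_CALLS": 1000, "LIMIT_STORAGE_GB": 50},
    "pro": {"LIMIT_SDS_UPLOADS": None, "LIMIT_USERS": None, "LIMIT_SITES": None,
            "LIMIT_API_CALLS": 10000, "LIMIT_STORAGE_GB": 500},
}


def evaluate_limit(plan: str, code: str, current: int) -> LimitCheck:
    """Check usage against the example table; unknown plans or codes raise KeyError."""
    limit = EXAMPLE_LIMITS[plan][code]
    if limit is None:
        return {"allowed": True, "limit": None, "current": current, "remaining": None}
    return {
        "allowed": current < limit,
        "limit": limit,
        "current": current,
        "remaining": max(0, limit - current),
    }
```

For example, a Starter company with 99 SDS files may upload one more; at 100 the next upload is blocked.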
Phase 3: API Integration (Week 3-4)
3.1 Update Existing Routes
Tasks:
- Identify all API routes that need protection
- Add @require_module() decorators
- Add @require_capability() decorators where appropriate
- Add @enforce_limit() decorators to creation endpoints
Priority Routes:
CHEMIQ Module (app/api/v1/chemiq.py):
- POST /sds/upload: Add @enforce_limit("LIMIT_SDS_UPLOADS")
- POST /sds/bulk-upload: Add @require_capability() + @enforce_limit()
- POST /sds/ai-extract: Add @require_capability()
- GET /sds: Add @check_limit_only()
- GET /inventory: Add plan-aware response
ADMINHQ Module (app/api/v1/users.py):
- POST /users/invite: Add @enforce_limit("LIMIT_USERS")
- POST /sites: Add @enforce_limit("LIMIT_SITES")
API Endpoints (app/api/v1/external.py):
- All API routes: Add @enforce_limit("LIMIT_API_CALLS")
3.2 Create Entitlement API Endpoints
File: app/api/v1/entitlements.py
Tasks:
- GET /entitlements/check-module/{module_code}
- GET /entitlements/check-capability/{module}/{feature}/{capability}
- GET /entitlements/feature-level/{module_code}
- GET /entitlements/limit/{entitlement_code}
- GET /entitlements/my-plan: Return current plan with all entitlements
Example:
@router.get("/check-module/{module_code}")
async def check_module_access(
    module_code: str,
    company_id: UUID = Depends(get_current_company_id),
    db: Session = Depends(get_db)
):
    service = EntitlementService(db)
    has_access = service.check_module_access(company_id, module_code)
    return {"has_access": has_access}
Phase 4: Frontend Integration (Week 4-5)
4.1 Create Frontend Entitlement Service
File: src/services/entitlements.ts
Tasks:
- Create entitlementService.checkModule(moduleCode)
- Create entitlementService.checkCapability(module, feature, capability)
- Create entitlementService.getFeatureLevel(moduleCode)
- Create entitlementService.checkLimit(entitlementCode)
- Create entitlementService.parseLimitHeaders(headers)
Example:
// Shape of the service (signatures only)
export declare const entitlementService: {
  checkModule(moduleCode: string): Promise<boolean>;
  checkCapability(module: string, feature: string, capability: string): Promise<boolean>;
  getFeatureLevel(moduleCode: string): Promise<string | null>;
  checkLimit(entitlementCode: string): Promise<LimitInfo>;
  parseLimitHeaders(headers: Headers): LimitInfo | null;
};
4.2 Create React Hooks
File: src/hooks/useEntitlements.ts
Tasks:
- Create useModuleAccess(moduleCode) hook
- Create useCapabilityAccess(module, feature, capability) hook
- Create useLimitInfo(entitlementCode) hook
- Create usePlanLevel() hook
Example:
export function useModuleAccess(moduleCode: string) {
  const [hasAccess, setHasAccess] = useState(false);
  // ... implementation
  return hasAccess;
}

export function useLimitInfo(entitlementCode: string) {
  const [limitInfo, setLimitInfo] = useState<LimitInfo | null>(null);
  // ... implementation
  return limitInfo;
}
4.3 Update UI Components
Tasks:
- Show/hide features based on plan tier
- Display usage limits and current usage
- Show upgrade prompts for locked features
- Add plan tier badges to features
Example Component:
export function SDSUploadButton() {
  const canBulkUpload = useCapabilityAccess('CHEMIQ', 'SDS_BINDER', 'BULK_UPLOAD');
  const canUseAI = useCapabilityAccess('CHEMIQ', 'SDS_BINDER', 'AI_EXTRACTION');
  const limitInfo = useLimitInfo('LIMIT_SDS_UPLOADS');

  return (
    <div>
      <Button>Upload Single SDS</Button>
      {canBulkUpload ? (
        <Button>Bulk Upload</Button>
      ) : (
        <Tooltip content="Upgrade to Standard for bulk upload">
          <Button disabled>Bulk Upload (Standard+)</Button>
        </Tooltip>
      )}
      {canUseAI && <Badge>AI Extraction Available</Badge>}
      {limitInfo && (
        <div className="text-sm">
          {limitInfo.current} / {limitInfo.limit ?? '∞'} uploads
          {/* Compare with null explicitly so remaining === 0 still shows the warning */}
          {limitInfo.remaining !== null && limitInfo.remaining < 10 && (
            <Badge variant="warning">Near limit - Upgrade recommended</Badge>
          )}
        </div>
      )}
    </div>
  );
}
Phase 5: Data Migration (Week 5)
5.1 Seed Capability Data from tabular.csv
File: alembic/versions/YYYY_MM_DD_HHMM_seed_capabilities_from_csv.py
Tasks:
- Parse docs/tabular.csv
- Create entitlement definitions for each capability
- Link capabilities to appropriate plan tiers
- Verify data integrity
Example:
import csv
from pathlib import Path


def upgrade():
    # alembic/versions/<file>.py -> project root -> docs/tabular.csv
    csv_path = Path(__file__).parent.parent.parent / "docs" / "tabular.csv"
    with open(csv_path, newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            module = row['Module']
            feature = row['Feature Code']
            capability = row['Feature Name']
            # Determine min plan level from the first tier column that has content
            if row['Starter']:
                min_level = 'starter'
            elif row['Standard']:
                min_level = 'standard'
            elif row['Pro']:
                min_level = 'pro'
            else:
                continue  # Not offered on any tier
            # Create entitlement definition
            code = f"{module}_{feature}_{capability.upper().replace(' ', '_')}"
            # INSERT INTO entitlement_definitions ...
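Keeping the row-to-code logic in pure functions makes it testable without a database or the real CSV. The sketch below assumes the same column names as the migration example (Module, Feature Code, Feature Name, Starter, Standard, Pro). The function names are hypothetical, and unlike the migration it treats whitespace-only cells as empty.

```python
import csv
import io
from typing import Dict, Iterator, Optional, Tuple


def min_plan_level(row: Dict[str, str]) -> Optional[str]:
    """Return the lowest tier whose column has content, or None if none does."""
    for column, level in (("Starter", "starter"), ("Standard", "standard"), ("Pro", "pro")):
        if (row.get(column) or "").strip():
            return level
    return None


def capability_rows(csv_text: str) -> Iterator[Tuple[str, str]]:
    """Yield (entitlement_code, min_plan_level) for each usable CSV row."""
    for row in csv.DictReader(io.StringIO(csv_text)):
        level = min_plan_level(row)
        if level is None:
            continue  # capability is not offered on any tier
        capability = row["Feature Name"].strip().upper().replace(" ", "_")
        yield f"{row['Module']}_{row['Feature Code']}_{capability}", level
```

The migration could then iterate over capability_rows(csv_path.read_text()) and issue one insert per yielded pair.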
5.2 Create Company Override Admin UI
File: src/pages/admin/CompanyEntitlementOverrides.tsx
Tasks:
- List all companies with their current plans
- Allow admins to create/edit/delete overrides
- Show override history and reasons
- Add audit trail
Phase 6: Testing (Week 6)
6.1 Unit Tests
Tasks:
- Test EntitlementService methods
- Test permission decorators
- Test limit enforcement logic
- Test usage calculators
File: tests/services/test_entitlement_service.py
6.2 Integration Tests
Tasks:
- Test API routes with different plan tiers
- Test limit enforcement on creation endpoints
- Test company overrides
- Test multi-company scenarios (consultants)
File: tests/api/test_entitlements.py
6.3 Frontend Tests
Tasks:
- Test entitlement service functions
- Test React hooks
- Test UI component behavior based on entitlements
File: src/services/__tests__/entitlements.test.ts
Phase 7: Documentation & Rollout (Week 6-7)
7.1 Internal Documentation
Tasks:
- Document entitlement architecture (this file)
- Create developer guide for adding new entitlements
- Document limit types and usage
- Create troubleshooting guide
7.2 User-Facing Documentation
Tasks:
- Update plan comparison page
- Document feature availability by tier
- Create upgrade guide
- Add FAQ about limits
7.3 Rollout Plan
Tasks:
- Deploy to development environment
- Test with real customer data (anonymized)
- Deploy to staging
- Run smoke tests
- Deploy to production (gradual rollout)
- Monitor for errors/issues
- Gather user feedback
Code Examples
1. EntitlementService Implementation
File: app/services/entitlement_service.py
"""Entitlement checking service with limit support"""
from datetime import timedelta
from enum import Enum
from typing import Any, Dict, Optional
from uuid import UUID

from sqlalchemy.orm import Session

from app.db.models.company import Company
from app.db.models.entitlement import CompanyEntitlementOverride, EntitlementDefinition
from app.db.models.module import Module
from app.db.models.plan import PlanEntitlement, PlanVersion


class EntitlementType(str, Enum):
    FEATURE = "feature"
    LIMIT = "limit"


class PlanLevel(str, Enum):
    STARTER = "starter"
    STANDARD = "standard"
    PRO = "pro"


class LimitUnit(str, Enum):
    COUNT = "count"
    PER_MONTH = "per_month"
    PER_DAY = "per_day"
    PER_MINUTE = "per_minute"
    CONCURRENT = "concurrent"


class EntitlementService:
    """Service for checking entitlements and enforcing limits"""

    def __init__(self, db: Session):
        self.db = db

    def _get_company(self, company_id: UUID) -> Optional[Company]:
        return self.db.query(Company).filter(Company.company_id == company_id).first()

    def _get_active_plan_version(self, company: Optional[Company]) -> Optional[PlanVersion]:
        """Return the company's active plan version, or None if it has none"""
        if not company or not company.subscription:
            return None
        return (
            self.db.query(PlanVersion)
            .filter(
                PlanVersion.plan_version_id == company.subscription.plan_version_id,
                PlanVersion.is_active.is_(True),
            )
            .first()
        )

    def get_company_plan_level(self, company_id: UUID) -> Optional[PlanLevel]:
        """Get the active plan level for a company"""
        plan_version = self._get_active_plan_version(self._get_company(company_id))
        if not plan_version or not plan_version.plan:
            return None
        try:
            return PlanLevel(plan_version.plan.code.lower())
        except ValueError:
            # Plan code is not one of STARTER / STANDARD / PRO
            return None

    def check_module_access(self, company_id: UUID, module_code: str) -> bool:
        """Check if company has access to a module"""
        company = self._get_company(company_id)
        if not company:
            return False
        module = self.db.query(Module).filter(Module.code == module_code).first()
        if not module:
            return False
        # REQUIRED modules are always accessible
        if module.category == "REQUIRED":
            return True
        # Check plan entitlements
        plan_version = self._get_active_plan_version(company)
        if not plan_version:
            return False
        has_entitlement = (
            self.db.query(PlanEntitlement)
            .filter(
                PlanEntitlement.plan_version_id == plan_version.plan_version_id,
                PlanEntitlement.module_id == module.module_id,
                PlanEntitlement.feature_enabled.is_(True),
            )
            .first() is not None
        )
        return has_entitlement

    def check_capability_access(
        self,
        company_id: UUID,
        module_code: str,
        feature_code: str,
        capability_code: str
    ) -> bool:
        """Check if company's plan level allows access to a specific capability"""
        # First check module access
        if not self.check_module_access(company_id, module_code):
            return False
        # A company without a recognized plan level gets no capabilities
        if not self.get_company_plan_level(company_id):
            return False
        # Build entitlement code from components
        # e.g., "CHEMIQ_SDS_BINDER_BULK_UPLOAD"
        entitlement_code = f"{module_code}_{feature_code}_{capability_code}"
        entitlement = (
            self.db.query(EntitlementDefinition)
            .filter(EntitlementDefinition.code == entitlement_code)
            .first()
        )
        if not entitlement:
            # If capability not found, allow by default (backward compatibility).
            # NOTE: this fails open, which is at odds with the Fail Secure
            # principle; revisit once capability data has been seeded.
            return True
        # Check if capability is in plan entitlements
        plan_version = self._get_active_plan_version(self._get_company(company_id))
        if not plan_version:
            return False
        plan_entitlement = (
            self.db.query(PlanEntitlement)
            .filter(
                PlanEntitlement.plan_version_id == plan_version.plan_version_id,
                PlanEntitlement.entitlement_id == entitlement.entitlement_id,
                PlanEntitlement.feature_enabled.is_(True),
            )
            .first()
        )
        return plan_entitlement is not None

    def get_limit_value(self, company_id: UUID, entitlement_code: str) -> Optional[int]:
        """
        Get the limit value for a specific entitlement.
        Checks company overrides first, then plan entitlements.
        Returns None for unlimited.
        """
        # First check for company-specific override
        override = (
            self.db.query(CompanyEntitlementOverride)
            .join(EntitlementDefinition)
            .filter(
                CompanyEntitlementOverride.company_id == company_id,
                EntitlementDefinition.code == entitlement_code
            )
            .first()
        )
        if override and override.limit_value is not None:
            return override.limit_value
        # Otherwise get from plan.
        # NOTE: a missing company or subscription also returns None (unlimited).
        company = self._get_company(company_id)
        if not company or not company.subscription:
            return None
        plan_entitlement = (
            self.db.query(PlanEntitlement)
            .join(EntitlementDefinition)
            .filter(
                PlanEntitlement.plan_version_id == company.subscription.plan_version_id,
                EntitlementDefinition.code == entitlement_code
            )
            .first()
        )
        return plan_entitlement.limit_value if plan_entitlement else None

    def check_limit(
        self,
        company_id: UUID,
        entitlement_code: str,
        current_usage: int
    ) -> Dict[str, Any]:
        """
        Check if current usage is within limits.
        Returns:
        {
            "allowed": bool,
            "limit": int | None,
            "current": int,
            "remaining": int | None,
            "unit": str | None
        }
        """
        entitlement = (
            self.db.query(EntitlementDefinition)
            .filter(EntitlementDefinition.code == entitlement_code)
            .first()
        )
        unlimited = {
            "allowed": True,
            "limit": None,
            "current": current_usage,
            "remaining": None,
            "unit": None
        }
        if not entitlement or entitlement.type != EntitlementType.LIMIT:
            # If not a limit entitlement, allow unlimited
            return unlimited
        limit = self.get_limit_value(company_id, entitlement_code)
        if limit is None:
            # No limit set = unlimited
            return {**unlimited, "unit": entitlement.unit}
        return {
            # Strictly below the limit: one more unit may still be created
            "allowed": current_usage < limit,
            "limit": limit,
            "current": current_usage,
            "remaining": max(0, limit - current_usage),
            "unit": entitlement.unit
        }

    def get_usage_count(
        self,
        company_id: UUID,
        entitlement_code: str,
        time_period: Optional[timedelta] = None
    ) -> int:
        """
        Get current usage count for a specific entitlement.
        Delegates to specific usage calculators.
        """
        usage_calculators = {
            "LIMIT_SDS_UPLOADS": self._get_sds_upload_count,
            "LIMIT_USERS": self._get_user_count,
            "LIMIT_SITES": self._get_site_count,
            "LIMIT_STORAGE_GB": self._get_storage_usage_gb,
            "LIMIT_API_CALLS": self._get_api_call_count,
            "LIMIT_CHEMICALS": self._get_chemical_count,
        }
        calculator = usage_calculators.get(entitlement_code)
        if not calculator:
            # Unknown codes report zero usage
            return 0
        return calculator(company_id, time_period)

    # Usage calculation methods

    def _get_sds_upload_count(self, company_id: UUID, time_period: Optional[timedelta] = None) -> int:
        """Count SDS uploads, optionally within a time period"""
        # TODO: Implement based on your SDS model
        return 0

    def _get_user_count(self, company_id: UUID, time_period: Optional[timedelta] = None) -> int:
        """Count active users in company"""
        # TODO: Implement based on your User model
        return 0

    def _get_site_count(self, company_id: UUID, time_period: Optional[timedelta] = None) -> int:
        """Count sites in company"""
        # TODO: Implement based on your Site model
        return 0

    def _get_storage_usage_gb(self, company_id: UUID, time_period: Optional[timedelta] = None) -> int:
        """Calculate total storage usage in GB"""
        # TODO: Implement based on file storage
        return 0

    def _get_api_call_count(self, company_id: UUID, time_period: Optional[timedelta] = None) -> int:
        """Count API calls, typically within last month or minute"""
        # TODO: Implement based on API logging
        return 0

    def _get_chemical_count(self, company_id: UUID, time_period: Optional[timedelta] = None) -> int:
        """Count chemicals in inventory"""
        # TODO: Implement based on Chemical model
        return 0
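The usage calculators are left as TODOs because they depend on the application's models. The windowing logic they share can still be sketched independently of any model. In this illustration, count_usage is a hypothetical helper that takes event timestamps directly; a real calculator would express the same cutoff as a database filter.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable, Optional


def count_usage(
    event_times: Iterable[datetime],
    time_period: Optional[timedelta] = None,
    now: Optional[datetime] = None,
) -> int:
    """Count events, optionally restricted to the trailing time_period.

    With time_period=None every event counts (e.g. total SDS files).
    Timestamps are assumed to be timezone-aware.
    """
    if time_period is None:
        return sum(1 for _ in event_times)
    now = now or datetime.now(timezone.utc)
    cutoff = now - time_period
    return sum(1 for t in event_times if cutoff < t <= now)
```

Passing now explicitly keeps the function deterministic in tests, which matters for per-minute and per-day limits.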
2. Permission Decorators
File: app/core/permissions.py
"""Permission decorators for API routes"""
import inspect
from datetime import timedelta
from functools import wraps
from typing import Optional

from fastapi import HTTPException, status

from app.db.models.entitlement import EntitlementDefinition
from app.db.session import get_db
from app.services.entitlement_service import EntitlementService, LimitUnit


def _get_db(kwargs):
    # Falls back to a fresh session when the route did not inject `db`.
    # NOTE: next(get_db()) never runs the generator's cleanup, so routes
    # should declare `db: Session = Depends(get_db)` themselves.
    return kwargs.get('db') or next(get_db())


def _require_company_id(kwargs):
    company_id = kwargs.get('company_id')
    if not company_id:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Company context not found"
        )
    return company_id


def _time_period_for(db, entitlement_code: str) -> Optional[timedelta]:
    """Determine the usage window from the entitlement's unit"""
    entitlement = db.query(EntitlementDefinition).filter(
        EntitlementDefinition.code == entitlement_code
    ).first()
    if entitlement and entitlement.unit:
        if entitlement.unit == LimitUnit.PER_MONTH:
            return timedelta(days=30)
        elif entitlement.unit == LimitUnit.PER_DAY:
            return timedelta(days=1)
        elif entitlement.unit == LimitUnit.PER_MINUTE:
            return timedelta(minutes=1)
    return None


def require_module(module_code: str):
    """Decorator to require module access"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            db = _get_db(kwargs)
            company_id = _require_company_id(kwargs)
            entitlement_service = EntitlementService(db)
            if not entitlement_service.check_module_access(company_id, module_code):
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"Access to {module_code} module not allowed for your plan. "
                           f"Please upgrade to access this feature."
                )
            return await func(*args, **kwargs)
        return wrapper
    return decorator


def require_capability(module_code: str, feature_code: str, capability_code: str):
    """Decorator to require specific capability access"""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            db = _get_db(kwargs)
            company_id = _require_company_id(kwargs)
            entitlement_service = EntitlementService(db)
            if not entitlement_service.check_capability_access(
                company_id, module_code, feature_code, capability_code
            ):
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"This feature requires a higher plan tier. "
                           f"Please upgrade to access {capability_code}."
                )
            return await func(*args, **kwargs)
        return wrapper
    return decorator


def enforce_limit(entitlement_code: str):
    """
    Decorator to enforce usage limits before allowing action.
    Use this on creation endpoints (POST).
    """
    def decorator(func):
        # Only pass _limit_info to handlers that declare it; otherwise the
        # call would fail with an unexpected keyword argument.
        accepts_limit_info = '_limit_info' in inspect.signature(func).parameters

        @wraps(func)
        async def wrapper(*args, **kwargs):
            db = _get_db(kwargs)
            company_id = _require_company_id(kwargs)
            entitlement_service = EntitlementService(db)
            # Get current usage within the window implied by the unit
            current_usage = entitlement_service.get_usage_count(
                company_id,
                entitlement_code,
                _time_period_for(db, entitlement_code)
            )
            # Check limit
            limit_check = entitlement_service.check_limit(
                company_id,
                entitlement_code,
                current_usage
            )
            if not limit_check["allowed"]:
                unit_text = f" per {limit_check['unit']}" if limit_check['unit'] else ""
                raise HTTPException(
                    status_code=status.HTTP_402_PAYMENT_REQUIRED,
                    detail=f"Usage limit exceeded. Your plan allows {limit_check['limit']}{unit_text}. "
                           f"Current usage: {limit_check['current']}. "
                           f"Please upgrade your plan for higher limits.",
                    headers={
                        "X-Limit": str(limit_check['limit']),
                        "X-Current": str(limit_check['current']),
                        "X-Remaining": str(limit_check['remaining'])
                    }
                )
            # Add limit info to kwargs for the route handler
            if accepts_limit_info:
                kwargs['_limit_info'] = limit_check
            return await func(*args, **kwargs)
        return wrapper
    return decorator


def check_limit_only(entitlement_code: str):
    """
    Decorator to check limits but not enforce.
    Adds limit info to response headers.
    Use this on GET endpoints to inform users of their usage.
    """
    def decorator(func):
        accepts_limit_info = '_limit_info' in inspect.signature(func).parameters

        @wraps(func)
        async def wrapper(*args, **kwargs):
            db = _get_db(kwargs)
            company_id = kwargs.get('company_id')
            limit_info = None
            if company_id:
                entitlement_service = EntitlementService(db)
                current_usage = entitlement_service.get_usage_count(
                    company_id,
                    entitlement_code,
                    _time_period_for(db, entitlement_code)
                )
                limit_info = entitlement_service.check_limit(
                    company_id,
                    entitlement_code,
                    current_usage
                )
                if accepts_limit_info:
                    kwargs['_limit_info'] = limit_info
            response = await func(*args, **kwargs)
            if limit_info and limit_info['limit'] is not None:
                # Handlers that return plain dicts have no headers attribute;
                # fall back to the injected `response: Response` parameter.
                target = response if hasattr(response, 'headers') else kwargs.get('response')
                if target is not None:
                    target.headers['X-Limit'] = str(limit_info['limit'])
                    target.headers['X-Current'] = str(limit_info['current'])
                    target.headers['X-Remaining'] = str(limit_info['remaining'])
            return response
        return wrapper
    return decorator
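One pitfall with injecting _limit_info through kwargs is that a handler which does not declare that parameter would receive an unexpected keyword argument. The framework-free sketch below shows a guard based on inspect.signature. Everything in it (enforce_limit_sketch, LimitExceeded, fake_check, and the two handlers) is hypothetical and exists only to make the behavior runnable outside FastAPI.

```python
import asyncio
import inspect
from functools import wraps
from typing import Any, Callable, Dict, Optional


class LimitExceeded(Exception):
    """Stand-in for the HTTPException the real decorator would raise."""


def enforce_limit_sketch(check: Callable[[str], Dict[str, Any]]):
    """Simplified enforce_limit: `check` maps a company_id to a limit-check dict."""
    def decorator(func):
        # Inspect once, at decoration time.
        accepts_info = "_limit_info" in inspect.signature(func).parameters

        @wraps(func)
        async def wrapper(*args, **kwargs):
            result = check(kwargs["company_id"])
            if not result["allowed"]:
                raise LimitExceeded(result)
            if accepts_info:
                kwargs["_limit_info"] = result
            return await func(*args, **kwargs)

        return wrapper

    return decorator


def fake_check(company_id: str) -> Dict[str, Any]:
    # Hypothetical usage data: "small-co" has used all 100 uploads.
    used = {"small-co": 100, "big-co": 3}[company_id]
    return {"allowed": used < 100, "limit": 100, "current": used}


@enforce_limit_sketch(fake_check)
async def upload(company_id: str, _limit_info: Optional[Dict[str, Any]] = None) -> int:
    return _limit_info["current"]


@enforce_limit_sketch(fake_check)
async def bulk_upload(company_id: str) -> str:
    return "ok"  # does not declare _limit_info, so none is passed
```

Calling asyncio.run(upload(company_id="big-co")) returns the current usage, bulk_upload runs without receiving the extra argument, and upload(company_id="small-co") raises LimitExceeded.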
3. API Route Examples
File: app/api/v1/chemiq.py
from typing import List
from uuid import UUID

from fastapi import APIRouter, Depends, HTTPException, Response, UploadFile
from sqlalchemy.orm import Session

from app.core.auth import get_current_company_id
from app.core.permissions import require_module, require_capability, enforce_limit, check_limit_only
from app.db.session import get_db
from app.services.entitlement_service import EntitlementService

router = APIRouter(prefix="/chemiq", tags=["chemiq"])


@router.post("/sds/upload")
@require_module("CHEMIQ")
@enforce_limit("LIMIT_SDS_UPLOADS")
async def upload_sds(
    file: UploadFile,
    company_id: UUID = Depends(get_current_company_id),
    db: Session = Depends(get_db),
    _limit_info: dict = None
):
    """
    Upload SDS file.
    Limits:
    - Starter: 100 total SDS
    - Standard: 500 total SDS
    - Pro: Unlimited
    """
    # Upload logic here
    # The decorator has already verified we're within limits
    return {
        "message": "SDS uploaded successfully",
        "limit_info": _limit_info
    }


@router.post("/sds/bulk-upload")
@require_module("CHEMIQ")
@require_capability("CHEMIQ", "SDS_BINDER", "BULK_UPLOAD")
@enforce_limit("LIMIT_SDS_UPLOADS")
async def bulk_upload_sds(
    files: List[UploadFile],
    company_id: UUID = Depends(get_current_company_id),
    db: Session = Depends(get_db)
):
    """
    Bulk upload SDS files.
    Requires: Standard or Pro plan
    """
    # Bulk upload logic
    return {"message": f"Uploaded {len(files)} SDS files"}


@router.post("/sds/ai-extract")
@require_module("CHEMIQ")
@require_capability("CHEMIQ", "SDS_BINDER", "AI_EXTRACTION")
async def ai_extract_sds(
    file_id: UUID,
    company_id: UUID = Depends(get_current_company_id),
    db: Session = Depends(get_db)
):
    """
    AI extraction from SDS.
    Requires: Pro plan
    """
    # AI extraction logic
    return {"message": "AI extraction completed"}


@router.get("/sds")
@require_module("CHEMIQ")
@check_limit_only("LIMIT_SDS_UPLOADS")
async def list_sds(
    response: Response,
    company_id: UUID = Depends(get_current_company_id),
    db: Session = Depends(get_db),
    _limit_info: dict = None
):
    """
    List all SDS files.
    Response includes usage limit headers.
    """
    # List SDS files
    sds_files = []  # Query your SDS
    return {
        "items": sds_files,
        "count": len(sds_files),
        "limit_info": _limit_info
    }


@router.get("/inventory")
@require_module("CHEMIQ")
async def get_inventory(
    company_id: UUID = Depends(get_current_company_id),
    db: Session = Depends(get_db)
):
    """Get inventory with plan-aware features"""
    entitlement_service = EntitlementService(db)
    # Get feature level
    feature_level = entitlement_service.get_feature_level(company_id, "CHEMIQ")
    # Build response based on plan level
    response = {
        "items": [],  # Basic inventory
        "feature_level": feature_level
    }
    # Add plan-specific features
    if entitlement_service.check_capability_access(
        company_id, "CHEMIQ", "INVENTORY", "BARCODE_SCAN"
    ):
        response["barcode_scan_enabled"] = True
    if entitlement_service.check_capability_access(
        company_id, "CHEMIQ", "INVENTORY", "USAGE_ANALYTICS"
    ):
        response["analytics"] = {
            # Analytics data
        }
    return response
4. Frontend Service
File: src/services/entitlements.ts
import { api } from './api';

export interface LimitInfo {
  allowed: boolean;
  limit: number | null;
  current: number;
  remaining: number | null;
  unit: string | null;
}

export const entitlementService = {
  async checkModule(moduleCode: string): Promise<boolean> {
    try {
      const response = await api.get(`/entitlements/check-module/${moduleCode}`);
      return response.data.has_access;
    } catch {
      // Fail secure: treat any request error as "no access"
      return false;
    }
  },

  async checkCapability(
    moduleCode: string,
    featureCode: string,
    capabilityCode: string
  ): Promise<boolean> {
    try {
      const response = await api.get(
        `/entitlements/check-capability/${moduleCode}/${featureCode}/${capabilityCode}`
      );
      return response.data.has_access;
    } catch {
      return false;
    }
  },

  async getFeatureLevel(moduleCode: string): Promise<string | null> {
    try {
      const response = await api.get(`/entitlements/feature-level/${moduleCode}`);
      return response.data.feature_level;
    } catch {
      return null;
    }
  },

  // Unlike the checks above, this rejects on request failure; callers must handle the error
  async checkLimit(entitlementCode: string): Promise<LimitInfo> {
    const response = await api.get(`/entitlements/limit/${entitlementCode}`);
    return response.data;
  },

  parseLimitHeaders(headers: Headers): LimitInfo | null {
    const limitHeader = headers.get('X-Limit');
    if (!limitHeader) return null;
    // Always pass a radix so header values are parsed as base 10
    const limit = parseInt(limitHeader, 10);
    if (Number.isNaN(limit)) return null;
    const current = parseInt(headers.get('X-Current') || '0', 10);
    // A missing X-Remaining header is treated as 0, which denies the action
    const remaining = parseInt(headers.get('X-Remaining') || '0', 10);
    return {
      allowed: remaining > 0,
      limit,
      current,
      remaining,
      unit: null
    };
  }
};
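The `X-Limit`, `X-Current`, and `X-Remaining` headers read by `parseLimitHeaders` have to be produced by the backend. The following is a minimal sketch of how those values might be built, assuming an unlimited entitlement is represented as `None` and emits no headers (which the parser above treats as "no limit info"). The function name `limit_headers` is hypothetical and not part of the existing codebase.

```python
from typing import Dict, Optional


def limit_headers(limit: Optional[int], current: int) -> Dict[str, str]:
    """Build the usage headers for a response (hypothetical helper).

    Assumption: limit=None means "unlimited", in which case no headers
    are emitted at all.
    """
    if limit is None:
        return {}
    # Clamp at zero so an over-limit company never reports negative remaining.
    remaining = max(limit - current, 0)
    return {
        "X-Limit": str(limit),
        "X-Current": str(current),
        "X-Remaining": str(remaining),
    }
```

Header values are strings on the wire, which is why the frontend parses them back into integers.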
5. React Component Example
File: src/components/SDSUploadButton.tsx
import { useState, useEffect } from 'react';
import { entitlementService, LimitInfo } from '@/services/entitlements';
import { Button } from '@/components/ui/button';
import { Badge } from '@/components/ui/badge';
import { Tooltip } from '@/components/ui/tooltip';

export function SDSUploadButton() {
  const [canBulkUpload, setCanBulkUpload] = useState(false);
  const [canUseAI, setCanUseAI] = useState(false);
  const [limitInfo, setLimitInfo] = useState<LimitInfo | null>(null);

  useEffect(() => {
    async function checkEntitlements() {
      try {
        const [bulkAccess, aiAccess, limits] = await Promise.all([
          entitlementService.checkCapability('CHEMIQ', 'SDS_BINDER', 'BULK_UPLOAD'),
          entitlementService.checkCapability('CHEMIQ', 'SDS_BINDER', 'AI_EXTRACTION'),
          entitlementService.checkLimit('LIMIT_SDS_UPLOADS')
        ]);
        setCanBulkUpload(bulkAccess);
        setCanUseAI(aiAccess);
        setLimitInfo(limits);
      } catch {
        // checkLimit rejects on request failure; keep the fail-secure defaults
      }
    }
    checkEntitlements();
  }, []);

  // Compare against null explicitly so a limit of 0 is not mistaken for "unlimited"
  // and the result is always a boolean (safe to use with && in JSX)
  const isNearLimit =
    limitInfo !== null &&
    limitInfo.limit !== null &&
    limitInfo.current >= limitInfo.limit * 0.9;

  return (
    <div className="space-y-4">
      {/* Upload Buttons */}
      <div className="flex gap-2">
        <Button disabled={!limitInfo?.allowed}>
          Upload Single SDS
        </Button>
        {canBulkUpload ? (
          <Button variant="secondary">
            Bulk Upload
          </Button>
        ) : (
          <Tooltip content="Upgrade to Standard for bulk upload">
            <Button variant="secondary" disabled>
              Bulk Upload <Badge variant="outline" className="ml-2">Standard+</Badge>
            </Button>
          </Tooltip>
        )}
      </div>
      {/* AI Badge */}
      {canUseAI && (
        <Badge variant="success">
          ✨ AI Extraction Available
        </Badge>
      )}
      {/* Limit Info */}
      {limitInfo && (
        <div className="text-sm border-l-2 border-gray-300 pl-3">
          {limitInfo.limit !== null ? (
            <div>
              <span className={isNearLimit ? 'text-orange-600 font-medium' : 'text-gray-600'}>
                {limitInfo.current} / {limitInfo.limit} SDS documents
              </span>
              {isNearLimit && (
                <div className="mt-1">
                  <Badge variant="warning">
                    Near limit - Upgrade recommended
                  </Badge>
                </div>
              )}
            </div>
          ) : (
            <span className="text-gray-600">Unlimited SDS uploads</span>
          )}
        </div>
      )}
    </div>
  );
}
Testing Strategy
Unit Tests
Test Coverage:
- EntitlementService methods
- Permission decorators
- Usage calculators
- Limit checks
Example Test (tests/services/test_entitlement_service.py):
import pytest
from app.services.entitlement_service import EntitlementService


def test_check_module_access_required_module(db, test_company):
    """REQUIRED modules should always be accessible"""
    service = EntitlementService(db)
    assert service.check_module_access(test_company.company_id, "ADMINHQ") is True


def test_check_module_access_plan_based(db, starter_company, standard_company):
    """USER_OPTION modules depend on plan"""
    service = EntitlementService(db)
    # Starter has CHEMIQ
    assert service.check_module_access(starter_company.company_id, "CHEMIQ") is True
    # Starter does NOT have INCIDENTIQ
    assert service.check_module_access(starter_company.company_id, "INCIDENTIQ") is False
    # Standard has INCIDENTIQ
    assert service.check_module_access(standard_company.company_id, "INCIDENTIQ") is True


def test_check_limit_within_limit(db, test_company):
    """Should allow action when within limit"""
    service = EntitlementService(db)
    result = service.check_limit(test_company.company_id, "LIMIT_SDS_UPLOADS", 50)
    assert result["allowed"] is True
    assert result["limit"] == 100  # Starter limit
    assert result["current"] == 50
    assert result["remaining"] == 50


def test_check_limit_exceeded(db, test_company):
    """Should deny action when limit exceeded"""
    service = EntitlementService(db)
    result = service.check_limit(test_company.company_id, "LIMIT_SDS_UPLOADS", 101)
    assert result["allowed"] is False
    assert result["limit"] == 100
    assert result["current"] == 101
    assert result["remaining"] == 0
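The two limit tests above pin down the result shape (`allowed`, `limit`, `current`, `remaining`) and that `remaining` never goes negative. A pure function that satisfies those expectations could look like the sketch below. The name `evaluate_limit` is hypothetical, and two behaviors are assumptions not confirmed by the tests: usage exactly at the limit is denied (consistent with the frontend treating `remaining > 0` as allowed), and `None` means unlimited.

```python
from typing import Dict, Optional, Union

LimitResult = Dict[str, Union[bool, int, None]]


def evaluate_limit(limit: Optional[int], current: int) -> LimitResult:
    """Compare current usage against a limit (hypothetical helper)."""
    if limit is None:
        # Assumption: None represents an unlimited entitlement.
        return {"allowed": True, "limit": None, "current": current, "remaining": None}
    return {
        # Assumption: usage equal to the limit leaves no room for another action.
        "allowed": current < limit,
        "limit": limit,
        "current": current,
        # Clamp at zero, matching the expectation in test_check_limit_exceeded.
        "remaining": max(limit - current, 0),
    }
```

Isolating the arithmetic this way lets the boundary cases (49, 50, 100, 101) be tested without a database fixture.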
Integration Tests
Test Coverage:
- API routes with different plans
- Decorator behavior
- Company overrides
- Multi-company scenarios
Example Test (tests/api/test_entitlements.py):
import pytest
from fastapi.testclient import TestClient


def test_upload_sds_within_limit(client: TestClient, starter_auth_headers):
    """Should allow upload when within limit"""
    response = client.post(
        "/api/v1/chemiq/sds/upload",
        files={"file": ("test.pdf", b"fake pdf content")},
        headers=starter_auth_headers
    )
    assert response.status_code == 200
    assert "limit_info" in response.json()


def test_upload_sds_exceeds_limit(client: TestClient, starter_auth_headers):
    """Should deny upload when limit exceeded"""
    # Mock current usage to be at limit
    # ... setup code ...
    response = client.post(
        "/api/v1/chemiq/sds/upload",
        files={"file": ("test.pdf", b"fake pdf content")},
        headers=starter_auth_headers
    )
    assert response.status_code == 402  # Payment Required
    assert "limit exceeded" in response.json()["detail"].lower()


def test_bulk_upload_requires_standard(client: TestClient, starter_auth_headers, standard_auth_headers):
    """Bulk upload should require Standard or Pro plan"""
    # Starter should be denied
    response = client.post(
        "/api/v1/chemiq/sds/bulk-upload",
        files=[("files", ("test1.pdf", b"content"))],
        headers=starter_auth_headers
    )
    assert response.status_code == 403
    # Standard should succeed
    response = client.post(
        "/api/v1/chemiq/sds/bulk-upload",
        files=[("files", ("test1.pdf", b"content"))],
        headers=standard_auth_headers
    )
    assert response.status_code == 200
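The integration coverage list includes company overrides, but no example exercises them. The sketch below shows one plausible resolution order that such a test could assert against: a per-company override wins over the plan default, and an unknown entitlement code resolves to a limit of 0 in line with the Fail Secure principle. The function name, the dictionary shapes, and the use of `None` for "unlimited" are all assumptions for illustration; the real lookup would read `plan_entitlements` and `company_entitlement_overrides`.

```python
from typing import Dict, Optional


def effective_limit(
    code: str,
    plan_limits: Dict[str, Optional[int]],
    overrides: Dict[str, Optional[int]],
) -> Optional[int]:
    """Resolve the limit that applies to one company (hypothetical helper).

    Returns None for unlimited, otherwise a non-negative integer.
    """
    if code in overrides:
        # A company-specific override always takes precedence over the plan.
        return overrides[code]
    if code in plan_limits:
        return plan_limits[code]
    # Fail secure: an entitlement nobody defined allows nothing.
    return 0


# Illustrative data: the Starter value matches the unit test above,
# the override value is made up for this example.
starter_plan = {"LIMIT_SDS_UPLOADS": 100}
company_overrides = {"LIMIT_SDS_UPLOADS": 250}
```

With this data, `effective_limit("LIMIT_SDS_UPLOADS", starter_plan, company_overrides)` resolves to the override, while the same call with an empty override mapping falls back to the plan value.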
Rollout Plan
Week 1-2: Development
- Implement EntitlementService
- Create permission decorators
- Add unit tests
Week 3: Integration
- Update API routes with decorators
- Create entitlement API endpoints
- Add integration tests
Week 4: Frontend
- Create frontend entitlement service
- Build React hooks
- Update UI components
Week 5: Data Migration
- Seed limit entitlements
- Verify data integrity
- Create admin override UI
Week 6: Testing
- QA testing on staging
- Performance testing
- Security review
Week 7: Production Rollout
- Deploy to production (gradual rollout)
- Monitor errors and usage
- Gather user feedback
- Iterate based on feedback
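The plan does not specify how the gradual rollout in Week 7 is gated. One common approach, sketched here purely as an assumption, is to hash each company ID into a stable bucket from 0 to 99 and enable enforcement for buckets below the current rollout percentage. The function name `in_rollout` is hypothetical.

```python
import hashlib
from uuid import UUID


def in_rollout(company_id: UUID, percent: int) -> bool:
    """Return True if this company falls inside the rollout percentage.

    The bucket is derived from a hash of the company ID, so a company
    stays enabled as the percentage grows and never flips back and forth.
    """
    if not 0 <= percent <= 100:
        raise ValueError("percent must be between 0 and 100")
    digest = hashlib.sha256(company_id.bytes).digest()
    bucket = int.from_bytes(digest[:4], "big") % 100  # stable value in 0..99
    return bucket < percent
```

Because the bucket depends only on the company ID, raising the percentage from 10 to 50 adds companies without removing any that were already included.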
Success Metrics
Technical Metrics
- 100% of API routes have appropriate decorators
- Less than 100ms average latency for entitlement checks
- 0 entitlement bypass vulnerabilities
- >95% test coverage for entitlement code
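The "100% of API routes have appropriate decorators" metric is easier to hold if it is checked automatically. The sketch below assumes, hypothetically, that each entitlement decorator tags the handler with an `__entitlement__` attribute; the decorator `tagged` and the audit function `unprotected` are illustrative names, and the project's real decorators may work differently.

```python
from typing import Callable, Iterable, List


def tagged(module_code: str) -> Callable[[Callable], Callable]:
    """Hypothetical decorator that records which module guards a handler."""

    def decorator(func: Callable) -> Callable:
        func.__entitlement__ = module_code  # marker inspected by the audit
        return func

    return decorator


def unprotected(handlers: Iterable[Callable]) -> List[str]:
    """Return the names of handlers that carry no entitlement marker."""
    return [h.__name__ for h in handlers if not hasattr(h, "__entitlement__")]


@tagged("CHEMIQ")
def get_inventory() -> None:
    pass


def health_check() -> None:
    pass
```

A CI test could collect every registered route handler, subtract an explicit allowlist of public endpoints, and fail if `unprotected` returns anything.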
Business Metrics
- Reduction in support tickets about feature access
- Increase in plan upgrade conversions
- Clear usage metrics per company
- Reduced revenue leakage from unlimited usage
Appendix
Entitlement Code Naming Convention
Use the following pattern for entitlement codes:
Feature Entitlements:
- Format: {MODULE}_{FEATURE}_{CAPABILITY}
- Example: CHEMIQ_SDS_BINDER_BULK_UPLOAD
- Example: CHEMIQ_INVENTORY_BARCODE_SCAN
Limit Entitlements:
- Format: LIMIT_{RESOURCE}
- Example: LIMIT_SDS_UPLOADS
- Example: LIMIT_USERS
- Example: LIMIT_API_CALLS
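The naming convention above can be enforced in code rather than by review. The helpers below are a minimal sketch: `feature_code` and `limit_code` are hypothetical names, and the rule that each part must be upper-case letters, digits, and underscores is an assumption inferred from the examples.

```python
import re

# Assumption: each part is UPPER_SNAKE_CASE, starting with a letter.
_PART = re.compile(r"^[A-Z][A-Z0-9]*(?:_[A-Z0-9]+)*$")


def _check(part: str) -> str:
    """Validate one component of an entitlement code."""
    if not _PART.match(part):
        raise ValueError(f"invalid entitlement code part: {part!r}")
    return part


def feature_code(module: str, feature: str, capability: str) -> str:
    """Build a feature entitlement code: {MODULE}_{FEATURE}_{CAPABILITY}."""
    return "_".join(_check(p) for p in (module, feature, capability))


def limit_code(resource: str) -> str:
    """Build a limit entitlement code: LIMIT_{RESOURCE}."""
    return f"LIMIT_{_check(resource)}"
```

For example, `feature_code("CHEMIQ", "SDS_BINDER", "BULK_UPLOAD")` yields `CHEMIQ_SDS_BINDER_BULK_UPLOAD`, and `limit_code("SDS_UPLOADS")` yields `LIMIT_SDS_UPLOADS`.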
Common Entitlement Codes
| Code | Type | Description |
|---|---|---|
| LIMIT_SDS_UPLOADS | limit | Max SDS documents |
| LIMIT_USERS | limit | Max active users |
| LIMIT_SITES | limit | Max sites/locations |
| LIMIT_API_CALLS | limit | API rate limit |
| LIMIT_STORAGE_GB | limit | Storage quota |
| CHEMIQ_SDS_BINDER_BULK_UPLOAD | feature | Bulk SDS upload |
| CHEMIQ_SDS_BINDER_AI_EXTRACTION | feature | AI metadata extraction |
| CHEMIQ_INVENTORY_BARCODE_SCAN | feature | Barcode scanning |
| CHEMIQ_INVENTORY_ANALYTICS | feature | Usage analytics |
Contact & Support
For questions or issues during implementation:
- Architecture Questions: Review this document or check docs/data_model/
- Code Issues: Check existing implementations in app/services/
- Testing: See the tests/ directory for examples
Document Version: 1.0
Last Updated: 2025-01-07
Author: Claude (Anthropic)
Status: Draft - Ready for Review