This is the page your team will come back to most. The Reporting API is well-suited to warehouse syncs, but a few patterns make the difference between a reliable pipeline and one that quietly diverges from the source of truth.

For ongoing data warehouse loads, sync incrementally on a regular cadence.
  1. Track your cursor. Track the last until timestamp you’ve synced. Store this in your pipeline state (a metadata table, Airflow XCom, dbt-sources, etc.).
  2. Set range from your cursor. On each run, set from to that stored timestamp and until to “now.”
  3. Sync and advance. POST the request, write the results to your warehouse, and update your stored cursor to the new until.
Run 1 (initial backfill): from = 0,             until = now
Run 2 (1 hour later):     from = previous until, until = now
Run 3 (1 hour later):     from = previous until, until = now
...
Recommended cadence: hourly for active reports (content completion, user engagement), daily for slower-changing ones (assignments, assessments). Avoid sub-minute polling.
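
The cursor steps above can be sketched with a small metadata table. This is a minimal sketch, assuming SQLite as the pipeline state store; the table name sync_cursor, the helper names, and the injected fetch and load callables are hypothetical and not part of the Reporting API.

```python
import sqlite3
import time
from typing import Callable, Optional, Tuple


def open_cursor_store(path: str = ":memory:") -> sqlite3.Connection:
    """Open (or create) a hypothetical metadata table with one cursor per report."""
    conn = sqlite3.connect(path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS sync_cursor ("
        "report TEXT PRIMARY KEY, last_until INTEGER NOT NULL)"
    )
    return conn


def get_cursor(conn: sqlite3.Connection, report: str) -> int:
    """Return the last synced `until` timestamp, or 0 if the report never ran."""
    row = conn.execute(
        "SELECT last_until FROM sync_cursor WHERE report = ?", (report,)
    ).fetchone()
    return row[0] if row else 0


def set_cursor(conn: sqlite3.Connection, report: str, until_ts: int) -> None:
    """Persist the new cursor for a report."""
    conn.execute(
        "INSERT OR REPLACE INTO sync_cursor (report, last_until) VALUES (?, ?)",
        (report, until_ts),
    )
    conn.commit()


def run_sync(
    conn: sqlite3.Connection,
    report: str,
    fetch: Callable[[int, int], list],
    load: Callable[[list], None],
    now: Optional[int] = None,
) -> Tuple[int, int]:
    """Sync one window: from = stored cursor, until = now."""
    from_ts = get_cursor(conn, report)
    until_ts = int(time.time()) if now is None else now
    records = fetch(from_ts, until_ts)
    load(records)
    # Advance only after the load succeeded, so a failed run is retried
    # over the same window on the next attempt.
    set_cursor(conn, report, until_ts)
    return from_ts, until_ts
```

Advancing the cursor last means a crash between fetch and load repeats a window instead of skipping one, which is safe as long as the load is an upsert.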

Initial backfill

For the first load, break the historical window into chunks (e.g., one month per request) and run them sequentially.
Don’t try to backfill a multi-year window in a single request. Responses are unpaginated, and very large responses are slow and fragile. Chunking lets you resume cleanly if anything fails mid-backfill.
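import time
from datetime import datetime, timedelta, timezone


def sync_chunk(from_ts: int, until_ts: int) -> None:
    """Placeholder: fetch one window from the API and upsert it into your warehouse."""
    raise NotImplementedError


start = datetime(2022, 1, 1, tzinfo=timezone.utc)  # earliest date to backfill
end = datetime.now(timezone.utc)
window = timedelta(days=30)  # roughly one month per request

cursor = start
while cursor < end:
    chunk_end = min(cursor + window, end)  # never run past "now"
    from_ts = int(cursor.timestamp())
    until_ts = int(chunk_end.timestamp())
    sync_chunk(from_ts, until_ts)
    cursor = chunk_end  # the next chunk starts where this one ended
    time.sleep(2)  # gentle pacing under the 50 RPM limit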
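
Resuming after a mid-backfill failure could look like the following. This is a minimal sketch, assuming you persist the until timestamp of the last chunk that loaded successfully; iter_windows and remaining_windows are hypothetical helper names.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterator, List, Optional, Tuple


def iter_windows(
    start: datetime, end: datetime, window: timedelta
) -> Iterator[Tuple[int, int]]:
    """Yield contiguous (from_ts, until_ts) pairs covering start..end."""
    cursor = start
    while cursor < end:
        chunk_end = min(cursor + window, end)
        yield int(cursor.timestamp()), int(chunk_end.timestamp())
        cursor = chunk_end


def remaining_windows(
    windows: List[Tuple[int, int]], checkpoint: Optional[int]
) -> List[Tuple[int, int]]:
    """Drop chunks that already loaded.

    `checkpoint` is the until_ts of the last fully loaded chunk,
    or None if nothing has been loaded yet.
    """
    if checkpoint is None:
        return list(windows)
    return [(f, u) for f, u in windows if u > checkpoint]


windows = list(
    iter_windows(
        datetime(2022, 1, 1, tzinfo=timezone.utc),
        datetime(2022, 3, 15, tzinfo=timezone.utc),
        timedelta(days=30),
    )
)
# Suppose the first chunk loaded and the second one failed:
todo = remaining_windows(windows, checkpoint=windows[0][1])
```

Here todo holds the second and third chunks, so the rerun picks up at the chunk that failed.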

Handling updates

Records that change after their initial creation (a completion that gets un-done, an assignment that’s reassigned) will reappear in subsequent reports with updated timestamps.
Your warehouse load should upsert by the record’s ID (e.g., completion_id, assignment_id, assessment_id), not append. If you append, you’ll get duplicates and stale rows.
To capture all changes, set range_target to the broadest “updated” date field the endpoint supports (typically updated_date for assignments) so any change pulls the record into your next sync.
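
An upsert keyed on the record ID might look like this. It is a minimal sketch, assuming a SQLite warehouse (version 3.24 or later for ON CONFLICT); the table layout and the columns other than assignment_id are illustrative, not the API's actual response schema.

```python
import sqlite3
from typing import Dict, Iterable


def upsert_assignments(conn: sqlite3.Connection, records: Iterable[Dict]) -> None:
    """Insert new rows and overwrite existing ones that share an assignment_id."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS assignment ("
        "assignment_id TEXT PRIMARY KEY, email TEXT, updated_date INTEGER)"
    )
    conn.executemany(
        "INSERT INTO assignment (assignment_id, email, updated_date) "
        "VALUES (:assignment_id, :email, :updated_date) "
        "ON CONFLICT(assignment_id) DO UPDATE SET "
        "email = excluded.email, updated_date = excluded.updated_date",
        records,
    )
    conn.commit()


conn = sqlite3.connect(":memory:")
# First sync: the record appears for the first time.
upsert_assignments(
    conn, [{"assignment_id": "a1", "email": "old@example.com", "updated_date": 100}]
)
# Later sync: the same record reappears with a newer updated_date.
upsert_assignments(
    conn, [{"assignment_id": "a1", "email": "new@example.com", "updated_date": 200}]
)
rows = conn.execute("SELECT assignment_id, email, updated_date FROM assignment").fetchall()
```

After both syncs the table still holds one row for a1, carrying the latest values.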

Handling deletes

Deleted records are not currently exposed via this API. If you need to detect deletions:
  1. Run a periodic full reconciliation against the current state.
  2. Pull all active records for a wide date range.
  3. Compare against your warehouse — anything in the warehouse but not in the reconciliation is presumed deleted.
A weekly reconciliation is typically sufficient for most analytics use cases. If you need lower-latency deletion detection, raise it with your Continu account team.
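
The comparison in step 3 reduces to a set difference. This is a minimal sketch, assuming you can list record IDs from both the warehouse and the reconciliation pull; find_presumed_deleted is a hypothetical helper name.

```python
from typing import Iterable, Set


def find_presumed_deleted(
    warehouse_ids: Iterable[str], active_ids: Iterable[str]
) -> Set[str]:
    """Return IDs present in the warehouse but absent from the reconciliation pull."""
    return set(warehouse_ids) - set(active_ids)


warehouse_ids = ["c1", "c2", "c3"]
active_ids = ["c1", "c3", "c4"]  # c4 is new and will arrive via the normal sync
presumed_deleted = find_presumed_deleted(warehouse_ids, active_ids)
```

Because the result is only a presumption, flagging these rows (for example with a deleted marker column) instead of removing them may be the safer choice: a record missing from a single pull can then be restored.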

Field selection strategy

Start narrow

Request only the fields you need today. Adding fields later is a body-only change — no schema migration on the API side.

Mirror your warehouse

Only flag the fields you’ve made columns for in your warehouse. Don’t request fields you’ll discard.

Sub-objects sparingly

Setting the content, user, or assignment sub-objects to true inflates response size significantly. Enable them only when you need them.

Re-evaluate quarterly

As your reporting needs evolve, audit which fields you’re requesting and prune unused ones.
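
One way to keep requested fields aligned with the warehouse is to derive the request body from the column list. This is a minimal sketch, assuming field flags are plain booleans as in the skeleton on this page; build_request_body and its allow_sub_objects parameter are hypothetical.

```python
from typing import Dict, Iterable

# Sub-objects named in this guide; enabling them inflates the response.
SUB_OBJECTS = {"content", "user", "assignment"}


def build_request_body(
    warehouse_columns: Iterable[str], allow_sub_objects: bool = False
) -> Dict[str, object]:
    """Flag only the fields that have a warehouse column."""
    body: Dict[str, object] = {}
    for column in warehouse_columns:
        if column in SUB_OBJECTS and not allow_sub_objects:
            continue  # sub-objects stay off unless explicitly allowed
        body[column] = True
    body["user_filter"] = {}  # all users, matching the skeleton on this page
    return body


body = build_request_body(["email", "first_name", "content"])
```

With the default arguments, body flags email and first_name and leaves the content sub-object off.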
| Endpoint group | Suggested frequency |
| --- | --- |
| Content completion, user content | Hourly |
| User engagement | Hourly |
| Assignment summary, pivoted assignments | Daily |
| Assessment status | Daily |
| Track status, SCORM status | Daily |
| Workshop engagement, workshop status | Daily |
| Manager summary | Daily |
| Simplified assignments (current state) | On-demand or daily snapshot |
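
The suggested frequencies can be encoded as a small schedule that a scheduler consults before each run. This is a minimal sketch; the dictionary keys are descriptive labels chosen for illustration, not confirmed API paths, and is_due is a hypothetical helper.

```python
HOURLY = 60 * 60
DAILY = 24 * HOURLY

# Minimum seconds between syncs, per report (labels are illustrative).
SYNC_INTERVAL_SECONDS = {
    "content_completion": HOURLY,
    "user_content": HOURLY,
    "user_engagement": HOURLY,
    "assignment_summary": DAILY,
    "pivoted_assignments": DAILY,
    "assessment_status": DAILY,
    "track_status": DAILY,
    "scorm_status": DAILY,
    "workshop_engagement": DAILY,
    "workshop_status": DAILY,
    "manager_summary": DAILY,
}


def is_due(report: str, last_until: int, now: int) -> bool:
    """True when at least one full interval has passed since the last sync."""
    return now - last_until >= SYNC_INTERVAL_SECONDS[report]
```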

End-to-end skeleton

import os
import time
from datetime import datetime, timezone
import requests

BASE = "https://reports-latest.continu.co/api/v1"
TOKEN = os.environ["CONTINU_TOKEN"]

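def fetch_completions(from_ts: int, until_ts: int, max_retries: int = 5):
    """POST one content-completion request for the from_ts..until_ts window."""
    for _ in range(max_retries):
        response = requests.post(
            f"{BASE}/reports/content-completion/{from_ts}/{until_ts}",
            headers={"Authorization": TOKEN, "Content-Type": "application/json"},
            json={
                "email": True,
                "first_name": True,
                "last_name": True,
                "departments": True,
                "content": True,
                "assignment": True,
                "user_filter": {},  # all users
            },
            timeout=300,
        )

        if response.status_code == 429:
            # Rate limited: wait as instructed (default 30 seconds), then retry.
            retry_after = int(response.headers.get("Retry-After", 30))
            time.sleep(retry_after)
            continue

        response.raise_for_status()
        return response.json()

    # A bounded loop avoids recursing forever if the limit never clears.
    raise RuntimeError(f"Still rate limited after {max_retries} attempts")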

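def run_hourly_sync(cursor_table):
    # cursor_table and warehouse_upsert are placeholders for your own
    # pipeline state store and warehouse loader.
    last_until = cursor_table.get("content_completion", 0)  # 0 on the first run
    now = int(datetime.now(timezone.utc).timestamp())
    records = fetch_completions(last_until, now)
    warehouse_upsert("content_completion", records, primary_key="completion_id")
    # Advance the cursor only after the upsert has succeeded.
    cursor_table.set("content_completion", now)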