Hourly CRM inbound orchestrator for three inboxes using Notion-synced SOP, strict business-lead filt
Hourly CRM inbound orchestrator for three inboxes using Notion-synced SOP, strict business-lead filtering, Supabase persistence, and actionable-only Slack reporting.
Use this skill for hourly polling CRM workflows across:
info@yourdomain.comsales@yourdomain.comsupport@yourdomain.comThe source-of-truth SOP is synced from Notion page CRM_SOP_PAGE_ID every run.
Grab the skill package ZIP file using the button above.
Extract and move the folder into your AI agent's skills directory.
Your agent now knows the skill. Just ask it to perform the task!
This is the raw instruction document consumed by your AI agent.
Use this skill for hourly polling CRM workflows across:
info@yourdomain.comsales@yourdomain.comsupport@yourdomain.comThe source-of-truth SOP is synced from Notion page CRM_SOP_PAGE_ID every run.
Required:
NOTION_API_KEYCRM_SOP_PAGE_ID (default: 31288fb313488013924ade7bf704ab6f)CRM_MONITORED_EMAILS (comma-separated)CRM_POLL_QUERY (default: in:inbox is:unread -in:spam -in:trash -category:promotions -category:social -category:updates -category:forums)CRM_POLL_OVERLAP_MINUTES (default: 120)SUPABASE_URLSUPABASE_SECRET_KEYOptional:
CRM_POLL_MAX_RESULTS (default: 200)CRM_POLL_MAX_AGE_HOURS (default: 36)CRM_SOP_CACHE_FILE (default: /tmp/crm-inbound-sop-cache.json)CRM_POLL_STATE_TABLE (default: crm_poll_state)CRM_CONTACTS_TABLE (default: crm_contacts)CRM_ACTIVITIES_TABLE (default: crm_activities)CRM_DRAFTS_TABLE (default: crm_drafts)CRM_ACCOUNTING_TABLE (default: accounting_entries)CRM_JOB_RUNS_TABLE (default: crm_job_runs)GOG_ACCOUNT (fallback sender account for approvals)CRM_OUTSTANDING_LOOKBACK_DAYS (default: 7)CRM_OUTSTANDING_STALE_HOURS (default: 24)CRM_OUTSTANDING_NOTIFY_EMPTY (default: false)CRM_CLASSIFIER_MODEL (default: gpt-5-nano)CRM_REPLY_MODEL (default: gpt-5.2)CRM_USE_MODEL_CLASSIFIER (default: true)CRM_USE_MODEL_REPLY_WRITER (default: true)OPENAI_API_KEY (required to use model classifier/reply writer)CRM_GMAIL_LABEL_APPLY (default: true)CRM_GMAIL_LABEL_LEAD (default: CRM/Lead)tsx {baseDir}/scripts/fetch-sop.ts fetch_sop
Optional flags:
--page-id <id>--cache-file <path>--output <path>tsx {baseDir}/scripts/poll-inboxes.ts poll_inboxes
Optional flags:
--accounts <csv>--query <gmail-query>--overlap-minutes <n>--max-age-hours <n>--output <path>tsx {baseDir}/scripts/process-inbound.ts process_inbound \
--poll-file /tmp/crm-poll.json
Optional flags:
--sop-file <path>--output <path>tsx {baseDir}/scripts/approval-action.ts approval_action \
--action approve \
--draft-id <draft_id> \
--approved-by "U052337J8QH"
Also supported:
--action revise --notes "<feedback>"--action reject --reason "<reason>"tsx {baseDir}/scripts/check-outstanding.ts check_outstanding
Optional flags:
--lookback-days <n> (default: 7)--stale-hours <n> (default: 24)--output <path>For each actionable lead, post a simple Slack card containing only:
Approval/revision happens in the Slack thread, not via command strings in the main message.
account_email:message_id.gpt-5-nano into receipt|sales|support|ignore (fallback to heuristics only if model call fails).classification, lead, inbound, routing, qualification) and inject it into the classifier prompt.sales when business ask is explicit.CRM/Lead (or CRM_GMAIL_LABEL_LEAD) to sales threads.view in browser, unsubscribe, manage preferences, roundup-style blasts, Gmail promotional categories) are forced to ignore unless explicit lead criteria are met.sales guesses are downgraded to ignore.gpt-5.2Use clear headings in your Notion page so policy extraction stays deterministic:
Business Context
Lead Classification Rules
sales): person/company reaching out for consulting, sponsorship, partnerships, affiliate opportunities, expert-network interviews, or any paid advisory callLead Qualification Checklist
Response Playbook
Out-of-Scope
Reference template:
cat {baseDir}/references/notion-inbound-sop-template.md
Tables:
crm_contactscrm_activitiescrm_draftsaccounting_entriescrm_job_runscrm_poll_stateReference DDL:
cat {baseDir}/references/supabase-schema.sql
openclaw cron add \
--name "CRM hourly polling" \
--cron "0 * * * *" \
--tz "America/New_York" \
--session isolated \
--message "Run crm-inbound-orchestrator hourly polling cycle. Use skill crm-inbound-orchestrator. Run fetch_sop, poll_inboxes, process_inbound. Only report actionable items."
openclaw cron add \
--name "CRM morning outstanding check" \
--cron "20 9 * * *" \
--tz "America/Toronto" \
--session isolated \
--message "Run crm-inbound-orchestrator outstanding review. Use skill crm-inbound-orchestrator. Run check_outstanding for last 7 days and post a concise summary to Slack."
degraded.Browse additional components, config blocks, and reference sheets included in the ZIP.
crm-inbound-orchestrator/SKILL.md
crm-inbound-orchestrator/references/notion-inbound-sop-template.md
crm-inbound-orchestrator/references/supabase-schema.sql
crm-inbound-orchestrator/scripts/approval-action.ts
crm-inbound-orchestrator/scripts/check-outstanding.ts
crm-inbound-orchestrator/scripts/fetch-sop.ts
crm-inbound-orchestrator/scripts/poll-inboxes.ts
crm-inbound-orchestrator/scripts/process-inbound.ts
skills/crm-inbound-orchestrator/scripts/poll-inboxes.ts
import { execFile } from "node:child_process";
import { randomUUID } from "node:crypto";
import { mkdir, writeFile } from "node:fs/promises";
import path from "node:path";
import { promisify } from "node:util";
type CliArgs = {
command?: string;
flags: Record<string, string | boolean>;
};
type PollStateRow = {
account_email: string;
last_polled_at?: string;
last_message_ts?: string;
updated_at?: string;
};
type GmailMessage = {
account_email: string;
message_id: string;
thread_id?: string;
subject?: string;
from?: string;
snippet?: string;
body_text?: string;
received_at?: string;
internal_ts?: number;
source_key: string;
raw: Record<string, unknown>;
};
type AccountPollResult = {
account_email: string;
query: string;
since_ts: string;
fetched_count: number;
dropped_older_than_window?: number;
error?: string;
};
type PollOutput = {
command: "poll_inboxes";
run_id: string;
started_at: string;
finished_at: string;
poll_query: string;
overlap_minutes: number;
max_age_hours: number;
max_results: number;
per_account: AccountPollResult[];
partial_failure: boolean;
total_messages: number;
messages: GmailMessage[];
};
const execFileAsync = promisify(execFile);
const DEFAULT_POLL_QUERY =
"in:inbox is:unread -in:spam -in:trash -category:promotions -category:social -category:updates -category:forums";
const DEFAULT_OVERLAP_MINUTES = 120;
const DEFAULT_MAX_RESULTS = 200;
const DEFAULT_MAX_AGE_HOURS = 36;
const DEFAULT_POLL_STATE_TABLE = "crm_poll_state";
const DEFAULT_OUTPUT = "/tmp/crm-poll.json";
function parseArgs(argv: string[]): CliArgs {
const tokens = argv.slice(2);
const command = tokens.shift();
const flags: Record<string, string | boolean> = {};
for (let i = 0; i < tokens.length; i += 1) {
const token = tokens[i];
if (!token.startsWith("--")) {
continue;
}
const key = token.slice(2);
const next = tokens[i + 1];
if (!next || next.startsWith("--")) {
flags[key] = true;
continue;
}
flags[key] = next;
i += 1;
}
return { command, flags };
}
function asString(value: string | boolean | undefined): string | undefined {
return typeof value === "string" ? value : undefined;
}
function asNumber(value: string | boolean | undefined): number | undefined {
const text = asString(value);
if (!text) {
return undefined;
}
const parsed = Number.parseInt(text, 10);
return Number.isFinite(parsed) ? parsed : undefined;
}
function clean(value: string | undefined): string | undefined {
const trimmed = value?.trim();
return trimmed ? trimmed : undefined;
}
function parseEmails(value: string | undefined): string[] {
const text = clean(value);
if (!text) {
return [];
}
return text
.split(",")
.map((entry) => entry.trim().toLowerCase())
.filter((entry) => entry.length > 0);
}
function getRecord(value: unknown): Record<string, unknown> | undefined {
if (!value || typeof value !== "object") {
return undefined;
}
return value as Record<string, unknown>;
}
function getString(record: Record<string, unknown>, keys: string[]): string | undefined {
for (const key of keys) {
const candidate = record[key];
if (typeof candidate === "string" && candidate.trim()) {
return candidate.trim();
}
}
return undefined;
}
function getEpochMillis(record: Record<string, unknown>): number | undefined {
const candidates = [record.internalDate, record.internal_date, record.receivedAt, record.date];
for (const candidate of candidates) {
if (typeof candidate === "number" && Number.isFinite(candidate)) {
if (candidate > 1_000_000_000_000) {
return candidate;
}
if (candidate > 1_000_000_000) {
return candidate * 1000;
}
}
if (typeof candidate === "string" && candidate.trim()) {
if (/^\d+$/.test(candidate.trim())) {
const parsed = Number.parseInt(candidate.trim(), 10);
if (Number.isFinite(parsed)) {
if (parsed > 1_000_000_000_000) {
return parsed;
}
if (parsed > 1_000_000_000) {
return parsed * 1000;
}
}
}
const parsedDate = Date.parse(candidate);
if (Number.isFinite(parsedDate)) {
return parsedDate;
}
}
}
return undefined;
}
function sanitizeText(value: string): string {
return value
.replace(/<style[\s\S]*?<\/style>/gi, " ")
.replace(/<script[\s\S]*?<\/script>/gi, " ")
.replace(/<[^>]+>/g, " ")
.replace(/ /gi, " ")
.replace(/&/gi, "&")
.replace(/</gi, "<")
.replace(/>/gi, ">")
.replace(/\s+/g, " ")
.trim();
}
function decodeBase64UrlText(value: string): string | undefined {
const compact = value.trim();
if (compact.length < 24 || !/^[A-Za-z0-9+/_=-]+$/.test(compact)) {
return undefined;
}
try {
const normalized = compact.replace(/-/g, "+").replace(/_/g, "/");
const decoded = Buffer.from(normalized, "base64").toString("utf8");
const cleaned = sanitizeText(decoded);
if (!cleaned) {
return undefined;
}
const printableRatio =
cleaned.replace(/[^\x09\x0A\x0D\x20-\x7E]/g, "").length / Math.max(cleaned.length, 1);
if (printableRatio < 0.75) {
return undefined;
}
return cleaned;
} catch {
return undefined;
}
}
function collectBodyText(value: unknown, out: string[], depth = 0): void {
if (depth > 5 || out.length > 40 || value === undefined || value === null) {
return;
}
if (typeof value === "string") {
const cleaned = sanitizeText(value);
if (cleaned) {
out.push(cleaned);
}
const decoded = decodeBase64UrlText(value);
if (decoded) {
out.push(decoded);
}
return;
}
if (Array.isArray(value)) {
for (const item of value) {
collectBodyText(item, out, depth + 1);
}
return;
}
const record = getRecord(value);
if (!record) {
return;
}
const keys = [
"snippet",
"preview",
"bodySnippet",
"body",
"textBody",
"plainTextBody",
"bodyText",
"content",
"text",
"value",
"data",
"payload",
"parts",
"mimeParts",
"message",
"messages",
"raw",
];
for (const key of keys) {
if (key in record) {
collectBodyText(record[key], out, depth + 1);
}
}
}
function extractBodyText(raw: Record<string, unknown>): string | undefined {
const segments: string[] = [];
collectBodyText(raw, segments, 0);
if (segments.length === 0) {
return undefined;
}
const unique = Array.from(new Set(segments.map((segment) => segment.trim()).filter(Boolean)));
if (unique.length === 0) {
return undefined;
}
unique.sort((a, b) => b.length - a.length);
return unique[0]?.slice(0, 4_000);
}
function parseGogMessages(rawJson: string): Record<string, unknown>[] {
const parsed = JSON.parse(rawJson) as unknown;
if (Array.isArray(parsed)) {
return parsed.filter((item): item is Record<string, unknown> => Boolean(getRecord(item)));
}
const record = getRecord(parsed);
if (!record) {
return [];
}
const listCandidates = [
record.messages,
record.items,
record.data,
record.results,
record.rows,
record.threads,
];
for (const candidate of listCandidates) {
if (!Array.isArray(candidate)) {
continue;
}
return candidate.filter((item): item is Record<string, unknown> => Boolean(getRecord(item)));
}
return [];
}
async function runGogMessageSearch(args: {
account: string;
query: string;
maxResults: number;
}): Promise<Record<string, unknown>[]> {
const commandArgs = [
"gmail",
"messages",
"search",
args.query,
"--max",
String(args.maxResults),
"--account",
args.account,
"--json",
"--no-input",
"--include-body",
];
const { stdout } = await execFileAsync("gog", commandArgs, {
maxBuffer: 15 * 1024 * 1024,
});
return parseGogMessages(stdout);
}
async function supabaseSelectPollState(options: {
supabaseUrl: string;
serviceKey: string;
table: string;
accountEmail: string;
}): Promise<PollStateRow | undefined> {
const query = new URLSearchParams();
query.set("select", "account_email,last_polled_at,last_message_ts,updated_at");
query.set("account_email", `eq.${options.accountEmail}`);
query.set("limit", "1");
const response = await fetch(
`${options.supabaseUrl}/rest/v1/${options.table}?${query.toString()}`,
{
headers: {
apikey: options.serviceKey,
Authorization: `Bearer ${options.serviceKey}`,
Accept: "application/json",
},
},
);
if (!response.ok) {
const text = await response.text();
throw new Error(`Supabase poll state read failed (${response.status}): ${text || "unknown"}`);
}
const body = (await response.json()) as unknown;
if (!Array.isArray(body) || body.length === 0) {
return undefined;
}
const row = getRecord(body[0]);
if (!row) {
return undefined;
}
return {
account_email: String(row.account_email ?? options.accountEmail),
last_polled_at: typeof row.last_polled_at === "string" ? row.last_polled_at : undefined,
last_message_ts: typeof row.last_message_ts === "string" ? row.last_message_ts : undefined,
updated_at: typeof row.updated_at === "string" ? row.updated_at : undefined,
};
}
function buildSinceEpochSeconds(state: PollStateRow | undefined, overlapMinutes: number): number {
const overlapMs = overlapMinutes * 60_000;
const now = Date.now();
if (!state?.last_message_ts) {
return Math.floor((now - overlapMs) / 1000);
}
const parsed = Date.parse(state.last_message_ts);
if (!Number.isFinite(parsed)) {
return Math.floor((now - overlapMs) / 1000);
}
const sinceMs = Math.max(parsed - overlapMs, 0);
return Math.floor(sinceMs / 1000);
}
async function writeJson(filePath: string, payload: unknown): Promise<void> {
await mkdir(path.dirname(filePath), { recursive: true });
await writeFile(filePath, `${JSON.stringify(payload, null, 2)}\n`, "utf8");
}
function normalizeMessage(account: string, raw: Record<string, unknown>): GmailMessage | undefined {
const messageId = getString(raw, ["id", "messageId", "message_id"]);
if (!messageId) {
return undefined;
}
const threadId = getString(raw, ["threadId", "thread_id"]);
const subject = getString(raw, ["subject"]);
const from = getString(raw, ["from", "sender"]);
const bodyText = extractBodyText(raw);
const snippet = getString(raw, ["snippet", "preview", "bodySnippet"]) || bodyText?.slice(0, 600);
const internalTs = getEpochMillis(raw);
return {
account_email: account,
message_id: messageId,
thread_id: threadId,
subject,
from,
snippet,
body_text: bodyText,
received_at: internalTs ? new Date(internalTs).toISOString() : undefined,
internal_ts: internalTs,
source_key: `${account}:${messageId}`,
raw,
};
}
async function main() {
const { command, flags } = parseArgs(process.argv);
if (command !== "poll_inboxes") {
console.error(
"Usage: bun poll-inboxes.ts poll_inboxes [--accounts <csv>] [--query <gmail-query>] [--overlap-minutes <n>] [--max-age-hours <n>] [--output <path>]",
);
process.exit(1);
}
const accounts = parseEmails(asString(flags.accounts) || process.env.CRM_MONITORED_EMAILS);
if (accounts.length === 0) {
throw new Error("CRM_MONITORED_EMAILS is required (comma-separated)");
}
const pollQuery =
clean(asString(flags.query)) || clean(process.env.CRM_POLL_QUERY) || DEFAULT_POLL_QUERY;
const overlapMinutes =
asNumber(flags["overlap-minutes"]) ||
asNumber(process.env.CRM_POLL_OVERLAP_MINUTES) ||
DEFAULT_OVERLAP_MINUTES;
const maxResults =
asNumber(flags["max-results"]) ||
asNumber(process.env.CRM_POLL_MAX_RESULTS) ||
DEFAULT_MAX_RESULTS;
const maxAgeHours =
asNumber(flags["max-age-hours"]) ||
asNumber(process.env.CRM_POLL_MAX_AGE_HOURS) ||
DEFAULT_MAX_AGE_HOURS;
const outputPath = clean(asString(flags.output)) || DEFAULT_OUTPUT;
const nowMs = Date.now();
const minAllowedTs = nowMs - maxAgeHours * 60 * 60 * 1000;
const supabaseUrl = clean(process.env.SUPABASE_URL);
const supabaseKey = clean(process.env.SUPABASE_SECRET_KEY);
if (!supabaseUrl || !supabaseKey) {
throw new Error("SUPABASE_URL and SUPABASE_SECRET_KEY are required");
}
const pollStateTable = clean(process.env.CRM_POLL_STATE_TABLE) || DEFAULT_POLL_STATE_TABLE;
const runId = randomUUID();
const startedAt = new Date().toISOString();
const perAccount: AccountPollResult[] = [];
const allMessages: GmailMessage[] = [];
for (const account of accounts) {
try {
const state = await supabaseSelectPollState({
supabaseUrl,
serviceKey: supabaseKey,
table: pollStateTable,
accountEmail: account,
});
const sinceEpoch = buildSinceEpochSeconds(state, overlapMinutes);
const accountQuery = `${pollQuery} after:${sinceEpoch}`;
const rows = await runGogMessageSearch({
account,
query: accountQuery,
maxResults,
});
const normalized = rows
.map((row) => normalizeMessage(account, row))
.filter((message): message is GmailMessage => Boolean(message));
const seen = new Set<string>();
const deduped: GmailMessage[] = [];
for (const message of normalized) {
if (seen.has(message.source_key)) {
continue;
}
seen.add(message.source_key);
deduped.push(message);
}
const freshMessages = deduped.filter((message) => {
const ts =
typeof message.internal_ts === "number" && Number.isFinite(message.internal_ts)
? message.internal_ts
: message.received_at
? Date.parse(message.received_at)
: Number.NaN;
return Number.isFinite(ts) && ts >= minAllowedTs;
});
allMessages.push(...freshMessages);
perAccount.push({
account_email: account,
query: accountQuery,
since_ts: new Date(sinceEpoch * 1000).toISOString(),
fetched_count: freshMessages.length,
dropped_older_than_window: deduped.length - freshMessages.length,
});
} catch (error) {
perAccount.push({
account_email: account,
query: pollQuery,
since_ts: new Date(Date.now() - overlapMinutes * 60_000).toISOString(),
fetched_count: 0,
error: error instanceof Error ? error.message : "unknown polling error",
});
}
}
const output: PollOutput = {
command: "poll_inboxes",
run_id: runId,
started_at: startedAt,
finished_at: new Date().toISOString(),
poll_query: pollQuery,
overlap_minutes: overlapMinutes,
max_age_hours: maxAgeHours,
max_results: maxResults,
per_account: perAccount,
partial_failure: perAccount.some((entry) => Boolean(entry.error)),
total_messages: allMessages.length,
messages: allMessages,
};
await writeJson(outputPath, output);
console.log(JSON.stringify(output, null, 2));
}
await main();