Hourly CRM inbound orchestrator for three inboxes using Notion-synced SOP, strict business-lead filtering, Supabase persistence, and actionable-only Slack reporting.
Use this skill for hourly polling CRM workflows across:
- info@yourdomain.com
- sales@yourdomain.com
- support@yourdomain.com
The source-of-truth SOP is synced from Notion page CRM_SOP_PAGE_ID every run.
Required:
- NOTION_API_KEY
- CRM_SOP_PAGE_ID (default: 31288fb313488013924ade7bf704ab6f)
- CRM_MONITORED_EMAILS (comma-separated)
- CRM_POLL_QUERY (default: in:inbox is:unread -in:spam -in:trash -category:promotions -category:social -category:updates -category:forums)
- CRM_POLL_OVERLAP_MINUTES (default: 120)
- SUPABASE_URL
- SUPABASE_SECRET_KEY
Optional:
- CRM_POLL_MAX_RESULTS (default: 200)
- CRM_POLL_MAX_AGE_HOURS (default: 36)
- CRM_SOP_CACHE_FILE (default: /tmp/crm-inbound-sop-cache.json)
- CRM_POLL_STATE_TABLE (default: crm_poll_state)
- CRM_CONTACTS_TABLE (default: crm_contacts)
- CRM_ACTIVITIES_TABLE (default: crm_activities)
- CRM_DRAFTS_TABLE (default: crm_drafts)
- CRM_ACCOUNTING_TABLE (default: accounting_entries)
- CRM_JOB_RUNS_TABLE (default: crm_job_runs)
- GOG_ACCOUNT (fallback sender account for approvals)
- CRM_OUTSTANDING_LOOKBACK_DAYS (default: 7)
- CRM_OUTSTANDING_STALE_HOURS (default: 24)
- CRM_OUTSTANDING_NOTIFY_EMPTY (default: false)
- CRM_CLASSIFIER_MODEL (default: gpt-5-nano)
- CRM_REPLY_MODEL (default: gpt-5.2)
- CRM_USE_MODEL_CLASSIFIER (default: true)
- CRM_USE_MODEL_REPLY_WRITER (default: true)
- OPENAI_API_KEY (required to use model classifier/reply writer)
- CRM_GMAIL_LABEL_APPLY (default: true)
- CRM_GMAIL_LABEL_LEAD (default: CRM/Lead)
tsx {baseDir}/scripts/fetch-sop.ts fetch_sop
Optional flags:
- --page-id <id>
- --cache-file <path>
- --output <path>
tsx {baseDir}/scripts/poll-inboxes.ts poll_inboxes
Optional flags:
- --accounts <csv>
- --query <gmail-query>
- --overlap-minutes <n>
- --max-age-hours <n>
- --output <path>
tsx {baseDir}/scripts/process-inbound.ts process_inbound \
--poll-file /tmp/crm-poll.json
Optional flags:
- --sop-file <path>
- --output <path>
tsx {baseDir}/scripts/approval-action.ts approval_action \
--action approve \
--draft-id <draft_id> \
--approved-by "U052337J8QH"
Also supported:
- --action revise --notes "<feedback>"
- --action reject --reason "<reason>"
tsx {baseDir}/scripts/check-outstanding.ts check_outstanding
Optional flags:
- --lookback-days <n> (default: 7)
- --stale-hours <n> (default: 24)
- --output <path>
For each actionable lead, post a simple Slack card containing only:
Approval/revision happens in the Slack thread, not via command strings in the main message.
- account_email:message_id.
- gpt-5-nano into receipt|sales|support|ignore (fallback to heuristics only if model call fails).
- classification, lead, inbound, routing, qualification) and inject it into the classifier prompt.
- sales when business ask is explicit.
- CRM/Lead (or CRM_GMAIL_LABEL_LEAD) to sales threads.
- view in browser, unsubscribe, manage preferences, roundup-style blasts, Gmail promotional categories) are forced to ignore unless explicit lead criteria are met.
- sales guesses are downgraded to ignore.
- gpt-5.2
Use clear headings in your Notion page so policy extraction stays deterministic:
Business Context
Lead Classification Rules
- sales): person/company reaching out for consulting, sponsorship, partnerships, affiliate opportunities, expert-network interviews, or any paid advisory call
Lead Qualification Checklist
Response Playbook
Out-of-Scope
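Heading wording matters because the classifier policy is collected only from sections whose heading contains certain substrings. The sketch below mirrors the substring checks in `extractClassificationPolicy` from process-inbound.ts; the helper name `isPolicyHeading` is hypothetical and exists only for this illustration.

```typescript
// Substrings that extractClassificationPolicy looks for in a section heading.
const POLICY_HEADING_HINTS = ["classif", "lead", "qualif", "inbound", "routing"];

// Hypothetical helper: true when a heading's items would reach the classifier prompt.
function isPolicyHeading(heading: string): boolean {
  const lower = heading.trim().toLowerCase();
  return POLICY_HEADING_HINTS.some((hint) => lower.includes(hint));
}

const recommendedHeadings = [
  "Business Context",
  "Lead Classification Rules",
  "Lead Qualification Checklist",
  "Response Playbook",
  "Out-of-Scope",
];

// Keeps only the headings that match at least one hint.
const policyHeadings = recommendedHeadings.filter(isPolicyHeading);
console.log(policyHeadings);
```

Of the five recommended headings, only the two containing "Lead" match, so renaming them (for example to "Who counts as a prospect") would likely drop their rules from the classifier prompt.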
Reference template:
cat {baseDir}/references/notion-inbound-sop-template.md
Tables:
- crm_contacts
- crm_activities
- crm_drafts
- accounting_entries
- crm_job_runs
- crm_poll_state
Reference DDL:
cat {baseDir}/references/supabase-schema.sql
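Rows reach these tables through Supabase's REST interface. As a minimal sketch of an idempotent write, the snippet below builds (but does not send) an upsert request shaped like the one `supabaseUpsertRow` issues in process-inbound.ts. The `source_key` conflict column, the example project URL, and both helper names are assumptions for illustration; auth headers are left out.

```typescript
type UpsertRequest = {
  url: string;
  method: "POST";
  headers: Record<string, string>;
  body: string;
};

// Hypothetical helper: joins mailbox and message id into one stable key.
function buildSourceKey(accountEmail: string, messageId: string): string {
  return `${accountEmail}:${messageId}`;
}

// Hypothetical helper: describes a PostgREST upsert without performing it.
function buildUpsertRequest(
  baseUrl: string,
  table: string,
  onConflict: string,
  row: Record<string, unknown>,
): UpsertRequest {
  const query = new URLSearchParams({ on_conflict: onConflict });
  return {
    url: `${baseUrl}/rest/v1/${table}?${query.toString()}`,
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      // merge-duplicates turns the insert into an update when the key already exists.
      Prefer: "resolution=merge-duplicates,return=representation",
    },
    body: JSON.stringify([row]),
  };
}

const exampleRequest = buildUpsertRequest(
  "https://example.supabase.co", // placeholder project URL
  "crm_activities",
  "source_key", // assumed unique column
  { source_key: buildSourceKey("sales@yourdomain.com", "msg-123"), classification: "sales" },
);
console.log(exampleRequest.url);
```

Because the same message always yields the same key, re-processing it during an overlapping poll should update the existing row rather than create a second one, provided the table has a unique constraint on that column.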
openclaw cron add \
--name "CRM hourly polling" \
--cron "0 * * * *" \
--tz "America/New_York" \
--session isolated \
--message "Run crm-inbound-orchestrator hourly polling cycle. Use skill crm-inbound-orchestrator. Run fetch_sop, poll_inboxes, process_inbound. Only report actionable items."
openclaw cron add \
--name "CRM morning outstanding check" \
--cron "20 9 * * *" \
--tz "America/Toronto" \
--session isolated \
--message "Run crm-inbound-orchestrator outstanding review. Use skill crm-inbound-orchestrator. Run check_outstanding for last 7 days and post a concise summary to Slack."
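With an hourly schedule and the default CRM_POLL_OVERLAP_MINUTES of 120, consecutive runs deliberately re-read recent mail, so a message can appear in more than one poll. The sketch below is only an assumption about how such a window could be derived from the two documented defaults; `pollWindowStart` is a hypothetical helper, and the real logic lives in poll-inboxes.ts, which is not shown here.

```typescript
// Hypothetical helper; poll-inboxes.ts may compute its window differently.
function pollWindowStart(
  now: Date,
  lastPolledAt: Date | undefined,
  overlapMinutes = 120, // CRM_POLL_OVERLAP_MINUTES default
  maxAgeHours = 36, // CRM_POLL_MAX_AGE_HOURS default
): Date {
  const oldestAllowedMs = now.getTime() - maxAgeHours * 60 * 60 * 1000;
  if (!lastPolledAt) {
    // No previous poll recorded: look back as far as the age limit allows.
    return new Date(oldestAllowedMs);
  }
  const overlappedMs = lastPolledAt.getTime() - overlapMinutes * 60 * 1000;
  // Never reach further back than the age limit, even after a long outage.
  return new Date(Math.max(overlappedMs, oldestAllowedMs));
}

const pollNow = new Date("2024-01-02T12:00:00Z");
const hourlyStart = pollWindowStart(pollNow, new Date("2024-01-02T11:00:00Z"));
const firstRunStart = pollWindowStart(pollNow, undefined);
console.log(hourlyStart.toISOString(), firstRunStart.toISOString());
```

Under these assumptions a run at 12:00 that last polled at 11:00 would look back to 09:00, which is why de-duplication by a stable per-message key is needed downstream.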
degraded.
crm-inbound-orchestrator/SKILL.md
crm-inbound-orchestrator/references/notion-inbound-sop-template.md
crm-inbound-orchestrator/references/supabase-schema.sql
crm-inbound-orchestrator/scripts/approval-action.ts
crm-inbound-orchestrator/scripts/check-outstanding.ts
crm-inbound-orchestrator/scripts/fetch-sop.ts
crm-inbound-orchestrator/scripts/poll-inboxes.ts
crm-inbound-orchestrator/scripts/process-inbound.ts
skills/crm-inbound-orchestrator/scripts/process-inbound.ts
import { execFile } from "node:child_process";
import { randomUUID } from "node:crypto";
import { mkdir, readFile, writeFile } from "node:fs/promises";
import path from "node:path";
import { promisify } from "node:util";
type CliArgs = {
command?: string;
flags: Record<string, string | boolean>;
};
type Classification = "receipt" | "sales" | "support" | "ignore";
type PollMessage = {
account_email: string;
message_id: string;
thread_id?: string;
subject?: string;
from?: string;
snippet?: string;
body_text?: string;
received_at?: string;
internal_ts?: number;
source_key: string;
raw?: Record<string, unknown>;
};
type PollFile = {
run_id?: string;
started_at?: string;
finished_at?: string;
partial_failure?: boolean;
messages: PollMessage[];
per_account?: Array<{ account_email: string; fetched_count?: number; error?: string }>;
};
type SopSnapshot = {
degraded?: boolean;
source?: string;
warnings?: string[];
sop?: {
hash?: string;
sections?: Array<{ heading?: string; items?: string[] }>;
blocks?: Array<{ text?: string }>;
};
};
type ClassificationResult = {
label: Classification;
confidence: number;
reasons: string[];
};
type ContactRow = {
id: string;
email: string;
display_name?: string;
};
type ActivityRow = {
id: string;
source_key: string;
account_email: string;
message_id: string;
};
type DraftRow = {
id: string;
activity_id: string;
account_email: string;
to_email: string;
subject: string;
body: string;
};
type ProcessResult = {
command: "process_inbound";
run_id: string;
started_at: string;
finished_at: string;
status: "ok" | "partial_failure";
degraded: boolean;
totals: {
polled_messages: number;
processed_messages: number;
activities_upserted: number;
drafts_upserted: number;
accounting_entries_upserted: number;
};
classification_counts: Record<Classification, number>;
sales_drafts: Array<{
draft_id: string;
activity_id: string;
account_email: string;
to_email: string;
slack_posted: boolean;
slack_error?: string;
}>;
accounting_entries: Array<{
activity_id: string;
source_key: string;
vendor?: string;
amount?: number;
currency?: string;
}>;
poll_state_updates: Array<{
account_email: string;
last_polled_at: string;
last_message_ts?: string;
}>;
warnings: string[];
};
type SlackBlock = Record<string, unknown>;
type SlackMessage = {
text: string;
blocks?: SlackBlock[];
};
const DEFAULT_SOP_CACHE_FILE = "/tmp/crm-inbound-sop-cache.json";
const DEFAULT_OUTPUT_FILE = "/tmp/crm-process.json";
const DEFAULT_CONTACTS_TABLE = "crm_contacts";
const DEFAULT_ACTIVITIES_TABLE = "crm_activities";
const DEFAULT_DRAFTS_TABLE = "crm_drafts";
const DEFAULT_ACCOUNTING_TABLE = "accounting_entries";
const DEFAULT_JOB_RUNS_TABLE = "crm_job_runs";
const DEFAULT_POLL_STATE_TABLE = "crm_poll_state";
const DEFAULT_CLASSIFIER_MODEL = "gpt-5-nano";
const DEFAULT_REPLY_MODEL = "gpt-5.2";
const DEFAULT_GMAIL_LEAD_LABEL = "CRM/Lead";
const execFileAsync = promisify(execFile);
const ensuredLabelCache = new Set<string>();
const LEAD_INTENT_SIGNALS = [
"consulting",
"consulting opportunity",
"paid consulting",
"advisory",
"advisor",
"expert network",
"subject matter expert",
"sponsorship",
"sponsorship inquiry",
"partnership",
"affiliate partnership",
"creator partnership",
"collaboration",
"consultation call",
"paid phone consultation",
"partnership inquiry",
];
const LEAD_DIRECT_ASK_SIGNALS = [
"interested in a quick rundown",
"are you interested",
"would you be interested",
"book some time",
"book a call",
"schedule a call",
"let us know if you'd be interested",
"if this is in your wheelhouse",
"reach out to discuss",
"follow up in case my previous email slipped through the cracks",
];
const LEAD_COMMERCIAL_SIGNALS = [
"payment for your time",
"paid",
"deliverables",
"budget",
"campaign brief",
"client",
"sponsorship",
"partnership",
"consultation",
"project",
"timeline",
];
const EXPERT_NETWORK_DOMAINS = [
"alphasights.com",
"guidepoint.com",
"thirdbridge.com",
"glgroup.com",
"dialecticanet.com",
"colemanrg.com",
"prosapient.com",
"visasq.com",
];
const AUTOMATED_SENDER_SIGNALS = [
"no-reply",
"noreply",
"do-not-reply",
"notifications",
"digest",
"newsletter",
"jobalerts",
"automated",
];
const AUTOMATED_TEXT_SIGNALS = [
"job alert",
"jobs you may be interested",
"recommended jobs",
"linkedin jobs",
"daily digest",
"weekly digest",
"unsubscribe",
"manage preferences",
"view in browser",
"notification settings",
];
const NEWSLETTER_DIGEST_SIGNALS = [
"view in browser",
"unsubscribe",
"manage preferences",
"privacy policy",
"all rights reserved",
"weekly digest",
"monthly digest",
"in the news",
"plus:",
"numerically speaking",
"top stories",
];
const GMAIL_PROMOTIONAL_LABELS = [
"CATEGORY_PROMOTIONS",
"CATEGORY_SOCIAL",
"CATEGORY_UPDATES",
"CATEGORY_FORUMS",
];
const BROADCAST_SENDER_HINTS = [
"news",
"newsletter",
"editor",
"editorial",
"updates",
"update",
"digest",
"crew",
"noreply",
"no-reply",
];
const HIRING_SIGNALS = [
"hiring",
"job opening",
"apply now",
"application",
"recruiter",
"career opportunity",
"open role",
"resume",
];
const VENDOR_SYSTEM_DOMAINS = [
"linkedin.com",
"indeed.com",
"glassdoor.com",
"ziprecruiter.com",
"monster.com",
"mail.linkedin.com",
"mailchimp.com",
"sendgrid.net",
"stripe.com",
"paypal.com",
"quickbooks.com",
"intuit.com",
];
const RECEIPT_SIGNALS = [
"invoice",
"receipt",
"payment",
"charged",
"charge",
"order #",
"order confirmation",
"billing",
"subscription",
"tax invoice",
];
const SUPPORT_SIGNALS = ["support", "help", "issue", "error", "problem", "unable", "bug"];
function parseArgs(argv: string[]): CliArgs {
const tokens = argv.slice(2);
const command = tokens.shift();
const flags: Record<string, string | boolean> = {};
for (let i = 0; i < tokens.length; i += 1) {
const token = tokens[i];
if (!token.startsWith("--")) {
continue;
}
const key = token.slice(2);
const next = tokens[i + 1];
if (!next || next.startsWith("--")) {
flags[key] = true;
continue;
}
flags[key] = next;
i += 1;
}
return { command, flags };
}
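A short usage sketch for the parser above. The function body is repeated so the block runs on its own; the `--verbose` flag is hypothetical and only illustrates how a flag without a value is recorded as `true`.

```typescript
type CliArgs = { command?: string; flags: Record<string, string | boolean> };

// Same logic as parseArgs above, repeated so the example is self-contained.
function parseArgs(argv: string[]): CliArgs {
  const tokens = argv.slice(2); // drop the runtime and script path
  const command = tokens.shift();
  const flags: Record<string, string | boolean> = {};
  for (let i = 0; i < tokens.length; i += 1) {
    const token = tokens[i];
    if (!token.startsWith("--")) {
      continue;
    }
    const key = token.slice(2);
    const next = tokens[i + 1];
    if (!next || next.startsWith("--")) {
      flags[key] = true; // bare flag: nothing usable follows it
      continue;
    }
    flags[key] = next;
    i += 1; // skip the value that was just consumed
  }
  return { command, flags };
}

const parsedArgs = parseArgs([
  "node",
  "process-inbound.ts",
  "process_inbound",
  "--poll-file",
  "/tmp/crm-poll.json",
  "--verbose", // hypothetical flag, not one the script documents
]);
console.log(parsedArgs);
```

One consequence of this design is that a value beginning with `--` cannot be passed, because the parser treats it as the next flag.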
function asString(value: string | boolean | undefined): string | undefined {
return typeof value === "string" ? value : undefined;
}
function clean(value: string | undefined): string | undefined {
const trimmed = value?.trim();
return trimmed ? trimmed : undefined;
}
function getRecord(value: unknown): Record<string, unknown> | undefined {
if (!value || typeof value !== "object") {
return undefined;
}
return value as Record<string, unknown>;
}
function getString(record: Record<string, unknown>, keys: string[]): string | undefined {
for (const key of keys) {
const value = record[key];
if (typeof value === "string" && value.trim()) {
return value.trim();
}
}
return undefined;
}
function getBool(value: string | undefined, fallback: boolean): boolean {
if (!value) {
return fallback;
}
const normalized = value.trim().toLowerCase();
return normalized === "1" || normalized === "true" || normalized === "yes" || normalized === "on";
}
async function readJsonFile<T>(filePath: string): Promise<T> {
const raw = await readFile(filePath, "utf8");
return JSON.parse(raw) as T;
}
async function writeJson(filePath: string, payload: unknown): Promise<void> {
await mkdir(path.dirname(filePath), { recursive: true });
await writeFile(filePath, `${JSON.stringify(payload, null, 2)}\n`, "utf8");
}
async function supabaseRequest<T>(options: {
supabaseUrl: string;
serviceKey: string;
method: "GET" | "POST" | "PATCH";
table: string;
query?: URLSearchParams;
body?: unknown;
prefer?: string;
}): Promise<T> {
const suffix = options.query ? `?${options.query.toString()}` : "";
const response = await fetch(`${options.supabaseUrl}/rest/v1/${options.table}${suffix}`, {
method: options.method,
headers: {
apikey: options.serviceKey,
Authorization: `Bearer ${options.serviceKey}`,
"Content-Type": "application/json",
Accept: "application/json",
...(options.prefer ? { Prefer: options.prefer } : {}),
},
body: options.body === undefined ? undefined : JSON.stringify(options.body),
});
const text = await response.text();
// Check the status before parsing so a non-JSON error body still reports the HTTP failure.
if (!response.ok) {
throw new Error(
`Supabase ${options.method} ${options.table} failed (${response.status}): ${text}`,
);
}
return text.trim() ? (JSON.parse(text) as T) : ({} as T);
}
async function supabaseUpsertRow(
options: {
supabaseUrl: string;
serviceKey: string;
table: string;
onConflict: string;
},
row: Record<string, unknown>,
): Promise<Record<string, unknown>> {
const query = new URLSearchParams();
query.set("on_conflict", options.onConflict);
const response = await supabaseRequest<unknown>({
supabaseUrl: options.supabaseUrl,
serviceKey: options.serviceKey,
method: "POST",
table: options.table,
query,
body: [row],
prefer: "resolution=merge-duplicates,return=representation",
});
if (!Array.isArray(response) || response.length === 0) {
return row;
}
return getRecord(response[0]) ?? row;
}
async function supabasePatchRows(
options: {
supabaseUrl: string;
serviceKey: string;
table: string;
filters: Record<string, string>;
},
patch: Record<string, unknown>,
): Promise<Record<string, unknown>[]> {
const query = new URLSearchParams();
query.set("select", "*");
for (const [key, value] of Object.entries(options.filters)) {
query.set(key, `eq.${value}`);
}
const response = await supabaseRequest<unknown>({
supabaseUrl: options.supabaseUrl,
serviceKey: options.serviceKey,
method: "PATCH",
table: options.table,
query,
body: patch,
prefer: "return=representation",
});
if (!Array.isArray(response)) {
return [];
}
return response
.map((item) => getRecord(item))
.filter((item): item is Record<string, unknown> => Boolean(item));
}
function extractEmailAddress(rawFrom: string | undefined): string | undefined {
if (!rawFrom) {
return undefined;
}
const bracketMatch = rawFrom.match(/<([^>]+)>/);
if (bracketMatch?.[1]) {
return bracketMatch[1].trim().toLowerCase();
}
const bareMatch = rawFrom.match(/[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}/i);
return bareMatch?.[0]?.toLowerCase();
}
function extractEmailDomain(email: string | undefined): string | undefined {
if (!email || !email.includes("@")) {
return undefined;
}
const domain = email.split("@")[1]?.trim().toLowerCase();
return domain || undefined;
}
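To make the two sender helpers above concrete, here is a small self-contained run over a few From values. The functions are repeated from the listing; the sample addresses are made up.

```typescript
function extractEmailAddress(rawFrom: string | undefined): string | undefined {
  if (!rawFrom) {
    return undefined;
  }
  // Prefer the address inside angle brackets, as in "Name <addr>".
  const bracketMatch = rawFrom.match(/<([^>]+)>/);
  if (bracketMatch?.[1]) {
    return bracketMatch[1].trim().toLowerCase();
  }
  // Otherwise take the first thing that looks like an address.
  const bareMatch = rawFrom.match(/[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}/i);
  return bareMatch?.[0]?.toLowerCase();
}

function extractEmailDomain(email: string | undefined): string | undefined {
  if (!email || !email.includes("@")) {
    return undefined;
  }
  const domain = email.split("@")[1]?.trim().toLowerCase();
  return domain || undefined;
}

const bracketedSender = extractEmailAddress("Jane Doe <Jane.Doe@Example.com>");
const bracketedDomain = extractEmailDomain(bracketedSender);
const bareSender = extractEmailAddress("reply to sales@example.org please");
const missingSender = extractEmailAddress("undisclosed recipients");
console.log({ bracketedSender, bracketedDomain, bareSender, missingSender });
```

Note that the bracket branch returns whatever sits between `<` and `>` without checking that it is an address, so a malformed header can produce a value with no domain.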
function includesAny(text: string, signals: string[]): boolean {
return signals.some((signal) => text.includes(signal));
}
function countSignals(text: string, signals: string[]): number {
return signals.reduce((count, signal) => (text.includes(signal) ? count + 1 : count), 0);
}
function domainInList(domain: string | undefined, entries: string[]): boolean {
if (!domain) {
return false;
}
return entries.some((entry) => domain === entry || domain.endsWith(`.${entry}`));
}
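The matching helpers above are plain substring and suffix checks, which has a few consequences worth seeing in a run. The functions are repeated from the listing; the sample domains and sentence are invented for the example.

```typescript
function includesAny(text: string, signals: string[]): boolean {
  return signals.some((signal) => text.includes(signal));
}

function domainInList(domain: string | undefined, entries: string[]): boolean {
  if (!domain) {
    return false;
  }
  // Match the entry itself or any subdomain of it, on a dot boundary.
  return entries.some((entry) => domain === entry || domain.endsWith(`.${entry}`));
}

const expertDomains = ["glgroup.com", "guidepoint.com"];
const exactDomain = domainInList("glgroup.com", expertDomains);
const subDomain = domainInList("mail.glgroup.com", expertDomains);
const lookalikeDomain = domainInList("notglgroup.com", expertDomains);

// Substring matching has no word boundaries: "paid" is found inside "unpaid".
const paidInsideUnpaid = includesAny("this is an unpaid internship", ["paid"]);
console.log({ exactDomain, subDomain, lookalikeDomain, paidInsideUnpaid });
```

The dot-boundary check keeps lookalike domains out of the lists, while short text signals such as "paid" or "client" can fire inside longer words, so the combined rules further down carry most of the filtering weight.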
function buildInboundText(message: PollMessage): string {
return `${message.subject ?? ""} ${message.snippet ?? ""} ${message.body_text ?? ""} ${message.from ?? ""}`
.toLowerCase()
.trim();
}
function clampText(value: string | undefined, maxChars: number): string | undefined {
const normalized = value?.replace(/\s+/g, " ").trim();
if (!normalized) {
return undefined;
}
if (normalized.length <= maxChars) {
return normalized;
}
return `${normalized.slice(0, Math.max(0, maxChars - 3))}...`;
}
function summarizeInboundMessage(message: PollMessage, maxChars = 850): string | undefined {
const preferred = clampText(message.body_text, maxChars);
if (preferred) {
return preferred;
}
return clampText(message.snippet, maxChars);
}
function extractGmailLabels(message: PollMessage): string[] {
const raw = message.raw;
if (!raw || typeof raw !== "object") {
return [];
}
const labelsRaw = (raw as { labels?: unknown }).labels;
if (!Array.isArray(labelsRaw)) {
return [];
}
return labelsRaw
.filter((entry): entry is string => typeof entry === "string")
.map((entry) => entry.trim().toUpperCase())
.filter(Boolean);
}
function detectExplicitBusinessLead(message: PollMessage): { matched: boolean; reasons: string[] } {
const text = buildInboundText(message);
const senderEmail = extractEmailAddress(message.from);
const senderDomain = extractEmailDomain(senderEmail);
const senderLocal = senderEmail?.split("@")[0]?.trim().toLowerCase() || "";
const reasons: string[] = [];
const leadIntentScore = countSignals(text, LEAD_INTENT_SIGNALS);
const directAskScore = countSignals(text, LEAD_DIRECT_ASK_SIGNALS);
const businessContextScore = countSignals(text, LEAD_COMMERCIAL_SIGNALS);
const fromExpertNetwork = domainInList(senderDomain, EXPERT_NETWORK_DOMAINS);
const looksAutomated =
includesAny(senderLocal, AUTOMATED_SENDER_SIGNALS) ||
includesAny(text, AUTOMATED_TEXT_SIGNALS);
const looksBroadcastSender = includesAny(senderLocal, BROADCAST_SENDER_HINTS);
const looksHiring = includesAny(text, HIRING_SIGNALS);
const fromVendorSystem = domainInList(senderDomain, VENDOR_SYSTEM_DOMAINS);
if (looksAutomated || looksHiring) {
return { matched: false, reasons };
}
if (looksBroadcastSender && !fromExpertNetwork) {
return { matched: false, reasons };
}
if (fromExpertNetwork && (leadIntentScore > 0 || directAskScore > 0 || businessContextScore > 0)) {
reasons.push("lead-expert-network-outreach");
return { matched: true, reasons };
}
if (!fromVendorSystem && leadIntentScore > 0 && (directAskScore > 0 || businessContextScore > 0)) {
reasons.push("lead-business-outreach-intent");
return { matched: true, reasons };
}
if (!fromVendorSystem && directAskScore > 0 && businessContextScore > 0) {
reasons.push("lead-business-call-to-action");
return { matched: true, reasons };
}
return { matched: false, reasons };
}
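The rule order in `detectExplicitBusinessLead` is easier to follow with the signal counts separated from the text scanning. The sketch below restates the same decisions as a pure function over precomputed signals; `LeadSignals` and `decideLead` are hypothetical names introduced for this illustration.

```typescript
type LeadSignals = {
  intent: number; // hits in LEAD_INTENT_SIGNALS
  directAsk: number; // hits in LEAD_DIRECT_ASK_SIGNALS
  commercial: number; // hits in LEAD_COMMERCIAL_SIGNALS
  fromExpertNetwork: boolean;
  fromVendorSystem: boolean;
  looksAutomated: boolean;
  looksHiring: boolean;
  looksBroadcastSender: boolean;
};

// Hypothetical helper: returns the reason tag, or undefined when no rule matches.
function decideLead(s: LeadSignals): string | undefined {
  // Vetoes come first: automation and hiring mail never count as leads.
  if (s.looksAutomated || s.looksHiring) return undefined;
  if (s.looksBroadcastSender && !s.fromExpertNetwork) return undefined;
  // Expert networks need only one signal of any kind.
  if (s.fromExpertNetwork && (s.intent > 0 || s.directAsk > 0 || s.commercial > 0)) {
    return "lead-expert-network-outreach";
  }
  // Everyone else must not be a vendor system and needs two kinds of signal.
  if (s.fromVendorSystem) return undefined;
  if (s.intent > 0 && (s.directAsk > 0 || s.commercial > 0)) return "lead-business-outreach-intent";
  if (s.directAsk > 0 && s.commercial > 0) return "lead-business-call-to-action";
  return undefined;
}

const noSignals: LeadSignals = {
  intent: 0,
  directAsk: 0,
  commercial: 0,
  fromExpertNetwork: false,
  fromVendorSystem: false,
  looksAutomated: false,
  looksHiring: false,
  looksBroadcastSender: false,
};

const intentOnly = decideLead({ ...noSignals, intent: 1 });
const intentPlusBudget = decideLead({ ...noSignals, intent: 1, commercial: 1 });
const askPlusBudget = decideLead({ ...noSignals, directAsk: 1, commercial: 1 });
const expertWithAsk = decideLead({ ...noSignals, fromExpertNetwork: true, directAsk: 1 });
console.log({ intentOnly, intentPlusBudget, askPlusBudget, expertWithAsk });
```

Because "sponsorship" and "partnership" appear in both the intent and commercial lists, a single occurrence of either word already satisfies the two-signal rule for a non-vendor, non-automated sender.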
function detectHardIgnore(message: PollMessage): { matched: boolean; reasons: string[] } {
const text = buildInboundText(message);
const senderEmail = extractEmailAddress(message.from);
const senderDomain = extractEmailDomain(senderEmail);
const senderLocal = senderEmail?.split("@")[0]?.trim().toLowerCase() || "";
const reasons: string[] = [];
const gmailLabels = extractGmailLabels(message);
const lead = detectExplicitBusinessLead(message);
if (lead.matched) {
return { matched: false, reasons };
}
const automatedScore =
countSignals(text, AUTOMATED_TEXT_SIGNALS) + countSignals(text, NEWSLETTER_DIGEST_SIGNALS);
const looksAutomatedSender =
includesAny(senderLocal, AUTOMATED_SENDER_SIGNALS) || domainInList(senderDomain, VENDOR_SYSTEM_DOMAINS);
const looksBroadcastSender = includesAny(senderLocal, BROADCAST_SENDER_HINTS);
const hasPromotionalCategory = gmailLabels.some((label) => GMAIL_PROMOTIONAL_LABELS.includes(label));
const looksHiring = includesAny(text, HIRING_SIGNALS);
if (looksHiring) {
reasons.push("hard-ignore-hiring-spam");
return { matched: true, reasons };
}
if (automatedScore >= 2) {
reasons.push("hard-ignore-newsletter-or-digest");
return { matched: true, reasons };
}
if (hasPromotionalCategory && (automatedScore >= 1 || looksBroadcastSender)) {
reasons.push("hard-ignore-gmail-promo-category");
return { matched: true, reasons };
}
if (looksBroadcastSender && automatedScore >= 1) {
reasons.push("hard-ignore-broadcast-sender");
return { matched: true, reasons };
}
if (looksAutomatedSender && automatedScore >= 1) {
reasons.push("hard-ignore-automated-sender");
return { matched: true, reasons };
}
return { matched: false, reasons };
}
function extractDisplayName(rawFrom: string | undefined): string | undefined {
if (!rawFrom) {
return undefined;
}
const withoutEmail = rawFrom
.replace(/<[^>]+>/g, "")
.replace(/\"/g, "")
.trim();
return withoutEmail || undefined;
}
function extractSopGuidance(sop: SopSnapshot | undefined, maxChars = 4_000): string {
const chunks: string[] = [];
const sections = Array.isArray(sop?.sop?.sections) ? sop.sop.sections : [];
for (const section of sections) {
if (!section || typeof section !== "object") {
continue;
}
const heading = typeof section.heading === "string" ? section.heading.trim() : "";
if (heading) {
chunks.push(`# ${heading}`);
}
const items = Array.isArray(section.items) ? section.items : [];
for (const item of items) {
if (typeof item === "string" && item.trim()) {
chunks.push(`- ${item.trim()}`);
}
}
}
if (chunks.length === 0 && Array.isArray(sop?.sop?.blocks)) {
for (const block of sop.sop.blocks) {
if (!block || typeof block !== "object") {
continue;
}
const text = (block as { text?: string }).text;
if (typeof text === "string" && text.trim()) {
chunks.push(text.trim());
}
}
}
const joined = chunks.join("\n");
if (joined.length <= maxChars) {
return joined;
}
return joined.slice(0, maxChars);
}
function extractClassificationPolicy(sop: SopSnapshot | undefined, maxChars = 2_200): string {
const lines: string[] = [];
const sections = Array.isArray(sop?.sop?.sections) ? sop.sop.sections : [];
for (const section of sections) {
if (!section || typeof section !== "object") {
continue;
}
const heading = typeof section.heading === "string" ? section.heading.trim() : "";
const headingLower = heading.toLowerCase();
const relevantHeading =
headingLower.includes("classif") ||
headingLower.includes("lead") ||
headingLower.includes("qualif") ||
headingLower.includes("inbound") ||
headingLower.includes("routing");
if (!relevantHeading) {
continue;
}
if (heading) {
lines.push(`# ${heading}`);
}
const items = Array.isArray(section.items) ? section.items : [];
for (const item of items) {
if (typeof item === "string" && item.trim()) {
lines.push(`- ${item.trim()}`);
}
}
}
if (lines.length === 0 && Array.isArray(sop?.sop?.blocks)) {
for (const block of sop.sop.blocks) {
if (!block || typeof block !== "object") {
continue;
}
const text = typeof block.text === "string" ? block.text.trim() : "";
if (!text) {
continue;
}
const lower = text.toLowerCase();
if (
lower.includes("lead") ||
lower.includes("consult") ||
lower.includes("sponsor") ||
lower.includes("partnership") ||
lower.includes("classif")
) {
lines.push(`- ${text}`);
}
}
}
const policy = lines.join("\n").trim();
if (!policy) {
return "";
}
return policy.length <= maxChars ? policy : policy.slice(0, maxChars);
}
function extractOpenAIText(payload: unknown): string | undefined {
const record = getRecord(payload);
if (!record) {
return undefined;
}
const outputText = record.output_text;
if (typeof outputText === "string" && outputText.trim()) {
return outputText.trim();
}
const output = Array.isArray(record.output) ? record.output : [];
for (const item of output) {
const itemRecord = getRecord(item);
if (!itemRecord) {
continue;
}
const content = Array.isArray(itemRecord.content) ? itemRecord.content : [];
for (const part of content) {
const partRecord = getRecord(part);
if (!partRecord) {
continue;
}
const text = getString(partRecord, ["text", "output_text"]);
if (text) {
return text;
}
}
}
return undefined;
}
function parseFirstJsonObject(text: string): Record<string, unknown> | undefined {
const direct = text.trim();
try {
const parsed = JSON.parse(direct) as unknown;
return getRecord(parsed);
} catch {
// Not pure JSON; fall through and search for an embedded object below.
}
const match = direct.match(/\{[\s\S]*\}/);
if (!match) {
return undefined;
}
try {
const parsed = JSON.parse(match[0]) as unknown;
return getRecord(parsed);
} catch {
return undefined;
}
}
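Model replies do not always arrive as bare JSON, which is what the two-stage parse above guards against. The following self-contained run shows the three outcomes; the function is repeated from the listing with `getRecord` inlined as a local `asRecord` helper (a name chosen for this example), and the sample replies are invented.

```typescript
function parseFirstJsonObject(text: string): Record<string, unknown> | undefined {
  const asRecord = (value: unknown): Record<string, unknown> | undefined =>
    value && typeof value === "object" ? (value as Record<string, unknown>) : undefined;
  const direct = text.trim();
  try {
    return asRecord(JSON.parse(direct));
  } catch {
    // Not pure JSON; try the span from the first "{" to the last "}".
  }
  const match = direct.match(/\{[\s\S]*\}/);
  if (!match) {
    return undefined;
  }
  try {
    return asRecord(JSON.parse(match[0]));
  } catch {
    return undefined;
  }
}

const bareReply = parseFirstJsonObject('{"label":"ignore","confidence":0.9}');
const wrappedReply = parseFirstJsonObject(
  'Here is the result: {"label":"support","confidence":0.7} Hope that helps.',
);
// Two objects in one reply: the greedy span covers both and fails to parse.
const doubleReply = parseFirstJsonObject('first {"a":1} then {"b":2}');
console.log({ bareReply, wrappedReply, doubleReply });
```

Despite the name, the fallback does not isolate the first object: the pattern is greedy, so a reply containing two separate objects yields `undefined`, and the caller falls back to the heuristic classifier.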
async function callOpenAIJson(args: {
apiKey: string;
model: string;
systemPrompt: string;
userPrompt: string;
}): Promise<Record<string, unknown> | undefined> {
const response = await fetch("https://api.openai.com/v1/responses", {
method: "POST",
headers: {
Authorization: `Bearer ${args.apiKey}`,
"Content-Type": "application/json; charset=utf-8",
},
body: JSON.stringify({
model: args.model,
input: [
{ role: "system", content: [{ type: "input_text", text: args.systemPrompt }] },
{ role: "user", content: [{ type: "input_text", text: args.userPrompt }] },
],
}),
});
const text = await response.text();
if (!response.ok) {
throw new Error(`OpenAI responses error (${response.status}): ${text}`);
}
const payload = text.trim() ? (JSON.parse(text) as unknown) : undefined;
const llmText = payload ? extractOpenAIText(payload) : undefined;
if (!llmText) {
return undefined;
}
return parseFirstJsonObject(llmText);
}
function classifyInboundHeuristic(message: PollMessage): ClassificationResult {
const text = buildInboundText(message);
const reasons: string[] = [];
const hardIgnore = detectHardIgnore(message);
if (hardIgnore.matched) {
return { label: "ignore", confidence: 0.96, reasons: hardIgnore.reasons.slice(0, 4) };
}
const senderEmail = extractEmailAddress(message.from);
const senderDomain = extractEmailDomain(senderEmail);
const lead = detectExplicitBusinessLead(message);
if (lead.matched) {
reasons.push(...lead.reasons, "matched-explicit-business-lead");
return { label: "sales", confidence: 0.94, reasons: Array.from(new Set(reasons)).slice(0, 4) };
}
const senderLocal = senderEmail?.split("@")[0]?.trim().toLowerCase() || "";
const looksAutomated =
includesAny(senderLocal, AUTOMATED_SENDER_SIGNALS) ||
includesAny(text, AUTOMATED_TEXT_SIGNALS) ||
includesAny(text, AUTOMATED_SENDER_SIGNALS);
const looksHiring = includesAny(text, HIRING_SIGNALS);
const fromJobNetwork = domainInList(senderDomain, [
"linkedin.com",
"indeed.com",
"glassdoor.com",
"ziprecruiter.com",
"monster.com",
]);
const fromVendorSystem = domainInList(senderDomain, VENDOR_SYSTEM_DOMAINS);
const likelyNonHumanSender = looksAutomated || fromJobNetwork || fromVendorSystem;
if (likelyNonHumanSender || looksHiring) {
reasons.push("non-business-automation-filter");
return { label: "ignore", confidence: 0.94, reasons };
}
const receiptScore = countSignals(text, RECEIPT_SIGNALS);
const salesScore = countSignals(text, LEAD_INTENT_SIGNALS) + countSignals(text, LEAD_DIRECT_ASK_SIGNALS);
const supportScore = countSignals(text, SUPPORT_SIGNALS);
const ignoreScore = countSignals(text, AUTOMATED_TEXT_SIGNALS);
if (receiptScore > 0 && receiptScore >= salesScore) {
reasons.push("matched-receipt-signals");
return { label: "receipt", confidence: Math.min(0.65 + receiptScore * 0.08, 0.96), reasons };
}
if (salesScore > 0) {
reasons.push("sales-signals-without-explicit-lead");
return { label: "ignore", confidence: 0.88, reasons };
}
if (supportScore > 0) {
reasons.push("matched-support-signals");
return { label: "support", confidence: Math.min(0.58 + supportScore * 0.08, 0.9), reasons };
}
if (ignoreScore > 0) {
reasons.push("matched-ignore-signals");
return { label: "ignore", confidence: Math.min(0.6 + ignoreScore * 0.08, 0.92), reasons };
}
reasons.push("no-strong-signal");
return { label: "ignore", confidence: 0.52, reasons };
}
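The heuristic fallback above scores confidence as a base value plus 0.08 per matched signal, capped per label. A minimal sketch of just that arithmetic, with hypothetical helper names, makes the caps easy to check:

```typescript
// Hypothetical helpers restating the confidence formulas from classifyInboundHeuristic.
function receiptConfidence(hits: number): number {
  return Math.min(0.65 + hits * 0.08, 0.96);
}

function supportConfidence(hits: number): number {
  return Math.min(0.58 + hits * 0.08, 0.9);
}

function ignoreConfidence(hits: number): number {
  return Math.min(0.6 + hits * 0.08, 0.92);
}

// Tabulate confidence for 1 to 5 matched signals per label.
const confidenceTable = [1, 2, 3, 4, 5].map((hits) => ({
  hits,
  receipt: receiptConfidence(hits),
  support: supportConfidence(hits),
  ignore: ignoreConfidence(hits),
}));
console.table(confidenceTable);
```

Each formula reaches its cap at about four matched signals, so additional hits beyond that do not change the reported confidence. Sales signals are handled differently: without an explicit lead match they produce `ignore` at a fixed 0.88.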
async function classifyInbound(args: {
message: PollMessage;
apiKey?: string;
model: string;
sop?: SopSnapshot;
}): Promise<ClassificationResult> {
const deterministicLead = detectExplicitBusinessLead(args.message);
if (deterministicLead.matched) {
return {
label: "sales",
confidence: 0.95,
reasons: Array.from(new Set(["rule-explicit-business-lead", ...deterministicLead.reasons])).slice(
0,
4,
),
};
}
const hardIgnore = detectHardIgnore(args.message);
if (hardIgnore.matched) {
return {
label: "ignore",
confidence: 0.96,
reasons: Array.from(new Set(["rule-hard-ignore", ...hardIgnore.reasons])).slice(0, 4),
};
}
if (!args.apiKey) {
return classifyInboundHeuristic(args.message);
}
try {
const classificationPolicy = extractClassificationPolicy(args.sop);
const systemPrompt = [
"You classify inbound email for a business owner.",
'Return strict JSON only: {"label":"receipt|sales|support|ignore","confidence":number,"reasons":[string]}',
"Rules:",
"- sales: inbound person asking for consulting, sponsorship, advisory, project inquiry, partnership, expert network opportunity, affiliate/creator collaboration, or a paid expert consultation.",
"- receipt: billing, invoice, payment confirmation.",
"- support: user issue/help request.",
"- ignore: newsletters, job alerts, vendor/system updates, social updates, hiring spam.",
"- Treat expert-network outreach (for example AlphaSights/Guidepoint/GLG/Third Bridge style requests) as sales when it asks for expertise/call/payment.",
"- Treat creator partnership/sponsorship outreach as sales when sender asks for call/brief/interest.",
"- Do not require the exact word 'consulting' if business intent is clear.",
"- If uncertain between sales and ignore, prefer sales only when sender appears human and there is explicit business ask.",
"- If message looks like newsletter/digest/blast (for example includes view-in-browser, unsubscribe/manage-preferences, top-stories roundup, or promotional Gmail categories), classify as ignore even with CTA links.",
classificationPolicy ? `Notion SOP classification policy:\n${classificationPolicy}` : "",
].join("\n");
const userPrompt = JSON.stringify(
{
mailbox: args.message.account_email,
from: args.message.from,
subject: args.message.subject,
snippet: args.message.snippet,
body_text: args.message.body_text,
gmail_labels: extractGmailLabels(args.message),
},
null,
2,
);
const parsed = await callOpenAIJson({
apiKey: args.apiKey,
model: args.model,
systemPrompt,
userPrompt,
});
const labelRaw = typeof parsed?.label === "string" ? parsed.label.toLowerCase().trim() : "";
const normalizedLabel = labelRaw.replace(/[^a-z]/g, "");
const label =
normalizedLabel === "receipt" ||
normalizedLabel === "sales" ||
normalizedLabel === "support" ||
normalizedLabel === "ignore"
? normalizedLabel
: undefined;
if (!label) {
return classifyInboundHeuristic(args.message);
}
const confidenceRaw = parsed?.confidence;
const confidence =
typeof confidenceRaw === "number" && Number.isFinite(confidenceRaw)
? Math.max(0, Math.min(1, confidenceRaw))
: 0.65;
const reasonsRaw = parsed?.reasons;
const reasons = Array.isArray(reasonsRaw)
? reasonsRaw.filter((item): item is string => typeof item === "string").slice(0, 4)
: [];
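// Explicit leads already returned above, so a model-only "sales" guess has no
// rule-based support at this point and is downgraded to ignore.
if (label === "sales") {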
return {
label: "ignore",
confidence: Math.max(0.9, confidence),
reasons: Array.from(
new Set([
"blocked-model-sales-without-explicit-lead",
`llm-model:${args.model}`,
...reasons,
]),
).slice(0, 4),
};
}
const postHardIgnore = detectHardIgnore(args.message);
if (postHardIgnore.matched) {
return {
label: "ignore",
confidence: Math.max(0.9, confidence),
reasons: Array.from(
new Set([
"override-hard-ignore",
`llm-model:${args.model}`,
...postHardIgnore.reasons,
...reasons,
]),
).slice(0, 4),
};
}
if (label !== "sales") {
const leadOverride = detectExplicitBusinessLead(args.message);
if (leadOverride.matched) {
return {
label: "sales",
confidence: Math.max(0.9, confidence),
reasons: Array.from(
new Set([
"override-explicit-business-lead",
`llm-model:${args.model}`,
...leadOverride.reasons,
...reasons,
]),
).slice(0, 4),
};
}
}
const taggedReasons = Array.from(new Set([`llm-model:${args.model}`, ...reasons])).slice(0, 4);
return {
label,
confidence,
reasons: taggedReasons,
};
} catch {
return classifyInboundHeuristic(args.message);
}
}
function pickSopCues(sop: SopSnapshot | undefined): string[] {
const lines: string[] = [];
const sections = sop?.sop?.sections;
if (Array.isArray(sections)) {
for (const section of sections) {
if (!section || typeof section !== "object") {
continue;
}
const heading = typeof section.heading === "string" ? section.heading.trim() : "";
if (heading) {
lines.push(heading);
}
const items = Array.isArray(section.items) ? section.items : [];
for (const item of items) {
if (typeof item === "string" && item.trim()) {
lines.push(item.trim());
}
}
}
}
const blocks = sop?.sop?.blocks;
if (lines.length === 0 && Array.isArray(blocks)) {
for (const block of blocks) {
if (!block || typeof block !== "object") {
continue;
}
const text = (block as { text?: string }).text;
if (typeof text === "string" && text.trim()) {
lines.push(text.trim());
}
}
}
const useful = lines.filter((line) => {
const lower = line.toLowerCase();
return (
lower.includes("qualif") ||
lower.includes("response") ||
lower.includes("lead") ||
lower.includes("sponsor") ||
lower.includes("timeline") ||
lower.includes("pricing")
);
});
return Array.from(new Set(useful)).slice(0, 6);
}
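// Returns the first word of a sender display name, or "there" when no usable name exists.
function firstNameFromDisplay(display: string | undefined): string {
if (!display) {
return "there";
}
// Keep letters from any script (plus whitespace, apostrophes, and hyphens) so that names such as
// "José" or "O'Brien" are not cut short by an ASCII-only filter.
const cleanDisplay = display.replace(/[^\p{L}\p{M}\s'-]/gu, " ").trim();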
if (!cleanDisplay) {
return "there";
}
return cleanDisplay.split(/\s+/)[0] || "there";
}
// Template reply used when no API key is configured or the model call fails.
function buildSalesDraftFallback(args: {
senderDisplayName?: string;
senderEmail?: string;
subject?: string;
snippet?: string;
sopCues: string[];
}): { subject: string; body: string } {
const firstName = firstNameFromDisplay(args.senderDisplayName);
const intentSnippet = args.snippet?.trim() || "Thanks for reaching out.";
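// Fall back to "Inquiry" when the inbound subject is missing or blank, and avoid stacking "Re:".
const inboundSubject = args.subject?.trim() || "Inquiry";
const subject = inboundSubject.toLowerCase().startsWith("re:")
? inboundSubject
: `Re: ${inboundSubject}`;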
const body = [
`Hi ${firstName},`,
"",
"Thanks for your message.",
intentSnippet,
"",
"To make this actionable, could you share:",
"1) Your primary objective",
"2) Timeline",
"3) Budget range or decision criteria",
"",
"Once we have those details, I can send a concrete recommendation and next steps.",
"",
"Best,",
"[Your Business Name]",
].join("\n");
return { subject, body };
}
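// Asks the reply model for a JSON {subject, body} draft guided by the SOP; any failure or
// incomplete answer falls back to buildSalesDraftFallback.
async function buildSalesDraft(args: {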
apiKey?: string;
model: string;
senderDisplayName?: string;
senderEmail?: string;
subject?: string;
snippet?: string;
mailbox: string;
sop: SopSnapshot | undefined;
sopCues: string[];
}): Promise<{ subject: string; body: string }> {
if (!args.apiKey) {
return buildSalesDraftFallback(args);
}
const sopGuidance = extractSopGuidance(args.sop);
try {
const systemPrompt = [
"You write friendly business email replies.",
"Use the SOP guidance exactly for tone and structure.",
'Return strict JSON only: {"subject":string,"body":string}.',
"Keep it concise, clear, and professional.",
"Do not invent facts.",
].join("\n");
const userPrompt = JSON.stringify(
{
mailbox: args.mailbox,
from_name: args.senderDisplayName,
from_email: args.senderEmail,
inbound_subject: args.subject,
inbound_message: args.snippet,
sop_guidance: sopGuidance,
sop_cues: args.sopCues,
},
null,
2,
);
const parsed = await callOpenAIJson({
apiKey: args.apiKey,
model: args.model,
systemPrompt,
userPrompt,
});
const subject = getString(parsed ?? {}, ["subject"]);
const body = getString(parsed ?? {}, ["body"]);
if (subject && body) {
return { subject, body };
}
} catch {
// Model call or JSON parsing failed; fall through to the template fallback below.
}
return buildSalesDraftFallback(args);
}
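// Best-effort receipt extraction: vendor from the sender address (local part) or display name,
// and the first amount that follows a currency symbol or a supported currency code.
function parseReceiptInfo(message: PollMessage): {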
vendor?: string;
amount?: number;
currency?: string;
receipt_date?: string;
} {
const fromEmail = extractEmailAddress(message.from);
const vendor = fromEmail?.split("@")[0] || extractDisplayName(message.from);
const text = `${message.subject ?? ""} ${message.snippet ?? ""} ${message.body_text ?? ""}`;
let amount: number | undefined;
let currency: string | undefined;
const symbolMatch = text.match(/([$€£])\s*([0-9][0-9,]*(?:\.[0-9]{2})?)/);
if (symbolMatch?.[2]) {
amount = Number.parseFloat(symbolMatch[2].replace(/,/g, ""));
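const symbol = symbolMatch[1];
// A bare "$" is assumed to mean USD; CAD and AUD are only recognized by the currency-code
// branch below, which runs when no symbol-prefixed amount is found.
currency = symbol === "$" ? "USD" : symbol === "€" ? "EUR" : symbol === "£" ? "GBP" : undefined;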
} else {
const codeMatch = text.match(/\b(USD|EUR|GBP|NGN|CAD|AUD)\s*([0-9][0-9,]*(?:\.[0-9]{2})?)/i);
if (codeMatch?.[2]) {
amount = Number.parseFloat(codeMatch[2].replace(/,/g, ""));
currency = codeMatch[1].toUpperCase();
}
}
const receiptDate =
message.received_at ||
(message.internal_ts ? new Date(message.internal_ts).toISOString() : undefined);
return {
vendor,
amount: Number.isFinite(amount ?? Number.NaN) ? amount : undefined,
currency,
receipt_date: receiptDate,
};
}
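// Renders an ISO timestamp with Slack's <!date^...> syntax so each viewer sees their local time;
// the raw ISO string is the fallback text. Unparseable input is returned unchanged.
function formatSlackWhen(isoDate: string | undefined): string {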
if (!isoDate) {
return "n/a";
}
const ms = Date.parse(isoDate);
if (!Number.isFinite(ms)) {
return isoDate;
}
return `<!date^${Math.floor(ms / 1000)}^{date_short_pretty} {time}|${isoDate}>`;
}
// Builds the Slack notification for a sales draft: a plain-text fallback plus Block Kit blocks.
function buildDraftSlackMessage(args: {
accountEmail: string;
subject: string;
receivedAt?: string;
inboundMessage?: string;
suggestedResponse: string;
}): SlackMessage {
const when = formatSlackWhen(args.receivedAt);
const inboundMessage = (args.inboundMessage || "").trim() || "(no message snippet)";
const suggested = args.suggestedResponse.trim();
const text = [
"CRM inbound lead",
`Mailbox: ${args.accountEmail}`,
`Subject: ${args.subject}`,
`When: ${when}`,
"",
"Message received:",
inboundMessage,
"",
"Suggested response:",
suggested,
].join("\n");
const blocks: SlackBlock[] = [
{
type: "header",
text: {
type: "plain_text",
text: "CRM Inbound Lead",
},
},
{
type: "section",
fields: [
{ type: "mrkdwn", text: `*Mailbox:*\n${args.accountEmail}` },
{ type: "mrkdwn", text: `*When:*\n${when}` },
{ type: "mrkdwn", text: `*Subject:*\n${args.subject}` },
],
},
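{
type: "section",
text: {
type: "mrkdwn",
// Slack rejects section text over 3000 characters, so cap the excerpt shown in the block.
text: `*Message Received*\n${inboundMessage.slice(0, 2900)}`,
},
},
{
type: "section",
text: {
type: "mrkdwn",
// Same cap here; the untruncated draft remains in the plain-text fallback built above.
text: `*Suggested Response*\n\`\`\`${suggested.slice(0, 2900)}\`\`\``,
},
},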
];
return { text, blocks };
}
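// Ensures the Gmail label exists for the account, creating it when the lookup fails. Results are
// cached per account and label so the CLI is called at most once per pair in a run.
async function ensureGmailLabel(account: string, labelName: string): Promise<void> {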
const cacheKey = `${account}:${labelName.toLowerCase()}`;
if (ensuredLabelCache.has(cacheKey)) {
return;
}
try {
await execFileAsync(
"gog",
["gmail", "labels", "get", labelName, "--account", account, "--json", "--no-input"],
{
maxBuffer: 4 * 1024 * 1024,
},
);
ensuredLabelCache.add(cacheKey);
return;
} catch {
// Lookup failed (most likely the label does not exist yet); try to create it below.
}
await execFileAsync(
"gog",
["gmail", "labels", "create", labelName, "--account", account, "--json", "--no-input"],
{
maxBuffer: 4 * 1024 * 1024,
},
);
ensuredLabelCache.add(cacheKey);
}
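// Adds the lead label to a Gmail thread. Failures are returned as { applied: false, error }
// rather than thrown, so labeling problems never abort the run.
async function applyLeadLabel(args: {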
account: string;
threadId?: string;
labelName: string;
}): Promise<{ applied: boolean; error?: string }> {
if (!args.threadId) {
return { applied: false, error: "missing-thread-id-for-label" };
}
try {
await ensureGmailLabel(args.account, args.labelName);
await execFileAsync(
"gog",
[
"gmail",
"labels",
"modify",
args.threadId,
"--add",
args.labelName,
"--account",
args.account,
"--json",
"--no-input",
],
{
maxBuffer: 4 * 1024 * 1024,
},
);
return { applied: true };
} catch (error) {
const message = error instanceof Error ? error.message : "unknown-gmail-label-error";
return { applied: false, error: message };
}
}
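// Posts the message via chat.postMessage when a bot token and channel are configured.
// The result reports whether the post succeeded and, if not, why.
async function maybePostSlack(message: SlackMessage): Promise<{ posted: boolean; error?: string }> {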
const token = clean(process.env.SLACK_BOT_TOKEN);
const channel =
clean(process.env.CRM_SLACK_CHANNEL_ID) ||
clean(process.env.SLACK_CHANNEL_ID) ||
clean(process.env.CRM_SLACK_CHANNEL);
if (!token || !channel) {
return { posted: false, error: "CRM_SLACK_CHANNEL_ID or SLACK_BOT_TOKEN missing" };
}
// Network failures and non-JSON responses are reported as a failed post instead of throwing,
// so a Slack outage cannot abort the run after rows have already been written.
try {
const response = await fetch("https://slack.com/api/chat.postMessage", {
method: "POST",
headers: {
Authorization: `Bearer ${token}`,
"Content-Type": "application/json; charset=utf-8",
},
body: JSON.stringify({
channel,
text: message.text,
...(Array.isArray(message.blocks) && message.blocks.length > 0
? { blocks: message.blocks }
: {}),
unfurl_links: false,
unfurl_media: false,
}),
});
const data = (await response.json()) as Record<string, unknown>;
if (response.ok && data.ok === true) {
return { posted: true };
}
const error = typeof data.error === "string" ? data.error : `slack-error-${response.status}`;
return { posted: false, error };
} catch (error) {
return {
posted: false,
error: error instanceof Error ? error.message : "slack-request-failed",
};
}
}
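// Loads the cached SOP snapshot (flag, then CRM_SOP_CACHE_FILE, then the default path).
// Returns undefined when the file is missing or unreadable.
async function loadSopSnapshot(pathOverride?: string): Promise<SopSnapshot | undefined> {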
const sopFile = pathOverride || clean(process.env.CRM_SOP_CACHE_FILE) || DEFAULT_SOP_CACHE_FILE;
try {
return await readJsonFile<SopSnapshot>(sopFile);
} catch {
return undefined;
}
}
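// Entry point: reads the poll file, classifies each message, persists contacts, activities,
// drafts, and accounting rows to Supabase, posts sales drafts to Slack, then records poll state
// and the job-run summary.
async function main() {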
const { command, flags } = parseArgs(process.argv);
if (command !== "process_inbound") {
console.error(
"Usage: tsx process-inbound.ts process_inbound --poll-file <path> [--sop-file <path>] [--output <path>]",
);
process.exit(1);
}
const pollFile = clean(asString(flags["poll-file"]));
if (!pollFile) {
throw new Error("--poll-file is required");
}
const outputFile = clean(asString(flags.output)) || DEFAULT_OUTPUT_FILE;
const sopFile = clean(asString(flags["sop-file"]));
const supabaseUrl = clean(process.env.SUPABASE_URL);
const supabaseKey = clean(process.env.SUPABASE_SECRET_KEY);
if (!supabaseUrl || !supabaseKey) {
throw new Error("SUPABASE_URL and SUPABASE_SECRET_KEY are required");
}
const contactsTable = clean(process.env.CRM_CONTACTS_TABLE) || DEFAULT_CONTACTS_TABLE;
const activitiesTable = clean(process.env.CRM_ACTIVITIES_TABLE) || DEFAULT_ACTIVITIES_TABLE;
const draftsTable = clean(process.env.CRM_DRAFTS_TABLE) || DEFAULT_DRAFTS_TABLE;
const accountingTable = clean(process.env.CRM_ACCOUNTING_TABLE) || DEFAULT_ACCOUNTING_TABLE;
const jobRunsTable = clean(process.env.CRM_JOB_RUNS_TABLE) || DEFAULT_JOB_RUNS_TABLE;
const pollStateTable = clean(process.env.CRM_POLL_STATE_TABLE) || DEFAULT_POLL_STATE_TABLE;
const startedAt = new Date().toISOString();
const poll = await readJsonFile<PollFile>(pollFile);
const sop = await loadSopSnapshot(sopFile);
const sopCues = pickSopCues(sop);
const openAIApiKey = clean(process.env.OPENAI_API_KEY);
const classifierModel =
clean(process.env.CRM_CLASSIFIER_MODEL) ||
clean(process.env.OPENCLAW_CRM_CLASSIFIER_MODEL) ||
DEFAULT_CLASSIFIER_MODEL;
const replyModel =
clean(process.env.CRM_REPLY_MODEL) ||
clean(process.env.OPENCLAW_CRM_REPLY_MODEL) ||
DEFAULT_REPLY_MODEL;
const useModelClassification = getBool(
clean(process.env.CRM_USE_MODEL_CLASSIFIER) ||
clean(process.env.OPENCLAW_CRM_USE_MODEL_CLASSIFIER),
true,
);
const useModelReplyWriter = getBool(
clean(process.env.CRM_USE_MODEL_REPLY_WRITER) ||
clean(process.env.OPENCLAW_CRM_USE_MODEL_REPLY_WRITER),
true,
);
const applyLeadLabels = getBool(clean(process.env.CRM_GMAIL_LABEL_APPLY), true);
const leadLabelName = clean(process.env.CRM_GMAIL_LABEL_LEAD) || DEFAULT_GMAIL_LEAD_LABEL;
const runId = poll.run_id || randomUUID();
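// Record the run as "running" before any message is processed, so a run that crashes midway
// is still visible in the job-runs table.
await supabaseUpsertRow(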
{
supabaseUrl,
serviceKey: supabaseKey,
table: jobRunsTable,
onConflict: "id",
},
{
id: runId,
started_at: poll.started_at || startedAt,
status: "running",
degraded: sop?.degraded === true,
poll_partial_failure: poll.partial_failure === true,
metrics: {
polled_messages: poll.messages.length,
},
accounts: poll.per_account ?? [],
updated_at: new Date().toISOString(),
},
);
const result: ProcessResult = {
command: "process_inbound",
run_id: runId,
started_at: startedAt,
finished_at: "",
status: "ok",
degraded: sop?.degraded === true,
totals: {
polled_messages: poll.messages.length,
processed_messages: 0,
activities_upserted: 0,
drafts_upserted: 0,
accounting_entries_upserted: 0,
},
classification_counts: {
receipt: 0,
sales: 0,
support: 0,
ignore: 0,
},
sales_drafts: [],
accounting_entries: [],
poll_state_updates: [],
warnings: [],
};
if (sop?.degraded) {
result.warnings.push(...(sop.warnings ?? []));
}
if (!sop) {
result.warnings.push("No SOP snapshot found; continuing with default routing behavior.");
}
// Newest message timestamp seen per mailbox; written to the poll-state table after the loop.
const maxTsByAccount = new Map<string, string>();
for (const message of poll.messages) {
const classification = await classifyInbound({
message,
apiKey: useModelClassification ? openAIApiKey : undefined,
model: classifierModel,
sop,
});
result.classification_counts[classification.label] += 1;
const inboundMessage = summarizeInboundMessage(message);
const senderEmail = extractEmailAddress(message.from);
const senderName = extractDisplayName(message.from);
const messageTs =
message.received_at ||
(message.internal_ts ? new Date(message.internal_ts).toISOString() : undefined);
if (messageTs) {
const prior = maxTsByAccount.get(message.account_email);
if (!prior || Date.parse(messageTs) > Date.parse(prior)) {
maxTsByAccount.set(message.account_email, messageTs);
}
}
let contactId: string | undefined;
if (senderEmail && (classification.label === "sales" || classification.label === "support")) {
const contact = await supabaseUpsertRow(
{
supabaseUrl,
serviceKey: supabaseKey,
table: contactsTable,
onConflict: "email",
},
{
email: senderEmail,
display_name: senderName,
last_seen_at: messageTs || new Date().toISOString(),
source_account_email: message.account_email,
updated_at: new Date().toISOString(),
},
);
contactId = typeof contact.id === "string" ? contact.id : undefined;
}
const activityPayload: Record<string, unknown> = {
source_key: message.source_key,
account_email: message.account_email,
message_id: message.message_id,
thread_id: message.thread_id,
from_raw: message.from,
from_email: senderEmail,
from_name: senderName,
subject: message.subject,
snippet: inboundMessage,
received_at: messageTs,
classification: classification.label,
classification_confidence: classification.confidence,
classification_reasons: classification.reasons,
contact_id: contactId,
contact_email: senderEmail,
sop_hash: sop?.sop?.hash,
payload: message.raw ?? {},
updated_at: new Date().toISOString(),
};
const activity = await supabaseUpsertRow(
{
supabaseUrl,
serviceKey: supabaseKey,
table: activitiesTable,
onConflict: "source_key",
},
activityPayload,
);
const activityId = typeof activity.id === "string" ? activity.id : undefined;
if (!activityId) {
throw new Error(`Missing activity id after upsert for source_key=${message.source_key}`);
}
result.totals.activities_upserted += 1;
if (classification.label === "sales") {
const draft = await buildSalesDraft({
apiKey: useModelReplyWriter ? openAIApiKey : undefined,
model: replyModel,
senderDisplayName: senderName,
senderEmail,
subject: message.subject,
snippet: inboundMessage,
mailbox: message.account_email,
sop,
sopCues,
});
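// Without a parseable sender address the draft gets a placeholder recipient; flag it so a
// reviewer fixes the address before approving.
if (!senderEmail) {
result.warnings.push(
`Sales draft for ${message.account_email}:${message.message_id} has no sender address; using placeholder recipient.`,
);
}
const toEmail = senderEmail || "unknown@example.com";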
const draftRow = await supabaseUpsertRow(
{
supabaseUrl,
serviceKey: supabaseKey,
table: draftsTable,
onConflict: "activity_id",
},
{
activity_id: activityId,
account_email: message.account_email,
to_email: toEmail,
subject: draft.subject,
body: draft.body,
status: "draft",
approval_commands: "Handle approval/revisions in Slack thread",
reply_to_message_id: message.message_id,
sop_hash: sop?.sop?.hash,
updated_at: new Date().toISOString(),
},
);
const draftId = typeof draftRow.id === "string" ? draftRow.id : undefined;
if (!draftId) {
throw new Error(`Missing draft id after upsert for activity_id=${activityId}`);
}
const slackMessage = buildDraftSlackMessage({
accountEmail: message.account_email,
subject: draft.subject,
receivedAt: messageTs,
inboundMessage,
suggestedResponse: draft.body,
});
await supabasePatchRows(
{
supabaseUrl,
serviceKey: supabaseKey,
table: draftsTable,
filters: { id: draftId },
},
{
slack_summary: slackMessage.text,
updated_at: new Date().toISOString(),
},
);
const slack = await maybePostSlack(slackMessage);
result.sales_drafts.push({
draft_id: draftId,
activity_id: activityId,
account_email: message.account_email,
to_email: toEmail,
slack_posted: slack.posted,
slack_error: slack.error,
});
if (applyLeadLabels) {
const labelResult = await applyLeadLabel({
account: message.account_email,
threadId: message.thread_id,
labelName: leadLabelName,
});
if (!labelResult.applied && labelResult.error) {
result.warnings.push(
`Lead label apply failed for ${message.account_email}:${message.message_id} (${labelResult.error})`,
);
}
}
result.totals.drafts_upserted += 1;
}
if (classification.label === "receipt") {
const parsed = parseReceiptInfo(message);
await supabaseUpsertRow(
{
supabaseUrl,
serviceKey: supabaseKey,
table: accountingTable,
onConflict: "source_key",
},
{
source_key: message.source_key,
activity_id: activityId,
account_email: message.account_email,
vendor: parsed.vendor,
amount: parsed.amount,
currency: parsed.currency,
receipt_date: parsed.receipt_date,
subject: message.subject,
snippet: inboundMessage,
payload: message.raw ?? {},
updated_at: new Date().toISOString(),
},
);
result.accounting_entries.push({
activity_id: activityId,
source_key: message.source_key,
vendor: parsed.vendor,
amount: parsed.amount,
currency: parsed.currency,
});
result.totals.accounting_entries_upserted += 1;
}
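// If an earlier run drafted a reply for this activity and the message is no longer classified
// as sales, close the open draft.
if (classification.label !== "sales") {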
await supabasePatchRows(
{
supabaseUrl,
serviceKey: supabaseKey,
table: draftsTable,
filters: {
activity_id: activityId,
status: "draft",
},
},
{
status: "rejected",
rejected_reason: `Auto-closed after reclassification to ${classification.label}`,
rejected_at: new Date().toISOString(),
updated_at: new Date().toISOString(),
},
);
}
result.totals.processed_messages += 1;
}
// Update poll state for every mailbox that was polled, including those that returned no messages.
const accountSet = new Set<string>();
for (const message of poll.messages) {
accountSet.add(message.account_email);
}
for (const entry of poll.per_account ?? []) {
if (entry.account_email) {
accountSet.add(entry.account_email);
}
}
for (const accountEmail of accountSet) {
const stateRow = {
account_email: accountEmail,
last_polled_at: new Date().toISOString(),
last_message_ts: maxTsByAccount.get(accountEmail),
updated_at: new Date().toISOString(),
};
await supabaseUpsertRow(
{
supabaseUrl,
serviceKey: supabaseKey,
table: pollStateTable,
onConflict: "account_email",
},
stateRow,
);
result.poll_state_updates.push({
account_email: accountEmail,
last_polled_at: stateRow.last_polled_at,
last_message_ts: stateRow.last_message_ts,
});
}
if (poll.partial_failure || result.sales_drafts.some((entry) => !entry.slack_posted)) {
result.status = "partial_failure";
}
result.finished_at = new Date().toISOString();
await supabasePatchRows(
{
supabaseUrl,
serviceKey: supabaseKey,
table: jobRunsTable,
filters: { id: runId },
},
{
finished_at: result.finished_at,
status: result.status,
degraded: result.degraded,
metrics: {
polled_messages: result.totals.polled_messages,
processed_messages: result.totals.processed_messages,
activities_upserted: result.totals.activities_upserted,
drafts_upserted: result.totals.drafts_upserted,
accounting_entries_upserted: result.totals.accounting_entries_upserted,
},
warnings: result.warnings,
updated_at: new Date().toISOString(),
},
);
await writeJson(outputFile, result);
console.log(JSON.stringify(result, null, 2));
}
await main();