// ABOUTME: BullMQ worker for daily cleanup of old JobRun and Scrape records
// ABOUTME: Deletes successful JobRuns older than 30 days and orphaned Scrapes older than 90 days (both overridable per job)

import { Worker, Job } from 'bullmq';
import { PrismaClient } from '@prisma/client';
import { DateTime } from 'luxon';
import logger from '../utils/logger';
import { moveToDeadLetter } from '../utils/dead-letter';
import { connection, QUEUE_NAMES } from './queue';

// ============================================================================
// Types
// ============================================================================

/**
 * Payload accepted by the cleanup job. Both fields are optional; the worker
 * falls back to its built-in defaults when a field is absent.
 */
interface CleanupJobData {
  /**
   * Days of successful JobRun history to keep. Successful runs whose
   * `startedAt` is older than this many days are deleted. Defaults to 30.
   */
  jobRunRetentionDays?: number;
  /**
   * Days to keep Scrape records before orphan cleanup may delete them
   * (compared against `scrapedAt`). Defaults to 90.
   */
  scrapeRetentionDays?: number;
}

/** Per-category counters and the cutoff recorded by a single cleanup run. */
interface CleanupJobMetadata {
  /** Successful JobRun rows removed. */
  deletedJobRuns: number;
  /** Scrape rows removed across all Scrape cleanup passes. */
  deletedScrapes: number;
  /** JobRuns with status `failed` older than the cutoff that were left in place. */
  keptFailedJobRuns: number;
  /** JobRuns with status `partial` older than the cutoff that were left in place. */
  keptPartialJobRuns: number;
  /** ISO-8601 timestamp of the JobRun cutoff used for this run. */
  cutoffDate: string;
}

/** Summary returned by the cleanup job processor. */
interface CleanupJobResult {
  /** Whether the run finished without a fatal error. */
  success: boolean;
  /** Total number of rows deleted (JobRuns plus Scrapes). */
  itemsProcessed: number;
  /** Wall-clock duration of the run in milliseconds. */
  durationMs: number;
  /** Human-readable messages for errors encountered during the run. */
  errors: string[];
  /** Detailed counters for the run. */
  metadata: CleanupJobMetadata;
}

// ============================================================================
// Worker Factory
// ============================================================================

/** Days of successful JobRun history kept when the job payload gives no override. */
const DEFAULT_JOB_RUN_RETENTION_DAYS = 30;

/** Days Scrape records are kept before orphan cleanup when the payload gives no override. */
const DEFAULT_SCRAPE_RETENTION_DAYS = 90;

/** Upper bound on the number of ids sent in a single `in` filter. */
const ID_BATCH_SIZE = 100;

/** Extracts a loggable message from an arbitrary thrown value. */
function describeError(error: unknown): string {
  return error instanceof Error ? error.message : 'Unknown error';
}

/**
 * Rejects retention values that cannot produce a sane cutoff.
 *
 * A negative value would place the cutoff in the future and cause the
 * delete queries to match every record, so it is treated as a hard error.
 *
 * @throws RangeError when `days` is negative, NaN or infinite.
 */
function assertValidRetentionDays(field: string, days: number): void {
  if (!Number.isFinite(days) || days < 0) {
    throw new RangeError(
      `${field} must be a non-negative finite number of days (received ${String(days)})`
    );
  }
}

/** Splits `items` into consecutive slices of at most `size` elements. */
function toBatches<T>(items: readonly T[], size: number): T[][] {
  const batches: T[][] = [];
  for (let start = 0; start < items.length; start += size) {
    batches.push(items.slice(start, start + size));
  }
  return batches;
}

/**
 * Deletes Scrapes older than `scrapeCutoff` whose `jobRunId` points at a
 * JobRun row that no longer exists.
 *
 * Only the distinct job run ids referenced by old Scrapes are loaded, and
 * existence is checked for exactly those ids, so a JobRun that still exists
 * is never treated as missing regardless of its `startedAt`.
 *
 * @param recordDeleted Invoked after every delete batch with the number of
 *   rows removed, so the caller keeps accurate totals even if a later batch
 *   throws.
 */
async function deleteScrapesWithMissingJobRuns(
  prisma: PrismaClient,
  scrapeCutoff: Date,
  recordDeleted: (count: number) => void
): Promise<void> {
  const referencingRows = await prisma.scrape.findMany({
    where: {
      scrapedAt: { lt: scrapeCutoff },
      jobRunId: { not: null },
    },
    select: { jobRunId: true },
    distinct: ['jobRunId'],
  });

  // The query already excludes nulls; flatMap also narrows the element type.
  const referencedJobRunIds = referencingRows.flatMap((row) =>
    row.jobRunId === null ? [] : [row.jobRunId]
  );

  // Batched to keep both the existence check and the delete within query size limits.
  for (const idBatch of toBatches(referencedJobRunIds, ID_BATCH_SIZE)) {
    const existingJobRuns = await prisma.jobRun.findMany({
      where: { id: { in: idBatch } },
      select: { id: true },
    });
    const existingIds = new Set(existingJobRuns.map((jobRun) => jobRun.id));
    const missingIds = idBatch.filter((id) => !existingIds.has(id));
    if (missingIds.length === 0) {
      continue;
    }

    const deleted = await prisma.scrape.deleteMany({
      where: {
        scrapedAt: { lt: scrapeCutoff },
        jobRunId: { in: missingIds },
      },
    });
    recordDeleted(deleted.count);
  }
}

/**
 * Runs one cleanup pass:
 *  1. deletes successful JobRuns older than the JobRun retention period
 *     (failed and partial runs are only counted, never deleted);
 *  2. deletes Scrapes older than the Scrape retention period that have no
 *     `jobRunId`;
 *  3. if step 1 deleted anything, deletes old Scrapes whose JobRun no longer
 *     exists. Failures in this step are recorded in `errors` but do not fail
 *     the job.
 *
 * @returns Counters and timing for the run.
 * @throws Rethrows any error from validation or from steps 1-2 after logging it.
 */
async function processCleanupJob(
  prisma: PrismaClient,
  job: Job<CleanupJobData>
): Promise<CleanupJobResult> {
  const startTime = Date.now();

  const jobRunRetentionDays =
    job.data.jobRunRetentionDays ?? DEFAULT_JOB_RUN_RETENTION_DAYS;
  const scrapeRetentionDays =
    job.data.scrapeRetentionDays ?? DEFAULT_SCRAPE_RETENTION_DAYS;

  logger.info(
    {
      jobId: job.id,
      jobRunRetentionDays,
      scrapeRetentionDays,
    },
    'Cleanup worker: job started'
  );

  const result: CleanupJobResult = {
    success: true,
    itemsProcessed: 0,
    durationMs: 0,
    errors: [],
    metadata: {
      deletedJobRuns: 0,
      deletedScrapes: 0,
      keptFailedJobRuns: 0,
      keptPartialJobRuns: 0,
      cutoffDate: '',
    },
  };

  try {
    // Validate before touching the database: these values drive bulk deletes.
    assertValidRetentionDays('jobRunRetentionDays', jobRunRetentionDays);
    assertValidRetentionDays('scrapeRetentionDays', scrapeRetentionDays);

    // Sample the clock once so both cutoffs are relative to the same instant.
    const now = DateTime.now().toUTC();
    const jobRunCutoff = now.minus({ days: jobRunRetentionDays }).toJSDate();
    const scrapeCutoff = now.minus({ days: scrapeRetentionDays }).toJSDate();

    result.metadata.cutoffDate = jobRunCutoff.toISOString();

    // --- Cleanup 1: Delete old successful JobRun records ---
    logger.info(
      {
        cutoffDate: jobRunCutoff.toISOString(),
        retentionDays: jobRunRetentionDays,
      },
      'Cleanup worker: deleting old successful JobRuns'
    );

    // Failed and partial runs are retained; count them purely for reporting.
    const [keptFailed, keptPartial] = await Promise.all([
      prisma.jobRun.count({
        where: { status: 'failed', startedAt: { lt: jobRunCutoff } },
      }),
      prisma.jobRun.count({
        where: { status: 'partial', startedAt: { lt: jobRunCutoff } },
      }),
    ]);

    result.metadata.keptFailedJobRuns = keptFailed;
    result.metadata.keptPartialJobRuns = keptPartial;

    const jobRunDeleteResult = await prisma.jobRun.deleteMany({
      where: {
        status: 'success',
        startedAt: { lt: jobRunCutoff },
      },
    });

    result.metadata.deletedJobRuns = jobRunDeleteResult.count;
    result.itemsProcessed += jobRunDeleteResult.count;

    logger.info(
      {
        deletedSuccessful: jobRunDeleteResult.count,
        keptFailed,
        keptPartial,
      },
      'Cleanup worker: JobRun cleanup completed'
    );

    // --- Cleanup 2: Delete orphaned Scrape records (no jobRunId) ---
    logger.info(
      { cutoffDate: scrapeCutoff.toISOString(), retentionDays: scrapeRetentionDays },
      'Cleanup worker: deleting orphaned Scrape records'
    );

    const orphanScrapes = await prisma.scrape.deleteMany({
      where: {
        scrapedAt: { lt: scrapeCutoff },
        jobRunId: null,
      },
    });

    result.metadata.deletedScrapes = orphanScrapes.count;
    result.itemsProcessed += orphanScrapes.count;

    logger.info(
      {
        deletedOrphanScrapes: orphanScrapes.count,
      },
      'Cleanup worker: orphaned Scrape cleanup completed'
    );

    // --- Cleanup 3: Delete Scrape records referencing deleted job runs ---
    // Only attempted when this run actually removed some job runs.
    if (result.metadata.deletedJobRuns > 0) {
      let staleScrapesDeleted = 0;

      try {
        await deleteScrapesWithMissingJobRuns(prisma, scrapeCutoff, (count) => {
          staleScrapesDeleted += count;
          result.metadata.deletedScrapes += count;
          result.itemsProcessed += count;
        });
      } catch (error) {
        // Best-effort step: record the problem but let the job succeed.
        const errorMessage = describeError(error);
        result.errors.push(`Stale scrape cleanup (non-fatal): ${errorMessage}`);
        logger.error(
          { error: errorMessage },
          'Cleanup worker: failed to clean stale scrapes (non-fatal)'
        );
      }

      if (staleScrapesDeleted > 0) {
        logger.info(
          {
            // Rows actually removed, not merely the number of candidates.
            deletedStaleScrapes: staleScrapesDeleted,
          },
          'Cleanup worker: removed Scrapes referencing deleted JobRuns'
        );
      }
    }

    result.durationMs = Date.now() - startTime;

    logger.info(
      {
        jobId: job.id,
        deletedJobRuns: result.metadata.deletedJobRuns,
        deletedScrapes: result.metadata.deletedScrapes,
        keptFailedJobRuns: result.metadata.keptFailedJobRuns,
        keptPartialJobRuns: result.metadata.keptPartialJobRuns,
        errors: result.errors.length,
        durationMs: result.durationMs,
      },
      'Cleanup worker: job completed'
    );

    return result;
  } catch (error) {
    logger.error(
      { jobId: job.id, error: describeError(error) },
      'Cleanup worker: job failed'
    );

    // Rethrow so the failure is reported to the queue rather than swallowed.
    throw error;
  }
}

/**
 * Creates the BullMQ worker that processes jobs on the cleanup queue.
 *
 * The worker handles one job at a time and logs `completed` / `failed`
 * events. Once a failed job has used up its configured attempts it is handed
 * to `moveToDeadLetter`.
 *
 * @param prisma Prisma client used for all JobRun and Scrape queries.
 * @returns The started worker instance.
 */
export function createCleanupWorker(prisma: PrismaClient): Worker {
  const worker = new Worker<CleanupJobData, CleanupJobResult>(
    QUEUE_NAMES.CLEANUP,
    (job: Job<CleanupJobData>): Promise<CleanupJobResult> =>
      processCleanupJob(prisma, job),
    {
      connection,
      concurrency: 1,
      lockDuration: 120_000, // 2 minute lock (cleanup may take a while)
    }
  );

  worker.on('completed', (job: Job) => {
    logger.info(
      { jobId: job.id, queue: QUEUE_NAMES.CLEANUP },
      'Cleanup worker: job completed event'
    );
  });

  worker.on('failed', async (job: Job | undefined, error: Error) => {
    logger.error(
      {
        jobId: job?.id,
        queue: QUEUE_NAMES.CLEANUP,
        error: error.message,
        attemptsMade: job?.attemptsMade,
      },
      'Cleanup worker: job failed event'
    );

    // Move to dead letter queue after exhausting all retries
    if (!job || job.attemptsMade < (job.opts.attempts || 1)) {
      return;
    }

    try {
      await moveToDeadLetter(job, error);
    } catch (deadLetterError) {
      // Event emitters do not observe listener promises; without this catch a
      // dead-letter failure would surface as an unhandled rejection.
      logger.error(
        {
          jobId: job.id,
          queue: QUEUE_NAMES.CLEANUP,
          error: describeError(deadLetterError),
        },
        'Cleanup worker: failed to move job to dead letter queue'
      );
    }
  });

  logger.info('Cleanup worker created');
  return worker;
}
