import { UPLOAD_BATCH_LIMIT, refreshPipelineById } from "@/hooks/pipeline";
import {
  POLLING_INTERVAL,
  processPromisesBatch,
} from "@llamacloud/shared/utils";
import type {
  ManagedIngestionStatusResponse,
  PipelineFile,
  ReadFileContentApiV1FilesIdContentGetResponse,
} from "@llamaindex/cloud/api";
import {
  addFilesToPipelineApiV1PipelinesPipelineIdFilesPut,
  getFileApiV1FilesIdGet,
  getPipelineFileStatusApiV1PipelinesPipelineIdFilesFileIdStatusGet,
  listPipelineFiles2ApiV1PipelinesPipelineIdFiles2Get,
  readFileContentApiV1FilesIdContentGet,
  uploadFileApiV1FilesPost,
} from "@llamaindex/cloud/api";
import type {
  QueryClient,
  UseSuspenseQueryOptions,
} from "@tanstack/react-query";
import {
  useMutation,
  useQueryClient,
  useSuspenseQuery,
} from "@tanstack/react-query";

const FILE_QUERY_KEY = "rawFiles";
const DATA_SOURCE_QUERY_KEY = "dataSources";
const FILE_CONTENT_QUERY_KEY = "fileContent";

export async function refreshRawFiles(
  queryClient: QueryClient,
  pipelineId: string,
) {
  await queryClient.invalidateQueries({
    queryKey: [FILE_QUERY_KEY, pipelineId],
  });
}

export const usePipelineFileStatus = ({
  pipelineId,
  fileId,
}: {
  pipelineId: string;
  fileId: string;
}) => {
  return useSuspenseQuery({
    queryKey: [FILE_QUERY_KEY, pipelineId, fileId],
    queryFn: async () => {
      const { data } =
        await getPipelineFileStatusApiV1PipelinesPipelineIdFilesFileIdStatusGet(
          {
            path: {
              pipeline_id: pipelineId,
              file_id: fileId,
            },
            throwOnError: true,
          },
        );
      return data;
    },
    refetchInterval: (query) => {
      const latestExecution = query.state.data;

      if (latestExecution?.status === "IN_PROGRESS") {
        return POLLING_INTERVAL;
      }

      return 5000; // we need refetch these in case an upstream job creates a file level job while status is not in progress
    },
  });
};

export async function getPipelineFilesStatuses(
  pipelineId: string,
  limit?: number,
  offset?: number,
) {
  const {
    data: { files },
  } = await listPipelineFiles2ApiV1PipelinesPipelineIdFiles2Get({
    path: {
      pipeline_id: pipelineId,
    },
    query: {
      limit,
      offset,
    },
    throwOnError: true,
  });

  const statusPromises = await processPromisesBatch(files, 150, (file) =>
    getPipelineFileStatusApiV1PipelinesPipelineIdFilesFileIdStatusGet({
      path: {
        pipeline_id: pipelineId,
        file_id: file.file_id!,
      },
      throwOnError: false,
    }),
  );

  return files
    .filter(
      (_, index) =>
        !(statusPromises[index] && "error" in statusPromises[index]),
    )
    .map((file, index) => ({
      file,
      status: statusPromises[index]!.data!,
    }));
}

export const usePipelineFilesStatuses = ({
  pipelineId,
  limit,
  offset,
}: {
  pipelineId: string;
  limit?: number;
  offset?: number;
}) => {
  return useSuspenseQuery<
    Array<{ file: PipelineFile; status: ManagedIngestionStatusResponse }>
  >({
    queryKey: [FILE_QUERY_KEY, pipelineId, "statuses", { limit, offset }],
    queryFn: async () => {
      return getPipelineFilesStatuses(pipelineId, limit, offset);
    },
    refetchInterval: (query) => {
      const latestExecution = query.state.data;

      const hasPendingOrError = latestExecution?.some(
        (file) =>
          file.status?.status === "IN_PROGRESS" ||
          file.status?.status === "ERROR" ||
          file.status?.status === "PARTIAL_SUCCESS",
      );

      const TWENTY_SECONDS_IN_MILLISECONDS = 20000;

      if (hasPendingOrError) {
        return TWENTY_SECONDS_IN_MILLISECONDS;
      }

      return false;
    },
  });
};

// this api will return all raw files uploaded to the platform.
export function useRawFiles(
  pipelineId: string,
  limit?: number,
  offset?: number,
) {
  return useSuspenseQuery({
    queryKey: [FILE_QUERY_KEY, pipelineId, { limit, offset }],
    queryFn: async () => {
      const { data } =
        await listPipelineFiles2ApiV1PipelinesPipelineIdFiles2Get({
          path: {
            pipeline_id: pipelineId,
          },
          query: {
            limit,
            offset,
          },
          throwOnError: true,
        });
      return data;
    },
    refetchInterval: (query) => {
      const latestExecution = query.state.data?.files;
      if (latestExecution?.length === 0) {
        return 5000;
      }
      return false;
    },
  });
}

export function useFile(fileId: string, projectId?: string) {
  return useSuspenseQuery({
    queryKey: [DATA_SOURCE_QUERY_KEY, fileId, projectId],
    queryFn: async () => {
      const { data } = await getFileApiV1FilesIdGet({
        query: {
          project_id: projectId,
        },
        path: {
          id: fileId,
        },
        throwOnError: true,
      });
      return data;
    },
  });
}

export function useFileContent(
  params: { fileId: string; projectId?: string; expiresAtSeconds?: number },
  options?: Partial<
    UseSuspenseQueryOptions<
      unknown,
      unknown,
      ReadFileContentApiV1FilesIdContentGetResponse
    >
  >,
) {
  return useSuspenseQuery({
    queryKey: [DATA_SOURCE_QUERY_KEY, FILE_CONTENT_QUERY_KEY, params],
    queryFn: async () => {
      const { data } = await readFileContentApiV1FilesIdContentGet({
        query: {
          project_id: params.projectId,
          expires_at_seconds: params.expiresAtSeconds,
        },
        path: {
          id: params.fileId,
        },
        throwOnError: true,
      });
      return data;
    },
    ...options,
  });
}

export function useUploadFiles(projectId: string) {
  const queryClient = useQueryClient();
  return useMutation({
    mutationFn: async ({
      files,
      pipelineId,
    }: {
      pipelineId: string;
      files: File[];
    }) => {
      const fileIdsToUpload: string[] = [];

      await processPromisesBatch(
        files,
        UPLOAD_BATCH_LIMIT,
        async (file: File) => {
          const { data: uploadedFile } = await uploadFileApiV1FilesPost({
            query: {
              project_id: projectId,
            },
            body: {
              upload_file: file,
            },
            throwOnError: true,
          });
          fileIdsToUpload.push(uploadedFile.id);
        },
      );

      await addFilesToPipelineApiV1PipelinesPipelineIdFilesPut({
        path: {
          pipeline_id: pipelineId,
        },
        body: fileIdsToUpload.map((fileId) => ({
          file_id: fileId,
        })),
        throwOnError: true,
      });
    },
    onSettled: async (_, __, { pipelineId }) => {
      await refreshPipelineById(queryClient, pipelineId);
      await refreshRawFiles(queryClient, pipelineId);
    },
  });
}
