import { SQSClient, ReceiveMessageResult, ReceiveMessageCommand, DeleteMessageCommand } from "@aws-sdk/client-sqs";
import { empty, from, of, Observable } from "rxjs";
import { delay, flatMap, repeatWhen, tap } from "rxjs/operators";


function makeReceiveRequest(sqs: SQSClient, command: ReceiveMessageCommand): Observable<ReceiveMessageResult> {
    return new Observable<ReceiveMessageResult>(subscriber => {
        sqs.send(command, (err, data) => {
            if (err) {
                subscriber.error(err);
            } else {
                subscriber.next(data);
                subscriber.complete();
            }
        });
    });
}


function makeDeleteRequest(sqs: SQSClient, command: DeleteMessageCommand): void {
    sqs.send(command, () => {
        // don't care
    });
}


export function createQueueStream<T>(queueUrl: string): Observable<T> {
    const receiveCommand = new ReceiveMessageCommand({
        QueueUrl: queueUrl,
        MaxNumberOfMessages: 1,
        WaitTimeSeconds: 20
    });

    const sqs = new SQSClient({
        credentials: {
            accessKeyId: "AKIASKVBAY6HWMKXVAJX",
            secretAccessKey: "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"
        },
        region: "ca-central-1"
    });

    return makeReceiveRequest(sqs, receiveCommand)
        .pipe(
            flatMap(result => result.Messages
                ? from(result.Messages)
                : empty()
            ),
            tap(message => {
                if (message.ReceiptHandle) {
                    makeDeleteRequest(sqs, new DeleteMessageCommand({
                        QueueUrl: queueUrl,
                        ReceiptHandle: message.ReceiptHandle
                    }));
                }
            }),
            flatMap(message => message.Body
                ? of(JSON.parse(message.Body) as T)
                : empty()
            ),
            repeatWhen(completed => completed.pipe(delay(100)))
        );
}
