/**
 * Down-level replacement for `async`/`await`: runs a generator function to
 * completion, treating every yielded value as something to wait for.
 *
 * @param {*} thisArg - `this` value the generator function is applied with.
 * @param {Array|undefined} _arguments - Arguments for the generator function (defaults to `[]`).
 * @param {Function|undefined} P - Promise constructor to use; falls back to the global `Promise`.
 * @param {Function} generator - Generator function holding the "async" body.
 * @returns {Promise} A `P` instance settled with the generator's return value, or
 *   rejected with whatever the generator throws.
 */
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
    // Values that are not already instances of P are wrapped in an already-resolving P.
    const toPromise = (value) => {
        if (value instanceof P) {
            return value;
        }
        return new P((resolve) => { resolve(value); });
    };
    return new (P || (P = Promise))((resolve, reject) => {
        // Either finish with the generator's return value or wait for the yielded one.
        const advance = (result) => {
            if (result.done) {
                resolve(result.value);
                return;
            }
            toPromise(result.value).then(onFulfilled, onRejected);
        };
        // Resume the generator with the settled value.
        const onFulfilled = (value) => {
            try {
                advance(generator.next(value));
            }
            catch (e) {
                reject(e);
            }
        };
        // Re-throw the rejection inside the generator so its own try/catch can see it.
        const onRejected = (reason) => {
            try {
                advance(generator["throw"](reason));
            }
            catch (e) {
                reject(e);
            }
        };
        advance((generator = generator.apply(thisArg, _arguments || [])).next());
    });
};
/**
 * Down-level replacement for object rest (`const { a, ...others } = s`).
 *
 * @param {Object|null|undefined} s - Source object.
 * @param {Array<string|symbol>} e - Keys to leave out of the copy.
 * @returns {Object} New object holding the own string-keyed properties and the own
 *   enumerable symbol-keyed properties of `s` that are not listed in `e`.
 */
var __rest = (this && this.__rest) || function (s, e) {
    const remainder = {};
    for (const key in s) {
        // for...in also walks the prototype chain; only own keys are copied.
        if (!Object.prototype.hasOwnProperty.call(s, key)) {
            continue;
        }
        if (e.indexOf(key) >= 0) {
            continue;
        }
        remainder[key] = s[key];
    }
    if (s == null || typeof Object.getOwnPropertySymbols !== "function") {
        return remainder;
    }
    // for...in never visits symbols, so they are handled separately.
    for (const symbol of Object.getOwnPropertySymbols(s)) {
        if (e.indexOf(symbol) < 0 && Object.prototype.propertyIsEnumerable.call(s, symbol)) {
            remainder[symbol] = s[symbol];
        }
    }
    return remainder;
};
import { END, eventChannel } from 'redux-saga';
import { all, call, delay, fork, put, race, select, take } from 'redux-saga/effects';
import '../utils/safari';
import { apiActions, apiDisconnected, apiError, handleApiListenerConnected, SOCKET_CLOSED, SOCKET_ERROR, socketClosed, socketError, API_RECONNECT_REQUEST, reconnect, apiReconnectingState, CLEAR_USER_LOGIN_TOKENS, API_LOGIN_REJECT, } from '../actions';
import { getConfig } from '../utils/config';
import { ApiMessages } from '../ApiMessages';
import { ApiResponseHandlers } from '../ApiResponseHandlers';
import { incomingMessageLogger, outgoingMessageLogger } from '../utils/logger';
import { ProtoIdentifiers } from '../ProtoIdentifiers';
// Kinds of items that watchMessages pushes onto the socket event channel.
// Each key maps to its own name as a string.
var EmitEventTypes = Object.assign(EmitEventTypes || {}, {
    MESSAGE: "MESSAGE",
    ERROR: "ERROR",
    DISCONNECT: "DISCONNECT",
});
// Tuning for connectWithExponentialBackoff. Delays are in milliseconds.
// Number of connection attempts before giving up.
const MAX_RECONNECT_ATTEMPTS = 10;
// Multiplier applied to the wait time after each failed attempt.
const EXPONENTIAL_BACKOFF_FACTOR = 2;
// Wait time after the first failed attempt.
const INITIAL_DELAY = 1e3;
// Upper bound for the wait time between attempts.
const MAX_DELAY = 60 * 1e3;
/**
 * Opens a WebSocket to the feed API URL taken from the application config.
 *
 * @returns {Promise<WebSocket>} Resolves with the socket once its `open` event fires.
 *   Rejects with an `Error` (the original error event is attached as `error.event`)
 *   when the socket reports an error.
 * @throws {Error} Synchronously, when no application config has been set.
 */
const connect = () => {
    const appConfig = getConfig();
    if (!appConfig) {
        throw new Error('Config must be set before call to listen()');
    }
    const webSocketUrl = appConfig.feedApi.url;
    console.log('Connecting to: ', webSocketUrl);
    const socket = new WebSocket(webSocketUrl);
    return new Promise((resolve, reject) => {
        socket.onopen = () => {
            resolve(socket);
        };
        socket.onerror = (event) => {
            // The error event itself has no `message` and stringifies to "[object Event]",
            // so reject with a real Error that callers can log meaningfully.
            const error = new Error(`WebSocket connection to ${webSocketUrl} failed`);
            error.event = event;
            reject(error);
        };
    });
};
/**
 * Saga that keeps calling `connect` until it succeeds, waiting between failed
 * attempts with an exponentially growing delay capped at MAX_DELAY.
 *
 * Around each attempt it dispatches `apiReconnectingState(flag, delayInSeconds)`:
 * `true` before trying and `false` after a failure.
 * NOTE(review): the exact meaning of the flag is defined by the action creator,
 * which is not visible here.
 *
 * An API_RECONNECT_REQUEST received while waiting ends the wait early and resets
 * both the attempt counter and the delay.
 *
 * @returns {*} The value `connect` resolves with (the open socket).
 * @throws {Error} After MAX_RECONNECT_ATTEMPTS consecutive failed attempts.
 */
function* connectWithExponentialBackoff() {
    let attempt = 0;
    let currentDelay = INITIAL_DELAY;
    while (attempt < MAX_RECONNECT_ATTEMPTS) {
        try {
            yield put(apiReconnectingState(true, currentDelay / 1000));
            const socket = yield call(connect);
            return socket;
        }
        catch (e) {
            // A failed WebSocket may reject with an error event, which has no `message`;
            // fall back to its `type` or its string form so the log never says "undefined".
            let reason;
            if (e && e.message) {
                reason = e.message;
            }
            else if (e && e.type) {
                reason = String(e.type);
            }
            else {
                reason = String(e);
            }
            console.error(`Attempt ${attempt + 1} failed: ${reason}`);
        }
        yield put(apiReconnectingState(false, currentDelay / 1000));
        // Named `manualRetry` so the imported `reconnect` action creator is not shadowed.
        const { reconnect: manualRetry } = yield race({
            timeout: delay(currentDelay),
            reconnect: take(API_RECONNECT_REQUEST),
        });
        if (manualRetry) {
            // An explicit request starts the schedule over.
            attempt = 0;
            currentDelay = INITIAL_DELAY;
        }
        else {
            currentDelay = Math.min(currentDelay * EXPONENTIAL_BACKOFF_FACTOR, MAX_DELAY);
            attempt++;
        }
    }
    throw new Error(`Connecting to backend failed after ${attempt} attempts`);
}
/**
 * Decodes one binary API frame.
 *
 * The first byte of the frame is looked up in `ProtoIdentifiers`; the remaining
 * bytes are handed to the matching entry's `proto.decode`.
 *
 * @param {{arrayBuffer: Function}} data - Object whose `arrayBuffer()` yields the frame bytes.
 * @returns {Promise<Object>} The decoded fields merged onto `{ messageType }`.
 * @throws {Error} (as a rejection) when no entry exists for the frame's first byte.
 */
function parseProtobufMessage(data) {
    return __awaiter(this, void 0, void 0, function* () {
        const frame = new Uint8Array(yield data.arrayBuffer());
        const identifier = frame[0];
        const definition = ProtoIdentifiers[identifier];
        if (!definition) {
            throw new Error(`No Protobuf definition found for identifier: ${identifier}`);
        }
        // slice() copies, so the decoder receives a standalone array starting at offset 0.
        const body = frame.slice(1);
        return Object.assign({ messageType: definition.messageType }, definition.proto.decode(body));
    });
}
/**
 * Wraps a WebSocket in a redux-saga event channel.
 *
 * Items put on the channel:
 * - `{ type: MESSAGE, message }` for every frame that could be decoded. Frames whose
 *   `data` has no `arrayBuffer` member are parsed as JSON; the others go through
 *   `parseProtobufMessage`.
 * - `{ type: ERROR, message: event, payload: { event } }` followed by `END` when the
 *   socket reports an error.
 * - `{ type: DISCONNECT, payload: { event } }` on a clean close and
 *   `{ type: ERROR, payload: { event } }` on an unclean one.
 *
 * Closing the channel closes the socket.
 *
 * @param {WebSocket} socket - Socket whose `onmessage`/`onerror`/`onclose` handlers are replaced.
 * @returns {*} The channel created by `eventChannel`.
 */
function watchMessages(socket) {
    return eventChannel((emit) => {
        socket.onmessage = (event) => __awaiter(this, void 0, void 0, function* () {
            let msg;
            try {
                if (!event.data.arrayBuffer) {
                    msg = JSON.parse(event.data);
                }
                else {
                    msg = yield parseProtobufMessage(event.data);
                }
            }
            catch (e) {
                // Nobody awaits this handler, so a decoding failure would otherwise surface
                // as an unhandled promise rejection. Report it and drop the frame instead.
                console.error('Dropping API message that could not be decoded', e);
                return;
            }
            emit({
                type: EmitEventTypes.MESSAGE,
                message: msg,
            });
        });
        socket.onerror = (event) => {
            emit({
                type: EmitEventTypes.ERROR,
                message: event,
                // Same shape as the ERROR item emitted from onclose, which is what the
                // channel consumer reads; `message` is kept for backward compatibility.
                payload: {
                    event,
                },
            });
            console.log('API connection error', event);
            emit(END);
        };
        socket.onclose = (event) => {
            if (event.wasClean) {
                emit({
                    type: EmitEventTypes.DISCONNECT,
                    payload: {
                        event,
                    },
                });
                console.log(`API connection closed cleanly`, event);
            }
            else {
                emit({
                    type: EmitEventTypes.ERROR,
                    payload: {
                        event,
                    },
                });
                console.log(`API connection closed abnormally`, event);
            }
        };
        // Unsubscribe callback, returned to eventChannel.
        return () => {
            socket.close();
        };
    });
}
/**
 * Saga that consumes the socket event channel forever and turns its items into
 * Redux actions.
 *
 * - MESSAGE: looks up `ApiResponseHandlers[messageType]`, calls it with the message
 *   minus its `messageType` field and dispatches the result if it has a `type`.
 *   Messages other than `Quote`/`QuotesBatch` are passed to `incomingMessageLogger`
 *   when one is configured.
 * - DISCONNECT: dispatches `socketClosed()`.
 * - ERROR: dispatches `socketError(code, reason)` taken from `payload.event`
 *   (both `undefined` when the item carries no payload).
 *
 * @param {*} socketChannel - Channel to take items from.
 */
function* listenerTask(socketChannel) {
    while (true) {
        const event = yield take(socketChannel);
        const { type, message, payload } = event;
        if (type === EmitEventTypes.MESSAGE) {
            const { messageType } = message;
            const data = __rest(message, ["messageType"]);
            // Quote traffic is excluded from logging. The previous `!== || !==` test was
            // always true, so nothing was ever excluded.
            const isQuoteTraffic = messageType === 'Quote' || messageType === 'QuotesBatch';
            if (!isQuoteTraffic && incomingMessageLogger) {
                incomingMessageLogger(message);
            }
            if (ApiResponseHandlers[messageType]) {
                const action = ApiResponseHandlers[messageType](data);
                // Handlers may return nothing; only real actions are dispatched.
                if (action && action.type) {
                    yield put(action);
                }
            }
            else {
                console.error(`Cannot handle response for "${messageType}". No handlers are defined for this type.`);
            }
        }
        else if (type === EmitEventTypes.DISCONNECT) {
            yield put(socketClosed());
        }
        else if (type === EmitEventTypes.ERROR) {
            const closeEvent = payload == null ? undefined : payload.event;
            const code = closeEvent == null ? undefined : closeEvent.code;
            const reason = closeEvent == null ? undefined : closeEvent.reason;
            yield put(socketError(code, reason));
        }
    }
}
/**
 * Saga that forwards user-created messages to the WebSocket server.
 *
 * Waits for `apiActions.SEND` actions (e.g. dispatched from the UI), passes each
 * payload to `outgoingMessageLogger` when one is configured, and sends the payload
 * over the socket as JSON. Never returns on its own.
 *
 * @param {WebSocket} socket - Open socket to send on.
 */
function* executeTask(socket) {
    for (;;) {
        const sendAction = yield take(apiActions.SEND);
        if (outgoingMessageLogger) {
            outgoingMessageLogger(sendAction.payload);
        }
        const frame = JSON.stringify(sendAction.payload);
        socket.send(frame);
    }
}
// FIXME LVC! This should somehow be integrated in the regular exponential backoff
// logic and not retry with a fixed 1000ms delay
/**
 * Saga that dispatches the `reconnect()` action, optionally after a fixed pause.
 *
 * @param {boolean} causeDelay - When truthy, wait 1000 ms before dispatching.
 */
function* attemptReconnect(causeDelay) {
    const pauseMs = causeDelay ? 1000 : 0;
    if (pauseMs > 0) {
        yield delay(pauseMs);
    }
    const reconnectAction = reconnect();
    yield put(reconnectAction);
}
// LVC!
// MAX_RECONNECT_ATTEMPTS, EXPONENTIAL_BACKOFF_FACTOR, INITIAL_DELAY, MAX_DELAY
/**
 * Root saga for the API WebSocket connection.
 *
 * Each loop iteration:
 * 1. waits for either `apiActions.LOGIN` or `API_RECONNECT_REQUEST`;
 * 2. opens a socket (a single `connect` attempt after a login, the backoff loop
 *    after a reconnect request) and dispatches `handleApiListenerConnected()`;
 * 3. wraps the socket in an event channel and sends the login request built from
 *    `state.user.token` and `state.releaseVersion`;
 * 4. runs `executeTask` and `listenerTask` until a login rejection / token clearing
 *    (dispatches `apiDisconnected()`) or a socket close / error (forks
 *    `attemptReconnect` with a delay) is seen.
 *
 * Any error thrown along the way is reported through `apiError(message, wasLogin)`.
 * The channel (or the bare socket, if the channel was never created) is always
 * closed before the next iteration, including when this saga is cancelled.
 */
export default function* apiListenerSaga() {
    while (true) {
        const { login } = yield race({
            login: take(apiActions.LOGIN),
            reconnect: take(API_RECONNECT_REQUEST),
        });
        const user = yield select((state) => state.user);
        const version = yield select((state) => state.releaseVersion);
        // Declared outside the try so the cleanup below can reach them.
        let socket;
        let socketChannel;
        try {
            // TODO: Why are these separate?
            // Isn't the first iteration of `connectWithExponential` connecting immediately anyway?
            if (login) {
                socket = yield call(connect);
            }
            else {
                socket = yield call(connectWithExponentialBackoff);
            }
            yield put(handleApiListenerConnected());
            socketChannel = yield call(watchMessages, socket);
            socket.send(JSON.stringify(ApiMessages.requestLogin(user.token, version)));
            // FIXME: Known issue - if the API token has expired, the server will close the connection and we will
            // not attempt to reconnect. We need to separate the actions into API_TOKEN_REJECTED for the `TOKEN_INVALID` message and `API_LOGIN_REJECT`
            // for permanent rejection (analogous to 401 and 403 status codes) and attempt a refresh on `TOKEN_INVALID`.
            const { cancel, error } = yield race({
                task: all([call(executeTask, socket), call(listenerTask, socketChannel)]),
                // FIXME: Do we need CLEAR_USER_LOGIN_TOKENS here?
                cancel: take([API_LOGIN_REJECT, CLEAR_USER_LOGIN_TOKENS]),
                error: take([SOCKET_CLOSED, SOCKET_ERROR]),
            });
            if (cancel || error) {
                // Close before notifying anyone, so the socket is already shut down
                // when the follow-up actions are dispatched.
                socketChannel.close();
            }
            if (cancel) {
                yield put(apiDisconnected());
            }
            if (error) {
                yield fork(attemptReconnect, true);
            }
        }
        catch (error) {
            console.log('Error connecting to WebSocket: ', error);
            yield put(apiError(error.toString(), Boolean(login)));
        }
        finally {
            // Safety net for the paths that previously leaked the connection: an exception
            // after the socket was opened (e.g. a response handler throwing) or cancellation
            // of this saga. Closing the channel closes the socket via its unsubscribe
            // callback; closing either one again after an earlier close is harmless.
            if (socketChannel) {
                socketChannel.close();
            }
            else if (socket) {
                socket.close();
            }
        }
    }
}
