import { logger, type WSMessage } from '@tunasong/models'
import {
  Awareness,
  awarenessProtocol,
  isYDocSync,
  SyncHandler,
  Y,
  type YDocMessage,
  type YDocSyncRequest,
  YJSSyncType,
} from '@tunasong/sync-lib'
import { fromUint8Array, toUint8Array } from 'js-base64'
import * as decoding from 'lib0/decoding'
import * as encoding from 'lib0/encoding'
import { Observable } from 'lib0/observable'
import * as time from 'lib0/time'
import { YJS_DEFAULT_LOCAL_ORIGIN } from '../local-origin.js'
import { UndoManager } from 'yjs'
import { splitMessage } from '@tunasong/ws'
import invariant from 'tiny-invariant'

type MessageHandler = (
  encoder: encoding.Encoder,
  decoder: decoding.Decoder,
  provider: YDocWebsocketProvider,
  emitSynced: boolean,
  messageType?: YDocMessage
) => void

const messageHandlers: MessageHandler[] = []

// @todo - this should depend on awareness.outdatedTime
const messageReconnectTimeout = 30000

export interface WebsocketProviderOpts {
  connect: boolean
  awareness: Awareness
  params: Record<string, string>
  /** The resyncInterval (step1) in milliseconds. Set to -1 to disable. */
  resyncInterval: number
  initialSync: boolean
  /** receiveOnly - i.e., do not send any messages */
  receiveOnly: boolean
}

/**
 * Websocket Provider for Yjs. Creates a websocket connection to sync the shared document.
 * The document name is attached to the provided url. I.e. the following example
 * creates a websocket connection to http://localhost:1234/my-document-name
 *
 * @example
 *   import { Y } from '@tunasong/sync-lib'
 *   import { WebsocketProvider } from 'y-websocket'
 *   const doc = new Y.Doc()
 *   const provider = new WebsocketProvider('http://localhost:1234', 'my-document-name', doc)
 *
 * @extends {Observable<string>}
 *
 */

type SyncEvent = 'update' | 'sync' | 'status'
// AWS API Gateway has a 128kB for Websockets‚ we'll use 125kB to be safe since there also is some overhead for the JSON envelope
const MAX_MESSAGE_SIZE = 1024 * 125
// The max size of the partial message envelope (approx, needs to be >= the actual size).
const PARTIAL_MESSAGE_ENVELOPE_SIZE = 512
/** Defer updates by this number of milliseconds. Abort previous awareness updates to avoid flooding the Websocket */
const AWARENESS_DEFER_MS = 50
const UPDATE_DEFER_MS = 500

export class YDocWebsocketProvider extends Observable<SyncEvent> {
  entityId: string
  doc: Y.Doc
  wsUnsuccessfulReconnects = 0
  messageHandlers = [...messageHandlers]
  /** Queue for messages received when we are not connected */
  messageQueue: Uint8Array[] = []

  isSynced = false
  wsLastMessageReceived = 0
  /**
   * Whether to connect to other peers or not
   */
  shouldConnect: boolean

  /** Run the initial SYNC0 step on connect. Typically false for read-only editors */
  initialSync: boolean

  resyncIntervalId = 0
  checkIntervalId = 0

  ws: WebSocket | null = null

  syncHandler: SyncHandler

  awarenessTimeout: number | null = null
  updateTimeout: number | null = null
  /** don't send messages */
  receiveOnly: boolean

  constructor(
    webSocket: WebSocket,
    entityId: string,
    doc: Y.Doc,
    private awareness: Awareness,
    opts?: Partial<WebsocketProviderOpts>
  ) {
    super()

    const { connect = true, resyncInterval = -1, initialSync = true, receiveOnly = false } = opts ?? {}

    this.receiveOnly = receiveOnly
    this.entityId = entityId
    this.doc = doc
    this.shouldConnect = connect
    this.initialSync = initialSync

    if (doc.guid !== entityId) {
      throw new Error(`WebsocketProvider: entityId ${entityId} does not match doc.guid ${doc.guid}`)
    }

    this.ws = webSocket

    this.doc.on('update', this.updateHandler)

    if (resyncInterval > 0) {
      this.resyncIntervalId = window.setInterval(() => this.sync(YJSSyncType.SyncStep1), resyncInterval)
    }

    if (typeof window !== 'undefined') {
      window.addEventListener('beforeunload', this.beforeUnloadHandler)
    } else if (typeof process !== 'undefined') {
      process.on('exit', () => this.beforeUnloadHandler)
    }

    this.checkIntervalId = window.setInterval(this.check, messageReconnectTimeout / 10)

    /** Websocket events */
    this.ws.addEventListener('open', this.connect)
    this.ws.addEventListener('close', this.disconnect)

    /** Awareness events */
    this.awareness.on('update', this.awarenessUpdateHandler)

    this.syncHandler = new SyncHandler(doc, this.awareness)
  }

  get connected() {
    return this.ws?.readyState === WebSocket.OPEN
  }

  handleWSMessage = (event: MessageEvent) => {
    this.wsLastMessageReceived = time.getUnixTime()

    /** Event is a MessageEvent */
    const eventMsg = JSON.parse(event.data) as MessageEvent

    const peerMsg = eventMsg?.data

    /** We may share the Websocket with others, so we need to check if the message is for us */
    if (!isYDocSync(peerMsg)) {
      return
    }

    /** Is the message for the entity we're managing? */
    if (peerMsg.entityId !== this.entityId) {
      logger.warn('Ignoring message for other entity', peerMsg.entityId, this.entityId)
      return
    }
    const sourceConnectionId = peerMsg.sourceConnectionId
    if (!sourceConnectionId) {
      logger.error(peerMsg)
      throw new Error(
        `sourceConnectionId is not set: ${peerMsg.type}: ${peerMsg.entityId}, ${peerMsg.targetConnectionId}`
      )
    }

    const binaryMsgs: Uint8Array[] = peerMsg.messagesBase64.map((m: string) => toUint8Array(m))

    const responses = binaryMsgs
      .map(msg => this.syncHandler.applySyncMessage(msg, sourceConnectionId))
      .flat()
      .filter(Boolean)

    this.sendMessages(responses, peerMsg.sourceConnectionId)
  }

  /** Set target to null to broadcast to all clients */

  sendMessages = (bufs: (Uint8Array | null)[], targetConnectionId?: string | null) => {
    if (this.receiveOnly) {
      logger.debug('Not sending messages, readOnly', bufs)
      return
    }
    const filteredBufs = bufs.filter(Boolean) as Uint8Array[]

    if (filteredBufs.length === 0) {
      return
    }

    this.messageQueue.push(...filteredBufs)

    if (!(this.connected && this.ws)) {
      logger.warn('Not connected or websocket not ready, queueing message...')
      return
    }

    const msgs = this.messageQueue
    this.messageQueue = []
    if (msgs.length === 0) {
      return
    }

    const msg: WSMessage<YDocSyncRequest> = {
      action: 'yjs',
      data: {
        targetConnectionId,
        entityId: this.entityId,
        type: 'sync',
        messagesBase64: msgs.map(buf => fromUint8Array(buf)),
      },
    }

    // Split the message into parts if it's too large. We
    const sendMessages =
      JSON.stringify(msg).length > MAX_MESSAGE_SIZE
        ? splitMessage(msg, MAX_MESSAGE_SIZE - PARTIAL_MESSAGE_ENVELOPE_SIZE)
        : [msg]

    for (const msg of sendMessages) {
      const msgStr = JSON.stringify(msg)
      invariant(msgStr.length <= MAX_MESSAGE_SIZE, `Message too large - this should not happen: ${msgStr.length}`)
      this.ws.send(msgStr)
    }
  }

  /**
   * Listens to Yjs updates and sends them to remote peers
   */
  updateHandler = (update: Uint8Array, origin: unknown) => {
    if (origin !== YJS_DEFAULT_LOCAL_ORIGIN && !(origin instanceof UndoManager)) {
      logger.debug(`Yjs update ignored from non-local origin`, origin)
      return
    }

    /** Queue the message, and debounce the update */
    this.messageQueue.push(SyncHandler.encodeUpdate(update))

    if (this.updateTimeout) {
      clearTimeout(this.updateTimeout)
    }

    this.updateTimeout = window.setTimeout(() => {
      logger.debug('Yjs update from local editor, sending to peers. Message length: ', update.length)

      this.sendMessages([], null)
    }, UPDATE_DEFER_MS)
  }

  awarenessUpdateHandler = ({ added, updated, removed }: { added: number[]; updated: number[]; removed: number[] }) => {
    const changedClients: number[] = [...added, ...updated, ...removed]
    /** Only send awareness updates for ourselves */
    if (!changedClients.includes(this.doc.clientID)) {
      return
    }

    const msg = this.syncHandler.encodeAwareness([this.doc.clientID])
    if (!msg) {
      logger.error(`Awareness encoder failed, null`)
      return
    }
    if (this.awarenessTimeout) {
      window.clearTimeout(this.awarenessTimeout)
    }
    /** Defer until the event loop is empty, and update only every 200ms */
    this.awarenessTimeout = window.setTimeout(() => {
      logger.debug(
        `Sending awareness update to all clients: ${added.length} added, ${updated.length} updated, ${removed.length} removed`,
        updated
      )
      this.sendMessages([msg], null)
      this.awarenessTimeout = null
    }, AWARENESS_DEFER_MS)
  }

  beforeUnloadHandler = () => {
    awarenessProtocol.removeAwarenessStates(this.awareness, [this.doc.clientID], 'window unload')
  }

  check = () => {
    if (messageReconnectTimeout < time.getUnixTime() - this.wsLastMessageReceived && this.ws) {
      // no message received in a long time - not even your own awareness
      // updates (which are updated every 15 seconds)
      this.ws.close()
    }
  }

  sync = (step: YJSSyncType.SyncStep0 | YJSSyncType.SyncStep1 = YJSSyncType.SyncStep0) => {
    const data =
      step === YJSSyncType.SyncStep0 ? this.syncHandler.encodeSyncStep0() : this.syncHandler.encodeSyncStep1()

    this.sendMessages([data, this.syncHandler.encodeAwareness()], null)
  }

  /**
   * Persist the changes between the local doc and the doc state when we have connected to.
   * Use after synced with local IndexedDB log.
   * @returns The update message that was sent to the server
   */
  persistLocalChanges = () => {
    const update = SyncHandler.encodeUpdate(this.syncHandler.getChanges())
    /** Mark messages as "server" to make sure the server stores them */
    this.sendMessages([update], 'server')
    return update
  }

  connect = () => {
    /** If the WebSocket is not already open we will run this when it is */
    if (this.ws?.readyState !== WebSocket.OPEN) {
      return
    }

    this.emit('status', [{ status: 'connecting' }])

    this.wsLastMessageReceived = time.getUnixTime()

    this.ws.addEventListener('message', this.handleWSMessage)

    this.emit('status', [{ status: 'connected' }])

    if (this.initialSync) {
      this.sync(YJSSyncType.SyncStep0)
    }
  }

  disconnect = () => {
    if (this.resyncIntervalId !== 0) {
      clearInterval(this.resyncIntervalId as number)
    }
    clearInterval(this.checkIntervalId as number)

    if (typeof window !== 'undefined') {
      window.removeEventListener('beforeunload', this.beforeUnloadHandler)
    } else if (typeof process !== 'undefined') {
      process.off('exit', () => this.beforeUnloadHandler)
    }

    this.ws?.removeEventListener('message', this.handleWSMessage)

    this.awareness.off('update', this.awarenessUpdateHandler)
    this.doc.off('update', this.updateHandler)

    /** Clear the sync status to ensure we fully sync up when going back online */
    this.syncHandler.clearSyncStatus()

    // update awareness (all users except local left)
    awarenessProtocol.removeAwarenessStates(
      this.awareness,
      Array.from(this.awareness.getStates().keys()).filter(client => client !== this.doc.clientID),
      this
    )

    this.emit('status', [{ status: 'disconnected' }])
  }

  destroy = () => {
    if (this.ws) {
      this.ws.removeEventListener('open', this.connect)
      this.ws.removeEventListener('close', this.disconnect)
    }
    this.disconnect()
    this.ws = null
    super.destroy()
  }
}
