// %~dp0/../../../openai-realtime-console/src/pages/ConsolePage.tsx
// %~dp0/../../../openai-realtime-console/relay-server/lib/relay.js

// RealtimeApiAudioManager.ts

import { ref, type Ref } from 'vue';
import { ToastPluginApi } from 'vue-toast-notification';
import {
  ToAudioServer,
  ToAudioClient,
  ClientConnectRequest,
  AudioFileUpload,
  UpstreamState
} from '../proto-gen-ref/audio_pb';
import { WavRecorder, WavStreamPlayer } from './lib/wavtools';
import { WebSocketManager } from './WebSocketManager';

/**
 * One chunk handed to the `WavRecorder.record()` callback.
 * Only `mono` is consumed in this file, where it is read as 16-bit PCM samples.
 */
type AudioData = {
  mono: ArrayBuffer;
  raw: ArrayBuffer;
};

/** An outgoing audio message, already encoded, plus the metadata used to build it. */
type AudioPacket = {
  /** Serialized `ToAudioServer` protobuf bytes, ready for `WebSocket.send`. */
  serializedMsg: Uint8Array;
  /** Sequence number stamped into the packet. */
  seqNum: number;
  /** Payload duration in whole milliseconds (floored). */
  durationMs: number;
};

/**
 * Streams microphone audio to the audio relay server over a WebSocket and plays
 * back the audio the server returns.
 *
 * All messages are protobuf-encoded (`ToAudioServer` outbound, `ToAudioClient`
 * inbound). Handshake: open the WebSocket, send a `ClientConnectRequest`, then
 * wait for `CONNACK`. Playback and recording are only started after a CONNACK
 * without an error.
 *
 * The service is a singleton: create it with {@link initialize} and fetch it
 * with {@link getInstance}.
 */
export class RealtimeAudioStreamingService extends WebSocketManager {
  private static readonly SAMPLE_RATE = 24000;
  private static readonly SAMPLES_PER_RECORDER_CHUNK = 4096;  // Each sample is 2 bytes (16-bit PCM)

  private isWavPlayerPlaying: Ref<boolean>;
  private isMutingRecording: Ref<boolean>;
  private wavRecorder: Ref<WavRecorder>;
  private wavStreamPlayer: Ref<WavStreamPlayer>;
  private lastKnownUpstreamStatusPrimary: Ref<UpstreamState>;
  private lastKnownUpstreamStatusObserver: Ref<UpstreamState>;
  /** Audio packets captured before CONNACK; flushed to the server when CONNACK arrives. */
  private startupQueue: AudioPacket[] = [];
  /** Sequence number for the next outgoing audio packet; reset on connect and cleanup. */
  private chunkProcessorCallCount = 0;
  private speechMessageCallbacks: Array<(source: string, text: string, itemId: string) => void> = [];

  private audioConnKey: string | null = null;
  private isActivelyRecording: Ref<boolean> = ref(false);
  private selectedUserAudioSessionId: string | null = null;

  /** Settles the promise awaited by connectWithKey() once CONNACK (or a failure) arrives. */
  private connectPromiseResolver: { resolve: () => void; reject: (error: Error) => void } | null = null;

  /**
   * Pending `wavRecorder.quit()` started from the synchronous onCleanup() hook.
   * connectWithKey() awaits it before calling `begin()` again so a new recorder
   * session is not started while the previous one is still shutting down.
   */
  private recorderShutdown: Promise<unknown> | null = null;

  private constructor(
    hostPortNum: number,
    toast: ToastPluginApi,
    userToken: string,
    selectedUserAudioSessionId: string | null = null
  ) {
    super(hostPortNum, toast, userToken);
    this.selectedUserAudioSessionId = selectedUserAudioSessionId;
    this.isWavPlayerPlaying = ref(false);
    this.isMutingRecording = ref(false);
    this.wavRecorder = ref(new WavRecorder({
      sampleRate: RealtimeAudioStreamingService.SAMPLE_RATE,
      debug: true
    }));
    this.wavStreamPlayer = ref(new WavStreamPlayer({
      sampleRate: RealtimeAudioStreamingService.SAMPLE_RATE
    }));
    this.lastKnownUpstreamStatusPrimary = ref(UpstreamState.NO_CONNECTION);
    this.lastKnownUpstreamStatusObserver = ref(UpstreamState.NO_CONNECTION);

    // Track whether the player currently has an active track (truthy newTrackId).
    this.wavStreamPlayer.value.onTrackChange(({ previousTrackId, newTrackId }) => {
      console.log('wavStreamPlayer.onTrackChange', previousTrackId, '->', newTrackId);
      this.isWavPlayerPlaying.value = Boolean(newTrackId);
    });
  }

  private static instance: RealtimeAudioStreamingService | null = null;

  /**
   * Returns the singleton created by {@link initialize}.
   * @throws Error if the service has not been initialized (or was destroyed).
   */
  public static getInstance(): RealtimeAudioStreamingService {
    if (!this.instance) {
      throw new Error('RealtimeAudioStreamingService not initialized');
    }
    return this.instance;
  }

  /**
   * Creates the singleton, destroying any existing instance first.
   * @param hostPortNum Passed through to WebSocketManager.
   * @param toast Passed through to WebSocketManager.
   * @param userToken Passed through to WebSocketManager; also sent as the user ident in the connect request.
   * @param selectedUserAudioSessionId Optional session id included in the connect request.
   * @returns The new instance.
   */
  public static async initialize(
    hostPortNum: number,
    toast: ToastPluginApi,
    userToken: string,
    selectedUserAudioSessionId: string | null = null
  ): Promise<RealtimeAudioStreamingService> {
    if (this.instance) {
      await this.destroyInstance();
    }
    this.instance = new RealtimeAudioStreamingService(
      hostPortNum,
      toast,
      userToken,
      selectedUserAudioSessionId
    );
    return this.instance;
  }

  /** True while outgoing microphone audio is suppressed (see {@link toggleAudioMuting}). */
  public isMicMuted(): boolean {
    return this.isMutingRecording.value;
  }

  /** True when the recorder reports status `'recording'`. */
  public isRecording(): boolean {
    return this.wavRecorder.value.getStatus() === 'recording';
  }

  /** True while the stream player has an active track. */
  public isPlayingBack(): boolean {
    return this.isWavPlayerPlaying.value;
  }

  /** Last primary upstream state reported by the server. */
  public getLastKnownUpstreamStatusPrimary(): UpstreamState {
    return this.lastKnownUpstreamStatusPrimary.value;
  }

  /** Last observer upstream state reported by the server. */
  public getLastKnownUpstreamStatusObserver(): UpstreamState {
    return this.lastKnownUpstreamStatusObserver.value;
  }

  /** Sends already-serialized bytes if the socket is open; otherwise logs and drops them. */
  private txAudio(serializedBinary: Uint8Array): void {
    if (this.wsConnection?.readyState === WebSocket.OPEN) {
      this.wsConnection.send(serializedBinary);
    } else {
      console.log('######  txAudio() when wsConnection.readyState !== WebSocket.OPEN  ######');
    }
  }

  /**
   * Wraps a chunk of mono 16-bit PCM in a serialized `ToAudioServer` audio request.
   * @param monoInt16Array PCM samples at SAMPLE_RATE.
   * @param seqNum Sequence number stamped into the packet.
   */
  private createAudioPacket(monoInt16Array: Int16Array, seqNum: number): AudioPacket {
    // Byte view over exactly the samples of this chunk (the backing buffer may be larger).
    const abuf = new Uint8Array(
      monoInt16Array.buffer,
      monoInt16Array.byteOffset,
      monoInt16Array.byteLength
    );
    const durationMs = Math.floor((1000 * monoInt16Array.length) / RealtimeAudioStreamingService.SAMPLE_RATE);
    const afu = new AudioFileUpload()
      .setAudio(abuf)
      .setAudioType('pcm16')
      .setAudioDurationMs(durationMs)
      .setSequenceNumber(seqNum);
    const toServer = new ToAudioServer()
      .setAudioRequest(afu);
    return { serializedMsg: toServer.serializeBinary(), seqNum, durationMs };
  }

  /**
   * Queues received PCM audio on the stream player under the message's item id
   * (or `'unknown'` when absent).
   */
  private handleAudioResponse(msg: ToAudioClient): void {
    const afu = msg.getAudioResponse();
    if (!afu) {
      return;
    }
    const trackId = afu.getItemId() || 'unknown';
    const receivedAudio = afu.getAudio_asU8();

    // Int16Array needs an even byte length; an odd trailing byte would make the
    // constructor throw a RangeError and abort handling of this message.
    const usableByteLength = receivedAudio.byteLength - (receivedAudio.byteLength % 2);
    if (usableByteLength !== receivedAudio.byteLength) {
      console.warn('handleAudioResponse: dropping trailing odd byte from audio chunk', trackId);
    }

    // Copy into a fresh buffer: the protobuf view may sit at an odd offset inside a
    // larger buffer, which an Int16Array cannot be created over directly.
    const audioBuffer = receivedAudio.buffer.slice(
      receivedAudio.byteOffset,
      receivedAudio.byteOffset + usableByteLength
    );
    this.wavStreamPlayer.value.add16BitPCM(new Int16Array(audioBuffer), trackId);
  }

  /**
   * Delivers a transcript to every registered speech callback. Iterates over a
   * snapshot so a callback may unsubscribe during delivery, and isolates failures
   * so one throwing callback neither starves the others nor escapes handleMessage().
   */
  private notifySpeechMessage(source: string, text: string, itemId: string): void {
    for (const callback of [...this.speechMessageCallbacks]) {
      try {
        callback(source, text, itemId);
      } catch (err) {
        console.error('onSpeechMessage callback threw', err);
      }
    }
  }

  /**
   * Resets per-connection state: stops the recorder (if recording), interrupts
   * playback, clears the startup queue and connection key, and rejects any
   * pending connectWithKey() promise.
   */
  protected onCleanup(): void {
    if (this.isActivelyRecording.value && this.wavRecorder.value) {
      // quit() is async but this hook is synchronous; keep the promise so the next
      // connectWithKey() can wait for the shutdown to finish before begin().
      this.recorderShutdown = this.wavRecorder.value.quit().catch(console.error);
    }

    this.isActivelyRecording.value = false;
    this.startupQueue = [];
    this.chunkProcessorCallCount = 0;
    if (this.wavStreamPlayer.value) {
      this.wavStreamPlayer.value.interrupt().catch(console.error);
    }
    this.connAckd = false;
    this.audioConnKey = null;

    // Ensure any pending connection promise is rejected
    if (this.connectPromiseResolver) {
      this.connectPromiseResolver.reject(new Error('Connection cleaned up'));
      this.connectPromiseResolver = null;
    }
  }

  /**
   * Dispatches an inbound `ToAudioClient` message by type. Unrecognized types are
   * ignored.
   */
  protected override handleMessage(event: MessageEvent): void {
    // NOTE(review): assumes the socket delivers ArrayBuffer payloads (binaryType
    // 'arraybuffer'); confirm in WebSocketManager.
    const msg = ToAudioClient.deserializeBinary(new Uint8Array(event.data));
    switch (msg.getTypeCase()) {
      case ToAudioClient.TypeCase.CONNACK: {
        const cack = msg.getConnack();
        if (!cack) {
          break;
        }
        console.log('rxd ToAudioClient.CONNACK');
        const errMsg = cack.getError();
        console.log(`WebSocket connection #${this.connectionCounter} received CONNACK:`, errMsg);

        if (errMsg) {
          // Server refused the connection: fail the pending connectWithKey().
          this.connectPromiseResolver?.reject(new Error(errMsg));
          this.connectPromiseResolver = null;
          return;
        }

        // Flush audio captured before the server acknowledged the connection.
        let startupQueueDuration = 0;
        let startupQueueDepth = 0;
        while (this.startupQueue.length > 0) {
          const audioPacket = this.startupQueue.shift();
          if (audioPacket) {
            this.txAudio(audioPacket.serializedMsg);
            startupQueueDuration += audioPacket.durationMs;
            startupQueueDepth++;
          }
        }
        this.connAckd = true;
        console.log('startupQueueFlushed', 'depth', startupQueueDepth, 'durationMs', startupQueueDuration);

        this.connectPromiseResolver?.resolve();
        this.connectPromiseResolver = null;
        break;
      }

      case ToAudioClient.TypeCase.AUDIO_RESPONSE:
        this.handleAudioResponse(msg);
        break;

      case ToAudioClient.TypeCase.AUDIO_TRANSCRIPT: {
        const transcript = msg.getAudioTranscript();
        if (transcript) {
          this.notifySpeechMessage(transcript.getSource(), transcript.getMessage(), transcript.getItemId());
        }
        break;
      }

      case ToAudioClient.TypeCase.UPSTREAM_STATUSES: {
        const newUpstreamStatuses = msg.getUpstreamStatuses();
        if (newUpstreamStatuses) {
          const primaryStatus = newUpstreamStatuses.getPrimary();
          const advisoryStatus = newUpstreamStatuses.getObserver();
          console.log('UPSTREAM_STATUSES',
            `Primary: ${this.lastKnownUpstreamStatusPrimary.value} -> ${primaryStatus}`,
            `Advisor: ${this.lastKnownUpstreamStatusObserver.value} -> ${advisoryStatus}`
          );
          this.lastKnownUpstreamStatusPrimary.value = primaryStatus;
          this.lastKnownUpstreamStatusObserver.value = advisoryStatus;
        }
        break;
      }
    }
  }

  /**
   * Sends a `ClientConnectRequest` carrying the auth key, user token and (if set)
   * the selected user audio session id. Logs and sends nothing if there is no
   * key or socket.
   * @returns Always `false`. NOTE(review): the base-class meaning of this return
   *   value is not visible here.
   */
  protected sendConnectRequest(): boolean {
    if (this.audioConnKey && this.wsConnection) {
      const connReq = new ClientConnectRequest()
        .setAuthKey(this.audioConnKey)
        .setUserIdent(this.userToken);

      if (this.selectedUserAudioSessionId) {
        connReq.setUserAudioSessionId(this.selectedUserAudioSessionId);
      }

      const tac = new ToAudioServer()
        .setConnreq(connReq);
      this.wsConnection.send(tac.serializeBinary());
    } else {
      console.log('sendConnectRequest: invalid audioConnKey or wsConnection');
    }
    return false;
  }

  /** Waits for a recorder shutdown started by onCleanup(), if one is pending. */
  private async awaitRecorderShutdown(): Promise<void> {
    const pending = this.recorderShutdown;
    if (!pending) {
      return;
    }
    await pending;
    // Only clear it if a newer cleanup has not replaced it meanwhile.
    if (this.recorderShutdown === pending) {
      this.recorderShutdown = null;
    }
  }

  /**
   * Connects with the given key, waits for CONNACK, then starts playback and
   * microphone streaming.
   *
   * @param audioConnKey Auth key sent in the connect request.
   * @throws Error if the server rejects the connection, the socket closes before
   *   CONNACK, or audio setup fails; the connection is torn down first.
   */
  public async connectWithKey(audioConnKey: string): Promise<void> {
    // NOTE(review): audioConnKey is set before CONNACK, so this also matches a
    // connection that is still pending, not only an established one.
    if (this.audioConnKey === audioConnKey) {
      console.log('Audio connection already established with this key');
      return;
    }

    // Clean up any existing connection first
    await this.disconnect();

    // Settled by handleMessage (CONNACK), handleEarlyDisconnect, or onCleanup.
    const connectionPromise = new Promise<void>((resolve, reject) => {
      this.connectPromiseResolver = { resolve, reject };
    });

    this.audioConnKey = audioConnKey;
    this.setupWebSocketConnection();

    try {
      await connectionPromise;

      // Set up audio output first
      await this.wavStreamPlayer.value.connect();

      // Don't restart the recorder while a previous session's quit() is still running.
      await this.awaitRecorderShutdown();

      await this.wavRecorder.value.begin();
      this.isActivelyRecording.value = true;
      this.chunkProcessorCallCount = 0;

      // Each recorder chunk becomes one audio packet. Packets are sent immediately
      // once CONNACK'd, otherwise queued. Muted chunks are dropped and consume no
      // sequence number.
      const recordPromise = this.wavRecorder.value.record((data: AudioData) => {
        if (!this.isMutingRecording.value) {
          const monoInt16Array = new Int16Array(data.mono);
          if (monoInt16Array.byteLength > 0) {
            const audioPacket = this.createAudioPacket(monoInt16Array, this.chunkProcessorCallCount++);
            if (this.connAckd) {
              this.txAudio(audioPacket.serializedMsg);
            } else {
              this.startupQueue.push(audioPacket);
            }
          }
        }
        return true;
      }, RealtimeAudioStreamingService.SAMPLES_PER_RECORDER_CHUNK * 2);  // Convert samples to bytes

      await recordPromise;
    } catch (error) {
      // Ensure cleanup happens on error
      await this.disconnect();
      throw error;
    }
  }

  /** Flips microphone muting; while muted, recorded chunks are not sent. */
  public toggleAudioMuting(): void {
    this.isMutingRecording.value = !this.isMutingRecording.value;
  }

  /**
   * Registers a callback for transcripts received from the server.
   * @param callback Invoked with the transcript's source, message text and item id.
   * @returns A function that unregisters this callback. Callers may ignore it.
   */
  public onSpeechMessage(callback: (source: string, text: string, itemId: string) => void): () => void {
    this.speechMessageCallbacks.push(callback);
    return () => {
      const index = this.speechMessageCallbacks.indexOf(callback);
      if (index !== -1) {
        this.speechMessageCallbacks.splice(index, 1);
      }
    };
  }

  /** Sends user-typed text to the server. */
  public submitText(text: string): void {
    const toServer = new ToAudioServer()
      .setUserSubmittedText(text);
    this.txAudio(toServer.serializeBinary());
  }

  /** Asks the server to toggle monolog mode. */
  public toggleMonologMode(): void {
    const newMsg = new ToAudioServer();
    newMsg.setToggleMonologMode(false); // value is irrelevant
    this.txAudio(newMsg.serializeBinary());
  }

  /** Asks the server to toggle AI observer mode. */
  public toggleObserverMode(): void {
    const newMsg = new ToAudioServer();
    newMsg.setToggleAiObserverMode(false); // value is irrelevant
    this.txAudio(newMsg.serializeBinary());
  }

  /** Disconnects this instance. The singleton slot is cleared by destroyInstance(). */
  public async destroy(): Promise<void> {
    await this.disconnect();
  }

  /** Destroys the singleton (if any) and clears it, even if destroy() throws. */
  public static async destroyInstance(): Promise<void> {
    if (this.instance) {
      try {
        await this.instance.destroy();
      } finally {
        this.instance = null;
      }
    }
  }

  /** Rejects a pending connectWithKey() when the socket closes before CONNACK. */
  protected override handleEarlyDisconnect(event: CloseEvent): void {
    if (this.connectPromiseResolver) {
      this.connectPromiseResolver.reject(
        new Error(`Connection closed before CONNACK received (code: ${event.code})`)
      );
      this.connectPromiseResolver = null;
    }
  }
}
