// %~dp0/../../../openai-realtime-console/src/pages/ConsolePage.tsx
// %~dp0/../../../openai-realtime-console/relay-server/lib/relay.js

// RealtimeApiAudioManager.ts

import { ref, type Ref } from 'vue';
import { ToastPluginApi } from 'vue-toast-notification';
import {
  ToAudioServer,
  ToAudioClient,
  ClientConnectRequest,
  AudioFileUpload,
  AudioPlaybackProgressPt,
  AudioPlaybackTrackChange,
  UpstreamState
} from '../proto-gen-ref/audio_pb';
import { WavRecorder, WavStreamPlayer } from './lib/wavtools';
import { WebSocketManager } from './WebSocketManager';

/**
 * Shape of one chunk delivered by `WavRecorder.record()`'s callback.
 * Only `mono` is consumed in this file (reinterpreted as 16-bit PCM samples);
 * `raw` is declared for completeness.
 */
type AudioData = {
  raw: ArrayBuffer;
  mono: ArrayBuffer;
};

/**
 * An outgoing, already-serialized `ToAudioServer` audio upload plus the metadata
 * needed for logging when it is flushed from the startup queue.
 */
type AudioPacket = {
  /** Serialized protobuf bytes, ready to send on the WebSocket. */
  serializedMsg: Uint8Array;
  /** Monotonic per-connection chunk sequence number. */
  seqNum: number;
  /** Duration of the contained PCM audio, in whole milliseconds. */
  durationMs: number;
};

/**
 * Singleton that streams microphone audio to the audio relay server over a
 * WebSocket (protobuf `ToAudioServer` messages) and plays back the audio tracks
 * the server sends (`ToAudioClient` messages).
 *
 * Usage: call {@link RealtimeAudioStreamingService.initialize} once, then use
 * {@link RealtimeAudioStreamingService.getInstance}. Call `connectWithKey` to
 * open the socket and start recording.
 */
export class RealtimeAudioStreamingService extends WebSocketManager {
  /** Sample rate (Hz) used for both recording and playback. */
  private static readonly SAMPLE_RATE = 24000;

  private isWavPlayerPlaying: Ref<boolean>;
  private isMutingRecording: Ref<boolean>;
  private wavRecorder: Ref<WavRecorder>;
  private wavStreamPlayer: Ref<WavStreamPlayer>;
  private lastKnownUpstreamStatus: Ref<UpstreamState>;
  /** Packets recorded before the server's CONNACK; flushed in order on CONNACK. */
  private startupQueue: AudioPacket[] = [];
  /** Next outgoing audio sequence number (reset when recording (re)starts). */
  private chunkProcessorCallCount = 0;
  /** When non-empty, incoming audio for any other item id is dropped. */
  private onlyAllowedResponseItemId = '';
  private speechMessageCallbacks: Array<(source: string, text: string, itemId: string) => void> = [];
  /** Set by interruptPlayback(); consumed (and cleared) by the next onTrackChange. */
  private interruptingPlayback = false;

  /** Item id of the most recent audio response seen from the server. */
  private receivingTrackId = '';
  private isHandlingConversationStop = false;

  private audioConnKey: string | null = null;
  private isActivelyRecording: Ref<boolean> = ref(false);

  /** Pending wait (from waitForPlaybackCompletion) for playback to go idle. */
  private trackChangePromise: Promise<void> | null = null;
  private trackChangeResolve: (() => void) | null = null;

  /** Track ids for which the server has sent TRACK_RELAY_DONE. */
  private completedTrackIds = new Set<string>();

  private constructor(hostPortNum: number, toast: ToastPluginApi, userToken: string) {
    super(hostPortNum, toast, userToken);
    this.isWavPlayerPlaying = ref(false);
    this.isMutingRecording = ref(false);
    this.wavRecorder = ref(new WavRecorder({
      sampleRate: RealtimeAudioStreamingService.SAMPLE_RATE,
      debug: true
    }));
    this.wavStreamPlayer = ref(new WavStreamPlayer({
      sampleRate: RealtimeAudioStreamingService.SAMPLE_RATE
    }));
    this.lastKnownUpstreamStatus = ref(UpstreamState.NO_CONNECTION);

    this.wavStreamPlayer.value.onTrackChange(({ previousTrackId, newTrackId }) => {
      console.log('wavStreamPlayer.onTrackChange', previousTrackId, '->', newTrackId);

      const wasInterrupting = this.interruptingPlayback;
      this.interruptingPlayback = false;  // Reset early to ensure it's always cleared

      // Only verify completion (rxd TRACK_RELAY_DONE) for natural track endings (not interruptions).
      // A natural end before the server finished relaying the track is treated as a
      // temporary underrun (presumably more audio is still coming) and is not reported.
      if (previousTrackId && previousTrackId !== newTrackId && !wasInterrupting) {
        if (!this.completedTrackIds.has(previousTrackId)) {
          console.log('Ignoring temporary track change for', previousTrackId);
          return; // Don't process this track change yet
        }
        this.completedTrackIds.delete(previousTrackId);
      }

      this.isWavPlayerPlaying.value = Boolean(newTrackId);

      const appp = new AudioPlaybackTrackChange();
      if (previousTrackId !== null) {
        appp.setOldTrackId(previousTrackId);
      }
      if (newTrackId !== null) {
        appp.setNewTrackId(newTrackId);
      }
      const toServer = new ToAudioServer()
        .setAudioPlaybackTrackUpdate(appp);
      this.txAudio(toServer.serializeBinary());

      // Playback went idle: release anyone waiting in waitForPlaybackCompletion().
      if (newTrackId === null && this.trackChangeResolve) {
        this.trackChangeResolve();
        this.trackChangeResolve = null;
        this.trackChangePromise = null;
      }
    });
  }

  private static instance: RealtimeAudioStreamingService | null = null;

  /**
   * Returns the singleton.
   * @throws Error if {@link RealtimeAudioStreamingService.initialize} has not been called.
   */
  public static getInstance(): RealtimeAudioStreamingService {
    if (!this.instance) {
      throw new Error('RealtimeAudioStreamingService not initialized');
    }
    return this.instance;
  }

  /**
   * Creates the singleton on first call; later calls return the existing instance
   * and ignore their arguments.
   */
  public static initialize(hostPortNum: number, toast: ToastPluginApi, userToken: string): RealtimeAudioStreamingService {
    if (this.instance) {
      return this.instance;
    }
    this.instance = new RealtimeAudioStreamingService(hostPortNum, toast, userToken);
    return this.instance;
  }

  /** True while outgoing microphone chunks are being dropped. */
  public isMicMuted(): boolean {
    return this.isMutingRecording.value;
  }

  /** True when the underlying WavRecorder reports status 'recording'. */
  public isRecording(): boolean {
    return this.wavRecorder.value.getStatus() === 'recording';
  }

  /** True while the stream player has a current track. */
  public isPlayingBack(): boolean {
    return this.isWavPlayerPlaying.value;
  }

  /** Most recent UPSTREAM_STATUS reported by the server. */
  public getLastKnownUpstreamStatus(): UpstreamState {
    return this.lastKnownUpstreamStatus.value;
  }

  /** Sends bytes if the socket is open; otherwise logs and drops them. */
  private txAudio(serializedBinary: Uint8Array): void {
    if (this.wsConnection?.readyState === WebSocket.OPEN) {
      this.wsConnection.send(serializedBinary);
    } else {
      console.log('######  txAudio() when wsConnection.readyState !== WebSocket.OPEN  ######');
    }
  }

  /**
   * Wraps a chunk of mono 16-bit PCM in a serialized `ToAudioServer` audio upload.
   * @param monoInt16Array PCM samples at SAMPLE_RATE.
   * @param seqNum Sequence number to stamp on the upload.
   */
  private createAudioPacket(monoInt16Array: Int16Array, seqNum: number): AudioPacket {
    const abuf = new Uint8Array(
      monoInt16Array.buffer,
      monoInt16Array.byteOffset,
      monoInt16Array.byteLength
    );
    const durationMs = Math.floor((1000 * monoInt16Array.length) / RealtimeAudioStreamingService.SAMPLE_RATE);
    const afu = new AudioFileUpload()
      .setAudio(abuf)
      .setAudioType('pcm16')
      .setAudioDurationMs(durationMs)
      .setSequenceNumber(seqNum);
    const toServer = new ToAudioServer()
      .setAudioRequest(afu);
    return { serializedMsg: toServer.serializeBinary(), seqNum, durationMs };
  }

  /**
   * Interrupts playback, flagging the interruption so the resulting track change
   * is not mistaken for a natural track end.
   * @returns The player's position at interruption, or null.
   */
  private async interruptPlayback(): Promise<{ trackId: string | null, offset: number, currentTime: number } | null> {
    this.interruptingPlayback = true;
    return await this.wavStreamPlayer.value.interrupt();
  }

  /** Handles AUDIO_PLAYBACK_CANCEL: stop playback and report where it stopped. */
  private async handleAudioPlaybackCancel(msg: ToAudioClient): Promise<void> {
    const apc = msg.getAudioPlaybackCancel();
    if (!apc) {
      return;
    }
    const trackSampleOffset = await this.interruptPlayback();
    if (trackSampleOffset?.trackId) {
      const { trackId, currentTime } = trackSampleOffset;
      const offsetMs = Math.floor(currentTime * 1000);
      const appp = new AudioPlaybackProgressPt()
        .setItemId(trackId)
        .setOffsetMs(offsetMs);
      const toServer = new ToAudioServer()
        .setAudioPlaybackCancelledAt(appp);
      this.txAudio(toServer.serializeBinary());
      console.log('AUDIO_PLAYBACK_CANCELLED_AT', trackId, offsetMs);
    } else {
      console.log('AUDIO_PLAYBACK_CANCELLED_AT', '(nothing playing?)');
    }
  }

  /** Handles AUDIO_RESPONSE: queue received PCM16 audio on the player under its item id. */
  private handleAudioResponse(msg: ToAudioClient): void {
    const afu = msg.getAudioResponse();
    if (!afu) {
      return;
    }
    const trackId = afu.getItemId();
    if (trackId.length === 0) {
      console.log('handleAudioResponse', 'ignoring audio response with empty trackId');
      return;
    }
    if (this.receivingTrackId !== trackId) {
      this.receivingTrackId = trackId;
      console.log('receivingAudioTrack', this.receivingTrackId);
    }
    if (this.onlyAllowedResponseItemId.length > 0 && trackId !== this.onlyAllowedResponseItemId) {
      console.log('handleAudioResponse', 'ignoring audio response for trackId', trackId, '- only allowing', this.onlyAllowedResponseItemId);
      return;
    }
    const receivedAudio = afu.getAudio_asU8();
    // Int16Array needs an even byte length; drop a stray trailing byte instead of
    // letting the constructor throw a RangeError out of the message handler.
    const evenByteLength = receivedAudio.byteLength - (receivedAudio.byteLength % 2);
    if (evenByteLength !== receivedAudio.byteLength) {
      console.log('handleAudioResponse', 'dropping trailing odd byte for trackId', trackId);
    }
    // Copy into a fresh buffer: the protobuf view's byteOffset may not be 2-byte aligned.
    const audioBuffer = receivedAudio.buffer.slice(
      receivedAudio.byteOffset,
      receivedAudio.byteOffset + evenByteLength
    );
    this.wavStreamPlayer.value.add16BitPCM(new Int16Array(audioBuffer), trackId);
  }

  /** Resets recording/playback/connection state when the socket is torn down. */
  protected onCleanup(): void {
    this.isActivelyRecording.value = false;
    this.wavRecorder.value.end().catch(console.error);
    this.startupQueue = [];
    this.chunkProcessorCallCount = 0;
    // Use interruptPlayback() (not interrupt() directly) so the resulting track change
    // is treated as an interruption rather than a natural end awaiting TRACK_RELAY_DONE.
    this.interruptPlayback().catch(console.error);
    // Relay-done markers belong to the old connection.
    this.completedTrackIds.clear();
    this.connAckd = false;
    this.audioConnKey = null;
  }

  /** Dispatches one binary `ToAudioClient` message from the server. */
  protected override handleMessage(event: MessageEvent): void {
    // NOTE(review): assumes the socket delivers ArrayBuffer data (binaryType 'arraybuffer').
    const msg = ToAudioClient.deserializeBinary(new Uint8Array(event.data));
    switch (msg.getTypeCase()) {
      case ToAudioClient.TypeCase.REALTIME_CONVERSATION_STOP: {
        const stopReq = msg.getRealtimeConversationStop();
        if (stopReq) {
          // Fire-and-forget, but never leave a rejection unhandled.
          void this.handleConversationStop(stopReq.getSreqId(), stopReq.getMaxPlaybackCompletionTmMs())
            .catch((error: unknown) => console.error('handleConversationStop failed:', error));
        }
      } break;

      case ToAudioClient.TypeCase.CONNACK: {
        const cack = msg.getConnack();
        if (cack) {
          const telltale = cack.getTelltale();
          console.log(`WebSocket connection #${this.connectionCounter} received CONNACK:`, telltale);
          let startupQueueDuration = 0;
          let startupQueueDepth = 0;
          // Flush audio recorded before the server acknowledged the connection, in order.
          while (this.startupQueue.length > 0) {
            const audioPacket = this.startupQueue.shift();
            if (audioPacket) {
              this.txAudio(audioPacket.serializedMsg);
              startupQueueDuration += audioPacket.durationMs;
              startupQueueDepth++;
            }
          }
          this.connAckd = true;
          console.log('startupQueueFlushed', 'depth', startupQueueDepth, 'durationMs', startupQueueDuration);
        }
      } break;

      case ToAudioClient.TypeCase.AUDIO_PLAYBACK_CANCEL:
        void this.handleAudioPlaybackCancel(msg)
          .catch((error: unknown) => console.error('handleAudioPlaybackCancel failed:', error));
        break;

      case ToAudioClient.TypeCase.AUDIO_RESPONSE:
        this.handleAudioResponse(msg);
        break;

      case ToAudioClient.TypeCase.AUDIO_TRANSCRIPT: {
        const transcript = msg.getAudioTranscript();
        if (transcript) {
          this.speechMessageCallbacks.forEach(callback => {
            callback(transcript.getSource(), transcript.getMessage(), transcript.getItemId());
          });
        }
      } break;

      case ToAudioClient.TypeCase.TRACK_RELAY_DONE: {
        const trackId = msg.getTrackRelayDone();
        if (trackId) {
          console.log('TRACK_RELAY_DONE', trackId);
          if (trackId === '*') {  // Special case: upstream connection dropped
            // Mark current track as complete since we won't get any more audio
            if (this.receivingTrackId) {
              this.completedTrackIds.add(this.receivingTrackId);
            }
          } else {
            this.completedTrackIds.add(trackId);
          }
        }
      } break;

      case ToAudioClient.TypeCase.UPSTREAM_STATUS: {
        const newUpstreamStatus = msg.getUpstreamStatus();
        console.log('UPSTREAM_STATUS', this.lastKnownUpstreamStatus.value, '->', newUpstreamStatus);
        this.lastKnownUpstreamStatus.value = newUpstreamStatus;
      } break;
    }
  }

  /**
   * Sends the ClientConnectRequest (auth key + user token).
   * Always returns false: readiness is tracked separately via the server's CONNACK
   * (`connAckd`). NOTE(review): the meaning of the return value is defined by WebSocketManager.
   */
  protected sendConnectRequest(): boolean {
    if (this.audioConnKey && this.wsConnection) {
      const connReq = new ClientConnectRequest()
        .setAuthKey(this.audioConnKey)
        .setUserIdent(this.userToken);
      const tac = new ToAudioServer()
        .setConnreq(connReq);
      this.wsConnection.send(tac.serializeBinary());
    } else {
      console.log('sendConnectRequest: invalid audioConnKey or wsConnection');
    }
    return false; // because we have the ack thing going on
  }

  /**
   * Opens the WebSocket with `audioConnKey`, connects the playback device, and starts
   * recording. Chunks recorded before CONNACK are queued and flushed on CONNACK.
   * A repeated call with the current key is a no-op.
   * @throws Rethrows any error from player/recorder setup.
   */
  public async connectWithKey(audioConnKey: string): Promise<void> {
    if (this.audioConnKey === audioConnKey) {
      console.log('Audio connection already established with this key');
      return;
    }

    // Connect to WebSocket
    this.audioConnKey = audioConnKey;
    this.setupWebSocketConnection();

    try {
      // Set up audio output first
      await this.wavStreamPlayer.value.connect();

      // Start recording
      await this.wavRecorder.value.begin();
      this.isActivelyRecording.value = true;
      this.chunkProcessorCallCount = 0;

      // Set up recording callback
      const recordPromise = this.wavRecorder.value.record((data: AudioData) => {
        if (!this.isMutingRecording.value) {
          const monoInt16Array = new Int16Array(data.mono);
          if (monoInt16Array.byteLength > 0) {
            const audioPacket = this.createAudioPacket(monoInt16Array, this.chunkProcessorCallCount++);
            if (this.connAckd) {
              this.txAudio(audioPacket.serializedMsg);
            } else {
              this.startupQueue.push(audioPacket);
            }
          }
        }
        return true;
      });

      // Wait for recording setup to complete
      await recordPromise;

    } catch (error) {
      this.isActivelyRecording.value = false;
      console.error('Error establishing audio connection:', error);
      throw error;
    }
  }

  /** Flips microphone muting; while muted, recorded chunks are discarded. */
  public toggleAudioMuting(): void {
    this.isMutingRecording.value = !this.isMutingRecording.value;
  }

  /** Registers a listener for AUDIO_TRANSCRIPT messages. */
  public onSpeechMessage(callback: (source: string, text: string, itemId: string) => void) {
    this.speechMessageCallbacks.push(callback);
  }

  /**
   * Graceful stop: report the current playback position, let the current track
   * finish (bounded by `maxPlaybackCompletionTmMs`), then send graceful_stop_done.
   * Concurrent requests while one is in progress are ignored.
   */
  private async handleConversationStop(sreqId: string, maxPlaybackCompletionTmMs: number): Promise<void> {
    if (this.isHandlingConversationStop) {
      console.log('Already handling conversation stop, ignoring duplicate request');
      return;
    }

    this.isHandlingConversationStop = true;
    try {
      // Get both track ID and offset in one call
      const trackSampleOffset = await this.wavStreamPlayer.value.getTrackSampleOffset();
      const curTrackId = trackSampleOffset?.trackId ?? '';
      const offsetMs = Math.floor((trackSampleOffset?.currentTime ?? 0) * 1000);

      const appp = new AudioPlaybackProgressPt()
        .setItemId(curTrackId)
        .setOffsetMs(offsetMs)
        .setSreqId(sreqId);
      const toServer = new ToAudioServer()
        .setGracefullyStoppingAt(appp);
      this.txAudio(toServer.serializeBinary());

      // Wait for playback to complete
      await this.waitForPlaybackCompletion(maxPlaybackCompletionTmMs);

      // Send graceful_stop_done instead of generic ack
      const toServerDone = new ToAudioServer()
        .setGracefulStopDone(sreqId);
      this.txAudio(toServerDone.serializeBinary());
    } finally {
      this.isHandlingConversationStop = false;
    }
  }

  /**
   * If a track is playing, waits until playback goes idle (signalled from
   * onTrackChange) or until `maxPlaybackCompletionTmMs` elapses, in which case
   * playback is interrupted. While waiting, audio for other item ids is dropped.
   */
  private async waitForPlaybackCompletion(maxPlaybackCompletionTmMs: number): Promise<void> {
    const trackSampleOffset = await this.wavStreamPlayer.value.getTrackSampleOffset();
    const curTrackId = trackSampleOffset?.trackId ?? '';
    if (curTrackId.length === 0) {
      return;
    }

    // Set up promise before changing any state
    this.trackChangePromise = new Promise((resolve) => {
      this.trackChangeResolve = resolve;
    });

    // Set allowed ID after promise is ready
    this.onlyAllowedResponseItemId = curTrackId;
    console.log('waitForPlaybackCompletion', 'WavPlayerIsPlaying', curTrackId);

    let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<never>((_, reject) => {
      timeoutHandle = setTimeout(() => reject(new Error('Timeout')), maxPlaybackCompletionTmMs);
    });

    try {
      await Promise.race([this.trackChangePromise, timeout]);
    } catch (error) {
      console.log('Playback wait timeout reached, interrupting playback');
      await this.interruptPlayback();
    } finally {
      // Don't leave the timer running after playback finished on its own.
      clearTimeout(timeoutHandle);
      this.trackChangeResolve = null;
      this.trackChangePromise = null;
      this.onlyAllowedResponseItemId = '';
    }
  }

  /** Sends user-typed text to the server. */
  public submitText(text: string): void {
    const toServer = new ToAudioServer()
      .setUserSubmittedText(text);
    this.txAudio(toServer.serializeBinary());
  }

  /** Asks the server to toggle monolog mode (the payload value is ignored). */
  public toggleMonologMode(): void {
    const newMsg = new ToAudioServer();
    newMsg.setToggleMonologMode(false); // value is irrelevant
    this.txAudio(newMsg.serializeBinary());
  }
}
