// %~dp0/../../../openai-realtime-console/src/pages/ConsolePage.tsx
// %~dp0/../../../openai-realtime-console/relay-server/lib/relay.js

// RealtimeApiAudioManager.ts

import { ref, type Ref } from 'vue';
import { ToastPluginApi } from 'vue-toast-notification';
import {
  ToAudioServer,
  ToAudioClient,
  ClientConnectRequest,
  ClientPush,
  ServerPush,
  AudioFileUpload,
  AudioPlaybackProgressPt,
  ServerReqAck,
  ServerReq,
  Empty,
} from '../proto-gen-ref/audio_pb';
import { WavRecorder, WavStreamPlayer } from './lib/wavtools';
import { WebSocketManager } from './WebSocketManager';

/**
 * Shape of the chunk object this file expects in the `WavRecorder.record()`
 * callback (see `connectWithKey`).
 */
interface AudioData {
  /** Buffer that this file reinterprets as `Int16Array` samples and uploads. */
  mono: ArrayBuffer;
  /** Second buffer delivered with the chunk; not read anywhere in this file. */
  raw: ArrayBuffer;
}

/**
 * One recorded audio chunk, already wrapped and serialized for the server.
 * Packets are either sent immediately or parked in the startup queue until
 * the connection has been acknowledged.
 */
interface AudioPacket {
  /** Binary-serialized `ToAudioServer` message carrying the audio upload. */
  serializedMsg: Uint8Array;
  /** Sequence number assigned to the chunk when the packet was created. */
  seqNum: number;
  /** Chunk duration in milliseconds, derived from sample count and sample rate. */
  durationMs: number;
}

/**
 * Resolved value of `WavStreamPlayer.getTrackSampleOffset()`. This file reads
 * its `trackId` and `currentTime` members.
 */
type TrackSampleOffset = Awaited<ReturnType<WavStreamPlayer['getTrackSampleOffset']>>;

/**
 * Singleton service that records microphone audio, uploads it to an audio
 * server over the WebSocket managed by {@link WebSocketManager}, and plays
 * back the PCM16 audio the server pushes in return.
 *
 * Lifecycle:
 *  1. `initialize(...)` creates the singleton; `getInstance()` retrieves it.
 *  2. `connectWithKey(key)` opens the socket, connects the player and starts
 *     recording. Chunks recorded before the server's CONNACK arrives are held
 *     in a startup queue and flushed once the CONNACK is received.
 *  3. Server pushes (audio, transcripts, playback cancel) and server requests
 *     (conversation stop) are dispatched from `handleMessage`.
 */
export class RealtimeAudioStreamingService extends WebSocketManager {
  // NOTE(review): this hard-coded key is not read anywhere in this class.
  // A credential embedded in client-side source is visible to every user;
  // confirm whether it can be deleted or must be rotated.
  private static readonly AUTH_KEY = "a692c1cf-c590-49c5-81ef-850afd79d36c";
  /** Sample rate (Hz) used for both the recorder and the stream player. */
  private static readonly SAMPLE_RATE = 24000;

  /** Handle of the playback-monitor interval, or `null` when not running. */
  private playbackMonitorInterval: number | null = null;
  private readonly PLAYBACK_MONITOR_INTERVAL_MS = 200;

  private isWavRecorderConnected: Ref<boolean>;
  private isWavPlayerPlaying: Ref<boolean>;
  private isMutingRecording: Ref<boolean>;
  private wavRecorder: Ref<WavRecorder>;
  private wavStreamPlayer: Ref<WavStreamPlayer>;
  /** Packets recorded before the connection was acknowledged. */
  private startupQueue: AudioPacket[] = [];
  /** Number of chunks packaged so far; doubles as the next sequence number. */
  private chunkProcessorCallCount = 0;
  /** When non-empty, audio responses for any other track id are dropped. */
  private onlyAllowedResponseItemId = '';
  private speechCallback?: (text: string) => void;
  private speechMessageCallbacks: Array<(source: string, text: string) => void> = [];

  private playingTrackId = '';
  private receivingTrackId = '';

  /** Re-entrancy guard for `handleConversationStop`. */
  private isHandlingConversationStop = false;

  private audioConnKey: string | null = null;
  private isActivelyRecording: Ref<boolean> = ref(false);

  private constructor(hostPortNum: number, toast: ToastPluginApi, userToken: string) {
    super(hostPortNum, toast, userToken);
    this.isWavRecorderConnected = ref(false);
    this.isWavPlayerPlaying = ref(false);
    this.isMutingRecording = ref(false);
    this.wavRecorder = ref(new WavRecorder({
      sampleRate: RealtimeAudioStreamingService.SAMPLE_RATE,
      debug: true
    }));
    this.wavStreamPlayer = ref(new WavStreamPlayer({
      sampleRate: RealtimeAudioStreamingService.SAMPLE_RATE
    }));
  }

  private static instance: RealtimeAudioStreamingService | null = null;

  /**
   * Returns the singleton.
   * @throws Error if `initialize` has not been called yet.
   */
  public static getInstance(): RealtimeAudioStreamingService {
    if (!this.instance) {
      throw new Error('RealtimeAudioStreamingService not initialized');
    }
    return this.instance;
  }

  /**
   * Creates the singleton on first call; later calls return the existing
   * instance and ignore their arguments.
   */
  public static initialize(hostPortNum: number, toast: ToastPluginApi, userToken: string): RealtimeAudioStreamingService {
    if (this.instance) {
      return this.instance;
    }
    this.instance = new RealtimeAudioStreamingService(hostPortNum, toast, userToken);
    return this.instance;
  }

  /** Whether recorded chunks are currently being discarded instead of sent. */
  public isMicMuted(): boolean {
    return this.isMutingRecording.value;
  }

  /** Whether the recorder is flagged as connected and reports status `'recording'`. */
  public isRecording(): boolean {
    return this.isWavRecorderConnected.value &&
      this.wavRecorder.value.getStatus() === 'recording';
  }

  /** Whether the last playback-monitor tick observed a track being played. */
  public isPlayingBack(): boolean {
    return this.isWavPlayerPlaying.value;
  }

  /**
   * Converts a player `currentTime` into whole milliseconds.
   * NOTE(review): assumes `currentTime` is expressed in seconds — confirm
   * against the wavtools implementation.
   */
  private static toOffsetMs(currentTime: number | null | undefined): number {
    return Math.floor((currentTime ?? 0) * 1000);
  }

  /** Builds the protobuf playback-position message shared by several senders. */
  private static buildPlaybackProgressPt(trackId: string, offsetMs: number): AudioPlaybackProgressPt {
    const appp = new AudioPlaybackProgressPt();
    appp.setItemId(trackId);
    appp.setOffsetMs(offsetMs);
    return appp;
  }

  /** Sends an `audioPlaybackStatus` client push for the given player snapshot. */
  private txPlaybackProgress(trackSampleOffset: TrackSampleOffset): void {
    const appp = RealtimeAudioStreamingService.buildPlaybackProgressPt(
      trackSampleOffset?.trackId ?? '',
      RealtimeAudioStreamingService.toOffsetMs(trackSampleOffset?.currentTime)
    );
    const cpsh = new ClientPush();
    cpsh.setAudioPlaybackStatus(appp);
    const toServer = new ToAudioServer();
    toServer.setCpsh(cpsh);
    this.txAudio(toServer.serializeBinary());
  }

  /** Resolves to the id of the track the player reports, or `''` if none. */
  private async wavStreamPlayerCurrTrackId(): Promise<string> {
    const trackSampleOffset = await this.wavStreamPlayer.value.getTrackSampleOffset();
    return (trackSampleOffset?.trackId) ?? '';
  }

  /** Resolves to `true` while the player reports a non-empty track id. */
  private async isWavStreamPlayerPlaying(): Promise<boolean> {
    return (await this.wavStreamPlayerCurrTrackId()).length > 0;
  }

  /** One monitor tick: refreshes the playing flag and logs track changes. */
  private async monitorPlaybackProgress(): Promise<void> {
    const curTrackId = await this.wavStreamPlayerCurrTrackId();
    this.isWavPlayerPlaying.value = curTrackId.length > 0;
    if (this.playingTrackId !== curTrackId) {
      this.playingTrackId = curTrackId;
      console.log('playingAudioTrack', this.playingTrackId);
    }
  }

  /** Starts the periodic playback monitor; a no-op if it is already running. */
  private startPlaybackMonitoring(): void {
    if (this.playbackMonitorInterval !== null) {
      console.log('startPlaybackMonitoring - interval already exists');
      return;
    }
    console.log('startPlaybackMonitoring - creating interval');
    this.playbackMonitorInterval = window.setInterval(
      () => {
        // The timer ignores the returned promise, so a rejection must be
        // handled here or it surfaces as an unhandled promise rejection.
        void this.monitorPlaybackProgress().catch((error: unknown) => {
          console.error('monitorPlaybackProgress failed:', error);
        });
      },
      this.PLAYBACK_MONITOR_INTERVAL_MS
    );
  }

  /** Stops the periodic playback monitor and clears the playing flag. */
  private stopPlaybackMonitoring(): void {
    if (this.playbackMonitorInterval !== null) {
      console.log('stopPlaybackMonitoring', 'stopping');
      window.clearInterval(this.playbackMonitorInterval);
      this.playbackMonitorInterval = null;
      this.isWavPlayerPlaying.value = false;
    }
  }

  /** Sends a serialized message if the socket is open; otherwise logs and drops it. */
  private txAudio(serializedBinary: Uint8Array): void {
    if (this.wsConnection?.readyState === WebSocket.OPEN) {
      this.wsConnection.send(serializedBinary);
    } else {
      console.log('######  txAudio() when wsConnection.readyState !== WebSocket.OPEN  ######');
    }
  }

  /**
   * Wraps one chunk of mono PCM16 samples in a serialized upload message.
   * @param monoInt16Array Samples to upload (viewed as bytes, not copied).
   * @param seqNum Sequence number to stamp on the upload.
   */
  private createAudioPacket(monoInt16Array: Int16Array, seqNum: number): AudioPacket {
    const abuf = new Uint8Array(
      monoInt16Array.buffer,
      monoInt16Array.byteOffset,
      monoInt16Array.byteLength
    );
    const durationMs = Math.floor((1000 * monoInt16Array.length) / RealtimeAudioStreamingService.SAMPLE_RATE);
    const afu = new AudioFileUpload()
      .setAudio(abuf)
      .setAudioType('pcm16')
      .setAudioDurationMs(durationMs)
      .setSequenceNumber(seqNum);
    const cpsh = new ClientPush()
      .setAudioRequest(afu);
    const toServer = new ToAudioServer()
      .setCpsh(cpsh);
    return { serializedMsg: toServer.serializeBinary(), seqNum, durationMs };
  }

  /**
   * Handles a server "cancel playback" push: interrupts the player and, if a
   * track was playing, reports the position at which it was cut off.
   */
  private async handleAudioPlaybackCancel(spsh: ServerPush): Promise<void> {
    if (!spsh.getAudioPlaybackCancel()) {
      return;
    }
    const trackSampleOffset = await this.wavStreamPlayer.value.interrupt();
    const trackId = trackSampleOffset?.trackId;
    if (!trackId) {
      console.log('ClientPush.AUDIO_PLAYBACK_CANCELLED_AT', '(nothing playing?)');
      return;
    }
    const offsetMs = RealtimeAudioStreamingService.toOffsetMs(trackSampleOffset?.currentTime);
    const cpsh = new ClientPush();
    cpsh.setAudioPlaybackCancelledAt(
      RealtimeAudioStreamingService.buildPlaybackProgressPt(trackId, offsetMs)
    );
    const toServer = new ToAudioServer();
    toServer.setCpsh(cpsh);
    this.txAudio(toServer.serializeBinary());
    console.log('ClientPush.AUDIO_PLAYBACK_CANCELLED_AT', trackId, offsetMs);
  }

  /**
   * Handles a server audio push by queueing its PCM16 samples on the player.
   * Responses with an empty track id, or for a track other than
   * `onlyAllowedResponseItemId` while that filter is set, are ignored.
   */
  private handleAudioResponse(spsh: ServerPush): void {
    const afu = spsh.getAudioResponse();
    if (!afu) {
      return;
    }
    const trackId = afu.getItemId();
    if (trackId.length === 0) {
      console.log('handleAudioResponse', 'ignoring audio response with empty trackId');
      return;
    }
    if (this.receivingTrackId !== trackId) {
      this.receivingTrackId = trackId;
      console.log('receivingAudioTrack', this.receivingTrackId);
    }
    if (this.onlyAllowedResponseItemId.length > 0 && trackId !== this.onlyAllowedResponseItemId) {
      console.log('handleAudioResponse', 'ignoring audio response for trackId', trackId, '- only allowing', this.onlyAllowedResponseItemId);
      return;
    }
    const receivedAudio = afu.getAudio_asU8();
    // An Int16Array over a buffer with an odd byte length throws RangeError,
    // so only whole 16-bit samples are kept.
    const usableByteLength = receivedAudio.byteLength - (receivedAudio.byteLength % 2);
    if (usableByteLength !== receivedAudio.byteLength) {
      console.warn('handleAudioResponse', 'dropping trailing byte of odd-length PCM16 payload for trackId', trackId);
    }
    if (usableByteLength === 0) {
      return;
    }
    // Copy into a fresh buffer so the Int16Array view starts at offset 0
    // regardless of where the payload sits inside the decoded message.
    const audioBuffer = receivedAudio.buffer.slice(
      receivedAudio.byteOffset,
      receivedAudio.byteOffset + usableByteLength
    );
    this.wavStreamPlayer.value.add16BitPCM(new Int16Array(audioBuffer), trackId);
  }

  /** Resets all recording, playback and connection state held by this class. */
  protected onCleanup(): void {
    this.stopPlaybackMonitoring();
    this.isActivelyRecording.value = false;
    this.wavRecorder.value.end().catch(console.error);
    this.startupQueue = [];
    this.chunkProcessorCallCount = 0;
    this.wavStreamPlayer.value.interrupt().catch(console.error);
    this.connAckd = false;
    this.audioConnKey = null;
  }

  /**
   * Decodes an incoming frame as `ToAudioClient` and dispatches it by type.
   * Frames that fail to decode are logged and dropped.
   */
  protected override handleMessage(event: MessageEvent): void {
    let msg: ToAudioClient;
    try {
      msg = ToAudioClient.deserializeBinary(new Uint8Array(event.data));
    } catch (error: unknown) {
      console.error('handleMessage: failed to decode ToAudioClient message:', error);
      return;
    }

    switch (msg.getTypeCase()) {
      case ToAudioClient.TypeCase.SREQ: {
        const sreq = msg.getSreq();
        if (sreq) {
          this.handleServerRequest(sreq);
        }
      } break;

      case ToAudioClient.TypeCase.CONNACK: {
        const cack = msg.getConnack();
        if (cack) {
          console.log(`WebSocket connection #${this.connectionCounter} received CONNACK:`, cack.getTelltale());
          this.flushStartupQueue();
          this.connAckd = true;
        }
      } break;

      case ToAudioClient.TypeCase.SPSH: {
        const spsh = msg.getSpsh();
        if (spsh) {
          this.handleServerPush(spsh);
        }
      } break;
    }
  }

  /** Dispatches a server request; only conversation-stop is handled here. */
  private handleServerRequest(sreq: ServerReq): void {
    switch (sreq.getSreqCase()) {
      case ServerReq.SreqCase.REALTIME_CONVERSATION_STOP: {
        const stopReq = sreq.getRealtimeConversationStop();
        if (stopReq) {
          // Runs asynchronously; attach a catch so a failure is reported
          // instead of becoming an unhandled promise rejection.
          void this.handleConversationStop(sreq.getSreqId(), stopReq.getMaxPlaybackCompletionTmMs())
            .catch((error: unknown) => {
              console.error('handleConversationStop failed:', error);
            });
        }
      } break;
    }
  }

  /** Dispatches a server push to the matching handler. */
  private handleServerPush(spsh: ServerPush): void {
    switch (spsh.getPushCase()) {
      case ServerPush.PushCase.AUDIO_PLAYBACK_CANCEL:
        void this.handleAudioPlaybackCancel(spsh).catch((error: unknown) => {
          console.error('handleAudioPlaybackCancel failed:', error);
        });
        break;

      case ServerPush.PushCase.AUDIO_RESPONSE:
        this.handleAudioResponse(spsh);
        break;

      case ServerPush.PushCase.AUDIO_TRANSCRIPT: {
        const transcript = spsh.getAudioTranscript();
        if (transcript) {
          // Iterate over a copy so a listener may unsubscribe while it runs.
          for (const callback of [...this.speechMessageCallbacks]) {
            callback(transcript.getSource(), transcript.getMessage());
          }
        }
      } break;
    }
  }

  /** Sends every packet parked in the startup queue, in order, and logs totals. */
  private flushStartupQueue(): void {
    const queued = this.startupQueue;
    this.startupQueue = [];
    let queuedDurationMs = 0;
    for (const audioPacket of queued) {
      this.txAudio(audioPacket.serializedMsg);
      queuedDurationMs += audioPacket.durationMs;
    }
    console.log('startupQueueFlushed', 'depth', queued.length, 'durationMs', queuedDurationMs);
  }

  /** Sends the connect request carrying the connection key and user token. */
  protected sendConnectRequest(): void {
    if (this.audioConnKey && this.wsConnection) {
      const serverReq = new ClientConnectRequest()
        .setAuthKey(this.audioConnKey)
        .setUserIdent(this.userToken);
      const connReq = new ToAudioServer()
        .setConnreq(serverReq);
      this.wsConnection.send(connReq.serializeBinary());
    } else {
      console.log('sendConnectRequest: invalid audioConnKey or wsConnection');
    }
  }

  /**
   * Opens the socket for `audioConnKey`, connects the player and starts
   * recording. Calling again with the key that is already active is a no-op.
   *
   * On failure the stored key is cleared and the recorder is released, so a
   * later call with the same key performs a real retry.
   *
   * @throws Re-throws whatever the socket setup, player or recorder threw.
   */
  public async connectWithKey(audioConnKey: string): Promise<void> {
    if (this.audioConnKey === audioConnKey) {
      console.log('Audio connection already established with this key');
      return;
    }

    this.audioConnKey = audioConnKey;

    try {
      // Connect to WebSocket
      this.setupWebSocketConnection();

      // Set up audio output first
      await this.wavStreamPlayer.value.connect();

      // Start recording
      await this.wavRecorder.value.begin();
      this.isActivelyRecording.value = true;
      this.chunkProcessorCallCount = 0;

      const recordPromise = this.wavRecorder.value.record(
        (data: AudioData) => this.onRecordedChunk(data)
      );

      this.startPlaybackMonitoring();

      // Wait for recording setup to complete
      await recordPromise;
    } catch (error: unknown) {
      this.isActivelyRecording.value = false;
      this.stopPlaybackMonitoring();
      // Forget the key; otherwise a retry with the same key would hit the
      // "already established" early return above and never start audio.
      this.audioConnKey = null;
      // Best-effort release of the recorder in case begin() had succeeded;
      // if it had not, end() is expected to reject and the error is logged.
      this.wavRecorder.value.end().catch(console.error);
      console.error('Error establishing audio connection:', error);
      throw error;
    }
  }

  /**
   * Recorder callback: packages one chunk and either sends it or, while the
   * connection is not yet acknowledged, parks it in the startup queue.
   * Chunks are discarded while muted. Always returns `true`.
   */
  private onRecordedChunk(data: AudioData): boolean {
    if (this.isMutingRecording.value) {
      return true;
    }
    const monoInt16Array = new Int16Array(data.mono);
    if (monoInt16Array.byteLength > 0) {
      const audioPacket = this.createAudioPacket(monoInt16Array, this.chunkProcessorCallCount++);
      if (this.connAckd) {
        this.txAudio(audioPacket.serializedMsg);
      } else {
        this.startupQueue.push(audioPacket);
      }
    }
    return true;
  }

  /** Flips the mute flag that makes the recorder callback discard chunks. */
  public toggleAudioMuting(): void {
    this.isMutingRecording.value = !this.isMutingRecording.value;
  }

  /**
   * Registers a listener for server transcript pushes.
   * @param callback Invoked with the transcript's source and message.
   * @returns A function that removes this registration; callers that do not
   *          need to unsubscribe may ignore it.
   */
  public onSpeechMessage(callback: (source: string, text: string) => void): () => void {
    this.speechMessageCallbacks.push(callback);
    return () => {
      const idx = this.speechMessageCallbacks.indexOf(callback);
      if (idx !== -1) {
        this.speechMessageCallbacks.splice(idx, 1);
      }
    };
  }

  /**
   * Handles a server conversation-stop request: acknowledges with the current
   * playback position, waits (bounded) for playback to finish, then sends the
   * "graceful stop done" acknowledgement. Concurrent duplicates are ignored.
   *
   * @param sreqId Id of the server request being acknowledged.
   * @param maxPlaybackCompletionTmMs Upper bound on the playback wait.
   */
  private async handleConversationStop(sreqId: string, maxPlaybackCompletionTmMs: number): Promise<void> {
    if (this.isHandlingConversationStop) {
      console.log('Already handling conversation stop, ignoring duplicate request');
      return;
    }

    this.isHandlingConversationStop = true;
    try {
      // Take ONE snapshot so the reported track id and offset belong together.
      const trackSampleOffset = await this.wavStreamPlayer.value.getTrackSampleOffset();
      const appp = RealtimeAudioStreamingService.buildPlaybackProgressPt(
        trackSampleOffset?.trackId ?? '',
        RealtimeAudioStreamingService.toOffsetMs(trackSampleOffset?.currentTime)
      );

      // Send gracefully_stopping_at message immediately
      const sack = new ServerReqAck();
      sack.setSreqId(sreqId);
      sack.setGracefullyStoppingAt(appp);
      const toServer = new ToAudioServer();
      toServer.setSack(sack);
      this.txAudio(toServer.serializeBinary());

      // Wait for playback to complete
      await this.waitForPlaybackCompletion(maxPlaybackCompletionTmMs);

      // Send graceful_stop_done instead of generic ack
      const sackDone = new ServerReqAck();
      sackDone.setSreqId(sreqId);
      sackDone.setGracefulStopDone(new Empty());
      const toServerDone = new ToAudioServer();
      toServerDone.setSack(sackDone);
      this.txAudio(toServerDone.serializeBinary());
    } finally {
      this.isHandlingConversationStop = false;
    }
  }

  /**
   * Waits until the player stops reporting a track, polling every 100 ms, and
   * interrupts playback once `maxPlaybackCompletionTmMs` has elapsed. While
   * waiting, only audio for the track that was playing at entry is accepted.
   */
  private async waitForPlaybackCompletion(maxPlaybackCompletionTmMs: number): Promise<void> {
    try {
      const curTrackId = await this.wavStreamPlayerCurrTrackId();
      if (curTrackId.length === 0) {
        return;
      }
      this.onlyAllowedResponseItemId = curTrackId; // only rx & playback audio for curTrackId while we wait
      console.log('waitForPlaybackCompletion', 'WavPlayerIsPlaying', curTrackId);

      const startTime = Date.now();
      while (await this.isWavStreamPlayerPlaying()) {
        if (Date.now() - startTime > maxPlaybackCompletionTmMs) {
          console.log('Playback wait timeout reached, interrupting playback');
          await this.wavStreamPlayer.value.interrupt();
          break;
        }
        await new Promise(resolve => setTimeout(resolve, 100));
      }

      const elapsedTime = Date.now() - startTime;
      console.log('waitForPlaybackCompletion', 'WavPlayerDonePlaying', elapsedTime, 'ms');
    } finally {
      // Always lift the filter; if it stayed set after an error, every later
      // audio response would be dropped by handleAudioResponse.
      this.onlyAllowedResponseItemId = '';
    }
  }
}
