import { Message, Stream, StreamOptions } from '~/api/customprotocol-core';
import * as helpers from '~/domain/helpers';
import { ProcessEvent } from '~/domain/process-events';
import { PodInfo } from '~/domain/timescape';
import { ProtoFactory } from '~/factories/proto';
import { logger } from '~/utils/logger';
import { ThrottledEmitter } from '~/utils/throttled-emitter';
import * as fgspb from '~backend/proto/tetragon/events_pb';
import * as uipepb from '~backend/proto/ui/process-events_pb';

export enum Event {
  ProcessEvents = 'process-events',
}

export type Handlers = {
  [Event.ProcessEvents]: (pe: ProcessEvent[]) => void;
};

export type Options = StreamOptions & {
  pod: PodInfo;
};

export class ProcessEventsStream extends Stream<Handlers> {
  public static readonly eventsThrottlingDelay = 500;

  private eventsThrottler: ThrottledEmitter<ProcessEvent>;
  public readonly podInfo: PodInfo;

  constructor(opts: Options) {
    super(opts);
    this.podInfo = opts.pod;

    // TODO: I guess such throttling is guaranteed by custom protocol
    this.eventsThrottler = new ThrottledEmitter(ProcessEventsStream.eventsThrottlingDelay);
    this.setupEventHandlers();
  }

  public getPodName(): string {
    return this.podInfo.name;
  }

  public onProcessEvents(fn: Handlers[Event.ProcessEvents]): this {
    this.on(Event.ProcessEvents, fn);
    return this;
  }

  protected messageBuilder(msg: Message, isFirst: boolean): Message {
    if (!isFirst) return msg;

    const req = ProtoFactory.processEventsRequestFromPodInfo(this.podInfo);
    msg.setBodyBytes(uipepb.GetTimescapeProcessEventsRequest.toBinary(req));

    return msg;
  }

  private setupEventHandlers() {
    this.eventsThrottler.on(events => {
      this.emit(Event.ProcessEvents, events);
    });

    this.onMessage(msg => {
      const events = uipepb.GetTimescapeProcessEventsResponse.fromBinary(msg.body).events;

      this.parseAndEmitEvents(events);
    });
  }

  private parseAndEmitEvents(eventList: fgspb.GetEventsResponse[]) {
    const parsedEvents: ProcessEvent[] = [];

    eventList.forEach(evt => {
      switch (evt.event.oneofKind) {
        case 'process_accept':
        case 'process_connect':
        case 'process_close':
        case 'process_exec':
        case 'process_exit':
        case 'process_listen':
          break;
        default:
          return;
      }

      const parsed = helpers.processEvents.parseObject(evt);
      if (parsed == null) {
        logger.warn('process event was not successfully parsed: ', evt);
        return;
      }

      parsedEvents.push(parsed);
    });

    logger.log(`parsed events: `, parsedEvents);
    if (parsedEvents.length > 0) {
      this.eventsThrottler.emit(parsedEvents);
    }
  }
}
