// Copyright (C) 2020 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "StreamSetObserver.h"

#include <android-base/logging.h>
#include <grpcpp/grpcpp.h>

#include "ClientConfig.pb.h"
#include "GrpcGraph.h"
#include "InputFrame.h"
#include "RunnerComponent.h"
#include "prebuilt_interface.h"
#include "types/Status.h"

namespace android {
namespace automotive {
namespace computepipe {
namespace graph {

SingleStreamObserver::SingleStreamObserver(int streamId, EndOfStreamReporter* endOfStreamReporter,
                                           StreamGraphInterface* streamGraphInterface) :
      mStreamId(streamId),
      mEndOfStreamReporter(endOfStreamReporter),
      mStreamGraphInterface(streamGraphInterface) {}

Status SingleStreamObserver::startObservingStream() {
    {
        std::lock_guard lock(mStopObservationLock);
        mStopped = false;
    }
    mThread = std::thread([this]() {
        proto::ObserveOutputStreamRequest observeStreamRequest;
        observeStreamRequest.set_stream_id(mStreamId);
        ::grpc::ClientContext context;
        ::grpc::CompletionQueue cq;

        void* tag;
        bool cqStatus;

        std::unique_ptr<::grpc::ClientAsyncReader<proto::OutputStreamResponse>> rpc(
                mStreamGraphInterface->getServiceStub()
                        ->AsyncObserveOutputStream(&context, observeStreamRequest, &cq, nullptr));

        proto::OutputStreamResponse response;

        cq.Next(&tag, &cqStatus);
        while (cqStatus) {
            // Dispatch data only stream is being observed.
            rpc->Read(&response, nullptr);
            {
                std::lock_guard lock(mStopObservationLock);
                if (mStopped || mStreamGraphInterface == nullptr) {
                    LOG(INFO) << "Graph stopped. ";
                    break;
                }

                // Since this is a separate thread, we need not worry about recursive locking
                // and callback can be executed without creating a new thread.
                if (response.has_pixel_data()) {
                    proto::PixelData pixels = response.pixel_data();
                    runner::InputFrame frame(pixels.height(), pixels.width(),
                                             static_cast<PixelFormat>(
                                                     static_cast<int>(pixels.format())),
                                             pixels.step(),
                                             reinterpret_cast<const unsigned char*>(
                                                     pixels.data().c_str()));
                    mStreamGraphInterface->dispatchPixelData(mStreamId, response.timestamp_us(),
                                                             frame);
                } else if (response.has_semantic_data()) {
                    mStreamGraphInterface
                            ->dispatchSerializedData(mStreamId, response.timestamp_us(),
                                                     std::move(*response.mutable_semantic_data()));
                }
            }

            cq.Next(&tag, &cqStatus);
        }

        ::grpc::Status grpcStatus;
        rpc->Finish(&grpcStatus, nullptr);
        if (!grpcStatus.ok()) {
            LOG(ERROR) << "Failed RPC with message: " << grpcStatus.error_message();
        }

        cq.Shutdown();
        if (mEndOfStreamReporter) {
            std::lock_guard lock(mStopObservationLock);
            mStopped = true;
            std::thread t =
                    std::thread([this]() { mEndOfStreamReporter->reportStreamClosed(mStreamId); });

            t.detach();
        }

        proto::OutputStreamResponse streamResponse;
    });

    return Status::SUCCESS;
}

void SingleStreamObserver::stopObservingStream() {
    std::lock_guard lock(mStopObservationLock);
    mStopped = true;
}

SingleStreamObserver::~SingleStreamObserver() {
    stopObservingStream();

    if (mThread.joinable()) {
        mThread.join();
    }
}

StreamSetObserver::StreamSetObserver(const runner::ClientConfig& clientConfig,
                                     StreamGraphInterface* streamGraphInterface) :
      mClientConfig(clientConfig), mStreamGraphInterface(streamGraphInterface) {}

Status StreamSetObserver::startObservingStreams() {
    std::lock_guard lock(mLock);
    std::map<int, int> outputConfigs = {};
    mClientConfig.getOutputStreamConfigs(outputConfigs);

    if (!mStopped || !mStreamObservers.empty()) {
        LOG(ERROR) << "Already started observing streams. Duplicate call is not allowed";
        return Status::ILLEGAL_STATE;
    }

    for (const auto& it : outputConfigs) {
        auto streamObserver =
                std::make_unique<SingleStreamObserver>(it.first, this, mStreamGraphInterface);
        Status status = streamObserver->startObservingStream();
        if (status != Status::SUCCESS) {
            std::thread t([this]() { stopObservingStreams(true); });
            t.detach();
            return status;
        }
        mStreamObservers.emplace(std::make_pair(it.first, std::move(streamObserver)));
    }

    mStopped = mStreamObservers.empty();
    return Status::SUCCESS;
}

void StreamSetObserver::stopObservingStreams(bool stopImmediately) {
    std::unique_lock lock(mLock);
    if (mStopped) {
        // Separate thread is necessary here to avoid recursive locking.
        if (mGraphTerminationThread.joinable()) {
            mGraphTerminationThread.join();
        }
        mGraphTerminationThread = std::thread([streamGraphInterface(mStreamGraphInterface)]() {
            streamGraphInterface->dispatchGraphTerminationMessage(Status::SUCCESS, "");
        });
        return;
    }

    // Wait for the streams to close if we are not stopping immediately.
    if (stopImmediately) {
        for (auto& it : mStreamObservers) {
            it.second->stopObservingStream();
        }

        mStoppedCv.wait(lock, [this]() -> bool { return mStopped; });
    }
}

void StreamSetObserver::reportStreamClosed(int streamId) {
    std::lock_guard lock(mLock);
    auto streamObserver = mStreamObservers.find(streamId);
    if (streamObserver == mStreamObservers.end()) {
        return;
    }
    mStreamObservers.erase(streamObserver);
    if (mStreamObservers.empty()) {
        mStopped = true;
        mStoppedCv.notify_one();
        mGraphTerminationThread = std::thread([streamGraphInterface(mStreamGraphInterface)]() {
            streamGraphInterface->dispatchGraphTerminationMessage(Status::SUCCESS, "");
        });
    }
}

StreamSetObserver::~StreamSetObserver() {
    if (mGraphTerminationThread.joinable()) {
        mGraphTerminationThread.join();
    }
}

}  // namespace graph
}  // namespace computepipe
}  // namespace automotive
}  // namespace android