// Copyright (C) 2016  Davis E. King (davis@dlib.net)
// License: Boost Software License   See LICENSE.txt for the full license.
#ifndef DLIB_SUBPROCeSS_STREAM_H_
#define DLIB_SUBPROCeSS_STREAM_H_

#include <utility>
#include <unistd.h>
#include <iostream>
#include <memory>
#include <dlib/matrix.h>
#include <sys/types.h>
#include <sys/socket.h>


namespace dlib
{

// --------------------------------------------------------------------------------------

    // Call dlib's serialize and deserialize by default.   The point of this version of
    // serialize is to do something fast that normally we wouldn't do, like directly copy
    // memory.  This is safe since this is an interprocess communication happening the same
    // machine.
    template <typename T> void interprocess_serialize ( const T& item, std::ostream& out) { serialize(item, out); } 
    template <typename T> void interprocess_deserialize (T& item, std::istream& in) { deserialize(item, in); } 

    // But have overloads for direct memory copies for some types since this is faster than
    // their default serialization.
    template <typename T, long NR, long NC, typename MM, typename L>
    void interprocess_serialize(const dlib::matrix<T,NR,NC,MM,L>& item, std::ostream& out)
    {
        dlib::serialize(item.nr(), out);
        dlib::serialize(item.nc(), out);
        if (item.size() != 0)
            out.write((const char*)&item(0,0), sizeof(T)*item.size());
        if (!out)
            throw dlib::serialization_error("Error writing matrix to interprocess iostream.");
    }

    template <typename T, long NR, long NC, typename MM, typename L>
    void interprocess_deserialize(dlib::matrix<T,NR,NC,MM,L>& item, std::istream& in)
    {
        long nr, nc;
        dlib::deserialize(nr, in);
        dlib::deserialize(nc, in);
        item.set_size(nr,nc);
        if (item.size() != 0)
            in.read((char*)&item(0,0), sizeof(T)*item.size());
        if (!in)
            throw dlib::serialization_error("Error reading matrix from interprocess iostream.");
    }

// ----------------------------------------------------------------------------------------

    namespace impl{ std::iostream& get_data_iostream(); }

    inline void send_to_parent_process() {impl::get_data_iostream().flush();}
    template <typename U, typename ...T>
    void send_to_parent_process(U&& arg1, T&& ...args)
    /*!
        ensures
            - sends all the arguments to send_to_parent_process() to the parent process by
              serializing them with interprocess_serialize().
    !*/
    {
        interprocess_serialize(arg1, impl::get_data_iostream());
        send_to_parent_process(std::forward<T>(args)...);
        if (!impl::get_data_iostream())
            throw dlib::error("Error sending object to parent process.");
    }

    inline void receive_from_parent_process() {}
    template <typename U, typename ...T>
    void receive_from_parent_process(U&& arg1, T&& ...args)
    /*!
        ensures
            - receives all the arguments to receive_from_parent_process() from the parent
              process by deserializing them from interprocess_serialize().
    !*/
    {
        interprocess_deserialize(arg1, impl::get_data_iostream());
        receive_from_parent_process(std::forward<T>(args)...);
        if (!impl::get_data_iostream())
            throw dlib::error("Error receiving object from parent process.");
    }


// ----------------------------------------------------------------------------------------

    class filestreambuf;

    class subprocess_stream : noncopyable
    {
        /*!
            WHAT THIS OBJECT REPRESENTS
                This is a tool for spawning a subprocess and communicating with it.  Here
                is an example: 

                    subprocess_stream s("/usr/bin/some_program");
                    s.send(obj1, obj2, obj3);
                    s.receive(obj4, obj5);
                    s.wait(); // wait for sub process to terminate 

                Then in the sub process you would have:

                    receive_from_parent_process(obj1, obj2, obj3);
                    // do stuff
                    cout << "echo this text to parent cout" << endl;
                    send_to_parent_process(obj4, obj5);


                Additionally, if the sub process writes to its standard out then that will
                be echoed to std::cout in the parent process.  Writing to std::cerr or
                returning a non-zero value from main will also be noted by the parent
                process and an appropriate exception will be thrown.
        !*/

    public:

        explicit subprocess_stream(
            const char* program_name
        );
        /*!
            ensures
                - spawns a sub process by executing the file with the given program_name.
        !*/

        ~subprocess_stream(
        );
        /*!
            ensures
                - calls wait().  Note that the destructor never throws even though wait() can. 
                  If an exception is thrown by wait() it is just logged to std::cerr.
        !*/

        void wait(
        );
        /*!
            ensures
                - closes the input stream to the child process and then waits for the child
                  to terminate.  
                - If the child returns an error (by returning != 0 from its main) or
                  outputs to its standard error then wait() throws a dlib::error() with the
                  standard error output in it.
        !*/

        int get_child_pid() const { return child_pid; }
        /*!
            ensures
                - returns the PID of the child process
        !*/

        template <typename U, typename ...T>
        void send(U&& arg1, T&& ...args)
        /*!
            ensures
                - sends all the arguments to send() to the subprocess by serializing them
                  with interprocess_serialize().
        !*/
        {
            interprocess_serialize(arg1, iosub);
            send(std::forward<T>(args)...);
            if (!iosub)
            {
                std::ostringstream sout;
                sout << stderr.rdbuf();
                throw dlib::error("Error sending object to child process.\n" + sout.str());
            }
        }
        void send() {iosub.flush();}

        template <typename U, typename ...T>
        void receive(U&& arg1, T&& ...args)
        /*!
            ensures
                - receives all the arguments to receive() to the subprocess by deserializing
                  them with interprocess_deserialize().
        !*/
        {
            interprocess_deserialize(arg1, iosub);
            receive(std::forward<T>(args)...);
            if (!iosub)
            {
                std::ostringstream sout;
                sout << stderr.rdbuf();
                throw dlib::error("Error receiving object from child process.\n" + sout.str() );
            }
        }
        void receive() {}


    private:

        void send_eof(); 

        class cpipe : noncopyable
        {
        private:
            int fd[2];
        public:
            cpipe() { if (socketpair(AF_LOCAL, SOCK_STREAM, 0, fd)) throw dlib::error("Failed to create pipe"); }
            ~cpipe() { close(); }
            int parent_fd() const { return fd[0]; }
            int child_fd() const { return fd[1]; }
            void close() { ::close(fd[0]); ::close(fd[1]); }
        };

        cpipe data_pipe;
        cpipe stdout_pipe;
        cpipe stderr_pipe;
        bool wait_called = false;
        std::unique_ptr<filestreambuf> inout_buf; 
        std::unique_ptr<filestreambuf> err_buf;
        int child_pid = -1;
        std::istream stderr;
        std::iostream iosub;
    };
}

// ----------------------------------------------------------------------------------------

#endif // DLIB_SUBPROCeSS_STREAM_H_