// Copyright (C) 2003  Davis E. King (davis@dlib.net)
// License: Boost Software License   See LICENSE.txt for the full license.
#ifndef DLIB_SERVER_KERNEL_CPp_
#define DLIB_SERVER_KERNEL_CPp_

#include "server_kernel.h"
#include "../string.h"

namespace dlib
{

// ----------------------------------------------------------------------------------------

    server::
    server (
    ) :
        listening_port(0),
        running(false),
        shutting_down(false),
        running_signaler(running_mutex),
        thread_count(0),
        thread_count_signaler(thread_count_mutex),
        max_connections(1000),
        thread_count_zero(thread_count_mutex),
        graceful_close_timeout(500)
    {
    }

// ----------------------------------------------------------------------------------------

    server::
    ~server (
    )
    {
        clear();
    }

// ----------------------------------------------------------------------------------------

    unsigned long server::
    get_graceful_close_timeout (
    ) const
    {
        auto_mutex lock(max_connections_mutex);
        return graceful_close_timeout;
    }

// ----------------------------------------------------------------------------------------

    void server::
    set_graceful_close_timeout (
        unsigned long timeout
    ) 
    {
        auto_mutex lock(max_connections_mutex);
        graceful_close_timeout = timeout;
    }

// ----------------------------------------------------------------------------------------


    int server::
    get_max_connections (
    ) const
    {
        max_connections_mutex.lock();
        int temp = max_connections;
        max_connections_mutex.unlock();
        return temp;
    }

// ----------------------------------------------------------------------------------------

    void server::
    set_max_connections (
        int max
    ) 
    {
        // make sure requires clause is not broken
        DLIB_CASSERT( 
            max >= 0 ,
            "\tvoid server::set_max_connections"
            << "\n\tmax == " << max
            << "\n\tthis: " << this
            );

        max_connections_mutex.lock();
        max_connections = max;
        max_connections_mutex.unlock();
    }

// ----------------------------------------------------------------------------------------

    void server::
    clear (
    )
    {
        // signal that we are shutting down
        shutting_down_mutex.lock();
        shutting_down = true;
        shutting_down_mutex.unlock();



        max_connections_mutex.lock();
        listening_port_mutex.lock();
        listening_ip_mutex.lock();
        listening_ip = "";        
        listening_port = 0;
        max_connections = 1000;
        graceful_close_timeout = 500;
        listening_port_mutex.unlock();
        listening_ip_mutex.unlock();
        max_connections_mutex.unlock();


        // tell all the connections to shut down
        cons_mutex.lock();
        connection* temp;
        while (cons.size() > 0)
        {
            cons.remove_any(temp);
            temp->shutdown();
        }
        cons_mutex.unlock();


        // wait for all the connections to shut down 
        thread_count_mutex.lock();
        while (thread_count > 0)
        {
            thread_count_zero.wait();
        }
        thread_count_mutex.unlock();
        



        // wait for the listener to close
        running_mutex.lock();
        while (running == true)
        {
            running_signaler.wait();
        }
        running_mutex.unlock();



        // signal that the shutdown is complete
        shutting_down_mutex.lock();
        shutting_down = false;
        shutting_down_mutex.unlock();
    }

// ----------------------------------------------------------------------------------------

    void server::
    start_async_helper (
    )
    {
        try
        {
            start_accepting_connections();
        }
        catch (std::exception& e)
        {
            sdlog << LERROR << e.what();
        }
    }

// ----------------------------------------------------------------------------------------

    void server::
    start_async (
    )
    {
        auto_mutex lock(running_mutex);
        if (running)
            return;

        // Any exceptions likely to be thrown by the server are going to be
        // thrown when trying to bind the port.  So calling this here rather
        // than in the thread we are about to make will cause start_async()
        // to report errors back to the user in a very straight forward way.
        open_listening_socket();

        async_start_thread.reset(new thread_function(make_mfp(*this,&server::start_async_helper)));
    }

// ----------------------------------------------------------------------------------------

    void server::
    open_listening_socket (
    )
    {
        if (!sock)
        {
            int status = create_listener(sock,listening_port,listening_ip);
            const int port_used = listening_port;

            // if there was an error then clear this object
            if (status < 0)
            {
                max_connections_mutex.lock();
                listening_port_mutex.lock();
                listening_ip_mutex.lock();
                listening_ip = "";        
                listening_port = 0;
                max_connections = 1000;
                graceful_close_timeout = 500;
                listening_port_mutex.unlock();
                listening_ip_mutex.unlock();
                max_connections_mutex.unlock();
            }



            // throw an exception for the error
            if (status == PORTINUSE)
            {
                throw dlib::socket_error(
                    EPORT_IN_USE,
                    "error occurred in server::start()\nport " + cast_to_string(port_used) + " already in use"
                );
            }
            else if (status == OTHER_ERROR)
            {
                throw dlib::socket_error(
                    "error occurred in server::start()\nunable to create listener"
                );            
            }
        }

        running_mutex.lock();
        running = true;
        running_mutex.unlock();
    }

// ----------------------------------------------------------------------------------------

    void server::
    start (
    )
    {
        // make sure requires clause is not broken
        DLIB_CASSERT( 
              this->is_running() == false,
            "\tvoid server::start"
            << "\n\tis_running() == " << this->is_running() 
            << "\n\tthis: " << this
            );

        start_accepting_connections();

    }

// ----------------------------------------------------------------------------------------

    void server::
    start_accepting_connections (
    )
    {
        open_listening_socket();

        // determine the listening port
        bool port_assigned = false;
        listening_port_mutex.lock();
        if (listening_port == 0)
        {
            port_assigned = true;
            listening_port = sock->get_listening_port();
        }
        listening_port_mutex.unlock();
        if (port_assigned)
            on_listening_port_assigned();
        


        int status = 0;

        connection* client;
        bool exit = false;
        while ( true )
        {


            // accept the next connection
            status = sock->accept(client,1000);


            // if there was an error then quit the loop
            if (status == OTHER_ERROR)
            {
                break;
            }

            shutting_down_mutex.lock();
            // if we are shutting down then signal that we should quit the loop
            exit = shutting_down;
            shutting_down_mutex.unlock();  


            // if we should be shutting down 
            if (exit)
            {
                // if a connection was opened then close it
                if (status == 0)
                    delete client;
                break;
            }



            // if the accept timed out
            if (status == TIMEOUT)
            {
                continue;       
            }





            // add this new connection to cons
            cons_mutex.lock();
            connection* client_temp = client;
            try{cons.add(client_temp);}
            catch(...)
            { 
                sock.reset();
                delete client;
                cons_mutex.unlock();

                // signal that we are not running start() anymore
                running_mutex.lock();
                running = false;
                running_signaler.broadcast();
                running_mutex.unlock();               
                

                clear(); 
                throw;
            }
            cons_mutex.unlock();


            // make a param structure
            param* temp = 0;
            try{
            temp = new param (
                            *this,
                            *client,
                            get_graceful_close_timeout() 
                            );
            } catch (...) 
            {
                sock.reset();
                delete client;
                running_mutex.lock();
                running = false;
                running_signaler.broadcast();
                running_mutex.unlock();
                clear(); 
                throw;
            }


            // if create_new_thread failed
            if (!create_new_thread(service_connection,temp))
            {
                delete temp;
                // close the listening socket
                sock.reset();

                // close the new connection and remove it from cons
                cons_mutex.lock();
                connection* ctemp;
                if (cons.is_member(client))
                {
                    cons.remove(client,ctemp);
                }
                delete client;
                cons_mutex.unlock();


                // signal that the listener has closed
                running_mutex.lock();
                running = false;
                running_signaler.broadcast();
                running_mutex.unlock();

                // make sure the object is cleared
                clear();

                // throw the exception
                throw dlib::thread_error(
                    ECREATE_THREAD,
                    "error occurred in server::start()\nunable to start thread"
                    );    
            }
            // if we made the new thread then update thread_count
            else
            {
                // increment the thread count
                thread_count_mutex.lock();
                ++thread_count;
                if (thread_count == 0)
                    thread_count_zero.broadcast();
                thread_count_mutex.unlock();
            }


            

            // check if we have hit the maximum allowed number of connections
            max_connections_mutex.lock();
            // if max_connections is zero or the loop is ending then skip this
            if (max_connections != 0)
            {
                // wait for thread_count to be less than max_connections
                thread_count_mutex.lock();
                while (thread_count >= max_connections)
                {
                    max_connections_mutex.unlock();
                    thread_count_signaler.wait();
                    max_connections_mutex.lock();     

                    // if we are shutting down the quit the loop
                    shutting_down_mutex.lock();
                    exit = shutting_down;
                    shutting_down_mutex.unlock();
                    if (exit)
                        break;
                }
                thread_count_mutex.unlock();
            }
            max_connections_mutex.unlock();

            if (exit)
            {
                break;
            }
        } //while ( true )


        // close the socket
        sock.reset();

        // signal that the listener has closed
        running_mutex.lock();
        running = false;
        running_signaler.broadcast();
        running_mutex.unlock();

        // if there was an error with accept then throw an exception
        if (status == OTHER_ERROR)
        {
            // make sure the object is cleared
            clear();

            // throw the exception
            throw dlib::socket_error(
             "error occurred in server::start()\nlistening socket returned error"
                );            
        }
    }

// ----------------------------------------------------------------------------------------

    bool server::
    is_running (
    ) const
    {
        running_mutex.lock();
        bool temp = running;
        running_mutex.unlock();
        return temp;
    }

// ----------------------------------------------------------------------------------------

    const std::string server::
    get_listening_ip (
    ) const
    {
        listening_ip_mutex.lock();
        std::string ip(listening_ip);
        listening_ip_mutex.unlock();
        return ip;
    }

// ----------------------------------------------------------------------------------------

    int server::
    get_listening_port (
    ) const
    {
        listening_port_mutex.lock();
        int port = listening_port;
        listening_port_mutex.unlock();        
        return port;
    }

// ----------------------------------------------------------------------------------------

    void server::
    set_listening_port (
        int port
    )
    {
        // make sure requires clause is not broken
        DLIB_CASSERT( 
            ( port >= 0 &&
              this->is_running() == false ),
            "\tvoid server::set_listening_port"
            << "\n\tport         == " << port
            << "\n\tis_running() == " << this->is_running() 
            << "\n\tthis: " << this
            );

        listening_port_mutex.lock();
        listening_port = port;
        listening_port_mutex.unlock();
    }

// ----------------------------------------------------------------------------------------

    void server::
    set_listening_ip (
        const std::string& ip
    )
    {
        // make sure requires clause is not broken
        DLIB_CASSERT( 
            ( ( is_ip_address(ip) || ip == "" ) &&
              this->is_running() == false ),
            "\tvoid server::set_listening_ip"
            << "\n\tip           == " << ip
            << "\n\tis_running() == " << this->is_running() 
            << "\n\tthis: " << this
            );

        listening_ip_mutex.lock();
        listening_ip = ip;
        listening_ip_mutex.unlock();
    }

// ----------------------------------------------------------------------------------------
// ----------------------------------------------------------------------------------------
    // static member function definitions
// ----------------------------------------------------------------------------------------
// ----------------------------------------------------------------------------------------

    const logger server::sdlog("dlib.server");

    void server::
    service_connection(
        void* item
    )
    {
        param& p = *static_cast<param*>(item);


        p.the_server.on_connect(p.new_connection);


        // remove this connection from cons and close it
        p.the_server.cons_mutex.lock();
        connection* temp;
        if (p.the_server.cons.is_member(&p.new_connection))
            p.the_server.cons.remove(&p.new_connection,temp);
        p.the_server.cons_mutex.unlock();

        try{ close_gracefully(&p.new_connection, p.graceful_close_timeout); } 
        catch (...) { sdlog << LERROR << "close_gracefully() threw"; } 

        // decrement the thread count and signal if it is now zero
        p.the_server.thread_count_mutex.lock();
        --p.the_server.thread_count;
        p.the_server.thread_count_signaler.broadcast();
        if (p.the_server.thread_count == 0)
            p.the_server.thread_count_zero.broadcast();
        p.the_server.thread_count_mutex.unlock();

        delete &p;


    }

// ----------------------------------------------------------------------------------------

}

#endif // DLIB_SERVER_KERNEL_CPp_