/*
 * Copyright (c) 2010, FRiCKLE Piotr Sikora <info@frickle.com>
 * Copyright (c) 2009-2010, Xiaozhe Wang <chaoslawful@gmail.com>
 * Copyright (c) 2009-2010, Yichun Zhang <agentzh@gmail.com>
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 * HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#ifndef DDEBUG
#define DDEBUG 0
#endif

#include <nginx.h>
#include "ngx_postgres_ddebug.h"
#include "ngx_postgres_module.h"
#include "ngx_postgres_keepalive.h"
#include "ngx_postgres_processor.h"


ngx_int_t
ngx_postgres_upstream_init(ngx_conf_t *cf, ngx_http_upstream_srv_conf_t *uscf)
{
    ngx_postgres_upstream_srv_conf_t  *pgscf;
    ngx_postgres_upstream_server_t    *server;
    ngx_postgres_upstream_peers_t     *peers;
    ngx_uint_t                         i, j, n;

    dd("entering");

    uscf->peer.init = ngx_postgres_upstream_init_peer;

    pgscf = ngx_http_conf_upstream_srv_conf(uscf, ngx_postgres_module);

    if (pgscf->servers == NULL || pgscf->servers->nelts == 0) {
        ngx_log_error(NGX_LOG_ERR, cf->log, 0,
                      "postgres: no \"postgres_server\" defined"
                      " in upstream \"%V\" in %s:%ui",
                      &uscf->host, uscf->file_name, uscf->line);

        dd("returning NGX_ERROR");
        return NGX_ERROR;
    }

    /* pgscf->servers != NULL */

    server = uscf->servers->elts;

    n = 0;

    for (i = 0; i < uscf->servers->nelts; i++) {
        n += server[i].naddrs;
    }

    peers = ngx_pcalloc(cf->pool, sizeof(ngx_postgres_upstream_peers_t)
            + sizeof(ngx_postgres_upstream_peer_t) * (n - 1));

    if (peers == NULL) {
        dd("returning NGX_ERROR");
        return NGX_ERROR;
    }

    peers->single = (n == 1);
    peers->number = n;
    peers->name = &uscf->host;

    n = 0;

    for (i = 0; i < uscf->servers->nelts; i++) {
        for (j = 0; j < server[i].naddrs; j++) {
            peers->peer[n].sockaddr = server[i].addrs[j].sockaddr;
            peers->peer[n].socklen = server[i].addrs[j].socklen;
            peers->peer[n].name = server[i].addrs[j].name;
            peers->peer[n].port = server[i].port;
            peers->peer[n].dbname = server[i].dbname;
            peers->peer[n].user = server[i].user;
            peers->peer[n].password = server[i].password;

            peers->peer[n].host.data = ngx_pnalloc(cf->pool,
                                                   NGX_SOCKADDR_STRLEN);
            if (peers->peer[n].host.data == NULL) {
                dd("returning NGX_ERROR");
                return NGX_ERROR;
            }

            peers->peer[n].host.len = ngx_sock_ntop(peers->peer[n].sockaddr,
#if defined(nginx_version) && (nginx_version >= 1005003)
                                          peers->peer[n].socklen,
#endif
                                          peers->peer[n].host.data,
                                          NGX_SOCKADDR_STRLEN, 0);
            if (peers->peer[n].host.len == 0) {
                dd("returning NGX_ERROR");
                return NGX_ERROR;
            }

            n++;
        }
    }

    pgscf->peers = peers;
    pgscf->active_conns = 0;

    if (pgscf->max_cached) {
        dd("returning");
        return ngx_postgres_keepalive_init(cf->pool, pgscf);
    }

    dd("returning NGX_OK");
    return NGX_OK;
}

ngx_int_t
ngx_postgres_upstream_init_peer(ngx_http_request_t *r,
    ngx_http_upstream_srv_conf_t *uscf)
{
    ngx_postgres_upstream_peer_data_t  *pgdt;
    ngx_postgres_upstream_srv_conf_t   *pgscf;
    ngx_postgres_loc_conf_t            *pglcf;
    ngx_postgres_ctx_t                 *pgctx;
    ngx_http_core_loc_conf_t           *clcf;
    ngx_http_upstream_t                *u;
    ngx_postgres_mixed_t               *query;
    ngx_str_t                           sql;
    ngx_uint_t                          i;

    dd("entering");

    pgdt = ngx_pcalloc(r->pool, sizeof(ngx_postgres_upstream_peer_data_t));
    if (pgdt == NULL) {
        goto failed;
    }

    u = r->upstream;

    pgdt->upstream = u;
    pgdt->request = r;

    pgscf = ngx_http_conf_upstream_srv_conf(uscf, ngx_postgres_module);
    pglcf = ngx_http_get_module_loc_conf(r, ngx_postgres_module);
    pgctx = ngx_http_get_module_ctx(r, ngx_postgres_module);

    pgdt->srv_conf = pgscf;
    pgdt->loc_conf = pglcf;

    u->peer.data = pgdt;
    u->peer.get = ngx_postgres_upstream_get_peer;
    u->peer.free = ngx_postgres_upstream_free_peer;

    if (pglcf->query.methods_set & r->method) {
        /* method-specific query */
        dd("using method-specific query");

        query = pglcf->query.methods->elts;
        for (i = 0; i < pglcf->query.methods->nelts; i++) {
            if (query[i].key & r->method) {
                query = &query[i];
                break;
            }
        }

        if (i == pglcf->query.methods->nelts) {
            goto failed;
        }
    } else {
        /* default query */
        dd("using default query");

        query = pglcf->query.def;
    }

    if (query->cv) {
        /* complex value */
        dd("using complex value");

        if (ngx_http_complex_value(r, query->cv, &sql) != NGX_OK) {
            goto failed;
        }

        if (sql.len == 0) {
            clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

            ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                          "postgres: empty \"postgres_query\" (was: \"%V\")"
                          " in location \"%V\"", &query->cv->value,
                          &clcf->name);

            goto failed;
        }

        pgdt->query = sql;
    } else {
        /* simple value */
        dd("using simple value");

        pgdt->query = query->sv;
    }

    /* set $postgres_query */
    pgctx->var_query = pgdt->query;

    dd("returning NGX_OK");
    return NGX_OK;

failed:

#if defined(nginx_version) && (nginx_version >= 8017)
    dd("returning NGX_ERROR");
    return NGX_ERROR;
#else
    r->upstream->peer.data = NULL;

    dd("returning NGX_OK (NGX_ERROR)");
    return NGX_OK;
#endif
}

ngx_int_t
ngx_postgres_upstream_get_peer(ngx_peer_connection_t *pc, void *data)
{
    ngx_postgres_upstream_peer_data_t  *pgdt = data;
    ngx_postgres_upstream_srv_conf_t   *pgscf;
#if defined(nginx_version) && (nginx_version < 8017)
    ngx_postgres_ctx_t                 *pgctx;
#endif
    ngx_postgres_upstream_peers_t      *peers;
    ngx_postgres_upstream_peer_t       *peer;
    ngx_connection_t                   *pgxc = NULL;
    int                                 fd;
    ngx_event_t                        *rev, *wev;
    ngx_int_t                           rc;
    u_char                             *connstring, *last;
    size_t                              len;

    dd("entering");

#if defined(nginx_version) && (nginx_version < 8017)
    if (data == NULL) {
        goto failed;
    }

    pgctx = ngx_http_get_module_ctx(pgdt->request, ngx_postgres_module);
#endif

    pgscf = pgdt->srv_conf;

    pgdt->failed = 0;

    if (pgscf->max_cached && pgscf->single) {
        rc = ngx_postgres_keepalive_get_peer_single(pc, pgdt, pgscf);
        if (rc != NGX_DECLINED) {
            /* re-use keepalive peer */
            dd("re-using keepalive peer (single)");

            pgdt->state = state_db_send_query;

            ngx_postgres_process_events(pgdt->request);

            dd("returning NGX_AGAIN");
            return NGX_AGAIN;
        }
    }

    peers = pgscf->peers;

    if (pgscf->current > peers->number - 1) {
        pgscf->current = 0;
    }

    peer = &peers->peer[pgscf->current++];

    pgdt->name.len = peer->name.len;
    pgdt->name.data = peer->name.data;

    pgdt->sockaddr = *peer->sockaddr;

    pc->name = &pgdt->name;
    pc->sockaddr = &pgdt->sockaddr;
    pc->socklen = peer->socklen;
    pc->cached = 0;

    if ((pgscf->max_cached) && (!pgscf->single)) {
        rc = ngx_postgres_keepalive_get_peer_multi(pc, pgdt, pgscf);
        if (rc != NGX_DECLINED) {
            /* re-use keepalive peer */
            dd("re-using keepalive peer (multi)");

            pgdt->state = state_db_send_query;

            ngx_postgres_process_events(pgdt->request);

            dd("returning NGX_AGAIN");
            return NGX_AGAIN;
        }
    }

    if ((pgscf->reject) && (pgscf->active_conns >= pgscf->max_cached)) {
        ngx_log_error(NGX_LOG_INFO, pc->log, 0,
                      "postgres: keepalive connection pool is full,"
                      " rejecting request to upstream \"%V\"", &peer->name);

        /* a bit hack-ish way to return error response (setup part) */
        pc->connection = ngx_get_connection(0, pc->log);

#if defined(nginx_version) && (nginx_version < 8017)
        pgctx->status = NGX_HTTP_SERVICE_UNAVAILABLE;
#endif

        dd("returning NGX_AGAIN (NGX_HTTP_SERVICE_UNAVAILABLE)");
        return NGX_AGAIN;
    }

    /* sizeof("...") - 1 + 1 (for spaces and '\0' omitted */
    len = sizeof("hostaddr=") + peer->host.len
        + sizeof("port=") + sizeof("65535") - 1
        + sizeof("dbname=") + peer->dbname.len
        + sizeof("user=") + peer->user.len
        + sizeof("password=") + peer->password.len
        + sizeof("sslmode=disable");

    connstring = ngx_pnalloc(pgdt->request->pool, len);
    if (connstring == NULL) {
#if defined(nginx_version) && (nginx_version >= 8017)
        dd("returning NGX_ERROR");
        return NGX_ERROR;
#else
        goto failed;
#endif
    }

    /* TODO add unix sockets */
    last = ngx_snprintf(connstring, len - 1,
                        "hostaddr=%V port=%d dbname=%V user=%V password=%V"
                        " sslmode=disable",
                        &peer->host, peer->port, &peer->dbname, &peer->user,
                        &peer->password);
    *last = '\0';

    dd("PostgreSQL connection string: %s", connstring);

    /*
     * internal checks in PQsetnonblocking are taking care of any
     * PQconnectStart failures, so we don't need to check them here.
     */

    ngx_log_debug0(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                   "postgres: connecting");

    pgdt->pgconn = PQconnectStart((const char *)connstring);
    if (PQsetnonblocking(pgdt->pgconn, 1) == -1) {
        ngx_log_error(NGX_LOG_ERR, pc->log, 0,
                      "postgres: connection failed: %s in upstream \"%V\"",
                      PQerrorMessage(pgdt->pgconn), &peer->name);

        PQfinish(pgdt->pgconn);
        pgdt->pgconn = NULL;

#if defined(nginx_version) && (nginx_version >= 8017)
        dd("returning NGX_DECLINED");
        return NGX_DECLINED;
#else
        pgctx->status = NGX_HTTP_BAD_GATEWAY;
        goto failed;
#endif
    }

#if defined(DDEBUG) && (DDEBUG > 1)
    PQtrace(pgdt->pgconn, stderr);
#endif

    dd("connection status:%d", (int) PQstatus(pgdt->pgconn));

    /* take spot in keepalive connection pool */
    pgscf->active_conns++;

    /* add the file descriptor (fd) into an nginx connection structure */

    fd = PQsocket(pgdt->pgconn);
    if (fd == -1) {
        ngx_log_error(NGX_LOG_ERR, pc->log, 0,
                      "postgres: failed to get connection fd");

        goto invalid;
    }

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, pc->log, 0,
                   "postgres: connection fd:%d", fd);

    pgxc = pc->connection = ngx_get_connection(fd, pc->log);

    if (pgxc == NULL) {
        ngx_log_error(NGX_LOG_ERR, pc->log, 0,
                      "postgres: failed to get a free nginx connection");

        goto invalid;
    }

    pgxc->log = pc->log;
    pgxc->log_error = pc->log_error;
    pgxc->number = ngx_atomic_fetch_add(ngx_connection_counter, 1);

    rev = pgxc->read;
    wev = pgxc->write;

    rev->log = pc->log;
    wev->log = pc->log;

    /* register the connection with postgres connection fd into the
     * nginx event model */

    if (ngx_event_flags & NGX_USE_RTSIG_EVENT) {
        dd("NGX_USE_RTSIG_EVENT");
        if (ngx_add_conn(pgxc) != NGX_OK) {
            goto bad_add;
        }

    } else if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {
        dd("NGX_USE_CLEAR_EVENT");
        if (ngx_add_event(rev, NGX_READ_EVENT, NGX_CLEAR_EVENT) != NGX_OK) {
            goto bad_add;
        }

        if (ngx_add_event(wev, NGX_WRITE_EVENT, NGX_CLEAR_EVENT) != NGX_OK) {
            goto bad_add;
        }

    } else {
        dd("NGX_USE_LEVEL_EVENT");
        if (ngx_add_event(rev, NGX_READ_EVENT, NGX_LEVEL_EVENT) != NGX_OK) {
            goto bad_add;
        }

        if (ngx_add_event(wev, NGX_WRITE_EVENT, NGX_LEVEL_EVENT) != NGX_OK) {
            goto bad_add;
        }
    }

    pgxc->log->action = "connecting to PostgreSQL database";
    pgdt->state = state_db_connect;

    dd("returning NGX_AGAIN");
    return NGX_AGAIN;

bad_add:

    ngx_log_error(NGX_LOG_ERR, pc->log, 0,
                  "postgres: failed to add nginx connection");

invalid:

    ngx_postgres_upstream_free_connection(pc->log, pc->connection,
                                          pgdt->pgconn, pgscf);

#if defined(nginx_version) && (nginx_version >= 8017)
    dd("returning NGX_ERROR");
    return NGX_ERROR;
#else

failed:

    /* a bit hack-ish way to return error response (setup part) */
    pc->connection = ngx_get_connection(0, pc->log);

    dd("returning NGX_AGAIN (NGX_ERROR)");
    return NGX_AGAIN;
#endif
}

void
ngx_postgres_upstream_free_peer(ngx_peer_connection_t *pc,
    void *data, ngx_uint_t state)
{
    ngx_postgres_upstream_peer_data_t  *pgdt = data;
    ngx_postgres_upstream_srv_conf_t   *pgscf;

    dd("entering");

#if defined(nginx_version) && (nginx_version < 8017)
    if (data == NULL) {
        dd("returning");
        return;
    }
#endif

    pgscf = pgdt->srv_conf;

    if (pgscf->max_cached) {
        ngx_postgres_keepalive_free_peer(pc, pgdt, pgscf, state);
    }

    if (pc->connection) {
        dd("free connection to PostgreSQL database");

        ngx_postgres_upstream_free_connection(pc->log, pc->connection,
                pgdt->pgconn, pgscf);

        pgdt->pgconn = NULL;
        pc->connection = NULL;
    }

    dd("returning");
}

ngx_flag_t
ngx_postgres_upstream_is_my_peer(const ngx_peer_connection_t *peer)
{
    dd("entering & returning");
    return (peer->get == ngx_postgres_upstream_get_peer);
}

void
ngx_postgres_upstream_free_connection(ngx_log_t *log, ngx_connection_t *c,
    PGconn *pgconn, ngx_postgres_upstream_srv_conf_t *pgscf)
{
    ngx_event_t  *rev, *wev;

    dd("entering");

    PQfinish(pgconn);

    if (c) {
        rev = c->read;
        wev = c->write;

        if (rev->timer_set) {
            ngx_del_timer(rev);
        }

        if (wev->timer_set) {
            ngx_del_timer(wev);
        }

        if (ngx_del_conn) {
           ngx_del_conn(c, NGX_CLOSE_EVENT);
        } else {
            if (rev->active || rev->disabled) {
                ngx_del_event(rev, NGX_READ_EVENT, NGX_CLOSE_EVENT);
            }

            if (wev->active || wev->disabled) {
                ngx_del_event(wev, NGX_WRITE_EVENT, NGX_CLOSE_EVENT);
            }
        }

#if defined(nginx_version) && nginx_version >= 1007005
        if (rev->posted) {
#else
        if (rev->prev) {
#endif
            ngx_delete_posted_event(rev);
        }

#if defined(nginx_version) && nginx_version >= 1007005
        if (wev->posted) {
#else
        if (wev->prev) {
#endif
            ngx_delete_posted_event(wev);
        }

        rev->closed = 1;
        wev->closed = 1;

#if defined(nginx_version) && (nginx_version >= 1001004)
        if (c->pool) {
            ngx_destroy_pool(c->pool);
        }
#endif

        ngx_free_connection(c);

        c->fd = (ngx_socket_t) -1;
    }

    /* free spot in keepalive connection pool */
    pgscf->active_conns--;

    dd("returning");
}