/*
 * Copyright (C) Yichun Zhang (agentzh)
 */


#ifndef DDEBUG
#define DDEBUG 0
#endif
#include "ddebug.h"


#include "ngx_http_drizzle_module.h"
#include "ngx_http_drizzle_handler.h"
#include "ngx_http_drizzle_processor.h"
#include "ngx_http_drizzle_util.h"
#include "ngx_http_drizzle_upstream.h"
#include "ngx_http_drizzle_keepalive.h"


#ifdef _WIN32
/* import the POLLIN and POLLOUT flags */
#   ifndef WIN32_LEAN_AND_MEAN
#       define WIN32_LEAN_AND_MEAN
#   endif
#   include <winsock2.h>
#endif


/* upstream callbacks and input filter stubs installed by the handler below */


static ngx_int_t ngx_http_drizzle_create_request(ngx_http_request_t *r);
static ngx_int_t ngx_http_drizzle_reinit_request(ngx_http_request_t *r);
static void ngx_http_drizzle_abort_request(ngx_http_request_t *r);
static void ngx_http_drizzle_finalize_request(ngx_http_request_t *r,
        ngx_int_t rc);
static ngx_int_t ngx_http_drizzle_process_header(ngx_http_request_t *r);
static ngx_int_t ngx_http_drizzle_input_filter_init(void *data);
static ngx_int_t ngx_http_drizzle_input_filter(void *data, ssize_t bytes);


/*
 * Content handler that hands the request over to the drizzle upstream.
 *
 * Steps performed:
 *   1. reject "subrequest in memory" requests (not supported);
 *   2. make sure a query is configured for the request method;
 *   3. create the upstream object for the request;
 *   4. when "drizzle_pass" contains variables, evaluate it and look up the
 *      matching upstream block;
 *   5. install this module's upstream callbacks and event handlers and
 *      start the upstream via ngx_http_upstream_dbd_init().
 *
 * Returns:
 *   NGX_DONE                        request handed over to the upstream code
 *   NGX_HTTP_NOT_ALLOWED            queries are configured, but none for the
 *                                   current request method
 *   NGX_HTTP_INTERNAL_SERVER_ERROR  unsupported usage, missing configuration,
 *                                   unknown upstream or allocation failure
 *   NGX_ERROR                       the "drizzle_pass" value could not be
 *                                   evaluated
 */
ngx_int_t
ngx_http_drizzle_handler(ngx_http_request_t *r)
{
    ngx_http_upstream_t            *u;
    ngx_http_drizzle_loc_conf_t    *dlcf;
#if defined(nginx_version) && nginx_version < 8017
    ngx_http_drizzle_ctx_t         *dctx;
#endif
    ngx_http_core_loc_conf_t       *clcf;
    ngx_str_t                       target;
    ngx_url_t                       url;
    ngx_connection_t               *c;

    dd("request: %p", r);
    dd("subrequest in memory: %d", (int) r->subrequest_in_memory);
    dd("connection: %p", r->connection);
    dd("connection log: %p", r->connection->log);

    if (r->subrequest_in_memory) {
        /* TODO: add support for subrequest in memory by
         * emitting output into u->buffer instead */

        ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
                      "ngx_http_drizzle_module does not support "
                      "subrequest in memory");

        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    dlcf = ngx_http_get_module_loc_conf(r, ngx_http_drizzle_module);

    /* no default query and no query registered for this request method */
    if ((dlcf->default_query == NULL) && !(dlcf->methods_set & r->method)) {
        if (dlcf->methods_set != 0) {
            /* queries exist, but only for other methods */
            return NGX_HTTP_NOT_ALLOWED;
        }

        clcf = ngx_http_get_module_loc_conf(r, ngx_http_core_module);

        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                      "drizzle: missing \"drizzle_query\" in location \"%V\"",
                      &clcf->name);

        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    dd("XXX upstream already exists? %p", r->upstream);

#if defined(nginx_version) && \
    ((nginx_version >= 7063 && nginx_version < 8000) \
     || nginx_version >= 8007)

    dd("creating upstream.......");
    if (ngx_http_upstream_create(r) != NGX_OK) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    u = r->upstream;

#else /* 0.7.x < 0.7.63, 0.8.x < 0.8.7 */

    dd("XXX create upstream");
    u = ngx_pcalloc(r->pool, sizeof(ngx_http_upstream_t));
    if (u == NULL) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    u->peer.log = r->connection->log;
    u->peer.log_error = NGX_ERROR_ERR;
#  if (NGX_THREADS)
    u->peer.lock = &r->connection->lock;
#  endif

    r->upstream = u;

#endif

    if (dlcf->complex_target) {
        /* variables used in the drizzle_pass directive */
        if (ngx_http_complex_value(r, dlcf->complex_target, &target)
                != NGX_OK)
        {
            dd("failed to compile");
            return NGX_ERROR;
        }

        if (target.len == 0) {
            ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                    "drizzle: handler: empty \"drizzle_pass\" target");
            return NGX_HTTP_INTERNAL_SERVER_ERROR;
        }

        /* "url" lives on the stack: clear it so that every field we do not
         * set explicitly below has a defined (zero) value before the
         * structure is handed to the lookup function */
        ngx_memzero(&url, sizeof(ngx_url_t));

        url.host = target;
        url.port = 0;
        url.no_resolve = 1;

        dlcf->upstream.upstream = ngx_http_upstream_drizzle_add(r, &url);

        if (dlcf->upstream.upstream == NULL) {
            ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                   "drizzle: upstream \"%V\" not found", &target);

            return NGX_HTTP_INTERNAL_SERVER_ERROR;
        }
    }

#if defined(nginx_version) && nginx_version < 8017
    /* older nginx versions: keep a per-request context whose "status"
     * field is consulted in the clean-up path at the end of this function */
    dctx = ngx_pcalloc(r->pool, sizeof(ngx_http_drizzle_ctx_t));
    if (dctx == NULL) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    ngx_http_set_ctx(r, dctx, ngx_http_drizzle_module);
#endif

    u->schema.len = sizeof("drizzle://") - 1;
    u->schema.data = (u_char *) "drizzle://";

    u->output.tag = (ngx_buf_tag_t) &ngx_http_drizzle_module;

    dd("drizzle tag: %p", (void *) u->output.tag);

    u->conf = &dlcf->upstream;

    u->create_request = ngx_http_drizzle_create_request;
    u->reinit_request = ngx_http_drizzle_reinit_request;
    u->process_header = ngx_http_drizzle_process_header;
    u->abort_request = ngx_http_drizzle_abort_request;
    u->finalize_request = ngx_http_drizzle_finalize_request;

    /* we bypass the upstream input filter mechanism in
     * ngx_http_upstream_process_headers */

    u->input_filter_init = ngx_http_drizzle_input_filter_init;
    u->input_filter = ngx_http_drizzle_input_filter;
    u->input_filter_ctx = NULL;

#if defined(nginx_version) && nginx_version >= 8011
    /* take an extra reference on the main request while the upstream
     * is in progress */
    r->main->count++;
#endif

    dd("XXX connect timeout: %d", (int) dlcf->upstream.connect_timeout);

    ngx_http_upstream_dbd_init(r);

    /* override the read/write event handler to our own */
    u->write_event_handler = ngx_http_drizzle_wev_handler;
    u->read_event_handler  = ngx_http_drizzle_rev_handler;

    /* a bit hack-ish way to return error response (clean-up part):
     * a peer connection whose fd is 0 is treated as a failed connection
     * attempt.  NOTE(review): presumably this state is left behind by
     * ngx_http_upstream_dbd_init() on failure -- confirm there. */
    if ((u->peer.connection) && (u->peer.connection->fd == 0)) {
        c = u->peer.connection;
        u->peer.connection = NULL;

        if (c->write->timer_set) {
            ngx_del_timer(c->write);
        }

        ngx_free_connection(c);

        ngx_http_upstream_drizzle_finalize_request(r, u,
#if defined(nginx_version) && (nginx_version >= 8017)
            NGX_HTTP_SERVICE_UNAVAILABLE);
#else
            dctx->status ? dctx->status : NGX_HTTP_INTERNAL_SERVER_ERROR);
#endif
    }

    return NGX_DONE;
}


/*
 * Write-event handler installed on the upstream by this module.
 *
 * On a write timeout the request is passed to the next upstream with
 * NGX_HTTP_UPSTREAM_FT_TIMEOUT; on a failed connection test it is passed on
 * with NGX_HTTP_UPSTREAM_FT_ERROR.  Otherwise the libdrizzle connection is
 * flagged as I/O ready and the drizzle event processor is run.
 */
void
ngx_http_drizzle_wev_handler(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    ngx_connection_t  *conn;

    dd("drizzle wev handler");

    conn = u->peer.connection;

    /* just to ensure u->reinit_request always gets called for
     * upstream_next */
    u->request_sent = 1;

    if (conn->write->timedout) {
        dd("drizzle connection write timeout");

        ngx_http_drizzle_set_thread_id_variable(r, u);

        ngx_http_upstream_drizzle_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);

    } else if (ngx_http_upstream_drizzle_test_connect(conn) != NGX_OK) {
        dd("drizzle connection is broken");

        ngx_http_upstream_drizzle_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);

    } else {
        ngx_http_drizzle_set_libdrizzle_ready(r);

        /* the return value is deliberately ignored */
        (void) ngx_http_drizzle_process_events(r);
    }
}


/*
 * Read-event handler installed on the upstream by this module.
 *
 * On a read timeout the request is passed to the next upstream with
 * NGX_HTTP_UPSTREAM_FT_TIMEOUT; on a failed connection test it is passed on
 * with NGX_HTTP_UPSTREAM_FT_ERROR.  Otherwise the libdrizzle connection is
 * flagged as I/O ready and the drizzle event processor is run.
 */
void
ngx_http_drizzle_rev_handler(ngx_http_request_t *r, ngx_http_upstream_t *u)
{
    ngx_connection_t  *conn;

    dd("drizzle rev handler");

    conn = u->peer.connection;

    /* just to ensure u->reinit_request always gets called for
     * upstream_next */
    u->request_sent = 1;

    if (conn->read->timedout) {
        dd("drizzle connection read timeout");

        ngx_http_drizzle_set_thread_id_variable(r, u);

        ngx_http_upstream_drizzle_next(r, u, NGX_HTTP_UPSTREAM_FT_TIMEOUT);

    } else if (ngx_http_upstream_drizzle_test_connect(conn) != NGX_OK) {
        dd("drizzle connection is broken");

        ngx_http_upstream_drizzle_next(r, u, NGX_HTTP_UPSTREAM_FT_ERROR);

    } else {
        ngx_http_drizzle_set_libdrizzle_ready(r);

        /* the return value is deliberately ignored */
        (void) ngx_http_drizzle_process_events(r);
    }
}


/*
 * "create_request" upstream callback: clears the upstream's request buffer
 * chain and reports success.
 */
static ngx_int_t
ngx_http_drizzle_create_request(ngx_http_request_t *r)
{
    ngx_http_upstream_t  *upstream;

    upstream = r->upstream;
    upstream->request_bufs = NULL;

    return NGX_OK;
}


/*
 * "reinit_request" upstream callback: re-installs this module's own
 * read/write event handlers on the request's upstream.  Always returns
 * NGX_OK.
 */
static ngx_int_t
ngx_http_drizzle_reinit_request(ngx_http_request_t *r)
{
    /* override the read/write event handler to our own */
    r->upstream->read_event_handler  = ngx_http_drizzle_rev_handler;
    r->upstream->write_event_handler = ngx_http_drizzle_wev_handler;

    return NGX_OK;
}


/*
 * "abort_request" upstream callback: intentionally a no-op.
 */
static void
ngx_http_drizzle_abort_request(ngx_http_request_t *r)
{
    /* nothing to do */
}


/*
 * "finalize_request" upstream callback: intentionally a no-op; both the
 * request and the result code are ignored.
 */
static void
ngx_http_drizzle_finalize_request(ngx_http_request_t *r, ngx_int_t rc)
{
    /* nothing to do */
}


/*
 * "process_header" upstream callback stub.  It is not expected to be
 * invoked: if it is, an error is logged and NGX_ERROR is returned.
 */
static ngx_int_t
ngx_http_drizzle_process_header(ngx_http_request_t *r)
{
    ngx_log_t  *log;

    log = r->connection->log;

    ngx_log_error(NGX_LOG_ERR, log, 0,
                  "ngx_http_drizzle_process_header should not be called"
                  " by the upstream");

    return NGX_ERROR;
}


/*
 * "input_filter_init" upstream callback stub.  "data" is treated as the
 * request pointer.  It is not expected to be invoked: if it is, an error is
 * logged and NGX_ERROR is returned.
 */
static ngx_int_t
ngx_http_drizzle_input_filter_init(void *data)
{
    ngx_http_request_t  *req;

    req = data;

    ngx_log_error(NGX_LOG_ERR, req->connection->log, 0,
                  "ngx_http_drizzle_input_filter_init should not be called"
                  " by the upstream");

    return NGX_ERROR;
}


/*
 * "input_filter" upstream callback stub.  "data" is treated as the request
 * pointer and "bytes" is ignored.  It is not expected to be invoked: if it
 * is, an error is logged and NGX_ERROR is returned.
 */
static ngx_int_t
ngx_http_drizzle_input_filter(void *data, ssize_t bytes)
{
    ngx_http_request_t  *req;

    req = data;

    ngx_log_error(NGX_LOG_ERR, req->connection->log, 0,
                  "ngx_http_drizzle_input_filter should not be called"
                  " by the upstream");

    return NGX_ERROR;
}


/*
 * Flags the libdrizzle connection attached to the request's upstream peer
 * as ready for both reading and writing.
 *
 * drizzle_con_set_revents() isn't declared external in libdrizzle-0.4.0,
 * so its job is done by hand here: set DRIZZLE_CON_IO_READY, record the
 * returned events and clear them from the set of awaited events.
 *
 * Alternative noted by the original author (kept disabled): libdrizzle uses
 * the standard poll() event constants and relies on drizzle_con_wait() to
 * set them, so drizzle_con_wait(dc->drizzle) could be called instead, since
 * epoll() and the other event mechanisms used by the nginx core play well
 * enough with poll().
 */
void
ngx_http_drizzle_set_libdrizzle_ready(ngx_http_request_t *r)
{
    const short                             ready = POLLOUT | POLLIN;
    drizzle_con_st                         *con;
    ngx_http_upstream_drizzle_peer_data_t  *peer;

    peer = r->upstream->peer.data;
    con = peer->drizzle_con;

    con->options |= DRIZZLE_CON_IO_READY;
    con->revents = ready;
    con->events &= (short) ~ready;
}


/*
 * Content handler that renders a plain-text status report for every
 * explicitly configured upstream block that has drizzle servers.
 *
 * The report is produced in two passes that must stay in lock-step:
 *   1. walk all upstreams and compute the exact number of output bytes;
 *   2. allocate a buffer of exactly that size and walk the upstreams again,
 *      this time writing the text.
 * Afterwards the write position must coincide with the end of the buffer;
 * any mismatch is logged at alert level and answered with a 500.
 *
 * For each reported upstream the output contains its name, the number of
 * active connections, the connection pool capacity and -- when the pool is
 * enabled (capacity != 0) -- the overflow policy, the lengths of the
 * "cache" and "free" queues and the per-connection usage counters of both
 * queues, followed by the server and peer counts.
 *
 * Returns NGX_HTTP_INTERNAL_SERVER_ERROR on allocation failure or buffer
 * size mismatch, the result of ngx_http_send_header() when it fails or when
 * only the header is to be sent, and otherwise the result of
 * ngx_http_output_filter().
 */
ngx_int_t
ngx_http_drizzle_status_handler(ngx_http_request_t *r)
{
    ngx_http_upstream_main_conf_t  *umcf;
    ngx_http_upstream_srv_conf_t  **uscfp;
    ngx_http_upstream_srv_conf_t   *uscf;
    ngx_uint_t                      i, n;
    ngx_chain_t                    *cl;
    ngx_buf_t                      *b;
    size_t                          len;
    ngx_int_t                       rc;
    ngx_queue_t                    *q;

    ngx_http_drizzle_keepalive_cache_t      *item;
    ngx_http_upstream_drizzle_srv_conf_t    *dscf;

    umcf = ngx_http_get_module_main_conf(r, ngx_http_upstream_module);

    uscfp = umcf->upstreams.elts;

    /* pass 1: calculate the output buffer length */

    len = 0;

    if (ngx_process == NGX_PROCESS_WORKER) {
        len += sizeof("worker process: \n\n") - 1
             + ngx_http_drizzle_get_num_size(ngx_pid);
    }

    /* n counts the upstreams reported so far; it decides whether a blank
     * separator line is needed before the next one */
    n = 0;

    for (i = 0; i < umcf->upstreams.nelts; i++) {
        uscf = uscfp[i];

        if (uscf->srv_conf == NULL) {
            /* skip implicit upstream specified directly by the fastcgi_pass,
             * proxy_pass, and similar directives */

            continue;
        }

        dscf = ngx_http_conf_upstream_srv_conf(uscf, ngx_http_drizzle_module);

        if (dscf == NULL || dscf->servers == NULL) {
            /* not a drizzle upstream */
            continue;
        }

        if (n != 0) {
            len += sizeof("\n") - 1;
        }

        n++;

        len += sizeof("upstream \n") - 1
             + uscf->host.len
             + sizeof("  active connections: \n") - 1
             + ngx_http_drizzle_get_num_size(dscf->active_conns)
             + sizeof("  connection pool capacity: \n") - 1
             + ngx_http_drizzle_get_num_size(dscf->max_cached);

        if (dscf->max_cached) {
            /* dump overflow flag for the connection pool */

            switch (dscf->overflow) {
                case drizzle_keepalive_overflow_ignore:
                    len += sizeof("  overflow: ignore\n") - 1;
                    break;

                case drizzle_keepalive_overflow_reject:
                    len += sizeof("  overflow: reject\n") - 1;
                    break;

                default:
                    len += sizeof("  overflow: N/A\n") - 1;
                    break;
            }

            /* dump the lengths of the "cache" and "free" queues in the pool */

            len += sizeof("  cached connection queue: \n") - 1
                 + ngx_http_drizzle_get_num_size(
                         ngx_http_drizzle_queue_size(&dscf->cache)
                   )
                 + sizeof("  free'd connection queue: \n") - 1
                 + ngx_http_drizzle_get_num_size(
                         ngx_http_drizzle_queue_size(&dscf->free)
                   )

            /* dump how many times that each individual connection in the
             * pool has been successfully used in the "cache" queue */

                 + sizeof("  cached connection successfully used count:\n") - 1;

            for (q = ngx_queue_head(&dscf->cache);
                 q != ngx_queue_sentinel(&dscf->cache);
                 q = ngx_queue_next(q))
            {
                item = ngx_queue_data(q, ngx_http_drizzle_keepalive_cache_t,
                        queue);

                /* one separating space plus the counter itself */
                len += sizeof(" ") - 1
                     + ngx_http_drizzle_get_num_size(item->used);
            }

            /* dump how many times that each individual connection in the
             * pool has been successfully used in the "free" queue */

            len += sizeof("  free'd connection successfully used count:\n") - 1;

            for (q = ngx_queue_head(&dscf->free);
                 q != ngx_queue_sentinel(&dscf->free);
                 q = ngx_queue_next(q))
            {
                item = ngx_queue_data(q, ngx_http_drizzle_keepalive_cache_t,
                        queue);

                len += sizeof(" ") - 1
                     + ngx_http_drizzle_get_num_size(item->used);
            }
        }

        len += sizeof("  servers: \n") - 1
             + ngx_http_drizzle_get_num_size(dscf->servers->nelts)
             + sizeof("  peers: \n") - 1
             + ngx_http_drizzle_get_num_size(dscf->peers->number);
    }

    /* allocate the output buffer */

    b = ngx_create_temp_buf(r->pool, len);
    if (b == NULL) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    /* pass 2: fill in the output buffer with the actual data; every write
     * below must mirror a term of the length calculation above.
     * NOTE(review): "%uD" is ngx_sprintf's 32-bit unsigned conversion --
     * confirm that the counters passed to it below have that width. */

    if (ngx_process == NGX_PROCESS_WORKER) {
        b->last = ngx_sprintf(b->last, "worker process: %P\n\n", ngx_pid);
    }

    n = 0;

    for (i = 0; i < umcf->upstreams.nelts; i++) {
        uscf = uscfp[i];

        if (uscf->srv_conf == NULL) {
            /* skip implicit upstream specified directly by the fastcgi_pass,
             * proxy_pass, and similar directives */

            continue;
        }

        dscf = ngx_http_conf_upstream_srv_conf(uscf, ngx_http_drizzle_module);

        if (dscf == NULL || dscf->servers == NULL) {
            /* not a drizzle upstream */
            continue;
        }

        if (n != 0) {
            *b->last++ = '\n';
        }

        n++;

        b->last = ngx_copy_const_str(b->last, "upstream ");
        b->last = ngx_copy(b->last, uscf->host.data, uscf->host.len);

        *b->last++ = '\n';

        b->last = ngx_sprintf(b->last, "  active connections: %uD\n",
                dscf->active_conns);

        b->last = ngx_sprintf(b->last, "  connection pool capacity: %uD\n",
                dscf->max_cached);

        if (dscf->max_cached) {
            /* dump overflow flag for the connection pool */

            switch (dscf->overflow) {
                case drizzle_keepalive_overflow_ignore:
                    b->last = ngx_copy_const_str(b->last,
                            "  overflow: ignore\n");
                    break;

                case drizzle_keepalive_overflow_reject:
                    b->last = ngx_copy_const_str(b->last,
                            "  overflow: reject\n");
                    break;

                default:
                    b->last = ngx_copy_const_str(b->last, "  overflow: N/A\n");
                    break;
            }

            /* dump the lengths of the "cache" and "free" queues in the pool */

            b->last = ngx_sprintf(b->last, "  cached connection queue: %uD\n",
                    ngx_http_drizzle_queue_size(&dscf->cache));

            b->last = ngx_sprintf(b->last, "  free'd connection queue: %uD\n",
                    ngx_http_drizzle_queue_size(&dscf->free));

            /* dump how many times that each individual connection in the
             * pool has been successfully used in the "cache" queue */

            b->last = ngx_copy_const_str(b->last,
                    "  cached connection successfully used count:");

            for (q = ngx_queue_head(&dscf->cache);
                 q != ngx_queue_sentinel(&dscf->cache);
                 q = ngx_queue_next(q))
            {
                item = ngx_queue_data(q, ngx_http_drizzle_keepalive_cache_t,
                        queue);
                b->last = ngx_sprintf(b->last, " %uD", item->used);
            }

            /* the trailing newline was accounted for in the label's size */
            *b->last++ = '\n';

            /* dump how many times that each individual connection in the
             * pool has been successfully used in the "free" queue */

            b->last = ngx_copy_const_str(b->last,
                    "  free'd connection successfully used count:");

            for (q = ngx_queue_head(&dscf->free);
                 q != ngx_queue_sentinel(&dscf->free);
                 q = ngx_queue_next(q))
            {
                item = ngx_queue_data(q, ngx_http_drizzle_keepalive_cache_t,
                        queue);
                b->last = ngx_sprintf(b->last, " %uD", item->used);
            }

            *b->last++ = '\n';
        }

        b->last = ngx_sprintf(b->last, "  servers: %uD\n",
                dscf->servers->nelts);

        b->last = ngx_sprintf(b->last, "  peers: %uD\n", dscf->peers->number);
    }

    /* sanity check: both passes must agree on the exact output size */

    if (b->last != b->end) {
        ngx_log_error(NGX_LOG_ALERT, r->connection->log, 0,
                      "drizzle_status output buffer error: %O != %O",
                      (off_t) (b->last - b->pos),
                      (off_t) (b->end - b->pos));

        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    if (r == r->main) {
        /* only the main request terminates the whole response */
        b->last_buf = 1;
    }

    cl = ngx_alloc_chain_link(r->pool);
    if (cl == NULL) {
        return NGX_HTTP_INTERNAL_SERVER_ERROR;
    }

    cl->buf = b;
    cl->next = NULL;

    r->headers_out.status = NGX_HTTP_OK;

    rc = ngx_http_send_header(r);

    /* stop here on failure, and also when only the header is wanted
     * (e.g. HEAD requests): the body must not be pushed into the output
     * filter chain in that case */

    if (rc == NGX_ERROR
        || rc >= NGX_HTTP_SPECIAL_RESPONSE
        || r->header_only)
    {
        return rc;
    }

    return ngx_http_output_filter(r, cl);
}