/*
 * Copyright (C) Yichun Zhang (agentzh)
 */


#ifndef DDEBUG
#define DDEBUG 0
#endif
#include "ddebug.h"

#include "ngx_http_memc_response.h"
#include "ngx_http_memc_module.h"


#ifdef s_char
#undef s_char
#endif

#define s_char signed char


%% machine memc_storage;
%% write data;

%% machine memc_flush_all;
%% write data;

%% machine memc_version;
%% write data;

%% machine memc_stats;
%% write data;

%% machine memc_delete;
%% write data;

%% machine memc_incr_decr;
%% write data;


u_char  ngx_http_memc_end[] = CRLF "END" CRLF;


static u_char *parse_memc_storage(int *cs_addr, u_char *p, u_char *pe,
        ngx_uint_t *status_addr, unsigned *done_addr);
static u_char *parse_memc_flush_all(int *cs_addr, u_char *p, u_char *pe,
        ngx_uint_t *status_addr, unsigned *done_addr);
static u_char *parse_memc_version(int *cs_addr, u_char *p, u_char *pe,
        ngx_uint_t *status_addr, unsigned *done_addr);
static u_char *parse_memc_stats(int *cs_addr, u_char *p, u_char *pe,
        ngx_uint_t *status_addr, unsigned *done_addr);
static u_char *parse_memc_delete(int *cs_addr, u_char *p, u_char *pe,
        ngx_uint_t *status_addr, unsigned *done_addr);
static u_char *parse_memc_incr_decr(int *cs_addr, u_char *p, u_char *pe,
        ngx_uint_t *status_addr, unsigned *done_addr);
static ngx_int_t ngx_http_memc_write_simple_response(ngx_http_request_t *r,
        ngx_http_upstream_t *u, ngx_http_memc_ctx_t *ctx,
        ngx_uint_t status, ngx_str_t *resp);


ngx_int_t
ngx_http_memc_process_simple_header(ngx_http_request_t *r)
{
    ngx_int_t                rc;
    int                      cs;
    s_char                  *p;
    s_char                  *pe;
    s_char                  *orig;
    ngx_str_t                resp;
    ngx_http_upstream_t     *u;
    ngx_http_memc_ctx_t     *ctx;
    ngx_uint_t               status;
    unsigned                 done = 0;
    int                      error_state;
    int                      final_state;

    status = NGX_HTTP_OK;

    dd("process simple cmd header");

    ctx = ngx_http_get_module_ctx(r, ngx_http_memc_module);

    if (ctx->parser_state == NGX_ERROR) {
        dd("reinit state");

        if (ctx->is_storage_cmd) {
            dd("init memc_storage machine...");

            %% machine memc_storage;
            %% write init;

        } else if (ctx->cmd == ngx_http_memc_cmd_flush_all) {
            dd("init memc_flush_all machine...");

            %% machine memc_flush_all;
            %% write init;

        } else if (ctx->cmd == ngx_http_memc_cmd_version) {
            dd("init memc_version machine...");

            %% machine memc_version;
            %% write init;

        } else if (ctx->cmd == ngx_http_memc_cmd_stats) {
            dd("init memc_stats machine...");

            %% machine memc_stats;
            %% write init;

        } else if (ctx->cmd == ngx_http_memc_cmd_delete) {
            dd("init memc_delete machine...");

            %% machine memc_delete;
            %% write init;

        } else if (ctx->cmd == ngx_http_memc_cmd_incr
                || ctx->cmd == ngx_http_memc_cmd_decr)
        {
            dd("init memc_incr_decr machine...");

            %% machine memc_incr_decr;
            %% write init;

        } else {
            ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
              "unrecognized memcached command in "
              "ngx_http_memc_process_simple_header: \"%V\"",
              &ctx->cmd_str);

            return NGX_ERROR; /* this results in 500 status */
        }

    } else {
        cs = ctx->parser_state;
    }

    u = r->upstream;

    orig = (s_char *) u->buffer.pos;

    p  = (s_char *) u->buffer.pos;
    pe = (s_char *) u->buffer.last;

    dd("buffer len: %d", (int) (pe - p));

    if (ctx->is_storage_cmd) {
        error_state = memc_storage_error;
        final_state = memc_storage_first_final;

        p = (s_char *) parse_memc_storage(&cs, (u_char *) p, (u_char *) pe,
                &status, &done);

    } else if (ctx->cmd == ngx_http_memc_cmd_flush_all) {
        error_state = memc_flush_all_error;
        final_state = memc_flush_all_first_final;

        p = (s_char *) parse_memc_flush_all(&cs, (u_char *) p, (u_char *) pe,
                &status, &done);

    } else if (ctx->cmd == ngx_http_memc_cmd_version) {
        error_state = memc_version_error;
        final_state = memc_version_first_final;

        p = (s_char *) parse_memc_version(&cs, (u_char *) p, (u_char *) pe,
                &status, &done);

    } else if (ctx->cmd == ngx_http_memc_cmd_stats) {
        error_state = memc_stats_error;
        final_state = memc_stats_first_final;

        p = (s_char *) parse_memc_stats(&cs, (u_char *) p, (u_char *) pe,
                &status, &done);

    } else if (ctx->cmd == ngx_http_memc_cmd_delete) {
        error_state = memc_delete_error;
        final_state = memc_delete_first_final;

        p = (s_char *) parse_memc_delete(&cs, (u_char *) p, (u_char *) pe,
                &status, &done);

    } else if (ctx->cmd == ngx_http_memc_cmd_incr
            || ctx->cmd == ngx_http_memc_cmd_decr)
    {
        error_state = memc_incr_decr_error;
        final_state = memc_incr_decr_first_final;

        p = (s_char *) parse_memc_incr_decr(&cs, (u_char *) p, (u_char *) pe,
                &status, &done);

    } else {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
          "unrecognized memcached command in "
          "ngx_http_memc_process_simple_header: \"%V\"",
          &ctx->cmd_str);

        return NGX_ERROR; /* this results in 500 status */
    }

    ctx->parser_state = cs;

    resp.data = u->buffer.start;
    resp.len  = (u_char *) p - resp.data;

    u->buffer.pos = (u_char *) p;

    dd("machine state: %d (done: %d)", cs, done);
    dd("memcached response: (len: %d) %s", (int) resp.len, resp.data);

    if (done || cs >= final_state) {
        dd("memcached response parsed (resp.len: %d)", (int) resp.len);

        rc = ngx_http_memc_write_simple_response(r, u, ctx, status, &resp);

        return rc;
    }

    if (cs == error_state) {
        ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                  "memcached sent invalid response for command \"%V\" "
                  "at pos %O: %V", &ctx->cmd_str, (off_t) (p - orig), &resp);

        status = NGX_HTTP_BAD_GATEWAY;
        u->headers_in.status_n = status;
        u->state->status = status;

        /* u->headers_in.status_n will be the final status */
        return NGX_OK;
    }

    dd("we need more data to proceed (returned NGX_AGAIN)");

    return NGX_AGAIN;
}


ngx_int_t
ngx_http_memc_empty_filter_init(void *data)
{
    ngx_http_memc_ctx_t  *ctx = data;
    ngx_http_upstream_t  *u;

    u = ctx->request->upstream;

    u->length = 0;

    /* to persuade ngx_http_upstream_keepalive (if any)
       to cache the connection if the status is neither
       200 nor 404. */
    if (u->headers_in.status_n == NGX_HTTP_CREATED) {
        u->headers_in.status_n = NGX_HTTP_OK;
    }

    return NGX_OK;
}

ngx_int_t
ngx_http_memc_empty_filter(void *data, ssize_t bytes)
{
    ngx_http_memc_ctx_t  *ctx = data;
    ngx_http_upstream_t  *u;

    u = ctx->request->upstream;

    /* recover the buffer for subrequests in memory */
    u->buffer.last += ctx->body_length;

    return NGX_OK;
}


ngx_int_t
ngx_http_memc_get_cmd_filter_init(void *data)
{
    ngx_http_memc_ctx_t  *ctx = data;

    ngx_http_upstream_t  *u;

    u = ctx->request->upstream;

    dd("filter init: u->length: %d", (int) u->length);

    u->length = u->headers_in.content_length_n + NGX_HTTP_MEMC_END;

    dd("filter init (2): u->length: %d", (int) u->length);

    return NGX_OK;
}


ngx_int_t
ngx_http_memc_get_cmd_filter(void *data, ssize_t bytes)
{
    ngx_http_memc_ctx_t  *ctx = data;

    u_char               *last;
    ngx_buf_t            *b;
    ngx_chain_t          *cl, **ll;
    ngx_http_upstream_t  *u;

    u = ctx->request->upstream;
    b = &u->buffer;

    if (u->length == ctx->rest) {

        if (ngx_strncmp(b->last,
                        ngx_http_memc_end + NGX_HTTP_MEMC_END - ctx->rest,
                        bytes) != 0)
        {
            ngx_log_error(NGX_LOG_ERR, ctx->request->connection->log, 0,
                          "memcached sent invalid trailer");

            u->length = 0;
            ctx->rest = 0;

            return NGX_OK;
        }

        u->length -= bytes;
        ctx->rest -= bytes;

#if defined(nginx_version) && nginx_version >= 1001004
        if (u->length == 0) {
            u->keepalive = 1;
        }
#endif

        return NGX_OK;
    }

    for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) {
        ll = &cl->next;
    }

    cl = ngx_chain_get_free_buf(ctx->request->pool, &u->free_bufs);
    if (cl == NULL) {
        return NGX_ERROR;
    }

    cl->buf->flush = 1;
    cl->buf->memory = 1;

    *ll = cl;

    last = b->last;
    cl->buf->pos = last;
    b->last += bytes;
    cl->buf->last = b->last;
    cl->buf->tag = u->output.tag;

    ngx_log_debug4(NGX_LOG_DEBUG_HTTP, ctx->request->connection->log, 0,
                   "memcached filter bytes:%z size:%z length:%z rest:%z",
                   bytes, b->last - b->pos, u->length, ctx->rest);

    if (bytes <= (ssize_t) (u->length - NGX_HTTP_MEMC_END)) {
        u->length -= bytes;
        return NGX_OK;
    }

    last += u->length - NGX_HTTP_MEMC_END;

    if (ngx_strncmp(last, ngx_http_memc_end, b->last - last) != 0) {
        ngx_log_error(NGX_LOG_ERR, ctx->request->connection->log, 0,
                      "memcached sent invalid trailer");

#if defined(nginx_version) && nginx_version >= 1001004
        b->last = last;
        cl->buf->last = last;
        u->length = 0;
        ctx->rest = 0;

        return NGX_OK;
#endif
    }

    ctx->rest -= b->last - last;
    b->last = last;
    cl->buf->last = last;
    u->length = ctx->rest;

#if defined(nginx_version) && nginx_version >= 1001004
    if (u->length == 0) {
        u->keepalive = 1;
    }
#endif

    return NGX_OK;
}


ngx_int_t
ngx_http_memc_process_get_cmd_header(ngx_http_request_t *r)
{
    ngx_http_memc_loc_conf_t        *conf;
    u_char                          *p, *len;
    ngx_str_t                        line;
    ngx_http_upstream_t             *u;
    ngx_http_memc_ctx_t             *ctx;
    ngx_http_variable_value_t       *flags_vv;

    u = r->upstream;

    dd("process header: u->length: %u", (unsigned) u->length);

    for (p = u->buffer.pos; p < u->buffer.last; p++) {
        if (*p == LF) {
            goto found;
        }
    }

    return NGX_AGAIN;

found:

    *p = '\0';

    line.len = p - u->buffer.pos - 1;
    line.data = u->buffer.pos;

    ngx_log_debug1(NGX_LOG_DEBUG_HTTP, r->connection->log, 0,
                   "memcached: \"%V\"", &line);

    p = u->buffer.pos;

    ctx = ngx_http_get_module_ctx(r, ngx_http_memc_module);

    if (ngx_strncmp(p, "VALUE ", sizeof("VALUE ") - 1) == 0) {

        p += sizeof("VALUE ") - 1;

        if (ngx_strncmp(p, ctx->key.data, ctx->key.len) != 0) {
            ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                          "memcached sent invalid key in response \"%V\" "
                          "for key \"%V\"",
                          &line, &ctx->key);

            return NGX_HTTP_UPSTREAM_INVALID_HEADER;
        }

        p += ctx->key.len;

        if (*p++ != ' ') {
            goto no_valid;
        }

        /* save flags */

        flags_vv = ctx->memc_flags_vv;

        if (flags_vv == NULL) {
            return NGX_ERROR;
        }

        if (flags_vv->not_found) {
            flags_vv->not_found = 0;
            flags_vv->valid = 1;
            flags_vv->no_cacheable = 0;
        }

        flags_vv->data = p;

        while (*p) {
            if (*p++ == ' ') {
                flags_vv->len = p - 1 - flags_vv->data;
                conf = ngx_http_get_module_loc_conf(r, ngx_http_memc_module);

                if (conf->flags_to_last_modified) {
                    r->headers_out.last_modified_time =
                            ngx_atotm(flags_vv->data, flags_vv->len);
                }

                goto length;
            }
        }

        goto no_valid;

    length:

        len = p;

        while (*p && *p++ != CR) { /* void */ }

#if defined(nginx_version) && nginx_version >= 1001004
        u->headers_in.content_length_n = ngx_atoof(len, p - len - 1);
        if (u->headers_in.content_length_n == -1) {
#else
        r->headers_out.content_length_n = ngx_atoof(len, p - len - 1);
        if (r->headers_out.content_length_n == -1) {
#endif
            ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                          "memcached sent invalid length in response \"%V\" "
                          "for key \"%V\"",
                          &line, &ctx->key);
            return NGX_HTTP_UPSTREAM_INVALID_HEADER;
        }

        u->headers_in.status_n = NGX_HTTP_OK;
        u->state->status = NGX_HTTP_OK;
        u->buffer.pos = p + 1;

        return NGX_OK;
    }

    if (ngx_strcmp(p, "END\x0d") == 0) {
        ngx_log_error(NGX_LOG_INFO, r->connection->log, 0,
                      "key: \"%V\" was not found by memcached", &ctx->key);

        u->headers_in.status_n = NGX_HTTP_NOT_FOUND;
        u->state->status = NGX_HTTP_NOT_FOUND;

#if defined(nginx_version) && nginx_version >= 1001004
        u->keepalive = 1;
#endif

        return NGX_OK;
    }

no_valid:

    ngx_log_error(NGX_LOG_ERR, r->connection->log, 0,
                  "memcached sent invalid response: \"%V\"", &line);

    return NGX_HTTP_UPSTREAM_INVALID_HEADER;
}


static ngx_int_t
ngx_http_memc_write_simple_response(ngx_http_request_t *r,
    ngx_http_upstream_t *u, ngx_http_memc_ctx_t *ctx, ngx_uint_t status,
    ngx_str_t *resp)
{
    ngx_chain_t             *cl, **ll;

    for (cl = u->out_bufs, ll = &u->out_bufs; cl; cl = cl->next) {
        ll = &cl->next;
    }

    cl = ngx_chain_get_free_buf(r->pool, &u->free_bufs);
    if (cl == NULL) {
        return NGX_ERROR;
    }

    cl->buf->flush = 1;
    cl->buf->memory = 1;
    cl->buf->pos = resp->data;
    cl->buf->last = cl->buf->pos + resp->len;

    *ll = cl;

    /* for subrequests in memory */
    u->buffer.pos = resp->data;
    u->buffer.last = resp->data + resp->len;
    ctx->body_length = resp->len;

#if defined(nginx_version) && nginx_version >= 1001004
    u->headers_in.content_length_n = resp->len;
    u->keepalive = 1;
#else
    r->headers_out.content_length_n = resp->len;
#endif

    u->headers_in.status_n = status;
    u->state->status = status;

    return NGX_OK;
}


static u_char *
parse_memc_storage(int *cs_addr, u_char *p, u_char *pe,
    ngx_uint_t *status_addr, unsigned *done_addr)
{
    int cs = *cs_addr;

    %% machine memc_storage;
    %% include "memc_storage.rl";
    %% write exec;

    *cs_addr = cs;

    return p;
}


static u_char *
parse_memc_flush_all(int *cs_addr, u_char *p, u_char *pe,
    ngx_uint_t *status_addr, unsigned *done_addr)
{
    int cs = *cs_addr;

    %% machine memc_flush_all;
    %% include "memc_flush_all.rl";
    %% write exec;

    *cs_addr = cs;

    return p;
}


static u_char *
parse_memc_version(int *cs_addr, u_char *p, u_char *pe,
    ngx_uint_t *status_addr, unsigned *done_addr)
{
    int cs = *cs_addr;

    %% machine memc_version;
    %% include "memc_version.rl";
    %% write exec;

    *cs_addr = cs;

    return p;
}


static u_char *
parse_memc_stats(int *cs_addr, u_char *p, u_char *pe, ngx_uint_t *status_addr,
    unsigned *done_addr)
{
    int cs = *cs_addr;

    %% machine memc_stats;
    %% include "memc_stats.rl";
    %% write exec;

    *cs_addr = cs;

    return p;
}


static u_char *
parse_memc_delete(int *cs_addr, u_char *p, u_char *pe, ngx_uint_t *status_addr,
    unsigned *done_addr)
{
    int cs = *cs_addr;

    %% machine memc_delete;
    %% include "memc_delete.rl";
    %% write exec;

    *cs_addr = cs;

    return p;
}


static u_char *
parse_memc_incr_decr(int *cs_addr, u_char *p, u_char *pe,
    ngx_uint_t *status_addr, unsigned *done_addr)
{
    int cs = *cs_addr;

    %% machine memc_incr_decr;
    %% include "memc_incr_decr.rl";
    %% write exec;

    *cs_addr = cs;

    return p;
}