Kouhei Sutou
null+****@clear*****
Tue Jan 6 23:20:50 JST 2015
Kouhei Sutou 2015-01-06 23:20:50 +0900 (Tue, 06 Jan 2015) New Revision: 95d2b3f12444ac351c8884556ac335fbb8dc67f1 https://github.com/groonga/groonga/commit/95d2b3f12444ac351c8884556ac335fbb8dc67f1 Message: Support streaming dump Currently, "groonga --protocol http" only supports it. Modified files: include/groonga/groonga.h lib/ctx.c lib/proc.c src/groonga.c Modified: include/groonga/groonga.h (+1 -0) =================================================================== --- include/groonga/groonga.h 2015-01-05 23:06:34 +0900 (d1c9347) +++ include/groonga/groonga.h 2015-01-06 23:20:50 +0900 (684cf1e) @@ -1302,6 +1302,7 @@ GRN_API void grn_output_envelope(grn_ctx *ctx, grn_rc rc, grn_obj *head, grn_obj *body, grn_obj *foot, const char *file, int line); +GRN_API void grn_ctx_output_flush(grn_ctx *ctx, int flags); GRN_API void grn_ctx_output_array_open(grn_ctx *ctx, const char *name, int nelements); GRN_API void grn_ctx_output_array_close(grn_ctx *ctx); Modified: lib/ctx.c (+12 -0) =================================================================== --- lib/ctx.c 2015-01-05 23:06:34 +0900 (7de01f6) +++ lib/ctx.c 2015-01-06 23:20:50 +0900 (565cc27) @@ -3003,6 +3003,18 @@ grn_set_term_handler(void) } void +grn_ctx_output_flush(grn_ctx *ctx, int flags) +{ + if (flags & GRN_CTX_QUIET) { + return; + } + if (!ctx->impl->output) { + return; + } + ctx->impl->output(ctx, 0, ctx->impl->data.ptr); +} + +void grn_ctx_output_array_open(grn_ctx *ctx, const char *name, int nelements) { grn_output_array_open(ctx, ctx->impl->outbuf, ctx->impl->output_type, Modified: lib/proc.c (+6 -0) =================================================================== --- lib/proc.c 2015-01-05 23:06:34 +0900 (359a53a) +++ lib/proc.c 2015-01-06 23:20:50 +0900 (e13ebb9) @@ -2629,6 +2629,8 @@ exit : return NULL; } +static const size_t DUMP_FLUSH_THRESHOLD_SIZE = 256 * 1024; + static void dump_name(grn_ctx *ctx, grn_obj *outbuf, const char *name, int name_len) { @@ -2966,6 +2968,9 @@ dump_records(grn_ctx *ctx, grn_obj *outbuf, grn_obj *table) } } GRN_TEXT_PUTC(ctx, outbuf, ']'); + if (GRN_TEXT_LEN(outbuf) >= DUMP_FLUSH_THRESHOLD_SIZE) { + grn_ctx_output_flush(ctx, 0); + } } GRN_TEXT_PUTS(ctx, outbuf, "\n]\n"); GRN_TEXT_PUT(ctx, outbuf, GRN_TEXT_VALUE(&delete_commands), @@ -3217,6 +3222,7 @@ proc_dump(grn_ctx *ctx, int nargs, grn_obj **args, grn_user_data *user_data) ctx->impl->output_type = GRN_CONTENT_NONE; ctx->impl->mime_type = "text/x-groonga-command-list"; dump_schema(ctx, outbuf); + grn_ctx_output_flush(ctx, 0); /* To update index columns correctly, we first create the whole schema, then load non-derivative records, while skipping records of index columns. That way, groonga will silently do the job of updating index columns for us. */ Modified: src/groonga.c (+217 -77) =================================================================== --- src/groonga.c 2015-01-05 23:06:34 +0900 (da3f853) +++ src/groonga.c 2015-01-06 23:20:50 +0900 (e27bd72) @@ -649,109 +649,245 @@ start_service(grn_ctx *ctx, const char *db_path, typedef struct { grn_msg *msg; + grn_bool in_body; + grn_bool is_chunked; } ht_context; static void -h_output(grn_ctx *ctx, int flags, void *arg) +h_output_set_header(grn_ctx *ctx, grn_obj *header, + grn_rc rc, long long int content_length) { - grn_rc expr_rc = ctx->rc; - ht_context *hc = (ht_context *)arg; - grn_sock fd = hc->msg->u.fd; - grn_obj header, head, foot, *outbuf = ctx->impl->outbuf; - grn_bool should_return_body = (hc->msg->header.qtype == 'G'); - if (!(flags & GRN_CTX_TAIL)) { return; } - GRN_TEXT_INIT(&header, 0); - GRN_TEXT_INIT(&head, 0); - GRN_TEXT_INIT(&foot, 0); - output_envelope(ctx, expr_rc, &head, outbuf, &foot); - switch (expr_rc) { + switch (rc) { case GRN_SUCCESS : - GRN_TEXT_SETS(ctx, &header, "HTTP/1.1 200 OK\r\n"); + GRN_TEXT_SETS(ctx, header, "HTTP/1.1 200 OK\r\n"); break; case GRN_INVALID_ARGUMENT : case GRN_SYNTAX_ERROR : - GRN_TEXT_SETS(ctx, &header, "HTTP/1.1 400 Bad Request\r\n"); + GRN_TEXT_SETS(ctx, header, "HTTP/1.1 400 Bad Request\r\n"); break; case GRN_NO_SUCH_FILE_OR_DIRECTORY : - GRN_TEXT_SETS(ctx, &header, "HTTP/1.1 404 Not Found\r\n"); + GRN_TEXT_SETS(ctx, header, "HTTP/1.1 404 Not Found\r\n"); break; default : - GRN_TEXT_SETS(ctx, &header, "HTTP/1.1 500 Internal Server Error\r\n"); + GRN_TEXT_SETS(ctx, header, "HTTP/1.1 500 Internal Server Error\r\n"); break; } - GRN_TEXT_PUTS(ctx, &header, "Connection: close\r\n"); - GRN_TEXT_PUTS(ctx, &header, "Content-Type: "); - GRN_TEXT_PUTS(ctx, &header, grn_ctx_get_mime_type(ctx)); - GRN_TEXT_PUTS(ctx, &header, "\r\nContent-Length: "); - grn_text_lltoa(ctx, &header, - GRN_TEXT_LEN(&head) + GRN_TEXT_LEN(outbuf) + GRN_TEXT_LEN(&foot)); - GRN_TEXT_PUTS(ctx, &header, "\r\n\r\n"); - { - ssize_t ret, len; + GRN_TEXT_PUTS(ctx, header, "Content-Type: "); + GRN_TEXT_PUTS(ctx, header, grn_ctx_get_mime_type(ctx)); + GRN_TEXT_PUTS(ctx, header, "\r\n"); + if (content_length >= 0) { + GRN_TEXT_PUTS(ctx, header, "Connection: close\r\n"); + GRN_TEXT_PUTS(ctx, header, "Content-Length: "); + grn_text_lltoa(ctx, header, content_length); + GRN_TEXT_PUTS(ctx, header, "\r\n"); + } else { + GRN_TEXT_PUTS(ctx, header, "Transfer-Encoding: chunked\r\n"); + } + GRN_TEXT_PUTS(ctx, header, "\r\n"); +} + +static void +h_output_send(grn_ctx *ctx, grn_sock fd, + grn_obj *header, grn_obj *head, grn_obj *body, grn_obj *foot) +{ + ssize_t ret; + ssize_t len = 0; #ifdef WIN32 - int n_buffers; - WSABUF wsabufs[4]; - wsabufs[0].buf = GRN_TEXT_VALUE(&header); - wsabufs[0].len = GRN_TEXT_LEN(&header); - n_buffers = 1; - len = GRN_TEXT_LEN(&header); - if (should_return_body) { - wsabufs[1].buf = GRN_TEXT_VALUE(&head); - wsabufs[1].len = GRN_TEXT_LEN(&head); - wsabufs[2].buf = GRN_TEXT_VALUE(outbuf); - wsabufs[2].len = GRN_TEXT_LEN(outbuf); - wsabufs[3].buf = GRN_TEXT_VALUE(&foot); - wsabufs[3].len = GRN_TEXT_LEN(&foot); - n_buffers += 3; - len += GRN_TEXT_LEN(&head) + GRN_TEXT_LEN(outbuf) + GRN_TEXT_LEN(&foot); - } - { - DWORD sent; - if (WSASend(fd, wsabufs, n_buffers, &sent, 0, NULL, NULL) == SOCKET_ERROR) { - SERR("WSASend"); - } - ret = sent; + int n_buffers = 0; + WSABUF wsabufs[4]; + if (header) { + wsabufs[n_buffers].buf = GRN_TEXT_VALUE(header); + wsabufs[n_buffers].len = GRN_TEXT_LEN(header); + len += GRN_TEXT_LEN(header); + n_buffers++; + } + if (head) { + wsabufs[n_buffers].buf = GRN_TEXT_VALUE(head); + wsabufs[n_buffers].len = GRN_TEXT_LEN(head); + len += GRN_TEXT_LEN(head); + n_buffers++; + } + if (body) { + wsabufs[n_buffers].buf = GRN_TEXT_VALUE(body); + wsabufs[n_buffers].len = GRN_TEXT_LEN(body); + len += GRN_TEXT_LEN(body); + n_buffers++; + } + if (foot) { + wsabufs[n_buffers].buf = GRN_TEXT_VALUE(foot); + wsabufs[n_buffers].len = GRN_TEXT_LEN(foot); + len += GRN_TEXT_LEN(foot); + n_buffers++; + } + { + DWORD sent; + if (WSASend(fd, wsabufs, n_buffers, &sent, 0, NULL, NULL) == SOCKET_ERROR) { + SERR("WSASend"); } + ret = sent; + } #else /* WIN32 */ - struct iovec msg_iov[4]; - struct msghdr msg; - msg.msg_name = NULL; - msg.msg_namelen = 0; - msg.msg_iov = msg_iov; - msg.msg_iovlen = 1; - msg.msg_control = NULL; - msg.msg_controllen = 0; - msg.msg_flags = 0; - msg_iov[0].iov_base = GRN_TEXT_VALUE(&header); - msg_iov[0].iov_len = GRN_TEXT_LEN(&header); - len = GRN_TEXT_LEN(&header); - if (should_return_body) { - msg_iov[1].iov_base = GRN_TEXT_VALUE(&head); - msg_iov[1].iov_len = GRN_TEXT_LEN(&head); - msg_iov[2].iov_base = GRN_TEXT_VALUE(outbuf); - msg_iov[2].iov_len = GRN_TEXT_LEN(outbuf); - msg_iov[3].iov_base = GRN_TEXT_VALUE(&foot); - msg_iov[3].iov_len = GRN_TEXT_LEN(&foot); - msg.msg_iovlen += 3; - len += GRN_TEXT_LEN(&head) + GRN_TEXT_LEN(outbuf) + GRN_TEXT_LEN(&foot); - } - if ((ret = sendmsg(fd, &msg, MSG_NOSIGNAL)) == -1) { - SERR("sendmsg"); - } + struct iovec msg_iov[4]; + struct msghdr msg; + msg.msg_name = NULL; + msg.msg_namelen = 0; + msg.msg_iov = msg_iov; + msg.msg_iovlen = 0; + msg.msg_control = NULL; + msg.msg_controllen = 0; + msg.msg_flags = 0; + + if (header) { + msg_iov[msg.msg_iovlen].iov_base = GRN_TEXT_VALUE(header); + msg_iov[msg.msg_iovlen].iov_len = GRN_TEXT_LEN(header); + len += GRN_TEXT_LEN(header); + msg.msg_iovlen++; + } + if (head) { + msg_iov[msg.msg_iovlen].iov_base = GRN_TEXT_VALUE(head); + msg_iov[msg.msg_iovlen].iov_len = GRN_TEXT_LEN(head); + len += GRN_TEXT_LEN(head); + msg.msg_iovlen++; + } + if (body) { + msg_iov[msg.msg_iovlen].iov_base = GRN_TEXT_VALUE(body); + msg_iov[msg.msg_iovlen].iov_len = GRN_TEXT_LEN(body); + len += GRN_TEXT_LEN(body); + msg.msg_iovlen++; + } + if (foot) { + msg_iov[msg.msg_iovlen].iov_base = GRN_TEXT_VALUE(foot); + msg_iov[msg.msg_iovlen].iov_len = GRN_TEXT_LEN(foot); + len += GRN_TEXT_LEN(foot); + msg.msg_iovlen++; + } + if ((ret = sendmsg(fd, &msg, MSG_NOSIGNAL)) == -1) { + SERR("sendmsg"); + } #endif /* WIN32 */ - if (ret != len) { - GRN_LOG(&grn_gctx, GRN_LOG_NOTICE, - "couldn't send all data (%" GRN_FMT_LLD "/%" GRN_FMT_LLD ")", - (long long int)ret, (long long int)len); + if (ret != len) { + GRN_LOG(&grn_gctx, GRN_LOG_NOTICE, + "couldn't send all data (%" GRN_FMT_LLD "/%" GRN_FMT_LLD ")", + (long long int)ret, (long long int)len); + } +} + +static void +h_output_raw(grn_ctx *ctx, int flags, ht_context *hc) +{ + grn_rc expr_rc = ctx->rc; + grn_sock fd = hc->msg->u.fd; + grn_obj header_; + grn_obj head_; + grn_obj body_; + grn_obj foot_; + grn_obj *header = NULL; + grn_obj *head = NULL; + grn_obj *body = NULL; + grn_obj *foot = NULL; + char *chunk = NULL; + unsigned int chunk_size = 0; + int recv_flags; + grn_bool is_last_message = (flags & GRN_CTX_TAIL); + + GRN_TEXT_INIT(&header_, 0); + GRN_TEXT_INIT(&head_, 0); + GRN_TEXT_INIT(&body_, GRN_OBJ_DO_SHALLOW_COPY); + GRN_TEXT_INIT(&foot_, 0); + + grn_ctx_recv(ctx, &chunk, &chunk_size, &recv_flags); + GRN_TEXT_SET(ctx, &body_, chunk, chunk_size); + + if (!hc->in_body) { + if (is_last_message) { + h_output_set_header(ctx, &header_, expr_rc, GRN_TEXT_LEN(&body_)); + hc->is_chunked = GRN_FALSE; + } else { + h_output_set_header(ctx, &header_, expr_rc, -1); + hc->is_chunked = GRN_TRUE; + } + header = &header_; + hc->in_body = GRN_TRUE; + } + + if (GRN_TEXT_LEN(&body_) > 0) { + if (hc->is_chunked) { + grn_text_printf(ctx, &head_, + "%x\r\n", (unsigned int)GRN_TEXT_LEN(&body_)); + head = &head_; + GRN_TEXT_PUTS(ctx, &foot_, "\r\n"); + foot = &foot_; + } + body = &body_; + } + + if (is_last_message) { + if (hc->is_chunked) { + GRN_TEXT_PUTS(ctx, &foot_, "0\r\n"); + GRN_TEXT_PUTS(ctx, &foot_, "Connection: close\r\n"); + GRN_TEXT_PUTS(ctx, &foot_, "\r\n"); + foot = &foot_; } } - GRN_BULK_REWIND(outbuf); + + h_output_send(ctx, fd, header, head, body, foot); + + GRN_OBJ_FIN(ctx, &foot_); + GRN_OBJ_FIN(ctx, &body_); + GRN_OBJ_FIN(ctx, &head_); + GRN_OBJ_FIN(ctx, &header_); +} + +static void +h_output_typed(grn_ctx *ctx, int flags, ht_context *hc) +{ + grn_rc expr_rc = ctx->rc; + grn_sock fd = hc->msg->u.fd; + grn_obj header, head, body, foot; + char *chunk = NULL; + unsigned int chunk_size = 0; + int recv_flags; + grn_bool should_return_body = (hc->msg->header.qtype == 'G'); + + if (!(flags & GRN_CTX_TAIL)) { return; } + + GRN_TEXT_INIT(&header, 0); + GRN_TEXT_INIT(&head, 0); + GRN_TEXT_INIT(&body, GRN_OBJ_DO_SHALLOW_COPY); + GRN_TEXT_INIT(&foot, 0); + + grn_ctx_recv(ctx, &chunk, &chunk_size, &recv_flags); + GRN_TEXT_SET(ctx, &body, chunk, chunk_size); + + output_envelope(ctx, expr_rc, &head, &body, &foot); + h_output_set_header(ctx, &header, expr_rc, + GRN_TEXT_LEN(&head) + + GRN_TEXT_LEN(&body) + + GRN_TEXT_LEN(&foot)); + if (should_return_body) { + h_output_send(ctx, fd, &header, &head, &body, &foot); + } else { + h_output_send(ctx, fd, &header, NULL, NULL, NULL); + } GRN_OBJ_FIN(ctx, &foot); + GRN_OBJ_FIN(ctx, &body); GRN_OBJ_FIN(ctx, &head); GRN_OBJ_FIN(ctx, &header); } static void +h_output(grn_ctx *ctx, int flags, void *arg) +{ + ht_context *hc = (ht_context *)arg; + + if (grn_ctx_get_output_type(ctx) == GRN_CONTENT_NONE) { + h_output_raw(ctx, flags, hc); + } else { + h_output_typed(ctx, flags, hc); + } +} + +static void do_htreq_get(grn_ctx *ctx, grn_msg *msg) { char *path = NULL; @@ -982,6 +1118,8 @@ do_htreq_post(grn_ctx *ctx, grn_msg *msg) if (ctx->rc != GRN_SUCCESS) { ht_context context; context.msg = msg; + context.in_body = GRN_FALSE; + context.is_chunked = GRN_FALSE; h_output(ctx, GRN_CTX_TAIL, &context); return; } @@ -1698,6 +1836,8 @@ h_worker(void *arg) nfthreads--; MUTEX_UNLOCK(q_mutex); hc.msg = (grn_msg *)msg; + hc.in_body = GRN_FALSE; + hc.is_chunked = GRN_FALSE; do_htreq(ctx, (grn_msg *)msg); MUTEX_LOCK(q_mutex); } while (nfthreads < max_nfthreads && grn_gctx.stat != GRN_CTX_QUIT); -------------- next part -------------- HTML����������������������������...ダウンロード