リビジョン | 7764 (tree) |
---|---|
日時 | 2019-06-12 21:07:34 |
作者 | (del#24082) |
WSAAsyncSelect()でメッセージ通知を抑止する方法を止めて、bufchainの上限と下限を見て、
recv()の一時停止および再開をするようにした。
チケット #39297
@@ -679,9 +679,21 @@ | ||
679 | 679 | |
680 | 680 | while (channel->local_socket != INVALID_SOCKET) { |
681 | 681 | char buf[CHANNEL_READ_BUF_SIZE]; |
682 | - int amount = recv(channel->local_socket, buf, sizeof(buf), 0); | |
682 | + int amount; | |
683 | 683 | int err; |
684 | 684 | |
685 | + // recvの一時停止中ならば、何もせずに戻る。 | |
686 | + if (SSHv2(pvar)) { | |
687 | + Channel_t* c = ssh2_local_channel_lookup(channel_num); | |
688 | + if (c->bufchain_recv_suspended) { | |
689 | + logprintf(LOG_LEVEL_NOTICE, "%s: channel=%d recv was skipped for flow control", | |
690 | + __FUNCTION__, channel_num); | |
691 | + return; | |
692 | + } | |
693 | + } | |
694 | + | |
695 | + amount = recv(channel->local_socket, buf, sizeof(buf), 0); | |
696 | + | |
685 | 697 | // Xサーバからのデータ受信があれば、ノンブロッキングモードでソケット受信を行い、 |
686 | 698 | // SSHサーバのXアプリケーションへ送信する。 |
687 | 699 | //OutputDebugPrintf("%s: recv %d\n", __FUNCTION__, amount); |
@@ -740,10 +752,10 @@ | ||
740 | 752 | } |
741 | 753 | } |
742 | 754 | |
743 | -// local connection(WinSock)からのメッセージ通知を切り替える | |
755 | +// local connectionの受信の停止および再開の判断を行う | |
744 | 756 | // |
745 | -// notify: TRUE メッセージを通知する | |
746 | -// FALSE メッセージを通知しない | |
757 | +// notify: TRUE recvを再開する | |
758 | +// FALSE recvを停止する | |
747 | 759 | // |
748 | 760 | // [目的] |
749 | 761 | // remote_windowに空きがない場合は通知オフとし、空きができた場合は |
@@ -757,49 +769,43 @@ | ||
757 | 769 | { |
758 | 770 | int channel_num; |
759 | 771 | FWDChannel* channel; |
760 | - int ret; | |
772 | + int changed = 0; | |
761 | 773 | |
762 | 774 | channel_num = c->local_num; |
763 | 775 | channel = pvar->fwd_state.channels + channel_num; |
764 | 776 | |
765 | 777 | if (notify) { |
766 | - // メッセージ通知を有効にする | |
767 | - ret = WSAAsyncSelect( | |
768 | - channel->local_socket, | |
769 | - make_accept_wnd(pvar), WM_SOCK_IO, | |
770 | - FD_CONNECT | FD_READ | FD_CLOSE | FD_WRITE | |
771 | - ); | |
772 | - } else { | |
773 | - /* メッセージ通知を無効にする。 | |
774 | - 無効後、キューに溜まっているメッセージが送られてくることがあるので注意。 | |
775 | - | |
776 | - https://docs.microsoft.com/en-us/windows/desktop/api/winsock/nf-winsock-wsaasyncselect | |
777 | - To cancel all notification indicating that Windows Sockets should send no further | |
778 | - messages related to network events on the socket, lEvent is set to zero. | |
778 | + // recvを再開するか判断する | |
779 | + if (c->bufchain_amount <= FWD_LOW_WATER_MARK) { | |
780 | + // 下限を下回ったので再開 | |
781 | + c->bufchain_recv_suspended = FALSE; | |
779 | 782 | |
780 | - Although WSAAsyncSelect immediately disables event message posting for the socket | |
781 | - in this instance, it is possible that messages could be waiting in the application | |
782 | - message queue. Therefore, the application must be prepared to receive network | |
783 | - event messages even after cancellation. | |
784 | - */ | |
785 | - ret = WSAAsyncSelect( | |
786 | - channel->local_socket, | |
787 | - make_accept_wnd(pvar), | |
788 | - 0, 0); | |
789 | - } | |
783 | + // ここで再開のメッセージを飛ばす | |
784 | + PostMessage(pvar->fwd_state.accept_wnd, WM_SOCK_IO, | |
785 | + (WPARAM)channel->local_socket, | |
786 | + MAKEWPARAM(FD_READ, 0) | |
787 | + ); | |
790 | 788 | |
791 | - if (ret != 0) { | |
792 | - logprintf(LOG_LEVEL_ERROR, "%s: Can not change local channel(%d) WinSock notification(%d).", | |
793 | - __FUNCTION__, channel_num, notify); | |
789 | + changed = 1; | |
790 | + } | |
791 | + | |
792 | + } else { | |
793 | + // recvを停止するか判断する | |
794 | + if (c->bufchain_amount >= FWD_HIGH_WATER_MARK) { | |
795 | + // 上限を超えたので停止 | |
796 | + c->bufchain_recv_suspended = TRUE; | |
797 | + changed = 1; | |
798 | + } | |
794 | 799 | } |
795 | - else { | |
796 | - logprintf(LOG_LEVEL_NOTICE, | |
797 | - "%s: Local channel#%d WinSock notification has been `%s' for flow control(buffer size %lu).", | |
798 | - __FUNCTION__, channel_num, | |
799 | - notify ? "enabled" : "disabled", | |
800 | - c->bufchain_amount); | |
801 | - } | |
802 | 800 | |
801 | + logprintf(LOG_LEVEL_NOTICE, | |
802 | + "%s: Local channel#%d recv has been `%s' for flow control(buffer size %lu, recv %s).", | |
803 | + __FUNCTION__, channel_num, | |
804 | + c->bufchain_recv_suspended ? "disabled" : "enabled", | |
805 | + c->bufchain_amount, | |
806 | + changed ? "changed" : "" | |
807 | + ); | |
808 | + | |
803 | 809 | } |
804 | 810 | |
805 | 811 |
@@ -35,6 +35,11 @@ | ||
35 | 35 | #ifndef __FWD_H |
36 | 36 | #define __FWD_H |
37 | 37 | |
38 | +// ポート転送におけるフロー制御の閾値 | |
39 | +// 適用先 Channel_t.bufchain_amount | |
40 | +#define FWD_HIGH_WATER_MARK (1 * 1024 * 1024) // 1MB | |
41 | +#define FWD_LOW_WATER_MARK (0) // 0MB | |
42 | + | |
38 | 43 | #define FWD_REMOTE_CONNECTED 0x01 |
39 | 44 | #define FWD_LOCAL_CONNECTED 0x02 |
40 | 45 | #define FWD_BOTH_CONNECTED (FWD_REMOTE_CONNECTED | FWD_LOCAL_CONNECTED) |
@@ -213,6 +213,8 @@ | ||
213 | 213 | c->type = type; |
214 | 214 | c->local_num = local_num; // alloc_channel()の返値を保存しておく |
215 | 215 | c->bufchain = NULL; |
216 | + c->bufchain_amount = 0; | |
217 | + c->bufchain_recv_suspended = FALSE; | |
216 | 218 | if (type == TYPE_SCP) { |
217 | 219 | c->scp.state = SCP_INIT; |
218 | 220 | c->scp.progress_window = NULL; |
@@ -390,7 +392,7 @@ | ||
390 | 392 | // SSH1で管理しているchannel構造体から、SSH2向けのChannel_tへ変換する。 |
391 | 393 | // TODO: 将来的にはチャネル構造体は1つに統合する。 |
392 | 394 | // (2005.6.12 yutaka) |
393 | -static Channel_t *ssh2_local_channel_lookup(int local_num) | |
395 | +Channel_t *ssh2_local_channel_lookup(int local_num) | |
394 | 396 | { |
395 | 397 | int i; |
396 | 398 | Channel_t *c; |
@@ -882,6 +882,7 @@ | ||
882 | 882 | int local_num; |
883 | 883 | bufchain_t *bufchain; |
884 | 884 | unsigned long bufchain_amount; |
885 | + BOOL bufchain_recv_suspended; | |
885 | 886 | scp_t scp; |
886 | 887 | buffer_t *agent_msg; |
887 | 888 | int agent_request_len; |
@@ -893,6 +894,7 @@ | ||
893 | 894 | unsigned char *begin_send_packet(PTInstVar pvar, int type, int len); |
894 | 895 | void finish_send_packet_special(PTInstVar pvar, int skip_compress); |
895 | 896 | void SSH2_send_channel_data(PTInstVar pvar, Channel_t *c, unsigned char *buf, unsigned int buflen, int retry); |
897 | +Channel_t* ssh2_local_channel_lookup(int local_num); | |
896 | 898 | |
897 | 899 | #define finish_send_packet(pvar) finish_send_packet_special((pvar), 0) |
898 | 900 | #define get_payload_uint32(pvar, offset) get_uint32_MSBfirst((pvar)->ssh_state.payload + (offset)) |