Source file src/net/http/clientconn_test.go

     1  // Copyright 2025 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package http_test
     6  
     7  import (
     8  	"context"
     9  	"fmt"
    10  	"io"
    11  	"net/http"
    12  	"sync"
    13  	"sync/atomic"
    14  	"testing"
    15  	"testing/synctest"
    16  	"time"
    17  )
    18  
    19  func TestTransportNewClientConnRoundTrip(t *testing.T) { run(t, testTransportNewClientConnRoundTrip) }
    20  func testTransportNewClientConnRoundTrip(t *testing.T, mode testMode) {
    21  	cst := newClientServerTest(t, mode, http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
    22  		io.WriteString(w, req.Host)
    23  	}), optFakeNet)
    24  
    25  	scheme := mode.Scheme() // http or https
    26  	cc, err := cst.tr.NewClientConn(t.Context(), scheme, cst.ts.Listener.Addr().String())
    27  	if err != nil {
    28  		t.Fatal(err)
    29  	}
    30  	defer cc.Close()
    31  
    32  	// Send requests for a couple different domains.
    33  	// All use the same connection.
    34  	for _, host := range []string{"example.tld", "go.dev"} {
    35  		req, _ := http.NewRequest("GET", fmt.Sprintf("%v://%v/", scheme, host), nil)
    36  		resp, err := cc.RoundTrip(req)
    37  		if err != nil {
    38  			t.Fatal(err)
    39  		}
    40  		got, _ := io.ReadAll(resp.Body)
    41  		if string(got) != host {
    42  			t.Errorf("got response body %q, want %v", got, host)
    43  		}
    44  		resp.Body.Close()
    45  
    46  		// CloseIdleConnections does not close connections created by NewClientConn.
    47  		cst.tr.CloseIdleConnections()
    48  	}
    49  
    50  	if err := cc.Err(); err != nil {
    51  		t.Errorf("before close: ClientConn.Err() = %v, want nil", err)
    52  	}
    53  
    54  	cc.Close()
    55  	if err := cc.Err(); err == nil {
    56  		t.Errorf("after close: ClientConn.Err() = nil, want error")
    57  	}
    58  
    59  	req, _ := http.NewRequest("GET", scheme+"://example.tld/", nil)
    60  	resp, err := cc.RoundTrip(req)
    61  	if err == nil {
    62  		resp.Body.Close()
    63  		t.Errorf("after close: cc.RoundTrip succeeded, want error")
    64  	}
    65  	t.Log(err)
    66  }
    67  
    68  func newClientConnTest(t testing.TB, mode testMode, h http.HandlerFunc, opts ...any) (*clientServerTest, *http.ClientConn) {
    69  	if h == nil {
    70  		h = func(w http.ResponseWriter, req *http.Request) {}
    71  	}
    72  	cst := newClientServerTest(t, mode, h, opts...)
    73  	cc, err := cst.tr.NewClientConn(t.Context(), mode.Scheme(), cst.ts.Listener.Addr().String())
    74  	if err != nil {
    75  		t.Fatal(err)
    76  	}
    77  	t.Cleanup(func() {
    78  		cc.Close()
    79  	})
    80  	synctest.Wait()
    81  	return cst, cc
    82  }
    83  
    84  // TestClientConnReserveAll reserves every concurrency slot on a connection.
    85  func TestClientConnReserveAll(t *testing.T) { runSynctest(t, testClientConnReserveAll) }
    86  func testClientConnReserveAll(t *testing.T, mode testMode) {
    87  	cst, cc := newClientConnTest(t, mode, nil, optFakeNet, func(s *http.Server) {
    88  		s.HTTP2 = &http.HTTP2Config{
    89  			MaxConcurrentStreams: 3,
    90  		}
    91  	})
    92  
    93  	want := 1
    94  	switch mode {
    95  	case http2Mode, http2UnencryptedMode:
    96  		want = cst.ts.Config.HTTP2.MaxConcurrentStreams
    97  	}
    98  	available := cc.Available()
    99  	if available != want {
   100  		t.Fatalf("cc.Available() = %v, want %v", available, want)
   101  	}
   102  
   103  	// Reserve every available concurrency slot on the connection.
   104  	for i := range available {
   105  		if err := cc.Reserve(); err != nil {
   106  			t.Fatalf("cc.Reserve() #%v = %v, want nil", i, err)
   107  		}
   108  		if got, want := cc.Available(), available-i-1; got != want {
   109  			t.Fatalf("cc.Available() = %v, want %v", got, want)
   110  		}
   111  		if got, want := cc.InFlight(), i+1; got != want {
   112  			t.Fatalf("cc.InFlight() = %v, want %v", got, want)
   113  		}
   114  	}
   115  
   116  	// The next reservation attempt should fail, since every slot is consumed.
   117  	if err := cc.Reserve(); err == nil {
   118  		t.Fatalf("cc.Reserve() = nil, want error")
   119  	}
   120  }
   121  
   122  // TestClientConnReserveParallel starts concurrent goroutines which reserve every
   123  // concurrency slot on a connection.
   124  func TestClientConnReserveParallel(t *testing.T) { runSynctest(t, testClientConnReserveParallel) }
   125  func testClientConnReserveParallel(t *testing.T, mode testMode) {
   126  	_, cc := newClientConnTest(t, mode, nil, optFakeNet, func(s *http.Server) {
   127  		s.HTTP2 = &http.HTTP2Config{
   128  			MaxConcurrentStreams: 3,
   129  		}
   130  	})
   131  	var (
   132  		wg      sync.WaitGroup
   133  		mu      sync.Mutex
   134  		success int
   135  		failure int
   136  	)
   137  	available := cc.Available()
   138  	const extra = 2
   139  	for range available + extra {
   140  		wg.Go(func() {
   141  			err := cc.Reserve()
   142  			mu.Lock()
   143  			defer mu.Unlock()
   144  			if err == nil {
   145  				success++
   146  			} else {
   147  				failure++
   148  			}
   149  		})
   150  	}
   151  	wg.Wait()
   152  
   153  	if got, want := success, available; got != want {
   154  		t.Errorf("%v successful reservations, want %v", got, want)
   155  	}
   156  	if got, want := failure, extra; got != want {
   157  		t.Errorf("%v failed reservations, want %v", got, want)
   158  	}
   159  }
   160  
   161  // TestClientConnReserveRelease repeatedly reserves and releases concurrency slots.
   162  func TestClientConnReserveRelease(t *testing.T) { runSynctest(t, testClientConnReserveRelease) }
   163  func testClientConnReserveRelease(t *testing.T, mode testMode) {
   164  	_, cc := newClientConnTest(t, mode, nil, optFakeNet, func(s *http.Server) {
   165  		s.HTTP2 = &http.HTTP2Config{
   166  			MaxConcurrentStreams: 3,
   167  		}
   168  	})
   169  
   170  	available := cc.Available()
   171  	for i := range 2 * available {
   172  		if err := cc.Reserve(); err != nil {
   173  			t.Fatalf("cc.Reserve() #%v = %v, want nil", i, err)
   174  		}
   175  		cc.Release()
   176  	}
   177  
   178  	if got, want := cc.Available(), available; got != want {
   179  		t.Fatalf("cc.Available() = %v, want %v", available, want)
   180  	}
   181  }
   182  
   183  // TestClientConnReserveAndConsume reserves a concurrency slot on a connection,
   184  // and then verifies that various events consume the reservation.
   185  func TestClientConnReserveAndConsume(t *testing.T) {
   186  	for _, test := range []struct {
   187  		name     string
   188  		consume  func(t *testing.T, cc *http.ClientConn, mode testMode)
   189  		handler  func(w http.ResponseWriter, req *http.Request, donec chan struct{})
   190  		h1Closed bool
   191  	}{{
   192  		// Explicit release.
   193  		name: "release",
   194  		consume: func(t *testing.T, cc *http.ClientConn, mode testMode) {
   195  			cc.Release()
   196  		},
   197  	}, {
   198  		// Invalid request sent to RoundTrip.
   199  		name: "invalid field name",
   200  		consume: func(t *testing.T, cc *http.ClientConn, mode testMode) {
   201  			req, _ := http.NewRequest("GET", mode.Scheme()+"://example.tld/", nil)
   202  			req.Header["invalid field name"] = []string{"x"}
   203  			_, err := cc.RoundTrip(req)
   204  			if err == nil {
   205  				t.Fatalf("RoundTrip succeeded, want failure")
   206  			}
   207  		},
   208  	}, {
   209  		// Successful request/response cycle.
   210  		name: "body close",
   211  		consume: func(t *testing.T, cc *http.ClientConn, mode testMode) {
   212  			req, _ := http.NewRequest("GET", mode.Scheme()+"://example.tld/", nil)
   213  			resp, err := cc.RoundTrip(req)
   214  			if err != nil {
   215  				t.Fatalf("RoundTrip: %v", err)
   216  			}
   217  			resp.Body.Close()
   218  		},
   219  	}, {
   220  		// Request context canceled before headers received.
   221  		name: "cancel",
   222  		consume: func(t *testing.T, cc *http.ClientConn, mode testMode) {
   223  			ctx, cancel := context.WithCancel(t.Context())
   224  			go func() {
   225  				req, _ := http.NewRequestWithContext(ctx, "GET", mode.Scheme()+"://example.tld/", nil)
   226  				_, err := cc.RoundTrip(req)
   227  				if err == nil {
   228  					t.Errorf("RoundTrip succeeded, want failure")
   229  				}
   230  			}()
   231  			synctest.Wait()
   232  			cancel()
   233  		},
   234  		handler: func(w http.ResponseWriter, req *http.Request, donec chan struct{}) {
   235  			<-donec
   236  		},
   237  		// An HTTP/1 connection is closed after a request is canceled on it.
   238  		h1Closed: true,
   239  	}, {
   240  		// Response body closed before full response received.
   241  		name: "early body close",
   242  		consume: func(t *testing.T, cc *http.ClientConn, mode testMode) {
   243  			req, _ := http.NewRequest("GET", mode.Scheme()+"://example.tld/", nil)
   244  			resp, err := cc.RoundTrip(req)
   245  			if err != nil {
   246  				t.Fatalf("RoundTrip: %v", err)
   247  			}
   248  			t.Logf("%T", resp.Body)
   249  			resp.Body.Close()
   250  		},
   251  		handler: func(w http.ResponseWriter, req *http.Request, donec chan struct{}) {
   252  			w.WriteHeader(200)
   253  			http.NewResponseController(w).Flush()
   254  			<-donec
   255  		},
   256  		// An HTTP/1 connection is closed after a request is canceled on it.
   257  		h1Closed: true,
   258  	}} {
   259  		t.Run(test.name, func(t *testing.T) {
   260  			runSynctest(t, func(t *testing.T, mode testMode) {
   261  				donec := make(chan struct{})
   262  				defer close(donec)
   263  				handler := func(w http.ResponseWriter, req *http.Request) {
   264  					if test.handler != nil {
   265  						test.handler(w, req, donec)
   266  					}
   267  				}
   268  
   269  				_, cc := newClientConnTest(t, mode, handler, optFakeNet)
   270  				stateHookCalls := 0
   271  				cc.SetStateHook(func(cc *http.ClientConn) {
   272  					stateHookCalls++
   273  				})
   274  				synctest.Wait()
   275  				stateHookCalls = 0 // ignore any initial update call
   276  
   277  				avail := cc.Available()
   278  				if err := cc.Reserve(); err != nil {
   279  					t.Fatalf("cc.Reserve() = %v, want nil", err)
   280  				}
   281  				synctest.Wait()
   282  				if got, want := stateHookCalls, 0; got != want {
   283  					t.Errorf("connection state hook calls: %v, want %v", got, want)
   284  				}
   285  
   286  				test.consume(t, cc, mode)
   287  				if mode == http1Mode || mode == https1Mode {
   288  					time.Sleep(http.MaxPostCloseReadTime)
   289  				}
   290  				synctest.Wait()
   291  
   292  				// State hook should be called, either to report the
   293  				// connection availability increasing or the connection closing,
   294  				// or both.
   295  				if stateHookCalls == 0 {
   296  					t.Errorf("connection state hook calls: %v, want >1", stateHookCalls)
   297  				}
   298  
   299  				if test.h1Closed && (mode == http1Mode || mode == https1Mode) {
   300  					if got, want := cc.Available(), 0; got != want {
   301  						t.Errorf("cc.Available() = %v, want %v", got, want)
   302  					}
   303  					if got, want := cc.InFlight(), 0; got != want {
   304  						t.Errorf("cc.InFlight() = %v, want %v", got, want)
   305  					}
   306  					if err := cc.Err(); err == nil {
   307  						t.Errorf("cc.Err() = nil, want closed connection")
   308  					}
   309  				} else {
   310  					if got, want := cc.Available(), avail; got != want {
   311  						t.Errorf("cc.Available() = %v, want %v", got, want)
   312  					}
   313  					if got, want := cc.InFlight(), 0; got != want {
   314  						t.Errorf("cc.InFlight() = %v, want %v", got, want)
   315  					}
   316  					if err := cc.Err(); err != nil {
   317  						t.Errorf("cc.Err() = %v, want nil", err)
   318  					}
   319  				}
   320  
   321  				if cc.Available() > 0 {
   322  					if err := cc.Reserve(); err != nil {
   323  						t.Errorf("cc.Reserve() = %v, want success", err)
   324  					}
   325  				}
   326  			})
   327  		})
   328  	}
   329  
   330  }
   331  
   332  // TestClientConnRoundTripBlocks verifies that RoundTrip blocks until a concurrency
   333  // slot is available on a connection.
   334  func TestClientConnRoundTripBlocks(t *testing.T) { runSynctest(t, testClientConnRoundTripBlocks) }
   335  func testClientConnRoundTripBlocks(t *testing.T, mode testMode) {
   336  	var handlerCalls atomic.Int64
   337  	requestc := make(chan struct{})
   338  	handler := func(w http.ResponseWriter, req *http.Request) {
   339  		handlerCalls.Add(1)
   340  		<-requestc
   341  	}
   342  	_, cc := newClientConnTest(t, mode, handler, optFakeNet, func(s *http.Server) {
   343  		s.HTTP2 = &http.HTTP2Config{
   344  			MaxConcurrentStreams: 3,
   345  		}
   346  	})
   347  
   348  	available := cc.Available()
   349  	var responses atomic.Int64
   350  	const extra = 2
   351  	for range available + extra {
   352  		go func() {
   353  			req, _ := http.NewRequest("GET", mode.Scheme()+"://example.tld/", nil)
   354  			resp, err := cc.RoundTrip(req)
   355  			responses.Add(1)
   356  			if err != nil {
   357  				t.Errorf("RoundTrip: %v", err)
   358  				return
   359  			}
   360  			resp.Body.Close()
   361  		}()
   362  	}
   363  
   364  	synctest.Wait()
   365  	if got, want := int(handlerCalls.Load()), available; got != want {
   366  		t.Errorf("got %v handler calls, want %v", got, want)
   367  	}
   368  	if got, want := int(responses.Load()), 0; got != want {
   369  		t.Errorf("got %v responses, want %v", got, want)
   370  	}
   371  
   372  	for i := range available + extra {
   373  		requestc <- struct{}{}
   374  		synctest.Wait()
   375  		if got, want := int(responses.Load()), i+1; got != want {
   376  			t.Errorf("got %v responses, want %v", got, want)
   377  		}
   378  	}
   379  }
   380  

View as plain text