diff --git a/CHANGELOG.md b/CHANGELOG.md index 70022f903c..20cca8efd7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ All notable changes to this project will be documented in this file. From versio - Add config `client-error-verbosity` to customize error verbosity by @taimoorzaeem in #4088, #3980, #3824 - Add `Vary` header to responses by @develop7 in #4609 - Add config `db-timezone-enabled` for optional querying of timezones by @taimoorzaeem in #4751 +- Use SO_REUSEPORT on platforms supporting it by @mkleczek in #4703, #4694 ### Changed @@ -21,6 +22,9 @@ All notable changes to this project will be documented in this file. From versio - Log error when `db-schemas` config contains schema `pg_catalog` or `information_schema` by @taimoorzaeem in #4359 + Now fails at startup. Prior to this, it failed with `PGRST205` on requests related to these schemas. +### Fixed +- Shutdown should wait for in-flight requests by @mkleczek in #4702 + ## [14.8] - 2026-04-03 ### Added diff --git a/nix/overlays/haskell-packages.nix b/nix/overlays/haskell-packages.nix index 36feba7640..6801caf072 100644 --- a/nix/overlays/haskell-packages.nix +++ b/nix/overlays/haskell-packages.nix @@ -78,6 +78,53 @@ let hasql-pool = lib.dontCheck prev.hasql-pool_1_0_1; hasql-transaction = lib.dontCheck prev.hasql-transaction_1_1_0_1; postgresql-binary = lib.dontCheck (lib.doJailbreak prev.postgresql-binary_0_13_1_3); + + http2 = + prev.callHackageDirect + { + pkg = "http2"; + ver = "5.4.0"; + sha256 = "sha256-PeEWVd61bQ8G7LvfLeXklzXqNJFaAjE2ecRMWJZESPE="; + } + { }; + + http-semantics = + prev.callHackageDirect + { + pkg = "http-semantics"; + ver = "0.4.0"; + sha256 = "sha256-rh0z51EKvsu5rQd5n2z3fSRjjEObouNZSBPO9NFYOF0="; + } + { }; + + network-run = + prev.callHackageDirect + { + pkg = "network-run"; + ver = "0.5.0"; + sha256 = "sha256-vbXh+CzxDsGApjqHxCYf/ijpZtUCApFbkcF5gyN0THU="; + } + { }; + + time-manager = + prev.callHackageDirect + { + pkg = "time-manager"; + ver = "0.2.4"; + sha256 
= "sha256-sAt/331YLQ2IU3z90aKYSq1nxoazv87irsuJp7ZG3pw="; + } + { }; + + warp = + lib.dontCheck (prev.callCabal2nixWithOptions "warp" + (super.fetchFromGitHub { + owner = "mkleczek"; + repo = "wai"; + rev = "7ca66f023ccaf2e3862ad97392f1f11afea3b6ff"; + #sha256 = "sha256-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="; + sha256 = "sha256-Z/1yikmlDZhjv4LhewjRvW7g5s8KZrHztFRnefEDu7Y="; + }) "--subpath=warp" + { }); }; in { diff --git a/src/PostgREST/Admin.hs b/src/PostgREST/Admin.hs index 99733a6995..f8501417be 100644 --- a/src/PostgREST/Admin.hs +++ b/src/PostgREST/Admin.hs @@ -22,20 +22,20 @@ import qualified PostgREST.AppState as AppState import qualified Network.Socket as NS import Protolude -runAdmin :: AppState -> Maybe NS.Socket -> NS.Socket -> Warp.Settings -> IO () -runAdmin appState maybeAdminSocket socketREST settings = do +runAdmin :: AppState -> Maybe NS.Socket -> IO (Maybe NS.Socket) -> Warp.Settings -> IO () +runAdmin appState maybeAdminSocket getSocketREST settings = do whenJust maybeAdminSocket $ \adminSocket -> do address <- resolveSocketToAddress adminSocket observer $ AdminStartObs address void . forkIO $ Warp.runSettingsSocket settings adminSocket adminApp where - adminApp = admin appState socketREST + adminApp = admin appState getSocketREST observer = AppState.getObserver appState -- | PostgREST admin application -admin :: AppState.AppState -> NS.Socket -> Wai.Application -admin appState socketREST req respond = do - isMainAppReachable <- isRight <$> reachMainApp socketREST +admin :: AppState.AppState -> IO (Maybe NS.Socket) -> Wai.Application +admin appState getSocketREST req respond = do + isMainAppReachable <- getSocketREST >>= maybe (pure False) (fmap isRight . 
reachMainApp) isLoaded <- AppState.isLoaded appState isPending <- AppState.isPending appState @@ -44,8 +44,8 @@ admin appState socketREST req respond = do respond $ Wai.responseLBS (if isMainAppReachable then HTTP.status200 else HTTP.status500) [] mempty ["ready"] -> let - status | not isMainAppReachable = HTTP.status500 - | isPending = HTTP.status503 + status | isPending = HTTP.status503 + | not isMainAppReachable = HTTP.status500 | isLoaded = HTTP.status200 | otherwise = HTTP.status500 in diff --git a/src/PostgREST/App.hs b/src/PostgREST/App.hs index 2938e61843..52954fbb80 100644 --- a/src/PostgREST/App.hs +++ b/src/PostgREST/App.hs @@ -12,6 +12,7 @@ Some of its functionality includes: {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE ViewPatterns #-} +{-# LANGUAGE TypeApplications #-} module PostgREST.App ( postgrest , run @@ -22,7 +23,10 @@ import GHC.IO.Exception (IOErrorType (..)) import System.IO.Error (ioeGetErrorType) import Control.Monad.Except (liftEither) +import Control.Monad.Extra (whenJust) import Data.Either.Combinators (mapLeft, whenLeft) +import Data.IORef (atomicWriteIORef, newIORef, + readIORef) import Data.Maybe (fromJust) import Data.String (IsString (..)) import Network.Wai.Handler.Warp (defaultSettings, setHost, @@ -62,11 +66,11 @@ import PostgREST.Version (docsVersion, prettyVersion) import qualified Data.ByteString.Char8 as BS import qualified Data.List as L -import Data.Streaming.Network (bindPortTCP, - bindRandomPortTCP) +import Data.Streaming.Network (HostPreference, + bindPortGenEx) import qualified Data.Text as T import qualified Network.HTTP.Types as HTTP -import qualified Network.HTTP.Types.Header as HTTP (hVary) +import qualified Network.HTTP.Types.Header as HTTP import qualified Network.Socket as NS import PostgREST.Unix (createAndBindDomainSocket) import Protolude hiding (Handler) @@ -77,21 +81,35 @@ run :: AppState -> IO () run appState = do conf@AppConfig{..} <- AppState.getConfig appState - 
AppState.schemaCacheLoader appState -- Loads the initial SchemaCache - (mainSocket, adminSocket) <- initSockets conf + mainSocketRef <- newIORef Nothing + adminSocket <- initAdminServerSocket conf - Unix.installSignalHandlers observer (AppState.getMainThreadId appState) (AppState.schemaCacheLoader appState) (AppState.readInDbConfig False appState) + let closeSockets = do + whenJust adminSocket NS.close + readIORef mainSocketRef >>= foldMap NS.close + Unix.installSignalHandlers observer closeSockets (AppState.schemaCacheLoader appState) (AppState.readInDbConfig False appState) + + Admin.runAdmin appState adminSocket (readIORef mainSocketRef) (serverSettings conf) Listener.runListener appState - Admin.runAdmin appState adminSocket mainSocket (serverSettings conf) + -- Kick off and wait for the initial SchemaCache load before creating the + -- main API socket. + AppState.schemaCacheLoader appState + AppState.waitForSchemaCacheLoaded appState + + mainSocket <- initServerSocket conf + atomicWriteIORef mainSocketRef $ Just mainSocket let app = postgrest configLogLevel appState (AppState.schemaCacheLoader appState) - do - address <- resolveSocketToAddress mainSocket - observer $ AppServerAddressObs address + address <- resolveSocketToAddress mainSocket + observer $ AppServerAddressObs address + -- Hardcoding maximum graceful shutdown timeout (arbitrarily set to 5 seconds) + -- This is unfortunate but necessary because graceful shutdowns don't work with HTTP keep-alive + -- causing Warp to handle requests on already opened connections even if the listen socket is closed + -- See: https://github.com/yesodweb/wai/issues/853 Warp.runSettingsSocket (serverSettings conf & setOnException onWarpException) mainSocket app where observer = AppState.getObserver appState @@ -248,38 +266,27 @@ addRetryHint delay response = do isServiceUnavailable :: Wai.Response -> Bool isServiceUnavailable response = Wai.responseStatus response == HTTP.status503 -type AppSockets = (NS.Socket, Maybe 
NS.Socket) - -initSockets :: AppConfig -> IO AppSockets -initSockets AppConfig{..} = do - let - cfg'usp = configServerUnixSocket - cfg'uspm = configServerUnixSocketMode - cfg'host = configServerHost - cfg'port = configServerPort - cfg'adminHost = configAdminServerHost - cfg'adminPort = configAdminServerPort - - sock <- case cfg'usp of - -- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows, - -- but we need to have runtime error if we try to use it in Windows, not compile time error - Just path -> createAndBindDomainSocket path cfg'uspm - Nothing -> do - (_, sock) <- - if cfg'port /= 0 - then do - sock <- bindPortTCP cfg'port (fromString $ T.unpack cfg'host) - pure (cfg'port, sock) - else do - -- explicitly bind to a random port, returning bound port number - (num, sock) <- bindRandomPortTCP (fromString $ T.unpack cfg'host) - pure (num, sock) - pure sock - - adminSock <- case cfg'adminPort of - Just adminPort -> do - adminSock <- bindPortTCP adminPort (fromString $ T.unpack cfg'adminHost) - pure $ Just adminSock - Nothing -> pure Nothing - - pure (sock, adminSock) +initServerSocket :: AppConfig -> IO NS.Socket +initServerSocket AppConfig{..} = case configServerUnixSocket of + -- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows, + -- but we need to have runtime error if we try to use it in Windows, not compile time error + Just path -> createAndBindDomainSocket path configServerUnixSocketMode + Nothing -> + bindPortTCPWithReusePort configServerPort (fromString $ T.unpack configServerHost) + +initAdminServerSocket :: AppConfig -> IO (Maybe NS.Socket) +initAdminServerSocket AppConfig{..} = + traverse (`bindPortTCPWithReusePort` adminHost) configAdminServerPort + where + adminHost = fromString $ T.unpack configAdminServerHost + +bindPortTCPWithReusePort :: Int -> HostPreference -> IO NS.Socket +bindPortTCPWithReusePort port hostPreference = do + -- Some unix variants can expose 
ReusePort but reject it at runtime. + -- Fall back to binding without ReusePort when that happens. + try @SomeException (bindPortGenEx reusePortOpts NS.Stream port hostPreference) + >>= either (const $ bindPortGenEx [] NS.Stream port hostPreference) pure + >>= listenSocket + where + reusePortOpts = [(NS.ReusePort, 1)] + listenSocket sock = NS.listen sock (max 2048 NS.maxListenQueue) $> sock diff --git a/src/PostgREST/AppState.hs b/src/PostgREST/AppState.hs index 44000294be..4d465aa8df 100644 --- a/src/PostgREST/AppState.hs +++ b/src/PostgREST/AppState.hs @@ -27,6 +27,7 @@ module PostgREST.AppState , getObserver , isLoaded , isPending + , waitForSchemaCacheLoaded ) where import qualified Data.ByteString.Char8 as BS @@ -391,6 +392,9 @@ markSchemaCacheLoaded = void . (`tryPutMVar` ()) . getSCStatusMVar . stateSCache isSchemaCacheLoaded :: AppState -> IO Bool isSchemaCacheLoaded = fmap not . isEmptyMVar . getSCStatusMVar . stateSCacheStatus +waitForSchemaCacheLoaded :: AppState -> IO () +waitForSchemaCacheLoaded = void . readMVar . getSCStatusMVar . stateSCacheStatus + -- | Reads the in-db config and reads the config file again -- | We don't retry reading the in-db config after it fails immediately, because it could have user errors. We just report the error and continue. 
readInDbConfig :: Bool -> AppState -> IO () diff --git a/src/PostgREST/Unix.hs b/src/PostgREST/Unix.hs index 2128c6cb96..3b1080404b 100644 --- a/src/PostgREST/Unix.hs +++ b/src/PostgREST/Unix.hs @@ -19,10 +19,9 @@ import System.Directory (removeFile) import System.IO.Error (isDoesNotExistError) -- | Set signal handlers, only for systems with signals -installSignalHandlers :: Observation.ObservationHandler -> ThreadId -> IO () -> IO () -> IO () +installSignalHandlers :: Observation.ObservationHandler -> IO () -> IO () -> IO () -> IO () #ifndef mingw32_HOST_OS -installSignalHandlers observer tid usr1 usr2 = do - let interrupt = throwTo tid UserInterrupt +installSignalHandlers observer interrupt usr1 usr2 = do install Signals.sigINT $ observer (Observation.TerminationUnixSignalObs "SIGINT") >> interrupt install Signals.sigTERM $ observer (Observation.TerminationUnixSignalObs "SIGTERM") >> interrupt install Signals.sigUSR1 usr1 diff --git a/test/io/postgrest.py b/test/io/postgrest.py index 33a64a0d9a..b78632973c 100644 --- a/test/io/postgrest.py +++ b/test/io/postgrest.py @@ -88,7 +88,7 @@ def run( admin_port=None, host=None, wait_for_readiness=True, - wait_max_seconds=1, + wait_max_seconds=3, no_pool_connection_available=False, no_startup_stdout=True, ): @@ -188,6 +188,7 @@ def wait_until_exit(postgrest, timeout=1): def wait_until_status_code(url, max_seconds, status_code): "Wait for the given HTTP endpoint to return a status code" session = requests_unixsocket.Session() + response = None for _ in range(max_seconds * 10): try: diff --git a/test/io/test_io.py b/test/io/test_io.py index 6dc81845c8..af60e18e6b 100644 --- a/test/io/test_io.py +++ b/test/io/test_io.py @@ -5,6 +5,7 @@ import signal import time import pytest +import requests from config import CONFIGSDIR, FIXTURES, SECRET from util import Thread, jwtauthheader, parse_server_timings_header @@ -105,7 +106,6 @@ def sleep(): t.join() -@pytest.mark.xfail(reason="Graceful shutdown is currently failing", 
strict=True) def test_graceful_shutdown_waits_for_in_flight_request(defaultenv): "SIGTERM should allow in-flight requests to finish before exiting" @@ -152,7 +152,6 @@ def test_random_port_bound(defaultenv): assert True # liveness check is done by run(), so we just need to check that it doesn't fail -@pytest.mark.xfail(reason="PostgREST should not start on a used port", strict=True) def test_so_reuseport_zero_downtime_handover(defaultenv): "A second PostgREST instance should take over on the same main/admin ports without request failures." @@ -1279,14 +1278,21 @@ def test_log_postgrest_host_and_port(host, defaultenv): with run( env=defaultenv, host=host, port=port, no_startup_stdout=False ) as postgrest: - output = postgrest.read_stdout(nlines=10) + output = postgrest.read_stdout(nlines=20) if is_unix: - re.match(r'API server listening on "/tmp/.*\.sock"', output[2]) + assert any( + "API server listening on" in line and "postgrest.sock" in line + for line in output + ) elif is_ipv6(host): - assert f"API server listening on [{host}]:{port}" in output[2] + assert any( + f"API server listening on [{host}]:{port}" in line for line in output + ) else: # IPv4 - assert f"API server listening on {host}:{port}" in output[2] + assert any( + f"API server listening on {host}:{port}" in line for line in output + ) def test_succeed_w_role_having_superuser_settings(defaultenv): @@ -1605,27 +1611,6 @@ def test_pgrst_log_503_client_error_to_stderr(defaultenv): assert any(log_message in line for line in output) -def test_log_error_when_empty_schema_cache_on_startup_to_stderr(defaultenv): - "Should log the 503 error message when there is an empty schema cache on startup" - - env = { - **defaultenv, - "PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP": "300", - } - - with run(env=env, wait_for_readiness=False) as postgrest: - postgrest.wait_until_scache_starts_loading() - - response = postgrest.session.get("/projects") - assert response.status_code == 503 - - output_start = 
postgrest.read_stdout(nlines=10) - - log_err_message = '{"code":"PGRST002","details":null,"hint":null,"message":"Could not query the database for the schema cache. Retrying."}' - - assert any(log_err_message in line for line in output_start) - - def test_no_double_schema_cache_reload_on_empty_schema(defaultenv): "Should only load the schema cache once on a 503 error when there's an empty schema cache on startup" @@ -1637,8 +1622,8 @@ def test_no_double_schema_cache_reload_on_empty_schema(defaultenv): with run(env=env, port=freeport(), wait_for_readiness=False) as postgrest: postgrest.wait_until_scache_starts_loading() - response = postgrest.session.get("/projects") - assert response.status_code == 503 + with pytest.raises(requests.ConnectionError): + postgrest.session.get("/projects") # Should wait enough time to load the schema cache twice to guarantee that the test is valid time.sleep(1) @@ -1723,9 +1708,10 @@ def test_schema_cache_error_observation(defaultenv): # assert exitCode == 1 output = postgrest.read_stdout(nlines=9) - assert ( + assert any( "Failed to load the schema cache using db-schemas=public and db-extra-search-path=x" - in output[7] + in line + for line in output )