Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@ All notable changes to this project will be documented in this file. From versio
- Add config `client-error-verbosity` to customize error verbosity by @taimoorzaeem in #4088, #3980, #3824
- Add `Vary` header to responses by @develop7 in #4609
- Add config `db-timezone-enabled` for optional querying of timezones by @taimoorzaeem in #4751
- Use SO_REUSEPORT on platforms supporting it by @mkleczek in #4703 #4694

### Changed

- All responses now include a `Vary` header by @develop7 in #4609
- Log error when `db-schemas` config contains schema `pg_catalog` or `information_schema` by @taimoorzaeem in #4359
+ Now fails at startup. Prior to this, it failed with `PGRST205` on requests related to these schemas.

### Fixed
- Shutdown should wait for in flight requests by @mkleczek in #4702

## [14.7] - 2026-03-20

### Fixed
Expand Down
47 changes: 47 additions & 0 deletions nix/overlays/haskell-packages.nix
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,53 @@ let
hasql-pool = lib.dontCheck prev.hasql-pool_1_0_1;
hasql-transaction = lib.dontCheck prev.hasql-transaction_1_1_0_1;
postgresql-binary = lib.dontCheck (lib.doJailbreak prev.postgresql-binary_0_13_1_3);

http2 =
prev.callHackageDirect
{
pkg = "http2";
ver = "5.4.0";
sha256 = "sha256-PeEWVd61bQ8G7LvfLeXklzXqNJFaAjE2ecRMWJZESPE=";
}
{ };

http-semantics =
prev.callHackageDirect
{
pkg = "http-semantics";
ver = "0.4.0";
sha256 = "sha256-rh0z51EKvsu5rQd5n2z3fSRjjEObouNZSBPO9NFYOF0=";
}
{ };

network-run =
prev.callHackageDirect
{
pkg = "network-run";
ver = "0.5.0";
sha256 = "sha256-vbXh+CzxDsGApjqHxCYf/ijpZtUCApFbkcF5gyN0THU=";
}
{ };

time-manager =
prev.callHackageDirect
{
pkg = "time-manager";
ver = "0.2.4";
sha256 = "sha256-sAt/331YLQ2IU3z90aKYSq1nxoazv87irsuJp7ZG3pw=";
}
{ };

warp =
lib.dontCheck (prev.callCabal2nixWithOptions "warp"
(super.fetchFromGitHub {
owner = "mkleczek";
repo = "wai";
rev = "7ca66f023ccaf2e3862ad97392f1f11afea3b6ff";
#sha256 = "sha256-AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=";
sha256 = "sha256-Z/1yikmlDZhjv4LhewjRvW7g5s8KZrHztFRnefEDu7Y=";
}) "--subpath=warp"
{ });
};
in
{
Expand Down
16 changes: 8 additions & 8 deletions src/PostgREST/Admin.hs
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,20 @@ import qualified PostgREST.AppState as AppState
import qualified Network.Socket as NS
import Protolude

runAdmin :: AppState -> Maybe NS.Socket -> NS.Socket -> Warp.Settings -> IO ()
runAdmin appState maybeAdminSocket socketREST settings = do
runAdmin :: AppState -> Maybe NS.Socket -> IO (Maybe NS.Socket) -> Warp.Settings -> IO ()
runAdmin appState maybeAdminSocket getSocketREST settings = do
whenJust maybeAdminSocket $ \adminSocket -> do
address <- resolveSocketToAddress adminSocket
observer $ AdminStartObs address
void . forkIO $ Warp.runSettingsSocket settings adminSocket adminApp
where
adminApp = admin appState socketREST
adminApp = admin appState getSocketREST
observer = AppState.getObserver appState

-- | PostgREST admin application
admin :: AppState.AppState -> NS.Socket -> Wai.Application
admin appState socketREST req respond = do
isMainAppReachable <- isRight <$> reachMainApp socketREST
admin :: AppState.AppState -> IO (Maybe NS.Socket) -> Wai.Application
admin appState getSocketREST req respond = do
isMainAppReachable <- getSocketREST >>= maybe (pure False) (fmap isRight . reachMainApp)
isLoaded <- AppState.isLoaded appState
isPending <- AppState.isPending appState

Expand All @@ -44,8 +44,8 @@ admin appState socketREST req respond = do
respond $ Wai.responseLBS (if isMainAppReachable then HTTP.status200 else HTTP.status500) [] mempty
["ready"] ->
let
status | not isMainAppReachable = HTTP.status500
| isPending = HTTP.status503
status | isPending = HTTP.status503
| not isMainAppReachable = HTTP.status500
| isLoaded = HTTP.status200
| otherwise = HTTP.status500
in
Expand Down
97 changes: 52 additions & 45 deletions src/PostgREST/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ Some of its functionality includes:
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE ViewPatterns #-}
{-# LANGUAGE TypeApplications #-}
module PostgREST.App
( postgrest
, run
Expand All @@ -22,7 +23,10 @@ import GHC.IO.Exception (IOErrorType (..))
import System.IO.Error (ioeGetErrorType)

import Control.Monad.Except (liftEither)
import Control.Monad.Extra (whenJust)
import Data.Either.Combinators (mapLeft, whenLeft)
import Data.IORef (atomicWriteIORef, newIORef,
readIORef)
import Data.Maybe (fromJust)
import Data.String (IsString (..))
import Network.Wai.Handler.Warp (defaultSettings, setHost,
Expand Down Expand Up @@ -62,11 +66,11 @@ import PostgREST.Version (docsVersion, prettyVersion)

import qualified Data.ByteString.Char8 as BS
import qualified Data.List as L
import Data.Streaming.Network (bindPortTCP,
bindRandomPortTCP)
import Data.Streaming.Network (HostPreference,
bindPortGenEx)
import qualified Data.Text as T
import qualified Network.HTTP.Types as HTTP
import qualified Network.HTTP.Types.Header as HTTP (hVary)
import qualified Network.HTTP.Types.Header as HTTP
import qualified Network.Socket as NS
import PostgREST.Unix (createAndBindDomainSocket)
import Protolude hiding (Handler)
Expand All @@ -77,21 +81,35 @@ run :: AppState -> IO ()
run appState = do
conf@AppConfig{..} <- AppState.getConfig appState

AppState.schemaCacheLoader appState -- Loads the initial SchemaCache
(mainSocket, adminSocket) <- initSockets conf
mainSocketRef <- newIORef Nothing
adminSocket <- initAdminServerSocket conf

Unix.installSignalHandlers observer (AppState.getMainThreadId appState) (AppState.schemaCacheLoader appState) (AppState.readInDbConfig False appState)
let closeSockets = do
whenJust adminSocket NS.close
readIORef mainSocketRef >>= foldMap NS.close
Unix.installSignalHandlers observer closeSockets (AppState.schemaCacheLoader appState) (AppState.readInDbConfig False appState)

Admin.runAdmin appState adminSocket (readIORef mainSocketRef) (serverSettings conf)

Listener.runListener appState

Admin.runAdmin appState adminSocket mainSocket (serverSettings conf)
-- Kick off and wait for the initial SchemaCache load before creating the
-- main API socket.
AppState.schemaCacheLoader appState
AppState.waitForSchemaCacheLoaded appState

mainSocket <- initServerSocket conf
atomicWriteIORef mainSocketRef $ Just mainSocket

let app = postgrest configLogLevel appState (AppState.schemaCacheLoader appState)

do
address <- resolveSocketToAddress mainSocket
observer $ AppServerAddressObs address
address <- resolveSocketToAddress mainSocket
observer $ AppServerAddressObs address

-- Hardcoding maximum graceful shutdown timeout (arbitrary set to 5 seconds)
-- This is unfortunate but necessary becase graceful shutdowns don't work with HTTP keep-alive
-- causing Warp to handle requests on already opened connections even if the listen socket is closed
-- See: https://github.com/yesodweb/wai/issues/853
Warp.runSettingsSocket (serverSettings conf & setOnException onWarpException) mainSocket app
where
observer = AppState.getObserver appState
Expand Down Expand Up @@ -248,38 +266,27 @@ addRetryHint delay response = do
isServiceUnavailable :: Wai.Response -> Bool
isServiceUnavailable response = Wai.responseStatus response == HTTP.status503

type AppSockets = (NS.Socket, Maybe NS.Socket)

initSockets :: AppConfig -> IO AppSockets
initSockets AppConfig{..} = do
let
cfg'usp = configServerUnixSocket
cfg'uspm = configServerUnixSocketMode
cfg'host = configServerHost
cfg'port = configServerPort
cfg'adminHost = configAdminServerHost
cfg'adminPort = configAdminServerPort

sock <- case cfg'usp of
-- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows,
-- but we need to have runtime error if we try to use it in Windows, not compile time error
Just path -> createAndBindDomainSocket path cfg'uspm
Nothing -> do
(_, sock) <-
if cfg'port /= 0
then do
sock <- bindPortTCP cfg'port (fromString $ T.unpack cfg'host)
pure (cfg'port, sock)
else do
-- explicitly bind to a random port, returning bound port number
(num, sock) <- bindRandomPortTCP (fromString $ T.unpack cfg'host)
pure (num, sock)
pure sock

adminSock <- case cfg'adminPort of
Just adminPort -> do
adminSock <- bindPortTCP adminPort (fromString $ T.unpack cfg'adminHost)
pure $ Just adminSock
Nothing -> pure Nothing

pure (sock, adminSock)
initServerSocket :: AppConfig -> IO NS.Socket
initServerSocket AppConfig{..} = case configServerUnixSocket of
-- I'm not using `streaming-commons`' bindPath function here because it's not defined for Windows,
-- but we need to have runtime error if we try to use it in Windows, not compile time error
Just path -> createAndBindDomainSocket path configServerUnixSocketMode
Nothing ->
bindPortTCPWithReusePort configServerPort (fromString $ T.unpack configServerHost)

initAdminServerSocket :: AppConfig -> IO (Maybe NS.Socket)
initAdminServerSocket AppConfig{..} =
traverse (`bindPortTCPWithReusePort` adminHost) configAdminServerPort
where
adminHost = fromString $ T.unpack configAdminServerHost

bindPortTCPWithReusePort :: Int -> HostPreference -> IO NS.Socket
bindPortTCPWithReusePort port hostPreference = do
-- Some unix variants can expose ReusePort but reject it at runtime.
-- Fall back to binding without ReusePort when that happens.
try @SomeException (bindPortGenEx reusePortOpts NS.Stream port hostPreference)
>>= either (const $ bindPortGenEx [] NS.Stream port hostPreference) pure
>>= listenSocket
where
reusePortOpts = [(NS.ReusePort, 1)]
listenSocket sock = NS.listen sock (max 2048 NS.maxListenQueue) $> sock
4 changes: 4 additions & 0 deletions src/PostgREST/AppState.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ module PostgREST.AppState
, getObserver
, isLoaded
, isPending
, waitForSchemaCacheLoaded
) where

import qualified Data.ByteString.Char8 as BS
Expand Down Expand Up @@ -385,6 +386,9 @@ markSchemaCacheLoaded = void . (`tryPutMVar` ()) . getSCStatusMVar . stateSCache
isSchemaCacheLoaded :: AppState -> IO Bool
isSchemaCacheLoaded = fmap not . isEmptyMVar . getSCStatusMVar . stateSCacheStatus

waitForSchemaCacheLoaded :: AppState -> IO ()
waitForSchemaCacheLoaded = void . readMVar . getSCStatusMVar . stateSCacheStatus

-- | Reads the in-db config and reads the config file again
-- | We don't retry reading the in-db config after it fails immediately, because it could have user errors. We just report the error and continue.
readInDbConfig :: Bool -> AppState -> IO ()
Expand Down
5 changes: 2 additions & 3 deletions src/PostgREST/Unix.hs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ import System.Directory (removeFile)
import System.IO.Error (isDoesNotExistError)

-- | Set signal handlers, only for systems with signals
installSignalHandlers :: Observation.ObservationHandler -> ThreadId -> IO () -> IO () -> IO ()
installSignalHandlers :: Observation.ObservationHandler -> IO () -> IO () -> IO () -> IO ()
#ifndef mingw32_HOST_OS
installSignalHandlers observer tid usr1 usr2 = do
let interrupt = throwTo tid UserInterrupt
installSignalHandlers observer interrupt usr1 usr2 = do
install Signals.sigINT $ observer (Observation.TerminationUnixSignalObs "SIGINT") >> interrupt
install Signals.sigTERM $ observer (Observation.TerminationUnixSignalObs "SIGTERM") >> interrupt
install Signals.sigUSR1 usr1
Expand Down
3 changes: 2 additions & 1 deletion test/io/postgrest.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def run(
admin_port=None,
host=None,
wait_for_readiness=True,
wait_max_seconds=1,
wait_max_seconds=3,
no_pool_connection_available=False,
no_startup_stdout=True,
):
Expand Down Expand Up @@ -188,6 +188,7 @@ def wait_until_exit(postgrest, timeout=1):
def wait_until_status_code(url, max_seconds, status_code):
"Wait for the given HTTP endpoint to return a status code"
session = requests_unixsocket.Session()
response = None

for _ in range(max_seconds * 10):
try:
Expand Down
48 changes: 17 additions & 31 deletions test/io/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import signal
import time
import pytest
import requests

from config import CONFIGSDIR, FIXTURES, SECRET
from util import Thread, jwtauthheader, parse_server_timings_header
Expand Down Expand Up @@ -105,7 +106,6 @@ def sleep():
t.join()


@pytest.mark.xfail(reason="Graceful shutdown is currently failing", strict=True)
def test_graceful_shutdown_waits_for_in_flight_request(defaultenv):
"SIGTERM should allow in-flight requests to finish before exiting"

Expand Down Expand Up @@ -152,7 +152,6 @@ def test_random_port_bound(defaultenv):
assert True # liveness check is done by run(), so we just need to check that it doesn't fail


@pytest.mark.xfail(reason="PostgREST should not start on a used port", strict=True)
def test_so_reuseport_zero_downtime_handover(defaultenv):
"A second PostgREST instance should take over on the same main/admin ports without request failures."

Expand Down Expand Up @@ -1279,14 +1278,21 @@ def test_log_postgrest_host_and_port(host, defaultenv):
with run(
env=defaultenv, host=host, port=port, no_startup_stdout=False
) as postgrest:
output = postgrest.read_stdout(nlines=10)
output = postgrest.read_stdout(nlines=20)

if is_unix:
re.match(r'API server listening on "/tmp/.*\.sock"', output[2])
assert any(
"API server listening on" in line and "postgrest.sock" in line
for line in output
)
elif is_ipv6(host):
assert f"API server listening on [{host}]:{port}" in output[2]
assert any(
f"API server listening on [{host}]:{port}" in line for line in output
)
else: # IPv4
assert f"API server listening on {host}:{port}" in output[2]
assert any(
f"API server listening on {host}:{port}" in line for line in output
)


def test_succeed_w_role_having_superuser_settings(defaultenv):
Expand Down Expand Up @@ -1605,27 +1611,6 @@ def test_pgrst_log_503_client_error_to_stderr(defaultenv):
assert any(log_message in line for line in output)


def test_log_error_when_empty_schema_cache_on_startup_to_stderr(defaultenv):
"Should log the 503 error message when there is an empty schema cache on startup"

env = {
**defaultenv,
"PGRST_INTERNAL_SCHEMA_CACHE_QUERY_SLEEP": "300",
}

with run(env=env, wait_for_readiness=False) as postgrest:
postgrest.wait_until_scache_starts_loading()

response = postgrest.session.get("/projects")
assert response.status_code == 503

output_start = postgrest.read_stdout(nlines=10)

log_err_message = '{"code":"PGRST002","details":null,"hint":null,"message":"Could not query the database for the schema cache. Retrying."}'

assert any(log_err_message in line for line in output_start)


def test_no_double_schema_cache_reload_on_empty_schema(defaultenv):
"Should only load the schema cache once on a 503 error when there's an empty schema cache on startup"

Expand All @@ -1637,8 +1622,8 @@ def test_no_double_schema_cache_reload_on_empty_schema(defaultenv):
with run(env=env, port=freeport(), wait_for_readiness=False) as postgrest:
postgrest.wait_until_scache_starts_loading()

response = postgrest.session.get("/projects")
assert response.status_code == 503
with pytest.raises(requests.ConnectionError):
postgrest.session.get("/projects")

# Should wait enough time to load the schema cache twice to guarantee that the test is valid
time.sleep(1)
Expand Down Expand Up @@ -1723,9 +1708,10 @@ def test_schema_cache_error_observation(defaultenv):
# assert exitCode == 1

output = postgrest.read_stdout(nlines=9)
assert (
assert any(
"Failed to load the schema cache using db-schemas=public and db-extra-search-path=x"
in output[7]
in line
for line in output
)


Expand Down