diff --git a/hasql-queue.cabal b/hasql-queue.cabal index db09027..fd1f279 100644 --- a/hasql-queue.cabal +++ b/hasql-queue.cabal @@ -1,13 +1,13 @@ cabal-version: 1.12 --- This file has been generated from package.yaml by hpack version 0.31.2. +-- This file has been generated from package.yaml by hpack version 0.34.4. -- -- see: https://github.com/sol/hpack -- --- hash: 956ae93525f9dafcc0c9c8149cd2bbc8cfcfe4e63310adec92ce40f995e4cbf4 +-- hash: 30a78bb71c0fb6470ad0d6b6788b23f19801ab253d1c65e008a48e329e01b914 name: hasql-queue -version: 1.2.0.1 +version: 1.3.0.0 synopsis: A PostgreSQL backed queue description: A PostgreSQL backed queue. Please see README.md category: Web @@ -18,7 +18,8 @@ maintainer: jonathangfischoff@gmail.com copyright: 2020 Jonathan Fischoff license: BSD3 license-file: LICENSE -tested-with: GHC ==8.8.1 +tested-with: + GHC ==8.8.1 build-type: Simple extra-source-files: README.md @@ -42,7 +43,16 @@ library Paths_hasql_queue hs-source-dirs: src - default-extensions: OverloadedStrings LambdaCase RecordWildCards TupleSections GeneralizedNewtypeDeriving QuasiQuotes ScopedTypeVariables TypeApplications AllowAmbiguousTypes + default-extensions: + OverloadedStrings + LambdaCase + RecordWildCards + TupleSections + GeneralizedNewtypeDeriving + QuasiQuotes + ScopedTypeVariables + TypeApplications + AllowAmbiguousTypes ghc-options: -Wall -Wno-unused-do-bind -Wno-unused-foralls build-depends: aeson @@ -67,7 +77,16 @@ executable benchmark Paths_hasql_queue hs-source-dirs: benchmarks - default-extensions: OverloadedStrings LambdaCase RecordWildCards TupleSections GeneralizedNewtypeDeriving QuasiQuotes ScopedTypeVariables TypeApplications AllowAmbiguousTypes + default-extensions: + OverloadedStrings + LambdaCase + RecordWildCards + TupleSections + GeneralizedNewtypeDeriving + QuasiQuotes + ScopedTypeVariables + TypeApplications + AllowAmbiguousTypes ghc-options: -Wall -Wno-unused-do-bind -Wno-unused-foralls -O2 -threaded -rtsopts -with-rtsopts=-N build-depends: aeson @@ -98,7 +117,16 @@ executable hasql-queue-tmp-db Paths_hasql_queue hs-source-dirs: hasql-queue-tmp-db - default-extensions: OverloadedStrings LambdaCase RecordWildCards TupleSections GeneralizedNewtypeDeriving QuasiQuotes ScopedTypeVariables TypeApplications AllowAmbiguousTypes + default-extensions: + OverloadedStrings + LambdaCase + RecordWildCards + TupleSections + GeneralizedNewtypeDeriving + QuasiQuotes + ScopedTypeVariables + TypeApplications + AllowAmbiguousTypes ghc-options: -Wall -Wno-unused-do-bind -Wno-unused-foralls -O2 -threaded -rtsopts -with-rtsopts=-N -g2 build-depends: aeson @@ -137,7 +165,16 @@ test-suite unit-tests Paths_hasql_queue hs-source-dirs: test - default-extensions: OverloadedStrings LambdaCase RecordWildCards TupleSections GeneralizedNewtypeDeriving QuasiQuotes ScopedTypeVariables TypeApplications AllowAmbiguousTypes + default-extensions: + OverloadedStrings + LambdaCase + RecordWildCards + TupleSections + GeneralizedNewtypeDeriving + QuasiQuotes + ScopedTypeVariables + TypeApplications + AllowAmbiguousTypes ghc-options: -Wall -Wno-unused-do-bind -Wno-unused-foralls -O2 -threaded -rtsopts -with-rtsopts=-N build-depends: aeson diff --git a/package.yaml b/package.yaml index eb36226..54c8d22 100644 --- a/package.yaml +++ b/package.yaml @@ -1,5 +1,5 @@ name: hasql-queue -version: '1.2.0.2' +version: '1.3.0.0' synopsis: A PostgreSQL backed queue description: A PostgreSQL backed queue. Please see README.md category: Web diff --git a/src/Hasql/Queue/High/ExactlyOnce.hs b/src/Hasql/Queue/High/ExactlyOnce.hs index 42eb999..d6f0390 100644 --- a/src/Hasql/Queue/High/ExactlyOnce.hs +++ b/src/Hasql/Queue/High/ExactlyOnce.hs @@ -58,30 +58,13 @@ dequeue valueDecoder count | count <= 0 = pure [] | otherwise = do let multipleQuery = [here| - DELETE FROM payloads - WHERE id in - ( SELECT p1.id - FROM payloads AS p1 - WHERE p1.state='enqueued' - ORDER BY p1.modified_at ASC - FOR UPDATE SKIP LOCKED - LIMIT $1 - ) - RETURNING value + SELECT value FROM dequeue_payload($1) |] + multipleEncoder = E.param $ E.nonNullable $ fromIntegral >$< E.int4 singleQuery = [here| - DELETE FROM payloads - WHERE id = - ( SELECT p1.id - FROM payloads AS p1 - WHERE p1.state='enqueued' - ORDER BY p1.modified_at ASC - FOR UPDATE SKIP LOCKED - LIMIT 1 - ) - RETURNING value + SELECT value FROM dequeue_payload(1) |] singleEncoder = mempty diff --git a/src/Hasql/Queue/Internal.hs b/src/Hasql/Queue/Internal.hs index e5c8106..19229fd 100644 --- a/src/Hasql/Queue/Internal.hs +++ b/src/Hasql/Queue/Internal.hs @@ -62,10 +62,6 @@ newtype PayloadId = PayloadId { unPayloadId :: Int64 } data Payload a = Payload { pId :: PayloadId , pState :: State - -- TODO do I need this? - , pAttempts :: Int - , pModifiedAt :: Int - -- TODO rename. I don't need this either. , pValue :: a } deriving (Show, Eq) @@ -75,8 +71,6 @@ payloadDecoder thePayloadDecoder = Payload <$> payloadIdRow <*> D.column (D.nonNullable stateDecoder) - <*> D.column (D.nonNullable $ fromIntegral <$> D.int4) - <*> D.column (D.nonNullable $ fromIntegral <$> D.int4) <*> D.column (D.nonNullable thePayloadDecoder) payloadIdEncoder :: E.Value PayloadId @@ -92,9 +86,7 @@ payloadIdRow = D.column (D.nonNullable payloadIdDecoder) enqueuePayload :: E.Value a -> [a] -> Session [PayloadId] enqueuePayload theEncoder values = do let theQuery = [here| - INSERT INTO payloads (attempts, value) - SELECT 0, * FROM unnest($1) - RETURNING id + SELECT id FROM enqueue_payload($1) |] encoder = E.param $ E.nonNullable $ E.foldableArray $ E.nonNullable theEncoder decoder = D.rowList (D.column (D.nonNullable payloadIdDecoder)) @@ -105,30 +97,15 @@ enqueuePayload theEncoder values = do dequeuePayload :: D.Value a -> Int -> Session [Payload a] dequeuePayload valueDecoder count = do let multipleQuery = [here| - DELETE FROM payloads - WHERE id in - ( SELECT p1.id - FROM payloads AS p1 - WHERE p1.state='enqueued' - ORDER BY p1.modified_at ASC - FOR UPDATE SKIP LOCKED - LIMIT $1 - ) - RETURNING id, state, attempts, modified_at, value + SELECT id, state, value + FROM dequeue_payload($1) |] + multipleEncoder = E.param $ E.nonNullable $ fromIntegral >$< E.int4 singleQuery = [here| - DELETE FROM payloads - WHERE id = - ( SELECT p1.id - FROM payloads AS p1 - WHERE p1.state='enqueued' - ORDER BY p1.modified_at ASC - FOR UPDATE SKIP LOCKED - LIMIT 1 - ) - RETURNING id, state, attempts, modified_at, value + SELECT id, state, value + FROM dequeue_payload(1) |] singleEncoder = mempty @@ -144,7 +121,7 @@ dequeuePayload valueDecoder count = do getPayload :: D.Value a -> PayloadId -> Session (Maybe (Payload a)) getPayload decoder payloadId = do let theQuery = [here| - SELECT id, state, attempts, modified_at, value + SELECT id, state, value FROM payloads WHERE id = $1 |] @@ -168,10 +145,7 @@ getCount = do incrementAttempts :: Int -> [PayloadId] -> Session () incrementAttempts retryCount pids = do let theQuery = [here| - UPDATE payloads - SET state=CASE WHEN attempts >= $1 THEN 'failed' :: state_t ELSE 'enqueued' END - , attempts=attempts+1 - WHERE id = ANY($2) + SELECT increment_payload_attempts($1, $2) |] encoder = (fst >$< E.param (E.nonNullable E.int4)) <> (snd >$< E.param (E.nonNullable $ E.foldableArray $ E.nonNullable payloadIdEncoder)) diff --git a/src/Hasql/Queue/Migrate.hs b/src/Hasql/Queue/Migrate.hs index 821fba8..d560b59 100644 --- a/src/Hasql/Queue/Migrate.hs +++ b/src/Hasql/Queue/Migrate.hs @@ -62,6 +62,36 @@ migrationQueryString valueType = [i| CREATE INDEX IF NOT EXISTS active_modified_at_idx ON payloads USING btree (modified_at) WHERE (state = 'enqueued'); + CREATE OR REPLACE FUNCTION dequeue_payload(limit_ INT) RETURNS SETOF payloads AS + $$ + WITH available AS + ( SELECT p1.id + FROM payloads AS p1 + WHERE p1.state='enqueued' + ORDER BY p1.modified_at ASC + FOR UPDATE SKIP LOCKED + LIMIT limit_ + ) + DELETE FROM payloads + USING available + WHERE payloads.id = available.id + RETURNING payloads.* + $$ LANGUAGE SQL VOLATILE; + + CREATE OR REPLACE FUNCTION increment_payload_attempts(threshold_ INT, ids_ BIGINT[]) RETURNS VOID AS + $$ + UPDATE payloads + SET state=CASE WHEN attempts >= threshold_ THEN 'failed' :: state_t ELSE 'enqueued' END + , attempts=attempts+1 + WHERE id = ANY(ids_) + $$ LANGUAGE SQL VOLATILE; + + CREATE OR REPLACE FUNCTION enqueue_payload(values_ ${valueType}[]) RETURNS SETOF payloads AS + $$ + INSERT INTO payloads (attempts, value) + SELECT 0, * FROM unnest(values_) + RETURNING * + $$ LANGUAGE SQL VOLATILE; |] {-| This function creates a table and enumeration type that is @@ -106,6 +136,37 @@ migrationQueryString valueType = [i| CREATE INDEX IF NOT EXISTS active_modified_at_idx ON payloads USING btree (modified_at, state) WHERE (state = 'enqueued'); + + CREATE OR REPLACE FUNCTION dequeue_payload(limit_ INT) RETURNS SETOF payloads AS + $$ + WITH available AS + ( SELECT p1.id + FROM payloads AS p1 + WHERE p1.state='enqueued' + ORDER BY p1.modified_at ASC + FOR UPDATE SKIP LOCKED + LIMIT limit_ + ) + DELETE FROM payloads + USING available + WHERE payloads.id = available.id + RETURNING payloads.* + $$ LANGUAGE SQL VOLATILE; + + CREATE OR REPLACE FUNCTION increment_payload_attempts(threshold_ INT, ids_ BIGINT[]) RETURNS VOID AS + $$ + UPDATE payloads + SET state=CASE WHEN attempts >= threshold_ THEN 'failed' :: state_t ELSE 'enqueued' END + , attempts=attempts+1 + WHERE id = ANY(ids_) + $$ LANGUAGE SQL VOLATILE; + + CREATE OR REPLACE FUNCTION enqueue_payload(values_ ${valueType}[]) RETURNS SETOF payloads AS + $$ + INSERT INTO payloads (attempts, value) + SELECT 0, * FROM unnest(values_) + RETURNING * + $$ LANGUAGE SQL VOLATILE; @ The @VALUE_TYPE@ needs to passed in through the second argument. @@ -123,6 +184,9 @@ Drop everything created by 'migrate' teardown :: Connection -> IO () teardown conn = do let theQuery = [i| + DROP FUNCTION IF EXISTS enqueue_payload; + DROP FUNCTION IF EXISTS dequeue_payload; + DROP FUNCTION IF EXISTS increment_payload_attempts; DROP TABLE IF EXISTS payloads; DROP TYPE IF EXISTS state_t; DROP SEQUENCE IF EXISTS modified_index; diff --git a/test/Hasql/Queue/Low/AtLeastOnceSpec.hs b/test/Hasql/Queue/Low/AtLeastOnceSpec.hs index 4c12f3b..33b268a 100644 --- a/test/Hasql/Queue/Low/AtLeastOnceSpec.hs +++ b/test/Hasql/Queue/Low/AtLeastOnceSpec.hs @@ -142,8 +142,8 @@ spec = describe "Hasql.Queue.Low.AtLeastOnce" $ aroundAll withSetup $ describe " let Just decoded = mapM (decode . encode) xs sort decoded `shouldBe` sort expected - it "enqueue returns a PayloadId that cooresponds to the entry it added" $ withConnection $ \conn -> do + it "enqueue returns a PayloadId that corresponds to the entry it added" $ withConnection $ \conn -> do [payloadId] <- I.runThrow (I.enqueuePayload E.int4 [1]) conn Just actual <- getPayload conn D.int4 payloadId - pValue actual `shouldBe` 1 + pId actual `shouldBe` payloadId