[worker] fix an unfortunate coincidence of various async issues

This described in this comment:
#477 (comment 14458)

I repaste, for history:

- job timeout was 30 seconds only and this is a big zip file, so the job timed out in worker
- however, this was recently added https://gitlab.iscpif.fr/gargantext/haskell-gargantext/blame/dev/src/Gargantext/Database/Action/Flow.hs#L490 and the timeout wasn't caught and the worker continued happily
- the job finished normally (most probably)
- the job was restarted, because default strategy for timeouts is to restart a job
- for sending files, we use postgres large objects because it keeps our JSONs small
- when the job finishes, it clears definitely the large object so that we don't leave large, unused blob data
- however, that job was restarted and there was no more a large object to work on
- you got some sql error, but that wasn't the root cause

Solution is:
- don't catch any exception, but be careful and handle `Timeout` or `KillWorkerSafely`
- increase job timeout for file upload
- change timeout strategy for file upload to `TSDelete`, i.e. don't retry that job anymore
parent 7c42b188
Pipeline #7666 failed with stages
in 13 minutes and 34 seconds
...@@ -156,7 +156,7 @@ source-repository-package ...@@ -156,7 +156,7 @@ source-repository-package
source-repository-package source-repository-package
type: git type: git
location: https://gitlab.iscpif.fr/gargantext/haskell-bee location: https://gitlab.iscpif.fr/gargantext/haskell-bee
tag: 4a9c709613554eed0189b486de2126c18797088c tag: eb559a29212ae5bb27dc138f1060494b785e1afb
subdir: haskell-bee/ subdir: haskell-bee/
haskell-bee-pgmq/ haskell-bee-pgmq/
haskell-bee-tests/ haskell-bee-tests/
......
...@@ -647,6 +647,7 @@ library ...@@ -647,6 +647,7 @@ library
, transformers-base ^>= 0.4.6 , transformers-base ^>= 0.4.6
, tree-diff , tree-diff
, tuple ^>= 0.3.0.2 , tuple ^>= 0.3.0.2
, unbounded-delays >= 0.1.1 && < 0.2
, unicode-collation >= 0.1.3.5 , unicode-collation >= 0.1.3.5
, unordered-containers ^>= 0.2.16.0 , unordered-containers ^>= 0.2.16.0
-- needed for Worker / System.Posix.Signals -- needed for Worker / System.Posix.Signals
......
...@@ -15,6 +15,7 @@ module Gargantext.Core.Worker.Jobs where ...@@ -15,6 +15,7 @@ module Gargantext.Core.Worker.Jobs where
import Async.Worker qualified as W import Async.Worker qualified as W
import Async.Worker.Types qualified as WT
import Control.Lens (view) import Control.Lens (view)
import Gargantext.Core.Config (gc_database_config, gc_worker, HasConfig(..), GargConfig, gc_logging) import Gargantext.Core.Config (gc_database_config, gc_worker, HasConfig(..), GargConfig, gc_logging)
import Gargantext.Core.Config.Worker (WorkerSettings(..), WorkerDefinition(..)) import Gargantext.Core.Config.Worker (WorkerSettings(..), WorkerDefinition(..))
...@@ -51,13 +52,19 @@ sendJobWithCfg gcConfig job = do ...@@ -51,13 +52,19 @@ sendJobWithCfg gcConfig job = do
-- | We want to fine-tune job metadata parameters, for each job type -- | We want to fine-tune job metadata parameters, for each job type
updateJobData :: Job -> SendJob -> SendJob updateJobData :: Job -> SendJob -> SendJob
updateJobData (AddCorpusTempFileAsync {}) sj = sj { W.timeout = 3000 } updateJobData (AddCorpusTempFileAsync {}) sj = sj { W.timeout = 3000
, W.toStrat = WT.TSDelete
, W.resendOnKill = False }
updateJobData (AddCorpusWithQuery {}) sj = sj { W.timeout = 3000 } updateJobData (AddCorpusWithQuery {}) sj = sj { W.timeout = 3000 }
updateJobData (AddToAnnuaireWithForm {}) sj = sj { W.timeout = 3000 } updateJobData (AddToAnnuaireWithForm {}) sj = sj { W.timeout = 3000 }
updateJobData (AddWithFile {}) sj = sj { W.timeout = 3000 } updateJobData (AddWithFile {}) sj = sj { W.timeout = 3000
, W.toStrat = WT.TSDelete
, W.resendOnKill = False }
updateJobData (DocumentsFromWriteNodes {}) sj = sj { W.timeout = 3000 } updateJobData (DocumentsFromWriteNodes {}) sj = sj { W.timeout = 3000 }
updateJobData (FrameCalcUpload {}) sj = sj { W.timeout = 3000 } updateJobData (FrameCalcUpload {}) sj = sj { W.timeout = 3000 }
updateJobData (JSONPost {}) sj = sj { W.timeout = 3000 } updateJobData (JSONPost {}) sj = sj { W.timeout = 3000
, W.toStrat = WT.TSDelete
, W.resendOnKill = False }
updateJobData (NgramsPostCharts {}) sj = sj { W.timeout = 3000 } updateJobData (NgramsPostCharts {}) sj = sj { W.timeout = 3000 }
updateJobData (RecomputeGraph {}) sj = sj { W.timeout = 3000 } updateJobData (RecomputeGraph {}) sj = sj { W.timeout = 3000 }
updateJobData (UpdateNode {}) sj = sj { W.timeout = 3000 } updateJobData (UpdateNode {}) sj = sj { W.timeout = 3000 }
......
...@@ -54,9 +54,11 @@ module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list) ...@@ -54,9 +54,11 @@ module Gargantext.Database.Action.Flow -- (flowDatabase, ngrams2list)
) )
where where
import Async.Worker qualified as W
import Conduit import Conduit
import Control.Concurrent.Timeout qualified as Timeout
import Control.Exception.Safe qualified as CES
import Control.Lens ( to, view ) import Control.Lens ( to, view )
import Control.Monad.Catch
import Data.Conduit qualified as C import Data.Conduit qualified as C
import Data.Conduit.Internal (zipSources) import Data.Conduit.Internal (zipSources)
import Data.Conduit.List qualified as CL import Data.Conduit.List qualified as CL
...@@ -172,7 +174,7 @@ flowDataText :: forall env err m. ...@@ -172,7 +174,7 @@ flowDataText :: forall env err m.
, HasTreeError err , HasTreeError err
, HasValidationError err , HasValidationError err
, MonadJobStatus m , MonadJobStatus m
, MonadCatch m , CES.MonadCatch m
, HasCentralExchangeNotification env , HasCentralExchangeNotification env
) )
=> User => User
...@@ -207,7 +209,7 @@ flowAnnuaire :: ( IsDBCmd env err m ...@@ -207,7 +209,7 @@ flowAnnuaire :: ( IsDBCmd env err m
, HasTreeError err , HasTreeError err
, HasValidationError err , HasValidationError err
, MonadJobStatus m , MonadJobStatus m
, MonadCatch m , CES.MonadCatch m
, HasCentralExchangeNotification env ) , HasCentralExchangeNotification env )
=> MkCorpusUser => MkCorpusUser
-> TermType Lang -> TermType Lang
...@@ -227,7 +229,7 @@ flowCorpusFile :: ( IsDBCmd env err m ...@@ -227,7 +229,7 @@ flowCorpusFile :: ( IsDBCmd env err m
, HasTreeError err , HasTreeError err
, HasValidationError err , HasValidationError err
, MonadJobStatus m , MonadJobStatus m
, MonadCatch m , CES.MonadCatch m
, HasCentralExchangeNotification env ) , HasCentralExchangeNotification env )
=> MkCorpusUser => MkCorpusUser
-> TermType Lang -> TermType Lang
...@@ -257,7 +259,7 @@ flowCorpus :: ( IsDBCmd env err m ...@@ -257,7 +259,7 @@ flowCorpus :: ( IsDBCmd env err m
, HasValidationError err , HasValidationError err
, FlowCorpus a , FlowCorpus a
, MonadJobStatus m , MonadJobStatus m
, MonadCatch m , CES.MonadCatch m
, HasCentralExchangeNotification env, Show a ) , HasCentralExchangeNotification env, Show a )
=> MkCorpusUser => MkCorpusUser
-> TermType Lang -> TermType Lang
...@@ -279,7 +281,8 @@ flow :: forall env err m a c. ...@@ -279,7 +281,8 @@ flow :: forall env err m a c.
, MkCorpus c , MkCorpus c
, MonadJobStatus m , MonadJobStatus m
, HasCentralExchangeNotification env , HasCentralExchangeNotification env
, MonadCatch m, Show a , CES.MonadCatch m
, Show a
) )
=> Maybe c => Maybe c
-> MkCorpusUser -> MkCorpusUser
...@@ -319,7 +322,8 @@ addDocumentsToHyperCorpus :: ( IsDBCmd env err m ...@@ -319,7 +322,8 @@ addDocumentsToHyperCorpus :: ( IsDBCmd env err m
, FlowCorpus document , FlowCorpus document
, MkCorpus corpus , MkCorpus corpus
, MonadLogger m , MonadLogger m
, MonadCatch m, Show document , CES.MonadCatch m
, Show document
) )
=> Maybe corpus => Maybe corpus
-> TermType Lang -> TermType Lang
...@@ -474,7 +478,7 @@ extractNgramsFromDocument :: ( UniqParameters doc ...@@ -474,7 +478,7 @@ extractNgramsFromDocument :: ( UniqParameters doc
, ExtractNgrams doc , ExtractNgrams doc
, IsDBCmd err env m , IsDBCmd err env m
, MonadLogger m , MonadLogger m
, MonadCatch m , CES.MonadCatch m
) )
=> NLPServerConfig => NLPServerConfig
-> TermType Lang -> TermType Lang
...@@ -487,9 +491,13 @@ extractNgramsFromDocument nlpServer lang doc = ...@@ -487,9 +491,13 @@ extractNgramsFromDocument nlpServer lang doc =
-- will still be added to the corpus and we can try to regen the ngrams at a later stage. -- will still be added to the corpus and we can try to regen the ngrams at a later stage.
UncommittedNgrams . Map.singleton docId <$> UncommittedNgrams . Map.singleton docId <$>
(documentIdWithNgrams (extractNgrams nlpServer $ withLang lang [doc]) (Indexed docId doc) (documentIdWithNgrams (extractNgrams nlpServer $ withLang lang [doc]) (Indexed docId doc)
`catch` \(e :: SomeException) -> do `CES.catches`
$(logLocM) ERROR $ T.pack $ "Document with hash " <> show docId <> " failed ngrams extraction due to an exception: " <> displayException e [ CES.Handler $ \(e :: Timeout.Timeout) -> CES.throw e
pure $ DocumentIdWithNgrams (Indexed docId doc) mempty , CES.Handler $ \(e :: W.KillWorkerSafely) -> CES.throw e
, CES.Handler $ \(e :: CES.SomeException) -> do
$(logLocM) ERROR $ T.pack $ "Document with hash " <> show docId <> " failed ngrams extraction due to an exception: " <> displayException e
pure $ DocumentIdWithNgrams (Indexed docId doc) mempty
]
) )
where where
docId = DocumentHashId $ newUniqIdHash doc docId = DocumentHashId $ newUniqIdHash doc
...@@ -520,7 +528,7 @@ extractNgramsFromDocuments :: forall doc env err m. ...@@ -520,7 +528,7 @@ extractNgramsFromDocuments :: forall doc env err m.
, ExtractNgrams doc , ExtractNgrams doc
, IsDBCmd env err m , IsDBCmd env err m
, MonadLogger m , MonadLogger m
, MonadCatch m , CES.MonadCatch m
) )
=> NLPServerConfig => NLPServerConfig
-> TermType Lang -> TermType Lang
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment