[worker] improved notifications about worker tasks

parent ff255d63
Pipeline #6915 passed with stages
in 12 minutes and 40 seconds
...@@ -2,7 +2,7 @@ module Gargantext.AsyncTasks ( ...@@ -2,7 +2,7 @@ module Gargantext.AsyncTasks (
Task Task
, TaskList , TaskList
, Storage(..) , Storage(..)
, readAsyncTasks -- , readAsyncTasks
, insert , insert
, finish , finish
, focus , focus
...@@ -50,32 +50,21 @@ instance JSON.WriteForeign Storage where ...@@ -50,32 +50,21 @@ instance JSON.WriteForeign Storage where
arr :: Array (Tuple String TaskList) arr :: Array (Tuple String TaskList)
arr = (\(Tuple k v) -> Tuple (show k) v) <$> (Map.toUnfoldable s) arr = (\(Tuple k v) -> Tuple (show k) v) <$> (Map.toUnfoldable s)
readAsyncTasks :: Effect Storage -- readAsyncTasks :: Effect Storage
readAsyncTasks = R2.loadLocalStorageState' R2.asyncTasksKey mempty -- readAsyncTasks = R2.loadLocalStorageState' R2.asyncTasksKey mempty
-- readAsyncTasks = R2.getls >>= WSS.getItem R2.asyncTasksKey >>= handleMaybe
-- where
-- handleMaybe (Just val) = handleEither (parse val)
-- handleMaybe Nothing = pure empty
-- -- either parsing or decoding could fail, hence two errors -- writeAsyncTasks :: Storage -> Effect Unit
-- handleEither (Left err) = err *> pure empty -- writeAsyncTasks = R2.setLocalStorageState R2.asyncTasksKey
-- handleEither (Right ss) = pure ss
-- parse s = GU.mapLeft (log2 "Error parsing serialised sessions:") (JSON.readJSON s) -- modifyAsyncTasks :: (Storage -> Storage) -> Effect Unit
-- modifyAsyncTasks f = readAsyncTasks >>= writeAsyncTasks <<< f
writeAsyncTasks :: Storage -> Effect Unit
writeAsyncTasks = R2.setLocalStorageState R2.asyncTasksKey
-- writeAsyncTasks storage = R2.getls >>= WSS.setItem R2.asyncTasksKey storage
modifyAsyncTasks :: (Storage -> Storage) -> Effect Unit
modifyAsyncTasks f = readAsyncTasks >>= writeAsyncTasks <<< f
modifyTaskBox :: (Storage -> Storage) -> T.Box Storage -> Effect Unit modifyTaskBox :: (Storage -> Storage) -> T.Box Storage -> Effect Unit
modifyTaskBox f box = do modifyTaskBox f box = T.modify_ f box
s <- T.read box -- s <- T.read box
let newS = f s -- let newS = f s
T.write_ newS box -- T.write_ newS box
modifyAsyncTasks (const newS) -- modifyAsyncTasks (const newS)
getTasks :: GT.NodeID -> Storage -> TaskList getTasks :: GT.NodeID -> Storage -> TaskList
getTasks nodeId (Storage storage) = fromMaybe [] $ Map.lookup nodeId storage getTasks nodeId (Storage storage) = fromMaybe [] $ Map.lookup nodeId storage
......
...@@ -93,9 +93,14 @@ mainAppCpt = here.component "main" cpt where ...@@ -93,9 +93,14 @@ mainAppCpt = here.component "main" cpt where
R.useEffectOnce' $ do R.useEffectOnce' $ do
void $ Sessions.load boxes.sessions void $ Sessions.load boxes.sessions
-- tasks <- GAT.useTasks boxes.reloadRoot boxes.reloadForest -- tasks <- GAT.useTasks boxes.reloadRoot boxes.reloadForest
R.useEffectOnce' $ do
tasksStorage <- GAT.readAsyncTasks -- NOTE Task storage is not needed with new-style notifications
T.write_ tasksStorage boxes.tasks -- and async workers. The tasks (with their pgoress) should be
-- pushed as soon as the worker computes the task's chunk
-- R.useEffectOnce' $ do
-- tasksStorage <- GAT.readAsyncTasks
-- T.write_ tasksStorage boxes.tasks
-- R.useEffectOnce' $ do -- R.useEffectOnce' $ do
-- T.write (Just tasksReductor) tasks -- T.write (Just tasksReductor) tasks
R.useEffectOnce' $ do R.useEffectOnce' $ do
......
...@@ -243,10 +243,13 @@ nodeSpanCpt = here.component "nodeSpan" cpt ...@@ -243,10 +243,13 @@ nodeSpanCpt = here.component "nodeSpan" cpt
here.log2 "[nodeSpan] update tree" props.id here.log2 "[nodeSpan] update tree" props.id
-- The modal window has some problems closing when we refresh too early. This is a HACK -- The modal window has some problems closing when we refresh too early. This is a HACK
void $ setTimeout 400 $ T2.reload reload void $ setTimeout 400 $ T2.reload reload
NT.NUpdateWorkerProgress ji jl -> do NT.NUpdateWorkerProgress ji atl -> do
-- TODO Fire this only once! -- TODO Fire this only once!
here.log3 "[nodeSpan] update job progress" ji jl -- here.log3 "[nodeSpan] update job progress" ji atl
GAT.insert props.id ji boxes.tasks if GT.asyncTaskLogIsFinished atl
then pure unit
else
GAT.insert props.id ji boxes.tasks
_ -> pure unit _ -> pure unit
ws <- T.read boxes.wsNotification ws <- T.read boxes.wsNotification
let action = NT.InsertCallback (NT.UpdateTree props.id) ("node-span-" <> show props.id) cb let action = NT.InsertCallback (NT.UpdateTree props.id) ("node-span-" <> show props.id) cb
......
...@@ -8,7 +8,7 @@ import Effect.Class (liftEffect) ...@@ -8,7 +8,7 @@ import Effect.Class (liftEffect)
import Effect.Timer (clearInterval, setInterval) import Effect.Timer (clearInterval, setInterval)
import Gargantext.Components.App.Store as AppStore import Gargantext.Components.App.Store as AppStore
import Gargantext.Config.REST (AffRESTError) import Gargantext.Config.REST (AffRESTError)
import Gargantext.Config.Utils (handleErrorInAsyncProgress, handleRESTError) import Gargantext.Config.Utils (handleRESTError)
import Gargantext.Prelude import Gargantext.Prelude
import Gargantext.Routes (SessionRoute(..)) import Gargantext.Routes (SessionRoute(..))
import Gargantext.Sessions (Session, get) import Gargantext.Sessions (Session, get)
......
...@@ -5,7 +5,7 @@ import Gargantext.Prelude ...@@ -5,7 +5,7 @@ import Gargantext.Prelude
import Data.Array as A import Data.Array as A
import Data.Either (Either(..)) import Data.Either (Either(..))
import Data.Foldable (foldl) import Data.Foldable (foldl)
import Data.Maybe (fromMaybe) import Data.Maybe (fromMaybe, Maybe(..))
import Effect (Effect) import Effect (Effect)
import Effect.Aff (Aff) import Effect.Aff (Aff)
import Effect.Class (liftEffect) import Effect.Class (liftEffect)
...@@ -29,28 +29,31 @@ handleRESTError herePrefix errors (Left error) _ = liftEffect $ do ...@@ -29,28 +29,31 @@ handleRESTError herePrefix errors (Left error) _ = liftEffect $ do
-- here.warn2 "[handleTaskError] RESTError" error -- here.warn2 "[handleTaskError] RESTError" error
handleRESTError _ _ (Right task) handler = handler task handleRESTError _ _ (Right task) handler = handler task
handleErrorInAsyncProgress :: T.Box (Array FrontendError) -- handleErrorInAsyncProgress :: T.Box (Array FrontendError)
-> AsyncProgress -- -> AsyncProgress
-> Effect Unit -- -> Effect Unit
handleErrorInAsyncProgress errors ap@(AsyncProgress { status: IsFailure }) = do -- handleErrorInAsyncProgress errors ap@(AsyncProgress { status: IsFailure }) = do
T.modify_ (A.cons $ FStringError { error: concatErrors ap }) errors -- T.modify_ (A.cons $ FStringError { error: concatErrors ap }) errors
handleErrorInAsyncProgress errors ap@(AsyncProgress { log, status: IsFinished }) = do -- handleErrorInAsyncProgress errors ap@(AsyncProgress { log, status: IsFinished }) = do
if countFailed > 0 then -- if countFailed > 0 then
T.modify_ (A.cons $ FStringError { error: concatErrors ap }) errors -- T.modify_ (A.cons $ FStringError { error: concatErrors ap }) errors
else -- else
pure unit -- pure unit
where -- where
countFailed = foldl (+) 0 $ (\(AsyncTaskLog { failed }) -> failed) <$> log -- countFailed = foldl (+) 0 $ (\(AsyncTaskLog { failed }) -> failed) <$> log
handleErrorInAsyncProgress _ _ = pure unit -- handleErrorInAsyncProgress _ _ = pure unit
concatErrors :: AsyncProgress -> String -- concatErrors :: AsyncProgress -> String
concatErrors (AsyncProgress { error, log }) = foldl eventsErrorMessage (fromMaybe "" error) log -- concatErrors (AsyncProgress { error, log }) = foldl eventsErrorMessage (fromMaybe "" error) log
where -- where
eventsErrorMessage acc atl = asyncTaskLogEventsErrorMessage atl <> "\n" <> acc -- eventsErrorMessage acc atl = asyncTaskLogEventsErrorMessage atl <> "\n" <> acc
handleErrorInAsyncTaskLog :: T.Box (Array FrontendError) handleErrorInAsyncTaskLog :: T.Box (Array FrontendError)
-> AsyncTaskLog -> AsyncTaskLog
-> Effect Unit -> Effect Unit
handleErrorInAsyncTaskLog errors atl = do handleErrorInAsyncTaskLog errors atl =
T.modify_ (A.cons $ FStringError { error: asyncTaskLogEventsErrorMessage atl }) errors case asyncTaskLogEventsErrorMessage atl of
Nothing -> pure unit
Just error ->
T.modify_ (A.cons $ FStringError { error }) errors
...@@ -61,46 +61,10 @@ asyncProgressCpt = R2.hereComponent here "asyncProgress" hCpt where ...@@ -61,46 +61,10 @@ asyncProgressCpt = R2.hereComponent here "asyncProgress" hCpt where
interval <- T.useBox 1000 interval <- T.useBox 1000
-- Methods -- Methods
-- let
-- -- TODO Manage somehow to get the whole job status sent here via
-- -- websockets, then we can remove the 'Maybe'
-- fetchJobProgress :: Effect Unit
-- fetchJobProgress = launchAff_ do
-- let rdata = (RX.pick props :: Record QueryProgressData)
-- eAsyncProgress <- queryProgress rdata
-- -- liftEffect $ here.log2 "[progress] received asyncProgress" eAsyncProgress
-- -- exponential backoff in case of errors
-- -- liftEffect $ do
-- -- case eAsyncProgress of
-- -- Left _ -> T.modify_ (_ * 2) interval
-- -- Right _ -> T.write_ 1000 interval
-- -- interval' <- T.read interval
-- -- resetInterval intervalIdRef (Just interval') exec
-- -- Handle removal of task in case of 500 error (e.g. server
-- -- was restarted and task id is not found anymore).
-- -- Error logging will be done below, in handleRESTError
-- case eAsyncProgress of
-- Right _ -> pure unit
-- Left err -> do
-- liftEffect $ do
-- resetInterval intervalIdRef Nothing (pure unit)
-- GAT.finish props.nodeId props.asyncTask tasks
-- handleRESTError hp errors eAsyncProgress onProgress
-- -- TODO Ideally we should use this function
-- -- onProgress jobProgress = do
-- -- launchAff_ $ onProgress jobProgress
let let
-- onProgress :: AsyncProgress -> Aff Unit
-- onProgress value@(GT.AsyncProgress { status }) = liftEffect do
onProgress :: GT.AsyncTaskLog -> Aff Unit onProgress :: GT.AsyncTaskLog -> Aff Unit
onProgress atl@(GT.AsyncTaskLog log) = liftEffect do onProgress atl@(GT.AsyncTaskLog log) = liftEffect $ do
T.write_ (min 100.0 $ GT.asyncTaskLogPercent atl) progressBox T.write_ (min 100.0 $ GT.asyncTaskLogPercent atl) progressBox
...@@ -145,29 +109,6 @@ asyncProgressCpt = R2.hereComponent here "asyncProgress" hCpt where ...@@ -145,29 +109,6 @@ asyncProgressCpt = R2.hereComponent here "asyncProgress" hCpt where
let action = NT.InsertCallback (NT.UpdateWorkerProgress props.asyncTask) ("task-" <> show message_id) cb let action = NT.InsertCallback (NT.UpdateWorkerProgress props.asyncTask) ("task-" <> show message_id) cb
Notifications.performAction ws action Notifications.performAction ws action
-- ws <- T.read wsNotification
-- New-style jobs
-- let action = NT.InsertCallback (NT.UpdateWorkerProgress $ GT.WorkerTask { message_id: taskId }) ("worker-job-" <> show taskId) cb
-- Notifications.performAction ws action
-- Old-style jobs (remove in the future)
-- let action = NT.InsertCallback (NT.UpdateJobProgress taskId) ("job-" <> taskId) cb
-- Notifications.performAction ws action
-- fetchJobProgress
-- Hooks
-- useFirstEffect' do
-- resetInterval intervalIdRef (Just 1000) exec
-- intervalId <- setInterval interval' $ exec unit
-- R.setRef intervalIdRef $ Just intervalId
-- TODO Current backend job implementation is that it cannot, by
-- itself, notify us when a job finished. Hence, we are forced to
-- poll for job still. However, we will keep canceling the timer
-- unless there is no progress report for some time.
-- useFirstEffect' $ do
-- resetInterval intervalIdRef (Just defaultJobPollInterval) fetchJobProgress
-- Render -- Render
pure $ pure $
......
...@@ -830,13 +830,17 @@ derive instance Generic AsyncTaskLog _ ...@@ -830,13 +830,17 @@ derive instance Generic AsyncTaskLog _
derive instance Newtype AsyncTaskLog _ derive instance Newtype AsyncTaskLog _
derive newtype instance JSON.ReadForeign AsyncTaskLog derive newtype instance JSON.ReadForeign AsyncTaskLog
asyncTaskLogEventsErrorMessage :: AsyncTaskLog -> String asyncTaskLogEventsErrorMessage :: AsyncTaskLog -> Maybe String
asyncTaskLogEventsErrorMessage (AsyncTaskLog { events }) = asyncTaskLogEventsErrorMessage (AsyncTaskLog { events }) =
foldl eventErrorMessage' "" events foldl eventErrorMessage' Nothing events
where where
eventErrorMessage' acc ae = (case asyncEventErrorMessage ae of eventErrorMessage' acc ae =
Nothing -> "" case asyncEventErrorMessage ae of
Just e' -> e' <> "\n") <> acc Nothing -> acc
Just e' ->
case acc of
Nothing -> Just e'
Just acc' -> Just $ e' <> "\n" <> acc'
asyncTaskLogPercent :: AsyncTaskLog -> Number asyncTaskLogPercent :: AsyncTaskLog -> Number
asyncTaskLogPercent (AsyncTaskLog { failed, remaining, succeeded }) = 100.0 * nom / denom asyncTaskLogPercent (AsyncTaskLog { failed, remaining, succeeded }) = 100.0 * nom / denom
where where
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment