[websockets] add update job progress

parent 53c5217f
......@@ -13,13 +13,12 @@ import Data.Show.Generic (genericShow)
import Data.Traversable (for, traverse)
import Data.Tuple (Tuple(..))
import Effect (Effect)
-- import Effect.AVar as AVar
import Effect.Ref as Ref
import Effect.Var (($=))
import Effect.Var as Var
import FFI.Simple ((.=), (..))
import Foreign as F
import Gargantext.Sessions.Types (Session(..))
import Gargantext.Types as GT
import Gargantext.Utils.Reactix as R2
import Prelude
import Reactix as R
......@@ -37,7 +36,8 @@ type NodeId = Int
type UUID = String
data Topic =
UpdateTree NodeId
UpdateJobProgress GT.AsyncTaskID
| UpdateTree NodeId
derive instance Generic Topic _
instance Eq Topic where eq = genericEq
instance Show Topic where show = genericShow
......@@ -47,11 +47,16 @@ instance JSON.ReadForeign Topic where
readImpl f = do
{ type: type_ } <- JSON.readImpl f :: F.F { type :: String }
case type_ of
"update_job_progress" -> do
{ j_id } <- JSON.readImpl f :: F.F { j_id :: GT.AsyncTaskID }
pure $ UpdateJobProgress j_id
"update_tree" -> do
{ node_id } <- JSON.readImpl f :: F.F { node_id :: NodeId }
pure $ UpdateTree node_id
s -> F.fail $ F.ErrorAtProperty "type" $ F.ForeignError $ "unkown type: " <> s
s -> F.fail $ F.ErrorAtProperty "type" $ F.ForeignError $ "unknown Topic type: " <> s
instance JSON.WriteForeign Topic where
writeImpl (UpdateJobProgress j_id) = JSON.writeImpl { "type": "update_job_progress"
, j_id }
writeImpl (UpdateTree node_id) = JSON.writeImpl { "type": "update_tree"
, node_id }
......@@ -72,21 +77,40 @@ instance JSON.WriteForeign WSRequest where
writeImpl WSDeauthorize = JSON.writeImpl { request: "deauthorize" }
data Message =
-- TODO
-- MJobProgress GT.AsyncProgress
MJobProgress GT.AsyncTaskLog
| MEmpty
derive instance Generic Message _
instance JSON.ReadForeign Message where
readImpl f = do
{ type: type_ } <- JSON.readImpl f :: F.F { type :: String }
case type_ of
"MJobProgress" -> do
-- TODO
-- { job_progress } <- JSON.readImpl f :: F.F { job_progress :: GT.AsyncProgress }
{ job_progress } <- JSON.readImpl f :: F.F { job_progress :: GT.AsyncTaskLog }
pure $ MJobProgress job_progress
"MEmpty" -> do
pure MEmpty
s -> do F.fail $ F.ErrorAtProperty "type" $ F.ForeignError $ "unknown Message type: " <> s
data Notification =
Notification Topic
Notification Topic Message
derive instance Generic Notification _
instance Eq Notification where eq = genericEq
instance JSON.ReadForeign Notification where
readImpl f = do
let str = JSON.read_ f :: Maybe String
case str of
Nothing -> do
{ notification: topic } <- JSON.readImpl f :: F.F { notification :: Topic }
pure $ Notification topic
{ notification } <- JSON.readImpl f :: F.F { notification :: { topic :: Topic, message :: Message } }
pure $ Notification notification.topic notification.message
Just s -> F.fail $ F.ErrorAtProperty "_" $ F.ForeignError $ "unkown string: " <> s
type Callback = Unit -> Effect Unit
type Callback = Message -> Effect Unit
type CallbacksHM = HM.HashMap UUID Callback
......@@ -112,14 +136,14 @@ removeCallback (State state@{ callbacks }) topic uuid =
alterCallbacksHM Nothing = Nothing
alterCallbacksHM (Just hm) = Just $ HM.delete uuid hm
-- | Execute all callbacks for a given Topic
callTopic :: State -> Topic -> Effect Unit
callTopic (State { callbacks }) topic = do
here.log2 "[callTopic] topic" topic
here.log2 "[callTopic] callbacks" (HM.values callbacks)
here.log2 "[callTopic] topicCallbacks" (HM.values topicCallbacks)
-- | Execute all callbacks for a given Notification
callNotification :: State -> Notification -> Effect Unit
callNotification (State { callbacks }) (Notification topic message) = do
-- here.log2 "[callTopic] topic" topic
-- here.log2 "[callTopic] callbacks" (HM.values callbacks)
-- here.log2 "[callTopic] topicCallbacks" (HM.values topicCallbacks)
_ <- for (HM.values topicCallbacks) $ \cb -> do
cb unit
cb message
pure unit
where
topicCallbacks :: CallbacksHM
......@@ -195,15 +219,15 @@ allSubscriptionsWS (WSNotification ws') = do
state <- Ref.read ws'.state
pure $ allSubscriptions state
-- | Actions to be called on the websocket connection
data Action =
InsertCallback Topic UUID Callback
| RemoveCallback Topic UUID
| Call Topic
| Call Notification
performAction :: WSNotification -> Action -> Effect Unit
performAction ws (InsertCallback topic uuid cb) = do
let subscription = WSSubscribe topic
-- WARNING mutable state
alterState ws (\s -> insertCallback s topic uuid cb)
connected <- isConnected ws
if connected
......@@ -215,7 +239,6 @@ performAction ws (InsertCallback topic uuid cb) = do
-- WSNotification $ ws' { state = insertCallback ws'.state topic uuid cb }
performAction ws (RemoveCallback topic uuid) = do
let subscription = WSUnsubscribe topic
-- WARNING mutable state
alterState ws (\s -> removeCallback s topic uuid)
connected <- isConnected ws
if connected
......@@ -225,11 +248,10 @@ performAction ws (RemoveCallback topic uuid) = do
pure unit
-- void $ pure $ (ws' .= "state") (removeCallback ws'.state topic uuid)
-- WSNotification $ ws' { state = removeCallback ws'.state topic uuid }
performAction (WSNotification ws') (Call topic) = do
-- WARNING mutable state
let state = ws' .. "state"
here.log2 "[performAction Call] state" state
callTopic state topic
performAction (WSNotification ws') (Call notification) = do
state <- Ref.read ws'.state
-- here.log2 "[performAction Call] state" state
callNotification state notification
connect :: WSNotification -> String -> (Maybe Session) -> Effect Unit
......@@ -242,7 +264,6 @@ connect ws@(WSNotification ws') url session = do
Ref.write (Just connection) ws'.connection
let onmessage me = do
-- WARNING mutable state
s <- runExceptT $ F.readString (ME.data_ me)
case s of
Left err -> do
......@@ -252,9 +273,9 @@ connect ws@(WSNotification ws') url session = do
case parsed of
Left err -> do
here.log2 "[connect] Can't parse message" err
Right (Notification topic) -> do
Right n@(Notification topic _message) -> do
here.log2 "[connect] notification" topic
performAction ws (Call topic)
performAction ws (Call n)
-- Right parsed' -> do
-- here.log2 "[connect] onmessage, F.readString" parsed'
......
......@@ -13,6 +13,7 @@ import Effect.Class (liftEffect)
import Effect.Timer (IntervalId, clearInterval, setInterval)
import Gargantext.Components.App.Store as AppStore
import Gargantext.Components.Forest.Tree.Node.Tools.ProgressBar (QueryProgressData, queryProgress)
import Gargantext.Components.Notifications as Notifications
import Gargantext.Config.Utils (handleErrorInAsyncProgress, handleRESTError)
import Gargantext.Hooks.FirstEffect (useFirstEffect')
import Gargantext.Prelude
......@@ -40,8 +41,8 @@ asyncProgress = R2.component asyncProgressCpt
asyncProgressCpt :: R.Component AsyncProps
asyncProgressCpt = R2.hereComponent here "asyncProgress" hCpt where
hCpt hp props@{ onFinish } children = do
{ errors } <- AppStore.use
{ errors, wsNotification } <- AppStore.use
-- States
progress /\ progressBox <- R2.useBox' 0.0
intervalIdRef <- R.useRef (Nothing :: Maybe IntervalId)
......@@ -50,23 +51,26 @@ asyncProgressCpt = R2.hereComponent here "asyncProgress" hCpt where
-- Methods
let
exec :: Unit -> Effect Unit
exec _ = launchAff_ do
exec :: Maybe GT.AsyncProgress -> Effect Unit
exec Nothing = launchAff_ do
let rdata = (RX.pick props :: Record QueryProgressData)
eAsyncProgress <- queryProgress rdata
liftEffect $ here.log2 "[progress] received asyncProgress" eAsyncProgress
-- exponential backoff in case of errors
liftEffect $ do
case eAsyncProgress of
Left _ -> T.modify_ (_ * 2) interval
Right _ -> T.write_ 1000 interval
interval' <- T.read interval
resetInterval intervalIdRef (Just interval') exec
-- liftEffect $ do
-- case eAsyncProgress of
-- Left _ -> T.modify_ (_ * 2) interval
-- Right _ -> T.write_ 1000 interval
-- interval' <- T.read interval
-- resetInterval intervalIdRef (Just interval') exec
handleRESTError hp errors eAsyncProgress onProgress
exec (Just jobProgress) = do
launchAff_ $ onProgress jobProgress
onProgress :: AsyncProgress -> Aff Unit
onProgress value = liftEffect do
let GT.AsyncProgress { status } = value
onProgress value@(GT.AsyncProgress { status }) = liftEffect do
T.write_ (min 100.0 $ GT.progressPercent value) progressBox
......@@ -74,7 +78,7 @@ asyncProgressCpt = R2.hereComponent here "asyncProgress" hCpt where
(status == GT.IsKilled) ||
(status == GT.IsFailure)
then do
resetInterval intervalIdRef Nothing exec
-- resetInterval intervalIdRef Nothing exec
-- case R.readRef intervalIdRef of
-- Nothing -> R.nothing
-- Just iid -> clearInterval iid
......@@ -83,9 +87,26 @@ asyncProgressCpt = R2.hereComponent here "asyncProgress" hCpt where
else
R.nothing
useFirstEffect' $ do
let (GT.AsyncTaskWithType { task: GT.AsyncTask { id: taskId } }) = props.asyncTask
let cb msg = do
here.log2 "callback! for job update" taskId
case msg of
Notifications.MJobProgress jobProgress -> do
-- TODO With jobProgress we could avoid polling here
-- exec (Just jobProgress)
exec Nothing
Notifications.MEmpty -> exec Nothing
-- The modal window has some problems closing when we refresh too early. This is a HACK
-- void $ setTimeout 400 $ T2.reload reload
let action = Notifications.InsertCallback (Notifications.UpdateJobProgress taskId) ("task-" <> show taskId) cb
ws <- T.read wsNotification
Notifications.performAction ws action
exec Nothing
-- Hooks
useFirstEffect' do
resetInterval intervalIdRef (Just 1000) exec
-- useFirstEffect' do
-- resetInterval intervalIdRef (Just 1000) exec
-- intervalId <- setInterval interval' $ exec unit
-- R.setRef intervalIdRef $ Just intervalId
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment