1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
{-# LANGUAGE ConstraintKinds, TypeFamilies, ScopedTypeVariables #-}
module Gargantext.Utils.Jobs.Queue where
import Control.Concurrent
import Control.Concurrent.STM
import Control.Exception
import Control.Monad
import Data.Function
import Data.Maybe
import Data.Ord
import Prelude
import System.IO
import Data.List
import qualified Data.Map as Map
import qualified Data.Vector as Vector
type EnumBounded t = (Ord t, Enum t, Bounded t)
data Q a = Q [a] [a] !Int
emptyQ :: Q a
emptyQ = Q [] [] 0
singletonQ :: a -> Q a
singletonQ a = Q [a] [] 1
snocQ :: a -> Q a -> Q a
snocQ a (Q xs ys sz) = Q xs (a:ys) (sz+1)
normalizeQ :: Q a -> Q a
normalizeQ (Q [] ys sz) = Q (reverse ys) [] sz
normalizeQ q = q
deleteQ :: Eq a => a -> Q a -> Q a
deleteQ x (Q xs ys sz) = Q xs' ys' sz'
where (xs_num_x, xs') = go xs (0, [])
(ys_num_x, ys') = go ys (0, [])
sz' = sz - xs_num_x - ys_num_x
go [] (n, bs) = (n, reverse bs)
go (a:as) (n, bs)
| a == x = go as (n+1, bs)
| otherwise = go as (n, a:bs)
popQ :: Q a -> Maybe (a, Q a)
popQ q@(Q as bs sz) = case as of
x:xs -> Just (x, Q xs bs (sz-1))
_ -> case normalizeQ q of
Q (x:xs) ys sz' -> Just (x, Q xs ys (sz'-1))
_ -> Nothing
sizeQ :: Q a -> Int
sizeQ (Q _ _ sz) = sz
peekQ :: Q a -> Maybe a
peekQ (Q _ _ 0) = Nothing
peekQ q = case normalizeQ q of
Q (x:_) _ _ -> Just x
_ -> Nothing
dropQ :: Q a -> Q a
dropQ (Q [] [] _) = Q [] [] 0
dropQ (Q (_x:xs) ys sz) = Q xs ys (sz-1)
dropQ q@(Q [] _ _) = dropQ (normalizeQ q)
-- | A priority is just a number. The greater, the earlier the job will get picked.
type Prio = Int
applyPrios
:: Ord t
=> [(t, Prio)] -> Map.Map t Prio -> Map.Map t Prio
applyPrios changes prios = foldl' (\m (t, p) -> Map.insert t p m) prios changes
-- | A queue with different kinds of values, described by @t@, where each
-- kind can have a higher or lower priority than other kinds, as described
-- by the 'queuePrios' field.
data Queue t a = Queue
{ queueData :: Vector.Vector (TVar (Q a))
, queueIndices :: Map.Map t Int -- indices into queueData
, queuePrios :: Map.Map t Prio
}
-- | Default priorities for the enumeration of job types @t@: everyone at 0.
defaultPrios :: EnumBounded t => Map.Map t Prio
defaultPrios = Map.fromList [ (t, 0) | t <- [minBound..maxBound] ]
-- | Create a new queue that'll apply the given priorities
newQueue :: EnumBounded t => Map.Map t Prio -> IO (Queue t a)
newQueue prios = do
let allTs = [ minBound .. maxBound ]
indices = Map.fromList (zip allTs [0..])
n = Map.size indices
vars <- Vector.replicateM n (newTVarIO emptyQ)
pure $ Queue vars indices prios
-- | Add a new element to the queue, with the given kind.
addQueue :: Ord t => t -> a -> Queue t a -> STM ()
addQueue jobkind a q = case Map.lookup jobkind (queueIndices q) of
Just i -> modifyTVar (queueData q Vector.! i) (snocQ a)
Nothing -> error "addQueue: couldn't find queue for given job kind"
deleteQueue :: (Eq a, Ord t) => t -> a -> Queue t a -> STM ()
deleteQueue jobkind a q = case Map.lookup jobkind (queueIndices q) of
Just i -> modifyTVar (queueData q Vector.! i) (deleteQ a)
Nothing -> error "deleteQueue: queue type not found?!"
-- | Dump the contents of the queue, for debugging purposes.
debugDumpQueue :: (Enum t, Bounded t, Ord t) => Queue t a -> STM [(t, a)]
debugDumpQueue q = mconcat <$> (forM [minBound..maxBound] $ \t -> do
readTVar (queueData q Vector.! (i t)) >>= debugDumpQ t)
where
i t = fromJust $ Map.lookup t (queueIndices q)
debugDumpQ t (Q xs ys _) = pure $ map (\x -> (t, x)) (xs ++ reverse ys)
type Picker a = [(a, STM ())] -> STM (a, STM ())
-- | Figure out the candidates for being popped from the various queues.
-- We always look at highest priority queues first, and will pick between
-- equal priority items of different queues (candidates, elements of the
-- returned lists) by choosing the one that was queued first.
popQueue :: forall a t. Ord t => Picker a -> Queue t a -> IO (Maybe a)
popQueue picker q = atomically $ select prioLevels
where -- TODO: cache this in the 'Queue' data structure?
prioLevels :: [[(t, Prio)]]
prioLevels = groupBy ((==) `on` snd) . sortOn (Down . snd) $
Map.toList (queuePrios q)
select :: [[(t, Prio)]] -> STM (Maybe a)
select [] = pure Nothing
select (level:levels) = do
mres <- selectLevel level
case mres of
Nothing -> select levels
Just res -> pure $ Just res
selectLevel :: [(t, Prio)] -> STM (Maybe a)
selectLevel xs = do
let indices = catMaybes $ map (flip Map.lookup (queueIndices q) . fst) xs
queues = map (queueData q Vector.!) indices
go qvar = readTVar qvar >>= \qu ->
pure (peekQ qu, modifyTVar' qvar dropQ)
mtopItems <- catMaybesFst <$> traverse go queues
case mtopItems of
Nothing -> pure Nothing
Just [] -> pure Nothing
Just topItems -> do
(earliestItem, popItem) <- picker topItems
popItem
pure (Just earliestItem)
catMaybesFst ((Nothing, _b) : xs) = catMaybesFst xs
catMaybesFst ((Just a, b) : xs) = ((a, b) :) <$> catMaybesFst xs
catMaybesFst [] = Just []
-- | A ready-to-use runner that pops the highest priority item off the queue
-- and processes it using the given function.
queueRunner :: Ord t => Picker a -> (a -> IO ()) -> Queue t a -> IO ()
queueRunner picker f q = go
where go = do
mres <- popQueue picker q
case mres of
Just a -> f a `catch` exc
Nothing -> pure ()
threadDelay 5000 -- 5ms
go
exc :: SomeException -> IO ()
exc e = hPutStrLn stderr ("Queue runner exception: " ++ show e)
-- | Create a queue and @n@ runner actions for it, with the given priorities
-- for the runners to apply when picking a new item.
newQueueWithRunners
:: EnumBounded t
=> Int -- ^ number of runners
-> Map.Map t Prio -- ^ priorities
-> Picker a -- ^ how to pick between equal priority items
-> (a -> IO ()) -- ^ what to do with each item
-> IO (Queue t a, [IO ()])
newQueueWithRunners n prios picker f = do
q <- newQueue prios
let runners = replicate n (queueRunner picker f q)
pure (q, runners)