Initial commit

parents
dist-newstyle/
\ No newline at end of file
# Revision history for haskell-trottle
## 0.1.0.0 -- YYYY-mm-dd
* First version. Released on an unsuspecting world.
This diff is collapsed.
cabal-version: 3.0
-- The cabal-version field refers to the version of the .cabal specification,
-- and can be different from the cabal-install (the tool) version and the
-- Cabal (the library) version you are using. As such, the Cabal (the library)
-- version used must be equal or greater than the version stated in this field.
-- Starting from the specification version 2.2, the cabal-version field must be
-- the first thing in the cabal file.
-- Initial package description 'haskell-trottle' generated by
-- 'cabal init'. For further documentation, see:
-- http://haskell.org/cabal/users-guide/
--
-- The name of the package.
name: haskell-trottle
-- The package version.
-- See the Haskell package versioning policy (PVP) for standards
-- guiding when and how versions should be incremented.
-- https://pvp.haskell.org
-- PVP summary: +-+------- breaking API changes
-- | | +----- non-breaking API additions
-- | | | +--- code changes with no API change
version: 0.1.0.0
-- A short (one-line) description of the package.
-- synopsis:
-- A longer description of the package.
-- description:
-- The license under which the package is released.
license: AGPL-3.0-or-later
-- The file containing the license text.
license-file: LICENSE
-- The package author(s).
author: Przemysław Kamiński
-- An email address to which users can send suggestions, bug reports, and patches.
maintainer: pk@intrepidus.pl
-- A copyright notice.
-- copyright:
category: Concurrency
build-type: Simple
-- Extra doc files to be distributed with the package, such as a CHANGELOG or a README.
extra-doc-files: CHANGELOG.md
-- Extra source files to be distributed with the package, such as examples, or a tutorial module.
-- extra-source-files:
common warnings
ghc-options: -Wall
library
-- Import common warning flags.
import: warnings
-- Modules exported by the library.
exposed-modules: Control.Concurrent.Throttle
-- Modules included in this library but not exported.
-- other-modules:
-- LANGUAGE extensions used by modules in this package.
default-extensions: ImportQualifiedPost
, OverloadedStrings
, ScopedTypeVariables
-- Other library packages from which modules are imported.
build-depends: base ^>=4.17.2.0
, async >= 2.2 && < 2.3
, containers >= 0.6 && < 0.8
, stm >= 2.5 && < 2.6
, time >= 1.14 && < 2
-- Directories containing source files.
hs-source-dirs: src
-- Base language which the package is written in.
default-language: Haskell2010
test-suite haskell-trottle-test
-- Import common warning flags.
import: warnings
-- Base language which the package is written in.
default-language: Haskell2010
-- Modules included in this executable, other than Main.
-- other-modules:
-- LANGUAGE extensions used by modules in this package.
default-extensions: ImportQualifiedPost
, OverloadedStrings
, ScopedTypeVariables
-- The interface type and version of the test suite.
type: exitcode-stdio-1.0
-- Directories containing source files.
hs-source-dirs: test
-- The entrypoint to the test suite.
main-is: Main.hs
-- Test dependencies.
build-depends:
base ^>=4.17.2.0
, haskell-trottle
, async >= 2.2 && < 2.3
, containers >= 0.6 && < 0.8
, stm >= 2.5 && < 2.6
, tasty >= 1.5 && < 1.6
, tasty-hunit >= 0.10 && < 0.11
, time >= 1.14 && < 2
{-|
Module : Control.Concurrent.Throttle
Description : Throttling async mechanism
Copyright : (c) CNRS, 2024-Present
License : AGPL + CECILL v3
Maintainer : team@gargantext.org
Stability : experimental
Portability : POSIX
-}
{-# LANGUAGE ScopedTypeVariables #-}
module Control.Concurrent.Throttle
( throttle )
where
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async qualified as Async
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TChan qualified as TChan
import Control.Concurrent.STM.TVar qualified as TVar
import Control.Monad (forever)
import Data.Map.Strict qualified as Map
import Data.Maybe (isNothing)
import Data.Time.Clock.POSIX (getPOSIXTime)
-- TODO Add a ThrottleHash typeclass which converts 'a' to 'id'?
{-| Throttling with given interval. Here, throttling means: perform
action only as frequently as allowed and other calls are DROPPED. This
is in contrast to things like Conduit throttling, where actions are
just SLOWED DOWN. We use this for asynchronous notifications and, if
messages are the same, we can just drop them safely. Our input is the
provided 'TChan.TChan'. This function should be spawned as a thread.
-}
throttle :: (Ord id, Eq id, Show id) => Int -> TChan.TChan (id, a) -> (a -> IO ()) -> IO ()
throttle delay tchan action = do
smap <- TVar.newTVarIO Map.empty :: IO (TVar.TVar (Map.Map id (a, Int)))
Async.withAsync (mapCleaner smap) $ \_ -> forever $ do
(msgId, msg) <- atomically $ TChan.readTChan tchan
now <- unixTime
atomically $ TVar.modifyTVar smap (Map.insert msgId (msg, now))
where
-- | This thread just clears outdated map elements at regular intervals
mapCleaner smap = forever $ do
-- https://stackoverflow.com/questions/42843882/how-do-you-get-a-millisecond-precision-unix-timestamp-in-haskell
now <- unixTime
m <- TVar.readTVarIO smap
-- let (_needToWait, canRun) = Map.partition (\(_, t) -> now - t < delay) m
let canRun = Map.filter (\(_, t) -> now - t >= delay) m
putStrLn $ "[mapCleaner] m " <> show (Map.mapWithKey (\k (_, t) -> (k, now - t)) m)
putStrLn $ "[mapCleaner] canRun " <> show (Map.keys canRun)
mapM_ (\(msg, _) -> action msg) canRun
-- OK so this is a bit tricky. STM guarantees atomic read above
-- and 'smap' could have been modified while we ran 'mapM_'. The
-- only way to modify 'smap' is to add new items.
-- * an item in 'canRun' was added: so we called the throttled
-- function and it's been added in the meantime into the queue.
-- In this case we have to compare the time again with 'now'
-- * an item not in 'canRun' was added
atomically $
TVar.modifyTVar smap $
Map.filterWithKey (\k (_, t) -> (isNothing $ Map.lookup k canRun) || (now - t > 0 && now - t < delay))
threadDelay (delay `div` 2)
-- | Get Unix timestamp, with millisecond resolution
unixTime :: IO Int
unixTime = (round . (* 1000000)) <$> getPOSIXTime
module Main
( main )
where
import Control.Concurrent (threadDelay)
import Control.Concurrent.Async qualified as Async
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TChan qualified as TChan
import Control.Concurrent.STM.TVar qualified as TVar
import Control.Concurrent.Throttle (throttle)
import Data.Time.Clock.POSIX (getPOSIXTime)
import Test.Tasty
import Test.Tasty.HUnit
main :: IO ()
main = defaultMain tests
tests :: TestTree
tests = testGroup "throttle tests" unitTests
unitTests :: [TestTree]
unitTests =
[ testCase "can call a function once" $ do
let val = 100
withTestContext $ \(tvar, delay, tchan, _a) -> do
start <- unixTime
atomically $ TChan.writeTChan tchan (1, val)
threadDelay (delay * 3)
actual <- TVar.readTVarIO tvar
length actual @?= 1
let [(val', n)] = actual
val @?= val'
assertBool "not enough delay" $ n - start >= delay
, testCase "doesn't throttle independent messages" $ do
withTestContext $ \(tvar, delay, tchan, _a) -> do
atomically $ TChan.writeTChan tchan (1, 100)
-- this one has a different id so should be called as well
atomically $ TChan.writeTChan tchan (2, 101)
threadDelay (delay * 3)
actual <- TVar.readTVarIO tvar
length actual @?= 2
, testCase "can handle simple throttling" $ do
withTestContext $ \(tvar, delay, tchan, _a) -> do
-- this one should be discarded
atomically $ TChan.writeTChan tchan (1, 100)
-- this one should be called (the same id)
atomically $ TChan.writeTChan tchan (1, 101)
threadDelay (delay * 3)
actual <- TVar.readTVarIO tvar
length actual @?= 1
let [(val', _)] = actual
val' @?= 101
, testCase "can handle throttling for more massive messages" $ do
withTestContext $ \(tvar, delay, tchan, _a) -> do
mapM_ (\val ->
atomically $ TChan.writeTChan tchan (1, val)) [100..200]
threadDelay (delay * 3)
-- Only last one should be called
actual <- TVar.readTVarIO tvar
length actual @?= 1
let [(val', _)] = actual
val' @?= 200
]
withTestContext :: ((TVar.TVar [(Int, Int)], Int, TChan.TChan (Int, Int), Async.Async ()) -> IO b)
-> IO b
withTestContext cb = do
tvar <- TVar.newTVarIO []
let delay = 200
tchan <- TChan.newTChanIO :: IO (TChan.TChan (Int, Int))
Async.withAsync (throttle delay tchan (action tvar)) $ \a ->
cb (tvar, delay, tchan, a)
action :: TVar.TVar [(Int, Int)] -> Int -> IO ()
action tvar v = do
now <- unixTime
atomically $ TVar.modifyTVar tvar (\l -> l ++ [(v, now)])
unixTime :: IO Int
unixTime = (round . (* 1000000)) <$> getPOSIXTime
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment