Commit 15b732f5 authored by Alfredo Di Napoli's avatar Alfredo Di Napoli

ws: remove calls to recvMalloc

Remove the calls to `recvMalloc` in favour of using the (patched) `recv`
from the original nanomsg library, which shouldn't segfault anymore. The
reason for using `recv` are a few, but mostly the fact that `recv` can
allocated arbitrary-long payloads data (up to the 1MB limit) without an
hardcoded limit like `recvMalloc` was imposing. Furthermore, `recv` does
resource cleanup for us via `c_nn_freemsg`, whereas `recvMalloc` is not
thread/exception safe. Consider the implementation:

```
recvMalloc :: Receiver a => Socket a -> Int -> IO ByteString
recvMalloc (Socket t sid) numBytes = do
  ptr <- mallocBytes numBytes
  -- receive by blocking the thread
  len <- c_nn_recv sid ptr (#const NN_MSG) 0 -- (#const NN_DONTWAIT)
  str <- C.packCStringLen (castPtr ptr, fromIntegral len)
  free ptr
  return str
```

If any exception (synchronous or asynchronous) strikes _before_ the call
to `free`, we would be leaking C memory.
parent 7c0d6ba0
Pipeline #6569 failed with stages
in 12 minutes and 40 seconds
......@@ -48,7 +48,7 @@ simpleServer = do
_ <- bind s ceBind
putText "[simpleServer] receiving"
forever $ do
mr <- recvMalloc s 1024
mr <- recv s
C.putStrLn mr
-- case mr of
-- Nothing -> pure ()
......
......@@ -165,10 +165,11 @@ source-repository-package
location: https://github.com/robstewart57/rdf4h.git
tag: 4fd2edf30c141600ffad6d730cc4c1c08a6dbce4
-- FIXME(adn) Compat-shim while we wait for upstream to catch-up
source-repository-package
type: git
location: https://github.com/garganscript/nanomsg-haskell
tag: 23be4130804d86979eaee5caffe323a1c7f2b0d6
location: https://github.com/adinapoli/nanomsg-haskell
tag: f54fe61f06685c5af95ddff782e139d8d4e26186
-- source-repository-package
-- type: git
......
......@@ -27,7 +27,7 @@ import Gargantext.Core.AsyncUpdates.CentralExchange.Types
import Gargantext.Core.AsyncUpdates.Constants (ceBind, ceConnect, dispatcherConnect)
import Gargantext.Prelude
import Gargantext.System.Logging (LogLevel(DEBUG), withLogger, logMsg)
import Nanomsg (Pull(..), Push(..), bind, connect, recvMalloc, send, withSocket)
import Nanomsg (Pull(..), Push(..), bind, connect, recv, send, withSocket)
{-
......@@ -58,7 +58,7 @@ gServer = do
void $ Async.concurrently (worker s_dispatcher tChan) $ do
forever $ do
-- putText "[central_exchange] receiving"
r <- recvMalloc s 1024
r <- recv s
-- C.putStrLn $ "[central_exchange] " <> r
atomically $ TChan.writeTChan tChan r
where
......
......@@ -36,7 +36,7 @@ import Gargantext.Core.AsyncUpdates.Constants as AUConstants
import Gargantext.Core.AsyncUpdates.Dispatcher.Types
import Gargantext.Prelude
import Gargantext.System.Logging (LogLevel(DEBUG), withLogger, logMsg)
import Nanomsg (Pull(..), bind, recvMalloc, withSocket)
import Nanomsg (Pull(..), bind, recv, withSocket)
import Network.WebSockets qualified as WS
import Servant.Job.Types (JobStatus(_job_id))
import StmContainers.Set qualified as SSet
......@@ -95,7 +95,7 @@ dispatcherListener subscriptions = do
void $ Async.concurrently (Async.replicateConcurrently 5 $ worker tChan throttleTChan) $ do
forever $ do
-- putText "[dispatcher_listener] receiving"
r <- recvMalloc s 1024
r <- recv s
-- C.putStrLn $ "[dispatcher_listener] " <> r
atomically $ TChan.writeTChan tChan r
where
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment