Unverified Commit f540feb1 authored by Vaibhav Sagar's avatar Vaibhav Sagar Committed by GitHub

Merge pull request #1145 from jamesdbrock/buffers

Add `buffers` field of the Wire Protocol
parents 167fd366 28dd8276
...@@ -52,7 +52,8 @@ library ...@@ -52,7 +52,8 @@ library
transformers , transformers ,
unordered-containers, unordered-containers,
uuid , uuid ,
zeromq4-haskell zeromq4-haskell ,
parsec
-- Example program -- Example program
executable simple-calc-example executable simple-calc-example
......
...@@ -125,7 +125,7 @@ createReplyHeader parent = do ...@@ -125,7 +125,7 @@ createReplyHeader parent = do
err = error $ "No reply for message " ++ show (mhMsgType parent) err = error $ "No reply for message " ++ show (mhMsgType parent)
return $ MessageHeader (mhIdentifiers parent) (Just parent) (Metadata (HashMap.fromList [])) return $ MessageHeader (mhIdentifiers parent) (Just parent) (Metadata (HashMap.fromList []))
newMessageId (mhSessionId parent) (mhUsername parent) repType newMessageId (mhSessionId parent) (mhUsername parent) repType []
-- | Execute an IPython kernel for a config. Your 'main' action should call this as the last thing -- | Execute an IPython kernel for a config. Your 'main' action should call this as the last thing
......
...@@ -22,14 +22,16 @@ import IHaskell.IPython.Types ...@@ -22,14 +22,16 @@ import IHaskell.IPython.Types
type LByteString = Lazy.ByteString type LByteString = Lazy.ByteString
-- --- External interface ----- | Parse a message from its ByteString components into a Message. -- --- External interface ----- | Parse a message from its ByteString components into a Message.
-- See https://jupyter-client.readthedocs.io/en/stable/messaging.html#the-wire-protocol
parseMessage :: [ByteString] -- ^ The list of identifiers sent with the message. parseMessage :: [ByteString] -- ^ The list of identifiers sent with the message.
-> ByteString -- ^ The header data. -> ByteString -- ^ The header data.
-> ByteString -- ^ The parent header, which is just "{}" if there is no header. -> ByteString -- ^ The parent header, which is just "{}" if there is no header.
-> ByteString -- ^ The metadata map, also "{}" for an empty map. -> ByteString -- ^ The metadata map, also "{}" for an empty map.
-> ByteString -- ^ The message content. -> ByteString -- ^ The message content.
-> [ByteString] -- ^ Extra raw data buffer(s)
-> Message -- ^ A parsed message. -> Message -- ^ A parsed message.
parseMessage idents headerData parentHeader metadata content = parseMessage idents headerData parentHeader metadata content buffers =
let header = parseHeader idents headerData parentHeader metadata let header = parseHeader idents headerData parentHeader metadata buffers
messageType = mhMsgType header messageType = mhMsgType header
messageWithoutHeader = parser messageType $ Lazy.fromStrict content messageWithoutHeader = parser messageType $ Lazy.fromStrict content
in messageWithoutHeader { header = header } in messageWithoutHeader { header = header }
...@@ -39,16 +41,17 @@ parseHeader :: [ByteString] -- ^ The list of identifiers. ...@@ -39,16 +41,17 @@ parseHeader :: [ByteString] -- ^ The list of identifiers.
-> ByteString -- ^ The header data. -> ByteString -- ^ The header data.
-> ByteString -- ^ The parent header, or "{}" for Nothing. -> ByteString -- ^ The parent header, or "{}" for Nothing.
-> ByteString -- ^ The metadata, or "{}" for an empty map. -> ByteString -- ^ The metadata, or "{}" for an empty map.
-> [ByteString] -- ^ Extra raw data buffer(s)
-> MessageHeader -- The resulting message header. -> MessageHeader -- The resulting message header.
parseHeader idents headerData parentHeader metadata = parseHeader idents headerData parentHeader metadata buffers =
MessageHeader idents parentResult metadataMap messageUUID sessionUUID username messageType MessageHeader idents parentResult metadataMap messageUUID sessionUUID username messageType buffers
where where
-- Decode the header data and the parent header data into JSON objects. If the parent header data is -- Decode the header data and the parent header data into JSON objects. If the parent header data is
-- absent, just have Nothing instead. -- absent, just have Nothing instead.
Just result = decode $ Lazy.fromStrict headerData :: Maybe Object Just result = decode $ Lazy.fromStrict headerData :: Maybe Object
parentResult = if parentHeader == "{}" parentResult = if parentHeader == "{}"
then Nothing then Nothing
else Just $ parseHeader idents parentHeader "{}" metadata else Just $ parseHeader idents parentHeader "{}" metadata []
Success (messageType, username, messageUUID, sessionUUID) = flip parse result $ \obj -> do Success (messageType, username, messageUUID, sessionUUID) = flip parse result $ \obj -> do
messType <- obj .: "msg_type" messType <- obj .: "msg_type"
......
...@@ -153,6 +153,7 @@ data MessageHeader = ...@@ -153,6 +153,7 @@ data MessageHeader =
, mhSessionId :: UUID -- ^ A unique session UUID. , mhSessionId :: UUID -- ^ A unique session UUID.
, mhUsername :: Username -- ^ The user who sent this message. , mhUsername :: Username -- ^ The user who sent this message.
, mhMsgType :: MessageType -- ^ The message type. , mhMsgType :: MessageType -- ^ The message type.
, mhBuffers :: [ByteString] -- ^ Extra raw data buffer(s)
} }
deriving (Show, Read) deriving (Show, Read)
......
{-# LANGUAGE OverloadedStrings, DoAndIfThenElse #-} {-# LANGUAGE OverloadedStrings, DoAndIfThenElse, FlexibleContexts #-}
-- | Description : Low-level ZeroMQ communication wrapper. -- | Description : Low-level ZeroMQ communication wrapper.
-- --
...@@ -30,6 +30,7 @@ import Data.Monoid ((<>)) ...@@ -30,6 +30,7 @@ import Data.Monoid ((<>))
import qualified Data.Text.Encoding as Text import qualified Data.Text.Encoding as Text
import System.ZMQ4 as ZMQ4 import System.ZMQ4 as ZMQ4
import Text.Read (readMaybe) import Text.Read (readMaybe)
import Text.Parsec (runParserT, manyTill, anyToken, (<|>), eof, tokenPrim, incSourceColumn)
import IHaskell.IPython.Message.Parser import IHaskell.IPython.Message.Parser
import IHaskell.IPython.Types import IHaskell.IPython.Types
...@@ -268,38 +269,28 @@ checkedIOpub debug channels sock = do ...@@ -268,38 +269,28 @@ checkedIOpub debug channels sock = do
-- | Receive and parse a message from a socket. -- | Receive and parse a message from a socket.
receiveMessage :: Receiver a => Bool -> Socket a -> IO Message receiveMessage :: Receiver a => Bool -> Socket a -> IO Message
receiveMessage debug sock = do receiveMessage debug sock = do
-- Read all identifiers until the identifier/message delimiter. blobs <- receiveMulti sock
idents <- readUntil "<IDS|MSG>" runParserT parseBlobs () "" blobs >>= \r -> case r of
Left parseerr -> fail $ "Malformed Wire Protocol message: " <> show parseerr
-- Ignore the signature for now. Right (idents, headerData, parentHeader, metaData, content, buffers) -> do
void next when debug $ do
putStr "Header: "
headerData <- next Char.putStrLn headerData
parentHeader <- next putStr "Content: "
metadata <- next Char.putStrLn content
content <- next return $ parseMessage idents headerData parentHeader metaData content buffers
when debug $ do
putStr "Header: "
Char.putStrLn headerData
putStr "Content: "
Char.putStrLn content
return $ parseMessage idents headerData parentHeader metadata content
where where
-- Receive the next piece of data from the socket. parseBlobs = do
next = receive sock idents <- manyTill anyToken (satisfy (=="<IDS|MSG>"))
_ <- anyToken <|> fail "No signature"
-- Read data from the socket until we hit an ending string. Return all data as a list, which does headerData <- anyToken <|> fail "No headerData"
-- not include the ending string. parentHeader <- anyToken <|> fail "No parentHeader"
readUntil str = do metaData <- anyToken <|> fail "No metaData"
line <- next content <- anyToken <|> fail "No contents"
if line /= str buffers <- manyTill anyToken eof
then do pure (idents, headerData, parentHeader, metaData, content, buffers)
remaining <- readUntil str satisfy f = tokenPrim Char.unpack (\pos _ _ -> incSourceColumn pos 1)
return $ line : remaining (\c -> if f c then Just c else Nothing)
else return []
-- | Encode a message in the IPython ZeroMQ communication protocol and send it through the provided -- | Encode a message in the IPython ZeroMQ communication protocol and send it through the provided
-- socket. Sign it using HMAC with SHA-256 using the provided key. -- socket. Sign it using HMAC with SHA-256 using the provided key.
...@@ -320,10 +311,18 @@ sendMessage debug hmackey sock msg = do ...@@ -320,10 +311,18 @@ sendMessage debug hmackey sock msg = do
sendPiece parentHeaderStr sendPiece parentHeaderStr
sendPiece metadata sendPiece metadata
-- Conclude transmission with content. -- If there are no mhBuffers, then conclude transmission with content.
sendLast content case mhBuffers hdr of
[] -> sendLast content
_ -> sendPiece content
sendBuffers $ mhBuffers hdr
where where
sendBuffers [] = pure ()
sendBuffers [b] = sendLast b
sendBuffers (b:bs) = sendPiece b >> sendBuffers bs
sendPiece = send sock [SendMore] sendPiece = send sock [SendMore]
sendLast = send sock [] sendLast = send sock []
......
...@@ -221,7 +221,7 @@ createReplyHeader parent = do ...@@ -221,7 +221,7 @@ createReplyHeader parent = do
err = error $ "No reply for message " ++ show (mhMsgType parent) err = error $ "No reply for message " ++ show (mhMsgType parent)
return $ MessageHeader (mhIdentifiers parent) (Just parent) mempty return $ MessageHeader (mhIdentifiers parent) (Just parent) mempty
newMessageId (mhSessionId parent) (mhUsername parent) repType newMessageId (mhSessionId parent) (mhUsername parent) repType []
-- | Compute a reply to a message. -- | Compute a reply to a message.
replyTo :: ZeroMQInterface -> Message -> MessageHeader -> KernelState -> Interpreter (KernelState, Message) replyTo :: ZeroMQInterface -> Message -> MessageHeader -> KernelState -> Interpreter (KernelState, Message)
......
...@@ -94,6 +94,7 @@ getInputLine dir = do ...@@ -94,6 +94,7 @@ getInputLine dir = do
. readMay <$> readFile fpath . readMay <$> readFile fpath
let hdr = MessageHeader (mhIdentifiers parentHdr) (Just parentHdr) mempty let hdr = MessageHeader (mhIdentifiers parentHdr) (Just parentHdr) mempty
uuid (mhSessionId parentHdr) (mhUsername parentHdr) InputRequestMessage uuid (mhSessionId parentHdr) (mhUsername parentHdr) InputRequestMessage
[]
let msg = RequestInput hdr "" let msg = RequestInput hdr ""
writeChan req msg writeChan req msg
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment