module Control.Distributed.Process.Tests.CH (tests) where


import Network.Transport.Test (TestTransport(..))

import Data.Binary (Binary(..))
import Data.Typeable (Typeable)
import Data.Foldable (forM_)
import Data.Function (fix)
import Data.IORef
  ( readIORef
  , writeIORef
  , newIORef
  )
import Control.Concurrent (forkIO, threadDelay, myThreadId, throwTo, ThreadId, yield)
import Control.Concurrent.MVar
  ( MVar
  , newEmptyMVar
  , putMVar
  , takeMVar
  , readMVar
  )
import Control.Monad (replicateM_, replicateM, forever, void, unless, join)
import Control.Exception (SomeException, throwIO, ErrorCall(..))
import Control.Monad.Catch (try, catch, finally, mask, onException)
import Control.Applicative ((<|>))
import qualified Network.Transport as NT (closeEndPoint, EndPointAddress)
import Control.Distributed.Process hiding
  ( try
  , catch
  , finally
  , mask
  , onException
  )
import Control.Distributed.Process.Internal.Types
  ( LocalNode(localEndPoint)
  , ProcessExitException(..)
  , nullProcessId
  , createUnencodedMessage
  )
import Control.Distributed.Process.Node
import Control.Distributed.Process.Tests.Internal.Utils (pause)
import Control.Distributed.Process.Serializable (Serializable)
import Data.Maybe (isNothing, isJust)
import Test.Tasty (TestTree, testGroup)
import Test.Tasty.HUnit (Assertion, assertBool, assertEqual, testCase)

-- | Message sent back by the 'ping' server; it carries the 'ProcessId' of
-- the process that sent it.
newtype Ping = Ping ProcessId
  deriving (Typeable, Binary, Show)

-- | Message the 'ping' server waits for; it carries the 'ProcessId' that the
-- server replies to.
newtype Pong = Pong ProcessId
  deriving (Typeable, Binary, Show)

--------------------------------------------------------------------------------
-- Supporting definitions                                                     --
--------------------------------------------------------------------------------

-- | Like fork, but throw exceptions in the child thread to the parent.
--
-- The parent is the thread that calls 'forkTry'; its 'ThreadId' is captured
-- before forking so that any 'SomeException' escaping the action can be
-- re-thrown to it with 'throwTo'.
forkTry :: IO () -> IO ThreadId
forkTry p = do
  tid <- myThreadId
  forkIO $ catch p (\e -> throwTo tid (e :: SomeException))

-- | The ping server from the paper.
--
-- Waits for a 'Pong' message, answers the process named in it with a 'Ping'
-- carrying the server's own 'ProcessId', and then loops forever.
ping :: Process ()
ping = do
  Pong partner <- expect
  self <- getSelfPid
  send partner (Ping self)
  ping

-- | Block until the given 'MVar' is filled, then assert that the value taken
-- from it is 'True', using the given string as the failure message.
verifyClient :: String -> MVar Bool -> IO ()
verifyClient s b = takeMVar b >>= assertBool s

-- | Wait for a 'Ping' message and record its arrival in the given 'MVar'.
--
-- 'expect' only returns once a message of type 'Ping' is available, so the
-- value stored is always 'True'.
expectPing :: MVar Bool -> Process ()
expectPing mv = expect >>= liftIO . putMVar mv . checkPing
  where
    checkPing :: Ping -> Bool
    checkPing (Ping _) = True

-- | Quick and dirty synchronous version of 'whereisRemoteAsync'.
--
-- Issues the asynchronous lookup and then blocks until a 'WhereIsReply'
-- arrives, returning the 'ProcessId' it carries (if any). The label inside the
-- reply is not checked against the requested name.
whereisRemote :: NodeId -> String -> Process (Maybe ProcessId)
whereisRemote nid string = do
  whereisRemoteAsync nid string
  receiveWait
    [ match (\(WhereIsReply _ mPid) -> return mPid)
    ]

-- | Look up a registered name on a remote node via 'whereisRemote' and return
-- the 'ProcessId' found; the calling process 'die's with
-- @"remote name not found"@ when the lookup yields 'Nothing'.
verifyWhereIsRemote :: NodeId -> String -> Process ProcessId
verifyWhereIsRemote n s =
  whereisRemote n s >>= maybe (die "remote name not found") return

-- | Break the connection between two local nodes and wait until the first
-- node has been notified about it.
--
-- A process on @nid1@ publishes its 'ProcessId'; a process on @nid0@ then
-- keeps sending @()@ to it from a helper process, monitors @nid1@, invokes the
-- supplied @breakConnection@ action on the two node addresses and blocks until
-- a 'NodeMonitorNotification' arrives. The helper is killed afterwards.
syncBreakConnection :: (NT.EndPointAddress -> NT.EndPointAddress -> IO ()) -> LocalNode -> LocalNode -> IO ()
syncBreakConnection breakConnection nid0 nid1 = do
  m <- newEmptyMVar
  _ <- forkProcess nid1 $ getSelfPid >>= liftIO . putMVar m
  runProcess nid0 $ do
    them <- liftIO $ takeMVar m
    -- Continuous traffic towards the other node while the connection is broken
    pinger <- spawnLocal $ forever $ send them ()
    _ <- monitorNode (localNodeId nid1)
    liftIO $ breakConnection (nodeAddress $ localNodeId nid0)
                             (nodeAddress $ localNodeId nid1)
    NodeMonitorNotification _ _ _ <- expect
    kill pinger "finished"
    return ()

-- | Request handled by 'math': a reply-to 'ProcessId' and the two operands to add.
data Add       = Add    ProcessId Double Double deriving (Typeable)
-- | Request handled by 'math': a reply-to 'ProcessId', the dividend and the divisor.
data Divide    = Divide ProcessId Double Double deriving (Typeable)
-- | Reply sent by 'math' for a 'Divide' request whose divisor is zero.
data DivByZero = DivByZero deriving (Typeable)

-- | Serialises the three fields of 'Add' in declaration order and reads them
-- back in the same order.
instance Binary Add where
  put (Add pid x y) = put pid >> put x >> put y
  get = Add <$> get <*> get <*> get

-- | Serialises the three fields of 'Divide' in declaration order and reads
-- them back in the same order.
instance Binary Divide where
  put (Divide pid x y) = put pid >> put x >> put y
  get = Divide <$> get <*> get <*> get

-- | 'DivByZero' has no fields, so nothing is written and nothing is read.
instance Binary DivByZero where
  put DivByZero = return ()
  get = return DivByZero

-- | The math server from the paper.
--
-- Handles one request per iteration and then loops forever:
--
-- * 'Add' is answered with the sum of its operands;
-- * 'Divide' with a non-zero divisor is answered with the quotient;
-- * any other 'Divide' (i.e. divisor zero) is answered with 'DivByZero'.
math :: Process ()
math = do
  receiveWait
    [ match (\(Add pid x y) -> send pid (x + y))
    , matchIf (\(Divide _   _ y) -> y /= 0)
              (\(Divide pid x y) -> send pid (x / y))
      -- Only reached by 'Divide' messages rejected by the guard above
    , match (\(Divide pid _ _) -> send pid DivByZero)
    ]
  math

-- | Monitor or link to a process.
--
-- Returns @Just ref@ with the 'MonitorRef' when monitoring and 'Nothing' when
-- linking. When a signal 'MVar' is supplied, a helper process is spawned that
-- polls 'getProcessInfo' for the target every 100ms and fills the 'MVar' once
-- the caller shows up in the target's monitors (with the expected reference)
-- or links respectively.
monitorOrLink :: Bool            -- ^ 'True' for monitor, 'False' for link
              -> ProcessId       -- ^ Process to monitor/link to
              -> Maybe (MVar ()) -- ^ MVar to signal on once the monitor has been set up
              -> Process (Maybe MonitorRef)
monitorOrLink mOrL pid mSignal = do
  result <- if mOrL then Just <$> monitor pid
                    else link pid >> return Nothing
  -- Monitor is asynchronous, which usually does not matter but if we want a
  --  *specific* signal then it does. Therefore we wait until the MonitorRef is
  -- listed in the ProcessInfo and hope that this means the monitor has been set
  -- up.
  forM_ mSignal $ \signal -> do
    self <- getSelfPid
    spawnLocal $ do
      let waitForMOrL = do
            liftIO $ threadDelay 100000
            mpinfo <- getProcessInfo pid
            case mpinfo of
              -- No info available yet: keep polling
              Nothing -> waitForMOrL
              Just pinfo ->
                if mOrL
                  then unless (result == lookup self (infoMonitors pinfo)) waitForMOrL
                  else unless (self `elem` infoLinks pinfo) waitForMOrL
      waitForMOrL
      liftIO $ putMVar signal ()
  return result

-- | Shared body of the monitor/link tests.
--
-- Sets up a monitor or a link to the given process via 'monitorOrLink' and
-- then:
--
-- * when asked to unmonitor/unlink, does so immediately and signals @done@;
-- * otherwise waits for a 'ProcessMonitorNotification' and asserts that it
--   carries the expected reference, process and reason (and that we were in
--   fact monitoring) before signalling @done@.
--
-- A 'ProcessLinkException' raised while any of this runs is caught; it is
-- asserted to be the expected one (right process and reason, linking, and not
-- unlinking) and @done@ is signalled.
monitorTestProcess :: ProcessId       -- ^ Process to monitor/link to
                   -> Bool            -- ^ 'True' for monitor, 'False' for link
                   -> Bool            -- ^ Should we unmonitor?
                   -> DiedReason      -- ^ Expected cause of death
                   -> Maybe (MVar ()) -- ^ Signal for 'monitor set up'
                   -> MVar ()         -- ^ Signal for successful termination
                   -> Process ()
monitorTestProcess theirAddr mOrL un reason monitorSetup done =
  catch (do mRef <- monitorOrLink mOrL theirAddr monitorSetup
            case (un, mRef) of
              -- No reference means we linked, so undo the link
              (True, Nothing) -> do
                unlink theirAddr
                liftIO $ putMVar done ()
              (True, Just ref) -> do
                unmonitor ref
                liftIO $ putMVar done ()
              (False, ref) -> do
                receiveWait [
                    match (\(ProcessMonitorNotification ref' pid reason') -> do
                              liftIO $ do
                                assertBool "Bad Monitor Signal"
                                           (Just ref' == ref && pid == theirAddr &&
                                              mOrL && reason == reason')
                                putMVar done ())
                  ]
        )
        (\(ProcessLinkException pid reason') -> do
            liftIO $ assertBool "link exception unmatched" $
              pid == theirAddr && not mOrL && not un && reason == reason'
            liftIO $ putMVar done ()
        )

--------------------------------------------------------------------------------
-- The tests proper                                                           --
--------------------------------------------------------------------------------

-- | Basic ping test.
--
-- Starts the 'ping' server on one node and a client on another. The client
-- performs 10000 round trips, each time sending a 'Pong' and waiting up to
-- three seconds for the 'Ping' reply; a missing reply makes the client
-- process 'die'.
testPing :: TestTransport -> Assertion
testPing TestTransport{..} = do
  serverAddr <- newEmptyMVar
  clientDone <- newEmptyMVar

  -- Server
  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    addr <- forkProcess localNode ping
    putMVar serverAddr addr

  -- Client
  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    pingServer <- readMVar serverAddr

    let numPings = 10000

    runProcess localNode $ do
      pid <- getSelfPid
      replicateM_ numPings $ do
        send pingServer (Pong pid)
        p <- expectTimeout 3000000
        case p of
          Just (Ping _) -> return ()
          Nothing       -> die "Failed to receive Ping"

    putMVar clientDone ()

  takeMVar clientDone

-- | Monitor a process on an unreachable node.
--
-- The target process is forked on a node that is closed before its
-- 'ProcessId' is published, so the monitoring side is expected to observe
-- 'DiedDisconnect'. The two flags select monitor vs. link and whether to
-- unmonitor/unlink (see 'monitorTestProcess').
testMonitorUnreachable :: TestTransport -> Bool -> Bool -> Assertion
testMonitorUnreachable TestTransport{..} mOrL un = do
  deadProcess <- newEmptyMVar
  done <- newEmptyMVar

  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    addr <- forkProcess localNode expect
    closeLocalNode localNode
    putMVar deadProcess addr

  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    theirAddr <- readMVar deadProcess
    runProcess localNode $
      monitorTestProcess theirAddr mOrL un DiedDisconnect Nothing done

  takeMVar done

-- | Monitor a process which terminates normally.
--
-- The monitored process blocks on the @monitorSetup@ signal and then simply
-- returns, so the expected reason is 'DiedNormal'. The two flags select
-- monitor vs. link and whether to unmonitor/unlink (see
-- 'monitorTestProcess').
testMonitorNormalTermination :: TestTransport -> Bool -> Bool -> Assertion
testMonitorNormalTermination TestTransport{..} mOrL un = do
  monitorSetup <- newEmptyMVar
  monitoredProcess <- newEmptyMVar
  done <- newEmptyMVar

  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    addr <- forkProcess localNode $
      liftIO $ readMVar monitorSetup
    putMVar monitoredProcess addr

  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    theirAddr <- readMVar monitoredProcess
    runProcess localNode $
      monitorTestProcess theirAddr mOrL un DiedNormal (Just monitorSetup) done

  takeMVar done

-- | Monitor a process which terminates abnormally.
--
-- The monitored process blocks on the @monitorSetup@ signal and then throws a
-- 'userError'; the expected reason is a 'DiedException' carrying the shown
-- form of that error. The two flags select monitor vs. link and whether to
-- unmonitor/unlink (see 'monitorTestProcess').
testMonitorAbnormalTermination :: TestTransport -> Bool -> Bool -> Assertion
testMonitorAbnormalTermination TestTransport{..} mOrL un = do
  monitorSetup <- newEmptyMVar
  monitoredProcess <- newEmptyMVar
  done <- newEmptyMVar

  let err = userError "Abnormal termination"

  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    addr <- forkProcess localNode . liftIO $ do
      readMVar monitorSetup
      throwIO err
    putMVar monitoredProcess addr

  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    theirAddr <- readMVar monitoredProcess
    runProcess localNode $
      monitorTestProcess theirAddr mOrL un (DiedException (show err)) (Just monitorSetup) done

  takeMVar done

-- | Monitor a local process that is already dead.
--
-- Both processes live on the same node. The target returns immediately; the
-- tester first monitors it and waits for the resulting
-- 'ProcessMonitorNotification' to be sure it is gone, and only then runs
-- 'monitorTestProcess', expecting 'DiedUnknownId'.
testMonitorLocalDeadProcess :: TestTransport -> Bool -> Bool -> Assertion
testMonitorLocalDeadProcess TestTransport{..} mOrL un = do
  processAddr <- newEmptyMVar
  localNode <- newLocalNode testTransport initRemoteTable
  done <- newEmptyMVar

  forkIO $ do
    addr <- forkProcess localNode $ return ()
    putMVar processAddr addr

  forkIO $ do
    theirAddr <- readMVar processAddr
    runProcess localNode $ do
      monitor theirAddr
      -- wait for the process to die
      expect :: Process ProcessMonitorNotification
      monitorTestProcess theirAddr mOrL un DiedUnknownId Nothing done

  takeMVar done

-- | Monitor a remote process that is already dead.
--
-- The target process fills @processDead@ as its only action; the tester, on
-- a different node, waits for that signal before running
-- 'monitorTestProcess', expecting 'DiedUnknownId'.
testMonitorRemoteDeadProcess :: TestTransport -> Bool -> Bool -> Assertion
testMonitorRemoteDeadProcess TestTransport{..} mOrL un = do
  processDead <- newEmptyMVar
  processAddr <- newEmptyMVar
  done <- newEmptyMVar

  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    addr <- forkProcess localNode . liftIO $ putMVar processDead ()
    putMVar processAddr addr

  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    theirAddr <- readMVar processAddr
    readMVar processDead
    runProcess localNode $ do
      monitorTestProcess theirAddr mOrL un DiedUnknownId Nothing done

  takeMVar done

-- | Monitor a process that becomes disconnected.
--
-- The remote node forks the monitored process (blocked in 'expect') and a
-- second, short-lived process. Once the monitor is reported as set up, the
-- remote end point is closed and only then is the second 'ProcessId'
-- published. On the monitoring node a helper process sends a message to that
-- second process after the end point was closed (presumably so that the
-- broken connection gets noticed). The expected reason is 'DiedDisconnect'.
testMonitorDisconnect :: TestTransport -> Bool -> Bool -> Assertion
testMonitorDisconnect TestTransport{..} mOrL un = do
  processAddr <- newEmptyMVar
  processAddr2 <- newEmptyMVar
  monitorSetup <- newEmptyMVar
  done <- newEmptyMVar

  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    addr <- forkProcess localNode $ expect
    addr2 <- forkProcess localNode $ return ()
    putMVar processAddr addr
    readMVar monitorSetup
    NT.closeEndPoint (localEndPoint localNode)
    putMVar processAddr2 addr2

  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    theirAddr <- readMVar processAddr
    forkProcess localNode $ do
      lc <- liftIO $ readMVar processAddr2
      send lc ()
    runProcess localNode $ do
      monitorTestProcess theirAddr mOrL un DiedDisconnect (Just monitorSetup) done

  takeMVar done

-- | Test the math server (i.e., receiveWait).
--
-- The client asks the 'math' server for @1 + 2@, @8 / 2@ and @8 / 0@ and the
-- test checks that the replies are @3@, @4@ and 'DivByZero' respectively.
testMath :: TestTransport -> Assertion
testMath TestTransport{..} = do
  serverAddr <- newEmptyMVar
  clientDone <- newEmptyMVar

  -- Server
  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    addr <- forkProcess localNode math
    putMVar serverAddr addr

  -- Client
  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    mathServer <- readMVar serverAddr

    runProcess localNode $ do
      pid <- getSelfPid
      send mathServer (Add pid 1 2)
      three <- expect :: Process Double
      send mathServer (Divide pid 8 2)
      four <- expect :: Process Double
      send mathServer (Divide pid 8 0)
      divByZ <- expect
      liftIO $ putMVar clientDone (three, four, divByZ)

  res <- takeMVar clientDone
  case res of
    (3, 4, DivByZero) -> return ()
    _                 -> error "Something went horribly wrong"

-- | Send first message (i.e. connect) to an already terminated process
-- (without monitoring); then send another message to a second process on
-- the same remote node (we're checking that the remote node did not die).
--
-- The server only publishes the two addresses after the first process has
-- signalled that it ran; the second process is the 'ping' server, whose reply
-- proves that the node is still alive.
testSendToTerminated :: TestTransport -> Assertion
testSendToTerminated TestTransport{..} = do
  serverAddr1 <- newEmptyMVar
  serverAddr2 <- newEmptyMVar
  clientDone <- newEmptyMVar

  forkIO $ do
    terminated <- newEmptyMVar
    localNode <- newLocalNode testTransport initRemoteTable
    addr1 <- forkProcess localNode $ liftIO $ putMVar terminated ()
    addr2 <- forkProcess localNode $ ping
    readMVar terminated
    putMVar serverAddr1 addr1
    putMVar serverAddr2 addr2

  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    server1 <- readMVar serverAddr1
    server2 <- readMVar serverAddr2
    runProcess localNode $ do
      pid <- getSelfPid
      send server1 "Hi"
      send server2 (Pong pid)
      expectPing clientDone

  verifyClient "Expected Ping from server" clientDone

-- | Test (non-zero) timeout.
--
-- Nobody ever sends an 'Add' message to the process, so 'receiveTimeout' with
-- a one-second timeout is expected to return 'Nothing'.
testTimeout :: TestTransport -> Assertion
testTimeout TestTransport{..} = do
  localNode <- newLocalNode testTransport initRemoteTable
  done <- newEmptyMVar

  runProcess localNode $ do
    res <- receiveTimeout 1000000 [match (\Add{} -> return ())]
    liftIO $ putMVar done $ isNothing res

  verifyClient "Expected receiveTimeout to timeout..." done

-- | Test zero timeout.
--
-- The server polls its mailbox with @receiveTimeout 0@, sleeping 100ms
-- between attempts, until a 'Pong' shows up, and then answers with a single
-- 'Ping'. The client first floods the server with 10000 messages it does not
-- match on and only then sends the 'Pong'.
testTimeout0 :: TestTransport -> Assertion
testTimeout0 TestTransport{..} = do
  serverAddr <- newEmptyMVar
  clientDone <- newEmptyMVar

  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    addr <- forkProcess localNode $ do
      -- Variation on the venerable ping server which uses a zero timeout
      partner <- fix $ \loop ->
        receiveTimeout 0 [match (\(Pong p) -> return p)]
          >>= maybe (liftIO (threadDelay 100000) >> loop) return
      self <- getSelfPid
      send partner (Ping self)
    putMVar serverAddr addr

  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    server <- readMVar serverAddr
    runProcess localNode $ do
      pid <- getSelfPid
      -- Send a bunch of messages. A large number of messages that the server
      -- is not interested in, and then a single message that it wants
      replicateM_ 10000 $ send server "Irrelevant message"
      send server (Pong pid)
      expectPing clientDone

  verifyClient "Expected Ping from server" clientDone

-- | Test typed channels.
--
-- The server creates a channel carrying @(SendPort Bool, Int)@ pairs and
-- publishes its send port through an 'MVar'. The client sends its own reply
-- port together with the number 5; the server answers with @even 5@, so the
-- client expects to read 'False' from its receive port.
testTypedChannels :: TestTransport -> Assertion
testTypedChannels TestTransport{..} = do
  serverChannel <- newEmptyMVar :: IO (MVar (SendPort (SendPort Bool, Int)))
  clientDone <- newEmptyMVar

  -- Server
  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    forkProcess localNode $ do
      (serverSendPort, rport) <- newChan
      liftIO $ putMVar serverChannel serverSendPort
      (clientSendPort, i) <- receiveChan rport
      sendChan clientSendPort (even i)
    return ()

  -- Client
  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    serverSendPort <- readMVar serverChannel
    runProcess localNode $ do
      (clientSendPort, rport) <- newChan
      sendChan serverSendPort (clientSendPort, 5)
      ch <- receiveChan rport
      -- 5 is odd, so the server must have replied with False.
      liftIO $ putMVar clientDone (not ch)

  verifyClient "Expected channel to send 'False'" clientDone

-- | Test merging receive ports.
--
-- Exercises both merge strategies selected by the 'Bool' flag ('True' uses
-- 'mergePortsBiased', 'False' uses 'mergePortsRR') in three scenarios:
--
-- * a single layer of merging over three pre-filled channels,
-- * two layers of merging (three groups of three pre-filled channels),
-- * merging channels that are initially empty, where messages arrive one at a
--   time and the arrival order alone should determine the result.
testMergeChannels :: TestTransport -> Assertion
testMergeChannels TestTransport{..} = do
    localNode <- newLocalNode testTransport initRemoteTable
    testFlat localNode True          "aaabbbccc"
    testFlat localNode False         "abcabcabc"
    testNested localNode True True   "aaabbbcccdddeeefffggghhhiii"
    testNested localNode True False  "adgadgadgbehbehbehcficficfi"
    testNested localNode False True  "abcabcabcdefdefdefghighighi"
    testNested localNode False False "adgbehcfiadgbehcfiadgbehcfi"
    testBlocked localNode True
    testBlocked localNode False
  where
    -- Single layer of merging
    testFlat :: LocalNode -> Bool -> String -> IO ()
    testFlat localNode biased expected = do
      done <- newEmptyMVar
      forkProcess localNode $ do
        rs  <- mapM charChannel "abc"
        m   <- mergePorts biased rs
        xs  <- replicateM 9 $ receiveChan m
        liftIO $ putMVar done $ xs == expected
      verifyClient "Expected single layer merge to match expected ordering" done

    -- Two layers of merging
    testNested :: LocalNode -> Bool -> Bool -> String -> IO ()
    testNested localNode biasedInner biasedOuter expected = do
      done <- newEmptyMVar
      forkProcess localNode $ do
        rss  <- mapM (mapM charChannel) ["abc", "def", "ghi"]
        ms   <- mapM (mergePorts biasedInner) rss
        m    <- mergePorts biasedOuter ms
        xs   <- replicateM (9 * 3) $ receiveChan m
        liftIO $ putMVar done $ xs == expected
      verifyClient "Expected nested channels to match expeted ordering" done

    -- Test that if no messages are (immediately) available, the scheduler makes no difference
    testBlocked :: LocalNode -> Bool -> IO ()
    testBlocked localNode biased = do
      vs <- replicateM 3 newEmptyMVar
      done <- newEmptyMVar

      -- Sender: waits for the three send ports, then sends one character at a
      -- time with a pause after each send.
      forkProcess localNode $ do
        ss <- liftIO $ mapM readMVar vs
        case ss of
          [sa, sb, sc] ->
            mapM_ ((>> pause 10000) . uncurry sendChan)
              [ (sa, 'a'), (sb, 'b'), (sc, 'c') -- a, b, c
              , (sa, 'a'), (sc, 'c'), (sb, 'b') -- a, c, b
              , (sb, 'b'), (sa, 'a'), (sc, 'c') -- b, a, c
              , (sb, 'b'), (sc, 'c'), (sa, 'a') -- b, c, a
              , (sc, 'c'), (sa, 'a'), (sb, 'b') -- c, a, b
              , (sc, 'c'), (sb, 'b'), (sa, 'a') -- c, b, a
              ]
          _ -> die "Something went horribly wrong"

      -- Receiver: creates the channels, hands the send ports to the sender and
      -- checks that the merged port yields the characters in sending order.
      forkProcess localNode $ do
        (ss, rs) <- unzip <$> replicateM 3 newChan
        liftIO $ mapM_ (uncurry putMVar) $ zip vs ss
        m  <- mergePorts biased rs
        xs <- replicateM (6 * 3) $ receiveChan m
        liftIO $ putMVar done $ xs == "abcacbbacbcacabcba"

      verifyClient "Expected merged ports to match expected ordering" done

    -- Select the merge strategy: 'True' is biased, 'False' is round-robin.
    mergePorts :: Serializable a => Bool -> [ReceivePort a] -> Process (ReceivePort a)
    mergePorts True  = mergePortsBiased
    mergePorts False = mergePortsRR

    -- Create a channel that already holds three copies of the given character
    -- and return its receive end.
    charChannel :: Char -> Process (ReceivePort Char)
    charChannel c = do
      (sport, rport) <- newChan
      replicateM_ 3 $ sendChan sport c
      liftIO $ threadDelay 10000 -- Make sure messages have been sent
      return rport

-- | Test that 'terminate' throws 'ProcessTerminationException'.
--
-- The exception is caught with 'try' and its rendering is compared against
-- @show ProcessTerminationException@; any other outcome kills the process via
-- 'die'.
testTerminate :: TestTransport -> Assertion
testTerminate TestTransport{..} = do
  localNode <- newLocalNode testTransport initRemoteTable
  runProcess localNode $ do
    e <- try terminate :: Process (Either ProcessTerminationException ())
    -- Both sides of the Either are shown, so a (theoretical) 'Right ()' would
    -- render as "()" and fail the comparison.
    unless (either show show e == show ProcessTerminationException) $
      die "Unexpected result from terminate"

-- | Test monitoring a node that has already been closed.
--
-- @node1@ is closed before @node2@ starts monitoring it. The monitoring
-- process then expects a 'NodeMonitorNotification' with reason
-- 'DiedDisconnect' and checks that it carries the monitor reference returned
-- by 'monitorNode' and the id of @node1@.
testMonitorNode :: TestTransport -> Assertion
testMonitorNode TestTransport{..} = do
  node1 <- newLocalNode testTransport initRemoteTable
  node2 <- newLocalNode testTransport initRemoteTable
  done <- newEmptyMVar

  closeLocalNode node1

  runProcess node2 $ do
    ref <- monitorNode (localNodeId node1)
    receiveWait
      [ match (\(NodeMonitorNotification ref' nid DiedDisconnect) ->
                 return $ ref == ref' && nid == localNodeId node1)
      ] >>= liftIO . putMVar done

  verifyClient "Expected NodeMonitorNotification with matching ref & nodeId" done

-- | Test monitoring a node that is still alive when the monitor is set up.
--
-- The process on @node2@ installs the monitor first and signals @ready@; only
-- then does the main thread close @node1@ and release the process via
-- @readyr@. The process pokes @node1@ with a message and waits for a
-- 'NodeMonitorNotification' (with any reason) carrying the expected monitor
-- reference and node id.
testMonitorLiveNode :: TestTransport -> Assertion
testMonitorLiveNode TestTransport{..} = do
  node1 <- newLocalNode testTransport initRemoteTable
  node2 <- newLocalNode testTransport initRemoteTable
  ready <- newEmptyMVar
  readyr <- newEmptyMVar
  done <- newEmptyMVar

  p <- forkProcess node1 $ return ()
  forkProcess node2 $ do
    ref <- monitorNode (localNodeId node1)
    liftIO $ putMVar ready ()
    -- Block until the main thread has closed node1.
    liftIO $ takeMVar readyr
    send p ()
    receiveWait
      [ match (\(NodeMonitorNotification ref' nid _) ->
                 return $ ref == ref' && nid == localNodeId node1)
      ] >>= liftIO . putMVar done

  takeMVar ready
  closeLocalNode node1
  putMVar readyr ()

  verifyClient "Expected NodeMonitorNotification for LIVE node" done

-- | Test monitoring a typed channel's send port.
--
-- A process on @node1@ receives a 'SendPort', monitors it with 'monitorPort'
-- and waits for the matching 'PortMonitorNotification'. The channel itself is
-- created by a short-lived process on @node2@ which sends the port over and
-- then exits after a brief delay.
testMonitorChannel :: TestTransport -> Assertion
testMonitorChannel TestTransport{..} = do
    node1 <- newLocalNode testTransport initRemoteTable
    node2 <- newLocalNode testTransport initRemoteTable
    gotNotification <- newEmptyMVar

    pid <- forkProcess node1 $ do
      sport <- expect :: Process (SendPort ())
      ref <- monitorPort sport
      receiveWait
        [ -- reason might be DiedUnknownId if the receive port is GCed before the
          -- monitor is established (TODO: not sure that this is reasonable)
          match (\(PortMonitorNotification ref' port' reason) ->
                   return $ ref' == ref
                         && port' == sendPortId sport
                         && (reason == DiedNormal || reason == DiedUnknownId))
        ] >>= liftIO . putMVar gotNotification

    runProcess node2 $ do
      (sport, _) <- newChan :: Process (SendPort (), ReceivePort ())
      send pid sport
      liftIO $ threadDelay 100000

    verifyClient "Expected PortMonitorNotification" gotNotification

-- | Test the local process registry.
--
-- Registers a ping server under the name @"ping"@, looks it up again with
-- 'whereis', and talks to it by name via 'nsend'. Afterwards it checks that
-- the following operations raise 'ProcessRegistrationException':
--
-- * registering an already finished process under a fresh name,
-- * registering it under the name that is already taken,
-- * unregistering a name that was never registered.
testRegistry :: TestTransport -> Assertion
testRegistry TestTransport{..} = do
  node <- newLocalNode testTransport initRemoteTable
  done <- newEmptyMVar

  pingServer <- forkProcess node ping
  deadProcess <- forkProcess node (return ())

  runProcess node $ do
    register "ping" pingServer
    whereis "ping" >>= liftIO . assertBool "Unexpected ping" . (== Just pingServer)
    us <- getSelfPid
    nsend "ping" (Pong us)
    receiveWait
      [ matchIf (\(Ping pid') -> pingServer == pid') (const $ return ())
      ]
    checkRegException "dead" Nothing deadProcess
    checkRegException "ping" (Just pingServer) deadProcess
    try (unregister "dead") >>= checkReg "dead" Nothing
    liftIO $ putMVar done ()

  takeMVar done

  where
    -- Try to register @dead@ under @name@ and require that this fails.
    checkRegException :: String -> Maybe ProcessId -> ProcessId -> Process ()
    checkRegException name pid dead =
      try (register name dead) >>= checkReg name pid

    -- Accept any 'ProcessRegistrationException'; everything else is fatal.
    -- NOTE(review): the first two arguments (presumably the expected label
    -- and owner) are currently ignored and not compared with the exception's
    -- fields -- confirm whether that is intentional.
    checkReg :: Show b
             => String
             -> Maybe ProcessId
             -> Either ProcessRegistrationException b
             -> Process ()
    checkReg _ _ res =
      case res of
        Left (ProcessRegistrationException _ _) -> return ()
        _ -> die $ "Unexpected Registration" ++ show res

-- | Test registering a remote process in the local registry.
--
-- The ping server runs on @node1@ but is registered under @"ping"@ in the
-- registry of @node2@. The test looks the name up with 'whereis', sends a
-- 'Pong' by name with 'nsend', and completes once the 'Ping' reply from the
-- ping server arrives.
testRegistryRemoteProcess :: TestTransport -> Assertion
testRegistryRemoteProcess TestTransport{..} = do
  node1 <- newLocalNode testTransport initRemoteTable
  node2 <- newLocalNode testTransport initRemoteTable
  done <- newEmptyMVar

  pingServer <- forkProcess node1 ping

  runProcess node2 $ do
    register "ping" pingServer
    whereis "ping" >>= liftIO . assertBool "Unexpected ping" . (== Just pingServer)
    us <- getSelfPid
    nsend "ping" (Pong us)
    receiveWait
      [ matchIf (\(Ping pid') -> pingServer == pid')
                (const $ liftIO $ putMVar done ())
      ]

  takeMVar done

-- | Test the registry of a remote node.
--
-- From a process on @node2@, drives the registry of @node1@ through
-- 'registerRemoteAsync' / 'unregisterRemoteAsync' and inspects the
-- 'RegisterReply' messages:
--
-- 1. registering the live ping server under @"ping"@ reports it as owner;
-- 2. the name resolves remotely and messages sent by name reach the server;
-- 3. registering a finished process under @"dead"@ is refused with no owner;
-- 4. registering under the taken name @"ping"@ is refused and reports the
--    current owner;
-- 5. unregistering the unknown name @"dead"@ is refused with no owner.
testRemoteRegistry :: TestTransport -> Assertion
testRemoteRegistry TestTransport{..} = do
  node1 <- newLocalNode testTransport initRemoteTable
  node2 <- newLocalNode testTransport initRemoteTable

  pingServer <- forkProcess node1 ping
  deadProcess <- forkProcess node1 (return ())

  runProcess node2 $ do
    let nid1 = localNodeId node1
    registerRemoteAsync nid1 "ping" pingServer
    receiveWait
      [ matchIf (\(RegisterReply label' _ (Just pid)) ->
                   "ping" == label' && pid == pingServer)
                (\(RegisterReply _ _ _) -> return ())
      ]

    pid <- verifyWhereIsRemote nid1 "ping"
    liftIO $ assertBool "Expected pindServer to match pid" $ pingServer == pid

    us <- getSelfPid
    nsendRemote nid1 "ping" (Pong us)
    receiveWait
      [ match (\(Ping pid') -> return $ pingServer == pid')
      ] >>= liftIO . assertBool "Expected Ping with ping server's ProcessId"

    -- test that if process was not registered Nothing is returned
    -- in owner field.
    registerRemoteAsync nid1 "dead" deadProcess
    receiveWait
      [ matchIf (\(RegisterReply label' _ _) -> "dead" == label')
                (\(RegisterReply _ f mPid) -> return (not f && isNothing mPid))
      ] >>= liftIO . assertBool "Expected False Nothing in RegisterReply"

    -- The name is already taken: the reply must name the existing owner.
    registerRemoteAsync nid1 "ping" deadProcess
    receiveWait
      [ matchIf (\(RegisterReply label' False mPid) ->
                   "ping" == label' && isJust mPid)
                (\(RegisterReply _ f (Just pid'')) ->
                   return (not f && pid'' == pingServer))
      ] >>= liftIO . assertBool "Expected False and (Just alreadyRegisteredPid) in RegisterReply"

    unregisterRemoteAsync nid1 "dead"
    receiveWait
      [ matchIf (\(RegisterReply label' _ _) -> "dead" == label')
                (\(RegisterReply _ f mPid) -> return (not f && isNothing mPid))
      ] >>= liftIO . assertBool "Expected False and Nothing in RegisterReply"

-- | Test registering a process in the registry of a different node.
--
-- The ping server runs on @node2@ and is registered under @"ping"@ in the
-- registry of @node1@. The test checks that the name resolves to the ping
-- server, sends a 'Pong' by name through @node1@ and expects a 'Ping' back
-- from the ping server.
testRemoteRegistryRemoteProcess :: TestTransport -> Assertion
testRemoteRegistryRemoteProcess TestTransport{..} = do
  node1 <- newLocalNode testTransport initRemoteTable
  node2 <- newLocalNode testTransport initRemoteTable
  done <- newEmptyMVar

  pingServer <- forkProcess node2 ping

  runProcess node2 $ do
    let nid1 = localNodeId node1
    registerRemoteAsync nid1 "ping" pingServer
    receiveWait
      [ matchIf (\(RegisterReply label' _ _) -> "ping" == label')
                (\(RegisterReply _ _ _) -> return ())
      ]
    pid <- verifyWhereIsRemote nid1 "ping"
    liftIO $ assertBool "Expected pingServer to match remote name" $ pingServer == pid
    us <- getSelfPid
    nsendRemote nid1 "ping" (Pong us)
    receiveWait
      [ match (\(Ping pid') -> return $ pingServer == pid')
      ] >>= liftIO . putMVar done

  verifyClient "Expected Ping with ping server's ProcessId" done

-- | Test 'spawnLocal' and 'spawnChannelLocal'.
--
-- One child (started with 'spawnLocal') waits for a send port and writes
-- @1234@ to it. A second child (started with 'spawnChannelLocal') forwards
-- whatever arrives on its channel to the parent. The parent wires the two
-- together and expects to receive @1234@.
testSpawnLocal :: TestTransport -> Assertion
testSpawnLocal TestTransport{..} = do
  node <- newLocalNode testTransport initRemoteTable
  done <- newEmptyMVar

  runProcess node $ do
    us <- getSelfPid

    pid <- spawnLocal $ do
      sport <- expect
      sendChan sport (1234 :: Int)

    sport <- spawnChannelLocal $ \rport ->
      (receiveChan rport :: Process Int) >>= send us

    send pid sport
    expect >>= liftIO . putMVar done

  res <- takeMVar done
  assertBool "Expected 1234 :: Int" $ res == (1234 :: Int)

-- | Test that 'spawnAsync' is strict in the closure it is given.
--
-- Passing @error "boom"@ as the closure must raise an exception in the
-- calling process. The outcome is handed back to the test thread as an
-- @IO ()@ action: a no-op when the exception fired, and an 'error' call when
-- 'spawnAsync' unexpectedly returned a 'SpawnRef'.
testSpawnAsyncStrictness :: TestTransport -> Assertion
testSpawnAsyncStrictness TestTransport{..} = do
  node <- newLocalNode testTransport initRemoteTable
  done <- newEmptyMVar

  runProcess node $ do
    here <- getSelfNode

    ev <- try $ spawnAsync here (error "boom")
    liftIO $ case ev of
      Right _ -> putMVar done (error "Exception didn't fire")
      Left (_ :: SomeException) -> putMVar done (return ())

  -- Run the verdict in the test thread so a failure is reported there.
  join $ takeMVar done

-- | Test explicit and implicit reconnection after a broken connection.
--
-- Part one: ordinary 'send' must not reconnect implicitly. After the
-- connection between the nodes is broken, @"message 2"@ is expected to be
-- lost; only after an explicit 'reconnect' does @"message 3"@ get through, so
-- the receiver should see @"message 1"@ followed by @"message 3"@.
--
-- Part two: messages to the node controller do reconnect implicitly. Remote
-- registrations issued after a second connection break are expected to
-- succeed, while the name registered before the break is expected to be gone.
testReconnect :: TestTransport -> Assertion
testReconnect TestTransport{..} = do
  node1 <- newLocalNode testTransport initRemoteTable
  node2 <- newLocalNode testTransport initRemoteTable
  let nid1 = localNodeId node1
  processA <- newEmptyMVar
  sendTestOk <- newEmptyMVar
  registerTestOk <- newEmptyMVar

  forkProcess node1 $ do
    us <- getSelfPid
    liftIO $ putMVar processA us
    msg1 <- expect
    msg2 <- expect
    liftIO $ do
      assertBool "messages did not match" $ msg1 == "message 1" && msg2 == "message 3"
      putMVar sendTestOk ()

  forkProcess node2 $ do
    {-
     - Make sure there is no implicit reconnect on normal message sending
     -}

    them <- liftIO $ readMVar processA
    send them "message 1" >> liftIO (threadDelay 100000)

    -- Simulate network failure
    liftIO $ syncBreakConnection testBreakConnection node1 node2

    -- Should not arrive
    send them "message 2"

    -- Should arrive
    reconnect them
    send them "message 3"

    liftIO $ takeMVar sendTestOk

    {-
     - Test that there *is* implicit reconnect on node controller messages
     -}

    us <- getSelfPid
    registerRemoteAsync nid1 "a" us -- registerRemote is asynchronous
    awaitRegisterReply "a"

    _ <- whereisRemote nid1 "a"

    -- Simulate network failure
    liftIO $ syncBreakConnection testBreakConnection node1 node2

    -- This will happen due to implicit reconnect
    registerRemoteAsync nid1 "b" us
    awaitRegisterReply "b"

    -- Should happen
    registerRemoteAsync nid1 "c" us
    awaitRegisterReply "c"

    -- Check
    mPid <- whereisRemote nid1 "a"  -- this will fail because the name is removed when the node is disconnected
    liftIO $ assertBool "Expected remote name to be lost" $ isNothing mPid
    verifyWhereIsRemote nid1 "b"  -- this will succeed because the value is set after the reconnect
    verifyWhereIsRemote nid1 "c"

    liftIO $ putMVar registerTestOk ()

  takeMVar registerTestOk

  where
    -- Block until the 'RegisterReply' for the given label arrives; replies
    -- for other labels are left in the mailbox.
    awaitRegisterReply :: String -> Process ()
    awaitRegisterReply label =
      receiveWait
        [ matchIf (\(RegisterReply label' _ _) -> label == label')
                  (\(RegisterReply _ _ _) -> return ())
        ]

-- | Tests that unreliable messages arrive sorted even when there are connection
-- failures.
--
-- The first argument is the send primitive under test and the 'Int' is the
-- number of messages to send. The connection between the two nodes is broken
-- with 'testBreakConnection' before every single send, so some messages may
-- be lost; the ones that do arrive must be in non-decreasing order.
testUSend :: (ProcessId -> Int -> Process ())
          -> TestTransport -> Int -> Assertion
testUSend usendPrim TestTransport{..} numMessages = do
  [node1, node2] <- replicateM 2 $ newLocalNode testTransport initRemoteTable
  let nid1 = localNodeId node1
      nid2 = localNodeId node2
  processA <- newEmptyMVar
  usendTestOk <- newEmptyMVar

  -- Receiver: any exception is printed rather than silently dropped.
  forkProcess node1 $ flip catch (\e -> liftIO $ print (e :: SomeException)) $ do
    us <- getSelfPid
    liftIO $ putMVar processA us
    them <- expect
    send them ()
    _ <- monitor them
    let -- Collects messages from 'them' until the sender dies.
        -- Disconnection notifications are ignored.
        receiveMessages :: Process [Int]
        receiveMessages = receiveWait
              [ match $ \mn -> case mn of
                  ProcessMonitorNotification _ _ DiedDisconnect -> do
                    -- The monitor is re-established after a disconnect so
                    -- that the sender's eventual death is still observed.
                    _ <- monitor them
                    receiveMessages
                  _ -> return []
              , match $ \i -> fmap (i :) receiveMessages
              ]
    msgs <- receiveMessages
    let -- Checks that the input list is sorted.
        isSorted :: [Int] -> Bool
        isSorted (x : xs@(y : _)) = x <= y && isSorted xs
        isSorted _                = True
    -- The list can't be null since there are no failures after sending
    -- the last message.
    liftIO $ putMVar usendTestOk $ isSorted msgs && not (null msgs)

  -- Sender: handshake with the receiver, then break the connection before
  -- each send.
  forkProcess node2 $ do
    them <- liftIO $ readMVar processA
    getSelfPid >>= send them
    expect :: Process ()
    forM_ [1..numMessages] $ \i -> do
      liftIO $ testBreakConnection (nodeAddress nid1) (nodeAddress nid2)
      usendPrim them i
      liftIO (threadDelay 30000)

  res <- takeMVar usendTestOk
  assertBool "Unexpected failure after sending last msg" res

-- | Test 'matchAny'. This repeats the 'testMath' but with a proxy server
-- in between.
--
-- The proxy receives every message with 'matchAny' and forwards it untouched
-- to the math server, so the client must see the same replies it would get
-- when talking to the math server directly.
testMatchAny :: TestTransport -> Assertion
testMatchAny TestTransport{..} = do
  proxyAddr <- newEmptyMVar
  clientDone <- newEmptyMVar

  -- Math server
  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    mathServer <- forkProcess localNode math
    proxyServer <- forkProcess localNode $ forever $ do
      msg <- receiveWait [ matchAny return ]
      forward msg mathServer
    putMVar proxyAddr proxyServer

  -- Client
  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    mathServer <- readMVar proxyAddr

    runProcess localNode $ do
      pid <- getSelfPid
      send mathServer (Add pid 1 2)
      three <- expect :: Process Double
      send mathServer (Divide pid 8 2)
      four <- expect :: Process Double
      send mathServer (Divide pid 8 0)
      divByZ <- expect
      liftIO $ putMVar clientDone (three, four, divByZ)

  res <- takeMVar clientDone
  -- Report a mismatch as a regular test failure instead of calling 'error'.
  assertBool "Unexpected result" $ case res of
    (3, 4, DivByZero) -> True
    _                 -> False

-- | Test 'matchAny'. This repeats the 'testMath' but with a proxy server
-- in between, however we block 'Divide' requests: the proxy only forwards
-- messages that decode as 'Add', so the client's 'Divide' request must time
-- out.
testMatchAnyHandle :: TestTransport -> Assertion
testMatchAnyHandle TestTransport{..} = do
  proxyAddr <- newEmptyMVar
  clientDone <- newEmptyMVar

  -- Math server
  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    mathServer <- forkProcess localNode math
    proxyServer <- forkProcess localNode $ forever $
      receiveWait [ matchAny (maybeForward mathServer) ]
    putMVar proxyAddr proxyServer

  -- Client
  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    mathServer <- readMVar proxyAddr

    runProcess localNode $ do
      pid <- getSelfPid
      send mathServer (Add pid 1 2)
      three <- expect :: Process Double
      send mathServer (Divide pid 8 2)
      res <- expectTimeout 100000 :: Process (Maybe Double)
      liftIO $ putMVar clientDone $ three == 3 && isNothing res

  verifyClient "Expected Nothing (i.e. timeout)" clientDone

  where
    -- Forwards the message to @s@ only when it decodes as an 'Add' request;
    -- any other message is consumed and dropped (result is 'Nothing').
    maybeForward :: ProcessId -> Message -> Process (Maybe ())
    maybeForward s msg =
      handleMessage msg (\m@(Add _ _ _) -> send s m)

-- | Test that 'matchAnyIf' removes a matching message from the mailbox even
-- when the handler it runs does not itself decode the message.
--
-- The server accepts an 'Add' via 'matchAnyIf', tries (and fails) to handle
-- it as a 'String', and then checks that no 'Add' is left in its mailbox.
-- The verdict is handed back to the main thread and asserted there, so a
-- failure is reported by the test instead of killing the server process and
-- leaving the test blocked on @serverDone@.
testMatchAnyNoHandle :: TestTransport -> Assertion
testMatchAnyNoHandle TestTransport{..} = do
  addr <- newEmptyMVar
  clientDone <- newEmptyMVar
  serverDone <- newEmptyMVar

  -- Math server
  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    server <- forkProcess localNode $ forever $ do
        receiveWait [
          matchAnyIf
            -- the condition has type `Add -> Bool`
            (\(Add _ _ _) -> True)
            -- the match `Message -> Process ()` will succeed!
            (\m -> do
              -- `String -> Process ()` does *not* match the input types however
              r <- handleMessage m (\(_ :: String) -> die "NONSENSE")
              case r of
                Nothing -> return ()
                Just _  -> die "NONSENSE")
          ]
        -- we *must* have removed the message from our mailbox though!!!
        res <- receiveTimeout 100000 [ match (\(Add _ _ _) -> return ()) ]
        liftIO $ putMVar serverDone (isNothing res)
    putMVar addr server

  -- Client
  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    server <- readMVar addr

    runProcess localNode $ do
      pid <- getSelfPid
      send server (Add pid 1 2)
      -- we only care about the client having sent a message, so we're done
      liftIO $ putMVar clientDone ()

  takeMVar clientDone
  timedOut <- takeMVar serverDone
  assertBool "Expected timeout!" timedOut

-- | Test 'matchAnyIf'. We provide an /echo/ server, but it ignores requests
-- unless the text body @/= "bar"@ - this case should time out rather than
-- removing the message from the process mailbox.
testMatchAnyIf :: TestTransport -> Assertion
testMatchAnyIf TestTransport{..} = do
  echoAddr <- newEmptyMVar
  clientDone <- newEmptyMVar

  -- echo server
  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    echoServer <- forkProcess localNode $ forever $
      receiveWait
        [ matchAnyIf (\(_ :: ProcessId, s :: String) -> s /= "bar")
                     tryHandleMessage
        ]
    putMVar echoAddr echoServer

  -- Client
  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    server <- readMVar echoAddr

    runProcess localNode $ do
      pid <- getSelfPid
      send server (pid, "foo")
      foo <- expect
      -- provoking what would be the wrong ordering is informative here...

      send server (pid, "bar")
      bar <- expectTimeout 100000 :: Process (Maybe String) -- was Double o_O !?

      send server (pid, "baz")
      baz <- expect

      liftIO $ putMVar clientDone (foo, bar, baz)

  res <- takeMVar clientDone
  -- 'assertEqual' shows the actual triple when the expectation is not met.
  assertEqual "Expected timeout due to type mismatch" ("foo", Nothing, "baz") res

  where
    -- Echoes the text body back to the pid carried in the message.
    tryHandleMessage :: Message -> Process (Maybe ())
    tryHandleMessage msg =
      handleMessage msg (\(pid :: ProcessId, m :: String) -> send pid m)

-- | Test 'matchMessage' together with 'unwrapMessage'.
--
-- The echo server receives raw messages, unwraps each one as a
-- @(ProcessId, Message)@ pair and forwards the inner message to that pid.
-- The client sends two wrapped strings and expects both back, in order.
testMatchMessageWithUnwrap :: TestTransport -> Assertion
testMatchMessageWithUnwrap TestTransport{..} = do
  echoAddr <- newEmptyMVar
  clientDone <- newEmptyMVar

  -- echo server
  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    echoServer <- forkProcess localNode $ forever $ do
        msg <- receiveWait [ matchMessage return ]
        unwrapped <- unwrapMessage msg :: Process (Maybe (ProcessId, Message))
        case unwrapped of
          Just (p, msg') -> forward msg' p
          Nothing        -> die "unable to unwrap the message"
    putMVar echoAddr echoServer

  -- Client
  forkIO $ do
    localNode <- newLocalNode testTransport initRemoteTable
    server <- readMVar echoAddr

    runProcess localNode $ do
      pid <- getSelfPid
      send server (pid, wrapMessage ("foo" :: String))
      foo <- expect
      send server (pid, wrapMessage ("baz" :: String))
      baz <- expect
      liftIO $ putMVar clientDone (foo, baz)

  res <- takeMVar clientDone
  -- 'assertEqual' shows the actual pair when the expectation is not met.
  assertEqual "Unexpected unwrapped results" ("foo", "baz") res

-- | Test 'receiveChanTimeout'.
--
-- A receiver process and a sender process are coordinated through the
-- @mvSender@ 'MVar': the receiver signals when the sender may send, which
-- lets it check both the \"nothing available\" and the \"message available\"
-- outcomes for a positive and for a zero timeout.
testReceiveChanTimeout :: TestTransport -> Assertion
testReceiveChanTimeout TestTransport{..} = do
  done <- newEmptyMVar
  mvSender <- newEmptyMVar
  sendPort <- newEmptyMVar

  -- Receiver
  forkTry $ do
    localNode <- newLocalNode testTransport initRemoteTable
    runProcess localNode $ do
      -- Create a typed channel
      (sp, rp) <- newChan :: Process (SendPort Bool, ReceivePort Bool)
      liftIO $ putMVar sendPort sp

      -- Wait for a message with a delay. No message arrives, we should get
      -- Nothing after the delay.
      receiveChanTimeout 100000 rp >>= maybe (return ()) (const $ die "Expected Timeout")

      -- Let the sender know that it can send a message.
      liftIO $ putMVar mvSender ()

      -- Wait for a message with a delay again. Now a message arrives after
      -- 0.1 seconds
      res <- receiveChanTimeout 20000000 rp >>= maybe (die "Timeout") return
      liftIO $ assertBool "Expected result to be 'True'" res

      -- Wait for a message with zero timeout: non-blocking check. No message is
      -- available, we get Nothing
      receiveChanTimeout 0 rp >>= maybe (return ()) (const $ die "Expected Timeout")

      -- Let the sender know that it can send a message.
      liftIO $ putMVar mvSender ()

      -- Again, but now there is a message available. Poll with a zero
      -- timeout until the message shows up.
      fix $ \loop -> do
        liftIO $ threadDelay 100000
        mb <- receiveChanTimeout 0 rp
        case mb of
          Just b -> liftIO $ assertBool "Unexpected Message" $ not b
          _      -> loop

      liftIO $ putMVar done ()

  -- Sender
  forkTry $ do
    localNode <- newLocalNode testTransport initRemoteTable
    runProcess localNode $ do
      sp <- liftIO $ readMVar sendPort

      liftIO $ takeMVar mvSender
      liftIO $ threadDelay 100000
      sendChan sp True

      liftIO $ takeMVar mvSender
      sendChan sp False

  takeMVar done

-- | Test Functor, Applicative, Alternative and Monad instances for
-- 'ReceivePort'.
--
-- Everything runs inside a single process that owns an 'Int' channel and a
-- 'Bool' channel; each section first sends the inputs and then reads from a
-- port built with the instance under test.
testReceiveChanFeatures :: TestTransport -> Assertion
testReceiveChanFeatures TestTransport{..} = do
  done <- newEmptyMVar

  forkTry $ do
    localNode <- newLocalNode testTransport initRemoteTable
    runProcess localNode $ do
      (spInt,  rpInt)  <- newChan :: Process (SendPort Int, ReceivePort Int)
      (spBool, rpBool) <- newChan :: Process (SendPort Bool, ReceivePort Bool)

      -- Test Functor instance

      sendChan spInt 2
      sendChan spBool False

      rp1 <- mergePortsBiased [even <$> rpInt, rpBool]

      receiveChan rp1 >>= liftIO . assertBool "Expected True"
      receiveChan rp1 >>= liftIO . assertBool "Expected False" . not

      -- Test Applicative instance

      sendChan spInt 3
      sendChan spInt 4

      -- Deliberately spelled with 'pure' (rather than '<$>') so that the
      -- Applicative instance's 'pure' is exercised as well.
      let rp2 = pure (+) <*> rpInt <*> rpInt

      receiveChan rp2 >>= liftIO . assertBool "Expected 7" . (== 7)

      -- Test Alternative instance

      sendChan spInt 3
      sendChan spBool True

      let rp3 = (even <$> rpInt) <|> rpBool

      receiveChan rp3 >>= liftIO . assertBool "Expected False" . not
      receiveChan rp3 >>= liftIO . assertBool "Expected True"

      -- Test Monad instance

      sendChan spBool True
      sendChan spBool False
      sendChan spInt 5

      let rp4 :: ReceivePort Int
          rp4 = do
            b <- rpBool
            if b
              then rpInt
              else return 7

      receiveChan rp4 >>= liftIO . assertBool "Expected 5" . (== 5)
      receiveChan rp4 >>= liftIO . assertBool "Expected 7" . (== 7)

      liftIO $ putMVar done ()

  takeMVar done

-- | Test that a typed channel stops delivering once the process that created
-- it has died.
--
-- @pid@ creates the channel and publishes both ends through @tchMV@. @cPid@
-- picks them up, checks that a first @()@ arrives while @pid@ is alive, makes
-- @pid@ exit, and hands the channel back through @tchMV@. The parent process
-- then writes to the send port, and @cPid@ reports through @result@ whether
-- the receive port stayed silent.
testChanLifecycle :: TestTransport -> Assertion
testChanLifecycle TestTransport{..} = do
  result <- newEmptyMVar
  tchMV <- newEmptyMVar
  localNode <- newLocalNode testTransport initRemoteTable
  runProcess localNode $ do

    pid <- spawnLocal $ do
      tCh <- newChan :: Process (SendPort (), ReceivePort ())
      liftIO $ putMVar tchMV tCh
      expect :: Process ()
      let (sp, _) = tCh
      sendChan sp ()
      expect :: Process ()

    mRefPid <- monitor pid

    cPid <- spawnLocal $ do
      (sp', rp) <- liftIO $ takeMVar tchMV
      -- say "picked up our test channel"
      send pid ()
      -- say "told pid to continue"
      res <- receiveChanTimeout delay rp
      case res of
        Nothing -> do
          say "initial chan () missing!"
          liftIO $ putMVar result False
        Just () -> do
          _ <- monitor pid
          pause 10000
          -- say "sending pid a second () will cause it to exit"
          send pid ()

          -- say "make sure we see a DOWN notification for pid having stopped"
          receiveWait [ match (\(_ :: ProcessMonitorNotification) -> return ()) ]

          -- now that pid has died, the send port should be useless...
          liftIO $ putMVar tchMV (sp', rp)

          -- let's verify that we do not see the message from our
          -- parent process on the channel, once pid has died...
          recv <- receiveChanTimeout delay rp
          -- say $ "finished waiting for second (), writing result" ++ (show recv)
          liftIO $ putMVar result $ isNothing recv

    mRefCPid <- monitor cPid

    receiveWait
        [ matchIf (\(ProcessMonitorNotification r _ _) -> r == mRefPid)
                  (\_ -> return ())
        ]

    -- say "seen first pid die..."

    (sendPort, _) <- liftIO $ takeMVar tchMV
    sendChan sendPort ()
    -- say "sent () after owning pid died"

    -- let cPid know we've written to the channel...
    send cPid ()

    receiveWait
        [ matchIf (\(ProcessMonitorNotification r _ _) -> r == mRefCPid)
                  (\_ -> return ())
        ]

    -- say "seen both pids die now..."

  -- and wait on the result back in IO land...
  testRes <- takeMVar result
  -- runProcess localNode $ say "got result..."
  assertBool "Expected sending on the channel to fail, but received data!" testRes

  where
    -- Timeout, in microseconds, used for both channel reads.
    delay :: Int
    delay = 3000000


-- | Test 'kill' against a process on the same node.
--
-- The victim is monitored before it is killed; the resulting notification
-- must carry the same monitor reference and pid, and a 'DiedException' whose
-- text names the killer and the reason.
testKillLocal :: TestTransport -> Assertion
testKillLocal TestTransport{..} = do
  localNode <- newLocalNode testTransport initRemoteTable

  -- The victim just sleeps, which leaves time to monitor and kill it.
  pid <- forkProcess localNode $
    liftIO $ threadDelay 1000000

  runProcess localNode $ do
    ref <- monitor pid
    us <- getSelfPid
    kill pid "TestKill"
    mn <- expect
    case mn of
      ProcessMonitorNotification ref' pid' (DiedException ex) -> do
        let expected = "killed-by=" ++ show us ++ ",reason=TestKill"
        unless (ref == ref' && pid == pid' && ex == expected) $
          die "Invalid ProcessMonitorNotification received"
      _ -> die "Something went horribly wrong"

-- | Test 'kill' against a process on another node.
--
-- Same expectations as the local variant, but the victim runs on @node1@
-- while the killer runs on @node2@. On failure the process dies with a
-- message listing exactly the checks that did not hold.
testKillRemote :: TestTransport -> Assertion
testKillRemote TestTransport{..} = do
  node1 <- newLocalNode testTransport initRemoteTable
  node2 <- newLocalNode testTransport initRemoteTable

  -- The victim just sleeps, which leaves time to monitor and kill it.
  pid <- forkProcess node1 $
    liftIO $ threadDelay 1000000

  runProcess node2 $ do
    ref <- monitor pid
    us <- getSelfPid
    kill pid "TestKill"
    mn <- expect
    case mn of
      ProcessMonitorNotification ref' pid' (DiedException reason) -> do
        let expected = "killed-by=" ++ show us ++ ",reason=TestKill"
            -- Only the failing checks are kept, so the message has no
            -- blank entries for the checks that passed.
            failures =
              [ msg
              | (ok, msg) <- [ (ref == ref', "Invalid ref")
                             , (pid == pid', "Invalid pid")
                             , (reason == expected, "Invalid message")
                             ]
              , not ok
              ]
        unless (null failures) $ die (unwords failures)
      _ -> die "Received unexpected message"

-- | Test 'catchesExit'.
--
-- The process dies with a @(String, Int)@ reason. The first two handlers
-- expect other reason types and must be skipped; only the third handler,
-- whose type matches the reason, signals completion.
testCatchesExit :: TestTransport -> Assertion
testCatchesExit TestTransport{..} = do
  localNode <- newLocalNode testTransport initRemoteTable
  done <- newEmptyMVar

  _ <- forkProcess localNode $ do
      (die ("foobar", 123 :: Int))
      `catchesExit` [
           (\_ m -> handleMessage m (\(_ :: String) -> return ()))
         , (\_ m -> handleMessage m (\(_ :: Maybe Int) -> return ()))
         , (\_ m -> handleMessage m (\(_ :: String, _ :: Int)
                    -> liftIO $ putMVar done ()))
         ]

  takeMVar done

-- | Test 'handleMessageIf'.
--
-- The process sends itself an @(Integer, Integer)@ pair and picks it up as a
-- raw message. Handling it at type @()@ must yield 'Nothing' (a type
-- mismatch); handling it at the right type with a satisfied predicate must
-- run the handler, which passes the decoded pair back to the test.
testHandleMessageIf :: TestTransport -> Assertion
testHandleMessageIf TestTransport{..} = do
  localNode <- newLocalNode testTransport initRemoteTable
  done <- newEmptyMVar
  _ <- forkProcess localNode $ do
    self <- getSelfPid
    send self (5 :: Integer, 10 :: Integer)
    msg <- receiveWait [ matchMessage return ]
    -- Wrong type: neither the predicate nor the handler may run.
    handleMessageIf msg
                    (\() -> True)
                    (\() -> die "whoops") >>= maybe (return ())
                                                    (const $ die "Expected Mismatch")
    -- Right type and matching predicate: the handler publishes the input.
    void $ handleMessageIf msg
                           (\(x :: Integer, y :: Integer) -> x == 5 && y == 10)
                           (\input -> liftIO $ putMVar done input)

  result <- takeMVar done
  assertEqual mempty (5, 10) result

-- | Test 'catches'.
--
-- A 'ProcessLinkException' is thrown inside a process; the handler for that
-- exception type must run and signal completion.
testCatches :: TestTransport -> Assertion
testCatches TestTransport{..} = do
  localNode <- newLocalNode testTransport initRemoteTable
  done <- newEmptyMVar

  _ <- forkProcess localNode $ do
    node <- getSelfNode
    (liftIO $ throwIO (ProcessLinkException (nullProcessId node) DiedNormal))
      `catches`
        [ Handler (\(ProcessLinkException _ _) -> liftIO $ putMVar done ())
        ]

  takeMVar done

-- | Test that the restore function handed out by 'mask' can be used inside
-- a process spawned from the masked block.
--
-- The parent records its own pid under 'mask' and spawns a child that
-- records its pid inside @unmask@. Both pids must be reported, and they must
-- differ.
testMaskRestoreScope :: TestTransport -> Assertion
testMaskRestoreScope TestTransport{..} = do
  localNode <- newLocalNode testTransport initRemoteTable
  parentPid <- newEmptyMVar :: IO (MVar ProcessId)
  spawnedPid <- newEmptyMVar :: IO (MVar ProcessId)

  runProcess localNode $ mask $ \unmask -> do
    getSelfPid >>= liftIO . putMVar parentPid
    void $ spawnLocal $ unmask (getSelfPid >>= liftIO . putMVar spawnedPid)

  parent <- takeMVar parentPid
  child <- takeMVar spawnedPid
  assertBool "Expected the spawned process to have its own pid" (parent /= child)

-- | A process calls 'die' with a @(String, Int)@ reason and intercepts its
-- own exit with 'catchExit'.
--
-- The handler reports whether the reason it received equals the one passed
-- to 'die'; the verdict travels through the 'MVar' and is asserted in the
-- test thread, so a mismatch fails the test case itself.
testDie :: TestTransport -> Assertion
testDie TestTransport{..} = do
  localNode <- newLocalNode testTransport initRemoteTable
  done <- newEmptyMVar

  _ <- forkProcess localNode $
    die ("foobar", 123 :: Int)
      `catchExit` \_from reason ->
        -- TODO: should verify that 'from' has the right value
        liftIO $ putMVar done (reason == ("foobar", 123 :: Int))

  takeMVar done >>= assertBool "exit reason should be (\"foobar\", 123)"

-- | A process calls 'die' and catches the resulting 'ProcessExitException'.
--
-- The handler sends back both the 'show' rendering of the exception and the
-- string @"exit-from=" ++ show from@ built from the pid carried by the
-- exception. The two are compared in the test thread so that a mismatch is
-- reported as a test failure, with both strings in the message.
testPrettyExit :: TestTransport -> Assertion
testPrettyExit TestTransport{..} = do
  localNode <- newLocalNode testTransport initRemoteTable
  done <- newEmptyMVar

  _ <- forkProcess localNode $
    die "timeout"
      `catch` \ex@(ProcessExitException from _) ->
        liftIO $ putMVar done (show ex, "exit-from=" ++ show from)

  (actual, expected) <- takeMVar done
  assertEqual "unexpected rendering of ProcessExitException" expected actual

-- | Sends an exit signal to a process on the same node and checks both sides.
--
-- * The supervised process blocks in 'expect' under a 'catchExit' handler and
--   reports whether the reason it caught equals @"TestExit"@.
-- * The supervisor monitors that process, calls 'exit' on it, and reports
--   whether the monitor notification carries 'DiedNormal' together with the
--   expected monitor reference and pid.
--
-- Both verdicts are handed over through 'MVar's and asserted in the test
-- thread; every branch fills its 'MVar', so an unexpected notification is
-- reported as a failure.
testExitLocal :: TestTransport -> Assertion
testExitLocal TestTransport{..} = do
  localNode <- newLocalNode testTransport initRemoteTable
  supervisedDone <- newEmptyMVar
  supervisorDone <- newEmptyMVar
  -- XXX: we guarantee that exception handler will be set up
  -- regardless if forkProcess preserve masking state or not.
  handlerSetUp <- newEmptyMVar

  pid <- forkProcess localNode $
    (liftIO (putMVar handlerSetUp ()) >> expect) `catchExit` \_from reason ->
      -- TODO: should verify that 'from' has the right value
      liftIO $ putMVar supervisedDone (reason == "TestExit")

  runProcess localNode $ do
    liftIO $ takeMVar handlerSetUp
    ref <- monitor pid
    exit pid "TestExit"
    -- This time the client catches the exception, so it dies normally
    mn <- expect
    liftIO $ putMVar supervisorDone $ case mn of
      ProcessMonitorNotification ref' pid' DiedNormal ->
        ref == ref' && pid == pid'
      _ -> False

  takeMVar supervisedDone
    >>= assertBool "supervised process should catch exit reason \"TestExit\""
  takeMVar supervisorDone
    >>= assertBool "supervisor should see DiedNormal for the monitored process"

-- | Same scenario as the local exit test, but the supervisor runs on a
-- second node.
--
-- The supervised process on @node1@ blocks forever under a 'catchExit'
-- handler and reports whether the caught reason equals @"TestExit"@. The
-- supervisor on @node2@ monitors it, calls 'exit' on it, and reports whether
-- the notification is 'DiedNormal' with the expected reference and pid.
-- Both verdicts are asserted in the test thread.
testExitRemote :: TestTransport -> Assertion
testExitRemote TestTransport{..} = do
  node1 <- newLocalNode testTransport initRemoteTable
  node2 <- newLocalNode testTransport initRemoteTable
  supervisedDone <- newEmptyMVar
  supervisorDone <- newEmptyMVar
  -- Filled from inside the scope protected by 'catchExit', so once it is
  -- full the handler is known to be installed.
  handlerSetUp <- newEmptyMVar

  pid <- forkProcess node1 $
    (liftIO (putMVar handlerSetUp ()) >> (receiveWait [] :: Process ())) -- block forever
      `catchExit` \_from reason ->
        -- TODO: should verify that 'from' has the right value
        liftIO $ putMVar supervisedDone (reason == "TestExit")

  -- Do not send the exit signal before the handler is in place.
  takeMVar handlerSetUp

  runProcess node2 $ do
    ref <- monitor pid
    exit pid "TestExit"
    mn <- expect
    liftIO $ putMVar supervisorDone $ case mn of
      ProcessMonitorNotification ref' pid' DiedNormal ->
        ref == ref' && pid == pid'
      _ -> False

  takeMVar supervisedDone
    >>= assertBool "supervised process should catch exit reason \"TestExit\""
  takeMVar supervisorDone
    >>= assertBool "supervisor should see DiedNormal for the monitored process"

-- | Registers a process living on @node1@ under a name in the registry of
-- @node2@, lets that process finish, and then checks that 'whereis' on
-- @node2@ no longer resolves the name.
testRegistryMonitoring :: TestTransport -> Assertion
testRegistryMonitoring TestTransport{..} = do
  node1 <- newLocalNode testTransport initRemoteTable
  node2 <- newLocalNode testTransport initRemoteTable

  let nid = localNodeId node2
  pid <- forkProcess node1 $ do
    self <- getSelfPid
    runUntilRegistered nid self
    say $ show self ++ " registered as " ++ regName
    expect :: Process ()
    say $ show self ++ " exiting normally"

  runProcess node2 $ do
    register regName pid
    say $ regName ++ " registered to " ++ show pid
    res <- whereis regName
    send pid ()
    say $ " sent finish signal to " ++ show pid
    liftIO $ assertBool "expected (Just pid)" $ res == Just pid

    -- This delay isn't essential!
    -- The test case passes perfectly fine without it (feel free to comment out
    -- and see), however waiting a few seconds here, makes it much more likely
    -- that in delayUntilMaybeUnregistered we will hit the match case right
    -- away, and thus not be subjected to a 20 second delay. The value of 4
    -- seconds appears to work optimally on osx and across several linux distros
    -- running in virtual machines (which is essentially what we do in CI)
    void $ receiveTimeout 4000000 [ matchAny return ]

  -- This delay doesn't serve much purpose in the happy path, however if some
  -- future patch breaks the cooperative behaviour of node controllers viz
  -- remote process registration and notification taking place via ncEffectDied,
  -- there would be the possibility of a race in the test case should we attempt
  -- to evaluate `whereis regName` on node2 right away. In case the name is still
  -- erroneously registered, observing the 20 second delay (or lack of), could at
  -- least give a hint that something is wrong, and we give up our time slice
  -- so that there's a higher chance the registrations have been cleaned up
  -- in either case.
  runProcess node2 $ delayUntilMaybeUnregistered nid

  regHere <- newEmptyMVar
  runProcess node2 $ whereis regName >>= liftIO . putMVar regHere
  registered <- takeMVar regHere
  -- Report the value that was actually found, not the pid we started with.
  assertEqual "name should be unregistered after the process exits"
              Nothing registered

  where
    -- Asks the registry on the given node for 'regName' and returns once a
    -- reply names the given pid. Any other reply for the query (including
    -- "not registered yet") triggers a new query instead of a pattern-match
    -- failure inside the receive predicate.
    runUntilRegistered :: NodeId -> ProcessId -> Process ()
    runUntilRegistered nid us = do
      whereisRemoteAsync nid regName
      registered <- receiveWait
        [ match $ \(WhereIsReply n mp) ->
            return (n == regName && mp == Just us)
        ]
      unless registered $ runUntilRegistered nid us

    -- Asks the registry on the given node for 'regName' and waits up to 20
    -- seconds for a reply saying the name is unregistered; on timeout the
    -- query is repeated.
    delayUntilMaybeUnregistered :: NodeId -> Process ()
    delayUntilMaybeUnregistered nid = do
      whereisRemoteAsync nid regName
      res <- receiveTimeout 20000000 {- 20 sec delay -}
        [ matchIf (\(WhereIsReply n p') -> n == regName && isNothing p')
                  (const $ return ())
        ]
      case res of
        Just () -> return ()
        Nothing -> delayUntilMaybeUnregistered nid

    regName :: String
    regName = "testRegisterRemote"

-- | Round trip between two processes on one node using 'unsafeSend'.
--
-- The server publishes its pid through an 'MVar', waits for the client's
-- pid, and answers with @()@. The client sends its own pid to the server and
-- forwards the reply it receives to the test thread, which blocks until it
-- arrives.
testUnsafeSend :: TestTransport -> Assertion
testUnsafeSend TestTransport{..} = do
  serverAddr <- newEmptyMVar
  clientDone <- newEmptyMVar

  localNode <- newLocalNode testTransport initRemoteTable

  -- Server: reply to whichever pid it is sent.
  void $ forkProcess localNode $ do
    self <- getSelfPid
    liftIO $ putMVar serverAddr self
    clientAddr <- expect
    unsafeSend clientAddr ()

  -- Client: introduce itself, then wait for the reply.
  void $ forkProcess localNode $ do
    serverPid <- liftIO $ takeMVar serverAddr
    getSelfPid >>= unsafeSend serverPid
    expect >>= liftIO . putMVar clientDone

  takeMVar clientDone

-- | Round trip between two processes on one node using 'unsafeUSend'.
--
-- The server publishes its pid through an 'MVar', waits for the client's
-- pid, and answers with @()@. The client sends its own pid to the server and
-- forwards the reply it receives to the test thread, which blocks until it
-- arrives.
testUnsafeUSend :: TestTransport -> Assertion
testUnsafeUSend TestTransport{..} = do
  serverAddr <- newEmptyMVar
  clientDone <- newEmptyMVar

  localNode <- newLocalNode testTransport initRemoteTable

  -- Server: reply to whichever pid it is sent.
  void $ forkProcess localNode $ do
    self <- getSelfPid
    liftIO $ putMVar serverAddr self
    clientAddr <- expect
    unsafeUSend clientAddr ()

  -- Client: introduce itself, then wait for the reply.
  void $ forkProcess localNode $ do
    serverPid <- liftIO $ takeMVar serverAddr
    getSelfPid >>= unsafeUSend serverPid
    expect >>= liftIO . putMVar clientDone

  takeMVar clientDone

-- | Sends @()@ to a locally registered name with 'unsafeNSend'.
--
-- A receiver process is forked first; a second process registers it under
-- @"foobar"@ and sends to that name. The test thread blocks until the
-- receiver hands the message over through the 'MVar'.
testUnsafeNSend :: TestTransport -> Assertion
testUnsafeNSend TestTransport{..} = do
  clientDone <- newEmptyMVar

  localNode <- newLocalNode testTransport initRemoteTable

  pid <- forkProcess localNode $
    expect >>= liftIO . putMVar clientDone

  void $ runProcess localNode $ do
    register "foobar" pid
    unsafeNSend "foobar" ()

  takeMVar clientDone

-- | Sends @()@ to a name registered on another node with
-- 'unsafeNSendRemote'.
--
-- The receiver on the first node registers itself as @"foobar"@ and fills
-- the 'MVar' once to signal that the registration call has been made; it
-- fills it a second time with the message it receives. The sender on the
-- second node only runs after the first signal.
testUnsafeNSendRemote :: TestTransport -> Assertion
testUnsafeNSendRemote TestTransport{..} = do
  clientDone <- newEmptyMVar

  localNode1 <- newLocalNode testTransport initRemoteTable
  localNode2 <- newLocalNode testTransport initRemoteTable

  _ <- forkProcess localNode1 $ do
    getSelfPid >>= register "foobar"
    liftIO $ putMVar clientDone ()
    expect >>= liftIO . putMVar clientDone

  -- First hand-over: the receiver has issued its registration.
  takeMVar clientDone
  void $ runProcess localNode2 $
    unsafeNSendRemote (localNodeId localNode1) "foobar" ()

  -- Second hand-over: the message has been received.
  takeMVar clientDone

-- | Round trip over a typed channel using 'unsafeSendChan'.
--
-- The client creates a channel, sends its send port to the server with
-- 'unsafeSend', and waits on the receive port. The server writes @()@ to the
-- port it was given. The test thread blocks until the client has received
-- the value.
testUnsafeSendChan :: TestTransport -> Assertion
testUnsafeSendChan TestTransport{..} = do
  serverAddr <- newEmptyMVar
  clientDone <- newEmptyMVar

  localNode <- newLocalNode testTransport initRemoteTable

  -- Server: write to whichever send port it is given.
  void $ forkProcess localNode $ do
    self <- getSelfPid
    liftIO $ putMVar serverAddr self
    sp <- expect
    unsafeSendChan sp ()

  -- Client: hand out the send port, then read from the receive port.
  void $ forkProcess localNode $ do
    serverPid <- liftIO $ takeMVar serverAddr
    (sp, rp) <- newChan
    unsafeSend serverPid sp
    receiveChan rp :: Process ()
    liftIO $ putMVar clientDone ()

  takeMVar clientDone

-- | Exercises 'callLocal' in five scenarios, run one after another on the
-- same node:
--
-- 1. the action passed to 'callLocal' sees a different pid than its caller;
-- 2. an exit signal delivered to the caller runs the worker's exception
--    handler;
-- 3. an exception raised by the worker surfaces in the caller;
-- 4. the worker's finaliser has run by the time 'callLocal' returns normally;
-- 5. the worker's finaliser has run by the time the caller's own finaliser
--    runs after an exit signal.
testCallLocal :: TestTransport -> Assertion
testCallLocal TestTransport{..} = do
  node <- newLocalNode testTransport initRemoteTable

  -- Testing that (/=) <$> getSelfPid <*> callLocal getSelfPid.
  result <- newEmptyMVar
  runProcess node $ do
    r <- (/=) <$> getSelfPid <*> callLocal getSelfPid
    liftIO $ putMVar result r

  takeMVar result >>= assertBool "Expected 'True'"

  -- Testing that when callLocal is interrupted, the worker is interrupted.
  -- NOTE(review): the keeper proceeds as soon as it receives the first pid
  -- message, which is sent before 'callLocal' is entered; confirm whether it
  -- should wait for the worker's message before calling 'exit'.
  ibox <- newIORef False
  runProcess node $ do
    keeper <- getSelfPid
    spawnLocal $ do
        caller <- getSelfPid
        send keeper caller
        onException
          (callLocal $ do
                onException (do send keeper caller
                                expect)
                            (do liftIO $ writeIORef ibox True))
          (send keeper ())
    caller <- expect
    exit caller "test"
    expect :: Process ()

  readIORef ibox >>= assertBool "Expected 'True'"

  -- Testing that when the worker raises an exception, the exception is propagated to the parent.
  ibox2 <- newIORef False
  runProcess node $ do
    r <- try (callLocal $ error "e" >> return ())
    liftIO $ writeIORef ibox2 $ case r of
      Left (ErrorCall "e") -> True
      _ -> False

  -- Check the flag written by this scenario, not the one from the previous.
  readIORef ibox2 >>= assertBool "Expected 'True'"

  -- Test that caller waits for the worker in correct situation
  ibox3 <- newIORef False
  result3 <- newEmptyMVar
  runProcess node $ do
    keeper <- getSelfPid
    spawnLocal $ do
        callLocal $
            (do us <- getSelfPid
                send keeper us
                () <- expect
                liftIO yield)
            `finally` (liftIO $ writeIORef ibox3 True)
        liftIO $ putMVar result3 =<< readIORef ibox3
    worker <- expect
    send worker ()

  takeMVar result3 >>= assertBool "Expected 'True'"

  -- Test that caller waits for the worker in case when caller gets an exception
  ibox4 <- newIORef False
  result4 <- newEmptyMVar
  runProcess node $ do
    keeper <- getSelfPid
    spawnLocal $ do
        caller <- getSelfPid
        callLocal
            ((do send keeper caller
                 expect)
               `finally` (liftIO $ writeIORef ibox4 True))
            `finally` (liftIO $ putMVar result4 =<< readIORef ibox4)
    caller <- expect
    exit caller "hi!"

  takeMVar result4 >>= assertBool "Expected 'True'"
  -- XXX: Testing that when mask_ $ callLocal p runs p in masked state.

-- | Builds the @"CH"@ test tree for the given transport.
--
-- The tree has two groups: @"Basic features"@ and @"Monitoring and Linking"@.
-- Every test case receives the same 'TestTransport'. The monitoring tests
-- additionally take two 'Bool' flags; judging by the test names, the first
-- selects monitor ('True') versus link ('False') and the second selects the
-- unmonitor/unlink variant.
tests :: TestTransport -> IO TestTree
tests testtrans = return $
  testGroup "CH" [
     testGroup "Basic features" [
        testCase "Ping"                (testPing                testtrans)
      , testCase "Math"                (testMath                testtrans)
      , testCase "Timeout"             (testTimeout             testtrans)
      , testCase "Timeout0"            (testTimeout0            testtrans)
      , testCase "SendToTerminated"    (testSendToTerminated    testtrans)
      , testCase "TypedChannnels"      (testTypedChannels       testtrans)
      , testCase "Terminate"           (testTerminate           testtrans)
      , testCase "RegistryRemoteProcess" (testRegistryRemoteProcess      testtrans)
      , testCase "RemoteRegistry"      (testRemoteRegistry      testtrans)
      , testCase "RemoteRegistryRemoteProcess" (testRemoteRegistryRemoteProcess      testtrans)
      , testCase "SpawnLocal"          (testSpawnLocal          testtrans)
      , testCase "SpawnAsyncStrictness" (testSpawnAsyncStrictness testtrans)
      , testCase "HandleMessageIf"     (testHandleMessageIf     testtrans)
      , testCase "MatchAny"            (testMatchAny            testtrans)
      , testCase "MatchAnyHandle"      (testMatchAnyHandle      testtrans)
      , testCase "MatchAnyNoHandle"    (testMatchAnyNoHandle    testtrans)
      , testCase "MatchAnyIf"          (testMatchAnyIf          testtrans)
      , testCase "MatchMessageUnwrap"  (testMatchMessageWithUnwrap testtrans)
      , testCase "ReceiveChanTimeout"  (testReceiveChanTimeout  testtrans)
      , testCase "ReceiveChanFeatures" (testReceiveChanFeatures testtrans)
      , testCase "ChanLifecycle"       (testChanLifecycle       testtrans)
      , testCase "KillLocal"           (testKillLocal           testtrans)
      , testCase "KillRemote"          (testKillRemote          testtrans)
      , testCase "Die"                 (testDie                 testtrans)
      , testCase "PrettyExit"          (testPrettyExit          testtrans)
      , testCase "CatchesExit"         (testCatchesExit         testtrans)
      , testCase "Catches"             (testCatches             testtrans)
      , testCase "MaskRestoreScope"    (testMaskRestoreScope    testtrans)
      , testCase "ExitLocal"           (testExitLocal           testtrans)
      , testCase "ExitRemote"          (testExitRemote          testtrans)
      , testCase "RegistryMonitoring"  (testRegistryMonitoring  testtrans)
      , testCase "TextCallLocal"       (testCallLocal           testtrans)
      -- Unsafe Primitives
      , testCase "TestUnsafeSend"      (testUnsafeSend          testtrans)
      , testCase "TestUnsafeUSend"     (testUnsafeUSend         testtrans)
      , testCase "TestUnsafeNSend"     (testUnsafeNSend         testtrans)
      , testCase "TestUnsafeNSendRemote" (testUnsafeNSendRemote testtrans)
      , testCase "TestUnsafeSendChan"  (testUnsafeSendChan      testtrans)
      -- usend
      , testCase "USend"               (testUSend usend         testtrans 50)
      , testCase "UForward"
                 (testUSend (\p m -> uforward (createUnencodedMessage m) p)
                            testtrans 50
                 )
      ]
    , testGroup "Monitoring and Linking" [
      -- Monitoring processes
      --
      -- The "missing" combinations in the list below don't make much sense, as
      -- we cannot guarantee that the monitor reply or link exception will not
      -- happen before the unmonitor or unlink
      testCase "MonitorNormalTermination"     (testMonitorNormalTermination   testtrans True  False)
    , testCase "MonitorAbnormalTermination"   (testMonitorAbnormalTermination testtrans True  False)
    , testCase "MonitorLocalDeadProcess"      (testMonitorLocalDeadProcess    testtrans True  False)
    , testCase "MonitorRemoteDeadProcess"     (testMonitorRemoteDeadProcess   testtrans True  False)
    , testCase "MonitorDisconnect"            (testMonitorDisconnect          testtrans True  False)
    , testCase "LinkUnreachable"              (testMonitorUnreachable         testtrans False False)
    , testCase "LinkNormalTermination"        (testMonitorNormalTermination   testtrans False False)
    , testCase "LinkAbnormalTermination"      (testMonitorAbnormalTermination testtrans False False)
    , testCase "LinkLocalDeadProcess"         (testMonitorLocalDeadProcess    testtrans False False)
    , testCase "LinkRemoteDeadProcess"        (testMonitorRemoteDeadProcess   testtrans False False)
    , testCase "LinkDisconnect"               (testMonitorDisconnect          testtrans False False)
    , testCase "UnmonitorNormalTermination"   (testMonitorNormalTermination   testtrans True  True)
    , testCase "UnmonitorAbnormalTermination" (testMonitorAbnormalTermination testtrans True  True)
    , testCase "UnmonitorDisconnect"          (testMonitorDisconnect          testtrans True  True)
    , testCase "UnlinkNormalTermination"      (testMonitorNormalTermination   testtrans False True)
    , testCase "UnlinkAbnormalTermination"    (testMonitorAbnormalTermination testtrans False True)
    , testCase "UnlinkDisconnect"             (testMonitorDisconnect          testtrans False True)
      -- Monitoring nodes and channels
    , testCase "MonitorNode"                  (testMonitorNode                testtrans)
    , testCase "MonitorLiveNode"              (testMonitorLiveNode            testtrans)
    , testCase "MonitorChannel"               (testMonitorChannel             testtrans)
      -- Reconnect
    ]

      -- Tests that fail occasionally and should be revised
    -- , testGroup "Flaky" [
    --   testCase "Reconnect"          (testReconnect           testtrans)
    -- , testCase "Registry"           (testRegistry            testtrans)
    -- , testCase "MergeChannels"      (testMergeChannels       testtrans)
    -- , testCase "MonitorUnreachable" (testMonitorUnreachable testtrans True False)
    -- ]
  ]