{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Control.Distributed.Process.ManagedProcess.Client
(
sendControlMessage
, shutdown
, call
, safeCall
, tryCall
, callTimeout
, flushPendingCalls
, callAsync
, cast
, callChan
, syncCallChan
, syncSafeCallChan
, callSTM
) where
import Control.Concurrent.STM (atomically, STM)
import Control.Distributed.Process hiding (call, finally)
import Control.Distributed.Process.Serializable
import Control.Distributed.Process.Async hiding (check)
import Control.Distributed.Process.ManagedProcess.Internal.Types hiding (liftIO)
import qualified Control.Distributed.Process.ManagedProcess.Internal.Types as T
import Control.Distributed.Process.Extras.Internal.Types (resolveOrDie)
import Control.Distributed.Process.Extras hiding (monitor, sendChan)
import Control.Distributed.Process.Extras.Time
import Control.Monad.Catch (finally)
import Data.Maybe (fromJust)
import Prelude hiding (init)
sendControlMessage :: Serializable m => ControlPort m -> m -> Process ()
sendControlMessage :: forall m. Serializable m => ControlPort m -> m -> Process ()
sendControlMessage ControlPort m
cp m
m = SendPort (Message m ()) -> Message m () -> Process ()
forall a. Serializable a => SendPort a -> a -> Process ()
sendChan (ControlPort m -> SendPort (Message m ())
forall m. ControlPort m -> SendPort (Message m ())
unPort ControlPort m
cp) (m -> Message m ()
forall a b. a -> Message a b
CastMessage m
m)
shutdown :: ProcessId -> Process ()
shutdown :: ProcessId -> Process ()
shutdown ProcessId
pid = ProcessId -> Shutdown -> Process ()
forall a m. (Addressable a, Serializable m) => a -> m -> Process ()
cast ProcessId
pid Shutdown
Shutdown
call :: forall s a b . (Addressable s, Serializable a, Serializable b)
=> s -> a -> Process b
call :: forall s a b.
(Addressable s, Serializable a, Serializable b) =>
s -> a -> Process b
call s
sid a
msg = s -> a -> Process (CallRef b)
forall s a b.
(Addressable s, Serializable a, Serializable b) =>
s -> a -> Process (CallRef b)
initCall s
sid a
msg Process (CallRef b)
-> (CallRef b -> Process (Maybe (Either ExitReason b)))
-> Process (Maybe (Either ExitReason b))
forall a b. Process a -> (a -> Process b) -> Process b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Maybe TimeInterval
-> CallRef b -> Process (Maybe (Either ExitReason b))
forall b.
Serializable b =>
Maybe TimeInterval
-> CallRef b -> Process (Maybe (Either ExitReason b))
waitResponse Maybe TimeInterval
forall a. Maybe a
Nothing Process (Maybe (Either ExitReason b))
-> (Maybe (Either ExitReason b) -> Process b) -> Process b
forall a b. Process a -> (a -> Process b) -> Process b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Maybe (Either ExitReason b) -> Process b
forall {a} {a}.
(Binary a, Typeable a) =>
Maybe (Either a a) -> Process a
decodeResult
where decodeResult :: Maybe (Either a a) -> Process a
decodeResult (Just (Right a
r)) = a -> Process a
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return a
r
decodeResult (Just (Left a
err)) = a -> Process a
forall a b. Serializable a => a -> Process b
die a
err
decodeResult Maybe (Either a a)
Nothing = Process a
forall a. Process a
terminate
safeCall :: forall s a b . (Addressable s, Serializable a, Serializable b)
=> s -> a -> Process (Either ExitReason b)
safeCall :: forall s a b.
(Addressable s, Serializable a, Serializable b) =>
s -> a -> Process (Either ExitReason b)
safeCall s
s a
m = do
ProcessId
us <- Process ProcessId
getSelfPid
((Maybe (Either ExitReason b) -> Either ExitReason b)
-> Process (Maybe (Either ExitReason b))
-> Process (Either ExitReason b)
forall a b. (a -> b) -> Process a -> Process b
forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b
fmap Maybe (Either ExitReason b) -> Either ExitReason b
forall a. HasCallStack => Maybe a -> a
fromJust (s -> a -> Process (CallRef b)
forall s a b.
(Addressable s, Serializable a, Serializable b) =>
s -> a -> Process (CallRef b)
initCall s
s a
m Process (CallRef b)
-> (CallRef b -> Process (Maybe (Either ExitReason b)))
-> Process (Maybe (Either ExitReason b))
forall a b. Process a -> (a -> Process b) -> Process b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Maybe TimeInterval
-> CallRef b -> Process (Maybe (Either ExitReason b))
forall b.
Serializable b =>
Maybe TimeInterval
-> CallRef b -> Process (Maybe (Either ExitReason b))
waitResponse Maybe TimeInterval
forall a. Maybe a
Nothing) :: Process (Either ExitReason b))
Process (Either ExitReason b)
-> [ProcessId -> Message -> Process (Maybe (Either ExitReason b))]
-> Process (Either ExitReason b)
forall b.
Process b
-> [ProcessId -> Message -> Process (Maybe b)] -> Process b
`catchesExit` [(\ProcessId
pid Message
msg -> Message
-> (ExitReason -> Bool)
-> (ExitReason -> Process (Either ExitReason b))
-> Process (Maybe (Either ExitReason b))
forall (m :: * -> *) a b.
(Monad m, Serializable a) =>
Message -> (a -> Bool) -> (a -> m b) -> m (Maybe b)
handleMessageIf Message
msg (ProcessId -> ProcessId -> ExitReason -> Bool
forall {a}. Eq a => a -> a -> ExitReason -> Bool
weFailed ProcessId
pid ProcessId
us)
(Either ExitReason b -> Process (Either ExitReason b)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either ExitReason b -> Process (Either ExitReason b))
-> (ExitReason -> Either ExitReason b)
-> ExitReason
-> Process (Either ExitReason b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. ExitReason -> Either ExitReason b
forall a b. a -> Either a b
Left))]
where
weFailed :: a -> a -> ExitReason -> Bool
weFailed a
a a
b (ExitOther String
_) = a
a a -> a -> Bool
forall a. Eq a => a -> a -> Bool
== a
b
weFailed a
_ a
_ ExitReason
_ = Bool
False
tryCall :: forall s a b . (Addressable s, Serializable a, Serializable b)
=> s -> a -> Process (Maybe b)
tryCall :: forall s a b.
(Addressable s, Serializable a, Serializable b) =>
s -> a -> Process (Maybe b)
tryCall s
s a
m = s -> a -> Process (CallRef b)
forall s a b.
(Addressable s, Serializable a, Serializable b) =>
s -> a -> Process (CallRef b)
initCall s
s a
m Process (CallRef b)
-> (CallRef b -> Process (Maybe (Either ExitReason b)))
-> Process (Maybe (Either ExitReason b))
forall a b. Process a -> (a -> Process b) -> Process b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Maybe TimeInterval
-> CallRef b -> Process (Maybe (Either ExitReason b))
forall b.
Serializable b =>
Maybe TimeInterval
-> CallRef b -> Process (Maybe (Either ExitReason b))
waitResponse Maybe TimeInterval
forall a. Maybe a
Nothing Process (Maybe (Either ExitReason b))
-> (Maybe (Either ExitReason b) -> Process (Maybe b))
-> Process (Maybe b)
forall a b. Process a -> (a -> Process b) -> Process b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Maybe (Either ExitReason b) -> Process (Maybe b)
forall {m :: * -> *} {a} {a}.
Monad m =>
Maybe (Either a a) -> m (Maybe a)
decodeResult
where decodeResult :: Maybe (Either a a) -> m (Maybe a)
decodeResult (Just (Right a
r)) = Maybe a -> m (Maybe a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe a -> m (Maybe a)) -> Maybe a -> m (Maybe a)
forall a b. (a -> b) -> a -> b
$ a -> Maybe a
forall a. a -> Maybe a
Just a
r
decodeResult Maybe (Either a a)
_ = Maybe a -> m (Maybe a)
forall a. a -> m a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe a
forall a. Maybe a
Nothing
callTimeout :: forall s a b . (Addressable s, Serializable a, Serializable b)
=> s -> a -> TimeInterval -> Process (Maybe b)
callTimeout :: forall s a b.
(Addressable s, Serializable a, Serializable b) =>
s -> a -> TimeInterval -> Process (Maybe b)
callTimeout s
s a
m TimeInterval
d = s -> a -> Process (CallRef b)
forall s a b.
(Addressable s, Serializable a, Serializable b) =>
s -> a -> Process (CallRef b)
initCall s
s a
m Process (CallRef b)
-> (CallRef b -> Process (Maybe (Either ExitReason b)))
-> Process (Maybe (Either ExitReason b))
forall a b. Process a -> (a -> Process b) -> Process b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Maybe TimeInterval
-> CallRef b -> Process (Maybe (Either ExitReason b))
forall b.
Serializable b =>
Maybe TimeInterval
-> CallRef b -> Process (Maybe (Either ExitReason b))
waitResponse (TimeInterval -> Maybe TimeInterval
forall a. a -> Maybe a
Just TimeInterval
d) Process (Maybe (Either ExitReason b))
-> (Maybe (Either ExitReason b) -> Process (Maybe b))
-> Process (Maybe b)
forall a b. Process a -> (a -> Process b) -> Process b
forall (m :: * -> *) a b. Monad m => m a -> (a -> m b) -> m b
>>= Maybe (Either ExitReason b) -> Process (Maybe b)
Serializable b => Maybe (Either ExitReason b) -> Process (Maybe b)
decodeResult
where decodeResult :: (Serializable b)
=> Maybe (Either ExitReason b)
-> Process (Maybe b)
decodeResult :: Serializable b => Maybe (Either ExitReason b) -> Process (Maybe b)
decodeResult Maybe (Either ExitReason b)
Nothing = Maybe b -> Process (Maybe b)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return Maybe b
forall a. Maybe a
Nothing
decodeResult (Just (Right b
result)) = Maybe b -> Process (Maybe b)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (Maybe b -> Process (Maybe b)) -> Maybe b -> Process (Maybe b)
forall a b. (a -> b) -> a -> b
$ b -> Maybe b
forall a. a -> Maybe a
Just b
result
decodeResult (Just (Left ExitReason
reason)) = ExitReason -> Process (Maybe b)
forall a b. Serializable a => a -> Process b
die ExitReason
reason
flushPendingCalls :: forall b . (Serializable b)
=> TimeInterval
-> (b -> Process b)
-> Process (Maybe b)
flushPendingCalls :: forall b.
Serializable b =>
TimeInterval -> (b -> Process b) -> Process (Maybe b)
flushPendingCalls TimeInterval
d b -> Process b
proc =
Int -> [Match b] -> Process (Maybe b)
forall b. Int -> [Match b] -> Process (Maybe b)
receiveTimeout (TimeInterval -> Int
asTimeout TimeInterval
d) [
(CallResponse b -> Process b) -> Match b
forall a b. Serializable a => (a -> Process b) -> Match b
match (\(CallResponse (b
m :: b) CallId
_) -> b -> Process b
proc b
m)
]
callAsync :: forall s a b . (Addressable s, Serializable a, Serializable b)
=> s -> a -> Process (Async b)
callAsync :: forall s a b.
(Addressable s, Serializable a, Serializable b) =>
s -> a -> Process (Async b)
callAsync s
server a
msg = AsyncTask b -> Process (Async b)
forall a. Serializable a => AsyncTask a -> Process (Async a)
async (AsyncTask b -> Process (Async b))
-> AsyncTask b -> Process (Async b)
forall a b. (a -> b) -> a -> b
$ Process b -> AsyncTask b
forall a. Process a -> AsyncTask a
task (Process b -> AsyncTask b) -> Process b -> AsyncTask b
forall a b. (a -> b) -> a -> b
$ s -> a -> Process b
forall s a b.
(Addressable s, Serializable a, Serializable b) =>
s -> a -> Process b
call s
server a
msg
cast :: forall a m . (Addressable a, Serializable m)
=> a -> m -> Process ()
cast :: forall a m. (Addressable a, Serializable m) => a -> m -> Process ()
cast a
server m
msg = a -> Message m () -> Process ()
forall m. (Serializable m, Resolvable a) => a -> m -> Process ()
forall a m.
(Routable a, Serializable m, Resolvable a) =>
a -> m -> Process ()
sendTo a
server (m -> Message m ()
forall a b. a -> Message a b
CastMessage m
msg :: T.Message m ())
callChan :: forall s a b . (Addressable s, Serializable a, Serializable b)
=> s -> a -> Process (ReceivePort b)
callChan :: forall s a b.
(Addressable s, Serializable a, Serializable b) =>
s -> a -> Process (ReceivePort b)
callChan s
server a
msg = do
(SendPort b
sp, ReceivePort b
rp) <- Process (SendPort b, ReceivePort b)
forall a. Serializable a => Process (SendPort a, ReceivePort a)
newChan
s -> Message a b -> Process ()
forall m. (Serializable m, Resolvable s) => s -> m -> Process ()
forall a m.
(Routable a, Serializable m, Resolvable a) =>
a -> m -> Process ()
sendTo s
server (a -> SendPort b -> Message a b
forall a b. a -> SendPort b -> Message a b
ChanMessage a
msg SendPort b
sp :: T.Message a b)
ReceivePort b -> Process (ReceivePort b)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return ReceivePort b
rp
syncCallChan :: forall s a b . (Addressable s, Serializable a, Serializable b)
=> s -> a -> Process b
syncCallChan :: forall s a b.
(Addressable s, Serializable a, Serializable b) =>
s -> a -> Process b
syncCallChan s
server a
msg = do
Either ExitReason b
r <- s -> a -> Process (Either ExitReason b)
forall s a b.
(Addressable s, Serializable a, Serializable b) =>
s -> a -> Process (Either ExitReason b)
syncSafeCallChan s
server a
msg
case Either ExitReason b
r of
Left ExitReason
e -> ExitReason -> Process b
forall a b. Serializable a => a -> Process b
die ExitReason
e
Right b
r' -> b -> Process b
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return b
r'
syncSafeCallChan :: forall s a b . (Addressable s, Serializable a, Serializable b)
=> s -> a -> Process (Either ExitReason b)
syncSafeCallChan :: forall s a b.
(Addressable s, Serializable a, Serializable b) =>
s -> a -> Process (Either ExitReason b)
syncSafeCallChan s
server a
msg = do
ReceivePort b
rp <- s -> a -> Process (ReceivePort b)
forall s a b.
(Addressable s, Serializable a, Serializable b) =>
s -> a -> Process (ReceivePort b)
callChan s
server a
msg
s -> [Match (Either ExitReason b)] -> Process (Either ExitReason b)
forall a b.
Addressable a =>
a -> [Match (Either ExitReason b)] -> Process (Either ExitReason b)
awaitResponse s
server [ ReceivePort b
-> (b -> Process (Either ExitReason b))
-> Match (Either ExitReason b)
forall a b. ReceivePort a -> (a -> Process b) -> Match b
matchChan ReceivePort b
rp (Either ExitReason b -> Process (Either ExitReason b)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either ExitReason b -> Process (Either ExitReason b))
-> (b -> Either ExitReason b) -> b -> Process (Either ExitReason b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. b -> Either ExitReason b
forall a b. b -> Either a b
Right) ]
callSTM :: forall s a b . (Addressable s)
=> s
-> (a -> STM ())
-> STM b
-> a
-> Process (Either ExitReason b)
callSTM :: forall s a b.
Addressable s =>
s -> (a -> STM ()) -> STM b -> a -> Process (Either ExitReason b)
callSTM s
server a -> STM ()
writeAction STM b
readAction a
input = do
ProcessId
pid <- s -> String -> Process ProcessId
forall a. Resolvable a => a -> String -> Process ProcessId
resolveOrDie s
server String
"callSTM: unresolveable address "
CallId
mRef <- ProcessId -> Process CallId
monitor ProcessId
pid
IO () -> Process ()
forall a. IO a -> Process a
forall (m :: * -> *) a. MonadIO m => IO a -> m a
liftIO (IO () -> Process ()) -> IO () -> Process ()
forall a b. (a -> b) -> a -> b
$ STM () -> IO ()
forall a. STM a -> IO a
atomically (STM () -> IO ()) -> STM () -> IO ()
forall a b. (a -> b) -> a -> b
$ a -> STM ()
writeAction a
input
Process (Either ExitReason b)
-> Process () -> Process (Either ExitReason b)
forall (m :: * -> *) a b.
(HasCallStack, MonadMask m) =>
m a -> m b -> m a
finally ([Match (Either ExitReason b)] -> Process (Either ExitReason b)
forall b. [Match b] -> Process b
receiveWait [ CallId -> Match (Either ExitReason b)
matchRef CallId
mRef
, STM b
-> (b -> Process (Either ExitReason b))
-> Match (Either ExitReason b)
forall a b. STM a -> (a -> Process b) -> Match b
matchSTM STM b
readAction (Either ExitReason b -> Process (Either ExitReason b)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (Either ExitReason b -> Process (Either ExitReason b))
-> (b -> Either ExitReason b) -> b -> Process (Either ExitReason b)
forall b c a. (b -> c) -> (a -> b) -> a -> c
. b -> Either ExitReason b
forall a b. b -> Either a b
Right)
])
(CallId -> Process ()
unmonitor CallId
mRef)
where
matchRef :: MonitorRef -> Match (Either ExitReason b)
matchRef :: CallId -> Match (Either ExitReason b)
matchRef CallId
r = (ProcessMonitorNotification -> Bool)
-> (ProcessMonitorNotification -> Process (Either ExitReason b))
-> Match (Either ExitReason b)
forall a b.
Serializable a =>
(a -> Bool) -> (a -> Process b) -> Match b
matchIf (\(ProcessMonitorNotification CallId
r' ProcessId
_ DiedReason
_) -> CallId
r CallId -> CallId -> Bool
forall a. Eq a => a -> a -> Bool
== CallId
r')
(\(ProcessMonitorNotification CallId
_ ProcessId
_ DiedReason
d) ->
Either ExitReason b -> Process (Either ExitReason b)
forall a. a -> Process a
forall (m :: * -> *) a. Monad m => a -> m a
return (ExitReason -> Either ExitReason b
forall a b. a -> Either a b
Left (String -> ExitReason
ExitOther (DiedReason -> String
forall a. Show a => a -> String
show DiedReason
d))))