Merge pull request #37 from transient-haskell/services11

improvements
This commit is contained in:
Alberto 2017-06-08 02:44:44 +02:00 committed by GitHub
commit 64a66c7ff2
8 changed files with 737 additions and 598 deletions

View file

@ -11,54 +11,148 @@
-- |
--
-----------------------------------------------------------------------------
{-# LANGUAGE ScopedTypeVariables #-}
module Main where
import Transient.Base
import Transient.Internals
import Transient.Move
import Transient.Move.Internals
import Transient.Move.Utils
import Transient.Move.Services
import Control.Applicative
import Control.Monad.IO.Class
import Data.List ((\\))
import Control.Exception(SomeException(..))
import Control.Concurrent
import Control.Monad
import Data.List
import System.Process
import System.Directory
import Data.Monoid
main = keep . runCloud $ do
runService monitorService $ \(ident,service) -> do
mnode <- (local $ findInNodes service >>= return . Just . head) <|>
requestInstall ident service
return (mnode :: Maybe Node)
runService monitorService 3000 $ \(ident,service,num) -> do
return () !> ("RUNSERVICE",ident, service, num)
nodes <- local $ findInNodes service >>= return . take num
-- onAll $ liftIO $ print ("NODES",nodes)
let n= num - length nodes
if n==0 then return nodes
else return nodes <> requestInstall ident service n
where
installHere ident service@(package,program)= local $ do
thisNode <- getMyNode
yn<- authorizeService ident service -- !> "AUTHORIZE"
if yn
then do
node <- liftIO $ do
port <- freePort
putStr "Monitor: installing " >> putStrLn package
install package program (nodeHost thisNode) port
putStrLn "INSTALLED"
nodeService thisNode port service
addNodes [node]
return $ Just node
else return Nothing
requestInstall :: String -> Service -> Cloud (Maybe Node)
requestInstall ident service= do
mnode <- installHere ident service -- !> "INSTALLHERE"
case mnode of
Nothing -> installThere ident service
justnode -> return justnode
requestInstall :: String -> Service -> Int -> Cloud [ Node]
requestInstall ident service num= do
ns <- local getEqualNodes
-- return () !> ("equal",ns)
auth <- callNodes' ns (<>) mempty $ localIO $ authorizeService ident service >>= \x -> return [x]
-- return () !> auth
let nodes = map fst $ filter snd $ zip ns auth
nnodes= length nodes
pernode= num `div` nnodes
lacking= num `rem` nnodes
(nodes1,nodes2)= splitAt lacking nodes
-- return () !> (pernode,lacking,nodes1,nodes2)
rs <- callNodes' nodes1 (<>) mempty (installHere ident service (pernode+1)) <>
callNodes' nodes2 (<>) mempty (installHere ident service pernode)
local $ addNodes rs
return rs
-- installIt = installHere ident service <|> installThere ident service
installHere :: String -> Service -> Int -> Cloud [ Node]
installHere ident service n= local $ replicateM n installOne
where
installOne= do
port <- liftIO freePort
install service port
return () !> "INSTALLED"
thisNode <- getMyNode
let node= Node (nodeHost thisNode) port Nothing service -- node to be published
nodelocal= Node "localhost" port Nothing [("externalNode", show $ node{nodeServices=[]})] -- local node
addNodes [node{nodeServices=("localNode", show nodelocal{nodeServices=[]}):nodeServices node},nodelocal ]
return node {nodeServices= nodeServices node ++ [("relay",show thisNode{nodeServices=[]})]}
-- `catcht` \(e :: SomeException) -> liftIO (putStr "INSTALLLLLLLLLLLLLLL2222222: " >> print e) >> empty
-- nodeService node@(Node h _ _ _) port service= Node h port Nothing $ service -- ++ [("relay",show $node{nodeServices=[]})
install :: Service -> Int -> TransIO ()
install service port= do
return () !> "IIIIIIIIIIIIIIINSTALL"
install' `catcht` \(e :: SomeException) -> liftIO (putStr "INSTALL error: " >> print e) >> empty
where
install'= do
let host= "localhost"
program <- return (lookup "executable" service) `onNothing` empty
return () !> ("program",program)
tryExec program host port <|> tryDocker service host port
<|> do tryInstall service ; tryExec program host port
emptyIfNothing :: Maybe a -> TransIO a
emptyIfNothing = Transient . return
tryInstall :: Service -> TransIO ()
tryInstall service = do
package <- emptyIfNothing (lookup "package" service)
install package
where
install package
| "git:" `isPrefixOf` package= installGit package
| "https://github.com" `isPrefixOf` package = installGit package
| "http://github.com" `isPrefixOf` package = installGit package
tryDocker service host port= do
image <- emptyIfNothing $ lookup "image" service
path <- Transient $ liftIO $ findExecutable "docker" -- return empty if not found
liftIO $ callProcess path ["run", image,"-p","start/"++host++"/"++ show port]
tryExec program host port= do
path <- Transient $ liftIO $ findExecutable program !> ("findExecutable", program)
spawnProgram program host port !>"spawn"
where
spawnProgram program host port= liftIO $ do
let prog = pathExe program host port
putStr "executing: " >> putStrLn prog
let createprostruct= shell prog
createProcess $ createprostruct ; return ()
threadDelay 2000000
-- return() !> ("INSTALLED", program)
where
pathExe program host port=
program ++ " -p start/" ++ show (host ::String)
++"/" ++ show (port ::Int) ++ " > "++ program ++ host ++ show port ++ ".log"
installGit package = liftIO $ do
let packagename = name package
when (null packagename) $ error $ "source for \""++package ++ "\" not found"
callProcess "git" ["clone",package]
liftIO $ putStr package >> putStrLn " cloned"
setCurrentDirectory packagename
callProcess "cabal" ["install","--force-reinstalls"]
setCurrentDirectory ".."
where
name url= slash . slash . slash $ slash url
where
slash= tail1 . dropWhile (/='/')
tail1 []=[]
tail1 x= tail x
installThere ident service= do
nodes <- onAll $ findInNodes monitorService -- !> "installThere"
mynode <- onAll getMyNode -- !> nodes
request $ nodes \\ [mynode]
where
request []= empty
request (n:ns)= do
mnode <- callService' ident n (ident,service) -- !> ("calling",n)
case mnode of
Nothing -> request ns
justnode -> return justnode

View file

@ -34,17 +34,14 @@ data PartRef a=PartRef a
import Transient.Internals
import Transient.Move hiding (pack)
import Transient.Logged
import Transient.Move.Internals hiding (pack)
import Transient.Indeterminism
import Control.Applicative
import System.Random
import Control.Monad.IO.Class
import System.IO
import Control.Monad
import Data.Monoid
import Data.Maybe
import Data.Typeable
import Data.List hiding (delete, foldl')
@ -56,7 +53,6 @@ import Data.TCache hiding (onNothing)
import Data.TCache.Defs
import Data.ByteString.Lazy.Char8 (pack,unpack)
import Control.Monad.STM
import qualified Data.Map.Strict as M
import Control.Arrow (second)
import qualified Data.Vector.Unboxed as DVU
@ -197,11 +193,11 @@ reduce :: (Hashable k,Ord k, Distributable vector a, Loggable k,Loggable a)
reduce red (dds@(DDS mx))= loggedc $ do
mboxid <- localIO $ atomicModifyIORef boxids $ \n -> let n'= n+1 in (n',n')
nodes <- local getNodes
nodes <- local getEqualNodes
let lengthNodes = length nodes
shuffler nodes = do
localIO $ threadDelay 100000
ref@(Ref node path sav) <- mx -- return the resulting blocks of the map
runAt node $ foldAndSend node nodes ref
@ -313,7 +309,7 @@ reduce red (dds@(DDS mx))= loggedc $ do
--parallelize :: Loggable b => (a -> Cloud b) -> [a] -> Cloud b
-- parallelize :: Loggable b => (a -> Cloud b) -> [a] -> Cloud b
parallelize f xs = foldr (<|>) empty $ map f xs
mparallelize f xs = loggedc $ foldr (<>) mempty $ map f xs
@ -370,7 +366,7 @@ search uuid= error $ "chunk failover not yet defined. Lookin for: "++ uuid
asyncDuplicate node uuid= do
forkTo node
nodes <- onAll getNodes
nodes <- onAll getEqualNodes
let node'= head $ nodes \\ [node]
content <- onAll . liftIO $ readFile uuid
runAt node' $ local $ liftIO $ writeFile uuid content
@ -386,7 +382,7 @@ distribute :: (Loggable a, Distributable vector a ) => vector a -> DDS (vector a
distribute = DDS . distribute'
distribute' xs= loggedc $ do
nodes <- local getNodes -- !> "DISTRIBUTE"
nodes <- local getEqualNodes -- !> "DISTRIBUTE"
let lnodes = length nodes
let size= case F.length xs `div` (length nodes) of 0 ->1 ; n -> n
xss= split size lnodes 1 xs -- !> size
@ -412,7 +408,7 @@ distribute'' xss nodes =
-- The function parameter partition the text in words
getText :: (Loggable a, Distributable vector a) => (String -> [a]) -> String -> DDS (vector a)
getText part str= DDS $ loggedc $ do
nodes' <- local getNodes -- !> "getText"
nodes' <- local getEqualNodes -- !> "getText"
let nodes = filter (not . isWebNode) nodes'
let lnodes = length nodes
@ -436,7 +432,7 @@ textUrl= getUrl (map Text.pack . words)
-- The first parameter is a function that divide the text in words
getUrl :: (Loggable a, Distributable vector a) => (String -> [a]) -> String -> DDS (vector a)
getUrl partitioner url= DDS $ do
nodes <- local getNodes -- !> "DISTRIBUTE"
nodes <- local getEqualNodes -- !> "DISTRIBUTE"
let lnodes = length nodes
parallelize (process lnodes) $ zip nodes [0..lnodes-1] -- !> show xss
@ -458,7 +454,7 @@ textFile= getFile (map Text.pack . words)
-- the first parameter is the parser that generates elements from the content
getFile :: (Loggable a, Distributable vector a) => (String -> [a]) -> String -> DDS (vector a)
getFile partitioner file= DDS $ do
nodes <- local getNodes -- !> "DISTRIBUTE"
nodes <- local getEqualNodes -- !> "DISTRIBUTE"
let lnodes = length nodes
parallelize (process lnodes) $ zip nodes [0..lnodes-1] -- !> show xss

View file

@ -21,7 +21,7 @@ module Transient.Move.Internals where
import Transient.Internals
import Transient.Logged
import Transient.Indeterminism
import Transient.Backtrack
-- import Transient.Backtrack
import Transient.EVars
@ -29,7 +29,7 @@ import Data.Typeable
import Control.Applicative
#ifndef ghcjs_HOST_OS
import Network
import Network.Info
--- import Network.Info
import Network.URI
--import qualified Data.IP as IP
import qualified Network.Socket as NS
@ -44,7 +44,7 @@ import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy.Internal as BLC
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Lazy.Char8 as BS
import Network.Socket.ByteString as SBS(send,sendMany,sendAll,recv)
import Network.Socket.ByteString as SBS(sendMany,sendAll,recv)
import qualified Network.Socket.ByteString.Lazy as SBSL
import Data.CaseInsensitive(mk)
import Data.Char(isSpace)
@ -66,13 +66,13 @@ import Data.JSString (JSString(..), pack)
import Control.Monad.State
import System.IO
-- import System.IO
import Control.Exception hiding (onException,try)
import Data.Maybe
--import Data.Hashable
--import System.Directory
import Control.Monad
-- import Control.Monad
import System.IO.Unsafe
import Control.Concurrent.STM as STM
@ -80,18 +80,18 @@ import Control.Concurrent.MVar
import Data.Monoid
import qualified Data.Map as M
import Data.List (nub,(\\),find, insert)
import Data.List (nub,(\\)) -- ,find, insert)
import Data.IORef
import System.IO
-- import System.IO
import Control.Concurrent
import Data.Dynamic
-- import Data.Dynamic
import Data.String
import System.Mem.StableName
@ -108,8 +108,8 @@ newtype PortID = PortNumber Int deriving (Read, Show, Eq, Typeable)
data Node= Node{ nodeHost :: HostName
, nodePort :: Int
, connection :: MVar Pool
, nodeServices :: [Service]
, connection :: Maybe (MVar Pool)
, nodeServices :: Service
}
deriving (Typeable)
@ -123,16 +123,15 @@ instance Ord Node where
newtype Cloud a= Cloud {runCloud' ::TransIO a} deriving (Functor,Applicative,Monoid,Alternative, Monad, Num, MonadState EventF)
-- | Execute a distributed computation in the 'TransIO' monad.
-- Note that all the computations inside the 'TransIO' monad that enclose the
-- cloud computation must be `logged`.
-- | Execute a distributed computation inside a TransIO computation.
-- All the computations in the TransIO monad that enclose the cloud computation must be `logged`
runCloud :: Cloud a -> TransIO a
runCloud x= do
closRemote <- getSData <|> return (Closure 0)
runCloud' x <*** setData closRemote
--instance Monoid a => Monoid (Cloud a) where
-- mappend x y = mappend <$> x <*> y
-- mempty= return mempty
@ -149,7 +148,7 @@ tlsHooks ::IORef (SData -> BS.ByteString -> IO ()
tlsHooks= unsafePerformIO $ newIORef
( notneeded
, notneeded
, \s i -> tlsNotSupported i
, \_ i -> tlsNotSupported i
, \_ _ _-> return())
where
@ -210,11 +209,12 @@ runCloudIO' (Cloud mx)= keep' mx
onAll :: TransIO a -> Cloud a
onAll = Cloud
-- | only executes if the result is demanded
lazy :: TransIO a -> Cloud a
lazy mx= onAll $ getCont >>= \st -> Transient $
return $ unsafePerformIO $ runStateT (runTrans mx) st >>= return .fst
-- log the result a cloud computation. like `loogged`, This eliminated all the log produced by computations
-- log the result a cloud computation. like `loogged`, this erases all the log produced by computations
-- inside and substitute it for that single result when the computation is completed.
loggedc :: Loggable a => Cloud a -> Cloud a
loggedc (Cloud mx)= Cloud $ do
@ -233,20 +233,9 @@ lliftIO= local . liftIO
localIO :: Loggable a => IO a -> Cloud a
localIO= lliftIO
--remote :: Loggable a => TransIO a -> Cloud a
--remote x= Cloud $ step' x $ \full x -> Transient $ do
-- let add= Wormhole: full
-- setData $ Log False add add
--
-- r <- runTrans x
--
-- let add= WaitRemote: full
-- (setData $ Log False add add) -- !!> "AFTER STEP"
-- return r
-- | stop the current computation and does not execute any alternative computation
fullStop :: Cloud stop
fullStop= onAll $ setData WasRemote >> stop
fullStop :: TransIO stop
fullStop= setData WasRemote >> stop
-- | continue the execution in a new node
@ -263,8 +252,20 @@ forkTo node= beamTo node <|> return()
-- currently by default it keep open the connection to receive additional requests
-- and responses (streaming)
callTo :: Loggable a => Node -> Cloud a -> Cloud a
callTo node remoteProc=
wormhole node $ atRemote remoteProc
callTo node remoteProc= wormhole node $ atRemote remoteProc
-- wormhole node $ do
-- relay <- local $ do
-- conn <- getState <|> error ("no connection with node: " ++ show node)
-- case connData conn of
-- Just (Relay conn remoteNode) -> do
-- setData conn !> "callTo RELAY"
-- return $ Just remoteNode
-- _ -> return Nothing
-- case relay of
-- Just remoteNode ->
-- atRemote $ callTo remoteNode remoteProc
-- _ ->
-- atRemote remoteProc !> "callTo NO RELAY"
#ifndef ghcjs_HOST_OS
-- | A connectionless version of callTo for long running remote calls
@ -304,7 +305,7 @@ runAt= callTo
single :: TransIO a -> TransIO a
single f= do
cutExceptions
con@Connection{closChildren=rmap} <- getSData <|> error "single: only works within a wormhole"
Connection{closChildren=rmap} <- getSData <|> error "single: only works within a wormhole"
mapth <- liftIO $ readIORef rmap
id <- liftIO $ f `seq` makeStableName f >>= return . hashStableName
@ -324,13 +325,13 @@ single f= do
-- executed for that connection. The rest are ignored.
unique :: a -> TransIO ()
unique f= do
con@Connection{closChildren=rmap} <- getSData <|> error "unique: only works within a connection. Use wormhole"
Connection{closChildren=rmap} <- getSData <|> error "unique: only works within a connection. Use wormhole"
mapth <- liftIO $ readIORef rmap
id <- liftIO $ f `seq` makeStableName f >>= return . hashStableName
let mx = M.lookup id mapth
case mx of
Just tv -> empty
Just _ -> empty
Nothing -> do
tv <- get
liftIO $ modifyIORef rmap $ \mapth -> M.insert id tv mapth
@ -342,14 +343,14 @@ unique f= do
-- `teleport` uses this connection to translate the computation back and forth between the two nodes connected
wormhole :: Loggable a => Node -> Cloud a -> Cloud a
wormhole node (Cloud comp) = local $ Transient $ do
moldconn <- getData :: StateIO (Maybe Connection)
mclosure <- getData :: StateIO (Maybe Closure)
-- labelState $ "wormhole" ++ show node
logdata@(Log rec log fulLog) <- getData `onNothing` return (Log False [][])
if not rec -- !> ("wormhole recovery", rec)
if not rec
then runTrans $ (do
conn <- mconnect node
@ -359,11 +360,9 @@ wormhole node (Cloud comp) = local $ Transient $ do
comp )
<*** do when (isJust moldconn) . setData $ fromJust moldconn
when (isJust mclosure) . setData $ fromJust mclosure
-- <** is not enough
-- <** is not enough since comp may be reactive
else do
let conn = fromMaybe (error "wormhole: no connection in remote node") moldconn
-- conn <- getData `onNothing` error "wormhole: no connection in remote node"
setData $ conn{calling= False}
@ -382,36 +381,11 @@ pack= id
#endif
--
--newtype Prefix= Prefix JSString deriving(Read,Show)
--{-#NOINLINE rprefix #-}
--rprefix= unsafePerformIO $ newIORef 0
--addPrefix :: (MonadIO m, MonadState EventF m) => m ()
--addPrefix= do
-- r <- liftIO $ atomicModifyIORef rprefix (\n -> (n+1,n)) -- liftIO $ replicateM 5 (randomRIO ('a','z'))
-- (setData $ Prefix $ pack $ show r) !> "addPrefix"
--
-- | translates computations back and forth between two nodes
-- reusing a connection opened by `wormhole`
--
-- each teleport transport to the other node what is new in the log since the
-- last teleport
--
-- It is used trough other primitives like `runAt` which involves two teleports:
--
-- runAt node= wormhole node $ loggedc $ do
-- > teleport
-- > r <- Cloud $ runCloud proc <** setData WasRemote
-- > teleport
-- > return r
teleport :: Cloud ()
teleport = do
local $ Transient $ do
cont <- get
-- labelState "teleport"
-- send log with closure at head
@ -424,7 +398,8 @@ teleport = do
#ifndef ghcjs_HOST_OS
case contype of
Just Self -> runTrans $ do
setData $ if (not calling) then WasRemote else WasParallel
-- return () !> "SELF"
setData $ if (not calling) then WasRemote else WasParallel
abduce -- !> "SELF" -- call himself
liftIO $ do
remote <- readIORef $ remoteNode conn
@ -446,16 +421,15 @@ teleport = do
-- closLocal <- liftIO $ randomRIO (0,1000000)
-- node <- runTrans getMyNode
liftIO $ modifyMVar_ closures $ \map -> return $ M.insert closLocal (fulLog,cont) map
let tosend= reverse $ if closRemote==0 then fulLog else log
runTrans $ msend conn $ SMore $ ClosureData closRemote closLocal tosend
-- !> ("teleport sending", unsafePerformIO $ readIORef remoteNode , SMore (closRemote,closLocal,tosend))
-- !> "--------->------>---------->"
!> ("teleport sending", SMore (closRemote,closLocal,tosend))
!> "--------->------>---------->"
setData $ if (not calling) then WasRemote else WasParallel
@ -579,11 +553,11 @@ mclustered proc= callNodes (<>) mempty proc
callNodes op init proc= loggedc' $ do
nodes <- local getServerNodes
nodes <- local getEqualNodes
callNodes' nodes op init proc
callNodes' nodes op init proc= foldr op init $ map (\node -> runAt node proc) nodes
callNodes' nodes op init proc= loggedc' $ foldr op init $ map (\node -> runAt node proc) nodes
-----
#ifndef ghcjs_HOST_OS
sendRaw (Connection _ _ (Just (Node2Web sconn )) _ _ _ _ _ _) r=
@ -604,7 +578,7 @@ sendRaw (Connection _ _ (Just (Web2Node sconn)) _ _ blocked _ _ _) r= liftIO $
sendRaw _ _= error "No connection stablished"
data NodeMSG= ClosureData IdClosure IdClosure CurrentPointer
data NodeMSG= ClosureData IdClosure IdClosure CurrentPointer | RelayMSG Node Node (StreamData NodeMSG)
deriving (Typeable, Read, Show)
msend :: MonadIO m => Connection -> StreamData NodeMSG -> m ()
@ -614,15 +588,18 @@ msend :: MonadIO m => Connection -> StreamData NodeMSG -> m ()
msend (Connection _ _ (Just (Node2Node _ sock _)) _ _ blocked _ _ _) r=
liftIO $ withMVar blocked $ const $ SBS.sendAll sock $ BC.pack (show r) -- !> "N2N SEND"
msend (Connection _ _ (Just (TLSNode2Node ctx)) _ _ blocked _ _ _) r=
msend (Connection _ _ (Just (TLSNode2Node ctx)) _ _ _ _ _ _) r=
liftIO $ sendTLSData ctx $ BS.pack (show r) -- !> "TLS SEND"
msend (Connection _ _ (Just (Node2Web sconn)) _ _ blocked _ _ _) r=liftIO $
msend (Connection _ _ (Just (Node2Web sconn)) _ _ _ _ _ _) r=liftIO $
{-withMVar blocked $ const $ -} WS.sendTextData sconn $ BS.pack (show r) -- !> "websockets send"
msend((Connection _ rremote (Just (Relay req )) _ _ _ _ _ _)) r= writeEVar req r
msend((Connection myNode _ (Just (Relay conn remote )) _ _ _ _ _ _)) r= do
origin <- liftIO $ readIORef myNode -- `onNothing` error "msend: no remote node in connection"
-- msend conn $ SMore (ClosureData 0 0 [Var $ IDynamic (),Var . IDynamic $ origin{nodeServices=[]}
-- ,Var $ IDynamic remote{nodeServices=[]},Var $ IDynamic r]) -- writeEVar req r !> "msed relay"
msend conn $ SMore $ RelayMSG origin remote r
#else
@ -651,7 +628,7 @@ wsRead ws= do
dat <- react (hsonmessage ws) (return ())
case JM.getData dat of
JM.StringData str -> return (read' $ JS.unpack str)
-- !> ("Browser webSocket read", str) !> "<------<----<----<------"
-- !> ("Browser webSocket read", str) !> "<------<----<----<------"
JM.BlobData blob -> error " blob"
JM.ArrayBufferData arrBuffer -> error "arrBuffer"
@ -716,7 +693,7 @@ foreign import javascript safe
#else
mread (Connection _ _(Just (Node2Node _ _ _)) _ _ _ _ _ _) = parallelReadHandler -- !> "mread"
mread (Connection _ _ (Just (TLSNode2Node ctx)) _ _ _ _ _ _) = parallelReadHandler
mread (Connection _ _ (Just (TLSNode2Node _)) _ _ _ _ _ _) = parallelReadHandler
-- parallel $ do
-- s <- recvTLSData ctx
-- return . read' $ BC.unpack s
@ -725,13 +702,31 @@ mread (Connection _ _ (Just (Node2Web sconn )) _ _ _ _ _ _)=
parallel $ do
s <- WS.receiveData sconn
return . read' $ BS.unpack s
-- !> ("WS MREAD RECEIVED ----<----<------<--------", s)
-- !> ("WS MREAD RECEIVED ----<----<------<--------", s)
mread (Connection _ rremote (Just (Relay _ )) _ _ _ _ _ _)= do
remote <- liftIO $ readIORef rremote
getMailbox' $ fromJust remote
mread (Connection _ _ (Just (Relay conn _ )) _ _ _ _ _ _)=
mread conn -- !> "MREAD RELAY"
-- return r !> ("GETMAILBOX RECEIVED", remote)
parallelReadHandler :: Loggable a => TransIO (StreamData a)
parallelReadHandler= do
str <- giveData :: TransIO BS.ByteString
r <- choose $ readStream str
return r
!> ("parallel read handler read", r)
!> "<-------<----------<--------<----------"
where
readStream :: (Typeable a, Read a) => BS.ByteString -> [StreamData a]
readStream s= readStream1 $ BS.unpack s
where
readStream1 s=
let [(x,r)] = reads s
in x : readStream1 r
@ -757,7 +752,7 @@ mclose (Connection _ _
mclose (Connection _ _
(Just (Node2Web sconn ))
bufSize events blocked _ _ _)=
_ _ _ _ _ _)=
WS.sendClose sconn ("closemsg" :: BS.ByteString)
#else
@ -770,37 +765,36 @@ mclose (Connection _ _(Just (Web2Node sconn)) _ _ blocked _ _ _)=
mconnect :: Node -> TransIO Connection
mconnect node@(Node _ _ _ _ )= do
mconnect node'= do
node <- fixNode node'
nodes <- getNodes
return () -- !> ("mconnnect", nodePort node)
let fnode = filter (==node) nodes
case fnode of
[] -> mconnect1 node
[node'@(Node host port pool _)] -> do
plist <- liftIO $ readMVar pool
case plist of -- !> ("length",length plist) of
handle:_ -> do
[] -> mconnect1 node -- !> "NO NODE"
[node'@(Node _ _ pool _)] -> do
plist <- liftIO $ readMVar $ fromJust pool
case plist of -- !> ("length", length plist,nodePort node) of
(handle:_) -> do
delData $ Closure undefined
return handle
-- !> ("REUSED!", node)
_ -> mconnect1 node' -- !> ("MCONNECT1",host,port)
_ -> mconnect1 node'
where
#ifndef ghcjs_HOST_OS
mconnect1 (node@(Node host port pool _))= do
mconnect1 (node@(Node host port _ _))= do
return () !> ("MCONNECT1",host,port,nodeServices node)
-- (do
-- liftIO $ when (host== "192.168.99.100" && (port == 8081 || port== 8080)) $ error "connnnn" !> "detected"
(conn,parseContext) <- timeout 1000000 (connectNode2Node host port) <|>
timeout 1000000 (connectWebSockets host port) <|>
checkRelay <|>
(throwt $ ConnectionError "" node)
(conn,parseContext) <- connectNode2Node host port <|> timeout 1000000 (connectWebSockets host port) <|> checkRelay
-- `catcht` \(e :: SomeException) -> checkRelay
setState conn
setState parseContext
-- return () !> "CONNECTED AFTER TIMEOUT"
@ -808,7 +802,7 @@ mconnect node@(Node _ _ _ _ )= do
-- write node connected in the connection
liftIO $ writeIORef (remoteNode conn) $ Just node
-- write connection in the node
liftIO $ modifyMVar_ (connection node) . const $ return [conn]
liftIO $ modifyMVar_ (fromJust $ connection node) . const $ return [conn]
addNodes [node]
watchConnection
@ -824,38 +818,44 @@ mconnect node@(Node _ _ _ _ )= do
r:_ -> return r
checkRelay= do
-- return () !> "CHECKRELAY"
case lookup "wsnode" $ nodeServices node of
Nothing -> empty
Just relay -> do
Connection{myNode=my,comEvent= ev} <- getSData <|> error "connect: listen not set for this node"
requests <- newEVar
return () !> "RELAY"
myNode <- getMyNode
if nodeHost node== nodeHost myNode
then
case lookup "localNode" $ nodeServices node of
Just snode -> do
con <- mconnect $ read snode
cont <- getSData <|> return noParseContext
return (con,cont)
Nothing -> empty
else do
conn' <- liftIO $ defConnection >>= \c ->
return c{myNode=my, comEvent= ev,connData=
Just $ Relay requests }
mynode <- liftIO $ readIORef my
putMailbox ((mynode ,node, read relay ,requests) ::
(Node, Node,Node, EVar (StreamData NodeMSG)))
setState conn'
-- setState noParseContext
continue
return (conn', noParseContext)
case lookup "relay" $ nodeServices node of
Nothing -> empty -- !> "NO RELAY"
Just relayInfo -> do
let relay= read relayInfo
conn <- mconnect relay -- !> ("RELAY",relay, node)
rem <- liftIO $ newIORef $ Just node
-- clos <- liftIO $ newMVar $ M.empty
let conn'= conn{connData= Just $ Relay conn node,remoteNode=rem} --,closures= clos}
parseContext <- getState <|> return noParseContext
return (conn', parseContext)
noParseContext= (ParseContext (error "relay error") (error "relay error")
:: ParseContext BS.ByteString)
connectSockTLS host port= do
-- return () !> "connectSockTLS"
-- return () !> "connectSockTLS"
let size=8192
Connection{myNode=my,comEvent= ev} <- getSData <|> error "connect: listen not set for this node"
sock <- liftIO $connectTo' size host $ PortNumber $ fromIntegral port
sock <- liftIO $ connectTo' size host $ PortNumber $ fromIntegral port
conn' <- liftIO $ defConnection >>= \c ->
return c{myNode=my, comEvent= ev,connData=
Just $ (Node2Node u sock (error $ "addr: outgoing connection"))}
setData conn'
@ -863,15 +863,16 @@ mconnect node@(Node _ _ _ _ )= do
setData $ ParseContext (error "parse context: Parse error") input
maybeClientTLSHandshake host sock input
`catcht` \(e :: SomeException) -> empty
`catcht` \(_ :: SomeException) -> empty
connectNode2Node host port= do
return () !> "NODE 2 NODE"
connectSockTLS host port
-- return () !> "CONNECT NODE2NODE"
conn <- getSData <|> error "mconnect: no connection data"
sendRaw conn "CLOS a b\r\n\r\n"
@ -881,7 +882,8 @@ mconnect node@(Node _ _ _ _ )= do
"OK" -> do
parseContext <- getState
return (conn,parseContext)
_ -> do
_ -> do
let Connection{connData=cdata}= conn
case cdata of
Just(Node2Node _ s _) -> liftIO $ NS.close s -- since the HTTP firewall closes the connection
@ -890,6 +892,7 @@ mconnect node@(Node _ _ _ _ )= do
connectWebSockets host port = do
return () !> "WEBSOCKETS"
connectSockTLS host port -- a new connection
never <- liftIO $ newEmptyMVar :: TransIO (MVar ())
@ -910,17 +913,18 @@ mconnect node@(Node _ _ _ _ )= do
:: TransIO (ParseContext BS.ByteString)
chs <- liftIO $ newIORef M.empty
let conn'= conn{closChildren= chs}
liftIO $ modifyMVar_ pool $ \plist -> return $ conn':plist -- !> (node,"ADDED TO POOL")
-- liftIO $ modifyMVar_ (fromJust pool) $ \plist -> do
-- if not (null plist) then print "DUPLICATE" else return ()
-- return $ conn':plist -- !> (node,"ADDED TO POOL")
-- tell listenResponses to watch incoming responses
putMailbox ((conn',parseContext,node) -- !> "PUTMAILBOX"
putMailbox ((conn',parseContext,node)
:: (Connection,ParseContext BS.ByteString,Node))
liftIO $ threadDelay 100000 -- give time to initialize listenResponses
#else
mconnect1 (node@(Node host port pool _))= do
mconnect1 (node@(Node host port (Just pool) _))= do
-- my <- getMyNode
Connection{myNode=my,comEvent= ev} <- getSData <|> error "connect: listen not set for this node"
@ -944,7 +948,9 @@ mconnect node@(Node _ _ _ _ )= do
u= undefined
data ConnectionError= ConnectionError String Node deriving Show
instance Exception ConnectionError
-- mconnect _ = empty
@ -988,7 +994,7 @@ data ConnectionData=
| Node2Web{webSocket :: WS.Connection}
-- | WS2Node{webSocketNode :: WS.Connection}
| Self
| Relay (EVar (StreamData NodeMSG))
| Relay Connection Node -- (EVar (StreamData NodeMSG))
#else
Self
| Web2Node{webSocket :: WebSocket}
@ -1072,20 +1078,93 @@ getBuffSize=
listen :: Node -> Cloud ()
listen (node@(Node _ port _ _ )) = onAll $ do
addThreads 1
setData $ Log False [] []
conn' <- getSData <|> defConnection
ev <- liftIO $ newIORef M.empty
chs <- liftIO $ newIORef M.empty
let conn= conn'{connData=Just Self, comEvent=ev,closChildren=chs}
liftIO $ writeIORef (myNode conn) node
setData conn
liftIO $ modifyMVar_ (connection node) $ const $ return [conn]
addNodes [node]
mlog <- listenNew (fromIntegral port) conn <|> listenResponses :: TransIO (StreamData NodeMSG)
pool <- liftIO $ newMVar [conn]
execLog mlog
let node'= node{connection=Just pool}
liftIO $ writeIORef (myNode conn) node'
setData conn
liftIO $ modifyMVar_ (fromJust $ connection node') $ const $ return [conn]
addNodes [node']
mlog <- listenNew (fromIntegral port) conn <|> listenResponses :: TransIO (StreamData NodeMSG)
case mlog of
SMore (RelayMSG _ _ _) ->relay mlog
_ -> execLog mlog
`catcht` (\(e ::SomeException) -> liftIO (print e))
-- relayService :: TransIO ()
relay (SMore (RelayMSG origin destiny streamdata)) = do
nodes <- getNodes
my <- getMyNode -- !> "relayService"
if destiny== my
then do
case filter (==origin) nodes of
[node] -> do
(conn: _) <- liftIO $ readMVar $ fromJust $ connection node
setData conn
[] -> do
conn@Connection{remoteNode= rorigin} <- getState
let conn'= conn{connData= Just $ Relay conn origin} -- !> ("Relay set with: ", origin, destiny)
pool <- liftIO $ newMVar [conn']
addNodes [origin{connection= Just pool}]
setData conn'
execLog streamdata
else do
-- search local node name if hostname is the same
-- let destiny' = if nodeHost destiny== nodeHost my
-- then
-- case filter (==destiny) nodes of
-- [node] -> case lookup "localNode" $ nodeServices node of
-- Just snode -> read snode
-- Nothing -> destiny
-- _ -> destiny
-- else destiny
-- let origin'= if nodeHost origin == "localhost"
-- then case filter (==origin) nodes of
-- [node] ->case lookup "externalNode" $ nodeServices node of
-- Just snode -> read snode
-- Nothing -> origin
-- _ -> origin
-- else origin
let (origin',destiny')= nat origin destiny my nodes
con <- mconnect destiny'
msend con . SMore $ RelayMSG origin' destiny' streamdata
return () !> ("SEND RELAY DATA",streamdata)
fullStop
relay _= empty
nat origin destiny my nodes=
let destiny' = if nodeHost destiny== nodeHost my
then
case filter (==destiny) nodes of
[node] -> case lookup "localNode" $ nodeServices node of
Just snode -> read snode
Nothing -> destiny
_ -> destiny
else destiny
origin'= if nodeHost origin == "localhost"
then case filter (==origin) nodes of
[node] ->case lookup "externalNode" $ nodeServices node of
Just snode -> read snode
Nothing -> origin
_ -> origin
else origin
in (origin',destiny')
-- listen incoming requests
listenNew port conn'= do
@ -1098,7 +1177,6 @@ listenNew port conn'= do
-- wait for connections. One thread per connection
(sock,addr) <- waitEvents $ NS.accept sock
-- return () !> addr
chs <- liftIO $ newIORef M.empty
-- case addr of
-- NS.SockAddrInet port host -> liftIO $ print("connection from", port, host)
@ -1108,8 +1186,6 @@ listenNew port conn'= do
input <- liftIO $ SBSL.getContents sock
cutExceptions
onException $ \(e :: SomeException) -> do
cutExceptions
@ -1126,7 +1202,7 @@ listenNew port conn'= do
liftIO $ do
modifyMVar_ closures $ const $ return M.empty
writeIORef rmap M.empty
topState >>= showThreads
-- topState >>= showThreads
killBranch
setData $ (ParseContext (NS.close sock >> error "Communication error" ) input
@ -1144,9 +1220,9 @@ listenNew port conn'= do
"CLOS" ->
do
conn <- getSData
sendRaw conn "OK" -- !> "CLOS detected"
sendRaw conn "OK" -- !> "CLOS detected"
parallelReadHandler
mread conn
_ -> do
let uri'= BC.tail $ uriPath uri
@ -1183,8 +1259,9 @@ listenNew port conn'= do
let conn'= conn{connData= Just (Node2Web sconn)
, closChildren=chs}
setState conn'
setState conn' -- !> "WEBSOCKETS-----------------------------------------------"
onException $ \(e :: SomeException) -> do
cutExceptions
liftIO $ putStr "listen websocket:" >> print e
continue
liftIO $ mclose conn'
@ -1195,8 +1272,8 @@ listenNew port conn'= do
-- return () !> "WEBSOCKET"
r <- parallel $ do
msg <- WS.receiveData sconn
-- return () !> ("Server WebSocket msg read",msg)
-- !> "<-------<---------<--------------"
-- return () !> ("Server WebSocket msg read",msg)
-- !> "<-------<---------<--------------"
case reads $ BS.unpack msg of
[] -> do
@ -1278,7 +1355,7 @@ listenResponses= do
liftIO $ putStr "removing node: " >> print node
nodes <- getNodes
setNodes $ nodes \\ [node]
topState >>= showThreads
-- topState >>= showThreads
killChilds
let Connection{closures=closures}= conn
liftIO $ modifyMVar_ closures $ const $ return M.empty)
@ -1286,33 +1363,6 @@ listenResponses= do
mread conn
-- | a bridge that forward messages to a third node trough a relay node. Used by the `Relay` connections
relayService= do
(msg,localnode, remotenode, relaynode) <- local $ do
(localnode,remotenode, relaynode, requests) <- getMailbox
:: TransIO (Node, Node,Node, EVar (StreamData NodeMSG))
-- return () !> ("READING MESSAgES FROM",localnode,remotenode, relaynode)
msg <- readEVar requests -- !> "INSTALLED HANDLER READ EVAR"
return (msg,localnode,remotenode, relaynode) -- !> "MESSAGE READ IN ORIGIN NODE"
runAt relaynode . runAt remotenode $ do
-- start the listenResponses for this node the first time
-- return () !> "IN REMOTE NODE"
nodes <- local getNodes
let mnode = find (==localnode) nodes
case mnode of
Nothing -> localIO $ error $ "Relay: no node: "++ show localnode
Just nod -> do
noconnection <- localIO $ readMVar (connection nod) >>= return . null
if noconnection
then runAt nod empty <|> return ()
else return ()
local $ putMailbox' localnode msg
-- !> "MESSAGE WRITTEN IN DESTINAION NODE"
empty -- no need to continue with a message back to the origin node
return () -- to type match
@ -1330,8 +1380,9 @@ execLog mlog = Transient $
SDone -> runTrans(back $ ErrorCall "SDone") >> return Nothing -- TODO remove closure?
SMore r -> process r False
SLast r -> process r True
-- !> ("EXECLOG",mlog)
where
process (ClosureData closl closr log) deleteClosure= do
conn@Connection {closures=closures} <- getData `onNothing` error "Listen: myNode not set"
if closl== 0 then do
@ -1373,8 +1424,6 @@ execLog mlog = Transient $
return Nothing
-- !> "FINISH CLOSURE"
#ifdef ghcjs_HOST_OS
listen node = onAll $ do
addNodes [node]
@ -1390,7 +1439,7 @@ listen node = onAll $ do
type Pool= [Connection]
type Package= String
type Program= String
type Service= (Package, Program)
type Service= [(Package, Program)]
@ -1402,34 +1451,6 @@ data ParseContext a = IsString a => ParseContext (IO a) a deriving Typeable
#ifndef ghcjs_HOST_OS
parallelReadHandler :: Loggable a => TransIO (StreamData a)
parallelReadHandler= do
str <- giveData :: TransIO BS.ByteString
r <- choose $ readStream str
-- rest <- liftIO $ newIORef $ BS.unpack str
--
-- r <- parallel $ readStream' rest
return r
-- !> ("read",r)
-- !> "<-------<----------<--------<----------"
where
-- readStream' :: (Loggable a) => IORef String -> IO(StreamData a)
-- readStream' rest = do
-- return () !> "reAD StrEAM"
-- s <- readIORef rest
-- liftIO $ print $ takeWhile (/= ')') s
-- [(x,r)] <- maybeRead s
-- writeIORef rest r
-- return x
readStream :: (Typeable a, Read a) => BS.ByteString -> [StreamData a]
readStream s= readStream1 $ BS.unpack s
where
readStream1 s=
let [(x,r)] = reads s
in x : readStream1 r
-- maybeRead line= unsafePerformIO $ do
-- let [(v,left)] = reads line
@ -1449,6 +1470,7 @@ readFrom Connection{connData= Just(Node2Node _ sock _)} = toStrict <$> loop
loop :: IO BL.ByteString
loop = unsafeInterleaveIO $ do
s <- SBS.recv sock bufSize
if BC.length s < bufSize
then return $ BLC.Chunk s mempty
else BLC.Chunk s `liftM` loop
@ -1497,7 +1519,7 @@ servePages (method,uri, headers) = do
let file= if BC.null uri then "index.html" else uri
{- TODO renderin in server
{- TODO rendering in server
NEEDED: recodify View to use blaze-html in server. wlink to get path in server
does file exist?
if exist, send else do
@ -1662,27 +1684,18 @@ emptyPool= liftIO $ newMVar []
-- | Create a node from a hostname (or IP address), port number and a list of
-- services.
createNodeServ :: HostName -> Integer -> [Service] -> IO Node
createNodeServ h p svs= do
pool <- emptyPool
return $ Node h ( fromInteger p) pool svs
createNodeServ :: HostName -> Int -> Service -> IO Node
createNodeServ h p svs= return $ Node h p Nothing svs
-- | Create a node from a hostname (or IP address) and port number. The node is
-- created without any services.
createNode :: HostName -> Integer -> IO Node
createNode :: HostName -> Int -> IO Node
createNode h p= createNodeServ h p []
createWebNode :: IO Node
createWebNode= do
pool <- emptyPool
return $ Node "webnode" ( fromInteger 0) pool [("webnode","")]
return $ Node "webnode" 0 (Just pool) [("webnode","")]
--createWSNode :: IO Node
-- pool <- emptyPool
-- return $ Node "wsnode" (fromInteger 0) pool [("wsnode","")]
instance Eq Node where
Node h p _ _ ==Node h' p' _ _= h==h' && p==p'
@ -1691,18 +1704,17 @@ instance Eq Node where
instance Show Node where
show (Node h p _ servs )= show (h,p, servs)
instance Read Node where
readsPrec _ s=
let r= readsPrec' 0 s
in case r of
[] -> []
[((h,p,ss),s')] -> [(Node h p empty
( ss),s')]
where
empty= unsafePerformIO emptyPool
[((h,p,ss),s')] -> [(Node h p Nothing ( ss),s')]
-- inst ghc-options: -threaded -rtsopts
nodeList :: TVar [Node]
nodeList = unsafePerformIO $ newTVarIO []
@ -1729,23 +1741,33 @@ getMyNode = do
getNodes :: MonadIO m => m [Node]
getNodes = liftIO $ atomically $ readTVar nodeList
getServerNodes= do
nodes <- getNodes
return $ filter (not . isWebNode) nodes
-- getEqualNodes= getNodes
where
isWebNode Node {nodeServices=srvs}
| ("webnode","") `elem` srvs = True
| otherwise = False
getEqualNodes = do
nodes <- getNodes
let srv= nodeServices $ head nodes
case srv of
[] -> return $ filter (null . nodeServices) nodes
(srv:_) -> return $ filter (\n -> head (nodeServices n) == srv ) nodes
matchNodes f = do
nodes <- getNodes
return $ map (\n -> filter f $ nodeServices n) nodes
-- | Add a list of nodes to the list of existing cluster nodes.
addNodes :: [Node] -> TransIO () -- (MonadIO m, MonadState EventF m) => [Node] -> m ()
addNodes nodes= do
-- my <- getMyNode -- mynode must be first
nodes' <- mapM fixNode nodes
liftIO . atomically $ do
prevnodes <- readTVar nodeList
writeTVar nodeList $ nub $ prevnodes ++ nodes'
writeTVar nodeList $ nub $ prevnodes ++ nodes
fixNode n= case connection n of
Nothing -> do
pool <- emptyPool
return n{connection= Just pool}
Just _ -> return n
-- | set the list of nodes
setNodes nodes= liftIO $ atomically $ writeTVar nodeList $ nodes
@ -1811,21 +1833,20 @@ connect node remotenode = do
-- having exactly the same list of nodes.
connect' :: Node -> Cloud ()
connect' remotenode= do
nodes <- local getServerNodes
nodes <- local getNodes
localIO $ putStr "connecting to: " >> print remotenode
newNodes <- runAt remotenode $ interchange remotenode nodes
local $ return () -- !> "interchange finish"
-- local $ return () !> "interchange finish"
-- add the new nodes to the local nodes in all the nodes connected previously
let toAdd=remotenode:tail newNodes
callNodes' nodes (<>) mempty $ local $ do
liftIO $ putStr "New nodes: " >> print toAdd
addNodes toAdd
where
-- receive new nodes and send their own
interchange remotemode nodes=
@ -1836,22 +1857,22 @@ connect' remotenode= do
-- if is a websockets node, add only this node
let newNodes = case cdata of
Node2Web _ -> [(head nodes){nodeServices=[("wsnode",show remotenode)]}]
_ -> nodes
callingNode= head newNodes
-- let newNodes = case cdata of
-- Node2Web _ -> [(head nodes){nodeServices=[("relay",show remotenode)]}]
-- _ -> nodes
let newNodes= map (\n -> n{nodeServices= nodeServices n ++ [("relay",show (remotenode,n))]}) nodes
callingNode<- fixNode $ head newNodes
liftIO $ writeIORef rnode $ Just callingNode
liftIO $ modifyMVar_ (connection callingNode) $ const $ return [conn]
liftIO $ modifyMVar_ (fromJust $ connection callingNode) $ const $ return [conn]
onException $ \(e :: SomeException) -> do
liftIO $ putStr "connect:" >> print e
liftIO $ putStrLn "removing node: " >> print callingNode
topState >>= showThreads
-- topState >>= showThreads
nodes <- getNodes
setNodes $ nodes \\ [callingNode]
@ -1863,7 +1884,7 @@ connect' remotenode= do
mclustered . local $ do
liftIO $ putStrLn "New nodes1111: " >> print newNodes
addNodes newNodes
addNodes newNodes
localIO $ atomically $ do
-- set the firt node (local node) as is called from outside

View file

@ -11,25 +11,26 @@
-- |
--
-----------------------------------------------------------------------------
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE ScopedTypeVariables, CPP #-}
#ifndef ghcjs_HOST_OS
module Transient.Move.Services where
import Transient.Base
import Transient.Move
import Transient.Logged(Loggable(..))
import Transient.Backtrack
import Transient.Internals(RemoteStatus(..), Log(..))
import Transient.Internals
import Transient.Move.Internals
import Transient.Logged
-- import Transient.Backtrack
-- import Transient.Internals(RemoteStatus(..), Log(..))
import Transient.Move.Utils
import Transient.EVars
-- import Transient.EVars
import Transient.Indeterminism
import Control.Monad.IO.Class
import System.Process
import System.IO.Unsafe
import Control.Concurrent.MVar
import Control.Applicative
import System.Directory
import System.Process
import Control.Monad
import Data.List
import Data.Maybe
@ -38,93 +39,57 @@ import Control.Concurrent(threadDelay)
import Control.Exception hiding(onException)
import Data.IORef
monitorService= ("https://github.com/agocorona/transient-universe","monitor")
monitorService= [("service","monitor")
,("executable", "monitorService")
,("package","https://github.com/transient-haskell/transient-universe")]
install :: String -> String -> String -> Int -> IO ()
install package program host port = do
exist <- findExecutable program -- liftIO $ doesDirectoryExist packagename
when (isNothing exist) $ do
let packagename = name package
when (null packagename) $ error $ "source for \""++package ++ "\" not found"
callProcess "git" ["clone",package]
liftIO $ putStr package >> putStrLn " cloned"
setCurrentDirectory packagename
callProcess "cabal" ["install","--force-reinstalls"]
setCurrentDirectory ".."
return()
let prog = pathExe program host port
print $ "executing "++ prog
let createprostruct= shell prog
createProcess $ createprostruct ; return ()
threadDelay 2000000
return() -- !> ("INSTALLED", program)
where
pathExe program host port= program ++ " -p start/" ++ show host ++"/" ++ show port
name url= slash . slash . slash $ slash url
where
slash= tail1 . dropWhile (/='/')
tail1 []=[]
tail1 x= tail x
monitorPort= 3000
rfreePort :: MVar Int
rfreePort = unsafePerformIO $ newMVar (monitorPort +1)
freePort :: MonadIO m => m Int
freePort= liftIO $ modifyMVar rfreePort $ \ n -> return (n+1,n)
initService ident service@(package, program)=
(local $ findInNodes service >>= return . head) <|> requestInstall service
initService :: String -> Service -> Cloud Node
initService ident service=
cached <|> installIt
where
requestInstall service = do
mnode <- callService' ident monitorNode (ident,service)
case mnode of
Nothing -> empty
Just node -> do
local $ addNodes [node] -- !> ("ADDNODES",service)
return node
cached= local $ do
ns <- findInNodes service
if null ns then empty
else return $ head ns
installIt= do
ns <- requestInstance ident service 1
if null ns then empty else return $ head ns
startMonitor= do
createProcess . shell $ "monitorService -p start/"++ show monitorPort
requestInstance :: String -> Service -> Int -> Cloud [Node]
requestInstance ident service num= loggedc $ do
return () !> "requestInstance"
local $ onException $ \(e:: ConnectionError) -> startMonitor >> continue !> ("Exception",e)
nodes <- callService' ident monitorNode (ident,service,num)
local $ addNodes nodes -- !> ("ADDNODES",service)
return nodes
startMonitor :: MonadIO m => m ()
startMonitor= liftIO $ do
createProcess . shell $ "monitorService -p start/localhost/"++ show monitorPort
threadDelay 2000000
nodeService (Node h _ _ _) port service= do
pool <- newMVar []
return $ Node h port pool [service]
findInNodes :: Service -> TransIO [Node]
findInNodes service = do
-- return () !> "FINDINNODES"
nodes <- getNodes
let ns = filter (\node -> service `elem` nodeServices node) nodes
if null ns then empty
else return ns
return $ filter (\node -> head service == head (nodeServices node)) nodes
-- where
--
-- callNodes' op init proc= loggedc $ do
-- nodes <- local getNodes
-- let nodes' = filter (not . isWebNode) nodes
-- foldr op init $ map (\node -> runAt node $ proc node) nodes' :: Cloud [Node]
-- where
-- isWebNode Node {nodeServices=srvs}
-- | ("webnode","") `elem` srvs = True
-- | otherwise = False
rfriends = unsafePerformIO $ newIORef ([] ::[String])
rservices = unsafePerformIO $ newIORef ([] ::[Service])
ridentsBanned = unsafePerformIO $ newIORef ([] ::[String])
rServicesBanned = unsafePerformIO $ newIORef ([] ::[Service])
inputAuthorizations= do
oneThread $ option "authorizations" "authorizations"
oneThread $ option "auth" "add authorizations for users and services"
showPerm <|> friends <|> services <|> identBanned <|> servicesBanned
empty
@ -162,6 +127,12 @@ inputAuthorizations= do
liftIO $ putStr "services allowed: " >> print services
liftIO $ putStr "services banned: " >> print servicesBanned
rfreePort :: MVar Int
rfreePort = unsafePerformIO $ newMVar (monitorPort +1)
freePort :: MonadIO m => m Int
freePort= liftIO $ modifyMVar rfreePort $ \ n -> return (n+1,n)
authorizeService :: MonadIO m => String -> Service -> m Bool
authorizeService ident service= do
@ -184,18 +155,14 @@ callService
:: (Loggable a, Loggable b)
=> String -> Service -> a -> Cloud b
callService ident service params = do
node <- initService ident service -- !> ("callservice initservice", service)
callService' ident node params -- !> ("NODE FOR SERVICE",node)
node <- initService ident service -- !> ("callservice initservice", service)
callService' ident node params -- !> ("NODE FOR SERVICE",node)
monitorNode= unsafePerformIO $ createNodeServ "localhost"
(fromIntegral monitorPort)
[monitorService]
monitorService
callService' ident node params = do
onAll $ onException $ \(e:: IOException) -> do
liftIO startMonitor
continue
log <- onAll $ do
log <- getSData <|> return emptyLog
setData emptyLog
@ -204,12 +171,15 @@ callService' ident node params = do
r <- wormhole node $ do
local $ return params
teleport
-- local empty `asTypeOf` typea params
local empty
restoreLog log -- !> "RESTORELOG"
restoreLog log -- !> "RESTORELOG"
return r
where
typea :: a -> Cloud a
typea = undefined
restoreLog (Log _ _ logw)= onAll $ do
Log _ _ logw' <- getSData <|> return emptyLog
@ -219,42 +189,45 @@ callService' ident node params = do
emptyLog= Log False [] []
catchc :: Exception e => Cloud a -> (e -> Cloud a) -> Cloud a
catchc a b= Cloud $ catcht (runCloud' a) (\e -> runCloud' $ b e)
runEmbeddedService :: (Loggable a, Loggable b) => Service -> (a -> Cloud b) -> Cloud b
runEmbeddedService servname serv = do
node <- localIO $ do
port <- freePort
createNodeServ "localhost" (fromIntegral port) [servname]
createNodeServ "localhost" (fromIntegral port) servname
listen node
wormhole notused $ loggedc $ do
x <- local $ return notused
wormhole (notused 4) $ loggedc $ do
x <- local $ return (notused 0)
r <- onAll $ runCloud (serv x) <** setData WasRemote
local $ return r
teleport
return r
where
notused= error "runEmbeddedService: variable should not be used"
notused n= error $ "runService: "++ show (n::Int) ++ " variable should not be used"
runService :: (Loggable a, Loggable b) => Service -> (a -> Cloud b) -> Cloud b
runService servname serv = do
initNodeServ [servname]
service
-- onAll inputAuthorizations -- <|> inputNodes
runService :: (Loggable a, Loggable b) => Service -> Int -> (a -> Cloud b) -> Cloud b
runService servname defPort serv = do
onAll $ onException $ \(e :: SomeException)-> liftIO $ print e
initNodeServ servname
service
where
service=
wormhole (notused 1) $ do
x <- local $ return $ notused 2
x <- local . return $ notused 2
-- setData emptyLog
r <- local $ runCloud (serv x) -- <** setData WasRemote
setData emptyLog
r <- local $ runCloud (serv x) <** setData WasRemote
local $ return r
teleport
return r
emptyLog= Log False [] []
notused n= error $ "runService: "++ show (n::Int) ++ " variable should not be used"
initNodeServ servs=do
initNodeServ servs=do
mynode <- local getNode
local $ do
@ -262,13 +235,14 @@ runService servname serv = do
liftIO $ writeIORef (myNode conn) mynode
setState conn
onAll inputAuthorizations <|> (inputNodes >> empty) <|> return ()
listen mynode
listen mynode
where
getNode :: TransIO Node
getNode = if isBrowserInstance then liftIO createWebNode else do
oneThread $ option "start" "re/start node"
host <- input (const True) "hostname of this node (must be reachable): "
port <- input (const True) "port to listen? "
host <- input (const True) "hostname of this node (must be reachable) (\"localhost\"): "
port <- input (const True) "port to listen? (3000) "
liftIO $ createNodeServ host port servs
inputNodes= do
@ -281,25 +255,12 @@ runService servname serv = do
port <- local $ input (const True) "port? "
nnode <- localIO $ createNodeServ host port [monitorService]
nnode <- localIO $ createNodeServ host port monitorService
local $ do
liftIO $ putStr "Added node: ">> print nnode
addNodes [nnode]
liftIO $ putStr "Added node: ">> print nnode
addNodes [nnode]
empty
{- |
a service called monitor:
runService
receive request for a service.
check service in list
if executing return node
when not installed install
execute
return node
-}
#else
requestInstance :: String -> Service -> Int -> Cloud [Node]
requestInstance ident service num= logged empty
#endif

View file

@ -12,17 +12,20 @@
--
-----------------------------------------------------------------------------
module Transient.Move.Utils (initNode,inputNodes, simpleWebApp, initWebApp
module Transient.Move.Utils (initNode,initNodeDef, initNodeServ, inputNodes, simpleWebApp, initWebApp
, onServer, onBrowser, runTestNodes)
where
--import Transient.Base
import Transient.Internals
import Transient.Move
import Transient.Move.Internals
import Control.Applicative
import Control.Monad.IO.Class
import Data.IORef
import System.Environment
import Control.Concurrent.MVar
import Data.Maybe
-- | ask in the console for the port number and initializes a node in the port specified
-- It needs the application to be initialized with `keep` to get input from the user.
@ -49,37 +52,58 @@ import Data.IORef
--
-- To translate the code from the browser to the server node, use `teleport`.
--
initNode :: Cloud () -> TransIO ()
initNode :: Loggable a => Cloud a -> TransIO a
initNode app= do
node <- getNodeParams
initWebApp node app
where
getNodeParams :: TransIO Node
getNodeParams =
getNodeParams :: TransIO Node
getNodeParams =
if isBrowserInstance then liftIO createWebNode else do
oneThread $ option "start" "re/start node"
host <- input (const True) "hostname of this node (must be reachable): "
port <- input (const True) "port to listen? "
liftIO $ createNode host port
initNodeDef :: Loggable a => String -> Int -> Cloud a -> TransIO a
initNodeDef host port app= do
node <- def <|> getNodeParams
initWebApp node app
where
def= do
args <- liftIO getArgs
if null args then liftIO $ createNode host port else empty
initNodeServ :: Loggable a => Service -> String -> Int -> Cloud a -> TransIO a
initNodeServ services host port app= do
node <- def <|> getNodeParams
let node'= node{nodeServices=services}
initWebApp node' $ app
where
def= do
args <- liftIO getArgs
if null args then liftIO $ createNode host port else empty
-- | ask for nodes to be added to the list of known nodes. it also ask to connect to the node to get
-- his list of known nodes. It returns empty
inputNodes :: Cloud empty
inputNodes= onServer $ listNodes <|> addNew
where
addNew= do
local $ option "add" "add a new node"
host <- local $ do
r <- input (const True) "Host to connect to: (none): "
r <- input (const True) "Hostname of the node (none): "
if r == "" then stop else return r
port <- local $ input (const True) "port? "
services <- local $ input' (Just []) (const True) "services? [] "
connectit <- local $ input (\x -> x=="y" || x== "n") "connect to the node to interchange node lists? "
nnode <- localIO $ createNode host port
nnode <- localIO $ createNodeServ host port services
if connectit== "y" then connect' nnode
else local $ do
liftIO $ putStr "Added node: ">> print nnode
@ -107,15 +131,15 @@ inputNodes= onServer $ listNodes <|> addNew
-- > ./program
--
--
simpleWebApp :: Integer -> Cloud () -> IO ()
simpleWebApp :: Loggable a => Integer -> Cloud a -> IO ()
simpleWebApp port app = do
node <- createNode "localhost" port
node <- createNode "localhost" $ fromIntegral port
keep $ initWebApp node app
return ()
-- | use this instead of smpleWebApp when you have to do some initializations in the server prior to the
-- initialization of the web server
initWebApp :: Node -> Cloud () -> TransIO ()
initWebApp :: Loggable a => Node -> Cloud a -> TransIO a
initWebApp node app= do
conn <- defConnection
liftIO $ writeIORef (myNode conn) node
@ -127,12 +151,10 @@ initWebApp node app= do
else return serverNode
runCloud $ do
listen mynode <|> return()
wormhole serverNode app
return ()
wormhole serverNode app
-- only execute if the the program is executing in the browser. The code inside can contain calls to the server.
-- Otherwise return empty (so it stop the computation).
-- Otherwise return empty (so it stop the computation and may execute alternative computations).
onBrowser :: Cloud a -> Cloud a
onBrowser x= do
r <- local $ return isBrowserInstance

View file

@ -3,7 +3,8 @@ packages:
- '.'
- location:
git: https://github.com/agocorona/transient.git
commit: 44062d3c8b29c9d357e07f6f023c4345ba0d71b8
commit: 07869d4852f14fa36e626f6fa389eacdd5ac1b51
extra-dep: true
extra-package-dbs: []
flags: {}

View file

@ -1,4 +1,4 @@
{-# LANGUAGE CPP #-}
{-# LANGUAGE CPP, ScopedTypeVariables #-}
module Main where
#ifndef ghcjs_HOST_OS
@ -12,79 +12,83 @@ import Data.Monoid
import Transient.Base
import Transient.Internals
import Transient.Indeterminism
import Transient.Logged
import Transient.Move
import Transient.Move.Utils
import Transient.Move.Services
import Transient.MapReduce
import Transient.EVars
import Control.Concurrent
import System.IO.Unsafe
import Data.List
import Control.Exception.Base
import qualified Data.Map as M
import System.Exit
import Control.Monad.State
import Control.Exception
-- #define _UPK_(x) {-# UNPACK #-} !(x)
import Control.Monad.State
#define _UPK_(x) {-# UNPACK #-} !(x)
#define shouldRun(x) (local $ getMyNode >>= \p -> assert ( p == (x)) (return ()))
#define shouldRun(x) (local $ getMyNode >>= \(Node _ p _ _) -> assert ( p == (x)) (return ()))
service= [("service","test suite")
,("executable", "test-transient1")
,("package","https://github.com/agocorona/transient-universe")]
main= do
let numNodes = 3
ports = [2000 .. 2000 + numNodes - 1]
createLocalNode = createNode "localhost"
nodes <- mapM createLocalNode ports
let n2000= head nodes
n2001= nodes !! 1
n2002= nodes !! 2
mr <- keep $ test `catcht` \(e:: SomeException) -> liftIO (putStr "EXCEPTiON: " >> print e) >> exit (Just e)
case mr of
Nothing -> print "EXCEPTION" >> exitFailure
Just Nothing -> print "SUCCESS" >> exitSuccess
Just (Just e) -> putStr "FAIL: " >> print e >> exitFailure
r <- keep' $ freeThreads $ runCloud $ do
test= initNodeServ service "localhost" 8080 $ do
node0 <- local getMyNode
local $ guard (nodePort node0== 8080) -- only executes in node 8080
-- local $ option "get" "get instances"
[node1,node2] <- requestInstance "" service 2
local ( option "f" "fire") <|> return "" -- to repeat the test, remove exit
runNodes nodes
localIO $ putStrLn "------checking Alternative distributed--------"
r <- local $ collect 3 $
runCloud $ (runAt n2000 (shouldRun(2000) >> return "hello" ))
<|> (runAt n2001 (shouldRun(2001) >> return "world" ))
<|> (runAt n2002 (shouldRun(2002) >> return "world2" ))
runCloud $ (runAt node0 (shouldRun( node0) >> return "hello" ))
<|> (runAt node1 (shouldRun( node1) >> return "world" ))
<|> (runAt node2 (shouldRun( node2) >> return "world2" ))
assert(sort r== ["hello", "world","world2"]) $ localIO $ print r
localIO $ putStrLn "--------------checking Applicative distributed--------"
r <- loggedc $(runAt node0 (shouldRun( node0) >> return "hello "))
<> (runAt node1 (shouldRun( node1) >> return "world " ))
<> (runAt node2 (shouldRun( node2) >> return "world2" ))
assert(r== "hello world world2") $ localIO $ print r
loggedc $ assert(sort r== ["hello", "world","world2"]) $ lliftIO $ print r
localIO $ putStrLn "----------------checking monadic, distributed-------------"
r <- runAt node0 (shouldRun(node0)
>> runAt node1 (shouldRun (node1)
>> runAt node2 (shouldRun(node2) >> (return "HELLO" ))))
lliftIO $ putStrLn "--------------checking Applicative distributed--------"
r <- loggedc $(runAt n2000 (shouldRun(2000) >> return "hello "))
<> (runAt n2001 (shouldRun(2001) >> return "world " ))
<> (runAt n2002 (shouldRun(2002) >> return "world2" ))
assert(r== "hello world world2") $ lliftIO $ print r
lliftIO $ putStrLn "----------------checking monadic, distributed-------------"
r <- runAt n2000 (shouldRun(2000)
>> runAt n2001 (shouldRun(2001)
>> runAt n2002 (shouldRun(2002) >> (return "HELLO" ))))
assert(r== "HELLO") $ lliftIO $ print r
lliftIO $ putStrLn "----------------checking map-reduce -------------"
assert(r== "HELLO") $ localIO $ print r
localIO $ putStrLn "----------------checking map-reduce -------------"
r <- reduce (+) . mapKeyB (\w -> (w, 1 :: Int)) $ getText words "hello world hello"
lliftIO $ putStr "SOLUTION: " >> print r
localIO $ print r
assert (sort (M.toList r) == sort [("hello",2::Int),("world",1)]) $ return r
local $ exit ()
print "SUCCESS"
exitSuccess
return Nothing
runNodes nodes= foldr (<|>) empty (map listen nodes) <|> return ()
local $ exit (Nothing :: Maybe SomeException) -- remove this to repeat the test
#else
main= return ()
#endif

View file

@ -1,115 +1,155 @@
name: transient-universe
version: 0.4.4.1
cabal-version: >=1.10
build-type: Simple
license: MIT
license-file: LICENSE
maintainer: agocorona@gmail.com
homepage: http://www.fpcomplete.com/user/agocorona
bug-reports: https://github.com/agocorona/transient-universe/issues
synopsis: Distributed computing with algebraic/monadic composability, map-reduce
description:
See <http://github.com/agocorona/transient>.
category: Control
author: Alberto G. Corona
extra-source-files:
ChangeLog.md README.md
app/client/Transient/Move/Services/MonitorService.hs
app/server/Transient/Move/Services/MonitorService.hs
source-repository head
type: git
location: https://github.com/agocorona/transient-universe
library
if !impl(ghcjs >=0.1)
exposed-modules:
Transient.Move.Services
if impl(ghcjs >=0.1)
build-depends:
ghcjs-base -any,
ghcjs-prim -any
else
build-depends:
HTTP -any,
TCache >= 0.12,
case-insensitive -any,
directory -any,
filepath -any,
hashable -any,
iproute -any,
network -any,
network-info -any,
network-uri -any,
vector -any,
websockets -any
exposed-modules:
Transient.Move
Transient.MapReduce
Transient.Move.Internals
Transient.Move.Utils
build-depends:
base >4 && <5,
bytestring -any,
containers -any,
mtl -any,
process -any,
random -any,
stm -any,
text -any,
time -any,
transformers -any,
transient >= 0.5.4
default-language: Haskell2010
hs-source-dirs: src .
executable monitorService
if !impl(ghcjs >=0.1)
build-depends:
transformers -any,
transient >=0.5.4,
transient-universe -any
hs-source-dirs: app/server/Transient/Move/Services
else
hs-source-dirs: app/client/Transient/Move/Services
main-is: MonitorService.hs
build-depends:
base >4 && <5
default-language: Haskell2010
ghc-options: -threaded -rtsopts
test-suite test-transient
if !impl(ghcjs >=0.1)
build-depends:
mtl -any,
transient -any,
random -any,
text -any,
containers -any,
directory -any,
filepath -any,
stm -any,
HTTP -any,
network -any,
transformers -any,
process -any,
network -any,
network-info -any,
bytestring -any,
time -any,
vector -any,
TCache >= 0.12,
websockets -any,
network-uri -any,
case-insensitive -any,
hashable -any
type: exitcode-stdio-1.0
main-is: TestSuite.hs
build-depends:
base >4
default-language: Haskell2010
hs-source-dirs: tests src .
name: transient-universe
version: 0.4.4
cabal-version: >=1.10
build-type: Simple
license: MIT
license-file: LICENSE
maintainer: agocorona@gmail.com
homepage: http://www.fpcomplete.com/user/agocorona
bug-reports: https://github.com/agocorona/transient-universe/issues
synopsis: Remote execution and map-reduce: distributed computing for Transient
description:
See <http://github.com/agocorona/transient>.
category: Control
author: Alberto G. Corona
extra-source-files:
ChangeLog.md README.md
app/client/Transient/Move/Services/MonitorService.hs
app/server/Transient/Move/Services/MonitorService.hs
source-repository head
type: git
location: https://github.com/agocorona/transient-universe
library
if !impl(ghcjs >=0.1)
exposed-modules:
Transient.Move.Services
if impl(ghcjs >=0.1)
build-depends:
ghcjs-base -any,
ghcjs-prim -any
else
build-depends:
HTTP -any,
TCache >= 0.12,
case-insensitive -any,
directory -any,
filepath -any,
hashable -any,
iproute -any,
network -any,
network-info -any,
network-uri -any,
vector -any,
websockets -any,
process -any,
random -any,
text -any
exposed-modules:
Transient.Move
Transient.MapReduce
Transient.Move.Internals
Transient.Move.Utils
build-depends:
base >4 && <5,
bytestring -any,
containers -any,
mtl -any,
stm -any,
time -any,
transformers -any,
transient >= 0.5.6
default-language: Haskell2010
hs-source-dirs: src .
executable monitorService
if !impl(ghcjs >=0.1)
build-depends:
transformers -any,
transient >=0.5.6,
transient-universe,
process,
directory
hs-source-dirs: app/server/Transient/Move/Services
else
hs-source-dirs: app/client/Transient/Move/Services
main-is: MonitorService.hs
build-depends:
base >4 && <5
default-language: Haskell2010
ghc-options: -threaded -rtsopts
executable test-transient1
if !impl(ghcjs >=0.1)
build-depends:
mtl -any,
transient >= 0.5.6,
random -any,
text -any,
containers -any,
directory -any,
filepath -any,
stm -any,
HTTP -any,
network -any,
transformers -any,
process -any,
network -any,
network-info -any,
bytestring -any,
time -any,
vector -any,
TCache >= 0.12,
websockets -any,
network-uri -any,
case-insensitive -any,
hashable -any
main-is: TestSuite.hs
build-depends:
base >4
default-language: Haskell2010
hs-source-dirs: tests src .
ghc-options: -threaded -rtsopts
test-suite test-transient
if !impl(ghcjs >=0.1)
build-depends:
mtl -any,
transient >= 0.5.6,
random -any,
text -any,
containers -any,
directory -any,
filepath -any,
stm -any,
HTTP -any,
network -any,
transformers -any,
process -any,
network -any,
network-info -any,
bytestring -any,
time -any,
vector -any,
TCache >= 0.12,
websockets -any,
network-uri -any,
case-insensitive -any,
hashable -any
type: exitcode-stdio-1.0
main-is: TestSuite.hs
build-depends:
base >4
default-language: Haskell2010
hs-source-dirs: tests src .
ghc-options: -threaded -rtsopts