module Tile.Execution ( runBroadcast, runGather, runReduce, runScatter, ) where import Control.Concurrent import Control.Monad import Data.Map.Strict qualified as Map import Data.Set qualified as Set import Tile.Schedule runBroadcast :: Schedule String -> String -> IO () runBroadcast :: Schedule String -> String -> IO () runBroadcast Schedule String schedule String root = do let graph :: Map String [String] graph = Schedule String -> Map String [String] forall a. Ord a => Schedule a -> Map a [a] adjacencyList Schedule String schedule members :: [String] members = Set String -> [String] forall a. Set a -> [a] Set.toList (Set String -> [String]) -> Set String -> [String] forall a b. (a -> b) -> a -> b $ [String] -> Set String forall a. Ord a => [a] -> Set a Set.fromList (Map String [String] -> [String] forall k a. Map k a -> [k] Map.keys Map String [String] graph [String] -> [String] -> [String] forall a. [a] -> [a] -> [a] ++ [[String]] -> [String] forall (t :: * -> *) a. Foldable t => t [a] -> [a] concat (Map String [String] -> [[String]] forall k a. Map k a -> [a] Map.elems Map String [String] graph)) chans <- [String] -> (String -> IO (String, Chan String)) -> IO [(String, Chan String)] forall (t :: * -> *) (m :: * -> *) a b. (Traversable t, Monad m) => t a -> (a -> m b) -> m (t b) forM [String] members ((String -> IO (String, Chan String)) -> IO [(String, Chan String)]) -> (String -> IO (String, Chan String)) -> IO [(String, Chan String)] forall a b. (a -> b) -> a -> b $ \String m -> do ch <- IO (Chan String) forall a. IO (Chan a) newChan pure (m, ch) let chanMap = [(String, Chan String)] -> Map String (Chan String) forall k a. Ord k => [(k, a)] -> Map k a Map.fromList [(String, Chan String)] chans forM_ members $ \String m -> do let inbox :: Chan String inbox = Map String (Chan String) chanMap Map String (Chan String) -> String -> Chan String forall k a. Ord k => Map k a -> k -> a Map.! String m children :: [String] children = [String] -> String -> Map String [String] -> [String] forall k a. Ord k => a -> k -> Map k a -> a Map.findWithDefault [] String m Map String [String] graph childChans :: [(String, Chan String)] childChans = [(String c, Map String (Chan String) chanMap Map String (Chan String) -> String -> Chan String forall k a. Ord k => Map k a -> k -> a Map.! String c) | String c <- [String] children] _ <- IO () -> IO ThreadId forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId forall a b. (a -> b) -> a -> b $ IO () -> IO () forall (f :: * -> *) a b. Applicative f => f a -> f b forever (IO () -> IO ()) -> IO () -> IO () forall a b. (a -> b) -> a -> b $ do msg <- Chan String -> IO String forall a. Chan a -> IO a readChan Chan String inbox putStrLn $ m ++ " received: " ++ msg forM_ childChans $ \(String childName, Chan String childInbox) -> do String -> IO () putStrLn (String -> IO ()) -> String -> IO () forall a b. (a -> b) -> a -> b $ String m String -> String -> String forall a. [a] -> [a] -> [a] ++ String " forwarding to " String -> String -> String forall a. [a] -> [a] -> [a] ++ String childName Chan String -> String -> IO () forall a. Chan a -> a -> IO () writeChan Chan String childInbox String msg pure () writeChan (chanMap Map.! root) "hello" threadDelay 1000000 incomingCounts :: (Ord a) => Schedule a -> Map.Map a Int incomingCounts :: forall a. Ord a => Schedule a -> Map a Int incomingCounts = (Step a -> Map a Int -> Map a Int) -> Map a Int -> [Step a] -> Map a Int forall a b. (a -> b -> b) -> b -> [a] -> b forall (t :: * -> *) a b. Foldable t => (a -> b -> b) -> b -> t a -> b foldr (\Step {to :: forall a. Step a -> a to = a c} Map a Int m -> (Int -> Int -> Int) -> a -> Int -> Map a Int -> Map a Int forall k a. Ord k => (a -> a -> a) -> k -> a -> Map k a -> Map k a Map.insertWith Int -> Int -> Int forall a. Num a => a -> a -> a (+) a c Int 1 Map a Int m) Map a Int forall k a. Map k a Map.empty runReduce :: Schedule String -> [(String, Int)] -> (Int -> Int -> Int) -> String -> IO () runReduce :: Schedule String -> [(String, Int)] -> (Int -> Int -> Int) -> String -> IO () runReduce Schedule String schedule [(String, Int)] initialValues Int -> Int -> Int combine String root = do let graph :: Map String [String] graph = Schedule String -> Map String [String] forall a. Ord a => Schedule a -> Map a [a] adjacencyList Schedule String schedule incoming :: Map String Int incoming = Schedule String -> Map String Int forall a. Ord a => Schedule a -> Map a Int incomingCounts Schedule String schedule members :: [String] members = Set String -> [String] forall a. Set a -> [a] Set.toList (Set String -> [String]) -> Set String -> [String] forall a b. (a -> b) -> a -> b $ [String] -> Set String forall a. Ord a => [a] -> Set a Set.fromList ([String] -> Set String) -> [String] -> Set String forall a b. (a -> b) -> a -> b $ Map String [String] -> [String] forall k a. Map k a -> [k] Map.keys Map String [String] graph [String] -> [String] -> [String] forall a. [a] -> [a] -> [a] ++ [[String]] -> [String] forall (t :: * -> *) a. Foldable t => t [a] -> [a] concat (Map String [String] -> [[String]] forall k a. Map k a -> [a] Map.elems Map String [String] graph) [String] -> [String] -> [String] forall a. [a] -> [a] -> [a] ++ ((String, Int) -> String) -> [(String, Int)] -> [String] forall a b. (a -> b) -> [a] -> [b] map (String, Int) -> String forall a b. (a, b) -> a fst [(String, Int)] initialValues chanPairs <- [String] -> (String -> IO (String, Chan Int)) -> IO [(String, Chan Int)] forall (t :: * -> *) (m :: * -> *) a b. (Traversable t, Monad m) => t a -> (a -> m b) -> m (t b) forM [String] members ((String -> IO (String, Chan Int)) -> IO [(String, Chan Int)]) -> (String -> IO (String, Chan Int)) -> IO [(String, Chan Int)] forall a b. (a -> b) -> a -> b $ \String m -> do ch <- IO (Chan Int) forall a. IO (Chan a) newChan pure (m, ch) let chanMap = [(String, Chan Int)] -> Map String (Chan Int) forall k a. Ord k => [(k, a)] -> Map k a Map.fromList [(String, Chan Int)] chanPairs valueMap = [(String, Int)] -> Map String Int forall k a. Ord k => [(k, a)] -> Map k a Map.fromList [(String, Int)] initialValues forM_ members $ \String m -> do let inbox :: Chan Int inbox = Map String (Chan Int) chanMap Map String (Chan Int) -> String -> Chan Int forall k a. Ord k => Map k a -> k -> a Map.! String m children :: [String] children = [String] -> String -> Map String [String] -> [String] forall k a. Ord k => a -> k -> Map k a -> a Map.findWithDefault [] String m Map String [String] graph childChans :: [(String, Chan Int)] childChans = [(String c, Map String (Chan Int) chanMap Map String (Chan Int) -> String -> Chan Int forall k a. Ord k => Map k a -> k -> a Map.! String c) | String c <- [String] children] expected :: Int expected = Int -> String -> Map String Int -> Int forall k a. Ord k => a -> k -> Map k a -> a Map.findWithDefault Int 0 String m Map String Int incoming localValue :: Int localValue = Map String Int valueMap Map String Int -> String -> Int forall k a. Ord k => Map k a -> k -> a Map.! String m _ <- IO () -> IO ThreadId forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId forall a b. (a -> b) -> a -> b $ do received <- Int -> IO Int -> IO [Int] forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a] replicateM Int expected (Chan Int -> IO Int forall a. Chan a -> IO a readChan Chan Int inbox) let total = (Int -> Int -> Int) -> Int -> [Int] -> Int forall b a. (b -> a -> b) -> b -> [a] -> b forall (t :: * -> *) b a. Foldable t => (b -> a -> b) -> b -> t a -> b foldl Int -> Int -> Int combine Int localValue [Int] received if m == root then putStrLn $ m ++ " reduced result: " ++ show total else forM_ childChans $ \(String childName, Chan Int childInbox) -> do String -> IO () putStrLn (String -> IO ()) -> String -> IO () forall a b. (a -> b) -> a -> b $ String m String -> String -> String forall a. [a] -> [a] -> [a] ++ String " sending reduced value " String -> String -> String forall a. [a] -> [a] -> [a] ++ Int -> String forall a. Show a => a -> String show Int total String -> String -> String forall a. [a] -> [a] -> [a] ++ String " to " String -> String -> String forall a. [a] -> [a] -> [a] ++ String childName Chan Int -> Int -> IO () forall a. Chan a -> a -> IO () writeChan Chan Int childInbox Int total pure () forM_ members $ \String m -> Bool -> IO () -> IO () forall (f :: * -> *). Applicative f => Bool -> f () -> f () when (Int -> String -> Map String Int -> Int forall k a. Ord k => a -> k -> Map k a -> a Map.findWithDefault Int 0 String m Map String Int incoming Int -> Int -> Bool forall a. Eq a => a -> a -> Bool == Int 0) (IO () -> IO ()) -> IO () -> IO () forall a b. (a -> b) -> a -> b $ Chan Int -> Int -> IO () forall a. Chan a -> a -> IO () writeChan (Map String (Chan Int) chanMap Map String (Chan Int) -> String -> Chan Int forall k a. Ord k => Map k a -> k -> a Map.! String m) (Map String Int valueMap Map String Int -> String -> Int forall k a. Ord k => Map k a -> k -> a Map.! String m) threadDelay 1000000 runGather :: (Show a) => Schedule String -> [(String, a)] -> String -> IO () runGather :: forall a. Show a => Schedule String -> [(String, a)] -> String -> IO () runGather Schedule String schedule [(String, a)] initialValues String root = do let graph :: Map String [String] graph = Schedule String -> Map String [String] forall a. Ord a => Schedule a -> Map a [a] adjacencyList Schedule String schedule incoming :: Map String Int incoming = Schedule String -> Map String Int forall a. Ord a => Schedule a -> Map a Int incomingCounts Schedule String schedule members :: [String] members = Set String -> [String] forall a. Set a -> [a] Set.toList (Set String -> [String]) -> Set String -> [String] forall a b. (a -> b) -> a -> b $ [String] -> Set String forall a. Ord a => [a] -> Set a Set.fromList ([String] -> Set String) -> [String] -> Set String forall a b. (a -> b) -> a -> b $ Map String [String] -> [String] forall k a. Map k a -> [k] Map.keys Map String [String] graph [String] -> [String] -> [String] forall a. [a] -> [a] -> [a] ++ [[String]] -> [String] forall (t :: * -> *) a. Foldable t => t [a] -> [a] concat (Map String [String] -> [[String]] forall k a. Map k a -> [a] Map.elems Map String [String] graph) [String] -> [String] -> [String] forall a. [a] -> [a] -> [a] ++ ((String, a) -> String) -> [(String, a)] -> [String] forall a b. (a -> b) -> [a] -> [b] map (String, a) -> String forall a b. (a, b) -> a fst [(String, a)] initialValues chanPairs <- [String] -> (String -> IO (String, Chan [(String, a)])) -> IO [(String, Chan [(String, a)])] forall (t :: * -> *) (m :: * -> *) a b. (Traversable t, Monad m) => t a -> (a -> m b) -> m (t b) forM [String] members ((String -> IO (String, Chan [(String, a)])) -> IO [(String, Chan [(String, a)])]) -> (String -> IO (String, Chan [(String, a)])) -> IO [(String, Chan [(String, a)])] forall a b. (a -> b) -> a -> b $ \String m -> do ch <- IO (Chan [(String, a)]) forall a. IO (Chan a) newChan pure (m, ch) let chanMap = [(String, Chan [(String, a)])] -> Map String (Chan [(String, a)]) forall k a. Ord k => [(k, a)] -> Map k a Map.fromList [(String, Chan [(String, a)])] chanPairs valueMap = [(String, a)] -> Map String a forall k a. Ord k => [(k, a)] -> Map k a Map.fromList [(String, a)] initialValues forM_ members $ \String m -> do let inbox :: Chan [(String, a)] inbox = Map String (Chan [(String, a)]) chanMap Map String (Chan [(String, a)]) -> String -> Chan [(String, a)] forall k a. Ord k => Map k a -> k -> a Map.! String m children :: [String] children = [String] -> String -> Map String [String] -> [String] forall k a. Ord k => a -> k -> Map k a -> a Map.findWithDefault [] String m Map String [String] graph childChans :: [(String, Chan [(String, a)])] childChans = [(String c, Map String (Chan [(String, a)]) chanMap Map String (Chan [(String, a)]) -> String -> Chan [(String, a)] forall k a. Ord k => Map k a -> k -> a Map.! String c) | String c <- [String] children] expected :: Int expected = Int -> String -> Map String Int -> Int forall k a. Ord k => a -> k -> Map k a -> a Map.findWithDefault Int 0 String m Map String Int incoming localValue :: [(String, a)] localValue = [(String m, Map String a valueMap Map String a -> String -> a forall k a. Ord k => Map k a -> k -> a Map.! String m)] _ <- IO () -> IO ThreadId forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId forall a b. (a -> b) -> a -> b $ do received <- [[(String, a)]] -> [(String, a)] forall (t :: * -> *) a. Foldable t => t [a] -> [a] concat ([[(String, a)]] -> [(String, a)]) -> IO [[(String, a)]] -> IO [(String, a)] forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b <$> Int -> IO [(String, a)] -> IO [[(String, a)]] forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a] replicateM Int expected (Chan [(String, a)] -> IO [(String, a)] forall a. Chan a -> IO a readChan Chan [(String, a)] inbox) let gathered = [(String, a)] localValue [(String, a)] -> [(String, a)] -> [(String, a)] forall a. [a] -> [a] -> [a] ++ [(String, a)] received if m == root then putStrLn $ m ++ " gathered result: " ++ show gathered else forM_ childChans $ \(String childName, Chan [(String, a)] childInbox) -> do String -> IO () putStrLn (String -> IO ()) -> String -> IO () forall a b. (a -> b) -> a -> b $ String m String -> String -> String forall a. [a] -> [a] -> [a] ++ String " sending gathered values " String -> String -> String forall a. [a] -> [a] -> [a] ++ [(String, a)] -> String forall a. Show a => a -> String show [(String, a)] gathered String -> String -> String forall a. [a] -> [a] -> [a] ++ String " to " String -> String -> String forall a. [a] -> [a] -> [a] ++ String childName Chan [(String, a)] -> [(String, a)] -> IO () forall a. Chan a -> a -> IO () writeChan Chan [(String, a)] childInbox [(String, a)] gathered pure () forM_ members $ \String m -> Bool -> IO () -> IO () forall (f :: * -> *). Applicative f => Bool -> f () -> f () when (Int -> String -> Map String Int -> Int forall k a. Ord k => a -> k -> Map k a -> a Map.findWithDefault Int 0 String m Map String Int incoming Int -> Int -> Bool forall a. Eq a => a -> a -> Bool == Int 0) (IO () -> IO ()) -> IO () -> IO () forall a b. (a -> b) -> a -> b $ Chan [(String, a)] -> [(String, a)] -> IO () forall a. Chan a -> a -> IO () writeChan (Map String (Chan [(String, a)]) chanMap Map String (Chan [(String, a)]) -> String -> Chan [(String, a)] forall k a. Ord k => Map k a -> k -> a Map.! String m) [(String m, Map String a valueMap Map String a -> String -> a forall k a. Ord k => Map k a -> k -> a Map.! String m)] threadDelay 1000000 runScatter :: (Show a) => Schedule String -> [(String, a)] -> String -> IO () runScatter :: forall a. Show a => Schedule String -> [(String, a)] -> String -> IO () runScatter Schedule String schedule [(String, a)] initialValues String root = do let graph :: Map String [String] graph = Schedule String -> Map String [String] forall a. Ord a => Schedule a -> Map a [a] adjacencyList Schedule String schedule incoming :: Map String Int incoming = Schedule String -> Map String Int forall a. Ord a => Schedule a -> Map a Int incomingCounts Schedule String schedule members :: [String] members = Set String -> [String] forall a. Set a -> [a] Set.toList (Set String -> [String]) -> Set String -> [String] forall a b. (a -> b) -> a -> b $ [String] -> Set String forall a. Ord a => [a] -> Set a Set.fromList ([String] -> Set String) -> [String] -> Set String forall a b. (a -> b) -> a -> b $ Map String [String] -> [String] forall k a. Map k a -> [k] Map.keys Map String [String] graph [String] -> [String] -> [String] forall a. [a] -> [a] -> [a] ++ [[String]] -> [String] forall (t :: * -> *) a. Foldable t => t [a] -> [a] concat (Map String [String] -> [[String]] forall k a. Map k a -> [a] Map.elems Map String [String] graph) [String] -> [String] -> [String] forall a. [a] -> [a] -> [a] ++ ((String, a) -> String) -> [(String, a)] -> [String] forall a b. (a -> b) -> [a] -> [b] map (String, a) -> String forall a b. (a, b) -> a fst [(String, a)] initialValues subtree :: String -> Set String subtree = Map String [String] -> String -> Set String subtreeMembers Map String [String] graph chanPairs <- [String] -> (String -> IO (String, Chan [(String, a)])) -> IO [(String, Chan [(String, a)])] forall (t :: * -> *) (m :: * -> *) a b. (Traversable t, Monad m) => t a -> (a -> m b) -> m (t b) forM [String] members ((String -> IO (String, Chan [(String, a)])) -> IO [(String, Chan [(String, a)])]) -> (String -> IO (String, Chan [(String, a)])) -> IO [(String, Chan [(String, a)])] forall a b. (a -> b) -> a -> b $ \String m -> do ch <- IO (Chan [(String, a)]) forall a. IO (Chan a) newChan pure (m, ch) let chanMap = [(String, Chan [(String, a)])] -> Map String (Chan [(String, a)]) forall k a. Ord k => [(k, a)] -> Map k a Map.fromList [(String, Chan [(String, a)])] chanPairs forM_ members $ \String m -> do let inbox :: Chan [(String, a)] inbox = Map String (Chan [(String, a)]) chanMap Map String (Chan [(String, a)]) -> String -> Chan [(String, a)] forall k a. Ord k => Map k a -> k -> a Map.! String m children :: [String] children = [String] -> String -> Map String [String] -> [String] forall k a. Ord k => a -> k -> Map k a -> a Map.findWithDefault [] String m Map String [String] graph childChans :: [(String, Chan [(String, a)])] childChans = [(String c, Map String (Chan [(String, a)]) chanMap Map String (Chan [(String, a)]) -> String -> Chan [(String, a)] forall k a. Ord k => Map k a -> k -> a Map.! String c) | String c <- [String] children] expected :: Int expected | String m String -> String -> Bool forall a. Eq a => a -> a -> Bool == String root = Int 1 | Bool otherwise = Int -> String -> Map String Int -> Int forall k a. Ord k => a -> k -> Map k a -> a Map.findWithDefault Int 0 String m Map String Int incoming _ <- IO () -> IO ThreadId forkIO (IO () -> IO ThreadId) -> IO () -> IO ThreadId forall a b. (a -> b) -> a -> b $ do payload <- [[(String, a)]] -> [(String, a)] forall (t :: * -> *) a. Foldable t => t [a] -> [a] concat ([[(String, a)]] -> [(String, a)]) -> IO [[(String, a)]] -> IO [(String, a)] forall (f :: * -> *) a b. Functor f => (a -> b) -> f a -> f b <$> Int -> IO [(String, a)] -> IO [[(String, a)]] forall (m :: * -> *) a. Applicative m => Int -> m a -> m [a] replicateM Int expected (Chan [(String, a)] -> IO [(String, a)] forall a. Chan a -> IO a readChan Chan [(String, a)] inbox) case lookup m payload of Just a value -> String -> IO () putStrLn (String -> IO ()) -> String -> IO () forall a b. (a -> b) -> a -> b $ String m String -> String -> String forall a. [a] -> [a] -> [a] ++ String " received scatter value: " String -> String -> String forall a. [a] -> [a] -> [a] ++ a -> String forall a. Show a => a -> String show a value Maybe a Nothing -> () -> IO () forall a. a -> IO a forall (f :: * -> *) a. Applicative f => a -> f a pure () forM_ childChans $ \(String childName, Chan [(String, a)] childInbox) -> do let childMembers :: Set String childMembers = String -> Set String subtree String childName childPayload :: [(String, a)] childPayload = [ (String, a) item | item :: (String, a) item@(String dest, a _) <- [(String, a)] payload, String dest String -> Set String -> Bool forall a. Ord a => a -> Set a -> Bool `Set.member` Set String childMembers ] String -> IO () putStrLn (String -> IO ()) -> String -> IO () forall a b. (a -> b) -> a -> b $ String m String -> String -> String forall a. [a] -> [a] -> [a] ++ String " scattering " String -> String -> String forall a. [a] -> [a] -> [a] ++ [(String, a)] -> String forall a. Show a => a -> String show [(String, a)] childPayload String -> String -> String forall a. [a] -> [a] -> [a] ++ String " to " String -> String -> String forall a. [a] -> [a] -> [a] ++ String childName Chan [(String, a)] -> [(String, a)] -> IO () forall a. Chan a -> a -> IO () writeChan Chan [(String, a)] childInbox [(String, a)] childPayload pure () writeChan (chanMap Map.! root) initialValues threadDelay 1000000 subtreeMembers :: Map.Map String [String] -> String -> Set.Set String subtreeMembers :: Map String [String] -> String -> Set String subtreeMembers Map String [String] graph String member = String -> Set String -> Set String forall a. Ord a => a -> Set a -> Set a Set.insert String member (Set String -> Set String) -> Set String -> Set String forall a b. (a -> b) -> a -> b $ [Set String] -> Set String forall (f :: * -> *) a. (Foldable f, Ord a) => f (Set a) -> Set a Set.unions [ Map String [String] -> String -> Set String subtreeMembers Map String [String] graph String child | String child <- [String] -> String -> Map String [String] -> [String] forall k a. Ord k => a -> k -> Map k a -> a Map.findWithDefault [] String member Map String [String] graph ]