added more control
This commit is contained in:
parent
dc6b0d3d7f
commit
2fc3e24a71
2 changed files with 55 additions and 13 deletions
|
@ -21,12 +21,16 @@ executable muraine
|
|||
-- other-modules:
|
||||
-- other-extensions:
|
||||
build-depends: base >=4.7 && <4.8
|
||||
, aeson
|
||||
, bytestring
|
||||
, case-insensitive
|
||||
, conduit
|
||||
, http-conduit
|
||||
, http-types
|
||||
, old-locale
|
||||
, text
|
||||
, time
|
||||
, vector
|
||||
, unordered-containers
|
||||
hs-source-dirs: src/
|
||||
default-language: Haskell2010
|
||||
|
|
64
src/Main.hs
64
src/Main.hs
|
@ -2,6 +2,12 @@
|
|||
module Main where
|
||||
|
||||
-- import Data.Conduit
|
||||
import qualified Data.HashMap.Strict as H
|
||||
import Data.Aeson
|
||||
import Data.Vector ((!?))
|
||||
import Data.Text (Text)
|
||||
import qualified Data.Text as T
|
||||
|
||||
import Network.HTTP.Conduit
|
||||
import Network.HTTP.Types.Header (Header,RequestHeaders)
|
||||
import Network.HTTP.Types.Status (statusIsSuccessful,notModified304)
|
||||
|
@ -47,7 +53,7 @@ main :: IO ()
|
|||
main = do
|
||||
args <- getArgs
|
||||
case args of
|
||||
[user,pass] -> getEvents user pass Nothing 100000
|
||||
[user,pass] -> getEvents user pass Nothing 100000 Nothing
|
||||
_ -> showHelpAndExit
|
||||
|
||||
rfc822DateFormat :: String
|
||||
|
@ -63,12 +69,28 @@ time action = do
|
|||
endTime <- getCPUTime
|
||||
return (fromIntegral (endTime - startTime)/(10**12),res)
|
||||
|
||||
anArray :: Value -> Maybe Array
|
||||
anArray (Array a) = Just a
|
||||
anArray _ = Nothing
|
||||
|
||||
anObject :: Value -> Maybe Object
|
||||
anObject (Object a) = Just a
|
||||
anObject _ = Nothing
|
||||
|
||||
aString :: Value -> Maybe Text
|
||||
aString (String a) = Just a
|
||||
aString _ = Nothing
|
||||
|
||||
getFirstId :: LZ.ByteString -> Maybe Text
|
||||
getFirstId body = decode body >>= anArray >>= (!? 0) >>= anObject >>= H.lookup "id" >>= aString
|
||||
|
||||
getTimeAndEtagFromResponse :: Int
|
||||
-> Maybe B.ByteString
|
||||
-> Response LZ.ByteString
|
||||
-> Double
|
||||
-> IO (Int, Maybe B.ByteString)
|
||||
getTimeAndEtagFromResponse oldTime etag response req_time =
|
||||
-> Maybe Text
|
||||
-> IO (Int, Maybe B.ByteString,Maybe Text)
|
||||
getTimeAndEtagFromResponse oldTime etag response req_time oldFirstId =
|
||||
if statusIsSuccessful (responseStatus response)
|
||||
then do
|
||||
let headers = responseHeaders response
|
||||
|
@ -85,27 +107,43 @@ getTimeAndEtagFromResponse oldTime etag response req_time =
|
|||
timeBeforeReset = reset - serverDateEpoch
|
||||
t = 1000000 * timeBeforeReset `div` remaining
|
||||
timeToWaitIn_us = max 0 (t - floor (1000000 * req_time))
|
||||
-- TODO: read all pages until we reach the first ID of the first page
|
||||
-- of the preceeding loop
|
||||
publish (responseBody response)
|
||||
return (timeToWaitIn_us,etagResponded)
|
||||
-- TODO: read all pages until we reach the first ID of the first page
|
||||
-- of the preceeding loop
|
||||
firstId = getFirstId (responseBody response)
|
||||
publish (responseBody response) oldFirstId
|
||||
return (timeToWaitIn_us,etagResponded,firstId)
|
||||
else do
|
||||
putStrLn (if notModified304 == responseStatus response
|
||||
then "Nothing changed"
|
||||
else "Something went wrong")
|
||||
return (oldTime,etag)
|
||||
return (oldTime,etag,oldFirstId)
|
||||
|
||||
getEvents :: String -- ^ Github username
|
||||
-> String -- ^ Github password
|
||||
-> Maybe B.ByteString -- ^ ETag
|
||||
-> Int -- ^ Time to wait in micro seconds
|
||||
-> Maybe Text -- ^ First Event Id
|
||||
-> IO ()
|
||||
getEvents user pass etag t = do
|
||||
getEvents user pass etag t oldFirstId = do
|
||||
-- Call /events on github
|
||||
(req_time, response) <- time (httpGHEvents user pass etag)
|
||||
(timeToWaitIn_us,etagResponded) <- getTimeAndEtagFromResponse t etag response req_time
|
||||
(timeToWaitIn_us,etagResponded,firstId) <- getTimeAndEtagFromResponse t etag response req_time oldFirstId
|
||||
threadDelay timeToWaitIn_us
|
||||
getEvents user pass etagResponded timeToWaitIn_us
|
||||
getEvents user pass etagResponded timeToWaitIn_us firstId
|
||||
|
||||
publish :: LZ.ByteString -> IO ()
|
||||
publish = LZ.putStrLn . LZ.take 40
|
||||
-- takeUpUntil :: Maybe Text -> Maybe Value -> Maybe [Value]
|
||||
-- takeUpUntil firstId = error "TODO"
|
||||
|
||||
publish :: LZ.ByteString -> Maybe Text -> IO ()
|
||||
publish body firstId = do
|
||||
-- let mevents = decode body >>= anArray >>= takeUpUntil firstId
|
||||
-- case mevents of
|
||||
-- Just events -> mapM_ (publishOneEvent firstId) events
|
||||
-- _ -> return ()
|
||||
case firstId of
|
||||
Just txt -> putStrLn (T.unpack txt)
|
||||
_ -> return ()
|
||||
(LZ.putStrLn . LZ.take 40) body
|
||||
|
||||
-- publishOneEvent :: Value -> IO ()
|
||||
-- publishOneEvent mEvent = putStrLn "TODO: publish to Kafka"
|
||||
|
|
Loading…
Reference in a new issue