diff --git a/src/Lentille/GitHub/RateLimit.hs b/src/Lentille/GitHub/RateLimit.hs index bae6a0191..2c70cd989 100644 --- a/src/Lentille/GitHub/RateLimit.hs +++ b/src/Lentille/GitHub/RateLimit.hs @@ -10,7 +10,7 @@ import Lentille.GitHub.Types import Lentille.GraphQL import Monocle.Prelude import Network.HTTP.Client (responseBody, responseStatus) -import Network.HTTP.Types (Status, badGateway502, forbidden403, ok200) +import Network.HTTP.Types (Status, badGateway502, forbidden403, ok200, unauthorized401) import Effectful.Retry @@ -48,6 +48,9 @@ retryCheck :: forall es a. GraphEffects es => Either GraphQLError a -> Eff es Re retryCheck = \case Right _ -> pure DontRetry Left (GraphQLError err (RequestLog _ _ resp _)) + | status == unauthorized401 -> do + logWarn "Authentication error" ["body" .= body] + pure DontRetry | isTimeoutError status body -> do logWarn_ "Server side timeout error. Will retry with lower query depth ..." pure ConsultPolicy diff --git a/src/Lentille/GraphQL.hs b/src/Lentille/GraphQL.hs index 95a30c38c..547e8c44d 100644 --- a/src/Lentille/GraphQL.hs +++ b/src/Lentille/GraphQL.hs @@ -144,7 +144,7 @@ doRequest :: doRequest client mkArgs retryCheck depthM pageInfoM = retryingDynamic policy (const retryCheck) $ \rs -> do when (rs.rsIterNumber > 0) - $ logWarn "Faulty response" ["num" .= rs.rsIterNumber] + $ logWarn "Retrying request" ["num" .= rs.rsIterNumber] runFetch rs.rsIterNumber where delay = 1_100_000 -- 1.1 seconds @@ -209,8 +209,8 @@ streamFetch client@GraphClient {..} mkArgs StreamFetchOptParams {..} transformRe requestWithPageInfo pageInfoM storedRateLimitM = do holdOnIfNeeded storedRateLimitM - respE <- doRequest client mkArgs fpRetryCheck fpDepth pageInfoM - pure $ case respE of + eResp <- doRequest client mkArgs fpRetryCheck fpDepth pageInfoM + pure $ case eResp of Left err -> Left err Right resp -> let (pageInfo, rateLimitM, decodingErrors, xs) = transformResponse resp @@ -225,19 +225,19 @@ streamFetch client@GraphClient {..} mkArgs StreamFetchOptParams {..} transformRe startFetch = do --- Perform a pre GraphQL request to gather rateLimit - fpRespE <- case fpGetRatelimit of + (mErr :: Maybe GraphQLError) <- case fpGetRatelimit of Just getRateLimit -> lift $ E.modifyMVar rateLimitMVar $ const do - rlE <- getRateLimit client - case rlE of + eRateLimit <- getRateLimit client + case eRateLimit of Left err -> do logWarn_ "Could not fetch the current rate limit" pure (Nothing, Just err) - Right rl -> pure (rl, Nothing) + Right rateLimit -> pure (rateLimit, Nothing) Nothing -> pure Nothing - case fpRespE of + case mErr of Just err -> S.yield (Left $ GraphError err) Nothing -> go Nothing 0 diff --git a/src/Macroscope/Test.hs b/src/Macroscope/Test.hs index b624e40c9..d8c8d07b7 100644 --- a/src/Macroscope/Test.hs +++ b/src/Macroscope/Test.hs @@ -14,14 +14,15 @@ import Monocle.Backend.Documents qualified as D import Monocle.Backend.Index qualified as I import Monocle.Backend.Provisioner qualified import Monocle.Backend.Queries qualified as Q -import Monocle.Backend.Test (withTenantConfig) +import Monocle.Backend.Test (fakeChangePB, withTenantConfig) import Monocle.Backend.Test qualified as BT (fakeChange, fakeDate, fakeDateAlt) import Monocle.Client import Monocle.Config qualified as Config import Monocle.Effects -import Monocle.Entity (CrawlerName (..)) +import Monocle.Entity (CrawlerName (..), Entity (Project)) import Monocle.Env import Monocle.Prelude +import Monocle.Protob.Crawler qualified as CrawlerPB import Streaming.Prelude qualified as Streaming import Test.Tasty import Test.Tasty.HUnit @@ -45,12 +46,36 @@ testCrawlingPoint = do fakeChange2 = fakeChange1 {D.echangeId = "efake2", D.echangeUpdatedAt = BT.fakeDateAlt} I.indexChanges [fakeChange1, fakeChange2] withTestApi (mkAppEnv fakeConfig) $ \client -> do - let stream date name - | date == BT.fakeDateAlt && name == "opendev/neutron" = pure mempty - | otherwise = error "Bad crawling point" - void $ runLentilleM client $ Macroscope.runStream apiKey indexName (CrawlerName crawlerName) (Macroscope.Changes stream) - assertEqual "Fetched at expected crawling point" True True + void $ runLentilleM client do + (oldestAge, oldestEntity) <- getOldest + liftIO $ assertEqual "Oldest entity is correct" oldestEntity (Project "opendev/neutron") + + Macroscope.runStream apiKey indexName (CrawlerName crawlerName) (Macroscope.Changes badStream) + + (currentOldestAge, _) <- getOldest + liftIO $ assertEqual "Commit date is not updated on failure" oldestAge currentOldestAge + + Macroscope.runStream apiKey indexName (CrawlerName crawlerName) (Macroscope.Changes goodStream) + + (newOldestAge, _) <- getOldest + liftIO $ assertBool "Commit date updated" (newOldestAge > oldestAge) where + -- A document stream that yield an error + badStream date name + | date == BT.fakeDateAlt && name == "opendev/neutron" = do + Streaming.yield $ Right (fakeChangePB, []) + Streaming.yield $ Left (DecodeError ["Oops"]) + | otherwise = error "Bad crawling point" + + -- A document stream that yield a change + goodStream date name + | date == BT.fakeDateAlt && name == "opendev/neutron" = do + Streaming.yield $ Right (fakeChangePB, []) + | otherwise = error "Bad crawling point" + + -- Helper function to get the oldest entity age + getOldest = fromMaybe (error "no entity!") <$> Macroscope.getStreamOldestEntity indexName (from crawlerName) projectEntity 0 + projectEntity = CrawlerPB.EntityTypeENTITY_TYPE_PROJECT fakeConfig = (mkConfig (from indexName)) { Config.crawlers_api_key = Just (from apiKey) diff --git a/src/Macroscope/Worker.hs b/src/Macroscope/Worker.hs index d41bd18cf..7826feac0 100644 --- a/src/Macroscope/Worker.hs +++ b/src/Macroscope/Worker.hs @@ -9,6 +9,8 @@ module Macroscope.Worker ( runStream, DocumentStream (..), + -- test helper + getStreamOldestEntity, ) where import Data.Vector qualified as V @@ -21,10 +23,8 @@ import Monocle.Protob.Crawler as CrawlerPB hiding (Entity) import Monocle.Protob.Issue (Issue, IssueEvent) import Monocle.Protob.Search (TaskData) import Proto3.Suite (Enumerated (Enumerated)) -import Streaming qualified as S import Streaming.Prelude qualified as S -import Effectful qualified as E import Effectful.Reader.Static qualified as E import Monocle.Effects @@ -80,35 +80,52 @@ data DocumentType | DTIssues (Issue, [IssueEvent]) deriving (Generic, ToJSON) -data ProcessResult = AddOk | AddError Text deriving stock (Show) +data ProcessError es + = CommitError Text + | AddError Text + | StreamError (LentilleError, LentilleStream es DocumentType) --- | A stream error contains the first Left, and the rest of the stream -type StreamError es = (LentilleError, LentilleStream es DocumentType) - --- | 'process' read the stream of document and post to the monocle API -process :: +-- | 'processStream' read the stream of document and post to the monocle API +processStream :: forall es. -- | Funtion to log about the processing (Int -> Eff es ()) -> -- | Function to post on the Monocle API ([DocumentType] -> Eff es AddDocResponse) -> -- | The stream of documents to read - Stream (Of DocumentType) (Eff es) () -> + LentilleStream es DocumentType -> -- | The processing results - Eff es [ProcessResult] -process logFunc postFunc = - S.toList_ - . S.mapM processBatch - . S.mapped S.toList - . S.chunksOf 500 + Eff es [Maybe (ProcessError es)] +processStream logFunc postFunc = go (0 :: Word) [] [] where - processBatch :: [DocumentType] -> Eff es ProcessResult - processBatch docs = do + go count acc results stream = do + eDocument <- S.next stream + case eDocument of + Left () -> do + -- The end of the stream + res <- processBatch acc + pure $ reverse (res : results) + Right (Left err, rest) -> do + -- An error occured in the stream, abort now + let res = Just (StreamError (err, rest)) + pure $ reverse (res : results) + Right (Right doc, rest) -> do + -- We got a new document + let newAcc = doc : acc + if count == 499 + then do + res <- processBatch newAcc + go 0 [] (res : results) rest + else go (count + 1) newAcc results rest + + processBatch :: [DocumentType] -> Eff es (Maybe (ProcessError es)) + processBatch [] = pure Nothing + processBatch (reverse -> docs) = do logFunc (length docs) resp <- postFunc docs pure $ case resp of - AddDocResponse Nothing -> AddOk - AddDocResponse (Just err) -> AddError (show err) + AddDocResponse Nothing -> Nothing + AddDocResponse (Just err) -> Just (AddError (show err)) -- | 'runStream' is the main function used by macroscope runStream :: @@ -127,24 +144,23 @@ runStream apiKey indexName crawlerName documentStream = do go :: UTCTime -> Word32 -> Eff es () go startTime offset = unlessStopped do - res <- - runErrorNoCallStack do - runStreamError startTime apiKey indexName crawlerName documentStream offset - case res of - Right () -> pure () - Left (x, xs) -> do - logWarn "Error occured when consuming the document stream" ["err" .= x] - S.toList_ xs >>= \case + errors <- + runStreamError startTime apiKey indexName crawlerName documentStream offset + forM_ errors \case + AddError err -> logWarn "Could not add documents" ["err" .= err] + CommitError err -> logWarn "Could not commit update date" ["err" .= err] + StreamError (err, rest) -> do + logWarn "Error occured when consuming the document stream" ["err" .= err] + S.toList_ rest >>= \case [] -> pure () - rest -> logWarn "Left over documents found after error" ["items" .= rest] + items -> logWarn "Left over documents found after error" ["items" .= items] -- TODO: explains why TDStream don't support offset? unless (isTDStream documentStream) do -- Try the next entity by incrementing the offset go startTime (offset + 1) --- | 'runStreamError' is the stream processor which throws an error to interupt the stream --- when it contains a Left. +-- | 'runStreamError' is the stream processor runStreamError :: forall es. (LoggerEffect :> es, Retry :> es, PrometheusEffect :> es, MonoClientEffect :> es) => @@ -154,7 +170,7 @@ runStreamError :: CrawlerName -> DocumentStream es -> Word32 -> - Eff (Error (StreamError es) : es) () + Eff es [ProcessError es] runStreamError startTime apiKey indexName (CrawlerName crawlerName) documentStream offset = go where go = do @@ -163,11 +179,14 @@ runStreamError startTime apiKey indexName (CrawlerName crawlerName) documentStre -- Query the monocle api for the oldest entity to be updated. oldestEntityM <- getStreamOldestEntity indexName (from crawlerName) (streamEntity documentStream) offset case oldestEntityM of - Nothing -> logInfo_ "Unable to find entity to update" + Nothing -> do + logInfo_ "Unable to find entity to update" + pure [] Just (oldestAge, entity) | -- add a 1 second delta to avoid Hysteresis - addUTCTime 1 oldestAge >= startTime -> + addUTCTime 1 oldestAge >= startTime -> do logInfo "Crawling entities completed" ["entity" .= entity, "age" .= oldestAge] + pure [] | otherwise -> goStream oldestAge entity goStream oldestAge entity = do @@ -175,27 +194,21 @@ runStreamError startTime apiKey indexName (CrawlerName crawlerName) documentStre -- Run the document stream for that entity postResult <- - process + processStream (\c -> logInfo "Posting documents" ["count" .= c]) (httpRetry "api/commit/add" . mCrawlerAddDoc . mkRequest entity) - (eitherStreamToError $ getStream oldestAge entity) + (getStream oldestAge entity) - case foldr collectPostFailure [] postResult of - xs@(_ : _) -> logWarn "Post documents failed" ["errors" .= xs] + case catMaybes postResult of [] -> do -- Post the commit date res <- httpRetry "api/commit" $ commitTimestamp entity case res of - Just (err :: Text) -> do - logWarn "Commit date failed" ["err" .= err] + Just err -> pure [CommitError err] Nothing -> do logInfo_ "Continuing on next entity" go - - collectPostFailure :: ProcessResult -> [Text] -> [Text] - collectPostFailure res acc = case res of - AddOk -> acc - AddError err -> err : acc + xs -> pure xs -- Adapt the document stream to intermediate representation getStream oldestAge entity = case documentStream of @@ -296,21 +309,3 @@ getStreamOldestEntity indexName crawlerName entityType offset = do ) ) -> pure Nothing _ -> error $ "Could not get initial timestamp: " <> show resp - --- | Remove the left part of the stream and throw an error when they occurs. --- The error contains the first left encountered, and the rest of the stream. -eitherStreamToError :: - Stream (Of (Either err a)) (Eff es) () -> - Stream (Of a) (Eff (Error (err, Stream (Of (Either err a)) (Eff es) ()) : es)) () -eitherStreamToError stream = do - nextE <- hoist E.raise (lift (S.next stream)) - case nextE of - -- The stream is over, stop here - Left () -> pure () - Right (x, xs) -> do - case x of - -- TODO: should we continue after the first error? - Left e -> lift (throwError (e, xs)) - Right v -> do - S.yield v - eitherStreamToError xs diff --git a/src/Monocle/Backend/Test.hs b/src/Monocle/Backend/Test.hs index 30799abcc..1f79c0344 100644 --- a/src/Monocle/Backend/Test.hs +++ b/src/Monocle/Backend/Test.hs @@ -20,12 +20,14 @@ import Monocle.Config qualified as Config import Monocle.Entity import Monocle.Env import Monocle.Prelude +import Monocle.Protob.Change qualified as ChangePB import Monocle.Protob.Crawler qualified as CrawlerPB import Monocle.Protob.Metric qualified as MetricPB import Monocle.Protob.Search qualified as MetricPB import Monocle.Protob.Search qualified as SearchPB import Monocle.Search.Query (defaultQueryFlavor) import Monocle.Search.Query qualified as Q +import Proto3.Suite (Enumerated (Enumerated)) import Relude.Unsafe ((!!)) import Streaming.Prelude qualified as Streaming import Test.Tasty.HUnit ((@?=)) @@ -45,6 +47,42 @@ eve = Author "eve" "e" fakeAuthor = Author "John" "John" fakeAuthorAlt = Author "John Doe/12" "review.opendev.org/John Doe/12" +fakeChangePB :: ChangePB.Change +fakeChangePB = + ChangePB.Change + { changeId = mempty + , changeNumber = 42 + , changeChangeId = mempty + , changeTitle = mempty + , changeText = mempty + , changeUrl = mempty + , changeCommitCount = 1 + , changeAdditions = 2 + , changeDeletions = 0 + , changeChangedFilesCount = 1 + , changeChangedFiles = mempty + , changeCommits = mempty + , changeRepositoryPrefix = mempty + , changeRepositoryFullname = mempty + , changeRepositoryShortname = mempty + , changeAuthor = Nothing + , changeOptionalMergedBy = Nothing + , changeBranch = mempty + , changeTargetBranch = mempty + , changeCreatedAt = Just (from fakeDate) + , changeOptionalMergedAt = Nothing + , changeUpdatedAt = Just (from fakeDate) + , changeOptionalClosedAt = Nothing + , changeState = Enumerated (Right ChangePB.Change_ChangeStateOpen) + , changeOptionalDuration = Nothing + , changeMergeable = mempty + , changeLabels = mempty + , changeAssignees = mempty + , changeApprovals = mempty + , changeDraft = False + , changeOptionalSelfMerged = Nothing + } + fakeChange :: EChange fakeChange = EChange