Skip to content

Commit

Permalink
Merge pull request #1062 from TristanCacqueray/auth-error
Browse files Browse the repository at this point in the history
Handle Authentication Error
  • Loading branch information
mergify[bot] authored Sep 20, 2023
2 parents c471fc2 + 9ce14a6 commit 818f553
Show file tree
Hide file tree
Showing 5 changed files with 139 additions and 78 deletions.
5 changes: 4 additions & 1 deletion src/Lentille/GitHub/RateLimit.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import Lentille.GitHub.Types
import Lentille.GraphQL
import Monocle.Prelude
import Network.HTTP.Client (responseBody, responseStatus)
import Network.HTTP.Types (Status, badGateway502, forbidden403, ok200)
import Network.HTTP.Types (Status, badGateway502, forbidden403, ok200, unauthorized401)

import Effectful.Retry

Expand Down Expand Up @@ -48,6 +48,9 @@ retryCheck :: forall es a. GraphEffects es => Either GraphQLError a -> Eff es Re
retryCheck = \case
Right _ -> pure DontRetry
Left (GraphQLError err (RequestLog _ _ resp _))
| status == unauthorized401 -> do
logWarn "Authentication error" ["body" .= body]
pure DontRetry
| isTimeoutError status body -> do
logWarn_ "Server side timeout error. Will retry with lower query depth ..."
pure ConsultPolicy
Expand Down
16 changes: 8 additions & 8 deletions src/Lentille/GraphQL.hs
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ doRequest ::
doRequest client mkArgs retryCheck depthM pageInfoM =
retryingDynamic policy (const retryCheck) $ \rs -> do
when (rs.rsIterNumber > 0)
$ logWarn "Faulty response" ["num" .= rs.rsIterNumber]
$ logWarn "Retrying request" ["num" .= rs.rsIterNumber]
runFetch rs.rsIterNumber
where
delay = 1_100_000 -- 1.1 seconds
Expand Down Expand Up @@ -209,8 +209,8 @@ streamFetch client@GraphClient {..} mkArgs StreamFetchOptParams {..} transformRe

requestWithPageInfo pageInfoM storedRateLimitM = do
holdOnIfNeeded storedRateLimitM
respE <- doRequest client mkArgs fpRetryCheck fpDepth pageInfoM
pure $ case respE of
eResp <- doRequest client mkArgs fpRetryCheck fpDepth pageInfoM
pure $ case eResp of
Left err -> Left err
Right resp ->
let (pageInfo, rateLimitM, decodingErrors, xs) = transformResponse resp
Expand All @@ -225,19 +225,19 @@ streamFetch client@GraphClient {..} mkArgs StreamFetchOptParams {..} transformRe

startFetch = do
--- Perform a pre GraphQL request to gather rateLimit
fpRespE <- case fpGetRatelimit of
(mErr :: Maybe GraphQLError) <- case fpGetRatelimit of
Just getRateLimit -> lift
$ E.modifyMVar rateLimitMVar
$ const do
rlE <- getRateLimit client
case rlE of
eRateLimit <- getRateLimit client
case eRateLimit of
Left err -> do
logWarn_ "Could not fetch the current rate limit"
pure (Nothing, Just err)
Right rl -> pure (rl, Nothing)
Right rateLimit -> pure (rateLimit, Nothing)
Nothing -> pure Nothing

case fpRespE of
case mErr of
Just err -> S.yield (Left $ GraphError err)
Nothing -> go Nothing 0

Expand Down
39 changes: 32 additions & 7 deletions src/Macroscope/Test.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@ import Monocle.Backend.Documents qualified as D
import Monocle.Backend.Index qualified as I
import Monocle.Backend.Provisioner qualified
import Monocle.Backend.Queries qualified as Q
import Monocle.Backend.Test (withTenantConfig)
import Monocle.Backend.Test (fakeChangePB, withTenantConfig)
import Monocle.Backend.Test qualified as BT (fakeChange, fakeDate, fakeDateAlt)
import Monocle.Client
import Monocle.Config qualified as Config
import Monocle.Effects
import Monocle.Entity (CrawlerName (..))
import Monocle.Entity (CrawlerName (..), Entity (Project))
import Monocle.Env
import Monocle.Prelude
import Monocle.Protob.Crawler qualified as CrawlerPB
import Streaming.Prelude qualified as Streaming
import Test.Tasty
import Test.Tasty.HUnit
Expand All @@ -45,12 +46,36 @@ testCrawlingPoint = do
fakeChange2 = fakeChange1 {D.echangeId = "efake2", D.echangeUpdatedAt = BT.fakeDateAlt}
I.indexChanges [fakeChange1, fakeChange2]
withTestApi (mkAppEnv fakeConfig) $ \client -> do
let stream date name
| date == BT.fakeDateAlt && name == "opendev/neutron" = pure mempty
| otherwise = error "Bad crawling point"
void $ runLentilleM client $ Macroscope.runStream apiKey indexName (CrawlerName crawlerName) (Macroscope.Changes stream)
assertEqual "Fetched at expected crawling point" True True
void $ runLentilleM client do
(oldestAge, oldestEntity) <- getOldest
liftIO $ assertEqual "Oldest entity is correct" oldestEntity (Project "opendev/neutron")

Macroscope.runStream apiKey indexName (CrawlerName crawlerName) (Macroscope.Changes badStream)

(currentOldestAge, _) <- getOldest
liftIO $ assertEqual "Commit date is not updated on failure" oldestAge currentOldestAge

Macroscope.runStream apiKey indexName (CrawlerName crawlerName) (Macroscope.Changes goodStream)

(newOldestAge, _) <- getOldest
liftIO $ assertBool "Commit date updated" (newOldestAge > oldestAge)
where
-- A document stream that yields a change, then an error
badStream date name
| date == BT.fakeDateAlt && name == "opendev/neutron" = do
Streaming.yield $ Right (fakeChangePB, [])
Streaming.yield $ Left (DecodeError ["Oops"])
| otherwise = error "Bad crawling point"

-- A document stream that yields a change
goodStream date name
| date == BT.fakeDateAlt && name == "opendev/neutron" = do
Streaming.yield $ Right (fakeChangePB, [])
| otherwise = error "Bad crawling point"

-- Helper function to get the oldest entity age
getOldest = fromMaybe (error "no entity!") <$> Macroscope.getStreamOldestEntity indexName (from crawlerName) projectEntity 0
projectEntity = CrawlerPB.EntityTypeENTITY_TYPE_PROJECT
fakeConfig =
(mkConfig (from indexName))
{ Config.crawlers_api_key = Just (from apiKey)
Expand Down
119 changes: 57 additions & 62 deletions src/Macroscope/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
module Macroscope.Worker (
runStream,
DocumentStream (..),
-- test helper
getStreamOldestEntity,
) where

import Data.Vector qualified as V
Expand All @@ -21,10 +23,8 @@ import Monocle.Protob.Crawler as CrawlerPB hiding (Entity)
import Monocle.Protob.Issue (Issue, IssueEvent)
import Monocle.Protob.Search (TaskData)
import Proto3.Suite (Enumerated (Enumerated))
import Streaming qualified as S
import Streaming.Prelude qualified as S

import Effectful qualified as E
import Effectful.Reader.Static qualified as E
import Monocle.Effects

Expand Down Expand Up @@ -76,35 +76,52 @@ data DocumentType
| DTIssues (Issue, [IssueEvent])
deriving (Generic, ToJSON)

data ProcessResult = AddOk | AddError Text deriving stock (Show)
-- | The failures that can be reported while processing a document stream.
data ProcessError es
= CommitError Text
-- ^ The commit date request returned an error (carried as text).
| AddError Text
-- ^ Posting a batch of documents returned an error (the shown error).
| StreamError (LentilleError, LentilleStream es DocumentType)
-- ^ The stream yielded a 'Left': the error, and the rest of the stream.

-- | A stream error contains the first Left, and the rest of the stream
type StreamError es = (LentilleError, LentilleStream es DocumentType)

-- | 'process' read the stream of document and post to the monocle API
process ::
-- | 'processStream' reads the stream of documents and posts them to the Monocle API
processStream ::
forall es.
-- | Function to log about the processing
(Int -> Eff es ()) ->
-- | Function to post on the Monocle API
([DocumentType] -> Eff es AddDocResponse) ->
-- | The stream of documents to read
Stream (Of DocumentType) (Eff es) () ->
LentilleStream es DocumentType ->
-- | The processing results
Eff es [ProcessResult]
process logFunc postFunc =
S.toList_
. S.mapM processBatch
. S.mapped S.toList
. S.chunksOf 500
Eff es [Maybe (ProcessError es)]
processStream logFunc postFunc = go (0 :: Word) [] []
where
processBatch :: [DocumentType] -> Eff es ProcessResult
processBatch docs = do
go count acc results stream = do
eDocument <- S.next stream
case eDocument of
Left () -> do
-- The end of the stream
res <- processBatch acc
pure $ reverse (res : results)
Right (Left err, rest) -> do
-- An error occurred in the stream, abort now
let res = Just (StreamError (err, rest))
pure $ reverse (res : results)
Right (Right doc, rest) -> do
-- We got a new document
let newAcc = doc : acc
if count == 499
then do
res <- processBatch newAcc
go 0 [] (res : results) rest
else go (count + 1) newAcc results rest

processBatch :: [DocumentType] -> Eff es (Maybe (ProcessError es))
processBatch [] = pure Nothing
processBatch (reverse -> docs) = do
logFunc (length docs)
resp <- postFunc docs
pure $ case resp of
AddDocResponse Nothing -> AddOk
AddDocResponse (Just err) -> AddError (show err)
AddDocResponse Nothing -> Nothing
AddDocResponse (Just err) -> Just (AddError (show err))

-- | 'runStream' is the main function used by macroscope
runStream ::
Expand All @@ -123,24 +140,23 @@ runStream apiKey indexName crawlerName documentStream = do
go :: UTCTime -> Word32 -> Eff es ()
go startTime offset =
unlessStopped do
res <-
runErrorNoCallStack do
runStreamError startTime apiKey indexName crawlerName documentStream offset
case res of
Right () -> pure ()
Left (x, xs) -> do
logWarn "Error occured when consuming the document stream" ["err" .= x]
S.toList_ xs >>= \case
errors <-
runStreamError startTime apiKey indexName crawlerName documentStream offset
forM_ errors \case
AddError err -> logWarn "Could not add documents" ["err" .= err]
CommitError err -> logWarn "Could not commit update date" ["err" .= err]
StreamError (err, rest) -> do
logWarn "Error occured when consuming the document stream" ["err" .= err]
S.toList_ rest >>= \case
[] -> pure ()
rest -> logWarn "Left over documents found after error" ["items" .= rest]
items -> logWarn "Left over documents found after error" ["items" .= items]

-- TODO: explain why TDStream doesn't support offset
unless (isTDStream documentStream) do
-- Try the next entity by incrementing the offset
go startTime (offset + 1)

-- | 'runStreamError' is the stream processor which throws an error to interrupt the stream
-- when it contains a Left.
-- | 'runStreamError' is the stream processor
runStreamError ::
forall es.
(LoggerEffect :> es, Retry :> es, PrometheusEffect :> es, MonoClientEffect :> es) =>
Expand All @@ -150,7 +166,7 @@ runStreamError ::
CrawlerName ->
DocumentStream es ->
Word32 ->
Eff (Error (StreamError es) : es) ()
Eff es [ProcessError es]
runStreamError startTime apiKey indexName (CrawlerName crawlerName) documentStream offset = go
where
go = do
Expand All @@ -159,39 +175,36 @@ runStreamError startTime apiKey indexName (CrawlerName crawlerName) documentStre
-- Query the monocle api for the oldest entity to be updated.
oldestEntityM <- getStreamOldestEntity indexName (from crawlerName) (streamEntity documentStream) offset
case oldestEntityM of
Nothing -> logInfo_ "Unable to find entity to update"
Nothing -> do
logInfo_ "Unable to find entity to update"
pure []
Just (oldestAge, entity)
| -- add a 1 second delta to avoid Hysteresis
addUTCTime 1 oldestAge >= startTime ->
addUTCTime 1 oldestAge >= startTime -> do
logInfo "Crawling entities completed" ["entity" .= entity, "age" .= oldestAge]
pure []
| otherwise -> goStream oldestAge entity

goStream oldestAge entity = do
logInfo "Processing" ["entity" .= entity, "age" .= oldestAge]

-- Run the document stream for that entity
postResult <-
process
processStream
(\c -> logInfo "Posting documents" ["count" .= c])
(httpRetry "api/commit/add" . mCrawlerAddDoc . mkRequest entity)
(eitherStreamToError $ getStream oldestAge entity)
(getStream oldestAge entity)

case foldr collectPostFailure [] postResult of
xs@(_ : _) -> logWarn "Post documents failed" ["errors" .= xs]
case catMaybes postResult of
[] -> do
-- Post the commit date
res <- httpRetry "api/commit" $ commitTimestamp entity
case res of
Just (err :: Text) -> do
logWarn "Commit date failed" ["err" .= err]
Just err -> pure [CommitError err]
Nothing -> do
logInfo_ "Continuing on next entity"
go

collectPostFailure :: ProcessResult -> [Text] -> [Text]
collectPostFailure res acc = case res of
AddOk -> acc
AddError err -> err : acc
xs -> pure xs

-- Adapt the document stream to intermediate representation
getStream oldestAge entity = case documentStream of
Expand Down Expand Up @@ -289,21 +302,3 @@ getStreamOldestEntity indexName crawlerName entityType offset = do
)
) -> pure Nothing
_ -> error $ "Could not get initial timestamp: " <> show resp

-- | Strip the 'Left' values out of a stream of 'Either'.
-- Every 'Right' value is re-yielded as it arrives. The first 'Left' stops the
-- stream by throwing an error made of that value and the unconsumed remainder.
eitherStreamToError ::
  Stream (Of (Either err a)) (Eff es) () ->
  Stream (Of a) (Eff (Error (err, Stream (Of (Either err a)) (Eff es) ()) : es)) ()
eitherStreamToError stream =
  hoist E.raise (lift (S.next stream)) >>= \case
    -- Nothing left to read: the stream ends normally
    Left () -> pure ()
    -- TODO: should we continue after the first error?
    Right (Left e, remaining) -> lift (throwError (e, remaining))
    Right (Right v, remaining) -> do
      S.yield v
      eitherStreamToError remaining
38 changes: 38 additions & 0 deletions src/Monocle/Backend/Test.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import Monocle.Config qualified as Config
import Monocle.Entity
import Monocle.Env
import Monocle.Prelude
import Monocle.Protob.Change qualified as ChangePB
import Monocle.Protob.Crawler qualified as CrawlerPB
import Monocle.Protob.Metric qualified as MetricPB
import Monocle.Protob.Search qualified as MetricPB
import Monocle.Protob.Search qualified as SearchPB
import Monocle.Search.Query (defaultQueryFlavor)
import Monocle.Search.Query qualified as Q
import Proto3.Suite (Enumerated (Enumerated))
import Relude.Unsafe ((!!))
import Streaming.Prelude qualified as Streaming
import Test.Tasty.HUnit ((@?=))
Expand All @@ -45,6 +47,42 @@ eve = Author "eve" "e"
fakeAuthor = Author "John" "John"
fakeAuthorAlt = Author "John Doe/12" "review.opendev.org/John Doe/12"

-- | A minimal protobuf change fixture for tests: an open, non-draft change
-- numbered 42, created and updated at 'fakeDate', with every other field left
-- empty or unset.
fakeChangePB :: ChangePB.Change
fakeChangePB =
  ChangePB.Change
    { -- Identity and description
      changeId = mempty
    , changeNumber = 42
    , changeChangeId = mempty
    , changeTitle = mempty
    , changeText = mempty
    , changeUrl = mempty
    , -- Size of the change
      changeCommitCount = 1
    , changeAdditions = 2
    , changeDeletions = 0
    , changeChangedFilesCount = 1
    , changeChangedFiles = mempty
    , changeCommits = mempty
    , -- Repository and branches
      changeRepositoryPrefix = mempty
    , changeRepositoryFullname = mempty
    , changeRepositoryShortname = mempty
    , changeBranch = mempty
    , changeTargetBranch = mempty
    , -- People
      changeAuthor = Nothing
    , changeOptionalMergedBy = Nothing
    , changeAssignees = mempty
    , changeApprovals = mempty
    , -- Timestamps
      changeCreatedAt = Just (from fakeDate)
    , changeUpdatedAt = Just (from fakeDate)
    , changeOptionalMergedAt = Nothing
    , changeOptionalClosedAt = Nothing
    , changeOptionalDuration = Nothing
    , -- State
      changeState = Enumerated (Right ChangePB.Change_ChangeStateOpen)
    , changeMergeable = mempty
    , changeLabels = mempty
    , changeDraft = False
    , changeOptionalSelfMerged = Nothing
    }

fakeChange :: EChange
fakeChange =
EChange
Expand Down

0 comments on commit 818f553

Please sign in to comment.