From 6c61382f4cfc9d246fe1da8a33558724036762e6 Mon Sep 17 00:00:00 2001 From: Henrik Hautakoski Date: Mon, 21 Aug 2023 14:10:38 +0200 Subject: [PATCH] cmd/thalos/main.go: in readerLoop() make sure we set shClient.StartBlock to processor's current block when (re)connecting. --- cmd/thalos/main.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/cmd/thalos/main.go b/cmd/thalos/main.go index afba59b..087fef9 100644 --- a/cmd/thalos/main.go +++ b/cmd/thalos/main.go @@ -44,7 +44,7 @@ var VersionString string = "dev" var exit chan bool -func readerLoop() { +func readerLoop(processor *app.ShipProcessor) { running = true recon_cnt := 0 @@ -75,6 +75,11 @@ func readerLoop() { return err } + // Set stream client start block to processors current block + // Both values should be the same on first connect, but when reconnecting + // We don't want to start from the beginning + shClient.StartBlock = processor.GetCurrentBlock() + return shClient.SendBlocksRequest() } @@ -118,9 +123,9 @@ func readerLoop() { } } -func run() { +func run(processor *app.ShipProcessor) { // Spawn reader loop in another thread. - go readerLoop() + go readerLoop(processor) // Create interrupt channel. signals := make(chan os.Signal, 1) @@ -295,7 +300,7 @@ func main() { ) // Run the application - run() + run(processor) // Close the processor properly processor.Close()