REFACTOR: create a BackgroundTask abstraction

Alessio 2024-12-22 08:03:12 -08:00
parent e27cd12cdc
commit ff638bd543
3 changed files with 139 additions and 181 deletions

View File

@@ -2,6 +2,7 @@ package webserver
 import (
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"net/http"
@@ -118,8 +119,21 @@ func (app *Application) ChangeSession(w http.ResponseWriter, r *http.Request) {
 		app.error_400_with_message(w, r, fmt.Sprintf("User not in database: %s", form.AccountName))
 		return
 	}
 	app.LastReadNotificationSortIndex = 0 // Clear unread notifications
-	go app.background_notifications_scrape() // Update notifications info in background (avoid latency when switching users)
+	// Update notifications info in background (avoid latency when switching users)
+	go func() {
+		trove, last_unread_notification_sort_index, err := app.API.GetNotifications(1) // Just 1 page
+		if err != nil && !errors.Is(err, scraper.END_OF_FEED) && !errors.Is(err, scraper.ErrRateLimited) {
+			app.ErrorLog.Printf("Error getting notifications after switching users: %s", err.Error())
+			return
+		}
+		// We have to save the notifications first; otherwise the count since the last-read sort index will just report 0
+		app.Profile.SaveTweetTrove(trove, false, &app.API)
+		go app.Profile.SaveTweetTrove(trove, true, &app.API)
+		// Set the notifications count
+		app.LastReadNotificationSortIndex = last_unread_notification_sort_index
+	}()
 	data := NotificationBubbles{
 		NumMessageNotifications: len(app.Profile.GetUnreadConversations(app.ActiveUser.ID)),
 	}

View File

@@ -3,6 +3,7 @@ package webserver
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"net/http"
@@ -238,8 +239,13 @@ func (app *Application) Messages(w http.ResponseWriter, r *http.Request) {
 	// Every 3 seconds, the message detail page will send a scrape request with `?poll` set
 	if r.URL.Query().Has("poll") {
-		// Not run as a goroutine; this call blocks.  It's not actually "background"
-		app.background_dm_polling_scrape()
+		trove, new_cursor, err := app.API.PollInboxUpdates(inbox_cursor)
+		if err != nil && !errors.Is(err, scraper.END_OF_FEED) && !errors.Is(err, scraper.ErrRateLimited) {
+			panic(err)
+		}
+		inbox_cursor = new_cursor
+		app.Profile.SaveTweetTrove(trove, false, &app.API)
+		go app.Profile.SaveTweetTrove(trove, true, &app.API)
 	}
 	parts := strings.Split(strings.Trim(r.URL.Path, "/"), "/")
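
Both hunks above apply the same error-tolerance convention: END_OF_FEED and ErrRateLimited are treated as expected outcomes of a scrape rather than failures. As a sketch only (the commit inlines this check at every call site; this helper is hypothetical), the convention amounts to:

	// is_fatal_scrape_error reports whether a scrape error should abort processing.
	// END_OF_FEED and ErrRateLimited are expected conditions, not failures.
	// (Hypothetical helper, not part of this commit.)
	func is_fatal_scrape_error(err error) bool {
		return err != nil &&
			!errors.Is(err, scraper.END_OF_FEED) &&
			!errors.Is(err, scraper.ErrRateLimited)
	}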

View File

@@ -1,213 +1,151 @@
 package webserver
 
 import (
+	"errors"
 	"fmt"
-	"gitlab.com/offline-twitter/twitter_offline_engine/pkg/scraper"
+	"log"
+	"os"
+	"runtime/debug"
 	"time"
+
+	"gitlab.com/offline-twitter/twitter_offline_engine/pkg/scraper"
 )
 
-var is_following_only = true // Do one initial scrape of the "following_only" feed and then just regular feed after that
+type BackgroundTask struct {
+	Name         string
+	GetTroveFunc func(*scraper.API) scraper.TweetTrove
+	StartDelay   time.Duration
+	Period       time.Duration
+
+	log *log.Logger
+	app *Application
+}
 
-func (app *Application) background_scrape() {
+func (t *BackgroundTask) Do() {
 	// Avoid crashing the thread if a scrape fails
 	defer func() {
 		if r := recover(); r != nil {
 			// TODO
-			fmt.Println("Background Home Timeline thread: panicked!")
+			t.log.Print("panicked!")
 			if err, ok := r.(error); ok {
-				fmt.Println(err.Error())
+				t.log.Print("(the following is an error)")
+				t.log.Print(err.Error())
 			} else {
-				fmt.Println(r)
+				t.log.Print("(the following is an object, not an error)")
+				t.log.Print(r)
 			}
+			t.log.Output(2, string(debug.Stack()))
 		}
 	}()
 
-	fmt.Println("Starting home timeline scrape...")
-
 	// Do nothing if scraping is currently disabled
-	if app.IsScrapingDisabled {
-		fmt.Println("Skipping home timeline scrape!")
+	if t.app.IsScrapingDisabled {
+		t.log.Print("(disabled)")
 		return
+	} else {
+		t.log.Print("starting scrape")
 	}
 
-	fmt.Println("Scraping home timeline...")
-	trove, err := app.API.GetHomeTimeline("", is_following_only)
-	if err != nil {
-		app.ErrorLog.Printf("Background scrape failed: %s", err.Error())
-		return
-	}
-	fmt.Println("Saving scrape results...")
-	app.Profile.SaveTweetTrove(trove, false, &app.API)
-	go app.Profile.SaveTweetTrove(trove, true, &app.API)
-	fmt.Println("Scraping succeeded.")
-	is_following_only = false
+	// Run the task
+	trove := t.GetTroveFunc(&t.app.API)
+	t.log.Print("saving results")
+	t.app.Profile.SaveTweetTrove(trove, false, &t.app.API)
+	go t.app.Profile.SaveTweetTrove(trove, true, &t.app.API)
+	t.log.Print("success")
 }
-func (app *Application) background_user_likes_scrape() {
-	// Avoid crashing the thread if a scrape fails
-	defer func() {
-		if r := recover(); r != nil {
-			// TODO
-			fmt.Println("Background Home Timeline thread: panicked!")
-			if err, ok := r.(error); ok {
-				fmt.Println(err.Error())
-			} else {
-				fmt.Println(r)
-			}
-		}
-	}()
-
-	fmt.Println("Starting user likes scrape...")
-
-	// Do nothing if scraping is currently disabled
-	if app.IsScrapingDisabled {
-		fmt.Println("Skipping user likes scrape!")
-		return
-	}
-
-	fmt.Println("Scraping user likes...")
-	trove, err := app.API.GetUserLikes(app.ActiveUser.ID, 50) // TODO: parameterizable
-	if err != nil {
-		app.ErrorLog.Printf("Background scrape failed: %s", err.Error())
-		return
-	}
-	fmt.Println("Saving scrape results...")
-	app.Profile.SaveTweetTrove(trove, false, &app.API)
-	go app.Profile.SaveTweetTrove(trove, true, &app.API)
-	fmt.Println("Scraping succeeded.")
+func (t *BackgroundTask) StartBackground() {
+	// Start the task in a goroutine
+	t.log = log.New(os.Stdout, fmt.Sprintf("[background (%s)]: ", t.Name), log.LstdFlags)
+
+	go func() {
+		t.log.Printf("starting, with initial delay %s and regular delay %s", t.StartDelay, t.Period)
+
+		time.Sleep(t.StartDelay)          // Initial delay
+		timer := time.NewTicker(t.Period) // Regular delay
+		defer timer.Stop()
+
+		t.Do()
+
+		for range timer.C {
+			t.Do()
+		}
+	}()
 }
 
+var is_following_only = 0           // Do mostly "For you" feed, but start with one round of the "following_only" feed
+var is_following_only_frequency = 5 // Make every 5th scrape a "following_only" one
+
 var inbox_cursor string = ""
 
-func (app *Application) background_dm_polling_scrape() {
-	// Avoid crashing the thread if a scrape fails
-	defer func() {
-		if r := recover(); r != nil {
-			// TODO
-			fmt.Println("Background Home Timeline thread: panicked!")
-			if err, ok := r.(error); ok {
-				fmt.Println(err.Error())
-			} else {
-				fmt.Println(r)
-			}
-		}
-	}()
-
-	fmt.Println("Starting user DMs scrape...")
-
-	// Do nothing if scraping is currently disabled
-	if app.IsScrapingDisabled {
-		fmt.Println("Skipping user DMs scrape!")
-		return
-	}
-
-	fmt.Println("Scraping user DMs...")
-	var trove scraper.TweetTrove
-	var err error
-	if inbox_cursor == "" {
-		trove, inbox_cursor, err = app.API.GetInbox(0)
-	} else {
-		trove, inbox_cursor, err = app.API.PollInboxUpdates(inbox_cursor)
-	}
-	if err != nil {
-		panic(err)
-	}
-	fmt.Println("Saving DM results...")
-	app.Profile.SaveTweetTrove(trove, false, &app.API)
-	go app.Profile.SaveTweetTrove(trove, true, &app.API)
-	fmt.Println("Scraping DMs succeeded.")
-}
-
-func (app *Application) background_notifications_scrape() {
-	// Avoid crashing the thread if a scrape fails
-	defer func() {
-		if r := recover(); r != nil {
-			// TODO
-			fmt.Println("Background notifications thread: panicked!")
-			if err, ok := r.(error); ok {
-				fmt.Println(err.Error())
-			} else {
-				fmt.Println(r)
-			}
-		}
-	}()
-
-	fmt.Println("Starting notifications scrape...")
-
-	// Do nothing if scraping is currently disabled
-	if app.IsScrapingDisabled {
-		fmt.Println("Skipping notifications scrape!")
-		return
-	}
-
-	fmt.Println("Scraping user notifications...")
-	trove, last_unread_notification_sort_index, err := app.API.GetNotifications(1) // Just 1 page
-	if err != nil {
-		panic(err)
-	}
-	// Jot down the unread notifs info in the application object (to render notification count bubble)
-	app.LastReadNotificationSortIndex = last_unread_notification_sort_index
-	fmt.Println("Saving notification results...")
-	app.Profile.SaveTweetTrove(trove, false, &app.API)
-	go app.Profile.SaveTweetTrove(trove, true, &app.API)
-	fmt.Println("Scraping notification succeeded.")
-}
-
 func (app *Application) start_background() {
-	fmt.Println("Starting background")
-	// Scrape the home timeline every 3 minutes
-	go func() {
-		// Initial delay before the first task execution
-		time.Sleep(10 * time.Second)
-		app.background_scrape()
-
-		// Create a timer that triggers the background task every 3 minutes
-		interval := 3 * time.Minute // TODO: parameterizable
-		timer := time.NewTicker(interval)
-		defer timer.Stop()
-
-		for range timer.C {
-			app.background_scrape()
-		}
-	}()
-
-	// Scrape the logged-in user's likes every 10 minutes
-	go func() {
-		time.Sleep(15 * time.Second)
-		app.background_user_likes_scrape()
-
-		interval := 10 * time.Minute // TODO: parameterizable
-		timer := time.NewTicker(interval)
-		defer timer.Stop()
-
-		for range timer.C {
-			app.background_user_likes_scrape()
-		}
-	}()
-
-	// Scrape inbox DMs every 10 seconds
-	go func() {
-		time.Sleep(5 * time.Second)
-		app.background_dm_polling_scrape()
-
-		interval := 10 * time.Second
-		timer := time.NewTicker(interval)
-		defer timer.Stop()
-
-		for range timer.C {
-			app.background_dm_polling_scrape()
-		}
-	}()
-
-	// Scrape notifications every 10 seconds
-	go func() {
-		app.background_notifications_scrape()
-
-		interval := 10 * time.Second
-		timer := time.NewTicker(interval)
-		defer timer.Stop()
-
-		for range timer.C {
-			app.background_notifications_scrape()
-		}
-	}()
+	fmt.Println("Starting background tasks")
+
+	timeline_task := BackgroundTask{
+		Name: "home timeline",
+		GetTroveFunc: func(api *scraper.API) scraper.TweetTrove {
+			should_do_following_only := is_following_only%is_following_only_frequency == 0
+			is_following_only += 1
+			trove, err := api.GetHomeTimeline("", should_do_following_only)
+			if err != nil && !errors.Is(err, scraper.END_OF_FEED) && !errors.Is(err, scraper.ErrRateLimited) {
+				panic(err)
+			}
+			return trove
+		},
+		StartDelay: 10 * time.Second,
+		Period:     3 * time.Minute,
+		app:        app,
+	}
+	timeline_task.StartBackground()
+
+	likes_task := BackgroundTask{
+		Name: "user likes",
+		GetTroveFunc: func(api *scraper.API) scraper.TweetTrove {
+			trove, err := api.GetUserLikes(api.UserID, 50) // TODO: parameterizable
+			if err != nil && !errors.Is(err, scraper.END_OF_FEED) && !errors.Is(err, scraper.ErrRateLimited) {
+				panic(err)
+			}
+			return trove
+		},
+		StartDelay: 15 * time.Second,
+		Period:     10 * time.Minute,
+		app:        app,
+	}
+	likes_task.StartBackground()
+
+	dms_task := BackgroundTask{
+		Name: "DM inbox",
+		GetTroveFunc: func(api *scraper.API) scraper.TweetTrove {
+			var trove scraper.TweetTrove
+			var err error
+			if inbox_cursor == "" {
+				trove, inbox_cursor, err = api.GetInbox(0)
+			} else {
+				trove, inbox_cursor, err = api.PollInboxUpdates(inbox_cursor)
+			}
+			if err != nil && !errors.Is(err, scraper.END_OF_FEED) && !errors.Is(err, scraper.ErrRateLimited) {
+				panic(err)
+			}
+			return trove
+		},
+		StartDelay: 5 * time.Second,
+		Period:     10 * time.Second,
+		app:        app,
+	}
+	dms_task.StartBackground()
+
+	notifications_task := BackgroundTask{
+		Name: "notifications",
+		GetTroveFunc: func(api *scraper.API) scraper.TweetTrove {
+			trove, last_unread_notification_sort_index, err := api.GetNotifications(1) // Just 1 page
+			if err != nil && !errors.Is(err, scraper.END_OF_FEED) && !errors.Is(err, scraper.ErrRateLimited) {
+				panic(err)
+			}
+			// Jot down the unread notifs info in the application object (to render notification count bubble)
+			app.LastReadNotificationSortIndex = last_unread_notification_sort_index
+			return trove
+		},
+		StartDelay: 1 * time.Second,
+		Period:     10 * time.Second,
+		app:        app,
+	}
+	notifications_task.StartBackground()
 }
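
The scheduling in StartBackground reduces to one pattern: sleep out an initial delay, run the task once, then re-run it on a fixed-period ticker, recovering from panics inside the task body so a failed run doesn't kill the loop. A self-contained sketch of that pattern (illustrative names only, not part of the commit):

	package main

	import (
		"fmt"
		"time"
	)

	// runPeriodically mirrors BackgroundTask.StartBackground: an initial
	// delay, one immediate run, then a run on every ticker period.
	func runPeriodically(startDelay, period time.Duration, do func()) {
		go func() {
			time.Sleep(startDelay)
			ticker := time.NewTicker(period)
			defer ticker.Stop()

			do()
			for range ticker.C {
				do()
			}
		}()
	}

	func main() {
		runPeriodically(time.Second, 2*time.Second, func() {
			// Recover inside the task, like BackgroundTask.Do, so one
			// panicking run doesn't crash the scheduler goroutine.
			defer func() {
				if r := recover(); r != nil {
					fmt.Println("task panicked:", r)
				}
			}()
			fmt.Println("tick")
		})
		time.Sleep(7 * time.Second) // Let the demo run a few periods
	}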