From aba738211394c0e17e5e7159dbc941ed9a71bea9 Mon Sep 17 00:00:00 2001 From: deranjer Date: Thu, 13 Sep 2018 19:34:30 -0400 Subject: [PATCH] Fixing Queue issues, start/stop torrent issues --- engine/cronJobs.go | 21 +++++---------- engine/engine.go | 18 ++++++++----- engine/engineHelpers.go | 58 ++++++++++++++++++++--------------------- main.go | 29 ++++----------------- storage/storage.go | 20 +++++++------- 5 files changed, 62 insertions(+), 84 deletions(-) diff --git a/engine/cronJobs.go b/engine/cronJobs.go index c759180a..c3927dc0 100644 --- a/engine/cronJobs.go +++ b/engine/cronJobs.go @@ -58,10 +58,12 @@ func CheckTorrentWatchFolder(c *cron.Cron, db *storm.DB, tclient *torrent.Client } //CheckTorrents runs a upload ratio check, a queue check (essentially anything that should not be frontend dependent) -func CheckTorrents(c *cron.Cron, db *storm.DB, tclient *torrent.Client, torrentLocalStorage Storage.TorrentLocal, config Settings.FullClientSettings, torrentQueues Storage.TorrentQueues, TorrentLocalArray []*Storage.TorrentLocal) { +func CheckTorrentsCron(c *cron.Cron, db *storm.DB, tclient *torrent.Client, config Settings.FullClientSettings) { c.AddFunc("@every 30s", func() { - Logger.Info("Running a torrent Ratio and Queue Check") - for _, singleTorrentFromStorage := range TorrentLocalArray { + Logger.Debug("Running a torrent Ratio and Queue Check") + torrentLocalArray := Storage.FetchAllStoredTorrents(db) + torrentQueues := Storage.FetchQueues(db) + for _, singleTorrentFromStorage := range torrentLocalArray { //torrentQueues := Storage.FetchQueues(db) var singleTorrent *torrent.Torrent for _, liveTorrent := range tclient.Torrents() { //matching the torrent from storage to the live torrent @@ -78,6 +80,7 @@ func CheckTorrents(c *cron.Cron, db *storm.DB, tclient *torrent.Client, torrentL StopTorrent(singleTorrent, singleTorrentFromStorage, db) } if len(torrentQueues.ActiveTorrents) < config.MaxActiveTorrents && singleTorrentFromStorage.TorrentStatus == "Queued" { + Logger.WithFields(logrus.Fields{"Action: Adding Torrent to Active Queue": singleTorrentFromStorage.TorrentName}).Info() AddTorrentToActive(singleTorrentFromStorage, singleTorrent, db) } if (calculatedCompletedSize == singleTorrentFromStorage.TorrentSize) && (singleTorrentFromStorage.TorrentMoved == false) { //if we are done downloading and haven't moved torrent yet @@ -97,17 +100,7 @@ func CheckTorrents(c *cron.Cron, db *storm.DB, tclient *torrent.Client, torrentL } } - ValidateQueues(db, config, tclient) //Ensure we don't have too many in activeQueue - if (len(torrentQueues.ActiveTorrents) < config.MaxActiveTorrents) && (len(torrentQueues.QueuedTorrents) > 0) { //If there is room for another torrent in active torrents, add it. - torrentToAdd := torrentQueues.QueuedTorrents[0] - for _, singleTorrent := range tclient.Torrents() { - if torrentToAdd == singleTorrent.InfoHash().AsString() { - singleTorrentFromStorage := Storage.FetchTorrentFromStorage(db, torrentToAdd) - AddTorrentToActive(&singleTorrentFromStorage, singleTorrent, db) - } - } - } - + ValidateQueues(db, config, tclient) //Ensure we don't have too many in activeQueue }) } diff --git a/engine/engine.go b/engine/engine.go index 2b384db7..d3a40a40 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -189,7 +189,6 @@ func AddTorrent(clientTorrent *torrent.Torrent, torrentLocalStorage Storage.Torr //CreateInitialTorrentArray adds all the torrents on program start from the database func CreateInitialTorrentArray(tclient *torrent.Client, TorrentLocalArray []*Storage.TorrentLocal, db *storm.DB, config Settings.FullClientSettings) { for _, singleTorrentFromStorage := range TorrentLocalArray { - var singleTorrent *torrent.Torrent var err error if singleTorrentFromStorage.TorrentType == "file" { //if it is a file pull it from the uploaded torrent folder @@ -203,7 +202,6 @@ func CreateInitialTorrentArray(tclient *torrent.Client, TorrentLocalArray []*Sto if err != nil { continue } - } if len(singleTorrentFromStorage.InfoBytes) == 0 { //TODO.. kind of a fringe scenario.. not sure if needed since the db should always have the infobytes timeOut := timeOutInfo(singleTorrent, 45) @@ -219,13 +217,17 @@ func CreateInitialTorrentArray(tclient *torrent.Client, TorrentLocalArray []*Sto Logger.WithFields(logrus.Fields{"torrentFile": singleTorrent.Name(), "error": err}).Error("Unable to add infobytes to the torrent!") } torrentQueues := Storage.FetchQueues(db) + if singleTorrentFromStorage.TorrentStatus == "Stopped" { + singleTorrent.SetMaxEstablishedConns(0) + continue + } if len(torrentQueues.ActiveTorrents) == 0 && len(torrentQueues.QueuedTorrents) == 0 { // If empty, run through all the torrents and assign them - if len(torrentQueues.ActiveTorrents) < Config.MaxActiveTorrents && singleTorrentFromStorage.TorrentStatus != "Stopped" { + if len(torrentQueues.ActiveTorrents) < Config.MaxActiveTorrents { if singleTorrentFromStorage.TorrentStatus == "Completed" || singleTorrentFromStorage.TorrentStatus == "Seeding" { Logger.WithFields(logrus.Fields{"Torrent Name": singleTorrentFromStorage.TorrentName}).Info("Completed Torrents have lower priority, adding to Queued") AddTorrentToQueue(singleTorrentFromStorage, singleTorrent, db) } else { - Logger.WithFields(logrus.Fields{"Torrent Name": singleTorrentFromStorage.TorrentName}).Info("Adding Torrent to Active Queue") + Logger.WithFields(logrus.Fields{"Torrent Name": singleTorrentFromStorage.TorrentName}).Info("Adding Torrent to Active Queue (Initial Torrent Load)") AddTorrentToActive(singleTorrentFromStorage, singleTorrent, db) } } else { @@ -236,7 +238,8 @@ func CreateInitialTorrentArray(tclient *torrent.Client, TorrentLocalArray []*Sto if singleTorrentFromStorage.TorrentStatus == "Queued" { AddTorrentToQueue(singleTorrentFromStorage, singleTorrent, db) } else { - if len(torrentQueues.ActiveTorrents) < Config.MaxActiveTorrents && singleTorrentFromStorage.TorrentStatus != "Stopped" { + if len(torrentQueues.ActiveTorrents) < Config.MaxActiveTorrents { + Logger.WithFields(logrus.Fields{"Torrent Name": singleTorrentFromStorage.TorrentName}).Info("Adding Torrent to Active Queue (Initial Torrent Load Second)") AddTorrentToActive(singleTorrentFromStorage, singleTorrent, db) } else { AddTorrentToQueue(singleTorrentFromStorage, singleTorrent, db) @@ -248,7 +251,7 @@ func CreateInitialTorrentArray(tclient *torrent.Client, TorrentLocalArray []*Sto } torrentQueues := Storage.FetchQueues(db) if len(torrentQueues.ActiveTorrents) < config.MaxActiveTorrents && len(torrentQueues.QueuedTorrents) > 0 { //after all the torrents are added, see if out active torrent list isn't full, then add from the queue - Logger.WithFields(logrus.Fields{"Max Active: ": config.MaxActiveTorrents, "Current : ": torrentQueues.ActiveTorrents}).Debug("Adding Torrents from queue to active to fill...") + Logger.WithFields(logrus.Fields{"Max Active: ": config.MaxActiveTorrents, "Current : ": torrentQueues.ActiveTorrents}).Info("Adding Torrents from queue to active to fill...") maxCanSend := config.MaxActiveTorrents - len(torrentQueues.ActiveTorrents) if maxCanSend > len(torrentQueues.QueuedTorrents) { maxCanSend = len(torrentQueues.QueuedTorrents) @@ -286,12 +289,14 @@ func CreateRunningTorrentArray(tclient *torrent.Client, TorrentLocalArray []*Sto //Handling deleted torrents here if singleTorrentFromStorage.TorrentStatus == "Dropped" { Logger.WithFields(logrus.Fields{"selection": singleTorrentFromStorage.TorrentName}).Info("Deleting just the torrent") + DeleteTorrentFromQueues(singleTorrentFromStorage.Hash, db) singleTorrent.Drop() Storage.DelTorrentLocalStorage(db, singleTorrentFromStorage.Hash) } if singleTorrentFromStorage.TorrentStatus == "DroppedData" { Logger.WithFields(logrus.Fields{"selection": singleTorrentFromStorage.TorrentName}).Info("Deleting torrent and data") singleTorrent.Drop() + DeleteTorrentFromQueues(singleTorrentFromStorage.Hash, db) Storage.DelTorrentLocalStorageAndFiles(db, singleTorrentFromStorage.Hash, Config.TorrentConfig.DataDir) } if singleTorrentFromStorage.TorrentType == "file" { //if it is a file pull it from the uploaded torrent folder @@ -352,7 +357,6 @@ func CreateRunningTorrentArray(tclient *torrent.Client, TorrentLocalArray []*Sto RunningTorrentArray = append(RunningTorrentArray, *fullClientDB) } - ValidateQueues(db, config, tclient) //Ensure we don't have too many in activeQueue return RunningTorrentArray } diff --git a/engine/engineHelpers.go b/engine/engineHelpers.go index 2201f2af..121fdd77 100644 --- a/engine/engineHelpers.go +++ b/engine/engineHelpers.go @@ -186,7 +186,6 @@ func CalculateUploadRatio(t *torrent.Torrent, c *ClientDB) string { //StopTorrent stops the torrent, updates the database and sends a message. Since stoptorrent is called by each loop (individually) no need to call an array func StopTorrent(singleTorrent *torrent.Torrent, torrentLocalStorage *Storage.TorrentLocal, db *storm.DB) { - torrentQueues := Storage.FetchQueues(db) if torrentLocalStorage.TorrentStatus == "Stopped" { //if we are already stopped Logger.WithFields(logrus.Fields{"Torrent Name": torrentLocalStorage.TorrentName}).Info("Torrent Already Stopped, returning...") return @@ -194,24 +193,19 @@ func StopTorrent(singleTorrent *torrent.Torrent, torrentLocalStorage *Storage.To torrentLocalStorage.TorrentStatus = "Stopped" torrentLocalStorage.MaxConnections = 0 singleTorrent.SetMaxEstablishedConns(0) - for _, torrentHash := range torrentQueues.ActiveTorrents { //pulling it out of activetorrents - if torrentHash == singleTorrent.InfoHash().String() { - DeleteTorrentFromQueues(singleTorrent.InfoHash().String(), db) - } - } - for _, torrentHash := range torrentQueues.QueuedTorrents { //pulling it out of queuedTorrent - if torrentHash == singleTorrent.InfoHash().String() { - DeleteTorrentFromQueues(singleTorrent.InfoHash().String(), db) - } - } + DeleteTorrentFromQueues(singleTorrent.InfoHash().String(), db) Storage.UpdateStorageTick(db, *torrentLocalStorage) CreateServerPushMessage(ServerPushMessage{MessageType: "serverPushMessage", MessageLevel: "success", Payload: "Torrent Stopped!"}, Conn) - return + Logger.WithFields(logrus.Fields{"Torrent Name": torrentLocalStorage.TorrentName}).Info("Torrent Stopped Success!") } //AddTorrentToActive adds a torrent to the active slice func AddTorrentToActive(torrentLocalStorage *Storage.TorrentLocal, singleTorrent *torrent.Torrent, db *storm.DB) { torrentQueues := Storage.FetchQueues(db) + if torrentLocalStorage.TorrentStatus == "Stopped" { + Logger.WithFields(logrus.Fields{"Torrent Name": torrentLocalStorage.TorrentName}).Info("Torrent set as stopped, skipping add") + return + } for _, torrentHash := range torrentQueues.ActiveTorrents { if torrentHash == singleTorrent.InfoHash().String() { //If torrent already in active skip return @@ -243,7 +237,7 @@ func AddTorrentToActive(torrentLocalStorage *Storage.TorrentLocal, singleTorrent } } } - Logger.WithFields(logrus.Fields{"Torrent Name": torrentLocalStorage.TorrentName}).Info("Adding Torrent to Active Queue") + Logger.WithFields(logrus.Fields{"Torrent Name": torrentLocalStorage.TorrentName}).Info("Adding Torrent to Active Queue (Manual Call)") Storage.UpdateQueues(db, torrentQueues) } @@ -268,20 +262,20 @@ func RemoveTorrentFromActive(torrentLocalStorage *Storage.TorrentLocal, singleTo //DeleteTorrentFromQueues deletes the torrent from all queues (for a stop or delete action) func DeleteTorrentFromQueues(torrentHash string, db *storm.DB) { torrentQueues := Storage.FetchQueues(db) - for x, torrentHashActive := range torrentQueues.ActiveTorrents { + for x, torrentHashActive := range torrentQueues.ActiveTorrents { //FOR EXTRA CAUTION deleting it from both queues in case a mistake occurred. if torrentHash == torrentHashActive { torrentQueues.ActiveTorrents = append(torrentQueues.ActiveTorrents[:x], torrentQueues.ActiveTorrents[x+1:]...) - Storage.UpdateQueues(db, torrentQueues) - } else { - for x, torrentHashQueued := range torrentQueues.QueuedTorrents { - if torrentHash == torrentHashQueued { - torrentQueues.QueuedTorrents = append(torrentQueues.QueuedTorrents[:x], torrentQueues.QueuedTorrents[x+1:]...) - Storage.UpdateQueues(db, torrentQueues) - } - } + Logger.Info("Removing Torrent from Active: ", torrentHash) } } - Logger.WithFields(logrus.Fields{"Torrent Hash": torrentHash}).Info("Removing Torrent from all Queues") + for x, torrentHashQueued := range torrentQueues.QueuedTorrents { //FOR EXTRA CAUTION deleting it from both queues in case a mistake occurred. + if torrentHash == torrentHashQueued { + torrentQueues.QueuedTorrents = append(torrentQueues.QueuedTorrents[:x], torrentQueues.QueuedTorrents[x+1:]...) + Logger.Info("Removing Torrent from Queued", torrentHash) + } + } + Storage.UpdateQueues(db, torrentQueues) + Logger.WithFields(logrus.Fields{"Torrent Hash": torrentHash, "TorrentQueues": torrentQueues}).Info("Removing Torrent from all Queues") } //AddTorrentToQueue adds a torrent to the queue @@ -332,18 +326,24 @@ func ValidateQueues(db *storm.DB, config Settings.FullClientSettings, tclient *t } } torrentQueues = Storage.FetchQueues(db) - for _, singleTorrent := range tclient.Torrents() { //If we have a queued torrent that is missing data, and an active torrent that is seeding, then prioritize the missing data one - for _, queuedTorrent := range torrentQueues.QueuedTorrents { + for _, singleTorrent := range tclient.Torrents() { + singleTorrentFromStorage := Storage.FetchTorrentFromStorage(db, singleTorrent.InfoHash().String()) + if singleTorrentFromStorage.TorrentStatus == "Stopped" { + continue + } + for _, queuedTorrent := range torrentQueues.QueuedTorrents { //If we have a queued torrent that is missing data, and an active torrent that is seeding, then prioritize the missing data one if singleTorrent.InfoHash().String() == queuedTorrent { if singleTorrent.BytesMissing() > 0 { for _, activeTorrent := range torrentQueues.ActiveTorrents { for _, singleActiveTorrent := range tclient.Torrents() { if activeTorrent == singleActiveTorrent.InfoHash().String() { if singleActiveTorrent.Seeding() == true { - singleTorrentFromStorage := Storage.FetchTorrentFromStorage(db, activeTorrent) - RemoveTorrentFromActive(&singleTorrentFromStorage, singleActiveTorrent, db) - singleTorrentFromStorage = Storage.FetchTorrentFromStorage(db, queuedTorrent) - AddTorrentToActive(&singleTorrentFromStorage, singleTorrent, db) + singleActiveTFS := Storage.FetchTorrentFromStorage(db, activeTorrent) + Logger.WithFields(logrus.Fields{"TorrentName": singleActiveTFS.TorrentName}).Info("Seeding, Removing from active to add queued") + RemoveTorrentFromActive(&singleActiveTFS, singleActiveTorrent, db) + singleQueuedTFS := Storage.FetchTorrentFromStorage(db, queuedTorrent) + Logger.WithFields(logrus.Fields{"TorrentName": singleQueuedTFS.TorrentName}).Info("Adding torrent to the queue, not active") + AddTorrentToActive(&singleQueuedTFS, singleTorrent, db) } } } diff --git a/main.go b/main.go index 85e790d9..aa624634 100644 --- a/main.go +++ b/main.go @@ -206,7 +206,7 @@ func main() { } Engine.CheckTorrentWatchFolder(cronEngine, db, tclient, torrentLocalStorage, Config, torrentQueues) //Every 5 minutes the engine will check the specified folder for new .torrent files Engine.RefreshRSSCron(cronEngine, db, tclient, torrentLocalStorage, Config, torrentQueues) // Refresing the RSS feeds on an hourly basis to add torrents that show up in the RSS feed - Engine.CheckTorrents(cronEngine, db, tclient, torrentLocalStorage, Config, torrentQueues, TorrentLocalArray) + Engine.CheckTorrentsCron(cronEngine, db, tclient, Config) router := mux.NewRouter() //setting up the handler for the web backend router.HandleFunc("/", serveHome) //Serving the main page for our SPA @@ -513,15 +513,6 @@ func main() { Logger.WithFields(logrus.Fields{"selection": singleSelection}).Info("Matched for stopping torrents") oldTorrentInfo := Storage.FetchTorrentFromStorage(db, singleTorrent.InfoHash().String()) Engine.StopTorrent(singleTorrent, &oldTorrentInfo, db) - if len(torrentQueues.QueuedTorrents) > 1 { - addTorrent := torrentQueues.QueuedTorrents[:1] - for _, singleTorrent := range runningTorrents { - if singleTorrent.InfoHash().String() == addTorrent[0] { - Engine.AddTorrentToActive(&torrentLocalStorage, singleTorrent, db) - } - } - } - } } } @@ -537,18 +528,6 @@ func main() { oldTorrentInfo := Storage.FetchTorrentFromStorage(db, singleTorrent.InfoHash().String()) torrentQueues = Storage.FetchQueues(db) - for index, activeTorrentHash := range torrentQueues.ActiveTorrents { //If torrent is in the active slice, pull it - if singleTorrent.InfoHash().String() == activeTorrentHash { - singleTorrent.SetMaxEstablishedConns(0) - torrentQueues.ActiveTorrents = append(torrentQueues.ActiveTorrents[:index], torrentQueues.ActiveTorrents[index+1:]...) - } - } - for index, queuedTorrentHash := range torrentQueues.QueuedTorrents { //If torrent is in the queued slice, pull it - if singleTorrent.InfoHash().String() == queuedTorrentHash { - torrentQueues.QueuedTorrents = append(torrentQueues.QueuedTorrents[:index], torrentQueues.QueuedTorrents[index+1:]...) - } - } - Logger.WithFields(logrus.Fields{"selection": singleSelection}).Info("Matched for deleting torrents") if withData { oldTorrentInfo.TorrentStatus = "DroppedData" //Will be cleaned up the next engine loop since deleting a torrent mid loop can cause issues @@ -570,9 +549,12 @@ func main() { if singleTorrent.InfoHash().String() == singleSelection { Logger.WithFields(logrus.Fields{"infoHash": singleTorrent.InfoHash().String()}).Info("Found matching torrent to start") oldTorrentInfo := Storage.FetchTorrentFromStorage(db, singleTorrent.InfoHash().String()) - Engine.AddTorrentToActive(&oldTorrentInfo, singleTorrent, db) Logger.WithFields(logrus.Fields{"Torrent": oldTorrentInfo.TorrentName}).Info("Changing database to torrent running with 80 max connections") + oldTorrentInfo.TorrentStatus = "Running" + oldTorrentInfo.MaxConnections = 80 Storage.UpdateStorageTick(db, oldTorrentInfo) //Updating the torrent status + Engine.AddTorrentToActive(&oldTorrentInfo, singleTorrent, db) + } torrentQueues = Storage.FetchQueues(db) if len(torrentQueues.ActiveTorrents) > Config.MaxActiveTorrents { //Since we are starting a new torrent stop the first torrent in the que if running is full @@ -650,7 +632,6 @@ func main() { } default: - //conn.Close() Logger.WithFields(logrus.Fields{"message": msg}).Info("Unrecognized Message from client... ignoring") return } diff --git a/storage/storage.go b/storage/storage.go index 14a32fc7..1149b3c0 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -77,20 +77,20 @@ type TorrentLocal struct { DateAdded string StoragePath string //The absolute value of the path where the torrent will be moved when completed TempStoragePath string //The absolute path of where the torrent is temporarily stored as it is downloaded - TorrentMoved bool + TorrentMoved bool //If completed has the torrent been moved to the end location TorrentName string - TorrentStatus string - TorrentUploadLimit bool //if true this torrent will bypass the upload storage limit (effectively unlimited) - MaxConnections int + TorrentStatus string //"Stopped", "Running" + TorrentUploadLimit bool //if true this torrent will bypass the upload storage limit (effectively unlimited) + MaxConnections int //Max connections that the torrent can have to it at one time TorrentType string //magnet or .torrent file TorrentFileName string //Should be just the name of the torrent - TorrentFile []byte - Label string - UploadedBytes int64 - DownloadedBytes int64 - TorrentSize int64 //If we cancel a file change the download size since we won't be downloading that file + TorrentFile []byte //If torrent was from .torrent file, store the entire file for re-adding on restart + Label string //User enterable label to sort torrents by + UploadedBytes int64 //Total amount the client has uploaded on this torrent + DownloadedBytes int64 //Total amount the client has downloaded on this torrent + TorrentSize int64 //If we cancel a file change the download size since we won't be downloading that file UploadRatio string - TorrentFilePriority []TorrentFilePriority + TorrentFilePriority []TorrentFilePriority //Slice of all the files the torrent contains and the priority of each file } //SaveConfig saves the config to the database to compare for changes to settings.toml on restart