From a5e9b6745f182fa69102c3d662dae0742a62da66 Mon Sep 17 00:00:00 2001 From: deranjer Date: Mon, 10 Sep 2018 15:18:30 -0400 Subject: [PATCH] Moving Queue and Ratio checks to cron (fixes failure to stop on ratio) --- engine/cronJobs.go | 54 +++++++++++++++++++++++++++++++++++++++++ engine/engine.go | 36 +++------------------------ engine/engineHelpers.go | 45 +++++++++++++++------------------- main.go | 3 +++ storage/storage.go | 3 ++- 5 files changed, 82 insertions(+), 59 deletions(-) diff --git a/engine/cronJobs.go b/engine/cronJobs.go index 21001103..c759180a 100644 --- a/engine/cronJobs.go +++ b/engine/cronJobs.go @@ -57,6 +57,60 @@ func CheckTorrentWatchFolder(c *cron.Cron, db *storm.DB, tclient *torrent.Client }) } +//CheckTorrents runs a upload ratio check, a queue check (essentially anything that should not be frontend dependent) +func CheckTorrents(c *cron.Cron, db *storm.DB, tclient *torrent.Client, torrentLocalStorage Storage.TorrentLocal, config Settings.FullClientSettings, torrentQueues Storage.TorrentQueues, TorrentLocalArray []*Storage.TorrentLocal) { + c.AddFunc("@every 30s", func() { + Logger.Info("Running a torrent Ratio and Queue Check") + for _, singleTorrentFromStorage := range TorrentLocalArray { + //torrentQueues := Storage.FetchQueues(db) + var singleTorrent *torrent.Torrent + for _, liveTorrent := range tclient.Torrents() { //matching the torrent from storage to the live torrent + if singleTorrentFromStorage.Hash == liveTorrent.InfoHash().String() { + singleTorrent = liveTorrent + } + } + //var TempHash metainfo.Hash + //calculatedTotalSize := CalculateDownloadSize(singleTorrentFromStorage, singleTorrent) + calculatedCompletedSize := CalculateCompletedSize(singleTorrentFromStorage, singleTorrent) + //TempHash = singleTorrent.InfoHash() + bytesCompleted := CalculateCompletedSize(singleTorrentFromStorage, singleTorrent) + if float64(singleTorrentFromStorage.UploadedBytes)/float64(bytesCompleted) >= config.SeedRatioStop && singleTorrentFromStorage.TorrentUploadLimit == true { //If storage shows torrent stopped or if it is over the seeding ratio AND is under the global limit + StopTorrent(singleTorrent, singleTorrentFromStorage, db) + } + if len(torrentQueues.ActiveTorrents) < config.MaxActiveTorrents && singleTorrentFromStorage.TorrentStatus == "Queued" { + AddTorrentToActive(singleTorrentFromStorage, singleTorrent, db) + } + if (calculatedCompletedSize == singleTorrentFromStorage.TorrentSize) && (singleTorrentFromStorage.TorrentMoved == false) { //if we are done downloading and haven't moved torrent yet + Logger.WithFields(logrus.Fields{"singleTorrent": singleTorrentFromStorage.TorrentName}).Info("Torrent Completed, moving...") + tStorage := Storage.FetchTorrentFromStorage(db, singleTorrent.InfoHash().String()) //Todo... find a better way to do this in the go-routine currently just to make sure it doesn't trigger multiple times + tStorage.TorrentMoved = true + Storage.UpdateStorageTick(db, tStorage) + go func() { //moving torrent in separate go-routine then verifying that the data is still there and correct + err := MoveAndLeaveSymlink(config, singleTorrent.InfoHash().String(), db, false, "") //can take some time to move file so running this in another thread TODO make this a goroutine and skip this block if the routine is still running + if err != nil { //If we fail, print the error and attempt a retry + Logger.WithFields(logrus.Fields{"singleTorrent": singleTorrentFromStorage.TorrentName, "error": err}).Error("Failed to move Torrent!") + VerifyData(singleTorrent) + tStorage.TorrentMoved = false + Storage.UpdateStorageTick(db, tStorage) + } + }() + } + + } + ValidateQueues(db, config, tclient) //Ensure we don't have too many in activeQueue + if (len(torrentQueues.ActiveTorrents) < config.MaxActiveTorrents) && (len(torrentQueues.QueuedTorrents) > 0) { //If there is room for another torrent in active torrents, add it. + torrentToAdd := torrentQueues.QueuedTorrents[0] + for _, singleTorrent := range tclient.Torrents() { + if torrentToAdd == singleTorrent.InfoHash().AsString() { + singleTorrentFromStorage := Storage.FetchTorrentFromStorage(db, torrentToAdd) + AddTorrentToActive(&singleTorrentFromStorage, singleTorrent, db) + } + } + } + + }) +} + //RefreshRSSCron refreshes all of the RSS feeds on an hourly basis func RefreshRSSCron(c *cron.Cron, db *storm.DB, tclient *torrent.Client, torrentLocalStorage Storage.TorrentLocal, config Settings.FullClientSettings, torrentQueues Storage.TorrentQueues) { c.AddFunc("@hourly", func() { diff --git a/engine/engine.go b/engine/engine.go index 84e0fa22..2b384db7 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -141,6 +141,7 @@ func AddTorrent(clientTorrent *torrent.Torrent, torrentLocalStorage Storage.Torr } var TempHash metainfo.Hash TempHash = clientTorrent.InfoHash() + fmt.Println("GOT INFOHASH", TempHash.String()) allStoredTorrents := Storage.FetchAllStoredTorrents(db) for _, runningTorrentHashes := range allStoredTorrents { if runningTorrentHashes.Hash == TempHash.String() { @@ -274,7 +275,7 @@ func CreateRunningTorrentArray(tclient *torrent.Client, TorrentLocalArray []*Sto for _, singleTorrentFromStorage := range TorrentLocalArray { torrentQueues := Storage.FetchQueues(db) var singleTorrent *torrent.Torrent - var TempHash metainfo.Hash + for _, liveTorrent := range tclient.Torrents() { //matching the torrent from storage to the live torrent if singleTorrentFromStorage.Hash == liveTorrent.InfoHash().String() { singleTorrent = liveTorrent @@ -287,38 +288,22 @@ func CreateRunningTorrentArray(tclient *torrent.Client, TorrentLocalArray []*Sto Logger.WithFields(logrus.Fields{"selection": singleTorrentFromStorage.TorrentName}).Info("Deleting just the torrent") singleTorrent.Drop() Storage.DelTorrentLocalStorage(db, singleTorrentFromStorage.Hash) - } if singleTorrentFromStorage.TorrentStatus == "DroppedData" { Logger.WithFields(logrus.Fields{"selection": singleTorrentFromStorage.TorrentName}).Info("Deleting torrent and data") singleTorrent.Drop() Storage.DelTorrentLocalStorageAndFiles(db, singleTorrentFromStorage.Hash, Config.TorrentConfig.DataDir) - } if singleTorrentFromStorage.TorrentType == "file" { //if it is a file pull it from the uploaded torrent folder fullClientDB.SourceType = "Torrent File" } else { fullClientDB.SourceType = "Magnet Link" } + var TempHash metainfo.Hash + TempHash = singleTorrent.InfoHash() calculatedTotalSize := CalculateDownloadSize(singleTorrentFromStorage, singleTorrent) calculatedCompletedSize := CalculateCompletedSize(singleTorrentFromStorage, singleTorrent) - TempHash = singleTorrent.InfoHash() - if (calculatedCompletedSize == singleTorrentFromStorage.TorrentSize) && (singleTorrentFromStorage.TorrentMoved == false) { //if we are done downloading and haven't moved torrent yet - Logger.WithFields(logrus.Fields{"singleTorrent": singleTorrentFromStorage.TorrentName}).Info("Torrent Completed, moving...") - tStorage := Storage.FetchTorrentFromStorage(db, singleTorrent.InfoHash().String()) //Todo... find a better way to do this in the go-routine currently just to make sure it doesn't trigger multiple times - tStorage.TorrentMoved = true - Storage.UpdateStorageTick(db, tStorage) - go func() { //moving torrent in separate go-routine then verifying that the data is still there and correct - err := MoveAndLeaveSymlink(config, singleTorrent.InfoHash().String(), db, false, "") //can take some time to move file so running this in another thread TODO make this a goroutine and skip this block if the routine is still running - if err != nil { //If we fail, print the error and attempt a retry - Logger.WithFields(logrus.Fields{"singleTorrent": singleTorrentFromStorage.TorrentName, "error": err}).Error("Failed to move Torrent!") - VerifyData(singleTorrent) - tStorage.TorrentMoved = false - Storage.UpdateStorageTick(db, tStorage) - } - }() - } fullStruct := singleTorrent.Stats() activePeersString := strconv.Itoa(fullStruct.ActivePeers) //converting to strings totalPeersString := fmt.Sprintf("%v", fullStruct.TotalPeers) @@ -353,19 +338,6 @@ func CreateRunningTorrentArray(tclient *torrent.Client, TorrentLocalArray []*Sto } CalculateTorrentETA(singleTorrentFromStorage.TorrentSize, calculatedCompletedSize, fullClientDB) //needs to be here since we need the speed calculated before we can estimate the eta. - if (len(torrentQueues.ActiveTorrents) < config.MaxActiveTorrents) && (len(torrentQueues.QueuedTorrents) > 0) { //If there is room for another torrent in active torrents, add it. - var newTorrentHash string - for _, torrentHash := range torrentQueues.QueuedTorrents { - if singleTorrentFromStorage.TorrentStatus != "Stopped" { - newTorrentHash = torrentHash - } - } - for _, torrent := range tclient.Torrents() { - if newTorrentHash == torrent.InfoHash().String() { - AddTorrentToActive(singleTorrentFromStorage, singleTorrent, db) - } - } - } fullClientDB.TotalUploadedSize = HumanizeBytes(float32(fullClientDB.TotalUploadedBytes)) fullClientDB.UploadRatio = CalculateUploadRatio(singleTorrent, fullClientDB) //calculate the upload ratio diff --git a/engine/engineHelpers.go b/engine/engineHelpers.go index abc3e087..2201f2af 100644 --- a/engine/engineHelpers.go +++ b/engine/engineHelpers.go @@ -360,32 +360,25 @@ func CalculateTorrentStatus(t *torrent.Torrent, c *ClientDB, config Settings.Ful c.Status = "Stopped" return } - if float64(c.TotalUploadedBytes)/float64(bytesCompleted) >= config.SeedRatioStop && tFromStorage.TorrentUploadLimit == true { //If storage shows torrent stopped or if it is over the seeding ratio AND is under the global limit - StopTorrent(t, tFromStorage, db) - - } else { //Only has 2 states in storage, stopped or running, so we know it should be running, and the websocket request handled updating the database with connections and status - for _, torrentHash := range torrentQueues.QueuedTorrents { - if tFromStorage.Hash == torrentHash { - c.Status = "Queued" - return - } - } - if len(torrentQueues.ActiveTorrents) < config.MaxActiveTorrents && tFromStorage.TorrentStatus == "Queued" { - AddTorrentToActive(tFromStorage, t, db) - } - bytesMissing := totalSize - bytesCompleted - c.MaxConnections = 80 - t.SetMaxEstablishedConns(80) - if t.Seeding() && t.Stats().ActivePeers > 0 && bytesMissing == 0 { - c.Status = "Seeding" - } else if t.Stats().ActivePeers > 0 && bytesMissing > 0 { - c.Status = "Downloading" - } else if t.Stats().ActivePeers == 0 && bytesMissing == 0 { - c.Status = "Completed" - } else if t.Stats().ActivePeers == 0 && bytesMissing > 0 { - c.Status = "Awaiting Peers" - } else { - c.Status = "Unknown" + //Only has 2 states in storage, stopped or running, so we know it should be running, and the websocket request handled updating the database with connections and status + for _, torrentHash := range torrentQueues.QueuedTorrents { + if tFromStorage.Hash == torrentHash { + c.Status = "Queued" + return } } + bytesMissing := totalSize - bytesCompleted + c.MaxConnections = 80 + t.SetMaxEstablishedConns(80) + if t.Seeding() && t.Stats().ActivePeers > 0 && bytesMissing == 0 { + c.Status = "Seeding" + } else if t.Stats().ActivePeers > 0 && bytesMissing > 0 { + c.Status = "Downloading" + } else if t.Stats().ActivePeers == 0 && bytesMissing == 0 { + c.Status = "Completed" + } else if t.Stats().ActivePeers == 0 && bytesMissing > 0 { + c.Status = "Awaiting Peers" + } else { + c.Status = "Unknown" + } } diff --git a/main.go b/main.go index 94985db4..85e790d9 100644 --- a/main.go +++ b/main.go @@ -65,6 +65,7 @@ func handleAuthentication(conn *websocket.Conn, db *storm.DB) { Logger.WithFields(logrus.Fields{"error": err, "SuppliedToken": clientAuthToken}).Error("Unable to read authentication message") } fmt.Println("Authstring", clientAuthToken) + //clientAuthToken = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJjbGllbnROYW1lIjoiZ29Ub3JyZW50V2ViVUkiLCJpc3MiOiJnb1RvcnJlbnRTZXJ2ZXIifQ.Lfqp9tm06CY4XfrqnNDeVLkq9c7rsbibDrUdPko8ffQ" signingKeyStruct := Storage.FetchJWTTokens(db) singingKey := signingKeyStruct.SigningKey token, err := jwt.Parse(clientAuthToken, func(token *jwt.Token) (interface{}, error) { @@ -77,6 +78,7 @@ func handleAuthentication(conn *websocket.Conn, db *storm.DB) { authFail := Engine.AuthResponse{MessageType: "authResponse", Payload: "Parsing of Token failed, ensure you have the correct token! Closing Connection"} conn.WriteJSON(authFail) Logger.WithFields(logrus.Fields{"error": err, "SuppliedToken": token}).Error("Unable to parse token!") + fmt.Println("ENTIRE SUPPLIED TOKEN:", token, "CLIENTAUTHTOKEN", clientAuthToken) conn.Close() return } @@ -204,6 +206,7 @@ func main() { } Engine.CheckTorrentWatchFolder(cronEngine, db, tclient, torrentLocalStorage, Config, torrentQueues) //Every 5 minutes the engine will check the specified folder for new .torrent files Engine.RefreshRSSCron(cronEngine, db, tclient, torrentLocalStorage, Config, torrentQueues) // Refresing the RSS feeds on an hourly basis to add torrents that show up in the RSS feed + Engine.CheckTorrents(cronEngine, db, tclient, torrentLocalStorage, Config, torrentQueues, TorrentLocalArray) router := mux.NewRouter() //setting up the handler for the web backend router.HandleFunc("/", serveHome) //Serving the main page for our SPA diff --git a/storage/storage.go b/storage/storage.go index 20b1af93..14a32fc7 100644 --- a/storage/storage.go +++ b/storage/storage.go @@ -1,6 +1,7 @@ package storage import ( + "fmt" "os" "path/filepath" @@ -146,11 +147,11 @@ func FetchAllStoredTorrents(torrentStorage *storm.DB) (torrentLocalArray []*Torr //AddTorrentLocalStorage is called when adding a new torrent via any method, requires the boltdb pointer and the torrentlocal struct func AddTorrentLocalStorage(torrentStorage *storm.DB, local TorrentLocal) { Logger.WithFields(logrus.Fields{"Storage Path": local.StoragePath, "Torrent": local.TorrentName, "File(if file)": local.TorrentFileName}).Info("Adding new Torrent to database") + fmt.Println("ENTIRE TORRENT", local) err := torrentStorage.Save(&local) if err != nil { Logger.WithFields(logrus.Fields{"database": torrentStorage, "error": err}).Error("Error adding new Torrent to database!") } - } //DelTorrentLocalStorage is called to delete a torrent when we fail (for whatever reason to load the information for it). Deleted by HASH matching.