Moving Queue and Ratio checks to cron (fixes failure to stop on ratio)
This commit is contained in:
@@ -57,6 +57,60 @@ func CheckTorrentWatchFolder(c *cron.Cron, db *storm.DB, tclient *torrent.Client
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//CheckTorrents runs a upload ratio check, a queue check (essentially anything that should not be frontend dependent)
|
||||||
|
func CheckTorrents(c *cron.Cron, db *storm.DB, tclient *torrent.Client, torrentLocalStorage Storage.TorrentLocal, config Settings.FullClientSettings, torrentQueues Storage.TorrentQueues, TorrentLocalArray []*Storage.TorrentLocal) {
|
||||||
|
c.AddFunc("@every 30s", func() {
|
||||||
|
Logger.Info("Running a torrent Ratio and Queue Check")
|
||||||
|
for _, singleTorrentFromStorage := range TorrentLocalArray {
|
||||||
|
//torrentQueues := Storage.FetchQueues(db)
|
||||||
|
var singleTorrent *torrent.Torrent
|
||||||
|
for _, liveTorrent := range tclient.Torrents() { //matching the torrent from storage to the live torrent
|
||||||
|
if singleTorrentFromStorage.Hash == liveTorrent.InfoHash().String() {
|
||||||
|
singleTorrent = liveTorrent
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//var TempHash metainfo.Hash
|
||||||
|
//calculatedTotalSize := CalculateDownloadSize(singleTorrentFromStorage, singleTorrent)
|
||||||
|
calculatedCompletedSize := CalculateCompletedSize(singleTorrentFromStorage, singleTorrent)
|
||||||
|
//TempHash = singleTorrent.InfoHash()
|
||||||
|
bytesCompleted := CalculateCompletedSize(singleTorrentFromStorage, singleTorrent)
|
||||||
|
if float64(singleTorrentFromStorage.UploadedBytes)/float64(bytesCompleted) >= config.SeedRatioStop && singleTorrentFromStorage.TorrentUploadLimit == true { //If storage shows torrent stopped or if it is over the seeding ratio AND is under the global limit
|
||||||
|
StopTorrent(singleTorrent, singleTorrentFromStorage, db)
|
||||||
|
}
|
||||||
|
if len(torrentQueues.ActiveTorrents) < config.MaxActiveTorrents && singleTorrentFromStorage.TorrentStatus == "Queued" {
|
||||||
|
AddTorrentToActive(singleTorrentFromStorage, singleTorrent, db)
|
||||||
|
}
|
||||||
|
if (calculatedCompletedSize == singleTorrentFromStorage.TorrentSize) && (singleTorrentFromStorage.TorrentMoved == false) { //if we are done downloading and haven't moved torrent yet
|
||||||
|
Logger.WithFields(logrus.Fields{"singleTorrent": singleTorrentFromStorage.TorrentName}).Info("Torrent Completed, moving...")
|
||||||
|
tStorage := Storage.FetchTorrentFromStorage(db, singleTorrent.InfoHash().String()) //Todo... find a better way to do this in the go-routine currently just to make sure it doesn't trigger multiple times
|
||||||
|
tStorage.TorrentMoved = true
|
||||||
|
Storage.UpdateStorageTick(db, tStorage)
|
||||||
|
go func() { //moving torrent in separate go-routine then verifying that the data is still there and correct
|
||||||
|
err := MoveAndLeaveSymlink(config, singleTorrent.InfoHash().String(), db, false, "") //can take some time to move file so running this in another thread TODO make this a goroutine and skip this block if the routine is still running
|
||||||
|
if err != nil { //If we fail, print the error and attempt a retry
|
||||||
|
Logger.WithFields(logrus.Fields{"singleTorrent": singleTorrentFromStorage.TorrentName, "error": err}).Error("Failed to move Torrent!")
|
||||||
|
VerifyData(singleTorrent)
|
||||||
|
tStorage.TorrentMoved = false
|
||||||
|
Storage.UpdateStorageTick(db, tStorage)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
ValidateQueues(db, config, tclient) //Ensure we don't have too many in activeQueue
|
||||||
|
if (len(torrentQueues.ActiveTorrents) < config.MaxActiveTorrents) && (len(torrentQueues.QueuedTorrents) > 0) { //If there is room for another torrent in active torrents, add it.
|
||||||
|
torrentToAdd := torrentQueues.QueuedTorrents[0]
|
||||||
|
for _, singleTorrent := range tclient.Torrents() {
|
||||||
|
if torrentToAdd == singleTorrent.InfoHash().AsString() {
|
||||||
|
singleTorrentFromStorage := Storage.FetchTorrentFromStorage(db, torrentToAdd)
|
||||||
|
AddTorrentToActive(&singleTorrentFromStorage, singleTorrent, db)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
//RefreshRSSCron refreshes all of the RSS feeds on an hourly basis
|
//RefreshRSSCron refreshes all of the RSS feeds on an hourly basis
|
||||||
func RefreshRSSCron(c *cron.Cron, db *storm.DB, tclient *torrent.Client, torrentLocalStorage Storage.TorrentLocal, config Settings.FullClientSettings, torrentQueues Storage.TorrentQueues) {
|
func RefreshRSSCron(c *cron.Cron, db *storm.DB, tclient *torrent.Client, torrentLocalStorage Storage.TorrentLocal, config Settings.FullClientSettings, torrentQueues Storage.TorrentQueues) {
|
||||||
c.AddFunc("@hourly", func() {
|
c.AddFunc("@hourly", func() {
|
||||||
|
@@ -141,6 +141,7 @@ func AddTorrent(clientTorrent *torrent.Torrent, torrentLocalStorage Storage.Torr
|
|||||||
}
|
}
|
||||||
var TempHash metainfo.Hash
|
var TempHash metainfo.Hash
|
||||||
TempHash = clientTorrent.InfoHash()
|
TempHash = clientTorrent.InfoHash()
|
||||||
|
fmt.Println("GOT INFOHASH", TempHash.String())
|
||||||
allStoredTorrents := Storage.FetchAllStoredTorrents(db)
|
allStoredTorrents := Storage.FetchAllStoredTorrents(db)
|
||||||
for _, runningTorrentHashes := range allStoredTorrents {
|
for _, runningTorrentHashes := range allStoredTorrents {
|
||||||
if runningTorrentHashes.Hash == TempHash.String() {
|
if runningTorrentHashes.Hash == TempHash.String() {
|
||||||
@@ -274,7 +275,7 @@ func CreateRunningTorrentArray(tclient *torrent.Client, TorrentLocalArray []*Sto
|
|||||||
for _, singleTorrentFromStorage := range TorrentLocalArray {
|
for _, singleTorrentFromStorage := range TorrentLocalArray {
|
||||||
torrentQueues := Storage.FetchQueues(db)
|
torrentQueues := Storage.FetchQueues(db)
|
||||||
var singleTorrent *torrent.Torrent
|
var singleTorrent *torrent.Torrent
|
||||||
var TempHash metainfo.Hash
|
|
||||||
for _, liveTorrent := range tclient.Torrents() { //matching the torrent from storage to the live torrent
|
for _, liveTorrent := range tclient.Torrents() { //matching the torrent from storage to the live torrent
|
||||||
if singleTorrentFromStorage.Hash == liveTorrent.InfoHash().String() {
|
if singleTorrentFromStorage.Hash == liveTorrent.InfoHash().String() {
|
||||||
singleTorrent = liveTorrent
|
singleTorrent = liveTorrent
|
||||||
@@ -287,38 +288,22 @@ func CreateRunningTorrentArray(tclient *torrent.Client, TorrentLocalArray []*Sto
|
|||||||
Logger.WithFields(logrus.Fields{"selection": singleTorrentFromStorage.TorrentName}).Info("Deleting just the torrent")
|
Logger.WithFields(logrus.Fields{"selection": singleTorrentFromStorage.TorrentName}).Info("Deleting just the torrent")
|
||||||
singleTorrent.Drop()
|
singleTorrent.Drop()
|
||||||
Storage.DelTorrentLocalStorage(db, singleTorrentFromStorage.Hash)
|
Storage.DelTorrentLocalStorage(db, singleTorrentFromStorage.Hash)
|
||||||
|
|
||||||
}
|
}
|
||||||
if singleTorrentFromStorage.TorrentStatus == "DroppedData" {
|
if singleTorrentFromStorage.TorrentStatus == "DroppedData" {
|
||||||
Logger.WithFields(logrus.Fields{"selection": singleTorrentFromStorage.TorrentName}).Info("Deleting torrent and data")
|
Logger.WithFields(logrus.Fields{"selection": singleTorrentFromStorage.TorrentName}).Info("Deleting torrent and data")
|
||||||
singleTorrent.Drop()
|
singleTorrent.Drop()
|
||||||
Storage.DelTorrentLocalStorageAndFiles(db, singleTorrentFromStorage.Hash, Config.TorrentConfig.DataDir)
|
Storage.DelTorrentLocalStorageAndFiles(db, singleTorrentFromStorage.Hash, Config.TorrentConfig.DataDir)
|
||||||
|
|
||||||
}
|
}
|
||||||
if singleTorrentFromStorage.TorrentType == "file" { //if it is a file pull it from the uploaded torrent folder
|
if singleTorrentFromStorage.TorrentType == "file" { //if it is a file pull it from the uploaded torrent folder
|
||||||
fullClientDB.SourceType = "Torrent File"
|
fullClientDB.SourceType = "Torrent File"
|
||||||
} else {
|
} else {
|
||||||
fullClientDB.SourceType = "Magnet Link"
|
fullClientDB.SourceType = "Magnet Link"
|
||||||
}
|
}
|
||||||
|
var TempHash metainfo.Hash
|
||||||
|
TempHash = singleTorrent.InfoHash()
|
||||||
|
|
||||||
calculatedTotalSize := CalculateDownloadSize(singleTorrentFromStorage, singleTorrent)
|
calculatedTotalSize := CalculateDownloadSize(singleTorrentFromStorage, singleTorrent)
|
||||||
calculatedCompletedSize := CalculateCompletedSize(singleTorrentFromStorage, singleTorrent)
|
calculatedCompletedSize := CalculateCompletedSize(singleTorrentFromStorage, singleTorrent)
|
||||||
TempHash = singleTorrent.InfoHash()
|
|
||||||
if (calculatedCompletedSize == singleTorrentFromStorage.TorrentSize) && (singleTorrentFromStorage.TorrentMoved == false) { //if we are done downloading and haven't moved torrent yet
|
|
||||||
Logger.WithFields(logrus.Fields{"singleTorrent": singleTorrentFromStorage.TorrentName}).Info("Torrent Completed, moving...")
|
|
||||||
tStorage := Storage.FetchTorrentFromStorage(db, singleTorrent.InfoHash().String()) //Todo... find a better way to do this in the go-routine currently just to make sure it doesn't trigger multiple times
|
|
||||||
tStorage.TorrentMoved = true
|
|
||||||
Storage.UpdateStorageTick(db, tStorage)
|
|
||||||
go func() { //moving torrent in separate go-routine then verifying that the data is still there and correct
|
|
||||||
err := MoveAndLeaveSymlink(config, singleTorrent.InfoHash().String(), db, false, "") //can take some time to move file so running this in another thread TODO make this a goroutine and skip this block if the routine is still running
|
|
||||||
if err != nil { //If we fail, print the error and attempt a retry
|
|
||||||
Logger.WithFields(logrus.Fields{"singleTorrent": singleTorrentFromStorage.TorrentName, "error": err}).Error("Failed to move Torrent!")
|
|
||||||
VerifyData(singleTorrent)
|
|
||||||
tStorage.TorrentMoved = false
|
|
||||||
Storage.UpdateStorageTick(db, tStorage)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
fullStruct := singleTorrent.Stats()
|
fullStruct := singleTorrent.Stats()
|
||||||
activePeersString := strconv.Itoa(fullStruct.ActivePeers) //converting to strings
|
activePeersString := strconv.Itoa(fullStruct.ActivePeers) //converting to strings
|
||||||
totalPeersString := fmt.Sprintf("%v", fullStruct.TotalPeers)
|
totalPeersString := fmt.Sprintf("%v", fullStruct.TotalPeers)
|
||||||
@@ -353,19 +338,6 @@ func CreateRunningTorrentArray(tclient *torrent.Client, TorrentLocalArray []*Sto
|
|||||||
}
|
}
|
||||||
CalculateTorrentETA(singleTorrentFromStorage.TorrentSize, calculatedCompletedSize, fullClientDB) //needs to be here since we need the speed calculated before we can estimate the eta.
|
CalculateTorrentETA(singleTorrentFromStorage.TorrentSize, calculatedCompletedSize, fullClientDB) //needs to be here since we need the speed calculated before we can estimate the eta.
|
||||||
|
|
||||||
if (len(torrentQueues.ActiveTorrents) < config.MaxActiveTorrents) && (len(torrentQueues.QueuedTorrents) > 0) { //If there is room for another torrent in active torrents, add it.
|
|
||||||
var newTorrentHash string
|
|
||||||
for _, torrentHash := range torrentQueues.QueuedTorrents {
|
|
||||||
if singleTorrentFromStorage.TorrentStatus != "Stopped" {
|
|
||||||
newTorrentHash = torrentHash
|
|
||||||
}
|
|
||||||
}
|
|
||||||
for _, torrent := range tclient.Torrents() {
|
|
||||||
if newTorrentHash == torrent.InfoHash().String() {
|
|
||||||
AddTorrentToActive(singleTorrentFromStorage, singleTorrent, db)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
fullClientDB.TotalUploadedSize = HumanizeBytes(float32(fullClientDB.TotalUploadedBytes))
|
fullClientDB.TotalUploadedSize = HumanizeBytes(float32(fullClientDB.TotalUploadedBytes))
|
||||||
fullClientDB.UploadRatio = CalculateUploadRatio(singleTorrent, fullClientDB) //calculate the upload ratio
|
fullClientDB.UploadRatio = CalculateUploadRatio(singleTorrent, fullClientDB) //calculate the upload ratio
|
||||||
|
|
||||||
|
@@ -360,32 +360,25 @@ func CalculateTorrentStatus(t *torrent.Torrent, c *ClientDB, config Settings.Ful
|
|||||||
c.Status = "Stopped"
|
c.Status = "Stopped"
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if float64(c.TotalUploadedBytes)/float64(bytesCompleted) >= config.SeedRatioStop && tFromStorage.TorrentUploadLimit == true { //If storage shows torrent stopped or if it is over the seeding ratio AND is under the global limit
|
//Only has 2 states in storage, stopped or running, so we know it should be running, and the websocket request handled updating the database with connections and status
|
||||||
StopTorrent(t, tFromStorage, db)
|
for _, torrentHash := range torrentQueues.QueuedTorrents {
|
||||||
|
if tFromStorage.Hash == torrentHash {
|
||||||
} else { //Only has 2 states in storage, stopped or running, so we know it should be running, and the websocket request handled updating the database with connections and status
|
c.Status = "Queued"
|
||||||
for _, torrentHash := range torrentQueues.QueuedTorrents {
|
return
|
||||||
if tFromStorage.Hash == torrentHash {
|
|
||||||
c.Status = "Queued"
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if len(torrentQueues.ActiveTorrents) < config.MaxActiveTorrents && tFromStorage.TorrentStatus == "Queued" {
|
|
||||||
AddTorrentToActive(tFromStorage, t, db)
|
|
||||||
}
|
|
||||||
bytesMissing := totalSize - bytesCompleted
|
|
||||||
c.MaxConnections = 80
|
|
||||||
t.SetMaxEstablishedConns(80)
|
|
||||||
if t.Seeding() && t.Stats().ActivePeers > 0 && bytesMissing == 0 {
|
|
||||||
c.Status = "Seeding"
|
|
||||||
} else if t.Stats().ActivePeers > 0 && bytesMissing > 0 {
|
|
||||||
c.Status = "Downloading"
|
|
||||||
} else if t.Stats().ActivePeers == 0 && bytesMissing == 0 {
|
|
||||||
c.Status = "Completed"
|
|
||||||
} else if t.Stats().ActivePeers == 0 && bytesMissing > 0 {
|
|
||||||
c.Status = "Awaiting Peers"
|
|
||||||
} else {
|
|
||||||
c.Status = "Unknown"
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
bytesMissing := totalSize - bytesCompleted
|
||||||
|
c.MaxConnections = 80
|
||||||
|
t.SetMaxEstablishedConns(80)
|
||||||
|
if t.Seeding() && t.Stats().ActivePeers > 0 && bytesMissing == 0 {
|
||||||
|
c.Status = "Seeding"
|
||||||
|
} else if t.Stats().ActivePeers > 0 && bytesMissing > 0 {
|
||||||
|
c.Status = "Downloading"
|
||||||
|
} else if t.Stats().ActivePeers == 0 && bytesMissing == 0 {
|
||||||
|
c.Status = "Completed"
|
||||||
|
} else if t.Stats().ActivePeers == 0 && bytesMissing > 0 {
|
||||||
|
c.Status = "Awaiting Peers"
|
||||||
|
} else {
|
||||||
|
c.Status = "Unknown"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
3
main.go
3
main.go
@@ -65,6 +65,7 @@ func handleAuthentication(conn *websocket.Conn, db *storm.DB) {
|
|||||||
Logger.WithFields(logrus.Fields{"error": err, "SuppliedToken": clientAuthToken}).Error("Unable to read authentication message")
|
Logger.WithFields(logrus.Fields{"error": err, "SuppliedToken": clientAuthToken}).Error("Unable to read authentication message")
|
||||||
}
|
}
|
||||||
fmt.Println("Authstring", clientAuthToken)
|
fmt.Println("Authstring", clientAuthToken)
|
||||||
|
//clientAuthToken = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJjbGllbnROYW1lIjoiZ29Ub3JyZW50V2ViVUkiLCJpc3MiOiJnb1RvcnJlbnRTZXJ2ZXIifQ.Lfqp9tm06CY4XfrqnNDeVLkq9c7rsbibDrUdPko8ffQ"
|
||||||
signingKeyStruct := Storage.FetchJWTTokens(db)
|
signingKeyStruct := Storage.FetchJWTTokens(db)
|
||||||
singingKey := signingKeyStruct.SigningKey
|
singingKey := signingKeyStruct.SigningKey
|
||||||
token, err := jwt.Parse(clientAuthToken, func(token *jwt.Token) (interface{}, error) {
|
token, err := jwt.Parse(clientAuthToken, func(token *jwt.Token) (interface{}, error) {
|
||||||
@@ -77,6 +78,7 @@ func handleAuthentication(conn *websocket.Conn, db *storm.DB) {
|
|||||||
authFail := Engine.AuthResponse{MessageType: "authResponse", Payload: "Parsing of Token failed, ensure you have the correct token! Closing Connection"}
|
authFail := Engine.AuthResponse{MessageType: "authResponse", Payload: "Parsing of Token failed, ensure you have the correct token! Closing Connection"}
|
||||||
conn.WriteJSON(authFail)
|
conn.WriteJSON(authFail)
|
||||||
Logger.WithFields(logrus.Fields{"error": err, "SuppliedToken": token}).Error("Unable to parse token!")
|
Logger.WithFields(logrus.Fields{"error": err, "SuppliedToken": token}).Error("Unable to parse token!")
|
||||||
|
fmt.Println("ENTIRE SUPPLIED TOKEN:", token, "CLIENTAUTHTOKEN", clientAuthToken)
|
||||||
conn.Close()
|
conn.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -204,6 +206,7 @@ func main() {
|
|||||||
}
|
}
|
||||||
Engine.CheckTorrentWatchFolder(cronEngine, db, tclient, torrentLocalStorage, Config, torrentQueues) //Every 5 minutes the engine will check the specified folder for new .torrent files
|
Engine.CheckTorrentWatchFolder(cronEngine, db, tclient, torrentLocalStorage, Config, torrentQueues) //Every 5 minutes the engine will check the specified folder for new .torrent files
|
||||||
Engine.RefreshRSSCron(cronEngine, db, tclient, torrentLocalStorage, Config, torrentQueues) // Refresing the RSS feeds on an hourly basis to add torrents that show up in the RSS feed
|
Engine.RefreshRSSCron(cronEngine, db, tclient, torrentLocalStorage, Config, torrentQueues) // Refresing the RSS feeds on an hourly basis to add torrents that show up in the RSS feed
|
||||||
|
Engine.CheckTorrents(cronEngine, db, tclient, torrentLocalStorage, Config, torrentQueues, TorrentLocalArray)
|
||||||
|
|
||||||
router := mux.NewRouter() //setting up the handler for the web backend
|
router := mux.NewRouter() //setting up the handler for the web backend
|
||||||
router.HandleFunc("/", serveHome) //Serving the main page for our SPA
|
router.HandleFunc("/", serveHome) //Serving the main page for our SPA
|
||||||
|
@@ -1,6 +1,7 @@
|
|||||||
package storage
|
package storage
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
|
||||||
@@ -146,11 +147,11 @@ func FetchAllStoredTorrents(torrentStorage *storm.DB) (torrentLocalArray []*Torr
|
|||||||
//AddTorrentLocalStorage is called when adding a new torrent via any method, requires the boltdb pointer and the torrentlocal struct
|
//AddTorrentLocalStorage is called when adding a new torrent via any method, requires the boltdb pointer and the torrentlocal struct
|
||||||
func AddTorrentLocalStorage(torrentStorage *storm.DB, local TorrentLocal) {
|
func AddTorrentLocalStorage(torrentStorage *storm.DB, local TorrentLocal) {
|
||||||
Logger.WithFields(logrus.Fields{"Storage Path": local.StoragePath, "Torrent": local.TorrentName, "File(if file)": local.TorrentFileName}).Info("Adding new Torrent to database")
|
Logger.WithFields(logrus.Fields{"Storage Path": local.StoragePath, "Torrent": local.TorrentName, "File(if file)": local.TorrentFileName}).Info("Adding new Torrent to database")
|
||||||
|
fmt.Println("ENTIRE TORRENT", local)
|
||||||
err := torrentStorage.Save(&local)
|
err := torrentStorage.Save(&local)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
Logger.WithFields(logrus.Fields{"database": torrentStorage, "error": err}).Error("Error adding new Torrent to database!")
|
Logger.WithFields(logrus.Fields{"database": torrentStorage, "error": err}).Error("Error adding new Torrent to database!")
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//DelTorrentLocalStorage is called to delete a torrent when we fail (for whatever reason to load the information for it). Deleted by HASH matching.
|
//DelTorrentLocalStorage is called to delete a torrent when we fail (for whatever reason to load the information for it). Deleted by HASH matching.
|
||||||
|
Reference in New Issue
Block a user