diff --git a/.gitignore b/.gitignore
index 47fa7c6d..7db963ab 100644
--- a/.gitignore
+++ b/.gitignore
@@ -2,6 +2,7 @@ downloads/
 downloading/
 downloaded/
 uploadedTorrents/
+boltBrowser/
 storage.db.lock
 storage.db
 storage.db.old
diff --git a/config.toml b/config.toml
index 071a3753..da101dd4 100644
--- a/config.toml
+++ b/config.toml
@@ -13,10 +13,10 @@
 #Limits your upload and download speed globally, all are averages and not burst protected (usually burst on start).
 #Low = ~.05MB/s, Medium = ~.5MB/s, High = ~1.5MB/s
-  UploadRateLimit = "Medium" #Options are "Low", "Medium", "High", "Unlimited" #Unlimited is default
-  DownloadRateLimit = "Medium"
+  UploadRateLimit = "Unlimited" #Options are "Low", "Medium", "High", "Unlimited" #Unlimited is default
+  DownloadRateLimit = "Unlimited"
 #Maximum number of allowed active torrents, the rest will be queued
-  MaxActiveTorrents = 1
+  MaxActiveTorrents = 2
 
 [goTorrentWebUI]
 #Basic goTorrentWebUI authentication (not terribly secure, implemented in JS, password is hashed to SHA256, not salted, basically don't depend on this if you require very good security)
diff --git a/engine/cronJobs.go b/engine/cronJobs.go
index ca725992..21001103 100644
--- a/engine/cronJobs.go
+++ b/engine/cronJobs.go
@@ -22,7 +22,7 @@ func InitializeCronEngine() *cron.Cron {
 }
 
 //CheckTorrentWatchFolder adds torrents from a watch folder
 //TODO see if you can use filepath.Abs instead of changing directory
-func CheckTorrentWatchFolder(c *cron.Cron, db *storm.DB, tclient *torrent.Client, torrentLocalStorage Storage.TorrentLocal, config Settings.FullClientSettings, activeTorrents []string, queuedTorrents []string) {
+func CheckTorrentWatchFolder(c *cron.Cron, db *storm.DB, tclient *torrent.Client, torrentLocalStorage Storage.TorrentLocal, config Settings.FullClientSettings, torrentQueues Storage.TorrentQueues) {
 	c.AddFunc("@every 5m", func() {
 		Logger.WithFields(logrus.Fields{"Watch Folder": config.TorrentWatchFolder}).Info("Running the watch folder cron job")
 		torrentFiles, err := ioutil.ReadDir(config.TorrentWatchFolder)
@@ -50,7 +50,7 @@ func CheckTorrentWatchFolder(c *cron.Cron, db *storm.DB, tclient *torrent.Client
 				os.Remove(fullFilePathAbs) //delete the torrent after adding it and copying it over
 				Logger.WithFields(logrus.Fields{"Source Folder": fullFilePathAbs, "Destination Folder": fullNewFilePathAbs, "Torrent": file.Name()}).Info("Added torrent from watch folder, and moved torrent file")
-				AddTorrent(clientTorrent, torrentLocalStorage, db, "file", fullNewFilePathAbs, config.DefaultMoveFolder, "default", config, activeTorrents, queuedTorrents)
+				AddTorrent(clientTorrent, torrentLocalStorage, db, "file", fullNewFilePathAbs, config.DefaultMoveFolder, "default", config)
 			}
 		}
@@ -58,7 +58,7 @@ func CheckTorrentWatchFolder(c *cron.Cron, db *storm.DB, tclient *torrent.Client
 }
 
 //RefreshRSSCron refreshes all of the RSS feeds on an hourly basis
-func RefreshRSSCron(c *cron.Cron, db *storm.DB, tclient *torrent.Client, torrentLocalStorage Storage.TorrentLocal, config Settings.FullClientSettings, activeTorrents []string, queuedTorrents []string) {
+func RefreshRSSCron(c *cron.Cron, db *storm.DB, tclient *torrent.Client, torrentLocalStorage Storage.TorrentLocal, config Settings.FullClientSettings, torrentQueues Storage.TorrentQueues) {
 	c.AddFunc("@hourly", func() {
 		torrentHashHistory := Storage.FetchHashHistory(db)
 		RSSFeedStore := Storage.FetchRSSFeeds(db)
@@ -86,7 +86,7 @@ func RefreshRSSCron(c *cron.Cron, db *storm.DB, tclient *torrent.Client, torrent
 					Logger.WithFields(logrus.Fields{"err": err, "Torrent": RSSTorrent.Title}).Warn("Unable to add torrent to torrent client!")
 					break //break out of the loop entirely for this message since we hit an error
 				}
-				AddTorrent(clientTorrent, torrentLocalStorage, db, "magnet", "", config.DefaultMoveFolder, "RSS", config, activeTorrents, queuedTorrents) //TODO let user specify torrent default storage location and let change on fly
+				AddTorrent(clientTorrent, torrentLocalStorage, db, "magnet", "", config.DefaultMoveFolder, "RSS", config) //TODO let user specify torrent default storage location and let change on fly
 				singleFeed.Torrents = append(singleFeed.Torrents, singleRSSTorrent)
 			}
diff --git a/engine/doneTorrentActions.go b/engine/doneTorrentActions.go
index 4fea1a16..73760218 100644
--- a/engine/doneTorrentActions.go
+++ b/engine/doneTorrentActions.go
@@ -68,7 +68,6 @@ func MoveAndLeaveSymlink(config Settings.FullClientSettings, tHash string, db *s
 			Logger.WithFields(logrus.Fields{"Old File Path": oldFilePath, "New File Path": newFilePath, "error": err}).Error("Error Copying Folder!")
 			return err
 		}
-		//os.Chmod(newFilePath, 0777)
 		err = filepath.Walk(newFilePath, func(path string, info os.FileInfo, err error) error { //Walking the file path to change the permissions
 			if err != nil {
 				Logger.WithFields(logrus.Fields{"file": path, "error": err}).Error("Potentially non-critical error, continuing..")
diff --git a/engine/engine.go b/engine/engine.go
index 417f3448..3fa126d6 100644
--- a/engine/engine.go
+++ b/engine/engine.go
@@ -130,19 +130,19 @@ func readTorrentFileFromDB(element *Storage.TorrentLocal, tclient *torrent.Clien
 }
 
 //StartTorrents attempts to start torrents by adding them to the active torrents, then the queue, and by increasing their connections (if they were stopped)
-func StartTorrents(clientTorrent *torrent.Client, torrentHashes []string, activeTorrents []string, queuedTorrents []string) {
+func StartTorrents(clientTorrent *torrent.Client, torrentHashes []string, torrentQueues Storage.TorrentQueues) {
 
 }
 
 //AddTorrent creates the storage.db entry and starts A NEW TORRENT and adds to the running torrent array
-func AddTorrent(clientTorrent *torrent.Torrent, torrentLocalStorage Storage.TorrentLocal, torrentDbStorage *storm.DB, torrentType, torrentFilePathAbs, torrentStoragePath, labelValue string, config Settings.FullClientSettings, activeTorrents []string, queuedTorrents []string) {
+func AddTorrent(clientTorrent *torrent.Torrent, torrentLocalStorage Storage.TorrentLocal, db *storm.DB, torrentType, torrentFilePathAbs, torrentStoragePath, labelValue string, config Settings.FullClientSettings) {
 	timedOut := timeOutInfo(clientTorrent, 45) //seeing if adding the torrent times out (giving 45 seconds)
 	if timedOut { //if we fail to add the torrent return
 		return
 	}
 	var TempHash metainfo.Hash
 	TempHash = clientTorrent.InfoHash()
-	allStoredTorrents := Storage.FetchAllStoredTorrents(torrentDbStorage)
+	allStoredTorrents := Storage.FetchAllStoredTorrents(db)
 	for _, runningTorrentHashes := range allStoredTorrents {
 		if runningTorrentHashes.Hash == TempHash.String() {
 			Logger.WithFields(logrus.Fields{"Hash": TempHash.String()}).Info("Torrent has duplicate hash to already running torrent... will not add to storage")
@@ -180,27 +180,23 @@ func AddTorrent(clientTorrent *torrent.Torrent, torrentLocalStorage Storage.Torr
 		TorrentFilePriorityArray = append(TorrentFilePriorityArray, torrentFilePriority)
 	}
-	torrentLocalStorage.TorrentFilePriority = TorrentFilePriorityArray
-
-	if len(activeTorrents) < config.MaxActiveTorrents {
-		if len(queuedTorrents) > 0 {
-			queuedTorrents = append(queuedTorrents, clientTorrent.InfoHash().String())
-			CreateServerPushMessage(ServerPushMessage{MessageType: "serverPushMessage", MessageLevel: "success", Payload: "Torrent queued!"}, Conn)
-			torrentLocalStorage.QueuedStatus = "Queued"
-		} else {
-			clientTorrent.NewReader()
-			activeTorrents = append(activeTorrents, clientTorrent.InfoHash().String())
-			CreateServerPushMessage(ServerPushMessage{MessageType: "serverPushMessage", MessageLevel: "success", Payload: "Torrent added!"}, Conn)
-			torrentLocalStorage.QueuedStatus = "Active"
-		}
+	torrentQueues := Storage.FetchQueues(db)
+	if len(torrentQueues.ActiveTorrents) < Config.MaxActiveTorrents {
+		AddTorrentToActive(&torrentLocalStorage, clientTorrent, db)
+		fmt.Println("Adding New torrent to active! ", clientTorrent.Name())
+	} else {
+		AddTorrentToQueue(&torrentLocalStorage, clientTorrent, db)
+		fmt.Println("Adding New torrent to queued! ", clientTorrent.Name())
 	}
-	Storage.AddTorrentLocalStorage(torrentDbStorage, torrentLocalStorage) //writing all of the data to the database
+	Storage.AddTorrentLocalStorage(db, torrentLocalStorage) //writing all of the data to the database
 }
 
 //CreateInitialTorrentArray adds all the torrents on program start from the database
-func CreateInitialTorrentArray(tclient *torrent.Client, TorrentLocalArray []*Storage.TorrentLocal, db *storm.DB, config Settings.FullClientSettings, activeTorrents []string, queuedTorrents []string) {
+func CreateInitialTorrentArray(tclient *torrent.Client, TorrentLocalArray []*Storage.TorrentLocal, db *storm.DB, config Settings.FullClientSettings) {
 	for _, singleTorrentFromStorage := range TorrentLocalArray {
+		torrentQueues := Storage.FetchQueues(db)
+		fmt.Println("Stored Queues From DB.............................", torrentQueues)
 		var singleTorrent *torrent.Torrent
 		var err error
 		if singleTorrentFromStorage.TorrentType == "file" { //if it is a file pull it from the uploaded torrent folder
@@ -229,45 +225,46 @@
 		if err != nil {
 			Logger.WithFields(logrus.Fields{"torrentFile": singleTorrent.Name(), "error": err}).Error("Unable to add infobytes to the torrent!")
 		}
-		if singleTorrentFromStorage.QueuedStatus == "Active" {
-			singleTorrent.NewReader()
-			activeTorrents = append(activeTorrents, singleTorrent.InfoHash().String())
-		} else if singleTorrentFromStorage.QueuedStatus == "Queued" {
-			queuedTorrents = append(queuedTorrents, singleTorrent.InfoHash().String())
-		}
-
-		if singleTorrentFromStorage.TorrentStatus != "Completed" && singleTorrentFromStorage.TorrentStatus != "Stopped" {
-			fmt.Println("checking about queueing torrent")
-			if len(activeTorrents) < config.MaxActiveTorrents {
-				fmt.Println("ActiveTOrrents", activeTorrents)
-				singleTorrent.NewReader()
-				singleTorrentFromStorage.QueuedStatus = "Active"
-				activeTorrents = append(activeTorrents, singleTorrent.InfoHash().String()) //adding the torrent hash to the queue
+		fmt.Println("Creating initial FOR: ", singleTorrentFromStorage.Hash, singleTorrentFromStorage.TorrentName)
+		if len(torrentQueues.ActiveTorrents) < Config.MaxActiveTorrents && singleTorrentFromStorage.TorrentStatus != "Stopped" {
+			if singleTorrentFromStorage.TorrentStatus == "Completed" || singleTorrentFromStorage.TorrentStatus == "Seeding" {
+				fmt.Println("Completed Torrents have lower prio.. adding to Queued ", singleTorrent.Name())
+				AddTorrentToQueue(singleTorrentFromStorage, singleTorrent, db)
 			} else {
-				queuedTorrents = append(queuedTorrents, singleTorrent.InfoHash().String())
-				fmt.Println("Queuing torrent")
-				singleTorrentFromStorage.QueuedStatus = "Queued"
+				fmt.Println("adding torrent to active NOW ", singleTorrent.Name())
+				AddTorrentToActive(singleTorrentFromStorage, singleTorrent, db)
 			}
+		} else {
+			fmt.Println("adding torrent to queued NOW ", singleTorrent.Name())
+			AddTorrentToQueue(singleTorrentFromStorage, singleTorrent, db)
 		}
+		//Storage.UpdateQueues(db, torrentQueues)
 		Storage.UpdateStorageTick(db, *singleTorrentFromStorage)
 	}
-	if len(activeTorrents) < config.MaxActiveTorrents && len(queuedTorrents) > 0 { //after all the torrents are added, see if out active torrent list isn't full, then add from the queue
-		fmt.Println("adding torrents from queue (if any in there)")
-		maxCanSend := config.MaxActiveTorrents - len(activeTorrents)
+	torrentQueues := Storage.FetchQueues(db)
+	if len(torrentQueues.ActiveTorrents) < config.MaxActiveTorrents && len(torrentQueues.QueuedTorrents) > 0 { //after all the torrents are added, see if out active torrent list isn't full, then add from the queue
+		fmt.Println("adding torrents from queue (if any in there)", "MaX: ", config.MaxActiveTorrents, "Current: ", torrentQueues.ActiveTorrents)
+		maxCanSend := config.MaxActiveTorrents - len(torrentQueues.ActiveTorrents)
 		torrentsToStart := make([]string, maxCanSend)
-		for i, torrentHash := range queuedTorrents {
+		for i, torrentHash := range torrentQueues.QueuedTorrents {
 			torrentsToStart[i] = torrentHash
+			for _, singleTorrent := range tclient.Torrents() {
+				if singleTorrent.InfoHash().String() == torrentHash {
+					singleTorrentFromStorage := Storage.FetchTorrentFromStorage(db, torrentHash)
+					AddTorrentToActive(&singleTorrentFromStorage, singleTorrent, db)
+				}
+			}
+
 		}
-		StartTorrents(tclient, torrentsToStart, activeTorrents, queuedTorrents)
-	}
-	SetFilePriority(tclient, db) //Setting the desired file priority from storage
+	}
+	fmt.Println("Initial Setup queues", torrentQueues)
 }
 
 //CreateRunningTorrentArray creates the entire torrent list to pass to client
-func CreateRunningTorrentArray(tclient *torrent.Client, TorrentLocalArray []*Storage.TorrentLocal, PreviousTorrentArray []ClientDB, config Settings.FullClientSettings, db *storm.DB, activeTorrents []string, queuedTorrents []string) (RunningTorrentArray []ClientDB) {
-
+func CreateRunningTorrentArray(tclient *torrent.Client, TorrentLocalArray []*Storage.TorrentLocal, PreviousTorrentArray []ClientDB, config Settings.FullClientSettings, db *storm.DB) (RunningTorrentArray []ClientDB) {
+	torrentQueues := Storage.FetchQueues(db)
+	fmt.Println("torrentQueues", torrentQueues)
 	for _, singleTorrentFromStorage := range TorrentLocalArray {
 		var singleTorrent *torrent.Torrent
 		var TempHash metainfo.Hash
@@ -278,11 +275,12 @@ func CreateRunningTorrentArray(tclient *torrent.Client, TorrentLocalArray []*Sto
 		}
 		tickUpdateStruct := Storage.TorrentLocal{} //we are shoving the tick updates into a torrentlocal struct to pass to storage happens at the end of the routine
 		fullClientDB := new(ClientDB)
-
+		//Handling deleted torrents here
 		if singleTorrentFromStorage.TorrentStatus == "Dropped" {
 			Logger.WithFields(logrus.Fields{"selection": singleTorrentFromStorage.TorrentName}).Info("Deleting just the torrent")
 			singleTorrent.Drop()
 			Storage.DelTorrentLocalStorage(db, singleTorrentFromStorage.Hash)
+
 		}
 		if singleTorrentFromStorage.TorrentStatus == "DroppedData" {
 			Logger.WithFields(logrus.Fields{"selection": singleTorrentFromStorage.TorrentName}).Info("Deleting torrent and data")
@@ -351,21 +349,18 @@
 		}
 		CalculateTorrentETA(singleTorrentFromStorage.TorrentSize, calculatedCompletedSize, fullClientDB) //needs to be here since we need the speed calculated before we can estimate the eta.
 
-		if (len(activeTorrents) < config.MaxActiveTorrents) && (len(queuedTorrents) > 0) { //If there is room for another torrent in active torrents, add it.
-			newTorrentHash := queuedTorrents[0]
-			fmt.Println("Moving Torrent to active queue")
+		if (len(torrentQueues.ActiveTorrents) < config.MaxActiveTorrents) && (len(torrentQueues.QueuedTorrents) > 0) { //If there is room for another torrent in active torrents, add it.
+			newTorrentHash := torrentQueues.QueuedTorrents[0]
 			for _, torrent := range tclient.Torrents() {
 				if newTorrentHash == torrent.InfoHash().String() {
-					torrent.NewReader()
-					activeTorrents = append(activeTorrents, newTorrentHash)
-					singleTorrentFromStorage.QueuedStatus = "Active"
+					AddTorrentToActive(singleTorrentFromStorage, singleTorrent, db)
 				}
 			}
 		}
 		fullClientDB.TotalUploadedSize = HumanizeBytes(float32(fullClientDB.TotalUploadedBytes))
 		fullClientDB.UploadRatio = CalculateUploadRatio(singleTorrent, fullClientDB) //calculate the upload ratio
-		CalculateTorrentStatus(singleTorrent, fullClientDB, config, singleTorrentFromStorage, calculatedCompletedSize, calculatedTotalSize, activeTorrents, queuedTorrents) //add torrents to the queue, remove from queue, etc
+		CalculateTorrentStatus(singleTorrent, fullClientDB, config, singleTorrentFromStorage, calculatedCompletedSize, calculatedTotalSize, torrentQueues, db) //add torrents to the queue, remove from queue, etc
 
 		tickUpdateStruct.UploadRatio = fullClientDB.UploadRatio
 		tickUpdateStruct.TorrentSize = calculatedTotalSize
diff --git a/engine/engineHelpers.go b/engine/engineHelpers.go
index e70f9515..ef129314 100644
--- a/engine/engineHelpers.go
+++ b/engine/engineHelpers.go
@@ -184,26 +184,147 @@ func CalculateUploadRatio(t *torrent.Torrent, c *ClientDB) string {
 	return uploadRatio
 }
 
-//CalculateTorrentStatus is used to determine what the STATUS column of the frontend will display ll2
-func CalculateTorrentStatus(t *torrent.Torrent, c *ClientDB, config Settings.FullClientSettings, tFromStorage *storage.TorrentLocal, bytesCompleted int64, totalSize int64, activeTorrents []string, queuedTorrents []string) {
-	if (tFromStorage.TorrentStatus == "Stopped") || (float64(c.TotalUploadedBytes)/float64(bytesCompleted) >= config.SeedRatioStop && tFromStorage.TorrentUploadLimit == true) { //If storage shows torrent stopped or if it is over the seeding ratio AND is under the global limit
-		c.Status = "Stopped"
-		c.MaxConnections = 0
-		t.SetMaxEstablishedConns(0)
-		for i, hash := range activeTorrents { //If the torrent is stopped, pull it from the active torrent array
-			if tFromStorage.Hash == hash {
-				activeTorrents = append(activeTorrents[:i], activeTorrents[i+1:]...)
+//StopTorrent stops the torrent, updates the database and sends a message. Since stoptorrent is called by each loop (individually) no need to call an array
+func StopTorrent(singleTorrent *torrent.Torrent, torrentLocalStorage *Storage.TorrentLocal, db *storm.DB) {
+	torrentQueues := Storage.FetchQueues(db)
+	if torrentLocalStorage.TorrentStatus == "Stopped" { //if we are already stopped
+		fmt.Println("Already stopped, returning....")
+		return
+	}
+	torrentLocalStorage.TorrentStatus = "Stopped"
+	torrentLocalStorage.MaxConnections = 0
+	singleTorrent.SetMaxEstablishedConns(0)
+	fmt.Println("Getting ready to stop....!!!!!!!!")
+	for _, torrentHash := range torrentQueues.ActiveTorrents { //pulling it out of activetorrents
+		if torrentHash == singleTorrent.InfoHash().String() {
+			DeleteTorrentFromQueues(singleTorrent.InfoHash().String(), db)
+		}
+	}
+	fmt.Println("LOCALSTORAGE", *torrentLocalStorage, torrentLocalStorage)
+	Storage.UpdateStorageTick(db, *torrentLocalStorage)
+	CreateServerPushMessage(ServerPushMessage{MessageType: "serverPushMessage", MessageLevel: "success", Payload: "Torrent Stopped!"}, Conn)
+	return
+}
+
+//AddTorrentToActive adds a torrent to the active slice
+func AddTorrentToActive(torrentLocalStorage *Storage.TorrentLocal, singleTorrent *torrent.Torrent, db *storm.DB) {
+	torrentQueues := Storage.FetchQueues(db)
+	for _, torrentHash := range torrentQueues.ActiveTorrents {
+		if torrentHash == singleTorrent.InfoHash().String() { //If torrent already in active skip
+			return
+		}
+	}
+	for index, queuedTorrentHash := range torrentQueues.QueuedTorrents { //Removing from the queued torrents if in queued torrents
+		if queuedTorrentHash == singleTorrent.InfoHash().String() {
+			torrentQueues.QueuedTorrents = append(torrentQueues.QueuedTorrents[:index], torrentQueues.QueuedTorrents[index+1:]...)
+		}
+	}
+	singleTorrent.NewReader()
+	singleTorrent.SetMaxEstablishedConns(80)
+	torrentQueues.ActiveTorrents = append(torrentQueues.ActiveTorrents, singleTorrent.InfoHash().String())
+	torrentLocalStorage.TorrentStatus = "Running"
+	torrentLocalStorage.MaxConnections = 80
+	Logger.WithFields(logrus.Fields{"torrentName": singleTorrent.Name()}).Info("Moving torrent to active, active slice contains ", len(torrentQueues.ActiveTorrents), " torrents: ", torrentQueues.ActiveTorrents)
+	for _, file := range singleTorrent.Files() {
+		for _, sentFile := range torrentLocalStorage.TorrentFilePriority {
+			if file.DisplayPath() == sentFile.TorrentFilePath {
+				switch sentFile.TorrentFilePriority {
+				case "High":
+					file.SetPriority(torrent.PiecePriorityHigh)
+				case "Normal":
+					file.SetPriority(torrent.PiecePriorityNormal)
+				case "Cancel":
+					file.SetPriority(torrent.PiecePriorityNone)
+				default:
+					file.SetPriority(torrent.PiecePriorityNormal)
+				}
 			}
 		}
+	}
+	fmt.Println("Updating Queues from Add To active....", torrentQueues)
+	Storage.UpdateQueues(db, torrentQueues)
+}
+
+//RemoveTorrentFromActive forces a torrent to be removed from the active list if the max limit is already there and user forces a new torrent to be added
+func RemoveTorrentFromActive(torrentLocalStorage *Storage.TorrentLocal, singleTorrent *torrent.Torrent, db *storm.DB) {
+	torrentQueues := Storage.FetchQueues(db)
+	for x, torrentHash := range torrentQueues.ActiveTorrents {
+		if torrentHash == singleTorrent.InfoHash().String() {
+			torrentQueues.ActiveTorrents = append(torrentQueues.ActiveTorrents[:x], torrentQueues.ActiveTorrents[x+1:]...)
+			torrentQueues.QueuedTorrents = append(torrentQueues.QueuedTorrents, torrentHash)
+			torrentLocalStorage.TorrentStatus = "Queued"
+			torrentLocalStorage.MaxConnections = 0
+			singleTorrent.SetMaxEstablishedConns(0)
+			Storage.UpdateQueues(db, torrentQueues)
+			AddTorrentToQueue(torrentLocalStorage, singleTorrent, db) //Adding the lasttorrent from active to queued
+			Storage.UpdateStorageTick(db, *torrentLocalStorage)
+		}
+	}
+
+}
+
+//DeleteTorrentFromQueues deletes the torrent from all queues (for a stop or delete action)
+func DeleteTorrentFromQueues(torrentHash string, db *storm.DB) {
+	torrentQueues := Storage.FetchQueues(db)
+	for x, torrentHashActive := range torrentQueues.ActiveTorrents {
+		if torrentHash == torrentHashActive {
+			torrentQueues.ActiveTorrents = append(torrentQueues.ActiveTorrents[:x], torrentQueues.ActiveTorrents[x+1:]...)
+			Storage.UpdateQueues(db, torrentQueues)
+		} else {
+			for x, torrentHashQueued := range torrentQueues.QueuedTorrents {
+				if torrentHash == torrentHashQueued {
+					torrentQueues.ActiveTorrents = append(torrentQueues.QueuedTorrents[:x], torrentQueues.QueuedTorrents[x+1:]...)
+					Storage.UpdateQueues(db, torrentQueues)
+				}
+			}
+		}
+	}
+}
+
+//AddTorrentToQueue adds a torrent to the queue
+func AddTorrentToQueue(torrentLocalStorage *Storage.TorrentLocal, singleTorrent *torrent.Torrent, db *storm.DB) {
+	torrentQueues := Storage.FetchQueues(db)
+	for _, torrentHash := range torrentQueues.QueuedTorrents {
+		if singleTorrent.InfoHash().String() == torrentHash { //don't add duplicate to que but do everything else (TODO, maybe find a better way?)
+			fmt.Println("TORRENTQUEUES", torrentQueues)
+			singleTorrent.SetMaxEstablishedConns(0)
+			torrentLocalStorage.MaxConnections = 0
+			torrentLocalStorage.TorrentStatus = "Queued"
+			Logger.WithFields(logrus.Fields{"TorrentName": torrentLocalStorage.TorrentName}).Info("Adding torrent to the queue, not active")
+			Storage.UpdateStorageTick(db, *torrentLocalStorage)
+			return
+		}
+	}
+	torrentQueues.QueuedTorrents = append(torrentQueues.QueuedTorrents, singleTorrent.InfoHash().String())
+	fmt.Println("TORRENTQUEUES", torrentQueues)
+	singleTorrent.SetMaxEstablishedConns(0)
+	torrentLocalStorage.MaxConnections = 0
+	torrentLocalStorage.TorrentStatus = "Queued"
+	Logger.WithFields(logrus.Fields{"TorrentName": torrentLocalStorage.TorrentName}).Info("Adding torrent to the queue, not active")
+	Storage.UpdateQueues(db, torrentQueues)
+	Storage.UpdateStorageTick(db, *torrentLocalStorage)
+}
+
+//CalculateTorrentStatus is used to determine what the STATUS column of the frontend will display ll2
+func CalculateTorrentStatus(t *torrent.Torrent, c *ClientDB, config Settings.FullClientSettings, tFromStorage *storage.TorrentLocal, bytesCompleted int64, totalSize int64, torrentQueues Storage.TorrentQueues, db *storm.DB) {
+	if float64(c.TotalUploadedBytes)/float64(bytesCompleted) >= config.SeedRatioStop && tFromStorage.TorrentUploadLimit == true { //If storage shows torrent stopped or if it is over the seeding ratio AND is under the global limit
+		StopTorrent(t, tFromStorage, db)
 	} else { //Only has 2 states in storage, stopped or running, so we know it should be running, and the websocket request handled updating the database with connections and status
-		for _, torrentHash := range queuedTorrents {
+		for _, torrentHash := range torrentQueues.QueuedTorrents {
 			if tFromStorage.Hash == torrentHash {
-				fmt.Println("Setting torrent to queued")
+				//AddTorrentToQueue(tFromStorage, t, db)
+				//Logger.WithFields(logrus.Fields{"TorrentName": tFromStorage.TorrentName, "connections": tFromStorage.MaxConnections}).Info("Torrent is queued, skipping")
+				//t.SetMaxEstablishedConns(0)
 				c.Status = "Queued"
 				return
 			}
 		}
+		if len(torrentQueues.ActiveTorrents) < config.MaxActiveTorrents && tFromStorage.TorrentStatus == "Queued" {
+			fmt.Println("HERE..............ADDDING TO ACTIVE", t.Name())
+			AddTorrentToActive(tFromStorage, t, db)
+			c.Status = "Downloading"
+		}
 		bytesMissing := totalSize - bytesCompleted
 		c.MaxConnections = 80
 		t.SetMaxEstablishedConns(80)
diff --git a/main.go b/main.go
index e51e406c..5e5d6a8b 100644
--- a/main.go
+++ b/main.go
@@ -94,8 +94,7 @@ func main() {
 	Engine.Logger = Logger //Injecting the logger into all the packages
 	Storage.Logger = Logger
 	Settings.Logger = Logger
-	var activeTorrents []string
-	var queuedTorrents []string
+	var torrentQueues = Storage.TorrentQueues{}
 	Config := Settings.FullClientSettingsNew() //grabbing from settings.go
 	Engine.Config = Config
 	if Config.LoggingOutput == "file" {
@@ -131,20 +130,27 @@
 	if err != nil {
 		Logger.WithFields(logrus.Fields{"error": err}).Fatalf("Error creating torrent client: %s")
 	}
-	fmt.Printf("%+v\n", Config.TorrentConfig)
+	//fmt.Printf("%+v\n", Config.TorrentConfig)
 
 	db, err := storm.Open("storage.db") //initializing the boltDB store that contains all the added torrents
 	if err != nil {
 		Logger.WithFields(logrus.Fields{"error": err}).Fatal("Error opening/creating storage.db")
+	} else {
+		Logger.WithFields(logrus.Fields{"error": err}).Info("Opening or creating storage.db...")
 	}
 	defer db.Close() //defering closing the database until the program closes
 
+	err = db.One("ID", 5, &torrentQueues)
+	if err != nil { //Create the torrent que database
+		Logger.WithFields(logrus.Fields{"error": err}).Info("No Queue database found, assuming first run, creating database")
+		torrentQueues.ID = 5
+		db.Save(&torrentQueues)
+	}
+
 	tokens := Storage.IssuedTokensList{} //if first run setting up the authentication tokens
 	var signingKey []byte
 	err = db.One("ID", 3, &tokens)
 	if err != nil {
 		Logger.WithFields(logrus.Fields{"RSSFeedStore": tokens, "error": err}).Info("No Tokens database found, assuming first run, generating token...")
-		fmt.Println("Error", err)
-		fmt.Println("MAIN TOKEN: %+v\n", tokens)
 		tokens.ID = 3 //creating the initial store
 		claims := Settings.GoTorrentClaims{
 			"goTorrentWebUI",
@@ -191,12 +197,14 @@
 	TorrentLocalArray := Storage.FetchAllStoredTorrents(db) //pulling in all the already added torrents - this is an array of ALL of the local storage torrents, they will be added back in via hash
 	if TorrentLocalArray != nil { //the first creation of the running torrent array //since we are adding all of them in we use a coroutine... just allows the web ui to load then it will load in the torrents
-		go Engine.CreateInitialTorrentArray(tclient, TorrentLocalArray, db, Config, activeTorrents, queuedTorrents) //adding all of the stored torrents into the torrent client
+		Engine.CreateInitialTorrentArray(tclient, TorrentLocalArray, db, Config) //adding all of the stored torrents into the torrent client
+		//TODO add GO to this
 	} else {
 		Logger.Info("Database is empty, no torrents loaded")
 	}
-	Engine.CheckTorrentWatchFolder(cronEngine, db, tclient, torrentLocalStorage, Config, activeTorrents, queuedTorrents) //Every 5 minutes the engine will check the specified folder for new .torrent files
-	Engine.RefreshRSSCron(cronEngine, db, tclient, torrentLocalStorage, Config, activeTorrents, queuedTorrents) // Refresing the RSS feeds on an hourly basis to add torrents that show up in the RSS feed
+	fmt.Println("HERE ACTIVE", torrentQueues.ActiveTorrents)
+	Engine.CheckTorrentWatchFolder(cronEngine, db, tclient, torrentLocalStorage, Config, torrentQueues) //Every 5 minutes the engine will check the specified folder for new .torrent files
+	Engine.RefreshRSSCron(cronEngine, db, tclient, torrentLocalStorage, Config, torrentQueues) // Refresing the RSS feeds on an hourly basis to add torrents that show up in the RSS feed
 
 	router := mux.NewRouter() //setting up the handler for the web backend
 	router.HandleFunc("/", serveHome) //Serving the main page for our SPA
@@ -204,7 +212,7 @@
 	http.Handle("/", router)
 	router.HandleFunc("/api", func(w http.ResponseWriter, r *http.Request) { //TODO, remove this
 		TorrentLocalArray = Storage.FetchAllStoredTorrents(db)
-		RunningTorrentArray = Engine.CreateRunningTorrentArray(tclient, TorrentLocalArray, PreviousTorrentArray, Config, db, activeTorrents, queuedTorrents) //Updates the RunningTorrentArray with the current client data as well
+		RunningTorrentArray = Engine.CreateRunningTorrentArray(tclient, TorrentLocalArray, PreviousTorrentArray, Config, db) //Updates the RunningTorrentArray with the current client data as well
 		var torrentlistArray = new(Engine.TorrentList)
 		torrentlistArray.MessageType = "torrentList" //setting the type of message
 		torrentlistArray.ClientDBstruct = RunningTorrentArray //the full JSON that includes the number of torrents as the root
@@ -277,8 +285,8 @@
 			Logger.WithFields(logrus.Fields{"message": msg}).Debug("Client Requested TorrentList Update")
 
 			go func() { //running updates in separate thread so can still accept commands
-				TorrentLocalArray = Storage.FetchAllStoredTorrents(db) //Required to re-read th database since we write to the DB and this will pull the changes from it
-				RunningTorrentArray = Engine.CreateRunningTorrentArray(tclient, TorrentLocalArray, PreviousTorrentArray, Config, db, activeTorrents, queuedTorrents) //Updates the RunningTorrentArray with the current client data as well
+				TorrentLocalArray = Storage.FetchAllStoredTorrents(db) //Required to re-read th database since we write to the DB and this will pull the changes from it
+				RunningTorrentArray = Engine.CreateRunningTorrentArray(tclient, TorrentLocalArray, PreviousTorrentArray, Config, db) //Updates the RunningTorrentArray with the current client data as well
 				PreviousTorrentArray = RunningTorrentArray
 				torrentlistArray := Engine.TorrentList{MessageType: "torrentList", ClientDBstruct: RunningTorrentArray, Totaltorrents: len(RunningTorrentArray)}
 				Logger.WithFields(logrus.Fields{"torrentList": torrentlistArray, "previousTorrentList": PreviousTorrentArray}).Debug("Previous and Current Torrent Lists for sending to client")
@@ -301,7 +309,7 @@
 			Logger.WithFields(logrus.Fields{"message": msg}).Info("Client Requested Torrents by Label")
 			label := payloadData["Label"].(string)
 			torrentsByLabel := Storage.FetchTorrentsByLabel(db, label)
-			RunningTorrentArray = Engine.CreateRunningTorrentArray(tclient, TorrentLocalArray, PreviousTorrentArray, Config, db, activeTorrents, queuedTorrents)
+			RunningTorrentArray = Engine.CreateRunningTorrentArray(tclient, TorrentLocalArray, PreviousTorrentArray, Config, db)
 			labelRunningArray := []Engine.ClientDB{}
 			for _, torrent := range RunningTorrentArray { //Ranging over the running torrents and if the hashes match we have torrents by label
 				for _, label := range torrentsByLabel {
@@ -424,7 +432,7 @@
 			}
 			Logger.WithFields(logrus.Fields{"clientTorrent": clientTorrent, "magnetLink": magnetLink}).Info("Adding torrent to client!")
 			Engine.CreateServerPushMessage(Engine.ServerPushMessage{MessageType: "serverPushMessage", MessageLevel: "info", Payload: "Received MagnetLink"}, conn)
-			go Engine.AddTorrent(clientTorrent, torrentLocalStorage, db, "magnet", "", storageValue, labelValue, Config, activeTorrents, queuedTorrents) //starting the torrent and creating local DB entry
+			go Engine.AddTorrent(clientTorrent, torrentLocalStorage, db, "magnet", "", storageValue, labelValue, Config) //starting the torrent and creating local DB entry
 
 		}
@@ -471,7 +479,7 @@
 				Engine.CreateServerPushMessage(Engine.ServerPushMessage{MessageType: "serverPushMessage", MessageLevel: "error", Payload: "Unable to add Torrent to torrent server"}, conn)
 			}
 			Logger.WithFields(logrus.Fields{"clienttorrent": clientTorrent.Name(), "filename": filePathAbs}).Info("Added torrent")
-			go Engine.AddTorrent(clientTorrent, torrentLocalStorage, db, "file", filePathAbs, storageValue, labelValue, Config, activeTorrents, queuedTorrents)
+			go Engine.AddTorrent(clientTorrent, torrentLocalStorage, db, "file", filePathAbs, storageValue, labelValue, Config)
 
 		case "stopTorrents":
 			torrentHashes := payloadData["TorrentHashes"].([]interface{})
@@ -479,11 +487,13 @@
 			for _, singleTorrent := range tclient.Torrents() {
 				for _, singleSelection := range torrentHashes {
 					if singleTorrent.InfoHash().String() == singleSelection {
+						fmt.Println("Hash", singleTorrent.InfoHash().String(), "otherhash", singleSelection)
 						Logger.WithFields(logrus.Fields{"selection": singleSelection}).Info("Matched for stopping torrents")
 						oldTorrentInfo := Storage.FetchTorrentFromStorage(db, singleTorrent.InfoHash().String())
-						oldTorrentInfo.TorrentStatus = "Stopped"
-						oldTorrentInfo.MaxConnections = 0
-						Storage.UpdateStorageTick(db, oldTorrentInfo) //Updating the torrent status
+						Engine.StopTorrent(singleTorrent, &oldTorrentInfo, db)
+						fmt.Println("Getting ready to write to storage", oldTorrentInfo.Hash, oldTorrentInfo.TorrentStatus)
+						//Storage.UpdateStorageTick(db, oldTorrentInfo) //Updating the torrent status
+						continue
 					}
 				}
 			}
@@ -497,6 +507,20 @@
 				for _, singleSelection := range torrentHashes {
 					if singleTorrent.InfoHash().String() == singleSelection {
 						oldTorrentInfo := Storage.FetchTorrentFromStorage(db, singleTorrent.InfoHash().String())
+						torrentQueues = Storage.FetchQueues(db)
+
+						for index, activeTorrentHash := range torrentQueues.ActiveTorrents { //If torrent is in the active slice, pull it
+							if singleTorrent.InfoHash().String() == activeTorrentHash {
+								singleTorrent.SetMaxEstablishedConns(0)
+								torrentQueues.ActiveTorrents = append(torrentQueues.ActiveTorrents[:index], torrentQueues.ActiveTorrents[index+1:]...)
+							}
+						}
+						for index, queuedTorrentHash := range torrentQueues.QueuedTorrents { //If torrent is in the queued slice, pull it
+							if singleTorrent.InfoHash().String() == queuedTorrentHash {
+								torrentQueues.QueuedTorrents = append(torrentQueues.QueuedTorrents[:index], torrentQueues.QueuedTorrents[index+1:]...)
+							}
+						}
+
 						Logger.WithFields(logrus.Fields{"selection": singleSelection}).Info("Matched for deleting torrents")
 						if withData {
 							oldTorrentInfo.TorrentStatus = "DroppedData" //Will be cleaned up the next engine loop since deleting a torrent mid loop can cause issues
@@ -504,6 +528,7 @@
 							oldTorrentInfo.TorrentStatus = "Dropped"
 						}
 						Storage.UpdateStorageTick(db, oldTorrentInfo)
+						Storage.UpdateQueues(db, torrentQueues)
 					}
 				}
 			}
@@ -517,33 +542,26 @@
 					if singleTorrent.InfoHash().String() == singleSelection {
 						Logger.WithFields(logrus.Fields{"infoHash": singleTorrent.InfoHash().String()}).Info("Found matching torrent to start")
 						oldTorrentInfo := Storage.FetchTorrentFromStorage(db, singleTorrent.InfoHash().String())
-						oldTorrentInfo.TorrentStatus = "Running"
-						oldTorrentInfo.MaxConnections = 80
-						singleTorrent.NewReader() //set all of the pieces to download (piece prio is NE to file prio)
-						oldTorrentInfo.QueuedStatus = "Active"
-						for _, file := range singleTorrent.Files() {
-							for _, sentFile := range oldTorrentInfo.TorrentFilePriority {
-								if file.DisplayPath() == sentFile.TorrentFilePath {
-									switch sentFile.TorrentFilePriority {
-									case "High":
-										file.SetPriority(torrent.PiecePriorityHigh)
-									case "Normal":
-										file.SetPriority(torrent.PiecePriorityNormal)
-									case "Cancel":
-										file.SetPriority(torrent.PiecePriorityNone)
-									default:
-										file.SetPriority(torrent.PiecePriorityNormal)
-									}
-								}
+						Engine.AddTorrentToActive(&oldTorrentInfo, singleTorrent, db)
+						Logger.WithFields(logrus.Fields{"Torrent": oldTorrentInfo.TorrentName}).Info("Changing database to torrent running with 80 max connections")
+						//Storage.UpdateStorageTick(db, oldTorrentInfo) //Updating the torrent status
+					}
+					torrentQueues = Storage.FetchQueues(db)
+					if len(torrentQueues.ActiveTorrents) > Config.MaxActiveTorrents { //Since we are starting a new torrent stop the first torrent in the que if running is full
+						//removeTorrent := torrentQueues.ActiveTorrents[len(torrentQueues.ActiveTorrents)-1]
+						removeTorrent := torrentQueues.ActiveTorrents[:1]
+						for _, singleTorrent := range runningTorrents {
+							if singleTorrent.InfoHash().String() == removeTorrent[0] {
+								oldTorrentInfo := Storage.FetchTorrentFromStorage(db, singleTorrent.InfoHash().String())
+								Engine.RemoveTorrentFromActive(&oldTorrentInfo, singleTorrent, db)
+								Storage.UpdateStorageTick(db, oldTorrentInfo)
 							}
 						}
-						Logger.WithFields(logrus.Fields{"Torrent": oldTorrentInfo.TorrentName}).Info("Changing database to torrent running with 80 max connections")
-						Storage.UpdateStorageTick(db, oldTorrentInfo) //Updating the torrent status
 					}
 				}
 			}
-		case "forceUploadTorrents":
+		case "forceUploadTorrents": //TODO allow force to override total limit of queued torrents?
 			torrentHashes := payloadData["TorrentHashes"].([]interface{})
 			Logger.WithFields(logrus.Fields{"selection": msg.Payload}).Info("Matched for force Uploading Torrents")
 			Engine.CreateServerPushMessage(Engine.ServerPushMessage{MessageType: "serverPushMessage", MessageLevel: "info", Payload: "Received Force Start Request"}, conn)
@@ -620,7 +638,7 @@
 	} else {
 		err := http.ListenAndServe(httpAddr, nil) //Can't send proxy headers if not used since that can be a security issue
 		if err != nil {
-			Logger.WithFields(logrus.Fields{"error": err}).Fatal("Unable to listen on the http Server with no proxy headers!")
+			Logger.WithFields(logrus.Fields{"error": err}).Fatal("Unable to listen on the http Server! (Maybe wrong IP in config, port already in use?) (Config: Not using proxy, see error for more details)")
 		}
 	}
 }
diff --git a/settings/settings.go b/settings/settings.go
index 9a0bae92..45b518f6 100644
--- a/settings/settings.go
+++ b/settings/settings.go
@@ -100,10 +100,8 @@ func calculateRateLimiters(uploadRate, downloadRate string) (*rate.Limiter, *rat
 		uploadRateLimiter = rate.NewLimiter(rate.Inf, 0)
 		return downloadRateLimiter, uploadRateLimiter
 	}
-	var limitPerSecondUl = rate.Limit(uploadRateLimiterSize)
-	uploadRateLimiter = rate.NewLimiter(limitPerSecondUl, uploadRateLimiterSize)
-	var limitPerSecondDl = rate.Limit(uploadRateLimiterSize)
-	downloadRateLimiter = rate.NewLimiter(limitPerSecondDl, downloadRateLimiterSize)
+	uploadRateLimiter = rate.NewLimiter(rate.Limit(uploadRateLimiterSize), uploadRateLimiterSize)
+	downloadRateLimiter = rate.NewLimiter(rate.Limit(downloadRateLimiterSize), downloadRateLimiterSize)
 	return downloadRateLimiter, uploadRateLimiter
 }
diff --git a/storage/storage.go b/storage/storage.go
index 97eef499..20b1af93 100644
--- a/storage/storage.go
+++ b/storage/storage.go
@@ -16,6 +16,13 @@ var Logger *logrus.Logger
 //Conn is the global websocket connection used to push server notification messages
 var Conn *websocket.Conn
 
+//TorrentQueues contains the active and queued torrent hashes in slices
+type TorrentQueues struct {
+	ID             int `storm:"id,unique"` //storm requires unique ID (will be 5)
+	ActiveTorrents []string
+	QueuedTorrents []string
+}
+
 //IssuedTokensList contains a slice of all the tokens issues to applications
 type IssuedTokensList struct {
 	ID int `storm:"id,unique"` //storm requires unique ID (will be 3) to save although there will only be one of these
@@ -83,7 +90,6 @@ type TorrentLocal struct {
 	TorrentSize         int64 //If we cancel a file change the download size since we won't be downloading that file
 	UploadRatio         string
 	TorrentFilePriority []TorrentFilePriority
-	QueuedStatus        string //Either "Queued", "Active", or "None"
 }
 
 //SaveConfig saves the config to the database to compare for changes to settings.toml on restart
@@ -95,6 +101,26 @@ func SaveConfig(torrentStorage *storm.DB, config Settings.FullClientSettings) {
 	}
 }
 
+//UpdateQueues Saves the slice of hashes that contain the active Torrents
+func UpdateQueues(db *storm.DB, torrentQueues TorrentQueues) {
+	torrentQueues.ID = 5
+	err := db.Save(&torrentQueues)
+	if err != nil {
+		Logger.WithFields(logrus.Fields{"database": db, "error": err}).Error("Unable to write Queues to database!")
+	}
+}
+
+//FetchQueues fetches the activetorrent and queuedtorrent slices from the database
+func FetchQueues(db *storm.DB) TorrentQueues {
+	torrentQueues := TorrentQueues{}
+	err := db.One("ID", 5, &torrentQueues)
+	if err != nil {
+		Logger.WithFields(logrus.Fields{"database": db, "error": err}).Error("Unable to read Database into torrentQueues!")
+		return torrentQueues
+	}
+	return torrentQueues
+}
+
 //FetchConfig fetches the client config from the database
 func FetchConfig(torrentStorage *storm.DB) (Settings.FullClientSettings, error) {
 	config := Settings.FullClientSettings{}
@@ -167,6 +193,8 @@ func UpdateStorageTick(torrentStorage *storm.DB, torrentLocal TorrentLocal) {
 	err := torrentStorage.Update(&torrentLocal)
 	if err != nil {
 		Logger.WithFields(logrus.Fields{"UpdateContents": torrentLocal, "error": err}).Error("Error performing tick update to database!")
+	} else {
+		Logger.WithFields(logrus.Fields{"UpdateContents": torrentLocal, "error": err}).Debug("Performed Update to database!")
 	}
 }