Engine rewrite about 80% done, but a ton of bugs and a few new features to add, almost no testing done

This commit is contained in:
2018-05-17 13:52:47 -04:00
parent 4909429390
commit 35a5ac37eb
9 changed files with 275 additions and 115 deletions

1
.gitignore vendored
View File

@@ -2,6 +2,7 @@ downloads/
downloading/
downloaded/
uploadedTorrents/
boltBrowser/
storage.db.lock
storage.db
storage.db.old

View File

@@ -13,10 +13,10 @@
#Limits your upload and download speed globally, all are averages and not burst protected (usually burst on start).
#Low = ~.05MB/s, Medium = ~.5MB/s, High = ~1.5MB/s
UploadRateLimit = "Medium" #Options are "Low", "Medium", "High", "Unlimited" #Unlimited is default
DownloadRateLimit = "Medium"
UploadRateLimit = "Unlimited" #Options are "Low", "Medium", "High", "Unlimited" #Unlimited is default
DownloadRateLimit = "Unlimited"
#Maximum number of allowed active torrents, the rest will be queued
MaxActiveTorrents = 1
MaxActiveTorrents = 2
[goTorrentWebUI]
#Basic goTorrentWebUI authentication (not terribly secure, implemented in JS, password is hashed to SHA256, not salted, basically don't depend on this if you require very good security)

View File

@@ -22,7 +22,7 @@ func InitializeCronEngine() *cron.Cron {
}
//CheckTorrentWatchFolder adds torrents from a watch folder //TODO see if you can use filepath.Abs instead of changing directory
func CheckTorrentWatchFolder(c *cron.Cron, db *storm.DB, tclient *torrent.Client, torrentLocalStorage Storage.TorrentLocal, config Settings.FullClientSettings, activeTorrents []string, queuedTorrents []string) {
func CheckTorrentWatchFolder(c *cron.Cron, db *storm.DB, tclient *torrent.Client, torrentLocalStorage Storage.TorrentLocal, config Settings.FullClientSettings, torrentQueues Storage.TorrentQueues) {
c.AddFunc("@every 5m", func() {
Logger.WithFields(logrus.Fields{"Watch Folder": config.TorrentWatchFolder}).Info("Running the watch folder cron job")
torrentFiles, err := ioutil.ReadDir(config.TorrentWatchFolder)
@@ -50,7 +50,7 @@ func CheckTorrentWatchFolder(c *cron.Cron, db *storm.DB, tclient *torrent.Client
os.Remove(fullFilePathAbs) //delete the torrent after adding it and copying it over
Logger.WithFields(logrus.Fields{"Source Folder": fullFilePathAbs, "Destination Folder": fullNewFilePathAbs, "Torrent": file.Name()}).Info("Added torrent from watch folder, and moved torrent file")
AddTorrent(clientTorrent, torrentLocalStorage, db, "file", fullNewFilePathAbs, config.DefaultMoveFolder, "default", config, activeTorrents, queuedTorrents)
AddTorrent(clientTorrent, torrentLocalStorage, db, "file", fullNewFilePathAbs, config.DefaultMoveFolder, "default", config)
}
}
@@ -58,7 +58,7 @@ func CheckTorrentWatchFolder(c *cron.Cron, db *storm.DB, tclient *torrent.Client
}
//RefreshRSSCron refreshes all of the RSS feeds on an hourly basis
func RefreshRSSCron(c *cron.Cron, db *storm.DB, tclient *torrent.Client, torrentLocalStorage Storage.TorrentLocal, config Settings.FullClientSettings, activeTorrents []string, queuedTorrents []string) {
func RefreshRSSCron(c *cron.Cron, db *storm.DB, tclient *torrent.Client, torrentLocalStorage Storage.TorrentLocal, config Settings.FullClientSettings, torrentQueues Storage.TorrentQueues) {
c.AddFunc("@hourly", func() {
torrentHashHistory := Storage.FetchHashHistory(db)
RSSFeedStore := Storage.FetchRSSFeeds(db)
@@ -86,7 +86,7 @@ func RefreshRSSCron(c *cron.Cron, db *storm.DB, tclient *torrent.Client, torrent
Logger.WithFields(logrus.Fields{"err": err, "Torrent": RSSTorrent.Title}).Warn("Unable to add torrent to torrent client!")
break //break out of the loop entirely for this message since we hit an error
}
AddTorrent(clientTorrent, torrentLocalStorage, db, "magnet", "", config.DefaultMoveFolder, "RSS", config, activeTorrents, queuedTorrents) //TODO let user specify torrent default storage location and let change on fly
AddTorrent(clientTorrent, torrentLocalStorage, db, "magnet", "", config.DefaultMoveFolder, "RSS", config) //TODO let user specify torrent default storage location and let change on fly
singleFeed.Torrents = append(singleFeed.Torrents, singleRSSTorrent)
}

View File

@@ -68,7 +68,6 @@ func MoveAndLeaveSymlink(config Settings.FullClientSettings, tHash string, db *s
Logger.WithFields(logrus.Fields{"Old File Path": oldFilePath, "New File Path": newFilePath, "error": err}).Error("Error Copying Folder!")
return err
}
//os.Chmod(newFilePath, 0777)
err = filepath.Walk(newFilePath, func(path string, info os.FileInfo, err error) error { //Walking the file path to change the permissions
if err != nil {
Logger.WithFields(logrus.Fields{"file": path, "error": err}).Error("Potentially non-critical error, continuing..")

View File

@@ -130,19 +130,19 @@ func readTorrentFileFromDB(element *Storage.TorrentLocal, tclient *torrent.Clien
}
//StartTorrents attempts to start torrents by adding them to the active torrents, then the queue, and by increasing their connections (if they were stopped)
func StartTorrents(clientTorrent *torrent.Client, torrentHashes []string, activeTorrents []string, queuedTorrents []string) {
func StartTorrents(clientTorrent *torrent.Client, torrentHashes []string, torrentQueues Storage.TorrentQueues) {
}
//AddTorrent creates the storage.db entry and starts A NEW TORRENT and adds to the running torrent array
func AddTorrent(clientTorrent *torrent.Torrent, torrentLocalStorage Storage.TorrentLocal, torrentDbStorage *storm.DB, torrentType, torrentFilePathAbs, torrentStoragePath, labelValue string, config Settings.FullClientSettings, activeTorrents []string, queuedTorrents []string) {
func AddTorrent(clientTorrent *torrent.Torrent, torrentLocalStorage Storage.TorrentLocal, db *storm.DB, torrentType, torrentFilePathAbs, torrentStoragePath, labelValue string, config Settings.FullClientSettings) {
timedOut := timeOutInfo(clientTorrent, 45) //seeing if adding the torrent times out (giving 45 seconds)
if timedOut { //if we fail to add the torrent return
return
}
var TempHash metainfo.Hash
TempHash = clientTorrent.InfoHash()
allStoredTorrents := Storage.FetchAllStoredTorrents(torrentDbStorage)
allStoredTorrents := Storage.FetchAllStoredTorrents(db)
for _, runningTorrentHashes := range allStoredTorrents {
if runningTorrentHashes.Hash == TempHash.String() {
Logger.WithFields(logrus.Fields{"Hash": TempHash.String()}).Info("Torrent has duplicate hash to already running torrent... will not add to storage")
@@ -180,27 +180,23 @@ func AddTorrent(clientTorrent *torrent.Torrent, torrentLocalStorage Storage.Torr
TorrentFilePriorityArray = append(TorrentFilePriorityArray, torrentFilePriority)
}
torrentLocalStorage.TorrentFilePriority = TorrentFilePriorityArray
if len(activeTorrents) < config.MaxActiveTorrents {
if len(queuedTorrents) > 0 {
queuedTorrents = append(queuedTorrents, clientTorrent.InfoHash().String())
CreateServerPushMessage(ServerPushMessage{MessageType: "serverPushMessage", MessageLevel: "success", Payload: "Torrent queued!"}, Conn)
torrentLocalStorage.QueuedStatus = "Queued"
} else {
clientTorrent.NewReader()
activeTorrents = append(activeTorrents, clientTorrent.InfoHash().String())
CreateServerPushMessage(ServerPushMessage{MessageType: "serverPushMessage", MessageLevel: "success", Payload: "Torrent added!"}, Conn)
torrentLocalStorage.QueuedStatus = "Active"
}
torrentQueues := Storage.FetchQueues(db)
if len(torrentQueues.ActiveTorrents) < Config.MaxActiveTorrents {
AddTorrentToActive(&torrentLocalStorage, clientTorrent, db)
fmt.Println("Adding New torrent to active! ", clientTorrent.Name())
} else {
AddTorrentToQueue(&torrentLocalStorage, clientTorrent, db)
fmt.Println("Adding New torrent to queued! ", clientTorrent.Name())
}
Storage.AddTorrentLocalStorage(torrentDbStorage, torrentLocalStorage) //writing all of the data to the database
Storage.AddTorrentLocalStorage(db, torrentLocalStorage) //writing all of the data to the database
}
//CreateInitialTorrentArray adds all the torrents on program start from the database
func CreateInitialTorrentArray(tclient *torrent.Client, TorrentLocalArray []*Storage.TorrentLocal, db *storm.DB, config Settings.FullClientSettings, activeTorrents []string, queuedTorrents []string) {
func CreateInitialTorrentArray(tclient *torrent.Client, TorrentLocalArray []*Storage.TorrentLocal, db *storm.DB, config Settings.FullClientSettings) {
for _, singleTorrentFromStorage := range TorrentLocalArray {
torrentQueues := Storage.FetchQueues(db)
fmt.Println("Stored Queues From DB.............................", torrentQueues)
var singleTorrent *torrent.Torrent
var err error
if singleTorrentFromStorage.TorrentType == "file" { //if it is a file pull it from the uploaded torrent folder
@@ -229,45 +225,46 @@ func CreateInitialTorrentArray(tclient *torrent.Client, TorrentLocalArray []*Sto
if err != nil {
Logger.WithFields(logrus.Fields{"torrentFile": singleTorrent.Name(), "error": err}).Error("Unable to add infobytes to the torrent!")
}
if singleTorrentFromStorage.QueuedStatus == "Active" {
singleTorrent.NewReader()
activeTorrents = append(activeTorrents, singleTorrent.InfoHash().String())
} else if singleTorrentFromStorage.QueuedStatus == "Queued" {
queuedTorrents = append(queuedTorrents, singleTorrent.InfoHash().String())
}
if singleTorrentFromStorage.TorrentStatus != "Completed" && singleTorrentFromStorage.TorrentStatus != "Stopped" {
fmt.Println("checking about queueing torrent")
if len(activeTorrents) < config.MaxActiveTorrents {
fmt.Println("ActiveTOrrents", activeTorrents)
singleTorrent.NewReader()
singleTorrentFromStorage.QueuedStatus = "Active"
activeTorrents = append(activeTorrents, singleTorrent.InfoHash().String()) //adding the torrent hash to the queue
fmt.Println("Creating initial FOR: ", singleTorrentFromStorage.Hash, singleTorrentFromStorage.TorrentName)
if len(torrentQueues.ActiveTorrents) < Config.MaxActiveTorrents && singleTorrentFromStorage.TorrentStatus != "Stopped" {
if singleTorrentFromStorage.TorrentStatus == "Completed" || singleTorrentFromStorage.TorrentStatus == "Seeding" {
fmt.Println("Completed Torrents have lower prio.. adding to Queued ", singleTorrent.Name())
AddTorrentToQueue(singleTorrentFromStorage, singleTorrent, db)
} else {
queuedTorrents = append(queuedTorrents, singleTorrent.InfoHash().String())
fmt.Println("Queuing torrent")
singleTorrentFromStorage.QueuedStatus = "Queued"
fmt.Println("adding torrent to active NOW ", singleTorrent.Name())
AddTorrentToActive(singleTorrentFromStorage, singleTorrent, db)
}
} else {
fmt.Println("adding torrent to queued NOW ", singleTorrent.Name())
AddTorrentToQueue(singleTorrentFromStorage, singleTorrent, db)
}
//Storage.UpdateQueues(db, torrentQueues)
Storage.UpdateStorageTick(db, *singleTorrentFromStorage)
}
if len(activeTorrents) < config.MaxActiveTorrents && len(queuedTorrents) > 0 { //after all the torrents are added, see if out active torrent list isn't full, then add from the queue
fmt.Println("adding torrents from queue (if any in there)")
maxCanSend := config.MaxActiveTorrents - len(activeTorrents)
torrentQueues := Storage.FetchQueues(db)
if len(torrentQueues.ActiveTorrents) < config.MaxActiveTorrents && len(torrentQueues.QueuedTorrents) > 0 { //after all the torrents are added, see if out active torrent list isn't full, then add from the queue
fmt.Println("adding torrents from queue (if any in there)", "MaX: ", config.MaxActiveTorrents, "Current: ", torrentQueues.ActiveTorrents)
maxCanSend := config.MaxActiveTorrents - len(torrentQueues.ActiveTorrents)
torrentsToStart := make([]string, maxCanSend)
for i, torrentHash := range queuedTorrents {
for i, torrentHash := range torrentQueues.QueuedTorrents {
torrentsToStart[i] = torrentHash
for _, singleTorrent := range tclient.Torrents() {
if singleTorrent.InfoHash().String() == torrentHash {
singleTorrentFromStorage := Storage.FetchTorrentFromStorage(db, torrentHash)
AddTorrentToActive(&singleTorrentFromStorage, singleTorrent, db)
}
}
}
StartTorrents(tclient, torrentsToStart, activeTorrents, queuedTorrents)
}
SetFilePriority(tclient, db) //Setting the desired file priority from storage
fmt.Println("Initial Setup queues", torrentQueues)
}
//CreateRunningTorrentArray creates the entire torrent list to pass to client
func CreateRunningTorrentArray(tclient *torrent.Client, TorrentLocalArray []*Storage.TorrentLocal, PreviousTorrentArray []ClientDB, config Settings.FullClientSettings, db *storm.DB, activeTorrents []string, queuedTorrents []string) (RunningTorrentArray []ClientDB) {
func CreateRunningTorrentArray(tclient *torrent.Client, TorrentLocalArray []*Storage.TorrentLocal, PreviousTorrentArray []ClientDB, config Settings.FullClientSettings, db *storm.DB) (RunningTorrentArray []ClientDB) {
torrentQueues := Storage.FetchQueues(db)
fmt.Println("torrentQueues", torrentQueues)
for _, singleTorrentFromStorage := range TorrentLocalArray {
var singleTorrent *torrent.Torrent
var TempHash metainfo.Hash
@@ -278,11 +275,12 @@ func CreateRunningTorrentArray(tclient *torrent.Client, TorrentLocalArray []*Sto
}
tickUpdateStruct := Storage.TorrentLocal{} //we are shoving the tick updates into a torrentlocal struct to pass to storage happens at the end of the routine
fullClientDB := new(ClientDB)
//Handling deleted torrents here
if singleTorrentFromStorage.TorrentStatus == "Dropped" {
Logger.WithFields(logrus.Fields{"selection": singleTorrentFromStorage.TorrentName}).Info("Deleting just the torrent")
singleTorrent.Drop()
Storage.DelTorrentLocalStorage(db, singleTorrentFromStorage.Hash)
}
if singleTorrentFromStorage.TorrentStatus == "DroppedData" {
Logger.WithFields(logrus.Fields{"selection": singleTorrentFromStorage.TorrentName}).Info("Deleting torrent and data")
@@ -351,21 +349,18 @@ func CreateRunningTorrentArray(tclient *torrent.Client, TorrentLocalArray []*Sto
}
CalculateTorrentETA(singleTorrentFromStorage.TorrentSize, calculatedCompletedSize, fullClientDB) //needs to be here since we need the speed calculated before we can estimate the eta.
if (len(activeTorrents) < config.MaxActiveTorrents) && (len(queuedTorrents) > 0) { //If there is room for another torrent in active torrents, add it.
newTorrentHash := queuedTorrents[0]
fmt.Println("Moving Torrent to active queue")
if (len(torrentQueues.ActiveTorrents) < config.MaxActiveTorrents) && (len(torrentQueues.QueuedTorrents) > 0) { //If there is room for another torrent in active torrents, add it.
newTorrentHash := torrentQueues.QueuedTorrents[0]
for _, torrent := range tclient.Torrents() {
if newTorrentHash == torrent.InfoHash().String() {
torrent.NewReader()
activeTorrents = append(activeTorrents, newTorrentHash)
singleTorrentFromStorage.QueuedStatus = "Active"
AddTorrentToActive(singleTorrentFromStorage, singleTorrent, db)
}
}
}
fullClientDB.TotalUploadedSize = HumanizeBytes(float32(fullClientDB.TotalUploadedBytes))
fullClientDB.UploadRatio = CalculateUploadRatio(singleTorrent, fullClientDB) //calculate the upload ratio
CalculateTorrentStatus(singleTorrent, fullClientDB, config, singleTorrentFromStorage, calculatedCompletedSize, calculatedTotalSize, activeTorrents, queuedTorrents) //add torrents to the queue, remove from queue, etc
CalculateTorrentStatus(singleTorrent, fullClientDB, config, singleTorrentFromStorage, calculatedCompletedSize, calculatedTotalSize, torrentQueues, db) //add torrents to the queue, remove from queue, etc
tickUpdateStruct.UploadRatio = fullClientDB.UploadRatio
tickUpdateStruct.TorrentSize = calculatedTotalSize

View File

@@ -184,26 +184,147 @@ func CalculateUploadRatio(t *torrent.Torrent, c *ClientDB) string {
return uploadRatio
}
//CalculateTorrentStatus is used to determine what the STATUS column of the frontend will display ll2
func CalculateTorrentStatus(t *torrent.Torrent, c *ClientDB, config Settings.FullClientSettings, tFromStorage *storage.TorrentLocal, bytesCompleted int64, totalSize int64, activeTorrents []string, queuedTorrents []string) {
if (tFromStorage.TorrentStatus == "Stopped") || (float64(c.TotalUploadedBytes)/float64(bytesCompleted) >= config.SeedRatioStop && tFromStorage.TorrentUploadLimit == true) { //If storage shows torrent stopped or if it is over the seeding ratio AND is under the global limit
c.Status = "Stopped"
c.MaxConnections = 0
t.SetMaxEstablishedConns(0)
for i, hash := range activeTorrents { //If the torrent is stopped, pull it from the active torrent array
if tFromStorage.Hash == hash {
activeTorrents = append(activeTorrents[:i], activeTorrents[i+1:]...)
//StopTorrent stops the torrent, updates the database and sends a message. Since stoptorrent is called by each loop (individually) no need to call an array
func StopTorrent(singleTorrent *torrent.Torrent, torrentLocalStorage *Storage.TorrentLocal, db *storm.DB) {
torrentQueues := Storage.FetchQueues(db)
if torrentLocalStorage.TorrentStatus == "Stopped" { //if we are already stopped
fmt.Println("Already stopped, returning....")
return
}
torrentLocalStorage.TorrentStatus = "Stopped"
torrentLocalStorage.MaxConnections = 0
singleTorrent.SetMaxEstablishedConns(0)
fmt.Println("Getting ready to stop....!!!!!!!!")
for _, torrentHash := range torrentQueues.ActiveTorrents { //pulling it out of activetorrents
if torrentHash == singleTorrent.InfoHash().String() {
DeleteTorrentFromQueues(singleTorrent.InfoHash().String(), db)
}
}
fmt.Println("LOCALSTORAGE", *torrentLocalStorage, torrentLocalStorage)
Storage.UpdateStorageTick(db, *torrentLocalStorage)
CreateServerPushMessage(ServerPushMessage{MessageType: "serverPushMessage", MessageLevel: "success", Payload: "Torrent Stopped!"}, Conn)
return
}
//AddTorrentToActive adds a torrent to the active slice
func AddTorrentToActive(torrentLocalStorage *Storage.TorrentLocal, singleTorrent *torrent.Torrent, db *storm.DB) {
torrentQueues := Storage.FetchQueues(db)
for _, torrentHash := range torrentQueues.ActiveTorrents {
if torrentHash == singleTorrent.InfoHash().String() { //If torrent already in active skip
return
}
}
for index, queuedTorrentHash := range torrentQueues.QueuedTorrents { //Removing from the queued torrents if in queued torrents
if queuedTorrentHash == singleTorrent.InfoHash().String() {
torrentQueues.QueuedTorrents = append(torrentQueues.QueuedTorrents[:index], torrentQueues.QueuedTorrents[index+1:]...)
}
}
singleTorrent.NewReader()
singleTorrent.SetMaxEstablishedConns(80)
torrentQueues.ActiveTorrents = append(torrentQueues.ActiveTorrents, singleTorrent.InfoHash().String())
torrentLocalStorage.TorrentStatus = "Running"
torrentLocalStorage.MaxConnections = 80
Logger.WithFields(logrus.Fields{"torrentName": singleTorrent.Name()}).Info("Moving torrent to active, active slice contains ", len(torrentQueues.ActiveTorrents), " torrents: ", torrentQueues.ActiveTorrents)
for _, file := range singleTorrent.Files() {
for _, sentFile := range torrentLocalStorage.TorrentFilePriority {
if file.DisplayPath() == sentFile.TorrentFilePath {
switch sentFile.TorrentFilePriority {
case "High":
file.SetPriority(torrent.PiecePriorityHigh)
case "Normal":
file.SetPriority(torrent.PiecePriorityNormal)
case "Cancel":
file.SetPriority(torrent.PiecePriorityNone)
default:
file.SetPriority(torrent.PiecePriorityNormal)
}
}
}
}
fmt.Println("Updating Queues from Add To active....", torrentQueues)
Storage.UpdateQueues(db, torrentQueues)
}
//RemoveTorrentFromActive forces a torrent to be removed from the active list if the max limit is already there and user forces a new torrent to be added
func RemoveTorrentFromActive(torrentLocalStorage *Storage.TorrentLocal, singleTorrent *torrent.Torrent, db *storm.DB) {
torrentQueues := Storage.FetchQueues(db)
for x, torrentHash := range torrentQueues.ActiveTorrents {
if torrentHash == singleTorrent.InfoHash().String() {
torrentQueues.ActiveTorrents = append(torrentQueues.ActiveTorrents[:x], torrentQueues.ActiveTorrents[x+1:]...)
torrentQueues.QueuedTorrents = append(torrentQueues.QueuedTorrents, torrentHash)
torrentLocalStorage.TorrentStatus = "Queued"
torrentLocalStorage.MaxConnections = 0
singleTorrent.SetMaxEstablishedConns(0)
Storage.UpdateQueues(db, torrentQueues)
AddTorrentToQueue(torrentLocalStorage, singleTorrent, db) //Adding the lasttorrent from active to queued
Storage.UpdateStorageTick(db, *torrentLocalStorage)
}
}
}
//DeleteTorrentFromQueues deletes the torrent from all queues (for a stop or delete action)
func DeleteTorrentFromQueues(torrentHash string, db *storm.DB) {
torrentQueues := Storage.FetchQueues(db)
for x, torrentHashActive := range torrentQueues.ActiveTorrents {
if torrentHash == torrentHashActive {
torrentQueues.ActiveTorrents = append(torrentQueues.ActiveTorrents[:x], torrentQueues.ActiveTorrents[x+1:]...)
Storage.UpdateQueues(db, torrentQueues)
} else {
for x, torrentHashQueued := range torrentQueues.QueuedTorrents {
if torrentHash == torrentHashQueued {
torrentQueues.ActiveTorrents = append(torrentQueues.QueuedTorrents[:x], torrentQueues.QueuedTorrents[x+1:]...)
Storage.UpdateQueues(db, torrentQueues)
}
}
}
}
}
//AddTorrentToQueue adds a torrent to the queue
func AddTorrentToQueue(torrentLocalStorage *Storage.TorrentLocal, singleTorrent *torrent.Torrent, db *storm.DB) {
torrentQueues := Storage.FetchQueues(db)
for _, torrentHash := range torrentQueues.QueuedTorrents {
if singleTorrent.InfoHash().String() == torrentHash { //don't add duplicate to que but do everything else (TODO, maybe find a better way?)
fmt.Println("TORRENTQUEUES", torrentQueues)
singleTorrent.SetMaxEstablishedConns(0)
torrentLocalStorage.MaxConnections = 0
torrentLocalStorage.TorrentStatus = "Queued"
Logger.WithFields(logrus.Fields{"TorrentName": torrentLocalStorage.TorrentName}).Info("Adding torrent to the queue, not active")
Storage.UpdateStorageTick(db, *torrentLocalStorage)
return
}
}
torrentQueues.QueuedTorrents = append(torrentQueues.QueuedTorrents, singleTorrent.InfoHash().String())
fmt.Println("TORRENTQUEUES", torrentQueues)
singleTorrent.SetMaxEstablishedConns(0)
torrentLocalStorage.MaxConnections = 0
torrentLocalStorage.TorrentStatus = "Queued"
Logger.WithFields(logrus.Fields{"TorrentName": torrentLocalStorage.TorrentName}).Info("Adding torrent to the queue, not active")
Storage.UpdateQueues(db, torrentQueues)
Storage.UpdateStorageTick(db, *torrentLocalStorage)
}
//CalculateTorrentStatus is used to determine what the STATUS column of the frontend will display ll2
func CalculateTorrentStatus(t *torrent.Torrent, c *ClientDB, config Settings.FullClientSettings, tFromStorage *storage.TorrentLocal, bytesCompleted int64, totalSize int64, torrentQueues Storage.TorrentQueues, db *storm.DB) {
if float64(c.TotalUploadedBytes)/float64(bytesCompleted) >= config.SeedRatioStop && tFromStorage.TorrentUploadLimit == true { //If storage shows torrent stopped or if it is over the seeding ratio AND is under the global limit
StopTorrent(t, tFromStorage, db)
} else { //Only has 2 states in storage, stopped or running, so we know it should be running, and the websocket request handled updating the database with connections and status
for _, torrentHash := range queuedTorrents {
for _, torrentHash := range torrentQueues.QueuedTorrents {
if tFromStorage.Hash == torrentHash {
fmt.Println("Setting torrent to queued")
//AddTorrentToQueue(tFromStorage, t, db)
//Logger.WithFields(logrus.Fields{"TorrentName": tFromStorage.TorrentName, "connections": tFromStorage.MaxConnections}).Info("Torrent is queued, skipping")
//t.SetMaxEstablishedConns(0)
c.Status = "Queued"
return
}
}
if len(torrentQueues.ActiveTorrents) < config.MaxActiveTorrents && tFromStorage.TorrentStatus == "Queued" {
fmt.Println("HERE..............ADDDING TO ACTIVE", t.Name())
AddTorrentToActive(tFromStorage, t, db)
c.Status = "Downloading"
}
bytesMissing := totalSize - bytesCompleted
c.MaxConnections = 80
t.SetMaxEstablishedConns(80)

96
main.go
View File

@@ -94,8 +94,7 @@ func main() {
Engine.Logger = Logger //Injecting the logger into all the packages
Storage.Logger = Logger
Settings.Logger = Logger
var activeTorrents []string
var queuedTorrents []string
var torrentQueues = Storage.TorrentQueues{}
Config := Settings.FullClientSettingsNew() //grabbing from settings.go
Engine.Config = Config
if Config.LoggingOutput == "file" {
@@ -131,20 +130,27 @@ func main() {
if err != nil {
Logger.WithFields(logrus.Fields{"error": err}).Fatalf("Error creating torrent client: %s")
}
fmt.Printf("%+v\n", Config.TorrentConfig)
//fmt.Printf("%+v\n", Config.TorrentConfig)
db, err := storm.Open("storage.db") //initializing the boltDB store that contains all the added torrents
if err != nil {
Logger.WithFields(logrus.Fields{"error": err}).Fatal("Error opening/creating storage.db")
} else {
Logger.WithFields(logrus.Fields{"error": err}).Info("Opening or creating storage.db...")
}
defer db.Close() //defering closing the database until the program closes
err = db.One("ID", 5, &torrentQueues)
if err != nil { //Create the torrent que database
Logger.WithFields(logrus.Fields{"error": err}).Info("No Queue database found, assuming first run, creating database")
torrentQueues.ID = 5
db.Save(&torrentQueues)
}
tokens := Storage.IssuedTokensList{} //if first run setting up the authentication tokens
var signingKey []byte
err = db.One("ID", 3, &tokens)
if err != nil {
Logger.WithFields(logrus.Fields{"RSSFeedStore": tokens, "error": err}).Info("No Tokens database found, assuming first run, generating token...")
fmt.Println("Error", err)
fmt.Println("MAIN TOKEN: %+v\n", tokens)
tokens.ID = 3 //creating the initial store
claims := Settings.GoTorrentClaims{
"goTorrentWebUI",
@@ -191,12 +197,14 @@ func main() {
TorrentLocalArray := Storage.FetchAllStoredTorrents(db) //pulling in all the already added torrents - this is an array of ALL of the local storage torrents, they will be added back in via hash
if TorrentLocalArray != nil { //the first creation of the running torrent array //since we are adding all of them in we use a coroutine... just allows the web ui to load then it will load in the torrents
go Engine.CreateInitialTorrentArray(tclient, TorrentLocalArray, db, Config, activeTorrents, queuedTorrents) //adding all of the stored torrents into the torrent client
Engine.CreateInitialTorrentArray(tclient, TorrentLocalArray, db, Config) //adding all of the stored torrents into the torrent client
//TODO add GO to this
} else {
Logger.Info("Database is empty, no torrents loaded")
}
Engine.CheckTorrentWatchFolder(cronEngine, db, tclient, torrentLocalStorage, Config, activeTorrents, queuedTorrents) //Every 5 minutes the engine will check the specified folder for new .torrent files
Engine.RefreshRSSCron(cronEngine, db, tclient, torrentLocalStorage, Config, activeTorrents, queuedTorrents) // Refresing the RSS feeds on an hourly basis to add torrents that show up in the RSS feed
fmt.Println("HERE ACTIVE", torrentQueues.ActiveTorrents)
Engine.CheckTorrentWatchFolder(cronEngine, db, tclient, torrentLocalStorage, Config, torrentQueues) //Every 5 minutes the engine will check the specified folder for new .torrent files
Engine.RefreshRSSCron(cronEngine, db, tclient, torrentLocalStorage, Config, torrentQueues) // Refresing the RSS feeds on an hourly basis to add torrents that show up in the RSS feed
router := mux.NewRouter() //setting up the handler for the web backend
router.HandleFunc("/", serveHome) //Serving the main page for our SPA
@@ -204,7 +212,7 @@ func main() {
http.Handle("/", router)
router.HandleFunc("/api", func(w http.ResponseWriter, r *http.Request) { //TODO, remove this
TorrentLocalArray = Storage.FetchAllStoredTorrents(db)
RunningTorrentArray = Engine.CreateRunningTorrentArray(tclient, TorrentLocalArray, PreviousTorrentArray, Config, db, activeTorrents, queuedTorrents) //Updates the RunningTorrentArray with the current client data as well
RunningTorrentArray = Engine.CreateRunningTorrentArray(tclient, TorrentLocalArray, PreviousTorrentArray, Config, db) //Updates the RunningTorrentArray with the current client data as well
var torrentlistArray = new(Engine.TorrentList)
torrentlistArray.MessageType = "torrentList" //setting the type of message
torrentlistArray.ClientDBstruct = RunningTorrentArray //the full JSON that includes the number of torrents as the root
@@ -277,8 +285,8 @@ func main() {
Logger.WithFields(logrus.Fields{"message": msg}).Debug("Client Requested TorrentList Update")
go func() { //running updates in separate thread so can still accept commands
TorrentLocalArray = Storage.FetchAllStoredTorrents(db) //Required to re-read th database since we write to the DB and this will pull the changes from it
RunningTorrentArray = Engine.CreateRunningTorrentArray(tclient, TorrentLocalArray, PreviousTorrentArray, Config, db, activeTorrents, queuedTorrents) //Updates the RunningTorrentArray with the current client data as well
TorrentLocalArray = Storage.FetchAllStoredTorrents(db) //Required to re-read th database since we write to the DB and this will pull the changes from it
RunningTorrentArray = Engine.CreateRunningTorrentArray(tclient, TorrentLocalArray, PreviousTorrentArray, Config, db) //Updates the RunningTorrentArray with the current client data as well
PreviousTorrentArray = RunningTorrentArray
torrentlistArray := Engine.TorrentList{MessageType: "torrentList", ClientDBstruct: RunningTorrentArray, Totaltorrents: len(RunningTorrentArray)}
Logger.WithFields(logrus.Fields{"torrentList": torrentlistArray, "previousTorrentList": PreviousTorrentArray}).Debug("Previous and Current Torrent Lists for sending to client")
@@ -301,7 +309,7 @@ func main() {
Logger.WithFields(logrus.Fields{"message": msg}).Info("Client Requested Torrents by Label")
label := payloadData["Label"].(string)
torrentsByLabel := Storage.FetchTorrentsByLabel(db, label)
RunningTorrentArray = Engine.CreateRunningTorrentArray(tclient, TorrentLocalArray, PreviousTorrentArray, Config, db, activeTorrents, queuedTorrents)
RunningTorrentArray = Engine.CreateRunningTorrentArray(tclient, TorrentLocalArray, PreviousTorrentArray, Config, db)
labelRunningArray := []Engine.ClientDB{}
for _, torrent := range RunningTorrentArray { //Ranging over the running torrents and if the hashes match we have torrents by label
for _, label := range torrentsByLabel {
@@ -424,7 +432,7 @@ func main() {
}
Logger.WithFields(logrus.Fields{"clientTorrent": clientTorrent, "magnetLink": magnetLink}).Info("Adding torrent to client!")
Engine.CreateServerPushMessage(Engine.ServerPushMessage{MessageType: "serverPushMessage", MessageLevel: "info", Payload: "Received MagnetLink"}, conn)
go Engine.AddTorrent(clientTorrent, torrentLocalStorage, db, "magnet", "", storageValue, labelValue, Config, activeTorrents, queuedTorrents) //starting the torrent and creating local DB entry
go Engine.AddTorrent(clientTorrent, torrentLocalStorage, db, "magnet", "", storageValue, labelValue, Config) //starting the torrent and creating local DB entry
}
@@ -471,7 +479,7 @@ func main() {
Engine.CreateServerPushMessage(Engine.ServerPushMessage{MessageType: "serverPushMessage", MessageLevel: "error", Payload: "Unable to add Torrent to torrent server"}, conn)
}
Logger.WithFields(logrus.Fields{"clienttorrent": clientTorrent.Name(), "filename": filePathAbs}).Info("Added torrent")
go Engine.AddTorrent(clientTorrent, torrentLocalStorage, db, "file", filePathAbs, storageValue, labelValue, Config, activeTorrents, queuedTorrents)
go Engine.AddTorrent(clientTorrent, torrentLocalStorage, db, "file", filePathAbs, storageValue, labelValue, Config)
case "stopTorrents":
torrentHashes := payloadData["TorrentHashes"].([]interface{})
@@ -479,11 +487,13 @@ func main() {
for _, singleTorrent := range tclient.Torrents() {
for _, singleSelection := range torrentHashes {
if singleTorrent.InfoHash().String() == singleSelection {
fmt.Println("Hash", singleTorrent.InfoHash().String(), "otherhash", singleSelection)
Logger.WithFields(logrus.Fields{"selection": singleSelection}).Info("Matched for stopping torrents")
oldTorrentInfo := Storage.FetchTorrentFromStorage(db, singleTorrent.InfoHash().String())
oldTorrentInfo.TorrentStatus = "Stopped"
oldTorrentInfo.MaxConnections = 0
Storage.UpdateStorageTick(db, oldTorrentInfo) //Updating the torrent status
Engine.StopTorrent(singleTorrent, &oldTorrentInfo, db)
fmt.Println("Getting ready to write to storage", oldTorrentInfo.Hash, oldTorrentInfo.TorrentStatus)
//Storage.UpdateStorageTick(db, oldTorrentInfo) //Updating the torrent status
continue
}
}
}
@@ -497,6 +507,20 @@ func main() {
for _, singleSelection := range torrentHashes {
if singleTorrent.InfoHash().String() == singleSelection {
oldTorrentInfo := Storage.FetchTorrentFromStorage(db, singleTorrent.InfoHash().String())
torrentQueues = Storage.FetchQueues(db)
for index, activeTorrentHash := range torrentQueues.ActiveTorrents { //If torrent is in the active slice, pull it
if singleTorrent.InfoHash().String() == activeTorrentHash {
singleTorrent.SetMaxEstablishedConns(0)
torrentQueues.ActiveTorrents = append(torrentQueues.ActiveTorrents[:index], torrentQueues.ActiveTorrents[index+1:]...)
}
}
for index, queuedTorrentHash := range torrentQueues.QueuedTorrents { //If torrent is in the queued slice, pull it
if singleTorrent.InfoHash().String() == queuedTorrentHash {
torrentQueues.QueuedTorrents = append(torrentQueues.QueuedTorrents[:index], torrentQueues.QueuedTorrents[index+1:]...)
}
}
Logger.WithFields(logrus.Fields{"selection": singleSelection}).Info("Matched for deleting torrents")
if withData {
oldTorrentInfo.TorrentStatus = "DroppedData" //Will be cleaned up the next engine loop since deleting a torrent mid loop can cause issues
@@ -504,6 +528,7 @@ func main() {
oldTorrentInfo.TorrentStatus = "Dropped"
}
Storage.UpdateStorageTick(db, oldTorrentInfo)
Storage.UpdateQueues(db, torrentQueues)
}
}
}
@@ -517,33 +542,26 @@ func main() {
if singleTorrent.InfoHash().String() == singleSelection {
Logger.WithFields(logrus.Fields{"infoHash": singleTorrent.InfoHash().String()}).Info("Found matching torrent to start")
oldTorrentInfo := Storage.FetchTorrentFromStorage(db, singleTorrent.InfoHash().String())
oldTorrentInfo.TorrentStatus = "Running"
oldTorrentInfo.MaxConnections = 80
singleTorrent.NewReader() //set all of the pieces to download (piece prio is NE to file prio)
oldTorrentInfo.QueuedStatus = "Active"
for _, file := range singleTorrent.Files() {
for _, sentFile := range oldTorrentInfo.TorrentFilePriority {
if file.DisplayPath() == sentFile.TorrentFilePath {
switch sentFile.TorrentFilePriority {
case "High":
file.SetPriority(torrent.PiecePriorityHigh)
case "Normal":
file.SetPriority(torrent.PiecePriorityNormal)
case "Cancel":
file.SetPriority(torrent.PiecePriorityNone)
default:
file.SetPriority(torrent.PiecePriorityNormal)
}
}
Engine.AddTorrentToActive(&oldTorrentInfo, singleTorrent, db)
Logger.WithFields(logrus.Fields{"Torrent": oldTorrentInfo.TorrentName}).Info("Changing database to torrent running with 80 max connections")
//Storage.UpdateStorageTick(db, oldTorrentInfo) //Updating the torrent status
}
torrentQueues = Storage.FetchQueues(db)
if len(torrentQueues.ActiveTorrents) > Config.MaxActiveTorrents { //Since we are starting a new torrent stop the first torrent in the que if running is full
//removeTorrent := torrentQueues.ActiveTorrents[len(torrentQueues.ActiveTorrents)-1]
removeTorrent := torrentQueues.ActiveTorrents[:1]
for _, singleTorrent := range runningTorrents {
if singleTorrent.InfoHash().String() == removeTorrent[0] {
oldTorrentInfo := Storage.FetchTorrentFromStorage(db, singleTorrent.InfoHash().String())
Engine.RemoveTorrentFromActive(&oldTorrentInfo, singleTorrent, db)
Storage.UpdateStorageTick(db, oldTorrentInfo)
}
}
Logger.WithFields(logrus.Fields{"Torrent": oldTorrentInfo.TorrentName}).Info("Changing database to torrent running with 80 max connections")
Storage.UpdateStorageTick(db, oldTorrentInfo) //Updating the torrent status
}
}
}
case "forceUploadTorrents":
case "forceUploadTorrents": //TODO allow force to override total limit of queued torrents?
torrentHashes := payloadData["TorrentHashes"].([]interface{})
Logger.WithFields(logrus.Fields{"selection": msg.Payload}).Info("Matched for force Uploading Torrents")
Engine.CreateServerPushMessage(Engine.ServerPushMessage{MessageType: "serverPushMessage", MessageLevel: "info", Payload: "Received Force Start Request"}, conn)
@@ -620,7 +638,7 @@ func main() {
} else {
err := http.ListenAndServe(httpAddr, nil) //Can't send proxy headers if not used since that can be a security issue
if err != nil {
Logger.WithFields(logrus.Fields{"error": err}).Fatal("Unable to listen on the http Server with no proxy headers!")
Logger.WithFields(logrus.Fields{"error": err}).Fatal("Unable to listen on the http Server! (Maybe wrong IP in config, port already in use?) (Config: Not using proxy, see error for more details)")
}
}
}

View File

@@ -100,10 +100,8 @@ func calculateRateLimiters(uploadRate, downloadRate string) (*rate.Limiter, *rat
uploadRateLimiter = rate.NewLimiter(rate.Inf, 0)
return downloadRateLimiter, uploadRateLimiter
}
var limitPerSecondUl = rate.Limit(uploadRateLimiterSize)
uploadRateLimiter = rate.NewLimiter(limitPerSecondUl, uploadRateLimiterSize)
var limitPerSecondDl = rate.Limit(uploadRateLimiterSize)
downloadRateLimiter = rate.NewLimiter(limitPerSecondDl, downloadRateLimiterSize)
uploadRateLimiter = rate.NewLimiter(rate.Limit(uploadRateLimiterSize), uploadRateLimiterSize)
downloadRateLimiter = rate.NewLimiter(rate.Limit(downloadRateLimiterSize), downloadRateLimiterSize)
return downloadRateLimiter, uploadRateLimiter
}

View File

@@ -16,6 +16,13 @@ var Logger *logrus.Logger
//Conn is the global websocket connection used to push server notification messages
var Conn *websocket.Conn
//TorrentQueues contains the active and queued torrent hashes in slices
type TorrentQueues struct {
ID int `storm:"id,unique"` //storm requires unique ID (will be 5)
ActiveTorrents []string
QueuedTorrents []string
}
//IssuedTokensList contains a slice of all the tokens issues to applications
type IssuedTokensList struct {
ID int `storm:"id,unique"` //storm requires unique ID (will be 3) to save although there will only be one of these
@@ -83,7 +90,6 @@ type TorrentLocal struct {
TorrentSize int64 //If we cancel a file change the download size since we won't be downloading that file
UploadRatio string
TorrentFilePriority []TorrentFilePriority
QueuedStatus string //Either "Queued", "Active", or "None"
}
//SaveConfig saves the config to the database to compare for changes to settings.toml on restart
@@ -95,6 +101,26 @@ func SaveConfig(torrentStorage *storm.DB, config Settings.FullClientSettings) {
}
}
//UpdateQueues Saves the slice of hashes that contain the active Torrents
func UpdateQueues(db *storm.DB, torrentQueues TorrentQueues) {
torrentQueues.ID = 5
err := db.Save(&torrentQueues)
if err != nil {
Logger.WithFields(logrus.Fields{"database": db, "error": err}).Error("Unable to write Queues to database!")
}
}
//FetchQueues fetches the activetorrent and queuedtorrent slices from the database
func FetchQueues(db *storm.DB) TorrentQueues {
torrentQueues := TorrentQueues{}
err := db.One("ID", 5, &torrentQueues)
if err != nil {
Logger.WithFields(logrus.Fields{"database": db, "error": err}).Error("Unable to read Database into torrentQueues!")
return torrentQueues
}
return torrentQueues
}
//FetchConfig fetches the client config from the database
func FetchConfig(torrentStorage *storm.DB) (Settings.FullClientSettings, error) {
config := Settings.FullClientSettings{}
@@ -167,6 +193,8 @@ func UpdateStorageTick(torrentStorage *storm.DB, torrentLocal TorrentLocal) {
err := torrentStorage.Update(&torrentLocal)
if err != nil {
Logger.WithFields(logrus.Fields{"UpdateContents": torrentLocal, "error": err}).Error("Error performing tick update to database!")
} else {
Logger.WithFields(logrus.Fields{"UpdateContents": torrentLocal, "error": err}).Debug("Performed Update to database!")
}
}