commit f9e8882e6a5f369fa333916f17eb5c6ca9560df8
parent b6db4dacbb21a7251040b384ae08da5875d50e59
Author: n0tr1v <n0tr1v@protonmail.com>
Date: Tue, 31 Jan 2023 01:14:56 -0800
use logrus & add some delay between thread starts
Diffstat:
1 file changed, 36 insertions(+), 30 deletions(-)
diff --git a/cmd/dkfupload/main.go b/cmd/dkfupload/main.go
@@ -8,9 +8,9 @@ import (
"flag"
"fmt"
"github.com/dustin/go-humanize"
+ "github.com/sirupsen/logrus"
"golang.org/x/net/proxy"
"io"
- "log"
"math"
"math/rand"
"mime/multipart"
@@ -36,6 +36,11 @@ const (
var chunksCompleted int64 // atomic
func main() {
+ customFormatter := new(logrus.TextFormatter)
+ customFormatter.TimestampFormat = "2006-01-02 15:04:05"
+ customFormatter.FullTimestamp = true
+ logrus.SetFormatter(customFormatter)
+
var nbThreads int
var filedropUUID string
var fileName string
@@ -48,7 +53,7 @@ func main() {
nbThreadsUsage := "nb threads"
nbThreadsDefaultValue := 20
chunkSizeUsage := "chunk size"
- chunkSizeDefaultValue := int64(5 << 20) // 5MB
+ chunkSizeDefaultValue := int64(2 << 20) // 2MB
flag.DurationVar(&httpTimeout, "http-timeout", 5*time.Minute, "http timeout")
flag.StringVar(&filedropUUID, "filedrop-uuid", "", filedropUUIDUsage)
flag.StringVar(&filedropUUID, "u", "", filedropUUIDUsage)
@@ -66,17 +71,17 @@ func main() {
f, err := os.Open(fileName)
if err != nil {
- log.Fatalln(err.Error())
+ logrus.Fatal(err.Error())
}
fs, err := f.Stat()
if err != nil {
- log.Fatalln(err.Error())
+ logrus.Fatal(err.Error())
}
// Calculate sha256 of file
h := sha256.New()
if _, err := io.Copy(h, f); err != nil {
- log.Fatalln(err)
+ logrus.Fatalln(err)
}
fileSha256 := hex.EncodeToString(h.Sum(nil))
@@ -85,25 +90,25 @@ func main() {
// Print out information about the file
{
- log.Printf("filedrop UUID: %s\n", filedropUUID)
- log.Printf(" file: %s\n", fs.Name())
- log.Printf(" sha256: %s\n", fileSha256)
- log.Printf(" file size: %s (%s)\n", humanize.Bytes(uint64(fileSize)), humanize.Comma(fileSize))
- log.Printf(" chunks size: %s (%s)\n", humanize.Bytes(uint64(maxChunkSize)), humanize.Comma(maxChunkSize))
- log.Printf(" nb chunks: %d\n", nbChunks)
- log.Printf(" nb threads: %d\n", nbThreads)
- log.Printf(" http timeout: %s\n", ShortDur(httpTimeout))
+ logrus.Infof("filedrop UUID: %s\n", filedropUUID)
+ logrus.Infof(" file: %s\n", fs.Name())
+ logrus.Infof(" sha256: %s\n", fileSha256)
+ logrus.Infof(" file size: %s (%s)\n", humanize.Bytes(uint64(fileSize)), humanize.Comma(fileSize))
+ logrus.Infof(" chunks size: %s (%s)\n", humanize.Bytes(uint64(maxChunkSize)), humanize.Comma(maxChunkSize))
+ logrus.Infof(" nb chunks: %d\n", nbChunks)
+ logrus.Infof(" nb threads: %d\n", nbThreads)
+ logrus.Infof(" http timeout: %s\n", ShortDur(httpTimeout))
if dry {
- log.Printf(" dry run: %t\n", dry)
+ logrus.Infof(" dry run: %t\n", dry)
}
- log.Println(strings.Repeat("-", 80))
+ logrus.Infof(strings.Repeat("-", 80))
}
start := time.Now()
// Init the filedrop and send metadata about the file
if !dry {
- log.Println("sending metadata")
+ logrus.Debug("sending metadata")
client := doGetClient(isLocal, httpTimeout)
body := url.Values{}
body.Set("fileName", fs.Name())
@@ -116,24 +121,18 @@ func main() {
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
resp, err := client.Do(req)
if err != nil {
- log.Fatalln(err)
+ logrus.Fatal(err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
- log.Fatalln(fmt.Errorf("invalid status code %s", resp.Status))
+ logrus.Fatal(fmt.Errorf("invalid status code %s", resp.Status))
}
- log.Println("done sending metadata")
+ logrus.Debug("done sending metadata")
}
chunksCh := make(chan int64)
wg := &sync.WaitGroup{}
- // Start worker threads
- wg.Add(nbThreads)
- for i := 0; i < nbThreads; i++ {
- go work(i, wg, chunksCh, isLocal, dry, maxChunkSize, nbChunks, f, baseUrl, filedropUUID, httpTimeout)
- }
-
// Provide worker threads with tasks to do
go func() {
for chunkNum := int64(0); chunkNum < nbChunks; chunkNum++ {
@@ -143,9 +142,16 @@ func main() {
close(chunksCh)
}()
+ // Start worker threads
+ wg.Add(nbThreads)
+ for i := 0; i < nbThreads; i++ {
+ go work(i, wg, chunksCh, isLocal, dry, maxChunkSize, nbChunks, f, baseUrl, filedropUUID, httpTimeout)
+ time.Sleep(500 * time.Millisecond)
+ }
+
// Wait for all workers to have completed
wg.Wait()
- log.Printf("All done in %s\n", ShortDur(time.Since(start)))
+ logrus.Infof("All done in %s\n", ShortDur(time.Since(start)))
}
func work(i int, wg *sync.WaitGroup, chunksCh chan int64, isLocal, dry bool, maxChunkSize, nbChunks int64, f *os.File, baseUrl, filedropUUID string, httpTimeout time.Duration) {
@@ -156,7 +162,7 @@ func work(i int, wg *sync.WaitGroup, chunksCh chan int64, isLocal, dry bool, max
start := time.Now()
offset := chunkNum * maxChunkSize
n, _ := f.ReadAt(buf, offset)
- log.Printf("Thread #%03d | chunk #%03d | read %d | from %d to %d\n", i, chunkNum, n, offset, offset+int64(n))
+ logrus.Infof("Thread #%03d | chunk #%03d | read %d | from %d to %d\n", i, chunkNum, n, offset, offset+int64(n))
if !dry {
hasToSucceed(func() error {
body := new(bytes.Buffer)
@@ -179,7 +185,7 @@ func work(i int, wg *sync.WaitGroup, chunksCh chan int64, isLocal, dry bool, max
resp, err := client.Do(req)
if err != nil {
if os.IsTimeout(err) {
- log.Printf("Thread #%03d gets a new client\n", i)
+ logrus.Infof("Thread #%03d gets a new client\n", i)
client = doGetClient(isLocal, httpTimeout)
}
return err
@@ -192,7 +198,7 @@ func work(i int, wg *sync.WaitGroup, chunksCh chan int64, isLocal, dry bool, max
})
}
newChunksCompleted := atomic.AddInt64(&chunksCompleted, 1)
- log.Printf("Thread #%03d | chunk #%03d | completed in %s (%d/%d)\n", i, chunkNum, ShortDur(time.Since(start)), newChunksCompleted, nbChunks)
+ logrus.Infof("Thread #%03d | chunk #%03d | completed in %s (%d/%d)\n", i, chunkNum, ShortDur(time.Since(start)), newChunksCompleted, nbChunks)
}
wg.Done()
}
@@ -223,7 +229,7 @@ func hasToSucceed(clb func() error) {
if err == nil {
break
}
- log.Printf("wait %d seconds before retry; %v\n", waitTime, err)
+ logrus.Errorf("wait %d seconds before retry; %v\n", waitTime, err)
time.Sleep(time.Duration(waitTime) * time.Second)
}
}