commit ce9451789556414d4d0e7e4d943bf8d64351ba55
parent 2dfd6efbbebc8418ea2c97df648fd199a6154830
Author: n0tr1v <n0tr1v@protonmail.com>
Date: Mon, 30 Jan 2023 21:37:09 -0800
cleanup
Diffstat:
| M | cmd/dkfupload/main.go | | | 100 | ++++++++++++++++++++++++++++++++++++++++++------------------------------------- |
1 file changed, 53 insertions(+), 47 deletions(-)
diff --git a/cmd/dkfupload/main.go b/cmd/dkfupload/main.go
@@ -114,64 +114,70 @@ func main() {
}
chunksCh := make(chan int64)
+ wg := &sync.WaitGroup{}
+
+ // Start worker threads
+ wg.Add(nbThreads)
+ for i := 0; i < nbThreads; i++ {
+ go work(i, wg, chunksCh, isLocal, dry, maxChunkSize, f, baseUrl, filedropUUID)
+ }
+
+ // Provide worker threads with tasks to do
go func() {
for chunkNum := int64(0); chunkNum < nbChunks; chunkNum++ {
chunksCh <- chunkNum
}
+ // closing the channel will ensure all workers exit gracefully
close(chunksCh)
}()
- wg := &sync.WaitGroup{}
- wg.Add(nbThreads)
- for i := 0; i < nbThreads; i++ {
- go func(i int, wg *sync.WaitGroup, chunksCh chan int64) {
-
- client := doGetClient(isLocal)
-
- buf := make([]byte, maxChunkSize)
- for chunkNum := range chunksCh {
- offset := chunkNum * maxChunkSize
- n, _ := f.ReadAt(buf, offset)
- log.Printf("Thread #%03d | chunk #%03d | read %d | from %d to %d\n", i, chunkNum, n, offset, offset+int64(n))
- if !dry {
- hasToSucceed(func() error {
- body := new(bytes.Buffer)
- w := multipart.NewWriter(body)
- partFileName := fmt.Sprintf("part_%d", chunkNum)
- part, err := w.CreateFormFile("file", partFileName)
- if err != nil {
- return err
- }
- if _, err := part.Write(buf[:n]); err != nil {
- return err
- }
- if err := w.Close(); err != nil {
- return err
- }
-
- req, _ := http.NewRequest(http.MethodPost, baseUrl+"/file-drop/"+filedropUUID+"/tmp", body)
- req.Header.Set("User-Agent", userAgent)
- req.Header.Set("Content-Type", w.FormDataContentType())
- resp, err := client.Do(req)
- if err != nil {
- return err
- }
- defer resp.Body.Close()
- if resp.StatusCode != http.StatusOK {
- return fmt.Errorf("invalid status code %s", resp.Status)
- }
- return nil
- })
- }
- }
- wg.Done()
- }(i, wg, chunksCh)
- }
-
+ // Wait for all workers to have completed
wg.Wait()
log.Printf("All done\n")
}
+func work(i int, wg *sync.WaitGroup, chunksCh chan int64, isLocal, dry bool, maxChunkSize int64, f *os.File, baseUrl, filedropUUID string) {
+ client := doGetClient(isLocal)
+
+ buf := make([]byte, maxChunkSize)
+ for chunkNum := range chunksCh {
+ offset := chunkNum * maxChunkSize
+ n, _ := f.ReadAt(buf, offset)
+ log.Printf("Thread #%03d | chunk #%03d | read %d | from %d to %d\n", i, chunkNum, n, offset, offset+int64(n))
+ if !dry {
+ hasToSucceed(func() error {
+ body := new(bytes.Buffer)
+ w := multipart.NewWriter(body)
+ partFileName := fmt.Sprintf("part_%d", chunkNum)
+ part, err := w.CreateFormFile("file", partFileName)
+ if err != nil {
+ return err
+ }
+ if _, err := part.Write(buf[:n]); err != nil {
+ return err
+ }
+ if err := w.Close(); err != nil {
+ return err
+ }
+
+ req, _ := http.NewRequest(http.MethodPost, baseUrl+"/file-drop/"+filedropUUID+"/tmp", body)
+ req.Header.Set("User-Agent", userAgent)
+ req.Header.Set("Content-Type", w.FormDataContentType())
+ resp, err := client.Do(req)
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != http.StatusOK {
+ return fmt.Errorf("invalid status code %s", resp.Status)
+ }
+ return nil
+ })
+ }
+ }
+ wg.Done()
+}
+
func doGetClient(isLocal bool) (client *http.Client) {
hasToSucceed(func() error {
if isLocal {