main.go (9638B)
1 package main 2 3 import ( 4 "bytes" 5 "context" 6 "crypto/sha256" 7 "encoding/hex" 8 "flag" 9 "fmt" 10 "github.com/dustin/go-humanize" 11 "github.com/sirupsen/logrus" 12 "golang.org/x/net/proxy" 13 "io" 14 "math" 15 "math/rand" 16 "mime/multipart" 17 "net" 18 "net/http" 19 "net/http/cookiejar" 20 "net/url" 21 "os" 22 "strconv" 23 "strings" 24 "sync" 25 "sync/atomic" 26 "time" 27 ) 28 29 const ( 30 userAgent = "Mozilla/5.0 (Windows NT 10.0; rv:102.0) Gecko/20100101 Firefox/102.0" 31 dkfBaseURL = "http://dkforestseeaaq2dqz2uflmlsybvnq2irzn4ygyvu53oazyorednviid.onion" 32 localhostAddr = "http://127.0.0.1:8080" 33 torProxyAddr = "127.0.0.1:9050" 34 ) 35 36 var chunksCompleted int64 // atomic 37 38 func main() { 39 customFormatter := new(logrus.TextFormatter) 40 customFormatter.TimestampFormat = "2006-01-02 15:04:05" 41 customFormatter.FullTimestamp = true 42 logrus.SetFormatter(customFormatter) 43 44 var nbThreads int 45 var filedropUUID string 46 var fileName string 47 var dry bool 48 var isLocal bool 49 var maxChunkSize int64 50 var httpTimeout time.Duration 51 filedropUUIDUsage := "dkf filedrop uuid" 52 fileNameUsage := "file to upload" 53 nbThreadsUsage := "nb threads" 54 nbThreadsDefaultValue := 20 55 chunkSizeUsage := "chunk size" 56 chunkSizeDefaultValue := int64(2 << 20) // 2MB 57 flag.DurationVar(&httpTimeout, "http-timeout", 2*time.Minute, "http timeout") 58 flag.StringVar(&filedropUUID, "uuid", "", filedropUUIDUsage) 59 flag.StringVar(&filedropUUID, "u", "", filedropUUIDUsage) 60 flag.StringVar(&fileName, "file", "", fileNameUsage) 61 flag.StringVar(&fileName, "f", "", fileNameUsage) 62 flag.IntVar(&nbThreads, "threads", nbThreadsDefaultValue, nbThreadsUsage) 63 flag.IntVar(&nbThreads, "t", nbThreadsDefaultValue, nbThreadsUsage) 64 flag.Int64Var(&maxChunkSize, "chunk-size", chunkSizeDefaultValue, chunkSizeUsage) 65 flag.Int64Var(&maxChunkSize, "c", chunkSizeDefaultValue, chunkSizeUsage) 66 flag.BoolVar(&dry, "dry", false, "dry run") 67 flag.BoolVar(&isLocal, "local", false, "localhost development") 68 flag.Parse() 69 70 baseUrl := Ternary(isLocal, localhostAddr, dkfBaseURL) 71 72 f, err := os.Open(fileName) 73 if err != nil { 74 logrus.Fatal(err.Error()) 75 } 76 fs, err := f.Stat() 77 if err != nil { 78 logrus.Fatal(err.Error()) 79 } 80 81 // Calculate sha256 of file 82 h := sha256.New() 83 if _, err := io.Copy(h, f); err != nil { 84 logrus.Fatalln(err) 85 } 86 fileSha256 := hex.EncodeToString(h.Sum(nil)) 87 88 fileSize := fs.Size() 89 nbChunks := int64(math.Ceil(float64(fileSize) / float64(maxChunkSize))) 90 91 // Print out information about the file 92 { 93 logrus.Infof("filedrop UUID: %s\n", filedropUUID) 94 logrus.Infof(" file: %s\n", fs.Name()) 95 logrus.Infof(" sha256: %s\n", fileSha256) 96 logrus.Infof(" file size: %s (%s)\n", humanize.Bytes(uint64(fileSize)), humanize.Comma(fileSize)) 97 logrus.Infof(" chunks size: %s (%s)\n", humanize.Bytes(uint64(maxChunkSize)), humanize.Comma(maxChunkSize)) 98 logrus.Infof(" nb chunks: %d\n", nbChunks) 99 logrus.Infof(" nb threads: %d\n", nbThreads) 100 logrus.Infof(" http timeout: %s\n", ShortDur(httpTimeout)) 101 if dry { 102 logrus.Infof(" dry run: %t\n", dry) 103 } 104 logrus.Infof(strings.Repeat("-", 80)) 105 } 106 107 start := time.Now() 108 109 // Init the filedrop and send metadata about the file 110 if !dry { 111 logrus.Debug("sending metadata") 112 client := doGetClient(isLocal, httpTimeout) 113 body := url.Values{} 114 body.Set("init", "1") 115 body.Set("fileName", fs.Name()) 116 body.Set("fileSize", strconv.FormatInt(fileSize, 10)) 117 body.Set("fileSha256", fileSha256) 118 body.Set("chunkSize", strconv.FormatInt(maxChunkSize, 10)) 119 body.Set("nbChunks", strconv.FormatInt(nbChunks, 10)) 120 req, _ := http.NewRequest(http.MethodPost, baseUrl+"/file-drop/"+filedropUUID+"/dkfupload", strings.NewReader(body.Encode())) 121 req.Header.Set("User-Agent", userAgent) 122 req.Header.Set("Content-Type", "application/x-www-form-urlencoded") 123 resp, err := client.Do(req) 124 if err != nil { 125 logrus.Fatal(err) 126 } 127 defer resp.Body.Close() 128 if resp.StatusCode != http.StatusOK { 129 logrus.Fatal(fmt.Errorf("invalid status code %s", resp.Status)) 130 } 131 logrus.Debug("done sending metadata") 132 } 133 134 chunksCh := make(chan int64) 135 wg := &sync.WaitGroup{} 136 137 // Provide worker threads with tasks to do 138 go func() { 139 for chunkNum := int64(0); chunkNum < nbChunks; chunkNum++ { 140 chunksCh <- chunkNum 141 } 142 // closing the channel will ensure all workers exit gracefully 143 close(chunksCh) 144 }() 145 146 // Start worker threads 147 wg.Add(nbThreads) 148 for i := 0; i < nbThreads; i++ { 149 go work(i, wg, chunksCh, isLocal, dry, maxChunkSize, nbChunks, f, baseUrl, filedropUUID, httpTimeout) 150 time.Sleep(25 * time.Millisecond) 151 } 152 153 // Wait for all workers to have completed 154 wg.Wait() 155 156 if !dry { 157 client := doGetClient(isLocal, httpTimeout) 158 body := url.Values{} 159 body.Set("completed", "1") 160 req, _ := http.NewRequest(http.MethodPost, baseUrl+"/file-drop/"+filedropUUID+"/dkfupload", strings.NewReader(body.Encode())) 161 req.Header.Set("User-Agent", userAgent) 162 req.Header.Set("Content-Type", "application/x-www-form-urlencoded") 163 resp, err := client.Do(req) 164 if err != nil { 165 logrus.Fatal(err) 166 } 167 defer resp.Body.Close() 168 if resp.StatusCode != http.StatusOK { 169 logrus.Fatal(fmt.Errorf("invalid status code %s", resp.Status)) 170 } 171 } 172 173 logrus.Infof("All done in %s\n", ShortDur(time.Since(start))) 174 } 175 176 func work(i int, wg *sync.WaitGroup, chunksCh chan int64, isLocal, dry bool, maxChunkSize, nbChunks int64, f *os.File, baseUrl, filedropUUID string, httpTimeout time.Duration) { 177 client := doGetClient(isLocal, httpTimeout) 178 179 buf := make([]byte, maxChunkSize) 180 for chunkNum := range chunksCh { 181 start := time.Now() 182 offset := chunkNum * maxChunkSize 183 n, _ := f.ReadAt(buf, offset) 184 logrus.Infof("Thread #%03d | chunk #%03d | read %d | from %d to %d\n", i, chunkNum, n, offset, offset+int64(n)) 185 if !dry { 186 hasToSucceed(func() error { 187 partFileName := fmt.Sprintf("part_%d", chunkNum) 188 189 // Ask server if he already has the chunk 190 { 191 body := url.Values{} 192 body.Set("chunkFileName", partFileName) 193 req, _ := http.NewRequest(http.MethodPost, baseUrl+"/file-drop/"+filedropUUID+"/dkfupload", strings.NewReader(body.Encode())) 194 req.Header.Set("User-Agent", userAgent) 195 req.Header.Set("Content-Type", "application/x-www-form-urlencoded") 196 resp, err := client.Do(req) 197 if err != nil { 198 if os.IsTimeout(err) { 199 logrus.Infof("Thread #%03d gets a new client\n", i) 200 client = doGetClient(isLocal, httpTimeout) 201 } 202 return err 203 } 204 defer resp.Body.Close() 205 // We use teapot status (because why not) to express that we already have the chunk 206 if resp.StatusCode == http.StatusTeapot { 207 logrus.Infof("Thread #%03d | server already has chunk #%03d; skip", i, chunkNum) 208 return nil 209 } 210 } 211 212 start = time.Now() 213 body := new(bytes.Buffer) 214 w := multipart.NewWriter(body) 215 part, err := w.CreateFormFile("file", partFileName) 216 if err != nil { 217 return err 218 } 219 if _, err := part.Write(buf[:n]); err != nil { 220 return err 221 } 222 if err := w.Close(); err != nil { 223 return err 224 } 225 226 req, _ := http.NewRequest(http.MethodPost, baseUrl+"/file-drop/"+filedropUUID+"/dkfupload", body) 227 req.Header.Set("User-Agent", userAgent) 228 req.Header.Set("Content-Type", w.FormDataContentType()) 229 resp, err := client.Do(req) 230 if err != nil { 231 if os.IsTimeout(err) { 232 logrus.Infof("Thread #%03d gets a new client\n", i) 233 client = doGetClient(isLocal, httpTimeout) 234 } 235 return err 236 } 237 defer resp.Body.Close() 238 if resp.StatusCode != http.StatusOK { 239 return fmt.Errorf("invalid status code %s", resp.Status) 240 } 241 return nil 242 }) 243 } 244 newChunksCompleted := atomic.AddInt64(&chunksCompleted, 1) 245 logrus.Infof("Thread #%03d | chunk #%03d | completed in %s (%d/%d)\n", i, chunkNum, ShortDur(time.Since(start)), newChunksCompleted, nbChunks) 246 } 247 wg.Done() 248 } 249 250 func doGetClient(isLocal bool, httpTimeout time.Duration) (client *http.Client) { 251 hasToSucceed(func() (err error) { 252 if isLocal { 253 client = http.DefaultClient 254 } else { 255 token := GenerateTokenN(8) 256 if client, err = GetHttpClient(&proxy.Auth{User: token, Password: token}); err != nil { 257 return err 258 } 259 } 260 return 261 }) 262 client.Timeout = httpTimeout 263 return 264 } 265 266 // Will keep retrying a callback until no error is returned 267 func hasToSucceed(clb func() error) { 268 waitTime := 5 269 for { 270 err := clb() 271 if err == nil { 272 break 273 } 274 logrus.Errorf("wait %d seconds before retry; %v\n", waitTime, err) 275 time.Sleep(time.Duration(waitTime) * time.Second) 276 } 277 } 278 279 // GetHttpClient http client that uses tor proxy 280 func GetHttpClient(auth *proxy.Auth) (*http.Client, error) { 281 dialer, err := proxy.SOCKS5("tcp", torProxyAddr, auth, proxy.Direct) 282 if err != nil { 283 return nil, fmt.Errorf("failed to connect to tor proxy : %w", err) 284 } 285 transport := &http.Transport{ 286 DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { 287 return dialer.Dial(network, addr) 288 }, 289 } 290 jar, err := cookiejar.New(nil) 291 if err != nil { 292 return nil, fmt.Errorf("failed to create cookie jar : %w", err) 293 } 294 return &http.Client{Transport: transport, Jar: jar}, nil 295 } 296 297 // GenerateTokenN generates a random printable string from N bytes 298 func GenerateTokenN(n int) string { 299 b := make([]byte, n) 300 _, _ = rand.Read(b) 301 return hex.EncodeToString(b) 302 } 303 304 func Ternary[T any](predicate bool, a, b T) T { 305 if predicate { 306 return a 307 } 308 return b 309 } 310 311 func ShortDur(d time.Duration) string { 312 if d < time.Minute { 313 d = d.Round(time.Millisecond) 314 } else { 315 d = d.Round(time.Second) 316 } 317 s := d.String() 318 if strings.HasSuffix(s, "m0s") { 319 s = s[:len(s)-2] 320 } 321 if strings.HasSuffix(s, "h0m") { 322 s = s[:len(s)-2] 323 } 324 return s 325 }