main.go (9459B)
1 package main 2 3 import ( 4 "context" 5 "crypto/aes" 6 "crypto/cipher" 7 "crypto/sha256" 8 "encoding/base64" 9 "encoding/hex" 10 "flag" 11 "fmt" 12 "github.com/dustin/go-humanize" 13 "github.com/sirupsen/logrus" 14 "golang.org/x/net/proxy" 15 "io" 16 "math/rand" 17 "net" 18 "net/http" 19 "net/http/cookiejar" 20 "net/url" 21 "os" 22 "path/filepath" 23 "sort" 24 "strconv" 25 "strings" 26 "sync" 27 "sync/atomic" 28 "time" 29 ) 30 31 const ( 32 userAgent = "Mozilla/5.0 (Windows NT 10.0; rv:102.0) Gecko/20100101 Firefox/102.0" 33 dkfBaseURL = "http://dkforestseeaaq2dqz2uflmlsybvnq2irzn4ygyvu53oazyorednviid.onion" 34 localhostAddr = "http://127.0.0.1:8080" 35 torProxyAddr = "127.0.0.1:9050" 36 ) 37 38 var chunksCompleted int64 // atomic 39 40 func main() { 41 customFormatter := new(logrus.TextFormatter) 42 customFormatter.TimestampFormat = "2006-01-02 15:04:05" 43 customFormatter.FullTimestamp = true 44 logrus.SetFormatter(customFormatter) 45 46 var apiKey string 47 var nbThreads int 48 var filedropUUID string 49 var isLocal bool 50 var httpTimeout time.Duration 51 apiKeyUsage := "api key" 52 filedropUUIDUsage := "dkf filedrop uuid" 53 nbThreadsUsage := "nb threads" 54 nbThreadsDefaultValue := 20 55 flag.StringVar(&apiKey, "api-key", "", apiKeyUsage) 56 flag.StringVar(&apiKey, "a", "", apiKeyUsage) 57 flag.DurationVar(&httpTimeout, "http-timeout", 2*time.Minute, "http timeout") 58 flag.StringVar(&filedropUUID, "uuid", "", filedropUUIDUsage) 59 flag.StringVar(&filedropUUID, "u", "", filedropUUIDUsage) 60 flag.IntVar(&nbThreads, "threads", nbThreadsDefaultValue, nbThreadsUsage) 61 flag.IntVar(&nbThreads, "t", nbThreadsDefaultValue, nbThreadsUsage) 62 flag.BoolVar(&isLocal, "local", false, "localhost development") 63 flag.Parse() 64 65 baseUrl := Ternary(isLocal, localhostAddr, dkfBaseURL) 66 endpoint := baseUrl + "/api/v1/file-drop/" + filedropUUID + "/dkfdownload" 67 68 client := doGetClient(isLocal, httpTimeout) 69 70 // Download metadata file 71 by, err := os.ReadFile(filepath.Join(filedropUUID, "metadata")) 72 if err != nil { 73 body := url.Values{} 74 body.Set("init", "1") 75 req, _ := http.NewRequest(http.MethodPost, endpoint, strings.NewReader(body.Encode())) 76 req.Header.Set("User-Agent", userAgent) 77 req.Header.Set("Content-Type", "application/x-www-form-urlencoded") 78 req.Header.Set("DKF_API_KEY", apiKey) 79 resp, err := client.Do(req) 80 if err != nil { 81 logrus.Fatalln(err) 82 } 83 defer resp.Body.Close() 84 if resp.StatusCode == http.StatusUnauthorized { 85 logrus.Fatalln(resp.Status) 86 } 87 by, _ = io.ReadAll(resp.Body) 88 89 _ = os.Mkdir(filedropUUID, 0755) 90 _ = os.WriteFile(filepath.Join(filedropUUID, "metadata"), by, 0644) 91 } 92 93 // Read metadata information 94 lines := strings.Split(string(by), "\n") 95 origFileName := lines[0] 96 password, _ := base64.StdEncoding.DecodeString(lines[1]) 97 iv, _ := base64.StdEncoding.DecodeString(lines[2]) 98 fileSha256 := lines[3] 99 fileSize, _ := strconv.ParseInt(lines[4], 10, 64) 100 nbChunks, _ := strconv.ParseInt(lines[5], 10, 64) 101 102 // Print out information about the file 103 { 104 logrus.Infof("filedrop UUID: %s\n", filedropUUID) 105 logrus.Infof(" file: %s\n", origFileName) 106 logrus.Infof(" sha256: %s\n", fileSha256) 107 logrus.Infof(" file size: %s (%s)\n", humanize.Bytes(uint64(fileSize)), humanize.Comma(fileSize)) 108 logrus.Infof(" nb chunks: %d\n", nbChunks) 109 logrus.Infof(" nb threads: %d\n", nbThreads) 110 logrus.Infof(" http timeout: %s\n", ShortDur(httpTimeout)) 111 logrus.Infof(strings.Repeat("-", 80)) 112 } 113 114 start := time.Now() 115 116 chunksCh := make(chan int64) 117 118 // Provide worker threads with tasks to do 119 go func() { 120 for chunkNum := int64(0); chunkNum < nbChunks; chunkNum++ { 121 chunksCh <- chunkNum 122 } 123 // closing the channel will ensure all workers exit gracefully 124 close(chunksCh) 125 }() 126 127 // Download every chunks 128 wg := &sync.WaitGroup{} 129 wg.Add(nbThreads) 130 for i := 0; i < nbThreads; i++ { 131 go work(i, wg, endpoint, filedropUUID, apiKey, chunksCh, isLocal, httpTimeout, nbChunks) 132 time.Sleep(25 * time.Millisecond) 133 } 134 wg.Wait() 135 logrus.Infof("all chunks downloaded in %s", ShortDur(time.Since(start))) 136 137 // Get sorted chunks file names 138 dirEntries, _ := os.ReadDir(filedropUUID) 139 fileNames := make([]string, 0) 140 for _, dirEntry := range dirEntries { 141 if !strings.HasPrefix(dirEntry.Name(), "part_") { 142 continue 143 } 144 fileNames = append(fileNames, dirEntry.Name()) 145 } 146 sort.Slice(fileNames, func(i, j int) bool { 147 a := strings.Split(fileNames[i], "_")[1] 148 b := strings.Split(fileNames[j], "_")[1] 149 numA, _ := strconv.Atoi(a) 150 numB, _ := strconv.Atoi(b) 151 return numA < numB 152 }) 153 154 // Create final downloaded file 155 outFile, err := os.OpenFile(filepath.Join(filedropUUID, origFileName), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644) 156 if err != nil { 157 logrus.Fatalln(err) 158 } 159 defer outFile.Close() 160 161 // Decryption stream 162 block, err := aes.NewCipher(password) 163 if err != nil { 164 logrus.Fatalln(err) 165 } 166 stream := cipher.NewCTR(block, iv) 167 168 h := sha256.New() 169 170 logrus.Info("Decrypting final file & hash") 171 for _, fileName := range fileNames { 172 // Read chunk file 173 by, err := os.ReadFile(filepath.Join(filedropUUID, fileName)) 174 if err != nil { 175 logrus.Fatalln(err) 176 } 177 h.Write(by) 178 // Decrypt chunk file 179 dst := make([]byte, len(by)) 180 stream.XORKeyStream(dst, by) 181 // Write to final file 182 _, err = outFile.Write(dst) 183 } 184 185 // Ensure downloaded file sha256 is correct 186 newFileSha256 := hex.EncodeToString(h.Sum(nil)) 187 if newFileSha256 == fileSha256 { 188 logrus.Infof("downloaded sha256 is valid %s", newFileSha256) 189 } else { 190 logrus.Fatalf("downloaded sha256 doesn't match %s != %s", newFileSha256, fileSha256) 191 } 192 193 // Cleanup 194 logrus.Info("cleanup chunks files") 195 for _, chunkFileName := range fileNames { 196 _ = os.Remove(filepath.Join(filedropUUID, chunkFileName)) 197 } 198 _ = os.Remove(filepath.Join(filedropUUID, "metadata")) 199 200 logrus.Infof("all done in %s", ShortDur(time.Since(start))) 201 } 202 203 func work(i int, wg *sync.WaitGroup, endpoint, filedropUUID, apiKey string, chunksCh chan int64, isLocal bool, httpTimeout time.Duration, nbChunks int64) { 204 defer wg.Done() 205 client := doGetClient(isLocal, httpTimeout) 206 for chunkNum := range chunksCh { 207 start := time.Now() 208 logrus.Infof("thread #%03d | chunk #%03d", i, chunkNum) 209 hasToSucceed(func() error { 210 start = time.Now() 211 body := url.Values{} 212 body.Set("chunk", strconv.FormatInt(chunkNum, 10)) 213 req, err := http.NewRequest(http.MethodPost, endpoint, strings.NewReader(body.Encode())) 214 if err != nil { 215 return err 216 } 217 req.Header.Set("User-Agent", userAgent) 218 req.Header.Set("Content-Type", "application/x-www-form-urlencoded") 219 req.Header.Set("DKF_API_KEY", apiKey) 220 resp, err := client.Do(req) 221 if err != nil { 222 if os.IsTimeout(err) { 223 logrus.Infof("thread #%03d gets a new client\n", i) 224 client = doGetClient(isLocal, httpTimeout) 225 } 226 return err 227 } 228 defer resp.Body.Close() 229 if resp.StatusCode == http.StatusUnauthorized { 230 logrus.Fatalln(resp.Status) 231 } else if resp.StatusCode != http.StatusOK { 232 return fmt.Errorf("thread #%03d | chunk #%03d | invalid status code %s", i, chunkNum, resp.Status) 233 } 234 f, err := os.OpenFile(filepath.Join(filedropUUID, "part_"+strconv.FormatInt(chunkNum, 10)), os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644) 235 if err != nil { 236 return err 237 } 238 defer f.Close() 239 if _, err = io.Copy(f, resp.Body); err != nil { 240 if os.IsTimeout(err) { 241 logrus.Infof("thread #%03d gets a new client\n", i) 242 client = doGetClient(isLocal, httpTimeout) 243 } 244 return err 245 } 246 return nil 247 }) 248 newChunksCompleted := atomic.AddInt64(&chunksCompleted, 1) 249 logrus.Infof("thread #%03d | chunk #%03d | completed in %s (%d/%d)\n", i, chunkNum, ShortDur(time.Since(start)), newChunksCompleted, nbChunks) 250 } 251 } 252 253 func doGetClient(isLocal bool, httpTimeout time.Duration) (client *http.Client) { 254 hasToSucceed(func() (err error) { 255 if isLocal { 256 client = http.DefaultClient 257 } else { 258 token := GenerateTokenN(8) 259 if client, err = GetHttpClient(&proxy.Auth{User: token, Password: token}); err != nil { 260 return err 261 } 262 } 263 return 264 }) 265 client.Timeout = httpTimeout 266 return 267 } 268 269 // Will keep retrying a callback until no error is returned 270 func hasToSucceed(clb func() error) { 271 waitTime := 5 272 for { 273 err := clb() 274 if err == nil { 275 break 276 } 277 logrus.Errorf("wait %d seconds before retry; %v\n", waitTime, err) 278 time.Sleep(time.Duration(waitTime) * time.Second) 279 } 280 } 281 282 // GetHttpClient http client that uses tor proxy 283 func GetHttpClient(auth *proxy.Auth) (*http.Client, error) { 284 dialer, err := proxy.SOCKS5("tcp", torProxyAddr, auth, proxy.Direct) 285 if err != nil { 286 return nil, fmt.Errorf("failed to connect to tor proxy : %w", err) 287 } 288 transport := &http.Transport{ 289 DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { 290 return dialer.Dial(network, addr) 291 }, 292 } 293 jar, err := cookiejar.New(nil) 294 if err != nil { 295 return nil, fmt.Errorf("failed to create cookie jar : %w", err) 296 } 297 return &http.Client{Transport: transport, Jar: jar}, nil 298 } 299 300 // GenerateTokenN generates a random printable string from N bytes 301 func GenerateTokenN(n int) string { 302 b := make([]byte, n) 303 _, _ = rand.Read(b) 304 return hex.EncodeToString(b) 305 } 306 307 func Ternary[T any](predicate bool, a, b T) T { 308 if predicate { 309 return a 310 } 311 return b 312 } 313 314 func ShortDur(d time.Duration) string { 315 if d < time.Minute { 316 d = d.Round(time.Millisecond) 317 } else { 318 d = d.Round(time.Second) 319 } 320 s := d.String() 321 if strings.HasSuffix(s, "m0s") { 322 s = s[:len(s)-2] 323 } 324 if strings.HasSuffix(s, "h0m") { 325 s = s[:len(s)-2] 326 } 327 return s 328 }