dkforest

A forum and chat platform (onion)
git clone https://git.dasho.dev/n0tr1v/dkforest.git
Log | Files | Refs | LICENSE

main.go (9638B)


      1 package main
      2 
      3 import (
      4 	"bytes"
      5 	"context"
      6 	"crypto/sha256"
      7 	"encoding/hex"
      8 	"flag"
      9 	"fmt"
     10 	"github.com/dustin/go-humanize"
     11 	"github.com/sirupsen/logrus"
     12 	"golang.org/x/net/proxy"
     13 	"io"
     14 	"math"
     15 	"math/rand"
     16 	"mime/multipart"
     17 	"net"
     18 	"net/http"
     19 	"net/http/cookiejar"
     20 	"net/url"
     21 	"os"
     22 	"strconv"
     23 	"strings"
     24 	"sync"
     25 	"sync/atomic"
     26 	"time"
     27 )
     28 
const (
	// userAgent is the User-Agent header value set on every request built in this file.
	userAgent     = "Mozilla/5.0 (Windows NT 10.0; rv:102.0) Gecko/20100101 Firefox/102.0"
	// dkfBaseURL is the base URL used unless the -local flag is given.
	dkfBaseURL    = "http://dkforestseeaaq2dqz2uflmlsybvnq2irzn4ygyvu53oazyorednviid.onion"
	// localhostAddr is the base URL used when the -local flag is given.
	localhostAddr = "http://127.0.0.1:8080"
	// torProxyAddr is the address GetHttpClient passes to proxy.SOCKS5.
	torProxyAddr  = "127.0.0.1:9050"
)

// chunksCompleted counts the chunks that workers have finished processing.
// It is shared by all worker goroutines and only updated through sync/atomic.
var chunksCompleted int64 // atomic
     37 
     38 func main() {
     39 	customFormatter := new(logrus.TextFormatter)
     40 	customFormatter.TimestampFormat = "2006-01-02 15:04:05"
     41 	customFormatter.FullTimestamp = true
     42 	logrus.SetFormatter(customFormatter)
     43 
     44 	var nbThreads int
     45 	var filedropUUID string
     46 	var fileName string
     47 	var dry bool
     48 	var isLocal bool
     49 	var maxChunkSize int64
     50 	var httpTimeout time.Duration
     51 	filedropUUIDUsage := "dkf filedrop uuid"
     52 	fileNameUsage := "file to upload"
     53 	nbThreadsUsage := "nb threads"
     54 	nbThreadsDefaultValue := 20
     55 	chunkSizeUsage := "chunk size"
     56 	chunkSizeDefaultValue := int64(2 << 20) // 2MB
     57 	flag.DurationVar(&httpTimeout, "http-timeout", 2*time.Minute, "http timeout")
     58 	flag.StringVar(&filedropUUID, "uuid", "", filedropUUIDUsage)
     59 	flag.StringVar(&filedropUUID, "u", "", filedropUUIDUsage)
     60 	flag.StringVar(&fileName, "file", "", fileNameUsage)
     61 	flag.StringVar(&fileName, "f", "", fileNameUsage)
     62 	flag.IntVar(&nbThreads, "threads", nbThreadsDefaultValue, nbThreadsUsage)
     63 	flag.IntVar(&nbThreads, "t", nbThreadsDefaultValue, nbThreadsUsage)
     64 	flag.Int64Var(&maxChunkSize, "chunk-size", chunkSizeDefaultValue, chunkSizeUsage)
     65 	flag.Int64Var(&maxChunkSize, "c", chunkSizeDefaultValue, chunkSizeUsage)
     66 	flag.BoolVar(&dry, "dry", false, "dry run")
     67 	flag.BoolVar(&isLocal, "local", false, "localhost development")
     68 	flag.Parse()
     69 
     70 	baseUrl := Ternary(isLocal, localhostAddr, dkfBaseURL)
     71 
     72 	f, err := os.Open(fileName)
     73 	if err != nil {
     74 		logrus.Fatal(err.Error())
     75 	}
     76 	fs, err := f.Stat()
     77 	if err != nil {
     78 		logrus.Fatal(err.Error())
     79 	}
     80 
     81 	// Calculate sha256 of file
     82 	h := sha256.New()
     83 	if _, err := io.Copy(h, f); err != nil {
     84 		logrus.Fatalln(err)
     85 	}
     86 	fileSha256 := hex.EncodeToString(h.Sum(nil))
     87 
     88 	fileSize := fs.Size()
     89 	nbChunks := int64(math.Ceil(float64(fileSize) / float64(maxChunkSize)))
     90 
     91 	// Print out information about the file
     92 	{
     93 		logrus.Infof("filedrop UUID: %s\n", filedropUUID)
     94 		logrus.Infof("         file: %s\n", fs.Name())
     95 		logrus.Infof("       sha256: %s\n", fileSha256)
     96 		logrus.Infof("    file size: %s (%s)\n", humanize.Bytes(uint64(fileSize)), humanize.Comma(fileSize))
     97 		logrus.Infof("  chunks size: %s (%s)\n", humanize.Bytes(uint64(maxChunkSize)), humanize.Comma(maxChunkSize))
     98 		logrus.Infof("    nb chunks: %d\n", nbChunks)
     99 		logrus.Infof("   nb threads: %d\n", nbThreads)
    100 		logrus.Infof(" http timeout: %s\n", ShortDur(httpTimeout))
    101 		if dry {
    102 			logrus.Infof("      dry run: %t\n", dry)
    103 		}
    104 		logrus.Infof(strings.Repeat("-", 80))
    105 	}
    106 
    107 	start := time.Now()
    108 
    109 	// Init the filedrop and send metadata about the file
    110 	if !dry {
    111 		logrus.Debug("sending metadata")
    112 		client := doGetClient(isLocal, httpTimeout)
    113 		body := url.Values{}
    114 		body.Set("init", "1")
    115 		body.Set("fileName", fs.Name())
    116 		body.Set("fileSize", strconv.FormatInt(fileSize, 10))
    117 		body.Set("fileSha256", fileSha256)
    118 		body.Set("chunkSize", strconv.FormatInt(maxChunkSize, 10))
    119 		body.Set("nbChunks", strconv.FormatInt(nbChunks, 10))
    120 		req, _ := http.NewRequest(http.MethodPost, baseUrl+"/file-drop/"+filedropUUID+"/dkfupload", strings.NewReader(body.Encode()))
    121 		req.Header.Set("User-Agent", userAgent)
    122 		req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
    123 		resp, err := client.Do(req)
    124 		if err != nil {
    125 			logrus.Fatal(err)
    126 		}
    127 		defer resp.Body.Close()
    128 		if resp.StatusCode != http.StatusOK {
    129 			logrus.Fatal(fmt.Errorf("invalid status code %s", resp.Status))
    130 		}
    131 		logrus.Debug("done sending metadata")
    132 	}
    133 
    134 	chunksCh := make(chan int64)
    135 	wg := &sync.WaitGroup{}
    136 
    137 	// Provide worker threads with tasks to do
    138 	go func() {
    139 		for chunkNum := int64(0); chunkNum < nbChunks; chunkNum++ {
    140 			chunksCh <- chunkNum
    141 		}
    142 		// closing the channel will ensure all workers exit gracefully
    143 		close(chunksCh)
    144 	}()
    145 
    146 	// Start worker threads
    147 	wg.Add(nbThreads)
    148 	for i := 0; i < nbThreads; i++ {
    149 		go work(i, wg, chunksCh, isLocal, dry, maxChunkSize, nbChunks, f, baseUrl, filedropUUID, httpTimeout)
    150 		time.Sleep(25 * time.Millisecond)
    151 	}
    152 
    153 	// Wait for all workers to have completed
    154 	wg.Wait()
    155 
    156 	if !dry {
    157 		client := doGetClient(isLocal, httpTimeout)
    158 		body := url.Values{}
    159 		body.Set("completed", "1")
    160 		req, _ := http.NewRequest(http.MethodPost, baseUrl+"/file-drop/"+filedropUUID+"/dkfupload", strings.NewReader(body.Encode()))
    161 		req.Header.Set("User-Agent", userAgent)
    162 		req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
    163 		resp, err := client.Do(req)
    164 		if err != nil {
    165 			logrus.Fatal(err)
    166 		}
    167 		defer resp.Body.Close()
    168 		if resp.StatusCode != http.StatusOK {
    169 			logrus.Fatal(fmt.Errorf("invalid status code %s", resp.Status))
    170 		}
    171 	}
    172 
    173 	logrus.Infof("All done in %s\n", ShortDur(time.Since(start)))
    174 }
    175 
    176 func work(i int, wg *sync.WaitGroup, chunksCh chan int64, isLocal, dry bool, maxChunkSize, nbChunks int64, f *os.File, baseUrl, filedropUUID string, httpTimeout time.Duration) {
    177 	client := doGetClient(isLocal, httpTimeout)
    178 
    179 	buf := make([]byte, maxChunkSize)
    180 	for chunkNum := range chunksCh {
    181 		start := time.Now()
    182 		offset := chunkNum * maxChunkSize
    183 		n, _ := f.ReadAt(buf, offset)
    184 		logrus.Infof("Thread #%03d | chunk #%03d | read %d | from %d to %d\n", i, chunkNum, n, offset, offset+int64(n))
    185 		if !dry {
    186 			hasToSucceed(func() error {
    187 				partFileName := fmt.Sprintf("part_%d", chunkNum)
    188 
    189 				// Ask server if he already has the chunk
    190 				{
    191 					body := url.Values{}
    192 					body.Set("chunkFileName", partFileName)
    193 					req, _ := http.NewRequest(http.MethodPost, baseUrl+"/file-drop/"+filedropUUID+"/dkfupload", strings.NewReader(body.Encode()))
    194 					req.Header.Set("User-Agent", userAgent)
    195 					req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
    196 					resp, err := client.Do(req)
    197 					if err != nil {
    198 						if os.IsTimeout(err) {
    199 							logrus.Infof("Thread #%03d gets a new client\n", i)
    200 							client = doGetClient(isLocal, httpTimeout)
    201 						}
    202 						return err
    203 					}
    204 					defer resp.Body.Close()
    205 					// We use teapot status (because why not) to express that we already have the chunk
    206 					if resp.StatusCode == http.StatusTeapot {
    207 						logrus.Infof("Thread #%03d | server already has chunk #%03d; skip", i, chunkNum)
    208 						return nil
    209 					}
    210 				}
    211 
    212 				start = time.Now()
    213 				body := new(bytes.Buffer)
    214 				w := multipart.NewWriter(body)
    215 				part, err := w.CreateFormFile("file", partFileName)
    216 				if err != nil {
    217 					return err
    218 				}
    219 				if _, err := part.Write(buf[:n]); err != nil {
    220 					return err
    221 				}
    222 				if err := w.Close(); err != nil {
    223 					return err
    224 				}
    225 
    226 				req, _ := http.NewRequest(http.MethodPost, baseUrl+"/file-drop/"+filedropUUID+"/dkfupload", body)
    227 				req.Header.Set("User-Agent", userAgent)
    228 				req.Header.Set("Content-Type", w.FormDataContentType())
    229 				resp, err := client.Do(req)
    230 				if err != nil {
    231 					if os.IsTimeout(err) {
    232 						logrus.Infof("Thread #%03d gets a new client\n", i)
    233 						client = doGetClient(isLocal, httpTimeout)
    234 					}
    235 					return err
    236 				}
    237 				defer resp.Body.Close()
    238 				if resp.StatusCode != http.StatusOK {
    239 					return fmt.Errorf("invalid status code %s", resp.Status)
    240 				}
    241 				return nil
    242 			})
    243 		}
    244 		newChunksCompleted := atomic.AddInt64(&chunksCompleted, 1)
    245 		logrus.Infof("Thread #%03d | chunk #%03d | completed in %s (%d/%d)\n", i, chunkNum, ShortDur(time.Since(start)), newChunksCompleted, nbChunks)
    246 	}
    247 	wg.Done()
    248 }
    249 
    250 func doGetClient(isLocal bool, httpTimeout time.Duration) (client *http.Client) {
    251 	hasToSucceed(func() (err error) {
    252 		if isLocal {
    253 			client = http.DefaultClient
    254 		} else {
    255 			token := GenerateTokenN(8)
    256 			if client, err = GetHttpClient(&proxy.Auth{User: token, Password: token}); err != nil {
    257 				return err
    258 			}
    259 		}
    260 		return
    261 	})
    262 	client.Timeout = httpTimeout
    263 	return
    264 }
    265 
    266 // Will keep retrying a callback until no error is returned
    267 func hasToSucceed(clb func() error) {
    268 	waitTime := 5
    269 	for {
    270 		err := clb()
    271 		if err == nil {
    272 			break
    273 		}
    274 		logrus.Errorf("wait %d seconds before retry; %v\n", waitTime, err)
    275 		time.Sleep(time.Duration(waitTime) * time.Second)
    276 	}
    277 }
    278 
    279 // GetHttpClient http client that uses tor proxy
    280 func GetHttpClient(auth *proxy.Auth) (*http.Client, error) {
    281 	dialer, err := proxy.SOCKS5("tcp", torProxyAddr, auth, proxy.Direct)
    282 	if err != nil {
    283 		return nil, fmt.Errorf("failed to connect to tor proxy : %w", err)
    284 	}
    285 	transport := &http.Transport{
    286 		DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
    287 			return dialer.Dial(network, addr)
    288 		},
    289 	}
    290 	jar, err := cookiejar.New(nil)
    291 	if err != nil {
    292 		return nil, fmt.Errorf("failed to create cookie jar : %w", err)
    293 	}
    294 	return &http.Client{Transport: transport, Jar: jar}, nil
    295 }
    296 
    297 // GenerateTokenN generates a random printable string from N bytes
    298 func GenerateTokenN(n int) string {
    299 	b := make([]byte, n)
    300 	_, _ = rand.Read(b)
    301 	return hex.EncodeToString(b)
    302 }
    303 
    304 func Ternary[T any](predicate bool, a, b T) T {
    305 	if predicate {
    306 		return a
    307 	}
    308 	return b
    309 }
    310 
    311 func ShortDur(d time.Duration) string {
    312 	if d < time.Minute {
    313 		d = d.Round(time.Millisecond)
    314 	} else {
    315 		d = d.Round(time.Second)
    316 	}
    317 	s := d.String()
    318 	if strings.HasSuffix(s, "m0s") {
    319 		s = s[:len(s)-2]
    320 	}
    321 	if strings.HasSuffix(s, "h0m") {
    322 		s = s[:len(s)-2]
    323 	}
    324 	return s
    325 }