dkforest

A forum and chat platform (onion)
git clone https://git.dasho.dev/n0tr1v/dkforest.git
Log | Files | Refs | LICENSE

main.go (9459B)


      1 package main
      2 
      3 import (
      4 	"context"
      5 	"crypto/aes"
      6 	"crypto/cipher"
      7 	"crypto/sha256"
      8 	"encoding/base64"
      9 	"encoding/hex"
     10 	"flag"
     11 	"fmt"
     12 	"github.com/dustin/go-humanize"
     13 	"github.com/sirupsen/logrus"
     14 	"golang.org/x/net/proxy"
     15 	"io"
     16 	"math/rand"
     17 	"net"
     18 	"net/http"
     19 	"net/http/cookiejar"
     20 	"net/url"
     21 	"os"
     22 	"path/filepath"
     23 	"sort"
     24 	"strconv"
     25 	"strings"
     26 	"sync"
     27 	"sync/atomic"
     28 	"time"
     29 )
     30 
// Request settings shared by every HTTP call made by this program:
//   - userAgent is sent as the User-Agent header of each request.
//   - dkfBaseURL is the server base URL used unless the -local flag is set.
//   - localhostAddr is the server base URL used when the -local flag is set.
//   - torProxyAddr is the address of the SOCKS5 proxy that non-local clients
//     dial through.
const (
	userAgent     = "Mozilla/5.0 (Windows NT 10.0; rv:102.0) Gecko/20100101 Firefox/102.0"
	dkfBaseURL    = "http://dkforestseeaaq2dqz2uflmlsybvnq2irzn4ygyvu53oazyorednviid.onion"
	localhostAddr = "http://127.0.0.1:8080"
	torProxyAddr  = "127.0.0.1:9050"
)
     37 
// chunksCompleted is the number of chunks downloaded so far by all workers
// combined. It is incremented with atomic.AddInt64 and must not be accessed
// with plain reads or writes.
var chunksCompleted int64 // atomic
     39 
     40 func main() {
     41 	customFormatter := new(logrus.TextFormatter)
     42 	customFormatter.TimestampFormat = "2006-01-02 15:04:05"
     43 	customFormatter.FullTimestamp = true
     44 	logrus.SetFormatter(customFormatter)
     45 
     46 	var apiKey string
     47 	var nbThreads int
     48 	var filedropUUID string
     49 	var isLocal bool
     50 	var httpTimeout time.Duration
     51 	apiKeyUsage := "api key"
     52 	filedropUUIDUsage := "dkf filedrop uuid"
     53 	nbThreadsUsage := "nb threads"
     54 	nbThreadsDefaultValue := 20
     55 	flag.StringVar(&apiKey, "api-key", "", apiKeyUsage)
     56 	flag.StringVar(&apiKey, "a", "", apiKeyUsage)
     57 	flag.DurationVar(&httpTimeout, "http-timeout", 2*time.Minute, "http timeout")
     58 	flag.StringVar(&filedropUUID, "uuid", "", filedropUUIDUsage)
     59 	flag.StringVar(&filedropUUID, "u", "", filedropUUIDUsage)
     60 	flag.IntVar(&nbThreads, "threads", nbThreadsDefaultValue, nbThreadsUsage)
     61 	flag.IntVar(&nbThreads, "t", nbThreadsDefaultValue, nbThreadsUsage)
     62 	flag.BoolVar(&isLocal, "local", false, "localhost development")
     63 	flag.Parse()
     64 
     65 	baseUrl := Ternary(isLocal, localhostAddr, dkfBaseURL)
     66 	endpoint := baseUrl + "/api/v1/file-drop/" + filedropUUID + "/dkfdownload"
     67 
     68 	client := doGetClient(isLocal, httpTimeout)
     69 
     70 	// Download metadata file
     71 	by, err := os.ReadFile(filepath.Join(filedropUUID, "metadata"))
     72 	if err != nil {
     73 		body := url.Values{}
     74 		body.Set("init", "1")
     75 		req, _ := http.NewRequest(http.MethodPost, endpoint, strings.NewReader(body.Encode()))
     76 		req.Header.Set("User-Agent", userAgent)
     77 		req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
     78 		req.Header.Set("DKF_API_KEY", apiKey)
     79 		resp, err := client.Do(req)
     80 		if err != nil {
     81 			logrus.Fatalln(err)
     82 		}
     83 		defer resp.Body.Close()
     84 		if resp.StatusCode == http.StatusUnauthorized {
     85 			logrus.Fatalln(resp.Status)
     86 		}
     87 		by, _ = io.ReadAll(resp.Body)
     88 
     89 		_ = os.Mkdir(filedropUUID, 0755)
     90 		_ = os.WriteFile(filepath.Join(filedropUUID, "metadata"), by, 0644)
     91 	}
     92 
     93 	// Read metadata information
     94 	lines := strings.Split(string(by), "\n")
     95 	origFileName := lines[0]
     96 	password, _ := base64.StdEncoding.DecodeString(lines[1])
     97 	iv, _ := base64.StdEncoding.DecodeString(lines[2])
     98 	fileSha256 := lines[3]
     99 	fileSize, _ := strconv.ParseInt(lines[4], 10, 64)
    100 	nbChunks, _ := strconv.ParseInt(lines[5], 10, 64)
    101 
    102 	// Print out information about the file
    103 	{
    104 		logrus.Infof("filedrop UUID: %s\n", filedropUUID)
    105 		logrus.Infof("         file: %s\n", origFileName)
    106 		logrus.Infof("       sha256: %s\n", fileSha256)
    107 		logrus.Infof("    file size: %s (%s)\n", humanize.Bytes(uint64(fileSize)), humanize.Comma(fileSize))
    108 		logrus.Infof("    nb chunks: %d\n", nbChunks)
    109 		logrus.Infof("   nb threads: %d\n", nbThreads)
    110 		logrus.Infof(" http timeout: %s\n", ShortDur(httpTimeout))
    111 		logrus.Infof(strings.Repeat("-", 80))
    112 	}
    113 
    114 	start := time.Now()
    115 
    116 	chunksCh := make(chan int64)
    117 
    118 	// Provide worker threads with tasks to do
    119 	go func() {
    120 		for chunkNum := int64(0); chunkNum < nbChunks; chunkNum++ {
    121 			chunksCh <- chunkNum
    122 		}
    123 		// closing the channel will ensure all workers exit gracefully
    124 		close(chunksCh)
    125 	}()
    126 
    127 	// Download every chunks
    128 	wg := &sync.WaitGroup{}
    129 	wg.Add(nbThreads)
    130 	for i := 0; i < nbThreads; i++ {
    131 		go work(i, wg, endpoint, filedropUUID, apiKey, chunksCh, isLocal, httpTimeout, nbChunks)
    132 		time.Sleep(25 * time.Millisecond)
    133 	}
    134 	wg.Wait()
    135 	logrus.Infof("all chunks downloaded in %s", ShortDur(time.Since(start)))
    136 
    137 	// Get sorted chunks file names
    138 	dirEntries, _ := os.ReadDir(filedropUUID)
    139 	fileNames := make([]string, 0)
    140 	for _, dirEntry := range dirEntries {
    141 		if !strings.HasPrefix(dirEntry.Name(), "part_") {
    142 			continue
    143 		}
    144 		fileNames = append(fileNames, dirEntry.Name())
    145 	}
    146 	sort.Slice(fileNames, func(i, j int) bool {
    147 		a := strings.Split(fileNames[i], "_")[1]
    148 		b := strings.Split(fileNames[j], "_")[1]
    149 		numA, _ := strconv.Atoi(a)
    150 		numB, _ := strconv.Atoi(b)
    151 		return numA < numB
    152 	})
    153 
    154 	// Create final downloaded file
    155 	outFile, err := os.OpenFile(filepath.Join(filedropUUID, origFileName), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
    156 	if err != nil {
    157 		logrus.Fatalln(err)
    158 	}
    159 	defer outFile.Close()
    160 
    161 	// Decryption stream
    162 	block, err := aes.NewCipher(password)
    163 	if err != nil {
    164 		logrus.Fatalln(err)
    165 	}
    166 	stream := cipher.NewCTR(block, iv)
    167 
    168 	h := sha256.New()
    169 
    170 	logrus.Info("Decrypting final file & hash")
    171 	for _, fileName := range fileNames {
    172 		// Read chunk file
    173 		by, err := os.ReadFile(filepath.Join(filedropUUID, fileName))
    174 		if err != nil {
    175 			logrus.Fatalln(err)
    176 		}
    177 		h.Write(by)
    178 		// Decrypt chunk file
    179 		dst := make([]byte, len(by))
    180 		stream.XORKeyStream(dst, by)
    181 		// Write to final file
    182 		_, err = outFile.Write(dst)
    183 	}
    184 
    185 	// Ensure downloaded file sha256 is correct
    186 	newFileSha256 := hex.EncodeToString(h.Sum(nil))
    187 	if newFileSha256 == fileSha256 {
    188 		logrus.Infof("downloaded sha256 is valid %s", newFileSha256)
    189 	} else {
    190 		logrus.Fatalf("downloaded sha256 doesn't match %s != %s", newFileSha256, fileSha256)
    191 	}
    192 
    193 	// Cleanup
    194 	logrus.Info("cleanup chunks files")
    195 	for _, chunkFileName := range fileNames {
    196 		_ = os.Remove(filepath.Join(filedropUUID, chunkFileName))
    197 	}
    198 	_ = os.Remove(filepath.Join(filedropUUID, "metadata"))
    199 
    200 	logrus.Infof("all done in %s", ShortDur(time.Since(start)))
    201 }
    202 
    203 func work(i int, wg *sync.WaitGroup, endpoint, filedropUUID, apiKey string, chunksCh chan int64, isLocal bool, httpTimeout time.Duration, nbChunks int64) {
    204 	defer wg.Done()
    205 	client := doGetClient(isLocal, httpTimeout)
    206 	for chunkNum := range chunksCh {
    207 		start := time.Now()
    208 		logrus.Infof("thread #%03d | chunk #%03d", i, chunkNum)
    209 		hasToSucceed(func() error {
    210 			start = time.Now()
    211 			body := url.Values{}
    212 			body.Set("chunk", strconv.FormatInt(chunkNum, 10))
    213 			req, err := http.NewRequest(http.MethodPost, endpoint, strings.NewReader(body.Encode()))
    214 			if err != nil {
    215 				return err
    216 			}
    217 			req.Header.Set("User-Agent", userAgent)
    218 			req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
    219 			req.Header.Set("DKF_API_KEY", apiKey)
    220 			resp, err := client.Do(req)
    221 			if err != nil {
    222 				if os.IsTimeout(err) {
    223 					logrus.Infof("thread #%03d gets a new client\n", i)
    224 					client = doGetClient(isLocal, httpTimeout)
    225 				}
    226 				return err
    227 			}
    228 			defer resp.Body.Close()
    229 			if resp.StatusCode == http.StatusUnauthorized {
    230 				logrus.Fatalln(resp.Status)
    231 			} else if resp.StatusCode != http.StatusOK {
    232 				return fmt.Errorf("thread #%03d | chunk #%03d | invalid status code %s", i, chunkNum, resp.Status)
    233 			}
    234 			f, err := os.OpenFile(filepath.Join(filedropUUID, "part_"+strconv.FormatInt(chunkNum, 10)), os.O_TRUNC|os.O_CREATE|os.O_WRONLY, 0644)
    235 			if err != nil {
    236 				return err
    237 			}
    238 			defer f.Close()
    239 			if _, err = io.Copy(f, resp.Body); err != nil {
    240 				if os.IsTimeout(err) {
    241 					logrus.Infof("thread #%03d gets a new client\n", i)
    242 					client = doGetClient(isLocal, httpTimeout)
    243 				}
    244 				return err
    245 			}
    246 			return nil
    247 		})
    248 		newChunksCompleted := atomic.AddInt64(&chunksCompleted, 1)
    249 		logrus.Infof("thread #%03d | chunk #%03d | completed in %s (%d/%d)\n", i, chunkNum, ShortDur(time.Since(start)), newChunksCompleted, nbChunks)
    250 	}
    251 }
    252 
    253 func doGetClient(isLocal bool, httpTimeout time.Duration) (client *http.Client) {
    254 	hasToSucceed(func() (err error) {
    255 		if isLocal {
    256 			client = http.DefaultClient
    257 		} else {
    258 			token := GenerateTokenN(8)
    259 			if client, err = GetHttpClient(&proxy.Auth{User: token, Password: token}); err != nil {
    260 				return err
    261 			}
    262 		}
    263 		return
    264 	})
    265 	client.Timeout = httpTimeout
    266 	return
    267 }
    268 
    269 // Will keep retrying a callback until no error is returned
    270 func hasToSucceed(clb func() error) {
    271 	waitTime := 5
    272 	for {
    273 		err := clb()
    274 		if err == nil {
    275 			break
    276 		}
    277 		logrus.Errorf("wait %d seconds before retry; %v\n", waitTime, err)
    278 		time.Sleep(time.Duration(waitTime) * time.Second)
    279 	}
    280 }
    281 
    282 // GetHttpClient http client that uses tor proxy
    283 func GetHttpClient(auth *proxy.Auth) (*http.Client, error) {
    284 	dialer, err := proxy.SOCKS5("tcp", torProxyAddr, auth, proxy.Direct)
    285 	if err != nil {
    286 		return nil, fmt.Errorf("failed to connect to tor proxy : %w", err)
    287 	}
    288 	transport := &http.Transport{
    289 		DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
    290 			return dialer.Dial(network, addr)
    291 		},
    292 	}
    293 	jar, err := cookiejar.New(nil)
    294 	if err != nil {
    295 		return nil, fmt.Errorf("failed to create cookie jar : %w", err)
    296 	}
    297 	return &http.Client{Transport: transport, Jar: jar}, nil
    298 }
    299 
    300 // GenerateTokenN generates a random printable string from N bytes
    301 func GenerateTokenN(n int) string {
    302 	b := make([]byte, n)
    303 	_, _ = rand.Read(b)
    304 	return hex.EncodeToString(b)
    305 }
    306 
    307 func Ternary[T any](predicate bool, a, b T) T {
    308 	if predicate {
    309 		return a
    310 	}
    311 	return b
    312 }
    313 
    314 func ShortDur(d time.Duration) string {
    315 	if d < time.Minute {
    316 		d = d.Round(time.Millisecond)
    317 	} else {
    318 		d = d.Round(time.Second)
    319 	}
    320 	s := d.String()
    321 	if strings.HasSuffix(s, "m0s") {
    322 		s = s[:len(s)-2]
    323 	}
    324 	if strings.HasSuffix(s, "h0m") {
    325 		s = s[:len(s)-2]
    326 	}
    327 	return s
    328 }