diff --git a/go.mod b/go.mod index 750e95d3..e60b1efa 100644 --- a/go.mod +++ b/go.mod @@ -4,5 +4,5 @@ go 1.21 require ( github.com/pkg/errors v0.9.1 - golang.org/x/net v0.20.0 + golang.org/x/net v0.21.0 ) diff --git a/go.sum b/go.sum index b91d0c8d..57b9d8cf 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,4 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo= -golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY= +golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= +golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= diff --git a/handshake.go b/handshake.go index 1df29f46..f4acb837 100755 --- a/handshake.go +++ b/handshake.go @@ -44,7 +44,7 @@ func (m *MTProto) makeAuthKey() error { // (encoding) p_q_inner_data pq := big.NewInt(0).SetBytes(res.Pq) - p, q := math.SplitPQ(pq) + p, q := math.Fac(pq) nonceSecond := tl.RandomInt256() nonceServer := res.ServerNonce diff --git a/internal/utils/utils.go b/internal/utils/utils.go index e4ea0f0c..c48c6ab5 100755 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -29,6 +29,10 @@ func (*PingParams) CRC() uint32 { return 0x7abe77ec } +var ( + prevCacheId = int64(0) +) + func GenerateMessageId(prevID int64, offset int64) int64 { const billion = 1000 * 1000 * 1000 unixnano := time.Now().UnixNano() + (offset * billion) @@ -38,6 +42,8 @@ func GenerateMessageId(prevID int64, offset int64) int64 { if newID <= prevID { return GenerateMessageId(prevID, offset) } + + prevCacheId = newID return newID } diff --git a/telegram/client.go b/telegram/client.go index 8c07fa3e..98dbe57e 100644 --- a/telegram/client.go +++ b/telegram/client.go @@ -101,10 +101,10 @@ func NewClient(config ClientConfig) (*Client, error) { if err := client.setupMTProto(config); err != nil { return nil, err } - if !config.NoUpdates { - client.setupDispatcher() - client.Log.Warn("client is running in no updates mode, no updates will be handled") + if config.NoUpdates { + //client.Log.Warn("client is running in no updates mode, no updates will be handled") } else { + client.setupDispatcher() if client.IsConnected() { //TODO: Implement same for manual connect Call. client.UpdatesGetState() @@ -280,6 +280,18 @@ func (c *Client) switchDC(dcID int) error { return c.InitialRequest() } +func (c *Client) AddNewExportedSenderToMap(dcID int, sender *Client) { + c.exportedSenders.Lock() + defer c.exportedSenders.Unlock() + if c.exportedSenders.senders == nil { + c.exportedSenders.senders = make(map[int][]*Client) + } + if c.exportedSenders.senders[dcID] == nil { + c.exportedSenders.senders[dcID] = make([]*Client, 0) + } // TODO: Implement this + c.exportedSenders.senders[dcID] = append(c.exportedSenders.senders[dcID], sender) +} + func (c *Client) GetCachedExportedSenders(dcID int) []*Client { c.exportedSenders.RLock() defer c.exportedSenders.RUnlock() @@ -291,7 +303,7 @@ func (c *Client) GetCachedExportedSenders(dcID int) []*Client { } // createExportedSender creates a new exported sender -func (c *Client) createExportedSender(dcID int) (*Client, error) { +func (c *Client) CreateExportedSender(dcID int) (*Client, error) { c.Log.Debug("creating exported sender for DC ", dcID) exported, err := c.MTProto.ExportNewSender(dcID, true) if err != nil { @@ -313,20 +325,21 @@ func (c *Client) createExportedSender(dcID int) (*Client, error) { func (c *Client) shareAuthWithTimeout(main *Client, dcID int) error { // raise timeout error on timeout - timeout := time.After(6 * time.Second) - errMade := make(chan error) - go func() { - select { - case <-timeout: - errMade <- errors.New("sharing authorization timed out") - case err := <-errMade: - errMade <- err - } - }() - go func() { - errMade <- c.shareAuth(main, dcID) - }() - return <-errMade + //timeout := time.After(6 * time.Second) + //errMade := make(chan error) + //go func() { + // select { + // case <-timeout: + // errMade <- errors.New("sharing authorization timed out") + // case err := <-errMade: + // errMade <- err + // } + //}() + //go func() { + //errMade <- + c.shareAuth(main, dcID) + //}() + return nil } // shareAuth shares authorization with another client @@ -367,11 +380,11 @@ func (c *Client) BorrowExportedSenders(dcID int, count ...int) ([]*Client, error exportWaitGroup.Add(1) go func() { defer exportWaitGroup.Done() - exportedSender, err := c.createExportedSender(dcID) + exportedSender, err := c.CreateExportedSender(dcID) if err != nil { const AuthInvalidError = "The provided authorization is invalid" if strings.Contains(err.Error(), AuthInvalidError) { - exportedSender, err = c.createExportedSender(dcID) + exportedSender, err = c.CreateExportedSender(dcID) if err != nil { return } @@ -389,7 +402,7 @@ func (c *Client) BorrowExportedSenders(dcID int, count ...int) ([]*Client, error if total < countInt { returned = append(returned, c.exportedSenders.senders[dcID]...) for i := 0; i < countInt-total; i++ { - exportedSender, err := c.createExportedSender(dcID) + exportedSender, err := c.CreateExportedSender(dcID) if err != nil { return nil, errors.Wrap(err, "creating exported sender") } diff --git a/telegram/const.go b/telegram/const.go index 0a3faefe..b34db294 100644 --- a/telegram/const.go +++ b/telegram/const.go @@ -4,7 +4,7 @@ import "regexp" const ( ApiVersion = 174 - Version = "v2.3.7" + Version = "v2.3.8" LogDebug = "debug" LogInfo = "info" diff --git a/telegram/media.go b/telegram/media.go index 0c946c5c..b7fe5a54 100644 --- a/telegram/media.go +++ b/telegram/media.go @@ -23,6 +23,186 @@ const ( DEFAULT_PARTS = 512 * 512 ) +type Sender struct { + buzy bool + c *Client +} + +func (c *Client) UploadFile(src interface{}, Opts ...*UploadOptions) (InputFile, error) { + opts := getVariadic(Opts, &UploadOptions{}).(*UploadOptions) + if src == nil { + return nil, errors.New("file can not be nil") + } + + var source string + switch s := src.(type) { + case string: + source = s + } // TODO: Add more types here + + if source == "" { + return nil, errors.New("file can not be nil") + } + + file, err := os.Open(source) + if err != nil { + return nil, err + } + + defer file.Close() + + stat, _ := file.Stat() + partSize := 1024 * 512 // 512KB + if opts.ChunkSize > 0 { + partSize = int(opts.ChunkSize) + } + fileId := GenerateRandomLong() + var hash hash.Hash + + IsFsBig := false + if stat.Size() > 10*1024*1024 { // 10MB + IsFsBig = true + } + + if !IsFsBig { + hash = md5.New() + } + + parts := stat.Size() / int64(partSize) + partOver := stat.Size() % int64(partSize) + + totalParts := parts + if partOver > 0 { + totalParts++ + } + + wg := sync.WaitGroup{} + + numWorkers := countWorkers(parts) + if opts.Threads > 0 { + numWorkers = opts.Threads + } + sender := make([]Sender, numWorkers) + sendersPreallocated := 0 + + if c.exportedSenders.senders[c.GetDC()] != nil && len(c.exportedSenders.senders[c.GetDC()]) > 0 { + for i := 0; i < len(c.exportedSenders.senders[c.GetDC()]); i++ { + if c.exportedSenders.senders[c.GetDC()][i] != nil { + sender[i] = Sender{c: c.exportedSenders.senders[c.GetDC()][i]} + sendersPreallocated++ + } + } + } + + for i := sendersPreallocated; i < numWorkers; i++ { + x, _ := c.CreateExportedSender(c.GetDC()) + // go c.AddNewExportedSenderToMap(c.GetDC(), x) TODO: Implement this + sender[i] = Sender{c: x} + } + + for p := int64(0); p < parts; p++ { + wg.Add(1) + for { + found := false + for i := 0; i < numWorkers; i++ { + if !sender[i].buzy { + part := make([]byte, partSize) + _, err := file.Read(part) + if err != nil { + c.Logger.Error(err) + return nil, err + } + + found = true + sender[i].buzy = true + go func(i int, part []byte, p int) { + defer wg.Done() + c.Logger.Debug("Uploading part", p, "with size", len(part), "in KB:", len(part)/1024, "to", i) + if !IsFsBig { + _, err = sender[i].c.UploadSaveFilePart(fileId, int32(p), part) + hash.Write(part) + } else { + _, err = sender[i].c.UploadSaveBigFilePart(fileId, int32(p), int32(totalParts), part) + } + if err != nil { + c.Logger.Error(err) + } + if opts.ProgressCallback != nil { + opts.ProgressCallback(int32(totalParts), int32(p)) + } + sender[i].buzy = false + }(i, part, int(p)) + break + } + } + + if found { + break + } + } + } + + if partOver > 0 { + part := make([]byte, partOver) + _, err := file.Read(part) + if err != nil { + c.Logger.Error(err) + } + + if !IsFsBig { + _, err = c.UploadSaveFilePart(fileId, int32(totalParts)-1, part) + } else { + _, err = c.UploadSaveBigFilePart(fileId, int32(totalParts)-1, int32(totalParts), part) + } + if err != nil { + fmt.Println(err) + } + + c.Logger.Debug("Uploaded last part", totalParts-1, "with size", len(part), "in KB:", len(part)/1024) + } + + wg.Wait() + + if opts.FileName != "" { + source = opts.FileName + } + + for i := 0; i < numWorkers; i++ { + if sender[i].c != nil { + sender[i].c.Terminate() + } + } + + sender = nil // leave senders to GC + + if !IsFsBig { + return &InputFileObj{ + ID: fileId, + Md5Checksum: string(hash.Sum(nil)), + Name: source, + Parts: int32(totalParts), + }, nil + } + + return &InputFileBig{ + ID: fileId, + Parts: int32(totalParts), + Name: source, + }, nil +} + +func countWorkers(parts int64) int { + if parts < 5 { + return int(parts) + } else if parts > 100 { + return 20 + } else if parts > 50 { + return 10 + } else { + return 5 + } +} + type UploadOptions struct { // Worker count for upload file. Threads int `json:"threads,omitempty"` @@ -58,7 +238,7 @@ type Uploader struct { // UploadFile upload file to telegram. // file can be string, []byte, io.Reader, fs.File -func (c *Client) UploadFile(file interface{}, Opts ...*UploadOptions) (InputFile, error) { +func (c *Client) UploadFileOld(file interface{}, Opts ...*UploadOptions) (InputFile, error) { opts := getVariadic(Opts, &UploadOptions{}).(*UploadOptions) if file == nil { return nil, errors.New("file can not be nil")