Skip to content

Commit

Permalink
overhaul media upload codes, lots of bug fixes, refactor SplitPQ (15x…
Browse files Browse the repository at this point in the history
… faster), minor bug fixes, v2.3.8
  • Loading branch information
AmarnathCJD committed Feb 25, 2024
1 parent 1813897 commit 5b08d93
Show file tree
Hide file tree
Showing 7 changed files with 226 additions and 27 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@ go 1.21

require (
github.com/pkg/errors v0.9.1
golang.org/x/net v0.20.0
golang.org/x/net v0.21.0
)
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
golang.org/x/net v0.20.0 h1:aCL9BSgETF1k+blQaYUBx9hJ9LOGP3gAVemcZlf1Kpo=
golang.org/x/net v0.20.0/go.mod h1:z8BVo6PvndSri0LbOE3hAn0apkU+1YvI6E70E9jsnvY=
golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
2 changes: 1 addition & 1 deletion handshake.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (m *MTProto) makeAuthKey() error {

// (encoding) p_q_inner_data
pq := big.NewInt(0).SetBytes(res.Pq)
p, q := math.SplitPQ(pq)
p, q := math.Fac(pq)
nonceSecond := tl.RandomInt256()
nonceServer := res.ServerNonce

Expand Down
6 changes: 6 additions & 0 deletions internal/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func (*PingParams) CRC() uint32 {
return 0x7abe77ec
}

var (
prevCacheId = int64(0)
)

func GenerateMessageId(prevID int64, offset int64) int64 {
const billion = 1000 * 1000 * 1000
unixnano := time.Now().UnixNano() + (offset * billion)
Expand All @@ -38,6 +42,8 @@ func GenerateMessageId(prevID int64, offset int64) int64 {
if newID <= prevID {
return GenerateMessageId(prevID, offset)
}

prevCacheId = newID
return newID
}

Expand Down
55 changes: 34 additions & 21 deletions telegram/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@ func NewClient(config ClientConfig) (*Client, error) {
if err := client.setupMTProto(config); err != nil {
return nil, err
}
if !config.NoUpdates {
client.setupDispatcher()
client.Log.Warn("client is running in no updates mode, no updates will be handled")
if config.NoUpdates {
//client.Log.Warn("client is running in no updates mode, no updates will be handled")
} else {
client.setupDispatcher()
if client.IsConnected() {
//TODO: Implement same for manual connect Call.
client.UpdatesGetState()
Expand Down Expand Up @@ -280,6 +280,18 @@ func (c *Client) switchDC(dcID int) error {
return c.InitialRequest()
}

func (c *Client) AddNewExportedSenderToMap(dcID int, sender *Client) {
c.exportedSenders.Lock()
defer c.exportedSenders.Unlock()
if c.exportedSenders.senders == nil {
c.exportedSenders.senders = make(map[int][]*Client)
}
if c.exportedSenders.senders[dcID] == nil {
c.exportedSenders.senders[dcID] = make([]*Client, 0)
} // TODO: Implement this
c.exportedSenders.senders[dcID] = append(c.exportedSenders.senders[dcID], sender)
}

func (c *Client) GetCachedExportedSenders(dcID int) []*Client {
c.exportedSenders.RLock()
defer c.exportedSenders.RUnlock()
Expand All @@ -291,7 +303,7 @@ func (c *Client) GetCachedExportedSenders(dcID int) []*Client {
}

// createExportedSender creates a new exported sender
func (c *Client) createExportedSender(dcID int) (*Client, error) {
func (c *Client) CreateExportedSender(dcID int) (*Client, error) {
c.Log.Debug("creating exported sender for DC ", dcID)
exported, err := c.MTProto.ExportNewSender(dcID, true)
if err != nil {
Expand All @@ -313,20 +325,21 @@ func (c *Client) createExportedSender(dcID int) (*Client, error) {

func (c *Client) shareAuthWithTimeout(main *Client, dcID int) error {
// raise timeout error on timeout
timeout := time.After(6 * time.Second)
errMade := make(chan error)
go func() {
select {
case <-timeout:
errMade <- errors.New("sharing authorization timed out")
case err := <-errMade:
errMade <- err
}
}()
go func() {
errMade <- c.shareAuth(main, dcID)
}()
return <-errMade
//timeout := time.After(6 * time.Second)
//errMade := make(chan error)
//go func() {
// select {
// case <-timeout:
// errMade <- errors.New("sharing authorization timed out")
// case err := <-errMade:
// errMade <- err
// }
//}()
//go func() {
//errMade <-
c.shareAuth(main, dcID)
//}()
return nil
}

// shareAuth shares authorization with another client
Expand Down Expand Up @@ -367,11 +380,11 @@ func (c *Client) BorrowExportedSenders(dcID int, count ...int) ([]*Client, error
exportWaitGroup.Add(1)
go func() {
defer exportWaitGroup.Done()
exportedSender, err := c.createExportedSender(dcID)
exportedSender, err := c.CreateExportedSender(dcID)
if err != nil {
const AuthInvalidError = "The provided authorization is invalid"
if strings.Contains(err.Error(), AuthInvalidError) {
exportedSender, err = c.createExportedSender(dcID)
exportedSender, err = c.CreateExportedSender(dcID)
if err != nil {
return
}
Expand All @@ -389,7 +402,7 @@ func (c *Client) BorrowExportedSenders(dcID int, count ...int) ([]*Client, error
if total < countInt {
returned = append(returned, c.exportedSenders.senders[dcID]...)
for i := 0; i < countInt-total; i++ {
exportedSender, err := c.createExportedSender(dcID)
exportedSender, err := c.CreateExportedSender(dcID)
if err != nil {
return nil, errors.Wrap(err, "creating exported sender")
}
Expand Down
2 changes: 1 addition & 1 deletion telegram/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import "regexp"

const (
ApiVersion = 174
Version = "v2.3.7"
Version = "v2.3.8"

LogDebug = "debug"
LogInfo = "info"
Expand Down
182 changes: 181 additions & 1 deletion telegram/media.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,186 @@ const (
DEFAULT_PARTS = 512 * 512
)

type Sender struct {
buzy bool
c *Client
}

func (c *Client) UploadFile(src interface{}, Opts ...*UploadOptions) (InputFile, error) {
opts := getVariadic(Opts, &UploadOptions{}).(*UploadOptions)
if src == nil {
return nil, errors.New("file can not be nil")
}

var source string
switch s := src.(type) {
case string:
source = s
} // TODO: Add more types here

if source == "" {
return nil, errors.New("file can not be nil")
}

file, err := os.Open(source)
if err != nil {
return nil, err
}

defer file.Close()

stat, _ := file.Stat()
partSize := 1024 * 512 // 512KB
if opts.ChunkSize > 0 {
partSize = int(opts.ChunkSize)
}
fileId := GenerateRandomLong()
var hash hash.Hash

IsFsBig := false
if stat.Size() > 10*1024*1024 { // 10MB
IsFsBig = true
}

if !IsFsBig {
hash = md5.New()
}

parts := stat.Size() / int64(partSize)
partOver := stat.Size() % int64(partSize)

totalParts := parts
if partOver > 0 {
totalParts++
}

wg := sync.WaitGroup{}

numWorkers := countWorkers(parts)
if opts.Threads > 0 {
numWorkers = opts.Threads
}
sender := make([]Sender, numWorkers)
sendersPreallocated := 0

if c.exportedSenders.senders[c.GetDC()] != nil && len(c.exportedSenders.senders[c.GetDC()]) > 0 {
for i := 0; i < len(c.exportedSenders.senders[c.GetDC()]); i++ {
if c.exportedSenders.senders[c.GetDC()][i] != nil {
sender[i] = Sender{c: c.exportedSenders.senders[c.GetDC()][i]}
sendersPreallocated++
}
}
}

for i := sendersPreallocated; i < numWorkers; i++ {
x, _ := c.CreateExportedSender(c.GetDC())
// go c.AddNewExportedSenderToMap(c.GetDC(), x) TODO: Implement this
sender[i] = Sender{c: x}
}

for p := int64(0); p < parts; p++ {
wg.Add(1)
for {
found := false
for i := 0; i < numWorkers; i++ {
if !sender[i].buzy {
part := make([]byte, partSize)
_, err := file.Read(part)
if err != nil {
c.Logger.Error(err)
return nil, err
}

found = true
sender[i].buzy = true
go func(i int, part []byte, p int) {
defer wg.Done()
c.Logger.Debug("Uploading part", p, "with size", len(part), "in KB:", len(part)/1024, "to", i)
if !IsFsBig {
_, err = sender[i].c.UploadSaveFilePart(fileId, int32(p), part)
hash.Write(part)
} else {
_, err = sender[i].c.UploadSaveBigFilePart(fileId, int32(p), int32(totalParts), part)
}
if err != nil {
c.Logger.Error(err)
}
if opts.ProgressCallback != nil {
opts.ProgressCallback(int32(totalParts), int32(p))
}
sender[i].buzy = false
}(i, part, int(p))
break
}
}

if found {
break
}
}
}

if partOver > 0 {
part := make([]byte, partOver)
_, err := file.Read(part)
if err != nil {
c.Logger.Error(err)
}

if !IsFsBig {
_, err = c.UploadSaveFilePart(fileId, int32(totalParts)-1, part)
} else {
_, err = c.UploadSaveBigFilePart(fileId, int32(totalParts)-1, int32(totalParts), part)
}
if err != nil {
fmt.Println(err)
}

c.Logger.Debug("Uploaded last part", totalParts-1, "with size", len(part), "in KB:", len(part)/1024)
}

wg.Wait()

if opts.FileName != "" {
source = opts.FileName
}

for i := 0; i < numWorkers; i++ {
if sender[i].c != nil {
sender[i].c.Terminate()
}
}

sender = nil // leave senders to GC

if !IsFsBig {
return &InputFileObj{
ID: fileId,
Md5Checksum: string(hash.Sum(nil)),
Name: source,
Parts: int32(totalParts),
}, nil
}

return &InputFileBig{
ID: fileId,
Parts: int32(totalParts),
Name: source,
}, nil
}

func countWorkers(parts int64) int {
if parts < 5 {
return int(parts)
} else if parts > 100 {
return 20
} else if parts > 50 {
return 10
} else {
return 5
}
}

type UploadOptions struct {
// Worker count for upload file.
Threads int `json:"threads,omitempty"`
Expand Down Expand Up @@ -58,7 +238,7 @@ type Uploader struct {

// UploadFile upload file to telegram.
// file can be string, []byte, io.Reader, fs.File
func (c *Client) UploadFile(file interface{}, Opts ...*UploadOptions) (InputFile, error) {
func (c *Client) UploadFileOld(file interface{}, Opts ...*UploadOptions) (InputFile, error) {
opts := getVariadic(Opts, &UploadOptions{}).(*UploadOptions)
if file == nil {
return nil, errors.New("file can not be nil")
Expand Down

0 comments on commit 5b08d93

Please sign in to comment.