mycp.h
#ifndef _MYCP_H_
#define _MYCP_H_
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <fcntl.h> /* Definition of AT_* constants */
#include <stdlib.h>
#include <pwd.h>
#include <grp.h>
#include <time.h>
#include <dirent.h>
typedef struct file_name
{
char src[512];
char dst[512];
}file_name;
void MyCopy(file_name *fileName);
void* CopyFile(void *arg);
void *CopyDir(void *arg);
#endif
mycp.c
#include "thread_pool.h"
#include "mycp.h"
#define BUF_SIZE 10240
extern thread_pool *pool;
void MyCopy(file_name *fileName)
{
unsigned lenSrc = strlen(fileName->src);
unsigned lenDst = strlen(fileName->dst);
char *tmp = fileName->src;
if(tmp[lenSrc-1] == '/')
tmp[lenSrc-1] = '\0';
tmp = fileName->dst;
if(tmp[lenDst-1] == '/')
tmp[lenDst-1] = '\0';
struct stat statBuf;
if(stat(fileName->src,&statBuf) < 0)
{
perror("stat()");
return ;
}
switch (statBuf.st_mode & S_IFMT)
{
case S_IFBLK: printf("block device\n"); break;
case S_IFCHR: printf("character device\n"); break;
case S_IFDIR: printf("directory\n"); add_task(pool,CopyDir,(void*)fileName); break;
case S_IFIFO: printf("FIFO/pipe\n"); break;
case S_IFLNK: printf("symlink\n"); break;
case S_IFREG: printf("regular file\n"); add_task(pool,CopyFile,(void*)fileName); break;
case S_IFSOCK: printf("socket\n"); break;
default: printf("unknown?\n"); break;
}
}
void *CopyDir(void *arg)
{
file_name *fileName = (file_name *)arg;
if(mkdir(fileName->dst, 0777) < 0)
{
perror("mkdir()");
return NULL;
}
DIR *dir = opendir(fileName->src);
struct dirent *op = NULL;
while(1)
{
op = readdir(dir);
if(op == NULL)
break;
if(strcmp(op->d_name,".")==0||strcmp(op->d_name,"..")==0)
continue;
file_name *path = calloc(1,sizeof(file_name));
snprintf(path->dst,sizeof(path->dst)*2,"%s/%s",fileName->dst,op->d_name);
snprintf(path->src,sizeof(path->src)*2,"%s/%s",fileName->src,op->d_name);
if(op->d_type == DT_DIR) //目录
{
printf("开始拷贝目录:%s ----> %s\n",fileName->src,fileName->dst );
add_task(pool,CopyDir,(void*)path);
}
else if(op->d_type == DT_REG) //普通文件
{
printf("开始拷贝文件:%s ----> %s\n",fileName->src,fileName->dst );
add_task(pool,CopyFile,(void*)path);
}
}
closedir(dir);
return NULL;
}
void* CopyFile(void *arg)
{
file_name *fileName = (file_name *)arg;
int sfd;
int dfd;
int len,ret,pos;
sfd = open(fileName->src,O_RDONLY);
if(sfd < 0)
{
perror("open");
return NULL;
}
dfd= open(fileName->dst,O_WRONLY|O_CREAT,O_TRUNC,0777);
if(dfd < 0)
{
close(sfd);
perror("open");
return NULL;
}
struct stat *statBuf = calloc(1,sizeof(struct stat));
if(stat(fileName->src,statBuf) < 0)
{
perror("stat()");
return NULL;
}
unsigned long fileSize = statBuf->st_size; //获取文件大小
unsigned long pos1;
char *buf = calloc(BUF_SIZE,sizeof(char*));
while(1)
{
len = read(sfd,buf,BUF_SIZE);
if(len < 0)
{
perror("read");
break;
}
if(len == 0)
break;
pos = 0;
while(len > 0)
{
pos1 = lseek(dfd,0,SEEK_CUR);
printf("\r文件复制中:-----%.2lf%%-----",(double)pos1/(double)fileSize*100);
ret = write(dfd,buf + pos,len);
if(ret < 0)
{
perror("write");
exit(1);
}
pos += ret;
len -= ret;
}
}
free(buf);
free(statBuf);
close(sfd);
close(dfd);
}
thread_pool.h
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <errno.h>
#include <pthread.h>
#include <stdio.h>
#include <stdbool.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>
#include <errno.h>
#include <pthread.h>
#define MAX_WAITING_TASKS 20000 //等待任务最大数
#define MAX_ACTIVE_THREADS 100 //线程最大活跃数
struct task //任务节点
{
void *(*task)(void *arg); //任务函数
void *arg; //参数
struct task* next; //后继指针
};
typedef struct thread_pool //线程池
{
pthread_mutex_t lock; // 互斥锁,保护任务队列
pthread_cond_t cond; // 条件变量,同步所有线程
struct task *task_list; // 任务链队列指针
pthread_t *tids; // 线程 ID 存放位置
bool shutdown; // 线程池销毁标记
unsigned int waiting_tasks; // 任务链队列中等待的任务个数
unsigned int active_threads; // 当前活跃线程个数
}thread_pool;
//初始化线程池
bool
init_pool(thread_pool *pool,
unsigned int threads_number);
//增加任务
bool
add_task(thread_pool *pool,
void *(*task)(void *arg),
void *arg);
//增加线程
int
add_thread(thread_pool *pool,
unsigned int additional_threads_number);
//删除线程
int
remove_thread(thread_pool *pool,
unsigned int removing_threads_number);
//销毁线程池
bool destroy_pool(thread_pool *pool);
//线程函数
void *routine(void *arg);
#endif
thread_pool.c
#include "thread_pool.h"
//初始化线程池
bool init_pool(thread_pool *pool, unsigned int threads_number);
//增加任务
bool add_task(thread_pool *pool, void *(*task)(void *arg), void *arg);
//增加线程
int
add_thread(thread_pool *pool, unsigned int additional_threads_number);
//删除线程
int
remove_thread(thread_pool *pool, unsigned int removing_threads_number);
//销毁线程池
bool destroy_pool(thread_pool *pool);
//线程函数
void *routine(void *arg);
static void handler__(void *arg)
{
pthread_mutex_unlock((pthread_mutex_t *)arg);
}
//线程函数
void *routine(void *arg)
{
//线程池结构体指针
thread_pool *pool = (thread_pool *)arg;
struct task *p;//定义一个任务结构体指针,用来表示取到的任务
while(1)
{
//防止死锁
pthread_cleanup_push(handler__,(void *)&pool->lock);
//加锁
pthread_mutex_lock(&pool->lock);
//如果任务队列为空并且线程池没有被销毁,进入等待队列
while(pool->waiting_tasks == 0 && !pool->shutdown)//若没有任务,则睡眠
{
pthread_cond_wait(&pool->cond, &pool->lock);//进入条件变量等待队列
}
//退出while循环就表示被唤醒
//若没有任务,且线程池关闭,就退出线程(任务执行完毕)
if(pool->waiting_tasks == 0 && pool->shutdown)
{
pthread_mutex_unlock(&pool->lock);
pthread_exit(NULL);
}
//取节点,删除节点
p = pool->task_list->next;
pool->task_list->next = p->next;
//任务数-1
pool->waiting_tasks--;
//解锁
pthread_mutex_unlock(&pool->lock);
pthread_cleanup_pop(0);
//设置线程不可取消
pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, NULL);
//执行任务
p->task(p->arg);
//设置线程可取消
pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
free(p);
}
pthread_exit(NULL);
}
//初始化线程池 线程池结构体指针 线程数量
bool init_pool(thread_pool *pool, unsigned int threads_number)
{
//初始化互斥锁
pthread_mutex_init(&(pool->lock),NULL);
//初始化条件变量
pthread_cond_init(&(pool->cond),NULL);
//初始化线程池销毁标记
pool->shutdown = false;
//初始化任务队列
pool->task_list = malloc(sizeof(struct task));
//线程数组ID初始化
pool->tids = malloc(sizeof(pthread_t) * MAX_ACTIVE_THREADS );
if(pool->task_list == NULL || pool->tids == NULL )
{
perror("allocate memory error");
return false;
}
//将任务队列后继指针指向NULL
pool->task_list->next = NULL;
//初始化任务链队列中的等待任务个数
pool->waiting_tasks = 0;
// 初始化活跃线程个数
pool->active_threads = threads_number;
int i,err;
for(i = 0;i < threads_number; i++) //创建线程
{
err = pthread_create((pool->tids)+i,NULL,routine,(void*)pool);
if(err != 0)
{
fprintf(stderr,"%d can't create thread: %s\n",__LINE__,strerror(err));
return false;
}
}
return true;
}
//增加任务
bool add_task(thread_pool *pool, void *(*task)(void *arg), void *arg)
{
//新增节点
struct task *new_task = malloc(sizeof(struct task));
if(new_task == NULL)
{
perror("allocate memory error");
return false;
}
//初始化新节点
new_task->task = task;
new_task->arg = arg;
new_task->next = NULL;
pthread_mutex_lock(&pool->lock);
//任务过多,无法添加
if(pool->waiting_tasks >= MAX_WAITING_TASKS)
{
pthread_mutex_unlock(&pool->lock);
fprintf(stderr, "too many tasks.\n");
free(new_task);
return false;
}
//将新增节点插入链表,尾插法
struct task *tmp = pool->task_list;
while(tmp->next != NULL)
tmp = tmp->next;
tmp->next = new_task;
pool->waiting_tasks++;
pthread_mutex_unlock(&pool->lock);
pthread_cond_signal(&pool->cond); //唤醒睡眠的线程,一次只能唤醒一个线程
return true;
}
//增加线程
int add_thread(thread_pool *pool, unsigned additional_threads)
{
if(additional_threads == 0)
return 0;
//总线程数 = 活跃线程数 + 新增线程数
unsigned total_threads = pool->active_threads + additional_threads;
int i, err, actual_increment = 0; //实际新增线程数
for(i = pool->active_threads; i < total_threads && i < MAX_ACTIVE_THREADS; i++)
{
//如果返回值不为0则表示创建失败,为0则表示成功
err = pthread_create(&((pool->tids)[i]), NULL, routine, (void *)pool);
if(err != 0)
{
fprintf(stderr,"%d add threads error: %s\n",__LINE__,strerror(err));
return false;
if(actual_increment == 0)
return -1;
break;
}
actual_increment++;
}
//活跃线程数 += 实际新增线程数
pool->active_threads += actual_increment;
//返回实际新增线程数
return actual_increment;
}
//删除线程
int remove_thread(thread_pool *pool, unsigned int removing_threads)
{
//如果参数为0,则可以查看当前活跃线程数
if(removing_threads == 0)
return pool->active_threads;
//还剩的线程数 = 活跃线程数 - 要删除的线程数
int remain_threads = pool->active_threads - removing_threads;
//如果预计删除线程后的个数<=0,那么至少要求还剩1个线程
remain_threads = remain_threads>0 ? remain_threads:1;
int i;
for(i=pool->active_threads-1; i>remain_threads-1; i--)
{
//取消线程
errno = pthread_cancel(pool->tids[i]);
if(errno != 0)
break;
}
if(i == pool->active_threads-1)//1个都没删除
return -1;
else//成功删除pool->active_threads-1-i个 剩下i+1个
{
pool->active_threads = i+1;//更新新的线程活跃个数
return i+1;
}
}
//销毁线程池
bool destroy_pool(thread_pool *pool)
{
//关闭标志变为真
pool->shutdown = true;
//广播通知睡眠的线程
pthread_cond_broadcast(&pool->cond); //唤醒睡眠的线程,一次唤醒所有睡眠的线程
int i;
for(i=0; i<pool->active_threads; i++)
{
//回收线程
errno = pthread_join(pool->tids[i], NULL);
if(errno != 0)
{
fprintf(stderr,"join tids[%d] error: %s\n",i, strerror(errno));
}
else
printf("[%X] is joined\n", (unsigned)pool->tids[i]);
}
free(pool->task_list);
free(pool->tids);
free(pool);
return true;
}
main.c
#include "thread_pool.h"
#include "mycp.h"
thread_pool *pool;
int main(int argc, char const *argv[])
{
//初始化线程池,创建10条线程
pool = malloc(sizeof(thread_pool));
init_pool(pool, 10);
if(argc!= 3)
{
printf("Usage: %s <src> <dst>\n",argv[0]);
return 0;
}
file_name fileName;
strcpy(fileName.src,argv[1]);
strcpy(fileName.dst,argv[2]);
MyCopy(&fileName);
//销毁线程池
destroy_pool(pool);
return 0;
}
Makefile
all:
@echo building...
@gcc *.c -lpthread
@echo successful...
编译运行:
make
./a.out src dst